|
|
|
@@ -3,16 +3,15 @@ package com.seibel.lod.core.file.fullDatafile;
|
|
|
|
|
import java.io.*;
|
|
|
|
|
import java.lang.ref.*;
|
|
|
|
|
import java.nio.channels.ClosedByInterruptException;
|
|
|
|
|
import java.nio.file.FileAlreadyExistsException;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
import java.util.concurrent.locks.Lock;
|
|
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
|
|
|
|
import com.seibel.lod.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor;
|
|
|
|
|
import com.seibel.lod.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
|
|
|
|
|
import com.seibel.lod.core.dataObjects.fullData.loader.AbstractFullDataSourceLoader;
|
|
|
|
|
import com.seibel.lod.core.dependencyInjection.SingletonInjector;
|
|
|
|
|
import com.seibel.lod.core.file.metaData.BaseMetaData;
|
|
|
|
|
import com.seibel.lod.core.pos.DhLodPos;
|
|
|
|
|
import com.seibel.lod.core.file.metaData.AbstractMetaDataContainerFile;
|
|
|
|
@@ -21,43 +20,53 @@ import com.seibel.lod.core.pos.DhSectionPos;
|
|
|
|
|
import com.seibel.lod.core.logging.DhLoggerBuilder;
|
|
|
|
|
import com.seibel.lod.core.util.AtomicsUtil;
|
|
|
|
|
import com.seibel.lod.core.util.LodUtil;
|
|
|
|
|
import com.seibel.lod.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
|
|
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Related to the stored Blockstate/Biome ID data.
|
|
|
|
|
* Represents a File that contains a {@link IFullDataSource}.
|
|
|
|
|
*/
|
|
|
|
|
public class FullDataMetaFile extends AbstractMetaDataContainerFile
|
|
|
|
|
{
|
|
|
|
|
private static final Logger LOGGER = DhLoggerBuilder.getLogger(FullDataMetaFile.class.getSimpleName());
|
|
|
|
|
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final IDhLevel level;
|
|
|
|
|
private final IFullDataSourceProvider handler;
|
|
|
|
|
private final IFullDataSourceProvider fullDataSourceProvider;
|
|
|
|
|
private boolean doesFileExist;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public AbstractFullDataSourceLoader fullDataSourceLoader;
|
|
|
|
|
public Class<? extends IFullDataSource> dataType;
|
|
|
|
|
// The '?' type should either be:
|
|
|
|
|
// SoftReference<LodDataSource>, or - Non-dirty file that can be GCed
|
|
|
|
|
// CompletableFuture<LodDataSource>, or - File that is being loaded. No guarantee that the type is promotable or not
|
|
|
|
|
// null - Nothing is loaded or being loaded
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Deprecated: this should be split up into multiple variables to prevent datatype confusion
|
|
|
|
|
*
|
|
|
|
|
* The '?' type should either be:
|
|
|
|
|
* SoftReference<LodDataSource>, or - Non-dirty file that can be GCed
|
|
|
|
|
* CompletableFuture<LodDataSource>, or - File that is being loaded. No guarantee that the type is promotable or not
|
|
|
|
|
* null - Nothing is loaded or being loaded
|
|
|
|
|
*/
|
|
|
|
|
@Deprecated
|
|
|
|
|
AtomicReference<Object> data = new AtomicReference<>(null);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//TODO: use ConcurrentAppendSingleSwapContainer<LodDataSource> instead of below:
|
|
|
|
|
private static class GuardedMultiAppendQueue {
|
|
|
|
|
private static class GuardedMultiAppendQueue
|
|
|
|
|
{
|
|
|
|
|
ReentrantReadWriteLock appendLock = new ReentrantReadWriteLock();
|
|
|
|
|
ConcurrentLinkedQueue<ChunkSizedFullDataAccessor> queue = new ConcurrentLinkedQueue<>();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// ===Concurrent Write stuff===
|
|
|
|
|
AtomicReference<GuardedMultiAppendQueue> writeQueue =
|
|
|
|
|
new AtomicReference<>(new GuardedMultiAppendQueue());
|
|
|
|
|
GuardedMultiAppendQueue _backQueue = new GuardedMultiAppendQueue();
|
|
|
|
|
private final AtomicReference<GuardedMultiAppendQueue> writeQueueRef = new AtomicReference<>(new GuardedMultiAppendQueue());
|
|
|
|
|
private GuardedMultiAppendQueue backWriteQueue = new GuardedMultiAppendQueue();
|
|
|
|
|
// ===========================
|
|
|
|
|
|
|
|
|
|
private AtomicReference<CompletableFuture<IFullDataSource>> inCacheWriteAccessFuture = new AtomicReference<>(null);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final AtomicReference<CompletableFuture<IFullDataSource>> inCacheWriteAccessFuture = new AtomicReference<>(null);
|
|
|
|
|
|
|
|
|
|
// ===Object lifetime stuff===
|
|
|
|
|
private static final ReferenceQueue<IFullDataSource> lifeCycleDebugQueue = new ReferenceQueue<>();
|
|
|
|
|
private static final Set<DataObjTracker> lifeCycleDebugSet = ConcurrentHashMap.newKeySet();
|
|
|
|
@@ -81,128 +90,94 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Create a new metaFile
|
|
|
|
|
public FullDataMetaFile(IFullDataSourceProvider handler, IDhLevel level, DhSectionPos pos) throws IOException
|
|
|
|
|
//==============//
|
|
|
|
|
// constructors //
|
|
|
|
|
//==============//
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Creates a new file.
|
|
|
|
|
* @throws FileAlreadyExistsException if a file already exists.
|
|
|
|
|
*/
|
|
|
|
|
public FullDataMetaFile(IFullDataSourceProvider fullDataSourceProvider, IDhLevel level, DhSectionPos pos) throws FileAlreadyExistsException
|
|
|
|
|
{
|
|
|
|
|
super(handler.computeDataFilePath(pos), pos);
|
|
|
|
|
debugCheck();
|
|
|
|
|
this.handler = handler;
|
|
|
|
|
this.level = level;
|
|
|
|
|
LodUtil.assertTrue(metaData == null);
|
|
|
|
|
doesFileExist = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public FullDataMetaFile(IFullDataSourceProvider handler, IDhLevel level, File path) throws IOException
|
|
|
|
|
{
|
|
|
|
|
super(path);
|
|
|
|
|
debugCheck();
|
|
|
|
|
this.handler = handler;
|
|
|
|
|
this.level = level;
|
|
|
|
|
LodUtil.assertTrue(metaData != null);
|
|
|
|
|
fullDataSourceLoader = AbstractFullDataSourceLoader.getLoader(metaData.dataTypeId, metaData.loaderVersion);
|
|
|
|
|
if (fullDataSourceLoader == null) {
|
|
|
|
|
throw new IOException("Invalid file: Data type loader not found: "
|
|
|
|
|
+ metaData.dataTypeId + "(v" + metaData.loaderVersion + ")");
|
|
|
|
|
}
|
|
|
|
|
dataType = fullDataSourceLoader.clazz;
|
|
|
|
|
doesFileExist = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public CompletableFuture<Void> flushAndSaveAsync()
|
|
|
|
|
{
|
|
|
|
|
debugCheck();
|
|
|
|
|
boolean isEmpty = this.writeQueue.get().queue.isEmpty();
|
|
|
|
|
if (!isEmpty)
|
|
|
|
|
{
|
|
|
|
|
return this.loadOrGetCachedAsync().thenApply((unused) -> null); // This will flush the data to disk.
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
return CompletableFuture.completedFuture(null);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// public long getCacheVersion() {
|
|
|
|
|
// debugCheck();
|
|
|
|
|
// return (this.metaData == null) ? 0 : this.metaData.dataVersion.get();
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// public boolean isCacheVersionValid(long cacheVersion)
|
|
|
|
|
// {
|
|
|
|
|
// debugCheck();
|
|
|
|
|
// boolean noWrite = this.writeQueue.get().queue.isEmpty();
|
|
|
|
|
// if (!noWrite)
|
|
|
|
|
// {
|
|
|
|
|
// return false;
|
|
|
|
|
// }
|
|
|
|
|
// else
|
|
|
|
|
// {
|
|
|
|
|
// BaseMetaData getData = this.metaData;
|
|
|
|
|
// //NOTE: Do this instead of direct compare so values that wrapped around still work correctly.
|
|
|
|
|
// return (getData == null ? 0 : this.metaData.dataVersion.get()) - cacheVersion <= 0;
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
public void addToWriteQueue(ChunkSizedFullDataAccessor chunkDataSource)
|
|
|
|
|
{
|
|
|
|
|
debugCheck();
|
|
|
|
|
DhLodPos chunkLodPos = new DhLodPos(LodUtil.CHUNK_DETAIL_LEVEL, chunkDataSource.pos.x, chunkDataSource.pos.z);
|
|
|
|
|
LodUtil.assertTrue(pos.getSectionBBoxPos().overlapsExactly(chunkLodPos), "Chunk pos "+chunkLodPos+" doesn't overlap with section "+pos);
|
|
|
|
|
//LOGGER.info("Write Chunk {} to file {}", chunkPos, pos);
|
|
|
|
|
super(fullDataSourceProvider.computeDataFilePath(pos), pos);
|
|
|
|
|
debugPhantomLifeCycleCheck();
|
|
|
|
|
|
|
|
|
|
GuardedMultiAppendQueue writeQueue = this.writeQueue.get();
|
|
|
|
|
// Using read lock is OK, because the queue's underlying data structure is thread-safe.
|
|
|
|
|
// This lock is only used to insure on polling the queue, that the queue is not being
|
|
|
|
|
// modified by another thread.
|
|
|
|
|
Lock appendLock = writeQueue.appendLock.readLock();
|
|
|
|
|
appendLock.lock();
|
|
|
|
|
try
|
|
|
|
|
this.fullDataSourceProvider = fullDataSourceProvider;
|
|
|
|
|
this.level = level;
|
|
|
|
|
LodUtil.assertTrue(this.baseMetaData == null);
|
|
|
|
|
this.doesFileExist = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Uses an existing file.
|
|
|
|
|
* @throws IOException if the file was formatted incorrectly
|
|
|
|
|
* @throws FileNotFoundException if no file exists for the given path
|
|
|
|
|
*/
|
|
|
|
|
public FullDataMetaFile(IFullDataSourceProvider fullDataSourceProvider, IDhLevel level, File file) throws IOException, FileNotFoundException
|
|
|
|
|
{
|
|
|
|
|
super(file);
|
|
|
|
|
debugPhantomLifeCycleCheck();
|
|
|
|
|
|
|
|
|
|
this.fullDataSourceProvider = fullDataSourceProvider;
|
|
|
|
|
this.level = level;
|
|
|
|
|
LodUtil.assertTrue(this.baseMetaData != null);
|
|
|
|
|
this.doesFileExist = true;
|
|
|
|
|
|
|
|
|
|
this.fullDataSourceLoader = AbstractFullDataSourceLoader.getLoader(this.baseMetaData.dataTypeId, this.baseMetaData.binaryDataFormatVersion);
|
|
|
|
|
if (this.fullDataSourceLoader == null)
|
|
|
|
|
{
|
|
|
|
|
writeQueue.queue.add(chunkDataSource);
|
|
|
|
|
}
|
|
|
|
|
finally
|
|
|
|
|
{
|
|
|
|
|
appendLock.unlock();
|
|
|
|
|
throw new IOException("Invalid file: Data type loader not found: "+this.baseMetaData.dataTypeId+"(v"+this.baseMetaData.binaryDataFormatVersion +")");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//LOGGER.info("write queue length for pos "+this.pos+": " + writeQueue.queue.size());
|
|
|
|
|
this.dataType = this.fullDataSourceLoader.clazz;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//==========//
|
|
|
|
|
// get data //
|
|
|
|
|
//==========//
|
|
|
|
|
|
|
|
|
|
// Cause: Generic Type runtime casting cannot safety check it.
|
|
|
|
|
// However, the Union type ensures the 'data' should only contain the listed type.
|
|
|
|
|
public CompletableFuture<IFullDataSource> loadOrGetCachedAsync()
|
|
|
|
|
public CompletableFuture<IFullDataSource> loadOrGetCachedDataSourceAsync()
|
|
|
|
|
{
|
|
|
|
|
debugCheck();
|
|
|
|
|
debugPhantomLifeCycleCheck();
|
|
|
|
|
Object obj = this.data.get();
|
|
|
|
|
|
|
|
|
|
CompletableFuture<IFullDataSource> cached = this._readCachedAsync(obj);
|
|
|
|
|
CompletableFuture<IFullDataSource> cached = this.getCachedDataSourceAsync(obj);
|
|
|
|
|
if (cached != null)
|
|
|
|
|
{
|
|
|
|
|
return cached;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CompletableFuture<IFullDataSource> future = new CompletableFuture<>();
|
|
|
|
|
|
|
|
|
|
// Would use faster and non-nesting Compare and exchange. But java 8 doesn't have it! :(
|
|
|
|
|
boolean worked = this.data.compareAndSet(obj, future); // TODO obj and future are different object types, would this ever return true?
|
|
|
|
|
boolean worked = this.data.compareAndSet(obj, future); // TODO if data was a future it would have a different memory address, would this ever return true?
|
|
|
|
|
if (!worked)
|
|
|
|
|
{
|
|
|
|
|
return this.loadOrGetCachedAsync();
|
|
|
|
|
// TODO wouldn't this cause an infinite loop?
|
|
|
|
|
return this.loadOrGetCachedDataSourceAsync();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// After cas. We are in exclusive control.
|
|
|
|
|
if (!this.doesFileExist)
|
|
|
|
|
{
|
|
|
|
|
this.handler.onCreateDataFile(this)
|
|
|
|
|
// create a new Meta file
|
|
|
|
|
|
|
|
|
|
this.fullDataSourceProvider.onCreateDataFile(this)
|
|
|
|
|
.thenApply((data) ->
|
|
|
|
|
{
|
|
|
|
|
this.metaData = makeMetaData(data);
|
|
|
|
|
this.baseMetaData = this._makeBaseMetaData(data);
|
|
|
|
|
return data;
|
|
|
|
|
})
|
|
|
|
|
.thenApply((data) -> this.handler.onDataFileLoaded(data, this.metaData, this::saveChanges, this::applyWriteQueue))
|
|
|
|
|
.thenApply((data) -> this.fullDataSourceProvider.onDataFileLoaded(data, this.baseMetaData, this::_updateAndWriteDataSource, this::_applyWriteQueueToFullDataSource))
|
|
|
|
|
.whenComplete((fullDataSource, exception) ->
|
|
|
|
|
{
|
|
|
|
|
if (exception != null)
|
|
|
|
@@ -221,16 +196,19 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
// read in the existing meta file's data
|
|
|
|
|
|
|
|
|
|
if (this.baseMetaData == null)
|
|
|
|
|
{
|
|
|
|
|
throw new IllegalStateException("Meta data not loaded!");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CompletableFuture.supplyAsync(() ->
|
|
|
|
|
{
|
|
|
|
|
if (this.metaData == null)
|
|
|
|
|
{
|
|
|
|
|
throw new IllegalStateException("Meta data not loaded!"); // TODO should this be a CompletionException?
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Load the file.
|
|
|
|
|
IFullDataSource fullDataSource;
|
|
|
|
|
try (FileInputStream fileInputStream = this.getFileInputStream();
|
|
|
|
|
try (FileInputStream fileInputStream = this._getFileInputStream();
|
|
|
|
|
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream))
|
|
|
|
|
{
|
|
|
|
|
fullDataSource = this.fullDataSourceLoader.loadData(this, bufferedInputStream, this.level);
|
|
|
|
@@ -247,31 +225,34 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
|
|
|
|
|
throw new CompletionException(ex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Apply the write queue
|
|
|
|
|
|
|
|
|
|
// confirm that this thread is in control
|
|
|
|
|
LodUtil.assertTrue(this.inCacheWriteAccessFuture.get() == null,
|
|
|
|
|
"No one should be writing to the cache while we are in the process of " +
|
|
|
|
|
"loading one into the cache! Is this a deadlock?");
|
|
|
|
|
|
|
|
|
|
fullDataSource = this.handler.onDataFileLoaded(fullDataSource, this.metaData, this::saveChanges, this::applyWriteQueue);
|
|
|
|
|
// fire the onDataLoaded method
|
|
|
|
|
fullDataSource = this.fullDataSourceProvider.onDataFileLoaded(fullDataSource, this.baseMetaData, this::_updateAndWriteDataSource, this::_applyWriteQueueToFullDataSource);
|
|
|
|
|
return fullDataSource;
|
|
|
|
|
}, this.handler.getIOExecutor())
|
|
|
|
|
|
|
|
|
|
}, this.fullDataSourceProvider.getIOExecutor())
|
|
|
|
|
.exceptionally((ex) ->
|
|
|
|
|
{
|
|
|
|
|
if (ex instanceof InterruptedException)
|
|
|
|
|
{
|
|
|
|
|
// this exception can be ignored
|
|
|
|
|
//LOGGER.warn(FullDataMetaFile.class.getSimpleName()+" loadOrGetCachedAsync interrupted.");
|
|
|
|
|
//future.completeExceptionally(ex); // this exception can be ignored
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
else if (ex instanceof RejectedExecutionException)
|
|
|
|
|
{
|
|
|
|
|
// this exception can be ignored
|
|
|
|
|
//LOGGER.warn(FullDataMetaFile.class.getSimpleName()+" loadOrGetCachedAsync attempted to use a closed thread pool.");
|
|
|
|
|
//future.completeExceptionally(ex); // this exception can be ignored
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
LOGGER.error("Error loading file {}: ", this.file, ex);
|
|
|
|
|
LOGGER.error("Error loading file "+this.file+": ", ex);
|
|
|
|
|
this.data.set(null);
|
|
|
|
|
|
|
|
|
|
future.completeExceptionally(ex);
|
|
|
|
@@ -285,127 +266,212 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Would use CompletableFuture.completeAsync(...), But, java 8 doesn't have it! :(
|
|
|
|
|
//return future.completeAsync(this::loadAndUpdateDataSource, fileReaderThreads);
|
|
|
|
|
|
|
|
|
|
return future;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static BaseMetaData makeMetaData(IFullDataSource data) {
|
|
|
|
|
/** @return a stream for the data contained in this file, skips the metadata from {@link AbstractMetaDataContainerFile}. */
|
|
|
|
|
private FileInputStream _getFileInputStream() throws IOException
|
|
|
|
|
{
|
|
|
|
|
FileInputStream fileInputStream = new FileInputStream(this.file);
|
|
|
|
|
|
|
|
|
|
// skip the meta-data bytes
|
|
|
|
|
int bytesToSkip = AbstractMetaDataContainerFile.METADATA_SIZE_IN_BYTES;
|
|
|
|
|
while (bytesToSkip > 0)
|
|
|
|
|
{
|
|
|
|
|
long skippedByteCount = fileInputStream.skip(bytesToSkip);
|
|
|
|
|
if (skippedByteCount == 0)
|
|
|
|
|
{
|
|
|
|
|
throw new IOException("Invalid file: Failed to skip metadata.");
|
|
|
|
|
}
|
|
|
|
|
bytesToSkip -= skippedByteCount;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (bytesToSkip != 0)
|
|
|
|
|
{
|
|
|
|
|
throw new IOException("File IO Error: Failed to skip metadata.");
|
|
|
|
|
}
|
|
|
|
|
return fileInputStream;
|
|
|
|
|
}
|
|
|
|
|
private BaseMetaData _makeBaseMetaData(IFullDataSource data)
|
|
|
|
|
{
|
|
|
|
|
AbstractFullDataSourceLoader loader = AbstractFullDataSourceLoader.getLoader(data.getClass(), data.getBinaryDataFormatVersion());
|
|
|
|
|
return new BaseMetaData(data.getSectionPos(), -1,
|
|
|
|
|
data.getDataDetailLevel(), data.getWorldGenStep(), (loader == null ? 0 : loader.datatypeId), data.getBinaryDataFormatVersion());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// "unchecked": Suppress casting of CompletableFuture<?> to CompletableFuture<LodDataSource>
|
|
|
|
|
// "PointlessBooleanExpression": Suppress explicit (boolean == false) check for more understandable CAS operation code.
|
|
|
|
|
private CompletableFuture<IFullDataSource> _readCachedAsync(Object obj)
|
|
|
|
|
/**
|
|
|
|
|
* @return either the cached {@link IFullDataSource},
|
|
|
|
|
* a future that will complete once the {@link FullDataMetaFile#writeQueueRef} has been written,
|
|
|
|
|
* or null if something went wrong
|
|
|
|
|
*/
|
|
|
|
|
private CompletableFuture<IFullDataSource> getCachedDataSourceAsync(Object obj)
|
|
|
|
|
{
|
|
|
|
|
// Has file cached in RAM and not freed yet.
|
|
|
|
|
if ((obj instanceof SoftReference<?>))
|
|
|
|
|
{
|
|
|
|
|
Object inner = ((SoftReference<?>)obj).get();
|
|
|
|
|
if (inner != null)
|
|
|
|
|
// The file is cached in RAM
|
|
|
|
|
|
|
|
|
|
IFullDataSource innerFullDataSource = (IFullDataSource) ((SoftReference<?>)obj).get();
|
|
|
|
|
if (innerFullDataSource != null)
|
|
|
|
|
{
|
|
|
|
|
LodUtil.assertTrue(inner instanceof IFullDataSource);
|
|
|
|
|
boolean isEmpty = writeQueue.get().queue.isEmpty();
|
|
|
|
|
boolean writeQueueEmpty = this.writeQueueRef.get().queue.isEmpty();
|
|
|
|
|
|
|
|
|
|
// If the queue is empty, and the CAS on inCacheWriteLock succeeds, then we are the thread
|
|
|
|
|
// that will be applying the changes to the cache.
|
|
|
|
|
if (!isEmpty)
|
|
|
|
|
if (writeQueueEmpty)
|
|
|
|
|
{
|
|
|
|
|
// Do a CAS on inCacheWriteLock to ensure that we are the only thread that is writing to the cache,
|
|
|
|
|
// or if we fail, then that means someone else is already doing it, and we can just return the future
|
|
|
|
|
CompletableFuture<IFullDataSource> future = new CompletableFuture<>();
|
|
|
|
|
CompletableFuture<IFullDataSource> compareAndSwapFuture = AtomicsUtil.compareAndExchange(inCacheWriteAccessFuture, null, future);
|
|
|
|
|
if (compareAndSwapFuture == null)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
data.set(future);
|
|
|
|
|
handler.onDataFileRefresh((IFullDataSource) inner, metaData, this::applyWriteQueue, this::saveChanges).handle((fullDataSource, exception) ->
|
|
|
|
|
{
|
|
|
|
|
if (exception != null)
|
|
|
|
|
{
|
|
|
|
|
LOGGER.error("Error refreshing data "+pos+": "+exception);
|
|
|
|
|
future.complete(null);
|
|
|
|
|
data.set(null);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
future.complete(fullDataSource);
|
|
|
|
|
new DataObjTracker(fullDataSource);
|
|
|
|
|
data.set(new SoftReference<>(fullDataSource));
|
|
|
|
|
}
|
|
|
|
|
inCacheWriteAccessFuture.set(null);
|
|
|
|
|
return fullDataSource;
|
|
|
|
|
});
|
|
|
|
|
return future;
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e)
|
|
|
|
|
{
|
|
|
|
|
LOGGER.error("Error while doing refreshes to LodDataSource at " + pos + ": " + e);
|
|
|
|
|
return CompletableFuture.completedFuture((IFullDataSource) inner);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
// or, return the future that will be completed when the write is done.
|
|
|
|
|
return compareAndSwapFuture;
|
|
|
|
|
}
|
|
|
|
|
// return the cached data
|
|
|
|
|
return CompletableFuture.completedFuture(innerFullDataSource);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
// or, return the cached data.
|
|
|
|
|
return CompletableFuture.completedFuture((IFullDataSource) inner);
|
|
|
|
|
// either write the queue or return the future that is waiting for the queue write
|
|
|
|
|
|
|
|
|
|
// Do a CAS on inCacheWriteLock to ensure that we are the only thread that is writing to the cache,
|
|
|
|
|
// or if we fail, then that means someone else is already doing it, and we can just return the future
|
|
|
|
|
CompletableFuture<IFullDataSource> future = new CompletableFuture<>();
|
|
|
|
|
CompletableFuture<IFullDataSource> compareAndSwapFuture = AtomicsUtil.compareAndExchange(this.inCacheWriteAccessFuture, null, future);
|
|
|
|
|
if (compareAndSwapFuture != null)
|
|
|
|
|
{
|
|
|
|
|
// a write is already in progress, return its future.
|
|
|
|
|
return compareAndSwapFuture;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
// write the queue to the data source
|
|
|
|
|
|
|
|
|
|
// try // TODO is this try necessary?
|
|
|
|
|
// {
|
|
|
|
|
this.data.set(future);
|
|
|
|
|
|
|
|
|
|
this.fullDataSourceProvider.onDataFileRefresh(innerFullDataSource, this.baseMetaData, this::_applyWriteQueueToFullDataSource, this::_updateAndWriteDataSource)
|
|
|
|
|
.handle((fullDataSource, exception) ->
|
|
|
|
|
{
|
|
|
|
|
if (exception != null)
|
|
|
|
|
{
|
|
|
|
|
LOGGER.error("Error refreshing data "+this.pos+": "+exception+" "+exception.getMessage());
|
|
|
|
|
future.complete(null);
|
|
|
|
|
this.data.set(null);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
future.complete(fullDataSource);
|
|
|
|
|
new DataObjTracker(fullDataSource);
|
|
|
|
|
this.data.set(new SoftReference<>(fullDataSource));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.inCacheWriteAccessFuture.set(null);
|
|
|
|
|
return fullDataSource;
|
|
|
|
|
});
|
|
|
|
|
return future;
|
|
|
|
|
// }
|
|
|
|
|
// catch (Exception e)
|
|
|
|
|
// {
|
|
|
|
|
// LOGGER.error("Error while doing refreshes to LodDataSource at "+this.pos+": "+e);
|
|
|
|
|
// return CompletableFuture.completedFuture(innerFullDataSource);
|
|
|
|
|
// }
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//==== Cached file out of scrope. ====
|
|
|
|
|
// Someone is already trying to complete it. so just return the obj.
|
|
|
|
|
if ((obj instanceof CompletableFuture<?>)) {
|
|
|
|
|
return (CompletableFuture<IFullDataSource>)obj;
|
|
|
|
|
|
|
|
|
|
//==== Cached file out of scope ====
|
|
|
|
|
// Someone is already trying to complete it. so return the in-progress future.
|
|
|
|
|
if ((obj instanceof CompletableFuture<?>))
|
|
|
|
|
{
|
|
|
|
|
return (CompletableFuture<IFullDataSource>) obj;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void swapWriteQueue() {
|
|
|
|
|
GuardedMultiAppendQueue queue = writeQueue.getAndSet(_backQueue);
|
|
|
|
|
// Acquire write lock and then release it again as we only need to ensure that the queue
|
|
|
|
|
// is not being appended to by another thread. Note that the above atomic swap &
|
|
|
|
|
// the guarantee that all append first acquire the appendLock means after the locK() call,
|
|
|
|
|
// there will be no other threads able to or is currently appending to the queue.
|
|
|
|
|
// Note: The above needs the getAndSet() to have at least Release Memory order.
|
|
|
|
|
// (not that java supports anything non volatile for getAndSet()...)
|
|
|
|
|
queue.appendLock.writeLock().lock();
|
|
|
|
|
queue.appendLock.writeLock().unlock();
|
|
|
|
|
_backQueue = queue;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//===============//
|
|
|
|
|
// data updating //
|
|
|
|
|
//===============//
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Adds the given {@link ChunkSizedFullDataAccessor} to the write queue,
|
|
|
|
|
* which will be applied to the object at some undefined time in the future.
|
|
|
|
|
*/
|
|
|
|
|
public void addToWriteQueue(ChunkSizedFullDataAccessor chunkAccessor)
|
|
|
|
|
{
|
|
|
|
|
debugPhantomLifeCycleCheck();
|
|
|
|
|
|
|
|
|
|
DhLodPos chunkLodPos = new DhLodPos(LodUtil.CHUNK_DETAIL_LEVEL, chunkAccessor.pos.x, chunkAccessor.pos.z);
|
|
|
|
|
|
|
|
|
|
LodUtil.assertTrue(this.pos.getSectionBBoxPos().overlapsExactly(chunkLodPos), "Chunk pos "+chunkLodPos+" doesn't exactly overlap with section "+this.pos);
|
|
|
|
|
//LOGGER.info("Write Chunk {} to file {}", chunkPos, pos);
|
|
|
|
|
|
|
|
|
|
GuardedMultiAppendQueue writeQueue = this.writeQueueRef.get();
|
|
|
|
|
// Using read lock is OK, because the queue's underlying data structure is thread-safe.
|
|
|
|
|
// This lock is only used to insure on polling the queue, that the queue is not being
|
|
|
|
|
// modified by another thread.
|
|
|
|
|
ReentrantReadWriteLock.ReadLock appendLock = writeQueue.appendLock.readLock();
|
|
|
|
|
appendLock.lock();
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
writeQueue.queue.add(chunkAccessor);
|
|
|
|
|
}
|
|
|
|
|
finally
|
|
|
|
|
{
|
|
|
|
|
appendLock.unlock();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//LOGGER.info("write queue length for pos "+this.pos+": " + writeQueue.queue.size());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void saveChanges(IFullDataSource fullDataSource)
|
|
|
|
|
|
|
|
|
|
/** Applies any queued {@link ChunkSizedFullDataAccessor} to this metadata's {@link IFullDataSource} and writes the data to file. */
|
|
|
|
|
public CompletableFuture<Void> flushAndSaveAsync()
|
|
|
|
|
{
|
|
|
|
|
if (fullDataSource.isEmpty())
|
|
|
|
|
debugPhantomLifeCycleCheck();
|
|
|
|
|
boolean isEmpty = this.writeQueueRef.get().queue.isEmpty();
|
|
|
|
|
if (!isEmpty)
|
|
|
|
|
{
|
|
|
|
|
if (file.exists() && !file.delete())
|
|
|
|
|
{
|
|
|
|
|
LOGGER.warn("Failed to delete data file at {}", file);
|
|
|
|
|
}
|
|
|
|
|
doesFileExist = false;
|
|
|
|
|
// This will flush the data to disk.
|
|
|
|
|
return this.loadOrGetCachedDataSourceAsync().thenApply((fullDataSource) -> null /* ignore the result, just wait for the load to finish*/);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
return CompletableFuture.completedFuture(null);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** updates this object to match the given {@link IFullDataSource} and then writes the new data to file. */
|
|
|
|
|
private void _updateAndWriteDataSource(IFullDataSource fullDataSource)
|
|
|
|
|
{
|
|
|
|
|
if (fullDataSource.isEmpty())
|
|
|
|
|
{
|
|
|
|
|
// delete the empty data source
|
|
|
|
|
if (this.file.exists() && !this.file.delete())
|
|
|
|
|
{
|
|
|
|
|
LOGGER.warn("Failed to delete data file at "+this.file);
|
|
|
|
|
}
|
|
|
|
|
this.doesFileExist = false;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
// update the data source and write the new data to file
|
|
|
|
|
|
|
|
|
|
//LOGGER.info("Saving data file of {}", data.getSectionPos());
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
// Write/Update data
|
|
|
|
|
LodUtil.assertTrue(metaData != null);
|
|
|
|
|
metaData.dataLevel = fullDataSource.getDataDetailLevel();
|
|
|
|
|
fullDataSourceLoader = AbstractFullDataSourceLoader.getLoader(fullDataSource.getClass(), fullDataSource.getBinaryDataFormatVersion());
|
|
|
|
|
LodUtil.assertTrue(fullDataSourceLoader != null, "No loader for "+fullDataSource.getClass()+" (v"+fullDataSource.getBinaryDataFormatVersion()+")");
|
|
|
|
|
dataType = fullDataSource.getClass();
|
|
|
|
|
metaData.dataTypeId = (fullDataSourceLoader == null) ? 0 : fullDataSourceLoader.datatypeId;
|
|
|
|
|
metaData.loaderVersion = fullDataSource.getBinaryDataFormatVersion();
|
|
|
|
|
super.writeData((outputStream) -> fullDataSource.writeToStream(outputStream, level));
|
|
|
|
|
doesFileExist = true;
|
|
|
|
|
LodUtil.assertTrue(this.baseMetaData != null);
|
|
|
|
|
|
|
|
|
|
this.baseMetaData.dataLevel = fullDataSource.getDataDetailLevel();
|
|
|
|
|
this.fullDataSourceLoader = AbstractFullDataSourceLoader.getLoader(fullDataSource.getClass(), fullDataSource.getBinaryDataFormatVersion());
|
|
|
|
|
LodUtil.assertTrue(this.fullDataSourceLoader != null, "No loader for "+fullDataSource.getClass()+" (v"+fullDataSource.getBinaryDataFormatVersion()+")");
|
|
|
|
|
|
|
|
|
|
this.dataType = fullDataSource.getClass();
|
|
|
|
|
this.baseMetaData.dataTypeId = (this.fullDataSourceLoader == null) ? 0 : this.fullDataSourceLoader.datatypeId;
|
|
|
|
|
this.baseMetaData.binaryDataFormatVersion = fullDataSource.getBinaryDataFormatVersion();
|
|
|
|
|
|
|
|
|
|
super.writeData((outputStream) -> fullDataSource.writeToStream(outputStream, this.level));
|
|
|
|
|
this.doesFileExist = true;
|
|
|
|
|
}
|
|
|
|
|
catch (ClosedByInterruptException e) // thrown by buffers that are interrupted
|
|
|
|
|
{
|
|
|
|
@@ -414,55 +480,53 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
|
|
|
|
|
}
|
|
|
|
|
catch (IOException e)
|
|
|
|
|
{
|
|
|
|
|
LOGGER.error("Failed to save updated data file at "+file+" for sect "+pos, e);
|
|
|
|
|
LOGGER.error("Failed to save updated data file at "+this.file+" for section "+this.pos, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** @return whether any writing has happened to the data */
|
|
|
|
|
private boolean applyWriteQueue(IFullDataSource fullDataSource)
|
|
|
|
|
/** @return true if the queue was not empty and data was applied to the {@link IFullDataSource}. */
|
|
|
|
|
private boolean _applyWriteQueueToFullDataSource(IFullDataSource fullDataSource)
|
|
|
|
|
{
|
|
|
|
|
// TODO this isn't being called enough
|
|
|
|
|
|
|
|
|
|
// Poll the write queue
|
|
|
|
|
// First check if write queue is empty, then swap the write queue.
|
|
|
|
|
// Must be done in this order to ensure isMemoryAddressValid work properly. See isMemoryAddressValid() for details.
|
|
|
|
|
boolean isEmpty = this.writeQueue.get().queue.isEmpty();
|
|
|
|
|
boolean isEmpty = this.writeQueueRef.get().queue.isEmpty();
|
|
|
|
|
if (!isEmpty)
|
|
|
|
|
{
|
|
|
|
|
this.swapWriteQueue();
|
|
|
|
|
int count = this._backQueue.queue.size();
|
|
|
|
|
for (ChunkSizedFullDataAccessor chunk : this._backQueue.queue)
|
|
|
|
|
this._swapWriteQueue();
|
|
|
|
|
for (ChunkSizedFullDataAccessor chunk : this.backWriteQueue.queue)
|
|
|
|
|
{
|
|
|
|
|
fullDataSource.update(chunk);
|
|
|
|
|
}
|
|
|
|
|
this._backQueue.queue.clear();
|
|
|
|
|
this.backWriteQueue.queue.clear();
|
|
|
|
|
//LOGGER.info("Updated Data file at {} for sect {} with {} chunk writes.", path, pos, count);
|
|
|
|
|
}
|
|
|
|
|
return !isEmpty;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private FileInputStream getFileInputStream() throws IOException
|
|
|
|
|
private void _swapWriteQueue()
|
|
|
|
|
{
|
|
|
|
|
FileInputStream fileInputStream = new FileInputStream(this.file);
|
|
|
|
|
int toSkip = METADATA_SIZE_IN_BYTES;
|
|
|
|
|
while (toSkip > 0)
|
|
|
|
|
{
|
|
|
|
|
long skipped = fileInputStream.skip(toSkip);
|
|
|
|
|
if (skipped == 0)
|
|
|
|
|
{
|
|
|
|
|
throw new IOException("Invalid file: Failed to skip metadata.");
|
|
|
|
|
}
|
|
|
|
|
toSkip -= skipped;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (toSkip != 0)
|
|
|
|
|
{
|
|
|
|
|
throw new IOException("File IO Error: Failed to skip metadata.");
|
|
|
|
|
}
|
|
|
|
|
return fileInputStream;
|
|
|
|
|
GuardedMultiAppendQueue writeQueue = this.writeQueueRef.getAndSet(this.backWriteQueue);
|
|
|
|
|
// Acquire write lock and then release it again as we only need to ensure that the queue
|
|
|
|
|
// is not being appended to by another thread. Note that the above atomic swap &
|
|
|
|
|
// the guarantee that all append first acquire the appendLock means after the locK() call,
|
|
|
|
|
// there will be no other threads able to or is currently appending to the queue.
|
|
|
|
|
// Note: The above needs the getAndSet() to have at least Release Memory order.
|
|
|
|
|
// (not that java supports anything non volatile for getAndSet()...)
|
|
|
|
|
writeQueue.appendLock.writeLock().lock();
|
|
|
|
|
writeQueue.appendLock.writeLock().unlock();
|
|
|
|
|
this.backWriteQueue = writeQueue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static void debugCheck()
|
|
|
|
|
|
|
|
|
|
//===========//
|
|
|
|
|
// debugging //
|
|
|
|
|
//===========//
|
|
|
|
|
|
|
|
|
|
public static void debugPhantomLifeCycleCheck()
|
|
|
|
|
{
|
|
|
|
|
DataObjTracker phantom = (DataObjTracker) lifeCycleDebugQueue.poll();
|
|
|
|
|
|
|
|
|
|