Remove CacheQueryResult from FullDataMetaFile
This commit is contained in:
+2
-2
@@ -130,7 +130,7 @@ public class FullDataFileHandler implements IFullDataSourceProvider
|
||||
|
||||
// future wrapper necessary in order to handle file read errors
|
||||
CompletableFuture<IFullDataSource> futureWrapper = new CompletableFuture<>();
|
||||
metaFile.loadOrGetCachedDataSourceAsync().exceptionally((e) ->
|
||||
metaFile.getOrLoadCachedDataSourceAsync().exceptionally((e) ->
|
||||
{
|
||||
FullDataMetaFile newMetaFile = this.removeCorruptedFile(pos, metaFile, e);
|
||||
|
||||
@@ -380,7 +380,7 @@ public class FullDataFileHandler implements IFullDataSourceProvider
|
||||
final ArrayList<CompletableFuture<Void>> loadDataFutures = new ArrayList<>(existingFiles.size());
|
||||
for (FullDataMetaFile existingFile : existingFiles)
|
||||
{
|
||||
loadDataFutures.add(existingFile.loadOrGetCachedDataSourceAsync()
|
||||
loadDataFutures.add(existingFile.getOrLoadCachedDataSourceAsync()
|
||||
.exceptionally((ex) -> /*Ignore file read errors*/null)
|
||||
.thenAccept((existingFullDataSource) ->
|
||||
{
|
||||
|
||||
+80
-96
@@ -64,7 +64,6 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
public boolean genQueueChecked = false;
|
||||
|
||||
private volatile boolean markedNeedUpdate = false;
|
||||
private volatile boolean inCrit = false;
|
||||
|
||||
public AbstractFullDataSourceLoader fullDataSourceLoader;
|
||||
public Class<? extends IFullDataSource> dataType;
|
||||
@@ -188,9 +187,9 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
|
||||
private void makeUpdateCompletionStage(CompletableFuture<IFullDataSource> completer, CompletableFuture<IFullDataSource> currentStage)
|
||||
{
|
||||
currentStage.thenCompose(
|
||||
(fullDataSource) -> {
|
||||
markedNeedUpdate = false;
|
||||
currentStage.thenCompose((fullDataSource) ->
|
||||
{
|
||||
this.markedNeedUpdate = false;
|
||||
return this.fullDataSourceProvider.onDataFileUpdate(fullDataSource, this, this::_updateAndWriteDataSource, this::_applyWriteQueueToFullDataSource);
|
||||
})
|
||||
.whenComplete((fullDataSource, ex) ->
|
||||
@@ -206,7 +205,7 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
new DataObjSoftTracker(this, fullDataSource);
|
||||
}
|
||||
//LOGGER.info("Updated file "+this.file);
|
||||
if (pos.sectionDetailLevel == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL)
|
||||
if (this.pos.sectionDetailLevel == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL)
|
||||
DebugRenderer.makeParticle(
|
||||
new DebugRenderer.BoxParticle(
|
||||
new DebugRenderer.Box(this.pos, 64f, 72f, 0.03f, Color.green.darker()),
|
||||
@@ -215,21 +214,21 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
);
|
||||
|
||||
this.cachedFullDataSourceRef = new SoftReference<>(fullDataSource);
|
||||
inCrit = false;
|
||||
dataSourceLoadFutureRef.set(null);
|
||||
this.dataSourceLoadFutureRef.set(null);
|
||||
completer.complete(fullDataSource);
|
||||
|
||||
if (this.markedNeedUpdate)
|
||||
{
|
||||
// trigger another update
|
||||
this.loadOrGetCachedDataSourceAsync();
|
||||
this.getOrLoadCachedDataSourceAsync();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void makeLoadCompletionStage(ExecutorService executorService, CompletableFuture<IFullDataSource> completer)
|
||||
{
|
||||
makeUpdateCompletionStage(completer, CompletableFuture.supplyAsync(() -> {
|
||||
this.makeUpdateCompletionStage(completer, CompletableFuture.supplyAsync(() ->
|
||||
{
|
||||
// Load the file.
|
||||
IFullDataSource fullDataSource;
|
||||
try (FileInputStream fileInputStream = this.getFileInputStream();
|
||||
@@ -258,48 +257,62 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
|
||||
|
||||
|
||||
// Cause: Generic Type runtime casting cannot safety check it.
|
||||
// However, the Union type ensures the 'data' should only contain the listed type.
|
||||
public CompletableFuture<IFullDataSource> loadOrGetCachedDataSourceAsync()
|
||||
public CompletableFuture<IFullDataSource> getOrLoadCachedDataSourceAsync()
|
||||
{
|
||||
debugPhantomLifeCycleCheck();
|
||||
|
||||
CacheQueryResult result = this.getCachedDataSourceAsync();
|
||||
|
||||
if (result.needsLoad)
|
||||
CompletableFuture<IFullDataSource> dataSourceLoadFuture = this.getCachedDataSourceAsync();
|
||||
if (dataSourceLoadFuture != null)
|
||||
{
|
||||
LodUtil.assertTrue(!this.inCrit);
|
||||
this.inCrit = true;
|
||||
// return the in-process future
|
||||
return dataSourceLoadFuture;
|
||||
}
|
||||
else
|
||||
{
|
||||
// there is no cached data, we'll have to load it
|
||||
|
||||
CompletableFuture<IFullDataSource> future = result.future;
|
||||
// don't continue if the provider has been shut down
|
||||
ExecutorService executorService = this.fullDataSourceProvider.getIOExecutor();
|
||||
if (executorService.isTerminated())
|
||||
dataSourceLoadFuture = new CompletableFuture<>();
|
||||
if (!this.dataSourceLoadFutureRef.compareAndSet(null, dataSourceLoadFuture))
|
||||
{
|
||||
this.inCrit = false;
|
||||
this.dataSourceLoadFutureRef.set(null);
|
||||
future.complete(null);
|
||||
return future;
|
||||
}
|
||||
|
||||
// create a new Meta file
|
||||
if (!this.doesFileExist)
|
||||
{
|
||||
this.makeCreateCompletionStage(future);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Otherwise, load and update file
|
||||
if (this.baseMetaData == null)
|
||||
{
|
||||
throw new IllegalStateException("Meta data not loaded!");
|
||||
}
|
||||
|
||||
this.makeLoadCompletionStage(executorService, future);
|
||||
// two threads attempted to start this job at the same time, only use the first future
|
||||
dataSourceLoadFuture = this.dataSourceLoadFutureRef.get();
|
||||
}
|
||||
}
|
||||
|
||||
return result.future;
|
||||
|
||||
|
||||
if (!this.doesFileExist)
|
||||
{
|
||||
// create a new Meta file and data source
|
||||
|
||||
this.makeCreateCompletionStage(dataSourceLoadFuture);
|
||||
}
|
||||
else
|
||||
{
|
||||
// load the existing Meta file and data source
|
||||
|
||||
if (this.baseMetaData == null)
|
||||
{
|
||||
throw new IllegalStateException("Meta data not loaded!");
|
||||
}
|
||||
|
||||
|
||||
ExecutorService executorService = this.fullDataSourceProvider.getIOExecutor();
|
||||
if (!executorService.isTerminated())
|
||||
{
|
||||
// load the data source
|
||||
this.makeLoadCompletionStage(executorService, dataSourceLoadFuture);
|
||||
}
|
||||
else
|
||||
{
|
||||
// don't load anything if the provider has been shut down
|
||||
this.dataSourceLoadFutureRef.set(null);
|
||||
dataSourceLoadFuture.complete(null);
|
||||
return dataSourceLoadFuture;
|
||||
}
|
||||
}
|
||||
|
||||
return dataSourceLoadFuture;
|
||||
}
|
||||
|
||||
|
||||
@@ -310,19 +323,14 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
data.getDataDetailLevel(), data.getWorldGenStep(), (loader == null ? 0 : loader.datatypeId), data.getBinaryDataFormatVersion(), Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return one of the following:
|
||||
* the cached {@link IFullDataSource},
|
||||
* a future that will complete once the {@link FullDataMetaFile#writeQueueRef} has been written,
|
||||
* or null if nothing has been cached and nothing is being loaded
|
||||
*/
|
||||
private CacheQueryResult getCachedDataSourceAsync()
|
||||
/** @return returns null if {@link FullDataMetaFile#cachedFullDataSourceRef} is empty and no cached {@link IFullDataSource} exists. */
|
||||
private CompletableFuture<IFullDataSource> getCachedDataSourceAsync()
|
||||
{
|
||||
// this data source is being written to, use the existing future
|
||||
CompletableFuture<IFullDataSource> dataSourceLoadFuture = this.dataSourceLoadFutureRef.get();
|
||||
if (dataSourceLoadFuture != null)
|
||||
{
|
||||
return new CacheQueryResult(dataSourceLoadFuture, false);
|
||||
return dataSourceLoadFuture;
|
||||
}
|
||||
|
||||
|
||||
@@ -330,60 +338,48 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
IFullDataSource cachedFullDataSource = this.cachedFullDataSourceRef.get();
|
||||
if (cachedFullDataSource == null)
|
||||
{
|
||||
// Make a new future, and CAS it into the dataSourceLoadFutureRef, or return the existing future
|
||||
CompletableFuture<IFullDataSource> newFuture = new CompletableFuture<>();
|
||||
CompletableFuture<IFullDataSource> cas = AtomicsUtil.compareAndExchange(this.dataSourceLoadFutureRef, null, newFuture);
|
||||
if (cas == null)
|
||||
{
|
||||
return new CacheQueryResult(newFuture, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
return new CacheQueryResult(cas, false);
|
||||
}
|
||||
// no cached data exists and no one is trying to load it
|
||||
return null;
|
||||
}
|
||||
else
|
||||
{
|
||||
// The file is cached in RAM
|
||||
boolean needUpdate = !this.writeQueueRef.get().queue.isEmpty() || this.markedNeedUpdate;
|
||||
// cached data exists
|
||||
|
||||
if (!needUpdate)
|
||||
boolean dataNeedsUpdating = !this.writeQueueRef.get().queue.isEmpty() || this.markedNeedUpdate;
|
||||
if (!dataNeedsUpdating)
|
||||
{
|
||||
// return the cached data
|
||||
return new CacheQueryResult(CompletableFuture.completedFuture(cachedFullDataSource), false);
|
||||
return CompletableFuture.completedFuture(cachedFullDataSource);
|
||||
}
|
||||
else
|
||||
{
|
||||
// either write the queue or return the future that is waiting for the queue write
|
||||
// update the data using the write queue, wait for the update to finish, then return the data source
|
||||
|
||||
// Do a CAS on inCacheWriteLock to ensure that we are the only thread that is writing to the cache,
|
||||
// or if we fail, then that means someone else is already doing it, and we can just return the future
|
||||
CompletableFuture<IFullDataSource> future = new CompletableFuture<>();
|
||||
CompletableFuture<IFullDataSource> compareAndSwapFuture = AtomicsUtil.compareAndExchange(this.dataSourceLoadFutureRef, null, future);
|
||||
if (compareAndSwapFuture != null)
|
||||
// Create a new future if one doesn't already exist
|
||||
CompletableFuture<IFullDataSource> newFuture = new CompletableFuture<>();
|
||||
CompletableFuture<IFullDataSource> oldFuture = AtomicsUtil.compareAndExchange(this.dataSourceLoadFutureRef, null, newFuture);
|
||||
|
||||
if (oldFuture != null)
|
||||
{
|
||||
// a write is already in progress, return its future.
|
||||
return new CacheQueryResult(compareAndSwapFuture, false);
|
||||
// An update is already in progress, return its future.
|
||||
return oldFuture;
|
||||
}
|
||||
else
|
||||
{
|
||||
LodUtil.assertTrue(!this.inCrit);
|
||||
this.inCrit = true;
|
||||
// don't continue if the provider has been shut down
|
||||
ExecutorService executorService = this.fullDataSourceProvider.getIOExecutor();
|
||||
if (executorService.isTerminated())
|
||||
if (!executorService.isTerminated())
|
||||
{
|
||||
this.inCrit = false;
|
||||
this.dataSourceLoadFutureRef.set(null);
|
||||
future.complete(null);
|
||||
// write for the update to finish before returning the data source
|
||||
this.makeUpdateCompletionStage(newFuture, CompletableFuture.supplyAsync(() -> cachedFullDataSource, executorService));
|
||||
}
|
||||
else
|
||||
{
|
||||
// write the queue to the data source by triggering an update
|
||||
this.makeUpdateCompletionStage(future, CompletableFuture.supplyAsync(() -> cachedFullDataSource, executorService));
|
||||
// don't update anything if the provider has been shut down
|
||||
this.dataSourceLoadFutureRef.set(null);
|
||||
newFuture.complete(null);
|
||||
}
|
||||
|
||||
return new CacheQueryResult(future, false);
|
||||
return newFuture;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -436,7 +432,7 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
if (!isEmpty)
|
||||
{
|
||||
// This will flush the data to disk.
|
||||
return this.loadOrGetCachedDataSourceAsync().thenApply((fullDataSource) -> null /* ignore the result, just wait for the load to finish*/ );
|
||||
return this.getOrLoadCachedDataSourceAsync().thenApply((fullDataSource) -> null /* ignore the result, just wait for the load to finish*/ );
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -634,18 +630,6 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
// helper classes //
|
||||
//================//
|
||||
|
||||
private static final class CacheQueryResult
|
||||
{
|
||||
public final CompletableFuture<IFullDataSource> future;
|
||||
public final boolean needsLoad;
|
||||
public CacheQueryResult(CompletableFuture<IFullDataSource> future, boolean needsLoad)
|
||||
{
|
||||
this.future = future;
|
||||
this.needsLoad = needsLoad;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//TODO: use ConcurrentAppendSingleSwapContainer<LodDataSource> instead of below:
|
||||
private static class GuardedMultiAppendQueue
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user