Overhaul FullDataMetaFile async completion stages and write queue application
This commit is contained in:
+1
-22
@@ -410,7 +410,7 @@ public class FullDataFileHandler implements IFullDataSourceProvider
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<IFullDataSource> onCreateDataFile(FullDataMetaFile file)
|
||||
public CompletableFuture<IFullDataSource> onDataFileCreatedAsync(FullDataMetaFile file)
|
||||
{
|
||||
DhSectionPos pos = file.pos;
|
||||
IIncompleteFullDataSource source = this.makeEmptyDataSource(pos);
|
||||
@@ -445,27 +445,6 @@ public class FullDataFileHandler implements IFullDataSourceProvider
|
||||
return this.getLoadOrMakeFile(pos, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<IFullDataSource> onDataFileUpdate(
|
||||
IFullDataSource source, FullDataMetaFile file,
|
||||
Consumer<IFullDataSource> onUpdated, Function<IFullDataSource, Boolean> updater)
|
||||
{
|
||||
boolean changed = updater.apply(source);
|
||||
|
||||
if (source instanceof IIncompleteFullDataSource)
|
||||
{
|
||||
IFullDataSource newSource = ((IIncompleteFullDataSource) source).tryPromotingToCompleteDataSource();
|
||||
changed |= newSource != source;
|
||||
source = newSource;
|
||||
}
|
||||
|
||||
if (changed)
|
||||
{
|
||||
onUpdated.accept(source);
|
||||
}
|
||||
return CompletableFuture.completedFuture(source);
|
||||
}
|
||||
|
||||
|
||||
|
||||
//==========================//
|
||||
|
||||
+220
-173
@@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IIncompleteFullDataSource;
|
||||
import com.seibel.distanthorizons.core.file.metaData.AbstractMetaDataContainerFile;
|
||||
import com.seibel.distanthorizons.core.file.metaData.BaseMetaData;
|
||||
import com.seibel.distanthorizons.core.level.IDhLevel;
|
||||
@@ -71,7 +72,7 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
public Class<? extends IFullDataSource> fullDataSourceClass;
|
||||
|
||||
|
||||
private volatile boolean markedNeedUpdate = false;
|
||||
private volatile boolean needsUpdate = false;
|
||||
|
||||
private final IDhLevel level;
|
||||
private final IFullDataSourceProvider fullDataSourceProvider;
|
||||
@@ -157,108 +158,54 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
return this.cachedFullDataSourceRef.get();
|
||||
}
|
||||
|
||||
private void makeUpdateCompletionStage(CompletableFuture<IFullDataSource> completer, CompletableFuture<IFullDataSource> currentStage)
|
||||
{
|
||||
currentStage.thenCompose((fullDataSource) ->
|
||||
{
|
||||
this.markedNeedUpdate = false;
|
||||
return this.fullDataSourceProvider.onDataFileUpdate(fullDataSource, this, this::_updateAndWriteDataSource, this::_applyWriteQueueToFullDataSource);
|
||||
})
|
||||
.whenComplete((fullDataSource, ex) ->
|
||||
{
|
||||
if (ex != null && !LodUtil.isInterruptOrReject(ex))
|
||||
{
|
||||
LOGGER.error("Error updating file [" + this.file + "]: ", ex);
|
||||
}
|
||||
|
||||
if (fullDataSource != null)
|
||||
{
|
||||
new DataObjTracker(fullDataSource);
|
||||
new DataObjSoftTracker(this, fullDataSource);
|
||||
}
|
||||
|
||||
//LOGGER.info("Updated file "+this.file);
|
||||
if (this.pos.sectionDetailLevel == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL)
|
||||
DebugRenderer.makeParticle(
|
||||
new DebugRenderer.BoxParticle(
|
||||
new DebugRenderer.Box(this.pos, 64f, 72f, 0.03f, Color.green.darker()),
|
||||
0.2, 32f
|
||||
)
|
||||
);
|
||||
|
||||
this.cachedFullDataSourceRef = new SoftReference<>(fullDataSource);
|
||||
this.dataSourceLoadFutureRef.set(null);
|
||||
completer.complete(fullDataSource);
|
||||
|
||||
if (this.markedNeedUpdate)
|
||||
{
|
||||
// trigger another update
|
||||
this.getOrLoadCachedDataSourceAsync();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void makeLoadCompletionStage(ExecutorService executorService, CompletableFuture<IFullDataSource> completer)
|
||||
{
|
||||
this.makeUpdateCompletionStage(completer, CompletableFuture.supplyAsync(() ->
|
||||
{
|
||||
// Load the file.
|
||||
IFullDataSource fullDataSource;
|
||||
try (FileInputStream fileInputStream = this.getFileInputStream();
|
||||
DhDataInputStream compressedStream = new DhDataInputStream(fileInputStream))
|
||||
{
|
||||
fullDataSource = this.fullDataSourceLoader.loadData(this, compressedStream, this.level);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// can happen if there is a missing file or the file was incorrectly formatted, or terminated early
|
||||
throw new CompletionException(ex);
|
||||
}
|
||||
return fullDataSource;
|
||||
}, executorService));
|
||||
}
|
||||
|
||||
private void makeCreateCompletionStage(CompletableFuture<IFullDataSource> completer)
|
||||
{
|
||||
this.makeUpdateCompletionStage(completer, this.fullDataSourceProvider.onCreateDataFile(this)
|
||||
.thenApply((fullDataSource) ->
|
||||
{
|
||||
this.baseMetaData = this._makeBaseMetaData(fullDataSource);
|
||||
return fullDataSource;
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
|
||||
public CompletableFuture<IFullDataSource> getOrLoadCachedDataSourceAsync()
|
||||
{
|
||||
checkAndLogPhantomDataSourceLifeCycles();
|
||||
|
||||
CompletableFuture<IFullDataSource> dataSourceLoadFuture = this.getCachedDataSourceAsync();
|
||||
if (dataSourceLoadFuture != null)
|
||||
CompletableFuture<IFullDataSource> potentialLoadFuture = this.getCachedDataSourceAsync();
|
||||
if (potentialLoadFuture != null)
|
||||
{
|
||||
// return the in-process future
|
||||
return dataSourceLoadFuture;
|
||||
return potentialLoadFuture;
|
||||
}
|
||||
else
|
||||
{
|
||||
// there is no cached data, we'll have to load it
|
||||
|
||||
dataSourceLoadFuture = new CompletableFuture<>();
|
||||
if (!this.dataSourceLoadFutureRef.compareAndSet(null, dataSourceLoadFuture))
|
||||
potentialLoadFuture = new CompletableFuture<>();
|
||||
if (!this.dataSourceLoadFutureRef.compareAndSet(null, potentialLoadFuture))
|
||||
{
|
||||
// two threads attempted to start this job at the same time, only use the first future
|
||||
dataSourceLoadFuture = this.dataSourceLoadFutureRef.get();
|
||||
potentialLoadFuture = this.dataSourceLoadFutureRef.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
CompletableFuture<IFullDataSource> dataSourceLoadFuture = potentialLoadFuture;
|
||||
if (!this.doesFileExist)
|
||||
{
|
||||
// create a new Meta file and data source
|
||||
|
||||
this.makeCreateCompletionStage(dataSourceLoadFuture);
|
||||
this.fullDataSourceProvider.onDataFileCreatedAsync(this)
|
||||
.thenApply((fullDataSource) ->
|
||||
{
|
||||
AbstractFullDataSourceLoader dataSourceLoader = AbstractFullDataSourceLoader.getLoader(fullDataSource.getClass(), fullDataSource.getBinaryDataFormatVersion());
|
||||
|
||||
this.baseMetaData = new BaseMetaData(
|
||||
fullDataSource.getSectionPos(), -1,
|
||||
fullDataSource.getDataDetailLevel(), fullDataSource.getWorldGenStep(),
|
||||
(dataSourceLoader == null ? 0 : dataSourceLoader.datatypeId), fullDataSource.getBinaryDataFormatVersion(), Long.MAX_VALUE);
|
||||
|
||||
return fullDataSource;
|
||||
})
|
||||
.thenCompose((fullDataSource) -> this.applyWriteQueueAndSaveAsync(fullDataSource))
|
||||
.thenAccept((fullDataSource) ->
|
||||
{
|
||||
dataSourceLoadFuture.complete(fullDataSource);
|
||||
this.dataSourceLoadFutureRef.set(null);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -274,13 +221,35 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
if (!executorService.isTerminated())
|
||||
{
|
||||
// load the data source
|
||||
this.makeLoadCompletionStage(executorService, dataSourceLoadFuture);
|
||||
|
||||
CompletableFuture.supplyAsync(() ->
|
||||
{
|
||||
// Load the file.
|
||||
IFullDataSource fullDataSource;
|
||||
try (FileInputStream fileInputStream = this.getFileInputStream();
|
||||
DhDataInputStream compressedStream = new DhDataInputStream(fileInputStream))
|
||||
{
|
||||
fullDataSource = this.fullDataSourceLoader.loadData(this, compressedStream, this.level);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// can happen if there is a missing file or the file was incorrectly formatted, or terminated early
|
||||
throw new CompletionException(ex);
|
||||
}
|
||||
return fullDataSource;
|
||||
}, executorService)
|
||||
.thenCompose((fullDataSource) -> this.applyWriteQueueAndSaveAsync(fullDataSource))
|
||||
.thenAccept((fullDataSource) ->
|
||||
{
|
||||
dataSourceLoadFuture.complete(fullDataSource);
|
||||
this.dataSourceLoadFutureRef.set(null);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
// don't load anything if the provider has been shut down
|
||||
this.dataSourceLoadFutureRef.set(null);
|
||||
// don't load anything if the provider has been shut down
|
||||
dataSourceLoadFuture.complete(null);
|
||||
this.dataSourceLoadFutureRef.set(null);
|
||||
return dataSourceLoadFuture;
|
||||
}
|
||||
}
|
||||
@@ -289,12 +258,6 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
}
|
||||
|
||||
|
||||
private BaseMetaData _makeBaseMetaData(IFullDataSource data)
|
||||
{
|
||||
AbstractFullDataSourceLoader loader = AbstractFullDataSourceLoader.getLoader(data.getClass(), data.getBinaryDataFormatVersion());
|
||||
return new BaseMetaData(data.getSectionPos(), -1,
|
||||
data.getDataDetailLevel(), data.getWorldGenStep(), (loader == null ? 0 : loader.datatypeId), data.getBinaryDataFormatVersion(), Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/** @return returns null if {@link FullDataMetaFile#cachedFullDataSourceRef} is empty and no cached {@link IFullDataSource} exists. */
|
||||
private CompletableFuture<IFullDataSource> getCachedDataSourceAsync()
|
||||
@@ -318,7 +281,7 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
{
|
||||
// cached data exists
|
||||
|
||||
boolean dataNeedsUpdating = !this.writeQueueRef.get().queue.isEmpty() || this.markedNeedUpdate;
|
||||
boolean dataNeedsUpdating = !this.writeQueueRef.get().queue.isEmpty() || this.needsUpdate;
|
||||
if (!dataNeedsUpdating)
|
||||
{
|
||||
// return the cached data
|
||||
@@ -342,8 +305,15 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
ExecutorService executorService = this.fullDataSourceProvider.getIOExecutor();
|
||||
if (!executorService.isTerminated())
|
||||
{
|
||||
// write for the update to finish before returning the data source
|
||||
this.makeUpdateCompletionStage(newFuture, CompletableFuture.supplyAsync(() -> cachedFullDataSource, executorService));
|
||||
// wait for the update to finish before returning the data source
|
||||
|
||||
CompletableFuture.supplyAsync(() -> cachedFullDataSource, executorService)
|
||||
.thenCompose((fullDataSource) -> this.applyWriteQueueAndSaveAsync(fullDataSource))
|
||||
.thenAccept((fullDataSource) ->
|
||||
{
|
||||
newFuture.complete(fullDataSource);
|
||||
this.dataSourceLoadFutureRef.set(null);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -401,7 +371,7 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
public CompletableFuture<Void> flushAndSaveAsync()
|
||||
{
|
||||
checkAndLogPhantomDataSourceLifeCycles();
|
||||
boolean isEmpty = this.writeQueueRef.get().queue.isEmpty() && !markedNeedUpdate;
|
||||
boolean isEmpty = this.writeQueueRef.get().queue.isEmpty() && !needsUpdate;
|
||||
if (!isEmpty)
|
||||
{
|
||||
// This will flush the data to disk.
|
||||
@@ -414,86 +384,11 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
}
|
||||
|
||||
|
||||
/** updates this object to match the given {@link IFullDataSource} and then writes the new data to file. */
|
||||
private void _updateAndWriteDataSource(IFullDataSource fullDataSource)
|
||||
{
|
||||
if (fullDataSource.isEmpty())
|
||||
{
|
||||
// delete the empty data source
|
||||
if (this.file.exists() && !this.file.delete())
|
||||
{
|
||||
LOGGER.warn("Failed to delete data file at " + this.file);
|
||||
}
|
||||
this.doesFileExist = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
// update the data source and write the new data to file
|
||||
|
||||
//LOGGER.info("Saving data file of {}", data.getSectionPos());
|
||||
try
|
||||
{
|
||||
// Write/Update data
|
||||
LodUtil.assertTrue(this.baseMetaData != null);
|
||||
|
||||
this.baseMetaData.dataLevel = fullDataSource.getDataDetailLevel();
|
||||
this.fullDataSourceLoader = AbstractFullDataSourceLoader.getLoader(fullDataSource.getClass(), fullDataSource.getBinaryDataFormatVersion());
|
||||
LodUtil.assertTrue(this.fullDataSourceLoader != null, "No loader for " + fullDataSource.getClass() + " (v" + fullDataSource.getBinaryDataFormatVersion() + ")");
|
||||
|
||||
this.fullDataSourceClass = fullDataSource.getClass();
|
||||
this.baseMetaData.dataTypeId = (this.fullDataSourceLoader == null) ? 0 : this.fullDataSourceLoader.datatypeId;
|
||||
this.baseMetaData.binaryDataFormatVersion = fullDataSource.getBinaryDataFormatVersion();
|
||||
|
||||
super.writeData((bufferedOutputStream) -> fullDataSource.writeToStream((bufferedOutputStream), this.level));
|
||||
this.doesFileExist = true;
|
||||
}
|
||||
catch (ClosedByInterruptException e) // thrown by buffers that are interrupted
|
||||
{
|
||||
// expected if the file handler is shut down, the exception can be ignored
|
||||
// LOGGER.warn("FullData file writing interrupted.", e);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOGGER.error("Failed to save updated data file at " + this.file + " for section " + this.pos, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** @return true if the queue was not empty and data was applied to the {@link IFullDataSource}. */
|
||||
private boolean _applyWriteQueueToFullDataSource(IFullDataSource fullDataSource)
|
||||
{
|
||||
// Poll the write queue
|
||||
// First check if write queue is empty, then swap the write queue.
|
||||
// Must be done in this order to ensure isMemoryAddressValid work properly. See isMemoryAddressValid() for details.
|
||||
boolean isEmpty = this.writeQueueRef.get().queue.isEmpty();
|
||||
if (!isEmpty)
|
||||
{
|
||||
this._swapWriteQueue();
|
||||
for (ChunkSizedFullDataAccessor chunk : this.backWriteQueue.queue)
|
||||
{
|
||||
fullDataSource.update(chunk);
|
||||
}
|
||||
this.backWriteQueue.queue.clear();
|
||||
//LOGGER.info("Updated Data file at {} for sect {} with {} chunk writes.", path, pos, count);
|
||||
}
|
||||
return !isEmpty || !doesFileExist;
|
||||
}
|
||||
private void _swapWriteQueue()
|
||||
{
|
||||
GuardedMultiAppendQueue writeQueue = this.writeQueueRef.getAndSet(this.backWriteQueue);
|
||||
// Acquire write lock and then release it again as we only need to ensure that the queue
|
||||
// is not being appended to by another thread. Note that the above atomic swap &
|
||||
// the guarantee that all append first acquire the appendLock means after the locK() call,
|
||||
// there will be no other threads able to or is currently appending to the queue.
|
||||
// Note: The above needs the getAndSet() to have at least Release Memory order.
|
||||
// (not that java supports anything non volatile for getAndSet()...)
|
||||
writeQueue.appendLock.writeLock().lock();
|
||||
writeQueue.appendLock.writeLock().unlock();
|
||||
this.backWriteQueue = writeQueue;
|
||||
}
|
||||
|
||||
|
||||
public void markNeedUpdate() { this.markedNeedUpdate = true; }
|
||||
|
||||
|
||||
public void markNeedsUpdate() { this.needsUpdate = true; }
|
||||
|
||||
|
||||
|
||||
@@ -540,7 +435,7 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
}
|
||||
|
||||
IFullDataSource cached = this.cachedFullDataSourceRef.get();
|
||||
if (this.markedNeedUpdate)
|
||||
if (this.needsUpdate)
|
||||
{
|
||||
debugRenderer.renderBox(new DebugRenderer.Box(this.pos, 80f, 96f, 0.05f, Color.red));
|
||||
}
|
||||
@@ -567,7 +462,7 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
color = Color.RED;
|
||||
}
|
||||
|
||||
boolean needsUpdate = !this.writeQueueRef.get().queue.isEmpty() || this.markedNeedUpdate;
|
||||
boolean needsUpdate = !this.writeQueueRef.get().queue.isEmpty() || this.needsUpdate;
|
||||
if (needsUpdate)
|
||||
{
|
||||
color = color.darker().darker();
|
||||
@@ -606,6 +501,158 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
|
||||
return fileInputStream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies the {@link FullDataMetaFile#writeQueueRef} to the current {@link IFullDataSource}
|
||||
* and stores the result in {@link FullDataMetaFile#cachedFullDataSourceRef}.
|
||||
*/
|
||||
@SuppressWarnings("resource") // due to DataObjTracker and DataObjSoftTracker being created outside a try-catch block
|
||||
private CompletableFuture<IFullDataSource> applyWriteQueueAndSaveAsync(IFullDataSource fullDataSourceToUpdate)
|
||||
{
|
||||
CompletableFuture<IFullDataSource> completionFuture = new CompletableFuture<>();
|
||||
|
||||
|
||||
boolean dataChanged = this.applyWriteQueueToFullDataSource(fullDataSourceToUpdate);
|
||||
this.needsUpdate = false;
|
||||
|
||||
// attempt to promote the data source
|
||||
if (fullDataSourceToUpdate instanceof IIncompleteFullDataSource)
|
||||
{
|
||||
IFullDataSource newSource = ((IIncompleteFullDataSource) fullDataSourceToUpdate).tryPromotingToCompleteDataSource();
|
||||
dataChanged |= (newSource != fullDataSourceToUpdate);
|
||||
fullDataSourceToUpdate = newSource;
|
||||
}
|
||||
|
||||
// the provider may need to modify other files based on this data source changing
|
||||
this.fullDataSourceProvider.onDataFileUpdateAsync(fullDataSourceToUpdate, this, dataChanged)
|
||||
.whenComplete((dataFileUpdateResult, ex) ->
|
||||
{
|
||||
if (ex != null && !LodUtil.isInterruptOrReject(ex))
|
||||
{
|
||||
LOGGER.error("Error updating file [" + this.file + "]: ", ex);
|
||||
}
|
||||
|
||||
IFullDataSource fullDataSource = dataFileUpdateResult.fullDataSource;
|
||||
boolean dataSourceChanged = dataFileUpdateResult.dataSourceChanged;
|
||||
|
||||
|
||||
// only save to file if something was changed
|
||||
if (dataSourceChanged)
|
||||
{
|
||||
this.writeDataSource(fullDataSource);
|
||||
}
|
||||
|
||||
// keep track of non-null data sources
|
||||
if (fullDataSource != null)
|
||||
{
|
||||
new DataObjTracker(fullDataSource);
|
||||
new DataObjSoftTracker(this, fullDataSource);
|
||||
}
|
||||
|
||||
if (this.pos.sectionDetailLevel == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL)
|
||||
{
|
||||
DebugRenderer.makeParticle(new DebugRenderer.BoxParticle(
|
||||
new DebugRenderer.Box(this.pos, 64f, 72f, 0.03f, Color.green.darker()),
|
||||
0.2, 32f));
|
||||
}
|
||||
|
||||
|
||||
// save the updated data source
|
||||
this.cachedFullDataSourceRef = new SoftReference<>(fullDataSource);
|
||||
|
||||
// the task is complete
|
||||
completionFuture.complete(fullDataSource);
|
||||
|
||||
|
||||
if (this.needsUpdate)
|
||||
{
|
||||
// another update was requested while this update was being processed
|
||||
this.getOrLoadCachedDataSourceAsync();
|
||||
}
|
||||
});
|
||||
|
||||
return completionFuture;
|
||||
}
|
||||
|
||||
/** @return true if the queue was not empty and chunk data was applied to this meta file's {@link IFullDataSource}. */
|
||||
private boolean applyWriteQueueToFullDataSource(IFullDataSource fullDataSource)
|
||||
{
|
||||
// swap the write queue if it has queued chunks.
|
||||
// Must be done in this order to ensure IWorldGenTaskTracker.isMemoryAddressValid() work properly. See IWorldGenTaskTracker.isMemoryAddressValid() for details.
|
||||
boolean isEmpty = this.writeQueueRef.get().queue.isEmpty();
|
||||
if (!isEmpty)
|
||||
{
|
||||
this.swapWriteQueues();
|
||||
for (ChunkSizedFullDataAccessor chunk : this.backWriteQueue.queue)
|
||||
{
|
||||
fullDataSource.update(chunk);
|
||||
}
|
||||
|
||||
this.backWriteQueue.queue.clear();
|
||||
//LOGGER.info("Updated Data file at {} for sect {} with {} chunk writes.", path, pos, count);
|
||||
}
|
||||
|
||||
return !isEmpty || !this.doesFileExist;
|
||||
}
|
||||
private void swapWriteQueues()
|
||||
{
|
||||
GuardedMultiAppendQueue writeQueue = this.writeQueueRef.getAndSet(this.backWriteQueue);
|
||||
|
||||
// Acquire write lock and then release it again as we only need to ensure that the queue
|
||||
// is not being appended to by another thread. Note that the above atomic swap &
|
||||
// the guarantee that all append first acquire the appendLock means after the locK() call,
|
||||
// there will be no other threads able to or is currently appending to the queue.
|
||||
// Note: The above needs the getAndSet() to have at least Release Memory order.
|
||||
// (not that java supports anything non volatile for getAndSet()...)
|
||||
writeQueue.appendLock.writeLock().lock();
|
||||
writeQueue.appendLock.writeLock().unlock();
|
||||
|
||||
this.backWriteQueue = writeQueue;
|
||||
}
|
||||
|
||||
private void writeDataSource(IFullDataSource fullDataSource)
|
||||
{
|
||||
if (fullDataSource.isEmpty())
|
||||
{
|
||||
// delete the empty data source
|
||||
if (this.file.exists() && !this.file.delete())
|
||||
{
|
||||
LOGGER.warn("Failed to delete data file at " + this.file);
|
||||
}
|
||||
this.doesFileExist = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
// update the data source and write the new data to file
|
||||
|
||||
//LOGGER.info("Saving data file of {}", data.getSectionPos());
|
||||
try
|
||||
{
|
||||
// Write/Update data
|
||||
LodUtil.assertTrue(this.baseMetaData != null);
|
||||
|
||||
this.baseMetaData.dataLevel = fullDataSource.getDataDetailLevel();
|
||||
this.fullDataSourceLoader = AbstractFullDataSourceLoader.getLoader(fullDataSource.getClass(), fullDataSource.getBinaryDataFormatVersion());
|
||||
LodUtil.assertTrue(this.fullDataSourceLoader != null, "No loader for " + fullDataSource.getClass() + " (v" + fullDataSource.getBinaryDataFormatVersion() + ")");
|
||||
|
||||
this.fullDataSourceClass = fullDataSource.getClass();
|
||||
this.baseMetaData.dataTypeId = (this.fullDataSourceLoader == null) ? 0 : this.fullDataSourceLoader.datatypeId;
|
||||
this.baseMetaData.binaryDataFormatVersion = fullDataSource.getBinaryDataFormatVersion();
|
||||
|
||||
super.writeData((bufferedOutputStream) -> fullDataSource.writeToStream((bufferedOutputStream), this.level));
|
||||
this.doesFileExist = true;
|
||||
}
|
||||
catch (ClosedByInterruptException e) // thrown by buffers that are interrupted
|
||||
{
|
||||
// expected if the file handler is shut down, the exception can be ignored
|
||||
//LOGGER.warn("FullData file writing interrupted.", e);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOGGER.error("Failed to save updated data file at " + this.file + " for section " + this.pos, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
//================//
|
||||
|
||||
+16
-31
@@ -73,7 +73,7 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler
|
||||
metaFile.genQueueChecked = false; // unset it so it can be checked again
|
||||
if (data != null)
|
||||
{
|
||||
metaFile.markNeedUpdate();
|
||||
metaFile.markNeedsUpdate();
|
||||
}
|
||||
});
|
||||
flushAndSave(); // Trigger an update to the meta files
|
||||
@@ -170,7 +170,7 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler
|
||||
|
||||
// Try update the gen queue on this data source. If null, then nothing was done.
|
||||
@Nullable
|
||||
private CompletableFuture<IFullDataSource> updateFromExistingDataSources(FullDataMetaFile file, IIncompleteFullDataSource data)
|
||||
private CompletableFuture<IFullDataSource> updateFromExistingDataSourcesAsync(FullDataMetaFile file, IIncompleteFullDataSource data)
|
||||
{
|
||||
DhSectionPos pos = file.pos;
|
||||
ArrayList<FullDataMetaFile> existingFiles = new ArrayList<>();
|
||||
@@ -196,58 +196,43 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<IFullDataSource> onCreateDataFile(FullDataMetaFile file)
|
||||
public CompletableFuture<IFullDataSource> onDataFileCreatedAsync(FullDataMetaFile file)
|
||||
{
|
||||
DhSectionPos pos = file.pos;
|
||||
IIncompleteFullDataSource data = makeEmptyDataSource(pos);
|
||||
CompletableFuture<IFullDataSource> future = updateFromExistingDataSources(file, data);
|
||||
IIncompleteFullDataSource data = this.makeEmptyDataSource(pos);
|
||||
CompletableFuture<IFullDataSource> future = this.updateFromExistingDataSourcesAsync(file, data);
|
||||
// Cant start gen task, so return the data
|
||||
return future == null ? CompletableFuture.completedFuture(data) : future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<IFullDataSource> onDataFileUpdate(
|
||||
IFullDataSource source, FullDataMetaFile file,
|
||||
Consumer<IFullDataSource> onUpdated, Function<IFullDataSource, Boolean> updater)
|
||||
public CompletableFuture<DataFileUpdateResult> onDataFileUpdateAsync(IFullDataSource fullDataSource, FullDataMetaFile file, boolean dataChanged)
|
||||
{
|
||||
boolean changed = updater.apply(source);
|
||||
LodUtil.assertTrue(file.doesFileExist || changed);
|
||||
LodUtil.assertTrue(file.doesFileExist || dataChanged);
|
||||
|
||||
if (source instanceof IIncompleteFullDataSource)
|
||||
|
||||
if (fullDataSource instanceof CompleteFullDataSource)
|
||||
{
|
||||
IFullDataSource newSource = tryPromoteDataSource((IIncompleteFullDataSource) source);
|
||||
changed |= newSource != source;
|
||||
source = newSource;
|
||||
this.incompleteDataSources.remove(fullDataSource.getSectionPos());
|
||||
}
|
||||
this.fireOnGenPosSuccessListeners(fullDataSource.getSectionPos());
|
||||
|
||||
if (source instanceof CompleteFullDataSource)
|
||||
{
|
||||
this.fireOnGenPosSuccessListeners(source.getSectionPos());
|
||||
}
|
||||
this.fireOnGenPosSuccessListeners(source.getSectionPos());
|
||||
|
||||
if (source instanceof IIncompleteFullDataSource && !file.genQueueChecked)
|
||||
if (fullDataSource instanceof IIncompleteFullDataSource && !file.genQueueChecked)
|
||||
{
|
||||
IWorldGenerationQueue worldGenQueue = this.worldGenQueueRef.get();
|
||||
if (worldGenQueue != null)
|
||||
{
|
||||
CompletableFuture<IFullDataSource> future = this.updateFromExistingDataSources(file, (IIncompleteFullDataSource) source);
|
||||
CompletableFuture<IFullDataSource> future = this.updateFromExistingDataSourcesAsync(file, (IIncompleteFullDataSource) fullDataSource);
|
||||
if (future != null)
|
||||
{
|
||||
return future.thenApply((newSource) ->
|
||||
{
|
||||
onUpdated.accept(newSource);
|
||||
return newSource;
|
||||
});
|
||||
final boolean finalDataChanged = dataChanged;
|
||||
return future.thenApply((newSource) -> new DataFileUpdateResult(newSource, finalDataChanged));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (changed)
|
||||
{
|
||||
onUpdated.accept(source);
|
||||
}
|
||||
return CompletableFuture.completedFuture(source);
|
||||
return CompletableFuture.completedFuture(new DataFileUpdateResult(fullDataSource, dataChanged));
|
||||
}
|
||||
|
||||
private void onWorldGenTaskComplete(WorldGenResult genTaskResult, Throwable exception, GenTask genTask, DhSectionPos pos)
|
||||
|
||||
+25
-4
@@ -28,8 +28,6 @@ import java.io.File;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
public interface IFullDataSourceProvider extends AutoCloseable
|
||||
{
|
||||
@@ -43,11 +41,34 @@ public interface IFullDataSourceProvider extends AutoCloseable
|
||||
//long getCacheVersion(DhSectionPos sectionPos);
|
||||
//boolean isCacheVersionValid(DhSectionPos sectionPos, long cacheVersion);
|
||||
|
||||
CompletableFuture<IFullDataSource> onCreateDataFile(FullDataMetaFile file);
|
||||
CompletableFuture<IFullDataSource> onDataFileUpdate(IFullDataSource source, FullDataMetaFile file, Consumer<IFullDataSource> onUpdated, Function<IFullDataSource, Boolean> updater);
|
||||
CompletableFuture<IFullDataSource> onDataFileCreatedAsync(FullDataMetaFile file);
|
||||
default CompletableFuture<DataFileUpdateResult> onDataFileUpdateAsync(IFullDataSource fullDataSource, FullDataMetaFile file, boolean dataChanged) { return CompletableFuture.completedFuture(new DataFileUpdateResult(fullDataSource, dataChanged)); }
|
||||
File computeDataFilePath(DhSectionPos pos);
|
||||
ExecutorService getIOExecutor();
|
||||
|
||||
@Nullable
|
||||
FullDataMetaFile getFileIfExist(DhSectionPos pos);
|
||||
|
||||
|
||||
|
||||
//================//
|
||||
// helper classes //
|
||||
//================//
|
||||
|
||||
/**
|
||||
* After a {@link FullDataMetaFile} has been updated the {@link IFullDataSourceProvider} may also need to modify it. <br>
|
||||
* This specifically happens during world generation.
|
||||
*/
|
||||
class DataFileUpdateResult
|
||||
{
|
||||
IFullDataSource fullDataSource;
|
||||
boolean dataSourceChanged;
|
||||
|
||||
public DataFileUpdateResult(IFullDataSource fullDataSource, boolean dataSourceChanged)
|
||||
{
|
||||
this.fullDataSource = fullDataSource;
|
||||
this.dataSourceChanged = dataSourceChanged;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user