Continue on impl the gen queue re-queueing. (Might have broke buffer updates tho...)

This commit is contained in:
TomTheFurry
2023-06-18 16:54:40 +08:00
parent 808380b461
commit 64b7e2ef33
8 changed files with 319 additions and 478 deletions
@@ -18,8 +18,10 @@ import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFull
import com.seibel.distanthorizons.core.util.FileUtil;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import org.apache.logging.log4j.Logger;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.*;
@@ -358,11 +360,49 @@ public class FullDataFileHandler implements IFullDataSourceProvider
// }
// return file.isCacheVersionValid(cacheVersion);
// }
protected IIncompleteFullDataSource makeDataSource(DhSectionPos pos)
{
return pos.sectionDetailLevel <= HighDetailIncompleteFullDataSource.MAX_SECTION_DETAIL ?
HighDetailIncompleteFullDataSource.createEmpty(pos) : LowDetailIncompleteFullDataSource.createEmpty(pos);
}
protected CompletableFuture<IIncompleteFullDataSource> sampleFromFiles(IIncompleteFullDataSource source, ArrayList<FullDataMetaFile> existingFiles)
{
// read in the existing data
final ArrayList<CompletableFuture<Void>> loadDataFutures = new ArrayList<>(existingFiles.size());
for (FullDataMetaFile existingFile : existingFiles)
{
loadDataFutures.add(existingFile.loadOrGetCachedDataSourceAsync()
.exceptionally((ex) -> /*Ignore file read errors*/null)
.thenAccept((fullDataSource) ->
{
if (fullDataSource == null) return;
//this.checkIfSectionNeedsAdditionalGeneration(pos, fullDataSource);
//LOGGER.info("Merging data from {} into {}", data.getSectionPos(), pos);
source.sampleFrom(fullDataSource);
})
);
}
return CompletableFuture.allOf(loadDataFutures.toArray(new CompletableFuture[0])).thenApply(v -> source);
}
protected void makeFiles(ArrayList<DhSectionPos> posList, ArrayList<FullDataMetaFile> output) {
for (DhSectionPos missingPos : posList)
{
FullDataMetaFile newFile = this.getOrMakeFile(missingPos);
if (newFile != null)
{
output.add(newFile);
}
}
}
@Override
public CompletableFuture<IFullDataSource> onCreateDataFile(FullDataMetaFile file)
{
DhSectionPos pos = file.pos;
IIncompleteFullDataSource source = this.makeDataSource(pos);
ArrayList<FullDataMetaFile> existFiles = new ArrayList<>();
ArrayList<DhSectionPos> missing = new ArrayList<>();
this.getDataFilesForPosition(pos, pos, existFiles, missing);
@@ -370,49 +410,20 @@ public class FullDataFileHandler implements IFullDataSourceProvider
if (missing.size() == 1 && existFiles.isEmpty() && missing.get(0).equals(pos))
{
// None exist.
IIncompleteFullDataSource incompleteDataSource = pos.sectionDetailLevel <= HighDetailIncompleteFullDataSource.MAX_SECTION_DETAIL ?
HighDetailIncompleteFullDataSource.createEmpty(pos) : LowDetailIncompleteFullDataSource.createEmpty(pos);
return CompletableFuture.completedFuture(incompleteDataSource);
return CompletableFuture.completedFuture(source);
}
else
{
for (DhSectionPos missingPos : missing)
{
FullDataMetaFile newFile = this.getOrMakeFile(missingPos);
if (newFile != null)
makeFiles(missing, existFiles);
return sampleFromFiles(source, existFiles).thenApply(IIncompleteFullDataSource::tryPromotingToCompleteDataSource)
.exceptionally((e) ->
{
existFiles.add(newFile);
}
}
final ArrayList<CompletableFuture<Void>> futures = new ArrayList<>(existFiles.size());
final IIncompleteFullDataSource incompleteFullDataSource = pos.sectionDetailLevel <= HighDetailIncompleteFullDataSource.MAX_SECTION_DETAIL ?
HighDetailIncompleteFullDataSource.createEmpty(pos) :
LowDetailIncompleteFullDataSource.createEmpty(pos);
for (FullDataMetaFile metaFile : existFiles)
{
futures.add(metaFile.loadOrGetCachedDataSourceAsync()
.thenAccept((data) ->
{
if (data != null)
{
//LOGGER.info("Merging data from {} into {}", data.getSectionPos(), pos);
incompleteFullDataSource.sampleFrom(data);
}
})
.exceptionally((e) ->
{
FullDataMetaFile newMetaFile = this.removeCorruptedFile(pos, metaFile, e);
return null;
})
);
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply((v) -> incompleteFullDataSource.tryPromotingToCompleteDataSource());
FullDataMetaFile newMetaFile = this.removeCorruptedFile(pos, file, e);
return null;
});
}
}
private FullDataMetaFile removeCorruptedFile(DhSectionPos pos, FullDataMetaFile metaFile, Throwable exception)
protected FullDataMetaFile removeCorruptedFile(DhSectionPos pos, FullDataMetaFile metaFile, Throwable exception)
{
LOGGER.error("Error reading Data file ["+pos+"]", exception);
@@ -424,8 +435,8 @@ public class FullDataFileHandler implements IFullDataSourceProvider
}
@Override
public CompletableFuture<IFullDataSource> onDataFileLoaded(IFullDataSource source, BaseMetaData metaData,
Consumer<IFullDataSource> onUpdated, Function<IFullDataSource, Boolean> updater, boolean justCreated)
public CompletableFuture<IFullDataSource> onDataFileUpdate(IFullDataSource source, FullDataMetaFile file,
Consumer<IFullDataSource> onUpdated, Function<IFullDataSource, Boolean> updater)
{
boolean changed = updater.apply(source);
// if (changed)
@@ -446,40 +457,6 @@ public class FullDataFileHandler implements IFullDataSourceProvider
}
return CompletableFuture.completedFuture(source);
}
@Override
public CompletableFuture<IFullDataSource> onDataFileRefresh(IFullDataSource source, BaseMetaData metaData, Function<IFullDataSource, Boolean> updater, Consumer<IFullDataSource> onUpdated)
{
if (fileHandlerThreadPool.isTerminated())
{
return CompletableFuture.completedFuture(source);
}
return CompletableFuture.supplyAsync(() ->
{
IFullDataSource sourceLocal = source;
boolean changed = updater.apply(sourceLocal);
// if (changed)
// {
// metaData.dataVersion.incrementAndGet();
// }
if (sourceLocal instanceof IIncompleteFullDataSource)
{
IFullDataSource newSource = ((IIncompleteFullDataSource) sourceLocal).tryPromotingToCompleteDataSource();
changed |= newSource != sourceLocal;
sourceLocal = newSource;
}
if (changed)
{
onUpdated.accept(sourceLocal);
}
return sourceLocal;
}, fileHandlerThreadPool);
}
@Override
public File computeDataFilePath(DhSectionPos pos) { return new File(this.saveDir, pos.serialize() + ".lod"); }
@@ -538,5 +515,4 @@ public class FullDataFileHandler implements IFullDataSourceProvider
{
FullDataMetaFile.debugPhantomLifeCycleCheck();
}
}
@@ -1,5 +1,6 @@
package com.seibel.distanthorizons.core.file.fullDatafile;
import java.awt.*;
import java.io.*;
import java.lang.ref.*;
import java.nio.channels.ClosedByInterruptException;
@@ -8,7 +9,7 @@ import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource;
import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
import com.seibel.distanthorizons.core.file.metaData.AbstractMetaDataContainerFile;
@@ -21,21 +22,26 @@ import com.seibel.distanthorizons.core.dataObjects.fullData.loader.AbstractFullD
import com.seibel.distanthorizons.core.util.AtomicsUtil;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataInputStream;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import org.apache.logging.log4j.Logger;
/**
* Represents a File that contains a {@link IFullDataSource}.
*/
public class FullDataMetaFile extends AbstractMetaDataContainerFile
public class FullDataMetaFile extends AbstractMetaDataContainerFile implements IDebugRenderable
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger(FullDataMetaFile.class.getSimpleName());
private final IDhLevel level;
private final IFullDataSourceProvider fullDataSourceProvider;
private boolean doesFileExist;
public boolean doesFileExist;
//TODO: Atm can't find a better way to store when genQueue is checked.
public boolean genQueueChecked = false;
private boolean markedNeedUpdate = false;
public AbstractFullDataSourceLoader fullDataSourceLoader;
public Class<? extends IFullDataSource> dataType;
@@ -46,7 +52,25 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
* This will make null checks simpler.
*/
private SoftReference<IFullDataSource> cachedFullDataSource = new SoftReference<>(null);
private CompletableFuture<IFullDataSource> dataSourceWriteQueueFuture;
private final AtomicReference<CompletableFuture<IFullDataSource>> dataSourceLoadFutureRef = new AtomicReference<>(null);
@Override
public void debugRender(DebugRenderer r) {
IFullDataSource cached = cachedFullDataSource.get();
Color c = Color.black;
if (cached != null) {
if (cached instanceof CompleteFullDataSource) {
c = Color.GREEN;
} else {
c = Color.YELLOW;
}
} else if (dataSourceLoadFutureRef.get() != null) {
c = Color.BLUE;
} else if (doesFileExist) {
c = Color.RED;
}
//r.renderBox(pos, 50, 200, 0.05f, c);
}
//TODO: use ConcurrentAppendSingleSwapContainer<LodDataSource> instead of below:
private static class GuardedMultiAppendQueue
@@ -60,17 +84,13 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
private final AtomicReference<GuardedMultiAppendQueue> writeQueueRef = new AtomicReference<>(new GuardedMultiAppendQueue());
private GuardedMultiAppendQueue backWriteQueue = new GuardedMultiAppendQueue();
// ===========================
private final AtomicReference<CompletableFuture<IFullDataSource>> inCacheWriteAccessFuture = new AtomicReference<>(null);
// ===Object lifetime stuff===
private static final ReferenceQueue<IFullDataSource> lifeCycleDebugQueue = new ReferenceQueue<>();
private static final Set<DataObjTracker> lifeCycleDebugSet = ConcurrentHashMap.newKeySet();
private static class DataObjTracker extends PhantomReference<IFullDataSource> implements Closeable
{
private final DhSectionPos pos;
DataObjTracker(IFullDataSource data)
{
super(data, lifeCycleDebugQueue);
@@ -104,6 +124,7 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
this.level = level;
LodUtil.assertTrue(this.baseMetaData == null);
this.doesFileExist = false;
DebugRenderer.register(this);
}
/**
@@ -128,9 +149,10 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
}
this.dataType = this.fullDataSourceLoader.clazz;
DebugRenderer.register(this);
}
public void markNeedUpdate() { this.markedNeedUpdate = true; }
//==========//
// get data //
@@ -144,130 +166,100 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
return this.cachedFullDataSource.get();
}
public CompletableFuture<IFullDataSource> forceReload() {
cachedFullDataSource = new SoftReference<>(null);
return loadOrGetCachedDataSourceAsync();
private void makeUpdateCompletionStage(CompletableFuture<IFullDataSource> completer, CompletableFuture<IFullDataSource> currentStage)
{
markedNeedUpdate = false;
currentStage.thenCompose(
(fullDataSource) -> this.fullDataSourceProvider.onDataFileUpdate(fullDataSource, this, this::_updateAndWriteDataSource, this::_applyWriteQueueToFullDataSource))
.whenComplete((fullDataSource, ex) ->
{
if (ex instanceof CompletionException) {
ex = ex.getCause();
}
if (ex instanceof InterruptedException || ex instanceof RejectedExecutionException)
{
// this exception can be ignored
}
else if (ex != null) {
LOGGER.error("Error loading file "+this.file+": ", ex);
}
if (fullDataSource != null) {
new DataObjTracker(fullDataSource);
}
LOGGER.info("Updated file "+this.file);
this.cachedFullDataSource = new SoftReference<>(fullDataSource);
inCrit = false;
dataSourceLoadFutureRef.set(null);
completer.complete(fullDataSource);
});
}
private void makeLoadCompletionStage(ExecutorService executorService, CompletableFuture<IFullDataSource> completer)
{
makeUpdateCompletionStage(completer, CompletableFuture.supplyAsync(() -> {
// Load the file.
IFullDataSource fullDataSource;
try (FileInputStream fileInputStream = this.getFileInputStream();
DhDataInputStream compressedStream = new DhDataInputStream(fileInputStream))
{
fullDataSource = this.fullDataSourceLoader.loadData(this, compressedStream, this.level);
}
catch (Exception ex)
{
// can happen if there is a missing file or the file was incorrectly formatted, or terminated early
throw new CompletionException(ex);
}
return fullDataSource;
}, executorService));
}
private void makeCreateCompletionStage(ExecutorService executorService, CompletableFuture<IFullDataSource> completer)
{
makeUpdateCompletionStage(completer, this.fullDataSourceProvider.onCreateDataFile(this)
.thenApply((fullDataSource) ->
{
this.baseMetaData = this._makeBaseMetaData(fullDataSource);
return fullDataSource;
}));
}
private volatile boolean inCrit = false;
// Cause: Generic Type runtime casting cannot safety check it.
// However, the Union type ensures the 'data' should only contain the listed type.
public CompletableFuture<IFullDataSource> loadOrGetCachedDataSourceAsync()
{
debugPhantomLifeCycleCheck();
CompletableFuture<IFullDataSource> getCachedFuture = this.getCachedDataSourceAsync();
if (getCachedFuture != null)
{
return getCachedFuture;
}
CompletableFuture<IFullDataSource> future = new CompletableFuture<>();
if (!this.doesFileExist)
{
// create a new Meta file
this.fullDataSourceProvider.onCreateDataFile(this)
.thenApply((fullDataSource) ->
{
this.baseMetaData = this._makeBaseMetaData(fullDataSource);
return fullDataSource;
})
.thenCompose((fullDataSource) -> this.fullDataSourceProvider.onDataFileLoaded(fullDataSource, this.baseMetaData, this::_updateAndWriteDataSource, this::_applyWriteQueueToFullDataSource, true))
.whenComplete((fullDataSource, exception) ->
{
if (exception != null)
{
LOGGER.error("Uncaught error on creation "+this.file+": ", exception);
future.complete(null);
this.cachedFullDataSource = new SoftReference<>(null);
}
else
{
future.complete(fullDataSource);
new DataObjTracker(fullDataSource);
this.cachedFullDataSource = new SoftReference<>(fullDataSource);
}
});
}
else
{
// read in the existing meta file's data
if (this.baseMetaData == null)
{
throw new IllegalStateException("Meta data not loaded!");
}
CacheQueryResult result = this.getCachedDataSourceAsync();
if (result.needsLoad) {
LodUtil.assertTrue(!inCrit);
inCrit = true;
CompletableFuture<IFullDataSource> future = result.future;
// don't continue if the provider has been shut down
ExecutorService executorService = this.fullDataSourceProvider.getIOExecutor();
ExecutorService executorService = this.fullDataSourceProvider.getIOExecutor();
if (executorService.isTerminated())
{
inCrit = false;
dataSourceLoadFutureRef.set(null);
future.complete(null);
return future;
}
CompletableFuture.supplyAsync(() ->
{
// Load the file.
IFullDataSource fullDataSource;
try (FileInputStream fileInputStream = this.getFileInputStream();
DhDataInputStream compressedStream = new DhDataInputStream(fileInputStream))
{
fullDataSource = this.fullDataSourceLoader.loadData(this, compressedStream, this.level);
}
catch (Exception ex)
{
if (ex instanceof InterruptedException)
{
//LOGGER.warn(FullDataMetaFile.class.getSimpleName()+" loadOrGetCachedAsync interrupted.");
return null;
}
// can happen if there is a missing file or the file was incorrectly formatted
throw new CompletionException(ex);
}
// confirm that this thread is in control
LodUtil.assertTrue(this.inCacheWriteAccessFuture.get() == null,
"No one should be writing to the cache while we are in the process of " +
"loading one into the cache! Is this a deadlock?");
return fullDataSource;
}, executorService)
.thenCompose((fullDataSource) -> this.fullDataSourceProvider.onDataFileLoaded(fullDataSource, this.baseMetaData, this::_updateAndWriteDataSource, this::_applyWriteQueueToFullDataSource, false))
.exceptionally((ex) ->
{
if (ex instanceof InterruptedException)
{
// this exception can be ignored
//LOGGER.warn(FullDataMetaFile.class.getSimpleName()+" loadOrGetCachedAsync interrupted.");
return null;
}
else if (ex instanceof RejectedExecutionException)
{
// this exception can be ignored
//LOGGER.warn(FullDataMetaFile.class.getSimpleName()+" loadOrGetCachedAsync attempted to use a closed thread pool.");
return null;
}
LOGGER.error("Error loading file "+this.file+": ", ex);
this.cachedFullDataSource = new SoftReference<>(null);
future.completeExceptionally(ex);
return null; // the return value here doesn't matter
})
.whenComplete((fullDataSource, e) ->
{
future.complete(fullDataSource);
new DataObjTracker(fullDataSource);
this.cachedFullDataSource = new SoftReference<>(fullDataSource);
});
// create a new Meta file
if (!doesFileExist) {
makeCreateCompletionStage(executorService, future);
}
// Otherwise, load and update file
else {
if (this.baseMetaData == null) throw new IllegalStateException("Meta data not loaded!");
makeLoadCompletionStage(executorService, future);
}
}
return future;
return result.future;
}
/** @return a stream for the data contained in this file, skips the metadata from {@link AbstractMetaDataContainerFile}. */
private FileInputStream getFileInputStream() throws IOException
{
@@ -297,34 +289,52 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
return new BaseMetaData(data.getSectionPos(), -1,
data.getDataDetailLevel(), data.getWorldGenStep(), (loader == null ? 0 : loader.datatypeId), data.getBinaryDataFormatVersion());
}
private static final class CacheQueryResult {
public final CompletableFuture<IFullDataSource> future;
public final boolean needsLoad;
public CacheQueryResult(CompletableFuture<IFullDataSource> future, boolean needsLoad) {
this.future = future;
this.needsLoad = needsLoad;
}
}
/**
* @return one of the following:
* the cached {@link IFullDataSource},
* a future that will complete once the {@link FullDataMetaFile#writeQueueRef} has been written,
* or null if nothing has been cached and nothing is being loaded
*/
private CompletableFuture<IFullDataSource> getCachedDataSourceAsync()
private CacheQueryResult getCachedDataSourceAsync()
{
// this data source is being written to, use the existing future
if (this.dataSourceWriteQueueFuture != null)
CompletableFuture<IFullDataSource> dataSourceLoadFuture = this.dataSourceLoadFutureRef.get();
if (dataSourceLoadFuture != null)
{
return this.dataSourceWriteQueueFuture;
return new CacheQueryResult(dataSourceLoadFuture, false);
}
// attempt to get the cached data source
IFullDataSource cachedFullDataSource = this.cachedFullDataSource.get();
if (cachedFullDataSource != null)
if (cachedFullDataSource == null) {
// Make a new future, and CAS it into the dataSourceLoadFutureRef, or return the existing future
CompletableFuture<IFullDataSource> newFuture = new CompletableFuture<>();
CompletableFuture<IFullDataSource> cas = AtomicsUtil.compareAndExchange(dataSourceLoadFutureRef, null, newFuture);
if (cas == null) {
return new CacheQueryResult(newFuture, true);
} else {
return new CacheQueryResult(cas, false);
}
}
else
{
// The file is cached in RAM
boolean writeQueueEmpty = this.writeQueueRef.get().queue.isEmpty();
if (writeQueueEmpty)
boolean needUpdate = !this.writeQueueRef.get().queue.isEmpty() || markedNeedUpdate;
if (!needUpdate)
{
// return the cached data
return CompletableFuture.completedFuture(cachedFullDataSource);
return new CacheQueryResult(CompletableFuture.completedFuture(cachedFullDataSource), false);
}
else
{
@@ -333,48 +343,32 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
// Do a CAS on inCacheWriteLock to ensure that we are the only thread that is writing to the cache,
// or if we fail, then that means someone else is already doing it, and we can just return the future
CompletableFuture<IFullDataSource> future = new CompletableFuture<>();
CompletableFuture<IFullDataSource> compareAndSwapFuture = AtomicsUtil.compareAndExchange(this.inCacheWriteAccessFuture, null, future);
CompletableFuture<IFullDataSource> compareAndSwapFuture = AtomicsUtil.compareAndExchange(dataSourceLoadFutureRef, null, future);
if (compareAndSwapFuture != null)
{
// a write is already in progress, return its future.
return compareAndSwapFuture;
return new CacheQueryResult(compareAndSwapFuture, false);
}
else
{
// write the queue to the data source
this.dataSourceWriteQueueFuture = future;
this.fullDataSourceProvider.onDataFileRefresh(cachedFullDataSource, this.baseMetaData, this::_applyWriteQueueToFullDataSource, this::_updateAndWriteDataSource)
.handle((fullDataSource, exception) ->
{
if (exception != null)
{
LOGGER.error("Error refreshing data "+this.pos+": "+exception+" "+exception.getMessage(), exception);
future.complete(null);
this.cachedFullDataSource = new SoftReference<>(null);
}
else
{
future.complete(fullDataSource);
new DataObjTracker(fullDataSource);
this.cachedFullDataSource = new SoftReference<>(fullDataSource);
}
this.dataSourceWriteQueueFuture = null;
this.inCacheWriteAccessFuture.set(null);
return fullDataSource;
});
return future;
LodUtil.assertTrue(!inCrit);
inCrit = true;
// don't continue if the provider has been shut down
ExecutorService executorService = this.fullDataSourceProvider.getIOExecutor();
if (executorService.isTerminated())
{
inCrit = false;
dataSourceLoadFutureRef.set(null);
future.complete(null);
}
else {
// write the queue to the data source by triggering an update
makeUpdateCompletionStage(future, CompletableFuture.supplyAsync(() -> cachedFullDataSource, executorService));
}
return new CacheQueryResult(future, false);
}
}
}
// the data source hasn't been loaded
// and isn't in the process of being loaded
return null;
}
@@ -494,7 +488,7 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
this.backWriteQueue.queue.clear();
//LOGGER.info("Updated Data file at {} for sect {} with {} chunk writes.", path, pos, count);
}
return !isEmpty;
return !isEmpty || !doesFileExist;
}
private void _swapWriteQueue()
{
@@ -72,13 +72,14 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler
for (FullDataMetaFile metaFile : this.fileBySectionPos.values())
{
metaFile.genQueueChecked = false; // unset it so it can be checked again
IFullDataSource data = metaFile.getCachedDataSourceNowOrNull();
if (data instanceof IIncompleteFullDataSource) {
//todo
//metaFile.flushAndSaveAsync().thenApply(() -> metaFile.forceReload());
// need manual marking for update.
metaFile.markNeedUpdate();
}
}
flushAndSave(); // Trigger an update to the meta files
}
public void clearGenerationQueue() { this.worldGenQueueRef.set(null); }
@@ -99,67 +100,32 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler
// events //
//========//
private CompletableFuture<IFullDataSource> spawnGenTasks(FullDataMetaFile file, @Nullable IIncompleteFullDataSource data)
{
DhSectionPos pos = file.pos;
ArrayList<FullDataMetaFile> existingFiles = new ArrayList<>();
ArrayList<DhSectionPos> missingPositions = new ArrayList<>();
this.getDataFilesForPosition(pos, pos, existingFiles, missingPositions);
// confirm the quad tree has at least one node in it
LodUtil.assertTrue(!missingPositions.isEmpty() || !existingFiles.isEmpty());
// determine the type of dataSource that should be used for this position
IIncompleteFullDataSource incompleteFullDataSource;
if (data == null)
{
if (pos.sectionDetailLevel <= HighDetailIncompleteFullDataSource.MAX_SECTION_DETAIL)
{
incompleteFullDataSource = HighDetailIncompleteFullDataSource.createEmpty(pos);
}
else
{
incompleteFullDataSource = LowDetailIncompleteFullDataSource.createEmpty(pos);
}
}
else
{
incompleteFullDataSource = data;
}
@Nullable
private CompletableFuture<IFullDataSource> tryStartGenTask(FullDataMetaFile file, IIncompleteFullDataSource dataSource) {
WorldGenerationQueue worldGenQueue = this.worldGenQueueRef.get();
// breaks down the missing positions into the desired detail level that the gen queue could accept
if (worldGenQueue != null) {
if (worldGenQueue != null && !file.genQueueChecked) {
DhSectionPos pos = file.pos;
file.genQueueChecked = true;
byte maxSectDataDetailLevel = worldGenQueue.largestDataDetail;
byte targetDataDetailLevel = incompleteFullDataSource.getDataDetailLevel();
byte targetDataDetailLevel = dataSource.getDataDetailLevel();
if (targetDataDetailLevel > maxSectDataDetailLevel) {
ArrayList<FullDataMetaFile> existingFiles = new ArrayList<>();
byte sectDetailLevel = (byte) (DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL + maxSectDataDetailLevel);
missingPositions = missingPositions.stream()
.flatMap(missingPos -> {
if (missingPos.sectionDetailLevel > sectDetailLevel) {
// split this position into smaller positions
ArrayList<DhSectionPos> splitPositions = new ArrayList<>();
missingPos.forEachChildAtLevel(sectDetailLevel, splitPositions::add);
return splitPositions.stream();
} else {
return Stream.of(missingPos);
}
})
.collect(Collectors.toCollection(ArrayList::new));
pos.forEachChildAtLevel(sectDetailLevel, this::getOrMakeFile);
return sampleFromFiles(dataSource, existingFiles).thenApply(IIncompleteFullDataSource::tryPromotingToCompleteDataSource)
.exceptionally((e) ->
{
FullDataMetaFile newMetaFile = removeCorruptedFile(pos, file, e);
return null;
});
}
}
if (missingPositions.size() == 1 && existingFiles.isEmpty() && missingPositions.get(0).equals(pos))
{
// No LOD data exists for this position yet
if (worldGenQueue != null)
{
else {
this.incompleteSourceGenRequests.add(pos);
// queue this section to be generated
GenTask genTask = new GenTask(pos, new WeakReference<>(incompleteFullDataSource));
worldGenQueue.submitGenTask(new DhLodPos(pos), incompleteFullDataSource.getDataDetailLevel(), genTask)
GenTask genTask = new GenTask(pos, new WeakReference<>(dataSource));
worldGenQueue.submitGenTask(new DhLodPos(pos), dataSource.getDataDetailLevel(), genTask)
.whenComplete((genTaskResult, ex) ->
{
this.onWorldGenTaskComplete(genTaskResult, ex, genTask, pos);
@@ -167,59 +133,53 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler
this.incompleteSourceGenRequests.remove(pos);
});
}
// return the empty dataSource (it will be populated later)
return CompletableFuture.completedFuture(incompleteFullDataSource);
return CompletableFuture.completedFuture(dataSource);
}
else
{
// LOD data exists for this position
return null;
}
// create the missing metaData files
for (DhSectionPos missingPos : missingPositions)
{
FullDataMetaFile newFile = this.getOrMakeFile(missingPos);
if (newFile != null)
// Try update the gen queue on this data source. If null, then nothing was done.
@Nullable
private CompletableFuture<IFullDataSource> updateDataGenStatus(FullDataMetaFile file, IIncompleteFullDataSource data)
{
DhSectionPos pos = file.pos;
ArrayList<FullDataMetaFile> existingFiles = new ArrayList<>();
ArrayList<DhSectionPos> missingPositions = new ArrayList<>();
this.getDataFilesForPosition(pos, pos, existingFiles, missingPositions);
if (missingPositions.size() == 1) {
// Only missing myself. I.e. no child file data exists yet.
return tryStartGenTask(file, data);
}
else {
// Has stuff to sample.
makeFiles(missingPositions, existingFiles);
return sampleFromFiles(data, existingFiles).thenApply(IIncompleteFullDataSource::tryPromotingToCompleteDataSource)
.exceptionally((e) ->
{
existingFiles.add(newFile);
}
}
// LOGGER.debug("Creating "+pos+" from sampling "+existingFiles.size()+" files: "+existingFiles);
// read in the existing data
final ArrayList<CompletableFuture<Void>> loadDataFutures = new ArrayList<>(existingFiles.size());
for (FullDataMetaFile existingFile : existingFiles)
{
loadDataFutures.add(existingFile.loadOrGetCachedDataSourceAsync()
.exceptionally((ex) -> /*Ignore file read errors*/null)
.thenAccept((fullDataSource) ->
{
if (fullDataSource == null)
{
return;
}
//this.checkIfSectionNeedsAdditionalGeneration(pos, fullDataSource);
//LOGGER.info("Merging data from {} into {}", data.getSectionPos(), pos);
incompleteFullDataSource.sampleFrom(fullDataSource);
})
);
}
return CompletableFuture.allOf(loadDataFutures.toArray(new CompletableFuture[0]))
.thenApply((voidValue) -> incompleteFullDataSource.tryPromotingToCompleteDataSource());
FullDataMetaFile newMetaFile = removeCorruptedFile(pos, file, e);
return null;
});
}
}
@Override
public CompletableFuture<IFullDataSource> onCreateDataFile(FullDataMetaFile file)
{
return this.spawnGenTasks(file, null);
DhSectionPos pos = file.pos;
IIncompleteFullDataSource data = makeDataSource(pos);
CompletableFuture<IFullDataSource> future = updateDataGenStatus(file, data);
// Cant start gen task, so return the data
return future == null ? CompletableFuture.completedFuture(data) : future;
}
@Override
public CompletableFuture<IFullDataSource> onDataFileLoaded(IFullDataSource source, BaseMetaData metaData,
Consumer<IFullDataSource> onUpdated, Function<IFullDataSource, Boolean> updater, boolean justCreated)
public CompletableFuture<IFullDataSource> onDataFileUpdate(IFullDataSource source, FullDataMetaFile file,
Consumer<IFullDataSource> onUpdated, Function<IFullDataSource, Boolean> updater)
{
boolean changed = updater.apply(source);
LodUtil.assertTrue(file.doesFileExist || changed);
if (source instanceof IIncompleteFullDataSource)
{
@@ -228,118 +188,32 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler
source = newSource;
}
if (source instanceof IIncompleteFullDataSource && !justCreated) {
return this.spawnGenTasks(getOrMakeFile(metaData.pos), (IIncompleteFullDataSource)source)
.thenApply((newSource) -> {
if (source instanceof CompleteFullDataSource)
{
this.fireOnGenPosSuccessListeners(source.getSectionPos());
}
this.fireOnGenPosSuccessListeners(source.getSectionPos());
if (source instanceof IIncompleteFullDataSource && !file.genQueueChecked) {
WorldGenerationQueue worldGenQueue = this.worldGenQueueRef.get();
if (worldGenQueue != null) {
CompletableFuture<IFullDataSource> future = updateDataGenStatus(file, (IIncompleteFullDataSource) source);
if (future != null) {
return future.thenApply((newSource) -> {
onUpdated.accept(newSource);
return newSource;
});
}
else {
if (changed)
{
onUpdated.accept(source);
}
return CompletableFuture.completedFuture(source);
}
}
@Override
public CompletableFuture<IFullDataSource> onDataFileRefresh(IFullDataSource source, BaseMetaData metaData, Function<IFullDataSource, Boolean> updater, Consumer<IFullDataSource> onUpdated)
{
return super.onDataFileRefresh(source, metaData, updater, (IFullDataSource d) -> {
this.fireOnGenPosSuccessListeners(source.getSectionPos());
onUpdated.accept(d);
});
}
/**
* Checks if the given {@link IFullDataSource} is fully generated and
* if it isn't, creates the necessary world gen request(s) to finish it. <br>
* Should be used to fill out partially generated {@link IFullDataSource}'s,
* not populate empty ones.
*/
private void checkIfSectionNeedsAdditionalGeneration(DhSectionPos pos, IFullDataSource fullDataSource)
{
WorldGenerationQueue worldGenQueue = this.worldGenQueueRef.get();
if (worldGenQueue == null)
{
// world generation is disabled
return;
}
if (fullDataSource == null || fullDataSource.isEmpty())
{
// none of the data source has been generated, those generation requests should be handled elsewhere
return;
}
else if (this.incompleteSourceGenRequests.contains(pos))
{
return;
}
ArrayList<DhSectionPos> ungeneratedPosList = fullDataSource.getUngeneratedPosList(worldGenQueue.maxGranularity, true);
if (ungeneratedPosList.size() == 0)
{
// this section doesn't need to be generated
return;
}
//LOGGER.info("["+ungeneratedPosList.size()+"] missing sub positions for pos: ["+pos+"]. Number of gen requests queued: ["+this.incompleteSourceGenRequests.size()+"].");
List<CompletableFuture<WorldGenResult>> futureList = new ArrayList<>();
for (DhSectionPos ungenChildPos : ungeneratedPosList)
{
// don't queue the same section twice
if (this.incompleteSourceGenRequests.contains(ungenChildPos))
{
LOGGER.warn("checkIfSectionNeedsAdditionalGeneration skipping duplicate gen request for pos: ["+ungenChildPos+"].");
continue;
}
this.incompleteSourceGenRequests.add(ungenChildPos);
// make sure a full data source file exists before generating
// (the files can be
if (!this.fileBySectionPos.containsKey(pos))
{
FullDataMetaFile metaFile = this.getOrMakeFile(pos);
if (metaFile == null)
{
LOGGER.error("fireOnGenPosSuccessListeners unable to create file for pos: ["+pos+"].");
return;
}
}
// FIXME this can cause duplicate terrain generation requests
GenTask genTask = new GenTask(ungenChildPos, new WeakReference<>(fullDataSource));
CompletableFuture<WorldGenResult> future = worldGenQueue.submitGenTask(new DhLodPos(ungenChildPos), fullDataSource.getDataDetailLevel(), genTask)
.whenComplete((genTaskResult, ex) ->
{
this.onWorldGenTaskComplete(genTaskResult, ex, genTask, ungenChildPos);
this.fireOnGenPosSuccessListeners(pos);
this.incompleteSourceGenRequests.remove(ungenChildPos);
});
futureList.add(future);
}
if (futureList.size() != 0)
if (changed)
{
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.whenComplete((genTaskResult, ex) ->
{
// parent pos has completed generation
this.fireOnGenPosSuccessListeners(pos);
this.incompleteSourceGenRequests.remove(pos);
});
onUpdated.accept(source);
}
return CompletableFuture.completedFuture(source);
}
private void onWorldGenTaskComplete(WorldGenResult genTaskResult, Throwable exception, GenTask genTask, DhSectionPos pos)
{
if (exception != null)
@@ -27,8 +27,7 @@ public interface IFullDataSourceProvider extends AutoCloseable
//boolean isCacheVersionValid(DhSectionPos sectionPos, long cacheVersion);
CompletableFuture<IFullDataSource> onCreateDataFile(FullDataMetaFile file);
CompletableFuture<IFullDataSource> onDataFileLoaded(IFullDataSource source, BaseMetaData metaData, Consumer<IFullDataSource> onUpdated, Function<IFullDataSource, Boolean> updater, boolean justCreated);
CompletableFuture<IFullDataSource> onDataFileRefresh(IFullDataSource source, BaseMetaData metaData, Function<IFullDataSource, Boolean> updater, Consumer<IFullDataSource> onUpdated);
CompletableFuture<IFullDataSource> onDataFileUpdate(IFullDataSource source, FullDataMetaFile file, Consumer<IFullDataSource> onUpdated, Function<IFullDataSource, Boolean> updater);
File computeDataFilePath(DhSectionPos pos);
ExecutorService getIOExecutor();
@@ -32,13 +32,12 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
public DhClientLevel(AbstractSaveStructure saveStructure, IClientLevelWrapper clientLevelWrapper)
{
super();
if (saveStructure.getFullDataFolder(clientLevelWrapper).mkdirs())
{
LOGGER.warn("unable to create data folder.");
}
this.levelWrapper = clientLevelWrapper;
this.saveStructure = saveStructure;
if (saveStructure.getFullDataFolder(levelWrapper).mkdirs())
{
LOGGER.warn("unable to create full data folder.");
}
dataFileHandler = new RemoteFullDataFileHandler(this, saveStructure.getFullDataFolder(levelWrapper));
clientside = new ClientLevelModule(this);
clientside.startRenderer();
@@ -33,6 +33,10 @@ public class DhClientServerLevel extends DhLevel implements IDhClientLevel, IDhS
public DhClientServerLevel(AbstractSaveStructure saveStructure, IServerLevelWrapper serverLevelWrapper)
{
if (saveStructure.getFullDataFolder(serverLevelWrapper).mkdirs())
{
LOGGER.warn("unable to create data folder.");
}
serverside = new ServerLevelModule(this, serverLevelWrapper, saveStructure);
clientside = new ClientLevelModule(this);
LOGGER.info("Started "+DhClientServerLevel.class.getSimpleName()+" for "+ serverLevelWrapper +" with saves at "+saveStructure);
@@ -21,6 +21,10 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
public DhServerLevel(AbstractSaveStructure saveStructure, IServerLevelWrapper serverLevelWrapper)
{
if (saveStructure.getFullDataFolder(serverLevelWrapper).mkdirs())
{
LOGGER.warn("unable to create data folder.");
}
serverside = new ServerLevelModule(this, serverLevelWrapper, saveStructure);
LOGGER.info("Started DHLevel for {} with saves at {}", serverLevelWrapper, saveStructure);
}
@@ -13,31 +13,6 @@ import java.util.function.Predicate;
public class AtomicsUtil
{
public static <T> T compareAndExchange(AtomicReference<T> atomic, T expected, T newValue)
{
while (true)
{
T oldValue = atomic.get();
if (oldValue != expected)
{
return oldValue;
}
else if (atomic.weakCompareAndSet(expected, newValue))
{
return expected;
}
}
}
public static <T> BooleanObjectImmutablePair<T> compareAndExchangeWeak(AtomicReference<T> atomic, T expected, T newValue) {
T oldValue = atomic.get();
if (oldValue == expected && atomic.weakCompareAndSet(expected, newValue)) {
return new BooleanObjectImmutablePair<>(true, expected);
} else {
return new BooleanObjectImmutablePair<>(false, oldValue);
}
}
public static <T> T conditionalAndExchange(AtomicReference<T> atomic, Predicate<T> requirement, T newValue) {
while (true) {
T oldValue = atomic.get();
@@ -54,9 +29,25 @@ public class AtomicsUtil
return new BooleanObjectImmutablePair<>(false, oldValue);
}
}
public static <T> T compareAndExchange(AtomicReference<T> atomic, T expected, T newValue)
{
while (true) {
T oldValue = atomic.get();
if (oldValue != expected) return oldValue;
if (atomic.weakCompareAndSet(expected, newValue)) return expected;
}
}
public static <T> BooleanObjectImmutablePair<T> compareAndExchangeWeak(AtomicReference<T> atomic, T expected, T newValue) {
T oldValue = atomic.get();
if (oldValue == expected && atomic.weakCompareAndSet(expected, newValue)) {
return new BooleanObjectImmutablePair<>(true, expected);
} else {
return new BooleanObjectImmutablePair<>(false, oldValue);
}
}
// Additionally, we implement some helper methods for frequently used atomic operations. //
// Compare with expected value and set new value if equal. Then return whatever value the atomic now contains.