Add full Data source pooling for certain sampling operations

May cause some concurrency issues
This commit is contained in:
James Seibel
2023-09-21 07:11:41 -05:00
parent f5fd27d513
commit cd7463728f
12 changed files with 325 additions and 117 deletions
@@ -23,35 +23,49 @@ import com.google.common.collect.HashMultimap;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataMetaFile;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataInputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class AbstractFullDataSourceLoader
{
public static final HashMultimap<Class<? extends IFullDataSource>, AbstractFullDataSourceLoader> LOADER_REGISTRY = HashMultimap.create();
public static final HashMap<Long, Class<? extends IFullDataSource>> DATATYPE_ID_REGISTRY = new HashMap<>();
private static final int AVAILABLE_PROCESSOR_COUNT = Runtime.getRuntime().availableProcessors();
public static final HashMultimap<Class<? extends IFullDataSource>, AbstractFullDataSourceLoader> loaderRegistry = HashMultimap.create();
public final Class<? extends IFullDataSource> fullDataSourceClass;
public static final HashMap<Long, Class<? extends IFullDataSource>> datatypeIdRegistry = new HashMap<>();
public final long datatypeId;
public final byte[] loaderSupportedVersions;
/** used when pooling data sources */
private final ArrayList<IFullDataSource> cachedSources = new ArrayList<>();
private final ReadWriteLock cacheReadWriteLock = new ReentrantReadWriteLock();
//=============//
// constructor //
//=============//
public AbstractFullDataSourceLoader(Class<? extends IFullDataSource> fullDataSourceClass, long datatypeId, byte[] loaderSupportedVersions)
{
this.datatypeId = datatypeId;
this.loaderSupportedVersions = loaderSupportedVersions;
Arrays.sort(loaderSupportedVersions); // sort to allow fast access
this.fullDataSourceClass = fullDataSourceClass;
if (datatypeIdRegistry.containsKey(datatypeId) && datatypeIdRegistry.get(datatypeId) != fullDataSourceClass)
if (DATATYPE_ID_REGISTRY.containsKey(datatypeId) && DATATYPE_ID_REGISTRY.get(datatypeId) != fullDataSourceClass)
{
throw new IllegalArgumentException("Loader for datatypeId " + datatypeId + " already registered with different class: "
+ datatypeIdRegistry.get(datatypeId) + " != " + fullDataSourceClass);
+ DATATYPE_ID_REGISTRY.get(datatypeId) + " != " + fullDataSourceClass);
}
Set<AbstractFullDataSourceLoader> loaders = loaderRegistry.get(fullDataSourceClass);
Set<AbstractFullDataSourceLoader> loaders = LOADER_REGISTRY.get(fullDataSourceClass);
if (loaders.stream().anyMatch(other ->
{
// see if any loaderSupportsVersion conflicts with this one
@@ -68,31 +82,129 @@ public abstract class AbstractFullDataSourceLoader
throw new IllegalArgumentException("Loader for class " + fullDataSourceClass + " that supports one of the version in "
+ Arrays.toString(loaderSupportedVersions) + " already registered!");
}
datatypeIdRegistry.put(datatypeId, fullDataSourceClass);
loaderRegistry.put(fullDataSourceClass, this);
DATATYPE_ID_REGISTRY.put(datatypeId, fullDataSourceClass);
LOADER_REGISTRY.put(fullDataSourceClass, this);
}
//================//
// loader getters //
//================//
public static AbstractFullDataSourceLoader getLoader(long dataTypeId, byte dataVersion)
{
return LOADER_REGISTRY.get(DATATYPE_ID_REGISTRY.get(dataTypeId)).stream()
.filter(loader -> Arrays.binarySearch(loader.loaderSupportedVersions, dataVersion) >= 0)
.findFirst().orElse(null);
}
public static AbstractFullDataSourceLoader getLoader(Class<? extends IFullDataSource> clazz, byte dataVersion)
{
return LOADER_REGISTRY.get(clazz).stream()
.filter(loader -> Arrays.binarySearch(loader.loaderSupportedVersions, dataVersion) >= 0)
.findFirst().orElse(null);
}
//==================//
// abstract methods //
//==================//
protected abstract IFullDataSource createEmptyDataSource(DhSectionPos pos);
//==============//
// data loading //
//==============//
/**
* Can return null if any of the requirements aren't met.
*
* @throws InterruptedException if the loader thread is interrupted, generally happens when the level is shutting down
*/
public abstract IFullDataSource loadData(FullDataMetaFile dataFile, DhDataInputStream inputStream, IDhLevel level) throws IOException, InterruptedException;
public static AbstractFullDataSourceLoader getLoader(long dataTypeId, byte dataVersion)
public IFullDataSource loadDataSource(FullDataMetaFile dataFile, DhDataInputStream inputStream, IDhLevel level) throws IOException, InterruptedException
{
return loaderRegistry.get(datatypeIdRegistry.get(dataTypeId)).stream()
.filter(l -> Arrays.binarySearch(l.loaderSupportedVersions, dataVersion) >= 0)
.findFirst().orElse(null);
IFullDataSource dataSource = this.createEmptyDataSource(dataFile.pos);
dataSource.populateFromStream(dataFile, inputStream, level);
return dataSource;
}
public static AbstractFullDataSourceLoader getLoader(Class<? extends IFullDataSource> clazz, byte dataVersion)
/** Should be used in conjunction with {@link AbstractFullDataSourceLoader#returnPooledDataSource} to return the pooled sources. */
public IFullDataSource loadTemporaryDataSource(FullDataMetaFile dataFile, DhDataInputStream inputStream, IDhLevel level) throws IOException, InterruptedException
{
return loaderRegistry.get(clazz).stream()
.filter(loader -> Arrays.binarySearch(loader.loaderSupportedVersions, dataVersion) >= 0)
.findFirst().orElse(null);
IFullDataSource dataSource = this.tryGetPooledSource();
if (dataSource != null)
{
dataSource.repopulateFromStream(dataFile, inputStream, level);
}
else
{
dataSource = this.loadDataSource(dataFile, inputStream, level);
}
return dataSource;
}
//=====================//
// data source pooling //
//=====================//
/** @return null if no pooled source exists */
public IFullDataSource tryGetPooledSource()
{
try
{
this.cacheReadWriteLock.readLock().lock();
int index = this.cachedSources.size() - 1;
if (index == -1)
{
return null;
}
else
{
return this.cachedSources.remove(index);
}
}
finally
{
this.cacheReadWriteLock.readLock().unlock();
}
}
/**
* Doesn't have to be called, if a data source isn't returned, nothing will be leaked.
* It just means a new source must be constructed next time {@link AbstractFullDataSourceLoader#tryGetPooledSource} is called.
*/
public void returnPooledDataSource(IFullDataSource dataSource)
{
if (dataSource == null)
{
return;
}
else if (dataSource.getClass() != this.fullDataSourceClass)
{
return;
}
else if (this.cachedSources.size() > 25)
{
return;
}
try
{
this.cacheReadWriteLock.writeLock().lock();
this.cachedSources.add(dataSource);
}
finally
{
this.cacheReadWriteLock.writeLock().unlock();
}
}
}
@@ -22,6 +22,7 @@ package com.seibel.distanthorizons.core.dataObjects.fullData.loader;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataMetaFile;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataInputStream;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource;
@@ -29,17 +30,9 @@ import java.io.IOException;
public class CompleteFullDataSourceLoader extends AbstractFullDataSourceLoader
{
public CompleteFullDataSourceLoader()
{
super(CompleteFullDataSource.class, CompleteFullDataSource.TYPE_ID, new byte[]{CompleteFullDataSource.DATA_FORMAT_VERSION});
}
public CompleteFullDataSourceLoader() { super(CompleteFullDataSource.class, CompleteFullDataSource.TYPE_ID, new byte[]{CompleteFullDataSource.DATA_FORMAT_VERSION}); }
@Override
public IFullDataSource loadData(FullDataMetaFile dataFile, DhDataInputStream inputStream, IDhLevel level) throws IOException, InterruptedException
{
CompleteFullDataSource dataSource = CompleteFullDataSource.createEmpty(dataFile.pos);
dataSource.populateFromStream(dataFile, inputStream, level);
return dataSource;
}
protected IFullDataSource createEmptyDataSource(DhSectionPos pos) { return CompleteFullDataSource.createEmpty(pos); }
}
@@ -19,9 +19,11 @@
package com.seibel.distanthorizons.core.dataObjects.fullData.loader;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataMetaFile;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataInputStream;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.HighDetailIncompleteFullDataSource;
@@ -29,17 +31,9 @@ import java.io.IOException;
public class HighDetailIncompleteFullDataSourceLoader extends AbstractFullDataSourceLoader
{
public HighDetailIncompleteFullDataSourceLoader()
{
super(HighDetailIncompleteFullDataSource.class, HighDetailIncompleteFullDataSource.TYPE_ID, new byte[]{HighDetailIncompleteFullDataSource.DATA_FORMAT_VERSION});
}
public HighDetailIncompleteFullDataSourceLoader() { super(HighDetailIncompleteFullDataSource.class, HighDetailIncompleteFullDataSource.TYPE_ID, new byte[]{HighDetailIncompleteFullDataSource.DATA_FORMAT_VERSION}); }
@Override
public IFullDataSource loadData(FullDataMetaFile dataFile, DhDataInputStream inputStream, IDhLevel level) throws IOException, InterruptedException
{
HighDetailIncompleteFullDataSource dataSource = HighDetailIncompleteFullDataSource.createEmpty(dataFile.pos);
dataSource.populateFromStream(dataFile, inputStream, level);
return dataSource;
}
protected IFullDataSource createEmptyDataSource(DhSectionPos pos) { return HighDetailIncompleteFullDataSource.createEmpty(pos); }
}
@@ -19,9 +19,11 @@
package com.seibel.distanthorizons.core.dataObjects.fullData.loader;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataMetaFile;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataInputStream;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.LowDetailIncompleteFullDataSource;
@@ -29,17 +31,9 @@ import java.io.IOException;
public class LowDetailIncompleteFullDataSourceLoader extends AbstractFullDataSourceLoader
{
public LowDetailIncompleteFullDataSourceLoader()
{
super(LowDetailIncompleteFullDataSource.class, LowDetailIncompleteFullDataSource.TYPE_ID, new byte[]{LowDetailIncompleteFullDataSource.DATA_FORMAT_VERSION});
}
public LowDetailIncompleteFullDataSourceLoader() { super(LowDetailIncompleteFullDataSource.class, LowDetailIncompleteFullDataSource.TYPE_ID, new byte[]{LowDetailIncompleteFullDataSource.DATA_FORMAT_VERSION}); }
@Override
public IFullDataSource loadData(FullDataMetaFile dataFile, DhDataInputStream inputStream, IDhLevel level) throws IOException, InterruptedException
{
LowDetailIncompleteFullDataSource dataSource = LowDetailIncompleteFullDataSource.createEmpty(dataFile.pos);
dataSource.populateFromStream(dataFile, inputStream, level);
return dataSource;
}
protected IFullDataSource createEmptyDataSource(DhSectionPos pos) { return LowDetailIncompleteFullDataSource.createEmpty(pos); }
}
@@ -62,7 +62,7 @@ public class CompleteFullDataSource extends FullDataArrayAccessor implements IFu
/** written to the binary file to mark what {@link IFullDataSource} the binary file corresponds to */
public static final long TYPE_ID = "CompleteFullDataSource".hashCode();
private final DhSectionPos sectionPos;
private DhSectionPos sectionPos;
private boolean isEmpty = true;
public EDhApiWorldGenerationStep worldGenStep = EDhApiWorldGenerationStep.EMPTY;
@@ -415,6 +415,14 @@ public class CompleteFullDataSource extends FullDataArrayAccessor implements IFu
@Override
public DhSectionPos getSectionPos() { return this.sectionPos; }
@Override
public void resizeDataStructuresForRepopulation(DhSectionPos pos)
{
// no data structures need to be changed, only the source's position
this.sectionPos = pos;
}
@Override
public byte getDataDetailLevel() { return (byte) (this.sectionPos.getDetailLevel() - SECTION_SIZE_OFFSET); }
@@ -77,12 +77,12 @@ public class HighDetailIncompleteFullDataSource implements IIncompleteFullDataSo
protected final FullDataPointIdMap mapping;
private final DhSectionPos sectionPos;
private final FullDataArrayAccessor[] sparseData;
private final DhLodPos chunkPos;
private DhSectionPos sectionPos;
private FullDataArrayAccessor[] sparseData;
private DhLodPos chunkPos;
public final int sectionCount;
public final int dataPointsPerSection;
public int sectionCount;
public int dataPointsPerSection;
public boolean isEmpty = true;
public EDhApiWorldGenerationStep worldGenStep = EDhApiWorldGenerationStep.EMPTY;
private boolean isPromoted = false;
@@ -362,7 +362,18 @@ public class HighDetailIncompleteFullDataSource implements IIncompleteFullDataSo
{
for (int dataPointColIndex = 0; dataPointColIndex < dataPoints[arrayAccessorIndex].length; dataPointColIndex++)
{
System.arraycopy(dataPoints[arrayAccessorIndex][dataPointColIndex], 0, this.sparseData[arrayAccessorIndex].get(dataPointColIndex).getRaw(), 0, dataPoints[dataPointColIndex].length);
long[] incomingColumn = dataPoints[arrayAccessorIndex][dataPointColIndex];
long[] destinationColumn = this.sparseData[arrayAccessorIndex].get(dataPointColIndex).getRaw();
// use the existing arrays if possible
if (incomingColumn.length == destinationColumn.length)
{
System.arraycopy(incomingColumn, 0, destinationColumn, 0, incomingColumn.length);
}
else
{
this.sparseData[arrayAccessorIndex].get(dataPointColIndex).setNew(incomingColumn);
}
}
}
}
@@ -420,6 +431,27 @@ public class HighDetailIncompleteFullDataSource implements IIncompleteFullDataSo
@Override
public DhSectionPos getSectionPos() { return this.sectionPos; }
@Override
public void resizeDataStructuresForRepopulation(DhSectionPos pos)
{
// update the position
this.sectionPos = pos;
this.sectionCount = BitShiftUtil.powerOfTwo(this.sectionPos.getDetailLevel() - SPARSE_UNIT_DETAIL);
this.dataPointsPerSection = SECTION_SIZE / this.sectionCount;
this.chunkPos = this.sectionPos.getMinCornerLodPos(SPARSE_UNIT_DETAIL);
// update the data container
int dataPointCount = this.sectionCount * this.sectionCount;
if (this.sparseData.length != dataPointCount)
{
this.sparseData = new FullDataArrayAccessor[this.sectionCount * this.sectionCount];
}
}
@Override
public byte getDataDetailLevel() { return (byte) (this.sectionPos.getDetailLevel() - SECTION_SIZE_OFFSET); }
@@ -67,7 +67,7 @@ public class LowDetailIncompleteFullDataSource extends FullDataArrayAccessor imp
public static final long TYPE_ID = "LowDetailIncompleteFullDataSource".hashCode();
private final DhSectionPos sectionPos;
private DhSectionPos sectionPos;
private final BitSet isColumnNotEmpty;
private boolean isEmpty = true;
@@ -294,6 +294,14 @@ public class LowDetailIncompleteFullDataSource extends FullDataArrayAccessor imp
@Override
public DhSectionPos getSectionPos() { return this.sectionPos; }
@Override
public void resizeDataStructuresForRepopulation(DhSectionPos pos)
{
// no data structures need to be changed, only the source's position
this.sectionPos = pos;
}
@Override
public byte getDataDetailLevel() { return (byte) (this.sectionPos.getDetailLevel() - SECTION_SIZE_OFFSET); }
@Override
@@ -278,4 +278,11 @@ public interface IFullDataSource
*/
void populateFromStream(FullDataMetaFile dataFile, DhDataInputStream inputStream, IDhLevel level) throws IOException, InterruptedException;
/**
* Should only be implemented by {@link IStreamableFullDataSource} to prevent potential stream read/write inconsistencies.
*
* @see IStreamableFullDataSource#repopulateFromStream(FullDataMetaFile, DhDataInputStream, IDhLevel)
*/
void repopulateFromStream(FullDataMetaFile dataFile, DhDataInputStream inputStream, IDhLevel level) throws IOException, InterruptedException;
}
@@ -22,11 +22,10 @@ package com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces;
import com.seibel.distanthorizons.api.enums.worldGeneration.EDhApiWorldGenerationStep;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.dataObjects.fullData.FullDataPointIdMap;
import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.FullDataArrayAccessor;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataMetaFile;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataInputStream;
import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataOutputStream;
import com.seibel.distanthorizons.core.util.objects.dataStreams.*;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
import java.io.IOException;
@@ -40,8 +39,6 @@ import java.io.IOException;
*
* @param <SummaryDataType> defines the object holding this data source's summary data, extends {@link IStreamableFullDataSource.FullDataSourceSummaryData}.
* @param <DataContainerType> defines the object holding the data points, probably long[][] or long[][][].
* @apiNote James would've preferred to have this as an abstract class,
* however that is impossible. See the apiNote in
* {@link IStreamableFullDataSource#populateFromStream(FullDataMetaFile, DhDataInputStream, IDhLevel) populateFromStream}
* for the full reasoning.
*/
@@ -52,14 +49,28 @@ public interface IStreamableFullDataSource<SummaryDataType extends IStreamableFu
// stream handling //
//=================//
/**
* Clears and then overwrites any data in this object with the data from the given file and stream.
* This is expected to be used with an existing {@link IStreamableFullDataSource} and can be used in place of a constructor to reuse an existing {@link IStreamableFullDataSource} object.
*
* @see IStreamableFullDataSource#populateFromStream
*/
@Override
default void repopulateFromStream(FullDataMetaFile dataFile, DhDataInputStream inputStream, IDhLevel level) throws IOException, InterruptedException
{
// clear/overwrite the old data
this.resizeDataStructuresForRepopulation(dataFile.pos);
this.getMapping().clear(dataFile.pos);
// set the new data
this.populateFromStream(dataFile, inputStream, level);
}
/**
* Overwrites any data in this object with the data from the given file and stream.
* This is expected to be used with an empty {@link IStreamableFullDataSource} and functions similar to a constructor.
*
* @apiNote James would've preferred that {@link IStreamableFullDataSource} was an abstract class,
* so this could've been a constructor.
* However, several inheritors of this interface already extend {@link FullDataArrayAccessor}, making that impossible.
*/
@Override
default void populateFromStream(FullDataMetaFile dataFile, DhDataInputStream inputStream, IDhLevel level) throws IOException, InterruptedException
{
SummaryDataType summaryData = this.readSourceSummaryInfo(dataFile, inputStream, level);
@@ -79,6 +90,7 @@ public interface IStreamableFullDataSource<SummaryDataType extends IStreamableFu
}
@Override
default void writeToStream(DhDataOutputStream outputStream, IDhLevel level) throws IOException
{
this.writeSourceSummaryInfo(level, outputStream);
@@ -94,6 +106,9 @@ public interface IStreamableFullDataSource<SummaryDataType extends IStreamableFu
/** Note: this should only be used if the data source is being reused. Normally data sources shouldn't change. */
void resizeDataStructuresForRepopulation(DhSectionPos pos);
/**
* Includes information about the source file that doesn't need to be saved in each data point. Like the source's size and y-level.
*/
@@ -22,6 +22,7 @@ package com.seibel.distanthorizons.core.file.fullDatafile;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor;
import com.seibel.distanthorizons.core.dataObjects.fullData.loader.AbstractFullDataSourceLoader;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.HighDetailIncompleteFullDataSource;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.LowDetailIncompleteFullDataSource;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
@@ -377,7 +378,7 @@ public class FullDataFileHandler implements IFullDataSourceProvider
}
/** populates the given data source using the given array of files */
protected CompletableFuture<IIncompleteFullDataSource> sampleFromFileArray(IIncompleteFullDataSource recipientFullDataSource, ArrayList<FullDataMetaFile> existingFiles)
protected CompletableFuture<IIncompleteFullDataSource> sampleFromFileArray(IIncompleteFullDataSource recipientFullDataSource, ArrayList<FullDataMetaFile> existingFiles, boolean cacheLoadedDataSources)
{
boolean showFullDataFileSampling = Config.Client.Advanced.Debugging.DebugWireframe.showFullDataFileSampling.get();
if (showFullDataFileSampling)
@@ -388,47 +389,64 @@ public class FullDataFileHandler implements IFullDataSourceProvider
}
// read in the existing data
final ArrayList<CompletableFuture<Void>> loadDataFutures = new ArrayList<>(existingFiles.size());
final ArrayList<CompletableFuture<IFullDataSource>> sampleDataFutures = new ArrayList<>(existingFiles.size());
for (int i = 0; i < existingFiles.size(); i++)
{
FullDataMetaFile existingFile = existingFiles.get(i);
CompletableFuture<Void> loadFileFuture = existingFile.getOrLoadCachedDataSourceAsync()
.handle((existingFullDataSource, ex) ->
{
if (existingFullDataSource == null || ex != null)
{
// Ignore file read errors
//LOGGER.warn(recipientFullDataSource.getSectionPos()+" sample from, file read error for file "+existingFile.pos+": "+ex.getMessage(), ex);
return null;
}
if (showFullDataFileSampling)
{
DebugRenderer.makeParticle(new DebugRenderer.BoxParticle(
new DebugRenderer.Box(recipientFullDataSource.getSectionPos(), 64f, 72f, 0.03f, Color.MAGENTA.darker()),
0.2, 32f));
}
try
{
recipientFullDataSource.sampleFrom(existingFullDataSource);
}
catch (Exception e)
{
LOGGER.warn("Unable to sample "+existingFullDataSource.getSectionPos()+" into "+recipientFullDataSource.getSectionPos());
//throw e;
}
// hopefully clearing the cached data source immediately after we are done with it will help the garbage collector a bit
existingFile.clearCachedDataSource();
return null; // TODO remove the need to un-necessarily return null
});
loadDataFutures.add(loadFileFuture);
CompletableFuture<IFullDataSource> loadFileFuture = cacheLoadedDataSources ? existingFile.getOrLoadCachedDataSourceAsync() : existingFile.getDataSourceWithoutCachingAsync();
CompletableFuture<IFullDataSource> sampleSourceFuture = loadFileFuture.whenComplete((existingFullDataSource, ex) ->
{
if (existingFullDataSource == null || ex != null)
{
// Ignore file read errors
//LOGGER.warn(recipientFullDataSource.getSectionPos()+" sample from, file read error for file "+existingFile.pos+": "+ex.getMessage(), ex);
return;
}
if (showFullDataFileSampling)
{
DebugRenderer.makeParticle(new DebugRenderer.BoxParticle(
new DebugRenderer.Box(recipientFullDataSource.getSectionPos(), 64f, 72f, 0.03f, Color.MAGENTA.darker()),
0.2, 32f));
}
try
{
recipientFullDataSource.sampleFrom(existingFullDataSource);
}
catch (Exception e)
{
LOGGER.warn("Unable to sample "+existingFullDataSource.getSectionPos()+" into "+recipientFullDataSource.getSectionPos(), e);
//throw e;
}
// pooling temporary data sources massively reduces garbage collector overhead when just sampling (going from ~8 GB/sec to ~90 MB/sec)
if (!cacheLoadedDataSources && !existingFile.cacheLoadingDataSource)
{
existingFile.clearCachedDataSource();
// get the data loader
AbstractFullDataSourceLoader dataSourceLoader;
if (existingFile.fullDataSourceLoader != null)
{
dataSourceLoader = existingFile.fullDataSourceLoader;
}
else
{
// shouldn't normally happen, but sometimes does
dataSourceLoader = AbstractFullDataSourceLoader.getLoader(existingFile.baseMetaData.dataTypeId, existingFile.baseMetaData.binaryDataFormatVersion);
}
dataSourceLoader.returnPooledDataSource(existingFullDataSource);
}
});
sampleDataFutures.add(sampleSourceFuture);
}
return CompletableFuture.allOf(loadDataFutures.toArray(new CompletableFuture[0]))
return CompletableFuture.allOf(sampleDataFutures.toArray(new CompletableFuture[0]))
.thenApply(voidObj -> recipientFullDataSource);
}
@@ -461,7 +479,7 @@ public class FullDataFileHandler implements IFullDataSourceProvider
else
{
this.makeFiles(missing, existFiles);
return this.sampleFromFileArray(source, existFiles).thenApply(IIncompleteFullDataSource::tryPromotingToCompleteDataSource)
return this.sampleFromFileArray(source, existFiles, false).thenApply(IIncompleteFullDataSource::tryPromotingToCompleteDataSource)
.exceptionally((e) ->
{
FullDataMetaFile newMetaFile = this.removeCorruptedFile(pos, file, e);
@@ -42,7 +42,6 @@ import com.seibel.distanthorizons.core.pos.DhLodPos;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.dataObjects.fullData.loader.AbstractFullDataSourceLoader;
import com.seibel.distanthorizons.core.util.AtomicsUtil;
import com.seibel.distanthorizons.core.util.FileUtil;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataInputStream;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
@@ -88,6 +87,7 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
*/
private DataSourceReferenceTracker.FullDataSourceSoftRef cachedFullDataSourceRef = new DataSourceReferenceTracker.FullDataSourceSoftRef(this,null);
private final AtomicReference<CompletableFuture<IFullDataSource>> dataSourceLoadFutureRef = new AtomicReference<>(null);
public volatile Boolean cacheLoadingDataSource = null;
// === Concurrent Write tracking ===
private final AtomicReference<GuardedMultiAppendQueue> writeQueueRef = new AtomicReference<>(new GuardedMultiAppendQueue());
@@ -168,7 +168,8 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
if (dataExists)
{
this.cachedFullDataSourceRef.close();
this.cachedFullDataSourceRef.clear();
this.cachedFullDataSourceRef.clear();
this.cacheLoadingDataSource = null;
}
return dataExists;
@@ -176,13 +177,20 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
public CompletableFuture<IFullDataSource> getOrLoadCachedDataSourceAsync()
public CompletableFuture<IFullDataSource> getDataSourceWithoutCachingAsync() { return this.getOrLoadCachedDataSourceAsync(false); }
public CompletableFuture<IFullDataSource> getOrLoadCachedDataSourceAsync() { return this.getOrLoadCachedDataSourceAsync(true); } // TODO broken when accessed by multiple threads
private CompletableFuture<IFullDataSource> getOrLoadCachedDataSourceAsync(boolean cacheLoadingSource)
{
checkAndLogPhantomDataSourceLifeCycles();
CompletableFuture<IFullDataSource> potentialLoadFuture = this.getCachedDataSourceAsync();
if (potentialLoadFuture != null)
{
if (cacheLoadingSource)
{
this.cacheLoadingDataSource = true;
}
// return the in-process future
return potentialLoadFuture;
}
@@ -196,6 +204,8 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
// two threads attempted to start this job at the same time, only use the first future
potentialLoadFuture = this.dataSourceLoadFutureRef.get();
}
this.cacheLoadingDataSource = cacheLoadingSource;
}
@@ -246,11 +256,18 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
try (FileInputStream fileInputStream = this.getFileInputStream();
DhDataInputStream compressedStream = new DhDataInputStream(fileInputStream))
{
fullDataSource = this.fullDataSourceLoader.loadData(this, compressedStream, this.level);
if (cacheLoadingSource)
{
fullDataSource = this.fullDataSourceLoader.loadDataSource(this, compressedStream, this.level);
}
else
{
fullDataSource = this.fullDataSourceLoader.loadTemporaryDataSource(this, compressedStream, this.level);
}
}
catch (Exception ex)
{
// TODO temporary fix
/// TODO temporary fix
dataSourceLoadFuture.completeExceptionally(ex);
this.dataSourceLoadFutureRef.set(null);
@@ -392,11 +409,11 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
public CompletableFuture<Void> flushAndSaveAsync()
{
checkAndLogPhantomDataSourceLifeCycles();
boolean isEmpty = this.writeQueueRef.get().queue.isEmpty() && !needsUpdate;
boolean isEmpty = this.writeQueueRef.get().queue.isEmpty() && !this.needsUpdate;
if (!isEmpty)
{
// This will flush the data to disk.
return this.getOrLoadCachedDataSourceAsync().thenApply((fullDataSource) -> null /* ignore the result, just wait for the load to finish*/ );
return this.getDataSourceWithoutCachingAsync().thenApply((fullDataSource) -> null /* ignore the result, just wait for the load to finish*/ );
}
else
{
@@ -603,8 +620,11 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
}
// save the updated data source
this.cachedFullDataSourceRef = new DataSourceReferenceTracker.FullDataSourceSoftRef(this, fullDataSource);
if (this.cacheLoadingDataSource)
{
// save the updated data source
this.cachedFullDataSourceRef = new DataSourceReferenceTracker.FullDataSourceSoftRef(this, fullDataSource);
}
// the task is complete
completionFuture.complete(fullDataSource);
@@ -613,7 +633,14 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile implements I
if (this.needsUpdate)
{
// another update was requested while this update was being processed
this.getOrLoadCachedDataSourceAsync();
if (this.cacheLoadingDataSource)
{
this.getOrLoadCachedDataSourceAsync();
}
else
{
this.getDataSourceWithoutCachingAsync();
}
}
});
@@ -134,11 +134,11 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler
{
ArrayList<FullDataMetaFile> existingFiles = new ArrayList<>();
byte sectDetailLevel = (byte) (DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL + maxSectDataDetailLevel);
pos.forEachChildAtLevel(sectDetailLevel, p -> existingFiles.add(getLoadOrMakeFile(p, true)));
return sampleFromFileArray(dataSource, existingFiles).thenApply(this::tryPromoteDataSource)
.exceptionally((e) ->
pos.forEachChildAtLevel(sectDetailLevel, childPos -> existingFiles.add(this.getLoadOrMakeFile(childPos, true)));
return this.sampleFromFileArray(dataSource, existingFiles, true).thenApply(this::tryPromoteDataSource)
.exceptionally((ex) ->
{
FullDataMetaFile newMetaFile = removeCorruptedFile(pos, file, e);
FullDataMetaFile newMetaFile = this.removeCorruptedFile(pos, file, ex);
return null;
});
}
@@ -186,7 +186,7 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler
{
// There are other data source files to sample from.
this.makeFiles(missingPositions, existingFiles);
return this.sampleFromFileArray(data, existingFiles)
return this.sampleFromFileArray(data, existingFiles, true)
.thenApply(this::tryPromoteDataSource)
.exceptionally((e) ->
{