Finish up 99% of file & cache handling + creation + management + many things

This commit is contained in:
TomTheFurry
2022-06-25 23:12:17 +08:00
parent 2bb78fcc88
commit 82ef1581dd
13 changed files with 446 additions and 89 deletions
@@ -15,19 +15,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicReference;
/**
* Example on how to register a loader:
* <pre>
public static RenderDataSource testAndConstruct(LodDataSource dataSource, DhSectionPos sectionPos) {
ColumnRenderContainer container = new ColumnRenderContainer(10, -100);
container.startFillData(dataSource);
return container;
}
static {
RenderDataSource.registorLoader(ColumnRenderContainer::testAndConstruct, 0);
}
</pre>
*/
public interface LodRenderSource {
void enableRender(LodQuadTree quadTree);
void disableRender();
@@ -45,5 +32,7 @@ public interface LodRenderSource {
void saveRender(IClientLevel level, RenderMetaFile file, OutputStream dataStream) throws IOException;
void update(DHChunkPos chunkPos, ChunkSizedData chunkData);
void update(ChunkSizedData chunkData);
byte getRenderVersion();
}
@@ -21,7 +21,7 @@ public class ColumnRenderLoader extends RenderSourceLoader {
//TODO: Add decompressor here
DataInputStream dis = new DataInputStream(data);
) {
return new ColumnRenderSource(dataFile.pos, dis, dataFile.dataVersion, level);
return new ColumnRenderSource(dataFile.pos, dis, dataFile.loaderVersion, level);
}
}
@@ -312,7 +312,12 @@ public class ColumnRenderSource implements LodRenderSource, IColumnDatatype {
}
@Override
public void update(DHChunkPos chunkPos, ChunkSizedData chunkData) {
public void update(ChunkSizedData chunkData) {
//TODO Update render data directly
}
@Override
public byte getRenderVersion() {
return LATEST_VERSION;
}
}
@@ -0,0 +1,30 @@
package com.seibel.lod.core.a7.datatype.transform;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.datatype.LodRenderSource;
import com.seibel.lod.core.a7.datatype.column.ColumnRenderLoader;
import com.seibel.lod.core.a7.datatype.column.ColumnRenderSource;
import com.seibel.lod.core.a7.level.IClientLevel;
import com.seibel.lod.core.util.LodUtil;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
public class DataRenderTransformer {
public static final ExecutorService TRANSFORMER_THREADS
= LodUtil.makeSingleThreadPool("Data/Render Transformer");
public static CompletableFuture<LodRenderSource> transformDataSource(LodDataSource data, IClientLevel level) {
return CompletableFuture.supplyAsync(() -> transform(data, level), TRANSFORMER_THREADS);
}
public static CompletableFuture<LodRenderSource> asyncTransformDataSource(CompletableFuture<LodDataSource> data, IClientLevel level) {
return data.thenApplyAsync((d) -> transform(d, level), TRANSFORMER_THREADS);
}
private static LodRenderSource transform(LodDataSource dataSource, IClientLevel level) {
if (dataSource == null) return null;
return ColumnRenderLoader.loaderRegistry.get(ColumnRenderSource.class)
.stream().findFirst().get().createRender(dataSource, level);
}
}
@@ -44,7 +44,7 @@ public class LodSection {
public void load(IRenderSourceProvider renderDataProvider, RenderSourceLoader renderDataSourceClass) {
if (loadFuture != null || lodRenderSource != null) throw new IllegalStateException("Reloading is not supported!");
loadFuture = renderDataProvider.createRenderData(renderDataSourceClass, pos);
loadFuture = renderDataProvider.read(pos);
}
public void tick(LodQuadTree quadTree) {
@@ -9,14 +9,18 @@ import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
import com.seibel.lod.core.a7.datatype.DataSourceLoader;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import com.seibel.lod.core.logging.DhLoggerBuilder;
import com.seibel.lod.core.util.LodUtil;
import org.apache.logging.log4j.Logger;
public class MetaFile {
public static final Logger LOGGER = DhLoggerBuilder.getLogger("FileMetadata");
//Metadata format:
//
// 4 bytes: magic bytes: "DHv0" (in ascii: 0x44 48 76 30) (this also signal the metadata format)
@@ -47,9 +51,8 @@ public class MetaFile {
public byte dataLevel;
//Loader stuff
public DataSourceLoader loader;
public Class<?> dataType;
public byte dataVersion;
public long dataTypeId;
public byte loaderVersion;
// Load a metaFile in this path. It also automatically read the metadata.
protected MetaFile(File path) throws IOException {
@@ -65,22 +68,15 @@ public class MetaFile {
int x = buffer.getInt();
int y = buffer.getInt(); // Unused
int z = buffer.getInt();
int checksum = buffer.getInt();
checksum = buffer.getInt();
byte detailLevel = buffer.get();
dataLevel = buffer.get();
byte loaderVersion = buffer.get();
loaderVersion = buffer.get();
byte unused = buffer.get();
long dataTypeId = buffer.getLong();
long timestamp = buffer.getLong();
dataTypeId = buffer.getLong();
timestamp = buffer.getLong();
LodUtil.assertTrue(buffer.remaining() == 0);
this.pos = new DhSectionPos(detailLevel, x, z);
this.loader = DataSourceLoader.getLoader(dataTypeId, loaderVersion);
if (loader == null) {
throw new IOException("Invalid file: Data type loader not found: " + dataTypeId + "(v" + loaderVersion + ")");
}
this.dataType = loader.clazz;
this.dataVersion = loaderVersion;
pos = new DhSectionPos(detailLevel, x, z);
}
}
@@ -123,16 +119,11 @@ public class MetaFile {
if (!newPos.equals(pos)) {
throw new IOException("Invalid file: Section position changed.");
}
this.loader = DataSourceLoader.getLoader(dataTypeId, loaderVersion);
if (loader == null) {
throw new IOException("Invalid file: Data type loader not found: " + dataTypeId + "(v" + loaderVersion + ")");
}
this.dataType = loader.clazz;
this.dataVersion = loaderVersion;
this.loaderVersion = loaderVersion;
}
}
protected void writeData(BiConsumer<MetaFile, OutputStream> dataWriter) throws IOException {
protected void writeData(Consumer<OutputStream> dataWriter) throws IOException {
validatePath();
File tempFile = File.createTempFile("", "tmp", path.getParentFile());
tempFile.deleteOnExit();
@@ -144,10 +135,8 @@ public class MetaFile {
try (OutputStream channelOut = Channels.newOutputStream(file);
BufferedOutputStream bufferedOut = new BufferedOutputStream(channelOut); // TODO: Is default buffer size ok? Do we even need to buffer?
CheckedOutputStream checkedOut = new CheckedOutputStream(bufferedOut, new Adler32())) { // TODO: Is Adler32 ok?
dataWriter.accept(this, checkedOut);
dataWriter.accept(checkedOut);
checksum = (int) checkedOut.getChecksum().getValue();
timestamp = System.currentTimeMillis(); // TODO: Do we need to use server synced time?
// Warn: This may become an attack vector! Be careful!
}
file.position(0);
// Write metadata
@@ -159,9 +148,9 @@ public class MetaFile {
buff.putInt(checksum);
buff.put(pos.sectionDetail);
buff.put(dataLevel);
buff.put(dataVersion);
buff.put(loaderVersion);
buff.put(Byte.MIN_VALUE); // Unused
buff.putLong(loader.datatypeId);
buff.putLong(dataTypeId);
buff.putLong(timestamp);
LodUtil.assertTrue(buff.remaining() == 0);
buff.flip();
@@ -12,22 +12,21 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.datatype.DataSourceLoader;
import com.seibel.lod.core.a7.datatype.full.ChunkSizedData;
import com.seibel.lod.core.a7.datatype.full.FullFormat;
import com.seibel.lod.core.a7.save.io.MetaFile;
import com.seibel.lod.core.a7.level.ILevel;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import com.seibel.lod.core.logging.DhLoggerBuilder;
import com.seibel.lod.core.util.LodUtil;
import org.apache.logging.log4j.Logger;
import org.spongepowered.asm.mixin.injection.Inject;
public class DataMetaFile extends MetaFile {
public static Logger LOGGER = DhLoggerBuilder.getLogger("FileMetadata");
private final ILevel level;
public DataSourceLoader loader;
public Class<? extends LodDataSource> dataType;
AtomicInteger localVersion = new AtomicInteger(); // This MUST be atomic
// The '?' type should either be:
// Reference<LodDataSource>, or - Dirtied file that needs to be saved
// SoftReference<LodDataSource>, or - Non-dirty file that can be GCed
// CompletableFuture<LodDataSource>, or - File that is being loaded
// null - Nothing is loaded or being loaded
@@ -36,13 +35,13 @@ public class DataMetaFile extends MetaFile {
//TODO: use ConcurrentAppendSingleSwapContainer<LodDataSource> instead of below:
private static class GuardedMultiAppendQueue {
ReentrantReadWriteLock appendLock = new ReentrantReadWriteLock();
ConcurrentLinkedQueue<FullFormat> queue = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<ChunkSizedData> queue = new ConcurrentLinkedQueue<>();
}
AtomicReference<GuardedMultiAppendQueue> writeQueue =
new AtomicReference<>(new GuardedMultiAppendQueue());
GuardedMultiAppendQueue _backQueue = new GuardedMultiAppendQueue();
public void addToWriteQueue(FullFormat datatype) {
public void addToWriteQueue(ChunkSizedData datatype) {
GuardedMultiAppendQueue queue = writeQueue.get();
// Using read lock is OK, because the queue's underlying data structure is thread-safe.
// This lock is only used to insure on polling the queue, that the queue is not being
@@ -72,6 +71,12 @@ public class DataMetaFile extends MetaFile {
public DataMetaFile(ILevel level, File path) throws IOException {
super(path);
this.level = level;
loader = DataSourceLoader.getLoader(dataTypeId, loaderVersion);
if (loader == null) {
throw new IOException("Invalid file: Data type loader not found: "
+ dataTypeId + "(v" + loaderVersion + ")");
}
dataType = loader.clazz;
}
// Make a new MetaFile. It doesn't load or write any metadata itself.
@@ -81,7 +86,7 @@ public class DataMetaFile extends MetaFile {
}
public boolean isValid(int version) {
boolean isValid = false;
boolean isValid;
// First check if write queue is empty, then check if localVersion is equal to version.
// Must be done in this order as writer will increment localVersion before polling in the write queue.
// Note: Be careful with the localVerion read's memory order if we do switch over to java 1.9.
@@ -93,13 +98,17 @@ public class DataMetaFile extends MetaFile {
isValid &= localVersion.get() == version; // The 'get()' enforce a memory barrier.
return isValid;
}
// Suppress casting of CompletableFuture<?> to CompletableFuture<LodDataSource>
@SuppressWarnings("unchecked")
private CompletableFuture<LodDataSource> _readCached(Object obj) {
// Has file cached in RAM and not freed yet.
if ((obj instanceof SoftReference<?>)) {
Object inner = ((SoftReference<?>)obj).get();
if (inner != null) {
LodUtil.assertTrue(inner instanceof LodDataSource);
//TODO: Apply the write if queue is not empty
return CompletableFuture.completedFuture((LodDataSource)inner);
}
}
@@ -120,20 +129,22 @@ public class DataMetaFile extends MetaFile {
CompletableFuture<LodDataSource> cached = _readCached(obj);
if (cached != null) return cached;
CompletableFuture<LodDataSource> future = new CompletableFuture<LodDataSource>();
CompletableFuture<LodDataSource> future = new CompletableFuture<>();
// Would use faster and non-nesting Compare and exchange. But java 8 doesn't have it! :(
boolean worked = data.compareAndSet(obj, future);
if (!worked) return loadOrGetCached(fileReaderThreads);
// Would use CompletableFuture.completeAsync(...), But, java 8 doesn't have it! :(
//return future.completeAsync(this::loadFile, fileReaderThreads);
CompletableFuture.supplyAsync(this::loadAndUpdateDataSource, fileReaderThreads).whenComplete((f, e) -> {
//return future.completeAsync(this::loadAndUpdateDataSource, fileReaderThreads);
CompletableFuture.supplyAsync(this::loadAndUpdateDataSource, fileReaderThreads)
.whenComplete((f, e) -> {
if (e != null) {
LOGGER.error("Uncaught error loading file {}: ", path, e);
future.complete(null);
}
future.complete(f);
data.set(new SoftReference<>(f));
});
return future;
}
@@ -201,13 +212,27 @@ public class DataMetaFile extends MetaFile {
}
}
@Override
protected void updateMetaData() throws IOException {
super.updateMetaData();
loader = DataSourceLoader.getLoader(dataTypeId, loaderVersion);
if (loader == null) {
throw new IOException("Invalid file: Data type loader not found: " + dataTypeId + "(v" + loaderVersion + ")");
}
dataType = loader.clazz;
dataTypeId = loader.datatypeId;
}
private void write(LodDataSource data) {
try {
super.writeData((meta, out) -> {
meta.dataLevel = data.getDataDetail();
meta.dataType = data.getClass();
meta.loader = DataSourceLoader.getLoader(data.getClass(), data.getDataVersion());
meta.dataVersion = data.getDataVersion();
dataLevel = data.getDataDetail();
loader = DataSourceLoader.getLoader(data.getClass(), data.getDataVersion());
dataType = data.getClass();
dataTypeId = loader.datatypeId;
loaderVersion = data.getDataVersion();
timestamp = System.currentTimeMillis(); // TODO: Do we need to use server synced time?
// Warn: This may become an attack vector! Be careful!
super.writeData((out) -> {
try {
data.saveData(level, this, out);
} catch (IOException e) {
@@ -1,8 +1,10 @@
package com.seibel.lod.core.a7.save.io.file;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.datatype.full.ChunkSizedData;
import com.seibel.lod.core.a7.datatype.full.FullFormat;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import com.seibel.lod.core.objects.DHChunkPos;
import java.io.File;
import java.util.Collection;
@@ -12,6 +14,8 @@ public interface IDataSourceProvider extends AutoCloseable {
void addScannedFile(Collection<File> detectedFiles);
CompletableFuture<LodDataSource> read(DhSectionPos pos);
void write(DhSectionPos sectionPos, FullFormat chunkData);
void write(DhSectionPos sectionPos, ChunkSizedData chunkData);
CompletableFuture<Void> flushAndSave();
boolean isCacheValid(DhSectionPos sectionPos, long timestamp);
}
@@ -2,10 +2,12 @@ package com.seibel.lod.core.a7.save.io.file;
import com.google.common.collect.HashMultimap;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.datatype.full.ChunkSizedData;
import com.seibel.lod.core.a7.datatype.full.FullFormat;
import com.seibel.lod.core.a7.level.IServerLevel;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import com.seibel.lod.core.logging.DhLoggerBuilder;
import com.seibel.lod.core.objects.DHChunkPos;
import com.seibel.lod.core.util.LodUtil;
import org.apache.logging.log4j.Logger;
@@ -21,15 +23,13 @@ import java.util.concurrent.ExecutorService;
public class LocalDataFileHandler implements IDataSourceProvider {
// Note: Single main thread only for now. May make it multi-thread later, depending on the usage.
ExecutorService fileReaderThread = LodUtil.makeSingleThreadPool("FileReaderThread");
Logger logger = DhLoggerBuilder.getLogger("LocalDataFileHandler");
ConcurrentHashMap<DhSectionPos, DataMetaFile> files = new ConcurrentHashMap<>();
boolean isScanned = false;
File saveDir;
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
final ExecutorService fileReaderThread = LodUtil.makeSingleThreadPool("FileReaderThread");
final ConcurrentHashMap<DhSectionPos, DataMetaFile> files = new ConcurrentHashMap<>();
final IServerLevel level;
final File saveDir;
public LocalDataFileHandler(IServerLevel level, File saveRootDir) {
this.saveDir = saveRootDir;
this.level = level;
@@ -73,7 +73,7 @@ public class LocalDataFileHandler implements IDataSourceProvider {
sb.append(fileToUse.path);
sb.append("\n");
sb.append("(Other files will be renamed by appending \".old\" to their name.)");
logger.warn(sb.toString());
LOGGER.warn(sb.toString());
// Rename all other files with the same pos to .old
for (DataMetaFile metaFile : metaFiles) {
@@ -82,7 +82,7 @@ public class LocalDataFileHandler implements IDataSourceProvider {
try {
if (!metaFile.path.renameTo(oldFile)) throw new RuntimeException("Renaming failed");
} catch (Exception e) {
logger.error("Failed to rename file: " + metaFile.path + " to " + oldFile, e);
LOGGER.error("Failed to rename file: " + metaFile.path + " to " + oldFile, e);
}
}
}
@@ -110,7 +110,7 @@ public class LocalDataFileHandler implements IDataSourceProvider {
* This call is concurrent. I.e. it supports multiple threads calling this method at the same time.
*/
@Override
public void write(DhSectionPos sectionPos, FullFormat chunkData) {
public void write(DhSectionPos sectionPos, ChunkSizedData chunkData) {
DataMetaFile metaFile = files.get(sectionPos);
if (metaFile != null) { // Fast path: if there is a file for this section, just write to it.
metaFile.addToWriteQueue(chunkData);
@@ -142,6 +142,14 @@ public class LocalDataFileHandler implements IDataSourceProvider {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
@Override
public boolean isCacheValid(DhSectionPos sectionPos, long timestamp) {
DataMetaFile file = files.get(sectionPos);
if (file == null) return false;
//TODO
return true;
}
private File computeDefaultFilePath(DhSectionPos pos) { //TODO: Temp code as we haven't decided on the file naming & location yet.
return new File(saveDir, pos.serialize() + ".lod");
}
@@ -1,6 +1,7 @@
package com.seibel.lod.core.a7.save.io.file;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.datatype.full.ChunkSizedData;
import com.seibel.lod.core.a7.datatype.full.FullFormat;
import com.seibel.lod.core.a7.pos.DhSectionPos;
@@ -20,7 +21,7 @@ public class RemoteDataFileHandler implements IDataSourceProvider {
}
@Override
public void write(DhSectionPos sectionPos, FullFormat chunkData) {
public void write(DhSectionPos sectionPos, ChunkSizedData chunkData) {
}
@@ -29,6 +30,11 @@ public class RemoteDataFileHandler implements IDataSourceProvider {
return null;
}
@Override
public boolean isCacheValid(DhSectionPos sectionPos, long timestamp) {
return false;
}
@Override
public void close() throws Exception {
@@ -1,6 +1,7 @@
package com.seibel.lod.core.a7.save.io.render;
import com.seibel.lod.core.a7.datatype.full.FullFormat;
import com.seibel.lod.core.a7.datatype.LodRenderSource;
import com.seibel.lod.core.a7.datatype.full.ChunkSizedData;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import java.io.File;
@@ -8,7 +9,8 @@ import java.util.Collection;
import java.util.concurrent.CompletableFuture;
public interface IRenderSourceProvider extends AutoCloseable {
CompletableFuture<LodRenderSource> read(DhSectionPos pos);
void addScannedFile(Collection<File> detectedFiles);
void write(DhSectionPos sectionPos, FullFormat chunkData);
void write(DhSectionPos sectionPos, ChunkSizedData chunkData);
CompletableFuture<Void> flushAndSave();
}
@@ -1,45 +1,153 @@
package com.seibel.lod.core.a7.save.io.render;
import com.google.common.collect.HashMultimap;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.datatype.LodRenderSource;
import com.seibel.lod.core.a7.datatype.RenderSourceLoader;
import com.seibel.lod.core.a7.datatype.column.ColumnRenderLoader;
import com.seibel.lod.core.a7.datatype.column.ColumnRenderSource;
import com.seibel.lod.core.a7.datatype.full.ChunkSizedData;
import com.seibel.lod.core.a7.datatype.full.FullFormat;
import com.seibel.lod.core.a7.level.IClientLevel;
import com.seibel.lod.core.a7.save.io.file.DataMetaFile;
import com.seibel.lod.core.a7.save.io.file.IDataSourceProvider;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import com.seibel.lod.core.logging.DhLoggerBuilder;
import com.seibel.lod.core.objects.DHChunkPos;
import com.seibel.lod.core.util.LodUtil;
import org.apache.logging.log4j.Logger;
import org.lwjgl.system.CallbackI;
import java.io.File;
import java.util.Collection;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
public class RenderFileHandler implements IRenderSourceProvider {
final File renderCacheFolder;
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
final ExecutorService renderCacheThread = LodUtil.makeSingleThreadPool("RenderCacheThread");
final ConcurrentHashMap<DhSectionPos, RenderMetaFile> files = new ConcurrentHashMap<>();
final IClientLevel level;
final File saveDir;
final IDataSourceProvider dataSourceProvider;
ExecutorService renderCacheThread = LodUtil.makeSingleThreadPool("RenderCacheThread");
Logger logger = DhLoggerBuilder.getLogger("RenderCache");
public RenderFileHandler(IDataSourceProvider sourceProvider, File renderCacheFolder) {
public RenderFileHandler(IDataSourceProvider sourceProvider, IClientLevel level, File saveRootDir) {
this.dataSourceProvider = sourceProvider;
this.renderCacheFolder = renderCacheFolder;
this.level = level;
this.saveDir = saveRootDir;
}
/*
* Caller must ensure that this method is called only once,
* and that this object is not used before this method is called.
*/
@Override
public void addScannedFile(Collection<File> detectedFiles) {
HashMultimap<DhSectionPos, RenderMetaFile> filesByPos = HashMultimap.create();
{ // Sort files by pos.
for (File file : detectedFiles) {
try {
RenderMetaFile metaFile = new RenderMetaFile(
dataSourceProvider::isCacheValid,
dataSourceProvider::read,
level, file
);
filesByPos.put(metaFile.pos, metaFile);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
// Warn for multiple files with the same pos, and then select the one with latest timestamp.
for (DhSectionPos pos : filesByPos.keySet()) {
Collection<RenderMetaFile> metaFiles = filesByPos.get(pos);
RenderMetaFile fileToUse;
if (metaFiles.size() > 1) {
fileToUse = Collections.max(metaFiles, Comparator.comparingLong(a -> a.timestamp));
{
StringBuilder sb = new StringBuilder();
sb.append("Multiple files with the same pos: ");
sb.append(pos);
sb.append("\n");
for (RenderMetaFile metaFile : metaFiles) {
sb.append("\t");
sb.append(metaFile.path);
sb.append("\n");
}
sb.append("\tUsing: ");
sb.append(fileToUse.path);
sb.append("\n");
sb.append("(Other files will be renamed by appending \".old\" to their name.)");
LOGGER.warn(sb.toString());
// Rename all other files with the same pos to .old
for (RenderMetaFile metaFile : metaFiles) {
if (metaFile == fileToUse) continue;
File oldFile = new File(metaFile.path + ".old");
try {
if (!metaFile.path.renameTo(oldFile)) throw new RuntimeException("Renaming failed");
} catch (Exception e) {
LOGGER.error("Failed to rename file: " + metaFile.path + " to " + oldFile, e);
}
}
}
} else {
fileToUse = metaFiles.iterator().next();
}
// Add file to the list of files.
files.put(pos, fileToUse);
}
}
/*
* This call is concurrent. I.e. it supports multiple threads calling this method at the same time.
*/
@Override
public void write(DhSectionPos sectionPos, FullFormat chunkData) {
public CompletableFuture<LodRenderSource> read(DhSectionPos pos) {
RenderMetaFile metaFile = files.computeIfAbsent(pos, (p) -> new RenderMetaFile(
dataSourceProvider::isCacheValid,
dataSourceProvider::read,
level, computeDefaultFilePath(p), p));
return metaFile.loadOrGetCached(renderCacheThread);
}
/*
* This call is concurrent. I.e. it supports multiple threads calling this method at the same time.
*/
@Override
public void write(DhSectionPos sectionPos, ChunkSizedData chunkData) {
dataSourceProvider.write(sectionPos, chunkData);
RenderMetaFile metaFile = files.get(sectionPos);
if (metaFile != null) { // Fast path: if there is a file for this section, just write to it.
metaFile.updateChunkIfNeeded(chunkData);
}
}
/*
* This call is concurrent. I.e. it supports multiple threads calling this method at the same time.
*/
@Override
public CompletableFuture<Void> flushAndSave() {
return null;
ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>();
for (RenderMetaFile metaFile : files.values()) {
futures.add(metaFile.flushAndSave(renderCacheThread));
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
private File computeDefaultFilePath(DhSectionPos pos) { //TODO: Temp code as we haven't decided on the file naming & location yet.
return new File(saveDir, pos.serialize() + ".lod");
}
@Override
public void close() {
ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>();
for (RenderMetaFile metaFile : files.values()) {
futures.add(metaFile.flushAndSave(renderCacheThread));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
}
@@ -1,18 +1,209 @@
package com.seibel.lod.core.a7.save.io.render;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.datatype.LodRenderSource;
import com.seibel.lod.core.a7.datatype.RenderSourceLoader;
import com.seibel.lod.core.a7.datatype.full.ChunkSizedData;
import com.seibel.lod.core.a7.datatype.full.FullFormat;
import com.seibel.lod.core.a7.datatype.transform.DataRenderTransformer;
import com.seibel.lod.core.a7.level.IClientLevel;
import com.seibel.lod.core.a7.level.ILevel;
import com.seibel.lod.core.a7.save.io.MetaFile;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import com.seibel.lod.core.a7.save.io.file.DataMetaFile;
import com.seibel.lod.core.util.LodUtil;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class RenderMetaFile extends MetaFile {
private final IClientLevel level;
public RenderSourceLoader loader;
public Class<? extends LodRenderSource> dataType;
protected RenderMetaFile(File path) throws IOException {
super(path);
// The '?' type should either be:
// SoftReference<LodRenderSource>, or - File that may still be loaded
// CompletableFuture<LodRenderSource>,or - File that is being loaded
// null - Nothing is loaded or being loaded
AtomicReference<Object> data = new AtomicReference<>(null);
//FIXME: This can cause concurrent modification of LodRenderSource.
// Not sure if it will cause issues or not.
public void updateChunkIfNeeded(ChunkSizedData chunkData) {
CompletableFuture<LodRenderSource> source = _readCached(data.get());
if (source.isDone()) source.join().update(chunkData);
}
protected RenderMetaFile(File path, DhSectionPos pos) {
public CompletableFuture<Void> flushAndSave(ExecutorService renderCacheThread) {
CompletableFuture<LodRenderSource> source = _readCached(data.get());
return source.thenAccept((a)->{});
//TODO: Should we save the data or let user re-calculate it on new load?
}
@FunctionalInterface
public interface CacheValidator {
boolean isCacheValid(DhSectionPos sectionPos, long timestamp);
}
@FunctionalInterface
public interface CacheSourceProducer {
CompletableFuture<LodDataSource> getSourceFuture(DhSectionPos sectionPos);
}
CacheValidator validator;
CacheSourceProducer source;
// Load a metaFile in this path. It also automatically read the metadata.
public RenderMetaFile(CacheValidator validator, CacheSourceProducer source,
IClientLevel level, File path) throws IOException {
super(path);
this.level = level;
loader = RenderSourceLoader.getLoader(dataTypeId, loaderVersion);
if (loader == null) {
throw new IOException("Invalid file: Data type loader not found: "
+ dataTypeId + "(v" + loaderVersion + ")");
}
dataType = loader.clazz;
this.validator = validator;
this.source = source;
}
// Make a new MetaFile. It doesn't load or write any metadata itself.
public RenderMetaFile(CacheValidator validator, CacheSourceProducer source,
IClientLevel level, File path, DhSectionPos pos) {
super(path, pos);
this.level = level;
this.validator = validator;
this.source = source;
}
// Suppress casting of CompletableFuture<?> to CompletableFuture<LodRenderSource>
@SuppressWarnings("unchecked")
private CompletableFuture<LodRenderSource> _readCached(Object obj) {
// Has file cached in RAM and not freed yet.
if ((obj instanceof SoftReference<?>)) {
Object inner = ((SoftReference<?>)obj).get();
if (inner != null) {
LodUtil.assertTrue(inner instanceof LodRenderSource);
return CompletableFuture.completedFuture((LodRenderSource)inner);
}
}
//==== Cached file out of scrope. ====
// Someone is already trying to complete it. so just return the obj.
if ((obj instanceof CompletableFuture<?>)) {
return (CompletableFuture<LodRenderSource>)obj;
}
return null;
}
// Cause: Generic Type runtime casting cannot safety check it.
// However, the Union type ensures the 'data' should only contain the listed type.
public CompletableFuture<LodRenderSource> loadOrGetCached(Executor fileReaderThreads) {
Object obj = data.get();
CompletableFuture<LodRenderSource> cached = _readCached(obj);
if (cached != null) return cached;
// Create an empty and non-completed future.
// Note: I do this before actually filling in the future so that I can ensure only
// one task is submitted to the thread pool.
CompletableFuture<LodRenderSource> future = new CompletableFuture<>();
// Would use faster and non-nesting Compare and exchange. But java 8 doesn't have it! :(
boolean worked = data.compareAndSet(obj, future);
if (!worked) return loadOrGetCached(fileReaderThreads);
// Now, there should only ever be one thread at a time here due to the CAS operation above.
// Would use CompletableFuture.completeAsync(...), But, java 8 doesn't have it! :(
//return future.completeAsync(this::loadAndUpdateRenderSource, fileReaderThreads);
CompletableFuture.supplyAsync(() -> buildFuture(fileReaderThreads), fileReaderThreads)
.thenCompose((sourceCompletableFuture) -> sourceCompletableFuture)
.whenComplete((renderSource, e) -> {
if (e != null) {
LOGGER.error("Uncaught error loading file {}: ", path, e);
future.complete(null);
}
future.complete(renderSource);
data.set(new SoftReference<>(renderSource));
});
return future;
}
private CompletableFuture<LodRenderSource> buildFuture(Executor executorService) {
if (path.exists()) {
try {
updateMetaData();
if (validator.isCacheValid(pos, timestamp)) {
// Load the file.
try (FileInputStream fio = getDataContent()) {
return CompletableFuture.completedFuture(
loader.loadRender(this, fio, level));
}
}
} catch (IOException e) {
LOGGER.warn("Failed to read render cache at {}:", path, e);
LOGGER.warn("Will ignore cache file.");
}
}
// Otherwise, re-query and make the RenderSource
CompletableFuture<LodDataSource> dataFuture = source.getSourceFuture(pos);
return dataFuture.thenCombineAsync(
DataRenderTransformer.asyncTransformDataSource(dataFuture, level),
this::write, executorService);
}
private FileInputStream getDataContent() throws IOException {
FileInputStream fin = new FileInputStream(path);
int toSkip = METADATA_SIZE;
while (toSkip > 0) {
long skipped = fin.skip(toSkip);
if (skipped == 0) {
throw new IOException("Invalid file: Failed to skip metadata.");
}
toSkip -= skipped;
}
if (toSkip != 0) {
throw new IOException("File IO Error: Failed to skip metadata.");
}
return fin;
}
@Override
protected void updateMetaData() throws IOException {
super.updateMetaData();
loader = RenderSourceLoader.getLoader(dataTypeId, loaderVersion);
if (loader == null) {
throw new IOException("Invalid file: Data type loader not found: " + dataTypeId + "(v" + loaderVersion + ")");
}
dataType = loader.clazz;
dataTypeId = loader.renderTypeId;
}
private LodRenderSource write(LodDataSource parent, LodRenderSource render) {
if (parent == null) return null;
try {
//TODO: Update Timestamp & stuff based on parent
dataLevel = parent.getDataDetail();
loader = RenderSourceLoader.getLoader(render.getClass(), render.getRenderVersion());
dataType = render.getClass();
dataTypeId = loader.renderTypeId;
loaderVersion = render.getRenderVersion();
super.writeData((out) -> {
try {
render.saveRender(level, this, out);
} catch (IOException e) {
LOGGER.error("Failed to save data for file {}", path, e);
}
});
} catch (IOException e) {
LOGGER.error("Failed to write data for file {}", path, e);
}
return render;
}
}