From 742f8b53bbd91ec09e3871999c29a2ca6a35effc Mon Sep 17 00:00:00 2001 From: TomTheFurry Date: Wed, 15 Jun 2022 22:17:59 +0800 Subject: [PATCH] Continue work on the complex concurrent file system --- .../lod/core/objects/a7/data/DataFile.java | 12 +- .../objects/a7/data/DataSourceLoader.java | 7 +- .../core/objects/a7/data/LodDataSource.java | 1 + .../a7/datatype/column/Alpha6DataLoader.java | 6 +- .../a7/datatype/column/ColumnDataLoader.java | 7 +- .../lod/core/objects/a7/io/DataSource.java | 14 ++ .../lod/core/objects/a7/io/FileScanner.java | 4 + .../lod/core/objects/a7/io/MetaFile.java | 104 +++++++++++- .../core/objects/a7/io/file/DataMetaFile.java | 160 +++++++++++++++--- .../a7/io/file/LocalDataFileHandler.java | 145 +++++++++++++++- .../lod/core/objects/a7/pos/DhSectionPos.java | 2 +- 11 files changed, 407 insertions(+), 55 deletions(-) create mode 100644 src/main/java/com/seibel/lod/core/objects/a7/io/DataSource.java create mode 100644 src/main/java/com/seibel/lod/core/objects/a7/io/FileScanner.java diff --git a/src/main/java/com/seibel/lod/core/objects/a7/data/DataFile.java b/src/main/java/com/seibel/lod/core/objects/a7/data/DataFile.java index 0872797f9..161ede894 100644 --- a/src/main/java/com/seibel/lod/core/objects/a7/data/DataFile.java +++ b/src/main/java/com/seibel/lod/core/objects/a7/data/DataFile.java @@ -10,6 +10,7 @@ import java.io.*; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +@Deprecated public class DataFile { //Metadata format: // @@ -86,6 +87,8 @@ public class DataFile { this.dataType = loader.clazz; this.loaderVersion = loaderVersion; } + + public FileInputStream getDataContent() throws IOException { FileInputStream fin = new FileInputStream(path); int toSkip = METADATA_SIZE; @@ -103,14 +106,7 @@ public class DataFile { } LodDataSource load(DHLevel level) { - if (loadedData != null) return loadedData; - try { - loadedData = loader.loadData(this, level); - return loadedData; - } catch (IOException e) { - //FIXME: Log and review this handling - return null; - } + throw new UnsupportedOperationException("Deprecated"); } public boolean verifyPath() { diff --git a/src/main/java/com/seibel/lod/core/objects/a7/data/DataSourceLoader.java b/src/main/java/com/seibel/lod/core/objects/a7/data/DataSourceLoader.java index 58f25446f..e3099b134 100644 --- a/src/main/java/com/seibel/lod/core/objects/a7/data/DataSourceLoader.java +++ b/src/main/java/com/seibel/lod/core/objects/a7/data/DataSourceLoader.java @@ -2,9 +2,8 @@ package com.seibel.lod.core.objects.a7.data; import com.google.common.collect.HashMultimap; import com.seibel.lod.core.objects.a7.DHLevel; -import com.seibel.lod.core.objects.a7.pos.DhSectionPos; +import com.seibel.lod.core.objects.a7.io.file.DataMetaFile; -import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -38,14 +37,14 @@ public abstract class DataSourceLoader { return false; })) { throw new IllegalArgumentException("Loader for class " + clazz + " that supports one of the version in " - + loaderSupportedVersions + " already registered!"); + + Arrays.toString(loaderSupportedVersions) + " already registered!"); } datatypeIdRegistry.put(datatypeId, clazz); loaderRegistry.put(datatypeId, this); } // Can return null as meaning the requirement is not met - public abstract LodDataSource loadData(DataFile dataFile, DHLevel level) throws IOException; + public abstract LodDataSource loadData(DataMetaFile dataFile, InputStream data, DHLevel level) throws IOException; public List foldersToScan(File levelFolderPath) { return Collections.emptyList(); diff --git a/src/main/java/com/seibel/lod/core/objects/a7/data/LodDataSource.java b/src/main/java/com/seibel/lod/core/objects/a7/data/LodDataSource.java index 99cfc9efd..33a3e35f7 100644 --- a/src/main/java/com/seibel/lod/core/objects/a7/data/LodDataSource.java +++ b/src/main/java/com/seibel/lod/core/objects/a7/data/LodDataSource.java @@ -9,4 +9,5 @@ public interface LodDataSource { DataSourceLoader getLatestLoader(); DhSectionPos getSectionPos(); byte getDataDetail(); + void setLocalVersion(int localVer); } diff --git a/src/main/java/com/seibel/lod/core/objects/a7/datatype/column/Alpha6DataLoader.java b/src/main/java/com/seibel/lod/core/objects/a7/datatype/column/Alpha6DataLoader.java index 0d293070a..2c9ca7ff0 100644 --- a/src/main/java/com/seibel/lod/core/objects/a7/datatype/column/Alpha6DataLoader.java +++ b/src/main/java/com/seibel/lod/core/objects/a7/datatype/column/Alpha6DataLoader.java @@ -3,6 +3,7 @@ package com.seibel.lod.core.objects.a7.datatype.column; import com.seibel.lod.core.enums.config.EVerticalQuality; import com.seibel.lod.core.objects.a7.DHLevel; import com.seibel.lod.core.objects.a7.data.*; +import com.seibel.lod.core.objects.a7.io.file.DataMetaFile; import com.seibel.lod.core.objects.a7.pos.DhSectionPos; import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; @@ -20,11 +21,10 @@ public class Alpha6DataLoader extends OldDataSourceLoader implements OldFileConv } @Override - public LodDataSource loadData(DataFile dataFile, DHLevel level) { + public LodDataSource loadData(DataMetaFile dataFile, InputStream data, DHLevel level) { //TODO: Add decompressor here try ( - FileInputStream fin = dataFile.getDataContent(); - XZCompressorInputStream xzIn = new XZCompressorInputStream(fin); + XZCompressorInputStream xzIn = new XZCompressorInputStream(data); DataInputStream dis = new DataInputStream(xzIn); ) { return new OldColumnDatatype(dataFile.pos, dis, dataFile.loaderVersion, level, 1); diff --git a/src/main/java/com/seibel/lod/core/objects/a7/datatype/column/ColumnDataLoader.java b/src/main/java/com/seibel/lod/core/objects/a7/datatype/column/ColumnDataLoader.java index e4bc45a3c..c9a0f36b6 100644 --- a/src/main/java/com/seibel/lod/core/objects/a7/datatype/column/ColumnDataLoader.java +++ b/src/main/java/com/seibel/lod/core/objects/a7/datatype/column/ColumnDataLoader.java @@ -3,9 +3,9 @@ package com.seibel.lod.core.objects.a7.datatype.column; import com.seibel.lod.core.config.Config; import com.seibel.lod.core.enums.config.EVerticalQuality; import com.seibel.lod.core.objects.a7.DHLevel; -import com.seibel.lod.core.objects.a7.data.DataFile; import com.seibel.lod.core.objects.a7.data.DataFileHandler; import com.seibel.lod.core.objects.a7.data.LodDataSource; +import com.seibel.lod.core.objects.a7.io.file.DataMetaFile; import com.seibel.lod.core.objects.a7.pos.DhSectionPos; import java.io.*; @@ -21,11 +21,10 @@ public class ColumnDataLoader extends DataSourceSaver { } @Override - public LodDataSource loadData(DataFile dataFile, DHLevel level) { + public LodDataSource loadData(DataMetaFile dataFile, InputStream data, DHLevel level) { try ( - FileInputStream fin = dataFile.getDataContent(); //TODO: Add decompressor here - DataInputStream dis = new DataInputStream(fin); + DataInputStream dis = new DataInputStream(data); ) { return new ColumnDatatype(dataFile.pos, dis, dataFile.loaderVersion, level); } catch (IOException e) { diff --git a/src/main/java/com/seibel/lod/core/objects/a7/io/DataSource.java b/src/main/java/com/seibel/lod/core/objects/a7/io/DataSource.java new file mode 100644 index 000000000..5260124da --- /dev/null +++ b/src/main/java/com/seibel/lod/core/objects/a7/io/DataSource.java @@ -0,0 +1,14 @@ +package com.seibel.lod.core.objects.a7.io; + +import com.seibel.lod.core.objects.a7.data.LodDataSource; +import com.seibel.lod.core.objects.a7.datatype.full.FullDatatype; +import com.seibel.lod.core.objects.a7.pos.DhSectionPos; + +import java.util.concurrent.CompletableFuture; + +public interface DataSource { + CompletableFuture read(DhSectionPos pos); + void write(DhSectionPos sectionPos, FullDatatype chunkData); + + CompletableFuture flushAndSave(); +} diff --git a/src/main/java/com/seibel/lod/core/objects/a7/io/FileScanner.java b/src/main/java/com/seibel/lod/core/objects/a7/io/FileScanner.java new file mode 100644 index 000000000..23700f561 --- /dev/null +++ b/src/main/java/com/seibel/lod/core/objects/a7/io/FileScanner.java @@ -0,0 +1,4 @@ +package com.seibel.lod.core.objects.a7.io; + +public class FileScanner { +} diff --git a/src/main/java/com/seibel/lod/core/objects/a7/io/MetaFile.java b/src/main/java/com/seibel/lod/core/objects/a7/io/MetaFile.java index b02ae962c..9d95ec11b 100644 --- a/src/main/java/com/seibel/lod/core/objects/a7/io/MetaFile.java +++ b/src/main/java/com/seibel/lod/core/objects/a7/io/MetaFile.java @@ -1,8 +1,13 @@ package com.seibel.lod.core.objects.a7.io; import java.io.*; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import com.seibel.lod.core.objects.a7.data.DataFile; +import com.seibel.lod.core.objects.a7.data.DataSourceLoader; import com.seibel.lod.core.objects.a7.pos.DhSectionPos; +import com.seibel.lod.core.util.LodUtil; public abstract class MetaFile { //Metadata format: @@ -20,22 +25,103 @@ public abstract class MetaFile { // // 8 bytes: datatype identifier // - // 8 bytes: unused + // 8 bytes: timestamp // Total size: 32 bytes public static final int METADATA_SIZE = 32; public static final int METADATA_MAGIC_BYTES = 0x44_48_76_30; - protected final File path; - protected final DhSectionPos pos; + public final DhSectionPos pos; + + public File path; + public int checksum; + public long timestamp; public byte dataLevel; + + //Loader stuff + public DataSourceLoader loader; + public Class dataType; public byte loaderVersion; - - public MetaFile(File path, DhSectionPos pos) - { - this.path = path; - this.pos = pos; + + // Load a metaFile in this path. It also automatically read the metadata. + protected MetaFile(File path) throws IOException { + validatePath(); + try (FileInputStream fin = new FileInputStream(path)) { + MappedByteBuffer buffer = fin.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, METADATA_SIZE); + this.path = path; + + int magic = buffer.getInt(); + if (magic != METADATA_MAGIC_BYTES) { + throw new IOException("Invalid file: Magic bytes check failed."); + } + int x = buffer.getInt(); + int y = buffer.getInt(); // Unused + int z = buffer.getInt(); + int checksum = buffer.getInt(); + byte detailLevel = buffer.get(); + dataLevel = buffer.get(); + byte loaderVersion = buffer.get(); + byte unused = buffer.get(); + long dataTypeId = buffer.getLong(); + long timestamp = buffer.getLong(); + LodUtil.assertTrue(buffer.remaining() == 0); + + this.pos = new DhSectionPos(detailLevel, x, z); + this.loader = DataSourceLoader.getLoader(dataTypeId, loaderVersion); + if (loader == null) { + throw new IOException("Invalid file: Data type loader not found: " + dataTypeId + "(v" + loaderVersion + ")"); + } + this.dataType = loader.clazz; + this.loaderVersion = loaderVersion; + } + } + + // Make a new MetaFile. It doesn't load or write any metadata itself. + protected MetaFile(File path, DhSectionPos pos) { + this.path = path; + this.pos = pos; + } + + protected void save() {} //TODO: Implement + + private void validatePath() throws IOException { + if (!path.exists()) throw new IOException("File missing"); + if (!path.isFile()) throw new IOException("Not a file"); + if (!path.canRead()) throw new IOException("File not readable"); + if (!path.canWrite()) throw new IOException("File not writable"); + } + + protected void updateMetaData() throws IOException { + validatePath(); + try (FileInputStream fin = new FileInputStream(path)) { + MappedByteBuffer buffer = fin.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, METADATA_SIZE); + int magic = buffer.getInt(); + if (magic != METADATA_MAGIC_BYTES) { + throw new IOException("Invalid file: Magic bytes check failed."); + } + int x = buffer.getInt(); + int y = buffer.getInt(); // Unused + int z = buffer.getInt(); + int checksum = buffer.getInt(); + byte detailLevel = buffer.get(); + dataLevel = buffer.get(); + byte loaderVersion = buffer.get(); + byte unused = buffer.get(); + long dataTypeId = buffer.getLong(); + long timestamp = buffer.getLong(); + LodUtil.assertTrue(buffer.remaining() == 0); + + DhSectionPos newPos = new DhSectionPos(detailLevel, x, z); + if (!newPos.equals(pos)) { + throw new IOException("Invalid file: Section position changed."); + } + this.loader = DataSourceLoader.getLoader(dataTypeId, loaderVersion); + if (loader == null) { + throw new IOException("Invalid file: Data type loader not found: " + dataTypeId + "(v" + loaderVersion + ")"); + } + this.dataType = loader.clazz; + this.loaderVersion = loaderVersion; + } } - } diff --git a/src/main/java/com/seibel/lod/core/objects/a7/io/file/DataMetaFile.java b/src/main/java/com/seibel/lod/core/objects/a7/io/file/DataMetaFile.java index 01cd1dca5..8681147c0 100644 --- a/src/main/java/com/seibel/lod/core/objects/a7/io/file/DataMetaFile.java +++ b/src/main/java/com/seibel/lod/core/objects/a7/io/file/DataMetaFile.java @@ -1,39 +1,99 @@ package com.seibel.lod.core.objects.a7.io.file; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; import java.lang.ref.SoftReference; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.seibel.lod.core.logging.DhLoggerBuilder; import com.seibel.lod.core.objects.a7.data.LodDataSource; +import com.seibel.lod.core.objects.a7.datatype.full.FullDatatype; import com.seibel.lod.core.objects.a7.io.MetaFile; import com.seibel.lod.core.objects.a7.pos.DhSectionPos; import com.seibel.lod.core.util.LodUtil; +import org.apache.logging.log4j.Logger; public class DataMetaFile extends MetaFile { - AtomicInteger localVersion = new AtomicInteger(); + public static Logger LOGGER = DhLoggerBuilder.getLogger("FileMetadata"); + AtomicInteger localVersion = new AtomicInteger(); // This MUST be atomic // The '?' type should either be: - // SoftReference, or - // CompletableFuture, or - // null + // Reference, or - Dirtied file that needs to be saved + // SoftReference, or - Non-dirty file that can be GCed + // CompletableFuture, or - File that is being loaded + // null - Nothing is loaded or being loaded AtomicReference data = new AtomicReference(null); - - + + //TODO: use ConcurrentAppendSingleSwapContainer instead of below: + private static class GuardedMultiAppendQueue { + ReentrantReadWriteLock appendLock = new ReentrantReadWriteLock(); + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + } + AtomicReference writeQueue = + new AtomicReference<>(new GuardedMultiAppendQueue()); + GuardedMultiAppendQueue _backQueue = new GuardedMultiAppendQueue(); + + public void addToWriteQueue(FullDatatype datatype) { + GuardedMultiAppendQueue queue = writeQueue.get(); + // Using read lock is OK, because the queue's underlying data structure is thread-safe. + // This lock is only used to insure on polling the queue, that the queue is not being + // modified by another thread. + Lock appendLock = queue.appendLock.readLock(); + appendLock.lock(); + try { + queue.queue.add(datatype); + } finally { + appendLock.unlock(); + } + } + private void swapWriteQueue() { + GuardedMultiAppendQueue queue = writeQueue.getAndSet(_backQueue); + // Acquire write lock and then release it again as we only need to ensure that the queue + // is not being appended to by another thread. Note that the above atomic swap & + // the guarantee that all append first acquire the appendLock means after the locK() call, + // there will be no other threads able to or is currently appending to the queue. + // Note: The above needs the getAndSet() to have at least Release Memory order. + // (not that java supports anything non volatile for getAndSet()...) + queue.appendLock.writeLock().lock(); + queue.appendLock.writeLock().unlock(); + _backQueue = queue; + } + + // Load a metaFile in this path. It also automatically read the metadata. + public DataMetaFile(File path) throws IOException { + super(path); + } + + // Make a new MetaFile. It doesn't load or write any metadata itself. public DataMetaFile(File path, DhSectionPos pos) { super(path, pos); - // TODO Auto-generated constructor stub } public boolean isValid(int version) { - return (localVersion.get() == version); + boolean isValid = false; + // First check if write queue is empty, then check if localVersion is equal to version. + // Must be done in this order as writer will increment localVersion before polling in the write queue. + // Note: Be careful with the localVerion read's memory order if we do switch over to java 1.9. + // It should be acquire or higher! + + isValid = writeQueue.get().queue.isEmpty(); // The 'get()' & 'isEmpty()' enforce a memory barrier. + // Also, we are just querying the state, and this means no + // need to get any locks for the queue. + isValid &= localVersion.get() == version; // The 'get()' enforce a memory barrier. + return isValid; } - private CompletableFuture readCached(Object obj) { + private CompletableFuture _readCached(Object obj) { // Has file cached in RAM and not freed yet. - if (obj != null && (obj instanceof SoftReference)) { + if ((obj instanceof SoftReference)) { Object inner = ((SoftReference)obj).get(); if (inner != null) { LodUtil.assertTrue(inner instanceof LodDataSource); @@ -43,19 +103,18 @@ public class DataMetaFile extends MetaFile { //==== Cached file out of scrope. ==== // Someone is already trying to complete it. so just return the obj. - if (obj != null && (obj instanceof CompletableFuture)) { + if ((obj instanceof CompletableFuture)) { return (CompletableFuture)obj; } return null; } // Cause: Generic Type runtime casting cannot safety check it. - // However, the Union type ensures the 'data' should only contain the listed type. - @SuppressWarnings("unchecked") + // However, the Union type ensures the 'data' should only contain the listed type. public CompletableFuture loadOrGetCached(Executor fileReaderThreads) { Object obj = data.get(); - CompletableFuture cached = readCached(obj); + CompletableFuture cached = _readCached(obj); if (cached != null) return cached; CompletableFuture future = new CompletableFuture(); @@ -64,21 +123,78 @@ public class DataMetaFile extends MetaFile { boolean worked = data.compareAndSet(obj, future); if (!worked) return loadOrGetCached(fileReaderThreads); - // Would use ComplatableFuture.completeAsync(...), But, java 8 doesn't have it! :( + // Would use CompletableFuture.completeAsync(...), But, java 8 doesn't have it! :( //return future.completeAsync(this::loadFile, fileReaderThreads); - - CompletableFuture.supplyAsync(this::loadFile, fileReaderThreads).whenComplete((f, e) -> { - if (e != null) future.completeExceptionally(e); + CompletableFuture.supplyAsync(this::loadAndUpdateDataSource, fileReaderThreads).whenComplete((f, e) -> { + if (e != null) { + LOGGER.error("Uncaught error loading file {}: ", path, e); + future.complete(null); + } future.complete(f); }); return future; } - private LodDataSource loadFile() { - // TODO + private LodDataSource loadAndUpdateDataSource() { + LodDataSource data = loadFile(); + if (data == null) return null; + + // Poll the write queue + // First check if write queue is empty, then swap the write queue. + // Must be done in this order to ensure isValid work properly. See isValid() for details. + boolean isEmpty = writeQueue.get().queue.isEmpty(); + int localVer; + if (!isEmpty) { + localVer = localVersion.incrementAndGet(); + swapWriteQueue(); + // TODO: Use _backQueue to apply the changes into the data. + // TODO: Trigger a save to disk. + } else localVer = localVersion.get(); + data.setLocalVersion(localVer); + // Finally, return the data. return null; } - - + private LodDataSource loadFile() { + // Refresh the metadata. + try { + super.updateMetaData(); + } catch (IOException e) { + LOGGER.warn("Metadata for file {} changed unexpectedly and in an invalid state. Dropping file.", path, e); + return null; + } + + // Load the file. + try (FileInputStream fio = getDataContent()){ + return loader.loadData(this, fio, null); // FIXME: somehow get the level object???? + } catch (IOException e) { + LOGGER.warn("Failed to load file {}. Dropping file.", path, e); + return null; + } + } + private FileInputStream getDataContent() throws IOException { + FileInputStream fin = new FileInputStream(path); + int toSkip = METADATA_SIZE; + while (toSkip > 0) { + long skipped = fin.skip(toSkip); + if (skipped == 0) { + throw new IOException("Invalid file: Failed to skip metadata."); + } + toSkip -= skipped; + } + if (toSkip != 0) { + throw new IOException("File IO Error: Failed to skip metadata."); + } + return fin; + } + + + public CompletableFuture flushAndSave(Executor fileWriterThreads) { + boolean isEmpty = writeQueue.get().queue.isEmpty(); + if (!isEmpty) { + return loadOrGetCached(fileWriterThreads).thenApply((unused) -> null); // This will flush the data to disk. + } else { + return CompletableFuture.completedFuture(null); + } + } } diff --git a/src/main/java/com/seibel/lod/core/objects/a7/io/file/LocalDataFileHandler.java b/src/main/java/com/seibel/lod/core/objects/a7/io/file/LocalDataFileHandler.java index ab8d9ecb5..f031e5f9f 100644 --- a/src/main/java/com/seibel/lod/core/objects/a7/io/file/LocalDataFileHandler.java +++ b/src/main/java/com/seibel/lod/core/objects/a7/io/file/LocalDataFileHandler.java @@ -1,8 +1,145 @@ package com.seibel.lod.core.objects.a7.io.file; -public class LocalDataFileHandler { - - - +import com.google.common.collect.HashMultimap; +import com.seibel.lod.core.logging.DhLoggerBuilder; +import com.seibel.lod.core.objects.a7.data.LodDataSource; +import com.seibel.lod.core.objects.a7.datatype.full.FullDatatype; +import com.seibel.lod.core.objects.a7.io.DataSource; +import com.seibel.lod.core.objects.a7.pos.DhSectionPos; +import com.seibel.lod.core.util.LodUtil; +import org.apache.logging.log4j.Logger; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +public class LocalDataFileHandler implements DataSource { + // Note: Single main thread only for now. May make it multi-thread later, depending on the usage. + ExecutorService fileReaderThread = LodUtil.makeSingleThreadPool("FileReaderThread"); + Logger logger = DhLoggerBuilder.getLogger("LocalDataFileHandler"); + + ConcurrentHashMap files = new ConcurrentHashMap<>(); + + boolean isScanned = false; + + File saveDir; + public LocalDataFileHandler(File saveRootDir) { + this.saveDir = saveRootDir; + } + + /* + * Caller must ensure that this method is called only once, + * and that this object is not used before this method is called. + */ + public void addScannedFile(Collection detectedFiles) { + HashMultimap filesByPos = HashMultimap.create(); + { // Sort files by pos. + for (File file : detectedFiles) { + try { + DataMetaFile metaFile = new DataMetaFile(file); + filesByPos.put(metaFile.pos, metaFile); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + // Warn for multiple files with the same pos, and then select the one with latest timestamp. + for (DhSectionPos pos : filesByPos.keySet()) { + Collection metaFiles = filesByPos.get(pos); + DataMetaFile fileToUse; + if (metaFiles.size() > 1) { + fileToUse = Collections.max(metaFiles, Comparator.comparingLong(a -> a.timestamp)); + { + StringBuilder sb = new StringBuilder(); + sb.append("Multiple files with the same pos: "); + sb.append(pos); + sb.append("\n"); + for (DataMetaFile metaFile : metaFiles) { + sb.append("\t"); + sb.append(metaFile.path); + sb.append("\n"); + } + sb.append("\tUsing: "); + sb.append(fileToUse.path); + sb.append("\n"); + sb.append("(Other files will be renamed by appending \".old\" to their name.)"); + logger.warn(sb.toString()); + + // Rename all other files with the same pos to .old + for (DataMetaFile metaFile : metaFiles) { + if (metaFile == fileToUse) continue; + File oldFile = new File(metaFile.path + ".old"); + try { + if (!metaFile.path.renameTo(oldFile)) throw new RuntimeException("Renaming failed"); + } catch (Exception e) { + logger.error("Failed to rename file: " + metaFile.path + " to " + oldFile, e); + } + } + } + } else { + fileToUse = metaFiles.iterator().next(); + } + // Add file to the list of files. + files.put(pos, fileToUse); + } + } + + /* + * This call is concurrent. I.e. it supports multiple threads calling this method at the same time. + */ + @Override + public CompletableFuture read(DhSectionPos pos) { + DataMetaFile metaFile = files.get(pos); + if (metaFile == null) { + return CompletableFuture.completedFuture(null); + } + return metaFile.loadOrGetCached(fileReaderThread); + } + + /* + * This call is concurrent. I.e. it supports multiple threads calling this method at the same time. + */ + @Override + public void write(DhSectionPos sectionPos, FullDatatype chunkData) { + DataMetaFile metaFile = files.get(sectionPos); + if (metaFile != null) { // Fast path: if there is a file for this section, just write to it. + metaFile.addToWriteQueue(chunkData); + return; + } + // Slow path: if there is no file for this section, create one. + + DataMetaFile newMetaFile = new DataMetaFile(saveDir, sectionPos); + + // We add to the queue first so on CAS onto the map, no other thread + // will see the new file without our write entry. + newMetaFile.addToWriteQueue(chunkData); + DataMetaFile casResult = files.putIfAbsent(sectionPos, newMetaFile); // This is a CAS with expected null value. + if (casResult != null) { // another thread already created the file. CAS failed. + // Drop our version and use the cas result. + casResult.addToWriteQueue(chunkData); + } + } + + /* + * This call is concurrent. I.e. it supports multiple threads calling this method at the same time. + */ + @Override + public CompletableFuture flushAndSave() { + ArrayList> futures = new ArrayList>(); + for (DataMetaFile metaFile : files.values()) { + futures.add(metaFile.flushAndSave(fileReaderThread)); + } + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + } + + private File computeDefaultFilePath(DhSectionPos pos) { //TODO: Temp code as we haven't decided on the file naming & location yet. + return new File(saveDir, pos.serialize() + ".lod"); + } } diff --git a/src/main/java/com/seibel/lod/core/objects/a7/pos/DhSectionPos.java b/src/main/java/com/seibel/lod/core/objects/a7/pos/DhSectionPos.java index 949b2957d..a2dd4f0b9 100644 --- a/src/main/java/com/seibel/lod/core/objects/a7/pos/DhSectionPos.java +++ b/src/main/java/com/seibel/lod/core/objects/a7/pos/DhSectionPos.java @@ -96,7 +96,7 @@ public class DhSectionPos { sectionZ == that.sectionZ; } - // Serialize() is different from toString() as this reqires it to NEVER be changed, and should be in a short format + // Serialize() is different from toString() as this requires it to NEVER be changed, and should be in a short format public String serialize() { return "[" + sectionDetail + ',' + sectionX + ',' + sectionZ + ']'; }