Continue work on the complex concurrent file system

This commit is contained in:
TomTheFurry
2022-06-15 22:17:59 +08:00
parent b2942f51e2
commit 742f8b53bb
11 changed files with 407 additions and 55 deletions
@@ -10,6 +10,7 @@ import java.io.*;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@Deprecated
public class DataFile {
//Metadata format:
//
@@ -86,6 +87,8 @@ public class DataFile {
this.dataType = loader.clazz;
this.loaderVersion = loaderVersion;
}
public FileInputStream getDataContent() throws IOException {
FileInputStream fin = new FileInputStream(path);
int toSkip = METADATA_SIZE;
@@ -103,14 +106,7 @@ public class DataFile {
}
LodDataSource load(DHLevel level) {
if (loadedData != null) return loadedData;
try {
loadedData = loader.loadData(this, level);
return loadedData;
} catch (IOException e) {
//FIXME: Log and review this handling
return null;
}
throw new UnsupportedOperationException("Deprecated");
}
public boolean verifyPath() {
@@ -2,9 +2,8 @@ package com.seibel.lod.core.objects.a7.data;
import com.google.common.collect.HashMultimap;
import com.seibel.lod.core.objects.a7.DHLevel;
import com.seibel.lod.core.objects.a7.pos.DhSectionPos;
import com.seibel.lod.core.objects.a7.io.file.DataMetaFile;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -38,14 +37,14 @@ public abstract class DataSourceLoader {
return false;
})) {
throw new IllegalArgumentException("Loader for class " + clazz + " that supports one of the version in "
+ loaderSupportedVersions + " already registered!");
+ Arrays.toString(loaderSupportedVersions) + " already registered!");
}
datatypeIdRegistry.put(datatypeId, clazz);
loaderRegistry.put(datatypeId, this);
}
// Can return null as meaning the requirement is not met
public abstract LodDataSource loadData(DataFile dataFile, DHLevel level) throws IOException;
public abstract LodDataSource loadData(DataMetaFile dataFile, InputStream data, DHLevel level) throws IOException;
public List<File> foldersToScan(File levelFolderPath) {
return Collections.emptyList();
@@ -9,4 +9,5 @@ public interface LodDataSource {
DataSourceLoader getLatestLoader();
DhSectionPos getSectionPos();
byte getDataDetail();
void setLocalVersion(int localVer);
}
@@ -3,6 +3,7 @@ package com.seibel.lod.core.objects.a7.datatype.column;
import com.seibel.lod.core.enums.config.EVerticalQuality;
import com.seibel.lod.core.objects.a7.DHLevel;
import com.seibel.lod.core.objects.a7.data.*;
import com.seibel.lod.core.objects.a7.io.file.DataMetaFile;
import com.seibel.lod.core.objects.a7.pos.DhSectionPos;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
@@ -20,11 +21,10 @@ public class Alpha6DataLoader extends OldDataSourceLoader implements OldFileConv
}
@Override
public LodDataSource loadData(DataFile dataFile, DHLevel level) {
public LodDataSource loadData(DataMetaFile dataFile, InputStream data, DHLevel level) {
//TODO: Add decompressor here
try (
FileInputStream fin = dataFile.getDataContent();
XZCompressorInputStream xzIn = new XZCompressorInputStream(fin);
XZCompressorInputStream xzIn = new XZCompressorInputStream(data);
DataInputStream dis = new DataInputStream(xzIn);
) {
return new OldColumnDatatype(dataFile.pos, dis, dataFile.loaderVersion, level, 1);
@@ -3,9 +3,9 @@ package com.seibel.lod.core.objects.a7.datatype.column;
import com.seibel.lod.core.config.Config;
import com.seibel.lod.core.enums.config.EVerticalQuality;
import com.seibel.lod.core.objects.a7.DHLevel;
import com.seibel.lod.core.objects.a7.data.DataFile;
import com.seibel.lod.core.objects.a7.data.DataFileHandler;
import com.seibel.lod.core.objects.a7.data.LodDataSource;
import com.seibel.lod.core.objects.a7.io.file.DataMetaFile;
import com.seibel.lod.core.objects.a7.pos.DhSectionPos;
import java.io.*;
@@ -21,11 +21,10 @@ public class ColumnDataLoader extends DataSourceSaver {
}
@Override
public LodDataSource loadData(DataFile dataFile, DHLevel level) {
public LodDataSource loadData(DataMetaFile dataFile, InputStream data, DHLevel level) {
try (
FileInputStream fin = dataFile.getDataContent();
//TODO: Add decompressor here
DataInputStream dis = new DataInputStream(fin);
DataInputStream dis = new DataInputStream(data);
) {
return new ColumnDatatype(dataFile.pos, dis, dataFile.loaderVersion, level);
} catch (IOException e) {
@@ -0,0 +1,14 @@
package com.seibel.lod.core.objects.a7.io;
import com.seibel.lod.core.objects.a7.data.LodDataSource;
import com.seibel.lod.core.objects.a7.datatype.full.FullDatatype;
import com.seibel.lod.core.objects.a7.pos.DhSectionPos;
import java.util.concurrent.CompletableFuture;
// Access point for LOD data addressed by section position.
// Reads and flushes are exposed as CompletableFutures; writes return nothing to the caller.
public interface DataSource {
// Returns a future that yields the LodDataSource for the given section position.
// NOTE(review): whether the future may complete with null is up to the implementation -- confirm.
CompletableFuture<LodDataSource> read(DhSectionPos pos);
// Submits the given chunk data for the given section position.
void write(DhSectionPos sectionPos, FullDatatype chunkData);
// Returns a future that presumably completes once pending writes have been flushed and saved -- TODO confirm.
CompletableFuture<Void> flushAndSave();
}
@@ -0,0 +1,4 @@
package com.seibel.lod.core.objects.a7.io;
// Placeholder class: it declares no fields or methods yet.
public class FileScanner {
}
@@ -1,8 +1,13 @@
package com.seibel.lod.core.objects.a7.io;
import java.io.*;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import com.seibel.lod.core.objects.a7.data.DataFile;
import com.seibel.lod.core.objects.a7.data.DataSourceLoader;
import com.seibel.lod.core.objects.a7.pos.DhSectionPos;
import com.seibel.lod.core.util.LodUtil;
public abstract class MetaFile {
//Metadata format:
@@ -20,22 +25,103 @@ public abstract class MetaFile {
//
// 8 bytes: datatype identifier
//
// 8 bytes: unused
// 8 bytes: timestamp
// Total size: 32 bytes
public static final int METADATA_SIZE = 32;
public static final int METADATA_MAGIC_BYTES = 0x44_48_76_30;
protected final File path;
protected final DhSectionPos pos;
public final DhSectionPos pos;
public File path;
public int checksum;
public long timestamp;
public byte dataLevel;
//Loader stuff
public DataSourceLoader loader;
public Class<?> dataType;
public byte loaderVersion;
public MetaFile(File path, DhSectionPos pos)
{
this.path = path;
this.pos = pos;
/**
 * Loads a meta file from the given path and reads its metadata header.
 *
 * The header is METADATA_SIZE bytes long and is read in this order: magic (int), x (int),
 * y (int, unused), z (int), checksum (int), detail level (byte), data level (byte),
 * loader version (byte), one unused byte, datatype id (long), timestamp (long).
 *
 * @param path the file to load; it must pass validatePath()
 * @throws IOException if the path fails validation, the magic bytes do not match, or no
 *                     loader is registered for the stored datatype id and loader version
 */
protected MetaFile(File path) throws IOException {
    // validatePath() reads the 'path' field, so the field has to be assigned before the call.
    this.path = path;
    validatePath();
    try (FileInputStream fin = new FileInputStream(path)) {
        MappedByteBuffer buffer = fin.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, METADATA_SIZE);

        int magic = buffer.getInt();
        if (magic != METADATA_MAGIC_BYTES) {
            throw new IOException("Invalid file: Magic bytes check failed.");
        }

        int x = buffer.getInt();
        buffer.getInt(); // y: present in the header but unused
        int z = buffer.getInt();
        // Store checksum and timestamp in the fields. Same-named locals would shadow the
        // fields and leave them at their default values.
        this.checksum = buffer.getInt();
        byte detailLevel = buffer.get();
        this.dataLevel = buffer.get();
        byte loaderVersion = buffer.get();
        buffer.get(); // unused byte
        long dataTypeId = buffer.getLong();
        this.timestamp = buffer.getLong();
        LodUtil.assertTrue(buffer.remaining() == 0);

        this.pos = new DhSectionPos(detailLevel, x, z);
        this.loader = DataSourceLoader.getLoader(dataTypeId, loaderVersion);
        if (loader == null) {
            throw new IOException("Invalid file: Data type loader not found: " + dataTypeId + "(v" + loaderVersion + ")");
        }
        this.dataType = loader.clazz;
        this.loaderVersion = loaderVersion;
    }
}
/**
 * Creates a meta file object for the given path and section position.
 * Nothing is read from or written to disk here; only the two fields are set.
 */
protected MetaFile(File path, DhSectionPos pos) {
    this.pos = pos;
    this.path = path;
}
// Empty stub: performs no work yet (see the TODO).
protected void save() {} //TODO: Implement
/**
 * Checks that 'path' refers to an existing regular file that is both readable and writable.
 * The checks run in that order and the first failing one decides the error message.
 *
 * @throws IOException describing the first check that failed
 */
private void validatePath() throws IOException {
    String problem = null;
    if (!path.exists()) {
        problem = "File missing";
    } else if (!path.isFile()) {
        problem = "Not a file";
    } else if (!path.canRead()) {
        problem = "File not readable";
    } else if (!path.canWrite()) {
        problem = "File not writable";
    }
    if (problem != null) {
        throw new IOException(problem);
    }
}
/**
 * Re-reads the metadata header from 'path' and refreshes this object's metadata fields.
 *
 * The whole header is parsed and validated into locals first. The fields (checksum,
 * timestamp, dataLevel, loader, dataType, loaderVersion) are only updated once every check
 * has passed, so a failed refresh leaves the previous values untouched.
 *
 * @throws IOException if the path fails validation, the magic bytes do not match, the stored
 *                     section position differs from 'pos', or no loader is registered for the
 *                     stored datatype id and loader version
 */
protected void updateMetaData() throws IOException {
    validatePath();
    try (FileInputStream fin = new FileInputStream(path)) {
        MappedByteBuffer buffer = fin.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, METADATA_SIZE);

        int magic = buffer.getInt();
        if (magic != METADATA_MAGIC_BYTES) {
            throw new IOException("Invalid file: Magic bytes check failed.");
        }

        int x = buffer.getInt();
        buffer.getInt(); // y: present in the header but unused
        int z = buffer.getInt();
        int newChecksum = buffer.getInt();
        byte detailLevel = buffer.get();
        byte newDataLevel = buffer.get();
        byte newLoaderVersion = buffer.get();
        buffer.get(); // unused byte
        long dataTypeId = buffer.getLong();
        long newTimestamp = buffer.getLong();
        LodUtil.assertTrue(buffer.remaining() == 0);

        DhSectionPos newPos = new DhSectionPos(detailLevel, x, z);
        if (!newPos.equals(pos)) {
            throw new IOException("Invalid file: Section position changed.");
        }
        DataSourceLoader newLoader = DataSourceLoader.getLoader(dataTypeId, newLoaderVersion);
        if (newLoader == null) {
            throw new IOException("Invalid file: Data type loader not found: " + dataTypeId + "(v" + newLoaderVersion + ")");
        }

        // Everything validated: commit the refreshed metadata. checksum and timestamp must be
        // written to the fields; same-named locals would shadow them and drop the values.
        this.checksum = newChecksum;
        this.timestamp = newTimestamp;
        this.dataLevel = newDataLevel;
        this.loader = newLoader;
        this.dataType = newLoader.clazz;
        this.loaderVersion = newLoaderVersion;
    }
}
}
@@ -1,39 +1,99 @@
package com.seibel.lod.core.objects.a7.io.file;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.seibel.lod.core.logging.DhLoggerBuilder;
import com.seibel.lod.core.objects.a7.data.LodDataSource;
import com.seibel.lod.core.objects.a7.datatype.full.FullDatatype;
import com.seibel.lod.core.objects.a7.io.MetaFile;
import com.seibel.lod.core.objects.a7.pos.DhSectionPos;
import com.seibel.lod.core.util.LodUtil;
import org.apache.logging.log4j.Logger;
public class DataMetaFile extends MetaFile {
AtomicInteger localVersion = new AtomicInteger();
public static Logger LOGGER = DhLoggerBuilder.getLogger("FileMetadata");
AtomicInteger localVersion = new AtomicInteger(); // This MUST be atomic
// The '?' type should either be:
// SoftReference<LodDataSource>, or
// CompletableFuture<LodDataSource>, or
// null
// Reference<LodDataSource>, or - Dirtied file that needs to be saved
// SoftReference<LodDataSource>, or - Non-dirty file that can be GCed
// CompletableFuture<LodDataSource>, or - File that is being loaded
// null - Nothing is loaded or being loaded
AtomicReference<Object> data = new AtomicReference<Object>(null);
//TODO: use ConcurrentAppendSingleSwapContainer<LodDataSource> instead of below:
// Pairs a concurrent queue of pending FullDatatype entries with a read/write lock.
// In this file appenders take the read lock while adding, and swapWriteQueue() takes the
// write lock on the swapped-out instance.
private static class GuardedMultiAppendQueue {
// Guards appends (read lock) against the swap barrier (write lock).
ReentrantReadWriteLock appendLock = new ReentrantReadWriteLock();
// Entries that have been queued for writing.
ConcurrentLinkedQueue<FullDatatype> queue = new ConcurrentLinkedQueue<>();
}
AtomicReference<GuardedMultiAppendQueue> writeQueue =
new AtomicReference<>(new GuardedMultiAppendQueue());
GuardedMultiAppendQueue _backQueue = new GuardedMultiAppendQueue();
/**
 * Appends the given data to the currently active write queue.
 *
 * The active queue is fetched first and its read lock taken afterwards, so a swap (plus the
 * swapper's write-lock barrier) can complete in between. To avoid appending to a queue that
 * has already been swapped out, the active queue is re-checked while the read lock is held
 * and the append is retried on the new queue if it changed.
 *
 * @param datatype the data to enqueue
 */
public void addToWriteQueue(FullDatatype datatype) {
    while (true) {
        GuardedMultiAppendQueue queue = writeQueue.get();
        // Using the read lock is OK, because the queue's underlying data structure is thread-safe.
        // The lock is only used to ensure that, when the queue is polled, it is not being
        // modified by another thread.
        Lock appendLock = queue.appendLock.readLock();
        appendLock.lock();
        try {
            // While we hold the read lock, a swapper that replaces this queue has to wait on
            // the write lock. If the queue is still the active one here, the append is safe.
            if (writeQueue.get() != queue) {
                continue; // Swapped out before we got the lock: retry on the current queue.
            }
            queue.queue.add(datatype);
            return;
        } finally {
            appendLock.unlock();
        }
    }
}
// Atomically replaces the active write queue with _backQueue, waits for appenders that hold
// the swapped-out queue's read lock, and then stores the swapped-out queue in _backQueue.
private void swapWriteQueue() {
GuardedMultiAppendQueue queue = writeQueue.getAndSet(_backQueue);
// Acquire the write lock and release it again right away: we only need to wait until no
// other thread is appending to the swapped-out queue. The intent is that the atomic swap
// above, together with the rule that every append first acquires the appendLock, means that
// after the lock() call no other thread is appending, or able to append, to this queue.
// NOTE(review): this relies on appenders never adding to a queue they fetched before the
// swap; verify that addToWriteQueue() upholds that.
// Note: The above needs the getAndSet() to have at least Release memory order.
// (not that Java supports anything non-volatile for getAndSet()...)
queue.appendLock.writeLock().lock();
queue.appendLock.writeLock().unlock();
_backQueue = queue;
}
// Loads the meta file at the given path by delegating to the superclass constructor,
// which reads the metadata header.
// Throws IOException when the superclass constructor does.
public DataMetaFile(File path) throws IOException {
super(path);
}
// Makes a new meta file object for the given path and section position by delegating to the
// superclass constructor. It doesn't load or write any metadata itself.
public DataMetaFile(File path, DhSectionPos pos) {
super(path, pos);
}
public boolean isValid(int version) {
return (localVersion.get() == version);
boolean isValid = false;
// First check if write queue is empty, then check if localVersion is equal to version.
// Must be done in this order as writer will increment localVersion before polling in the write queue.
// Note: Be careful with the memory order of the localVersion read if we ever switch over to Java 9.
// It should be acquire or higher!
isValid = writeQueue.get().queue.isEmpty(); // The 'get()' & 'isEmpty()' enforce a memory barrier.
// Also, we are just querying the state, and this means no
// need to get any locks for the queue.
isValid &= localVersion.get() == version; // The 'get()' enforce a memory barrier.
return isValid;
}
private CompletableFuture<LodDataSource> readCached(Object obj) {
private CompletableFuture<LodDataSource> _readCached(Object obj) {
// Has file cached in RAM and not freed yet.
if (obj != null && (obj instanceof SoftReference<?>)) {
if ((obj instanceof SoftReference<?>)) {
Object inner = ((SoftReference<?>)obj).get();
if (inner != null) {
LodUtil.assertTrue(inner instanceof LodDataSource);
@@ -43,19 +103,18 @@ public class DataMetaFile extends MetaFile {
//==== Cached file out of scope. ====
// Someone is already trying to complete it. so just return the obj.
if (obj != null && (obj instanceof CompletableFuture<?>)) {
if ((obj instanceof CompletableFuture<?>)) {
return (CompletableFuture<LodDataSource>)obj;
}
return null;
}
// Cause: a runtime cast to a generic type cannot be safety-checked.
// However, the Union type ensures the 'data' should only contain the listed type.
@SuppressWarnings("unchecked")
// However, the Union type ensures the 'data' should only contain the listed type.
public CompletableFuture<LodDataSource> loadOrGetCached(Executor fileReaderThreads) {
Object obj = data.get();
CompletableFuture<LodDataSource> cached = readCached(obj);
CompletableFuture<LodDataSource> cached = _readCached(obj);
if (cached != null) return cached;
CompletableFuture<LodDataSource> future = new CompletableFuture<LodDataSource>();
@@ -64,21 +123,78 @@ public class DataMetaFile extends MetaFile {
boolean worked = data.compareAndSet(obj, future);
if (!worked) return loadOrGetCached(fileReaderThreads);
// Would use ComplatableFuture.completeAsync(...), But, java 8 doesn't have it! :(
// Would use CompletableFuture.completeAsync(...), But, java 8 doesn't have it! :(
//return future.completeAsync(this::loadFile, fileReaderThreads);
CompletableFuture.supplyAsync(this::loadFile, fileReaderThreads).whenComplete((f, e) -> {
if (e != null) future.completeExceptionally(e);
CompletableFuture.supplyAsync(this::loadAndUpdateDataSource, fileReaderThreads).whenComplete((f, e) -> {
if (e != null) {
LOGGER.error("Uncaught error loading file {}: ", path, e);
future.complete(null);
}
future.complete(f);
});
return future;
}
private LodDataSource loadFile() {
// TODO
/**
 * Loads the data source from disk, folds in the state of the write queue, and stamps the
 * result with the local version it corresponds to.
 *
 * @return the loaded data source, or null if loadFile() could not load the file
 */
private LodDataSource loadAndUpdateDataSource() {
    LodDataSource loaded = loadFile();
    if (loaded == null) return null;

    // Poll the write queue.
    // First check if the write queue is empty, then swap the write queue.
    // Must be done in this order to ensure isValid() works properly. See isValid() for details.
    boolean isEmpty = writeQueue.get().queue.isEmpty();
    int localVer;
    if (!isEmpty) {
        localVer = localVersion.incrementAndGet();
        swapWriteQueue();
        // TODO: Use _backQueue to apply the changes into the data.
        // TODO: Trigger a save to disk.
    } else {
        localVer = localVersion.get();
    }
    loaded.setLocalVersion(localVer);

    // Finally, return the data. (Returning null here would make every successful load look
    // like a failed one to the callers waiting on the future.)
    return loaded;
}
/**
 * Refreshes the metadata and then reads the data section through the registered loader.
 * Both failure modes are logged as warnings and reported to the caller as null.
 *
 * @return the data source produced by the loader, or null if the metadata refresh or the
 *         read failed with an IOException
 */
private LodDataSource loadFile() {
    // Step 1: make sure the header on disk still matches what we expect.
    try {
        super.updateMetaData();
    } catch (IOException metaError) {
        LOGGER.warn("Metadata for file {} changed unexpectedly and in an invalid state. Dropping file.", path, metaError);
        return null;
    }

    // Step 2: hand the data section (everything after the header) to the loader.
    LodDataSource loaded;
    try (FileInputStream content = getDataContent()) {
        loaded = loader.loadData(this, content, null); // FIXME: somehow get the level object????
    } catch (IOException readError) {
        LOGGER.warn("Failed to load file {}. Dropping file.", path, readError);
        loaded = null;
    }
    return loaded;
}
/**
 * Opens the file at 'path' and positions the stream just past the metadata header.
 *
 * The caller owns the returned stream and must close it. If skipping the header fails, the
 * stream is closed here before the exception propagates, so no file handle is leaked.
 *
 * @return an open stream positioned METADATA_SIZE bytes into the file
 * @throws IOException if the file cannot be opened or the header cannot be skipped
 */
private FileInputStream getDataContent() throws IOException {
    FileInputStream fin = new FileInputStream(path);
    try {
        long toSkip = METADATA_SIZE;
        while (toSkip > 0) {
            long skipped = fin.skip(toSkip);
            // skip() is allowed to make no progress; treat that as a broken file rather
            // than looping forever.
            if (skipped <= 0) {
                throw new IOException("Invalid file: Failed to skip metadata.");
            }
            toSkip -= skipped;
        }
        if (toSkip != 0) {
            throw new IOException("File IO Error: Failed to skip metadata.");
        }
        return fin;
    } catch (IOException | RuntimeException e) {
        // The stream never reaches the caller on this path, so close it here.
        try {
            fin.close();
        } catch (IOException closeError) {
            e.addSuppressed(closeError);
        }
        throw e;
    }
}
/**
 * Triggers processing of pending writes, if there are any.
 *
 * @param fileWriterThreads executor passed on to loadOrGetCached()
 * @return an already completed future when the write queue is empty; otherwise a future
 *         that completes with null once loadOrGetCached() has completed
 */
public CompletableFuture<Void> flushAndSave(Executor fileWriterThreads) {
    // Nothing queued: there is nothing to flush.
    if (writeQueue.get().queue.isEmpty()) {
        return CompletableFuture.completedFuture(null);
    }
    // Loading the file polls the write queue; the original notes this will flush the data to disk.
    return loadOrGetCached(fileWriterThreads).thenApply((unused) -> null);
}
}
@@ -1,8 +1,145 @@
package com.seibel.lod.core.objects.a7.io.file;
public class LocalDataFileHandler {
import com.google.common.collect.HashMultimap;
import com.seibel.lod.core.logging.DhLoggerBuilder;
import com.seibel.lod.core.objects.a7.data.LodDataSource;
import com.seibel.lod.core.objects.a7.datatype.full.FullDatatype;
import com.seibel.lod.core.objects.a7.io.DataSource;
import com.seibel.lod.core.objects.a7.pos.DhSectionPos;
import com.seibel.lod.core.util.LodUtil;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
/*
 * DataSource implementation backed by local files: it keeps one DataMetaFile per section
 * position and forwards reads, writes and flushes to it.
 */
public class LocalDataFileHandler implements DataSource {
    // Note: Single main thread only for now. May make it multi-thread later, depending on the usage.
    ExecutorService fileReaderThread = LodUtil.makeSingleThreadPool("FileReaderThread");
    Logger logger = DhLoggerBuilder.getLogger("LocalDataFileHandler");
    ConcurrentHashMap<DhSectionPos, DataMetaFile> files = new ConcurrentHashMap<>();
    boolean isScanned = false;
    File saveDir;

    public LocalDataFileHandler(File saveRootDir) {
        this.saveDir = saveRootDir;
    }

    /*
     * Registers the given files, keyed by the section position stored in their metadata.
     * If several files share a position, the one with the latest timestamp is used and the
     * others are renamed by appending ".old".
     *
     * Caller must ensure that this method is called only once,
     * and that this object is not used before this method is called.
     *
     * Throws RuntimeException (wrapping the IOException) if a file's metadata cannot be read.
     */
    public void addScannedFile(Collection<File> detectedFiles) {
        HashMultimap<DhSectionPos, DataMetaFile> filesByPos = HashMultimap.create();

        // Group files by pos.
        for (File file : detectedFiles) {
            try {
                DataMetaFile metaFile = new DataMetaFile(file);
                filesByPos.put(metaFile.pos, metaFile);
            } catch (IOException e) {
                // Keep the cause and say which file was at fault.
                throw new RuntimeException("Failed to read metadata of file: " + file, e);
            }
        }

        // Warn for multiple files with the same pos, and then select the one with latest timestamp.
        for (DhSectionPos pos : filesByPos.keySet()) {
            Collection<DataMetaFile> metaFiles = filesByPos.get(pos);
            DataMetaFile fileToUse;
            if (metaFiles.size() > 1) {
                fileToUse = Collections.max(metaFiles, Comparator.comparingLong(a -> a.timestamp));
                logger.warn(describeDuplicates(pos, metaFiles, fileToUse));
                renameUnusedDuplicates(metaFiles, fileToUse);
            } else {
                fileToUse = metaFiles.iterator().next();
            }
            // Add file to the list of files.
            files.put(pos, fileToUse);
        }
    }

    // Builds the warning text listing all files found for one pos and the one that was chosen.
    private String describeDuplicates(DhSectionPos pos, Collection<DataMetaFile> metaFiles, DataMetaFile fileToUse) {
        StringBuilder sb = new StringBuilder();
        sb.append("Multiple files with the same pos: ");
        sb.append(pos);
        sb.append("\n");
        for (DataMetaFile metaFile : metaFiles) {
            sb.append("\t");
            sb.append(metaFile.path);
            sb.append("\n");
        }
        sb.append("\tUsing: ");
        sb.append(fileToUse.path);
        sb.append("\n");
        sb.append("(Other files will be renamed by appending \".old\" to their name.)");
        return sb.toString();
    }

    // Renames every file except fileToUse to "<name>.old"; failures are logged, not thrown.
    private void renameUnusedDuplicates(Collection<DataMetaFile> metaFiles, DataMetaFile fileToUse) {
        for (DataMetaFile metaFile : metaFiles) {
            if (metaFile == fileToUse) continue;
            File oldFile = new File(metaFile.path + ".old");
            try {
                if (!metaFile.path.renameTo(oldFile)) throw new RuntimeException("Renaming failed");
            } catch (Exception e) {
                logger.error("Failed to rename file: " + metaFile.path + " to " + oldFile, e);
            }
        }
    }

    /*
     * Returns the cached or loading data for the given pos, or a completed future holding
     * null when no file is registered for it.
     *
     * This call is concurrent. I.e. it supports multiple threads calling this method at the same time.
     */
    @Override
    public CompletableFuture<LodDataSource> read(DhSectionPos pos) {
        DataMetaFile metaFile = files.get(pos);
        if (metaFile == null) {
            return CompletableFuture.completedFuture(null);
        }
        return metaFile.loadOrGetCached(fileReaderThread);
    }

    /*
     * Queues the chunk data on the meta file for the given section, creating and registering
     * a meta file first if there is none yet.
     *
     * This call is concurrent. I.e. it supports multiple threads calling this method at the same time.
     */
    @Override
    public void write(DhSectionPos sectionPos, FullDatatype chunkData) {
        DataMetaFile metaFile = files.get(sectionPos);
        if (metaFile != null) { // Fast path: if there is a file for this section, just write to it.
            metaFile.addToWriteQueue(chunkData);
            return;
        }
        // Slow path: if there is no file for this section, create one.
        // The meta file needs its own file path; passing saveDir would point every new
        // meta file at the save directory itself.
        DataMetaFile newMetaFile = new DataMetaFile(computeDefaultFilePath(sectionPos), sectionPos);
        // We add to the queue first so on CAS onto the map, no other thread
        // will see the new file without our write entry.
        newMetaFile.addToWriteQueue(chunkData);
        DataMetaFile casResult = files.putIfAbsent(sectionPos, newMetaFile); // This is a CAS with expected null value.
        if (casResult != null) { // another thread already created the file. CAS failed.
            // Drop our version and use the cas result.
            casResult.addToWriteQueue(chunkData);
        }
    }

    /*
     * Asks every registered meta file to flush, and returns a future that completes when all
     * of the per-file futures have completed.
     *
     * This call is concurrent. I.e. it supports multiple threads calling this method at the same time.
     */
    @Override
    public CompletableFuture<Void> flushAndSave() {
        ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
        for (DataMetaFile metaFile : files.values()) {
            futures.add(metaFile.flushAndSave(fileReaderThread));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    private File computeDefaultFilePath(DhSectionPos pos) { //TODO: Temp code as we haven't decided on the file naming & location yet.
        return new File(saveDir, pos.serialize() + ".lod");
    }
}
@@ -96,7 +96,7 @@ public class DhSectionPos {
sectionZ == that.sectionZ;
}
// Serialize() is different from toString() as this reqires it to NEVER be changed, and should be in a short format
// Serialize() is different from toString() as this requires it to NEVER be changed, and should be in a short format
public String serialize() {
    // Fixed short form: "[detail,x,z]".
    StringBuilder out = new StringBuilder("[");
    out.append(sectionDetail).append(',');
    out.append(sectionX).append(',');
    out.append(sectionZ).append(']');
    return out.toString();
}