file handling refactoring
This commit is contained in:
@@ -10,12 +10,14 @@ import java.io.InputStream;
|
||||
|
||||
public class SparseDataLoader extends AbstractDataSourceLoader
|
||||
{
|
||||
public SparseDataLoader() {
|
||||
super(SparseDataSource.class, SparseDataSource.TYPE_ID, new byte[]{SparseDataSource.LATEST_VERSION});
|
||||
}
|
||||
public SparseDataLoader()
|
||||
{
|
||||
super(SparseDataSource.class, SparseDataSource.TYPE_ID, new byte[] { SparseDataSource.LATEST_VERSION });
|
||||
}
|
||||
|
||||
@Override
|
||||
public ILodDataSource loadData(DataMetaFile dataFile, InputStream data, IDhLevel level) throws IOException {
|
||||
return SparseDataSource.loadData(dataFile, data, level);
|
||||
}
|
||||
public ILodDataSource loadData(DataMetaFile dataFile, InputStream data, IDhLevel level) throws IOException
|
||||
{
|
||||
return SparseDataSource.loadData(dataFile, data, level);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.seibel.lod.core.datatype.full.ChunkSizedData;
|
||||
import com.seibel.lod.core.datatype.full.FullDataSource;
|
||||
import com.seibel.lod.core.datatype.full.SparseDataSource;
|
||||
import com.seibel.lod.core.datatype.full.SpottyDataSource;
|
||||
import com.seibel.lod.core.file.FileUtil;
|
||||
import com.seibel.lod.core.file.metaData.MetaData;
|
||||
import com.seibel.lod.core.level.IDhLevel;
|
||||
import com.seibel.lod.core.pos.DhLodPos;
|
||||
@@ -17,14 +18,12 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
@@ -150,7 +149,7 @@ public class DataFileHandler implements IDataSourceProvider
|
||||
}
|
||||
}
|
||||
|
||||
protected DataMetaFile atomicGetOrMakeFile(DhSectionPos pos)
|
||||
protected DataMetaFile getOrMakeFile(DhSectionPos pos)
|
||||
{
|
||||
DataMetaFile metaFile = this.files.get(pos);
|
||||
if (metaFile == null)
|
||||
@@ -266,7 +265,7 @@ public class DataFileHandler implements IDataSourceProvider
|
||||
public CompletableFuture<ILodDataSource> read(DhSectionPos pos)
|
||||
{
|
||||
this.topDetailLevel.updateAndGet(v -> Math.max(v, pos.sectionDetailLevel));
|
||||
DataMetaFile metaFile = this.atomicGetOrMakeFile(pos);
|
||||
DataMetaFile metaFile = this.getOrMakeFile(pos);
|
||||
if (metaFile == null)
|
||||
{
|
||||
return CompletableFuture.completedFuture(null);
|
||||
@@ -351,7 +350,7 @@ public class DataFileHandler implements IDataSourceProvider
|
||||
{
|
||||
for (DhSectionPos missingPos : missing)
|
||||
{
|
||||
DataMetaFile newFile = this.atomicGetOrMakeFile(missingPos);
|
||||
DataMetaFile newFile = this.getOrMakeFile(missingPos);
|
||||
if (newFile != null)
|
||||
{
|
||||
existFiles.add(newFile);
|
||||
@@ -362,7 +361,7 @@ public class DataFileHandler implements IDataSourceProvider
|
||||
SparseDataSource.createEmpty(pos) :
|
||||
SpottyDataSource.createEmpty(pos);
|
||||
|
||||
for (DataMetaFile f : existFiles)
|
||||
for (DataMetaFile metaFile : existFiles)
|
||||
{
|
||||
futures.add(f.loadOrGetCached()
|
||||
.exceptionally((ex) -> null)
|
||||
@@ -445,4 +444,5 @@ public class DataFileHandler implements IDataSourceProvider
|
||||
DataMetaFile.debugCheck();
|
||||
//TODO
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -54,18 +54,21 @@ public class DataMetaFile extends AbstractMetaDataFile
|
||||
// ===Object lifetime stuff===
|
||||
private static final ReferenceQueue<ILodDataSource> lifeCycleDebugQueue = new ReferenceQueue<>();
|
||||
private static final Set<DataObjTracker> lifeCycleDebugSet = ConcurrentHashMap.newKeySet();
|
||||
private static class DataObjTracker extends PhantomReference<ILodDataSource> implements Closeable {
|
||||
private static class DataObjTracker extends PhantomReference<ILodDataSource> implements Closeable
|
||||
{
|
||||
private final DhSectionPos pos;
|
||||
DataObjTracker(ILodDataSource data) {
|
||||
|
||||
DataObjTracker(ILodDataSource data)
|
||||
{
|
||||
super(data, lifeCycleDebugQueue);
|
||||
//LOGGER.info("Phantom created on {}! count: {}", data.getSectionPos(), lifeCycleDebugSet.size());
|
||||
lifeCycleDebugSet.add(this);
|
||||
pos = data.getSectionPos();
|
||||
this.pos = data.getSectionPos();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
lifeCycleDebugSet.remove(this);
|
||||
}
|
||||
public void close() { lifeCycleDebugSet.remove(this); }
|
||||
|
||||
}
|
||||
// ===========================
|
||||
|
||||
@@ -100,7 +103,7 @@ public class DataMetaFile extends AbstractMetaDataFile
|
||||
debugCheck();
|
||||
boolean isEmpty = writeQueue.get().queue.isEmpty();
|
||||
if (!isEmpty) {
|
||||
return loadOrGetCached().thenApply((unused) -> null); // This will flush the data to disk.
|
||||
return loadOrGetCachedAsync().thenApply((unused) -> null); // This will flush the data to disk.
|
||||
} else {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
@@ -144,54 +147,76 @@ public class DataMetaFile extends AbstractMetaDataFile
|
||||
|
||||
// Cause: Generic Type runtime casting cannot safety check it.
|
||||
// However, the Union type ensures the 'data' should only contain the listed type.
|
||||
public CompletableFuture<ILodDataSource> loadOrGetCached() {
|
||||
public CompletableFuture<ILodDataSource> loadOrGetCachedAsync()
|
||||
{
|
||||
debugCheck();
|
||||
Object obj = data.get();
|
||||
Object obj = this.data.get();
|
||||
|
||||
CompletableFuture<ILodDataSource> cached = _readCached(obj);
|
||||
if (cached != null) return cached;
|
||||
CompletableFuture<ILodDataSource> cached = this._readCachedAsync(obj);
|
||||
if (cached != null)
|
||||
{
|
||||
return cached;
|
||||
}
|
||||
|
||||
CompletableFuture<ILodDataSource> future = new CompletableFuture<>();
|
||||
|
||||
// Would use faster and non-nesting Compare and exchange. But java 8 doesn't have it! :(
|
||||
boolean worked = data.compareAndSet(obj, future);
|
||||
if (!worked) return loadOrGetCached();
|
||||
boolean worked = this.data.compareAndSet(obj, future);
|
||||
if (!worked)
|
||||
{
|
||||
return this.loadOrGetCachedAsync();
|
||||
}
|
||||
|
||||
// After cas. We are in exclusive control.
|
||||
if (!doesFileExist) {
|
||||
handler.onCreateDataFile(this)
|
||||
.thenApply((data) -> {
|
||||
metaData = makeMetaData(data);
|
||||
if (!this.doesFileExist)
|
||||
{
|
||||
this.handler.onCreateDataFile(this)
|
||||
.thenApply((data) ->
|
||||
{
|
||||
this.metaData = makeMetaData(data);
|
||||
return data;
|
||||
})
|
||||
.thenApply((data) -> handler.onDataFileLoaded(data, metaData, this::saveChanges, this::applyWriteQueue))
|
||||
.whenComplete((v, e) -> {
|
||||
if (e != null) {
|
||||
LOGGER.error("Uncaught error on creation {}: ", path, e);
|
||||
.thenApply((data) -> this.handler.onDataFileLoaded(data, this.metaData, this::saveChanges, this::applyWriteQueue))
|
||||
.whenComplete((v, e) ->
|
||||
{
|
||||
if (e != null)
|
||||
{
|
||||
future.complete(null);
|
||||
data.set(null);
|
||||
} else {
|
||||
this.data.set(null);
|
||||
}
|
||||
else
|
||||
{
|
||||
future.complete(v);
|
||||
new DataObjTracker(v);
|
||||
data.set(new SoftReference<>(v));
|
||||
this.data.set(new SoftReference<>(v));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
CompletableFuture.supplyAsync(() -> {
|
||||
if (metaData == null)
|
||||
}
|
||||
else
|
||||
{
|
||||
CompletableFuture.supplyAsync(() ->
|
||||
{
|
||||
if (this.metaData == null)
|
||||
{
|
||||
throw new IllegalStateException("Meta data not loaded!");
|
||||
}
|
||||
|
||||
// Load the file.
|
||||
ILodDataSource data;
|
||||
try (FileInputStream fio = getDataContent()){
|
||||
data = loader.loadData(this, fio, level);
|
||||
} catch (IOException e) {
|
||||
try (FileInputStream fio = this.getDataContent())
|
||||
{
|
||||
data = this.loader.loadData(this, fio, this.level);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new CompletionException(e);
|
||||
}
|
||||
// Apply the write queue
|
||||
LodUtil.assertTrue(inCacheWriteAccessFuture.get() == null,"No one should be writing to the cache while we are in the process of " +
|
||||
"loading one into the cache! Is this a deadlock?");
|
||||
data = handler.onDataFileLoaded(data, metaData, this::saveChanges, this::applyWriteQueue);
|
||||
// Finally, return the data.
|
||||
LodUtil.assertTrue(this.inCacheWriteAccessFuture.get() == null,
|
||||
"No one should be writing to the cache while we are in the process of " +
|
||||
"loading one into the cache! Is this a deadlock?");
|
||||
|
||||
data = this.handler.onDataFileLoaded(data, this.metaData, this::saveChanges, this::applyWriteQueue);
|
||||
return data;
|
||||
}, handler.getIOExecutor())
|
||||
.whenComplete((f, e) -> {
|
||||
@@ -202,11 +227,11 @@ public class DataMetaFile extends AbstractMetaDataFile
|
||||
} else {
|
||||
future.complete(f);
|
||||
new DataObjTracker(f);
|
||||
data.set(new SoftReference<>(f));
|
||||
this.data.set(new SoftReference<>(f));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// Would use CompletableFuture.completeAsync(...), But, java 8 doesn't have it! :(
|
||||
//return future.completeAsync(this::loadAndUpdateDataSource, fileReaderThreads);
|
||||
return future;
|
||||
@@ -221,7 +246,7 @@ public class DataMetaFile extends AbstractMetaDataFile
|
||||
// "unchecked": Suppress casting of CompletableFuture<?> to CompletableFuture<LodDataSource>
|
||||
// "PointlessBooleanExpression": Suppress explicit (boolean == false) check for more understandable CAS operation code.
|
||||
@SuppressWarnings({"unchecked"})
|
||||
private CompletableFuture<ILodDataSource> _readCached(Object obj) {
|
||||
private CompletableFuture<ILodDataSource> _readCachedAsync(Object obj) {
|
||||
// Has file cached in RAM and not freed yet.
|
||||
if ((obj instanceof SoftReference<?>)) {
|
||||
Object inner = ((SoftReference<?>)obj).get();
|
||||
@@ -329,17 +354,21 @@ public class DataMetaFile extends AbstractMetaDataFile
|
||||
return !isEmpty;
|
||||
}
|
||||
|
||||
private FileInputStream getDataContent() throws IOException {
|
||||
private FileInputStream getDataContent() throws IOException
|
||||
{
|
||||
FileInputStream fin = new FileInputStream(path);
|
||||
int toSkip = METADATA_SIZE;
|
||||
while (toSkip > 0) {
|
||||
while (toSkip > 0)
|
||||
{
|
||||
long skipped = fin.skip(toSkip);
|
||||
if (skipped == 0) {
|
||||
if (skipped == 0)
|
||||
{
|
||||
throw new IOException("Invalid file: Failed to skip metadata.");
|
||||
}
|
||||
toSkip -= skipped;
|
||||
}
|
||||
if (toSkip != 0) {
|
||||
if (toSkip != 0)
|
||||
{
|
||||
throw new IOException("File IO Error: Failed to skip metadata.");
|
||||
}
|
||||
return fin;
|
||||
|
||||
@@ -85,7 +85,7 @@ public class GeneratedDataFileHandler extends DataFileHandler
|
||||
// create the missing metaData files
|
||||
for (DhSectionPos missingPos : missingPositions)
|
||||
{
|
||||
DataMetaFile newFile = this.atomicGetOrMakeFile(missingPos);
|
||||
DataMetaFile newFile = this.getOrMakeFile(missingPos);
|
||||
if (newFile != null)
|
||||
{
|
||||
existingFiles.add(newFile);
|
||||
@@ -98,7 +98,7 @@ public class GeneratedDataFileHandler extends DataFileHandler
|
||||
final ArrayList<CompletableFuture<Void>> futures = new ArrayList<>(existingFiles.size());
|
||||
for (DataMetaFile existingFile : existingFiles)
|
||||
{
|
||||
futures.add(existingFile.loadOrGetCached()
|
||||
futures.add(existingFile.loadOrGetCachedAsync()
|
||||
.exceptionally((ex) -> /*Ignore file read errors*/null)
|
||||
.thenAccept((data) ->
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user