refactor fullDataMetaFile

This commit is contained in:
James Seibel
2023-03-11 11:00:14 -06:00
parent c5c298708f
commit 3510facd1e
@@ -78,11 +78,12 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
}
// ===========================
// Create a new metaFile
public FullDataMetaFile(IFullDataSourceProvider handler, IDhLevel level, DhSectionPos pos) throws IOException {
public FullDataMetaFile(IFullDataSourceProvider handler, IDhLevel level, DhSectionPos pos) throws IOException
{
super(handler.computeDataFilePath(pos), pos);
debugCheck();
this.handler = handler;
@@ -90,8 +91,9 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
LodUtil.assertTrue(metaData == null);
doesFileExist = false;
}
public FullDataMetaFile(IFullDataSourceProvider handler, IDhLevel level, File path) throws IOException {
public FullDataMetaFile(IFullDataSourceProvider handler, IDhLevel level, File path) throws IOException
{
super(path);
debugCheck();
this.handler = handler;
@@ -105,7 +107,10 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
dataType = loader.clazz;
doesFileExist = true;
}
public CompletableFuture<Void> flushAndSave()
{
debugCheck();
@@ -119,12 +124,12 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
return CompletableFuture.completedFuture(null);
}
}
// public long getCacheVersion() {
// debugCheck();
// return (this.metaData == null) ? 0 : this.metaData.dataVersion.get();
// }
// public boolean isCacheVersionValid(long cacheVersion)
// {
// debugCheck();
@@ -162,6 +167,8 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
{
appendLock.unlock();
}
//LOGGER.info("write queue length for pos "+this.pos+": " + writeQueue.queue.size());
}
// Cause: Generic Type runtime casting cannot safety check it.
@@ -223,7 +230,7 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
// Load the file.
IFullDataSource data;
try (FileInputStream fileInputStream = this.getDataContent();
try (FileInputStream fileInputStream = this.getFileInputStream();
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream))
{
data = this.loader.loadData(this, bufferedInputStream, this.level);
@@ -291,47 +298,62 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
// "unchecked": Suppress casting of CompletableFuture<?> to CompletableFuture<LodDataSource>
// "PointlessBooleanExpression": Suppress explicit (boolean == false) check for more understandable CAS operation code.
@SuppressWarnings({"unchecked"})
private CompletableFuture<IFullDataSource> _readCachedAsync(Object obj) {
private CompletableFuture<IFullDataSource> _readCachedAsync(Object obj)
{
// Has file cached in RAM and not freed yet.
if ((obj instanceof SoftReference<?>)) {
if ((obj instanceof SoftReference<?>))
{
Object inner = ((SoftReference<?>)obj).get();
if (inner != null) {
if (inner != null)
{
LodUtil.assertTrue(inner instanceof IFullDataSource);
boolean isEmpty = writeQueue.get().queue.isEmpty();
// If the queue is empty, and the CAS on inCacheWriteLock succeeds, then we are the thread
// that will be applying the changes to the cache.
if (!isEmpty) {
if (!isEmpty)
{
// Do a CAS on inCacheWriteLock to ensure that we are the only thread that is writing to the cache,
// or if we fail, then that means someone else is already doing it, and we can just return the future
CompletableFuture<IFullDataSource> future = new CompletableFuture<>();
CompletableFuture<IFullDataSource> cas = AtomicsUtil.compareAndExchange(inCacheWriteAccessFuture, null, future);
if (cas == null) {
try {
CompletableFuture<IFullDataSource> compareAndSwapFuture = AtomicsUtil.compareAndExchange(inCacheWriteAccessFuture, null, future);
if (compareAndSwapFuture == null)
{
try
{
data.set(future);
handler.onDataFileRefresh((IFullDataSource) inner, metaData, this::applyWriteQueue, this::saveChanges).handle((v, e) -> {
if (e != null) {
LOGGER.error("Error refreshing data {}: ", pos, e);
handler.onDataFileRefresh((IFullDataSource) inner, metaData, this::applyWriteQueue, this::saveChanges).handle((fullDataSource, exception) ->
{
if (exception != null)
{
LOGGER.error("Error refreshing data "+pos+": "+exception);
future.complete(null);
data.set(null);
} else {
future.complete(v);
new DataObjTracker(v);
data.set(new SoftReference<>(v));
}
else
{
future.complete(fullDataSource);
new DataObjTracker(fullDataSource);
data.set(new SoftReference<>(fullDataSource));
}
inCacheWriteAccessFuture.set(null);
return v;
return fullDataSource;
});
return future;
} catch (Exception e) {
LOGGER.error("Error while doing refreshes to LodDataSource at {}: ", pos, e);
}
catch (Exception e)
{
LOGGER.error("Error while doing refreshes to LodDataSource at " + pos + ": " + e);
return CompletableFuture.completedFuture((IFullDataSource) inner);
}
} else {
// or, return the future that will be completed when the write is done.
return cas;
}
} else {
else
{
// or, return the future that will be completed when the write is done.
return compareAndSwapFuture;
}
}
else
{
// or, return the cached data.
return CompletableFuture.completedFuture((IFullDataSource) inner);
}
@@ -418,33 +440,39 @@ public class FullDataMetaFile extends AbstractMetaDataContainerFile
return !isEmpty;
}
private FileInputStream getDataContent() throws IOException
private FileInputStream getFileInputStream() throws IOException
{
FileInputStream fin = new FileInputStream(this.file);
FileInputStream fileInputStream = new FileInputStream(this.file);
int toSkip = METADATA_SIZE_IN_BYTES;
while (toSkip > 0)
{
long skipped = fin.skip(toSkip);
long skipped = fileInputStream.skip(toSkip);
if (skipped == 0)
{
throw new IOException("Invalid file: Failed to skip metadata.");
}
toSkip -= skipped;
}
if (toSkip != 0)
{
throw new IOException("File IO Error: Failed to skip metadata.");
}
return fin;
return fileInputStream;
}
public static void debugCheck() {
public static void debugCheck()
{
DataObjTracker phantom = (DataObjTracker) lifeCycleDebugQueue.poll();
while (phantom != null) {
LOGGER.info("Data {} is freed. {} remaining", phantom.pos, lifeCycleDebugSet.size());
// wait for the tracker to be garbage collected(?)
while (phantom != null)
{
LOGGER.info("Full Data at pos: "+phantom.pos+" has been freed. "+lifeCycleDebugSet.size()+" Full Data files remaining.");
phantom.close();
phantom = (DataObjTracker) lifeCycleDebugQueue.poll();
}
}
}