Fix region load bug & fix corrupt file load error handling & fix downsampling bug & fix sparse failing to save the promoted version

This commit is contained in:
TomTheFurry
2022-09-11 00:12:22 +08:00
parent 050d88dd13
commit 0a70ec9986
8 changed files with 90 additions and 47 deletions
@@ -111,6 +111,7 @@ public class SparseDataSource implements LodDataSource {
LodUtil.assertTrue(pos.sectionDetail < sectionPos.sectionDetail);
LodUtil.assertTrue(pos.overlaps(sectionPos));
if (sparseSource.isEmpty) return;
isEmpty = false;
// Downsample needed
DhLodPos basePos = sectionPos.getCorner(SPARSE_UNIT_DETAIL);
@@ -135,6 +136,7 @@ public class SparseDataSource implements LodDataSource {
LodUtil.assertTrue(pos.sectionDetail < sectionPos.sectionDetail);
LodUtil.assertTrue(pos.overlaps(sectionPos));
if (fullSource.isEmpty) return;
isEmpty = false;
// Downsample needed
DhLodPos basePos = sectionPos.getCorner(SPARSE_UNIT_DETAIL);
DhLodPos dataPos = pos.getCorner(SPARSE_UNIT_DETAIL);
@@ -377,17 +377,18 @@ public class GenerationQueue implements Closeable {
taskGroups.values().forEach(g -> g.members.forEach(t -> t.future.complete(false)));
taskGroups.clear();
ArrayList<CompletableFuture<Void>> array = new ArrayList<>(inProgress.size());
inProgress.values().forEach(runningTask -> array.add(
runningTask.genFuture.exceptionally((ex) -> {
if (ex instanceof CompletionException) ex = ex.getCause();
if (!UncheckedInterruptedException.isThrowableInterruption(ex))
logger.error("Error when terminating data generation for section {}", runningTask.group.pos, ex);
return null;
})));
closer = CompletableFuture.allOf(array.toArray(CompletableFuture[]::new));
if (cancelCurrentGeneration) {
array.forEach(f -> f.cancel(alsoInterruptRunning));
}
inProgress.values().forEach(runningTask ->
{
CompletableFuture<Void> genFuture = runningTask.genFuture; // Do this to prevent it getting swapped out
if (cancelCurrentGeneration) genFuture.cancel(alsoInterruptRunning);
array.add(genFuture.handle((v, ex) -> {
if (ex instanceof CompletionException) ex = ex.getCause();
if (!UncheckedInterruptedException.isThrowableInterruption(ex))
logger.error("Error when terminating data generation for section {}", runningTask.group.pos, ex);
return null;
}));
});
closer = CompletableFuture.allOf(array.toArray(CompletableFuture[]::new)); //FIXME: Closer threading issues with pollAndStartClosest
looseTasks.forEach(t -> t.future.complete(false));
looseTasks.clear();
return closer;
@@ -15,6 +15,7 @@ public class RenderBufferHandler {
class RenderBufferNode implements AutoCloseable {
public final DhSectionPos pos;
public volatile RenderBufferNode[] children = null;
//FIXME: The multiple Atomics will cause race conditions between them!
public final AtomicReference<RenderBuffer> renderBufferSlotOpaque = new AtomicReference<>();
public final AtomicReference<RenderBuffer> renderBufferSlotTransparent = new AtomicReference<>();
@@ -75,10 +76,14 @@ public class RenderBufferHandler {
boolean shouldRender = section.canRender();
if (!shouldRender) {
//TODO: Does this really need to force the old buffer to not be rendered?
// RenderBuffer buff = renderBufferSlot.getAndSet(null);
// if (buff != null) {
// buff.close();
// }
RenderBuffer buff = renderBufferSlotOpaque.getAndSet(null);
if (buff != null) {
buff.close();
}
buff = renderBufferSlotTransparent.getAndSet(null);
if (buff != null) {
buff.close();
}
} else {
LodUtil.assertTrue(container != null); // section.isLoaded() should have ensured this
container.trySwapRenderBuffer(target, renderBufferSlotOpaque, renderBufferSlotTransparent);
@@ -25,6 +25,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
public class DataFileHandler implements IDataSourceProvider {
// Note: Single main thread only for now. May make it multi-thread later, depending on the usage.
@@ -56,9 +57,14 @@ public class DataFileHandler implements IDataSourceProvider {
DataMetaFile metaFile = new DataMetaFile(this, level, file);
filesByPos.put(metaFile.pos, metaFile);
} catch (IOException e) {
LOGGER.error("Failed to read file {}. File will be deleted.", file, e);
if (!file.delete()) {
LOGGER.error("Failed to delete file {}.", file);
LOGGER.error("Failed to read data meta file at {}: ", file, e);
File corruptedFile = new File(file.getParentFile(), file.getName() + ".corrupted");
if (corruptedFile.exists()) corruptedFile.delete();
if (file.renameTo(corruptedFile)) {
LOGGER.error("Renamed corrupted file to {}", file.getName() + ".corrupted");
} else {
LOGGER.error("Failed to rename corrupted file to {}. Will try and delete file", file.getName() + ".corrupted");
file.delete();
}
}
}
@@ -295,17 +301,28 @@ public class DataFileHandler implements IDataSourceProvider {
}
@Override
public LodDataSource onDataFileLoaded(LodDataSource source, Consumer<LodDataSource> updater) {
updater.accept(source);
if (source instanceof SparseDataSource) return ((SparseDataSource) source).trySelfPromote();
public LodDataSource onDataFileLoaded(LodDataSource source, Function<LodDataSource, Boolean> updater, Consumer<LodDataSource> onUpdated) {
boolean changed = updater.apply(source);
if (source instanceof SparseDataSource) {
LodDataSource newSource = ((SparseDataSource) source).trySelfPromote();
changed |= newSource != source;
source = newSource;
}
if (changed) onUpdated.accept(source);
return source;
}
@Override
public CompletableFuture<LodDataSource> onDataFileRefresh(LodDataSource source, Consumer<LodDataSource> updater) {
public CompletableFuture<LodDataSource> onDataFileRefresh(LodDataSource source, Function<LodDataSource, Boolean> updater, Consumer<LodDataSource> onUpdated) {
return CompletableFuture.supplyAsync(() -> {
updater.accept(source);
if (source instanceof SparseDataSource) return ((SparseDataSource) source).trySelfPromote();
return source;
LodDataSource sourceLocal = source;
boolean changed = updater.apply(sourceLocal);
if (sourceLocal instanceof SparseDataSource) {
LodDataSource newSource = ((SparseDataSource) sourceLocal).trySelfPromote();
changed |= newSource != sourceLocal;
sourceLocal = newSource;
}
if (changed) onUpdated.accept(sourceLocal);
return sourceLocal;
}, fileReaderThread);
}
@@ -154,7 +154,7 @@ public class DataMetaFile extends MetaFile
metaData = makeMetaData(data);
return data;
})
.thenApply((data) -> handler.onDataFileLoaded(data, this::applyWriteQueue))
.thenApply((data) -> handler.onDataFileLoaded(data, this::applyWriteQueue, this::saveChanges))
.whenComplete((v, e) -> {
if (e != null) {
LOGGER.error("Uncaught error on creation {}: ", path, e);
@@ -180,7 +180,7 @@ public class DataMetaFile extends MetaFile
// Apply the write queue
LodUtil.assertTrue(!inCacheWriteAccessAsserter.get(),"No one should be writing to the cache while we are in the process of " +
"loading one into the cache! Is this a deadlock?");
data = handler.onDataFileLoaded(data, this::applyWriteQueue);
data = handler.onDataFileLoaded(data, this::applyWriteQueue, this::saveChanges);
// Finally, return the data.
return data;
}, handler.getIOExecutor())
@@ -230,7 +230,7 @@ public class DataMetaFile extends MetaFile
// For now, I'll go for the latter option and just hope nothing goes wrong...
if (inCacheWriteAccessAsserter.getAndSet(true) == false) {
try {
return handler.onDataFileRefresh((LodDataSource) inner, this::applyWriteQueue);
return handler.onDataFileRefresh((LodDataSource) inner, this::applyWriteQueue, this::saveChanges);
} catch (Exception e) {
LOGGER.error("Error while applying changes to LodDataSource at {}: ", pos, e);
} finally {
@@ -268,8 +268,24 @@ public class DataMetaFile extends MetaFile
_backQueue = queue;
}
private void saveChanges(LodDataSource data) {
try {
// Write/Update data
LodUtil.assertTrue(metaData != null);
metaData.dataLevel = data.getDataDetail();
loader = DataSourceLoader.getLoader(data.getClass(), data.getDataVersion());
LodUtil.assertTrue(loader != null, "No loader for {} (v{})", data.getClass(), data.getDataVersion());
dataType = data.getClass();
metaData.dataTypeId = loader == null ? 0 : loader.datatypeId;
metaData.loaderVersion = data.getDataVersion();
super.writeData((out) -> data.saveData(level, this, out));
} catch (IOException e) {
LOGGER.error("Failed to save updated data file at {} for sect {}", path, pos, e);
}
}
// Return whether any write has happened to the data
private void applyWriteQueue(LodDataSource data) {
private boolean applyWriteQueue(LodDataSource data) {
// Poll the write queue
// First check if write queue is empty, then swap the write queue.
// Must be done in this order to ensure isValid work properly. See isValid() for details.
@@ -281,21 +297,9 @@ public class DataMetaFile extends MetaFile
data.update(chunk);
}
_backQueue.queue.clear();
try {
// Write/Update data
LodUtil.assertTrue(metaData != null);
metaData.dataLevel = data.getDataDetail();
loader = DataSourceLoader.getLoader(data.getClass(), data.getDataVersion());
LodUtil.assertTrue(loader != null, "No loader for {} (v{})", data.getClass(), data.getDataVersion());
dataType = data.getClass();
metaData.dataTypeId = loader == null ? 0 : loader.datatypeId;
metaData.loaderVersion = data.getDataVersion();
super.writeData((out) -> data.saveData(level, this, out));
LOGGER.info("Updated Data file at {} for sect {} with {} chunk writes.", path, pos, count);
} catch (IOException e) {
LOGGER.error("Failed to save updated data file at {} for sect {}", path, pos, e);
}
LOGGER.info("Updated Data file at {} for sect {} with {} chunk writes.", path, pos, count);
}
return !isEmpty;
}
private FileInputStream getDataContent() throws IOException {
@@ -15,6 +15,7 @@ import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
public interface IDataSourceProvider extends AutoCloseable {
void addScannedFile(Collection<File> detectedFiles);
@@ -26,8 +27,8 @@ public interface IDataSourceProvider extends AutoCloseable {
long getLatestCacheVersion(DhSectionPos sectionPos);
CompletableFuture<LodDataSource> onCreateDataFile(DataMetaFile file);
LodDataSource onDataFileLoaded(LodDataSource source, Consumer<LodDataSource> updater);
CompletableFuture<LodDataSource> onDataFileRefresh(LodDataSource source, Consumer<LodDataSource> updater);
LodDataSource onDataFileLoaded(LodDataSource source, Function<LodDataSource, Boolean> updater, Consumer<LodDataSource> onUpdated);
CompletableFuture<LodDataSource> onDataFileRefresh(LodDataSource source, Function<LodDataSource, Boolean> updater, Consumer<LodDataSource> onUpdated);
File computeDataFilePath(DhSectionPos pos);
Executor getIOExecutor();
@@ -53,7 +53,15 @@ public class RenderFileHandler implements IRenderSourceProvider {
RenderMetaFile metaFile = new RenderMetaFile(this, file);
filesByPos.put(metaFile.pos, metaFile);
} catch (IOException e) {
throw new RuntimeException(e);
LOGGER.error("Failed to read render meta file at {}: ", file, e);
File corruptedFile = new File(file.getParentFile(), file.getName() + ".corrupted");
if (corruptedFile.exists()) corruptedFile.delete();
if (file.renameTo(corruptedFile)) {
LOGGER.error("Renamed corrupted file to {}", file.getName() + ".corrupted");
} else {
LOGGER.error("Failed to rename corrupted file to {}. Will try and delete file", file.getName() + ".corrupted");
file.delete();
}
}
}
}
@@ -1,5 +1,7 @@
package com.seibel.lod.core.a7.util;
import java.util.concurrent.CompletionException;
public class UncheckedInterruptedException extends RuntimeException {
public UncheckedInterruptedException(String message) {
super(message);
@@ -32,9 +34,12 @@ public class UncheckedInterruptedException extends RuntimeException {
throw convert((InterruptedException) t);
} else if (t instanceof UncheckedInterruptedException) {
throw (UncheckedInterruptedException) t;
} else if (t instanceof CompletionException) {
rethrowIfIsInterruption(t.getCause());
}
}
public static boolean isThrowableInterruption(Throwable t) {
return t instanceof InterruptedException || t instanceof UncheckedInterruptedException;
return t instanceof InterruptedException || t instanceof UncheckedInterruptedException
|| (t instanceof CompletionException && isThrowableInterruption(t.getCause()));
}
}