Continue improving stabilises of the system

This commit is contained in:
TomTheFurry
2022-09-19 14:54:45 +08:00
parent 4158573129
commit 905e73fd1c
6 changed files with 67 additions and 45 deletions
@@ -5,12 +5,10 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import com.seibel.lod.core.datatype.full.ChunkSizedData;
import com.seibel.lod.core.level.ILevel;
import com.seibel.lod.core.config.Config;
import com.seibel.lod.core.logging.ConfigBasedLogger;
import com.seibel.lod.core.pos.DhChunkPos;
import com.seibel.lod.core.util.*;
import com.seibel.lod.core.util.objects.EventLoop;
import com.seibel.lod.core.wrapperInterfaces.chunk.IChunkWrapper;
import org.apache.logging.log4j.LogManager;
@@ -18,6 +16,9 @@ import org.apache.logging.log4j.LogManager;
public class ChunkToLodBuilder {
public static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Client.Advanced.Debugging.DebugSwitch.logLodBuilderEvent.get());
public static final long MAX_TICK_TIME_NS = 1000000000L / 20L;
public static final int THREAD_COUNT = 1;
static class Task {
final DhChunkPos chunkPos;
final CompletableFuture<ChunkSizedData> future;
@@ -28,7 +29,7 @@ public class ChunkToLodBuilder {
}
private final ConcurrentHashMap<DhChunkPos, IChunkWrapper> latestChunkToBuild = new ConcurrentHashMap<>();
private final ConcurrentLinkedDeque<Task> taskToBuild = new ConcurrentLinkedDeque<>();
private final ExecutorService executor = LodUtil.makeThreadPool(4, ChunkToLodBuilder.class);
private final ExecutorService executor = LodUtil.makeThreadPool(THREAD_COUNT, ChunkToLodBuilder.class);
private final AtomicInteger runningCount = new AtomicInteger(0);
public CompletableFuture<ChunkSizedData> tryGenerateData(IChunkWrapper chunk) {
@@ -45,48 +46,63 @@ public class ChunkToLodBuilder {
}
public void tick() {
while (true) {
if (runningCount.get() > 8192) return;
Task task = taskToBuild.pollFirst();
if (task == null) return; // There's no jobs.
IChunkWrapper latestChunk = latestChunkToBuild.remove(task.chunkPos); // Basically an Exchange operation
if (latestChunk == null) {
LOGGER.error("Somehow Task at {} has latestChunk as null! Skipping task!", task.chunkPos);
task.future.complete(null);
return;
}
if (runningCount.get() >= THREAD_COUNT) return;
if (taskToBuild.isEmpty()) return;
for (int i = 0; i<THREAD_COUNT; i++) {
runningCount.incrementAndGet();
CompletableFuture.supplyAsync(() -> {
long time = System.nanoTime();
if (LodDataBuilder.canGenerateLodFromChunk(latestChunk)) {
ChunkSizedData data = LodDataBuilder.createChunkData(latestChunk);
if (data != null) {
long time2 = System.nanoTime();
LOGGER.info("Processed Task at {} using {}", task.chunkPos, Duration.ofNanos(time2 - time));
task.future.complete(data);
return true;
}
CompletableFuture.runAsync(() -> {
try {
_tick();
} finally {
runningCount.decrementAndGet();
}
return false;
}, executor).handle((b, ex) -> {
runningCount.decrementAndGet();
if (ex == null && b) return true;
if (ex != null) {
LOGGER.error("Error while processing Task at {}!", task.chunkPos, ex);
}
// Failed to build due to chunk not meeting requirement.
IChunkWrapper casChunk = latestChunkToBuild.putIfAbsent(task.chunkPos, latestChunk); // CAS operation with expected=null
if (casChunk == null) // That means CAS have been successful
taskToBuild.addLast(task); // Then add back the same old task.
else // Else, it means someone managed to sneak in a new gen request in this pos. Then lets drop this old task.
task.future.complete(null);
return false;
});
}, executor);
}
}
private void _tick() {
}
long time = System.nanoTime();
int count = 0;
boolean allDone = false;
while (true) {
if (System.nanoTime() - time > MAX_TICK_TIME_NS && !taskToBuild.isEmpty()) break;
Task task = taskToBuild.pollFirst();
if (task == null) {
allDone = true;
break;
}
count++;
IChunkWrapper latestChunk = latestChunkToBuild.remove(task.chunkPos); // Basically an Exchange operation
if (latestChunk == null) {
LOGGER.error("Somehow Task at {} has latestChunk as null! Skipping task!", task.chunkPos);
task.future.complete(null);
continue;
}
try {
if (LodDataBuilder.canGenerateLodFromChunk(latestChunk)) {
ChunkSizedData data = LodDataBuilder.createChunkData(latestChunk);
if (data != null) {
task.future.complete(data);
continue;
}
}
} catch (Exception ex) {
LOGGER.error("Error while processing Task at {}!", task.chunkPos, ex);
}
// Failed to build due to chunk not meeting requirement.
IChunkWrapper casChunk = latestChunkToBuild.putIfAbsent(task.chunkPos, latestChunk); // CAS operation with expected=null
if (casChunk == null || latestChunk.isStillValid()) // That means CAS have been successful
taskToBuild.addLast(task); // Then add back the same old task.
else // Else, it means someone managed to sneak in a new gen request in this pos. Then lets drop this old task.
task.future.complete(null);
count--;
}
long time2 = System.nanoTime();
if (!allDone) {
//LOGGER.info("Completed {} tasks in {} in this tick", count, Duration.ofNanos(time2 - time));
} else if (count > 0) {
//LOGGER.info("Completed all {} tasks in {}", count, Duration.ofNanos(time2 - time));
}
}
}
@@ -13,7 +13,7 @@ import java.util.concurrent.ExecutorService;
//TODO: Merge this with FullToColumnTransformer
public class DataRenderTransformer {
public static final ExecutorService TRANSFORMER_THREADS
= LodUtil.makeThreadPool(2, "Data/Render Transformer");
= LodUtil.makeThreadPool(4, "Data/Render Transformer");
public static CompletableFuture<LodRenderSource> transformDataSource(LodDataSource data, IClientLevel level) {
return CompletableFuture.supplyAsync(() -> transform(data, level), TRANSFORMER_THREADS);
@@ -5,6 +5,7 @@ import com.seibel.lod.core.datatype.LodDataSource;
import com.seibel.lod.core.datatype.full.ChunkSizedData;
import com.seibel.lod.core.datatype.full.FullDataSource;
import com.seibel.lod.core.datatype.full.SparseDataSource;
import com.seibel.lod.core.file.MetaFile;
import com.seibel.lod.core.level.ILevel;
import com.seibel.lod.core.pos.DhLodPos;
import com.seibel.lod.core.pos.DhSectionPos;
@@ -300,8 +301,10 @@ public class DataFileHandler implements IDataSourceProvider {
}
@Override
public LodDataSource onDataFileLoaded(LodDataSource source, Function<LodDataSource, Boolean> updater, Consumer<LodDataSource> onUpdated) {
public LodDataSource onDataFileLoaded(LodDataSource source, MetaFile.MetaData metaData,
Consumer<LodDataSource> onUpdated, Function<LodDataSource, Boolean> updater) {
boolean changed = updater.apply(source);
if (changed) metaData.dataVersion.incrementAndGet();
if (source instanceof SparseDataSource) {
LodDataSource newSource = ((SparseDataSource) source).trySelfPromote();
changed |= newSource != source;
@@ -151,7 +151,7 @@ public class DataMetaFile extends MetaFile
metaData = makeMetaData(data);
return data;
})
.thenApply((data) -> handler.onDataFileLoaded(data, this::applyWriteQueue, this::saveChanges))
.thenApply((data) -> handler.onDataFileLoaded(data, metaData, this::saveChanges, this::applyWriteQueue))
.whenComplete((v, e) -> {
if (e != null) {
LOGGER.error("Uncaught error on creation {}: ", path, e);
@@ -177,7 +177,7 @@ public class DataMetaFile extends MetaFile
// Apply the write queue
LodUtil.assertTrue(!inCacheWriteAccessAsserter.get(),"No one should be writing to the cache while we are in the process of " +
"loading one into the cache! Is this a deadlock?");
data = handler.onDataFileLoaded(data, this::applyWriteQueue, this::saveChanges);
data = handler.onDataFileLoaded(data, metaData, this::saveChanges, this::applyWriteQueue);
// Finally, return the data.
return data;
}, handler.getIOExecutor())
@@ -2,6 +2,7 @@ package com.seibel.lod.core.file.datafile;
import com.seibel.lod.core.datatype.LodDataSource;
import com.seibel.lod.core.datatype.full.ChunkSizedData;
import com.seibel.lod.core.file.MetaFile;
import com.seibel.lod.core.pos.DhSectionPos;
import java.io.File;
@@ -21,7 +22,7 @@ public interface IDataSourceProvider extends AutoCloseable {
long getLatestCacheVersion(DhSectionPos sectionPos);
CompletableFuture<LodDataSource> onCreateDataFile(DataMetaFile file);
LodDataSource onDataFileLoaded(LodDataSource source, Function<LodDataSource, Boolean> updater, Consumer<LodDataSource> onUpdated);
LodDataSource onDataFileLoaded(LodDataSource source, MetaFile.MetaData metaData, Consumer<LodDataSource> onUpdated, Function<LodDataSource, Boolean> updater);
CompletableFuture<LodDataSource> onDataFileRefresh(LodDataSource source, Function<LodDataSource, Boolean> updater, Consumer<LodDataSource> onUpdated);
File computeDataFilePath(DhSectionPos pos);
Executor getIOExecutor();
@@ -92,4 +92,6 @@ public interface IChunkWrapper extends IBindable
IBiomeWrapper getBiome(int x, int y, int z);
DhChunkPos getChunkPos();
boolean isStillValid();
}