From 6a9986ccd399814014ff8db7e7913dd6713f21de Mon Sep 17 00:00:00 2001 From: James Seibel Date: Sat, 4 Jan 2025 09:40:01 -0600 Subject: [PATCH] Move networked chunk updating to the LodBuilder thread This is done both to prevent starvation, infinitely growing tasks/memory, and simplify the AbstractDhServerLevel.updateDataSourcesAsync() method. --- .../core/file/AbstractDataSourceHandler.java | 43 ++++++++++++------- .../DelayedFullDataSourceSaveCache.java | 30 ++++++++++--- .../core/level/AbstractDhLevel.java | 7 +-- .../core/level/AbstractDhServerLevel.java | 35 +++++---------- .../core/level/DhClientServerLevel.java | 9 ---- .../distanthorizons/core/level/IDhLevel.java | 2 +- 6 files changed, 66 insertions(+), 60 deletions(-) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/AbstractDataSourceHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/file/AbstractDataSourceHandler.java index ad9790846..5950b048f 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/AbstractDataSourceHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/AbstractDataSourceHandler.java @@ -5,7 +5,6 @@ import com.seibel.distanthorizons.core.file.structure.ISaveStructure; import com.seibel.distanthorizons.core.level.IDhLevel; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.pos.DhSectionPos; -import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo; import com.seibel.distanthorizons.core.sql.dto.IBaseDTO; import com.seibel.distanthorizons.core.util.LodUtil; @@ -70,6 +69,8 @@ public abstract class AbstractDataSourceHandler public final ArrayList> dateSourceUpdateListeners = new ArrayList<>(); + public final ConcurrentHashMap> updateDataSourceFutureByPos = new ConcurrentHashMap<>(); + //=============// @@ -187,7 +188,7 @@ public abstract class AbstractDataSourceHandler public CompletableFuture updateDataSourceAsync(@NotNull FullDataSourceV2 inputDataSource) { - ThreadPoolExecutor executor = ThreadPoolUtil.getUpdatePropagatorExecutor(); + ThreadPoolExecutor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor(); if (executor == null || executor.isTerminated()) { return CompletableFuture.completedFuture(null); @@ -196,23 +197,35 @@ public abstract class AbstractDataSourceHandler try { - // run file handling on a separate thread - this.markUpdateStart(inputDataSource.getPos()); - return CompletableFuture.runAsync(() -> + return this.updateDataSourceFutureByPos.compute(inputDataSource.getPos(), (Long newPos, CompletableFuture future) -> { - try + if (future != null) { - this.updateDataSourceAtPos(inputDataSource.getPos(), inputDataSource, true); + future.cancel(false); + this.markUpdateEnd(newPos); } - catch (Exception e) + + // run file handling on a separate thread + this.markUpdateStart(newPos); + future = CompletableFuture.runAsync(() -> { - LOGGER.error("Unexpected error in async data source update, error: "+e.getMessage(), e); - } - finally - { - this.markUpdateEnd(inputDataSource.getPos()); - } - }, executor); + try + { + this.updateDataSourceAtPos(newPos, inputDataSource, true); + } + catch (Exception e) + { + LOGGER.error("Unexpected error in async data source update at pos: ["+DhSectionPos.toString(newPos)+"], error: ["+e.getMessage()+"].", e); + } + finally + { + this.markUpdateEnd(newPos); + this.updateDataSourceFutureByPos.remove(newPos); + } + }, executor); + + return future; + }); } catch (RejectedExecutionException ignore) { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/DelayedFullDataSourceSaveCache.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/DelayedFullDataSourceSaveCache.java index 56682c1c1..aa59239a9 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/DelayedFullDataSourceSaveCache.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/DelayedFullDataSourceSaveCache.java @@ -8,6 +8,7 @@ import org.jetbrains.annotations.NotNull; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; /** @@ -23,6 +24,7 @@ public class DelayedFullDataSourceSaveCache public final ConcurrentHashMap dataSourceByPosition = new ConcurrentHashMap<>(); private final ConcurrentHashMap saveTimerTasksBySectionPos = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> futureBySectionPos = new ConcurrentHashMap<>(); private final ISaveDataSourceFunc onSaveTimeoutFunc; private final int saveDelayInMs; @@ -49,16 +51,20 @@ public class DelayedFullDataSourceSaveCache * Writing into memory is done synchronously so inputDataSource can * be closed after this method finishes. */ - public void writeDataSourceToMemoryAndQueueSave(FullDataSourceV2 inputDataSource) + public CompletableFuture writeDataSourceToMemoryAndQueueSave(FullDataSourceV2 inputDataSource) { long dataSourcePos = inputDataSource.getPos(); - this.dataSourceByPosition.compute(dataSourcePos, (inputPos, temporaryDataSource) -> + + CompletableFuture future = this.futureBySectionPos.computeIfAbsent(dataSourcePos, (inputPos) -> new CompletableFuture<>()); + + this.dataSourceByPosition.compute(dataSourcePos, (inputPos, memoryDataSource) -> { - if (temporaryDataSource == null) + if (memoryDataSource == null) { - temporaryDataSource = FullDataSourceV2.createEmpty(inputPos); + // should not be closed since it will be used by other threads + memoryDataSource = FullDataSourceV2.createEmpty(inputPos); } - temporaryDataSource.update(inputDataSource); + memoryDataSource.update(inputDataSource); TimerTask timerTask = new TimerTask() @@ -80,6 +86,14 @@ public class DelayedFullDataSourceSaveCache { LOGGER.error("Failed to save updated data for section ["+dataSourcePos+"], error: ["+e.getMessage()+"]", e); } + finally + { + CompletableFuture future = DelayedFullDataSourceSaveCache.this.futureBySectionPos.remove(dataSourcePos); + if (future != null) + { + future.complete(null); + } + } } }; try @@ -90,7 +104,7 @@ public class DelayedFullDataSourceSaveCache { // James isn't sure why this is possible since this logic is inside a lock, // maybe the timer is just async enough that there can be problems? - LOGGER.warn("Attempted to queue an already canceled task. Pos: ["+dataSourcePos+"], task already queued for pos: ["+this.saveTimerTasksBySectionPos.containsKey(dataSourcePos)+"]"); + //LOGGER.warn("Attempted to queue an already canceled task. Pos: ["+dataSourcePos+"], task already queued for pos: ["+this.saveTimerTasksBySectionPos.containsKey(dataSourcePos)+"]"); } @@ -102,8 +116,10 @@ public class DelayedFullDataSourceSaveCache oldTask.cancel(); } - return temporaryDataSource; + return memoryDataSource; }); + + return future; } public int getUnsavedCount() { return this.dataSourceByPosition.size(); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhLevel.java index 1382fd9a8..684059aa6 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhLevel.java @@ -45,6 +45,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; public abstract class AbstractDhLevel implements IDhLevel @@ -144,7 +145,7 @@ public abstract class AbstractDhLevel implements IDhLevel public int getUnsavedDataSourceCount() { return this.delayedFullDataSourceSaveCache.getUnsavedCount(); } @Override - public void updateChunkAsync(IChunkWrapper chunkWrapper, int chunkHash) + public CompletableFuture updateChunkAsync(IChunkWrapper chunkWrapper, int chunkHash) { // data source synchronously written to memory so it can be safely closed try (FullDataSourceV2 dataSource = FullDataSourceV2.createFromChunk(chunkWrapper)) @@ -152,7 +153,7 @@ public abstract class AbstractDhLevel implements IDhLevel if (dataSource == null) { // This can happen if, among other reasons, a chunk save is superseded by a later event - return; + return CompletableFuture.completedFuture(null); } @@ -168,7 +169,7 @@ public abstract class AbstractDhLevel implements IDhLevel this.updatedChunkHashesByChunkPos.put(chunkWrapper.getChunkPos(), chunkHash); // batch updates to reduce overhead when flying around or breaking/placing a lot of blocks in an area - this.delayedFullDataSourceSaveCache.writeDataSourceToMemoryAndQueueSave(dataSource); + return this.delayedFullDataSourceSaveCache.writeDataSourceToMemoryAndQueueSave(dataSource); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java index c49b3b113..ac44fc7fb 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java @@ -237,23 +237,16 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I @Override public CompletableFuture updateDataSourcesAsync(FullDataSourceV2 data) { - if (!Config.Server.enableRealTimeUpdates.get()) - { - return this.getFullDataProvider().updateDataSourceAsync(data); - } - - ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); - if (executor == null) - { - LOGGER.warn("Unable to send FullDataPartialUpdateMessage - getNetworkCompressionExecutor() is null"); - return this.getFullDataProvider().updateDataSourceAsync(data); - } - - try - { - CompletableFuture.runAsync(() -> + return this.getFullDataProvider() + .updateDataSourceAsync(data) + .thenRun(() -> { - Objects.requireNonNull(this.beaconBeamRepo); + if (!Config.Server.enableRealTimeUpdates.get()) + { + return; + } + + LodUtil.assertTrue(this.beaconBeamRepo != null, "beaconBeamRepo should not be null"); try (FullDataPayload payload = new FullDataPayload(data, this.beaconBeamRepo.getAllBeamsForPos(data.getPos()))) { for (ServerPlayerState serverPlayerState : this.serverPlayerStateManager.getReadyPlayers()) @@ -280,15 +273,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I } } } - }, executor); - } - catch (RejectedExecutionException ignore) - { - // the executor was shut down, it should be back up shortly and able to accept new jobs - } - - - return this.getFullDataProvider().updateDataSourceAsync(data); + }); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientServerLevel.java index 0ffe9a7de..5d3e156a1 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientServerLevel.java @@ -113,15 +113,6 @@ public class DhClientServerLevel extends AbstractDhServerLevel implements IDhCli @Override public void clearRenderCache() { this.clientside.clearRenderCache(); } - @Override - public CompletableFuture updateDataSourcesAsync(FullDataSourceV2 data) - { - return CompletableFuture.allOf( - super.updateDataSourcesAsync(data), - this.clientside.updateDataSourcesAsync(data) - ); - } - //===========// diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/IDhLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/IDhLevel.java index f63afc80d..01dd069a8 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/IDhLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/IDhLevel.java @@ -48,7 +48,7 @@ public interface IDhLevel extends AutoCloseable, GeneratedFullDataSourceProvider /** @return 0 if no hash is known */ int getChunkHash(DhChunkPos pos); - void updateChunkAsync(IChunkWrapper chunk, int newChunkHash); + CompletableFuture updateChunkAsync(IChunkWrapper chunk, int newChunkHash); void loadBeaconBeamsInPos(long pos); void updateBeaconBeamsForChunk(IChunkWrapper chunkToUpdate, ArrayList nearbyChunkList);