diff --git a/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java b/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java index 217c9cc9f..e4f20b243 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java @@ -50,6 +50,7 @@ import org.jetbrains.annotations.Nullable; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** Contains code and variables used by both {@link ClientApi} and {@link ServerApi} */ public class SharedApi @@ -72,6 +73,12 @@ public class SharedApi */ private static final int MAX_UPDATING_CHUNK_COUNT_PER_THREAD_AND_PLAYER = 1_000; + /** + * Used to limit how many tasks are queued at once. + * This is done so the chunks nearest to the player will trigger first. + */ + private static final AtomicInteger GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF = new AtomicInteger(0); + /** how many milliseconds must pass before an overloaded message can be sent in chat or the log */ private static final int MIN_MS_BETWEEN_OVERLOADED_LOG_MESSAGE = 30_000; @@ -123,6 +130,7 @@ public class SharedApi AbstractDhRepo.closeAllConnections(); // needs to be closed on world shutdown to clear out un-processed chunks UPDATE_POS_MANAGER.clear(); + GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF.set(0); // recommend that the garbage collector cleans up any objects from the old world and thread pools System.gc(); @@ -352,39 +360,54 @@ public class SharedApi + tryStartNextQueuedChunk(); + } + /** + * TODO this is bad logic. + * This should be fired on a separate looping thread or something else. + * Only firing whenever a new chunk is added to the queue can cause the queue to miss things. + */ + private static void tryStartNextQueuedChunk() + { // queue updates up to the number of CPU cores allocated for the job // (this prevents doing extra work queuing tasks that may not be necessary) // and makes sure the chunks closest to the player are updated first PriorityTaskPicker.Executor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor(); - if (executor != null && executor.getQueueSize() < executor.getPoolSize()) - { - try - { - executor.execute(SharedApi::processQueuedChunkUpdate); - } - catch (RejectedExecutionException ignore) - { - // the executor was shut down, it should be back up shortly and able to accept new jobs - } - } - } - private static void processQueuedChunkUpdate() - { - //LOGGER.trace(chunkWrapper.getChunkPos() + " " + executor.getActiveCount() + " / " + executor.getQueue().size() + " - " + executor.getCompletedTaskCount()); - - UpdateChunkData updateData = UPDATE_POS_MANAGER.popClosest(); - if (updateData == null) + if (executor == null + || GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF.getAndIncrement() > executor.getPoolSize()) { + GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF.decrementAndGet(); return; } - IChunkWrapper chunkWrapper = updateData.chunkWrapper; - @Nullable ArrayList neighbourChunkList = updateData.neighbourChunkList; - IDhLevel dhLevel = updateData.dhLevel; - try { + executor.execute(SharedApi::processQueuedChunkUpdate); + } + catch (RejectedExecutionException ignore) + { + // the executor was shut down, it should be back up shortly and able to accept new jobs + GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF.decrementAndGet(); + } + } + private static void processQueuedChunkUpdate() + { + DhChunkPos updatingChunkPos = null; + try + { + UpdateChunkData updateData = UPDATE_POS_MANAGER.popClosest(); + if (updateData == null) + { + return; + } + + IChunkWrapper chunkWrapper = updateData.chunkWrapper; + updatingChunkPos = chunkWrapper.getChunkPos(); + @Nullable ArrayList neighbourChunkList = updateData.neighbourChunkList; + IDhLevel dhLevel = updateData.dhLevel; + + boolean checkChunkHash = !Config.Common.LodBuilding.disableUnchangedChunkCheck.get(); // check if this chunk has been converted into an LOD already @@ -438,23 +461,12 @@ public class SharedApi } catch (Exception e) { - LOGGER.error("Unexpected error when updating chunk at pos: [" + chunkWrapper.getChunkPos() + "]", e); + LOGGER.error("Unexpected error when updating chunk at pos: [" + updatingChunkPos + "]", e); } finally { - // queue the next position if there are still positions to process - AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor(); - if (executor != null && !UPDATE_POS_MANAGER.updateDataByChunkPos.isEmpty()) - { - try - { - executor.execute(SharedApi::processQueuedChunkUpdate); - } - catch (RejectedExecutionException ignore) - { - // the executor was shut down, it should be back up shortly and able to accept new jobs - } - } + GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF.decrementAndGet(); + tryStartNextQueuedChunk(); } } @@ -609,6 +621,11 @@ public class SharedApi } DhChunkPos closest = this.closestQueue.poll(); + if (closest == null) + { + return null; + } + this.furthestQueue.remove(closest); return this.updateDataByChunkPos.remove(closest); }