diff --git a/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java b/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java index b8cb312b1..de37e12e9 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java @@ -34,6 +34,7 @@ import com.seibel.distanthorizons.core.render.renderer.DebugRenderer; import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo; import com.seibel.distanthorizons.core.util.LodUtil; import com.seibel.distanthorizons.core.util.objects.Pair; +import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.core.world.*; import com.seibel.distanthorizons.core.wrapperInterfaces.chunk.IChunkWrapper; @@ -344,8 +345,8 @@ public class SharedApi // queue updates up to the number of CPU cores allocated for the job // (this prevents doing extra work queuing tasks that may not be necessary) // and makes sure the chunks closest to the player are updated first - ThreadPoolExecutor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor(); - if (executor != null && executor.getQueue().size() < executor.getCorePoolSize()) + PriorityTaskPicker.Executor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor(); + if (executor != null && executor.getQueueSize() < executor.getPoolSize()) { try { @@ -432,7 +433,7 @@ public class SharedApi finally { // queue the next position if there are still positions to process - ThreadPoolExecutor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor(); + AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor(); if (executor != null && !UPDATE_POS_MANAGER.updateDataByChunkPos.isEmpty()) { try diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/AbstractDataSourceHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/file/AbstractDataSourceHandler.java index 5950b048f..267d822aa 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/AbstractDataSourceHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/AbstractDataSourceHandler.java @@ -114,7 +114,7 @@ public abstract class AbstractDataSourceHandler */ public CompletableFuture getAsync(long pos) { - ThreadPoolExecutor executor = ThreadPoolUtil.getFileHandlerExecutor(); + AbstractExecutorService executor = ThreadPoolUtil.getFileHandlerExecutor(); if (executor == null || executor.isTerminated()) { return CompletableFuture.completedFuture(null); @@ -188,7 +188,7 @@ public abstract class AbstractDataSourceHandler public CompletableFuture updateDataSourceAsync(@NotNull FullDataSourceV2 inputDataSource) { - ThreadPoolExecutor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor(); + AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor(); if (executor == null || executor.isTerminated()) { return CompletableFuture.completedFuture(null); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV1.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV1.java index 0039b56e4..d48f82df7 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV1.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV1.java @@ -18,6 +18,7 @@ import java.io.File; import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; @@ -96,7 +97,7 @@ public class FullDataSourceProviderV1 */ public CompletableFuture getAsync(long pos) { - ThreadPoolExecutor executor = ThreadPoolUtil.getFileHandlerExecutor(); + AbstractExecutorService executor = ThreadPoolUtil.getFileHandlerExecutor(); if (executor == null || executor.isTerminated()) { return CompletableFuture.completedFuture(null); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java index 6ae54b04c..83e62601f 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java @@ -40,6 +40,7 @@ import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo; import com.seibel.distanthorizons.core.sql.repo.FullDataSourceV2Repo; import com.seibel.distanthorizons.core.util.ThreadUtil; import com.seibel.distanthorizons.core.util.objects.DataCorruptedException; +import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; import it.unimi.dsi.fastutil.longs.LongArrayList; @@ -208,7 +209,7 @@ public class FullDataSourceProviderV2 { Thread.sleep(UPDATE_QUEUE_THREAD_DELAY_IN_MS); - ThreadPoolExecutor executor = ThreadPoolUtil.getUpdatePropagatorExecutor(); + PriorityTaskPicker.Executor executor = ThreadPoolUtil.getUpdatePropagatorExecutor(); if (executor == null || executor.isTerminated()) { continue; @@ -225,7 +226,7 @@ public class FullDataSourceProviderV2 } // queue parent updates - if (executor.getQueue().size() < MAX_UPDATE_TASK_COUNT + if (executor.getQueueSize() < MAX_UPDATE_TASK_COUNT && this.parentUpdatingPosSet.size() < MAX_UPDATE_TASK_COUNT) { // get the positions that need to be applied to their parents diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java index 2edd1d6d5..4bb2c8cc9 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java @@ -36,6 +36,7 @@ import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D; import com.seibel.distanthorizons.core.render.renderer.DebugRenderer; import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable; import com.seibel.distanthorizons.core.util.LodUtil; +import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import it.unimi.dsi.fastutil.bytes.ByteArrayList; import it.unimi.dsi.fastutil.longs.LongArrayList; @@ -194,16 +195,16 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im } - ThreadPoolExecutor updateExecutor = ThreadPoolUtil.getUpdatePropagatorExecutor(); - if (updateExecutor == null || updateExecutor.getQueue().size() >= MAX_UPDATE_TASK_COUNT / 2) + PriorityTaskPicker.Executor updateExecutor = ThreadPoolUtil.getUpdatePropagatorExecutor(); + if (updateExecutor == null || updateExecutor.getQueueSize() >= MAX_UPDATE_TASK_COUNT / 2) { // don't queue additional world gen requests if the updater is behind return false; } - ThreadPoolExecutor fileExecutor = ThreadPoolUtil.getFileHandlerExecutor(); - if (fileExecutor == null || fileExecutor.getQueue().size() >= MAX_UPDATE_TASK_COUNT / 2) + PriorityTaskPicker.Executor fileExecutor = ThreadPoolUtil.getFileHandlerExecutor(); + if (fileExecutor == null || fileExecutor.getQueueSize() >= MAX_UPDATE_TASK_COUNT / 2) { // don't queue additional world gen requests if the file handler is overwhelmed, // otherwise LODs may not load in properly diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java index f52f476c4..bf953a93e 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java @@ -45,6 +45,7 @@ import com.seibel.distanthorizons.core.util.objects.DataCorruptedException; import com.seibel.distanthorizons.core.util.objects.RollingAverage; import com.seibel.distanthorizons.core.util.objects.UncheckedInterruptedException; import com.seibel.distanthorizons.core.util.LodUtil; +import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.core.world.DhApiWorldProxy; import com.seibel.distanthorizons.core.wrapperInterfaces.IWrapperFactory; @@ -234,7 +235,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb } public boolean isGeneratorBusy() { - ThreadPoolExecutor executor = ThreadPoolUtil.getWorldGenExecutor(); + PriorityTaskPicker.Executor executor = ThreadPoolUtil.getWorldGenExecutor(); if (executor == null) { // shouldn't happen, but just in case, don't queue more tasks @@ -243,7 +244,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb int worldGenThreadCount = Math.max(Config.Common.MultiThreading.numberOfThreads.get(), 1); int maxWorldGenTaskCount = worldGenThreadCount * MAX_QUEUED_TASKS_PER_THREAD; - return executor.getQueue().size() > maxWorldGenTaskCount; + return executor.getQueueSize() > maxWorldGenTaskCount; } /** @@ -603,7 +604,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb try { int waitTimeInSeconds = 3; - ThreadPoolExecutor executor = ThreadPoolUtil.getWorldGenExecutor(); + AbstractExecutorService executor = ThreadPoolUtil.getWorldGenExecutor(); if (executor != null && !executor.awaitTermination(waitTimeInSeconds, TimeUnit.SECONDS)) { LOGGER.warn("World generator thread pool shutdown didn't complete after [" + waitTimeInSeconds + "] seconds. Some world generator requests may still be running."); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java b/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java index 4cccb3f6c..2fa18341e 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java @@ -28,7 +28,7 @@ import com.seibel.distanthorizons.core.pooling.PhantomArrayListPool; import com.seibel.distanthorizons.core.pos.DhSectionPos; import com.seibel.distanthorizons.core.render.RenderBufferHandler; import com.seibel.distanthorizons.core.render.renderer.generic.GenericObjectRenderer; -import com.seibel.distanthorizons.core.util.threading.RateLimitedThreadPoolExecutor; +import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.core.world.AbstractDhWorld; import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; @@ -80,11 +80,11 @@ public class F3Screen public static void addStringToDisplay(List messageList) { // multi thread pools - ThreadPoolExecutor worldGenPool = ThreadPoolUtil.getWorldGenExecutor(); - ThreadPoolExecutor fileHandlerPool = ThreadPoolUtil.getFileHandlerExecutor(); - ThreadPoolExecutor updatePool = ThreadPoolUtil.getUpdatePropagatorExecutor(); - ThreadPoolExecutor lodBuilderPool = ThreadPoolUtil.getChunkToLodBuilderExecutor(); - ThreadPoolExecutor networkPool = ThreadPoolUtil.getNetworkCompressionExecutor(); + PriorityTaskPicker.Executor worldGenPool = ThreadPoolUtil.getWorldGenExecutor(); + PriorityTaskPicker.Executor fileHandlerPool = ThreadPoolUtil.getFileHandlerExecutor(); + PriorityTaskPicker.Executor updatePool = ThreadPoolUtil.getUpdatePropagatorExecutor(); + PriorityTaskPicker.Executor lodBuilderPool = ThreadPoolUtil.getChunkToLodBuilderExecutor(); + PriorityTaskPicker.Executor networkPool = ThreadPoolUtil.getNetworkCompressionExecutor(); // single thread pools ThreadPoolExecutor cleanupPool = ThreadPoolUtil.getCleanupExecutor(); @@ -191,25 +191,23 @@ public class F3Screen // helper methods // //================// - private static String getThreadPoolStatString(String name, ThreadPoolExecutor pool) + private static String getThreadPoolStatString(String name, PriorityTaskPicker.Executor pool) { - String queueSize = (pool != null) ? NUMBER_FORMAT.format(pool.getQueue().size()) : "-"; + String queueSize = (pool != null) ? NUMBER_FORMAT.format(pool.getQueueSize()) : "-"; String completedCount = (pool != null) ? NUMBER_FORMAT.format(pool.getCompletedTaskCount()) : "-"; String message = name+", Tasks: "+queueSize+", Done: "+completedCount; - if (pool != null && pool.getClass() == RateLimitedThreadPoolExecutor.class) + if (pool != null) { - RateLimitedThreadPoolExecutor rateLimitedPool = ((RateLimitedThreadPoolExecutor) pool); - // active threads - int activeThreadCount = rateLimitedPool.semaphoresAcquired.get(); - int threadCount = ThreadPoolUtil.getThreadCount(); + int activeThreadCount = pool.getRunningTaskCount(); + int threadCount = pool.getPoolSize(); message += ", Active: "+activeThreadCount+"/"+threadCount; // thread runtime String runTimeAvgStr; - double runTimeAvgInMs = rateLimitedPool.getAverageRunTimeInMs(); + double runTimeAvgInMs = pool.getAverageRunTimeInMs(); if (!Double.isNaN(runTimeAvgInMs)) { runTimeAvgStr = NUMBER_FORMAT.format(runTimeAvgInMs); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java index 8c0904257..d1396607d 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java @@ -228,7 +228,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende { FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSourceAndReleaseBuffer(response.payload); - ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + AbstractExecutorService executor = ThreadPoolUtil.getNetworkCompressionExecutor(); if (executor == null) { LOGGER.warn("Unable to handle FullDataPayload - getNetworkCompressionExecutor() is null"); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java index efde9a57f..1c0312d96 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java @@ -17,10 +17,7 @@ import org.apache.logging.log4j.LogManager; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; public class FullDataSourceRequestHandler @@ -76,7 +73,7 @@ public class FullDataSourceRequestHandler } - ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + AbstractExecutorService executor = ThreadPoolUtil.getNetworkCompressionExecutor(); if (executor == null) { // shouldn't normally happen, but just in case @@ -196,7 +193,7 @@ public class FullDataSourceRequestHandler continue; } - ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + AbstractExecutorService executor = ThreadPoolUtil.getNetworkCompressionExecutor(); if (executor == null) { LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java index 62008245e..faedc61cc 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java @@ -38,6 +38,7 @@ import com.seibel.distanthorizons.core.render.glObject.GLProxy; import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable; import com.seibel.distanthorizons.core.dataObjects.render.bufferBuilding.ColumnRenderBuffer; import com.seibel.distanthorizons.core.render.renderer.DebugRenderer; +import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; import it.unimi.dsi.fastutil.longs.LongArrayList; @@ -132,7 +133,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable return; } - ThreadPoolExecutor executor = ThreadPoolUtil.getFileHandlerExecutor(); + PriorityTaskPicker.Executor executor = ThreadPoolUtil.getFileHandlerExecutor(); if (executor == null || executor.isTerminated()) { return; @@ -140,7 +141,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable // don't queue up an infinite number of tasks // doing so will cause memory use to balloon when swapping between dimensions - if (executor.getQueue().size() > executor.getPoolSize()) + if (executor.getQueueSize() > executor.getPoolSize()) { return; } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/ThreadUtil.java b/core/src/main/java/com/seibel/distanthorizons/core/util/ThreadUtil.java index 5b1fdc144..a7383b769 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/ThreadUtil.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/ThreadUtil.java @@ -19,11 +19,7 @@ package com.seibel.distanthorizons.core.util; -import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener; -import com.seibel.distanthorizons.core.config.types.ConfigEntry; import com.seibel.distanthorizons.core.util.threading.DhThreadFactory; -import com.seibel.distanthorizons.core.util.threading.PrioritySemaphore; -import com.seibel.distanthorizons.core.util.threading.RateLimitedThreadPoolExecutor; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.coreapi.ModInfo; import org.apache.logging.log4j.LogManager; @@ -43,55 +39,10 @@ public class ThreadUtil public static final String THREAD_NAME_PREFIX = ModInfo.THREAD_NAME_PREFIX; - /** used to track and remove old listeners for certain pools if the thread pool is recreated. */ - private static final ConcurrentHashMap> THREAD_CHANGE_LISTENERS_BY_THREAD_NAME = new ConcurrentHashMap<>(); - // TODO move all "Runtime.getRuntime().availableProcessors()" calls here - //===================// - // rate limited pool // - //===================// - - public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, DhThreadFactory threadFactory, ConfigEntry runTimeRatioConfigEntry, PrioritySemaphore activeThreadCountSemaphore, int priority) - { - // remove the old listener if one exists - if (THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.containsKey(threadFactory.threadName)) - { - // note: this assumes only one thread pool exists with a given name - THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.get(threadFactory.threadName).close(); - THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.remove(threadFactory.threadName); - } - - if (!threadFactory.threadName.startsWith(THREAD_NAME_PREFIX)) - { - // this will only happen if a ThreadFactory is passed in that doesn't have the correct thread name - LOGGER.warn("Thread pool with the name ["+threadFactory.threadName+"] is missing the expected Distant Horizons thread prefix ["+THREAD_NAME_PREFIX+"]."); - } - - - RateLimitedThreadPoolExecutor executor = makeRateLimitedThreadPool(poolSize, runTimeRatioConfigEntry.get(), threadFactory, activeThreadCountSemaphore, priority); - - ConfigChangeListener changeListener = new ConfigChangeListener<>(runTimeRatioConfigEntry, (newRunTimeRatio) -> { executor.runTimeRatio = newRunTimeRatio; }); - THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.put(threadFactory.threadName, changeListener); - - return executor; - } - - - /** should only be used if there isn't a config controlling the run time ratio of this thread pool */ - public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, String name, Double runTimeRatio, int threadPriority, PrioritySemaphore activeThreadCountSemaphore, int priority) - { - return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, new DhThreadFactory(name, threadPriority, false), activeThreadCountSemaphore, priority); - } - public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, Double runTimeRatio, DhThreadFactory threadFactory, PrioritySemaphore activeThreadCountSemaphore, int priority) - { - return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, threadFactory, activeThreadCountSemaphore, priority); - } - - - //===============// // standard pool // //===============// diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ConfigThreadPool.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ConfigThreadPool.java deleted file mode 100644 index b26210eb3..000000000 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ConfigThreadPool.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * This file is part of the Distant Horizons mod - * licensed under the GNU LGPL v3 License. - * - * Copyright (C) 2020-2023 James Seibel - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, version 3. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program. If not, see . - */ - -package com.seibel.distanthorizons.core.util.threading; - -import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener; -import com.seibel.distanthorizons.core.config.types.ConfigEntry; -import com.seibel.distanthorizons.core.util.ThreadUtil; - -import java.util.List; -import java.util.Queue; - -/** - * Handles thread pools with config values for their - * thread count and run time ratio. - */ -public class ConfigThreadPool -{ - /** Caution must be used to prevent deadlock */ - private final PrioritySemaphore threadSemaphore; - /** higher numbers run first */ - private final int priority; - - public RateLimitedThreadPoolExecutor executor = null; - private int threadCount = 0; - - public final DhThreadFactory threadFactory; - - public final ConfigChangeListener threadCountConfigListener; - public final ConfigEntry threadCountConfig; - public final ConfigEntry runTimeRatioConfig; - - - - //=============// - // constructor // - //=============// - - public ConfigThreadPool(DhThreadFactory threadFactory, ConfigEntry threadCountConfig, ConfigEntry runTimeRatioConfig, PrioritySemaphore threadSemaphore, int priority) - { - this.threadFactory = threadFactory; - this.threadSemaphore = threadSemaphore; - this.priority = priority; - - this.threadCountConfig = threadCountConfig; - this.threadCountConfigListener = new ConfigChangeListener<>(threadCountConfig, - (threadCount) -> { this.setThreadPoolSize(threadCount); }); - this.runTimeRatioConfig = runTimeRatioConfig; - - this.setThreadPoolSize(threadCountConfig.get()); - } - - - - - //==============// - // thread setup // - //==============// - - public void setThreadPoolSize(int threadPoolSize) - { - Queue incompleteRunnableQueue = null; - if (this.executor != null) - { - // close the previous thread pool if one exists - this.executor.shutdown(); // don't do shutdown now since we don't want to throw any interrupt exceptions - incompleteRunnableQueue = this.executor.getQueue(); - } - - this.threadCount = threadPoolSize; - this.executor = ThreadUtil.makeRateLimitedThreadPool(this.threadCount, this.threadFactory, this.runTimeRatioConfig, this.threadSemaphore, this.priority); - - if (incompleteRunnableQueue != null) - { - for (Runnable runnable : incompleteRunnableQueue) - { - this.executor.execute(runnable); - } - } - } - - /** - * Stops any executing tasks and destroys the executor.
- * Does nothing if the executor isn't running. - */ - public void shutdownExecutorService() - { - if (this.executor != null) - { - this.executor.shutdownNow(); - } - - this.threadCount = 0; - } - - -} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PrioritySemaphore.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PrioritySemaphore.java deleted file mode 100644 index 0bb7cc5ff..000000000 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PrioritySemaphore.java +++ /dev/null @@ -1,164 +0,0 @@ -package com.seibel.distanthorizons.core.util.threading; - -import org.jetbrains.annotations.NotNull; - -import java.util.Random; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.locks.ReentrantLock; - -/** - * For use with {@link RateLimitedThreadPoolExecutor} - */ -public class PrioritySemaphore -{ - public int maxPermitCount; - public int currentPermitCount; - - private final PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); - private final ReentrantLock lock = new ReentrantLock(); - - private final Random random = new Random(); - - - - //=============// - // constructor // - //=============// - - public PrioritySemaphore(int permits) - { - this.maxPermitCount = permits; - this.currentPermitCount = this.maxPermitCount; - } - - - - //==================// - // permit acquiring // - //==================// - - /** Similar to {@link Semaphore#acquire()} */ - public void acquire(RateLimitedThreadPoolExecutor executor) throws InterruptedException - { - Thread thread = Thread.currentThread(); - this.lock.lock(); - try - { - if (this.currentPermitCount > 0) - { - // a permit is available, - // this thread can run normally - this.currentPermitCount--; - - // return to prevent running the thread's wait() method below - return; - } - } - finally - { - this.lock.unlock(); - } - - - // no permit is available - // this has to be outside the try-finally to prevent holding the lock while waiting - synchronized (thread) - { - // Calculation rules: - // - Executors with higher priority need less tasks to run before other executors - // If one executor has the priority of 3 and other if of 4, - // the latter one will need 1/4 fewer tasks in queue to get its tasks running - // - Executors with short-lived tasks run before longer lived ones - // 100k value is a multiplier to prevent precision loss - int priority = (int) ((executor.priority + 1) * executor.getTaskCount() * 100000 / executor.getAverageRunTimeInMs()); - - // this thread will be run when a permit is available - this.queue.put(new ThreadWithPriority(thread, priority)); - - thread.wait(); - } - } - - /** Similar to {@link Semaphore#release()} */ - public void release() - { - this.lock.lock(); - try - { - // wake up the nex thread if one is queued - if (!this.queue.isEmpty()) - { - Thread nextThread = this.queue.poll().thread; - synchronized (nextThread) - { - nextThread.notify(); - } - } - else - { - this.currentPermitCount++; - // don't increase past the max allowed (this can happen when changing the max permit count) - this.currentPermitCount = Math.min(this.currentPermitCount, this.maxPermitCount); - } - } - finally - { - this.lock.unlock(); - } - } - - - - //=================// - // permit changing // - //=================// - - public void changePermitCount(int val) - { - // find the max number of permits to increase by - int permitCountIncrease = Math.max(0, val - this.maxPermitCount); - - this.lock.lock(); - try - { - this.currentPermitCount += permitCountIncrease; - this.maxPermitCount = val; - } - finally - { - this.lock.unlock(); - } - } - - public int availablePermits() { return this.currentPermitCount; } - - - - //================// - // helper classes // - //================// - - /** simple sortable container to track a thread and it's priority */ - private static class ThreadWithPriority implements Comparable - { - private final Thread thread; - private final int priority; - - public ThreadWithPriority(Thread thread, int priority) - { - this.thread = thread; - this.priority = priority; - } - - @Override - public int compareTo(@NotNull ThreadWithPriority other) - { - // highest number first - return Integer.compare(other.priority, this.priority); - } - - } - - -} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java new file mode 100644 index 000000000..2c76ad373 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java @@ -0,0 +1,185 @@ +package com.seibel.distanthorizons.core.util.threading; + +import com.seibel.distanthorizons.core.config.Config; +import com.seibel.distanthorizons.core.config.types.ConfigEntry; +import com.seibel.distanthorizons.core.util.objects.RollingAverage; +import org.jetbrains.annotations.NotNull; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class PriorityTaskPicker +{ + private final ConfigEntry threadCountConfig = Config.Common.MultiThreading.numberOfThreads; + + private final RateLimitedThreadPoolExecutor threadPoolExecutor = new RateLimitedThreadPoolExecutor( + this.threadCountConfig.getMax(), + new DhThreadFactory("PriorityTaskPicker", Thread.MIN_PRIORITY, false), + new ArrayBlockingQueue<>(this.threadCountConfig.getMax()) + ); + + // Queue of executors, used to distribute tasks across executors based on priority + private final ArrayList executorQueue = new ArrayList<>(); + private int nextExecutorQueuePos = 0; + + // Lock to ensure task picking logic is thread-safe + private final ReentrantLock taskPickerLock = new ReentrantLock(); + // Indicates whether a task picking attempt is needed + private final AtomicBoolean shouldPickTask = new AtomicBoolean(false); + // Tracks the number of active threads + private final AtomicInteger occupiedThreads = new AtomicInteger(0); + + /** + * Creates an executor with a specific priority. + * Higher priority executors have more entries in the distribution queue, giving them a greater chance to run tasks. + * + * @param priority the priority level of the executor + * @return a newly created Executor + */ + public Executor createExecutor(int priority) + { + Executor executor = new Executor(); + + int entriesToAdd = priority + 1; + int gapBetweenEntries = (int) (1 / (double) entriesToAdd * this.executorQueue.size()); + + // Distribute the executor's entries in the queue, ensuring fair distribution + for (; entriesToAdd > 0; entriesToAdd--) + { + this.executorQueue.add(executor); + Collections.rotate(this.executorQueue, -gapBetweenEntries); + } + + return executor; + } + + /** + * Tries to start the next task by iterating over executors in the queue. + * Ensures thread limits are respected and only one thread iterates over the executorQueue at a time. + */ + private void tryStartNextTask() + { + this.shouldPickTask.set(true); + + while (this.taskPickerLock.tryLock()) + { + try + { + // Exit if there's no longer a need to pick a task + if (!this.shouldPickTask.compareAndSet(true, false)) + { + // There is a small chance for a task to end up in a 'limbo' state, + // when this.shouldPickTask got set to true right here and this.taskPickerLock is not unlocked yet, + // but we'll disregard that since tasks get added often enough for this to not be an issue + + return; + } + + // Iterate over the executors in the queue, attempting to start tasks + for ( + int taskPickAttempts = 0; + taskPickAttempts < this.executorQueue.size() && this.occupiedThreads.get() < this.threadCountConfig.get(); + taskPickAttempts++, this.nextExecutorQueuePos = (this.nextExecutorQueuePos + 1) % this.executorQueue.size() + ) + { + Executor executor = this.executorQueue.get(this.nextExecutorQueuePos); + + Runnable task = executor.tasks.poll(); + if (task != null) + { + // Update variables related to task status + this.occupiedThreads.getAndIncrement(); + executor.runningTasks.getAndIncrement(); + + // Prevent exiting early since there might be more than this.executorQueue.size() tasks waiting in queue + taskPickAttempts = 0; + + this.threadPoolExecutor.execute(task); + } + } + } + finally + { + this.taskPickerLock.unlock(); + + // If someone else manages to pick up a lock before us, we'll leave early, and they will do our work + } + } + } + + /** + * Shuts down the thread pool immediately, stopping all tasks. + */ + public void shutdown() + { + this.threadPoolExecutor.shutdownNow(); + } + + + public class Executor extends AbstractExecutorService + { + private final Queue tasks = new ConcurrentLinkedQueue<>(); + + private final AtomicInteger runningTasks = new AtomicInteger(0); + private final AtomicInteger completedTasks = new AtomicInteger(0); + private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200); + + + @Override + public void execute(@NotNull Runnable command) + { + this.tasks.add(() -> { + long startTime = System.nanoTime(); + try + { + command.run(); + } + finally + { + long timeElapsed = System.nanoTime() - startTime; + this.runTimeInMsRollingAverage.addValue(TimeUnit.NANOSECONDS.toMillis(timeElapsed)); + + // Update variables related to task status + PriorityTaskPicker.this.occupiedThreads.getAndDecrement(); + this.runningTasks.getAndDecrement(); + this.completedTasks.getAndIncrement(); + + // Attempt to start another task + PriorityTaskPicker.this.tryStartNextTask(); + } + }); + + // Attempt to pick up the task immediately + PriorityTaskPicker.this.tryStartNextTask(); + } + + + public int getQueueSize() { return this.tasks.size(); } + public int getPoolSize() { return PriorityTaskPicker.this.threadCountConfig.get(); } + + public int getRunningTaskCount() { return this.runningTasks.get(); } + public int getCompletedTaskCount() { return this.completedTasks.get(); } + /** Will return NaN if nothing has been submitted yet */ + public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); } + + + @Override + public void shutdown() { throw new UnsupportedOperationException(); } + + @Override + public @NotNull List shutdownNow() { throw new UnsupportedOperationException(); } + + @Override + public boolean isShutdown() { return false; } + @Override + public boolean isTerminated() { return false; } + + @Override + public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) { return false; } + + } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java index efc8df1da..7ba53b579 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java @@ -19,49 +19,26 @@ package com.seibel.distanthorizons.core.util.threading; +import com.seibel.distanthorizons.core.config.Config; +import com.seibel.distanthorizons.core.config.types.ConfigEntry; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; -import com.seibel.distanthorizons.core.util.objects.RollingAverage; import org.apache.logging.log4j.Logger; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import java.util.ConcurrentModificationException; -import java.util.Iterator; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; /** * Can be used to more finely control CPU usage and * reduce CPU usage if only 1 thread is already assigned. */ -public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements Comparable +public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor { private static final Logger LOGGER = DhLoggerBuilder.getLogger(); - /** logs include the thread name by default which can help diagnose deadlocks */ - private static final boolean LOG_SEMAPHORE_ACTIONS = false; - - public volatile double runTimeRatio; + public final ConfigEntry runTimeRatioConfig = Config.Common.MultiThreading.threadRunTimeRatio; /** When this thread started running its last task */ - private final ThreadLocal runStartNanoTimeRef = ThreadLocal.withInitial(() -> -1L); - /** How long it took this thread to run its last task */ - private final ThreadLocal lastRunDurationNanoTimeRef = ThreadLocal.withInitial(() -> -1L); - - /** - * Does nothing if {@link RateLimitedThreadPoolExecutor#threadSemaphore} - * is null.
- * Higher numbers have higher priority. - */ - public final int priority; - /** if null this thread pool will run independently of other pools */ - @Nullable - public final PrioritySemaphore threadSemaphore; - /** will always be zero if no semaphore is present */ - public final AtomicInteger semaphoresAcquired = new AtomicInteger(0); - - private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200); + private final ThreadLocal runStartTime = ThreadLocal.withInitial(() -> -1L); @@ -69,18 +46,12 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements // constructors // //==============// - public RateLimitedThreadPoolExecutor( - int corePoolSize, double runTimeRatio, ThreadFactory threadFactory, - @Nullable PrioritySemaphore prioritySemaphore, int priority) + public RateLimitedThreadPoolExecutor(int poolSize, ThreadFactory threadFactory, BlockingQueue workQueue) { - super(corePoolSize, corePoolSize, + super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), // TODO using a PriorityBlockingQueue would be nice to allow for prioritizing tasks, but then all tasks must be Comparable + workQueue, threadFactory); - - this.runTimeRatio = runTimeRatio; - this.priority = priority; - this.threadSemaphore = prioritySemaphore; } @@ -92,36 +63,7 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements @Override protected void beforeExecute(Thread thread, Runnable runnable) { - long deltaMs = TimeUnit.NANOSECONDS.toMillis(this.lastRunDurationNanoTimeRef.get()); - this.runTimeInMsRollingAverage.addValue(deltaMs); - - if (this.runTimeRatio < 1.0 && this.lastRunDurationNanoTimeRef.get() != -1) - { - try - { - Thread.sleep((long) (deltaMs / this.runTimeRatio - deltaMs)); - } - catch (InterruptedException ignored) { } - } - - if (this.threadSemaphore != null) - { - try - { - // Warning, this can cause deadlock if one thread calls another. - this.threadSemaphore.acquire(this); - this.semaphoresAcquired.getAndAdd(1); - - if (LOG_SEMAPHORE_ACTIONS) - { - LOGGER.debug("acquired, available count: ["+this.threadSemaphore.availablePermits()+"]"); - } - } - catch (InterruptedException ignore) { } - } - - - this.runStartNanoTimeRef.set(System.nanoTime()); + this.runStartTime.set(System.nanoTime()); super.beforeExecute(thread, runnable); } @@ -131,18 +73,14 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements { super.afterExecute(runnable, throwable); - this.lastRunDurationNanoTimeRef.set(System.nanoTime() - this.runStartNanoTimeRef.get()); - - - if (this.threadSemaphore != null) + try { - this.threadSemaphore.release(); - this.semaphoresAcquired.getAndAdd(-1); - - if (LOG_SEMAPHORE_ACTIONS) - { - LOGGER.debug("released, available count: ["+this.threadSemaphore.availablePermits()+"]"); - } + long runTime = System.nanoTime() - this.runStartTime.get(); + Thread.sleep(TimeUnit.NANOSECONDS.toMillis((long) (runTime / this.runTimeRatioConfig.get() - runTime))); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); } } @@ -150,15 +88,6 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements protected void terminated() { super.terminated(); - - // release all held semaphores (shouldn't normally be necessary, but just in case) - if (this.threadSemaphore != null) - { - if (LOG_SEMAPHORE_ACTIONS) - { - LOGGER.info("terminated, released ["+this.semaphoresAcquired+"], available count: ["+this.threadSemaphore.availablePermits()+"]"); - } - } } /** @@ -175,28 +104,4 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements super.purge(); } - - - //==============// - // running time // - //==============// - - /** will return Nan if nothing has been submitted yet */ - public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); } - - - - //============// - // comparison // - //============// - - @Override - public int compareTo(@NotNull RateLimitedThreadPoolExecutor other) - { - // highest number first - return Integer.compare(other.priority, this.priority); - } - - - } \ No newline at end of file diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java index e90a043a3..3d697f9aa 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java @@ -19,12 +19,10 @@ package com.seibel.distanthorizons.core.util.threading; -import com.seibel.distanthorizons.core.config.Config; -import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener; import com.seibel.distanthorizons.core.util.ThreadUtil; import org.jetbrains.annotations.Nullable; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.*; /** * Holds each thread pool the system uses. @@ -40,20 +38,20 @@ public class ThreadPoolUtil // standalone thread pools all handle independent systems // and don't interfere with any other pool - public static final DhThreadFactory FILE_HANDLER_THREAD_FACTORY = new DhThreadFactory("File Handler", Thread.MIN_PRIORITY, false); - private static ConfigThreadPool fileHandlerThreadPool; - @Nullable - public static ThreadPoolExecutor getFileHandlerExecutor() { return fileHandlerThreadPool.executor; } + private static PriorityTaskPicker taskPicker; - public static final DhThreadFactory UPDATE_PROPAGATOR_THREAD_FACTORY = new DhThreadFactory("LOD Update Propagator", Thread.MIN_PRIORITY, false); - private static ConfigThreadPool updatePropagatorThreadPool; + private static PriorityTaskPicker.Executor fileHandlerThreadPool; @Nullable - public static ThreadPoolExecutor getUpdatePropagatorExecutor() { return updatePropagatorThreadPool.executor; } + public static PriorityTaskPicker.Executor getFileHandlerExecutor() { return fileHandlerThreadPool; } + + private static PriorityTaskPicker.Executor updatePropagatorThreadPool; + @Nullable + public static PriorityTaskPicker.Executor getUpdatePropagatorExecutor() { return updatePropagatorThreadPool; } public static final DhThreadFactory WORLD_GEN_THREAD_FACTORY = new DhThreadFactory("World Gen", Thread.MIN_PRIORITY, false); - private static ConfigThreadPool worldGenThreadPool; + private static PriorityTaskPicker.Executor worldGenThreadPool; @Nullable - public static ThreadPoolExecutor getWorldGenExecutor() { return worldGenThreadPool.executor; } + public static PriorityTaskPicker.Executor getWorldGenExecutor() { return worldGenThreadPool; } public static final String CLEANUP_THREAD_NAME = "Cleanup"; private static ThreadPoolExecutor cleanupThreadPool; @@ -65,10 +63,9 @@ public class ThreadPoolUtil @Nullable public static ThreadPoolExecutor getBeaconCullingExecutor() { return beaconCullingThreadPool; } - public static final DhThreadFactory NETWORK_COMPRESSION_THREAD_FACTORY = new DhThreadFactory("Network Compression", Thread.MIN_PRIORITY, false); - private static ConfigThreadPool networkCompressionThreadPool; + private static PriorityTaskPicker.Executor networkCompressionThreadPool; @Nullable - public static ThreadPoolExecutor getNetworkCompressionExecutor() { return networkCompressionThreadPool.executor; } + public static PriorityTaskPicker.Executor getNetworkCompressionExecutor() { return networkCompressionThreadPool; } @@ -78,18 +75,9 @@ public class ThreadPoolUtil public static ThreadPoolExecutor getFullDataMigrationExecutor() { return fullDataMigrationThreadPool; } - public static final DhThreadFactory CHUNK_TO_LOD_BUILDER_THREAD_FACTORY = new DhThreadFactory("LOD Builder - Chunk to Lod Builder", Thread.MIN_PRIORITY, false); - private static ConfigThreadPool chunkToLodBuilderThreadPool; + private static PriorityTaskPicker.Executor chunkToLodBuilderThreadPool; @Nullable - public static ThreadPoolExecutor getChunkToLodBuilderExecutor() { return (chunkToLodBuilderThreadPool != null) ? chunkToLodBuilderThreadPool.executor : null; } - - - - /** how many total threads can be used */ - private static int threadSemaphoreCount = Config.Common.MultiThreading.numberOfThreads.get(); - public static int getThreadCount() { return threadSemaphoreCount; } - private static PrioritySemaphore threadSemaphore = null; - private static ConfigChangeListener threadSemaphoreConfigListener = null; + public static PriorityTaskPicker.Executor getChunkToLodBuilderExecutor() { return chunkToLodBuilderThreadPool; } @@ -99,33 +87,19 @@ public class ThreadPoolUtil public static void setupThreadPools() { - // create thread semaphore - threadSemaphoreCount = Config.Common.MultiThreading.numberOfThreads.get(); - threadSemaphore = new PrioritySemaphore(threadSemaphoreCount); - threadSemaphoreConfigListener = new ConfigChangeListener<>(Config.Common.MultiThreading.numberOfThreads, (val) -> - { - threadSemaphore.changePermitCount(val); - threadSemaphoreCount = val; - }); - - - // thread pools - chunkToLodBuilderThreadPool = new ConfigThreadPool(CHUNK_TO_LOD_BUILDER_THREAD_FACTORY, - Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio, - threadSemaphore, 3); // We want to make sure any chunk changes are found - fileHandlerThreadPool = new ConfigThreadPool(FILE_HANDLER_THREAD_FACTORY, - Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio, - threadSemaphore, 2); // loading in new LODs is second highest priority - updatePropagatorThreadPool = new ConfigThreadPool(UPDATE_PROPAGATOR_THREAD_FACTORY, - Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio, - threadSemaphore, 1); // update propagation needs to be slightly higher priority than world gen - worldGenThreadPool = new ConfigThreadPool(WORLD_GEN_THREAD_FACTORY, - Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio, - threadSemaphore, 0); // higher priorities mean the threads will run first - networkCompressionThreadPool = new ConfigThreadPool(NETWORK_COMPRESSION_THREAD_FACTORY, - Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio, - threadSemaphore, 0); // networking can probably have similar priority to world gen since they work similarly + taskPicker = new PriorityTaskPicker(); + + // IO should never be stuck waiting for something else to complete + networkCompressionThreadPool = taskPicker.createExecutor(3); + fileHandlerThreadPool = taskPicker.createExecutor(3); + + // Normal priority tasks + chunkToLodBuilderThreadPool = taskPicker.createExecutor(2); + updatePropagatorThreadPool = taskPicker.createExecutor(2); + + // World gen tasks are heavy and nothing strictly depends on them, so it may wait a bit + worldGenThreadPool = taskPicker.createExecutor(1); @@ -139,22 +113,10 @@ public class ThreadPoolUtil public static void shutdownThreadPools() { // standalone threads - fileHandlerThreadPool.shutdownExecutorService(); - updatePropagatorThreadPool.shutdownExecutorService(); - worldGenThreadPool.shutdownExecutorService(); - networkCompressionThreadPool.shutdownExecutorService(); + taskPicker.shutdown(); cleanupThreadPool.shutdown(); beaconCullingThreadPool.shutdown(); fullDataMigrationThreadPool.shutdown(); - chunkToLodBuilderThreadPool.shutdownExecutorService(); - - threadSemaphore = null; - - if (threadSemaphoreConfigListener != null) - { - threadSemaphoreConfigListener.close(); - threadSemaphoreConfigListener = null; - } } }