Refactor thread pool handling
This commit is contained in:
@@ -34,6 +34,7 @@ import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
|
||||
import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
|
||||
import com.seibel.distanthorizons.core.util.LodUtil;
|
||||
import com.seibel.distanthorizons.core.util.objects.Pair;
|
||||
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
|
||||
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
|
||||
import com.seibel.distanthorizons.core.world.*;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.chunk.IChunkWrapper;
|
||||
@@ -344,8 +345,8 @@ public class SharedApi
|
||||
// queue updates up to the number of CPU cores allocated for the job
|
||||
// (this prevents doing extra work queuing tasks that may not be necessary)
|
||||
// and makes sure the chunks closest to the player are updated first
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
|
||||
if (executor != null && executor.getQueue().size() < executor.getCorePoolSize())
|
||||
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
|
||||
if (executor != null && executor.getQueueSize() < executor.getPoolSize())
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -432,7 +433,7 @@ public class SharedApi
|
||||
finally
|
||||
{
|
||||
// queue the next position if there are still positions to process
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
|
||||
AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
|
||||
if (executor != null && !UPDATE_POS_MANAGER.updateDataByChunkPos.isEmpty())
|
||||
{
|
||||
try
|
||||
|
||||
+2
-2
@@ -114,7 +114,7 @@ public abstract class AbstractDataSourceHandler
|
||||
*/
|
||||
public CompletableFuture<TDataSource> getAsync(long pos)
|
||||
{
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getFileHandlerExecutor();
|
||||
AbstractExecutorService executor = ThreadPoolUtil.getFileHandlerExecutor();
|
||||
if (executor == null || executor.isTerminated())
|
||||
{
|
||||
return CompletableFuture.completedFuture(null);
|
||||
@@ -188,7 +188,7 @@ public abstract class AbstractDataSourceHandler
|
||||
|
||||
public CompletableFuture<Void> updateDataSourceAsync(@NotNull FullDataSourceV2 inputDataSource)
|
||||
{
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
|
||||
AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
|
||||
if (executor == null || executor.isTerminated())
|
||||
{
|
||||
return CompletableFuture.completedFuture(null);
|
||||
|
||||
+2
-1
@@ -18,6 +18,7 @@ import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.AbstractExecutorService;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
@@ -96,7 +97,7 @@ public class FullDataSourceProviderV1<TDhLevel extends IDhLevel>
|
||||
*/
|
||||
public CompletableFuture<FullDataSourceV1> getAsync(long pos)
|
||||
{
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getFileHandlerExecutor();
|
||||
AbstractExecutorService executor = ThreadPoolUtil.getFileHandlerExecutor();
|
||||
if (executor == null || executor.isTerminated())
|
||||
{
|
||||
return CompletableFuture.completedFuture(null);
|
||||
|
||||
+3
-3
@@ -40,7 +40,7 @@ import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
|
||||
import com.seibel.distanthorizons.core.sql.repo.FullDataSourceV2Repo;
|
||||
import com.seibel.distanthorizons.core.util.ThreadUtil;
|
||||
import com.seibel.distanthorizons.core.util.objects.DataCorruptedException;
|
||||
import com.seibel.distanthorizons.core.util.threading.PrioritySemaphore;
|
||||
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
|
||||
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
|
||||
import it.unimi.dsi.fastutil.longs.LongArrayList;
|
||||
@@ -209,7 +209,7 @@ public class FullDataSourceProviderV2
|
||||
{
|
||||
Thread.sleep(UPDATE_QUEUE_THREAD_DELAY_IN_MS);
|
||||
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getUpdatePropagatorExecutor();
|
||||
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getUpdatePropagatorExecutor();
|
||||
if (executor == null || executor.isTerminated())
|
||||
{
|
||||
continue;
|
||||
@@ -226,7 +226,7 @@ public class FullDataSourceProviderV2
|
||||
}
|
||||
|
||||
// queue parent updates
|
||||
if (executor.getQueue().size() < MAX_UPDATE_TASK_COUNT
|
||||
if (executor.getQueueSize() < MAX_UPDATE_TASK_COUNT
|
||||
&& this.parentUpdatingPosSet.size() < MAX_UPDATE_TASK_COUNT)
|
||||
{
|
||||
// get the positions that need to be applied to their parents
|
||||
|
||||
+5
-4
@@ -36,6 +36,7 @@ import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D;
|
||||
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
|
||||
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
|
||||
import com.seibel.distanthorizons.core.util.LodUtil;
|
||||
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
|
||||
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
|
||||
import com.seibel.distanthorizons.coreapi.util.BitShiftUtil;
|
||||
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
|
||||
@@ -195,16 +196,16 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
|
||||
}
|
||||
|
||||
|
||||
ThreadPoolExecutor updateExecutor = ThreadPoolUtil.getUpdatePropagatorExecutor();
|
||||
if (updateExecutor == null || updateExecutor.getQueue().size() >= MAX_UPDATE_TASK_COUNT / 2)
|
||||
PriorityTaskPicker.Executor updateExecutor = ThreadPoolUtil.getUpdatePropagatorExecutor();
|
||||
if (updateExecutor == null || updateExecutor.getQueueSize() >= MAX_UPDATE_TASK_COUNT / 2)
|
||||
{
|
||||
// don't queue additional world gen requests if the updater is behind
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
ThreadPoolExecutor fileExecutor = ThreadPoolUtil.getFileHandlerExecutor();
|
||||
if (fileExecutor == null || fileExecutor.getQueue().size() >= MAX_UPDATE_TASK_COUNT / 2)
|
||||
PriorityTaskPicker.Executor fileExecutor = ThreadPoolUtil.getFileHandlerExecutor();
|
||||
if (fileExecutor == null || fileExecutor.getQueueSize() >= MAX_UPDATE_TASK_COUNT / 2)
|
||||
{
|
||||
// don't queue additional world gen requests if the file handler is overwhelmed,
|
||||
// otherwise LODs may not load in properly
|
||||
|
||||
+4
-3
@@ -44,6 +44,7 @@ import com.seibel.distanthorizons.core.util.ThreadUtil;
|
||||
import com.seibel.distanthorizons.core.util.objects.DataCorruptedException;
|
||||
import com.seibel.distanthorizons.core.util.objects.UncheckedInterruptedException;
|
||||
import com.seibel.distanthorizons.core.util.LodUtil;
|
||||
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
|
||||
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
|
||||
import com.seibel.distanthorizons.core.world.DhApiWorldProxy;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.IWrapperFactory;
|
||||
@@ -229,7 +230,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
|
||||
}
|
||||
public boolean isGeneratorBusy()
|
||||
{
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getWorldGenExecutor();
|
||||
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getWorldGenExecutor();
|
||||
if (executor == null)
|
||||
{
|
||||
// shouldn't happen, but just in case, don't queue more tasks
|
||||
@@ -238,7 +239,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
|
||||
|
||||
int worldGenThreadCount = Math.max(Config.Common.MultiThreading.numberOfThreads.get(), 1);
|
||||
int maxWorldGenTaskCount = worldGenThreadCount * MAX_QUEUED_TASKS_PER_THREAD;
|
||||
return executor.getQueue().size() > maxWorldGenTaskCount;
|
||||
return executor.getQueueSize() > maxWorldGenTaskCount;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -567,7 +568,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
|
||||
try
|
||||
{
|
||||
int waitTimeInSeconds = 3;
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getWorldGenExecutor();
|
||||
AbstractExecutorService executor = ThreadPoolUtil.getWorldGenExecutor();
|
||||
if (executor != null && !executor.awaitTermination(waitTimeInSeconds, TimeUnit.SECONDS))
|
||||
{
|
||||
LOGGER.warn("World generator thread pool shutdown didn't complete after [" + waitTimeInSeconds + "] seconds. Some world generator requests may still be running.");
|
||||
|
||||
@@ -28,7 +28,7 @@ import com.seibel.distanthorizons.core.pooling.PhantomArrayListPool;
|
||||
import com.seibel.distanthorizons.core.pos.DhSectionPos;
|
||||
import com.seibel.distanthorizons.core.render.RenderBufferHandler;
|
||||
import com.seibel.distanthorizons.core.render.renderer.generic.GenericObjectRenderer;
|
||||
import com.seibel.distanthorizons.core.util.threading.RateLimitedThreadPoolExecutor;
|
||||
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
|
||||
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
|
||||
import com.seibel.distanthorizons.core.world.AbstractDhWorld;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
|
||||
@@ -80,11 +80,11 @@ public class F3Screen
|
||||
public static void addStringToDisplay(List<String> messageList)
|
||||
{
|
||||
// multi thread pools
|
||||
ThreadPoolExecutor worldGenPool = ThreadPoolUtil.getWorldGenExecutor();
|
||||
ThreadPoolExecutor fileHandlerPool = ThreadPoolUtil.getFileHandlerExecutor();
|
||||
ThreadPoolExecutor updatePool = ThreadPoolUtil.getUpdatePropagatorExecutor();
|
||||
ThreadPoolExecutor lodBuilderPool = ThreadPoolUtil.getChunkToLodBuilderExecutor();
|
||||
ThreadPoolExecutor networkPool = ThreadPoolUtil.getNetworkCompressionExecutor();
|
||||
PriorityTaskPicker.Executor worldGenPool = ThreadPoolUtil.getWorldGenExecutor();
|
||||
PriorityTaskPicker.Executor fileHandlerPool = ThreadPoolUtil.getFileHandlerExecutor();
|
||||
PriorityTaskPicker.Executor updatePool = ThreadPoolUtil.getUpdatePropagatorExecutor();
|
||||
PriorityTaskPicker.Executor lodBuilderPool = ThreadPoolUtil.getChunkToLodBuilderExecutor();
|
||||
PriorityTaskPicker.Executor networkPool = ThreadPoolUtil.getNetworkCompressionExecutor();
|
||||
|
||||
// single thread pools
|
||||
ThreadPoolExecutor cleanupPool = ThreadPoolUtil.getCleanupExecutor();
|
||||
@@ -191,25 +191,23 @@ public class F3Screen
|
||||
// helper methods //
|
||||
//================//
|
||||
|
||||
private static String getThreadPoolStatString(String name, ThreadPoolExecutor pool)
|
||||
private static String getThreadPoolStatString(String name, PriorityTaskPicker.Executor pool)
|
||||
{
|
||||
String queueSize = (pool != null) ? NUMBER_FORMAT.format(pool.getQueue().size()) : "-";
|
||||
String queueSize = (pool != null) ? NUMBER_FORMAT.format(pool.getQueueSize()) : "-";
|
||||
String completedCount = (pool != null) ? NUMBER_FORMAT.format(pool.getCompletedTaskCount()) : "-";
|
||||
|
||||
String message = name+", Tasks: "+queueSize+", Done: "+completedCount;
|
||||
|
||||
if (pool != null && pool.getClass() == RateLimitedThreadPoolExecutor.class)
|
||||
if (pool != null)
|
||||
{
|
||||
RateLimitedThreadPoolExecutor rateLimitedPool = ((RateLimitedThreadPoolExecutor) pool);
|
||||
|
||||
// active threads
|
||||
int activeThreadCount = rateLimitedPool.semaphoresAcquired.get();
|
||||
int threadCount = ThreadPoolUtil.getThreadCount();
|
||||
int activeThreadCount = pool.getRunningTaskCount();
|
||||
int threadCount = pool.getPoolSize();
|
||||
message += ", Active: "+activeThreadCount+"/"+threadCount;
|
||||
|
||||
// thread runtime
|
||||
String runTimeAvgStr;
|
||||
double runTimeAvgInMs = rateLimitedPool.getAverageRunTimeInMs();
|
||||
double runTimeAvgInMs = pool.getAverageRunTimeInMs();
|
||||
if (!Double.isNaN(runTimeAvgInMs))
|
||||
{
|
||||
runTimeAvgStr = NUMBER_FORMAT.format(runTimeAvgInMs);
|
||||
|
||||
+1
-1
@@ -228,7 +228,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
|
||||
{
|
||||
FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSourceAndReleaseBuffer(response.payload);
|
||||
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
|
||||
AbstractExecutorService executor = ThreadPoolUtil.getNetworkCompressionExecutor();
|
||||
if (executor == null)
|
||||
{
|
||||
LOGGER.warn("Unable to handle FullDataPayload - getNetworkCompressionExecutor() is null");
|
||||
|
||||
+3
-6
@@ -16,10 +16,7 @@ import org.apache.logging.log4j.LogManager;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class FullDataSourceRequestHandler
|
||||
@@ -75,7 +72,7 @@ public class FullDataSourceRequestHandler
|
||||
}
|
||||
|
||||
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
|
||||
AbstractExecutorService executor = ThreadPoolUtil.getNetworkCompressionExecutor();
|
||||
if (executor == null)
|
||||
{
|
||||
// shouldn't normally happen, but just in case
|
||||
@@ -195,7 +192,7 @@ public class FullDataSourceRequestHandler
|
||||
continue;
|
||||
}
|
||||
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
|
||||
AbstractExecutorService executor = ThreadPoolUtil.getNetworkCompressionExecutor();
|
||||
if (executor == null)
|
||||
{
|
||||
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
|
||||
|
||||
@@ -38,6 +38,7 @@ import com.seibel.distanthorizons.core.render.glObject.GLProxy;
|
||||
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
|
||||
import com.seibel.distanthorizons.core.dataObjects.render.bufferBuilding.ColumnRenderBuffer;
|
||||
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
|
||||
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
|
||||
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
|
||||
import it.unimi.dsi.fastutil.longs.LongArrayList;
|
||||
@@ -132,7 +133,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable
|
||||
return;
|
||||
}
|
||||
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getFileHandlerExecutor();
|
||||
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getFileHandlerExecutor();
|
||||
if (executor == null || executor.isTerminated())
|
||||
{
|
||||
return;
|
||||
@@ -140,7 +141,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable
|
||||
|
||||
// don't queue up an infinite number of tasks
|
||||
// doing so will cause memory use to balloon when swapping between dimensions
|
||||
if (executor.getQueue().size() > executor.getPoolSize())
|
||||
if (executor.getQueueSize() > executor.getPoolSize())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -19,11 +19,7 @@
|
||||
|
||||
package com.seibel.distanthorizons.core.util;
|
||||
|
||||
import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
|
||||
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
|
||||
import com.seibel.distanthorizons.core.util.threading.DhThreadFactory;
|
||||
import com.seibel.distanthorizons.core.util.threading.PrioritySemaphore;
|
||||
import com.seibel.distanthorizons.core.util.threading.RateLimitedThreadPoolExecutor;
|
||||
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
|
||||
import com.seibel.distanthorizons.coreapi.ModInfo;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@@ -43,55 +39,10 @@ public class ThreadUtil
|
||||
|
||||
public static final String THREAD_NAME_PREFIX = ModInfo.THREAD_NAME_PREFIX;
|
||||
|
||||
/** used to track and remove old listeners for certain pools if the thread pool is recreated. */
|
||||
private static final ConcurrentHashMap<String, ConfigChangeListener<Double>> THREAD_CHANGE_LISTENERS_BY_THREAD_NAME = new ConcurrentHashMap<>();
|
||||
|
||||
// TODO move all "Runtime.getRuntime().availableProcessors()" calls here
|
||||
|
||||
|
||||
|
||||
//===================//
|
||||
// rate limited pool //
|
||||
//===================//
|
||||
|
||||
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, DhThreadFactory threadFactory, ConfigEntry<Double> runTimeRatioConfigEntry, PrioritySemaphore activeThreadCountSemaphore, int priority)
|
||||
{
|
||||
// remove the old listener if one exists
|
||||
if (THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.containsKey(threadFactory.threadName))
|
||||
{
|
||||
// note: this assumes only one thread pool exists with a given name
|
||||
THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.get(threadFactory.threadName).close();
|
||||
THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.remove(threadFactory.threadName);
|
||||
}
|
||||
|
||||
if (!threadFactory.threadName.startsWith(THREAD_NAME_PREFIX))
|
||||
{
|
||||
// this will only happen if a ThreadFactory is passed in that doesn't have the correct thread name
|
||||
LOGGER.warn("Thread pool with the name ["+threadFactory.threadName+"] is missing the expected Distant Horizons thread prefix ["+THREAD_NAME_PREFIX+"].");
|
||||
}
|
||||
|
||||
|
||||
RateLimitedThreadPoolExecutor executor = makeRateLimitedThreadPool(poolSize, runTimeRatioConfigEntry.get(), threadFactory, activeThreadCountSemaphore, priority);
|
||||
|
||||
ConfigChangeListener<Double> changeListener = new ConfigChangeListener<>(runTimeRatioConfigEntry, (newRunTimeRatio) -> { executor.runTimeRatio = newRunTimeRatio; });
|
||||
THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.put(threadFactory.threadName, changeListener);
|
||||
|
||||
return executor;
|
||||
}
|
||||
|
||||
|
||||
/** should only be used if there isn't a config controlling the run time ratio of this thread pool */
|
||||
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, String name, Double runTimeRatio, int threadPriority, PrioritySemaphore activeThreadCountSemaphore, int priority)
|
||||
{
|
||||
return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, new DhThreadFactory(name, threadPriority, false), activeThreadCountSemaphore, priority);
|
||||
}
|
||||
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, Double runTimeRatio, DhThreadFactory threadFactory, PrioritySemaphore activeThreadCountSemaphore, int priority)
|
||||
{
|
||||
return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, threadFactory, activeThreadCountSemaphore, priority);
|
||||
}
|
||||
|
||||
|
||||
|
||||
//===============//
|
||||
// standard pool //
|
||||
//===============//
|
||||
|
||||
-113
@@ -1,113 +0,0 @@
|
||||
/*
|
||||
* This file is part of the Distant Horizons mod
|
||||
* licensed under the GNU LGPL v3 License.
|
||||
*
|
||||
* Copyright (C) 2020-2023 James Seibel
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License as published by
|
||||
* the Free Software Foundation, version 3.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package com.seibel.distanthorizons.core.util.threading;
|
||||
|
||||
import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
|
||||
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
|
||||
import com.seibel.distanthorizons.core.util.ThreadUtil;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
|
||||
/**
|
||||
* Handles thread pools with config values for their
|
||||
* thread count and run time ratio.
|
||||
*/
|
||||
public class ConfigThreadPool
|
||||
{
|
||||
/** Caution must be used to prevent deadlock */
|
||||
private final PrioritySemaphore threadSemaphore;
|
||||
/** higher numbers run first */
|
||||
private final int priority;
|
||||
|
||||
public RateLimitedThreadPoolExecutor executor = null;
|
||||
private int threadCount = 0;
|
||||
|
||||
public final DhThreadFactory threadFactory;
|
||||
|
||||
public final ConfigChangeListener<Integer> threadCountConfigListener;
|
||||
public final ConfigEntry<Integer> threadCountConfig;
|
||||
public final ConfigEntry<Double> runTimeRatioConfig;
|
||||
|
||||
|
||||
|
||||
//=============//
|
||||
// constructor //
|
||||
//=============//
|
||||
|
||||
public ConfigThreadPool(DhThreadFactory threadFactory, ConfigEntry<Integer> threadCountConfig, ConfigEntry<Double> runTimeRatioConfig, PrioritySemaphore threadSemaphore, int priority)
|
||||
{
|
||||
this.threadFactory = threadFactory;
|
||||
this.threadSemaphore = threadSemaphore;
|
||||
this.priority = priority;
|
||||
|
||||
this.threadCountConfig = threadCountConfig;
|
||||
this.threadCountConfigListener = new ConfigChangeListener<>(threadCountConfig,
|
||||
(threadCount) -> { this.setThreadPoolSize(threadCount); });
|
||||
this.runTimeRatioConfig = runTimeRatioConfig;
|
||||
|
||||
this.setThreadPoolSize(threadCountConfig.get());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
//==============//
|
||||
// thread setup //
|
||||
//==============//
|
||||
|
||||
public void setThreadPoolSize(int threadPoolSize)
|
||||
{
|
||||
Queue<Runnable> incompleteRunnableQueue = null;
|
||||
if (this.executor != null)
|
||||
{
|
||||
// close the previous thread pool if one exists
|
||||
this.executor.shutdown(); // don't do shutdown now since we don't want to throw any interrupt exceptions
|
||||
incompleteRunnableQueue = this.executor.getQueue();
|
||||
}
|
||||
|
||||
this.threadCount = threadPoolSize;
|
||||
this.executor = ThreadUtil.makeRateLimitedThreadPool(this.threadCount, this.threadFactory, this.runTimeRatioConfig, this.threadSemaphore, this.priority);
|
||||
|
||||
if (incompleteRunnableQueue != null)
|
||||
{
|
||||
for (Runnable runnable : incompleteRunnableQueue)
|
||||
{
|
||||
this.executor.execute(runnable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops any executing tasks and destroys the executor. <br>
|
||||
* Does nothing if the executor isn't running.
|
||||
*/
|
||||
public void shutdownExecutorService()
|
||||
{
|
||||
if (this.executor != null)
|
||||
{
|
||||
this.executor.shutdownNow();
|
||||
}
|
||||
|
||||
this.threadCount = 0;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
-164
@@ -1,164 +0,0 @@
|
||||
package com.seibel.distanthorizons.core.util.threading;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* For use with {@link RateLimitedThreadPoolExecutor}
|
||||
*/
|
||||
public class PrioritySemaphore
|
||||
{
|
||||
public int maxPermitCount;
|
||||
public int currentPermitCount;
|
||||
|
||||
private final PriorityBlockingQueue<ThreadWithPriority> queue = new PriorityBlockingQueue<>();
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
private final Random random = new Random();
|
||||
|
||||
|
||||
|
||||
//=============//
|
||||
// constructor //
|
||||
//=============//
|
||||
|
||||
public PrioritySemaphore(int permits)
|
||||
{
|
||||
this.maxPermitCount = permits;
|
||||
this.currentPermitCount = this.maxPermitCount;
|
||||
}
|
||||
|
||||
|
||||
|
||||
//==================//
|
||||
// permit acquiring //
|
||||
//==================//
|
||||
|
||||
/** Similar to {@link Semaphore#acquire()} */
|
||||
public void acquire(RateLimitedThreadPoolExecutor executor) throws InterruptedException
|
||||
{
|
||||
Thread thread = Thread.currentThread();
|
||||
this.lock.lock();
|
||||
try
|
||||
{
|
||||
if (this.currentPermitCount > 0)
|
||||
{
|
||||
// a permit is available,
|
||||
// this thread can run normally
|
||||
this.currentPermitCount--;
|
||||
|
||||
// return to prevent running the thread's wait() method below
|
||||
return;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
this.lock.unlock();
|
||||
}
|
||||
|
||||
|
||||
// no permit is available
|
||||
// this has to be outside the try-finally to prevent holding the lock while waiting
|
||||
synchronized (thread)
|
||||
{
|
||||
// Calculation rules:
|
||||
// - Executors with higher priority need less tasks to run before other executors
|
||||
// If one executor has the priority of 3 and other if of 4,
|
||||
// the latter one will need 1/4 fewer tasks in queue to get its tasks running
|
||||
// - Executors with short-lived tasks run before longer lived ones
|
||||
// 100k value is a multiplier to prevent precision loss
|
||||
int priority = (int) ((executor.priority + 1) * executor.getTaskCount() * 100000 / executor.getAverageRunTimeInMs());
|
||||
|
||||
// this thread will be run when a permit is available
|
||||
this.queue.put(new ThreadWithPriority(thread, priority));
|
||||
|
||||
thread.wait();
|
||||
}
|
||||
}
|
||||
|
||||
/** Similar to {@link Semaphore#release()} */
|
||||
public void release()
|
||||
{
|
||||
this.lock.lock();
|
||||
try
|
||||
{
|
||||
// wake up the nex thread if one is queued
|
||||
if (!this.queue.isEmpty())
|
||||
{
|
||||
Thread nextThread = this.queue.poll().thread;
|
||||
synchronized (nextThread)
|
||||
{
|
||||
nextThread.notify();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
this.currentPermitCount++;
|
||||
// don't increase past the max allowed (this can happen when changing the max permit count)
|
||||
this.currentPermitCount = Math.min(this.currentPermitCount, this.maxPermitCount);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
this.lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
//=================//
|
||||
// permit changing //
|
||||
//=================//
|
||||
|
||||
public void changePermitCount(int val)
|
||||
{
|
||||
// find the max number of permits to increase by
|
||||
int permitCountIncrease = Math.max(0, val - this.maxPermitCount);
|
||||
|
||||
this.lock.lock();
|
||||
try
|
||||
{
|
||||
this.currentPermitCount += permitCountIncrease;
|
||||
this.maxPermitCount = val;
|
||||
}
|
||||
finally
|
||||
{
|
||||
this.lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public int availablePermits() { return this.currentPermitCount; }
|
||||
|
||||
|
||||
|
||||
//================//
|
||||
// helper classes //
|
||||
//================//
|
||||
|
||||
/** simple sortable container to track a thread and it's priority */
|
||||
private static class ThreadWithPriority implements Comparable<ThreadWithPriority>
|
||||
{
|
||||
private final Thread thread;
|
||||
private final int priority;
|
||||
|
||||
public ThreadWithPriority(Thread thread, int priority)
|
||||
{
|
||||
this.thread = thread;
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull ThreadWithPriority other)
|
||||
{
|
||||
// highest number first
|
||||
return Integer.compare(other.priority, this.priority);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
+154
@@ -0,0 +1,154 @@
|
||||
package com.seibel.distanthorizons.core.util.threading;
|
||||
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
|
||||
import com.seibel.distanthorizons.core.util.objects.RollingAverage;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
public class PriorityTaskPicker
|
||||
{
|
||||
private final ConfigEntry<Integer> threadCountConfig = Config.Common.MultiThreading.numberOfThreads;
|
||||
|
||||
private final RateLimitedThreadPoolExecutor threadPoolExecutor = new RateLimitedThreadPoolExecutor(
|
||||
this.threadCountConfig.getMax(),
|
||||
new DhThreadFactory("PriorityTaskPicker", Thread.MIN_PRIORITY, false),
|
||||
new ArrayBlockingQueue<>(this.threadCountConfig.getMax())
|
||||
);
|
||||
|
||||
private final ArrayList<Executor> executorQueue = new ArrayList<>();
|
||||
private int nextExecutorQueuePos = 0;
|
||||
|
||||
private final ReentrantLock taskPickerLock = new ReentrantLock();
|
||||
private final AtomicBoolean shouldPickTask = new AtomicBoolean(false);
|
||||
private final AtomicInteger occupiedThreads = new AtomicInteger(0);
|
||||
|
||||
|
||||
public Executor createExecutor(int priority)
|
||||
{
|
||||
Executor executor = new Executor();
|
||||
|
||||
int entriesToAdd = priority + 1;
|
||||
int gapBetweenEntries = (int) (1 / (double) entriesToAdd * this.executorQueue.size());
|
||||
|
||||
for (; entriesToAdd > 0; entriesToAdd--)
|
||||
{
|
||||
this.executorQueue.add(executor);
|
||||
Collections.rotate(this.executorQueue, -gapBetweenEntries);
|
||||
}
|
||||
|
||||
return executor;
|
||||
}
|
||||
|
||||
private void tryStartNextTask()
|
||||
{
|
||||
this.shouldPickTask.set(true);
|
||||
|
||||
while (this.taskPickerLock.tryLock())
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!this.shouldPickTask.compareAndSet(true, false))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
int threadCount = this.threadCountConfig.get();
|
||||
|
||||
for (
|
||||
int counter = 0;
|
||||
counter < this.executorQueue.size() && this.occupiedThreads.get() < threadCount;
|
||||
counter++, this.nextExecutorQueuePos = (this.nextExecutorQueuePos + 1) % this.executorQueue.size()
|
||||
)
|
||||
{
|
||||
Executor executor = this.executorQueue.get(this.nextExecutorQueuePos);
|
||||
|
||||
Runnable task = executor.tasks.poll();
|
||||
if (task != null)
|
||||
{
|
||||
this.occupiedThreads.getAndIncrement();
|
||||
executor.runningTasks.getAndIncrement();
|
||||
counter--;
|
||||
|
||||
this.threadPoolExecutor.execute(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
this.taskPickerLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown()
|
||||
{
|
||||
this.threadPoolExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
|
||||
public class Executor extends AbstractExecutorService
|
||||
{
|
||||
private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
|
||||
|
||||
private final AtomicInteger runningTasks = new AtomicInteger(0);
|
||||
private final AtomicInteger completedTasks = new AtomicInteger(0);
|
||||
private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200);
|
||||
|
||||
|
||||
@Override
|
||||
public void execute(@NotNull Runnable command)
|
||||
{
|
||||
this.tasks.add(() -> {
|
||||
long startTime = System.nanoTime();
|
||||
try
|
||||
{
|
||||
command.run();
|
||||
}
|
||||
finally
|
||||
{
|
||||
long timeElapsed = System.nanoTime() - startTime;
|
||||
this.runTimeInMsRollingAverage.addValue(TimeUnit.NANOSECONDS.toMillis(timeElapsed));
|
||||
|
||||
PriorityTaskPicker.this.occupiedThreads.getAndDecrement();
|
||||
this.runningTasks.getAndDecrement();
|
||||
this.completedTasks.getAndIncrement();
|
||||
|
||||
PriorityTaskPicker.this.tryStartNextTask();
|
||||
}
|
||||
});
|
||||
|
||||
PriorityTaskPicker.this.tryStartNextTask();
|
||||
}
|
||||
|
||||
public int getQueueSize() { return this.tasks.size(); }
|
||||
public int getPoolSize() { return PriorityTaskPicker.this.threadCountConfig.get(); }
|
||||
|
||||
public int getRunningTaskCount() { return this.runningTasks.get(); }
|
||||
public int getCompletedTaskCount() { return this.completedTasks.get(); }
|
||||
/** will return Nan if nothing has been submitted yet */
|
||||
public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); }
|
||||
|
||||
|
||||
@Override
|
||||
public void shutdown() { throw new UnsupportedOperationException(); }
|
||||
|
||||
@Override
|
||||
public @NotNull List<Runnable> shutdownNow() { throw new UnsupportedOperationException(); }
|
||||
|
||||
@Override
|
||||
public boolean isShutdown() { return false; }
|
||||
@Override
|
||||
public boolean isTerminated() { return false; }
|
||||
|
||||
@Override
|
||||
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) { return false; }
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
+16
-111
@@ -19,49 +19,26 @@
|
||||
|
||||
package com.seibel.distanthorizons.core.util.threading;
|
||||
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
|
||||
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
|
||||
import com.seibel.distanthorizons.core.util.objects.RollingAverage;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.util.ConcurrentModificationException;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Can be used to more finely control CPU usage and
|
||||
* reduce CPU usage if only 1 thread is already assigned.
|
||||
*/
|
||||
public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements Comparable<RateLimitedThreadPoolExecutor>
|
||||
public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor
|
||||
{
|
||||
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
|
||||
/** logs include the thread name by default which can help diagnose deadlocks */
|
||||
private static final boolean LOG_SEMAPHORE_ACTIONS = false;
|
||||
|
||||
|
||||
public volatile double runTimeRatio;
|
||||
public final ConfigEntry<Double> runTimeRatioConfig = Config.Common.MultiThreading.threadRunTimeRatio;
|
||||
|
||||
/** When this thread started running its last task */
|
||||
private final ThreadLocal<Long> runStartNanoTimeRef = ThreadLocal.withInitial(() -> -1L);
|
||||
/** How long it took this thread to run its last task */
|
||||
private final ThreadLocal<Long> lastRunDurationNanoTimeRef = ThreadLocal.withInitial(() -> -1L);
|
||||
|
||||
/**
|
||||
* Does nothing if {@link RateLimitedThreadPoolExecutor#threadSemaphore}
|
||||
* is null. <br>
|
||||
* Higher numbers have higher priority.
|
||||
*/
|
||||
public final int priority;
|
||||
/** if null this thread pool will run independently of other pools */
|
||||
@Nullable
|
||||
public final PrioritySemaphore threadSemaphore;
|
||||
/** will always be zero if no semaphore is present */
|
||||
public final AtomicInteger semaphoresAcquired = new AtomicInteger(0);
|
||||
|
||||
private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200);
|
||||
private final ThreadLocal<Long> runStartTime = ThreadLocal.withInitial(() -> -1L);
|
||||
|
||||
|
||||
|
||||
@@ -69,18 +46,12 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements
|
||||
// constructors //
|
||||
//==============//
|
||||
|
||||
public RateLimitedThreadPoolExecutor(
|
||||
int corePoolSize, double runTimeRatio, ThreadFactory threadFactory,
|
||||
@Nullable PrioritySemaphore prioritySemaphore, int priority)
|
||||
public RateLimitedThreadPoolExecutor(int poolSize, ThreadFactory threadFactory, BlockingQueue<Runnable> workQueue)
|
||||
{
|
||||
super(corePoolSize, corePoolSize,
|
||||
super(poolSize, poolSize,
|
||||
0L, TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<>(), // TODO using a PriorityBlockingQueue would be nice to allow for prioritizing tasks, but then all tasks must be Comparable
|
||||
workQueue,
|
||||
threadFactory);
|
||||
|
||||
this.runTimeRatio = runTimeRatio;
|
||||
this.priority = priority;
|
||||
this.threadSemaphore = prioritySemaphore;
|
||||
}
|
||||
|
||||
|
||||
@@ -92,36 +63,7 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements
|
||||
@Override
|
||||
protected void beforeExecute(Thread thread, Runnable runnable)
|
||||
{
|
||||
long deltaMs = TimeUnit.NANOSECONDS.toMillis(this.lastRunDurationNanoTimeRef.get());
|
||||
this.runTimeInMsRollingAverage.addValue(deltaMs);
|
||||
|
||||
if (this.runTimeRatio < 1.0 && this.lastRunDurationNanoTimeRef.get() != -1)
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep((long) (deltaMs / this.runTimeRatio - deltaMs));
|
||||
}
|
||||
catch (InterruptedException ignored) { }
|
||||
}
|
||||
|
||||
if (this.threadSemaphore != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Warning, this can cause deadlock if one thread calls another.
|
||||
this.threadSemaphore.acquire(this);
|
||||
this.semaphoresAcquired.getAndAdd(1);
|
||||
|
||||
if (LOG_SEMAPHORE_ACTIONS)
|
||||
{
|
||||
LOGGER.debug("acquired, available count: ["+this.threadSemaphore.availablePermits()+"]");
|
||||
}
|
||||
}
|
||||
catch (InterruptedException ignore) { }
|
||||
}
|
||||
|
||||
|
||||
this.runStartNanoTimeRef.set(System.nanoTime());
|
||||
this.runStartTime.set(System.nanoTime());
|
||||
|
||||
super.beforeExecute(thread, runnable);
|
||||
}
|
||||
@@ -131,18 +73,14 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements
|
||||
{
|
||||
super.afterExecute(runnable, throwable);
|
||||
|
||||
this.lastRunDurationNanoTimeRef.set(System.nanoTime() - this.runStartNanoTimeRef.get());
|
||||
|
||||
|
||||
if (this.threadSemaphore != null)
|
||||
try
|
||||
{
|
||||
this.threadSemaphore.release();
|
||||
this.semaphoresAcquired.getAndAdd(-1);
|
||||
|
||||
if (LOG_SEMAPHORE_ACTIONS)
|
||||
{
|
||||
LOGGER.debug("released, available count: ["+this.threadSemaphore.availablePermits()+"]");
|
||||
}
|
||||
long runTime = System.nanoTime() - this.runStartTime.get();
|
||||
Thread.sleep(TimeUnit.NANOSECONDS.toMillis((long) (runTime / this.runTimeRatioConfig.get() - runTime)));
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -150,15 +88,6 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements
|
||||
protected void terminated()
|
||||
{
|
||||
super.terminated();
|
||||
|
||||
// release all held semaphores (shouldn't normally be necessary, but just in case)
|
||||
if (this.threadSemaphore != null)
|
||||
{
|
||||
if (LOG_SEMAPHORE_ACTIONS)
|
||||
{
|
||||
LOGGER.info("terminated, released ["+this.semaphoresAcquired+"], available count: ["+this.threadSemaphore.availablePermits()+"]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -175,28 +104,4 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements
|
||||
super.purge();
|
||||
}
|
||||
|
||||
|
||||
|
||||
//==============//
|
||||
// running time //
|
||||
//==============//
|
||||
|
||||
/** will return Nan if nothing has been submitted yet */
|
||||
public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); }
|
||||
|
||||
|
||||
|
||||
//============//
|
||||
// comparison //
|
||||
//============//
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull RateLimitedThreadPoolExecutor other)
|
||||
{
|
||||
// highest number first
|
||||
return Integer.compare(other.priority, this.priority);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
+21
-65
@@ -19,12 +19,10 @@
|
||||
|
||||
package com.seibel.distanthorizons.core.util.threading;
|
||||
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
|
||||
import com.seibel.distanthorizons.core.util.ThreadUtil;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* Holds each thread pool the system uses.
|
||||
@@ -40,20 +38,20 @@ public class ThreadPoolUtil
|
||||
// standalone thread pools all handle independent systems
|
||||
// and don't interfere with any other pool
|
||||
|
||||
public static final DhThreadFactory FILE_HANDLER_THREAD_FACTORY = new DhThreadFactory("File Handler", Thread.MIN_PRIORITY, false);
|
||||
private static ConfigThreadPool fileHandlerThreadPool;
|
||||
@Nullable
|
||||
public static ThreadPoolExecutor getFileHandlerExecutor() { return fileHandlerThreadPool.executor; }
|
||||
private static PriorityTaskPicker taskPicker;
|
||||
|
||||
public static final DhThreadFactory UPDATE_PROPAGATOR_THREAD_FACTORY = new DhThreadFactory("LOD Update Propagator", Thread.MIN_PRIORITY, false);
|
||||
private static ConfigThreadPool updatePropagatorThreadPool;
|
||||
private static PriorityTaskPicker.Executor fileHandlerThreadPool;
|
||||
@Nullable
|
||||
public static ThreadPoolExecutor getUpdatePropagatorExecutor() { return updatePropagatorThreadPool.executor; }
|
||||
public static PriorityTaskPicker.Executor getFileHandlerExecutor() { return fileHandlerThreadPool; }
|
||||
|
||||
private static PriorityTaskPicker.Executor updatePropagatorThreadPool;
|
||||
@Nullable
|
||||
public static PriorityTaskPicker.Executor getUpdatePropagatorExecutor() { return updatePropagatorThreadPool; }
|
||||
|
||||
public static final DhThreadFactory WORLD_GEN_THREAD_FACTORY = new DhThreadFactory("World Gen", Thread.MIN_PRIORITY, false);
|
||||
private static ConfigThreadPool worldGenThreadPool;
|
||||
private static PriorityTaskPicker.Executor worldGenThreadPool;
|
||||
@Nullable
|
||||
public static ThreadPoolExecutor getWorldGenExecutor() { return worldGenThreadPool.executor; }
|
||||
public static PriorityTaskPicker.Executor getWorldGenExecutor() { return worldGenThreadPool; }
|
||||
|
||||
public static final String CLEANUP_THREAD_NAME = "Cleanup";
|
||||
private static ThreadPoolExecutor cleanupThreadPool;
|
||||
@@ -65,10 +63,9 @@ public class ThreadPoolUtil
|
||||
@Nullable
|
||||
public static ThreadPoolExecutor getBeaconCullingExecutor() { return beaconCullingThreadPool; }
|
||||
|
||||
public static final DhThreadFactory NETWORK_COMPRESSION_THREAD_FACTORY = new DhThreadFactory("Network Compression", Thread.MIN_PRIORITY, false);
|
||||
private static ConfigThreadPool networkCompressionThreadPool;
|
||||
private static PriorityTaskPicker.Executor networkCompressionThreadPool;
|
||||
@Nullable
|
||||
public static ThreadPoolExecutor getNetworkCompressionExecutor() { return networkCompressionThreadPool.executor; }
|
||||
public static PriorityTaskPicker.Executor getNetworkCompressionExecutor() { return networkCompressionThreadPool; }
|
||||
|
||||
|
||||
|
||||
@@ -78,18 +75,9 @@ public class ThreadPoolUtil
|
||||
public static ThreadPoolExecutor getFullDataMigrationExecutor() { return fullDataMigrationThreadPool; }
|
||||
|
||||
|
||||
public static final DhThreadFactory CHUNK_TO_LOD_BUILDER_THREAD_FACTORY = new DhThreadFactory("LOD Builder - Chunk to Lod Builder", Thread.MIN_PRIORITY, false);
|
||||
private static ConfigThreadPool chunkToLodBuilderThreadPool;
|
||||
private static PriorityTaskPicker.Executor chunkToLodBuilderThreadPool;
|
||||
@Nullable
|
||||
public static ThreadPoolExecutor getChunkToLodBuilderExecutor() { return (chunkToLodBuilderThreadPool != null) ? chunkToLodBuilderThreadPool.executor : null; }
|
||||
|
||||
|
||||
|
||||
/** how many total threads can be used */
|
||||
private static int threadSemaphoreCount = Config.Common.MultiThreading.numberOfThreads.get();
|
||||
public static int getThreadCount() { return threadSemaphoreCount; }
|
||||
private static PrioritySemaphore threadSemaphore = null;
|
||||
private static ConfigChangeListener<Integer> threadSemaphoreConfigListener = null;
|
||||
public static PriorityTaskPicker.Executor getChunkToLodBuilderExecutor() { return chunkToLodBuilderThreadPool; }
|
||||
|
||||
|
||||
|
||||
@@ -99,33 +87,13 @@ public class ThreadPoolUtil
|
||||
|
||||
public static void setupThreadPools()
|
||||
{
|
||||
// create thread semaphore
|
||||
threadSemaphoreCount = Config.Common.MultiThreading.numberOfThreads.get();
|
||||
threadSemaphore = new PrioritySemaphore(threadSemaphoreCount);
|
||||
threadSemaphoreConfigListener = new ConfigChangeListener<>(Config.Common.MultiThreading.numberOfThreads, (val) ->
|
||||
{
|
||||
threadSemaphore.changePermitCount(val);
|
||||
threadSemaphoreCount = val;
|
||||
});
|
||||
|
||||
|
||||
|
||||
// thread pools
|
||||
chunkToLodBuilderThreadPool = new ConfigThreadPool(CHUNK_TO_LOD_BUILDER_THREAD_FACTORY,
|
||||
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
|
||||
threadSemaphore, 3); // We want to make sure any chunk changes are found
|
||||
fileHandlerThreadPool = new ConfigThreadPool(FILE_HANDLER_THREAD_FACTORY,
|
||||
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
|
||||
threadSemaphore, 2); // loading in new LODs is second highest priority
|
||||
updatePropagatorThreadPool = new ConfigThreadPool(UPDATE_PROPAGATOR_THREAD_FACTORY,
|
||||
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
|
||||
threadSemaphore, 1); // update propagation needs to be slightly higher priority than world gen
|
||||
worldGenThreadPool = new ConfigThreadPool(WORLD_GEN_THREAD_FACTORY,
|
||||
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
|
||||
threadSemaphore, 0); // higher priorities mean the threads will run first
|
||||
networkCompressionThreadPool = new ConfigThreadPool(NETWORK_COMPRESSION_THREAD_FACTORY,
|
||||
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
|
||||
threadSemaphore, 0); // networking can probably have similar priority to world gen since they work similarly
|
||||
taskPicker = new PriorityTaskPicker();
|
||||
networkCompressionThreadPool = taskPicker.createExecutor(3); // Data should never pile up waiting to be sent
|
||||
fileHandlerThreadPool = taskPicker.createExecutor(3); // loading in new LODs is second-highest priority
|
||||
chunkToLodBuilderThreadPool = taskPicker.createExecutor(2); // We want to make sure any chunk changes are found
|
||||
updatePropagatorThreadPool = taskPicker.createExecutor(2); // update propagation needs to be slightly higher priority than world gen
|
||||
worldGenThreadPool = taskPicker.createExecutor(1); // higher priorities mean the threads will run first
|
||||
|
||||
|
||||
|
||||
@@ -139,22 +107,10 @@ public class ThreadPoolUtil
|
||||
public static void shutdownThreadPools()
|
||||
{
|
||||
// standalone threads
|
||||
fileHandlerThreadPool.shutdownExecutorService();
|
||||
updatePropagatorThreadPool.shutdownExecutorService();
|
||||
worldGenThreadPool.shutdownExecutorService();
|
||||
networkCompressionThreadPool.shutdownExecutorService();
|
||||
taskPicker.shutdown();
|
||||
cleanupThreadPool.shutdown();
|
||||
beaconCullingThreadPool.shutdown();
|
||||
fullDataMigrationThreadPool.shutdown();
|
||||
chunkToLodBuilderThreadPool.shutdownExecutorService();
|
||||
|
||||
threadSemaphore = null;
|
||||
|
||||
if (threadSemaphoreConfigListener != null)
|
||||
{
|
||||
threadSemaphoreConfigListener.close();
|
||||
threadSemaphoreConfigListener = null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user