This commit is contained in:
James Seibel
2025-01-07 20:00:33 -06:00
16 changed files with 267 additions and 540 deletions
@@ -34,6 +34,7 @@ import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.objects.Pair;
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.world.*;
import com.seibel.distanthorizons.core.wrapperInterfaces.chunk.IChunkWrapper;
@@ -344,8 +345,8 @@ public class SharedApi
// queue updates up to the number of CPU cores allocated for the job
// (this prevents doing extra work queuing tasks that may not be necessary)
// and makes sure the chunks closest to the player are updated first
ThreadPoolExecutor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
if (executor != null && executor.getQueue().size() < executor.getCorePoolSize())
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
if (executor != null && executor.getQueueSize() < executor.getPoolSize())
{
try
{
@@ -432,7 +433,7 @@ public class SharedApi
finally
{
// queue the next position if there are still positions to process
ThreadPoolExecutor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
if (executor != null && !UPDATE_POS_MANAGER.updateDataByChunkPos.isEmpty())
{
try
@@ -114,7 +114,7 @@ public abstract class AbstractDataSourceHandler
*/
public CompletableFuture<TDataSource> getAsync(long pos)
{
ThreadPoolExecutor executor = ThreadPoolUtil.getFileHandlerExecutor();
AbstractExecutorService executor = ThreadPoolUtil.getFileHandlerExecutor();
if (executor == null || executor.isTerminated())
{
return CompletableFuture.completedFuture(null);
@@ -188,7 +188,7 @@ public abstract class AbstractDataSourceHandler
public CompletableFuture<Void> updateDataSourceAsync(@NotNull FullDataSourceV2 inputDataSource)
{
ThreadPoolExecutor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
if (executor == null || executor.isTerminated())
{
return CompletableFuture.completedFuture(null);
@@ -18,6 +18,7 @@ import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
@@ -96,7 +97,7 @@ public class FullDataSourceProviderV1<TDhLevel extends IDhLevel>
*/
public CompletableFuture<FullDataSourceV1> getAsync(long pos)
{
ThreadPoolExecutor executor = ThreadPoolUtil.getFileHandlerExecutor();
AbstractExecutorService executor = ThreadPoolUtil.getFileHandlerExecutor();
if (executor == null || executor.isTerminated())
{
return CompletableFuture.completedFuture(null);
@@ -40,6 +40,7 @@ import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
import com.seibel.distanthorizons.core.sql.repo.FullDataSourceV2Repo;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import com.seibel.distanthorizons.core.util.objects.DataCorruptedException;
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import it.unimi.dsi.fastutil.longs.LongArrayList;
@@ -208,7 +209,7 @@ public class FullDataSourceProviderV2
{
Thread.sleep(UPDATE_QUEUE_THREAD_DELAY_IN_MS);
ThreadPoolExecutor executor = ThreadPoolUtil.getUpdatePropagatorExecutor();
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getUpdatePropagatorExecutor();
if (executor == null || executor.isTerminated())
{
continue;
@@ -225,7 +226,7 @@ public class FullDataSourceProviderV2
}
// queue parent updates
if (executor.getQueue().size() < MAX_UPDATE_TASK_COUNT
if (executor.getQueueSize() < MAX_UPDATE_TASK_COUNT
&& this.parentUpdatingPosSet.size() < MAX_UPDATE_TASK_COUNT)
{
// get the positions that need to be applied to their parents
@@ -36,6 +36,7 @@ import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
@@ -194,16 +195,16 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
}
ThreadPoolExecutor updateExecutor = ThreadPoolUtil.getUpdatePropagatorExecutor();
if (updateExecutor == null || updateExecutor.getQueue().size() >= MAX_UPDATE_TASK_COUNT / 2)
PriorityTaskPicker.Executor updateExecutor = ThreadPoolUtil.getUpdatePropagatorExecutor();
if (updateExecutor == null || updateExecutor.getQueueSize() >= MAX_UPDATE_TASK_COUNT / 2)
{
// don't queue additional world gen requests if the updater is behind
return false;
}
ThreadPoolExecutor fileExecutor = ThreadPoolUtil.getFileHandlerExecutor();
if (fileExecutor == null || fileExecutor.getQueue().size() >= MAX_UPDATE_TASK_COUNT / 2)
PriorityTaskPicker.Executor fileExecutor = ThreadPoolUtil.getFileHandlerExecutor();
if (fileExecutor == null || fileExecutor.getQueueSize() >= MAX_UPDATE_TASK_COUNT / 2)
{
// don't queue additional world gen requests if the file handler is overwhelmed,
// otherwise LODs may not load in properly
@@ -45,6 +45,7 @@ import com.seibel.distanthorizons.core.util.objects.DataCorruptedException;
import com.seibel.distanthorizons.core.util.objects.RollingAverage;
import com.seibel.distanthorizons.core.util.objects.UncheckedInterruptedException;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.world.DhApiWorldProxy;
import com.seibel.distanthorizons.core.wrapperInterfaces.IWrapperFactory;
@@ -234,7 +235,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
}
public boolean isGeneratorBusy()
{
ThreadPoolExecutor executor = ThreadPoolUtil.getWorldGenExecutor();
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getWorldGenExecutor();
if (executor == null)
{
// shouldn't happen, but just in case, don't queue more tasks
@@ -243,7 +244,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
int worldGenThreadCount = Math.max(Config.Common.MultiThreading.numberOfThreads.get(), 1);
int maxWorldGenTaskCount = worldGenThreadCount * MAX_QUEUED_TASKS_PER_THREAD;
return executor.getQueue().size() > maxWorldGenTaskCount;
return executor.getQueueSize() > maxWorldGenTaskCount;
}
/**
@@ -603,7 +604,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
try
{
int waitTimeInSeconds = 3;
ThreadPoolExecutor executor = ThreadPoolUtil.getWorldGenExecutor();
AbstractExecutorService executor = ThreadPoolUtil.getWorldGenExecutor();
if (executor != null && !executor.awaitTermination(waitTimeInSeconds, TimeUnit.SECONDS))
{
LOGGER.warn("World generator thread pool shutdown didn't complete after [" + waitTimeInSeconds + "] seconds. Some world generator requests may still be running.");
@@ -28,7 +28,7 @@ import com.seibel.distanthorizons.core.pooling.PhantomArrayListPool;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.render.RenderBufferHandler;
import com.seibel.distanthorizons.core.render.renderer.generic.GenericObjectRenderer;
import com.seibel.distanthorizons.core.util.threading.RateLimitedThreadPoolExecutor;
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.world.AbstractDhWorld;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
@@ -80,11 +80,11 @@ public class F3Screen
public static void addStringToDisplay(List<String> messageList)
{
// multi thread pools
ThreadPoolExecutor worldGenPool = ThreadPoolUtil.getWorldGenExecutor();
ThreadPoolExecutor fileHandlerPool = ThreadPoolUtil.getFileHandlerExecutor();
ThreadPoolExecutor updatePool = ThreadPoolUtil.getUpdatePropagatorExecutor();
ThreadPoolExecutor lodBuilderPool = ThreadPoolUtil.getChunkToLodBuilderExecutor();
ThreadPoolExecutor networkPool = ThreadPoolUtil.getNetworkCompressionExecutor();
PriorityTaskPicker.Executor worldGenPool = ThreadPoolUtil.getWorldGenExecutor();
PriorityTaskPicker.Executor fileHandlerPool = ThreadPoolUtil.getFileHandlerExecutor();
PriorityTaskPicker.Executor updatePool = ThreadPoolUtil.getUpdatePropagatorExecutor();
PriorityTaskPicker.Executor lodBuilderPool = ThreadPoolUtil.getChunkToLodBuilderExecutor();
PriorityTaskPicker.Executor networkPool = ThreadPoolUtil.getNetworkCompressionExecutor();
// single thread pools
ThreadPoolExecutor cleanupPool = ThreadPoolUtil.getCleanupExecutor();
@@ -191,25 +191,23 @@ public class F3Screen
// helper methods //
//================//
private static String getThreadPoolStatString(String name, ThreadPoolExecutor pool)
private static String getThreadPoolStatString(String name, PriorityTaskPicker.Executor pool)
{
String queueSize = (pool != null) ? NUMBER_FORMAT.format(pool.getQueue().size()) : "-";
String queueSize = (pool != null) ? NUMBER_FORMAT.format(pool.getQueueSize()) : "-";
String completedCount = (pool != null) ? NUMBER_FORMAT.format(pool.getCompletedTaskCount()) : "-";
String message = name+", Tasks: "+queueSize+", Done: "+completedCount;
if (pool != null && pool.getClass() == RateLimitedThreadPoolExecutor.class)
if (pool != null)
{
RateLimitedThreadPoolExecutor rateLimitedPool = ((RateLimitedThreadPoolExecutor) pool);
// active threads
int activeThreadCount = rateLimitedPool.semaphoresAcquired.get();
int threadCount = ThreadPoolUtil.getThreadCount();
int activeThreadCount = pool.getRunningTaskCount();
int threadCount = pool.getPoolSize();
message += ", Active: "+activeThreadCount+"/"+threadCount;
// thread runtime
String runTimeAvgStr;
double runTimeAvgInMs = rateLimitedPool.getAverageRunTimeInMs();
double runTimeAvgInMs = pool.getAverageRunTimeInMs();
if (!Double.isNaN(runTimeAvgInMs))
{
runTimeAvgStr = NUMBER_FORMAT.format(runTimeAvgInMs);
@@ -228,7 +228,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
{
FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSourceAndReleaseBuffer(response.payload);
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
AbstractExecutorService executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
LOGGER.warn("Unable to handle FullDataPayload - getNetworkCompressionExecutor() is null");
@@ -17,10 +17,7 @@ import org.apache.logging.log4j.LogManager;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class FullDataSourceRequestHandler
@@ -76,7 +73,7 @@ public class FullDataSourceRequestHandler
}
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
AbstractExecutorService executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
// shouldn't normally happen, but just in case
@@ -196,7 +193,7 @@ public class FullDataSourceRequestHandler
continue;
}
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
AbstractExecutorService executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
@@ -38,6 +38,7 @@ import com.seibel.distanthorizons.core.render.glObject.GLProxy;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.dataObjects.render.bufferBuilding.ColumnRenderBuffer;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import it.unimi.dsi.fastutil.longs.LongArrayList;
@@ -132,7 +133,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable
return;
}
ThreadPoolExecutor executor = ThreadPoolUtil.getFileHandlerExecutor();
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getFileHandlerExecutor();
if (executor == null || executor.isTerminated())
{
return;
@@ -140,7 +141,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable
// don't queue up an infinite number of tasks
// doing so will cause memory use to balloon when swapping between dimensions
if (executor.getQueue().size() > executor.getPoolSize())
if (executor.getQueueSize() > executor.getPoolSize())
{
return;
}
@@ -19,11 +19,7 @@
package com.seibel.distanthorizons.core.util;
import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
import com.seibel.distanthorizons.core.util.threading.DhThreadFactory;
import com.seibel.distanthorizons.core.util.threading.PrioritySemaphore;
import com.seibel.distanthorizons.core.util.threading.RateLimitedThreadPoolExecutor;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.coreapi.ModInfo;
import org.apache.logging.log4j.LogManager;
@@ -43,55 +39,10 @@ public class ThreadUtil
public static final String THREAD_NAME_PREFIX = ModInfo.THREAD_NAME_PREFIX;
/** used to track and remove old listeners for certain pools if the thread pool is recreated. */
private static final ConcurrentHashMap<String, ConfigChangeListener<Double>> THREAD_CHANGE_LISTENERS_BY_THREAD_NAME = new ConcurrentHashMap<>();
// TODO move all "Runtime.getRuntime().availableProcessors()" calls here
//===================//
// rate limited pool //
//===================//
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, DhThreadFactory threadFactory, ConfigEntry<Double> runTimeRatioConfigEntry, PrioritySemaphore activeThreadCountSemaphore, int priority)
{
// remove the old listener if one exists
if (THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.containsKey(threadFactory.threadName))
{
// note: this assumes only one thread pool exists with a given name
THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.get(threadFactory.threadName).close();
THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.remove(threadFactory.threadName);
}
if (!threadFactory.threadName.startsWith(THREAD_NAME_PREFIX))
{
// this will only happen if a ThreadFactory is passed in that doesn't have the correct thread name
LOGGER.warn("Thread pool with the name ["+threadFactory.threadName+"] is missing the expected Distant Horizons thread prefix ["+THREAD_NAME_PREFIX+"].");
}
RateLimitedThreadPoolExecutor executor = makeRateLimitedThreadPool(poolSize, runTimeRatioConfigEntry.get(), threadFactory, activeThreadCountSemaphore, priority);
ConfigChangeListener<Double> changeListener = new ConfigChangeListener<>(runTimeRatioConfigEntry, (newRunTimeRatio) -> { executor.runTimeRatio = newRunTimeRatio; });
THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.put(threadFactory.threadName, changeListener);
return executor;
}
/** should only be used if there isn't a config controlling the run time ratio of this thread pool */
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, String name, Double runTimeRatio, int threadPriority, PrioritySemaphore activeThreadCountSemaphore, int priority)
{
return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, new DhThreadFactory(name, threadPriority, false), activeThreadCountSemaphore, priority);
}
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, Double runTimeRatio, DhThreadFactory threadFactory, PrioritySemaphore activeThreadCountSemaphore, int priority)
{
return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, threadFactory, activeThreadCountSemaphore, priority);
}
//===============//
// standard pool //
//===============//
@@ -1,113 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.util.threading;
import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import java.util.List;
import java.util.Queue;
/**
* Handles thread pools with config values for their
* thread count and run time ratio.
*/
public class ConfigThreadPool
{
/** Caution must be used to prevent deadlock */
private final PrioritySemaphore threadSemaphore;
/** higher numbers run first */
private final int priority;
public RateLimitedThreadPoolExecutor executor = null;
private int threadCount = 0;
public final DhThreadFactory threadFactory;
public final ConfigChangeListener<Integer> threadCountConfigListener;
public final ConfigEntry<Integer> threadCountConfig;
public final ConfigEntry<Double> runTimeRatioConfig;
//=============//
// constructor //
//=============//
public ConfigThreadPool(DhThreadFactory threadFactory, ConfigEntry<Integer> threadCountConfig, ConfigEntry<Double> runTimeRatioConfig, PrioritySemaphore threadSemaphore, int priority)
{
this.threadFactory = threadFactory;
this.threadSemaphore = threadSemaphore;
this.priority = priority;
this.threadCountConfig = threadCountConfig;
this.threadCountConfigListener = new ConfigChangeListener<>(threadCountConfig,
(threadCount) -> { this.setThreadPoolSize(threadCount); });
this.runTimeRatioConfig = runTimeRatioConfig;
this.setThreadPoolSize(threadCountConfig.get());
}
//==============//
// thread setup //
//==============//
public void setThreadPoolSize(int threadPoolSize)
{
Queue<Runnable> incompleteRunnableQueue = null;
if (this.executor != null)
{
// close the previous thread pool if one exists
this.executor.shutdown(); // don't do shutdown now since we don't want to throw any interrupt exceptions
incompleteRunnableQueue = this.executor.getQueue();
}
this.threadCount = threadPoolSize;
this.executor = ThreadUtil.makeRateLimitedThreadPool(this.threadCount, this.threadFactory, this.runTimeRatioConfig, this.threadSemaphore, this.priority);
if (incompleteRunnableQueue != null)
{
for (Runnable runnable : incompleteRunnableQueue)
{
this.executor.execute(runnable);
}
}
}
/**
* Stops any executing tasks and destroys the executor. <br>
* Does nothing if the executor isn't running.
*/
public void shutdownExecutorService()
{
if (this.executor != null)
{
this.executor.shutdownNow();
}
this.threadCount = 0;
}
}
@@ -1,164 +0,0 @@
package com.seibel.distanthorizons.core.util.threading;
import org.jetbrains.annotations.NotNull;
import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;
/**
* For use with {@link RateLimitedThreadPoolExecutor}
*/
public class PrioritySemaphore
{
public int maxPermitCount;
public int currentPermitCount;
private final PriorityBlockingQueue<ThreadWithPriority> queue = new PriorityBlockingQueue<>();
private final ReentrantLock lock = new ReentrantLock();
private final Random random = new Random();
//=============//
// constructor //
//=============//
public PrioritySemaphore(int permits)
{
this.maxPermitCount = permits;
this.currentPermitCount = this.maxPermitCount;
}
//==================//
// permit acquiring //
//==================//
/** Similar to {@link Semaphore#acquire()} */
public void acquire(RateLimitedThreadPoolExecutor executor) throws InterruptedException
{
Thread thread = Thread.currentThread();
this.lock.lock();
try
{
if (this.currentPermitCount > 0)
{
// a permit is available,
// this thread can run normally
this.currentPermitCount--;
// return to prevent running the thread's wait() method below
return;
}
}
finally
{
this.lock.unlock();
}
// no permit is available
// this has to be outside the try-finally to prevent holding the lock while waiting
synchronized (thread)
{
// Calculation rules:
// - Executors with higher priority need less tasks to run before other executors
// If one executor has the priority of 3 and other if of 4,
// the latter one will need 1/4 fewer tasks in queue to get its tasks running
// - Executors with short-lived tasks run before longer lived ones
// 100k value is a multiplier to prevent precision loss
int priority = (int) ((executor.priority + 1) * executor.getTaskCount() * 100000 / executor.getAverageRunTimeInMs());
// this thread will be run when a permit is available
this.queue.put(new ThreadWithPriority(thread, priority));
thread.wait();
}
}
/** Similar to {@link Semaphore#release()} */
public void release()
{
this.lock.lock();
try
{
// wake up the nex thread if one is queued
if (!this.queue.isEmpty())
{
Thread nextThread = this.queue.poll().thread;
synchronized (nextThread)
{
nextThread.notify();
}
}
else
{
this.currentPermitCount++;
// don't increase past the max allowed (this can happen when changing the max permit count)
this.currentPermitCount = Math.min(this.currentPermitCount, this.maxPermitCount);
}
}
finally
{
this.lock.unlock();
}
}
//=================//
// permit changing //
//=================//
public void changePermitCount(int val)
{
// find the max number of permits to increase by
int permitCountIncrease = Math.max(0, val - this.maxPermitCount);
this.lock.lock();
try
{
this.currentPermitCount += permitCountIncrease;
this.maxPermitCount = val;
}
finally
{
this.lock.unlock();
}
}
public int availablePermits() { return this.currentPermitCount; }
//================//
// helper classes //
//================//
/** simple sortable container to track a thread and it's priority */
private static class ThreadWithPriority implements Comparable<ThreadWithPriority>
{
private final Thread thread;
private final int priority;
public ThreadWithPriority(Thread thread, int priority)
{
this.thread = thread;
this.priority = priority;
}
@Override
public int compareTo(@NotNull ThreadWithPriority other)
{
// highest number first
return Integer.compare(other.priority, this.priority);
}
}
}
@@ -0,0 +1,185 @@
package com.seibel.distanthorizons.core.util.threading;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
import com.seibel.distanthorizons.core.util.objects.RollingAverage;
import org.jetbrains.annotations.NotNull;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
public class PriorityTaskPicker
{
private final ConfigEntry<Integer> threadCountConfig = Config.Common.MultiThreading.numberOfThreads;
private final RateLimitedThreadPoolExecutor threadPoolExecutor = new RateLimitedThreadPoolExecutor(
this.threadCountConfig.getMax(),
new DhThreadFactory("PriorityTaskPicker", Thread.MIN_PRIORITY, false),
new ArrayBlockingQueue<>(this.threadCountConfig.getMax())
);
// Queue of executors, used to distribute tasks across executors based on priority
private final ArrayList<Executor> executorQueue = new ArrayList<>();
private int nextExecutorQueuePos = 0;
// Lock to ensure task picking logic is thread-safe
private final ReentrantLock taskPickerLock = new ReentrantLock();
// Indicates whether a task picking attempt is needed
private final AtomicBoolean shouldPickTask = new AtomicBoolean(false);
// Tracks the number of active threads
private final AtomicInteger occupiedThreads = new AtomicInteger(0);
/**
* Creates an executor with a specific priority.
* Higher priority executors have more entries in the distribution queue, giving them a greater chance to run tasks.
*
* @param priority the priority level of the executor
* @return a newly created Executor
*/
public Executor createExecutor(int priority)
{
Executor executor = new Executor();
int entriesToAdd = priority + 1;
int gapBetweenEntries = (int) (1 / (double) entriesToAdd * this.executorQueue.size());
// Distribute the executor's entries in the queue, ensuring fair distribution
for (; entriesToAdd > 0; entriesToAdd--)
{
this.executorQueue.add(executor);
Collections.rotate(this.executorQueue, -gapBetweenEntries);
}
return executor;
}
/**
* Tries to start the next task by iterating over executors in the queue.
* Ensures thread limits are respected and only one thread iterates over the executorQueue at a time.
*/
private void tryStartNextTask()
{
this.shouldPickTask.set(true);
while (this.taskPickerLock.tryLock())
{
try
{
// Exit if there's no longer a need to pick a task
if (!this.shouldPickTask.compareAndSet(true, false))
{
// There is a small chance for a task to end up in a 'limbo' state,
// when this.shouldPickTask got set to true right here and this.taskPickerLock is not unlocked yet,
// but we'll disregard that since tasks get added often enough for this to not be an issue
return;
}
// Iterate over the executors in the queue, attempting to start tasks
for (
int taskPickAttempts = 0;
taskPickAttempts < this.executorQueue.size() && this.occupiedThreads.get() < this.threadCountConfig.get();
taskPickAttempts++, this.nextExecutorQueuePos = (this.nextExecutorQueuePos + 1) % this.executorQueue.size()
)
{
Executor executor = this.executorQueue.get(this.nextExecutorQueuePos);
Runnable task = executor.tasks.poll();
if (task != null)
{
// Update variables related to task status
this.occupiedThreads.getAndIncrement();
executor.runningTasks.getAndIncrement();
// Prevent exiting early since there might be more than this.executorQueue.size() tasks waiting in queue
taskPickAttempts = 0;
this.threadPoolExecutor.execute(task);
}
}
}
finally
{
this.taskPickerLock.unlock();
// If someone else manages to pick up a lock before us, we'll leave early, and they will do our work
}
}
}
/**
* Shuts down the thread pool immediately, stopping all tasks.
*/
public void shutdown()
{
this.threadPoolExecutor.shutdownNow();
}
public class Executor extends AbstractExecutorService
{
private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
private final AtomicInteger runningTasks = new AtomicInteger(0);
private final AtomicInteger completedTasks = new AtomicInteger(0);
private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200);
@Override
public void execute(@NotNull Runnable command)
{
this.tasks.add(() -> {
long startTime = System.nanoTime();
try
{
command.run();
}
finally
{
long timeElapsed = System.nanoTime() - startTime;
this.runTimeInMsRollingAverage.addValue(TimeUnit.NANOSECONDS.toMillis(timeElapsed));
// Update variables related to task status
PriorityTaskPicker.this.occupiedThreads.getAndDecrement();
this.runningTasks.getAndDecrement();
this.completedTasks.getAndIncrement();
// Attempt to start another task
PriorityTaskPicker.this.tryStartNextTask();
}
});
// Attempt to pick up the task immediately
PriorityTaskPicker.this.tryStartNextTask();
}
public int getQueueSize() { return this.tasks.size(); }
public int getPoolSize() { return PriorityTaskPicker.this.threadCountConfig.get(); }
public int getRunningTaskCount() { return this.runningTasks.get(); }
public int getCompletedTaskCount() { return this.completedTasks.get(); }
/** Will return NaN if nothing has been submitted yet */
public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); }
@Override
public void shutdown() { throw new UnsupportedOperationException(); }
@Override
public @NotNull List<Runnable> shutdownNow() { throw new UnsupportedOperationException(); }
@Override
public boolean isShutdown() { return false; }
@Override
public boolean isTerminated() { return false; }
@Override
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) { return false; }
}
}
@@ -19,49 +19,26 @@
package com.seibel.distanthorizons.core.util.threading;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.util.objects.RollingAverage;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
/**
* Can be used to more finely control CPU usage and
* reduce CPU usage if only 1 thread is already assigned.
*/
public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements Comparable<RateLimitedThreadPoolExecutor>
public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
/** logs include the thread name by default which can help diagnose deadlocks */
private static final boolean LOG_SEMAPHORE_ACTIONS = false;
public volatile double runTimeRatio;
public final ConfigEntry<Double> runTimeRatioConfig = Config.Common.MultiThreading.threadRunTimeRatio;
/** When this thread started running its last task */
private final ThreadLocal<Long> runStartNanoTimeRef = ThreadLocal.withInitial(() -> -1L);
/** How long it took this thread to run its last task */
private final ThreadLocal<Long> lastRunDurationNanoTimeRef = ThreadLocal.withInitial(() -> -1L);
/**
* Does nothing if {@link RateLimitedThreadPoolExecutor#threadSemaphore}
* is null. <br>
* Higher numbers have higher priority.
*/
public final int priority;
/** if null this thread pool will run independently of other pools */
@Nullable
public final PrioritySemaphore threadSemaphore;
/** will always be zero if no semaphore is present */
public final AtomicInteger semaphoresAcquired = new AtomicInteger(0);
private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200);
private final ThreadLocal<Long> runStartTime = ThreadLocal.withInitial(() -> -1L);
@@ -69,18 +46,12 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements
// constructors //
//==============//
public RateLimitedThreadPoolExecutor(
int corePoolSize, double runTimeRatio, ThreadFactory threadFactory,
@Nullable PrioritySemaphore prioritySemaphore, int priority)
public RateLimitedThreadPoolExecutor(int poolSize, ThreadFactory threadFactory, BlockingQueue<Runnable> workQueue)
{
super(corePoolSize, corePoolSize,
super(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), // TODO using a PriorityBlockingQueue would be nice to allow for prioritizing tasks, but then all tasks must be Comparable
workQueue,
threadFactory);
this.runTimeRatio = runTimeRatio;
this.priority = priority;
this.threadSemaphore = prioritySemaphore;
}
@@ -92,36 +63,7 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements
@Override
protected void beforeExecute(Thread thread, Runnable runnable)
{
long deltaMs = TimeUnit.NANOSECONDS.toMillis(this.lastRunDurationNanoTimeRef.get());
this.runTimeInMsRollingAverage.addValue(deltaMs);
if (this.runTimeRatio < 1.0 && this.lastRunDurationNanoTimeRef.get() != -1)
{
try
{
Thread.sleep((long) (deltaMs / this.runTimeRatio - deltaMs));
}
catch (InterruptedException ignored) { }
}
if (this.threadSemaphore != null)
{
try
{
// Warning, this can cause deadlock if one thread calls another.
this.threadSemaphore.acquire(this);
this.semaphoresAcquired.getAndAdd(1);
if (LOG_SEMAPHORE_ACTIONS)
{
LOGGER.debug("acquired, available count: ["+this.threadSemaphore.availablePermits()+"]");
}
}
catch (InterruptedException ignore) { }
}
this.runStartNanoTimeRef.set(System.nanoTime());
this.runStartTime.set(System.nanoTime());
super.beforeExecute(thread, runnable);
}
@@ -131,18 +73,14 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements
{
super.afterExecute(runnable, throwable);
this.lastRunDurationNanoTimeRef.set(System.nanoTime() - this.runStartNanoTimeRef.get());
if (this.threadSemaphore != null)
try
{
this.threadSemaphore.release();
this.semaphoresAcquired.getAndAdd(-1);
if (LOG_SEMAPHORE_ACTIONS)
{
LOGGER.debug("released, available count: ["+this.threadSemaphore.availablePermits()+"]");
}
long runTime = System.nanoTime() - this.runStartTime.get();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis((long) (runTime / this.runTimeRatioConfig.get() - runTime)));
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
@@ -150,15 +88,6 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements
protected void terminated()
{
super.terminated();
// release all held semaphores (shouldn't normally be necessary, but just in case)
if (this.threadSemaphore != null)
{
if (LOG_SEMAPHORE_ACTIONS)
{
LOGGER.info("terminated, released ["+this.semaphoresAcquired+"], available count: ["+this.threadSemaphore.availablePermits()+"]");
}
}
}
/**
@@ -175,28 +104,4 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements
super.purge();
}
//==============//
// running time //
//==============//
/** will return Nan if nothing has been submitted yet */
public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); }
//============//
// comparison //
//============//
@Override
public int compareTo(@NotNull RateLimitedThreadPoolExecutor other)
{
// highest number first
return Integer.compare(other.priority, this.priority);
}
}
@@ -19,12 +19,10 @@
package com.seibel.distanthorizons.core.util.threading;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.*;
/**
* Holds each thread pool the system uses.
@@ -40,20 +38,20 @@ public class ThreadPoolUtil
// standalone thread pools all handle independent systems
// and don't interfere with any other pool
public static final DhThreadFactory FILE_HANDLER_THREAD_FACTORY = new DhThreadFactory("File Handler", Thread.MIN_PRIORITY, false);
private static ConfigThreadPool fileHandlerThreadPool;
@Nullable
public static ThreadPoolExecutor getFileHandlerExecutor() { return fileHandlerThreadPool.executor; }
private static PriorityTaskPicker taskPicker;
public static final DhThreadFactory UPDATE_PROPAGATOR_THREAD_FACTORY = new DhThreadFactory("LOD Update Propagator", Thread.MIN_PRIORITY, false);
private static ConfigThreadPool updatePropagatorThreadPool;
private static PriorityTaskPicker.Executor fileHandlerThreadPool;
@Nullable
public static ThreadPoolExecutor getUpdatePropagatorExecutor() { return updatePropagatorThreadPool.executor; }
public static PriorityTaskPicker.Executor getFileHandlerExecutor() { return fileHandlerThreadPool; }
private static PriorityTaskPicker.Executor updatePropagatorThreadPool;
@Nullable
public static PriorityTaskPicker.Executor getUpdatePropagatorExecutor() { return updatePropagatorThreadPool; }
public static final DhThreadFactory WORLD_GEN_THREAD_FACTORY = new DhThreadFactory("World Gen", Thread.MIN_PRIORITY, false);
private static ConfigThreadPool worldGenThreadPool;
private static PriorityTaskPicker.Executor worldGenThreadPool;
@Nullable
public static ThreadPoolExecutor getWorldGenExecutor() { return worldGenThreadPool.executor; }
public static PriorityTaskPicker.Executor getWorldGenExecutor() { return worldGenThreadPool; }
public static final String CLEANUP_THREAD_NAME = "Cleanup";
private static ThreadPoolExecutor cleanupThreadPool;
@@ -65,10 +63,9 @@ public class ThreadPoolUtil
@Nullable
public static ThreadPoolExecutor getBeaconCullingExecutor() { return beaconCullingThreadPool; }
public static final DhThreadFactory NETWORK_COMPRESSION_THREAD_FACTORY = new DhThreadFactory("Network Compression", Thread.MIN_PRIORITY, false);
private static ConfigThreadPool networkCompressionThreadPool;
private static PriorityTaskPicker.Executor networkCompressionThreadPool;
@Nullable
public static ThreadPoolExecutor getNetworkCompressionExecutor() { return networkCompressionThreadPool.executor; }
public static PriorityTaskPicker.Executor getNetworkCompressionExecutor() { return networkCompressionThreadPool; }
@@ -78,18 +75,9 @@ public class ThreadPoolUtil
public static ThreadPoolExecutor getFullDataMigrationExecutor() { return fullDataMigrationThreadPool; }
public static final DhThreadFactory CHUNK_TO_LOD_BUILDER_THREAD_FACTORY = new DhThreadFactory("LOD Builder - Chunk to Lod Builder", Thread.MIN_PRIORITY, false);
private static ConfigThreadPool chunkToLodBuilderThreadPool;
private static PriorityTaskPicker.Executor chunkToLodBuilderThreadPool;
@Nullable
public static ThreadPoolExecutor getChunkToLodBuilderExecutor() { return (chunkToLodBuilderThreadPool != null) ? chunkToLodBuilderThreadPool.executor : null; }
/** how many total threads can be used */
private static int threadSemaphoreCount = Config.Common.MultiThreading.numberOfThreads.get();
public static int getThreadCount() { return threadSemaphoreCount; }
private static PrioritySemaphore threadSemaphore = null;
private static ConfigChangeListener<Integer> threadSemaphoreConfigListener = null;
public static PriorityTaskPicker.Executor getChunkToLodBuilderExecutor() { return chunkToLodBuilderThreadPool; }
@@ -99,33 +87,19 @@ public class ThreadPoolUtil
public static void setupThreadPools()
{
// create thread semaphore
threadSemaphoreCount = Config.Common.MultiThreading.numberOfThreads.get();
threadSemaphore = new PrioritySemaphore(threadSemaphoreCount);
threadSemaphoreConfigListener = new ConfigChangeListener<>(Config.Common.MultiThreading.numberOfThreads, (val) ->
{
threadSemaphore.changePermitCount(val);
threadSemaphoreCount = val;
});
// thread pools
chunkToLodBuilderThreadPool = new ConfigThreadPool(CHUNK_TO_LOD_BUILDER_THREAD_FACTORY,
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
threadSemaphore, 3); // We want to make sure any chunk changes are found
fileHandlerThreadPool = new ConfigThreadPool(FILE_HANDLER_THREAD_FACTORY,
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
threadSemaphore, 2); // loading in new LODs is second highest priority
updatePropagatorThreadPool = new ConfigThreadPool(UPDATE_PROPAGATOR_THREAD_FACTORY,
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
threadSemaphore, 1); // update propagation needs to be slightly higher priority than world gen
worldGenThreadPool = new ConfigThreadPool(WORLD_GEN_THREAD_FACTORY,
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
threadSemaphore, 0); // higher priorities mean the threads will run first
networkCompressionThreadPool = new ConfigThreadPool(NETWORK_COMPRESSION_THREAD_FACTORY,
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
threadSemaphore, 0); // networking can probably have similar priority to world gen since they work similarly
taskPicker = new PriorityTaskPicker();
// IO should never be stuck waiting for something else to complete
networkCompressionThreadPool = taskPicker.createExecutor(3);
fileHandlerThreadPool = taskPicker.createExecutor(3);
// Normal priority tasks
chunkToLodBuilderThreadPool = taskPicker.createExecutor(2);
updatePropagatorThreadPool = taskPicker.createExecutor(2);
// World gen tasks are heavy and nothing strictly depends on them, so it may wait a bit
worldGenThreadPool = taskPicker.createExecutor(1);
@@ -139,22 +113,10 @@ public class ThreadPoolUtil
public static void shutdownThreadPools()
{
// standalone threads
fileHandlerThreadPool.shutdownExecutorService();
updatePropagatorThreadPool.shutdownExecutorService();
worldGenThreadPool.shutdownExecutorService();
networkCompressionThreadPool.shutdownExecutorService();
taskPicker.shutdown();
cleanupThreadPool.shutdown();
beaconCullingThreadPool.shutdown();
fullDataMigrationThreadPool.shutdown();
chunkToLodBuilderThreadPool.shutdownExecutorService();
threadSemaphore = null;
if (threadSemaphoreConfigListener != null)
{
threadSemaphoreConfigListener.close();
threadSemaphoreConfigListener = null;
}
}
}