Separate DH pool threads and new executor "Render Loader"

Having separate threads for each task behind the scenes allows for easier performance monitoring vs having a single threadpool that handles everything.
This commit is contained in:
James Seibel
2025-10-04 20:10:10 -05:00
parent bd517e54cf
commit aed5bb4163
8 changed files with 227 additions and 107 deletions
@@ -1,11 +1,6 @@
package com.seibel.distanthorizons.core.dataObjects.render;
import com.google.common.cache.Cache;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.dataObjects.transformers.FullDataToRenderDataTransformer;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantLock;
public class FullDataSourceProviderV1<TDhLevel extends IDhLevel>
@@ -210,8 +210,18 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
}
PriorityTaskPicker.Executor fileExecutor = ThreadPoolUtil.getFileHandlerExecutor();
if (fileExecutor == null || fileExecutor.getQueueSize() >= getMaxUpdateTaskCount() / 2)
PriorityTaskPicker.Executor renderLoadExecutor = ThreadPoolUtil.getRenderLoadingExecutor();
if (renderLoadExecutor == null
|| renderLoadExecutor.getQueueSize() >= getMaxUpdateTaskCount() / 2)
{
// don't queue additional world gen requests if the render loader handler is overwhelmed,
// otherwise LODs may not load in properly
return false;
}
PriorityTaskPicker.Executor fileHandlerExecutor = ThreadPoolUtil.getFileHandlerExecutor();
if (fileHandlerExecutor == null
|| fileHandlerExecutor.getQueueSize() >= getMaxUpdateTaskCount() / 2)
{
// don't queue additional world gen requests if the file handler is overwhelmed,
// otherwise LODs may not load in properly
@@ -82,6 +82,7 @@ public class F3Screen
// multi thread pools
PriorityTaskPicker.Executor worldGenPool = ThreadPoolUtil.getWorldGenExecutor();
PriorityTaskPicker.Executor fileHandlerPool = ThreadPoolUtil.getFileHandlerExecutor();
PriorityTaskPicker.Executor renderLoadingPool = ThreadPoolUtil.getRenderLoadingExecutor();
PriorityTaskPicker.Executor updatePool = ThreadPoolUtil.getUpdatePropagatorExecutor();
PriorityTaskPicker.Executor lodBuilderPool = ThreadPoolUtil.getChunkToLodBuilderExecutor();
PriorityTaskPicker.Executor networkPool = ThreadPoolUtil.getNetworkCompressionExecutor();
@@ -129,6 +130,7 @@ public class F3Screen
{
// multi thread pools
messageList.add(getThreadPoolStatString("World Gen/Import", worldGenPool));
messageList.add(getThreadPoolStatString("Render Load", renderLoadingPool));
messageList.add(getThreadPoolStatString("File Handler", fileHandlerPool));
messageList.add(getThreadPoolStatString("Update Propagator", updatePool));
messageList.add(getThreadPoolStatString("LOD Builder", lodBuilderPool));
@@ -22,8 +22,6 @@ package com.seibel.distanthorizons.core.render;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.cache.Cache;
import com.seibel.distanthorizons.api.interfaces.render.IDhApiRenderableBoxGroup;
import com.seibel.distanthorizons.api.objects.render.DhApiRenderableBox;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.dataObjects.render.CachedColumnRenderSource;
@@ -185,7 +183,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable
return false;
}
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getFileHandlerExecutor();
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getRenderLoadingExecutor();
if (executor == null || executor.isTerminated())
{
return false;
@@ -335,7 +333,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getFileHandlerExecutor();
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getRenderLoadingExecutor();
if (executor == null || executor.isTerminated())
{
// should only happen if the threadpool is actively being re-sized
@@ -695,7 +693,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable
{
// remove the task from our executor if present
// note: don't cancel the task since that prevents cleanup, we just don't want it to run
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getFileHandlerExecutor();
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getRenderLoadingExecutor();
if (executor != null && !executor.isTerminated())
{
Runnable runnable = this.getAndBuildRenderDataRunnable;
@@ -1,7 +1,7 @@
package com.seibel.distanthorizons.core.util.threading;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
import com.seibel.distanthorizons.core.config.listeners.IConfigListener;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.util.objects.RollingAverage;
import org.apache.logging.log4j.Logger;
@@ -13,111 +13,131 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
/**
* This handles dividing work DH needs to do across
* DH's thread pool.
*/
public class PriorityTaskPicker
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
private final ConfigEntry<Integer> threadCountConfig = Config.Common.MultiThreading.numberOfThreads;
private final RateLimitedThreadPoolExecutor threadPoolExecutor = new RateLimitedThreadPoolExecutor(
this.threadCountConfig.getMax(),
new DhThreadFactory("PriorityTaskPicker", Thread.MIN_PRIORITY, false),
new ArrayBlockingQueue<>(this.threadCountConfig.getMax())
);
// List of executors
/** the list of currently registered executors */
private final ArrayList<Executor> executors = new ArrayList<>();
// Lock to ensure task picking logic is thread-safe
/** Lock to ensure task picking logic is thread-safe */
private final ReentrantLock taskPickerLock = new ReentrantLock();
// Tracks the number of active threads
private final AtomicInteger occupiedThreads = new AtomicInteger(0);
/** Tracks the number of active threads */
private final AtomicInteger occupiedThreadsRef = new AtomicInteger(0);
private final AtomicBoolean isShutDownRef = new AtomicBoolean(false);
//==================//
// executor methods //
//==================//
//==========//
// executor //
//==========//
/**
* Creates an executor.
*
* @return a newly created Executor
*/
public Executor createExecutor()
public Executor createExecutor(String name)
{
Executor executor = new Executor();
Executor executor = new Executor(this, name);
this.executors.add(executor);
return executor;
}
/**
* Tries to start the next task by iterating over executors in the queue.
* Ensures thread limits are respected and only one thread iterates over the executorQueue at a time.
* Tries to start the next queued task
* for one of the available executors.
*/
private void tryStartNextTask()
{
if (this.taskPickerLock.tryLock())
// only let one thread start the next task to prevent concurrency errors
if (!this.taskPickerLock.tryLock())
{
try
return;
}
try
{
// Limit how many tasks can be queued for a given pool before moving to the next pool.
// This allows the picker to spread out the work a little more vs having the threads
// only work on a single executor's queue at a time
int maxQueuedBeforeOverflow = Math.max(1, Config.Common.MultiThreading.numberOfThreads.get() / 2);
// fill up executors that have run for less time first,
// this prevents long-running tasks from taking up all the CPU time
Iterator<Executor> iterator = this.getExecutorIteratorSortedByShortestTotalRunTime();
while (iterator.hasNext())
{
// TODO limit based on thread count so visual VM is easier to parse
for (Executor executor : (Iterable<? extends Executor>) this.executors.stream().sorted(Comparator.comparingLong(executor -> executor.totalRuntimeNanos.get()))::iterator)
Executor executor = iterator.next();
int queuedTaskCount = 0;
TrackedRunnable task;
// start tasks until we're running as many threads as acceptable by the config,
// or until this executor is empty,
// or until we should move on to the next executor
while (this.occupiedThreadsRef.get() < Config.Common.MultiThreading.numberOfThreads.get()
&& (task = executor.taskQueue.poll()) != null
&& queuedTaskCount <= maxQueuedBeforeOverflow)
{
TrackedRunnable task;
queuedTaskCount++;
while (this.occupiedThreads.get() < this.threadCountConfig.get() && (task = executor.tasks.poll()) != null)
try
{
try
executor.runTask(task);
this.occupiedThreadsRef.getAndIncrement();
}
catch (RejectedExecutionException e)
{
if (this.isShutDownRef.get())
{
// Attempt to start another task
this.threadPoolExecutor.execute(task);
// Update variables related to task status
this.occupiedThreads.getAndIncrement();
executor.runningTasks.getAndIncrement();
// Clear this executor's tasks since we no longer expect anything to execute.
executor.taskQueue.clear();
}
catch (RejectedExecutionException e)
else
{
if (this.isShutDownRef.get())
{
// Clear executor's tasks since we no longer expect anything to execute
// Tasks from other executors will be cleared by the outer for loop
executor.tasks.clear();
}
else
{
throw e;
}
throw e;
}
}
}
}
finally
{
this.taskPickerLock.unlock();
// If someone else manages to pick up a lock before us, we'll leave early, and they will do our work
}
}
finally
{
this.taskPickerLock.unlock();
}
}
private Iterator<Executor> getExecutorIteratorSortedByShortestTotalRunTime()
{
Stream<Executor> stream = this.executors.stream();
// returns smaller numbers first
stream = stream.sorted(Comparator.comparingLong((executor) -> executor.totalRuntimeNanos.get()));
return stream.iterator();
}
/** Shuts down the thread pool immediately, stopping all tasks. */
public void shutdown()
/** Blocking, shuts down the thread pool immediately, stopping all tasks. */
public void shutdownNow()
{
LOGGER.info("Shutting down PriorityTaskPicker thread pool...");
this.isShutDownRef.set(true);
try
{
this.threadPoolExecutor.shutdown();
if (!this.threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS))
for (int i = 0; i < this.executors.size(); i++)
{
this.threadPoolExecutor.shutdownNow();
Executor executor = this.executors.get(i);
if (executor != null)
{
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS))
{
executor.shutdownNow();
}
}
}
}
catch (InterruptedException e)
@@ -132,68 +152,158 @@ public class PriorityTaskPicker
// helper classes //
//================//
public class Executor extends AbstractExecutorService
/**
* Each executor handles a specific type of work that DH needs done.
* By separating out task into its own executor it allows for easier performance monitoring
* via a tool like Visual VM and fairly spreading out CPU time between tasks.
*/
public static class Executor extends AbstractExecutorService implements IConfigListener
{
private final Queue<TrackedRunnable> tasks = new ConcurrentLinkedQueue<>();
private final PriorityTaskPicker parentTaskPicker;
private final String name;
private final Queue<TrackedRunnable> taskQueue = new ConcurrentLinkedQueue<>();
private final AtomicInteger runningTasksRef = new AtomicInteger(0);
private final AtomicInteger completedTasksRef = new AtomicInteger(0);
private final AtomicInteger runningTasks = new AtomicInteger(0);
private final AtomicInteger completedTasks = new AtomicInteger(0);
private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200);
private final AtomicLong totalRuntimeNanos = new AtomicLong(0);
/** used for performance logging */
private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200);
/** holds the threads this {@link Executor} can run */
private RateLimitedThreadPoolExecutor threadPoolExecutor;
//=============//
// constructor //
//=============//
public Executor(PriorityTaskPicker parentTaskPicker, String name)
{
this.parentTaskPicker = parentTaskPicker;
this.name = name;
this.threadPoolExecutor = this.createThreadPool();
Config.Common.MultiThreading.numberOfThreads.addListener(this);
}
private RateLimitedThreadPoolExecutor createThreadPool()
{
return new RateLimitedThreadPoolExecutor(
Config.Common.MultiThreading.numberOfThreads.get(),
new DhThreadFactory(this.name, Thread.MIN_PRIORITY, false),
new ArrayBlockingQueue<>(Runtime.getRuntime().availableProcessors())
);
}
//=================//
// config handling //
//=================//
@Override
public void onConfigValueSet()
{
RateLimitedThreadPoolExecutor oldExecutor = this.threadPoolExecutor;
this.threadPoolExecutor = this.createThreadPool();
// shut down the old executor after replacing it with the new one
// to make sure no tasks are lost in the transfer
if (oldExecutor != null)
{
oldExecutor.shutdown();
}
}
//=====================//
// task queue handling //
//=====================//
@Override
public void execute(@NotNull Runnable command)
{
this.tasks.add(new TrackedRunnable(this, command));
this.taskQueue.add(new TrackedRunnable(this.parentTaskPicker, this, command));
// Attempt to pick up the task immediately
PriorityTaskPicker.this.tryStartNextTask();
// Attempt to start the task immediately
this.parentTaskPicker.tryStartNextTask();
}
/** The passed in {@link Runnable} must be exactly the same as the one passed into {@link PriorityTaskPicker.Executor#execute(Runnable)} */
public void remove(@NotNull Runnable command) { this.taskQueue.removeIf(trackedRunnable -> trackedRunnable.command == command); }
public void runTask(@NotNull Runnable command)
{
this.threadPoolExecutor.execute(command);
this.runningTasksRef.getAndIncrement();
}
public int getQueueSize() { return this.tasks.size(); }
public int getPoolSize() { return PriorityTaskPicker.this.threadCountConfig.get(); }
public int getQueueSize() { return this.taskQueue.size(); }
public int getPoolSize() { return Config.Common.MultiThreading.numberOfThreads.get(); }
public int getRunningTaskCount() { return this.runningTasks.get(); }
public int getCompletedTaskCount() { return this.completedTasks.get(); }
public int getRunningTaskCount() { return this.runningTasksRef.get(); }
public int getCompletedTaskCount() { return this.completedTasksRef.get(); }
/** Will return NaN if nothing has been submitted yet */
public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); }
/** The passed in {@link Runnable} must be exactly the same as the one passed into {@link PriorityTaskPicker.Executor#execute(Runnable)} */
public void remove(@NotNull Runnable command) { this.tasks.removeIf(trackedRunnable -> trackedRunnable.command == command); }
//==========//
// shutdown //
//==========//
@Override
public void shutdown() { throw new UnsupportedOperationException(); }
public void shutdown() { this.threadPoolExecutor.shutdown(); }
@Override
public @NotNull List<Runnable> shutdownNow() { throw new UnsupportedOperationException(); }
public @NotNull List<Runnable> shutdownNow() { return this.threadPoolExecutor.shutdownNow(); }
@Override
public boolean isShutdown() { return false; }
public boolean isShutdown() { return this.threadPoolExecutor.isShutdown(); }
@Override
public boolean isTerminated() { return false; }
public boolean isTerminated() { return this.threadPoolExecutor.isTerminated(); }
@Override
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) { return false; }
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException
{ return this.threadPoolExecutor.awaitTermination(timeout, unit); }
}
/** used so we can {@link PriorityTaskPicker.Executor#remove(Runnable)} using the original {@link Runnable} */
private class TrackedRunnable implements Runnable
private static class TrackedRunnable implements Runnable
{
private final PriorityTaskPicker parentTaskPicker;
private final Executor executor;
/** the runnable passed into {@link PriorityTaskPicker.Executor#execute(Runnable)} */
public final Runnable command;
public TrackedRunnable(Executor executor, Runnable command)
//=============//
// constructor //
//=============//
public TrackedRunnable(PriorityTaskPicker parentTaskPicker, Executor executor, Runnable command)
{
this.parentTaskPicker = parentTaskPicker;
this.executor = executor;
this.command = command;
}
//=========//
// running //
//=========//
@Override
public void run()
{
@@ -208,16 +318,17 @@ public class PriorityTaskPicker
this.executor.runTimeInMsRollingAverage.addValue(TimeUnit.NANOSECONDS.toMillis(timeElapsed));
// Update variables related to task status
PriorityTaskPicker.this.occupiedThreads.getAndDecrement();
this.executor.runningTasks.getAndDecrement();
this.executor.completedTasks.getAndIncrement();
this.parentTaskPicker.occupiedThreadsRef.getAndDecrement();
this.executor.runningTasksRef.getAndDecrement();
this.executor.completedTasksRef.getAndIncrement();
this.executor.totalRuntimeNanos.addAndGet(timeElapsed);
// Attempt to start another task
PriorityTaskPicker.this.tryStartNextTask();
this.parentTaskPicker.tryStartNextTask();
}
}
}
}
@@ -96,7 +96,7 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor
* Deprecated since most of the time this doesn't do what we want or need.
* In James testing any tasks started with {@link CompletableFuture#runAsync(Runnable, Executor)}
* or {@link CompletableFuture#supplyAsync(Supplier, Executor)} converted the {@link Runnable}
* and {@link CompletableFuture} into objects that didn't support being cancled and removed
* and {@link CompletableFuture} into objects that didn't support being canceled and removed
* from the queue. The canceled tasks were correctly never run, but couldn't be purged.
*/
@Deprecated
@@ -45,6 +45,10 @@ public class ThreadPoolUtil
@Nullable
public static PriorityTaskPicker.Executor getFileHandlerExecutor() { return fileHandlerThreadPool; }
private static PriorityTaskPicker.Executor renderSectionLoadThreadPool;
@Nullable
public static PriorityTaskPicker.Executor getRenderLoadingExecutor() { return renderSectionLoadThreadPool; }
private static PriorityTaskPicker.Executor updatePropagatorThreadPool;
@Nullable
public static PriorityTaskPicker.Executor getUpdatePropagatorExecutor() { return updatePropagatorThreadPool; }
@@ -95,15 +99,16 @@ public class ThreadPoolUtil
if (taskPicker != null)
{
taskPicker.shutdown();
taskPicker.shutdownNow();
}
taskPicker = new PriorityTaskPicker();
networkCompressionThreadPool = taskPicker.createExecutor();
fileHandlerThreadPool = taskPicker.createExecutor();
chunkToLodBuilderThreadPool = taskPicker.createExecutor();
updatePropagatorThreadPool = taskPicker.createExecutor();
worldGenThreadPool = taskPicker.createExecutor();
networkCompressionThreadPool = taskPicker.createExecutor("Network");
fileHandlerThreadPool = taskPicker.createExecutor("IO");
renderSectionLoadThreadPool = taskPicker.createExecutor("Render Loader");
chunkToLodBuilderThreadPool = taskPicker.createExecutor("LOD Builder");
updatePropagatorThreadPool = taskPicker.createExecutor("Update Propagator");
worldGenThreadPool = taskPicker.createExecutor("World Gen");
@@ -128,7 +133,7 @@ public class ThreadPoolUtil
public static void shutdownThreadPools()
{
// standalone threads
taskPicker.shutdown();
taskPicker.shutdownNow();
beaconCullingThreadPool.shutdown();
fullDataMigrationThreadPool.shutdown();
}