diff --git a/core/src/main/java/com/seibel/distanthorizons/core/dataObjects/render/CachedColumnRenderSource.java b/core/src/main/java/com/seibel/distanthorizons/core/dataObjects/render/CachedColumnRenderSource.java index 4b05bb260..330186ae5 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/dataObjects/render/CachedColumnRenderSource.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/dataObjects/render/CachedColumnRenderSource.java @@ -1,11 +1,6 @@ package com.seibel.distanthorizons.core.dataObjects.render; import com.google.common.cache.Cache; -import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; -import com.seibel.distanthorizons.core.dataObjects.transformers.FullDataToRenderDataTransformer; -import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2; -import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker; -import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV1.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV1.java index 6aaa765d2..1170cfbc6 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV1.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV1.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.locks.ReentrantLock; public class FullDataSourceProviderV1 diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java index ff6cd9755..041c80d3f 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java @@ -210,8 +210,18 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im } - PriorityTaskPicker.Executor fileExecutor = ThreadPoolUtil.getFileHandlerExecutor(); - if (fileExecutor == null || fileExecutor.getQueueSize() >= getMaxUpdateTaskCount() / 2) + PriorityTaskPicker.Executor renderLoadExecutor = ThreadPoolUtil.getRenderLoadingExecutor(); + if (renderLoadExecutor == null + || renderLoadExecutor.getQueueSize() >= getMaxUpdateTaskCount() / 2) + { + // don't queue additional world gen requests if the render loader handler is overwhelmed, + // otherwise LODs may not load in properly + return false; + } + + PriorityTaskPicker.Executor fileHandlerExecutor = ThreadPoolUtil.getFileHandlerExecutor(); + if (fileHandlerExecutor == null + || fileHandlerExecutor.getQueueSize() >= getMaxUpdateTaskCount() / 2) { // don't queue additional world gen requests if the file handler is overwhelmed, // otherwise LODs may not load in properly diff --git a/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java b/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java index 2de88c548..f3267ea17 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java @@ -82,6 +82,7 @@ public class F3Screen // multi thread pools PriorityTaskPicker.Executor worldGenPool = ThreadPoolUtil.getWorldGenExecutor(); PriorityTaskPicker.Executor fileHandlerPool = ThreadPoolUtil.getFileHandlerExecutor(); + PriorityTaskPicker.Executor renderLoadingPool = ThreadPoolUtil.getRenderLoadingExecutor(); PriorityTaskPicker.Executor updatePool = ThreadPoolUtil.getUpdatePropagatorExecutor(); PriorityTaskPicker.Executor lodBuilderPool = ThreadPoolUtil.getChunkToLodBuilderExecutor(); PriorityTaskPicker.Executor networkPool = ThreadPoolUtil.getNetworkCompressionExecutor(); @@ -129,6 +130,7 @@ public class F3Screen { // multi thread pools messageList.add(getThreadPoolStatString("World Gen/Import", worldGenPool)); + messageList.add(getThreadPoolStatString("Render Load", renderLoadingPool)); messageList.add(getThreadPoolStatString("File Handler", fileHandlerPool)); messageList.add(getThreadPoolStatString("Update Propagator", updatePool)); messageList.add(getThreadPoolStatString("LOD Builder", lodBuilderPool)); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java index 33512c0e4..2b9f3a850 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java @@ -22,8 +22,6 @@ package com.seibel.distanthorizons.core.render; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.cache.Cache; -import com.seibel.distanthorizons.api.interfaces.render.IDhApiRenderableBoxGroup; -import com.seibel.distanthorizons.api.objects.render.DhApiRenderableBox; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; import com.seibel.distanthorizons.core.dataObjects.render.CachedColumnRenderSource; @@ -185,7 +183,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable return false; } - PriorityTaskPicker.Executor executor = ThreadPoolUtil.getFileHandlerExecutor(); + PriorityTaskPicker.Executor executor = ThreadPoolUtil.getRenderLoadingExecutor(); if (executor == null || executor.isTerminated()) { return false; @@ -335,7 +333,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable - PriorityTaskPicker.Executor executor = ThreadPoolUtil.getFileHandlerExecutor(); + PriorityTaskPicker.Executor executor = ThreadPoolUtil.getRenderLoadingExecutor(); if (executor == null || executor.isTerminated()) { // should only happen if the threadpool is actively being re-sized @@ -695,7 +693,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable { // remove the task from our executor if present // note: don't cancel the task since that prevents cleanup, we just don't want it to run - PriorityTaskPicker.Executor executor = ThreadPoolUtil.getFileHandlerExecutor(); + PriorityTaskPicker.Executor executor = ThreadPoolUtil.getRenderLoadingExecutor(); if (executor != null && !executor.isTerminated()) { Runnable runnable = this.getAndBuildRenderDataRunnable; diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java index 7523ebce4..26024997b 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java @@ -1,7 +1,7 @@ package com.seibel.distanthorizons.core.util.threading; import com.seibel.distanthorizons.core.config.Config; -import com.seibel.distanthorizons.core.config.types.ConfigEntry; +import com.seibel.distanthorizons.core.config.listeners.IConfigListener; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.util.objects.RollingAverage; import org.apache.logging.log4j.Logger; @@ -13,111 +13,131 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Stream; +/** + * This handles dividing work DH needs to do across + * DH's thread pool. + */ public class PriorityTaskPicker { private static final Logger LOGGER = DhLoggerBuilder.getLogger(); - private final ConfigEntry threadCountConfig = Config.Common.MultiThreading.numberOfThreads; - - private final RateLimitedThreadPoolExecutor threadPoolExecutor = new RateLimitedThreadPoolExecutor( - this.threadCountConfig.getMax(), - new DhThreadFactory("PriorityTaskPicker", Thread.MIN_PRIORITY, false), - new ArrayBlockingQueue<>(this.threadCountConfig.getMax()) - ); - - // List of executors + /** the list of currently registered executors */ private final ArrayList executors = new ArrayList<>(); - // Lock to ensure task picking logic is thread-safe + /** Lock to ensure task picking logic is thread-safe */ private final ReentrantLock taskPickerLock = new ReentrantLock(); - // Tracks the number of active threads - private final AtomicInteger occupiedThreads = new AtomicInteger(0); + /** Tracks the number of active threads */ + private final AtomicInteger occupiedThreadsRef = new AtomicInteger(0); private final AtomicBoolean isShutDownRef = new AtomicBoolean(false); - //==================// - // executor methods // - //==================// + //==========// + // executor // + //==========// - /** - * Creates an executor. - * - * @return a newly created Executor - */ - public Executor createExecutor() + public Executor createExecutor(String name) { - Executor executor = new Executor(); + Executor executor = new Executor(this, name); this.executors.add(executor); return executor; } /** - * Tries to start the next task by iterating over executors in the queue. - * Ensures thread limits are respected and only one thread iterates over the executorQueue at a time. + * Tries to start the next queued task + * for one of the available executors. */ private void tryStartNextTask() { - if (this.taskPickerLock.tryLock()) + // only let one thread start the next task to prevent concurrency errors + if (!this.taskPickerLock.tryLock()) { - try + return; + } + + + try + { + // Limit how many tasks can be queued for a given pool before moving to the next pool. + // This allows the picker to spread out the work a little more vs having the threads + // only work on a single executor's queue at a time + int maxQueuedBeforeOverflow = Math.max(1, Config.Common.MultiThreading.numberOfThreads.get() / 2); + + // fill up executors that have run for less time first, + // this prevents long-running tasks from taking up all the CPU time + Iterator iterator = this.getExecutorIteratorSortedByShortestTotalRunTime(); + while (iterator.hasNext()) { - // TODO limit based on thread count so visual VM is easier to parse - for (Executor executor : (Iterable) this.executors.stream().sorted(Comparator.comparingLong(executor -> executor.totalRuntimeNanos.get()))::iterator) + Executor executor = iterator.next(); + int queuedTaskCount = 0; + + TrackedRunnable task; + + // start tasks until we're running as many threads as acceptable by the config, + // or until this executor is empty, + // or until we should move on to the next executor + while (this.occupiedThreadsRef.get() < Config.Common.MultiThreading.numberOfThreads.get() + && (task = executor.taskQueue.poll()) != null + && queuedTaskCount <= maxQueuedBeforeOverflow) { - TrackedRunnable task; + queuedTaskCount++; - while (this.occupiedThreads.get() < this.threadCountConfig.get() && (task = executor.tasks.poll()) != null) + try { - try + executor.runTask(task); + this.occupiedThreadsRef.getAndIncrement(); + } + catch (RejectedExecutionException e) + { + if (this.isShutDownRef.get()) { - // Attempt to start another task - this.threadPoolExecutor.execute(task); - - // Update variables related to task status - this.occupiedThreads.getAndIncrement(); - executor.runningTasks.getAndIncrement(); + // Clear this executor's tasks since we no longer expect anything to execute. + executor.taskQueue.clear(); } - catch (RejectedExecutionException e) + else { - if (this.isShutDownRef.get()) - { - // Clear executor's tasks since we no longer expect anything to execute - // Tasks from other executors will be cleared by the outer for loop - executor.tasks.clear(); - } - else - { - throw e; - } + throw e; } } } } - finally - { - this.taskPickerLock.unlock(); - - // If someone else manages to pick up a lock before us, we'll leave early, and they will do our work - } + } + finally + { + this.taskPickerLock.unlock(); } } + private Iterator getExecutorIteratorSortedByShortestTotalRunTime() + { + Stream stream = this.executors.stream(); + // returns smaller numbers first + stream = stream.sorted(Comparator.comparingLong((executor) -> executor.totalRuntimeNanos.get())); + return stream.iterator(); + } - /** Shuts down the thread pool immediately, stopping all tasks. */ - public void shutdown() + /** Blocking, shuts down the thread pool immediately, stopping all tasks. */ + public void shutdownNow() { LOGGER.info("Shutting down PriorityTaskPicker thread pool..."); this.isShutDownRef.set(true); try { - this.threadPoolExecutor.shutdown(); - if (!this.threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS)) + for (int i = 0; i < this.executors.size(); i++) { - this.threadPoolExecutor.shutdownNow(); + Executor executor = this.executors.get(i); + if (executor != null) + { + executor.shutdown(); + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) + { + executor.shutdownNow(); + } + } } } catch (InterruptedException e) @@ -132,68 +152,158 @@ public class PriorityTaskPicker // helper classes // //================// - public class Executor extends AbstractExecutorService + /** + * Each executor handles a specific type of work that DH needs done. + * By separating out task into its own executor it allows for easier performance monitoring + * via a tool like Visual VM and fairly spreading out CPU time between tasks. + */ + public static class Executor extends AbstractExecutorService implements IConfigListener { - private final Queue tasks = new ConcurrentLinkedQueue<>(); + private final PriorityTaskPicker parentTaskPicker; + private final String name; + + private final Queue taskQueue = new ConcurrentLinkedQueue<>(); + + private final AtomicInteger runningTasksRef = new AtomicInteger(0); + private final AtomicInteger completedTasksRef = new AtomicInteger(0); - private final AtomicInteger runningTasks = new AtomicInteger(0); - private final AtomicInteger completedTasks = new AtomicInteger(0); - private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200); private final AtomicLong totalRuntimeNanos = new AtomicLong(0); + /** used for performance logging */ + private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200); + /** holds the threads this {@link Executor} can run */ + private RateLimitedThreadPoolExecutor threadPoolExecutor; + + + + //=============// + // constructor // + //=============// + + public Executor(PriorityTaskPicker parentTaskPicker, String name) + { + this.parentTaskPicker = parentTaskPicker; + this.name = name; + + this.threadPoolExecutor = this.createThreadPool(); + + Config.Common.MultiThreading.numberOfThreads.addListener(this); + } + + private RateLimitedThreadPoolExecutor createThreadPool() + { + return new RateLimitedThreadPoolExecutor( + Config.Common.MultiThreading.numberOfThreads.get(), + new DhThreadFactory(this.name, Thread.MIN_PRIORITY, false), + new ArrayBlockingQueue<>(Runtime.getRuntime().availableProcessors()) + ); + } + + + + //=================// + // config handling // + //=================// + + @Override + public void onConfigValueSet() + { + RateLimitedThreadPoolExecutor oldExecutor = this.threadPoolExecutor; + this.threadPoolExecutor = this.createThreadPool(); + + // shut down the old executor after replacing it with the new one + // to make sure no tasks are lost in the transfer + if (oldExecutor != null) + { + oldExecutor.shutdown(); + } + } + + + + //=====================// + // task queue handling // + //=====================// @Override public void execute(@NotNull Runnable command) { - this.tasks.add(new TrackedRunnable(this, command)); + this.taskQueue.add(new TrackedRunnable(this.parentTaskPicker, this, command)); - // Attempt to pick up the task immediately - PriorityTaskPicker.this.tryStartNextTask(); + // Attempt to start the task immediately + this.parentTaskPicker.tryStartNextTask(); + } + + /** The passed in {@link Runnable} must be exactly the same as the one passed into {@link PriorityTaskPicker.Executor#execute(Runnable)} */ + public void remove(@NotNull Runnable command) { this.taskQueue.removeIf(trackedRunnable -> trackedRunnable.command == command); } + + + public void runTask(@NotNull Runnable command) + { + this.threadPoolExecutor.execute(command); + this.runningTasksRef.getAndIncrement(); } - public int getQueueSize() { return this.tasks.size(); } - public int getPoolSize() { return PriorityTaskPicker.this.threadCountConfig.get(); } + public int getQueueSize() { return this.taskQueue.size(); } + public int getPoolSize() { return Config.Common.MultiThreading.numberOfThreads.get(); } - public int getRunningTaskCount() { return this.runningTasks.get(); } - public int getCompletedTaskCount() { return this.completedTasks.get(); } + public int getRunningTaskCount() { return this.runningTasksRef.get(); } + public int getCompletedTaskCount() { return this.completedTasksRef.get(); } /** Will return NaN if nothing has been submitted yet */ public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); } - /** The passed in {@link Runnable} must be exactly the same as the one passed into {@link PriorityTaskPicker.Executor#execute(Runnable)} */ - public void remove(@NotNull Runnable command) { this.tasks.removeIf(trackedRunnable -> trackedRunnable.command == command); } + + //==========// + // shutdown // + //==========// @Override - public void shutdown() { throw new UnsupportedOperationException(); } + public void shutdown() { this.threadPoolExecutor.shutdown(); } @Override - public @NotNull List shutdownNow() { throw new UnsupportedOperationException(); } + public @NotNull List shutdownNow() { return this.threadPoolExecutor.shutdownNow(); } @Override - public boolean isShutdown() { return false; } + public boolean isShutdown() { return this.threadPoolExecutor.isShutdown(); } @Override - public boolean isTerminated() { return false; } + public boolean isTerminated() { return this.threadPoolExecutor.isTerminated(); } @Override - public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) { return false; } + public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException + { return this.threadPoolExecutor.awaitTermination(timeout, unit); } } /** used so we can {@link PriorityTaskPicker.Executor#remove(Runnable)} using the original {@link Runnable} */ - private class TrackedRunnable implements Runnable + private static class TrackedRunnable implements Runnable { + private final PriorityTaskPicker parentTaskPicker; private final Executor executor; /** the runnable passed into {@link PriorityTaskPicker.Executor#execute(Runnable)} */ public final Runnable command; - public TrackedRunnable(Executor executor, Runnable command) + + + //=============// + // constructor // + //=============// + + public TrackedRunnable(PriorityTaskPicker parentTaskPicker, Executor executor, Runnable command) { + this.parentTaskPicker = parentTaskPicker; this.executor = executor; this.command = command; } + + + //=========// + // running // + //=========// + @Override public void run() { @@ -208,16 +318,17 @@ public class PriorityTaskPicker this.executor.runTimeInMsRollingAverage.addValue(TimeUnit.NANOSECONDS.toMillis(timeElapsed)); // Update variables related to task status - PriorityTaskPicker.this.occupiedThreads.getAndDecrement(); - this.executor.runningTasks.getAndDecrement(); - this.executor.completedTasks.getAndIncrement(); + this.parentTaskPicker.occupiedThreadsRef.getAndDecrement(); + this.executor.runningTasksRef.getAndDecrement(); + this.executor.completedTasksRef.getAndIncrement(); this.executor.totalRuntimeNanos.addAndGet(timeElapsed); - // Attempt to start another task - PriorityTaskPicker.this.tryStartNextTask(); + this.parentTaskPicker.tryStartNextTask(); } } } + + } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java index 0d94b9384..dc932088a 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java @@ -96,7 +96,7 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor * Deprecated since most of the time this doesn't do what we want or need. * In James testing any tasks started with {@link CompletableFuture#runAsync(Runnable, Executor)} * or {@link CompletableFuture#supplyAsync(Supplier, Executor)} converted the {@link Runnable} - * and {@link CompletableFuture} into objects that didn't support being cancled and removed + * and {@link CompletableFuture} into objects that didn't support being canceled and removed * from the queue. The canceled tasks were correctly never run, but couldn't be purged. */ @Deprecated diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java index d9cd5e348..21d236c12 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java @@ -45,6 +45,10 @@ public class ThreadPoolUtil @Nullable public static PriorityTaskPicker.Executor getFileHandlerExecutor() { return fileHandlerThreadPool; } + private static PriorityTaskPicker.Executor renderSectionLoadThreadPool; + @Nullable + public static PriorityTaskPicker.Executor getRenderLoadingExecutor() { return renderSectionLoadThreadPool; } + private static PriorityTaskPicker.Executor updatePropagatorThreadPool; @Nullable public static PriorityTaskPicker.Executor getUpdatePropagatorExecutor() { return updatePropagatorThreadPool; } @@ -95,15 +99,16 @@ public class ThreadPoolUtil if (taskPicker != null) { - taskPicker.shutdown(); + taskPicker.shutdownNow(); } taskPicker = new PriorityTaskPicker(); - networkCompressionThreadPool = taskPicker.createExecutor(); - fileHandlerThreadPool = taskPicker.createExecutor(); - chunkToLodBuilderThreadPool = taskPicker.createExecutor(); - updatePropagatorThreadPool = taskPicker.createExecutor(); - worldGenThreadPool = taskPicker.createExecutor(); + networkCompressionThreadPool = taskPicker.createExecutor("Network"); + fileHandlerThreadPool = taskPicker.createExecutor("IO"); + renderSectionLoadThreadPool = taskPicker.createExecutor("Render Loader"); + chunkToLodBuilderThreadPool = taskPicker.createExecutor("LOD Builder"); + updatePropagatorThreadPool = taskPicker.createExecutor("Update Propagator"); + worldGenThreadPool = taskPicker.createExecutor("World Gen"); @@ -128,7 +133,7 @@ public class ThreadPoolUtil public static void shutdownThreadPools() { // standalone threads - taskPicker.shutdown(); + taskPicker.shutdownNow(); beaconCullingThreadPool.shutdown(); fullDataMigrationThreadPool.shutdown(); }