From c49a61f118e00837efbdca659b5d3744f8f5d406 Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Tue, 7 Jan 2025 00:05:16 +0500 Subject: [PATCH] Add more comments --- .../util/threading/PriorityTaskPicker.java | 37 ++++++++++++++++--- .../core/util/threading/ThreadPoolUtil.java | 16 +++++--- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java index effc3d8b6..e7ab21a00 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java @@ -21,14 +21,24 @@ public class PriorityTaskPicker new ArrayBlockingQueue<>(this.threadCountConfig.getMax()) ); + // Queue of executors, used to distribute tasks across executors based on priority private final ArrayList executorQueue = new ArrayList<>(); private int nextExecutorQueuePos = 0; + // Lock to ensure task picking logic is thread-safe private final ReentrantLock taskPickerLock = new ReentrantLock(); + // Indicates whether a task picking attempt is needed private final AtomicBoolean shouldPickTask = new AtomicBoolean(false); + // Tracks the number of active threads private final AtomicInteger occupiedThreads = new AtomicInteger(0); - + /** + * Creates an executor with a specific priority. + * Higher priority executors have more entries in the distribution queue, giving them a greater chance to run tasks. + * + * @param priority the priority level of the executor + * @return a newly created Executor + */ public Executor createExecutor(int priority) { Executor executor = new Executor(); @@ -36,6 +46,7 @@ public class PriorityTaskPicker int entriesToAdd = priority + 1; int gapBetweenEntries = (int) (1 / (double) entriesToAdd * this.executorQueue.size()); + // Distribute the executor's entries in the queue, ensuring fair distribution for (; entriesToAdd > 0; entriesToAdd--) { this.executorQueue.add(executor); @@ -45,6 +56,10 @@ public class PriorityTaskPicker return executor; } + /** + * Tries to start the next task by iterating over executors in the queue. + * Ensures thread limits are respected and only one thread iterates over the executorQueue at a time. + */ private void tryStartNextTask() { this.shouldPickTask.set(true); @@ -53,16 +68,16 @@ public class PriorityTaskPicker { try { + // Exit if there's no longer a need to pick a task if (!this.shouldPickTask.compareAndSet(true, false)) { return; } - int threadCount = this.threadCountConfig.get(); - + // Iterate over the executors in the queue, attempting to start tasks for ( int counter = 0; - counter < this.executorQueue.size() && this.occupiedThreads.get() < threadCount; + counter < this.executorQueue.size() && this.occupiedThreads.get() < this.threadCountConfig.get(); counter++, this.nextExecutorQueuePos = (this.nextExecutorQueuePos + 1) % this.executorQueue.size() ) { @@ -71,8 +86,11 @@ public class PriorityTaskPicker Runnable task = executor.tasks.poll(); if (task != null) { + // Update variables related to task status this.occupiedThreads.getAndIncrement(); executor.runningTasks.getAndIncrement(); + + // Prevent exiting early since there might be more than this.executorQueue.size() tasks waiting in queue counter--; this.threadPoolExecutor.execute(task); @@ -82,10 +100,15 @@ public class PriorityTaskPicker finally { this.taskPickerLock.unlock(); + + // If someone else manages to pick up a lock before us, we'll leave early, and they will do our work } } } + /** + * Shuts down the thread pool immediately, stopping all tasks. + */ public void shutdown() { this.threadPoolExecutor.shutdownNow(); @@ -115,23 +138,27 @@ public class PriorityTaskPicker long timeElapsed = System.nanoTime() - startTime; this.runTimeInMsRollingAverage.addValue(TimeUnit.NANOSECONDS.toMillis(timeElapsed)); + // Update variables related to task status PriorityTaskPicker.this.occupiedThreads.getAndDecrement(); this.runningTasks.getAndDecrement(); this.completedTasks.getAndIncrement(); + // Attempt to start another task PriorityTaskPicker.this.tryStartNextTask(); } }); + // Attempt to pick up the task immediately PriorityTaskPicker.this.tryStartNextTask(); } + public int getQueueSize() { return this.tasks.size(); } public int getPoolSize() { return PriorityTaskPicker.this.threadCountConfig.get(); } public int getRunningTaskCount() { return this.runningTasks.get(); } public int getCompletedTaskCount() { return this.completedTasks.get(); } - /** will return Nan if nothing has been submitted yet */ + /** Will return NaN if nothing has been submitted yet */ public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java index debdc301f..3d697f9aa 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java @@ -89,11 +89,17 @@ public class ThreadPoolUtil { // thread pools taskPicker = new PriorityTaskPicker(); - networkCompressionThreadPool = taskPicker.createExecutor(3); // Data should never pile up waiting to be sent - fileHandlerThreadPool = taskPicker.createExecutor(3); // loading in new LODs is second-highest priority - chunkToLodBuilderThreadPool = taskPicker.createExecutor(2); // We want to make sure any chunk changes are found - updatePropagatorThreadPool = taskPicker.createExecutor(2); // update propagation needs to be slightly higher priority than world gen - worldGenThreadPool = taskPicker.createExecutor(1); // higher priorities mean the threads will run first + + // IO should never be stuck waiting for something else to complete + networkCompressionThreadPool = taskPicker.createExecutor(3); + fileHandlerThreadPool = taskPicker.createExecutor(3); + + // Normal priority tasks + chunkToLodBuilderThreadPool = taskPicker.createExecutor(2); + updatePropagatorThreadPool = taskPicker.createExecutor(2); + + // World gen tasks are heavy and nothing strictly depends on them, so it may wait a bit + worldGenThreadPool = taskPicker.createExecutor(1);