Add more comments

This commit is contained in:
s809
2025-01-07 00:05:16 +05:00
parent 14cd0c4b09
commit c49a61f118
2 changed files with 43 additions and 10 deletions
@@ -21,14 +21,24 @@ public class PriorityTaskPicker
new ArrayBlockingQueue<>(this.threadCountConfig.getMax()) new ArrayBlockingQueue<>(this.threadCountConfig.getMax())
); );
// Queue of executors, used to distribute tasks across executors based on priority
private final ArrayList<Executor> executorQueue = new ArrayList<>(); private final ArrayList<Executor> executorQueue = new ArrayList<>();
private int nextExecutorQueuePos = 0; private int nextExecutorQueuePos = 0;
// Lock to ensure task picking logic is thread-safe
private final ReentrantLock taskPickerLock = new ReentrantLock(); private final ReentrantLock taskPickerLock = new ReentrantLock();
// Indicates whether a task picking attempt is needed
private final AtomicBoolean shouldPickTask = new AtomicBoolean(false); private final AtomicBoolean shouldPickTask = new AtomicBoolean(false);
// Tracks the number of active threads
private final AtomicInteger occupiedThreads = new AtomicInteger(0); private final AtomicInteger occupiedThreads = new AtomicInteger(0);
/**
* Creates an executor with a specific priority.
* Higher priority executors have more entries in the distribution queue, giving them a greater chance to run tasks.
*
* @param priority the priority level of the executor
* @return a newly created Executor
*/
public Executor createExecutor(int priority) public Executor createExecutor(int priority)
{ {
Executor executor = new Executor(); Executor executor = new Executor();
@@ -36,6 +46,7 @@ public class PriorityTaskPicker
int entriesToAdd = priority + 1; int entriesToAdd = priority + 1;
int gapBetweenEntries = (int) (1 / (double) entriesToAdd * this.executorQueue.size()); int gapBetweenEntries = (int) (1 / (double) entriesToAdd * this.executorQueue.size());
// Distribute the executor's entries in the queue, ensuring fair distribution
for (; entriesToAdd > 0; entriesToAdd--) for (; entriesToAdd > 0; entriesToAdd--)
{ {
this.executorQueue.add(executor); this.executorQueue.add(executor);
@@ -45,6 +56,10 @@ public class PriorityTaskPicker
return executor; return executor;
} }
/**
* Tries to start the next task by iterating over executors in the queue.
* Ensures thread limits are respected and only one thread iterates over the executorQueue at a time.
*/
private void tryStartNextTask() private void tryStartNextTask()
{ {
this.shouldPickTask.set(true); this.shouldPickTask.set(true);
@@ -53,16 +68,16 @@ public class PriorityTaskPicker
{ {
try try
{ {
// Exit if there's no longer a need to pick a task
if (!this.shouldPickTask.compareAndSet(true, false)) if (!this.shouldPickTask.compareAndSet(true, false))
{ {
return; return;
} }
int threadCount = this.threadCountConfig.get(); // Iterate over the executors in the queue, attempting to start tasks
for ( for (
int counter = 0; int counter = 0;
counter < this.executorQueue.size() && this.occupiedThreads.get() < threadCount; counter < this.executorQueue.size() && this.occupiedThreads.get() < this.threadCountConfig.get();
counter++, this.nextExecutorQueuePos = (this.nextExecutorQueuePos + 1) % this.executorQueue.size() counter++, this.nextExecutorQueuePos = (this.nextExecutorQueuePos + 1) % this.executorQueue.size()
) )
{ {
@@ -71,8 +86,11 @@ public class PriorityTaskPicker
Runnable task = executor.tasks.poll(); Runnable task = executor.tasks.poll();
if (task != null) if (task != null)
{ {
// Update variables related to task status
this.occupiedThreads.getAndIncrement(); this.occupiedThreads.getAndIncrement();
executor.runningTasks.getAndIncrement(); executor.runningTasks.getAndIncrement();
// Prevent exiting early since there might be more than this.executorQueue.size() tasks waiting in queue
counter--; counter--;
this.threadPoolExecutor.execute(task); this.threadPoolExecutor.execute(task);
@@ -82,10 +100,15 @@ public class PriorityTaskPicker
finally finally
{ {
this.taskPickerLock.unlock(); this.taskPickerLock.unlock();
// If someone else manages to pick up a lock before us, we'll leave early, and they will do our work
} }
} }
} }
/**
* Shuts down the thread pool immediately, stopping all tasks.
*/
public void shutdown() public void shutdown()
{ {
this.threadPoolExecutor.shutdownNow(); this.threadPoolExecutor.shutdownNow();
@@ -115,23 +138,27 @@ public class PriorityTaskPicker
long timeElapsed = System.nanoTime() - startTime; long timeElapsed = System.nanoTime() - startTime;
this.runTimeInMsRollingAverage.addValue(TimeUnit.NANOSECONDS.toMillis(timeElapsed)); this.runTimeInMsRollingAverage.addValue(TimeUnit.NANOSECONDS.toMillis(timeElapsed));
// Update variables related to task status
PriorityTaskPicker.this.occupiedThreads.getAndDecrement(); PriorityTaskPicker.this.occupiedThreads.getAndDecrement();
this.runningTasks.getAndDecrement(); this.runningTasks.getAndDecrement();
this.completedTasks.getAndIncrement(); this.completedTasks.getAndIncrement();
// Attempt to start another task
PriorityTaskPicker.this.tryStartNextTask(); PriorityTaskPicker.this.tryStartNextTask();
} }
}); });
// Attempt to pick up the task immediately
PriorityTaskPicker.this.tryStartNextTask(); PriorityTaskPicker.this.tryStartNextTask();
} }
public int getQueueSize() { return this.tasks.size(); } public int getQueueSize() { return this.tasks.size(); }
public int getPoolSize() { return PriorityTaskPicker.this.threadCountConfig.get(); } public int getPoolSize() { return PriorityTaskPicker.this.threadCountConfig.get(); }
public int getRunningTaskCount() { return this.runningTasks.get(); } public int getRunningTaskCount() { return this.runningTasks.get(); }
public int getCompletedTaskCount() { return this.completedTasks.get(); } public int getCompletedTaskCount() { return this.completedTasks.get(); }
/** will return Nan if nothing has been submitted yet */ /** Will return NaN if nothing has been submitted yet */
public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); } public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); }
@@ -89,11 +89,17 @@ public class ThreadPoolUtil
{ {
// thread pools // thread pools
taskPicker = new PriorityTaskPicker(); taskPicker = new PriorityTaskPicker();
networkCompressionThreadPool = taskPicker.createExecutor(3); // Data should never pile up waiting to be sent
fileHandlerThreadPool = taskPicker.createExecutor(3); // loading in new LODs is second-highest priority // IO should never be stuck waiting for something else to complete
chunkToLodBuilderThreadPool = taskPicker.createExecutor(2); // We want to make sure any chunk changes are found networkCompressionThreadPool = taskPicker.createExecutor(3);
updatePropagatorThreadPool = taskPicker.createExecutor(2); // update propagation needs to be slightly higher priority than world gen fileHandlerThreadPool = taskPicker.createExecutor(3);
worldGenThreadPool = taskPicker.createExecutor(1); // higher priorities mean the threads will run first
// Normal priority tasks
chunkToLodBuilderThreadPool = taskPicker.createExecutor(2);
updatePropagatorThreadPool = taskPicker.createExecutor(2);
// World gen tasks are heavy and nothing strictly depends on them, so it may wait a bit
worldGenThreadPool = taskPicker.createExecutor(1);