Balance tasks in thread pool using elapsed time instead of priorities
This commit is contained in:
+12
-47
@@ -3,13 +3,12 @@ package com.seibel.distanthorizons.core.util.threading;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
|
||||
import com.seibel.distanthorizons.core.util.objects.RollingAverage;
|
||||
import com.seibel.distanthorizons.coreapi.util.BitShiftUtil;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
public class PriorityTaskPicker
|
||||
@@ -22,14 +21,11 @@ public class PriorityTaskPicker
|
||||
new ArrayBlockingQueue<>(this.threadCountConfig.getMax())
|
||||
);
|
||||
|
||||
// Queue of executors, used to distribute tasks across executors based on priority
|
||||
private final ArrayList<Executor> executorQueue = new ArrayList<>();
|
||||
private int nextExecutorQueuePos = 0;
|
||||
// List of executors
|
||||
private final ArrayList<Executor> executors = new ArrayList<>();
|
||||
|
||||
// Lock to ensure task picking logic is thread-safe
|
||||
private final ReentrantLock taskPickerLock = new ReentrantLock();
|
||||
// Indicates whether a task picking attempt is needed
|
||||
private final AtomicBoolean shouldPickTask = new AtomicBoolean(false);
|
||||
// Tracks the number of active threads
|
||||
private final AtomicInteger occupiedThreads = new AtomicInteger(0);
|
||||
|
||||
@@ -42,26 +38,14 @@ public class PriorityTaskPicker
|
||||
//==================//
|
||||
|
||||
/**
|
||||
* Creates an executor with a specific priority.
|
||||
* Higher priority executors have more exponentially entries in the distribution queue, giving them a greater chance to run tasks.
|
||||
* Creates an executor.
|
||||
*
|
||||
* @param priority the priority level of the executor
|
||||
* @return a newly created Executor
|
||||
*/
|
||||
public Executor createExecutor(int priority)
|
||||
public Executor createExecutor()
|
||||
{
|
||||
Executor executor = new Executor();
|
||||
|
||||
int entriesToAdd = BitShiftUtil.powerOfTwo(priority);
|
||||
int gapBetweenEntries = (int) (1 / (double) entriesToAdd * this.executorQueue.size());
|
||||
|
||||
// Distribute the executor's entries in the queue, ensuring fair distribution
|
||||
for (; entriesToAdd > 0; entriesToAdd--)
|
||||
{
|
||||
this.executorQueue.add(executor);
|
||||
Collections.rotate(this.executorQueue, -gapBetweenEntries);
|
||||
}
|
||||
|
||||
this.executors.add(executor);
|
||||
return executor;
|
||||
}
|
||||
|
||||
@@ -71,33 +55,15 @@ public class PriorityTaskPicker
|
||||
*/
|
||||
private void tryStartNextTask()
|
||||
{
|
||||
this.shouldPickTask.set(true);
|
||||
|
||||
while (this.taskPickerLock.tryLock())
|
||||
if (this.taskPickerLock.tryLock())
|
||||
{
|
||||
try
|
||||
{
|
||||
// Exit if there's no longer a need to pick a task
|
||||
if (!this.shouldPickTask.compareAndSet(true, false))
|
||||
for (Executor executor : (Iterable<? extends Executor>) this.executors.stream().sorted(Comparator.comparingLong(executor -> executor.totalRuntimeNanos.get()))::iterator)
|
||||
{
|
||||
// There is a small chance for a task to end up in a 'limbo' state,
|
||||
// when this.shouldPickTask got set to true right here and this.taskPickerLock is not unlocked yet,
|
||||
// but we'll disregard that since tasks get added often enough for this to not be an issue
|
||||
TrackedRunnable task;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Iterate over the executors in the queue, attempting to start tasks
|
||||
for (
|
||||
int taskPickAttempts = 0;
|
||||
taskPickAttempts < this.executorQueue.size() && this.occupiedThreads.get() < this.threadCountConfig.get();
|
||||
taskPickAttempts++, this.nextExecutorQueuePos = (this.nextExecutorQueuePos + 1) % this.executorQueue.size()
|
||||
)
|
||||
{
|
||||
Executor executor = this.executorQueue.get(this.nextExecutorQueuePos);
|
||||
|
||||
TrackedRunnable task = executor.tasks.poll();
|
||||
if (task != null)
|
||||
while (this.occupiedThreads.get() < this.threadCountConfig.get() && (task = executor.tasks.poll()) != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -107,9 +73,6 @@ public class PriorityTaskPicker
|
||||
// Update variables related to task status
|
||||
this.occupiedThreads.getAndIncrement();
|
||||
executor.runningTasks.getAndIncrement();
|
||||
|
||||
// Prevent exiting early since there might be more than this.executorQueue.size() tasks waiting in queue
|
||||
taskPickAttempts = 0;
|
||||
}
|
||||
catch (RejectedExecutionException e)
|
||||
{
|
||||
@@ -170,6 +133,7 @@ public class PriorityTaskPicker
|
||||
private final AtomicInteger runningTasks = new AtomicInteger(0);
|
||||
private final AtomicInteger completedTasks = new AtomicInteger(0);
|
||||
private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200);
|
||||
private final AtomicLong totalRuntimeNanos = new AtomicLong(0);
|
||||
|
||||
|
||||
@Override
|
||||
@@ -241,6 +205,7 @@ public class PriorityTaskPicker
|
||||
PriorityTaskPicker.this.occupiedThreads.getAndDecrement();
|
||||
this.executor.runningTasks.getAndDecrement();
|
||||
this.executor.completedTasks.getAndIncrement();
|
||||
this.executor.totalRuntimeNanos.addAndGet(timeElapsed);
|
||||
|
||||
// Attempt to start another task
|
||||
PriorityTaskPicker.this.tryStartNextTask();
|
||||
|
||||
+5
-12
@@ -92,18 +92,11 @@ public class ThreadPoolUtil
|
||||
// thread pools
|
||||
taskPicker = new PriorityTaskPicker();
|
||||
|
||||
// IO should never be stuck waiting for something else to complete
|
||||
networkCompressionThreadPool = taskPicker.createExecutor(4);
|
||||
fileHandlerThreadPool = taskPicker.createExecutor(4);
|
||||
|
||||
// Normal priority tasks
|
||||
chunkToLodBuilderThreadPool = taskPicker.createExecutor(3);
|
||||
updatePropagatorThreadPool = taskPicker.createExecutor(2);
|
||||
|
||||
// World gen tasks are heavy and nothing strictly depends on them, so they may wait a bit
|
||||
worldGenThreadPool = taskPicker.createExecutor(0);
|
||||
|
||||
|
||||
networkCompressionThreadPool = taskPicker.createExecutor();
|
||||
fileHandlerThreadPool = taskPicker.createExecutor();
|
||||
chunkToLodBuilderThreadPool = taskPicker.createExecutor();
|
||||
updatePropagatorThreadPool = taskPicker.createExecutor();
|
||||
worldGenThreadPool = taskPicker.createExecutor();
|
||||
|
||||
// single thread pools
|
||||
beaconCullingThreadPool = ThreadUtil.makeSingleThreadPool(BEACON_CULLING_THREAD_NAME);
|
||||
|
||||
Reference in New Issue
Block a user