diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java index ac3aa6b7d..68a5a47a5 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PriorityTaskPicker.java @@ -3,13 +3,12 @@ package com.seibel.distanthorizons.core.util.threading; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.config.types.ConfigEntry; import com.seibel.distanthorizons.core.util.objects.RollingAverage; -import com.seibel.distanthorizons.coreapi.util.BitShiftUtil; import org.jetbrains.annotations.NotNull; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; public class PriorityTaskPicker @@ -22,14 +21,11 @@ public class PriorityTaskPicker new ArrayBlockingQueue<>(this.threadCountConfig.getMax()) ); - // Queue of executors, used to distribute tasks across executors based on priority - private final ArrayList executorQueue = new ArrayList<>(); - private int nextExecutorQueuePos = 0; + // List of executors + private final ArrayList executors = new ArrayList<>(); // Lock to ensure task picking logic is thread-safe private final ReentrantLock taskPickerLock = new ReentrantLock(); - // Indicates whether a task picking attempt is needed - private final AtomicBoolean shouldPickTask = new AtomicBoolean(false); // Tracks the number of active threads private final AtomicInteger occupiedThreads = new AtomicInteger(0); @@ -42,26 +38,14 @@ public class PriorityTaskPicker //==================// /** - * Creates an executor with a specific priority. - * Higher priority executors have more exponentially entries in the distribution queue, giving them a greater chance to run tasks. + * Creates an executor. * - * @param priority the priority level of the executor * @return a newly created Executor */ - public Executor createExecutor(int priority) + public Executor createExecutor() { Executor executor = new Executor(); - - int entriesToAdd = BitShiftUtil.powerOfTwo(priority); - int gapBetweenEntries = (int) (1 / (double) entriesToAdd * this.executorQueue.size()); - - // Distribute the executor's entries in the queue, ensuring fair distribution - for (; entriesToAdd > 0; entriesToAdd--) - { - this.executorQueue.add(executor); - Collections.rotate(this.executorQueue, -gapBetweenEntries); - } - + this.executors.add(executor); return executor; } @@ -71,33 +55,15 @@ public class PriorityTaskPicker */ private void tryStartNextTask() { - this.shouldPickTask.set(true); - - while (this.taskPickerLock.tryLock()) + if (this.taskPickerLock.tryLock()) { try { - // Exit if there's no longer a need to pick a task - if (!this.shouldPickTask.compareAndSet(true, false)) + for (Executor executor : (Iterable) this.executors.stream().sorted(Comparator.comparingLong(executor -> executor.totalRuntimeNanos.get()))::iterator) { - // There is a small chance for a task to end up in a 'limbo' state, - // when this.shouldPickTask got set to true right here and this.taskPickerLock is not unlocked yet, - // but we'll disregard that since tasks get added often enough for this to not be an issue + TrackedRunnable task; - return; - } - - // Iterate over the executors in the queue, attempting to start tasks - for ( - int taskPickAttempts = 0; - taskPickAttempts < this.executorQueue.size() && this.occupiedThreads.get() < this.threadCountConfig.get(); - taskPickAttempts++, this.nextExecutorQueuePos = (this.nextExecutorQueuePos + 1) % this.executorQueue.size() - ) - { - Executor executor = this.executorQueue.get(this.nextExecutorQueuePos); - - TrackedRunnable task = executor.tasks.poll(); - if (task != null) + while (this.occupiedThreads.get() < this.threadCountConfig.get() && (task = executor.tasks.poll()) != null) { try { @@ -107,9 +73,6 @@ public class PriorityTaskPicker // Update variables related to task status this.occupiedThreads.getAndIncrement(); executor.runningTasks.getAndIncrement(); - - // Prevent exiting early since there might be more than this.executorQueue.size() tasks waiting in queue - taskPickAttempts = 0; } catch (RejectedExecutionException e) { @@ -170,6 +133,7 @@ public class PriorityTaskPicker private final AtomicInteger runningTasks = new AtomicInteger(0); private final AtomicInteger completedTasks = new AtomicInteger(0); private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200); + private final AtomicLong totalRuntimeNanos = new AtomicLong(0); @Override @@ -241,6 +205,7 @@ public class PriorityTaskPicker PriorityTaskPicker.this.occupiedThreads.getAndDecrement(); this.executor.runningTasks.getAndDecrement(); this.executor.completedTasks.getAndIncrement(); + this.executor.totalRuntimeNanos.addAndGet(timeElapsed); // Attempt to start another task PriorityTaskPicker.this.tryStartNextTask(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java index d9356e16d..fd8ab2291 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java @@ -92,18 +92,11 @@ public class ThreadPoolUtil // thread pools taskPicker = new PriorityTaskPicker(); - // IO should never be stuck waiting for something else to complete - networkCompressionThreadPool = taskPicker.createExecutor(4); - fileHandlerThreadPool = taskPicker.createExecutor(4); - - // Normal priority tasks - chunkToLodBuilderThreadPool = taskPicker.createExecutor(3); - updatePropagatorThreadPool = taskPicker.createExecutor(2); - - // World gen tasks are heavy and nothing strictly depends on them, so they may wait a bit - worldGenThreadPool = taskPicker.createExecutor(0); - - + networkCompressionThreadPool = taskPicker.createExecutor(); + fileHandlerThreadPool = taskPicker.createExecutor(); + chunkToLodBuilderThreadPool = taskPicker.createExecutor(); + updatePropagatorThreadPool = taskPicker.createExecutor(); + worldGenThreadPool = taskPicker.createExecutor(); // single thread pools beaconCullingThreadPool = ThreadUtil.makeSingleThreadPool(BEACON_CULLING_THREAD_NAME);