Fix paused executors not running tasks
This commit is contained in:
+56
-4
@@ -15,6 +15,7 @@ import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -26,6 +27,21 @@ public class PriorityTaskPicker
|
||||
{
|
||||
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
|
||||
|
||||
private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1, new DhThreadFactory("Task Picker Re-queue Schedule", Thread.NORM_PRIORITY, true));
|
||||
/**
|
||||
* If a thread is paused we can end up in a situation where
|
||||
* all the other pools are done, causing {@link PriorityTaskPicker#tryStartNextTask()}
|
||||
* to queue nothing, preventing the paused pools from running until
|
||||
* more work is queued. <br><br>
|
||||
*
|
||||
* This way we can check on those paused threads at a future time
|
||||
* to queue their work.
|
||||
*/
|
||||
private static final int MS_TO_CHECK_ON_PAUSED_THREADS = 1_000;
|
||||
|
||||
private final AtomicReference<ScheduledFuture<?>> scheduledFutureRef = new AtomicReference<>();
|
||||
private final Runnable startNextTaskBlockingRunnable = () -> this.startNextTask(true);
|
||||
|
||||
|
||||
/** the list of currently registered executors */
|
||||
private final ArrayList<Executor> executors = new ArrayList<>();
|
||||
@@ -56,17 +72,32 @@ public class PriorityTaskPicker
|
||||
* Tries to start the next queued task
|
||||
* for one of the available executors.
|
||||
*/
|
||||
private void tryStartNextTask()
|
||||
private void tryStartNextTask() { this.startNextTask(false); }
|
||||
|
||||
private void startNextTask(boolean waitForLock)
|
||||
{
|
||||
// only let one thread start the next task to prevent concurrency errors
|
||||
if (!this.taskPickerLock.tryLock())
|
||||
if (waitForLock)
|
||||
{
|
||||
return;
|
||||
// generally this will be done if an executor is paused,
|
||||
// and we want to check up on it later
|
||||
this.taskPickerLock.lock();
|
||||
}
|
||||
else
|
||||
{
|
||||
// most threads won't need to try queuing the next
|
||||
// task since that means someone else is already working on it
|
||||
if (!this.taskPickerLock.tryLock())
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
boolean executorPaused = false;
|
||||
|
||||
// fill up executors that have run for less time first,
|
||||
// this prevents long-running tasks from taking up all the CPU time
|
||||
Iterator<Executor> iterator = this.getExecutorIteratorSortedByShortestTotalRunTime();
|
||||
@@ -77,7 +108,7 @@ public class PriorityTaskPicker
|
||||
// skip executors that are paused
|
||||
if (!executor.canRun())
|
||||
{
|
||||
// TODO try to re-queue tasks after a timeout
|
||||
executorPaused = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -106,6 +137,27 @@ public class PriorityTaskPicker
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// if an executor is paused then we'll
|
||||
// need to check on it again sometime in the future
|
||||
// otherwise we may not start the next task for a while
|
||||
ScheduledFuture<?> newScheduledFuture = null;
|
||||
if (executorPaused)
|
||||
{
|
||||
newScheduledFuture = SCHEDULED_EXECUTOR_SERVICE.schedule(
|
||||
this.startNextTaskBlockingRunnable,
|
||||
MS_TO_CHECK_ON_PAUSED_THREADS, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
ScheduledFuture<?> oldScheduledFuture = this.scheduledFutureRef.getAndSet(newScheduledFuture);
|
||||
if (oldScheduledFuture != null)
|
||||
{
|
||||
// stop the last scheduled check,
|
||||
// we just checked the queue and will want to wait the full
|
||||
// timeout first
|
||||
oldScheduledFuture.cancel(false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
||||
+10
-2
@@ -55,7 +55,6 @@ public class ThreadPoolUtil
|
||||
@Nullable
|
||||
public static PriorityTaskPicker.Executor getUpdatePropagatorExecutor() { return updatePropagatorThreadPool; }
|
||||
|
||||
public static final DhThreadFactory WORLD_GEN_THREAD_FACTORY = new DhThreadFactory("World Gen", Thread.MIN_PRIORITY, false);
|
||||
private static PriorityTaskPicker.Executor worldGenThreadPool;
|
||||
@Nullable
|
||||
public static PriorityTaskPicker.Executor getWorldGenExecutor() { return worldGenThreadPool; }
|
||||
@@ -172,10 +171,19 @@ public class ThreadPoolUtil
|
||||
public static boolean onlyRunThreadIfCameraMovingSlowly()
|
||||
{
|
||||
double cameraSpeed = ClientApi.INSTANCE.cameraSpeedRollingAverage.getAverage();
|
||||
// stop these threads if moving a little bit slower than max elytra speed
|
||||
double maxAllowedSpeed = (LodUtil.ROCKET_ELYTRA_SPEED_IN_BLOCKS_PER_SEC - 10.0);
|
||||
if (cameraSpeed > maxAllowedSpeed)
|
||||
{
|
||||
// pause this thread pool if the user is moving too fast
|
||||
// pause if the user is moving too fast
|
||||
return false;
|
||||
}
|
||||
|
||||
PriorityTaskPicker.Executor executor = getRenderLoadingExecutor();
|
||||
if (executor != null
|
||||
&& executor.getQueueSize() > 0)
|
||||
{
|
||||
// pause if LODs are being loaded for rendering
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user