Simplify executor task submitting logic
This commit is contained in:
+36
-40
@@ -90,10 +90,9 @@ public class PriorityTaskPicker
|
||||
{
|
||||
Executor executor = this.executorQueue.get(this.nextExecutorQueuePos);
|
||||
|
||||
TrackableRunnerContainer runnableContainer = executor.tasks.poll();
|
||||
if (runnableContainer != null)
|
||||
TrackedRunnable task = executor.tasks.poll();
|
||||
if (task != null)
|
||||
{
|
||||
Runnable task = runnableContainer.wrappedRunnable;
|
||||
try
|
||||
{
|
||||
// Attempt to start another task
|
||||
@@ -155,7 +154,7 @@ public class PriorityTaskPicker
|
||||
|
||||
public class Executor extends AbstractExecutorService
|
||||
{
|
||||
private final Queue<TrackableRunnerContainer> tasks = new ConcurrentLinkedQueue<>();
|
||||
private final Queue<TrackedRunnable> tasks = new ConcurrentLinkedQueue<>();
|
||||
|
||||
private final AtomicInteger runningTasks = new AtomicInteger(0);
|
||||
private final AtomicInteger completedTasks = new AtomicInteger(0);
|
||||
@@ -165,29 +164,7 @@ public class PriorityTaskPicker
|
||||
@Override
|
||||
public void execute(@NotNull Runnable command)
|
||||
{
|
||||
Runnable wrappedRunnable = () ->
|
||||
{
|
||||
long startTime = System.nanoTime();
|
||||
try
|
||||
{
|
||||
command.run();
|
||||
}
|
||||
finally
|
||||
{
|
||||
long timeElapsed = System.nanoTime() - startTime;
|
||||
this.runTimeInMsRollingAverage.addValue(TimeUnit.NANOSECONDS.toMillis(timeElapsed));
|
||||
|
||||
// Update variables related to task status
|
||||
PriorityTaskPicker.this.occupiedThreads.getAndDecrement();
|
||||
this.runningTasks.getAndDecrement();
|
||||
this.completedTasks.getAndIncrement();
|
||||
|
||||
// Attempt to start another task
|
||||
PriorityTaskPicker.this.tryStartNextTask();
|
||||
}
|
||||
};
|
||||
this.tasks.add(new TrackableRunnerContainer(command, wrappedRunnable));
|
||||
|
||||
this.tasks.add(new TrackedRunnable(this, command));
|
||||
|
||||
// Attempt to pick up the task immediately
|
||||
PriorityTaskPicker.this.tryStartNextTask();
|
||||
@@ -204,8 +181,7 @@ public class PriorityTaskPicker
|
||||
|
||||
|
||||
/** The passed in {@link Runnable} must be exactly the same as the one passed into {@link PriorityTaskPicker.Executor#execute(Runnable)} */
|
||||
public void remove(@NotNull Runnable command)
|
||||
{ this.tasks.removeIf((pair) -> pair.originalRunnable.equals(command)); }
|
||||
public void remove(@NotNull Runnable command) { this.tasks.removeIf(trackedRunnable -> trackedRunnable.command == command); }
|
||||
|
||||
@Override
|
||||
public void shutdown() { throw new UnsupportedOperationException(); }
|
||||
@@ -224,20 +200,40 @@ public class PriorityTaskPicker
|
||||
}
|
||||
|
||||
/** used so we can {@link PriorityTaskPicker.Executor#remove(Runnable)} using the original {@link Runnable} */
|
||||
private static class TrackableRunnerContainer
|
||||
private class TrackedRunnable implements Runnable
|
||||
{
|
||||
/** the runnable passed into {@link PriorityTaskPicker.Executor#execute(Runnable)} */
|
||||
public final Runnable originalRunnable;
|
||||
/**
|
||||
* the runnable actually triggered by the {@link PriorityTaskPicker.Executor}
|
||||
* contains additional logic necessary to run the exector.
|
||||
*/
|
||||
public final Runnable wrappedRunnable;
|
||||
private final Executor executor;
|
||||
|
||||
public TrackableRunnerContainer(Runnable originalRunnable, Runnable wrappedRunnable)
|
||||
/** the runnable passed into {@link PriorityTaskPicker.Executor#execute(Runnable)} */
|
||||
public final Runnable command;
|
||||
|
||||
public TrackedRunnable(Executor executor, Runnable command)
|
||||
{
|
||||
this.originalRunnable = originalRunnable;
|
||||
this.wrappedRunnable = wrappedRunnable;
|
||||
this.executor = executor;
|
||||
this.command = command;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
long startTime = System.nanoTime();
|
||||
try
|
||||
{
|
||||
this.command.run();
|
||||
}
|
||||
finally
|
||||
{
|
||||
long timeElapsed = System.nanoTime() - startTime;
|
||||
this.executor.runTimeInMsRollingAverage.addValue(TimeUnit.NANOSECONDS.toMillis(timeElapsed));
|
||||
|
||||
// Update variables related to task status
|
||||
PriorityTaskPicker.this.occupiedThreads.getAndDecrement();
|
||||
this.executor.runningTasks.getAndDecrement();
|
||||
this.executor.completedTasks.getAndIncrement();
|
||||
|
||||
// Attempt to start another task
|
||||
PriorityTaskPicker.this.tryStartNextTask();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user