Improve chunk processing throughput

This commit is contained in:
James Seibel
2025-01-26 17:05:37 -06:00
parent 2d1859c77d
commit dd3903f66e
@@ -50,6 +50,7 @@ import org.jetbrains.annotations.Nullable;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/** Contains code and variables used by both {@link ClientApi} and {@link ServerApi} */
public class SharedApi
@@ -72,6 +73,12 @@ public class SharedApi
*/
private static final int MAX_UPDATING_CHUNK_COUNT_PER_THREAD_AND_PLAYER = 1_000;
/**
* Used to limit how many tasks are queued at once.
* This is done so the chunks nearest to the player will trigger first.
*/
private static final AtomicInteger GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF = new AtomicInteger(0);
/** how many milliseconds must pass before an overloaded message can be sent in chat or the log */
private static final int MIN_MS_BETWEEN_OVERLOADED_LOG_MESSAGE = 30_000;
@@ -123,6 +130,7 @@ public class SharedApi
AbstractDhRepo.closeAllConnections();
// needs to be closed on world shutdown to clear out un-processed chunks
UPDATE_POS_MANAGER.clear();
GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF.set(0);
// recommend that the garbage collector cleans up any objects from the old world and thread pools
System.gc();
@@ -352,39 +360,54 @@ public class SharedApi
tryStartNextQueuedChunk();
}
/**
* TODO this is bad logic.
* This should be fired on a separate looping thread or something else.
* Only firing whenever a new chunk is added to the queue can cause the queue to miss things.
*/
private static void tryStartNextQueuedChunk()
{
// queue updates up to the number of CPU cores allocated for the job
// (this prevents doing extra work queuing tasks that may not be necessary)
// and makes sure the chunks closest to the player are updated first
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
if (executor != null && executor.getQueueSize() < executor.getPoolSize())
{
try
{
executor.execute(SharedApi::processQueuedChunkUpdate);
}
catch (RejectedExecutionException ignore)
{
// the executor was shut down, it should be back up shortly and able to accept new jobs
}
}
}
private static void processQueuedChunkUpdate()
{
//LOGGER.trace(chunkWrapper.getChunkPos() + " " + executor.getActiveCount() + " / " + executor.getQueue().size() + " - " + executor.getCompletedTaskCount());
UpdateChunkData updateData = UPDATE_POS_MANAGER.popClosest();
if (updateData == null)
if (executor == null
|| GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF.getAndIncrement() > executor.getPoolSize())
{
GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF.decrementAndGet();
return;
}
IChunkWrapper chunkWrapper = updateData.chunkWrapper;
@Nullable ArrayList<IChunkWrapper> neighbourChunkList = updateData.neighbourChunkList;
IDhLevel dhLevel = updateData.dhLevel;
try
{
executor.execute(SharedApi::processQueuedChunkUpdate);
}
catch (RejectedExecutionException ignore)
{
// the executor was shut down, it should be back up shortly and able to accept new jobs
GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF.decrementAndGet();
}
}
private static void processQueuedChunkUpdate()
{
DhChunkPos updatingChunkPos = null;
try
{
UpdateChunkData updateData = UPDATE_POS_MANAGER.popClosest();
if (updateData == null)
{
return;
}
IChunkWrapper chunkWrapper = updateData.chunkWrapper;
updatingChunkPos = chunkWrapper.getChunkPos();
@Nullable ArrayList<IChunkWrapper> neighbourChunkList = updateData.neighbourChunkList;
IDhLevel dhLevel = updateData.dhLevel;
boolean checkChunkHash = !Config.Common.LodBuilding.disableUnchangedChunkCheck.get();
// check if this chunk has been converted into an LOD already
@@ -438,23 +461,12 @@ public class SharedApi
}
catch (Exception e)
{
LOGGER.error("Unexpected error when updating chunk at pos: [" + chunkWrapper.getChunkPos() + "]", e);
LOGGER.error("Unexpected error when updating chunk at pos: [" + updatingChunkPos + "]", e);
}
finally
{
// queue the next position if there are still positions to process
AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
if (executor != null && !UPDATE_POS_MANAGER.updateDataByChunkPos.isEmpty())
{
try
{
executor.execute(SharedApi::processQueuedChunkUpdate);
}
catch (RejectedExecutionException ignore)
{
// the executor was shut down, it should be back up shortly and able to accept new jobs
}
}
GLOBAL_UPDATING_CHUNK_TASKS_COUNT_REF.decrementAndGet();
tryStartNextQueuedChunk();
}
}
@@ -609,6 +621,11 @@ public class SharedApi
}
DhChunkPos closest = this.closestQueue.poll();
if (closest == null)
{
return null;
}
this.furthestQueue.remove(closest);
return this.updateDataByChunkPos.remove(closest);
}