Fix update propagator only re-queueing once queue empty

This commit is contained in:
James Seibel
2026-02-08 19:53:03 -06:00
parent 8afe388eb6
commit 92d9e631a7
@@ -34,7 +34,7 @@ public class FullDataUpdatePropagatorV2 implements IDebugRenderable, AutoCloseab
/** indicates how long the update queue thread should wait between queuing ticks */
protected static final int PROPAGATE_QUEUE_THREAD_DELAY_IN_MS = 250;
public static final int NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD = 5;
public static final int NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD = 10;
/** how many parent update tasks can be in the queue at once */
public static int getMaxPropagateTaskCount() { return NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD * Config.Common.MultiThreading.numberOfThreads.get(); }
@@ -135,121 +135,129 @@ public class FullDataUpdatePropagatorV2 implements IDebugRenderable, AutoCloseab
int maxUpdateTaskCount = getMaxPropagateTaskCount();
// queue parent updates
if (executor.getQueueSize() < maxUpdateTaskCount
&& this.updatingPosSet.size() < maxUpdateTaskCount)
if (executor.getQueueSize() > maxUpdateTaskCount
|| this.updatingPosSet.size() > maxUpdateTaskCount)
{
// get the positions that need to be applied to their parents
LongArrayList parentUpdatePosList = this.provider.repo.getPositionsToUpdate(targetBlockPos.getX(), targetBlockPos.getZ(), maxUpdateTaskCount);
// combine updates together based on their parent
HashMap<Long, HashSet<Long>> updatePosByParentPos = new HashMap<>();
for (Long pos : parentUpdatePosList)
return;
}
// get the positions that need to be applied to their parents
LongArrayList parentUpdatePosList = this.provider.repo.getPositionsToUpdate(targetBlockPos.getX(), targetBlockPos.getZ(), maxUpdateTaskCount);
// combine updates together based on their parent
HashMap<Long, HashSet<Long>> updatePosByParentPos = new HashMap<>();
for (Long pos : parentUpdatePosList)
{
updatePosByParentPos.compute(DhSectionPos.getParentPos(pos), (parentPos, updatePosSet) ->
{
updatePosByParentPos.compute(DhSectionPos.getParentPos(pos), (parentPos, updatePosSet) ->
if (updatePosSet == null)
{
if (updatePosSet == null)
{
updatePosSet = new HashSet<>();
}
updatePosSet.add(pos);
return updatePosSet;
});
updatePosSet = new HashSet<>();
}
updatePosSet.add(pos);
return updatePosSet;
});
}
// queue the updates
for (Long parentUpdatePos : updatePosByParentPos.keySet())
{
// stop if there are already a bunch of updates queued
if (this.updatingPosSet.size() > maxUpdateTaskCount
|| executor.getQueueSize() > maxUpdateTaskCount)
{
break;
}
// queue the updates
for (Long parentUpdatePos : updatePosByParentPos.keySet())
// skip any already-queued positions
if (!this.updatingPosSet.add(parentUpdatePos))
{
// stop if there are already a bunch of updates queued
if (this.updatingPosSet.size() > maxUpdateTaskCount
|| executor.getQueueSize() > maxUpdateTaskCount
|| !this.updatingPosSet.add(parentUpdatePos))
continue;
}
try
{
executor.execute(() ->
{
break;
}
try
{
executor.execute(() ->
ReentrantLock parentWriteLock = this.dataUpdater.updateLockProvider.getLock(parentUpdatePos);
boolean parentLocked = false;
try
{
ReentrantLock parentWriteLock = this.dataUpdater.updateLockProvider.getLock(parentUpdatePos);
boolean parentLocked = false;
try
//LOGGER.info("updating parent: "+parentUpdatePos);
// Locking the parent before the children should prevent deadlocks.
// TryLock is used instead of lock so this thread can handle a different update.
if (parentWriteLock.tryLock())
{
//LOGGER.info("updating parent: "+parentUpdatePos);
parentLocked = true;
this.dataUpdater.lockedPosSet.add(parentUpdatePos);
// Locking the parent before the children should prevent deadlocks.
// TryLock is used instead of lock so this thread can handle a different update.
if (parentWriteLock.tryLock())
try (FullDataSourceV2 parentDataSource = this.provider.get(parentUpdatePos))
{
parentLocked = true;
this.dataUpdater.lockedPosSet.add(parentUpdatePos);
try (FullDataSourceV2 parentDataSource = this.provider.get(parentUpdatePos))
// will return null if the file handler is shutting down
if (parentDataSource != null)
{
// will return null if the file handler is shutting down
if (parentDataSource != null)
// apply each child pos to the parent
for (Long childPos : updatePosByParentPos.get(parentUpdatePos))
{
// apply each child pos to the parent
for (Long childPos : updatePosByParentPos.get(parentUpdatePos))
ReentrantLock childReadLock = this.dataUpdater.updateLockProvider.getLock(childPos);
try
{
ReentrantLock childReadLock = this.dataUpdater.updateLockProvider.getLock(childPos);
try
childReadLock.lock();
this.dataUpdater.lockedPosSet.add(childPos);
try (FullDataSourceV2 childDataSource = this.provider.get(childPos))
{
childReadLock.lock();
this.dataUpdater.lockedPosSet.add(childPos);
try (FullDataSourceV2 childDataSource = this.provider.get(childPos))
// can return null when the file handler is being shut down
if (childDataSource != null)
{
// can return null when the file handler is being shut down
if (childDataSource != null)
{
parentDataSource.updateFromDataSource(childDataSource);
}
parentDataSource.updateFromDataSource(childDataSource);
}
}
catch (Exception e)
{
LOGGER.error("Unexpected in parent update propagation for parent pos: ["+DhSectionPos.toString(parentUpdatePos)+"], child pos: [" + DhSectionPos.toString(parentUpdatePos) + "], Error: [" + e.getMessage() + "].", e);
}
finally
{
this.provider.repo.setApplyToParent(childPos, false);
childReadLock.unlock();
this.dataUpdater.lockedPosSet.remove(childPos);
}
}
if (DhSectionPos.getDetailLevel(parentUpdatePos) < FullDataSourceProviderV2.ROOT_SECTION_DETAIL_LEVEL)
catch (Exception e)
{
parentDataSource.applyToParent = true;
LOGGER.error("Unexpected in parent update propagation for parent pos: ["+DhSectionPos.toString(parentUpdatePos)+"], child pos: [" + DhSectionPos.toString(parentUpdatePos) + "], Error: [" + e.getMessage() + "].", e);
}
finally
{
this.provider.repo.setApplyToParent(childPos, false);
childReadLock.unlock();
this.dataUpdater.lockedPosSet.remove(childPos);
}
this.dataUpdater.updateDataSource(parentDataSource);
}
if (DhSectionPos.getDetailLevel(parentUpdatePos) < FullDataSourceProviderV2.ROOT_SECTION_DETAIL_LEVEL)
{
parentDataSource.applyToParent = true;
}
this.dataUpdater.updateDataSource(parentDataSource);
}
}
}
finally
}
finally
{
if (parentLocked)
{
if (parentLocked)
{
parentWriteLock.unlock();
this.dataUpdater.lockedPosSet.remove(parentUpdatePos);
}
this.updatingPosSet.remove(parentUpdatePos);
parentWriteLock.unlock();
this.dataUpdater.lockedPosSet.remove(parentUpdatePos);
}
});
}
catch (RejectedExecutionException ignore)
{ /* the executor was shut down, it should be back up shortly and able to accept new jobs */ }
catch (Exception e)
{
this.updatingPosSet.remove(parentUpdatePos);
throw e;
}
this.updatingPosSet.remove(parentUpdatePos);
}
});
}
catch (RejectedExecutionException ignore)
{ /* the executor was shut down, it should be back up shortly and able to accept new jobs */ }
catch (Exception e)
{
this.updatingPosSet.remove(parentUpdatePos);
throw e;
}
}
}