Move networked chunk updating to the LodBuilder thread

This is done both to prevent starvation, infinitely growing tasks/memory, and simplify the AbstractDhServerLevel.updateDataSourcesAsync() method.
This commit is contained in:
James Seibel
2025-01-04 09:40:01 -06:00
parent 4dd2faad67
commit 6a9986ccd3
6 changed files with 66 additions and 60 deletions
@@ -5,7 +5,6 @@ import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
import com.seibel.distanthorizons.core.sql.dto.IBaseDTO;
import com.seibel.distanthorizons.core.util.LodUtil;
@@ -70,6 +69,8 @@ public abstract class AbstractDataSourceHandler
public final ArrayList<IDataSourceUpdateFunc<TDataSource>> dateSourceUpdateListeners = new ArrayList<>();
public final ConcurrentHashMap<Long, CompletableFuture<Void>> updateDataSourceFutureByPos = new ConcurrentHashMap<>();
//=============//
@@ -187,7 +188,7 @@ public abstract class AbstractDataSourceHandler
public CompletableFuture<Void> updateDataSourceAsync(@NotNull FullDataSourceV2 inputDataSource)
{
ThreadPoolExecutor executor = ThreadPoolUtil.getUpdatePropagatorExecutor();
ThreadPoolExecutor executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
if (executor == null || executor.isTerminated())
{
return CompletableFuture.completedFuture(null);
@@ -196,23 +197,35 @@ public abstract class AbstractDataSourceHandler
try
{
// run file handling on a separate thread
this.markUpdateStart(inputDataSource.getPos());
return CompletableFuture.runAsync(() ->
return this.updateDataSourceFutureByPos.compute(inputDataSource.getPos(), (Long newPos, CompletableFuture<Void> future) ->
{
try
if (future != null)
{
this.updateDataSourceAtPos(inputDataSource.getPos(), inputDataSource, true);
future.cancel(false);
this.markUpdateEnd(newPos);
}
catch (Exception e)
// run file handling on a separate thread
this.markUpdateStart(newPos);
future = CompletableFuture.runAsync(() ->
{
LOGGER.error("Unexpected error in async data source update, error: "+e.getMessage(), e);
}
finally
{
this.markUpdateEnd(inputDataSource.getPos());
}
}, executor);
try
{
this.updateDataSourceAtPos(newPos, inputDataSource, true);
}
catch (Exception e)
{
LOGGER.error("Unexpected error in async data source update at pos: ["+DhSectionPos.toString(newPos)+"], error: ["+e.getMessage()+"].", e);
}
finally
{
this.markUpdateEnd(newPos);
this.updateDataSourceFutureByPos.remove(newPos);
}
}, executor);
return future;
});
}
catch (RejectedExecutionException ignore)
{
@@ -8,6 +8,7 @@ import org.jetbrains.annotations.NotNull;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -23,6 +24,7 @@ public class DelayedFullDataSourceSaveCache
public final ConcurrentHashMap<Long, FullDataSourceV2> dataSourceByPosition = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, TimerTask> saveTimerTasksBySectionPos = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, CompletableFuture<Void>> futureBySectionPos = new ConcurrentHashMap<>();
private final ISaveDataSourceFunc onSaveTimeoutFunc;
private final int saveDelayInMs;
@@ -49,16 +51,20 @@ public class DelayedFullDataSourceSaveCache
* Writing into memory is done synchronously so inputDataSource can
* be closed after this method finishes.
*/
public void writeDataSourceToMemoryAndQueueSave(FullDataSourceV2 inputDataSource)
public CompletableFuture<Void> writeDataSourceToMemoryAndQueueSave(FullDataSourceV2 inputDataSource)
{
long dataSourcePos = inputDataSource.getPos();
this.dataSourceByPosition.compute(dataSourcePos, (inputPos, temporaryDataSource) ->
CompletableFuture<Void> future = this.futureBySectionPos.computeIfAbsent(dataSourcePos, (inputPos) -> new CompletableFuture<>());
this.dataSourceByPosition.compute(dataSourcePos, (inputPos, memoryDataSource) ->
{
if (temporaryDataSource == null)
if (memoryDataSource == null)
{
temporaryDataSource = FullDataSourceV2.createEmpty(inputPos);
// should not be closed since it will be used by other threads
memoryDataSource = FullDataSourceV2.createEmpty(inputPos);
}
temporaryDataSource.update(inputDataSource);
memoryDataSource.update(inputDataSource);
TimerTask timerTask = new TimerTask()
@@ -80,6 +86,14 @@ public class DelayedFullDataSourceSaveCache
{
LOGGER.error("Failed to save updated data for section ["+dataSourcePos+"], error: ["+e.getMessage()+"]", e);
}
finally
{
CompletableFuture<Void> future = DelayedFullDataSourceSaveCache.this.futureBySectionPos.remove(dataSourcePos);
if (future != null)
{
future.complete(null);
}
}
}
};
try
@@ -90,7 +104,7 @@ public class DelayedFullDataSourceSaveCache
{
// James isn't sure why this is possible since this logic is inside a lock,
// maybe the timer is just async enough that there can be problems?
LOGGER.warn("Attempted to queue an already canceled task. Pos: ["+dataSourcePos+"], task already queued for pos: ["+this.saveTimerTasksBySectionPos.containsKey(dataSourcePos)+"]");
//LOGGER.warn("Attempted to queue an already canceled task. Pos: ["+dataSourcePos+"], task already queued for pos: ["+this.saveTimerTasksBySectionPos.containsKey(dataSourcePos)+"]");
}
@@ -102,8 +116,10 @@ public class DelayedFullDataSourceSaveCache
oldTask.cancel();
}
return temporaryDataSource;
return memoryDataSource;
});
return future;
}
public int getUnsavedCount() { return this.dataSourceByPosition.size(); }
@@ -45,6 +45,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
public abstract class AbstractDhLevel implements IDhLevel
@@ -144,7 +145,7 @@ public abstract class AbstractDhLevel implements IDhLevel
public int getUnsavedDataSourceCount() { return this.delayedFullDataSourceSaveCache.getUnsavedCount(); }
@Override
public void updateChunkAsync(IChunkWrapper chunkWrapper, int chunkHash)
public CompletableFuture<Void> updateChunkAsync(IChunkWrapper chunkWrapper, int chunkHash)
{
// data source synchronously written to memory so it can be safely closed
try (FullDataSourceV2 dataSource = FullDataSourceV2.createFromChunk(chunkWrapper))
@@ -152,7 +153,7 @@ public abstract class AbstractDhLevel implements IDhLevel
if (dataSource == null)
{
// This can happen if, among other reasons, a chunk save is superseded by a later event
return;
return CompletableFuture.completedFuture(null);
}
@@ -168,7 +169,7 @@ public abstract class AbstractDhLevel implements IDhLevel
this.updatedChunkHashesByChunkPos.put(chunkWrapper.getChunkPos(), chunkHash);
// batch updates to reduce overhead when flying around or breaking/placing a lot of blocks in an area
this.delayedFullDataSourceSaveCache.writeDataSourceToMemoryAndQueueSave(dataSource);
return this.delayedFullDataSourceSaveCache.writeDataSourceToMemoryAndQueueSave(dataSource);
}
}
@@ -237,23 +237,16 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
@Override
public CompletableFuture<Void> updateDataSourcesAsync(FullDataSourceV2 data)
{
if (!Config.Server.enableRealTimeUpdates.get())
{
return this.getFullDataProvider().updateDataSourceAsync(data);
}
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
LOGGER.warn("Unable to send FullDataPartialUpdateMessage - getNetworkCompressionExecutor() is null");
return this.getFullDataProvider().updateDataSourceAsync(data);
}
try
{
CompletableFuture.runAsync(() ->
return this.getFullDataProvider()
.updateDataSourceAsync(data)
.thenRun(() ->
{
Objects.requireNonNull(this.beaconBeamRepo);
if (!Config.Server.enableRealTimeUpdates.get())
{
return;
}
LodUtil.assertTrue(this.beaconBeamRepo != null, "beaconBeamRepo should not be null");
try (FullDataPayload payload = new FullDataPayload(data, this.beaconBeamRepo.getAllBeamsForPos(data.getPos())))
{
for (ServerPlayerState serverPlayerState : this.serverPlayerStateManager.getReadyPlayers())
@@ -280,15 +273,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
}
}
}
}, executor);
}
catch (RejectedExecutionException ignore)
{
// the executor was shut down, it should be back up shortly and able to accept new jobs
}
return this.getFullDataProvider().updateDataSourceAsync(data);
});
}
@@ -113,15 +113,6 @@ public class DhClientServerLevel extends AbstractDhServerLevel implements IDhCli
@Override
public void clearRenderCache() { this.clientside.clearRenderCache(); }
@Override
public CompletableFuture<Void> updateDataSourcesAsync(FullDataSourceV2 data)
{
return CompletableFuture.allOf(
super.updateDataSourcesAsync(data),
this.clientside.updateDataSourcesAsync(data)
);
}
//===========//
@@ -48,7 +48,7 @@ public interface IDhLevel extends AutoCloseable, GeneratedFullDataSourceProvider
/** @return 0 if no hash is known */
int getChunkHash(DhChunkPos pos);
void updateChunkAsync(IChunkWrapper chunk, int newChunkHash);
CompletableFuture<Void> updateChunkAsync(IChunkWrapper chunk, int newChunkHash);
void loadBeaconBeamsInPos(long pos);
void updateBeaconBeamsForChunk(IChunkWrapper chunkToUpdate, ArrayList<IChunkWrapper> nearbyChunkList);