Fix beacon update locks

This commit is contained in:
James Seibel
2026-02-14 22:01:25 -06:00
parent df52f41b87
commit 0714697e05
8 changed files with 661 additions and 369 deletions
@@ -1,305 +0,0 @@
package com.seibel.distanthorizons.core.file.fullDatafile;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.util.KeyedLockContainer;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import com.seibel.distanthorizons.core.logging.DhLogger;
import org.jetbrains.annotations.NotNull;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
/**
* Used to batch together multiple data source updates that all
* affect the same position.
*/
public class DelayedFullDataSourceSaveCache implements AutoCloseable
{
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
/**
* a cache won't automatically clean itself unless we trigger it's clean method
* if not done then we'd only see the cache invalidate when new inserts happen,
* which causes weird behavior when placing/breaking blocks.
*/
private static final ThreadPoolExecutor BACKGROUND_CLEAN_UP_THREAD = ThreadUtil.makeSingleDaemonThreadPool("delayed save cache cleaner");
private static final Set<WeakReference<DelayedFullDataSourceSaveCache>> SAVE_CACHE_SET = Collections.newSetFromMap(new ConcurrentHashMap<>());
/** how long between clean up checks */
private static final int CLEANUP_CHECK_TIME_IN_MS = 1_000;
private final ConcurrentHashMap<Long, DataSourceSavedTimePair> dataSourceByPosition = new ConcurrentHashMap<Long, DataSourceSavedTimePair>();
/* don't let two threads load the same position at the same time */
protected final KeyedLockContainer<Long> saveLockContainer = new KeyedLockContainer<>();
private final ISaveDataSourceFunc onSaveTimeoutAsyncFunc;
private final int saveDelayInMs;
//=============//
// constructor //
//=============//
//region
static
{
BACKGROUND_CLEAN_UP_THREAD.execute(() -> runCleanupLoop());
}
public DelayedFullDataSourceSaveCache(@NotNull ISaveDataSourceFunc onSaveTimeoutAsyncFunc, int saveDelayInMs)
{
this.onSaveTimeoutAsyncFunc = onSaveTimeoutAsyncFunc;
// we can't clean items faster than the cleanup timer fires
if (saveDelayInMs < CLEANUP_CHECK_TIME_IN_MS)
{
LOGGER.warn("The save delay ["+saveDelayInMs+"] shouldn't be less than the cleanup check timer interval ["+CLEANUP_CHECK_TIME_IN_MS+"].");
}
this.saveDelayInMs = saveDelayInMs;
SAVE_CACHE_SET.add(new WeakReference<>(this));
}
//endregion
//==============//
// update queue //
//==============//
//region
/**
* Writing into memory is done synchronously so inputDataSource can
* be closed after this method finishes.
*/
public void writeDataSourceToMemoryAndQueueSave(@NotNull FullDataSourceV2 inputDataSource)
{
long inputPos = inputDataSource.getPos();
ReentrantLock lockForPos = this.saveLockContainer.getLockForPos(inputPos);
try
{
lockForPos.lock();
FullDataSourceV2 memoryDataSource;
DataSourceSavedTimePair pair = this.dataSourceByPosition.getOrDefault(inputPos, null);
if (pair == null)
{
// no data currently in the memory cache for this position
memoryDataSource = FullDataSourceV2.createEmpty(inputPos);
pair = new DataSourceSavedTimePair(memoryDataSource);
DataSourceSavedTimePair oldPair = this.dataSourceByPosition.put(inputPos, pair);
if (oldPair != null)
{
// shouldn't happen, but just in case
this.handleDataSourceRemoval(oldPair.dataSource);
}
}
else
{
memoryDataSource = pair.dataSource;
}
// write the new data into memory
memoryDataSource.updateFromDataSource(inputDataSource);
// keep track of when the last time we saved something was
pair.updateLastWrittenTimestamp();
}
finally
{
lockForPos.unlock();
}
}
/** when this method is called the datasource should no longer be in the memory cache */
public void handleDataSourceRemoval(@NotNull FullDataSourceV2 removedDataSource)
{
this.onSaveTimeoutAsyncFunc.saveAsync(removedDataSource)
.handle((voidObj, throwable) ->
{
try
{
// if this close method is fired multiple times
// monoliths can appear due to concurrent writing to the
// backend arrays
removedDataSource.close();
}
catch (Exception e)
{
LOGGER.error("Unable to close datasource ["+ DhSectionPos.toString(removedDataSource.getPos()) +"], error: ["+e.getMessage()+"].", e);
}
return null;
});
}
//endregion
//==============//
// List methods //
//==============//
//region
public int getUnsavedCount() { return this.dataSourceByPosition.size(); }
public void flush() { this.cleanUp(true); }
/** Removes everything from the memory cache and fires the {@link DelayedFullDataSourceSaveCache#onSaveTimeoutAsyncFunc} for each. */
public void cleanUp(boolean flushAll)
{
Enumeration<Long> keyIterator = this.dataSourceByPosition.keys();
while (keyIterator.hasMoreElements())
{
Long pos = keyIterator.nextElement();
ReentrantLock posLock = this.saveLockContainer.getLockForPos(pos);
try
{
posLock.lock();
DataSourceSavedTimePair savedPair = this.dataSourceByPosition.getOrDefault(pos, null);
if (savedPair != null)
{
if (flushAll
|| savedPair.dataSourceHasTimedOut(this.saveDelayInMs))
{
this.dataSourceByPosition.remove(pos);
this.handleDataSourceRemoval(savedPair.dataSource);
}
}
}
finally
{
posLock.unlock();
}
}
}
//endregion
//================//
// static cleanup //
//================//
//region
private static void runCleanupLoop()
{
while (true)
{
try
{
try
{
Thread.sleep(CLEANUP_CHECK_TIME_IN_MS);
}
catch (InterruptedException ignore) { }
SAVE_CACHE_SET.forEach((cacheRef) ->
{
DelayedFullDataSourceSaveCache cache = cacheRef.get();
if (cache == null)
{
// shouldn't be necessary, but if we forget to manually close a cache, this will prevent leaking
SAVE_CACHE_SET.remove(cacheRef);
}
else
{
cache.cleanUp(false);
}
});
}
catch (Exception e)
{
LOGGER.error("Unexpected error in cleanup thread: [" + e.getMessage() + "].", e);
}
}
}
//endregion
//================//
// base overrides //
//================//
//region
@Override
public void close()
{
// not the fastest way to handle removing,
// but we shouldn't have more than 20 or so at once
// so this should be just fine
SAVE_CACHE_SET.removeIf((cacheRef) ->
{
DelayedFullDataSourceSaveCache cache = cacheRef.get();
return cache != null && cache.equals(this);
});
}
//endregion
//================//
// helper classes //
//================//
//region
@FunctionalInterface
public interface ISaveDataSourceFunc
{
/** called after the timeout expires */
CompletableFuture<Void> saveAsync(FullDataSourceV2 inputDataSource);
}
/**
* used to keep track of when data sources
* were written to so we can flush them once
* enough time has passed.
*/
private static class DataSourceSavedTimePair
{
@NotNull
public final FullDataSourceV2 dataSource;
/** the last unix millisecond time this data source was written to */
public long lastWrittenDateTimeMs;
public DataSourceSavedTimePair(@NotNull FullDataSourceV2 dataSource)
{
this.dataSource = dataSource;
this.lastWrittenDateTimeMs = System.currentTimeMillis();
}
public void updateLastWrittenTimestamp()
{ this.lastWrittenDateTimeMs = System.currentTimeMillis(); }
public boolean dataSourceHasTimedOut(long msTillTimeout)
{
long currentTime = System.currentTimeMillis();
long timeSinceUpdate = currentTime - this.lastWrittenDateTimeMs;
return (timeSinceUpdate > msTillTimeout);
}
}
//endregion
}
@@ -24,7 +24,7 @@ import com.seibel.distanthorizons.core.api.internal.SharedApi;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataUpdatePropagatorV2;
import com.seibel.distanthorizons.core.util.delayedSaveCache.DelayedDataSourceSaveCache;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.generation.DhLightingEngine;
import com.seibel.distanthorizons.core.generation.IFullDataSourceRetrievalQueue;
@@ -40,8 +40,6 @@ import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.util.ExceptionUtil;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.jetbrains.annotations.NotNull;
@@ -73,7 +71,7 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
private final AtomicReference<IFullDataSourceRetrievalQueue> worldGenQueueRef = new AtomicReference<>(null);
private final ArrayList<IOnWorldGenCompleteListener> onWorldGenTaskCompleteListeners = new ArrayList<>();
protected final DelayedFullDataSourceSaveCache delayedFullDataSourceSaveCache = new DelayedFullDataSourceSaveCache(this::onDataSourceSaveAsync, 10_000);
protected final DelayedDataSourceSaveCache delayedFullDataSourceSaveCache = new DelayedDataSourceSaveCache(this::onDataSourceSaveAsync, 10_000);
private final ConcurrentHashMap<Long, CompletableFuture<DataSourceRetrievalResult>> queuedRetrievalFutureByPos = new ConcurrentHashMap<>();
@@ -19,16 +19,11 @@
package com.seibel.distanthorizons.core.level;
import com.seibel.distanthorizons.api.enums.rendering.EDhApiBlockMaterial;
import com.seibel.distanthorizons.api.interfaces.render.IDhApiRenderableBoxGroup;
import com.seibel.distanthorizons.api.methods.events.abstractEvents.DhApiChunkModifiedEvent;
import com.seibel.distanthorizons.api.methods.events.sharedParameterObjects.DhApiRenderParam;
import com.seibel.distanthorizons.api.objects.math.DhApiVec3d;
import com.seibel.distanthorizons.api.objects.render.DhApiRenderableBox;
import com.seibel.distanthorizons.api.objects.render.DhApiRenderableBoxGroupShading;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.DelayedFullDataSourceSaveCache;
import com.seibel.distanthorizons.core.util.delayedSaveCache.DelayedBeaconSaveCache;
import com.seibel.distanthorizons.core.util.delayedSaveCache.DelayedDataSourceSaveCache;
import com.seibel.distanthorizons.core.generation.DhLightingEngine;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhChunkPos;
@@ -36,28 +31,21 @@ import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos;
import com.seibel.distanthorizons.core.render.renderer.generic.CloudRenderHandler;
import com.seibel.distanthorizons.core.render.renderer.generic.GenericObjectRenderer;
import com.seibel.distanthorizons.core.render.renderer.generic.GenericRenderObjectFactory;
import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO;
import com.seibel.distanthorizons.core.sql.dto.ChunkHashDTO;
import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
import com.seibel.distanthorizons.core.sql.repo.BeaconBeamRepo;
import com.seibel.distanthorizons.core.sql.repo.ChunkHashRepo;
import com.seibel.distanthorizons.core.util.ColorUtil;
import com.seibel.distanthorizons.core.util.KeyedLockContainer;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.chunk.IChunkWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IClientLevelWrapper;
import com.seibel.distanthorizons.coreapi.DependencyInjection.ApiEventInjector;
import com.seibel.distanthorizons.coreapi.ModInfo;
import com.seibel.distanthorizons.coreapi.util.MathUtil;
import com.seibel.distanthorizons.core.logging.DhLogger;
import org.jetbrains.annotations.Nullable;
import java.awt.*;
import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -77,8 +65,8 @@ public abstract class AbstractDhLevel implements IDhLevel
public BeaconBeamRepo beaconBeamRepo;
protected final KeyedLockContainer<Long> beaconUpdateLockContainer = new KeyedLockContainer<>();
protected final DelayedFullDataSourceSaveCache delayedFullDataSourceSaveCache = new DelayedFullDataSourceSaveCache(this::onDataSourceSaveAsync, 3_000);
protected final DelayedDataSourceSaveCache delayedFullDataSourceSaveCache = new DelayedDataSourceSaveCache(this::onDataSourceSaveAsync, 1_000);
protected final DelayedBeaconSaveCache delayedBeaconSaveCache = new DelayedBeaconSaveCache(this::updateBeaconBeamsBetweenBlockPos, 1_000);
/** contains the {@link DhChunkPos} for each {@link DhSectionPos} that are queued to save */
protected final ConcurrentHashMap<Long, HashSet<DhChunkPos>> updatedChunkPosSetBySectionPos = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<DhChunkPos, Integer> updatedChunkHashesByChunkPos = new ConcurrentHashMap<>();
@@ -178,7 +166,7 @@ public abstract class AbstractDhLevel implements IDhLevel
// merging writes together in memory significantly improves throughput, since most
// chunk modifications will be right next to each other, IE effecting the same LODs
this.delayedFullDataSourceSaveCache.writeDataSourceToMemoryAndQueueSave(dataSource);
this.delayedFullDataSourceSaveCache.writeToMemoryAndQueueSave(dataSource);
}
}
@@ -226,8 +214,6 @@ public abstract class AbstractDhLevel implements IDhLevel
// repos //
//=======//
// chunk hash //
@Override
public int getChunkHash(DhChunkPos pos)
{
@@ -251,12 +237,12 @@ public abstract class AbstractDhLevel implements IDhLevel
{
int minBlockX = DhSectionPos.getMinCornerBlockX(sectionPos);
int minBlockZ = DhSectionPos.getMinCornerBlockZ(sectionPos);
// TODO special logic had to be done for DhChunkPos.getMaxBlock,
// does that need to be done here?
// The DhChunkPos issue caused beacons to appear/disappear incorrectly on negative chunk borders
int maxBlockX = minBlockX + DhSectionPos.getBlockWidth(sectionPos);
int maxBlockZ = minBlockZ + DhSectionPos.getBlockWidth(sectionPos);
// no delayed cache needed since each hit will be at a different position
// and this is generally called when LODs are received from the
// server, so repeat hits to the same position are unlikely
this.updateBeaconBeamsBetweenBlockPos(
sectionPos,
minBlockX, maxBlockX,
@@ -267,22 +253,11 @@ public abstract class AbstractDhLevel implements IDhLevel
@Override
public void updateBeaconBeamsForChunkPos(DhChunkPos chunkPos, List<BeaconBeamDTO> activeBeamList)
{
long sectionPos = DhSectionPos.encodeContaining(DhSectionPos.SECTION_BLOCK_DETAIL_LEVEL, chunkPos);
int minBlockX = chunkPos.getMinBlockX();
int minBlockZ = chunkPos.getMinBlockZ();
int maxBlockX = chunkPos.getMaxBlockX();
int maxBlockZ = chunkPos.getMaxBlockZ();
//LOGGER.info("beacons ["+activeBeamList.size()+"] at ["+chunkPos+"] x["+minBlockX+"]-["+maxBlockX+"] z["+minBlockZ+"]-["+maxBlockZ+"].");
this.updateBeaconBeamsBetweenBlockPos(
sectionPos,
minBlockX, maxBlockX,
minBlockZ, maxBlockZ,
activeBeamList
);
{
// a delayed cache is used to prevent lock contention
// when flying through new chunks (or updating modified chunks)
// that are right next to each other (requiring up to 16 hits to the same LOD position
this.delayedBeaconSaveCache.queueBeaconBeamUpdatesForChunkPos(chunkPos, activeBeamList);
}
private void updateBeaconBeamsBetweenBlockPos(
@@ -298,8 +273,11 @@ public abstract class AbstractDhLevel implements IDhLevel
}
// locked to prevent two threads from updating the same section at the same time
ReentrantLock lock = this.beaconUpdateLockContainer.getLockForPos(sectionPosForLock); // TODO this can cause a lot of slow-downs
//LOGGER.info("beacons ["+activeBeamList.size()+"] at x["+minBlockX+"]-["+maxBlockX+"] z["+minBlockZ+"]-["+maxBlockZ+"].");
// locked to prevent two threads from updating the same position at the same time
ReentrantLock lock = this.beaconUpdateLockContainer.getLockForPos(sectionPosForLock);
try
{
lock.lock();
@@ -330,14 +308,10 @@ public abstract class AbstractDhLevel implements IDhLevel
for (DhBlockPos beaconPos : allPosSet)
{
if (minBlockX <= beaconPos.getX() && beaconPos.getX() <= maxBlockX
&& minBlockZ <= beaconPos.getZ() && beaconPos.getZ() <= maxBlockZ)
{
//// don't modify beacons outside the updated range
//continue;
}
else
if (minBlockX > beaconPos.getX() || beaconPos.getX() > maxBlockX
|| minBlockZ > beaconPos.getZ() || beaconPos.getZ() > maxBlockZ)
{
// don't modify beacons outside the updated range
continue;
}
@@ -346,12 +320,12 @@ public abstract class AbstractDhLevel implements IDhLevel
BeaconBeamDTO activeBeam = activeBeamByPos.get(beaconPos);
if (activeBeam != null)
{
//LOGGER.info("add beacon ["+activeBeam.blockPos+"] x["+minBlockX+"]-["+maxBlockX+"] z["+minBlockZ+"]-["+maxBlockZ+"].");
if (existingBeam == null)
{
// new beam found, add to DB
this.beaconBeamRepo.save(activeBeam);
//LOGGER.info("new beacon ["+activeBeam+"].");
}
else
{
@@ -360,6 +334,12 @@ public abstract class AbstractDhLevel implements IDhLevel
{
// beam colors were changed
this.beaconBeamRepo.save(activeBeam);
//LOGGER.info("change beacon ["+existingBeam+"] -> ["+activeBeam+"].");
}
else
{
//LOGGER.info("existing beacon ["+existingBeam+"].");
}
}
}
@@ -367,7 +347,8 @@ public abstract class AbstractDhLevel implements IDhLevel
{
// beam no longer exists at position, remove from DB
this.beaconBeamRepo.deleteWithKey(beaconPos);
//LOGGER.info("remove beacon ["+beaconPos+"] x["+minBlockX+"]-["+maxBlockX+"] z["+minBlockZ+"]-["+maxBlockZ+"].");
//LOGGER.info("remove beacon ["+existingBeam+"].");
}
}
}
@@ -0,0 +1,249 @@
package com.seibel.distanthorizons.core.util.delayedSaveCache;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.util.KeyedLockContainer;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import com.seibel.distanthorizons.core.logging.DhLogger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
/**
* Used to batch together multiple updates that all
* affect the same position.
*
* @see AbstractSaveObjContainer
*/
public abstract class AbstractDelayedSaveCache<TSaveObj, TSaveContainer extends AbstractSaveObjContainer<TSaveObj>> implements AutoCloseable
{
private static final DhLogger LOGGER = new DhLoggerBuilder()
.name(AbstractDelayedSaveCache.class.getSimpleName())
.build();
/**
* a cache won't automatically clean itself unless we trigger it's clean method
* if not done then we'd only see the cache invalidate when new inserts happen,
* which causes weird behavior when placing/breaking blocks.
*/
private static final ThreadPoolExecutor BACKGROUND_CLEAN_UP_THREAD = ThreadUtil.makeSingleDaemonThreadPool("delayed save cache cleaner");
private static final Set<WeakReference<AbstractDelayedSaveCache<?,?>>> SAVE_CACHE_SET = Collections.newSetFromMap(new ConcurrentHashMap<>());
/** how long between clean up checks */
private static final int CLEANUP_CHECK_TIME_IN_MS = 1_000;
protected final ConcurrentHashMap<Long, TSaveContainer> saveContainerByPosition = new ConcurrentHashMap<>();
/* don't let two threads load the same position at the same time */
protected final KeyedLockContainer<Long> saveLockContainer = new KeyedLockContainer<>();
/** how long a {@link TSaveContainer} should have lived before being eligible for automatic flushing. */
protected final int saveDelayInMs;
//=============//
// constructor //
//=============//
//region
static
{
BACKGROUND_CLEAN_UP_THREAD.execute(() -> runCleanupLoop());
}
public AbstractDelayedSaveCache(int saveDelayInMs)
{
// we can't clean items faster than the cleanup timer fires
if (saveDelayInMs < CLEANUP_CHECK_TIME_IN_MS)
{
LOGGER.warn("The save delay ["+saveDelayInMs+"] shouldn't be less than the cleanup check timer interval ["+CLEANUP_CHECK_TIME_IN_MS+"].");
}
this.saveDelayInMs = saveDelayInMs;
SAVE_CACHE_SET.add(new WeakReference<>(this));
}
//endregion
//==================//
// abstract methods //
//==================//
//region
protected abstract TSaveContainer createEmptySaveObjContainer(long inputPos);
/** when this method is called the {@link TSaveContainer} should no longer be in the memory cache */
protected abstract void handleDataSourceRemoval(@NotNull TSaveContainer saveContainer);
//endregion
//==============//
// update queue //
//==============//
//region
/**
* Writing into memory is done synchronously so inputDataSource can
* be closed after this method finishes.
*
* @param inputObj whether or not this can be null will depend on the specific implementation of this class.
*
* @return the new (or pre-existing) {@link TSaveContainer} so child objects can modify it if needed
*/
public TSaveContainer writeToMemoryAndQueueSave(long inputPos, @Nullable TSaveObj inputObj)
{
ReentrantLock lockForPos = this.saveLockContainer.getLockForPos(inputPos);
try
{
lockForPos.lock();
TSaveContainer container = this.saveContainerByPosition.get(inputPos);
if (container == null)
{
// no data currently in the memory cache for this position
container = this.createEmptySaveObjContainer(inputPos);
TSaveContainer oldContainer = this.saveContainerByPosition.put(inputPos, container);
if (oldContainer != null)
{
// shouldn't happen, but just in case
this.handleDataSourceRemoval(oldContainer);
}
}
// write the new data into memory
container.update(inputObj);
// keep track of when the last time we saved something was
container.updateLastWrittenTimestamp();
return container;
}
finally
{
lockForPos.unlock();
}
}
//endregion
//==============//
// List methods //
//==============//
//region
public int getUnsavedCount() { return this.saveContainerByPosition.size(); }
//endregion
//===============//
// cleanup/flush //
//===============//
//region
public void flush() { this.cleanUp(true); }
/** Removes everything from the memory cache and fires the {@link AbstractDelayedSaveCache#handleDataSourceRemoval} for each. */
public void cleanUp(boolean flushAll)
{
Enumeration<Long> keyIterator = this.saveContainerByPosition.keys();
while (keyIterator.hasMoreElements())
{
Long pos = keyIterator.nextElement();
ReentrantLock posLock = this.saveLockContainer.getLockForPos(pos);
try
{
posLock.lock();
TSaveContainer savedContainer = this.saveContainerByPosition.get(pos);
if (savedContainer != null)
{
if (flushAll
|| savedContainer.hasTimedOut(this.saveDelayInMs))
{
this.handleDataSourceRemoval(savedContainer);
this.saveContainerByPosition.remove(pos);
}
}
}
finally
{
posLock.unlock();
}
}
}
private static void runCleanupLoop()
{
while (true)
{
try
{
try
{
Thread.sleep(CLEANUP_CHECK_TIME_IN_MS);
}
catch (InterruptedException ignore) { }
SAVE_CACHE_SET.forEach((cacheRef) ->
{
AbstractDelayedSaveCache<?,?> cache = cacheRef.get();
if (cache == null)
{
// shouldn't be necessary, but if we forget to manually close a cache, this will prevent leaking
SAVE_CACHE_SET.remove(cacheRef);
}
else
{
cache.cleanUp(false);
}
});
}
catch (Exception e)
{
LOGGER.error("Unexpected error in cleanup thread: [" + e.getMessage() + "].", e);
}
}
}
//endregion
//================//
// base overrides //
//================//
//region
@Override
public void close()
{
// not the fastest way to handle removing,
// but we shouldn't have more than 20 or so at once
// so this should be just fine
SAVE_CACHE_SET.removeIf((cacheRef) ->
{
AbstractDelayedSaveCache<?,?> cache = cacheRef.get();
return cache != null && cache.equals(this);
});
}
//endregion
}
@@ -0,0 +1,64 @@
package com.seibel.distanthorizons.core.util.delayedSaveCache;
import org.jetbrains.annotations.Nullable;
/**
* used to keep track of when data sources
* were written to so we can flush them once
* enough time has passed.
*
* @see AbstractDelayedSaveCache
*/
public abstract class AbstractSaveObjContainer<T>
{
/** the last unix millisecond time this data source was written to */
public long lastWrittenDateTimeMs;
//=============//
// constructor //
//=============//
//region
public AbstractSaveObjContainer() { this.lastWrittenDateTimeMs = System.currentTimeMillis(); }
//endregion
//==================//
// abstract methods //
//==================//
//region
/**
* Can be used to merge multiple objects together if the implementation requires it.
*
* @param newObj whether or not this can be null will depend on the specific implementation of the parent {@link AbstractDelayedSaveCache}.
*/
public abstract void update(@Nullable T newObj);
//endregion
//=========//
// timeout //
//=========//
//region
public void updateLastWrittenTimestamp() { this.lastWrittenDateTimeMs = System.currentTimeMillis(); }
public boolean hasTimedOut(long msTillTimeout)
{
long currentTime = System.currentTimeMillis();
long timeSinceUpdate = currentTime - this.lastWrittenDateTimeMs;
return (timeSinceUpdate > msTillTimeout);
}
//endregion
}
@@ -0,0 +1,172 @@
package com.seibel.distanthorizons.core.util.delayedSaveCache;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhChunkPos;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos;
import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.*;
public class DelayedBeaconSaveCache extends AbstractDelayedSaveCache<BeaconBeamDTO, DelayedBeaconSaveCache.BeaconSaveObjContainer>
{
private static final DhLogger LOGGER = new DhLoggerBuilder()
.name(DelayedBeaconSaveCache.class.getSimpleName())
.build();
private final @NotNull ISaveBeaconsFunc saveBeaconsFunc;
//=============//
// constructor //
//=============//
//region
public DelayedBeaconSaveCache(
@NotNull ISaveBeaconsFunc saveBeaconsFunc,
int saveDelayInMs)
{
super(saveDelayInMs);
this.saveBeaconsFunc = saveBeaconsFunc;
}
//endregion
//================//
// save to memory //
//================//
//region
/**
* Replaces {@link DelayedBeaconSaveCache#writeToMemoryAndQueueSave(long, BeaconBeamDTO)}
* due to some additional information being needed.
*/
public void queueBeaconBeamUpdatesForChunkPos(@NotNull DhChunkPos chunkPos, @NotNull List<BeaconBeamDTO> activeBeamList)
{
BeaconSaveObjContainer container = super.writeToMemoryAndQueueSave(DhSectionPos.encodeContaining((byte)6, chunkPos), null);
container.addBeaconsAtChunkPos(chunkPos, activeBeamList);
}
/**
* deprecated since we want to use {@link DelayedBeaconSaveCache#queueBeaconBeamUpdatesForChunkPos} instead
* so we can track additional info.
*/
@Deprecated
@Override
public BeaconSaveObjContainer writeToMemoryAndQueueSave(long inputPos, BeaconBeamDTO inputObj) { throw new UnsupportedOperationException("Use queueBeaconBeamUpdatesForChunkPos instead"); }
//endregion
//====================//
// abstract overrides //
//====================//
//region
@Override
protected BeaconSaveObjContainer createEmptySaveObjContainer(long inputPos) { return new BeaconSaveObjContainer(inputPos); }
@Override
protected void handleDataSourceRemoval(@NotNull BeaconSaveObjContainer saveContainer)
{
for (DhChunkPos chunkPos : saveContainer.beaconsByBlockPosByChunkPos.keySet())
{
HashMap<DhBlockPos, BeaconBeamDTO> beaconsByBlockPos = saveContainer.beaconsByBlockPosByChunkPos.get(chunkPos);
ArrayList<BeaconBeamDTO> beaconList = new ArrayList<>(beaconsByBlockPos.values());
int minBlockX = chunkPos.getMinBlockX();
int minBlockZ = chunkPos.getMinBlockZ();
int maxBlockX = chunkPos.getMaxBlockX();
int maxBlockZ = chunkPos.getMaxBlockZ();
this.saveBeaconsFunc.updateBeaconBeamsBetweenBlockPos(
saveContainer.pos,
minBlockX, maxBlockX,
minBlockZ, maxBlockZ,
beaconList
);
}
}
//endregion
//================//
// helper classes //
//================//
//region
@FunctionalInterface
public interface ISaveBeaconsFunc
{
/** called after the timeout expires */
void updateBeaconBeamsBetweenBlockPos(
long sectionPosForLock,
int minBlockX, int maxBlockX,
int minBlockZ, int maxBlockZ,
List<BeaconBeamDTO> activeBeamList
);
}
public static class BeaconSaveObjContainer extends AbstractSaveObjContainer<BeaconBeamDTO>
{
public final long pos;
public final HashMap<DhChunkPos, HashMap<DhBlockPos, BeaconBeamDTO>> beaconsByBlockPosByChunkPos = new HashMap<>();
//=============//
// constructor //
//=============//
//region
public BeaconSaveObjContainer(long pos) { this.pos = pos; }
//endregion
//===========//
// overrides //
//===========//
//region
public void addBeaconsAtChunkPos(DhChunkPos chunkPos, List<BeaconBeamDTO> activeBeamList)
{
HashMap<DhBlockPos, BeaconBeamDTO> beaconsByBlockPos = this.beaconsByBlockPosByChunkPos.get(chunkPos);
if (!this.beaconsByBlockPosByChunkPos.containsKey(chunkPos))
{
beaconsByBlockPos = new HashMap<>();
this.beaconsByBlockPosByChunkPos.put(chunkPos, beaconsByBlockPos);
}
for (BeaconBeamDTO beacon : activeBeamList)
{
beaconsByBlockPos.put(beacon.blockPos, beacon);
}
}
/**
* This logic is handled via {@link BeaconSaveObjContainer#addBeaconsAtChunkPos} instead
* due to requiring some additional information.
*/
@Override
public void update(@Nullable BeaconBeamDTO newObj) { }
//endregion
}
//endregion
}
@@ -0,0 +1,114 @@
package com.seibel.distanthorizons.core.util.delayedSaveCache;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.CompletableFuture;
public class DelayedDataSourceSaveCache extends AbstractDelayedSaveCache<FullDataSourceV2, DelayedDataSourceSaveCache.DataSourceSaveObjContainer>
{
private static final DhLogger LOGGER = new DhLoggerBuilder()
.name(DelayedDataSourceSaveCache.class.getSimpleName())
.build();
private final ISaveDataSourceFunc onSaveTimeoutAsyncFunc;
//=============//
// constructor //
//=============//
//region
public DelayedDataSourceSaveCache(@NotNull ISaveDataSourceFunc onSaveTimeoutAsyncFunc, int saveDelayInMs)
{
super(saveDelayInMs);
this.onSaveTimeoutAsyncFunc = onSaveTimeoutAsyncFunc;
}
//endregion
//===========//
// overrides //
//===========//
//region
public void writeToMemoryAndQueueSave(@NotNull FullDataSourceV2 inputObj) { super.writeToMemoryAndQueueSave(inputObj.getPos(), inputObj); }
@Override
protected DataSourceSaveObjContainer createEmptySaveObjContainer(long inputPos) { return new DataSourceSaveObjContainer(inputPos); }
@Override
protected void handleDataSourceRemoval(@NotNull DataSourceSaveObjContainer saveContainer)
{
FullDataSourceV2 removedDataSource = saveContainer.dataSource;
this.onSaveTimeoutAsyncFunc.saveAsync(removedDataSource)
.handle((voidObj, throwable) ->
{
try
{
// if this close method is fired multiple times
// monoliths can appear due to concurrent writing to the
// backend arrays
removedDataSource.close();
}
catch (Exception e)
{
LOGGER.error("Unable to close datasource ["+ DhSectionPos.toString(removedDataSource.getPos()) +"], error: ["+e.getMessage()+"].", e);
}
return null;
});
}
//endregion
//================//
// helper classes //
//================//
//region
@FunctionalInterface
public interface ISaveDataSourceFunc
{
/** called after the timeout expires */
CompletableFuture<Void> saveAsync(@NotNull FullDataSourceV2 inputDataSource);
}
public static class DataSourceSaveObjContainer extends AbstractSaveObjContainer<FullDataSourceV2>
{
private final @NotNull FullDataSourceV2 dataSource;
public DataSourceSaveObjContainer(long inputPos)
{
this.dataSource = FullDataSourceV2.createEmpty(inputPos);
}
@Override
public void update(@Nullable FullDataSourceV2 newObj)
{
// shouldn't happen, but just in case
if (newObj == null)
{
throw new NullPointerException();
}
this.dataSource.updateFromDataSource(newObj);
}
}
//endregion
}
@@ -20,16 +20,21 @@
package tests;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.DelayedFullDataSourceSaveCache;
import com.seibel.distanthorizons.core.util.delayedSaveCache.DelayedDataSourceSaveCache;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.util.objects.pooling.PhantomArrayListCheckout;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A few very basic tests to confirm {@link DelayedFullDataSourceSaveCache}
* A few very basic tests to confirm {@link DelayedDataSourceSaveCache}
* is working properly.
*
* @author James Seibel
@@ -37,19 +42,27 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class DelayedSaveCacheTest
{
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
static
{
// allow all logging levels
Configurator.setRootLevel(Level.ALL);
}
// commented out for now since it makes the normal build take longer
//@Test
@Test
public void CacheExpirationAndPoolingTest() throws InterruptedException
{
// how many times any data source has been "written to disk"
AtomicInteger diskSaveCountRef = new AtomicInteger(0);
DelayedFullDataSourceSaveCache cache = new DelayedFullDataSourceSaveCache((FullDataSourceV2 fullDataSource) ->
DelayedDataSourceSaveCache cache = new DelayedDataSourceSaveCache((FullDataSourceV2 fullDataSource) ->
{
diskSaveCountRef.getAndIncrement();
return this.onDataSourceSaveAsync(fullDataSource);
return CompletableFuture.completedFuture(null);
}, 1_000);
@@ -58,11 +71,13 @@ public class DelayedSaveCacheTest
// single item and manual flush //
//==============================//
//LOGGER.info("============ Single item - manual flush ============");
PhantomArrayListCheckout initialCheckout;
try (FullDataSourceV2 initialSource = FullDataSourceV2.createEmpty(DhSectionPos.encode((byte)6, 0, 0)))
{
initialCheckout = initialSource.getPhantomArrayCheckoutForUnitTesting();
cache.writeDataSourceToMemoryAndQueueSave(initialSource);
cache.writeToMemoryAndQueueSave( initialSource);
}
Assert.assertEquals("only 1 item should be in the cache", 1, cache.getUnsavedCount());
Assert.assertEquals("no disk saves should have happened yet", 0, diskSaveCountRef.get());
@@ -78,6 +93,8 @@ public class DelayedSaveCacheTest
// quick group position //
//======================//
//LOGGER.info("============ quick group - auto flush ============");
// write multiple items for the same position
for (int i = 0; i < 4; i++)
{
@@ -86,7 +103,7 @@ public class DelayedSaveCacheTest
PhantomArrayListCheckout loopCheckout = loopSource.getPhantomArrayCheckoutForUnitTesting();
Assert.assertEquals(initialCheckout, loopCheckout);
cache.writeDataSourceToMemoryAndQueueSave(loopSource);
cache.writeToMemoryAndQueueSave(loopSource);
}
}
// each item writes to the same place
@@ -104,6 +121,8 @@ public class DelayedSaveCacheTest
// slow group position //
//=====================//
//LOGGER.info("============ slow group - auto flush ============");
// write multiple items for the same position
for (int i = 0; i < 4; i++)
{
@@ -112,7 +131,7 @@ public class DelayedSaveCacheTest
PhantomArrayListCheckout loopCheckout = loopSource.getPhantomArrayCheckoutForUnitTesting();
Assert.assertEquals(initialCheckout, loopCheckout);
cache.writeDataSourceToMemoryAndQueueSave(loopSource);
cache.writeToMemoryAndQueueSave(loopSource);
}
// long enough to prevent a timeout, but short enough that they don't happen all at once
@@ -128,7 +147,7 @@ public class DelayedSaveCacheTest
Assert.assertEquals("third timeout expected", 3, diskSaveCountRef.get());
}
private CompletableFuture<Void> onDataSourceSaveAsync(FullDataSourceV2 fullDataSource)
{ return CompletableFuture.completedFuture(null); }
}