diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/DelayedFullDataSourceSaveCache.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/DelayedFullDataSourceSaveCache.java deleted file mode 100644 index f2f22fbf9..000000000 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/DelayedFullDataSourceSaveCache.java +++ /dev/null @@ -1,305 +0,0 @@ -package com.seibel.distanthorizons.core.file.fullDatafile; - -import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; -import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; -import com.seibel.distanthorizons.core.pos.DhSectionPos; -import com.seibel.distanthorizons.core.util.KeyedLockContainer; -import com.seibel.distanthorizons.core.util.ThreadUtil; -import com.seibel.distanthorizons.core.logging.DhLogger; -import org.jetbrains.annotations.NotNull; - -import java.lang.ref.WeakReference; -import java.util.Collections; -import java.util.Enumeration; -import java.util.Set; -import java.util.concurrent.*; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Used to batch together multiple data source updates that all - * affect the same position. - */ -public class DelayedFullDataSourceSaveCache implements AutoCloseable -{ - private static final DhLogger LOGGER = new DhLoggerBuilder().build(); - - /** - * a cache won't automatically clean itself unless we trigger it's clean method - * if not done then we'd only see the cache invalidate when new inserts happen, - * which causes weird behavior when placing/breaking blocks. - */ - private static final ThreadPoolExecutor BACKGROUND_CLEAN_UP_THREAD = ThreadUtil.makeSingleDaemonThreadPool("delayed save cache cleaner"); - private static final Set> SAVE_CACHE_SET = Collections.newSetFromMap(new ConcurrentHashMap<>()); - /** how long between clean up checks */ - private static final int CLEANUP_CHECK_TIME_IN_MS = 1_000; - - - - private final ConcurrentHashMap dataSourceByPosition = new ConcurrentHashMap(); - - /* don't let two threads load the same position at the same time */ - protected final KeyedLockContainer saveLockContainer = new KeyedLockContainer<>(); - - private final ISaveDataSourceFunc onSaveTimeoutAsyncFunc; - private final int saveDelayInMs; - - - - //=============// - // constructor // - //=============// - //region - - static - { - BACKGROUND_CLEAN_UP_THREAD.execute(() -> runCleanupLoop()); - } - - public DelayedFullDataSourceSaveCache(@NotNull ISaveDataSourceFunc onSaveTimeoutAsyncFunc, int saveDelayInMs) - { - this.onSaveTimeoutAsyncFunc = onSaveTimeoutAsyncFunc; - - // we can't clean items faster than the cleanup timer fires - if (saveDelayInMs < CLEANUP_CHECK_TIME_IN_MS) - { - LOGGER.warn("The save delay ["+saveDelayInMs+"] shouldn't be less than the cleanup check timer interval ["+CLEANUP_CHECK_TIME_IN_MS+"]."); - } - this.saveDelayInMs = saveDelayInMs; - - - SAVE_CACHE_SET.add(new WeakReference<>(this)); - } - - //endregion - - - - //==============// - // update queue // - //==============// - //region - - /** - * Writing into memory is done synchronously so inputDataSource can - * be closed after this method finishes. - */ - public void writeDataSourceToMemoryAndQueueSave(@NotNull FullDataSourceV2 inputDataSource) - { - long inputPos = inputDataSource.getPos(); - - ReentrantLock lockForPos = this.saveLockContainer.getLockForPos(inputPos); - try - { - lockForPos.lock(); - - FullDataSourceV2 memoryDataSource; - - DataSourceSavedTimePair pair = this.dataSourceByPosition.getOrDefault(inputPos, null); - if (pair == null) - { - // no data currently in the memory cache for this position - memoryDataSource = FullDataSourceV2.createEmpty(inputPos); - pair = new DataSourceSavedTimePair(memoryDataSource); - DataSourceSavedTimePair oldPair = this.dataSourceByPosition.put(inputPos, pair); - if (oldPair != null) - { - // shouldn't happen, but just in case - this.handleDataSourceRemoval(oldPair.dataSource); - } - } - else - { - memoryDataSource = pair.dataSource; - } - - // write the new data into memory - memoryDataSource.updateFromDataSource(inputDataSource); - // keep track of when the last time we saved something was - pair.updateLastWrittenTimestamp(); - } - finally - { - lockForPos.unlock(); - } - } - - /** when this method is called the datasource should no longer be in the memory cache */ - public void handleDataSourceRemoval(@NotNull FullDataSourceV2 removedDataSource) - { - this.onSaveTimeoutAsyncFunc.saveAsync(removedDataSource) - .handle((voidObj, throwable) -> - { - try - { - // if this close method is fired multiple times - // monoliths can appear due to concurrent writing to the - // backend arrays - removedDataSource.close(); - } - catch (Exception e) - { - LOGGER.error("Unable to close datasource ["+ DhSectionPos.toString(removedDataSource.getPos()) +"], error: ["+e.getMessage()+"].", e); - } - - return null; - }); - } - - //endregion - - - - //==============// - // List methods // - //==============// - //region - - public int getUnsavedCount() { return this.dataSourceByPosition.size(); } - - public void flush() { this.cleanUp(true); } - /** Removes everything from the memory cache and fires the {@link DelayedFullDataSourceSaveCache#onSaveTimeoutAsyncFunc} for each. */ - public void cleanUp(boolean flushAll) - { - Enumeration keyIterator = this.dataSourceByPosition.keys(); - while (keyIterator.hasMoreElements()) - { - Long pos = keyIterator.nextElement(); - ReentrantLock posLock = this.saveLockContainer.getLockForPos(pos); - try - { - posLock.lock(); - - DataSourceSavedTimePair savedPair = this.dataSourceByPosition.getOrDefault(pos, null); - if (savedPair != null) - { - if (flushAll - || savedPair.dataSourceHasTimedOut(this.saveDelayInMs)) - { - this.dataSourceByPosition.remove(pos); - this.handleDataSourceRemoval(savedPair.dataSource); - } - } - } - finally - { - posLock.unlock(); - } - } - } - - //endregion - - - - //================// - // static cleanup // - //================// - //region - - private static void runCleanupLoop() - { - while (true) - { - try - { - try - { - Thread.sleep(CLEANUP_CHECK_TIME_IN_MS); - } - catch (InterruptedException ignore) { } - - SAVE_CACHE_SET.forEach((cacheRef) -> - { - DelayedFullDataSourceSaveCache cache = cacheRef.get(); - if (cache == null) - { - // shouldn't be necessary, but if we forget to manually close a cache, this will prevent leaking - SAVE_CACHE_SET.remove(cacheRef); - } - else - { - cache.cleanUp(false); - } - }); - } - catch (Exception e) - { - LOGGER.error("Unexpected error in cleanup thread: [" + e.getMessage() + "].", e); - } - } - } - - //endregion - - - - //================// - // base overrides // - //================// - //region - - @Override - public void close() - { - // not the fastest way to handle removing, - // but we shouldn't have more than 20 or so at once - // so this should be just fine - SAVE_CACHE_SET.removeIf((cacheRef) -> - { - DelayedFullDataSourceSaveCache cache = cacheRef.get(); - return cache != null && cache.equals(this); - }); - } - - //endregion - - - - //================// - // helper classes // - //================// - //region - - @FunctionalInterface - public interface ISaveDataSourceFunc - { - /** called after the timeout expires */ - CompletableFuture saveAsync(FullDataSourceV2 inputDataSource); - } - - /** - * used to keep track of when data sources - * were written to so we can flush them once - * enough time has passed. - */ - private static class DataSourceSavedTimePair - { - @NotNull - public final FullDataSourceV2 dataSource; - /** the last unix millisecond time this data source was written to */ - public long lastWrittenDateTimeMs; - - - public DataSourceSavedTimePair(@NotNull FullDataSourceV2 dataSource) - { - this.dataSource = dataSource; - this.lastWrittenDateTimeMs = System.currentTimeMillis(); - } - - - public void updateLastWrittenTimestamp() - { this.lastWrittenDateTimeMs = System.currentTimeMillis(); } - - public boolean dataSourceHasTimedOut(long msTillTimeout) - { - long currentTime = System.currentTimeMillis(); - long timeSinceUpdate = currentTime - this.lastWrittenDateTimeMs; - return (timeSinceUpdate > msTillTimeout); - } - } - - //endregion - - - -} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java index 4fdf7effc..0b135fb14 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java @@ -24,7 +24,7 @@ import com.seibel.distanthorizons.core.api.internal.SharedApi; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataSourceProviderV2; -import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataUpdatePropagatorV2; +import com.seibel.distanthorizons.core.util.delayedSaveCache.DelayedDataSourceSaveCache; import com.seibel.distanthorizons.core.file.structure.ISaveStructure; import com.seibel.distanthorizons.core.generation.DhLightingEngine; import com.seibel.distanthorizons.core.generation.IFullDataSourceRetrievalQueue; @@ -40,8 +40,6 @@ import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D; import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable; import com.seibel.distanthorizons.core.util.ExceptionUtil; import com.seibel.distanthorizons.core.util.LodUtil; -import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker; -import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import it.unimi.dsi.fastutil.bytes.ByteArrayList; import it.unimi.dsi.fastutil.longs.LongArrayList; import org.jetbrains.annotations.NotNull; @@ -73,7 +71,7 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im private final AtomicReference worldGenQueueRef = new AtomicReference<>(null); private final ArrayList onWorldGenTaskCompleteListeners = new ArrayList<>(); - protected final DelayedFullDataSourceSaveCache delayedFullDataSourceSaveCache = new DelayedFullDataSourceSaveCache(this::onDataSourceSaveAsync, 10_000); + protected final DelayedDataSourceSaveCache delayedFullDataSourceSaveCache = new DelayedDataSourceSaveCache(this::onDataSourceSaveAsync, 10_000); private final ConcurrentHashMap> queuedRetrievalFutureByPos = new ConcurrentHashMap<>(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhLevel.java index 1a3817c71..84dcbf3c0 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhLevel.java @@ -19,16 +19,11 @@ package com.seibel.distanthorizons.core.level; -import com.seibel.distanthorizons.api.enums.rendering.EDhApiBlockMaterial; -import com.seibel.distanthorizons.api.interfaces.render.IDhApiRenderableBoxGroup; import com.seibel.distanthorizons.api.methods.events.abstractEvents.DhApiChunkModifiedEvent; -import com.seibel.distanthorizons.api.methods.events.sharedParameterObjects.DhApiRenderParam; -import com.seibel.distanthorizons.api.objects.math.DhApiVec3d; -import com.seibel.distanthorizons.api.objects.render.DhApiRenderableBox; -import com.seibel.distanthorizons.api.objects.render.DhApiRenderableBoxGroupShading; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; -import com.seibel.distanthorizons.core.file.fullDatafile.DelayedFullDataSourceSaveCache; +import com.seibel.distanthorizons.core.util.delayedSaveCache.DelayedBeaconSaveCache; +import com.seibel.distanthorizons.core.util.delayedSaveCache.DelayedDataSourceSaveCache; import com.seibel.distanthorizons.core.generation.DhLightingEngine; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.pos.DhChunkPos; @@ -36,28 +31,21 @@ import com.seibel.distanthorizons.core.pos.DhSectionPos; import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos; import com.seibel.distanthorizons.core.render.renderer.generic.CloudRenderHandler; import com.seibel.distanthorizons.core.render.renderer.generic.GenericObjectRenderer; -import com.seibel.distanthorizons.core.render.renderer.generic.GenericRenderObjectFactory; import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO; import com.seibel.distanthorizons.core.sql.dto.ChunkHashDTO; import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo; import com.seibel.distanthorizons.core.sql.repo.BeaconBeamRepo; import com.seibel.distanthorizons.core.sql.repo.ChunkHashRepo; -import com.seibel.distanthorizons.core.util.ColorUtil; import com.seibel.distanthorizons.core.util.KeyedLockContainer; import com.seibel.distanthorizons.core.util.LodUtil; import com.seibel.distanthorizons.core.wrapperInterfaces.chunk.IChunkWrapper; -import com.seibel.distanthorizons.core.wrapperInterfaces.world.IClientLevelWrapper; import com.seibel.distanthorizons.coreapi.DependencyInjection.ApiEventInjector; -import com.seibel.distanthorizons.coreapi.ModInfo; -import com.seibel.distanthorizons.coreapi.util.MathUtil; import com.seibel.distanthorizons.core.logging.DhLogger; import org.jetbrains.annotations.Nullable; -import java.awt.*; import java.io.File; import java.io.IOException; import java.sql.SQLException; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -77,8 +65,8 @@ public abstract class AbstractDhLevel implements IDhLevel public BeaconBeamRepo beaconBeamRepo; protected final KeyedLockContainer beaconUpdateLockContainer = new KeyedLockContainer<>(); - - protected final DelayedFullDataSourceSaveCache delayedFullDataSourceSaveCache = new DelayedFullDataSourceSaveCache(this::onDataSourceSaveAsync, 3_000); + protected final DelayedDataSourceSaveCache delayedFullDataSourceSaveCache = new DelayedDataSourceSaveCache(this::onDataSourceSaveAsync, 1_000); + protected final DelayedBeaconSaveCache delayedBeaconSaveCache = new DelayedBeaconSaveCache(this::updateBeaconBeamsBetweenBlockPos, 1_000); /** contains the {@link DhChunkPos} for each {@link DhSectionPos} that are queued to save */ protected final ConcurrentHashMap> updatedChunkPosSetBySectionPos = new ConcurrentHashMap<>(); protected final ConcurrentHashMap updatedChunkHashesByChunkPos = new ConcurrentHashMap<>(); @@ -178,7 +166,7 @@ public abstract class AbstractDhLevel implements IDhLevel // merging writes together in memory significantly improves throughput, since most // chunk modifications will be right next to each other, IE effecting the same LODs - this.delayedFullDataSourceSaveCache.writeDataSourceToMemoryAndQueueSave(dataSource); + this.delayedFullDataSourceSaveCache.writeToMemoryAndQueueSave(dataSource); } } @@ -226,8 +214,6 @@ public abstract class AbstractDhLevel implements IDhLevel // repos // //=======// - // chunk hash // - @Override public int getChunkHash(DhChunkPos pos) { @@ -251,12 +237,12 @@ public abstract class AbstractDhLevel implements IDhLevel { int minBlockX = DhSectionPos.getMinCornerBlockX(sectionPos); int minBlockZ = DhSectionPos.getMinCornerBlockZ(sectionPos); - // TODO special logic had to be done for DhChunkPos.getMaxBlock, - // does that need to be done here? - // The DhChunkPos issue caused beacons to appear/disappear incorrectly on negative chunk borders int maxBlockX = minBlockX + DhSectionPos.getBlockWidth(sectionPos); int maxBlockZ = minBlockZ + DhSectionPos.getBlockWidth(sectionPos); + // no delayed cache needed since each hit will be at a different position + // and this is generally called when LODs are received from the + // server, so repeat hits to the same position are unlikely this.updateBeaconBeamsBetweenBlockPos( sectionPos, minBlockX, maxBlockX, @@ -267,22 +253,11 @@ public abstract class AbstractDhLevel implements IDhLevel @Override public void updateBeaconBeamsForChunkPos(DhChunkPos chunkPos, List activeBeamList) - { - long sectionPos = DhSectionPos.encodeContaining(DhSectionPos.SECTION_BLOCK_DETAIL_LEVEL, chunkPos); - - int minBlockX = chunkPos.getMinBlockX(); - int minBlockZ = chunkPos.getMinBlockZ(); - int maxBlockX = chunkPos.getMaxBlockX(); - int maxBlockZ = chunkPos.getMaxBlockZ(); - - //LOGGER.info("beacons ["+activeBeamList.size()+"] at ["+chunkPos+"] x["+minBlockX+"]-["+maxBlockX+"] z["+minBlockZ+"]-["+maxBlockZ+"]."); - - this.updateBeaconBeamsBetweenBlockPos( - sectionPos, - minBlockX, maxBlockX, - minBlockZ, maxBlockZ, - activeBeamList - ); + { + // a delayed cache is used to prevent lock contention + // when flying through new chunks (or updating modified chunks) + // that are right next to each other (requiring up to 16 hits to the same LOD position + this.delayedBeaconSaveCache.queueBeaconBeamUpdatesForChunkPos(chunkPos, activeBeamList); } private void updateBeaconBeamsBetweenBlockPos( @@ -298,8 +273,11 @@ public abstract class AbstractDhLevel implements IDhLevel } - // locked to prevent two threads from updating the same section at the same time - ReentrantLock lock = this.beaconUpdateLockContainer.getLockForPos(sectionPosForLock); // TODO this can cause a lot of slow-downs + //LOGGER.info("beacons ["+activeBeamList.size()+"] at x["+minBlockX+"]-["+maxBlockX+"] z["+minBlockZ+"]-["+maxBlockZ+"]."); + + + // locked to prevent two threads from updating the same position at the same time + ReentrantLock lock = this.beaconUpdateLockContainer.getLockForPos(sectionPosForLock); try { lock.lock(); @@ -330,14 +308,10 @@ public abstract class AbstractDhLevel implements IDhLevel for (DhBlockPos beaconPos : allPosSet) { - if (minBlockX <= beaconPos.getX() && beaconPos.getX() <= maxBlockX - && minBlockZ <= beaconPos.getZ() && beaconPos.getZ() <= maxBlockZ) - { - //// don't modify beacons outside the updated range - //continue; - } - else + if (minBlockX > beaconPos.getX() || beaconPos.getX() > maxBlockX + || minBlockZ > beaconPos.getZ() || beaconPos.getZ() > maxBlockZ) { + // don't modify beacons outside the updated range continue; } @@ -346,12 +320,12 @@ public abstract class AbstractDhLevel implements IDhLevel BeaconBeamDTO activeBeam = activeBeamByPos.get(beaconPos); if (activeBeam != null) { - //LOGGER.info("add beacon ["+activeBeam.blockPos+"] x["+minBlockX+"]-["+maxBlockX+"] z["+minBlockZ+"]-["+maxBlockZ+"]."); - if (existingBeam == null) { // new beam found, add to DB this.beaconBeamRepo.save(activeBeam); + + //LOGGER.info("new beacon ["+activeBeam+"]."); } else { @@ -360,6 +334,12 @@ public abstract class AbstractDhLevel implements IDhLevel { // beam colors were changed this.beaconBeamRepo.save(activeBeam); + + //LOGGER.info("change beacon ["+existingBeam+"] -> ["+activeBeam+"]."); + } + else + { + //LOGGER.info("existing beacon ["+existingBeam+"]."); } } } @@ -367,7 +347,8 @@ public abstract class AbstractDhLevel implements IDhLevel { // beam no longer exists at position, remove from DB this.beaconBeamRepo.deleteWithKey(beaconPos); - //LOGGER.info("remove beacon ["+beaconPos+"] x["+minBlockX+"]-["+maxBlockX+"] z["+minBlockZ+"]-["+maxBlockZ+"]."); + + //LOGGER.info("remove beacon ["+existingBeam+"]."); } } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/AbstractDelayedSaveCache.java b/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/AbstractDelayedSaveCache.java new file mode 100644 index 000000000..7e014a9c9 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/AbstractDelayedSaveCache.java @@ -0,0 +1,249 @@ +package com.seibel.distanthorizons.core.util.delayedSaveCache; + +import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; +import com.seibel.distanthorizons.core.util.KeyedLockContainer; +import com.seibel.distanthorizons.core.util.ThreadUtil; +import com.seibel.distanthorizons.core.logging.DhLogger; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.lang.ref.WeakReference; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Used to batch together multiple updates that all + * affect the same position. + * + * @see AbstractSaveObjContainer + */ +public abstract class AbstractDelayedSaveCache> implements AutoCloseable +{ + private static final DhLogger LOGGER = new DhLoggerBuilder() + .name(AbstractDelayedSaveCache.class.getSimpleName()) + .build(); + + /** + * a cache won't automatically clean itself unless we trigger it's clean method + * if not done then we'd only see the cache invalidate when new inserts happen, + * which causes weird behavior when placing/breaking blocks. + */ + private static final ThreadPoolExecutor BACKGROUND_CLEAN_UP_THREAD = ThreadUtil.makeSingleDaemonThreadPool("delayed save cache cleaner"); + private static final Set>> SAVE_CACHE_SET = Collections.newSetFromMap(new ConcurrentHashMap<>()); + /** how long between clean up checks */ + private static final int CLEANUP_CHECK_TIME_IN_MS = 1_000; + + + + protected final ConcurrentHashMap saveContainerByPosition = new ConcurrentHashMap<>(); + + /* don't let two threads load the same position at the same time */ + protected final KeyedLockContainer saveLockContainer = new KeyedLockContainer<>(); + + /** how long a {@link TSaveContainer} should have lived before being eligible for automatic flushing. */ + protected final int saveDelayInMs; + + + + //=============// + // constructor // + //=============// + //region + + static + { + BACKGROUND_CLEAN_UP_THREAD.execute(() -> runCleanupLoop()); + } + + public AbstractDelayedSaveCache(int saveDelayInMs) + { + // we can't clean items faster than the cleanup timer fires + if (saveDelayInMs < CLEANUP_CHECK_TIME_IN_MS) + { + LOGGER.warn("The save delay ["+saveDelayInMs+"] shouldn't be less than the cleanup check timer interval ["+CLEANUP_CHECK_TIME_IN_MS+"]."); + } + this.saveDelayInMs = saveDelayInMs; + + + SAVE_CACHE_SET.add(new WeakReference<>(this)); + } + + //endregion + + + + //==================// + // abstract methods // + //==================// + //region + + protected abstract TSaveContainer createEmptySaveObjContainer(long inputPos); + + /** when this method is called the {@link TSaveContainer} should no longer be in the memory cache */ + protected abstract void handleDataSourceRemoval(@NotNull TSaveContainer saveContainer); + + //endregion + + + + //==============// + // update queue // + //==============// + //region + + /** + * Writing into memory is done synchronously so inputDataSource can + * be closed after this method finishes. + * + * @param inputObj whether or not this can be null will depend on the specific implementation of this class. + * + * @return the new (or pre-existing) {@link TSaveContainer} so child objects can modify it if needed + */ + public TSaveContainer writeToMemoryAndQueueSave(long inputPos, @Nullable TSaveObj inputObj) + { + ReentrantLock lockForPos = this.saveLockContainer.getLockForPos(inputPos); + try + { + lockForPos.lock(); + + TSaveContainer container = this.saveContainerByPosition.get(inputPos); + if (container == null) + { + // no data currently in the memory cache for this position + container = this.createEmptySaveObjContainer(inputPos); + TSaveContainer oldContainer = this.saveContainerByPosition.put(inputPos, container); + if (oldContainer != null) + { + // shouldn't happen, but just in case + this.handleDataSourceRemoval(oldContainer); + } + } + + // write the new data into memory + container.update(inputObj); + // keep track of when the last time we saved something was + container.updateLastWrittenTimestamp(); + + return container; + } + finally + { + lockForPos.unlock(); + } + } + + //endregion + + + + //==============// + // List methods // + //==============// + //region + + public int getUnsavedCount() { return this.saveContainerByPosition.size(); } + + //endregion + + + + //===============// + // cleanup/flush // + //===============// + //region + + public void flush() { this.cleanUp(true); } + /** Removes everything from the memory cache and fires the {@link AbstractDelayedSaveCache#handleDataSourceRemoval} for each. */ + public void cleanUp(boolean flushAll) + { + Enumeration keyIterator = this.saveContainerByPosition.keys(); + + while (keyIterator.hasMoreElements()) + { + Long pos = keyIterator.nextElement(); + ReentrantLock posLock = this.saveLockContainer.getLockForPos(pos); + try + { + posLock.lock(); + + TSaveContainer savedContainer = this.saveContainerByPosition.get(pos); + if (savedContainer != null) + { + if (flushAll + || savedContainer.hasTimedOut(this.saveDelayInMs)) + { + this.handleDataSourceRemoval(savedContainer); + this.saveContainerByPosition.remove(pos); + } + } + } + finally + { + posLock.unlock(); + } + } + } + + private static void runCleanupLoop() + { + while (true) + { + try + { + try + { + Thread.sleep(CLEANUP_CHECK_TIME_IN_MS); + } + catch (InterruptedException ignore) { } + + SAVE_CACHE_SET.forEach((cacheRef) -> + { + AbstractDelayedSaveCache cache = cacheRef.get(); + if (cache == null) + { + // shouldn't be necessary, but if we forget to manually close a cache, this will prevent leaking + SAVE_CACHE_SET.remove(cacheRef); + } + else + { + cache.cleanUp(false); + } + }); + } + catch (Exception e) + { + LOGGER.error("Unexpected error in cleanup thread: [" + e.getMessage() + "].", e); + } + } + } + + //endregion + + + + //================// + // base overrides // + //================// + //region + + @Override + public void close() + { + // not the fastest way to handle removing, + // but we shouldn't have more than 20 or so at once + // so this should be just fine + SAVE_CACHE_SET.removeIf((cacheRef) -> + { + AbstractDelayedSaveCache cache = cacheRef.get(); + return cache != null && cache.equals(this); + }); + } + + //endregion + + + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/AbstractSaveObjContainer.java b/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/AbstractSaveObjContainer.java new file mode 100644 index 000000000..d5e133967 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/AbstractSaveObjContainer.java @@ -0,0 +1,64 @@ +package com.seibel.distanthorizons.core.util.delayedSaveCache; + +import org.jetbrains.annotations.Nullable; + +/** + * used to keep track of when data sources + * were written to so we can flush them once + * enough time has passed. + * + * @see AbstractDelayedSaveCache + */ +public abstract class AbstractSaveObjContainer +{ + /** the last unix millisecond time this data source was written to */ + public long lastWrittenDateTimeMs; + + + + //=============// + // constructor // + //=============// + //region + + public AbstractSaveObjContainer() { this.lastWrittenDateTimeMs = System.currentTimeMillis(); } + + //endregion + + + + //==================// + // abstract methods // + //==================// + //region + + /** + * Can be used to merge multiple objects together if the implementation requires it. + * + * @param newObj whether or not this can be null will depend on the specific implementation of the parent {@link AbstractDelayedSaveCache}. + */ + public abstract void update(@Nullable T newObj); + + //endregion + + + + //=========// + // timeout // + //=========// + //region + + public void updateLastWrittenTimestamp() { this.lastWrittenDateTimeMs = System.currentTimeMillis(); } + + public boolean hasTimedOut(long msTillTimeout) + { + long currentTime = System.currentTimeMillis(); + long timeSinceUpdate = currentTime - this.lastWrittenDateTimeMs; + return (timeSinceUpdate > msTillTimeout); + } + + //endregion + + + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/DelayedBeaconSaveCache.java b/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/DelayedBeaconSaveCache.java new file mode 100644 index 000000000..9df778ce3 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/DelayedBeaconSaveCache.java @@ -0,0 +1,172 @@ +package com.seibel.distanthorizons.core.util.delayedSaveCache; + +import com.seibel.distanthorizons.core.logging.DhLogger; +import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; +import com.seibel.distanthorizons.core.pos.DhChunkPos; +import com.seibel.distanthorizons.core.pos.DhSectionPos; +import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos; +import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.*; + +public class DelayedBeaconSaveCache extends AbstractDelayedSaveCache +{ + private static final DhLogger LOGGER = new DhLoggerBuilder() + .name(DelayedBeaconSaveCache.class.getSimpleName()) + .build(); + + private final @NotNull ISaveBeaconsFunc saveBeaconsFunc; + + + + //=============// + // constructor // + //=============// + //region + + public DelayedBeaconSaveCache( + @NotNull ISaveBeaconsFunc saveBeaconsFunc, + int saveDelayInMs) + { + super(saveDelayInMs); + this.saveBeaconsFunc = saveBeaconsFunc; + } + + //endregion + + + + //================// + // save to memory // + //================// + //region + + /** + * Replaces {@link DelayedBeaconSaveCache#writeToMemoryAndQueueSave(long, BeaconBeamDTO)} + * due to some additional information being needed. + */ + public void queueBeaconBeamUpdatesForChunkPos(@NotNull DhChunkPos chunkPos, @NotNull List activeBeamList) + { + BeaconSaveObjContainer container = super.writeToMemoryAndQueueSave(DhSectionPos.encodeContaining((byte)6, chunkPos), null); + container.addBeaconsAtChunkPos(chunkPos, activeBeamList); + } + + /** + * deprecated since we want to use {@link DelayedBeaconSaveCache#queueBeaconBeamUpdatesForChunkPos} instead + * so we can track additional info. + */ + @Deprecated + @Override + public BeaconSaveObjContainer writeToMemoryAndQueueSave(long inputPos, BeaconBeamDTO inputObj) { throw new UnsupportedOperationException("Use queueBeaconBeamUpdatesForChunkPos instead"); } + + //endregion + + + + //====================// + // abstract overrides // + //====================// + //region + + @Override + protected BeaconSaveObjContainer createEmptySaveObjContainer(long inputPos) { return new BeaconSaveObjContainer(inputPos); } + + @Override + protected void handleDataSourceRemoval(@NotNull BeaconSaveObjContainer saveContainer) + { + for (DhChunkPos chunkPos : saveContainer.beaconsByBlockPosByChunkPos.keySet()) + { + HashMap beaconsByBlockPos = saveContainer.beaconsByBlockPosByChunkPos.get(chunkPos); + ArrayList beaconList = new ArrayList<>(beaconsByBlockPos.values()); + + int minBlockX = chunkPos.getMinBlockX(); + int minBlockZ = chunkPos.getMinBlockZ(); + int maxBlockX = chunkPos.getMaxBlockX(); + int maxBlockZ = chunkPos.getMaxBlockZ(); + + this.saveBeaconsFunc.updateBeaconBeamsBetweenBlockPos( + saveContainer.pos, + minBlockX, maxBlockX, + minBlockZ, maxBlockZ, + beaconList + ); + } + } + + //endregion + + + + //================// + // helper classes // + //================// + //region + + @FunctionalInterface + public interface ISaveBeaconsFunc + { + /** called after the timeout expires */ + void updateBeaconBeamsBetweenBlockPos( + long sectionPosForLock, + int minBlockX, int maxBlockX, + int minBlockZ, int maxBlockZ, + List activeBeamList + ); + } + + public static class BeaconSaveObjContainer extends AbstractSaveObjContainer + { + public final long pos; + public final HashMap> beaconsByBlockPosByChunkPos = new HashMap<>(); + + + + //=============// + // constructor // + //=============// + //region + + public BeaconSaveObjContainer(long pos) { this.pos = pos; } + + //endregion + + + + //===========// + // overrides // + //===========// + //region + + public void addBeaconsAtChunkPos(DhChunkPos chunkPos, List activeBeamList) + { + HashMap beaconsByBlockPos = this.beaconsByBlockPosByChunkPos.get(chunkPos); + if (!this.beaconsByBlockPosByChunkPos.containsKey(chunkPos)) + { + beaconsByBlockPos = new HashMap<>(); + this.beaconsByBlockPosByChunkPos.put(chunkPos, beaconsByBlockPos); + } + + for (BeaconBeamDTO beacon : activeBeamList) + { + beaconsByBlockPos.put(beacon.blockPos, beacon); + } + } + + /** + * This logic is handled via {@link BeaconSaveObjContainer#addBeaconsAtChunkPos} instead + * due to requiring some additional information. + */ + @Override + public void update(@Nullable BeaconBeamDTO newObj) { } + + //endregion + + } + + //endregion + + + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/DelayedDataSourceSaveCache.java b/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/DelayedDataSourceSaveCache.java new file mode 100644 index 000000000..e2e435c84 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/delayedSaveCache/DelayedDataSourceSaveCache.java @@ -0,0 +1,114 @@ +package com.seibel.distanthorizons.core.util.delayedSaveCache; + +import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; +import com.seibel.distanthorizons.core.logging.DhLogger; +import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; +import com.seibel.distanthorizons.core.pos.DhSectionPos; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.CompletableFuture; + +public class DelayedDataSourceSaveCache extends AbstractDelayedSaveCache +{ + private static final DhLogger LOGGER = new DhLoggerBuilder() + .name(DelayedDataSourceSaveCache.class.getSimpleName()) + .build(); + + private final ISaveDataSourceFunc onSaveTimeoutAsyncFunc; + + + + //=============// + // constructor // + //=============// + //region + + public DelayedDataSourceSaveCache(@NotNull ISaveDataSourceFunc onSaveTimeoutAsyncFunc, int saveDelayInMs) + { + super(saveDelayInMs); + this.onSaveTimeoutAsyncFunc = onSaveTimeoutAsyncFunc; + } + + //endregion + + + + //===========// + // overrides // + //===========// + //region + + public void writeToMemoryAndQueueSave(@NotNull FullDataSourceV2 inputObj) { super.writeToMemoryAndQueueSave(inputObj.getPos(), inputObj); } + + @Override + protected DataSourceSaveObjContainer createEmptySaveObjContainer(long inputPos) { return new DataSourceSaveObjContainer(inputPos); } + + @Override + protected void handleDataSourceRemoval(@NotNull DataSourceSaveObjContainer saveContainer) + { + FullDataSourceV2 removedDataSource = saveContainer.dataSource; + this.onSaveTimeoutAsyncFunc.saveAsync(removedDataSource) + .handle((voidObj, throwable) -> + { + try + { + // if this close method is fired multiple times + // monoliths can appear due to concurrent writing to the + // backend arrays + removedDataSource.close(); + } + catch (Exception e) + { + LOGGER.error("Unable to close datasource ["+ DhSectionPos.toString(removedDataSource.getPos()) +"], error: ["+e.getMessage()+"].", e); + } + + return null; + }); + } + + //endregion + + + + //================// + // helper classes // + //================// + //region + + @FunctionalInterface + public interface ISaveDataSourceFunc + { + /** called after the timeout expires */ + CompletableFuture saveAsync(@NotNull FullDataSourceV2 inputDataSource); + } + + public static class DataSourceSaveObjContainer extends AbstractSaveObjContainer + { + private final @NotNull FullDataSourceV2 dataSource; + + public DataSourceSaveObjContainer(long inputPos) + { + this.dataSource = FullDataSourceV2.createEmpty(inputPos); + } + + @Override + public void update(@Nullable FullDataSourceV2 newObj) + { + // shouldn't happen, but just in case + if (newObj == null) + { + throw new NullPointerException(); + } + + this.dataSource.updateFromDataSource(newObj); + } + + } + + //endregion + + + + +} diff --git a/core/src/test/java/tests/DelayedSaveCacheTest.java b/core/src/test/java/tests/DelayedSaveCacheTest.java index c7e01d334..a679fa3ef 100644 --- a/core/src/test/java/tests/DelayedSaveCacheTest.java +++ b/core/src/test/java/tests/DelayedSaveCacheTest.java @@ -20,16 +20,21 @@ package tests; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; -import com.seibel.distanthorizons.core.file.fullDatafile.DelayedFullDataSourceSaveCache; +import com.seibel.distanthorizons.core.util.delayedSaveCache.DelayedDataSourceSaveCache; +import com.seibel.distanthorizons.core.logging.DhLogger; +import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.util.objects.pooling.PhantomArrayListCheckout; import com.seibel.distanthorizons.core.pos.DhSectionPos; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.config.Configurator; import org.junit.Assert; +import org.junit.Test; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; /** - * A few very basic tests to confirm {@link DelayedFullDataSourceSaveCache} + * A few very basic tests to confirm {@link DelayedDataSourceSaveCache} * is working properly. * * @author James Seibel @@ -37,19 +42,27 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class DelayedSaveCacheTest { + private static final DhLogger LOGGER = new DhLoggerBuilder().build(); + + static + { + // allow all logging levels + Configurator.setRootLevel(Level.ALL); + } + // commented out for now since it makes the normal build take longer - //@Test + @Test public void CacheExpirationAndPoolingTest() throws InterruptedException { // how many times any data source has been "written to disk" AtomicInteger diskSaveCountRef = new AtomicInteger(0); - DelayedFullDataSourceSaveCache cache = new DelayedFullDataSourceSaveCache((FullDataSourceV2 fullDataSource) -> + DelayedDataSourceSaveCache cache = new DelayedDataSourceSaveCache((FullDataSourceV2 fullDataSource) -> { diskSaveCountRef.getAndIncrement(); - return this.onDataSourceSaveAsync(fullDataSource); + return CompletableFuture.completedFuture(null); }, 1_000); @@ -58,11 +71,13 @@ public class DelayedSaveCacheTest // single item and manual flush // //==============================// + //LOGGER.info("============ Single item - manual flush ============"); + PhantomArrayListCheckout initialCheckout; try (FullDataSourceV2 initialSource = FullDataSourceV2.createEmpty(DhSectionPos.encode((byte)6, 0, 0))) { initialCheckout = initialSource.getPhantomArrayCheckoutForUnitTesting(); - cache.writeDataSourceToMemoryAndQueueSave(initialSource); + cache.writeToMemoryAndQueueSave( initialSource); } Assert.assertEquals("only 1 item should be in the cache", 1, cache.getUnsavedCount()); Assert.assertEquals("no disk saves should have happened yet", 0, diskSaveCountRef.get()); @@ -78,6 +93,8 @@ public class DelayedSaveCacheTest // quick group position // //======================// + //LOGGER.info("============ quick group - auto flush ============"); + // write multiple items for the same position for (int i = 0; i < 4; i++) { @@ -86,7 +103,7 @@ public class DelayedSaveCacheTest PhantomArrayListCheckout loopCheckout = loopSource.getPhantomArrayCheckoutForUnitTesting(); Assert.assertEquals(initialCheckout, loopCheckout); - cache.writeDataSourceToMemoryAndQueueSave(loopSource); + cache.writeToMemoryAndQueueSave(loopSource); } } // each item writes to the same place @@ -104,6 +121,8 @@ public class DelayedSaveCacheTest // slow group position // //=====================// + //LOGGER.info("============ slow group - auto flush ============"); + // write multiple items for the same position for (int i = 0; i < 4; i++) { @@ -112,7 +131,7 @@ public class DelayedSaveCacheTest PhantomArrayListCheckout loopCheckout = loopSource.getPhantomArrayCheckoutForUnitTesting(); Assert.assertEquals(initialCheckout, loopCheckout); - cache.writeDataSourceToMemoryAndQueueSave(loopSource); + cache.writeToMemoryAndQueueSave(loopSource); } // long enough to prevent a timeout, but short enough that they don't happen all at once @@ -128,7 +147,7 @@ public class DelayedSaveCacheTest Assert.assertEquals("third timeout expected", 3, diskSaveCountRef.get()); } - private CompletableFuture onDataSourceSaveAsync(FullDataSourceV2 fullDataSource) - { return CompletableFuture.completedFuture(null); } + + }