diff --git a/api/src/main/java/com/seibel/distanthorizons/coreapi/ModInfo.java b/api/src/main/java/com/seibel/distanthorizons/coreapi/ModInfo.java index a61287f87..b9b444732 100644 --- a/api/src/main/java/com/seibel/distanthorizons/coreapi/ModInfo.java +++ b/api/src/main/java/com/seibel/distanthorizons/coreapi/ModInfo.java @@ -31,7 +31,7 @@ public final class ModInfo public static final String DEDICATED_SERVER_INITIAL_PATH = "dedicated_server_initial"; /** Incremented every time any packets are added, changed or removed, with a few exceptions. */ - public static final int PROTOCOL_VERSION = 7; + public static final int PROTOCOL_VERSION = 8; public static final String WRAPPER_PACKET_PATH = "message"; /** The internal mod name */ diff --git a/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java b/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java index 192110a5b..d79be8372 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java @@ -66,7 +66,7 @@ public class SharedApi private static final UpdateChunkPosManager UPDATE_POS_MANAGER = new UpdateChunkPosManager(); /** how many chunks can be queued for updating per thread, used to prevent updates from infinitely pilling up if the user flies around extremely fast */ - private static final int MAX_UPDATING_CHUNK_COUNT_PER_THREAD = 500; + private static final int MAX_UPDATING_CHUNK_COUNT_PER_THREAD = 1_000; /** how many milliseconds must pass before an overloaded message can be sent in chat or the log */ private static final int MIN_MS_BETWEEN_OVERLOADED_LOG_MESSAGE = 30_000; diff --git a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java index bd8f2191b..03e55bdfe 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java @@ -1340,6 +1340,7 @@ public class Config .build(); public static final ConfigEntry numberOfUpdatePropagatorThreads = new ConfigEntry.Builder() + .setServersideShortName("numberOfUpdatePropagatorThreads") .setMinDefaultMax(1, ThreadPresetConfigEventHandler.getUpdatePropagatorDefaultThreadCount(), Runtime.getRuntime().availableProcessors()) @@ -1360,6 +1361,7 @@ public class Config + THREAD_NOTE) .build(); public static final ConfigEntry runTimeRatioForUpdatePropagatorThreads = new ConfigEntry.Builder() + .setServersideShortName("runTimeRatioForUpdatePropagatorThreads") .setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getUpdatePropagatorDefaultRunTimeRatio(), 1.0) .comment(THREAD_RUN_TIME_RATIO_NOTE) .build(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java index 6a8f4b201..1ce14c48e 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java @@ -21,13 +21,13 @@ package com.seibel.distanthorizons.core.file.fullDatafile; import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode; import com.seibel.distanthorizons.core.api.internal.ClientApi; -import com.seibel.distanthorizons.core.api.internal.SharedApi; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV1; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector; import com.seibel.distanthorizons.core.file.structure.ISaveStructure; import com.seibel.distanthorizons.core.file.AbstractDataSourceHandler; +import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult; import com.seibel.distanthorizons.core.level.IDhLevel; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.pos.DhSectionPos; @@ -40,7 +40,6 @@ import com.seibel.distanthorizons.core.sql.repo.FullDataSourceV2Repo; import com.seibel.distanthorizons.core.util.ThreadUtil; import com.seibel.distanthorizons.core.util.objects.DataCorruptedException; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; -import com.seibel.distanthorizons.core.world.EWorldEnvironment; import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.logging.log4j.Logger; @@ -137,15 +136,8 @@ public class FullDataSourceProviderV2 this.migrationThreadPool.execute(this::convertLegacyDataSources); // update propagation doesn't need to be run on the server since only the highest detail level is needed - if (SharedApi.getEnvironment() != EWorldEnvironment.SERVER_ONLY) - { - this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Parent Update Queue ["+levelId+"]"); - this.updateQueueProcessor.execute(this::runUpdateQueue); - } - else - { - this.updateQueueProcessor = null; - } + this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Parent Update Queue [" + levelId + "]"); + this.updateQueueProcessor.execute(this::runUpdateQueue); } @@ -604,7 +596,8 @@ public class FullDataSourceProviderV2 public int getMaxPossibleRetrievalPositionCountForPos(Long pos) { return -1; } /** @return true if the position was queued, false if not */ - public boolean queuePositionForRetrieval(Long genPos) { return false; } + @Nullable + public CompletableFuture queuePositionForRetrieval(long genPos, boolean allowAboveMaxGenRequests) { return null; } /** does nothing if the given position isn't present in the queue */ public void removeRetrievalRequestIf(DhSectionPos.ICancelablePrimitiveLongConsumer removeIf) { } @@ -632,8 +625,6 @@ public class FullDataSourceProviderV2 @Nullable public Long getTimestampForPos(long pos) { return this.repo.getTimestampForPos(pos); } - public Map getTimestampsForRange(byte detailLevel, int startPosX, int startPosZ, int endPosX, int endPosZ) - { return this.repo.getTimestampsForRange(detailLevel, startPosX, startPosZ, endPosX, endPosZ); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java index 4aeacfb65..f347086e7 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java @@ -38,6 +38,7 @@ import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.coreapi.util.BitShiftUtil; import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.awt.*; @@ -60,10 +61,10 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im * TODO this should be dynamically allocated based on CPU load * and abilities. */ - public static final int MAX_WORLD_GEN_REQUESTS_PER_THREAD = 20; + public static final int MAX_WORLD_GEN_REQUESTS_PER_THREAD = 20; - private final AtomicReference worldGenQueueRef = new AtomicReference<>(null); + protected final AtomicReference worldGenQueueRef = new AtomicReference<>(null); private final ArrayList onWorldGenTaskCompleteListeners = new ArrayList<>(); protected final DelayedFullDataSourceSaveCache delayedFullDataSourceSaveCache = new DelayedFullDataSourceSaveCache(this::onDataSourceSave, 5_000); @@ -170,8 +171,7 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im } - IFullDataSourceRetrievalQueue worldGenQueue = this.worldGenQueueRef.get(); - if (worldGenQueue == null) + if (this.worldGenQueueRef.get() == null) { // we can't queue anything if the world generator isn't set up yet return false; @@ -208,25 +208,47 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im return false; } - - // don't queue additional world gen requests beyond the max allotted count - return worldGenQueue.getWaitingTaskCount() < maxQueueCount; + return true; } @Override - public boolean queuePositionForRetrieval(Long genPos) + public CompletableFuture queuePositionForRetrieval(long genPos, boolean allowAboveMaxGenRequests) { IFullDataSourceRetrievalQueue worldGenQueue = this.worldGenQueueRef.get(); if (worldGenQueue == null) { - return false; + return null; + } + + if (!allowAboveMaxGenRequests) + { + int maxQueueCount = MAX_WORLD_GEN_REQUESTS_PER_THREAD * Config.Common.MultiThreading.numberOfWorldGenerationThreads.get(); + if (worldGenQueue.getWaitingTaskCount() >= maxQueueCount) + { + return null; + } } WorldGenTaskTracker genTaskTracker = new WorldGenTaskTracker(genPos); CompletableFuture worldGenFuture = worldGenQueue.submitRetrievalTask(genPos, (byte) (DhSectionPos.getDetailLevel(genPos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL), genTaskTracker); - worldGenFuture.whenComplete((genTaskResult, ex) -> this.onWorldGenTaskComplete(genTaskResult, ex)); + worldGenFuture.whenComplete((genTaskResult, ex) -> + { + LOGGER.info("gen task complete ["+DhSectionPos.toString(genPos)+"]"); + //this.onWorldGenTaskComplete(genTaskResult, ex); + }); - return true; + return worldGenFuture; + } + + @Override + protected void updateDataSourceAtPos(long updatePos, @NotNull FullDataSourceV2 inputData, boolean lockOnUpdatePos) + { + super.updateDataSourceAtPos(updatePos, inputData, lockOnUpdatePos); + + //if (SharedApi.getEnvironment() != EWorldEnvironment.CLIENT_ONLY) + // LOGGER.info("updated ["+DhSectionPos.toString(updatePos)+"]"); + + this.onWorldGenTaskComplete(WorldGenResult.CreateSuccess(updatePos), null); } @Override diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java index f35cff1b5..48c2f4e8f 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java @@ -20,24 +20,20 @@ package com.seibel.distanthorizons.core.file.fullDatafile; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; +import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector; import com.seibel.distanthorizons.core.file.structure.ISaveStructure; import com.seibel.distanthorizons.core.generation.RemoteWorldRetrievalQueue; -import com.seibel.distanthorizons.core.level.IDhClientLevel; +import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult; +import com.seibel.distanthorizons.core.level.IDhLevel; import com.seibel.distanthorizons.core.level.WorldGenModule; -import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.multiplayer.client.SyncOnLoadRequestQueue; -import com.seibel.distanthorizons.core.pos.DhSectionPos; import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D; -import com.seibel.distanthorizons.core.util.TimerUtil; -import com.seibel.distanthorizons.coreapi.util.BitShiftUtil; -import org.apache.logging.log4j.Logger; +import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; import org.jetbrains.annotations.Nullable; import java.io.File; -import java.util.Map; import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; /** @@ -46,10 +42,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvider { - private static final Logger LOGGER = DhLoggerBuilder.getLogger(); - private static final Timer DELAY_UPDATE_TIMER = TimerUtil.CreateTimer("Remote DataSource Visited Pos Removal Timer"); - /** auto remove visited positions from the set after a given amount of time to prevent the set from growing infinitely */ - private static final int VISITED_POSITION_REMOVAL_TIME_IN_MS = 20 * 60 * 1_000; // 20 minutes + private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class); @Nullable private final SyncOnLoadRequestQueue syncOnLoadRequestQueue; @@ -62,7 +55,7 @@ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvide //=============// public RemoteFullDataSourceProvider( - IDhClientLevel level, ISaveStructure saveStructure, @Nullable File saveDirOverride, + IDhLevel level, ISaveStructure saveStructure, @Nullable File saveDirOverride, @Nullable SyncOnLoadRequestQueue syncOnLoadRequestQueue) { super(level, saveStructure, saveDirOverride); @@ -70,24 +63,6 @@ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvide } - @Override - public boolean queuePositionForRetrieval(Long genPos) - { - if (this.syncOnLoadRequestQueue == null) - { - return super.queuePositionForRetrieval(genPos); - } - - int maxGenerationRequestDistance = this.syncOnLoadRequestQueue.networkState.sessionConfig.getMaxGenerationRequestDistance(); - DhBlockPos2D targetPos = this.level.getTargetPosForGeneration(); - if (targetPos == null || DhSectionPos.getChebyshevSignedBlockDistance(genPos, targetPos) / 16 > maxGenerationRequestDistance) - { - return false; - } - - return super.queuePositionForRetrieval(genPos); - } - //==================// // override methods // @@ -110,62 +85,32 @@ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvide } + //===========================// // request timestamp updates // // from server // //===========================// - // get the timestamp for every maximum detail position in this section - int posToMinimumDetailScale = BitShiftUtil.powerOfTwo(DhSectionPos.getDetailLevel(pos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL); - Map timestamps = this.getTimestampsForRange( - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, - DhSectionPos.getX(pos) * posToMinimumDetailScale, - DhSectionPos.getZ(pos) * posToMinimumDetailScale, - (DhSectionPos.getX(pos) + 1) * posToMinimumDetailScale, - (DhSectionPos.getZ(pos) + 1) * posToMinimumDetailScale - ); - - DhSectionPos.forEachChildAtDetailLevel(pos, DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, childPos -> + Long timestamp = this.getTimestampForPos(pos); + if (timestamp != null) { - int maxSyncOnLoadDistance = this.syncOnLoadRequestQueue.networkState.sessionConfig.getMaxSyncOnLoadDistance(); - DhBlockPos2D targetPos = this.level.getTargetPosForGeneration(); - if (targetPos == null || DhSectionPos.getChebyshevSignedBlockDistance(childPos, targetPos) / 16 > maxSyncOnLoadDistance) - { - return; - } - - if (!this.visitedPositions.add(childPos)) - { - return; - } - this.queueVisitedPositionForRemoval(childPos); - - // check if the server has newer versions of these LODs - Long subTimestamp = timestamps.get(childPos); - if (subTimestamp != null) - { - this.syncOnLoadRequestQueue.submitRequest(childPos, subTimestamp, this.delayedFullDataSourceSaveCache::queueDataSourceForUpdateAndSave); - } - }); + this.syncOnLoadRequestQueue.submitRequest(pos, timestamp, this.delayedFullDataSourceSaveCache::queueDataSourceForUpdateAndSave); + } return super.get(pos); } - /** this is done to prevent infinite set growth */ - private void queueVisitedPositionForRemoval(long pos) + + + @Override + public CompletableFuture queuePositionForRetrieval(long genPos, boolean allowAboveMaxGenRequests) { - TimerTask timerTask = new TimerTask() + RemoteWorldRetrievalQueue worldGenQueue = (RemoteWorldRetrievalQueue) this.worldGenQueueRef.get(); + if (worldGenQueue == null) { - @Override - public void run() - { - RemoteFullDataSourceProvider.this.visitedPositions.remove(pos); - } - }; - try - { - DELAY_UPDATE_TIMER.schedule(timerTask, VISITED_POSITION_REMOVAL_TIME_IN_MS); + return null; } - catch (IllegalStateException ignore) { /* shouldn't happen, but there have been issues like this in the past */ } + + return super.queuePositionForRetrieval(genPos, worldGenQueue.isPosCloserThanFarthestWaiting(new DhBlockPos2D(MC_CLIENT.getPlayerBlockPos()), genPos)); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java index 18a5c4ad9..bef67c9a6 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java @@ -7,11 +7,14 @@ import com.seibel.distanthorizons.core.level.DhClientLevel; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.multiplayer.client.AbstractFullDataNetworkRequestQueue; import com.seibel.distanthorizons.core.multiplayer.client.ClientNetworkState; +import com.seibel.distanthorizons.core.pos.DhSectionPos; import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D; import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable; import com.seibel.distanthorizons.core.util.LodUtil; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.*; public class RemoteWorldRetrievalQueue extends AbstractFullDataNetworkRequestQueue implements IFullDataSourceRetrievalQueue, IDebugRenderable @@ -39,7 +42,7 @@ public class RemoteWorldRetrievalQueue extends AbstractFullDataNetworkRequestQue public void startAndSetTargetPos(DhBlockPos2D targetPos) { super.tick(targetPos); } @Override - public byte lowestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL; } + public byte lowestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL + 12; } // TODO should be the same as what the server's update propgator can provide @Override public byte highestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL; } @@ -47,9 +50,23 @@ public class RemoteWorldRetrievalQueue extends AbstractFullDataNetworkRequestQue public CompletableFuture submitRetrievalTask(long sectionPos, byte requiredDataDetail, IWorldGenTaskTracker tracker) { return super.submitRequest(sectionPos, tracker.getDataSourceConsumer()) - .thenApply(retrievalSuccess -> retrievalSuccess - ? WorldGenResult.CreateSuccess(sectionPos) - : WorldGenResult.CreateFail()); + .thenApply(requestResult -> + { + switch (requestResult) + { + case SUCCEEDED: + return WorldGenResult.CreateSuccess(sectionPos); + case FAILED: + return WorldGenResult.CreateFail(); + case REQUIRES_SPLITTING: + List> childFutures = new ArrayList<>(4); + DhSectionPos.forEachChild(sectionPos, childPos -> childFutures.add(this.submitRetrievalTask(childPos, requiredDataDetail, tracker))); + return WorldGenResult.CreateSplit(childFutures); + } + + LodUtil.assertNotReach(); + return WorldGenResult.CreateFail(); + }); } @Override diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java index 05fefd901..13f89c920 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java @@ -479,8 +479,8 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb // getters / setters // //===================// - public int getWaitingTaskCount() { return this.waitingTasks.size(); } - public int getInProgressTaskCount() { return this.inProgressGenTasksByLodPos.size(); } + @Override public int getWaitingTaskCount() { return this.waitingTasks.size(); } + @Override public int getInProgressTaskCount() { return this.inProgressGenTasksByLodPos.size(); } @Override public byte lowestDataDetail() { return this.lowestDataDetail; } @@ -492,7 +492,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb @Override public void setEstimatedTotalTaskCount(int newEstimate) { this.estimatedTotalTaskCount = newEstimate; } - public void addDebugMenuStringsToList(List messageList) { } + @Override public void addDebugMenuStringsToList(List messageList) { } @@ -500,7 +500,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb // shutdown // //==========// - public CompletableFuture startClosingAsync(boolean cancelCurrentGeneration, boolean alsoInterruptRunning) + @Override public CompletableFuture startClosingAsync(boolean cancelCurrentGeneration, boolean alsoInterruptRunning) { LOGGER.info("Closing world gen queue"); this.queueingThread.shutdownNow(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java index df537730b..c49b3b113 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java @@ -4,9 +4,9 @@ import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2; import com.seibel.distanthorizons.core.file.structure.ISaveStructure; -import com.seibel.distanthorizons.core.logging.ConfigBasedLogger; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.logging.f3.F3Screen; +import com.seibel.distanthorizons.core.multiplayer.server.FullDataSourceRequestHandler; import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerState; import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerStateManager; import com.seibel.distanthorizons.core.network.exceptions.RequestOutOfRangeException; @@ -17,7 +17,6 @@ import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartialUpdateMessage; import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; -import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage; import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage; import com.seibel.distanthorizons.core.pos.DhSectionPos; import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D; @@ -27,24 +26,16 @@ import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import javax.annotation.CheckForNull; import javax.annotation.Nullable; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.*; public abstract class AbstractDhServerLevel extends AbstractDhLevel implements IDhServerLevel { protected static final Logger LOGGER = DhLoggerBuilder.getLogger(); - private static final ConfigBasedLogger NETWORK_LOGGER = new ConfigBasedLogger(LogManager.getLogger(), - () -> Config.Common.Logging.logNetworkEvent.get()); - - /** 1 Mebibyte minus 576 bytes for other info */ - public static final int FULL_DATA_SPLIT_SIZE_IN_BYTES = 1_048_000; public final ServerLevelModule serverside; protected final IServerLevelWrapper serverLevelWrapper; @@ -58,9 +49,9 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I */ protected final ConcurrentLinkedQueue worldGenPlayerCenteringQueue = new ConcurrentLinkedQueue<>(); - private final ConcurrentMap requestGroupByPos = new ConcurrentHashMap<>(); - private final ConcurrentMap requestGroupByFutureId = new ConcurrentHashMap<>(); + private final FullDataSourceRequestHandler requestHandler = new FullDataSourceRequestHandler(this); + private final boolean NSizedGenerationSupported = false; //=============// @@ -104,52 +95,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I @Override public void serverTick() { - // Send finished data source requests - for (Map.Entry entry : this.requestGroupByPos.entrySet()) - { - DataSourceRequestGroup requestGroup = entry.getValue(); - - if (requestGroup.fullDataSource == null) - { - continue; - } - - NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Fulfilled request group [" + entry.getKey() + "]"); - - // Make this group unavailable for adding into - this.requestGroupByPos.remove(entry.getKey()); - requestGroup.requestRemoveSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - - ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); - if (executor == null) - { - LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); - continue; - } - CompletableFuture.runAsync(() -> - { - Objects.requireNonNull(this.beaconBeamRepo); - try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(entry.getKey()))) - { - for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) - { - this.requestGroupByFutureId.remove(msg.futureId); - - ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer()); - if (serverPlayerState == null) - { - continue; - } - - serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> { - msg.sendResponse(new FullDataSourceResponseMessage(payload)); - serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release(); - }); - } - } - }, executor); - } + this.requestHandler.tick(); } @Override @@ -188,9 +134,8 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I { serverPlayerState.networkSession.registerHandler(FullDataSourceRequestMessage.class, (message) -> { - if (!this.messagePlayerInThisLevel(message)) + if (!this.validatePlayerInCurrentLevel(message)) { - // we can't handle players in other levels, don't continue return; } @@ -206,7 +151,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I message.sendResponse(new RequestOutOfRangeException("Distance too large: " + distanceFromPlayer + " > " + Config.Server.maxGenerationRequestDistance.get())); return; } - this.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet); + this.requestHandler.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet); } else { @@ -215,131 +160,20 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I message.sendResponse(new RequestOutOfRangeException("Distance too large: " + distanceFromPlayer + " > " + Config.Server.maxSyncOnLoadRequestDistance.get())); return; } - this.queueLodSyncForRequestMessage(serverPlayerState, message, rateLimiterSet); + this.requestHandler.queueLodSyncForRequestMessage(serverPlayerState, message, rateLimiterSet); } }); serverPlayerState.networkSession.registerHandler(CancelMessage.class, msg -> { - DataSourceRequestGroup requestGroup = this.requestGroupByFutureId.remove(msg.futureId); - if (requestGroup == null) - { - return; - } - - // If this fails, the group is being removed and completing cancellation is not necessary - if (requestGroup.requestRemoveSemaphore.tryAcquire()) - { - // Prevent adding requests in case the group will be removed by this cancellation - requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - requestGroup.requestRemoveSemaphore.release(); - - serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release(); - - FullDataSourceRequestMessage requestMessage = requestGroup.requestMessages.remove(msg.futureId); - if (requestGroup.requestMessages.isEmpty()) - { - NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Cancelled request group [" + DhSectionPos.toString(requestMessage.sectionPos) + "]."); - this.requestGroupByPos.remove(requestMessage.sectionPos); - this.serverside.fullDataFileHandler.removeRetrievalRequestIf(pos -> pos == requestMessage.sectionPos); - } - else - { - requestGroup.requestAddSemaphore.release(Short.MAX_VALUE); - } - } + this.requestHandler.cancelRequest(msg.futureId); }); } - private void queueLodSyncForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) - { - if (!serverPlayerState.sessionConfig.getSynchronizeOnLoad()) - { - message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); - return; - } - - if (!rateLimiterSet.syncOnLoginRateLimiter.tryAcquire(message)) - { - return; - } - - - // the client timestamp will be null if we want to retrieve the LOD regardless of when it was last updated - long clientTimestamp = (message.clientTimestamp != null) ? message.clientTimestamp : -1; - // the server timestamp will be null if no LOD data exists for this position - Long serverTimestamp = this.serverside.fullDataFileHandler.getTimestampForPos(message.sectionPos); - if (serverTimestamp == null - || serverTimestamp <= clientTimestamp) - { - // either no data exists to sync, or the client is already up to date - rateLimiterSet.syncOnLoginRateLimiter.release(); - message.sendResponse(new FullDataSourceResponseMessage(null)); - return; - } - - - - ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); - if (executor == null) - { - // shouldn't normally happen, but just in case - LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); - return; - } - - this.serverside.fullDataFileHandler.getAsync(message.sectionPos).thenAcceptAsync(fullDataSource -> - { - Objects.requireNonNull(this.beaconBeamRepo); - try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos))) - { - serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> { - message.sendResponse(new FullDataSourceResponseMessage(payload)); - rateLimiterSet.syncOnLoginRateLimiter.release(); - }); - } - }, executor); - } - private void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) - { - if (!serverPlayerState.sessionConfig.isDistantGenerationEnabled()) - { - message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); - return; - } - - if (!rateLimiterSet.generationRequestRateLimiter.tryAcquire(message)) - { - return; - } - - while (true) - { - DataSourceRequestGroup requestGroup = this.requestGroupByPos.computeIfAbsent(message.sectionPos, pos -> - { - DataSourceRequestGroup newGroup = new DataSourceRequestGroup(); - this.tryFulfillDataSourceRequestGroup(newGroup, pos); - NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Created request group for pos [" + DhSectionPos.toString(pos) + "]."); - return newGroup; - }); - - // If this fails, loop until either a permit is acquired or the group is removed to create another one - if (!requestGroup.requestAddSemaphore.tryAcquire()) - { - Thread.yield(); - continue; - } - - this.requestGroupByFutureId.put(message.futureId, requestGroup); - requestGroup.requestMessages.put(message.futureId, message); - requestGroup.requestAddSemaphore.release(); - break; - } - } /** May send an error message in response if the message is a {@link AbstractTrackableMessage} */ - private boolean messagePlayerInThisLevel(T message) + private boolean validatePlayerInCurrentLevel(T message) { if (!(message instanceof ILevelRelatedMessage)) { @@ -383,35 +217,12 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I // world gen // //===========// + public boolean isNSizedGenerationSupported() { return this.NSizedGenerationSupported; } + @Override public void onWorldGenTaskComplete(long pos) { - DataSourceRequestGroup requestGroup = this.requestGroupByPos.get(pos); - if (requestGroup != null) - { - requestGroup.worldGenTaskComplete = true; - this.tryFulfillDataSourceRequestGroup(requestGroup, pos); - } - } - - private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos) - { - this.serverside.fullDataFileHandler.getAsync(pos).thenAccept(fullDataSource -> - { - if (this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps)) - { - requestGroup.fullDataSource = fullDataSource; - } - else if (requestGroup.worldGenTaskComplete) - { - // If the returned data source is not fully generated, try reading it again - this.tryFulfillDataSourceRequestGroup(requestGroup, pos); - } - else - { - this.serverside.fullDataFileHandler.queuePositionForRetrieval(pos); - } - }); + this.requestHandler.onWorldGenTaskComplete(pos); } @@ -551,31 +362,4 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I LOGGER.info("Closed DHLevel for [" + this.getLevelWrapper() + "]."); } - - - //================// - // helper classes // - //================// - - private static class DataSourceRequestGroup - { - public final ConcurrentMap requestMessages = new ConcurrentHashMap<>(); - - /** If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps */ - boolean worldGenTaskComplete = false; - - @CheckForNull - public FullDataSourceV2 fullDataSource = null; - - /** - * These two Semaphores are used to prevent all threads from locking on the group after it being fulfilled, - * as opposed to ReentrantReadWriteLocks which would allow the locking thread continue using it anyway.
- * Short.MAX_VALUE is chosen as a large enough number so non-exclusive accesses never block each other. - */ - public final Semaphore requestAddSemaphore = new Semaphore(Short.MAX_VALUE, true); - /** @see DataSourceRequestGroup#requestAddSemaphore */ - public final Semaphore requestRemoveSemaphore = new Semaphore(Short.MAX_VALUE, true); - - } - } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java index 330f79cb6..daf7881bd 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java @@ -10,6 +10,7 @@ import com.seibel.distanthorizons.core.logging.ConfigBasedSpamLogger; import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException; import com.seibel.distanthorizons.core.network.exceptions.RequestOutOfRangeException; import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException; +import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException; import com.seibel.distanthorizons.core.network.session.SessionClosedException; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage; @@ -33,6 +34,7 @@ import java.io.IOException; import java.util.*; import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -105,30 +107,44 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende // request submitting // //====================// - public CompletableFuture submitRequest(long sectionPos, Consumer dataSourceConsumer) + public CompletableFuture submitRequest(long sectionPos, Consumer dataSourceConsumer) { return this.submitRequest(sectionPos, null, dataSourceConsumer); } - public CompletableFuture submitRequest(long sectionPos, @Nullable Long clientTimestamp, Consumer dataSourceConsumer) + public CompletableFuture submitRequest(long sectionPos, @Nullable Long clientTimestamp, Consumer dataSourceConsumer) { - LodUtil.assertTrue(DhSectionPos.getDetailLevel(sectionPos) == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed."); - - RequestQueueEntry entry = new RequestQueueEntry(dataSourceConsumer, clientTimestamp); - entry.future.whenComplete((success, throwable) -> + AtomicBoolean added = new AtomicBoolean(false); + RequestQueueEntry entry = this.waitingTasksBySectionPos.compute(sectionPos, (k, existingQueueEntry) -> { - this.waitingTasksBySectionPos.remove(sectionPos); - - if (throwable instanceof CancellationException) + if (existingQueueEntry != null) { - return; + return existingQueueEntry; } - this.finishedRequests.incrementAndGet(); - if (!success || throwable != null) + RequestQueueEntry newEntry = new RequestQueueEntry(dataSourceConsumer, clientTimestamp); + newEntry.future.whenComplete((requestResult, throwable) -> { - this.failedRequests.incrementAndGet(); - } + this.waitingTasksBySectionPos.remove(sectionPos); + + if (requestResult != RequestResult.REQUIRES_SPLITTING) + { + this.finishedRequests.incrementAndGet(); + } + + if ((requestResult == null || requestResult == RequestResult.FAILED) + || (throwable != null && !(throwable instanceof CancellationException))) + { + this.failedRequests.incrementAndGet(); + } + }); + + added.set(true); + return newEntry; }); - this.waitingTasksBySectionPos.put(sectionPos, entry); + if (!added.get()) + { + return CompletableFuture.completedFuture(RequestResult.FAILED); + } + return entry.future; } @@ -164,7 +180,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende { Map.Entry mapEntry = this.waitingTasksBySectionPos.entrySet().stream() .filter(task -> task.getValue().networkDataSourceFuture == null) - .min(Comparator.comparingInt(task -> DhSectionPos.getChebyshevSignedBlockDistance(task.getKey(), targetPos))) + .min(Comparator.comparingInt(x -> DhSectionPos.getChebyshevSignedBlockDistance(x.getKey(), targetPos))) .orElse(null); if (mapEntry == null) @@ -190,7 +206,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende CompletableFuture dataSourceFuture = this.networkState.getSession().sendRequest( new FullDataSourceRequestMessage(this.level.getLevelWrapper(), sectionPos, offsetEntryTimestamp), FullDataSourceResponseMessage.class - ); + ); entry.networkDataSourceFuture = dataSourceFuture; dataSourceFuture.handle((response, throwable) -> { @@ -213,6 +229,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende LOGGER.warn("Unable to handle FullDataPayload - getNetworkCompressionExecutor() is null"); return null; } + CompletableFuture.runAsync(() -> { try @@ -233,6 +250,10 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende LodUtil.assertTrue(this.changedOnly, "Received empty data source response for not changes-only request"); } } + catch (SectionRequiresSplittingException ignored) + { + return entry.future.complete(RequestResult.REQUIRES_SPLITTING); + } catch (SessionClosedException | CancellationException ignored) { return entry.future.cancel(false); @@ -240,7 +261,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende catch (RequestRejectedException e) { LOGGER.info("Request rejected by the server: " + e.getMessage()); - return entry.future.complete(false); + return entry.future.complete(RequestResult.FAILED); } catch (RateLimitedException e) { @@ -272,15 +293,28 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende } else { - return entry.future.complete(false); + return entry.future.complete(RequestResult.FAILED); } } - return entry.future.complete(true); + return entry.future.complete(RequestResult.SUCCEEDED); }); } + public boolean isPosCloserThanFarthestWaiting(DhBlockPos2D targetPos, long pos) + { + Long farthestPos = this.waitingTasksBySectionPos + .keySet().stream() + .max(Comparator.comparingInt(x -> DhSectionPos.getChebyshevSignedBlockDistance(x, targetPos))) + .orElse(null); + if (farthestPos == null) + { + return true; + } + + return DhSectionPos.getChebyshevSignedBlockDistance(pos, targetPos) <= DhSectionPos.getChebyshevSignedBlockDistance(farthestPos, targetPos); + } //=========================================// @@ -387,7 +421,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende protected static class RequestQueueEntry { /** encapsulates the entire request, including client side queuing and the actual server request */ - public final CompletableFuture future = new CompletableFuture<>(); + public final CompletableFuture future = new CompletableFuture<>(); public final Consumer dataSourceConsumer; /** will be null if we want to retrieve the LOD regardless of when it was last updated */ @Nullable @@ -417,6 +451,13 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende } + public enum RequestResult + { + SUCCEEDED, + REQUIRES_SPLITTING, + FAILED, + } + } \ No newline at end of file diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java new file mode 100644 index 000000000..fbe415d33 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java @@ -0,0 +1,109 @@ +package com.seibel.distanthorizons.core.multiplayer.server; + +import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; + +import javax.annotation.CheckForNull; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +class DataSourceRequestGroup +{ + public final long pos; + + /** + * If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps + */ + private boolean worldGenTaskComplete = false; + + void markWorldGenTaskComplete() + { + this.worldGenTaskComplete = true; + } + + boolean isWorldGenTaskComplete() + { + return this.worldGenTaskComplete; + } + + @CheckForNull + public FullDataSourceV2 fullDataSource = null; + + public final ConcurrentMap requestMessages = new ConcurrentHashMap<>(); + public final Semaphore pendingAdditionSemaphore = new Semaphore(Short.MAX_VALUE, true); + public final AtomicBoolean isClosed = new AtomicBoolean(); + + + DataSourceRequestGroup(long pos) + { + this.pos = pos; + } + + public boolean tryClose() + { + if (!this.isClosed.compareAndSet(false, true)) + { + return false; + } + + this.pendingAdditionSemaphore.acquireUninterruptibly(Short.MAX_VALUE); + return true; + } + + public boolean tryAddRequest(RequestData requestData) + { + if (!this.pendingAdditionSemaphore.tryAcquire()) + { + return false; + } + + this.requestMessages.put(requestData.futureId(), requestData); + this.pendingAdditionSemaphore.release(); + + return true; + } + + + public RequestData tryRemoveRequest(long requestId, IHangingRequestTransferConsumer hangingRequestTransferConsumer) + { + RequestData removed = this.requestMessages.remove(requestId); + + if (this.requestMessages.isEmpty() && this.tryClose()) + { + hangingRequestTransferConsumer.accept(this.requestMessages.values()); + } + + return removed; + } + + + static class RequestData + { + public final ServerPlayerState serverPlayerState; + public final ServerPlayerState.RateLimiterSet rateLimiterSet; + + public final FullDataSourceRequestMessage message; + public long futureId() { return this.message.futureId; } + public long sectionPos() { return this.message.sectionPos; } + + RequestData(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) + { + this.serverPlayerState = serverPlayerState; + this.rateLimiterSet = rateLimiterSet; + this.message = message; + } + + } + + /** + * While closing this group, some requests may slip through and end up lost.
+ * This is a workaround that allows the caller to transfer these requests to a new group. + */ + @FunctionalInterface + interface IHangingRequestTransferConsumer extends Consumer> { } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java new file mode 100644 index 000000000..f52d623fb --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java @@ -0,0 +1,264 @@ +package com.seibel.distanthorizons.core.multiplayer.server; + +import com.seibel.distanthorizons.core.config.Config; +import com.seibel.distanthorizons.core.file.fullDatafile.GeneratedFullDataSourceProvider; +import com.seibel.distanthorizons.core.level.AbstractDhServerLevel; +import com.seibel.distanthorizons.core.logging.ConfigBasedLogger; +import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload; +import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException; +import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage; +import com.seibel.distanthorizons.core.pos.DhSectionPos; +import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO; +import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; +import org.apache.logging.log4j.LogManager; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadPoolExecutor; + +public class FullDataSourceRequestHandler +{ + private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(), + () -> Config.Common.Logging.logNetworkEvent.get()); + + + private final AbstractDhServerLevel serverLevel; + private String getLevelIdentifier() { return this.serverLevel.getLevelWrapper().getDhIdentifier(); } + private GeneratedFullDataSourceProvider fullDataSourceProvider() { return this.serverLevel.serverside.fullDataFileHandler; } + private List getAllBeamsForPos(long pos) { return this.serverLevel.beaconBeamRepo.getAllBeamsForPos(pos); } + + private final ConcurrentMap requestGroupsByPos = new ConcurrentHashMap<>(); + private final ConcurrentMap requestGroupsByFutureId = new ConcurrentHashMap<>(); + + + public FullDataSourceRequestHandler(AbstractDhServerLevel serverLevel) + { + this.serverLevel = serverLevel; + } + + + //==================// + // network handling // + //==================// + + public void queueLodSyncForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) + { + if (!serverPlayerState.sessionConfig.getSynchronizeOnLoad()) + { + message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); + return; + } + + if (!rateLimiterSet.syncOnLoginRateLimiter.tryAcquire(message)) + { + return; + } + + + // the client timestamp will be null if we want to retrieve the LOD regardless of when it was last updated + long clientTimestamp = (message.clientTimestamp != null) ? message.clientTimestamp : -1; + // the server timestamp will be null if no LOD data exists for this position + Long serverTimestamp = this.fullDataSourceProvider().getTimestampForPos(message.sectionPos); + if (serverTimestamp == null + || serverTimestamp <= clientTimestamp) + { + // either no data exists to sync, or the client is already up to date + rateLimiterSet.syncOnLoginRateLimiter.release(); + message.sendResponse(new FullDataSourceResponseMessage(null)); + return; + } + + + ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + if (executor == null) + { + // shouldn't normally happen, but just in case + LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); + return; + } + + this.fullDataSourceProvider().getAsync(message.sectionPos).thenAcceptAsync(fullDataSource -> + { + try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.getAllBeamsForPos(message.sectionPos))) + { + serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> + { + message.sendResponse(new FullDataSourceResponseMessage(payload)); + rateLimiterSet.syncOnLoginRateLimiter.release(); + }); + } + }, executor); + } + + public void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) + { + if (!serverPlayerState.sessionConfig.isDistantGenerationEnabled()) + { + message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); + return; + } + + if (!rateLimiterSet.generationRequestRateLimiter.tryAcquire(message)) + { + return; + } + + this.doQueueWorldGenForRequestMessage(new DataSourceRequestGroup.RequestData(serverPlayerState, message, rateLimiterSet)); + } + + private void doQueueWorldGenForRequestMessage(DataSourceRequestGroup.RequestData requestData) + { + while (true) + { + DataSourceRequestGroup requestGroup = this.requestGroupsByPos.computeIfAbsent(requestData.sectionPos(), pos -> + { + DataSourceRequestGroup newGroup = new DataSourceRequestGroup(pos); + this.tryFulfillDataSourceRequestGroup(newGroup, pos); + LOGGER.debug("[" + this.getLevelIdentifier() + "] Created request group for pos [" + DhSectionPos.toString(pos) + "]."); + return newGroup; + }); + + // If this fails, loop until either a permit is acquired or the group is removed to create another one + if (!requestGroup.tryAddRequest(requestData)) + { + Thread.yield(); + continue; + } + + this.requestGroupsByFutureId.put(requestData.futureId(), requestGroup); + break; + } + } + + public void cancelRequest(long requestId) + { + DataSourceRequestGroup requestGroup = this.requestGroupsByFutureId.remove(requestId); + if (requestGroup == null) + { + return; + } + + DataSourceRequestGroup.RequestData removedRequest = requestGroup.tryRemoveRequest(requestId, requestsToTransfer -> + { + LOGGER.debug("[" + this.getLevelIdentifier() + "] Cancelled request group [" + DhSectionPos.toString(requestGroup.pos) + "]."); + this.requestGroupsByPos.remove(requestGroup.pos); + + if (!requestsToTransfer.isEmpty()) + { + for (DataSourceRequestGroup.RequestData requestToTransfer : requestsToTransfer) + { + this.doQueueWorldGenForRequestMessage(requestToTransfer); + } + } + else + { + this.fullDataSourceProvider().removeRetrievalRequestIf(pos -> pos == requestGroup.pos); + } + }); + + if (removedRequest != null) + { + removedRequest.rateLimiterSet.generationRequestRateLimiter.release(); + } + } + + + public void tick() + { + // Send finished data source requests + for (Map.Entry entry : this.requestGroupsByPos.entrySet()) + { + DataSourceRequestGroup requestGroup = entry.getValue(); + + if (requestGroup.fullDataSource == null) + { + continue; + } + + LOGGER.debug("[" + this.getLevelIdentifier() + "] Fulfilled request group [" + entry.getKey() + "]"); + + // Make this group unavailable for adding into + this.requestGroupsByPos.remove(entry.getKey()); + if (!requestGroup.tryClose()) + { + continue; + } + + ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + if (executor == null) + { + LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); + continue; + } + CompletableFuture.runAsync(() -> + { + try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.getAllBeamsForPos(entry.getKey()))) + { + for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values()) + { + this.requestGroupsByFutureId.remove(requestData.futureId()); + + requestData.serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> { + requestData.message.sendResponse(new FullDataSourceResponseMessage(payload)); + requestData.rateLimiterSet.generationRequestRateLimiter.release(); + }); + } + } + }, executor); + } + } + + private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos) + { + this.fullDataSourceProvider().getAsync(pos).thenAccept(fullDataSource -> + { + if (this.fullDataSourceProvider().isFullyGenerated(fullDataSource.columnGenerationSteps)) + { + LOGGER.info("sending - complete [" + DhSectionPos.toString(pos) + "]"); + requestGroup.fullDataSource = fullDataSource; + } + else if (!this.serverLevel.isNSizedGenerationSupported() && DhSectionPos.getDetailLevel(pos) > DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL) + { + // Make this group unavailable for adding into + this.requestGroupsByPos.remove(pos); + if (!requestGroup.tryClose()) + { + return; + } + + for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values()) + { + this.requestGroupsByFutureId.remove(requestData.futureId()); + requestData.rateLimiterSet.generationRequestRateLimiter.release(); + requestData.message.sendResponse(new SectionRequiresSplittingException()); + } + } + else if (requestGroup.isWorldGenTaskComplete()) + { + LOGGER.info("sending - retry [" + DhSectionPos.toString(pos) + "]"); + this.tryFulfillDataSourceRequestGroup(requestGroup, pos); + } + else + { + LOGGER.info("sending - queueing [" + DhSectionPos.toString(pos) + "]"); + this.fullDataSourceProvider().queuePositionForRetrieval(pos, true); + } + }); + } + + public void onWorldGenTaskComplete(long pos) + { + DataSourceRequestGroup requestGroup = this.requestGroupsByPos.get(pos); + if (requestGroup != null) + { + requestGroup.markWorldGenTaskComplete(); + this.tryFulfillDataSourceRequestGroup(requestGroup, pos); + } + } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/exceptions/SectionRequiresSplittingException.java b/core/src/main/java/com/seibel/distanthorizons/core/network/exceptions/SectionRequiresSplittingException.java new file mode 100644 index 000000000..26b6d2a62 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/exceptions/SectionRequiresSplittingException.java @@ -0,0 +1,29 @@ +/* + * This file is part of the Distant Horizons mod + * licensed under the GNU LGPL v3 License. + * + * Copyright (C) 2020-2023 James Seibel + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ + +package com.seibel.distanthorizons.core.network.exceptions; + +/** Fired if the current section is not fully generated and underlying generator does not support N-sized generation. */ +public class SectionRequiresSplittingException extends Exception +{ + public SectionRequiresSplittingException() { this("Section requires splitting"); } + + public SectionRequiresSplittingException(String message) { super(message); } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/requests/ExceptionMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/requests/ExceptionMessage.java index c0679fc40..2d3a2c049 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/requests/ExceptionMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/requests/ExceptionMessage.java @@ -23,6 +23,7 @@ import com.google.common.base.MoreObjects; import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException; import com.seibel.distanthorizons.core.network.exceptions.RequestOutOfRangeException; import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException; +import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException; import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage; import io.netty.buffer.ByteBuf; @@ -38,6 +39,7 @@ public class ExceptionMessage extends AbstractTrackableMessage this.add(RateLimitedException.class); this.add(RequestOutOfRangeException.class); this.add(RequestRejectedException.class); + this.add(SectionRequiresSplittingException.class); }}; public Exception exception; diff --git a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java index 7bb053184..ebd8257d9 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java @@ -482,7 +482,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable } long pos = this.missingGenerationPos.removeLong(i); - boolean positionQueued = this.fullDataSourceProvider.queuePositionForRetrieval(pos); + boolean positionQueued = (this.fullDataSourceProvider.queuePositionForRetrieval(pos, false) != null); if (!positionQueued) { // shouldn't normally happen, but just in case