From 23b2a62db262a837c6bbea72120ca4ecca658453 Mon Sep 17 00:00:00 2001 From: James Seibel Date: Sun, 27 Oct 2024 16:14:56 -0500 Subject: [PATCH 1/7] proof-of-concept n-sized multiplayer request support --- .../core/api/internal/SharedApi.java | 2 +- .../FullDataSourceProviderV2.java | 20 +++---- .../GeneratedFullDataSourceProvider.java | 26 ++++++++-- .../RemoteFullDataSourceProvider.java | 52 +++++++++++-------- .../generation/RemoteWorldRetrievalQueue.java | 2 +- .../core/generation/WorldGenerationQueue.java | 5 ++ .../core/level/AbstractDhServerLevel.java | 11 +++- .../AbstractFullDataNetworkRequestQueue.java | 14 +++-- .../core/render/LodRenderSection.java | 2 +- 9 files changed, 89 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java b/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java index 192110a5b..d79be8372 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java @@ -66,7 +66,7 @@ public class SharedApi private static final UpdateChunkPosManager UPDATE_POS_MANAGER = new UpdateChunkPosManager(); /** how many chunks can be queued for updating per thread, used to prevent updates from infinitely pilling up if the user flies around extremely fast */ - private static final int MAX_UPDATING_CHUNK_COUNT_PER_THREAD = 500; + private static final int MAX_UPDATING_CHUNK_COUNT_PER_THREAD = 1_000; /** how many milliseconds must pass before an overloaded message can be sent in chat or the log */ private static final int MIN_MS_BETWEEN_OVERLOADED_LOG_MESSAGE = 30_000; diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java index b9360801c..b4d8b5889 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java @@ -28,6 +28,7 @@ import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSour import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector; import com.seibel.distanthorizons.core.file.structure.ISaveStructure; import com.seibel.distanthorizons.core.file.AbstractDataSourceHandler; +import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult; import com.seibel.distanthorizons.core.level.IDhLevel; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.pos.DhSectionPos; @@ -137,15 +138,15 @@ public class FullDataSourceProviderV2 this.migrationThreadPool.execute(this::convertLegacyDataSources); // update propagation doesn't need to be run on the server since only the highest detail level is needed - if (SharedApi.getEnvironment() != EWorldEnvironment.SERVER_ONLY) - { + //if (SharedApi.getEnvironment() != EWorldEnvironment.SERVER_ONLY) // TODO + //{ this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Parent Update Queue ["+dimensionName+"]"); this.updateQueueProcessor.execute(this::runUpdateQueue); - } - else - { - this.updateQueueProcessor = null; - } + //} + //else + //{ + // this.updateQueueProcessor = null; + //} } @@ -604,7 +605,8 @@ public class FullDataSourceProviderV2 public int getMaxPossibleRetrievalPositionCountForPos(Long pos) { return -1; } /** @return true if the position was queued, false if not */ - public boolean queuePositionForRetrieval(Long genPos) { return false; } + @Nullable + public CompletableFuture queuePositionForRetrieval(Long genPos) { return null; } /** does nothing if the given position isn't present in the queue */ public void removeRetrievalRequestIf(DhSectionPos.ICancelablePrimitiveLongConsumer removeIf) { } @@ -632,8 +634,6 @@ public class FullDataSourceProviderV2 @Nullable public Long getTimestampForPos(long pos) { return this.repo.getTimestampForPos(pos); } - public Map getTimestampsForRange(byte detailLevel, int startPosX, int startPosZ, int endPosX, int endPosZ) - { return this.repo.getTimestampsForRange(detailLevel, startPosX, startPosZ, endPosX, endPosZ); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java index 65fb7b59a..8b5288f13 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java @@ -20,6 +20,7 @@ package com.seibel.distanthorizons.core.file.fullDatafile; import com.seibel.distanthorizons.api.enums.worldGeneration.EDhApiWorldGenerationStep; +import com.seibel.distanthorizons.core.api.internal.SharedApi; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; import com.seibel.distanthorizons.core.file.structure.ISaveStructure; @@ -35,9 +36,11 @@ import com.seibel.distanthorizons.core.render.renderer.DebugRenderer; import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable; import com.seibel.distanthorizons.core.util.LodUtil; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; +import com.seibel.distanthorizons.core.world.EWorldEnvironment; import com.seibel.distanthorizons.coreapi.util.BitShiftUtil; import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.awt.*; @@ -214,19 +217,34 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im } @Override - public boolean queuePositionForRetrieval(Long genPos) + public CompletableFuture queuePositionForRetrieval(Long genPos) { IFullDataSourceRetrievalQueue worldGenQueue = this.worldGenQueueRef.get(); if (worldGenQueue == null) { - return false; + return null; } WorldGenTaskTracker genTaskTracker = new WorldGenTaskTracker(genPos); CompletableFuture worldGenFuture = worldGenQueue.submitRetrievalTask(genPos, (byte) (DhSectionPos.getDetailLevel(genPos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL), genTaskTracker); - worldGenFuture.whenComplete((genTaskResult, ex) -> this.onWorldGenTaskComplete(genTaskResult, ex)); + worldGenFuture.whenComplete((genTaskResult, ex) -> + { + LOGGER.info("gen task complete ["+DhSectionPos.toString(genPos)+"]"); + //this.onWorldGenTaskComplete(genTaskResult, ex); + }); - return true; + return worldGenFuture; + } + + @Override + protected void updateDataSourceAtPos(long updatePos, @NotNull FullDataSourceV2 inputData, boolean lockOnUpdatePos) + { + super.updateDataSourceAtPos(updatePos, inputData, lockOnUpdatePos); + + //if (SharedApi.getEnvironment() != EWorldEnvironment.CLIENT_ONLY) + // LOGGER.info("updated ["+DhSectionPos.toString(updatePos)+"]"); + + this.onWorldGenTaskComplete(WorldGenResult.CreateSuccess(updatePos), null); } @Override diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java index 0ca214482..41c9fda74 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java @@ -81,35 +81,43 @@ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvide } + //===========================// // request timestamp updates // // from server // //===========================// - // get the timestamp for every maximum detail position in this section - int posToMinimumDetailScale = BitShiftUtil.powerOfTwo(DhSectionPos.getDetailLevel(pos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL); - Map timestamps = this.getTimestampsForRange( - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, - DhSectionPos.getX(pos) * posToMinimumDetailScale, - DhSectionPos.getZ(pos) * posToMinimumDetailScale, - (DhSectionPos.getX(pos) + 1) * posToMinimumDetailScale, - (DhSectionPos.getZ(pos) + 1) * posToMinimumDetailScale - ); + // TODO + //// get the timestamp for every maximum detail position in this section + //int posToMinimumDetailScale = BitShiftUtil.powerOfTwo(DhSectionPos.getDetailLevel(pos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL); + //Map timestamps = this.getTimestampsForRange( + // DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, + // DhSectionPos.getX(pos) * posToMinimumDetailScale, + // DhSectionPos.getZ(pos) * posToMinimumDetailScale, + // (DhSectionPos.getX(pos) + 1) * posToMinimumDetailScale, + // (DhSectionPos.getZ(pos) + 1) * posToMinimumDetailScale + //); + // + //DhSectionPos.forEachChildAtDetailLevel(pos, DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, childPos -> + //{ + // if (!this.visitedPositions.add(childPos)) + // { + // return; + // } + // + // // check if the server has newer versions of these LODs + // Long subTimestamp = timestamps.get(childPos); + // if (subTimestamp != null) + // { + // this.syncOnLoadRequestQueue.submitRequest(childPos, subTimestamp, this.delayedFullDataSourceSaveCache::queueDataSourceForUpdateAndSave); + // } + //}); - DhSectionPos.forEachChildAtDetailLevel(pos, DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, childPos -> + Long timestamp = this.getTimestampForPos(pos); + if (timestamp != null) { - if (!this.visitedPositions.add(childPos)) - { - return; - } - - // check if the server has newer versions of these LODs - Long subTimestamp = timestamps.get(childPos); - if (subTimestamp != null) - { - this.syncOnLoadRequestQueue.submitRequest(childPos, subTimestamp, this.delayedFullDataSourceSaveCache::queueDataSourceForUpdateAndSave); - } - }); + this.syncOnLoadRequestQueue.submitRequest(pos, timestamp, this.delayedFullDataSourceSaveCache::queueDataSourceForUpdateAndSave); + } return super.get(pos); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java index 56b23490f..44f8c6a0b 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java @@ -39,7 +39,7 @@ public class RemoteWorldRetrievalQueue extends AbstractFullDataNetworkRequestQue public void startAndSetTargetPos(DhBlockPos2D targetPos) { super.tick(targetPos); } @Override - public byte lowestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL; } + public byte lowestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL + 12; } // TODO should be the same as what the server's update propgator can provide @Override public byte highestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL; } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java index d700ea058..0bec732b3 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java @@ -143,6 +143,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb // Assert that the data at least can fill in 1 single ChunkSizedFullDataAccessor LodUtil.assertTrue(DhSectionPos.getDetailLevel(pos) > requiredDataDetail + LodUtil.CHUNK_DETAIL_LEVEL); + LOGGER.info("queueing gen ["+DhSectionPos.toString(pos)+"]"); CompletableFuture future = new CompletableFuture<>(); this.waitingTasks.put(pos, new WorldGenTask(pos, requiredDataDetail, tracker, future)); @@ -282,6 +283,8 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb { //LOGGER.trace("Unable to start task: "+closestTask.pos+", skipping. Task position may have already been generated."); } + + //LOGGER.info("started gen ["+DhSectionPos.toString(closestTask.pos)+"]"); } else { @@ -318,6 +321,8 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb // send the child futures to the future recipient, to notify them of the new tasks closestTask.future.complete(WorldGenResult.CreateSplit(childFutures)); + //LOGGER.info("split ["+DhSectionPos.toString(sectionPos)+"]"); + // return true so we attempt to generate again return true; } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java index c8873f338..ad1ea86a3 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java @@ -197,6 +197,8 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I ServerPlayerState.RateLimiterSet rateLimiterSet = serverPlayerState.getRateLimiterSet(this); + LOGGER.info("received message ["+DhSectionPos.toString(message.sectionPos)+"]"); + if (message.clientTimestamp == null) { this.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet); @@ -281,7 +283,8 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I Objects.requireNonNull(this.beaconBeamRepo); try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos))) { - serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> { + serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> + { message.sendResponse(new FullDataSourceResponseMessage(payload)); rateLimiterSet.syncOnLoginRateLimiter.release(); }); @@ -388,15 +391,19 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I { if (this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps)) { + LOGGER.info("sending - complete ["+DhSectionPos.toString(pos)+"]"); requestGroup.fullDataSource = fullDataSource; } else if (requestGroup.worldGenTaskComplete) { - // If the returned data source is not fully generated, try reading it again + LOGGER.info("sending - retry ["+DhSectionPos.toString(pos)+"]"); + // If the returned data source is not fully generated, try reading it again // can wait for a while if waiting for worldgen and/or update propagation + try { Thread.sleep(250); } catch (InterruptedException ignore) {} this.tryFulfillDataSourceRequestGroup(requestGroup, pos); } else { + LOGGER.info("sending - queueing ["+DhSectionPos.toString(pos)+"]"); this.serverside.fullDataFileHandler.queuePositionForRetrieval(pos); } }); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java index 6c762a0ab..6f26780f1 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java @@ -19,7 +19,6 @@ import com.seibel.distanthorizons.core.render.renderer.DebugRenderer; import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable; import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; import com.seibel.distanthorizons.core.util.LodUtil; -import com.seibel.distanthorizons.core.util.TimerUtil; import com.seibel.distanthorizons.core.util.objects.DataCorruptedException; import com.seibel.distanthorizons.core.util.ratelimiting.SupplierBasedRateLimiter; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; @@ -109,20 +108,25 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende { return this.submitRequest(sectionPos, null, dataSourceConsumer); } public CompletableFuture submitRequest(long sectionPos, @Nullable Long clientTimestamp, Consumer dataSourceConsumer) { - LodUtil.assertTrue(DhSectionPos.getDetailLevel(sectionPos) == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed."); + //LodUtil.assertTrue(DhSectionPos.getDetailLevel(sectionPos) == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed."); RequestQueueEntry entry = new RequestQueueEntry(dataSourceConsumer, clientTimestamp); entry.future.whenComplete((success, throwable) -> { + LOGGER.info("received ["+DhSectionPos.toString(sectionPos)+"]"); + this.waitingTasksBySectionPos.remove(sectionPos); this.finishedRequests.incrementAndGet(); - if (!success || throwable != null) + if ((success == null || !success) + || throwable != null) { this.failedRequests.incrementAndGet(); } }); + LOGGER.info("asking server for ["+DhSectionPos.toString(sectionPos)+"]"); + this.waitingTasksBySectionPos.put(sectionPos, entry); return entry.future; } @@ -157,7 +161,8 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende } private void sendNextRequest(DhBlockPos2D targetPos) { - Map.Entry mapEntry = this.waitingTasksBySectionPos.entrySet().stream() + Map.Entry mapEntry = this.waitingTasksBySectionPos + .entrySet().stream() .filter(task -> task.getValue().networkDataSourceFuture == null) .min(Comparator.comparingInt(x -> posDistanceSquared(targetPos, x.getKey()))) .orElse(null); @@ -201,6 +206,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende LOGGER.warn("Unable to handle FullDataPayload - getNetworkCompressionExecutor() is null"); return null; } + CompletableFuture.runAsync(() -> { try diff --git a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java index 334cba2cc..99a55189b 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java @@ -484,7 +484,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable } long pos = this.missingGenerationPos.removeLong(i); - boolean positionQueued = this.fullDataSourceProvider.queuePositionForRetrieval(pos); + boolean positionQueued = (this.fullDataSourceProvider.queuePositionForRetrieval(pos) != null); if (!positionQueued) { // shouldn't normally happen, but just in case From 13ab18d763044a600e71c0ea501fe6d0cb80a312 Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Thu, 31 Oct 2024 16:00:43 +0500 Subject: [PATCH 2/7] Add section splitting --- .../distanthorizons/core/config/Config.java | 2 + .../generation/RemoteWorldRetrievalQueue.java | 23 +++++++-- .../core/level/AbstractDhServerLevel.java | 28 +++++++++-- .../AbstractFullDataNetworkRequestQueue.java | 50 ++++++++++++------- .../SectionRequiresSplittingException.java | 29 +++++++++++ .../messages/requests/ExceptionMessage.java | 2 + .../core/pos/DhSectionPos.java | 22 ++++++++ .../core/pos/blockPos/DhBlockPos2D.java | 1 + 8 files changed, 131 insertions(+), 26 deletions(-) create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/network/exceptions/SectionRequiresSplittingException.java diff --git a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java index c8440cd9d..afc8d8788 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java @@ -1330,6 +1330,7 @@ public class Config .build(); public static final ConfigEntry numberOfUpdatePropagatorThreads = new ConfigEntry.Builder() + .setServersideShortName("numberOfUpdatePropagatorThreads") .setMinDefaultMax(1, ThreadPresetConfigEventHandler.getUpdatePropagatorDefaultThreadCount(), Runtime.getRuntime().availableProcessors()) @@ -1350,6 +1351,7 @@ public class Config + THREAD_NOTE) .build(); public static final ConfigEntry runTimeRatioForUpdatePropagatorThreads = new ConfigEntry.Builder() + .setServersideShortName("runTimeRatioForUpdatePropagatorThreads") .setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getUpdatePropagatorDefaultRunTimeRatio(), 1.0) .comment(THREAD_RUN_TIME_RATIO_NOTE) .build(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java index 44f8c6a0b..c56ea51c7 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/RemoteWorldRetrievalQueue.java @@ -7,11 +7,14 @@ import com.seibel.distanthorizons.core.level.DhClientLevel; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.multiplayer.client.AbstractFullDataNetworkRequestQueue; import com.seibel.distanthorizons.core.multiplayer.client.ClientNetworkState; +import com.seibel.distanthorizons.core.pos.DhSectionPos; import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D; import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable; import com.seibel.distanthorizons.core.util.LodUtil; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.*; public class RemoteWorldRetrievalQueue extends AbstractFullDataNetworkRequestQueue implements IFullDataSourceRetrievalQueue, IDebugRenderable @@ -47,9 +50,23 @@ public class RemoteWorldRetrievalQueue extends AbstractFullDataNetworkRequestQue public CompletableFuture submitRetrievalTask(long sectionPos, byte requiredDataDetail, IWorldGenTaskTracker tracker) { return super.submitRequest(sectionPos, tracker.getDataSourceConsumer()) - .thenApply(retrievalSuccess -> retrievalSuccess - ? WorldGenResult.CreateSuccess(sectionPos) - : WorldGenResult.CreateFail()); + .thenApply(requestResult -> + { + switch (requestResult) + { + case SUCCEEDED: + return WorldGenResult.CreateSuccess(sectionPos); + case FAILED: + return WorldGenResult.CreateFail(); + case REQUIRES_SPLITTING: + List> childFutures = new ArrayList<>(4); + DhSectionPos.forEachChild(sectionPos, childPos -> childFutures.add(this.submitRetrievalTask(childPos, requiredDataDetail, tracker))); + return WorldGenResult.CreateSplit(childFutures); + } + + LodUtil.assertNotReach(); + return WorldGenResult.CreateFail(); + }); } @Override diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java index ad1ea86a3..8ab6428b0 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java @@ -11,6 +11,7 @@ import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerState; import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerStateManager; import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException; import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException; +import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException; import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage; import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage; import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage; @@ -43,9 +44,6 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I private static final ConfigBasedLogger NETWORK_LOGGER = new ConfigBasedLogger(LogManager.getLogger(), () -> Config.Common.Logging.logNetworkEvent.get()); - /** 1 Mebibyte minus 576 bytes for other info */ - public static final int FULL_DATA_SPLIT_SIZE_IN_BYTES = 1_048_000; - public final ServerLevelModule serverside; protected final IServerLevelWrapper serverLevelWrapper; @@ -62,6 +60,8 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I private final ConcurrentMap requestGroupByFutureId = new ConcurrentHashMap<>(); + private final boolean generatorSupportsNSizedGeneration = false; + //=============// // constructor // @@ -394,11 +394,29 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I LOGGER.info("sending - complete ["+DhSectionPos.toString(pos)+"]"); requestGroup.fullDataSource = fullDataSource; } + else if (!this.generatorSupportsNSizedGeneration && DhSectionPos.getDetailLevel(pos) > DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL) + { + // Make this group unavailable for adding into + this.requestGroupByPos.remove(pos); + requestGroup.requestRemoveSemaphore.acquireUninterruptibly(Short.MAX_VALUE); + requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE); + + for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) + { + this.requestGroupByFutureId.remove(msg.futureId); + + ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer()); + if (serverPlayerState != null) + { + serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release(); + } + + msg.sendResponse(new SectionRequiresSplittingException()); + } + } else if (requestGroup.worldGenTaskComplete) { LOGGER.info("sending - retry ["+DhSectionPos.toString(pos)+"]"); - // If the returned data source is not fully generated, try reading it again // can wait for a while if waiting for worldgen and/or update propagation - try { Thread.sleep(250); } catch (InterruptedException ignore) {} this.tryFulfillDataSourceRequestGroup(requestGroup, pos); } else diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java index 6f26780f1..fd2202551 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java @@ -10,6 +10,7 @@ import com.seibel.distanthorizons.core.logging.ConfigBasedSpamLogger; import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException; import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException; import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException; +import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException; import com.seibel.distanthorizons.core.network.session.SessionClosedException; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage; @@ -104,29 +105,29 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende // request submitting // //====================// - public CompletableFuture submitRequest(long sectionPos, Consumer dataSourceConsumer) + public CompletableFuture submitRequest(long sectionPos, Consumer dataSourceConsumer) { return this.submitRequest(sectionPos, null, dataSourceConsumer); } - public CompletableFuture submitRequest(long sectionPos, @Nullable Long clientTimestamp, Consumer dataSourceConsumer) + public CompletableFuture submitRequest(long sectionPos, @Nullable Long clientTimestamp, Consumer dataSourceConsumer) { //LodUtil.assertTrue(DhSectionPos.getDetailLevel(sectionPos) == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed."); RequestQueueEntry entry = new RequestQueueEntry(dataSourceConsumer, clientTimestamp); - entry.future.whenComplete((success, throwable) -> + entry.future.whenComplete((requestResult, throwable) -> { - LOGGER.info("received ["+DhSectionPos.toString(sectionPos)+"]"); - this.waitingTasksBySectionPos.remove(sectionPos); - this.finishedRequests.incrementAndGet(); - if ((success == null || !success) - || throwable != null) + if (requestResult != RequestResult.REQUIRES_SPLITTING) + { + this.finishedRequests.incrementAndGet(); + } + + if ((requestResult == null || requestResult == RequestResult.FAILED) + || (throwable != null && !(throwable instanceof CancellationException))) { this.failedRequests.incrementAndGet(); } }); - LOGGER.info("asking server for ["+DhSectionPos.toString(sectionPos)+"]"); - this.waitingTasksBySectionPos.put(sectionPos, entry); return entry.future; } @@ -164,7 +165,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende Map.Entry mapEntry = this.waitingTasksBySectionPos .entrySet().stream() .filter(task -> task.getValue().networkDataSourceFuture == null) - .min(Comparator.comparingInt(x -> posDistanceSquared(targetPos, x.getKey()))) + .min(Comparator.comparingInt(x -> posDistance(targetPos, x.getKey()))) .orElse(null); if (mapEntry == null) @@ -183,7 +184,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende CompletableFuture dataSourceFuture = this.networkState.getSession().sendRequest( new FullDataSourceRequestMessage(this.level.getLevelWrapper(), sectionPos, offsetEntryTimestamp), FullDataSourceResponseMessage.class - ); + ); entry.networkDataSourceFuture = dataSourceFuture; dataSourceFuture.handle((response, throwable) -> { @@ -227,10 +228,14 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende LodUtil.assertTrue(this.changedOnly, "Received empty data source response for not changes-only request"); } } + catch (SectionRequiresSplittingException ignored) + { + return entry.future.complete(RequestResult.REQUIRES_SPLITTING); + } catch (InvalidLevelException | RequestRejectedException ignored) { // We're too late / some cases might trigger a bunch of expected rejections - return entry.future.complete(false); + return entry.future.complete(RequestResult.FAILED); } catch (SessionClosedException | CancellationException ignored) { @@ -260,11 +265,11 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende } else { - return entry.future.complete(false); + return entry.future.complete(RequestResult.FAILED); } } - return entry.future.complete(true); + return entry.future.complete(RequestResult.SUCCEEDED); }); } @@ -370,8 +375,10 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende // helper methods // //================// - protected static int posDistanceSquared(DhBlockPos2D targetPos, long pos) - { return (int) DhSectionPos.getCenterBlockPos(pos).distSquared(targetPos); } + protected static int posDistance(DhBlockPos2D targetPos, long pos) + { + return DhSectionPos.signedDistance(pos, targetPos); + } @@ -382,7 +389,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende protected static class RequestQueueEntry { /** encapsulates the entire request, including client side queuing and the actual server request */ - public final CompletableFuture future = new CompletableFuture<>(); + public final CompletableFuture future = new CompletableFuture<>(); public final Consumer dataSourceConsumer; /** will be null if we want to retrieve the LOD regardless of when it was last updated */ @Nullable @@ -412,6 +419,13 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende } + public enum RequestResult + { + SUCCEEDED, + REQUIRES_SPLITTING, + FAILED, + } + } \ No newline at end of file diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/exceptions/SectionRequiresSplittingException.java b/core/src/main/java/com/seibel/distanthorizons/core/network/exceptions/SectionRequiresSplittingException.java new file mode 100644 index 000000000..26b6d2a62 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/exceptions/SectionRequiresSplittingException.java @@ -0,0 +1,29 @@ +/* + * This file is part of the Distant Horizons mod + * licensed under the GNU LGPL v3 License. + * + * Copyright (C) 2020-2023 James Seibel + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ + +package com.seibel.distanthorizons.core.network.exceptions; + +/** Fired if the current section is not fully generated and underlying generator does not support N-sized generation. */ +public class SectionRequiresSplittingException extends Exception +{ + public SectionRequiresSplittingException() { this("Section requires splitting"); } + + public SectionRequiresSplittingException(String message) { super(message); } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/requests/ExceptionMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/requests/ExceptionMessage.java index c6b4bae46..187d2cfa9 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/requests/ExceptionMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/requests/ExceptionMessage.java @@ -23,6 +23,7 @@ import com.google.common.base.MoreObjects; import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException; import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException; import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException; +import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException; import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage; import io.netty.buffer.ByteBuf; @@ -38,6 +39,7 @@ public class ExceptionMessage extends AbstractTrackableMessage this.add(RateLimitedException.class); this.add(InvalidLevelException.class); this.add(RequestRejectedException.class); + this.add(SectionRequiresSplittingException.class); }}; public Exception exception; diff --git a/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java b/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java index 3fb1252af..446ab110b 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java @@ -269,6 +269,28 @@ public class DhSectionPos ); } + public static int signedDistance(long pos, DhBlockPos2D blockPos) + { + // Assuming Square2D has a method to get its center and size + double halfWidth = getBlockWidth(pos) / 2.0; + + // Calculate the distance from the point to the center of the square + double dx = blockPos.x - getCenterBlockPosX(pos); + double dy = blockPos.z - getCenterBlockPosZ(pos); + + // Calculate the horizontal and vertical distances to the edges of the square + double distanceX = Math.abs(dx) - halfWidth; + double distanceY = Math.abs(dy) - halfWidth; + + // The signed distance is the max of distanceX and distanceY + // This gives us the distance to the nearest edge + // If both distances are negative, the point is inside the square + double signedDistance = Math.max(distanceX, distanceY); + + return (int) signedDistance; // Cast to int, as per function signature + } + + //==================// diff --git a/core/src/main/java/com/seibel/distanthorizons/core/pos/blockPos/DhBlockPos2D.java b/core/src/main/java/com/seibel/distanthorizons/core/pos/blockPos/DhBlockPos2D.java index 82cd2d397..13728ba81 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/pos/blockPos/DhBlockPos2D.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/pos/blockPos/DhBlockPos2D.java @@ -75,6 +75,7 @@ public class DhBlockPos2D public long distSquared(DhBlockPos2D other) { return this.distSquared(other.x, other.z); } public long distSquared(int x, int z) { return MathUtil.pow2((long) this.x - x) + MathUtil.pow2((long) this.z - z); } + public long chebyshevDist(DhBlockPos2D other) { return Math.max(Math.abs(this.x - other.x), Math.abs(this.z - other.z)); } //===========// From 3d11a208d739a7c91431810fddc8c0768cf62238 Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Wed, 13 Nov 2024 18:19:22 +0500 Subject: [PATCH 3/7] Move request handling to another class and rewrite group locking logic --- .../core/level/AbstractDhServerLevel.java | 265 +----------------- .../server/DataSourceRequestGroup.java | 109 +++++++ .../server/FullDataSourceRequestHandler.java | 264 +++++++++++++++++ 3 files changed, 385 insertions(+), 253 deletions(-) create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java index 4c2e29675..8886d6232 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java @@ -4,21 +4,18 @@ import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2; import com.seibel.distanthorizons.core.file.structure.ISaveStructure; -import com.seibel.distanthorizons.core.logging.ConfigBasedLogger; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.logging.f3.F3Screen; +import com.seibel.distanthorizons.core.multiplayer.server.FullDataSourceRequestHandler; import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerState; import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerStateManager; import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException; -import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException; -import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException; import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage; import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage; import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartialUpdateMessage; import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; -import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage; import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage; import com.seibel.distanthorizons.core.pos.DhSectionPos; import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D; @@ -28,21 +25,16 @@ import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import javax.annotation.CheckForNull; import javax.annotation.Nullable; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.*; public abstract class AbstractDhServerLevel extends AbstractDhLevel implements IDhServerLevel { protected static final Logger LOGGER = DhLoggerBuilder.getLogger(); - private static final ConfigBasedLogger NETWORK_LOGGER = new ConfigBasedLogger(LogManager.getLogger(), - () -> Config.Common.Logging.logNetworkEvent.get()); public final ServerLevelModule serverside; protected final IServerLevelWrapper serverLevelWrapper; @@ -56,11 +48,9 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I */ protected final ConcurrentLinkedQueue worldGenPlayerCenteringQueue = new ConcurrentLinkedQueue<>(); - private final ConcurrentMap requestGroupByPos = new ConcurrentHashMap<>(); - private final ConcurrentMap requestGroupByFutureId = new ConcurrentHashMap<>(); + private final FullDataSourceRequestHandler requestHandler = new FullDataSourceRequestHandler(this); - - private final boolean generatorSupportsNSizedGeneration = false; + private final boolean NSizedGenerationSupported = false; //=============// @@ -104,52 +94,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I @Override public void serverTick() { - // Send finished data source requests - for (Map.Entry entry : this.requestGroupByPos.entrySet()) - { - DataSourceRequestGroup requestGroup = entry.getValue(); - - if (requestGroup.fullDataSource == null) - { - continue; - } - - NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Fulfilled request group [" + entry.getKey() + "]"); - - // Make this group unavailable for adding into - this.requestGroupByPos.remove(entry.getKey()); - requestGroup.requestRemoveSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - - ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); - if (executor == null) - { - LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); - continue; - } - CompletableFuture.runAsync(() -> - { - Objects.requireNonNull(this.beaconBeamRepo); - try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(entry.getKey()))) - { - for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) - { - this.requestGroupByFutureId.remove(msg.futureId); - - ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer()); - if (serverPlayerState == null) - { - continue; - } - - serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> { - msg.sendResponse(new FullDataSourceResponseMessage(payload)); - serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release(); - }); - } - } - }, executor); - } + this.requestHandler.tick(); } @Override @@ -188,149 +133,35 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I { serverPlayerState.networkSession.registerHandler(FullDataSourceRequestMessage.class, (message) -> { - if (!this.messagePlayerInThisLevel(message)) + if (!this.validatePlayerInCurrentLevel(message)) { - // we can't handle players in other levels, don't continue return; } - ServerPlayerState.RateLimiterSet rateLimiterSet = serverPlayerState.getRateLimiterSet(this); LOGGER.info("received message ["+DhSectionPos.toString(message.sectionPos)+"]"); if (message.clientTimestamp == null) { - this.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet); + this.requestHandler.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet); } else { - this.queueLodSyncForRequestMessage(serverPlayerState, message, rateLimiterSet); + this.requestHandler.queueLodSyncForRequestMessage(serverPlayerState, message, rateLimiterSet); } }); serverPlayerState.networkSession.registerHandler(CancelMessage.class, msg -> { - DataSourceRequestGroup requestGroup = this.requestGroupByFutureId.remove(msg.futureId); - if (requestGroup == null) - { - return; - } - - // If this fails, the group is being removed and completing cancellation is not necessary - if (requestGroup.requestRemoveSemaphore.tryAcquire()) - { - // Prevent adding requests in case the group will be removed by this cancellation - requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - requestGroup.requestRemoveSemaphore.release(); - - serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release(); - - FullDataSourceRequestMessage requestMessage = requestGroup.requestMessages.remove(msg.futureId); - if (requestGroup.requestMessages.isEmpty()) - { - NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Cancelled request group [" + DhSectionPos.toString(requestMessage.sectionPos) + "]."); - this.requestGroupByPos.remove(requestMessage.sectionPos); - this.serverside.fullDataFileHandler.removeRetrievalRequestIf(pos -> pos == requestMessage.sectionPos); - } - else - { - requestGroup.requestAddSemaphore.release(Short.MAX_VALUE); - } - } + this.requestHandler.cancelRequest(msg.futureId); }); } - private void queueLodSyncForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) - { - if (!serverPlayerState.sessionConfig.getSynchronizeOnLoad()) - { - message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); - return; - } - - if (!rateLimiterSet.syncOnLoginRateLimiter.tryAcquire(message)) - { - return; - } - - - // the client timestamp will be null if we want to retrieve the LOD regardless of when it was last updated - long clientTimestamp = (message.clientTimestamp != null) ? message.clientTimestamp : -1; - // the server timestamp will be null if no LOD data exists for this position - Long serverTimestamp = this.serverside.fullDataFileHandler.getTimestampForPos(message.sectionPos); - if (serverTimestamp == null - || serverTimestamp <= clientTimestamp) - { - // either no data exists to sync, or the client is already up to date - rateLimiterSet.syncOnLoginRateLimiter.release(); - message.sendResponse(new FullDataSourceResponseMessage(null)); - return; - } - - - - ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); - if (executor == null) - { - // shouldn't normally happen, but just in case - LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); - return; - } - - this.serverside.fullDataFileHandler.getAsync(message.sectionPos).thenAcceptAsync(fullDataSource -> - { - Objects.requireNonNull(this.beaconBeamRepo); - try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos))) - { - serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> - { - message.sendResponse(new FullDataSourceResponseMessage(payload)); - rateLimiterSet.syncOnLoginRateLimiter.release(); - }); - } - }, executor); - } - private void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) - { - if (!serverPlayerState.sessionConfig.isDistantGenerationEnabled()) - { - message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); - return; - } - - if (!rateLimiterSet.generationRequestRateLimiter.tryAcquire(message)) - { - return; - } - - while (true) - { - DataSourceRequestGroup requestGroup = this.requestGroupByPos.computeIfAbsent(message.sectionPos, pos -> - { - DataSourceRequestGroup newGroup = new DataSourceRequestGroup(); - this.tryFulfillDataSourceRequestGroup(newGroup, pos); - NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Created request group for pos [" + DhSectionPos.toString(pos) + "]."); - return newGroup; - }); - - // If this fails, loop until either a permit is acquired or the group is removed to create another one - if (!requestGroup.requestAddSemaphore.tryAcquire()) - { - Thread.yield(); - continue; - } - - this.requestGroupByFutureId.put(message.futureId, requestGroup); - requestGroup.requestMessages.put(message.futureId, message); - requestGroup.requestAddSemaphore.release(); - break; - } - } /** May send an error message in response if the message is a {@link AbstractTrackableMessage} */ - private boolean messagePlayerInThisLevel(T message) + private boolean validatePlayerInCurrentLevel(T message) { if (!(message instanceof ILevelRelatedMessage)) { @@ -374,57 +205,12 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I // world gen // //===========// + public boolean isNSizedGenerationSupported() { return this.NSizedGenerationSupported; } + @Override public void onWorldGenTaskComplete(long pos) { - DataSourceRequestGroup requestGroup = this.requestGroupByPos.get(pos); - if (requestGroup != null) - { - requestGroup.worldGenTaskComplete = true; - this.tryFulfillDataSourceRequestGroup(requestGroup, pos); - } - } - - private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos) - { - this.serverside.fullDataFileHandler.getAsync(pos).thenAccept(fullDataSource -> - { - if (this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps)) - { - LOGGER.info("sending - complete ["+DhSectionPos.toString(pos)+"]"); - requestGroup.fullDataSource = fullDataSource; - } - else if (!this.generatorSupportsNSizedGeneration && DhSectionPos.getDetailLevel(pos) > DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL) - { - // Make this group unavailable for adding into - this.requestGroupByPos.remove(pos); - requestGroup.requestRemoveSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - - for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) - { - this.requestGroupByFutureId.remove(msg.futureId); - - ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer()); - if (serverPlayerState != null) - { - serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release(); - } - - msg.sendResponse(new SectionRequiresSplittingException()); - } - } - else if (requestGroup.worldGenTaskComplete) - { - LOGGER.info("sending - retry ["+DhSectionPos.toString(pos)+"]"); - this.tryFulfillDataSourceRequestGroup(requestGroup, pos); - } - else - { - LOGGER.info("sending - queueing ["+DhSectionPos.toString(pos)+"]"); - this.serverside.fullDataFileHandler.queuePositionForRetrieval(pos); - } - }); + this.requestHandler.onWorldGenTaskComplete(pos); } @@ -564,31 +350,4 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I LOGGER.info("Closed DHLevel for [" + this.getLevelWrapper() + "]."); } - - - //================// - // helper classes // - //================// - - private static class DataSourceRequestGroup - { - public final ConcurrentMap requestMessages = new ConcurrentHashMap<>(); - - /** If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps */ - boolean worldGenTaskComplete = false; - - @CheckForNull - public FullDataSourceV2 fullDataSource = null; - - /** - * These two Semaphores are used to prevent all threads from locking on the group after it being fulfilled, - * as opposed to ReentrantReadWriteLocks which would allow the locking thread continue using it anyway.
- * Short.MAX_VALUE is chosen as a large enough number so non-exclusive accesses never block each other. - */ - public final Semaphore requestAddSemaphore = new Semaphore(Short.MAX_VALUE, true); - /** @see DataSourceRequestGroup#requestAddSemaphore */ - public final Semaphore requestRemoveSemaphore = new Semaphore(Short.MAX_VALUE, true); - - } - } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java new file mode 100644 index 000000000..e183b1437 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java @@ -0,0 +1,109 @@ +package com.seibel.distanthorizons.core.multiplayer.server; + +import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; + +import javax.annotation.CheckForNull; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +class DataSourceRequestGroup +{ + public final long pos; + + /** + * If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps + */ + private boolean worldGenTaskComplete = false; + + void markWorldGenTaskComplete() + { + this.worldGenTaskComplete = true; + } + + boolean isWorldGenTaskComplete() + { + return this.worldGenTaskComplete; + } + + @CheckForNull + public FullDataSourceV2 fullDataSource = null; + + public final ConcurrentMap requestMessages = new ConcurrentHashMap<>(); + public final Semaphore pendingAdditionSemaphore = new Semaphore(Short.MAX_VALUE, true); + public final AtomicBoolean isClosed = new AtomicBoolean(); + + + DataSourceRequestGroup(long pos) + { + this.pos = pos; + } + + public boolean tryClose() + { + if (!this.isClosed.compareAndSet(false, true)) + { + return false; + } + + this.pendingAdditionSemaphore.acquireUninterruptibly(Short.MAX_VALUE); + return true; + } + + public boolean tryAddRequest(RequestData requestData) + { + if (!this.pendingAdditionSemaphore.tryAcquire()) + { + return false; + } + + this.requestMessages.put(requestData.futureId(), requestData); + this.pendingAdditionSemaphore.release(); + + return true; + } + + + public RequestData tryRemoveRequest(long requestId, HangingRequestTransferConsumer hangingRequestTransferConsumer) + { + RequestData removed = this.requestMessages.remove(requestId); + + if (this.requestMessages.isEmpty() && this.tryClose()) + { + hangingRequestTransferConsumer.accept(this.requestMessages.values()); + } + + return removed; + } + + + static class RequestData + { + public final ServerPlayerState serverPlayerState; + public final ServerPlayerState.RateLimiterSet rateLimiterSet; + + public final FullDataSourceRequestMessage message; + public long futureId() { return this.message.futureId; } + public long sectionPos() { return this.message.sectionPos; } + + RequestData(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) + { + this.serverPlayerState = serverPlayerState; + this.rateLimiterSet = rateLimiterSet; + this.message = message; + } + + } + + /** + * While closing this group, some requests may slip through and end up lost.
+ * This is a workaround that allows the caller to transfer these requests to a new group. + */ + @FunctionalInterface + interface HangingRequestTransferConsumer extends Consumer> { } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java new file mode 100644 index 000000000..52cf38fd3 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java @@ -0,0 +1,264 @@ +package com.seibel.distanthorizons.core.multiplayer.server; + +import com.seibel.distanthorizons.core.config.Config; +import com.seibel.distanthorizons.core.file.fullDatafile.GeneratedFullDataSourceProvider; +import com.seibel.distanthorizons.core.level.AbstractDhServerLevel; +import com.seibel.distanthorizons.core.logging.ConfigBasedLogger; +import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload; +import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException; +import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage; +import com.seibel.distanthorizons.core.pos.DhSectionPos; +import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO; +import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; +import org.apache.logging.log4j.LogManager; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadPoolExecutor; + +public class FullDataSourceRequestHandler +{ + private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(), + () -> Config.Common.Logging.logNetworkEvent.get()); + + + private final AbstractDhServerLevel serverLevel; + private String getLevelIdentifier() { return this.serverLevel.getLevelWrapper().getDhIdentifier(); } + private GeneratedFullDataSourceProvider fullDataSourceProvider() { return this.serverLevel.serverside.fullDataFileHandler; } + private List getAllBeamsForPos(long pos) { return this.serverLevel.beaconBeamRepo.getAllBeamsForPos(pos); } + + private final ConcurrentMap requestGroupsByPos = new ConcurrentHashMap<>(); + private final ConcurrentMap requestGroupsByFutureId = new ConcurrentHashMap<>(); + + + public FullDataSourceRequestHandler(AbstractDhServerLevel serverLevel) + { + this.serverLevel = serverLevel; + } + + + //==================// + // network handling // + //==================// + + public void queueLodSyncForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) + { + if (!serverPlayerState.sessionConfig.getSynchronizeOnLoad()) + { + message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); + return; + } + + if (!rateLimiterSet.syncOnLoginRateLimiter.tryAcquire(message)) + { + return; + } + + + // the client timestamp will be null if we want to retrieve the LOD regardless of when it was last updated + long clientTimestamp = (message.clientTimestamp != null) ? message.clientTimestamp : -1; + // the server timestamp will be null if no LOD data exists for this position + Long serverTimestamp = this.fullDataSourceProvider().getTimestampForPos(message.sectionPos); + if (serverTimestamp == null + || serverTimestamp <= clientTimestamp) + { + // either no data exists to sync, or the client is already up to date + rateLimiterSet.syncOnLoginRateLimiter.release(); + message.sendResponse(new FullDataSourceResponseMessage(null)); + return; + } + + + ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + if (executor == null) + { + // shouldn't normally happen, but just in case + LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); + return; + } + + this.fullDataSourceProvider().getAsync(message.sectionPos).thenAcceptAsync(fullDataSource -> + { + try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.getAllBeamsForPos(message.sectionPos))) + { + serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> + { + message.sendResponse(new FullDataSourceResponseMessage(payload)); + rateLimiterSet.syncOnLoginRateLimiter.release(); + }); + } + }, executor); + } + + public void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) + { + if (!serverPlayerState.sessionConfig.isDistantGenerationEnabled()) + { + message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); + return; + } + + if (!rateLimiterSet.generationRequestRateLimiter.tryAcquire(message)) + { + return; + } + + this.doQueueWorldGenForRequestMessage(new DataSourceRequestGroup.RequestData(serverPlayerState, message, rateLimiterSet)); + } + + private void doQueueWorldGenForRequestMessage(DataSourceRequestGroup.RequestData requestData) + { + while (true) + { + DataSourceRequestGroup requestGroup = this.requestGroupsByPos.computeIfAbsent(requestData.sectionPos(), pos -> + { + DataSourceRequestGroup newGroup = new DataSourceRequestGroup(pos); + this.tryFulfillDataSourceRequestGroup(newGroup, pos); + LOGGER.debug("[" + this.getLevelIdentifier() + "] Created request group for pos [" + DhSectionPos.toString(pos) + "]."); + return newGroup; + }); + + // If this fails, loop until either a permit is acquired or the group is removed to create another one + if (!requestGroup.tryAddRequest(requestData)) + { + Thread.yield(); + continue; + } + + this.requestGroupsByFutureId.put(requestData.futureId(), requestGroup); + break; + } + } + + public void cancelRequest(long requestId) + { + DataSourceRequestGroup requestGroup = this.requestGroupsByFutureId.remove(requestId); + if (requestGroup == null) + { + return; + } + + DataSourceRequestGroup.RequestData removedRequest = requestGroup.tryRemoveRequest(requestId, requestsToTransfer -> + { + LOGGER.debug("[" + this.getLevelIdentifier() + "] Cancelled request group [" + DhSectionPos.toString(requestGroup.pos) + "]."); + this.requestGroupsByPos.remove(requestGroup.pos); + + if (!requestsToTransfer.isEmpty()) + { + for (DataSourceRequestGroup.RequestData requestToTransfer : requestsToTransfer) + { + this.doQueueWorldGenForRequestMessage(requestToTransfer); + } + } + else + { + this.fullDataSourceProvider().removeRetrievalRequestIf(pos -> pos == requestGroup.pos); + } + }); + + if (removedRequest != null) + { + removedRequest.rateLimiterSet.generationRequestRateLimiter.release(); + } + } + + + public void tick() + { + // Send finished data source requests + for (Map.Entry entry : this.requestGroupsByPos.entrySet()) + { + DataSourceRequestGroup requestGroup = entry.getValue(); + + if (requestGroup.fullDataSource == null) + { + continue; + } + + LOGGER.debug("[" + this.getLevelIdentifier() + "] Fulfilled request group [" + entry.getKey() + "]"); + + // Make this group unavailable for adding into + this.requestGroupsByPos.remove(entry.getKey()); + if (!requestGroup.tryClose()) + { + continue; + } + + ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + if (executor == null) + { + LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); + continue; + } + CompletableFuture.runAsync(() -> + { + try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.getAllBeamsForPos(entry.getKey()))) + { + for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values()) + { + this.requestGroupsByFutureId.remove(requestData.futureId()); + + requestData.serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> { + requestData.message.sendResponse(new FullDataSourceResponseMessage(payload)); + requestData.rateLimiterSet.generationRequestRateLimiter.release(); + }); + } + } + }, executor); + } + } + + private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos) + { + this.fullDataSourceProvider().getAsync(pos).thenAccept(fullDataSource -> + { + if (this.fullDataSourceProvider().isFullyGenerated(fullDataSource.columnGenerationSteps)) + { + LOGGER.info("sending - complete [" + DhSectionPos.toString(pos) + "]"); + requestGroup.fullDataSource = fullDataSource; + } + else if (!this.serverLevel.isNSizedGenerationSupported() && DhSectionPos.getDetailLevel(pos) > DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL) + { + // Make this group unavailable for adding into + this.requestGroupsByPos.remove(pos); + if (!requestGroup.tryClose()) + { + return; + } + + for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values()) + { + this.requestGroupsByFutureId.remove(requestData.futureId()); + requestData.rateLimiterSet.generationRequestRateLimiter.release(); + requestData.message.sendResponse(new SectionRequiresSplittingException()); + } + } + else if (requestGroup.isWorldGenTaskComplete()) + { + LOGGER.info("sending - retry [" + DhSectionPos.toString(pos) + "]"); + this.tryFulfillDataSourceRequestGroup(requestGroup, pos); + } + else + { + LOGGER.info("sending - queueing [" + DhSectionPos.toString(pos) + "]"); + this.fullDataSourceProvider().queuePositionForRetrieval(pos); + } + }); + } + + public void onWorldGenTaskComplete(long pos) + { + DataSourceRequestGroup requestGroup = this.requestGroupsByPos.get(pos); + if (requestGroup != null) + { + requestGroup.markWorldGenTaskComplete(); + this.tryFulfillDataSourceRequestGroup(requestGroup, pos); + } + } + +} From 587ea7017cb51f07eb4fc8e04377a1a7193106ca Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Wed, 13 Nov 2024 18:34:27 +0500 Subject: [PATCH 4/7] Fail subsequent requests for already pending LOD --- .../AbstractFullDataNetworkRequestQueue.java | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java index 1f911b079..688657a92 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.util.*; import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -111,24 +112,40 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende { //LodUtil.assertTrue(DhSectionPos.getDetailLevel(sectionPos) == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed."); - RequestQueueEntry entry = new RequestQueueEntry(dataSourceConsumer, clientTimestamp); - entry.future.whenComplete((requestResult, throwable) -> + AtomicBoolean added = new AtomicBoolean(false); + RequestQueueEntry entry = this.waitingTasksBySectionPos.compute(sectionPos, (k, existingQueueEntry) -> { - this.waitingTasksBySectionPos.remove(sectionPos); - - if (requestResult != RequestResult.REQUIRES_SPLITTING) + if (existingQueueEntry != null) { - this.finishedRequests.incrementAndGet(); + return existingQueueEntry; } - if ((requestResult == null || requestResult == RequestResult.FAILED) - || (throwable != null && !(throwable instanceof CancellationException))) + RequestQueueEntry newEntry = new RequestQueueEntry(dataSourceConsumer, clientTimestamp); + newEntry.future.whenComplete((requestResult, throwable) -> { - this.failedRequests.incrementAndGet(); - } + this.waitingTasksBySectionPos.remove(sectionPos); + + if (requestResult != RequestResult.REQUIRES_SPLITTING) + { + this.finishedRequests.incrementAndGet(); + } + + if ((requestResult == null || requestResult == RequestResult.FAILED) + || (throwable != null && !(throwable instanceof CancellationException))) + { + this.failedRequests.incrementAndGet(); + } + }); + + added.set(true); + return newEntry; }); - this.waitingTasksBySectionPos.put(sectionPos, entry); + if (!added.get()) + { + return CompletableFuture.completedFuture(RequestResult.FAILED); + } + return entry.future; } From 585a288f686347c9d7d7f0d6a8a4cbf141e534d1 Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Tue, 3 Dec 2024 21:10:58 +0500 Subject: [PATCH 5/7] Fix gen tasks sometimes not submitting after LOD level changes --- .../FullDataSourceProviderV2.java | 4 +--- .../GeneratedFullDataSourceProvider.java | 22 ++++++++++------- .../RemoteFullDataSourceProvider.java | 23 +++++++++++++++--- .../AbstractFullDataNetworkRequestQueue.java | 24 +++++++++++-------- .../server/FullDataSourceRequestHandler.java | 2 +- .../core/render/LodRenderSection.java | 8 +------ 6 files changed, 51 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java index 901301b00..024eb7345 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java @@ -21,7 +21,6 @@ package com.seibel.distanthorizons.core.file.fullDatafile; import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode; import com.seibel.distanthorizons.core.api.internal.ClientApi; -import com.seibel.distanthorizons.core.api.internal.SharedApi; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV1; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; @@ -41,7 +40,6 @@ import com.seibel.distanthorizons.core.sql.repo.FullDataSourceV2Repo; import com.seibel.distanthorizons.core.util.ThreadUtil; import com.seibel.distanthorizons.core.util.objects.DataCorruptedException; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; -import com.seibel.distanthorizons.core.world.EWorldEnvironment; import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.logging.log4j.Logger; @@ -606,7 +604,7 @@ public class FullDataSourceProviderV2 /** @return true if the position was queued, false if not */ @Nullable - public CompletableFuture queuePositionForRetrieval(Long genPos) { return null; } + public CompletableFuture queuePositionForRetrieval(long genPos, boolean allowAboveMaxGenRequests) { return null; } /** does nothing if the given position isn't present in the queue */ public void removeRetrievalRequestIf(DhSectionPos.ICancelablePrimitiveLongConsumer removeIf) { } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java index b16f55cb4..1d106e572 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java @@ -63,10 +63,10 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im * TODO this should be dynamically allocated based on CPU load * and abilities. */ - public static final int MAX_WORLD_GEN_REQUESTS_PER_THREAD = 20; + public static final int MAX_WORLD_GEN_REQUESTS_PER_THREAD = 20; - private final AtomicReference worldGenQueueRef = new AtomicReference<>(null); + protected final AtomicReference worldGenQueueRef = new AtomicReference<>(null); private final ArrayList onWorldGenTaskCompleteListeners = new ArrayList<>(); protected final DelayedFullDataSourceSaveCache delayedFullDataSourceSaveCache = new DelayedFullDataSourceSaveCache(this::onDataSourceSave, 5_000); @@ -173,8 +173,7 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im } - IFullDataSourceRetrievalQueue worldGenQueue = this.worldGenQueueRef.get(); - if (worldGenQueue == null) + if (this.worldGenQueueRef.get() == null) { // we can't queue anything if the world generator isn't set up yet return false; @@ -211,13 +210,11 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im return false; } - - // don't queue additional world gen requests beyond the max allotted count - return worldGenQueue.getWaitingTaskCount() < maxQueueCount; + return true; } @Override - public CompletableFuture queuePositionForRetrieval(Long genPos) + public CompletableFuture queuePositionForRetrieval(long genPos, boolean allowAboveMaxGenRequests) { IFullDataSourceRetrievalQueue worldGenQueue = this.worldGenQueueRef.get(); if (worldGenQueue == null) @@ -225,6 +222,15 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im return null; } + if (!allowAboveMaxGenRequests) + { + int maxQueueCount = MAX_WORLD_GEN_REQUESTS_PER_THREAD * Config.Common.MultiThreading.numberOfWorldGenerationThreads.get(); + if (worldGenQueue.getWaitingTaskCount() >= maxQueueCount) + { + return null; + } + } + WorldGenTaskTracker genTaskTracker = new WorldGenTaskTracker(genPos); CompletableFuture worldGenFuture = worldGenQueue.submitRetrievalTask(genPos, (byte) (DhSectionPos.getDetailLevel(genPos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL), genTaskTracker); worldGenFuture.whenComplete((genTaskResult, ex) -> diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java index 41c9fda74..c251fdcfd 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java @@ -20,18 +20,20 @@ package com.seibel.distanthorizons.core.file.fullDatafile; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; +import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector; import com.seibel.distanthorizons.core.file.structure.ISaveStructure; import com.seibel.distanthorizons.core.generation.RemoteWorldRetrievalQueue; +import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult; import com.seibel.distanthorizons.core.level.IDhLevel; import com.seibel.distanthorizons.core.level.WorldGenModule; import com.seibel.distanthorizons.core.multiplayer.client.SyncOnLoadRequestQueue; -import com.seibel.distanthorizons.core.pos.DhSectionPos; -import com.seibel.distanthorizons.coreapi.util.BitShiftUtil; +import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D; +import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; import org.jetbrains.annotations.Nullable; import java.io.File; -import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; /** @@ -40,6 +42,8 @@ import java.util.concurrent.ConcurrentHashMap; */ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvider { + private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class); + @Nullable private final SyncOnLoadRequestQueue syncOnLoadRequestQueue; private final Set visitedPositions = ConcurrentHashMap.newKeySet(); @@ -123,6 +127,19 @@ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvide } + @Override + public CompletableFuture queuePositionForRetrieval(long genPos, boolean allowAboveMaxGenRequests) + { + RemoteWorldRetrievalQueue worldGenQueue = (RemoteWorldRetrievalQueue) this.worldGenQueueRef.get(); + if (worldGenQueue == null) + { + return null; + } + + return super.queuePositionForRetrieval(genPos, worldGenQueue.isPosCloserThanFarthestWaiting(new DhBlockPos2D(MC_CLIENT.getPlayerBlockPos()), genPos)); + } + + //==========// // shutdown // diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java index b2ae7d5f7..e3df80701 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java @@ -183,7 +183,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende Map.Entry mapEntry = this.waitingTasksBySectionPos .entrySet().stream() .filter(task -> task.getValue().networkDataSourceFuture == null) - .min(Comparator.comparingInt(x -> posDistanceSquared(targetPos, x.getKey()))) + .min(Comparator.comparingInt(x -> DhSectionPos.getChebyshevSignedBlockDistance(x.getKey(), targetPos))) .orElse(null); if (mapEntry == null) @@ -305,6 +305,19 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende } + public boolean isPosCloserThanFarthestWaiting(DhBlockPos2D targetPos, long pos) + { + Long farthestPos = this.waitingTasksBySectionPos + .keySet().stream() + .max(Comparator.comparingInt(x -> DhSectionPos.getChebyshevSignedBlockDistance(x, targetPos))) + .orElse(null); + if (farthestPos == null) + { + return true; + } + + return DhSectionPos.getChebyshevSignedBlockDistance(pos, targetPos) <= DhSectionPos.getChebyshevSignedBlockDistance(farthestPos, targetPos); + } //=========================================// @@ -404,15 +417,6 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende - //================// - // helper methods // - //================// - - protected static int posDistanceSquared(DhBlockPos2D targetPos, long pos) - { return (int) DhSectionPos.getCenterBlockPos(pos).distSquared(targetPos); } - - - //================// // helper classes // //================// diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java index 52cf38fd3..f52d623fb 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java @@ -246,7 +246,7 @@ public class FullDataSourceRequestHandler else { LOGGER.info("sending - queueing [" + DhSectionPos.toString(pos) + "]"); - this.fullDataSourceProvider().queuePositionForRetrieval(pos); + this.fullDataSourceProvider().queuePositionForRetrieval(pos, true); } }); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java index f58eb02c1..01987440d 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java @@ -475,14 +475,8 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable // queue from last to first to prevent shifting the array unnecessarily for (int i = this.missingGenerationPos.size() - 1; i >= 0; i--) { - if (!this.fullDataSourceProvider.canQueueRetrieval()) - { - // the data source provider isn't accepting any more jobs - break; - } - long pos = this.missingGenerationPos.removeLong(i); - boolean positionQueued = (this.fullDataSourceProvider.queuePositionForRetrieval(pos) != null); + boolean positionQueued = (this.fullDataSourceProvider.queuePositionForRetrieval(pos, false) != null); if (!positionQueued) { // shouldn't normally happen, but just in case From 0b4fa1b2ed386149cc00b626c3b74f592d5dcb5c Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Tue, 3 Dec 2024 21:23:37 +0500 Subject: [PATCH 6/7] Up protocol version --- .../main/java/com/seibel/distanthorizons/coreapi/ModInfo.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/java/com/seibel/distanthorizons/coreapi/ModInfo.java b/api/src/main/java/com/seibel/distanthorizons/coreapi/ModInfo.java index a61287f87..b9b444732 100644 --- a/api/src/main/java/com/seibel/distanthorizons/coreapi/ModInfo.java +++ b/api/src/main/java/com/seibel/distanthorizons/coreapi/ModInfo.java @@ -31,7 +31,7 @@ public final class ModInfo public static final String DEDICATED_SERVER_INITIAL_PATH = "dedicated_server_initial"; /** Incremented every time any packets are added, changed or removed, with a few exceptions. */ - public static final int PROTOCOL_VERSION = 7; + public static final int PROTOCOL_VERSION = 8; public static final String WRAPPER_PACKET_PATH = "message"; /** The internal mod name */ From 53e3c5c11cad8d9520dbc4617663bab259a425b6 Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Wed, 4 Dec 2024 22:54:45 +0500 Subject: [PATCH 7/7] Clean up --- .../FullDataSourceProviderV2.java | 11 ++------ .../GeneratedFullDataSourceProvider.java | 2 -- .../RemoteFullDataSourceProvider.java | 26 ------------------- .../core/generation/WorldGenerationQueue.java | 13 +++------- .../core/level/AbstractDhServerLevel.java | 2 -- .../AbstractFullDataNetworkRequestQueue.java | 5 +--- .../server/DataSourceRequestGroup.java | 4 +-- .../core/pos/DhSectionPos.java | 22 ---------------- .../core/pos/blockPos/DhBlockPos2D.java | 1 - .../core/render/LodRenderSection.java | 6 +++++ 10 files changed, 15 insertions(+), 77 deletions(-) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java index 024eb7345..1ce14c48e 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java @@ -136,15 +136,8 @@ public class FullDataSourceProviderV2 this.migrationThreadPool.execute(this::convertLegacyDataSources); // update propagation doesn't need to be run on the server since only the highest detail level is needed - //if (SharedApi.getEnvironment() != EWorldEnvironment.SERVER_ONLY) // TODO - //{ - this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Parent Update Queue ["+levelId+"]"); - this.updateQueueProcessor.execute(this::runUpdateQueue); - //} - //else - //{ - // this.updateQueueProcessor = null; - //} + this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Parent Update Queue [" + levelId + "]"); + this.updateQueueProcessor.execute(this::runUpdateQueue); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java index 1d106e572..f347086e7 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java @@ -20,7 +20,6 @@ package com.seibel.distanthorizons.core.file.fullDatafile; import com.seibel.distanthorizons.api.enums.worldGeneration.EDhApiWorldGenerationStep; -import com.seibel.distanthorizons.core.api.internal.SharedApi; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; import com.seibel.distanthorizons.core.file.structure.ISaveStructure; @@ -36,7 +35,6 @@ import com.seibel.distanthorizons.core.render.renderer.DebugRenderer; import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable; import com.seibel.distanthorizons.core.util.LodUtil; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; -import com.seibel.distanthorizons.core.world.EWorldEnvironment; import com.seibel.distanthorizons.coreapi.util.BitShiftUtil; import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.logging.log4j.Logger; diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java index c251fdcfd..48c2f4e8f 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/RemoteFullDataSourceProvider.java @@ -91,32 +91,6 @@ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvide // from server // //===========================// - // TODO - //// get the timestamp for every maximum detail position in this section - //int posToMinimumDetailScale = BitShiftUtil.powerOfTwo(DhSectionPos.getDetailLevel(pos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL); - //Map timestamps = this.getTimestampsForRange( - // DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, - // DhSectionPos.getX(pos) * posToMinimumDetailScale, - // DhSectionPos.getZ(pos) * posToMinimumDetailScale, - // (DhSectionPos.getX(pos) + 1) * posToMinimumDetailScale, - // (DhSectionPos.getZ(pos) + 1) * posToMinimumDetailScale - //); - // - //DhSectionPos.forEachChildAtDetailLevel(pos, DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, childPos -> - //{ - // if (!this.visitedPositions.add(childPos)) - // { - // return; - // } - // - // // check if the server has newer versions of these LODs - // Long subTimestamp = timestamps.get(childPos); - // if (subTimestamp != null) - // { - // this.syncOnLoadRequestQueue.submitRequest(childPos, subTimestamp, this.delayedFullDataSourceSaveCache::queueDataSourceForUpdateAndSave); - // } - //}); - Long timestamp = this.getTimestampForPos(pos); if (timestamp != null) { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java index 0d8cf3b26..13f89c920 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java @@ -147,7 +147,6 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb // Assert that the data at least can fill in 1 single ChunkSizedFullDataAccessor LodUtil.assertTrue(DhSectionPos.getDetailLevel(pos) > requiredDataDetail + LodUtil.CHUNK_DETAIL_LEVEL); - LOGGER.info("queueing gen ["+DhSectionPos.toString(pos)+"]"); CompletableFuture future = new CompletableFuture<>(); this.waitingTasks.put(pos, new WorldGenTask(pos, requiredDataDetail, tracker, future)); @@ -287,8 +286,6 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb { //LOGGER.trace("Unable to start task: "+closestTask.pos+", skipping. Task position may have already been generated."); } - - //LOGGER.info("started gen ["+DhSectionPos.toString(closestTask.pos)+"]"); } else { @@ -325,8 +322,6 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb // send the child futures to the future recipient, to notify them of the new tasks closestTask.future.complete(WorldGenResult.CreateSplit(childFutures)); - //LOGGER.info("split ["+DhSectionPos.toString(sectionPos)+"]"); - // return true so we attempt to generate again return true; } @@ -484,8 +479,8 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb // getters / setters // //===================// - public int getWaitingTaskCount() { return this.waitingTasks.size(); } - public int getInProgressTaskCount() { return this.inProgressGenTasksByLodPos.size(); } + @Override public int getWaitingTaskCount() { return this.waitingTasks.size(); } + @Override public int getInProgressTaskCount() { return this.inProgressGenTasksByLodPos.size(); } @Override public byte lowestDataDetail() { return this.lowestDataDetail; } @@ -497,7 +492,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb @Override public void setEstimatedTotalTaskCount(int newEstimate) { this.estimatedTotalTaskCount = newEstimate; } - public void addDebugMenuStringsToList(List messageList) { } + @Override public void addDebugMenuStringsToList(List messageList) { } @@ -505,7 +500,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb // shutdown // //==========// - public CompletableFuture startClosingAsync(boolean cancelCurrentGeneration, boolean alsoInterruptRunning) + @Override public CompletableFuture startClosingAsync(boolean cancelCurrentGeneration, boolean alsoInterruptRunning) { LOGGER.info("Closing world gen queue"); this.queueingThread.shutdownNow(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java index 3cab0e086..c49b3b113 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java @@ -144,8 +144,6 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I ServerPlayerState.RateLimiterSet rateLimiterSet = serverPlayerState.getRateLimiterSet(this); - LOGGER.info("received message ["+DhSectionPos.toString(message.sectionPos)+"]"); - if (message.clientTimestamp == null) { if (distanceFromPlayer > Config.Server.maxGenerationRequestDistance.get()) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java index e3df80701..daf7881bd 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java @@ -111,8 +111,6 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende { return this.submitRequest(sectionPos, null, dataSourceConsumer); } public CompletableFuture submitRequest(long sectionPos, @Nullable Long clientTimestamp, Consumer dataSourceConsumer) { - //LodUtil.assertTrue(DhSectionPos.getDetailLevel(sectionPos) == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed."); - AtomicBoolean added = new AtomicBoolean(false); RequestQueueEntry entry = this.waitingTasksBySectionPos.compute(sectionPos, (k, existingQueueEntry) -> { @@ -180,8 +178,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende } private void sendNextRequest(DhBlockPos2D targetPos) { - Map.Entry mapEntry = this.waitingTasksBySectionPos - .entrySet().stream() + Map.Entry mapEntry = this.waitingTasksBySectionPos.entrySet().stream() .filter(task -> task.getValue().networkDataSourceFuture == null) .min(Comparator.comparingInt(x -> DhSectionPos.getChebyshevSignedBlockDistance(x.getKey(), targetPos))) .orElse(null); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java index e183b1437..fbe415d33 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java @@ -68,7 +68,7 @@ class DataSourceRequestGroup } - public RequestData tryRemoveRequest(long requestId, HangingRequestTransferConsumer hangingRequestTransferConsumer) + public RequestData tryRemoveRequest(long requestId, IHangingRequestTransferConsumer hangingRequestTransferConsumer) { RequestData removed = this.requestMessages.remove(requestId); @@ -104,6 +104,6 @@ class DataSourceRequestGroup * This is a workaround that allows the caller to transfer these requests to a new group. */ @FunctionalInterface - interface HangingRequestTransferConsumer extends Consumer> { } + interface IHangingRequestTransferConsumer extends Consumer> { } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java b/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java index 033bb9ddb..64bf74423 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java @@ -275,28 +275,6 @@ public class DhSectionPos ) - getBlockWidth(pos) / 2; } - public static int signedDistance(long pos, DhBlockPos2D blockPos) - { - // Assuming Square2D has a method to get its center and size - double halfWidth = getBlockWidth(pos) / 2.0; - - // Calculate the distance from the point to the center of the square - double dx = blockPos.x - getCenterBlockPosX(pos); - double dy = blockPos.z - getCenterBlockPosZ(pos); - - // Calculate the horizontal and vertical distances to the edges of the square - double distanceX = Math.abs(dx) - halfWidth; - double distanceY = Math.abs(dy) - halfWidth; - - // The signed distance is the max of distanceX and distanceY - // This gives us the distance to the nearest edge - // If both distances are negative, the point is inside the square - double signedDistance = Math.max(distanceX, distanceY); - - return (int) signedDistance; // Cast to int, as per function signature - } - - //==================// diff --git a/core/src/main/java/com/seibel/distanthorizons/core/pos/blockPos/DhBlockPos2D.java b/core/src/main/java/com/seibel/distanthorizons/core/pos/blockPos/DhBlockPos2D.java index 13728ba81..82cd2d397 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/pos/blockPos/DhBlockPos2D.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/pos/blockPos/DhBlockPos2D.java @@ -75,7 +75,6 @@ public class DhBlockPos2D public long distSquared(DhBlockPos2D other) { return this.distSquared(other.x, other.z); } public long distSquared(int x, int z) { return MathUtil.pow2((long) this.x - x) + MathUtil.pow2((long) this.z - z); } - public long chebyshevDist(DhBlockPos2D other) { return Math.max(Math.abs(this.x - other.x), Math.abs(this.z - other.z)); } //===========// diff --git a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java index 01987440d..ebd8257d9 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/render/LodRenderSection.java @@ -475,6 +475,12 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable // queue from last to first to prevent shifting the array unnecessarily for (int i = this.missingGenerationPos.size() - 1; i >= 0; i--) { + if (!this.fullDataSourceProvider.canQueueRetrieval()) + { + // the data source provider isn't accepting any more jobs + break; + } + long pos = this.missingGenerationPos.removeLong(i); boolean positionQueued = (this.fullDataSourceProvider.queuePositionForRetrieval(pos, false) != null); if (!positionQueued)