From 3d11a208d739a7c91431810fddc8c0768cf62238 Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Wed, 13 Nov 2024 18:19:22 +0500 Subject: [PATCH] Move request handling to another class and rewrite group locking logic --- .../core/level/AbstractDhServerLevel.java | 265 +----------------- .../server/DataSourceRequestGroup.java | 109 +++++++ .../server/FullDataSourceRequestHandler.java | 264 +++++++++++++++++ 3 files changed, 385 insertions(+), 253 deletions(-) create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java index 4c2e29675..8886d6232 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java @@ -4,21 +4,18 @@ import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2; import com.seibel.distanthorizons.core.file.structure.ISaveStructure; -import com.seibel.distanthorizons.core.logging.ConfigBasedLogger; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.logging.f3.F3Screen; +import com.seibel.distanthorizons.core.multiplayer.server.FullDataSourceRequestHandler; import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerState; import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerStateManager; import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException; -import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException; -import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException; import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage; import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage; import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartialUpdateMessage; import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; -import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage; import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage; import com.seibel.distanthorizons.core.pos.DhSectionPos; import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D; @@ -28,21 +25,16 @@ import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import javax.annotation.CheckForNull; import javax.annotation.Nullable; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.*; public abstract class AbstractDhServerLevel extends AbstractDhLevel implements IDhServerLevel { protected static final Logger LOGGER = DhLoggerBuilder.getLogger(); - private static final ConfigBasedLogger NETWORK_LOGGER = new ConfigBasedLogger(LogManager.getLogger(), - () -> Config.Common.Logging.logNetworkEvent.get()); public final ServerLevelModule serverside; protected final IServerLevelWrapper serverLevelWrapper; @@ -56,11 +48,9 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I */ protected final ConcurrentLinkedQueue worldGenPlayerCenteringQueue = new ConcurrentLinkedQueue<>(); - private final ConcurrentMap requestGroupByPos = new ConcurrentHashMap<>(); - private final ConcurrentMap requestGroupByFutureId = new ConcurrentHashMap<>(); + private final FullDataSourceRequestHandler requestHandler = new FullDataSourceRequestHandler(this); - - private final boolean generatorSupportsNSizedGeneration = false; + private final boolean NSizedGenerationSupported = false; //=============// @@ -104,52 +94,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I @Override public void serverTick() { - // Send finished data source requests - for (Map.Entry entry : this.requestGroupByPos.entrySet()) - { - DataSourceRequestGroup requestGroup = entry.getValue(); - - if (requestGroup.fullDataSource == null) - { - continue; - } - - NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Fulfilled request group [" + entry.getKey() + "]"); - - // Make this group unavailable for adding into - this.requestGroupByPos.remove(entry.getKey()); - requestGroup.requestRemoveSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - - ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); - if (executor == null) - { - LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); - continue; - } - CompletableFuture.runAsync(() -> - { - Objects.requireNonNull(this.beaconBeamRepo); - try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(entry.getKey()))) - { - for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) - { - this.requestGroupByFutureId.remove(msg.futureId); - - ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer()); - if (serverPlayerState == null) - { - continue; - } - - serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> { - msg.sendResponse(new FullDataSourceResponseMessage(payload)); - serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release(); - }); - } - } - }, executor); - } + this.requestHandler.tick(); } @Override @@ -188,149 +133,35 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I { serverPlayerState.networkSession.registerHandler(FullDataSourceRequestMessage.class, (message) -> { - if (!this.messagePlayerInThisLevel(message)) + if (!this.validatePlayerInCurrentLevel(message)) { - // we can't handle players in other levels, don't continue return; } - ServerPlayerState.RateLimiterSet rateLimiterSet = serverPlayerState.getRateLimiterSet(this); LOGGER.info("received message ["+DhSectionPos.toString(message.sectionPos)+"]"); if (message.clientTimestamp == null) { - this.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet); + this.requestHandler.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet); } else { - this.queueLodSyncForRequestMessage(serverPlayerState, message, rateLimiterSet); + this.requestHandler.queueLodSyncForRequestMessage(serverPlayerState, message, rateLimiterSet); } }); serverPlayerState.networkSession.registerHandler(CancelMessage.class, msg -> { - DataSourceRequestGroup requestGroup = this.requestGroupByFutureId.remove(msg.futureId); - if (requestGroup == null) - { - return; - } - - // If this fails, the group is being removed and completing cancellation is not necessary - if (requestGroup.requestRemoveSemaphore.tryAcquire()) - { - // Prevent adding requests in case the group will be removed by this cancellation - requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - requestGroup.requestRemoveSemaphore.release(); - - serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release(); - - FullDataSourceRequestMessage requestMessage = requestGroup.requestMessages.remove(msg.futureId); - if (requestGroup.requestMessages.isEmpty()) - { - NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Cancelled request group [" + DhSectionPos.toString(requestMessage.sectionPos) + "]."); - this.requestGroupByPos.remove(requestMessage.sectionPos); - this.serverside.fullDataFileHandler.removeRetrievalRequestIf(pos -> pos == requestMessage.sectionPos); - } - else - { - requestGroup.requestAddSemaphore.release(Short.MAX_VALUE); - } - } + this.requestHandler.cancelRequest(msg.futureId); }); } - private void queueLodSyncForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) - { - if (!serverPlayerState.sessionConfig.getSynchronizeOnLoad()) - { - message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); - return; - } - - if (!rateLimiterSet.syncOnLoginRateLimiter.tryAcquire(message)) - { - return; - } - - - // the client timestamp will be null if we want to retrieve the LOD regardless of when it was last updated - long clientTimestamp = (message.clientTimestamp != null) ? message.clientTimestamp : -1; - // the server timestamp will be null if no LOD data exists for this position - Long serverTimestamp = this.serverside.fullDataFileHandler.getTimestampForPos(message.sectionPos); - if (serverTimestamp == null - || serverTimestamp <= clientTimestamp) - { - // either no data exists to sync, or the client is already up to date - rateLimiterSet.syncOnLoginRateLimiter.release(); - message.sendResponse(new FullDataSourceResponseMessage(null)); - return; - } - - - - ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); - if (executor == null) - { - // shouldn't normally happen, but just in case - LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); - return; - } - - this.serverside.fullDataFileHandler.getAsync(message.sectionPos).thenAcceptAsync(fullDataSource -> - { - Objects.requireNonNull(this.beaconBeamRepo); - try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos))) - { - serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> - { - message.sendResponse(new FullDataSourceResponseMessage(payload)); - rateLimiterSet.syncOnLoginRateLimiter.release(); - }); - } - }, executor); - } - private void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) - { - if (!serverPlayerState.sessionConfig.isDistantGenerationEnabled()) - { - message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); - return; - } - - if (!rateLimiterSet.generationRequestRateLimiter.tryAcquire(message)) - { - return; - } - - while (true) - { - DataSourceRequestGroup requestGroup = this.requestGroupByPos.computeIfAbsent(message.sectionPos, pos -> - { - DataSourceRequestGroup newGroup = new DataSourceRequestGroup(); - this.tryFulfillDataSourceRequestGroup(newGroup, pos); - NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Created request group for pos [" + DhSectionPos.toString(pos) + "]."); - return newGroup; - }); - - // If this fails, loop until either a permit is acquired or the group is removed to create another one - if (!requestGroup.requestAddSemaphore.tryAcquire()) - { - Thread.yield(); - continue; - } - - this.requestGroupByFutureId.put(message.futureId, requestGroup); - requestGroup.requestMessages.put(message.futureId, message); - requestGroup.requestAddSemaphore.release(); - break; - } - } /** May send an error message in response if the message is a {@link AbstractTrackableMessage} */ - private boolean messagePlayerInThisLevel(T message) + private boolean validatePlayerInCurrentLevel(T message) { if (!(message instanceof ILevelRelatedMessage)) { @@ -374,57 +205,12 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I // world gen // //===========// + public boolean isNSizedGenerationSupported() { return this.NSizedGenerationSupported; } + @Override public void onWorldGenTaskComplete(long pos) { - DataSourceRequestGroup requestGroup = this.requestGroupByPos.get(pos); - if (requestGroup != null) - { - requestGroup.worldGenTaskComplete = true; - this.tryFulfillDataSourceRequestGroup(requestGroup, pos); - } - } - - private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos) - { - this.serverside.fullDataFileHandler.getAsync(pos).thenAccept(fullDataSource -> - { - if (this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps)) - { - LOGGER.info("sending - complete ["+DhSectionPos.toString(pos)+"]"); - requestGroup.fullDataSource = fullDataSource; - } - else if (!this.generatorSupportsNSizedGeneration && DhSectionPos.getDetailLevel(pos) > DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL) - { - // Make this group unavailable for adding into - this.requestGroupByPos.remove(pos); - requestGroup.requestRemoveSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - - for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) - { - this.requestGroupByFutureId.remove(msg.futureId); - - ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer()); - if (serverPlayerState != null) - { - serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release(); - } - - msg.sendResponse(new SectionRequiresSplittingException()); - } - } - else if (requestGroup.worldGenTaskComplete) - { - LOGGER.info("sending - retry ["+DhSectionPos.toString(pos)+"]"); - this.tryFulfillDataSourceRequestGroup(requestGroup, pos); - } - else - { - LOGGER.info("sending - queueing ["+DhSectionPos.toString(pos)+"]"); - this.serverside.fullDataFileHandler.queuePositionForRetrieval(pos); - } - }); + this.requestHandler.onWorldGenTaskComplete(pos); } @@ -564,31 +350,4 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I LOGGER.info("Closed DHLevel for [" + this.getLevelWrapper() + "]."); } - - - //================// - // helper classes // - //================// - - private static class DataSourceRequestGroup - { - public final ConcurrentMap requestMessages = new ConcurrentHashMap<>(); - - /** If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps */ - boolean worldGenTaskComplete = false; - - @CheckForNull - public FullDataSourceV2 fullDataSource = null; - - /** - * These two Semaphores are used to prevent all threads from locking on the group after it being fulfilled, - * as opposed to ReentrantReadWriteLocks which would allow the locking thread continue using it anyway.
- * Short.MAX_VALUE is chosen as a large enough number so non-exclusive accesses never block each other. - */ - public final Semaphore requestAddSemaphore = new Semaphore(Short.MAX_VALUE, true); - /** @see DataSourceRequestGroup#requestAddSemaphore */ - public final Semaphore requestRemoveSemaphore = new Semaphore(Short.MAX_VALUE, true); - - } - } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java new file mode 100644 index 000000000..e183b1437 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/DataSourceRequestGroup.java @@ -0,0 +1,109 @@ +package com.seibel.distanthorizons.core.multiplayer.server; + +import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; + +import javax.annotation.CheckForNull; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +class DataSourceRequestGroup +{ + public final long pos; + + /** + * If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps + */ + private boolean worldGenTaskComplete = false; + + void markWorldGenTaskComplete() + { + this.worldGenTaskComplete = true; + } + + boolean isWorldGenTaskComplete() + { + return this.worldGenTaskComplete; + } + + @CheckForNull + public FullDataSourceV2 fullDataSource = null; + + public final ConcurrentMap requestMessages = new ConcurrentHashMap<>(); + public final Semaphore pendingAdditionSemaphore = new Semaphore(Short.MAX_VALUE, true); + public final AtomicBoolean isClosed = new AtomicBoolean(); + + + DataSourceRequestGroup(long pos) + { + this.pos = pos; + } + + public boolean tryClose() + { + if (!this.isClosed.compareAndSet(false, true)) + { + return false; + } + + this.pendingAdditionSemaphore.acquireUninterruptibly(Short.MAX_VALUE); + return true; + } + + public boolean tryAddRequest(RequestData requestData) + { + if (!this.pendingAdditionSemaphore.tryAcquire()) + { + return false; + } + + this.requestMessages.put(requestData.futureId(), requestData); + this.pendingAdditionSemaphore.release(); + + return true; + } + + + public RequestData tryRemoveRequest(long requestId, HangingRequestTransferConsumer hangingRequestTransferConsumer) + { + RequestData removed = this.requestMessages.remove(requestId); + + if (this.requestMessages.isEmpty() && this.tryClose()) + { + hangingRequestTransferConsumer.accept(this.requestMessages.values()); + } + + return removed; + } + + + static class RequestData + { + public final ServerPlayerState serverPlayerState; + public final ServerPlayerState.RateLimiterSet rateLimiterSet; + + public final FullDataSourceRequestMessage message; + public long futureId() { return this.message.futureId; } + public long sectionPos() { return this.message.sectionPos; } + + RequestData(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) + { + this.serverPlayerState = serverPlayerState; + this.rateLimiterSet = rateLimiterSet; + this.message = message; + } + + } + + /** + * While closing this group, some requests may slip through and end up lost.
+ * This is a workaround that allows the caller to transfer these requests to a new group. + */ + @FunctionalInterface + interface HangingRequestTransferConsumer extends Consumer> { } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java new file mode 100644 index 000000000..52cf38fd3 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/FullDataSourceRequestHandler.java @@ -0,0 +1,264 @@ +package com.seibel.distanthorizons.core.multiplayer.server; + +import com.seibel.distanthorizons.core.config.Config; +import com.seibel.distanthorizons.core.file.fullDatafile.GeneratedFullDataSourceProvider; +import com.seibel.distanthorizons.core.level.AbstractDhServerLevel; +import com.seibel.distanthorizons.core.logging.ConfigBasedLogger; +import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload; +import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException; +import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage; +import com.seibel.distanthorizons.core.pos.DhSectionPos; +import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO; +import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; +import org.apache.logging.log4j.LogManager; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadPoolExecutor; + +public class FullDataSourceRequestHandler +{ + private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(), + () -> Config.Common.Logging.logNetworkEvent.get()); + + + private final AbstractDhServerLevel serverLevel; + private String getLevelIdentifier() { return this.serverLevel.getLevelWrapper().getDhIdentifier(); } + private GeneratedFullDataSourceProvider fullDataSourceProvider() { return this.serverLevel.serverside.fullDataFileHandler; } + private List getAllBeamsForPos(long pos) { return this.serverLevel.beaconBeamRepo.getAllBeamsForPos(pos); } + + private final ConcurrentMap requestGroupsByPos = new ConcurrentHashMap<>(); + private final ConcurrentMap requestGroupsByFutureId = new ConcurrentHashMap<>(); + + + public FullDataSourceRequestHandler(AbstractDhServerLevel serverLevel) + { + this.serverLevel = serverLevel; + } + + + //==================// + // network handling // + //==================// + + public void queueLodSyncForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) + { + if (!serverPlayerState.sessionConfig.getSynchronizeOnLoad()) + { + message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); + return; + } + + if (!rateLimiterSet.syncOnLoginRateLimiter.tryAcquire(message)) + { + return; + } + + + // the client timestamp will be null if we want to retrieve the LOD regardless of when it was last updated + long clientTimestamp = (message.clientTimestamp != null) ? message.clientTimestamp : -1; + // the server timestamp will be null if no LOD data exists for this position + Long serverTimestamp = this.fullDataSourceProvider().getTimestampForPos(message.sectionPos); + if (serverTimestamp == null + || serverTimestamp <= clientTimestamp) + { + // either no data exists to sync, or the client is already up to date + rateLimiterSet.syncOnLoginRateLimiter.release(); + message.sendResponse(new FullDataSourceResponseMessage(null)); + return; + } + + + ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + if (executor == null) + { + // shouldn't normally happen, but just in case + LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); + return; + } + + this.fullDataSourceProvider().getAsync(message.sectionPos).thenAcceptAsync(fullDataSource -> + { + try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.getAllBeamsForPos(message.sectionPos))) + { + serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> + { + message.sendResponse(new FullDataSourceResponseMessage(payload)); + rateLimiterSet.syncOnLoginRateLimiter.release(); + }); + } + }, executor); + } + + public void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) + { + if (!serverPlayerState.sessionConfig.isDistantGenerationEnabled()) + { + message.sendResponse(new RequestRejectedException("Operation is disabled in config.")); + return; + } + + if (!rateLimiterSet.generationRequestRateLimiter.tryAcquire(message)) + { + return; + } + + this.doQueueWorldGenForRequestMessage(new DataSourceRequestGroup.RequestData(serverPlayerState, message, rateLimiterSet)); + } + + private void doQueueWorldGenForRequestMessage(DataSourceRequestGroup.RequestData requestData) + { + while (true) + { + DataSourceRequestGroup requestGroup = this.requestGroupsByPos.computeIfAbsent(requestData.sectionPos(), pos -> + { + DataSourceRequestGroup newGroup = new DataSourceRequestGroup(pos); + this.tryFulfillDataSourceRequestGroup(newGroup, pos); + LOGGER.debug("[" + this.getLevelIdentifier() + "] Created request group for pos [" + DhSectionPos.toString(pos) + "]."); + return newGroup; + }); + + // If this fails, loop until either a permit is acquired or the group is removed to create another one + if (!requestGroup.tryAddRequest(requestData)) + { + Thread.yield(); + continue; + } + + this.requestGroupsByFutureId.put(requestData.futureId(), requestGroup); + break; + } + } + + public void cancelRequest(long requestId) + { + DataSourceRequestGroup requestGroup = this.requestGroupsByFutureId.remove(requestId); + if (requestGroup == null) + { + return; + } + + DataSourceRequestGroup.RequestData removedRequest = requestGroup.tryRemoveRequest(requestId, requestsToTransfer -> + { + LOGGER.debug("[" + this.getLevelIdentifier() + "] Cancelled request group [" + DhSectionPos.toString(requestGroup.pos) + "]."); + this.requestGroupsByPos.remove(requestGroup.pos); + + if (!requestsToTransfer.isEmpty()) + { + for (DataSourceRequestGroup.RequestData requestToTransfer : requestsToTransfer) + { + this.doQueueWorldGenForRequestMessage(requestToTransfer); + } + } + else + { + this.fullDataSourceProvider().removeRetrievalRequestIf(pos -> pos == requestGroup.pos); + } + }); + + if (removedRequest != null) + { + removedRequest.rateLimiterSet.generationRequestRateLimiter.release(); + } + } + + + public void tick() + { + // Send finished data source requests + for (Map.Entry entry : this.requestGroupsByPos.entrySet()) + { + DataSourceRequestGroup requestGroup = entry.getValue(); + + if (requestGroup.fullDataSource == null) + { + continue; + } + + LOGGER.debug("[" + this.getLevelIdentifier() + "] Fulfilled request group [" + entry.getKey() + "]"); + + // Make this group unavailable for adding into + this.requestGroupsByPos.remove(entry.getKey()); + if (!requestGroup.tryClose()) + { + continue; + } + + ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + if (executor == null) + { + LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); + continue; + } + CompletableFuture.runAsync(() -> + { + try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.getAllBeamsForPos(entry.getKey()))) + { + for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values()) + { + this.requestGroupsByFutureId.remove(requestData.futureId()); + + requestData.serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> { + requestData.message.sendResponse(new FullDataSourceResponseMessage(payload)); + requestData.rateLimiterSet.generationRequestRateLimiter.release(); + }); + } + } + }, executor); + } + } + + private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos) + { + this.fullDataSourceProvider().getAsync(pos).thenAccept(fullDataSource -> + { + if (this.fullDataSourceProvider().isFullyGenerated(fullDataSource.columnGenerationSteps)) + { + LOGGER.info("sending - complete [" + DhSectionPos.toString(pos) + "]"); + requestGroup.fullDataSource = fullDataSource; + } + else if (!this.serverLevel.isNSizedGenerationSupported() && DhSectionPos.getDetailLevel(pos) > DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL) + { + // Make this group unavailable for adding into + this.requestGroupsByPos.remove(pos); + if (!requestGroup.tryClose()) + { + return; + } + + for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values()) + { + this.requestGroupsByFutureId.remove(requestData.futureId()); + requestData.rateLimiterSet.generationRequestRateLimiter.release(); + requestData.message.sendResponse(new SectionRequiresSplittingException()); + } + } + else if (requestGroup.isWorldGenTaskComplete()) + { + LOGGER.info("sending - retry [" + DhSectionPos.toString(pos) + "]"); + this.tryFulfillDataSourceRequestGroup(requestGroup, pos); + } + else + { + LOGGER.info("sending - queueing [" + DhSectionPos.toString(pos) + "]"); + this.fullDataSourceProvider().queuePositionForRetrieval(pos); + } + }); + } + + public void onWorldGenTaskComplete(long pos) + { + DataSourceRequestGroup requestGroup = this.requestGroupsByPos.get(pos); + if (requestGroup != null) + { + requestGroup.markWorldGenTaskComplete(); + this.tryFulfillDataSourceRequestGroup(requestGroup, pos); + } + } + +}