From bea1ff34b4f6d81e706fd40ccde4fcb226f6adf6 Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Fri, 18 Oct 2024 01:08:12 +0500 Subject: [PATCH] Add LOD transfer speed setting --- .../distanthorizons/core/config/Config.java | 8 ++ .../core/level/AbstractDhServerLevel.java | 87 ++++++++------- .../core/level/DhClientLevel.java | 2 +- .../AbstractFullDataNetworkRequestQueue.java | 16 +-- .../client/ClientNetworkState.java | 48 +------- .../multiplayer/config/SessionConfig.java | 3 + .../fullData/FullDataPayload.java | 48 ++------ .../fullData/FullDataPayloadReceiver.java | 80 ++++++++++++++ .../fullData/FullDataPayloadSender.java | 104 ++++++++++++++++++ .../multiplayer/server/ServerPlayerState.java | 6 +- .../FullDataPartialUpdateMessage.java | 1 + .../FullDataSourceResponseMessage.java | 1 + .../fullData/FullDataSplitMessage.java | 17 ++- .../distanthorizons/core/util/TimerUtil.java | 13 +++ .../assets/distanthorizons/lang/en_us.json | 4 + 15 files changed, 302 insertions(+), 136 deletions(-) rename core/src/main/java/com/seibel/distanthorizons/core/{network/messages => multiplayer}/fullData/FullDataPayload.java (62%) create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadReceiver.java create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadSender.java diff --git a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java index 06207d222..410d90223 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java @@ -1564,6 +1564,14 @@ public class Config + "") .build(); + public static ConfigEntry maxDataTransferSpeed = new ConfigEntry.Builder() + .setServersideShortName("maxDataTransferSpeed") + .setMinDefaultMax(1, 500, 1000000 /* 1 GB/s */) + .comment("" + + "Maximum speed for uploading LODs to the clients, in KB/s." + + "") + .build(); + } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java index 88d650661..7598972e4 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/AbstractDhServerLevel.java @@ -14,7 +14,7 @@ import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage; import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage; import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartialUpdateMessage; -import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPayload; +import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage; import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage; @@ -128,20 +128,23 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I CompletableFuture.runAsync(() -> { Objects.requireNonNull(this.beaconBeamRepo); - FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(entry.getKey())); - for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) + try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(entry.getKey()))) { - this.requestGroupByFutureId.remove(msg.futureId); - - ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer()); - if (serverPlayerState == null) + for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) { - continue; + this.requestGroupByFutureId.remove(msg.futureId); + + ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer()); + if (serverPlayerState == null) + { + continue; + } + + serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> { + msg.sendResponse(new FullDataSourceResponseMessage(payload)); + serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release(); + }); } - - serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release(); - payload.splitAndSend(FULL_DATA_SPLIT_SIZE_IN_BYTES, msg.getSession()::sendMessage); - msg.sendResponse(new FullDataSourceResponseMessage(payload)); } }, executor); } @@ -273,12 +276,14 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I this.serverside.fullDataFileHandler.getAsync(message.sectionPos).thenAcceptAsync(fullDataSource -> { - rateLimiterSet.syncOnLoginRateLimiter.release(); - Objects.requireNonNull(this.beaconBeamRepo); - FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos)); - payload.splitAndSend(FULL_DATA_SPLIT_SIZE_IN_BYTES, message.getSession()::sendMessage); - message.sendResponse(new FullDataSourceResponseMessage(payload)); + try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos))) + { + serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> { + message.sendResponse(new FullDataSourceResponseMessage(payload)); + rateLimiterSet.syncOnLoginRateLimiter.release(); + }); + } }, executor); } private void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet) @@ -371,6 +376,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I DataSourceRequestGroup requestGroup = this.requestGroupByPos.get(pos); if (requestGroup != null) { + requestGroup.worldGenTaskComplete = true; this.tryFulfillDataSourceRequestGroup(requestGroup, pos); } } @@ -379,7 +385,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I { this.serverside.fullDataFileHandler.getAsync(pos).thenAccept(fullDataSource -> { - if (this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps)) + if (requestGroup.worldGenTaskComplete || this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps)) { requestGroup.fullDataSource = fullDataSource; } @@ -416,26 +422,30 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I CompletableFuture.runAsync(() -> { Objects.requireNonNull(this.beaconBeamRepo); - FullDataPayload payload = new FullDataPayload(data, this.beaconBeamRepo.getAllBeamsForPos(data.getPos())); - for (ServerPlayerState serverPlayerState : this.serverPlayerStateManager.getReadyPlayers()) + try (FullDataPayload payload = new FullDataPayload(data, this.beaconBeamRepo.getAllBeamsForPos(data.getPos()))) { - if (serverPlayerState.getServerPlayer().getLevel() != this.serverLevelWrapper) + for (ServerPlayerState serverPlayerState : this.serverPlayerStateManager.getReadyPlayers()) { - continue; - } - - if (!serverPlayerState.sessionConfig.isRealTimeUpdatesEnabled()) - { - continue; - } - - Vec3d playerPosition = serverPlayerState.getServerPlayer().getPosition(); - int distanceFromPlayer = DhSectionPos.getChebyshevBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16; - if (distanceFromPlayer >= serverPlayerState.getServerPlayer().getViewDistance() - && distanceFromPlayer <= serverPlayerState.sessionConfig.getMaxUpdateDistanceRadius()) - { - payload.splitAndSend(FULL_DATA_SPLIT_SIZE_IN_BYTES, serverPlayerState.networkSession::sendMessage); - serverPlayerState.networkSession.sendMessage(new FullDataPartialUpdateMessage(this.serverLevelWrapper, payload)); + if (serverPlayerState.getServerPlayer().getLevel() != this.serverLevelWrapper) + { + continue; + } + + if (!serverPlayerState.sessionConfig.isRealTimeUpdatesEnabled()) + { + continue; + } + + Vec3d playerPosition = serverPlayerState.getServerPlayer().getPosition(); + int distanceFromPlayer = DhSectionPos.getChebyshevBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16; + if (distanceFromPlayer >= serverPlayerState.getServerPlayer().getViewDistance() + && distanceFromPlayer <= serverPlayerState.sessionConfig.getMaxUpdateDistanceRadius()) + { + serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> + { + serverPlayerState.networkSession.sendMessage(new FullDataPartialUpdateMessage(this.serverLevelWrapper, payload)); + }); + } } } }, executor); @@ -492,8 +502,11 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I { public final ConcurrentMap requestMessages = new ConcurrentHashMap<>(); + /** If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps */ + boolean worldGenTaskComplete = false; + @CheckForNull - public FullDataSourceV2 fullDataSource; + public FullDataSourceV2 fullDataSource = null; /** * These two Semaphores are used to prevent all threads from locking on the group after it being fulfilled, diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java index 514e1b30a..b5823ddda 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java @@ -137,7 +137,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel try { - FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(message.payload); + FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSourceAndReleaseBuffer(message.payload); if (!message.isSameLevelAs(this.levelWrapper)) { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java index 6bc49417e..944b8a52d 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataNetworkRequestQueue.java @@ -46,8 +46,6 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class); - private static final Timer TASK_FINISH_TIMER = TimerUtil.CreateTimer("RequestTaskFinishTimer"); - private static final int MAX_RETRY_ATTEMPTS = 3; protected static final long SHUTDOWN_TIMEOUT_SECONDS = 5; @@ -193,7 +191,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende if (response.payload != null) { - FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(response.payload); + FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSourceAndReleaseBuffer(response.payload); ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); if (executor == null) @@ -258,17 +256,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende } } - // Hack to work around a race condition - // If you finish the request too quickly, the section will never render - TASK_FINISH_TIMER.schedule(new TimerTask() - { - @Override - public void run() - { - entry.future.complete(true); - } - }, 10000); - return null; + return entry.future.complete(true); }); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java index 7f81555b2..358d15a31 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java @@ -1,11 +1,10 @@ package com.seibel.distanthorizons.core.multiplayer.client; -import com.google.common.cache.CacheBuilder; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector; import com.seibel.distanthorizons.core.logging.ConfigBasedLogger; import com.seibel.distanthorizons.core.multiplayer.config.SessionConfig; -import com.seibel.distanthorizons.core.network.INetworkObject; +import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayloadReceiver; import com.seibel.distanthorizons.core.network.event.ScopedNetworkEventSource; import com.seibel.distanthorizons.core.network.event.internal.CloseInternalEvent; import com.seibel.distanthorizons.core.network.event.internal.IncompatibleMessageInternalEvent; @@ -14,21 +13,14 @@ import com.seibel.distanthorizons.core.network.messages.base.SessionConfigMessag import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartialUpdateMessage; -import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPayload; import com.seibel.distanthorizons.core.network.session.NetworkSession; -import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; -import com.seibel.distanthorizons.core.util.LodUtil; import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; import com.seibel.distanthorizons.coreapi.ModInfo; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; import org.apache.logging.log4j.LogManager; import org.jetbrains.annotations.Nullable; import java.io.Closeable; import java.util.List; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; public class ClientNetworkState implements Closeable { @@ -38,10 +30,7 @@ public class ClientNetworkState implements Closeable private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class); - private final ConcurrentMap fullDataBufferById = CacheBuilder.newBuilder() - .expireAfterAccess(10, TimeUnit.SECONDS) - .build() - .asMap(); + public final FullDataPayloadReceiver fullDataPayloadReceiver = new FullDataPayloadReceiver(); private final SessionConfig.AnyChangeListener configAnyChangeListener = new SessionConfig.AnyChangeListener(this::sendConfigMessage); @@ -120,22 +109,7 @@ public class ClientNetworkState implements Closeable this.configReceived = true; }); - this.networkSession.registerHandler(FullDataSplitMessage.class, message -> - { - if (message.isFirst) - { - CompositeByteBuf composite = this.fullDataBufferById.remove(message.bufferId); - if (composite != null) - { - composite.release(); - LOGGER.debug("Released full data buffer [" + message.bufferId + "]: [" + composite + "]"); - } - } - - CompositeByteBuf byteBuffer = this.fullDataBufferById.computeIfAbsent(message.bufferId, bufferId -> ByteBufAllocator.DEFAULT.compositeBuffer()); - byteBuffer.addComponent(true, message.buffer); - LOGGER.debug("Full data buffer [" + message.bufferId + "]: [" + byteBuffer + "]."); - }); + this.networkSession.registerHandler(FullDataSplitMessage.class, this.fullDataPayloadReceiver::receiveChunk); } } @@ -145,20 +119,7 @@ public class ClientNetworkState implements Closeable // send message // //==============// - public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(FullDataPayload msg) - { - CompositeByteBuf compositeByteBuffer = this.fullDataBufferById.remove(msg.dtoBufferId); - LodUtil.assertTrue(compositeByteBuffer != null); - - try - { - return INetworkObject.decodeToInstance(FullDataSourceV2DTO.CreateEmptyDataSource(), compositeByteBuffer); - } - finally - { - compositeByteBuffer.release(); - } - } + public void sendConfigMessage() { @@ -198,6 +159,7 @@ public class ClientNetworkState implements Closeable @Override public void close() { + this.fullDataPayloadReceiver.close(); this.configAnyChangeListener.close(); this.networkSession.close(); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/SessionConfig.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/SessionConfig.java index fd4483151..f0b7bd82c 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/SessionConfig.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/SessionConfig.java @@ -40,6 +40,8 @@ public class SessionConfig implements INetworkObject registerConfigEntry(Config.Server.synchronizeOnLoad, (x, y) -> x && y); registerConfigEntry(Config.Server.syncOnLoadRateLimit, Math::min); + + registerConfigEntry(Config.Server.maxDataTransferSpeed, Math::min); } public SessionConfig() {} @@ -56,6 +58,7 @@ public class SessionConfig implements INetworkObject public boolean isRealTimeUpdatesEnabled() { return this.getValue(Config.Server.enableRealTimeUpdates); } public boolean getSynchronizeOnLoad() { return this.getValue(Config.Server.synchronizeOnLoad); } public int getSyncOnLoginRateLimit() { return this.getValue(Config.Server.syncOnLoadRateLimit); } + public int getMaxDataTransferSpeed() { return this.getValue(Config.Server.maxDataTransferSpeed); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPayload.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayload.java similarity index 62% rename from core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPayload.java rename to core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayload.java index bbd9df68c..c550b385c 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPayload.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayload.java @@ -1,13 +1,13 @@ -package com.seibel.distanthorizons.core.network.messages.fullData; +package com.seibel.distanthorizons.core.multiplayer.fullData; import com.google.common.base.MoreObjects; import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; import com.seibel.distanthorizons.core.network.INetworkObject; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage; import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO; import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; -import com.seibel.distanthorizons.core.util.TimerUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import org.jetbrains.annotations.NotNull; @@ -20,13 +20,10 @@ import java.util.function.Consumer; /** * @see FullDataSplitMessage */ -public class FullDataPayload implements INetworkObject +public class FullDataPayload implements INetworkObject, AutoCloseable { private static final AtomicInteger lastBufferId = new AtomicInteger(); - // Reference counting is unreliable here for some reason so this is a "fix" - private static final Timer bufferCleanupTimer = TimerUtil.CreateTimer("FullDataBufferCleanupTimer"); - public int dtoBufferId; public ByteBuf dtoBuffer; @@ -52,15 +49,6 @@ public class FullDataPayload implements INetworkObject this.dtoBuffer = ByteBufAllocator.DEFAULT.buffer(); dataSourceDto.encode(this.dtoBuffer); - - bufferCleanupTimer.schedule(new TimerTask() - { - @Override - public void run() - { - FullDataPayload.this.dtoBuffer.release(); - } - }, 5000L); } catch (IOException e) { @@ -90,36 +78,18 @@ public class FullDataPayload implements INetworkObject this.beaconBeams = this.readCollection(in, new ArrayList<>(), () -> new BeaconBeamDTO(null, null)); } - /** - * Used to send {@link FullDataPayload}'s since the data they contain may be larger - * than what a single packet could contain. - * - * @param payloadChunkSizeInBytes how many bytes can be sent in a single message - */ - public void splitAndSend(int payloadChunkSizeInBytes, Consumer sendMessageConsumer) - { - // chunk in this context means chunk of data, not a MC chunk - for (int payloadChunkNum = 0; ; payloadChunkNum++) - { - int offset = payloadChunkNum * payloadChunkSizeInBytes; - - int actualChunkSize = Math.min(this.dtoBuffer.writerIndex() - offset, payloadChunkSizeInBytes); - if (actualChunkSize <= 0) - { - break; - } - - FullDataSplitMessage chunk = new FullDataSplitMessage(this.dtoBufferId, payloadChunkNum == 0, this.dtoBuffer.slice(offset, actualChunkSize)); - sendMessageConsumer.accept(chunk); - } - } - //================// // base overrides // //================// + @Override + public void close() + { + this.dtoBuffer.release(); + } + @Override public String toString() { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadReceiver.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadReceiver.java new file mode 100644 index 000000000..f900046c3 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadReceiver.java @@ -0,0 +1,80 @@ +package com.seibel.distanthorizons.core.multiplayer.fullData; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalNotification; +import com.seibel.distanthorizons.core.config.Config; +import com.seibel.distanthorizons.core.logging.ConfigBasedLogger; +import com.seibel.distanthorizons.core.network.INetworkObject; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage; +import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; +import com.seibel.distanthorizons.core.util.LodUtil; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import org.apache.logging.log4j.LogManager; + +import java.util.Objects; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +public class FullDataPayloadReceiver implements AutoCloseable +{ + private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(), + () -> Config.Common.Logging.logNetworkEvent.get()); + + private final ConcurrentMap buffersById = CacheBuilder.newBuilder() + .expireAfterAccess(10, TimeUnit.SECONDS) + .removalListener((RemovalNotification notification) -> Objects.requireNonNull(notification.getValue()).release()) + .build().asMap(); + + + + @Override + public void close() + { + this.buffersById.clear(); + } + + public void receiveChunk(FullDataSplitMessage message) + { + this.buffersById.compute(message.bufferId, (bufferId, composite) -> + { + if (message.isFirst) + { + if (composite != null) + { + composite.release(); + LOGGER.debug("Released existing full data buffer [" + message.bufferId + "]"); + } + + composite = ByteBufAllocator.DEFAULT.compositeBuffer(); + LOGGER.debug("Created new full data buffer [" + message.bufferId + "]: [" + composite + "]"); + } + else if (composite == null) + { + LOGGER.debug("Received non-first full data chunk for empty buffer [" + message.bufferId + "]: [" + message.buffer + "]."); + return null; + } + + composite.addComponent(true, message.buffer); + LOGGER.debug("Updated full data buffer [" + message.bufferId + "]: [" + composite + "]."); + return composite; + }); + } + + public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(FullDataPayload msg) + { + CompositeByteBuf compositeByteBuffer = this.buffersById.get(msg.dtoBufferId); + LodUtil.assertTrue(compositeByteBuffer != null); + + try + { + return INetworkObject.decodeToInstance(FullDataSourceV2DTO.CreateEmptyDataSource(), compositeByteBuffer); + } + finally + { + // Releasing the buffer is handled by cache + this.buffersById.remove(msg.dtoBufferId); + } + } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadSender.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadSender.java new file mode 100644 index 000000000..d2e89190e --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadSender.java @@ -0,0 +1,104 @@ +package com.seibel.distanthorizons.core.multiplayer.fullData; + +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage; +import com.seibel.distanthorizons.core.network.session.NetworkSession; +import com.seibel.distanthorizons.core.util.TimerUtil; +import io.netty.buffer.ByteBuf; + +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.*; + +public class FullDataPayloadSender implements AutoCloseable +{ + private static final int TICK_RATE = 4; + + private static final Timer UPLOAD_TIMER = TimerUtil.CreateTimer("FullDataPayloadSender"); + private final TimerTask tickTimerTask = TimerUtil.createTimerTask(this::tick); + + private final NetworkSession session; + private final IntSupplier maxKBpsSupplier; + private final ConcurrentLinkedQueue transferQueue = new ConcurrentLinkedQueue<>(); + + public FullDataPayloadSender(NetworkSession session, IntSupplier maxKBpsSupplier) + { + this.session = session; + this.maxKBpsSupplier = maxKBpsSupplier; + UPLOAD_TIMER.scheduleAtFixedRate(this.tickTimerTask, 0, 1000 / TICK_RATE); + } + + @Override + public void close() + { + this.tickTimerTask.cancel(); + + PendingTransfer pendingTransfer; + while ((pendingTransfer = this.transferQueue.poll()) != null) + { + pendingTransfer.close(); + } + } + + + public void sendInChunks(FullDataPayload payload, Runnable sendFinalMessage) + { + this.transferQueue.add(new PendingTransfer(payload, sendFinalMessage)); + } + + private void tick() + { + int bytesToSend = (this.maxKBpsSupplier.getAsInt() * 1000) / TICK_RATE; + while (bytesToSend > 0) + { + PendingTransfer pendingTransfer = this.transferQueue.peek(); + if (pendingTransfer == null) + { + return; + } + + int chunkSize = Math.min(bytesToSend, pendingTransfer.buffer.readableBytes()); + boolean isFirstChunk = pendingTransfer.buffer.readerIndex() == 0; + + FullDataSplitMessage chunkMessage = new FullDataSplitMessage(pendingTransfer.bufferId, pendingTransfer.buffer.readRetainedSlice(chunkSize), isFirstChunk); + this.session.sendMessage(chunkMessage); + + bytesToSend -= chunkSize; + + if (pendingTransfer.buffer.readableBytes() == 0) + { + pendingTransfer.sendFinalMessage.run(); + pendingTransfer.close(); + this.transferQueue.poll(); + } + } + } + + + private static class PendingTransfer implements AutoCloseable + { + public final int bufferId; + public final ByteBuf buffer; + public final Runnable sendFinalMessage; + private final AtomicBoolean isClosed = new AtomicBoolean(); + + private PendingTransfer(FullDataPayload payload, Runnable sendFinalMessage) + { + this.bufferId = payload.dtoBufferId; + this.buffer = payload.dtoBuffer.retainedDuplicate().readerIndex(0); + this.sendFinalMessage = sendFinalMessage; + } + + @Override + public void close() + { + if (this.isClosed.compareAndSet(false, true)) + { + this.buffer.release(); + } + } + + } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java index c263bc381..3a0826715 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java @@ -4,6 +4,7 @@ import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener; import com.seibel.distanthorizons.core.level.AbstractDhServerLevel; import com.seibel.distanthorizons.core.multiplayer.config.SessionConfig; +import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayloadSender; import com.seibel.distanthorizons.core.network.messages.base.CurrentLevelKeyMessage; import com.seibel.distanthorizons.core.network.messages.base.SessionConfigMessage; import com.seibel.distanthorizons.core.network.event.internal.CloseInternalEvent; @@ -34,12 +35,13 @@ public class ServerPlayerState implements Closeable public final SessionConfig sessionConfig = new SessionConfig(); public boolean isReady() { return this.sessionConfig.constrainingConfig != null; } + public final FullDataPayloadSender fullDataPayloadSender; + private final ConcurrentHashMap rateLimiterSets = new ConcurrentHashMap<>(); public RateLimiterSet getRateLimiterSet(AbstractDhServerLevel level) { return this.rateLimiterSets.computeIfAbsent(level, ignored -> new RateLimiterSet()); } public void clearRateLimiterSets() { this.rateLimiterSets.clear(); } - //==============// // constructors // //==============// @@ -47,6 +49,7 @@ public class ServerPlayerState implements Closeable public ServerPlayerState(IServerPlayerWrapper serverPlayer) { this.networkSession = new NetworkSession(serverPlayer); + this.fullDataPayloadSender = new FullDataPayloadSender(this.networkSession, this.sessionConfig::getMaxDataTransferSpeed); this.networkSession.registerHandler(SessionConfigMessage.class, (sessionConfigMessage) -> { @@ -92,6 +95,7 @@ public class ServerPlayerState implements Closeable @Override public void close() { + this.fullDataPayloadSender.close(); this.levelKeyPrefixChangeListener.close(); this.configAnyChangeListener.close(); this.networkSession.close(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java index 86e6e7eaa..d1fd818ee 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java @@ -20,6 +20,7 @@ package com.seibel.distanthorizons.core.network.messages.fullData; import com.google.common.base.MoreObjects; +import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload; import com.seibel.distanthorizons.core.network.INetworkObject; import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage; import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage; diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java index ac85091bb..74ad28382 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java @@ -20,6 +20,7 @@ package com.seibel.distanthorizons.core.network.messages.fullData; import com.google.common.base.MoreObjects; +import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload; import com.seibel.distanthorizons.core.network.INetworkObject; import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage; import io.netty.buffer.ByteBuf; diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSplitMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSplitMessage.java index ae94d7ac1..4c164249f 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSplitMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSplitMessage.java @@ -20,9 +20,13 @@ package com.seibel.distanthorizons.core.network.messages.fullData; import com.google.common.base.MoreObjects; +import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload; import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage; +import com.seibel.distanthorizons.core.util.TimerUtil; import io.netty.buffer.ByteBuf; +import java.util.Timer; + /** * Used to send part of a {@link FullDataPayload}. * @@ -30,10 +34,15 @@ import io.netty.buffer.ByteBuf; */ public class FullDataSplitMessage extends AbstractNetworkMessage { + private static final long BUFFER_RELEASE_DELAY_MS = 5000L; + public int bufferId; public ByteBuf buffer; public boolean isFirst; + // Reference counting is unreliable here for some reason so this is a "fix" + private static final Timer bufferReleaseTimer = TimerUtil.CreateTimer("FullDataBufferCleanupTimer"); + private boolean releaseScheduled = false; //==============// @@ -41,7 +50,7 @@ public class FullDataSplitMessage extends AbstractNetworkMessage //==============// public FullDataSplitMessage() { } - public FullDataSplitMessage(int bufferId, boolean isFirst, ByteBuf buffer) + public FullDataSplitMessage(int bufferId, ByteBuf buffer, boolean isFirst) { this.bufferId = bufferId; this.buffer = buffer; @@ -63,6 +72,12 @@ public class FullDataSplitMessage extends AbstractNetworkMessage out.writeBytes(this.buffer.readerIndex(0)); out.writeBoolean(this.isFirst); + + if (!this.releaseScheduled) + { + bufferReleaseTimer.schedule(TimerUtil.createTimerTask(this.buffer::release), BUFFER_RELEASE_DELAY_MS); + this.releaseScheduled = true; + } } @Override diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/TimerUtil.java b/core/src/main/java/com/seibel/distanthorizons/core/util/TimerUtil.java index d000596a9..b5463964c 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/TimerUtil.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/TimerUtil.java @@ -1,6 +1,7 @@ package com.seibel.distanthorizons.core.util; import java.util.Timer; +import java.util.TimerTask; /** * Handles creating timers. @@ -17,4 +18,16 @@ public class TimerUtil return new Timer(ThreadUtil.THREAD_NAME_PREFIX+timerName, true); } + public static TimerTask createTimerTask(Runnable runMethod) + { + return new TimerTask() + { + @Override + public void run() + { + runMethod.run(); + } + }; + } + } diff --git a/core/src/main/resources/assets/distanthorizons/lang/en_us.json b/core/src/main/resources/assets/distanthorizons/lang/en_us.json index e4b8b8509..c2d50c5ae 100644 --- a/core/src/main/resources/assets/distanthorizons/lang/en_us.json +++ b/core/src/main/resources/assets/distanthorizons/lang/en_us.json @@ -687,6 +687,10 @@ "Rate Limit for Sync on Load", "distanthorizons.config.server.syncOnLoadRateLimit.@tooltip": "How many LOD sync requests per second should a client send? \nAlso limits the amount of player's requests allowed to stay in the server's queue.", + "distanthorizons.config.server.maxDataTransferSpeed": + "Maximum Data Transfer Speed, KB/s", + "distanthorizons.config.server.maxDataTransferSpeed.@tooltip": + "Maximum speed for uploading LODs to the clients, in KB/s",