From e5033a0c0f68779340682cde6879f2c84c74b0ca Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Mon, 29 Jul 2024 13:10:04 +0500 Subject: [PATCH] Use FullDataPayload instead of reusing messages --- .../core/level/DhClientLevel.java | 2 +- .../core/level/DhServerLevel.java | 19 ++-- .../client/AbstractFullDataRequestQueue.java | 4 +- .../client/ClientNetworkState.java | 6 +- .../FullDataPartialUpdateMessage.java | 32 ++---- .../messages/fullData/FullDataPayload.java | 103 ++++++++++++++++++ .../FullDataSourceResponseMessage.java | 34 ++---- .../fullData/IFullDataPayloadMessage.java | 82 -------------- 8 files changed, 137 insertions(+), 145 deletions(-) create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPayload.java delete mode 100644 core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/IFullDataPayloadMessage.java diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java index 02260a3d3..50aa62a7b 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java @@ -139,7 +139,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel { try { - FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(msg); + FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(msg.payload); if (!msg.isSameLevelAs(this.levelWrapper)) { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java index 7a84219f8..c2b8290bb 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java @@ -28,6 +28,7 @@ import com.seibel.distanthorizons.core.multiplayer.server.RemotePlayerConnection import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException; import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException; import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPayload; import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage; @@ -173,9 +174,9 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel { rateLimiterSet.loginDataSyncRCLimiter.release(); - FullDataSourceResponseMessage responseMessage = new FullDataSourceResponseMessage(fullDataSource); - responseMessage.splitIntoChunks(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage); - msg.sendResponse(responseMessage); + FullDataPayload payload = new FullDataPayload(fullDataSource); + payload.acceptInChunkMessages(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage); + msg.sendResponse(new FullDataSourceResponseMessage(payload)); }, executor); } })); @@ -289,7 +290,7 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel } CompletableFuture.runAsync(() -> { - try (FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(requestGroup.fullDataSource)) + try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource)) { for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) { @@ -302,8 +303,8 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel } serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release(); - response.splitIntoChunks(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage); - msg.sendResponse(response.retain()); + payload.acceptInChunkMessages(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage); + msg.sendResponse(new FullDataSourceResponseMessage(payload.retain())); } } }, executor); @@ -326,7 +327,7 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel } CompletableFuture.runAsync(() -> { - try (FullDataPartialUpdateMessage updateMessage = new FullDataPartialUpdateMessage(this.serverLevelWrapper, data)) + try (FullDataPayload payload = new FullDataPayload(data)) { for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers()) { @@ -345,8 +346,8 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() && distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius()) { - updateMessage.splitIntoChunks(FULL_DATA_CHUNK_SIZE, serverPlayerState.session::sendMessage); - serverPlayerState.session.sendMessage(updateMessage.retain()); + payload.acceptInChunkMessages(FULL_DATA_CHUNK_SIZE, serverPlayerState.session::sendMessage); + serverPlayerState.session.sendMessage(new FullDataPartialUpdateMessage(this.serverLevelWrapper, payload.retain())); } } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java index 5f5e2eaf4..7455b9b15 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java @@ -190,9 +190,9 @@ public abstract class AbstractFullDataRequestQueue implements IDebugRenderable, throw throwable; } - if (response.dtoBufferId != null) + if (response.payload != null) { - FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(response); + FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(response.payload); FullDataSourceV2 fullDataSource = dataSourceDto.createPooledDataSource(this.level.getLevelWrapper()); entry.chunkDataConsumer.accept(fullDataSource); FullDataSourceV2.DATA_SOURCE_POOL.returnPooledDataSource(fullDataSource); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java index b84263111..850920e7e 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java @@ -10,7 +10,7 @@ import com.seibel.distanthorizons.core.network.event.ScopedNetworkEventSource; import com.seibel.distanthorizons.core.network.event.CloseEvent; import com.seibel.distanthorizons.core.network.messages.base.RemotePlayerConfigMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataChunkMessage; -import com.seibel.distanthorizons.core.network.messages.fullData.IFullDataPayloadMessage; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPayload; import com.seibel.distanthorizons.core.network.session.Session; import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; import io.netty.buffer.ByteBufAllocator; @@ -86,9 +86,9 @@ public class ClientNetworkState implements Closeable }); } - public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(IFullDataPayloadMessage msg) + public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(FullDataPayload msg) { - CompositeByteBuf composite = this.fullDataBuffers.remove(msg.getDtoBufferId()); + CompositeByteBuf composite = this.fullDataBuffers.remove(msg.dtoBufferId); Objects.requireNonNull(composite); try diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java index 973d5560a..29bd25403 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java @@ -20,40 +20,26 @@ package com.seibel.distanthorizons.core.network.messages.fullData; import com.google.common.base.MoreObjects; -import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; +import com.seibel.distanthorizons.core.network.INetworkObject; import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage; import com.seibel.distanthorizons.core.network.messages.NetworkMessage; import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper; import io.netty.buffer.ByteBuf; -import org.jetbrains.annotations.Nullable; -import java.util.Objects; - -public class FullDataPartialUpdateMessage extends NetworkMessage implements ILevelRelatedMessage, IFullDataPayloadMessage +public class FullDataPartialUpdateMessage extends NetworkMessage implements ILevelRelatedMessage { private String levelName; @Override public String getLevelName() { return this.levelName; } - @Nullable - public Integer dtoBufferId; - @Override - public int getDtoBufferId() { return Objects.requireNonNull(this.dtoBufferId); } - @Override - public void setDtoBufferId(int bufferId) { this.dtoBufferId = bufferId; } - - public ByteBuf dtoBuffer; - @Override - public ByteBuf getDtoBuffer() { return this.dtoBuffer; } - @Override - public void setDtoBuffer(ByteBuf buffer) { this.dtoBuffer = buffer; } + public FullDataPayload payload; public FullDataPartialUpdateMessage() { } - public FullDataPartialUpdateMessage(IServerLevelWrapper level, FullDataSourceV2 fullDataSource) + public FullDataPartialUpdateMessage(IServerLevelWrapper level, FullDataPayload payload) { this.levelName = level.getKeyedLevelDimensionName(); - this.createCompressedDtoBuffer(fullDataSource); + this.payload = payload; } @Override @@ -63,15 +49,14 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev public void encode(ByteBuf out) { this.writeString(this.levelName, out); - out.writeInt(Objects.requireNonNull(this.dtoBufferId)); - this.dtoBuffer.release(); + this.payload.encode(out); } @Override public void decode(ByteBuf in) { this.levelName = this.readString(in); - this.dtoBufferId = in.readInt(); + this.payload = INetworkObject.decodeToInstance(new FullDataPayload(), in); } @@ -80,8 +65,7 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev { return super.toStringHelper() .add("levelName", this.levelName) - .add("dtoBufferId", this.dtoBufferId) - .add("dtoBuffer", this.dtoBuffer); + .add("payload", this.payload); } } \ No newline at end of file diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPayload.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPayload.java new file mode 100644 index 000000000..fbc830b1a --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPayload.java @@ -0,0 +1,103 @@ +package com.seibel.distanthorizons.core.network.messages.fullData; + +import com.google.common.base.MoreObjects; +import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode; +import com.seibel.distanthorizons.core.config.Config; +import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; +import com.seibel.distanthorizons.core.network.INetworkObject; +import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import org.jetbrains.annotations.NotNull; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + + +public class FullDataPayload implements INetworkObject, Closeable +{ + private static final AtomicInteger lastBufferId = new AtomicInteger(); + + public int dtoBufferId; + public ByteBuf dtoBuffer; + + + public FullDataPayload() { } + public FullDataPayload(@NotNull FullDataSourceV2 fullDataSource) + { + Objects.requireNonNull(fullDataSource); + + this.dtoBufferId = lastBufferId.getAndIncrement(); + + try + { + EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get(); + FullDataSourceV2DTO dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode); + + this.dtoBuffer = ByteBufAllocator.DEFAULT.buffer(); + dataSourceDto.encode(this.dtoBuffer); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + + public void acceptInChunkMessages(int chunkSize, Consumer chunkMessageConsumer) + { + for (int chunkNum = 0; ; chunkNum++) + { + int offset = chunkNum * chunkSize; + + int actualChunkSize = Math.min(this.dtoBuffer.writerIndex() - offset, chunkSize); + if (actualChunkSize <= 0) + { + break; + } + + FullDataChunkMessage chunk = new FullDataChunkMessage(this.dtoBufferId, chunkNum == 0, this.dtoBuffer.slice(offset, actualChunkSize)); + chunkMessageConsumer.accept(chunk); + } + } + + + public FullDataPayload retain() + { + this.dtoBuffer.retain(); + return this; + } + + @Override public void close() + { + this.dtoBuffer.release(); + } + + + @Override + public void encode(ByteBuf out) + { + out.writeInt(this.dtoBufferId); + this.dtoBuffer.release(); + } + + @Override + public void decode(ByteBuf in) + { + this.dtoBufferId = in.readInt(); + } + + + @Override + public String toString() + { + return MoreObjects.toStringHelper(this) + .add("dtoBufferId", this.dtoBufferId) + .add("dtoBuffer", this.dtoBuffer) + .toString(); + } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java index 9cfc5fea4..a60fd5982 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java @@ -20,56 +20,43 @@ package com.seibel.distanthorizons.core.network.messages.fullData; import com.google.common.base.MoreObjects; -import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; +import com.seibel.distanthorizons.core.network.INetworkObject; import com.seibel.distanthorizons.core.network.messages.TrackableMessage; import io.netty.buffer.ByteBuf; import org.jetbrains.annotations.Nullable; -import java.util.Objects; - /** * Response message, containing the requested full data source, * or nothing if requested in updates-only mode and the data was not updated. */ -public class FullDataSourceResponseMessage extends TrackableMessage implements IFullDataPayloadMessage +public class FullDataSourceResponseMessage extends TrackableMessage { @Nullable - public Integer dtoBufferId; - @Override - public int getDtoBufferId() { return Objects.requireNonNull(this.dtoBufferId); } - @Override - public void setDtoBufferId(int bufferId) { this.dtoBufferId = bufferId; } - - public ByteBuf dtoBuffer; - @Override - public ByteBuf getDtoBuffer() { return this.dtoBuffer; } - @Override - public void setDtoBuffer(ByteBuf buffer) { this.dtoBuffer = buffer; } + public FullDataPayload payload; public FullDataSourceResponseMessage() { } - public FullDataSourceResponseMessage(@Nullable FullDataSourceV2 fullDataSource) + public FullDataSourceResponseMessage(@Nullable FullDataPayload payload) { - if (fullDataSource != null) + if (payload != null) { - this.createCompressedDtoBuffer(fullDataSource); + this.payload = payload; } } @Override public void encode0(ByteBuf out) { - if (this.writeOptional(out, this.dtoBufferId)) + if (this.writeOptional(out, this.payload)) { - out.writeInt(this.dtoBufferId); - this.dtoBuffer.release(); + this.payload.encode(out); } } @Override public void decode0(ByteBuf in) { - this.dtoBufferId = this.readOptional(in, in::readInt); + this.payload = this.readOptional(in, () -> INetworkObject.decodeToInstance(new FullDataPayload(), in)); } @@ -77,8 +64,7 @@ public class FullDataSourceResponseMessage extends TrackableMessage implements I public MoreObjects.ToStringHelper toStringHelper() { return super.toStringHelper() - .add("dtoBufferId", this.dtoBufferId) - .add("dtoBuffer", this.dtoBuffer); + .add("payload", this.payload); } } \ No newline at end of file diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/IFullDataPayloadMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/IFullDataPayloadMessage.java deleted file mode 100644 index a631e0b76..000000000 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/IFullDataPayloadMessage.java +++ /dev/null @@ -1,82 +0,0 @@ -package com.seibel.distanthorizons.core.network.messages.fullData; - -import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode; -import com.seibel.distanthorizons.core.config.Config; -import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; -import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; - - -public interface IFullDataPayloadMessage> extends Closeable -{ - AtomicInteger lastBufferId = new AtomicInteger(); - - int getDtoBufferId(); - void setDtoBufferId(int bufferId); - - ByteBuf getDtoBuffer(); - void setDtoBuffer(ByteBuf buffer); - - - default void createCompressedDtoBuffer(FullDataSourceV2 fullDataSource) - { - Objects.requireNonNull(fullDataSource); - - int bufferId = lastBufferId.getAndIncrement(); - this.setDtoBufferId(bufferId); - - try - { - EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get(); - FullDataSourceV2DTO dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode); - - ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); - dataSourceDto.encode(buffer); - this.setDtoBuffer(buffer); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - default void splitIntoChunks(int chunkSize, Consumer chunkMessageConsumer) - { - int bufferId = this.getDtoBufferId(); - ByteBuf dtoBuffer = this.getDtoBuffer(); - - for (int chunkNum = 0; ; chunkNum++) - { - int offset = chunkNum * chunkSize; - - int actualChunkSize = Math.min(dtoBuffer.writerIndex() - offset, chunkSize); - if (actualChunkSize <= 0) - { - break; - } - - FullDataChunkMessage chunk = new FullDataChunkMessage(bufferId, chunkNum == 0, dtoBuffer.slice(offset, actualChunkSize)); - chunkMessageConsumer.accept(chunk); - } - } - - default T retain() - { - this.getDtoBuffer().retain(); - //noinspection unchecked - return (T)this; - } - - default void close() - { - this.getDtoBuffer().release(); - } - -}