diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java index 3bbe1c2c0..9c7eda259 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java @@ -36,6 +36,7 @@ import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartial import com.seibel.distanthorizons.core.pos.DhBlockPos; import com.seibel.distanthorizons.core.pos.DhBlockPos2D; import com.seibel.distanthorizons.core.render.renderer.DebugRenderer; +import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; import com.seibel.distanthorizons.core.wrapperInterfaces.block.IBlockStateWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IProfilerWrapper; @@ -128,17 +129,20 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel private void registerNetworkHandlers() { assert this.eventSource != null; + assert this.networkState != null; this.eventSource.registerHandler(FullDataPartialUpdateMessage.class, msg -> { try { + FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(msg); + if (!msg.isSameLevelAs(this.levelWrapper)) { return; } - this.updateDataSourcesAsync(msg.dataSourceDto.createPooledDataSource(this.levelWrapper)); + this.updateDataSourcesAsync(dataSourceDto.createPooledDataSource(this.levelWrapper)); } catch (Exception e) { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java index f0a792379..241181ba2 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java @@ -279,20 +279,22 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel } CompletableFuture.runAsync(() -> { - FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(requestGroup.fullDataSource); - for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) + try (FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(requestGroup.fullDataSource)) { - this.requestGroupsByFutureId.remove(msg.futureId); - - ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer()); - if (serverPlayerState == null) + for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) { - continue; + this.requestGroupsByFutureId.remove(msg.futureId); + + ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer()); + if (serverPlayerState == null) + { + continue; + } + + serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release(); + response.splitIntoChunks(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage); + msg.sendResponse(response.retain()); } - - serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release(); - response.splitIntoChunks(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage); - msg.sendResponse(response); } }, executor); } @@ -314,26 +316,28 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel } CompletableFuture.runAsync(() -> { - FullDataPartialUpdateMessage updateMessage = new FullDataPartialUpdateMessage(this.serverLevelWrapper, data); - for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers()) + try (FullDataPartialUpdateMessage updateMessage = new FullDataPartialUpdateMessage(this.serverLevelWrapper, data)) { - if (serverPlayerState.serverPlayer().getLevel() != this.serverLevelWrapper) + for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers()) { - continue; - } - - if (!serverPlayerState.config.isRealTimeUpdatesEnabled()) - { - continue; - } - - Vec3d playerPosition = serverPlayerState.serverPlayer().getPosition(); - int distanceFromPlayer = DhSectionPos.getManhattanBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16; - if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() && - distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius()) - { - updateMessage.splitIntoChunks(FULL_DATA_CHUNK_SIZE, serverPlayerState.session::sendMessage); - serverPlayerState.session.sendMessage(updateMessage); + if (serverPlayerState.serverPlayer().getLevel() != this.serverLevelWrapper) + { + continue; + } + + if (!serverPlayerState.config.isRealTimeUpdatesEnabled()) + { + continue; + } + + Vec3d playerPosition = serverPlayerState.serverPlayer().getPosition(); + int distanceFromPlayer = DhSectionPos.getManhattanBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16; + if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() && + distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius()) + { + updateMessage.splitIntoChunks(FULL_DATA_CHUNK_SIZE, serverPlayerState.session::sendMessage); + serverPlayerState.session.sendMessage(updateMessage.retain()); + } } } }, executor); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java index bae5b6429..5f5e2eaf4 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java @@ -17,6 +17,7 @@ import com.seibel.distanthorizons.core.pos.DhBlockPos2D; import com.seibel.distanthorizons.core.pos.DhSectionPos; import com.seibel.distanthorizons.core.render.renderer.DebugRenderer; import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable; +import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; import com.seibel.distanthorizons.core.util.LodUtil; import com.seibel.distanthorizons.core.util.ratelimiting.SupplierBasedRateLimiter; import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; @@ -189,9 +190,10 @@ public abstract class AbstractFullDataRequestQueue implements IDebugRenderable, throw throwable; } - if (response.dataSourceDto != null) + if (response.dtoBufferId != null) { - FullDataSourceV2 fullDataSource = response.dataSourceDto.createPooledDataSource(this.level.getLevelWrapper()); + FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(response); + FullDataSourceV2 fullDataSource = dataSourceDto.createPooledDataSource(this.level.getLevelWrapper()); entry.chunkDataConsumer.accept(fullDataSource); FullDataSourceV2.DATA_SOURCE_POOL.returnPooledDataSource(fullDataSource); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java index 7a27e13d6..a63f8e2db 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java @@ -1,17 +1,27 @@ package com.seibel.distanthorizons.core.multiplayer.client; +import com.google.common.cache.CacheBuilder; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.logging.ConfigBasedLogger; import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfig; import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfigChangeListener; +import com.seibel.distanthorizons.core.network.INetworkObject; import com.seibel.distanthorizons.core.network.event.ScopedNetworkEventSource; import com.seibel.distanthorizons.core.network.event.CloseEvent; import com.seibel.distanthorizons.core.network.messages.base.RemotePlayerConfigMessage; +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataChunkMessage; +import com.seibel.distanthorizons.core.network.messages.fullData.IFullDataPayloadMessage; import com.seibel.distanthorizons.core.network.session.Session; +import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; import org.apache.logging.log4j.LogManager; import java.io.Closeable; import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; public class ClientNetworkState implements Closeable { @@ -33,6 +43,12 @@ public class ClientNetworkState implements Closeable */ public Session getSession() { return this.session; } + private final ConcurrentMap fullDataBuffers = CacheBuilder.newBuilder() + .expireAfterAccess(10, TimeUnit.SECONDS) + .build() + .asMap(); + + /** * Constructs a new instance. */ @@ -51,6 +67,38 @@ public class ClientNetworkState implements Closeable { this.configReceived = false; }); + + this.session.registerHandler(FullDataChunkMessage.class, msg -> + { + if (msg.isFirst) + { + CompositeByteBuf composite = this.fullDataBuffers.remove(msg.bufferId); + if (composite != null) + { + composite.release(); + LOGGER.debug("Released full data buffer {}: {}", msg.bufferId, composite); + } + } + + CompositeByteBuf composite = this.fullDataBuffers.computeIfAbsent(msg.bufferId, bufferId -> ByteBufAllocator.DEFAULT.compositeBuffer()); + composite.addComponent(true, msg.buffer); + LOGGER.debug("Full data buffer {}: {}", msg.bufferId, composite); + }); + } + + public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(IFullDataPayloadMessage msg) + { + CompositeByteBuf composite = this.fullDataBuffers.remove(msg.getDtoBufferId()); + + try + { + Objects.requireNonNull(composite); + return INetworkObject.decodeToInstance(new FullDataSourceV2DTO(), composite); + } + finally + { + composite.release(); + } } public void sendConfigMessage() diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataChunkMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataChunkMessage.java index ac9768a0f..000715c74 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataChunkMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataChunkMessage.java @@ -27,13 +27,15 @@ public class FullDataChunkMessage extends NetworkMessage { public int bufferId; public ByteBuf buffer; + public boolean isFirst; public FullDataChunkMessage() { } - public FullDataChunkMessage(int bufferId, ByteBuf buffer) + public FullDataChunkMessage(int bufferId, boolean isFirst, ByteBuf buffer) { this.bufferId = bufferId; this.buffer = buffer; + this.isFirst = isFirst; } @@ -46,15 +48,20 @@ public class FullDataChunkMessage extends NetworkMessage out.writeInt(this.bufferId); out.writeInt(this.buffer.writerIndex()); - this.buffer.resetReaderIndex(); - out.writeBytes(this.buffer); + out.writeBytes(this.buffer.readerIndex(0)); + + out.writeBoolean(this.isFirst); } @Override public void decode(ByteBuf in) { + this.bufferId = in.readInt(); + int bufferSize = in.readInt(); this.buffer = in.readBytes(bufferSize); + + this.isFirst = in.readBoolean(); } @@ -63,7 +70,8 @@ public class FullDataChunkMessage extends NetworkMessage { return super.toStringHelper() .add("bufferId", this.bufferId) - .add("buffer", this.buffer); + .add("buffer", this.buffer) + .add("isFirst", this.isFirst); } } \ No newline at end of file diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java index ab022bdee..973d5560a 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java @@ -29,7 +29,7 @@ import org.jetbrains.annotations.Nullable; import java.util.Objects; -public class FullDataPartialUpdateMessage extends NetworkMessage implements ILevelRelatedMessage, IFullDataPayloadMessage +public class FullDataPartialUpdateMessage extends NetworkMessage implements ILevelRelatedMessage, IFullDataPayloadMessage { private String levelName; @Override @@ -37,8 +37,8 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev @Nullable public Integer dtoBufferId; - @Override @Nullable - public Integer getDtoBufferId() { return this.dtoBufferId; } + @Override + public int getDtoBufferId() { return Objects.requireNonNull(this.dtoBufferId); } @Override public void setDtoBufferId(int bufferId) { this.dtoBufferId = bufferId; } @@ -56,7 +56,6 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev this.createCompressedDtoBuffer(fullDataSource); } - @Override public boolean warnWhenUnhandled() { return false; } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java index cf016ba28..9cfc5fea4 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java @@ -25,16 +25,18 @@ import com.seibel.distanthorizons.core.network.messages.TrackableMessage; import io.netty.buffer.ByteBuf; import org.jetbrains.annotations.Nullable; +import java.util.Objects; + /** * Response message, containing the requested full data source, * or nothing if requested in updates-only mode and the data was not updated. */ -public class FullDataSourceResponseMessage extends TrackableMessage implements IFullDataPayloadMessage +public class FullDataSourceResponseMessage extends TrackableMessage implements IFullDataPayloadMessage { @Nullable public Integer dtoBufferId; - @Override @Nullable - public Integer getDtoBufferId() { return this.dtoBufferId; } + @Override + public int getDtoBufferId() { return Objects.requireNonNull(this.dtoBufferId); } @Override public void setDtoBufferId(int bufferId) { this.dtoBufferId = bufferId; } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/IFullDataPayloadMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/IFullDataPayloadMessage.java index 71dfd98cc..a631e0b76 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/IFullDataPayloadMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/IFullDataPayloadMessage.java @@ -6,18 +6,19 @@ import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSour import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import org.jetbrains.annotations.Nullable; +import java.io.Closeable; import java.io.IOException; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -public interface IFullDataPayloadMessage + +public interface IFullDataPayloadMessage> extends Closeable { AtomicInteger lastBufferId = new AtomicInteger(); - @Nullable - Integer getDtoBufferId(); + + int getDtoBufferId(); void setDtoBufferId(int bufferId); ByteBuf getDtoBuffer(); @@ -48,22 +49,34 @@ public interface IFullDataPayloadMessage default void splitIntoChunks(int chunkSize, Consumer chunkMessageConsumer) { + int bufferId = this.getDtoBufferId(); ByteBuf dtoBuffer = this.getDtoBuffer(); - int bufferId = Objects.requireNonNull(this.getDtoBufferId()); for (int chunkNum = 0; ; chunkNum++) { int offset = chunkNum * chunkSize; - int bytesLeft = dtoBuffer.writerIndex() - offset; - - if (offset >= dtoBuffer.writerIndex()) + + int actualChunkSize = Math.min(dtoBuffer.writerIndex() - offset, chunkSize); + if (actualChunkSize <= 0) { break; } - FullDataChunkMessage chunk = new FullDataChunkMessage(bufferId, dtoBuffer.slice(offset, Math.min(bytesLeft, chunkSize))); + FullDataChunkMessage chunk = new FullDataChunkMessage(bufferId, chunkNum == 0, dtoBuffer.slice(offset, actualChunkSize)); chunkMessageConsumer.accept(chunk); } } + + default T retain() + { + this.getDtoBuffer().retain(); + //noinspection unchecked + return (T)this; + } + + default void close() + { + this.getDtoBuffer().release(); + } }