Use FullDataPayload instead of reusing messages
This commit is contained in:
@@ -139,7 +139,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
|
||||
{
|
||||
try
|
||||
{
|
||||
FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(msg);
|
||||
FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(msg.payload);
|
||||
|
||||
if (!msg.isSameLevelAs(this.levelWrapper))
|
||||
{
|
||||
|
||||
@@ -28,6 +28,7 @@ import com.seibel.distanthorizons.core.multiplayer.server.RemotePlayerConnection
|
||||
import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException;
|
||||
import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException;
|
||||
import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPayload;
|
||||
import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage;
|
||||
@@ -173,9 +174,9 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
|
||||
{
|
||||
rateLimiterSet.loginDataSyncRCLimiter.release();
|
||||
|
||||
FullDataSourceResponseMessage responseMessage = new FullDataSourceResponseMessage(fullDataSource);
|
||||
responseMessage.splitIntoChunks(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage);
|
||||
msg.sendResponse(responseMessage);
|
||||
FullDataPayload payload = new FullDataPayload(fullDataSource);
|
||||
payload.acceptInChunkMessages(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage);
|
||||
msg.sendResponse(new FullDataSourceResponseMessage(payload));
|
||||
}, executor);
|
||||
}
|
||||
}));
|
||||
@@ -289,7 +290,7 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
|
||||
}
|
||||
CompletableFuture.runAsync(() ->
|
||||
{
|
||||
try (FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(requestGroup.fullDataSource))
|
||||
try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource))
|
||||
{
|
||||
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
|
||||
{
|
||||
@@ -302,8 +303,8 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
|
||||
}
|
||||
|
||||
serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release();
|
||||
response.splitIntoChunks(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage);
|
||||
msg.sendResponse(response.retain());
|
||||
payload.acceptInChunkMessages(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage);
|
||||
msg.sendResponse(new FullDataSourceResponseMessage(payload.retain()));
|
||||
}
|
||||
}
|
||||
}, executor);
|
||||
@@ -326,7 +327,7 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
|
||||
}
|
||||
CompletableFuture.runAsync(() ->
|
||||
{
|
||||
try (FullDataPartialUpdateMessage updateMessage = new FullDataPartialUpdateMessage(this.serverLevelWrapper, data))
|
||||
try (FullDataPayload payload = new FullDataPayload(data))
|
||||
{
|
||||
for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers())
|
||||
{
|
||||
@@ -345,8 +346,8 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
|
||||
if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() &&
|
||||
distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius())
|
||||
{
|
||||
updateMessage.splitIntoChunks(FULL_DATA_CHUNK_SIZE, serverPlayerState.session::sendMessage);
|
||||
serverPlayerState.session.sendMessage(updateMessage.retain());
|
||||
payload.acceptInChunkMessages(FULL_DATA_CHUNK_SIZE, serverPlayerState.session::sendMessage);
|
||||
serverPlayerState.session.sendMessage(new FullDataPartialUpdateMessage(this.serverLevelWrapper, payload.retain()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+2
-2
@@ -190,9 +190,9 @@ public abstract class AbstractFullDataRequestQueue implements IDebugRenderable,
|
||||
throw throwable;
|
||||
}
|
||||
|
||||
if (response.dtoBufferId != null)
|
||||
if (response.payload != null)
|
||||
{
|
||||
FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(response);
|
||||
FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(response.payload);
|
||||
FullDataSourceV2 fullDataSource = dataSourceDto.createPooledDataSource(this.level.getLevelWrapper());
|
||||
entry.chunkDataConsumer.accept(fullDataSource);
|
||||
FullDataSourceV2.DATA_SOURCE_POOL.returnPooledDataSource(fullDataSource);
|
||||
|
||||
+3
-3
@@ -10,7 +10,7 @@ import com.seibel.distanthorizons.core.network.event.ScopedNetworkEventSource;
|
||||
import com.seibel.distanthorizons.core.network.event.CloseEvent;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.RemotePlayerConfigMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataChunkMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.IFullDataPayloadMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPayload;
|
||||
import com.seibel.distanthorizons.core.network.session.Session;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
@@ -86,9 +86,9 @@ public class ClientNetworkState implements Closeable
|
||||
});
|
||||
}
|
||||
|
||||
public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(IFullDataPayloadMessage<?> msg)
|
||||
public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(FullDataPayload msg)
|
||||
{
|
||||
CompositeByteBuf composite = this.fullDataBuffers.remove(msg.getDtoBufferId());
|
||||
CompositeByteBuf composite = this.fullDataBuffers.remove(msg.dtoBufferId);
|
||||
Objects.requireNonNull(composite);
|
||||
|
||||
try
|
||||
|
||||
+8
-24
@@ -20,40 +20,26 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.NetworkMessage;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class FullDataPartialUpdateMessage extends NetworkMessage implements ILevelRelatedMessage, IFullDataPayloadMessage<FullDataPartialUpdateMessage>
|
||||
public class FullDataPartialUpdateMessage extends NetworkMessage implements ILevelRelatedMessage
|
||||
{
|
||||
private String levelName;
|
||||
@Override
|
||||
public String getLevelName() { return this.levelName; }
|
||||
|
||||
@Nullable
|
||||
public Integer dtoBufferId;
|
||||
@Override
|
||||
public int getDtoBufferId() { return Objects.requireNonNull(this.dtoBufferId); }
|
||||
@Override
|
||||
public void setDtoBufferId(int bufferId) { this.dtoBufferId = bufferId; }
|
||||
|
||||
public ByteBuf dtoBuffer;
|
||||
@Override
|
||||
public ByteBuf getDtoBuffer() { return this.dtoBuffer; }
|
||||
@Override
|
||||
public void setDtoBuffer(ByteBuf buffer) { this.dtoBuffer = buffer; }
|
||||
public FullDataPayload payload;
|
||||
|
||||
|
||||
public FullDataPartialUpdateMessage() { }
|
||||
public FullDataPartialUpdateMessage(IServerLevelWrapper level, FullDataSourceV2 fullDataSource)
|
||||
public FullDataPartialUpdateMessage(IServerLevelWrapper level, FullDataPayload payload)
|
||||
{
|
||||
this.levelName = level.getKeyedLevelDimensionName();
|
||||
this.createCompressedDtoBuffer(fullDataSource);
|
||||
this.payload = payload;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -63,15 +49,14 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev
|
||||
public void encode(ByteBuf out)
|
||||
{
|
||||
this.writeString(this.levelName, out);
|
||||
out.writeInt(Objects.requireNonNull(this.dtoBufferId));
|
||||
this.dtoBuffer.release();
|
||||
this.payload.encode(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ByteBuf in)
|
||||
{
|
||||
this.levelName = this.readString(in);
|
||||
this.dtoBufferId = in.readInt();
|
||||
this.payload = INetworkObject.decodeToInstance(new FullDataPayload(), in);
|
||||
}
|
||||
|
||||
|
||||
@@ -80,8 +65,7 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev
|
||||
{
|
||||
return super.toStringHelper()
|
||||
.add("levelName", this.levelName)
|
||||
.add("dtoBufferId", this.dtoBufferId)
|
||||
.add("dtoBuffer", this.dtoBuffer);
|
||||
.add("payload", this.payload);
|
||||
}
|
||||
|
||||
}
|
||||
+103
@@ -0,0 +1,103 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
|
||||
public class FullDataPayload implements INetworkObject, Closeable
|
||||
{
|
||||
private static final AtomicInteger lastBufferId = new AtomicInteger();
|
||||
|
||||
public int dtoBufferId;
|
||||
public ByteBuf dtoBuffer;
|
||||
|
||||
|
||||
public FullDataPayload() { }
|
||||
public FullDataPayload(@NotNull FullDataSourceV2 fullDataSource)
|
||||
{
|
||||
Objects.requireNonNull(fullDataSource);
|
||||
|
||||
this.dtoBufferId = lastBufferId.getAndIncrement();
|
||||
|
||||
try
|
||||
{
|
||||
EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get();
|
||||
FullDataSourceV2DTO dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode);
|
||||
|
||||
this.dtoBuffer = ByteBufAllocator.DEFAULT.buffer();
|
||||
dataSourceDto.encode(this.dtoBuffer);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void acceptInChunkMessages(int chunkSize, Consumer<FullDataChunkMessage> chunkMessageConsumer)
|
||||
{
|
||||
for (int chunkNum = 0; ; chunkNum++)
|
||||
{
|
||||
int offset = chunkNum * chunkSize;
|
||||
|
||||
int actualChunkSize = Math.min(this.dtoBuffer.writerIndex() - offset, chunkSize);
|
||||
if (actualChunkSize <= 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
FullDataChunkMessage chunk = new FullDataChunkMessage(this.dtoBufferId, chunkNum == 0, this.dtoBuffer.slice(offset, actualChunkSize));
|
||||
chunkMessageConsumer.accept(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public FullDataPayload retain()
|
||||
{
|
||||
this.dtoBuffer.retain();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override public void close()
|
||||
{
|
||||
this.dtoBuffer.release();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void encode(ByteBuf out)
|
||||
{
|
||||
out.writeInt(this.dtoBufferId);
|
||||
this.dtoBuffer.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ByteBuf in)
|
||||
{
|
||||
this.dtoBufferId = in.readInt();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("dtoBufferId", this.dtoBufferId)
|
||||
.add("dtoBuffer", this.dtoBuffer)
|
||||
.toString();
|
||||
}
|
||||
|
||||
}
|
||||
+10
-24
@@ -20,56 +20,43 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.network.messages.TrackableMessage;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Response message, containing the requested full data source,
|
||||
* or nothing if requested in updates-only mode and the data was not updated.
|
||||
*/
|
||||
public class FullDataSourceResponseMessage extends TrackableMessage implements IFullDataPayloadMessage<FullDataSourceResponseMessage>
|
||||
public class FullDataSourceResponseMessage extends TrackableMessage
|
||||
{
|
||||
@Nullable
|
||||
public Integer dtoBufferId;
|
||||
@Override
|
||||
public int getDtoBufferId() { return Objects.requireNonNull(this.dtoBufferId); }
|
||||
@Override
|
||||
public void setDtoBufferId(int bufferId) { this.dtoBufferId = bufferId; }
|
||||
|
||||
public ByteBuf dtoBuffer;
|
||||
@Override
|
||||
public ByteBuf getDtoBuffer() { return this.dtoBuffer; }
|
||||
@Override
|
||||
public void setDtoBuffer(ByteBuf buffer) { this.dtoBuffer = buffer; }
|
||||
public FullDataPayload payload;
|
||||
|
||||
|
||||
public FullDataSourceResponseMessage() { }
|
||||
public FullDataSourceResponseMessage(@Nullable FullDataSourceV2 fullDataSource)
|
||||
public FullDataSourceResponseMessage(@Nullable FullDataPayload payload)
|
||||
{
|
||||
if (fullDataSource != null)
|
||||
if (payload != null)
|
||||
{
|
||||
this.createCompressedDtoBuffer(fullDataSource);
|
||||
this.payload = payload;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode0(ByteBuf out)
|
||||
{
|
||||
if (this.writeOptional(out, this.dtoBufferId))
|
||||
if (this.writeOptional(out, this.payload))
|
||||
{
|
||||
out.writeInt(this.dtoBufferId);
|
||||
this.dtoBuffer.release();
|
||||
this.payload.encode(out);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode0(ByteBuf in)
|
||||
{
|
||||
this.dtoBufferId = this.readOptional(in, in::readInt);
|
||||
this.payload = this.readOptional(in, () -> INetworkObject.decodeToInstance(new FullDataPayload(), in));
|
||||
}
|
||||
|
||||
|
||||
@@ -77,8 +64,7 @@ public class FullDataSourceResponseMessage extends TrackableMessage implements I
|
||||
public MoreObjects.ToStringHelper toStringHelper()
|
||||
{
|
||||
return super.toStringHelper()
|
||||
.add("dtoBufferId", this.dtoBufferId)
|
||||
.add("dtoBuffer", this.dtoBuffer);
|
||||
.add("payload", this.payload);
|
||||
}
|
||||
|
||||
}
|
||||
-82
@@ -1,82 +0,0 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
|
||||
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
|
||||
public interface IFullDataPayloadMessage<T extends IFullDataPayloadMessage<T>> extends Closeable
|
||||
{
|
||||
AtomicInteger lastBufferId = new AtomicInteger();
|
||||
|
||||
int getDtoBufferId();
|
||||
void setDtoBufferId(int bufferId);
|
||||
|
||||
ByteBuf getDtoBuffer();
|
||||
void setDtoBuffer(ByteBuf buffer);
|
||||
|
||||
|
||||
default void createCompressedDtoBuffer(FullDataSourceV2 fullDataSource)
|
||||
{
|
||||
Objects.requireNonNull(fullDataSource);
|
||||
|
||||
int bufferId = lastBufferId.getAndIncrement();
|
||||
this.setDtoBufferId(bufferId);
|
||||
|
||||
try
|
||||
{
|
||||
EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get();
|
||||
FullDataSourceV2DTO dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode);
|
||||
|
||||
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
|
||||
dataSourceDto.encode(buffer);
|
||||
this.setDtoBuffer(buffer);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
default void splitIntoChunks(int chunkSize, Consumer<FullDataChunkMessage> chunkMessageConsumer)
|
||||
{
|
||||
int bufferId = this.getDtoBufferId();
|
||||
ByteBuf dtoBuffer = this.getDtoBuffer();
|
||||
|
||||
for (int chunkNum = 0; ; chunkNum++)
|
||||
{
|
||||
int offset = chunkNum * chunkSize;
|
||||
|
||||
int actualChunkSize = Math.min(dtoBuffer.writerIndex() - offset, chunkSize);
|
||||
if (actualChunkSize <= 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
FullDataChunkMessage chunk = new FullDataChunkMessage(bufferId, chunkNum == 0, dtoBuffer.slice(offset, actualChunkSize));
|
||||
chunkMessageConsumer.accept(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
default T retain()
|
||||
{
|
||||
this.getDtoBuffer().retain();
|
||||
//noinspection unchecked
|
||||
return (T)this;
|
||||
}
|
||||
|
||||
default void close()
|
||||
{
|
||||
this.getDtoBuffer().release();
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user