Server-side logic is pretty much finished
This commit is contained in:
@@ -57,6 +57,7 @@ import java.util.function.Consumer;
|
||||
public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
|
||||
{
|
||||
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
|
||||
public static final int FULL_DATA_CHUNK_SIZE = 32766;
|
||||
|
||||
public final ServerLevelModule serverside;
|
||||
private final IServerLevelWrapper serverLevelWrapper;
|
||||
@@ -164,7 +165,10 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
|
||||
this.serverside.fullDataFileHandler.getAsync(msg.sectionPos).thenAcceptAsync(fullDataSource ->
|
||||
{
|
||||
rateLimiterSet.loginDataSyncRCLimiter.release();
|
||||
msg.sendResponse(new FullDataSourceResponseMessage(fullDataSource));
|
||||
|
||||
FullDataSourceResponseMessage responseMessage = new FullDataSourceResponseMessage(fullDataSource);
|
||||
responseMessage.splitIntoChunks(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage);
|
||||
msg.sendResponse(responseMessage);
|
||||
}, executor);
|
||||
}
|
||||
}));
|
||||
@@ -287,6 +291,7 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
|
||||
}
|
||||
|
||||
serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release();
|
||||
response.splitIntoChunks(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage);
|
||||
msg.sendResponse(response);
|
||||
}
|
||||
}, executor);
|
||||
@@ -327,6 +332,7 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
|
||||
if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() &&
|
||||
distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius())
|
||||
{
|
||||
updateMessage.splitIntoChunks(FULL_DATA_CHUNK_SIZE, serverPlayerState.session::sendMessage);
|
||||
serverPlayerState.session.sendMessage(updateMessage);
|
||||
}
|
||||
}
|
||||
|
||||
+2
@@ -23,6 +23,7 @@ import com.google.common.collect.BiMap;
|
||||
import com.google.common.collect.HashBiMap;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.CurrentLevelKeyMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.RemotePlayerConfigMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataChunkMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.CloseReasonMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.requests.ExceptionMessage;
|
||||
@@ -64,6 +65,7 @@ public class MessageRegistry
|
||||
this.registerMessage(FullDataSourceRequestMessage.class, FullDataSourceRequestMessage::new);
|
||||
this.registerMessage(FullDataSourceResponseMessage.class, FullDataSourceResponseMessage::new);
|
||||
this.registerMessage(FullDataPartialUpdateMessage.class, FullDataPartialUpdateMessage::new);
|
||||
this.registerMessage(FullDataChunkMessage.class, FullDataChunkMessage::new);
|
||||
}
|
||||
|
||||
|
||||
|
||||
+10
-13
@@ -20,26 +20,20 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.NetworkMessage;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class FullDataChunkMessage extends NetworkMessage
|
||||
{
|
||||
public int bufferId;
|
||||
public ByteBuf buffer;
|
||||
|
||||
|
||||
public FullDataChunkMessage() { }
|
||||
public FullDataChunkMessage(ByteBuf buffer)
|
||||
public FullDataChunkMessage(int bufferId, ByteBuf buffer)
|
||||
{
|
||||
this.bufferId = bufferId;
|
||||
this.buffer = buffer;
|
||||
}
|
||||
|
||||
|
||||
@@ -49,6 +43,8 @@ public class FullDataChunkMessage extends NetworkMessage
|
||||
@Override
|
||||
public void encode(ByteBuf out)
|
||||
{
|
||||
out.writeInt(this.bufferId);
|
||||
|
||||
out.writeInt(this.buffer.writerIndex());
|
||||
this.buffer.resetReaderIndex();
|
||||
out.writeBytes(this.buffer);
|
||||
@@ -57,7 +53,8 @@ public class FullDataChunkMessage extends NetworkMessage
|
||||
@Override
|
||||
public void decode(ByteBuf in)
|
||||
{
|
||||
this.buffer = in.readBytes(in.readInt());
|
||||
int bufferSize = in.readInt();
|
||||
this.buffer = in.readBytes(bufferSize);
|
||||
}
|
||||
|
||||
|
||||
@@ -65,8 +62,8 @@ public class FullDataChunkMessage extends NetworkMessage
|
||||
public MoreObjects.ToStringHelper toStringHelper()
|
||||
{
|
||||
return super.toStringHelper()
|
||||
.add("levelName", this.levelName)
|
||||
.add("dataSourceDto", this.dataSourceDto);
|
||||
.add("bufferId", this.bufferId)
|
||||
.add("buffer", this.buffer);
|
||||
}
|
||||
|
||||
}
|
||||
+19
-20
@@ -20,17 +20,13 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
|
||||
import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.NetworkMessage;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class FullDataPartialUpdateMessage extends NetworkMessage implements ILevelRelatedMessage, IFullDataPayloadMessage
|
||||
@@ -39,24 +35,25 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev
|
||||
@Override
|
||||
public String getLevelName() { return this.levelName; }
|
||||
|
||||
public FullDataSourceV2DTO dataSourceDto;
|
||||
@Override public FullDataSourceV2DTO getDataSourceDto() { return Objects.requireNonNull(this.dataSourceDto); }
|
||||
@Nullable
|
||||
public Integer dtoBufferId;
|
||||
@Override @Nullable
|
||||
public Integer getDtoBufferId() { return this.dtoBufferId; }
|
||||
@Override
|
||||
public void setDtoBufferId(int bufferId) { this.dtoBufferId = bufferId; }
|
||||
|
||||
public ByteBuf dtoBuffer;
|
||||
@Override
|
||||
public ByteBuf getDtoBuffer() { return this.dtoBuffer; }
|
||||
@Override
|
||||
public void setDtoBuffer(ByteBuf buffer) { this.dtoBuffer = buffer; }
|
||||
|
||||
|
||||
public FullDataPartialUpdateMessage() { }
|
||||
public FullDataPartialUpdateMessage(IServerLevelWrapper level, FullDataSourceV2 fullDataSource)
|
||||
{
|
||||
this.levelName = level.getKeyedLevelDimensionName();
|
||||
|
||||
try
|
||||
{
|
||||
EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get();
|
||||
this.dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
this.createCompressedDtoBuffer(fullDataSource);
|
||||
}
|
||||
|
||||
|
||||
@@ -67,14 +64,15 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev
|
||||
public void encode(ByteBuf out)
|
||||
{
|
||||
this.writeString(this.levelName, out);
|
||||
// dataSourceDto must be sent separately
|
||||
out.writeInt(Objects.requireNonNull(this.dtoBufferId));
|
||||
this.dtoBuffer.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ByteBuf in)
|
||||
{
|
||||
this.levelName = this.readString(in);
|
||||
// dataSourceDto is received separately
|
||||
this.dtoBufferId = in.readInt();
|
||||
}
|
||||
|
||||
|
||||
@@ -83,7 +81,8 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev
|
||||
{
|
||||
return super.toStringHelper()
|
||||
.add("levelName", this.levelName)
|
||||
.add("dataSourceDto", this.dataSourceDto);
|
||||
.add("dtoBufferId", this.dtoBufferId)
|
||||
.add("dtoBuffer", this.dtoBuffer);
|
||||
}
|
||||
|
||||
}
|
||||
+20
-23
@@ -20,18 +20,11 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
|
||||
import com.seibel.distanthorizons.core.network.messages.TrackableMessage;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Response message, containing the requested full data source,
|
||||
* or nothing if requested in updates-only mode and the data was not updated.
|
||||
@@ -39,39 +32,42 @@ import java.util.Objects;
|
||||
public class FullDataSourceResponseMessage extends TrackableMessage implements IFullDataPayloadMessage
|
||||
{
|
||||
@Nullable
|
||||
public FullDataSourceV2DTO dataSourceDto;
|
||||
@Override public FullDataSourceV2DTO getDataSourceDto() { return Objects.requireNonNull(this.dataSourceDto); }
|
||||
public Integer dtoBufferId;
|
||||
@Override @Nullable
|
||||
public Integer getDtoBufferId() { return this.dtoBufferId; }
|
||||
@Override
|
||||
public void setDtoBufferId(int bufferId) { this.dtoBufferId = bufferId; }
|
||||
|
||||
public ByteBuf dtoBuffer;
|
||||
@Override
|
||||
public ByteBuf getDtoBuffer() { return this.dtoBuffer; }
|
||||
@Override
|
||||
public void setDtoBuffer(ByteBuf buffer) { this.dtoBuffer = buffer; }
|
||||
|
||||
|
||||
public FullDataSourceResponseMessage() { }
|
||||
public FullDataSourceResponseMessage(@Nullable FullDataSourceV2 fullDataSource)
|
||||
{
|
||||
try
|
||||
if (fullDataSource != null)
|
||||
{
|
||||
if (fullDataSource != null)
|
||||
{
|
||||
EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get();
|
||||
this.dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode);
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
this.createCompressedDtoBuffer(fullDataSource);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode0(ByteBuf out)
|
||||
{
|
||||
if (this.writeOptional(out, this.dataSourceDto))
|
||||
if (this.writeOptional(out, this.dtoBufferId))
|
||||
{
|
||||
this.dataSourceDto.encode(out);
|
||||
out.writeInt(this.dtoBufferId);
|
||||
this.dtoBuffer.release();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode0(ByteBuf in)
|
||||
{
|
||||
this.dataSourceDto = this.readOptional(in, () -> INetworkObject.decodeToInstance(new FullDataSourceV2DTO(), in));
|
||||
this.dtoBufferId = this.readOptional(in, in::readInt);
|
||||
}
|
||||
|
||||
|
||||
@@ -79,7 +75,8 @@ public class FullDataSourceResponseMessage extends TrackableMessage implements I
|
||||
public MoreObjects.ToStringHelper toStringHelper()
|
||||
{
|
||||
return super.toStringHelper()
|
||||
.add("dataSourceDto", this.dataSourceDto);
|
||||
.add("dtoBufferId", this.dtoBufferId)
|
||||
.add("dtoBuffer", this.dtoBuffer);
|
||||
}
|
||||
|
||||
}
|
||||
+49
-16
@@ -1,36 +1,69 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
|
||||
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface IFullDataPayloadMessage
|
||||
{
|
||||
FullDataSourceV2DTO getDataSourceDto();
|
||||
AtomicInteger lastBufferId = new AtomicInteger();
|
||||
@Nullable
|
||||
Integer getDtoBufferId();
|
||||
void setDtoBufferId(int bufferId);
|
||||
|
||||
default List<ByteBuf> getDataSourceDtoChunks(int chunkSize)
|
||||
ByteBuf getDtoBuffer();
|
||||
void setDtoBuffer(ByteBuf buffer);
|
||||
|
||||
|
||||
default void createCompressedDtoBuffer(FullDataSourceV2 fullDataSource)
|
||||
{
|
||||
FullDataSourceV2DTO dto = this.getDataSourceDto();
|
||||
int chunkCount = dto.estimatedEncodedSize() / chunkSize;
|
||||
Objects.requireNonNull(fullDataSource);
|
||||
|
||||
CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer();
|
||||
int bufferId = lastBufferId.getAndIncrement();
|
||||
this.setDtoBufferId(bufferId);
|
||||
|
||||
ArrayList<ByteBuf> result = new ArrayList<>();
|
||||
for (int i = 0; i < chunkCount; i++)
|
||||
try
|
||||
{
|
||||
EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get();
|
||||
FullDataSourceV2DTO dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode);
|
||||
|
||||
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
|
||||
result.add(buffer);
|
||||
composite.addComponent(buffer);
|
||||
dataSourceDto.encode(buffer);
|
||||
this.setDtoBuffer(buffer);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
default void splitIntoChunks(int chunkSize, Consumer<FullDataChunkMessage> chunkMessageConsumer)
|
||||
{
|
||||
ByteBuf dtoBuffer = this.getDtoBuffer();
|
||||
int bufferId = Objects.requireNonNull(this.getDtoBufferId());
|
||||
|
||||
dto.encode(composite);
|
||||
|
||||
composite.release();
|
||||
return result;
|
||||
for (int chunkNum = 0; ; chunkNum++)
|
||||
{
|
||||
int offset = chunkNum * chunkSize;
|
||||
int bytesLeft = dtoBuffer.writerIndex() - offset;
|
||||
|
||||
if (offset >= dtoBuffer.writerIndex())
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
FullDataChunkMessage chunk = new FullDataChunkMessage(bufferId, dtoBuffer.slice(offset, Math.min(bytesLeft, chunkSize)));
|
||||
chunkMessageConsumer.accept(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user