Client-side chunk handling
This commit is contained in:
@@ -36,6 +36,7 @@ import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartial
|
||||
import com.seibel.distanthorizons.core.pos.DhBlockPos;
|
||||
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
|
||||
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.block.IBlockStateWrapper;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IProfilerWrapper;
|
||||
@@ -128,17 +129,20 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
|
||||
private void registerNetworkHandlers()
|
||||
{
|
||||
assert this.eventSource != null;
|
||||
assert this.networkState != null;
|
||||
|
||||
this.eventSource.registerHandler(FullDataPartialUpdateMessage.class, msg ->
|
||||
{
|
||||
try
|
||||
{
|
||||
FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(msg);
|
||||
|
||||
if (!msg.isSameLevelAs(this.levelWrapper))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
this.updateDataSourcesAsync(msg.dataSourceDto.createPooledDataSource(this.levelWrapper));
|
||||
this.updateDataSourcesAsync(dataSourceDto.createPooledDataSource(this.levelWrapper));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
||||
@@ -279,20 +279,22 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
|
||||
}
|
||||
CompletableFuture.runAsync(() ->
|
||||
{
|
||||
FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(requestGroup.fullDataSource);
|
||||
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
|
||||
try (FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(requestGroup.fullDataSource))
|
||||
{
|
||||
this.requestGroupsByFutureId.remove(msg.futureId);
|
||||
|
||||
ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer());
|
||||
if (serverPlayerState == null)
|
||||
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
|
||||
{
|
||||
continue;
|
||||
this.requestGroupsByFutureId.remove(msg.futureId);
|
||||
|
||||
ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer());
|
||||
if (serverPlayerState == null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release();
|
||||
response.splitIntoChunks(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage);
|
||||
msg.sendResponse(response.retain());
|
||||
}
|
||||
|
||||
serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release();
|
||||
response.splitIntoChunks(FULL_DATA_CHUNK_SIZE, msg.session::sendMessage);
|
||||
msg.sendResponse(response);
|
||||
}
|
||||
}, executor);
|
||||
}
|
||||
@@ -314,26 +316,28 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
|
||||
}
|
||||
CompletableFuture.runAsync(() ->
|
||||
{
|
||||
FullDataPartialUpdateMessage updateMessage = new FullDataPartialUpdateMessage(this.serverLevelWrapper, data);
|
||||
for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers())
|
||||
try (FullDataPartialUpdateMessage updateMessage = new FullDataPartialUpdateMessage(this.serverLevelWrapper, data))
|
||||
{
|
||||
if (serverPlayerState.serverPlayer().getLevel() != this.serverLevelWrapper)
|
||||
for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!serverPlayerState.config.isRealTimeUpdatesEnabled())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Vec3d playerPosition = serverPlayerState.serverPlayer().getPosition();
|
||||
int distanceFromPlayer = DhSectionPos.getManhattanBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16;
|
||||
if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() &&
|
||||
distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius())
|
||||
{
|
||||
updateMessage.splitIntoChunks(FULL_DATA_CHUNK_SIZE, serverPlayerState.session::sendMessage);
|
||||
serverPlayerState.session.sendMessage(updateMessage);
|
||||
if (serverPlayerState.serverPlayer().getLevel() != this.serverLevelWrapper)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!serverPlayerState.config.isRealTimeUpdatesEnabled())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Vec3d playerPosition = serverPlayerState.serverPlayer().getPosition();
|
||||
int distanceFromPlayer = DhSectionPos.getManhattanBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16;
|
||||
if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() &&
|
||||
distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius())
|
||||
{
|
||||
updateMessage.splitIntoChunks(FULL_DATA_CHUNK_SIZE, serverPlayerState.session::sendMessage);
|
||||
serverPlayerState.session.sendMessage(updateMessage.retain());
|
||||
}
|
||||
}
|
||||
}
|
||||
}, executor);
|
||||
|
||||
+4
-2
@@ -17,6 +17,7 @@ import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
|
||||
import com.seibel.distanthorizons.core.pos.DhSectionPos;
|
||||
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
|
||||
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import com.seibel.distanthorizons.core.util.LodUtil;
|
||||
import com.seibel.distanthorizons.core.util.ratelimiting.SupplierBasedRateLimiter;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
|
||||
@@ -189,9 +190,10 @@ public abstract class AbstractFullDataRequestQueue implements IDebugRenderable,
|
||||
throw throwable;
|
||||
}
|
||||
|
||||
if (response.dataSourceDto != null)
|
||||
if (response.dtoBufferId != null)
|
||||
{
|
||||
FullDataSourceV2 fullDataSource = response.dataSourceDto.createPooledDataSource(this.level.getLevelWrapper());
|
||||
FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(response);
|
||||
FullDataSourceV2 fullDataSource = dataSourceDto.createPooledDataSource(this.level.getLevelWrapper());
|
||||
entry.chunkDataConsumer.accept(fullDataSource);
|
||||
FullDataSourceV2.DATA_SOURCE_POOL.returnPooledDataSource(fullDataSource);
|
||||
}
|
||||
|
||||
+48
@@ -1,17 +1,27 @@
|
||||
package com.seibel.distanthorizons.core.multiplayer.client;
|
||||
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
|
||||
import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfig;
|
||||
import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfigChangeListener;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.network.event.ScopedNetworkEventSource;
|
||||
import com.seibel.distanthorizons.core.network.event.CloseEvent;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.RemotePlayerConfigMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataChunkMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.IFullDataPayloadMessage;
|
||||
import com.seibel.distanthorizons.core.network.session.Session;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ClientNetworkState implements Closeable
|
||||
{
|
||||
@@ -33,6 +43,12 @@ public class ClientNetworkState implements Closeable
|
||||
*/
|
||||
public Session getSession() { return this.session; }
|
||||
|
||||
private final ConcurrentMap<Integer, CompositeByteBuf> fullDataBuffers = CacheBuilder.newBuilder()
|
||||
.expireAfterAccess(10, TimeUnit.SECONDS)
|
||||
.<Integer, CompositeByteBuf>build()
|
||||
.asMap();
|
||||
|
||||
|
||||
/**
|
||||
* Constructs a new instance.
|
||||
*/
|
||||
@@ -51,6 +67,38 @@ public class ClientNetworkState implements Closeable
|
||||
{
|
||||
this.configReceived = false;
|
||||
});
|
||||
|
||||
this.session.registerHandler(FullDataChunkMessage.class, msg ->
|
||||
{
|
||||
if (msg.isFirst)
|
||||
{
|
||||
CompositeByteBuf composite = this.fullDataBuffers.remove(msg.bufferId);
|
||||
if (composite != null)
|
||||
{
|
||||
composite.release();
|
||||
LOGGER.debug("Released full data buffer {}: {}", msg.bufferId, composite);
|
||||
}
|
||||
}
|
||||
|
||||
CompositeByteBuf composite = this.fullDataBuffers.computeIfAbsent(msg.bufferId, bufferId -> ByteBufAllocator.DEFAULT.compositeBuffer());
|
||||
composite.addComponent(true, msg.buffer);
|
||||
LOGGER.debug("Full data buffer {}: {}", msg.bufferId, composite);
|
||||
});
|
||||
}
|
||||
|
||||
public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(IFullDataPayloadMessage<?> msg)
|
||||
{
|
||||
CompositeByteBuf composite = this.fullDataBuffers.remove(msg.getDtoBufferId());
|
||||
|
||||
try
|
||||
{
|
||||
Objects.requireNonNull(composite);
|
||||
return INetworkObject.decodeToInstance(new FullDataSourceV2DTO(), composite);
|
||||
}
|
||||
finally
|
||||
{
|
||||
composite.release();
|
||||
}
|
||||
}
|
||||
|
||||
public void sendConfigMessage()
|
||||
|
||||
+12
-4
@@ -27,13 +27,15 @@ public class FullDataChunkMessage extends NetworkMessage
|
||||
{
|
||||
public int bufferId;
|
||||
public ByteBuf buffer;
|
||||
public boolean isFirst;
|
||||
|
||||
|
||||
public FullDataChunkMessage() { }
|
||||
public FullDataChunkMessage(int bufferId, ByteBuf buffer)
|
||||
public FullDataChunkMessage(int bufferId, boolean isFirst, ByteBuf buffer)
|
||||
{
|
||||
this.bufferId = bufferId;
|
||||
this.buffer = buffer;
|
||||
this.isFirst = isFirst;
|
||||
}
|
||||
|
||||
|
||||
@@ -46,15 +48,20 @@ public class FullDataChunkMessage extends NetworkMessage
|
||||
out.writeInt(this.bufferId);
|
||||
|
||||
out.writeInt(this.buffer.writerIndex());
|
||||
this.buffer.resetReaderIndex();
|
||||
out.writeBytes(this.buffer);
|
||||
out.writeBytes(this.buffer.readerIndex(0));
|
||||
|
||||
out.writeBoolean(this.isFirst);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ByteBuf in)
|
||||
{
|
||||
this.bufferId = in.readInt();
|
||||
|
||||
int bufferSize = in.readInt();
|
||||
this.buffer = in.readBytes(bufferSize);
|
||||
|
||||
this.isFirst = in.readBoolean();
|
||||
}
|
||||
|
||||
|
||||
@@ -63,7 +70,8 @@ public class FullDataChunkMessage extends NetworkMessage
|
||||
{
|
||||
return super.toStringHelper()
|
||||
.add("bufferId", this.bufferId)
|
||||
.add("buffer", this.buffer);
|
||||
.add("buffer", this.buffer)
|
||||
.add("isFirst", this.isFirst);
|
||||
}
|
||||
|
||||
}
|
||||
+3
-4
@@ -29,7 +29,7 @@ import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class FullDataPartialUpdateMessage extends NetworkMessage implements ILevelRelatedMessage, IFullDataPayloadMessage
|
||||
public class FullDataPartialUpdateMessage extends NetworkMessage implements ILevelRelatedMessage, IFullDataPayloadMessage<FullDataPartialUpdateMessage>
|
||||
{
|
||||
private String levelName;
|
||||
@Override
|
||||
@@ -37,8 +37,8 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev
|
||||
|
||||
@Nullable
|
||||
public Integer dtoBufferId;
|
||||
@Override @Nullable
|
||||
public Integer getDtoBufferId() { return this.dtoBufferId; }
|
||||
@Override
|
||||
public int getDtoBufferId() { return Objects.requireNonNull(this.dtoBufferId); }
|
||||
@Override
|
||||
public void setDtoBufferId(int bufferId) { this.dtoBufferId = bufferId; }
|
||||
|
||||
@@ -56,7 +56,6 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev
|
||||
this.createCompressedDtoBuffer(fullDataSource);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean warnWhenUnhandled() { return false; }
|
||||
|
||||
|
||||
+5
-3
@@ -25,16 +25,18 @@ import com.seibel.distanthorizons.core.network.messages.TrackableMessage;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Response message, containing the requested full data source,
|
||||
* or nothing if requested in updates-only mode and the data was not updated.
|
||||
*/
|
||||
public class FullDataSourceResponseMessage extends TrackableMessage implements IFullDataPayloadMessage
|
||||
public class FullDataSourceResponseMessage extends TrackableMessage implements IFullDataPayloadMessage<FullDataSourceResponseMessage>
|
||||
{
|
||||
@Nullable
|
||||
public Integer dtoBufferId;
|
||||
@Override @Nullable
|
||||
public Integer getDtoBufferId() { return this.dtoBufferId; }
|
||||
@Override
|
||||
public int getDtoBufferId() { return Objects.requireNonNull(this.dtoBufferId); }
|
||||
@Override
|
||||
public void setDtoBufferId(int bufferId) { this.dtoBufferId = bufferId; }
|
||||
|
||||
|
||||
+22
-9
@@ -6,18 +6,19 @@ import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSour
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface IFullDataPayloadMessage
|
||||
|
||||
public interface IFullDataPayloadMessage<T extends IFullDataPayloadMessage<T>> extends Closeable
|
||||
{
|
||||
AtomicInteger lastBufferId = new AtomicInteger();
|
||||
@Nullable
|
||||
Integer getDtoBufferId();
|
||||
|
||||
int getDtoBufferId();
|
||||
void setDtoBufferId(int bufferId);
|
||||
|
||||
ByteBuf getDtoBuffer();
|
||||
@@ -48,22 +49,34 @@ public interface IFullDataPayloadMessage
|
||||
|
||||
default void splitIntoChunks(int chunkSize, Consumer<FullDataChunkMessage> chunkMessageConsumer)
|
||||
{
|
||||
int bufferId = this.getDtoBufferId();
|
||||
ByteBuf dtoBuffer = this.getDtoBuffer();
|
||||
int bufferId = Objects.requireNonNull(this.getDtoBufferId());
|
||||
|
||||
for (int chunkNum = 0; ; chunkNum++)
|
||||
{
|
||||
int offset = chunkNum * chunkSize;
|
||||
int bytesLeft = dtoBuffer.writerIndex() - offset;
|
||||
|
||||
if (offset >= dtoBuffer.writerIndex())
|
||||
|
||||
int actualChunkSize = Math.min(dtoBuffer.writerIndex() - offset, chunkSize);
|
||||
if (actualChunkSize <= 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
FullDataChunkMessage chunk = new FullDataChunkMessage(bufferId, dtoBuffer.slice(offset, Math.min(bytesLeft, chunkSize)));
|
||||
FullDataChunkMessage chunk = new FullDataChunkMessage(bufferId, chunkNum == 0, dtoBuffer.slice(offset, actualChunkSize));
|
||||
chunkMessageConsumer.accept(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
default T retain()
|
||||
{
|
||||
this.getDtoBuffer().retain();
|
||||
//noinspection unchecked
|
||||
return (T)this;
|
||||
}
|
||||
|
||||
default void close()
|
||||
{
|
||||
this.getDtoBuffer().release();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user