Add LOD transfer speed setting

This commit is contained in:
s809
2024-10-18 01:08:12 +05:00
parent 59767c807b
commit bea1ff34b4
15 changed files with 302 additions and 136 deletions
@@ -1564,6 +1564,14 @@ public class Config
+ "")
.build();
public static ConfigEntry<Integer> maxDataTransferSpeed = new ConfigEntry.Builder<Integer>()
.setServersideShortName("maxDataTransferSpeed")
.setMinDefaultMax(1, 500, 1000000 /* 1 GB/s */)
.comment(""
+ "Maximum speed for uploading LODs to the clients, in KB/s."
+ "")
.build();
}
@@ -14,7 +14,7 @@ import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage;
import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage;
import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartialUpdateMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPayload;
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage;
@@ -128,20 +128,23 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
CompletableFuture.runAsync(() ->
{
Objects.requireNonNull(this.beaconBeamRepo);
FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(entry.getKey()));
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(entry.getKey())))
{
this.requestGroupByFutureId.remove(msg.futureId);
ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer());
if (serverPlayerState == null)
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
{
continue;
this.requestGroupByFutureId.remove(msg.futureId);
ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer());
if (serverPlayerState == null)
{
continue;
}
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> {
msg.sendResponse(new FullDataSourceResponseMessage(payload));
serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release();
});
}
serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release();
payload.splitAndSend(FULL_DATA_SPLIT_SIZE_IN_BYTES, msg.getSession()::sendMessage);
msg.sendResponse(new FullDataSourceResponseMessage(payload));
}
}, executor);
}
@@ -273,12 +276,14 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
this.serverside.fullDataFileHandler.getAsync(message.sectionPos).thenAcceptAsync(fullDataSource ->
{
rateLimiterSet.syncOnLoginRateLimiter.release();
Objects.requireNonNull(this.beaconBeamRepo);
FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos));
payload.splitAndSend(FULL_DATA_SPLIT_SIZE_IN_BYTES, message.getSession()::sendMessage);
message.sendResponse(new FullDataSourceResponseMessage(payload));
try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos)))
{
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> {
message.sendResponse(new FullDataSourceResponseMessage(payload));
rateLimiterSet.syncOnLoginRateLimiter.release();
});
}
}, executor);
}
private void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
@@ -371,6 +376,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
DataSourceRequestGroup requestGroup = this.requestGroupByPos.get(pos);
if (requestGroup != null)
{
requestGroup.worldGenTaskComplete = true;
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
}
}
@@ -379,7 +385,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
{
this.serverside.fullDataFileHandler.getAsync(pos).thenAccept(fullDataSource ->
{
if (this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps))
if (requestGroup.worldGenTaskComplete || this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps))
{
requestGroup.fullDataSource = fullDataSource;
}
@@ -416,26 +422,30 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
CompletableFuture.runAsync(() ->
{
Objects.requireNonNull(this.beaconBeamRepo);
FullDataPayload payload = new FullDataPayload(data, this.beaconBeamRepo.getAllBeamsForPos(data.getPos()));
for (ServerPlayerState serverPlayerState : this.serverPlayerStateManager.getReadyPlayers())
try (FullDataPayload payload = new FullDataPayload(data, this.beaconBeamRepo.getAllBeamsForPos(data.getPos())))
{
if (serverPlayerState.getServerPlayer().getLevel() != this.serverLevelWrapper)
for (ServerPlayerState serverPlayerState : this.serverPlayerStateManager.getReadyPlayers())
{
continue;
}
if (!serverPlayerState.sessionConfig.isRealTimeUpdatesEnabled())
{
continue;
}
Vec3d playerPosition = serverPlayerState.getServerPlayer().getPosition();
int distanceFromPlayer = DhSectionPos.getChebyshevBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16;
if (distanceFromPlayer >= serverPlayerState.getServerPlayer().getViewDistance()
&& distanceFromPlayer <= serverPlayerState.sessionConfig.getMaxUpdateDistanceRadius())
{
payload.splitAndSend(FULL_DATA_SPLIT_SIZE_IN_BYTES, serverPlayerState.networkSession::sendMessage);
serverPlayerState.networkSession.sendMessage(new FullDataPartialUpdateMessage(this.serverLevelWrapper, payload));
if (serverPlayerState.getServerPlayer().getLevel() != this.serverLevelWrapper)
{
continue;
}
if (!serverPlayerState.sessionConfig.isRealTimeUpdatesEnabled())
{
continue;
}
Vec3d playerPosition = serverPlayerState.getServerPlayer().getPosition();
int distanceFromPlayer = DhSectionPos.getChebyshevBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16;
if (distanceFromPlayer >= serverPlayerState.getServerPlayer().getViewDistance()
&& distanceFromPlayer <= serverPlayerState.sessionConfig.getMaxUpdateDistanceRadius())
{
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () ->
{
serverPlayerState.networkSession.sendMessage(new FullDataPartialUpdateMessage(this.serverLevelWrapper, payload));
});
}
}
}
}, executor);
@@ -492,8 +502,11 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
{
public final ConcurrentMap<Long, FullDataSourceRequestMessage> requestMessages = new ConcurrentHashMap<>();
/** If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps */
boolean worldGenTaskComplete = false;
@CheckForNull
public FullDataSourceV2 fullDataSource;
public FullDataSourceV2 fullDataSource = null;
/**
* These two Semaphores are used to prevent all threads from locking on the group after it being fulfilled,
@@ -137,7 +137,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
try
{
FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(message.payload);
FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSourceAndReleaseBuffer(message.payload);
if (!message.isSameLevelAs(this.levelWrapper))
{
@@ -46,8 +46,6 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
private static final Timer TASK_FINISH_TIMER = TimerUtil.CreateTimer("RequestTaskFinishTimer");
private static final int MAX_RETRY_ATTEMPTS = 3;
protected static final long SHUTDOWN_TIMEOUT_SECONDS = 5;
@@ -193,7 +191,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
if (response.payload != null)
{
FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(response.payload);
FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSourceAndReleaseBuffer(response.payload);
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
@@ -258,17 +256,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
}
}
// Hack to work around a race condition
// If you finish the request too quickly, the section will never render
TASK_FINISH_TIMER.schedule(new TimerTask()
{
@Override
public void run()
{
entry.future.complete(true);
}
}, 10000);
return null;
return entry.future.complete(true);
});
}
@@ -1,11 +1,10 @@
package com.seibel.distanthorizons.core.multiplayer.client;
import com.google.common.cache.CacheBuilder;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.multiplayer.config.SessionConfig;
import com.seibel.distanthorizons.core.network.INetworkObject;
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayloadReceiver;
import com.seibel.distanthorizons.core.network.event.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.event.internal.CloseInternalEvent;
import com.seibel.distanthorizons.core.network.event.internal.IncompatibleMessageInternalEvent;
@@ -14,21 +13,14 @@ import com.seibel.distanthorizons.core.network.messages.base.SessionConfigMessag
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartialUpdateMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPayload;
import com.seibel.distanthorizons.core.network.session.NetworkSession;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import org.apache.logging.log4j.LogManager;
import org.jetbrains.annotations.Nullable;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
public class ClientNetworkState implements Closeable
{
@@ -38,10 +30,7 @@ public class ClientNetworkState implements Closeable
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
private final ConcurrentMap<Integer, CompositeByteBuf> fullDataBufferById = CacheBuilder.newBuilder()
.expireAfterAccess(10, TimeUnit.SECONDS)
.<Integer, CompositeByteBuf>build()
.asMap();
public final FullDataPayloadReceiver fullDataPayloadReceiver = new FullDataPayloadReceiver();
private final SessionConfig.AnyChangeListener configAnyChangeListener = new SessionConfig.AnyChangeListener(this::sendConfigMessage);
@@ -120,22 +109,7 @@ public class ClientNetworkState implements Closeable
this.configReceived = true;
});
this.networkSession.registerHandler(FullDataSplitMessage.class, message ->
{
if (message.isFirst)
{
CompositeByteBuf composite = this.fullDataBufferById.remove(message.bufferId);
if (composite != null)
{
composite.release();
LOGGER.debug("Released full data buffer [" + message.bufferId + "]: [" + composite + "]");
}
}
CompositeByteBuf byteBuffer = this.fullDataBufferById.computeIfAbsent(message.bufferId, bufferId -> ByteBufAllocator.DEFAULT.compositeBuffer());
byteBuffer.addComponent(true, message.buffer);
LOGGER.debug("Full data buffer [" + message.bufferId + "]: [" + byteBuffer + "].");
});
this.networkSession.registerHandler(FullDataSplitMessage.class, this.fullDataPayloadReceiver::receiveChunk);
}
}
@@ -145,20 +119,7 @@ public class ClientNetworkState implements Closeable
// send message //
//==============//
public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(FullDataPayload msg)
{
CompositeByteBuf compositeByteBuffer = this.fullDataBufferById.remove(msg.dtoBufferId);
LodUtil.assertTrue(compositeByteBuffer != null);
try
{
return INetworkObject.decodeToInstance(FullDataSourceV2DTO.CreateEmptyDataSource(), compositeByteBuffer);
}
finally
{
compositeByteBuffer.release();
}
}
public void sendConfigMessage()
{
@@ -198,6 +159,7 @@ public class ClientNetworkState implements Closeable
@Override
public void close()
{
this.fullDataPayloadReceiver.close();
this.configAnyChangeListener.close();
this.networkSession.close();
}
@@ -40,6 +40,8 @@ public class SessionConfig implements INetworkObject
registerConfigEntry(Config.Server.synchronizeOnLoad, (x, y) -> x && y);
registerConfigEntry(Config.Server.syncOnLoadRateLimit, Math::min);
registerConfigEntry(Config.Server.maxDataTransferSpeed, Math::min);
}
public SessionConfig() {}
@@ -56,6 +58,7 @@ public class SessionConfig implements INetworkObject
public boolean isRealTimeUpdatesEnabled() { return this.getValue(Config.Server.enableRealTimeUpdates); }
public boolean getSynchronizeOnLoad() { return this.getValue(Config.Server.synchronizeOnLoad); }
public int getSyncOnLoginRateLimit() { return this.getValue(Config.Server.syncOnLoadRateLimit); }
public int getMaxDataTransferSpeed() { return this.getValue(Config.Server.maxDataTransferSpeed); }
@@ -1,13 +1,13 @@
package com.seibel.distanthorizons.core.network.messages.fullData;
package com.seibel.distanthorizons.core.multiplayer.fullData;
import com.google.common.base.MoreObjects;
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.network.INetworkObject;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage;
import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.util.TimerUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.jetbrains.annotations.NotNull;
@@ -20,13 +20,10 @@ import java.util.function.Consumer;
/**
* @see FullDataSplitMessage
*/
public class FullDataPayload implements INetworkObject
public class FullDataPayload implements INetworkObject, AutoCloseable
{
private static final AtomicInteger lastBufferId = new AtomicInteger();
// Reference counting is unreliable here for some reason so this is a "fix"
private static final Timer bufferCleanupTimer = TimerUtil.CreateTimer("FullDataBufferCleanupTimer");
public int dtoBufferId;
public ByteBuf dtoBuffer;
@@ -52,15 +49,6 @@ public class FullDataPayload implements INetworkObject
this.dtoBuffer = ByteBufAllocator.DEFAULT.buffer();
dataSourceDto.encode(this.dtoBuffer);
bufferCleanupTimer.schedule(new TimerTask()
{
@Override
public void run()
{
FullDataPayload.this.dtoBuffer.release();
}
}, 5000L);
}
catch (IOException e)
{
@@ -90,36 +78,18 @@ public class FullDataPayload implements INetworkObject
this.beaconBeams = this.readCollection(in, new ArrayList<>(), () -> new BeaconBeamDTO(null, null));
}
/**
* Used to send {@link FullDataPayload}'s since the data they contain may be larger
* than what a single packet could contain.
*
* @param payloadChunkSizeInBytes how many bytes can be sent in a single message
*/
public void splitAndSend(int payloadChunkSizeInBytes, Consumer<FullDataSplitMessage> sendMessageConsumer)
{
// chunk in this context means chunk of data, not a MC chunk
for (int payloadChunkNum = 0; ; payloadChunkNum++)
{
int offset = payloadChunkNum * payloadChunkSizeInBytes;
int actualChunkSize = Math.min(this.dtoBuffer.writerIndex() - offset, payloadChunkSizeInBytes);
if (actualChunkSize <= 0)
{
break;
}
FullDataSplitMessage chunk = new FullDataSplitMessage(this.dtoBufferId, payloadChunkNum == 0, this.dtoBuffer.slice(offset, actualChunkSize));
sendMessageConsumer.accept(chunk);
}
}
//================//
// base overrides //
//================//
@Override
public void close()
{
this.dtoBuffer.release();
}
@Override
public String toString()
{
@@ -0,0 +1,80 @@
package com.seibel.distanthorizons.core.multiplayer.fullData;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalNotification;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.network.INetworkObject;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.util.LodUtil;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import org.apache.logging.log4j.LogManager;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
public class FullDataPayloadReceiver implements AutoCloseable
{
private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Common.Logging.logNetworkEvent.get());
private final ConcurrentMap<Integer, CompositeByteBuf> buffersById = CacheBuilder.newBuilder()
.expireAfterAccess(10, TimeUnit.SECONDS)
.removalListener((RemovalNotification<Integer, CompositeByteBuf> notification) -> Objects.requireNonNull(notification.getValue()).release())
.build().asMap();
@Override
public void close()
{
this.buffersById.clear();
}
public void receiveChunk(FullDataSplitMessage message)
{
this.buffersById.compute(message.bufferId, (bufferId, composite) ->
{
if (message.isFirst)
{
if (composite != null)
{
composite.release();
LOGGER.debug("Released existing full data buffer [" + message.bufferId + "]");
}
composite = ByteBufAllocator.DEFAULT.compositeBuffer();
LOGGER.debug("Created new full data buffer [" + message.bufferId + "]: [" + composite + "]");
}
else if (composite == null)
{
LOGGER.debug("Received non-first full data chunk for empty buffer [" + message.bufferId + "]: [" + message.buffer + "].");
return null;
}
composite.addComponent(true, message.buffer);
LOGGER.debug("Updated full data buffer [" + message.bufferId + "]: [" + composite + "].");
return composite;
});
}
public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(FullDataPayload msg)
{
CompositeByteBuf compositeByteBuffer = this.buffersById.get(msg.dtoBufferId);
LodUtil.assertTrue(compositeByteBuffer != null);
try
{
return INetworkObject.decodeToInstance(FullDataSourceV2DTO.CreateEmptyDataSource(), compositeByteBuffer);
}
finally
{
// Releasing the buffer is handled by cache
this.buffersById.remove(msg.dtoBufferId);
}
}
}
@@ -0,0 +1,104 @@
package com.seibel.distanthorizons.core.multiplayer.fullData;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage;
import com.seibel.distanthorizons.core.network.session.NetworkSession;
import com.seibel.distanthorizons.core.util.TimerUtil;
import io.netty.buffer.ByteBuf;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.*;
public class FullDataPayloadSender implements AutoCloseable
{
private static final int TICK_RATE = 4;
private static final Timer UPLOAD_TIMER = TimerUtil.CreateTimer("FullDataPayloadSender");
private final TimerTask tickTimerTask = TimerUtil.createTimerTask(this::tick);
private final NetworkSession session;
private final IntSupplier maxKBpsSupplier;
private final ConcurrentLinkedQueue<PendingTransfer> transferQueue = new ConcurrentLinkedQueue<>();
public FullDataPayloadSender(NetworkSession session, IntSupplier maxKBpsSupplier)
{
this.session = session;
this.maxKBpsSupplier = maxKBpsSupplier;
UPLOAD_TIMER.scheduleAtFixedRate(this.tickTimerTask, 0, 1000 / TICK_RATE);
}
@Override
public void close()
{
this.tickTimerTask.cancel();
PendingTransfer pendingTransfer;
while ((pendingTransfer = this.transferQueue.poll()) != null)
{
pendingTransfer.close();
}
}
public void sendInChunks(FullDataPayload payload, Runnable sendFinalMessage)
{
this.transferQueue.add(new PendingTransfer(payload, sendFinalMessage));
}
private void tick()
{
int bytesToSend = (this.maxKBpsSupplier.getAsInt() * 1000) / TICK_RATE;
while (bytesToSend > 0)
{
PendingTransfer pendingTransfer = this.transferQueue.peek();
if (pendingTransfer == null)
{
return;
}
int chunkSize = Math.min(bytesToSend, pendingTransfer.buffer.readableBytes());
boolean isFirstChunk = pendingTransfer.buffer.readerIndex() == 0;
FullDataSplitMessage chunkMessage = new FullDataSplitMessage(pendingTransfer.bufferId, pendingTransfer.buffer.readRetainedSlice(chunkSize), isFirstChunk);
this.session.sendMessage(chunkMessage);
bytesToSend -= chunkSize;
if (pendingTransfer.buffer.readableBytes() == 0)
{
pendingTransfer.sendFinalMessage.run();
pendingTransfer.close();
this.transferQueue.poll();
}
}
}
private static class PendingTransfer implements AutoCloseable
{
public final int bufferId;
public final ByteBuf buffer;
public final Runnable sendFinalMessage;
private final AtomicBoolean isClosed = new AtomicBoolean();
private PendingTransfer(FullDataPayload payload, Runnable sendFinalMessage)
{
this.bufferId = payload.dtoBufferId;
this.buffer = payload.dtoBuffer.retainedDuplicate().readerIndex(0);
this.sendFinalMessage = sendFinalMessage;
}
@Override
public void close()
{
if (this.isClosed.compareAndSet(false, true))
{
this.buffer.release();
}
}
}
}
@@ -4,6 +4,7 @@ import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
import com.seibel.distanthorizons.core.level.AbstractDhServerLevel;
import com.seibel.distanthorizons.core.multiplayer.config.SessionConfig;
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayloadSender;
import com.seibel.distanthorizons.core.network.messages.base.CurrentLevelKeyMessage;
import com.seibel.distanthorizons.core.network.messages.base.SessionConfigMessage;
import com.seibel.distanthorizons.core.network.event.internal.CloseInternalEvent;
@@ -34,12 +35,13 @@ public class ServerPlayerState implements Closeable
public final SessionConfig sessionConfig = new SessionConfig();
public boolean isReady() { return this.sessionConfig.constrainingConfig != null; }
public final FullDataPayloadSender fullDataPayloadSender;
private final ConcurrentHashMap<AbstractDhServerLevel, RateLimiterSet> rateLimiterSets = new ConcurrentHashMap<>();
public RateLimiterSet getRateLimiterSet(AbstractDhServerLevel level) { return this.rateLimiterSets.computeIfAbsent(level, ignored -> new RateLimiterSet()); }
public void clearRateLimiterSets() { this.rateLimiterSets.clear(); }
//==============//
// constructors //
//==============//
@@ -47,6 +49,7 @@ public class ServerPlayerState implements Closeable
public ServerPlayerState(IServerPlayerWrapper serverPlayer)
{
this.networkSession = new NetworkSession(serverPlayer);
this.fullDataPayloadSender = new FullDataPayloadSender(this.networkSession, this.sessionConfig::getMaxDataTransferSpeed);
this.networkSession.registerHandler(SessionConfigMessage.class, (sessionConfigMessage) ->
{
@@ -92,6 +95,7 @@ public class ServerPlayerState implements Closeable
@Override
public void close()
{
this.fullDataPayloadSender.close();
this.levelKeyPrefixChangeListener.close();
this.configAnyChangeListener.close();
this.networkSession.close();
@@ -20,6 +20,7 @@
package com.seibel.distanthorizons.core.network.messages.fullData;
import com.google.common.base.MoreObjects;
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
import com.seibel.distanthorizons.core.network.INetworkObject;
import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage;
@@ -20,6 +20,7 @@
package com.seibel.distanthorizons.core.network.messages.fullData;
import com.google.common.base.MoreObjects;
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
import com.seibel.distanthorizons.core.network.INetworkObject;
import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage;
import io.netty.buffer.ByteBuf;
@@ -20,9 +20,13 @@
package com.seibel.distanthorizons.core.network.messages.fullData;
import com.google.common.base.MoreObjects;
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage;
import com.seibel.distanthorizons.core.util.TimerUtil;
import io.netty.buffer.ByteBuf;
import java.util.Timer;
/**
* Used to send part of a {@link FullDataPayload}.
*
@@ -30,10 +34,15 @@ import io.netty.buffer.ByteBuf;
*/
public class FullDataSplitMessage extends AbstractNetworkMessage
{
private static final long BUFFER_RELEASE_DELAY_MS = 5000L;
public int bufferId;
public ByteBuf buffer;
public boolean isFirst;
// Reference counting is unreliable here for some reason so this is a "fix"
private static final Timer bufferReleaseTimer = TimerUtil.CreateTimer("FullDataBufferCleanupTimer");
private boolean releaseScheduled = false;
//==============//
@@ -41,7 +50,7 @@ public class FullDataSplitMessage extends AbstractNetworkMessage
//==============//
public FullDataSplitMessage() { }
public FullDataSplitMessage(int bufferId, boolean isFirst, ByteBuf buffer)
public FullDataSplitMessage(int bufferId, ByteBuf buffer, boolean isFirst)
{
this.bufferId = bufferId;
this.buffer = buffer;
@@ -63,6 +72,12 @@ public class FullDataSplitMessage extends AbstractNetworkMessage
out.writeBytes(this.buffer.readerIndex(0));
out.writeBoolean(this.isFirst);
if (!this.releaseScheduled)
{
bufferReleaseTimer.schedule(TimerUtil.createTimerTask(this.buffer::release), BUFFER_RELEASE_DELAY_MS);
this.releaseScheduled = true;
}
}
@Override
@@ -1,6 +1,7 @@
package com.seibel.distanthorizons.core.util;
import java.util.Timer;
import java.util.TimerTask;
/**
* Handles creating timers.
@@ -17,4 +18,16 @@ public class TimerUtil
return new Timer(ThreadUtil.THREAD_NAME_PREFIX+timerName, true);
}
public static TimerTask createTimerTask(Runnable runMethod)
{
return new TimerTask()
{
@Override
public void run()
{
runMethod.run();
}
};
}
}
@@ -687,6 +687,10 @@
"Rate Limit for Sync on Load",
"distanthorizons.config.server.syncOnLoadRateLimit.@tooltip":
"How many LOD sync requests per second should a client send? \nAlso limits the amount of player's requests allowed to stay in the server's queue.",
"distanthorizons.config.server.maxDataTransferSpeed":
"Maximum Data Transfer Speed, KB/s",
"distanthorizons.config.server.maxDataTransferSpeed.@tooltip":
"Maximum speed for uploading LODs to the clients, in KB/s",