Merge branch 'main' of https://gitlab.com/jeseibel/distant-horizons-core
This commit is contained in:
@@ -31,7 +31,7 @@ public final class ModInfo
|
||||
public static final String DEDICATED_SERVER_INITIAL_PATH = "dedicated_server_initial";
|
||||
|
||||
/** Incremented every time any packets are added, changed or removed, with a few exceptions. */
|
||||
public static final int PROTOCOL_VERSION = 5;
|
||||
public static final int PROTOCOL_VERSION = 6;
|
||||
public static final String WRAPPER_PACKET_PATH = "message";
|
||||
|
||||
/** The internal mod name */
|
||||
|
||||
+4
-4
@@ -6,7 +6,7 @@ import com.seibel.distanthorizons.core.level.IKeyedClientLevelManager;
|
||||
import com.seibel.distanthorizons.core.level.IServerKeyedClientLevel;
|
||||
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
|
||||
import com.seibel.distanthorizons.core.network.event.internal.CloseInternalEvent;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.CurrentLevelKeyMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.LevelInitMessage;
|
||||
import com.seibel.distanthorizons.core.network.session.NetworkSession;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IClientLevelWrapper;
|
||||
@@ -69,13 +69,13 @@ public class ClientPluginChannelApi
|
||||
{
|
||||
Objects.requireNonNull(networkSession);
|
||||
this.networkSession = networkSession;
|
||||
this.networkSession.registerHandler(CurrentLevelKeyMessage.class, this::onCurrentLevelKeyMessage);
|
||||
this.networkSession.registerHandler(LevelInitMessage.class, this::onLevelInitMessage);
|
||||
this.networkSession.registerHandler(CloseInternalEvent.class, this::onClose);
|
||||
}
|
||||
|
||||
private void onCurrentLevelKeyMessage(CurrentLevelKeyMessage msg)
|
||||
private void onLevelInitMessage(LevelInitMessage msg)
|
||||
{
|
||||
if (!msg.levelKey.matches(CurrentLevelKeyMessage.VALIDATION_REGEX))
|
||||
if (!msg.levelKey.matches(LevelInitMessage.VALIDATION_REGEX))
|
||||
{
|
||||
throw new IllegalArgumentException("Server sent invalid level key.");
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ import com.seibel.distanthorizons.core.world.*;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.chunk.IChunkWrapper;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftRenderWrapper;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftSharedWrapper;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IClientLevelWrapper;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
|
||||
import com.seibel.distanthorizons.coreapi.DependencyInjection.ApiEventInjector;
|
||||
@@ -61,6 +62,7 @@ public class SharedApi
|
||||
/** will be null on the server-side */
|
||||
@Nullable
|
||||
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
|
||||
private static final IMinecraftSharedWrapper MC_SHARED = SingletonInjector.INSTANCE.get(IMinecraftSharedWrapper.class);
|
||||
|
||||
private static final UpdateChunkPosManager UPDATE_POS_MANAGER = new UpdateChunkPosManager();
|
||||
/** how many chunks can be queued for updating per thread, used to prevent updates from infinitely pilling up if the user flies around extremely fast */
|
||||
@@ -265,11 +267,24 @@ public class SharedApi
|
||||
{ queueChunkUpdate(chunkWrapper, neighbourChunkList, dhLevel,false); }
|
||||
private static void queueChunkUpdate(IChunkWrapper chunkWrapper, @Nullable ArrayList<IChunkWrapper> neighbourChunkList, IDhLevel dhLevel, boolean lightUpdateOnly)
|
||||
{
|
||||
int playerCount;
|
||||
if (MC_CLIENT != null && MC_CLIENT.playerExists())
|
||||
{
|
||||
// Multiplayer & local world
|
||||
UPDATE_POS_MANAGER.setCenter(MC_CLIENT.getPlayerChunkPos());
|
||||
UPDATE_POS_MANAGER.maxSize = MAX_UPDATING_CHUNK_COUNT_PER_THREAD * Config.Common.MultiThreading.numberOfLodBuilderThreads.get();
|
||||
|
||||
// Exclude yourself in local worlds
|
||||
playerCount = MC_CLIENT.clientConnectedToDedicatedServer() ? 0 : MC_SHARED.getPlayerCount() - 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Dedicated server
|
||||
playerCount = MC_SHARED.getPlayerCount();
|
||||
}
|
||||
|
||||
UPDATE_POS_MANAGER.maxSize = MAX_UPDATING_CHUNK_COUNT_PER_THREAD
|
||||
* Config.Common.MultiThreading.numberOfLodBuilderThreads.get()
|
||||
* (playerCount + 1);
|
||||
|
||||
UpdateChunkData updateData = new UpdateChunkData(chunkWrapper, neighbourChunkList, dhLevel, lightUpdateOnly);
|
||||
if(lightUpdateOnly)
|
||||
|
||||
@@ -1572,6 +1572,14 @@ public class Config
|
||||
+ "")
|
||||
.build();
|
||||
|
||||
public static ConfigEntry<Integer> maxDataTransferSpeed = new ConfigEntry.Builder<Integer>()
|
||||
.setServersideShortName("maxDataTransferSpeed")
|
||||
.setMinDefaultMax(1, 500, 1000000 /* 1 GB/s */)
|
||||
.comment(""
|
||||
+ "Maximum speed for uploading LODs to the clients, in KB/s."
|
||||
+ "")
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
+89
-36
@@ -6,6 +6,7 @@ import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV
|
||||
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
|
||||
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
|
||||
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
|
||||
import com.seibel.distanthorizons.core.logging.f3.F3Screen;
|
||||
import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerState;
|
||||
import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerStateManager;
|
||||
import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException;
|
||||
@@ -14,7 +15,7 @@ import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartialUpdateMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPayload;
|
||||
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage;
|
||||
@@ -31,6 +32,7 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import javax.annotation.CheckForNull;
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.*;
|
||||
@@ -128,20 +130,23 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
|
||||
CompletableFuture.runAsync(() ->
|
||||
{
|
||||
Objects.requireNonNull(this.beaconBeamRepo);
|
||||
FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(entry.getKey()));
|
||||
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
|
||||
try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(entry.getKey())))
|
||||
{
|
||||
this.requestGroupByFutureId.remove(msg.futureId);
|
||||
|
||||
ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer());
|
||||
if (serverPlayerState == null)
|
||||
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
|
||||
{
|
||||
continue;
|
||||
this.requestGroupByFutureId.remove(msg.futureId);
|
||||
|
||||
ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer());
|
||||
if (serverPlayerState == null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> {
|
||||
msg.sendResponse(new FullDataSourceResponseMessage(payload));
|
||||
serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release();
|
||||
});
|
||||
}
|
||||
|
||||
serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release();
|
||||
payload.splitAndSend(FULL_DATA_SPLIT_SIZE_IN_BYTES, msg.getSession()::sendMessage);
|
||||
msg.sendResponse(new FullDataSourceResponseMessage(payload));
|
||||
}
|
||||
}, executor);
|
||||
}
|
||||
@@ -273,12 +278,14 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
|
||||
|
||||
this.serverside.fullDataFileHandler.getAsync(message.sectionPos).thenAcceptAsync(fullDataSource ->
|
||||
{
|
||||
rateLimiterSet.syncOnLoginRateLimiter.release();
|
||||
|
||||
Objects.requireNonNull(this.beaconBeamRepo);
|
||||
FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos));
|
||||
payload.splitAndSend(FULL_DATA_SPLIT_SIZE_IN_BYTES, message.getSession()::sendMessage);
|
||||
message.sendResponse(new FullDataSourceResponseMessage(payload));
|
||||
try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos)))
|
||||
{
|
||||
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> {
|
||||
message.sendResponse(new FullDataSourceResponseMessage(payload));
|
||||
rateLimiterSet.syncOnLoginRateLimiter.release();
|
||||
});
|
||||
}
|
||||
}, executor);
|
||||
}
|
||||
private void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
|
||||
@@ -371,6 +378,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
|
||||
DataSourceRequestGroup requestGroup = this.requestGroupByPos.get(pos);
|
||||
if (requestGroup != null)
|
||||
{
|
||||
requestGroup.worldGenTaskComplete = true;
|
||||
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
|
||||
}
|
||||
}
|
||||
@@ -383,6 +391,11 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
|
||||
{
|
||||
requestGroup.fullDataSource = fullDataSource;
|
||||
}
|
||||
else if (requestGroup.worldGenTaskComplete)
|
||||
{
|
||||
// If the returned data source is not fully generated, try reading it again
|
||||
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
|
||||
}
|
||||
else
|
||||
{
|
||||
this.serverside.fullDataFileHandler.queuePositionForRetrieval(pos);
|
||||
@@ -416,26 +429,30 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
|
||||
CompletableFuture.runAsync(() ->
|
||||
{
|
||||
Objects.requireNonNull(this.beaconBeamRepo);
|
||||
FullDataPayload payload = new FullDataPayload(data, this.beaconBeamRepo.getAllBeamsForPos(data.getPos()));
|
||||
for (ServerPlayerState serverPlayerState : this.serverPlayerStateManager.getReadyPlayers())
|
||||
try (FullDataPayload payload = new FullDataPayload(data, this.beaconBeamRepo.getAllBeamsForPos(data.getPos())))
|
||||
{
|
||||
if (serverPlayerState.getServerPlayer().getLevel() != this.serverLevelWrapper)
|
||||
for (ServerPlayerState serverPlayerState : this.serverPlayerStateManager.getReadyPlayers())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!serverPlayerState.sessionConfig.isRealTimeUpdatesEnabled())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Vec3d playerPosition = serverPlayerState.getServerPlayer().getPosition();
|
||||
int distanceFromPlayer = DhSectionPos.getChebyshevBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16;
|
||||
if (distanceFromPlayer >= serverPlayerState.getServerPlayer().getViewDistance()
|
||||
&& distanceFromPlayer <= serverPlayerState.sessionConfig.getMaxUpdateDistanceRadius())
|
||||
{
|
||||
payload.splitAndSend(FULL_DATA_SPLIT_SIZE_IN_BYTES, serverPlayerState.networkSession::sendMessage);
|
||||
serverPlayerState.networkSession.sendMessage(new FullDataPartialUpdateMessage(this.serverLevelWrapper, payload));
|
||||
if (serverPlayerState.getServerPlayer().getLevel() != this.serverLevelWrapper)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!serverPlayerState.sessionConfig.isRealTimeUpdatesEnabled())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Vec3d playerPosition = serverPlayerState.getServerPlayer().getPosition();
|
||||
int distanceFromPlayer = DhSectionPos.getChebyshevBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16;
|
||||
if (distanceFromPlayer >= serverPlayerState.getServerPlayer().getViewDistance()
|
||||
&& distanceFromPlayer <= serverPlayerState.sessionConfig.getMaxUpdateDistanceRadius())
|
||||
{
|
||||
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () ->
|
||||
{
|
||||
serverPlayerState.networkSession.sendMessage(new FullDataPartialUpdateMessage(this.serverLevelWrapper, payload));
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}, executor);
|
||||
@@ -446,6 +463,39 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
|
||||
|
||||
|
||||
|
||||
//===========//
|
||||
// debugging //
|
||||
//===========//
|
||||
|
||||
@Override
|
||||
public void addDebugMenuStringsToList(List<String> messageList)
|
||||
{
|
||||
// migration
|
||||
boolean migrationErrored = this.serverside.fullDataFileHandler.getMigrationStoppedWithError();
|
||||
if (!migrationErrored)
|
||||
{
|
||||
long legacyDeletionCount = this.serverside.fullDataFileHandler.getLegacyDeletionCount();
|
||||
if (legacyDeletionCount > 0)
|
||||
{
|
||||
messageList.add(" Migrating - Deleting #: " + F3Screen.NUMBER_FORMAT.format(legacyDeletionCount));
|
||||
}
|
||||
long migrationCount = this.serverside.fullDataFileHandler.getTotalMigrationCount();
|
||||
if (migrationCount > 0)
|
||||
{
|
||||
messageList.add(" Migrating - Conversion #: " + F3Screen.NUMBER_FORMAT.format(migrationCount));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
messageList.add(" Migration Failed");
|
||||
}
|
||||
|
||||
// world gen
|
||||
this.serverside.worldGenModule.addDebugMenuStringsToList(messageList);
|
||||
}
|
||||
|
||||
|
||||
|
||||
//=========//
|
||||
// getters //
|
||||
//=========//
|
||||
@@ -492,8 +542,11 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
|
||||
{
|
||||
public final ConcurrentMap<Long, FullDataSourceRequestMessage> requestMessages = new ConcurrentHashMap<>();
|
||||
|
||||
/** If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps */
|
||||
boolean worldGenTaskComplete = false;
|
||||
|
||||
@CheckForNull
|
||||
public FullDataSourceV2 fullDataSource;
|
||||
public FullDataSourceV2 fullDataSource = null;
|
||||
|
||||
/**
|
||||
* These two Semaphores are used to prevent all threads from locking on the group after it being fulfilled,
|
||||
|
||||
@@ -137,7 +137,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
|
||||
|
||||
try
|
||||
{
|
||||
FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(message.payload);
|
||||
FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSourceAndReleaseBuffer(message.payload);
|
||||
|
||||
if (!message.isSameLevelAs(this.levelWrapper))
|
||||
{
|
||||
|
||||
@@ -136,30 +136,7 @@ public class DhClientServerLevel extends AbstractDhServerLevel implements IDhCli
|
||||
boolean rendering = this.clientside.isRendering();
|
||||
messageList.add("["+dimName+"] rendering: "+(rendering ? "yes" : "no"));
|
||||
|
||||
|
||||
// migration
|
||||
boolean migrationErrored = this.serverside.fullDataFileHandler.getMigrationStoppedWithError();
|
||||
if (!migrationErrored)
|
||||
{
|
||||
long legacyDeletionCount = this.serverside.fullDataFileHandler.getLegacyDeletionCount();
|
||||
if (legacyDeletionCount > 0)
|
||||
{
|
||||
messageList.add(" Migrating - Deleting #: " + F3Screen.NUMBER_FORMAT.format(legacyDeletionCount));
|
||||
}
|
||||
long migrationCount = this.serverside.fullDataFileHandler.getTotalMigrationCount();
|
||||
if (migrationCount > 0)
|
||||
{
|
||||
messageList.add(" Migrating - Conversion #: " + F3Screen.NUMBER_FORMAT.format(migrationCount));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
messageList.add(" Migration Failed");
|
||||
}
|
||||
|
||||
|
||||
// world gen
|
||||
this.serverside.worldGenModule.addDebugMenuStringsToList(messageList);
|
||||
super.addDebugMenuStringsToList(messageList);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -77,7 +77,10 @@ public class DhServerLevel extends AbstractDhServerLevel
|
||||
|
||||
@Override
|
||||
public void addDebugMenuStringsToList(List<String> messageList)
|
||||
{ messageList.add("["+this.serverLevelWrapper.getDimensionName()+"], SL"); }
|
||||
{
|
||||
messageList.add("[" + this.serverLevelWrapper.getDimensionName() + "]");
|
||||
super.addDebugMenuStringsToList(messageList);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
+9
-19
@@ -31,10 +31,8 @@ import javax.annotation.CheckForNull;
|
||||
import javax.annotation.Nullable;
|
||||
import java.awt.*;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
@@ -46,8 +44,6 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
|
||||
|
||||
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
|
||||
|
||||
private static final Timer TASK_FINISH_TIMER = TimerUtil.CreateTimer("RequestTaskFinishTimer");
|
||||
|
||||
private static final int MAX_RETRY_ATTEMPTS = 3;
|
||||
|
||||
protected static final long SHUTDOWN_TIMEOUT_SECONDS = 5;
|
||||
@@ -163,7 +159,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
|
||||
{
|
||||
Map.Entry<Long, RequestQueueEntry> mapEntry = this.waitingTasksBySectionPos.entrySet().stream()
|
||||
.filter(task -> task.getValue().networkDataSourceFuture == null)
|
||||
.min((x, y) -> posDistanceSquared(targetPos, x.getKey()) - posDistanceSquared(targetPos, y.getKey()))
|
||||
.min(Comparator.comparingInt(x -> posDistanceSquared(targetPos, x.getKey())))
|
||||
.orElse(null);
|
||||
|
||||
if (mapEntry == null)
|
||||
@@ -175,8 +171,12 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
|
||||
long sectionPos = mapEntry.getKey();
|
||||
RequestQueueEntry entry = mapEntry.getValue();
|
||||
|
||||
Long offsetEntryTimestamp = entry.updateTimestamp != null
|
||||
? entry.updateTimestamp + this.networkState.getServerTimeOffset()
|
||||
: null;
|
||||
|
||||
CompletableFuture<FullDataSourceResponseMessage> dataSourceFuture = this.networkState.getSession().sendRequest(
|
||||
new FullDataSourceRequestMessage(this.level.getLevelWrapper(), sectionPos, entry.updateTimestamp),
|
||||
new FullDataSourceRequestMessage(this.level.getLevelWrapper(), sectionPos, offsetEntryTimestamp),
|
||||
FullDataSourceResponseMessage.class
|
||||
);
|
||||
entry.networkDataSourceFuture = dataSourceFuture;
|
||||
@@ -193,7 +193,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
|
||||
|
||||
if (response.payload != null)
|
||||
{
|
||||
FullDataSourceV2DTO dataSourceDto = this.networkState.decodeDataSourceAndReleaseBuffer(response.payload);
|
||||
FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSourceAndReleaseBuffer(response.payload);
|
||||
|
||||
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
|
||||
if (executor == null)
|
||||
@@ -258,17 +258,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
|
||||
}
|
||||
}
|
||||
|
||||
// Hack to work around a race condition
|
||||
// If you finish the request too quickly, the section will never render
|
||||
TASK_FINISH_TIMER.schedule(new TimerTask()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
entry.future.complete(true);
|
||||
}
|
||||
}, 10000);
|
||||
return null;
|
||||
return entry.future.complete(true);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
+13
-45
@@ -1,34 +1,26 @@
|
||||
package com.seibel.distanthorizons.core.multiplayer.client;
|
||||
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
|
||||
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
|
||||
import com.seibel.distanthorizons.core.multiplayer.config.SessionConfig;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayloadReceiver;
|
||||
import com.seibel.distanthorizons.core.network.event.ScopedNetworkEventSource;
|
||||
import com.seibel.distanthorizons.core.network.event.internal.CloseInternalEvent;
|
||||
import com.seibel.distanthorizons.core.network.event.internal.IncompatibleMessageInternalEvent;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.CurrentLevelKeyMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.LevelInitMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.SessionConfigMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartialUpdateMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPayload;
|
||||
import com.seibel.distanthorizons.core.network.session.NetworkSession;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import com.seibel.distanthorizons.core.util.LodUtil;
|
||||
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
|
||||
import com.seibel.distanthorizons.coreapi.ModInfo;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ClientNetworkState implements Closeable
|
||||
{
|
||||
@@ -38,10 +30,7 @@ public class ClientNetworkState implements Closeable
|
||||
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
|
||||
|
||||
|
||||
private final ConcurrentMap<Integer, CompositeByteBuf> fullDataBufferById = CacheBuilder.newBuilder()
|
||||
.expireAfterAccess(10, TimeUnit.SECONDS)
|
||||
.<Integer, CompositeByteBuf>build()
|
||||
.asMap();
|
||||
public final FullDataPayloadReceiver fullDataPayloadReceiver = new FullDataPayloadReceiver();
|
||||
|
||||
private final SessionConfig.AnyChangeListener configAnyChangeListener = new SessionConfig.AnyChangeListener(this::sendConfigMessage);
|
||||
|
||||
@@ -64,6 +53,9 @@ public class ClientNetworkState implements Closeable
|
||||
@Nullable
|
||||
private Integer closestProtocolVersion;
|
||||
|
||||
private long serverTimeOffset = 0;
|
||||
public long getServerTimeOffset() { return this.serverTimeOffset; }
|
||||
|
||||
|
||||
|
||||
//=============//
|
||||
@@ -81,13 +73,16 @@ public class ClientNetworkState implements Closeable
|
||||
}
|
||||
});
|
||||
|
||||
this.networkSession.registerHandler(CurrentLevelKeyMessage.class, message ->
|
||||
this.networkSession.registerHandler(LevelInitMessage.class, message ->
|
||||
{
|
||||
// we will also receive this message when we have full support
|
||||
if (this.serverSupportStatus == EServerSupportStatus.NONE)
|
||||
{
|
||||
this.serverSupportStatus = EServerSupportStatus.LEVELS_ONLY;
|
||||
}
|
||||
|
||||
this.serverTimeOffset = message.serverTime - System.currentTimeMillis();
|
||||
LOGGER.info("Server time offset: [${this.serverTimeOffset}] ms");
|
||||
});
|
||||
|
||||
this.networkSession.registerHandler(CloseInternalEvent.class, message ->
|
||||
@@ -120,22 +115,7 @@ public class ClientNetworkState implements Closeable
|
||||
this.configReceived = true;
|
||||
});
|
||||
|
||||
this.networkSession.registerHandler(FullDataSplitMessage.class, message ->
|
||||
{
|
||||
if (message.isFirst)
|
||||
{
|
||||
CompositeByteBuf composite = this.fullDataBufferById.remove(message.bufferId);
|
||||
if (composite != null)
|
||||
{
|
||||
composite.release();
|
||||
LOGGER.debug("Released full data buffer [" + message.bufferId + "]: [" + composite + "]");
|
||||
}
|
||||
}
|
||||
|
||||
CompositeByteBuf byteBuffer = this.fullDataBufferById.computeIfAbsent(message.bufferId, bufferId -> ByteBufAllocator.DEFAULT.compositeBuffer());
|
||||
byteBuffer.addComponent(true, message.buffer);
|
||||
LOGGER.debug("Full data buffer [" + message.bufferId + "]: [" + byteBuffer + "].");
|
||||
});
|
||||
this.networkSession.registerHandler(FullDataSplitMessage.class, this.fullDataPayloadReceiver::receiveChunk);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,20 +125,7 @@ public class ClientNetworkState implements Closeable
|
||||
// send message //
|
||||
//==============//
|
||||
|
||||
public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(FullDataPayload msg)
|
||||
{
|
||||
CompositeByteBuf compositeByteBuffer = this.fullDataBufferById.remove(msg.dtoBufferId);
|
||||
LodUtil.assertTrue(compositeByteBuffer != null);
|
||||
|
||||
try
|
||||
{
|
||||
return INetworkObject.decodeToInstance(FullDataSourceV2DTO.CreateEmptyDataSource(), compositeByteBuffer);
|
||||
}
|
||||
finally
|
||||
{
|
||||
compositeByteBuffer.release();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void sendConfigMessage()
|
||||
{
|
||||
@@ -198,6 +165,7 @@ public class ClientNetworkState implements Closeable
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
this.fullDataPayloadReceiver.close();
|
||||
this.configAnyChangeListener.close();
|
||||
this.networkSession.close();
|
||||
}
|
||||
|
||||
+3
@@ -40,6 +40,8 @@ public class SessionConfig implements INetworkObject
|
||||
|
||||
registerConfigEntry(Config.Server.synchronizeOnLoad, (x, y) -> x && y);
|
||||
registerConfigEntry(Config.Server.syncOnLoadRateLimit, Math::min);
|
||||
|
||||
registerConfigEntry(Config.Server.maxDataTransferSpeed, Math::min);
|
||||
}
|
||||
|
||||
public SessionConfig() {}
|
||||
@@ -56,6 +58,7 @@ public class SessionConfig implements INetworkObject
|
||||
public boolean isRealTimeUpdatesEnabled() { return this.getValue(Config.Server.enableRealTimeUpdates); }
|
||||
public boolean getSynchronizeOnLoad() { return this.getValue(Config.Server.synchronizeOnLoad); }
|
||||
public int getSyncOnLoginRateLimit() { return this.getValue(Config.Server.syncOnLoadRateLimit); }
|
||||
public int getMaxDataTransferSpeed() { return this.getValue(Config.Server.maxDataTransferSpeed); }
|
||||
|
||||
|
||||
|
||||
|
||||
+9
-39
@@ -1,13 +1,13 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
package com.seibel.distanthorizons.core.multiplayer.fullData;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage;
|
||||
import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import com.seibel.distanthorizons.core.util.TimerUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
@@ -20,13 +20,10 @@ import java.util.function.Consumer;
|
||||
/**
|
||||
* @see FullDataSplitMessage
|
||||
*/
|
||||
public class FullDataPayload implements INetworkObject
|
||||
public class FullDataPayload implements INetworkObject, AutoCloseable
|
||||
{
|
||||
private static final AtomicInteger lastBufferId = new AtomicInteger();
|
||||
|
||||
// Reference counting is unreliable here for some reason so this is a "fix"
|
||||
private static final Timer bufferCleanupTimer = TimerUtil.CreateTimer("FullDataBufferCleanupTimer");
|
||||
|
||||
public int dtoBufferId;
|
||||
public ByteBuf dtoBuffer;
|
||||
|
||||
@@ -52,15 +49,6 @@ public class FullDataPayload implements INetworkObject
|
||||
|
||||
this.dtoBuffer = ByteBufAllocator.DEFAULT.buffer();
|
||||
dataSourceDto.encode(this.dtoBuffer);
|
||||
|
||||
bufferCleanupTimer.schedule(new TimerTask()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
FullDataPayload.this.dtoBuffer.release();
|
||||
}
|
||||
}, 5000L);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
@@ -90,36 +78,18 @@ public class FullDataPayload implements INetworkObject
|
||||
this.beaconBeams = this.readCollection(in, new ArrayList<>(), () -> new BeaconBeamDTO(null, null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to send {@link FullDataPayload}'s since the data they contain may be larger
|
||||
* than what a single packet could contain.
|
||||
*
|
||||
* @param payloadChunkSizeInBytes how many bytes can be sent in a single message
|
||||
*/
|
||||
public void splitAndSend(int payloadChunkSizeInBytes, Consumer<FullDataSplitMessage> sendMessageConsumer)
|
||||
{
|
||||
// chunk in this context means chunk of data, not a MC chunk
|
||||
for (int payloadChunkNum = 0; ; payloadChunkNum++)
|
||||
{
|
||||
int offset = payloadChunkNum * payloadChunkSizeInBytes;
|
||||
|
||||
int actualChunkSize = Math.min(this.dtoBuffer.writerIndex() - offset, payloadChunkSizeInBytes);
|
||||
if (actualChunkSize <= 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
FullDataSplitMessage chunk = new FullDataSplitMessage(this.dtoBufferId, payloadChunkNum == 0, this.dtoBuffer.slice(offset, actualChunkSize));
|
||||
sendMessageConsumer.accept(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
//================//
|
||||
// base overrides //
|
||||
//================//
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
this.dtoBuffer.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
+88
@@ -0,0 +1,88 @@
|
||||
package com.seibel.distanthorizons.core.multiplayer.fullData;
|
||||
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.RemovalCause;
|
||||
import com.google.common.cache.RemovalNotification;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage;
|
||||
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
|
||||
import com.seibel.distanthorizons.core.util.LodUtil;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class FullDataPayloadReceiver implements AutoCloseable
|
||||
{
|
||||
private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
|
||||
() -> Config.Common.Logging.logNetworkEvent.get());
|
||||
|
||||
private final ConcurrentMap<Integer, CompositeByteBuf> buffersById = CacheBuilder.newBuilder()
|
||||
.expireAfterAccess(10, TimeUnit.SECONDS)
|
||||
.removalListener((RemovalNotification<Integer, CompositeByteBuf> notification) ->
|
||||
{
|
||||
// If an entry was replaced without removing, the buffer has to be released manually
|
||||
if (notification.getCause() != RemovalCause.REPLACED)
|
||||
{
|
||||
Objects.requireNonNull(notification.getValue()).release();
|
||||
}
|
||||
})
|
||||
.build().asMap();
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
this.buffersById.clear();
|
||||
}
|
||||
|
||||
public void receiveChunk(FullDataSplitMessage message)
|
||||
{
|
||||
this.buffersById.compute(message.bufferId, (bufferId, composite) ->
|
||||
{
|
||||
if (message.isFirst)
|
||||
{
|
||||
if (composite != null)
|
||||
{
|
||||
composite.release();
|
||||
LOGGER.debug("Released existing full data buffer [" + message.bufferId + "]");
|
||||
}
|
||||
|
||||
composite = ByteBufAllocator.DEFAULT.compositeBuffer();
|
||||
LOGGER.debug("Created new full data buffer [" + message.bufferId + "]: [" + composite + "]");
|
||||
}
|
||||
else if (composite == null)
|
||||
{
|
||||
LOGGER.debug("Received non-first full data chunk for empty buffer [" + message.bufferId + "]: [" + message.buffer + "].");
|
||||
return null;
|
||||
}
|
||||
|
||||
composite.addComponent(true, message.buffer);
|
||||
LOGGER.debug("Updated full data buffer [" + message.bufferId + "]: [" + composite + "].");
|
||||
return composite;
|
||||
});
|
||||
}
|
||||
|
||||
public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(FullDataPayload msg)
|
||||
{
|
||||
CompositeByteBuf compositeByteBuffer = this.buffersById.get(msg.dtoBufferId);
|
||||
LodUtil.assertTrue(compositeByteBuffer != null);
|
||||
|
||||
try
|
||||
{
|
||||
return INetworkObject.decodeToInstance(FullDataSourceV2DTO.CreateEmptyDataSource(), compositeByteBuffer);
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Releasing the buffer is handled by cache
|
||||
this.buffersById.remove(msg.dtoBufferId);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
+104
@@ -0,0 +1,104 @@
|
||||
package com.seibel.distanthorizons.core.multiplayer.fullData;
|
||||
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage;
|
||||
import com.seibel.distanthorizons.core.network.session.NetworkSession;
|
||||
import com.seibel.distanthorizons.core.util.TimerUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.*;
|
||||
|
||||
public class FullDataPayloadSender implements AutoCloseable
|
||||
{
|
||||
private static final int TICK_RATE = 4;
|
||||
|
||||
private static final Timer UPLOAD_TIMER = TimerUtil.CreateTimer("FullDataPayloadSender");
|
||||
private final TimerTask tickTimerTask = TimerUtil.createTimerTask(this::tick);
|
||||
|
||||
private final NetworkSession session;
|
||||
private final IntSupplier maxKBpsSupplier;
|
||||
private final ConcurrentLinkedQueue<PendingTransfer> transferQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
public FullDataPayloadSender(NetworkSession session, IntSupplier maxKBpsSupplier)
|
||||
{
|
||||
this.session = session;
|
||||
this.maxKBpsSupplier = maxKBpsSupplier;
|
||||
UPLOAD_TIMER.scheduleAtFixedRate(this.tickTimerTask, 0, 1000 / TICK_RATE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
this.tickTimerTask.cancel();
|
||||
|
||||
PendingTransfer pendingTransfer;
|
||||
while ((pendingTransfer = this.transferQueue.poll()) != null)
|
||||
{
|
||||
pendingTransfer.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void sendInChunks(FullDataPayload payload, Runnable sendFinalMessage)
|
||||
{
|
||||
this.transferQueue.add(new PendingTransfer(payload, sendFinalMessage));
|
||||
}
|
||||
|
||||
private void tick()
|
||||
{
|
||||
int bytesToSend = (this.maxKBpsSupplier.getAsInt() * 1000) / TICK_RATE;
|
||||
while (bytesToSend > 0)
|
||||
{
|
||||
PendingTransfer pendingTransfer = this.transferQueue.peek();
|
||||
if (pendingTransfer == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
int chunkSize = Math.min(bytesToSend, pendingTransfer.buffer.readableBytes());
|
||||
boolean isFirstChunk = pendingTransfer.buffer.readerIndex() == 0;
|
||||
|
||||
FullDataSplitMessage chunkMessage = new FullDataSplitMessage(pendingTransfer.bufferId, pendingTransfer.buffer.readRetainedSlice(chunkSize), isFirstChunk);
|
||||
this.session.sendMessage(chunkMessage);
|
||||
|
||||
bytesToSend -= chunkSize;
|
||||
|
||||
if (pendingTransfer.buffer.readableBytes() == 0)
|
||||
{
|
||||
pendingTransfer.sendFinalMessage.run();
|
||||
pendingTransfer.close();
|
||||
this.transferQueue.poll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class PendingTransfer implements AutoCloseable
|
||||
{
|
||||
public final int bufferId;
|
||||
public final ByteBuf buffer;
|
||||
public final Runnable sendFinalMessage;
|
||||
private final AtomicBoolean isClosed = new AtomicBoolean();
|
||||
|
||||
private PendingTransfer(FullDataPayload payload, Runnable sendFinalMessage)
|
||||
{
|
||||
this.bufferId = payload.dtoBufferId;
|
||||
this.buffer = payload.dtoBuffer.retainedDuplicate().readerIndex(0);
|
||||
this.sendFinalMessage = sendFinalMessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
if (this.isClosed.compareAndSet(false, true))
|
||||
{
|
||||
this.buffer.release();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
+7
-3
@@ -4,7 +4,8 @@ import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
|
||||
import com.seibel.distanthorizons.core.level.AbstractDhServerLevel;
|
||||
import com.seibel.distanthorizons.core.multiplayer.config.SessionConfig;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.CurrentLevelKeyMessage;
|
||||
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayloadSender;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.LevelInitMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.SessionConfigMessage;
|
||||
import com.seibel.distanthorizons.core.network.event.internal.CloseInternalEvent;
|
||||
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
|
||||
@@ -34,12 +35,13 @@ public class ServerPlayerState implements Closeable
|
||||
public final SessionConfig sessionConfig = new SessionConfig();
|
||||
public boolean isReady() { return this.sessionConfig.constrainingConfig != null; }
|
||||
|
||||
public final FullDataPayloadSender fullDataPayloadSender;
|
||||
|
||||
private final ConcurrentHashMap<AbstractDhServerLevel, RateLimiterSet> rateLimiterSets = new ConcurrentHashMap<>();
|
||||
public RateLimiterSet getRateLimiterSet(AbstractDhServerLevel level) { return this.rateLimiterSets.computeIfAbsent(level, ignored -> new RateLimiterSet()); }
|
||||
public void clearRateLimiterSets() { this.rateLimiterSets.clear(); }
|
||||
|
||||
|
||||
|
||||
//==============//
|
||||
// constructors //
|
||||
//==============//
|
||||
@@ -47,6 +49,7 @@ public class ServerPlayerState implements Closeable
|
||||
public ServerPlayerState(IServerPlayerWrapper serverPlayer)
|
||||
{
|
||||
this.networkSession = new NetworkSession(serverPlayer);
|
||||
this.fullDataPayloadSender = new FullDataPayloadSender(this.networkSession, this.sessionConfig::getMaxDataTransferSpeed);
|
||||
|
||||
this.networkSession.registerHandler(SessionConfigMessage.class, (sessionConfigMessage) ->
|
||||
{
|
||||
@@ -76,7 +79,7 @@ public class ServerPlayerState implements Closeable
|
||||
if (!levelKey.equals(this.lastLevelKey))
|
||||
{
|
||||
this.lastLevelKey = levelKey;
|
||||
this.networkSession.sendMessage(new CurrentLevelKeyMessage(levelKey));
|
||||
this.networkSession.sendMessage(new LevelInitMessage(levelKey));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -92,6 +95,7 @@ public class ServerPlayerState implements Closeable
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
this.fullDataPayloadSender.close();
|
||||
this.levelKeyPrefixChangeListener.close();
|
||||
this.configAnyChangeListener.close();
|
||||
this.networkSession.close();
|
||||
|
||||
+2
-2
@@ -22,7 +22,7 @@ package com.seibel.distanthorizons.core.network.messages;
|
||||
import com.google.common.collect.BiMap;
|
||||
import com.google.common.collect.HashBiMap;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.CodecCrashMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.CurrentLevelKeyMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.LevelInitMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.SessionConfigMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage;
|
||||
@@ -59,7 +59,7 @@ public class MessageRegistry
|
||||
this.registerMessage(CloseReasonMessage.class, CloseReasonMessage::new);
|
||||
|
||||
// Level keys
|
||||
this.registerMessage(CurrentLevelKeyMessage.class, CurrentLevelKeyMessage::new);
|
||||
this.registerMessage(LevelInitMessage.class, LevelInitMessage::new);
|
||||
|
||||
// Config (for full DH support)
|
||||
this.registerMessage(SessionConfigMessage.class, SessionConfigMessage::new);
|
||||
|
||||
+20
-6
@@ -4,7 +4,7 @@ import com.google.common.base.MoreObjects;
|
||||
import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
public class CurrentLevelKeyMessage extends AbstractNetworkMessage
|
||||
public class LevelInitMessage extends AbstractNetworkMessage
|
||||
{
|
||||
public static final int MAX_LENGTH = 150;
|
||||
|
||||
@@ -16,6 +16,7 @@ public class CurrentLevelKeyMessage extends AbstractNetworkMessage
|
||||
|
||||
|
||||
public String levelKey;
|
||||
public long serverTime;
|
||||
|
||||
|
||||
|
||||
@@ -23,8 +24,12 @@ public class CurrentLevelKeyMessage extends AbstractNetworkMessage
|
||||
// constructors //
|
||||
//==============//
|
||||
|
||||
public CurrentLevelKeyMessage() { }
|
||||
public CurrentLevelKeyMessage(String levelKey) { this.levelKey = levelKey; }
|
||||
public LevelInitMessage() { }
|
||||
public LevelInitMessage(String levelKey)
|
||||
{
|
||||
this.levelKey = levelKey;
|
||||
this.serverTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -33,10 +38,18 @@ public class CurrentLevelKeyMessage extends AbstractNetworkMessage
|
||||
//===============//
|
||||
|
||||
@Override
|
||||
public void encode(ByteBuf out) { this.writeString(this.levelKey, out); }
|
||||
public void encode(ByteBuf out)
|
||||
{
|
||||
this.writeString(this.levelKey, out);
|
||||
out.writeLong(this.serverTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ByteBuf in) { this.levelKey = this.readString(in); }
|
||||
public void decode(ByteBuf in)
|
||||
{
|
||||
this.levelKey = this.readString(in);
|
||||
this.serverTime = in.readLong();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -48,7 +61,8 @@ public class CurrentLevelKeyMessage extends AbstractNetworkMessage
|
||||
public MoreObjects.ToStringHelper toStringHelper()
|
||||
{
|
||||
return super.toStringHelper()
|
||||
.add("levelKey", this.levelKey);
|
||||
.add("levelKey", this.levelKey)
|
||||
.add("serverTime", this.serverTime);
|
||||
}
|
||||
|
||||
}
|
||||
+1
@@ -20,6 +20,7 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage;
|
||||
|
||||
+1
@@ -20,6 +20,7 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
|
||||
import com.seibel.distanthorizons.core.network.INetworkObject;
|
||||
import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
+16
-1
@@ -20,9 +20,13 @@
|
||||
package com.seibel.distanthorizons.core.network.messages.fullData;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
|
||||
import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage;
|
||||
import com.seibel.distanthorizons.core.util.TimerUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
import java.util.Timer;
|
||||
|
||||
/**
|
||||
* Used to send part of a {@link FullDataPayload}.
|
||||
*
|
||||
@@ -30,10 +34,15 @@ import io.netty.buffer.ByteBuf;
|
||||
*/
|
||||
public class FullDataSplitMessage extends AbstractNetworkMessage
|
||||
{
|
||||
private static final long BUFFER_RELEASE_DELAY_MS = 5000L;
|
||||
|
||||
public int bufferId;
|
||||
public ByteBuf buffer;
|
||||
public boolean isFirst;
|
||||
|
||||
// Reference counting is unreliable here for some reason so this is a "fix"
|
||||
private static final Timer bufferReleaseTimer = TimerUtil.CreateTimer("FullDataBufferCleanupTimer");
|
||||
private boolean releaseScheduled = false;
|
||||
|
||||
|
||||
//==============//
|
||||
@@ -41,7 +50,7 @@ public class FullDataSplitMessage extends AbstractNetworkMessage
|
||||
//==============//
|
||||
|
||||
public FullDataSplitMessage() { }
|
||||
public FullDataSplitMessage(int bufferId, boolean isFirst, ByteBuf buffer)
|
||||
public FullDataSplitMessage(int bufferId, ByteBuf buffer, boolean isFirst)
|
||||
{
|
||||
this.bufferId = bufferId;
|
||||
this.buffer = buffer;
|
||||
@@ -63,6 +72,12 @@ public class FullDataSplitMessage extends AbstractNetworkMessage
|
||||
out.writeBytes(this.buffer.readerIndex(0));
|
||||
|
||||
out.writeBoolean(this.isFirst);
|
||||
|
||||
if (!this.releaseScheduled)
|
||||
{
|
||||
bufferReleaseTimer.schedule(TimerUtil.createTimerTask(this.buffer::release), BUFFER_RELEASE_DELAY_MS);
|
||||
this.releaseScheduled = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.seibel.distanthorizons.core.util;
|
||||
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
/**
|
||||
* Handles creating timers.
|
||||
@@ -17,4 +18,16 @@ public class TimerUtil
|
||||
return new Timer(ThreadUtil.THREAD_NAME_PREFIX+timerName, true);
|
||||
}
|
||||
|
||||
public static TimerTask createTimerTask(Runnable runMethod)
|
||||
{
|
||||
return new TimerTask()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
runMethod.run();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
+2
@@ -33,4 +33,6 @@ public interface IMinecraftSharedWrapper extends IBindable
|
||||
/** @return true if this is the first time loading this world */
|
||||
boolean isWorldNew();
|
||||
|
||||
int getPlayerCount();
|
||||
|
||||
}
|
||||
|
||||
+3
-3
@@ -21,7 +21,7 @@ package com.seibel.distanthorizons.core.wrapperInterfaces.world;
|
||||
|
||||
import com.seibel.distanthorizons.core.api.internal.SharedApi;
|
||||
import com.seibel.distanthorizons.core.config.Config;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.CurrentLevelKeyMessage;
|
||||
import com.seibel.distanthorizons.core.network.messages.base.LevelInitMessage;
|
||||
import com.seibel.distanthorizons.core.world.EWorldEnvironment;
|
||||
|
||||
import java.io.File;
|
||||
@@ -41,7 +41,7 @@ public interface IServerLevelWrapper extends ILevelWrapper
|
||||
if (SharedApi.getEnvironment() == EWorldEnvironment.CLIENT_SERVER)
|
||||
{
|
||||
String cleanWorldFolderName = this.getMcSaveFolder().getParentFile().getName()
|
||||
.replaceAll("[^" + CurrentLevelKeyMessage.PART_ALLOWED_CHARS_REGEX + " ]", "")
|
||||
.replaceAll("[^" + LevelInitMessage.PART_ALLOWED_CHARS_REGEX + " ]", "")
|
||||
.replaceAll(" ", "_");
|
||||
levelKeyPrefix += (!levelKeyPrefix.isEmpty() ? "_" : "") + cleanWorldFolderName;
|
||||
}
|
||||
@@ -51,7 +51,7 @@ public interface IServerLevelWrapper extends ILevelWrapper
|
||||
String mainPart = "@" + dimensionName;
|
||||
|
||||
return levelKeyPrefix.substring(0, Math.min(
|
||||
CurrentLevelKeyMessage.MAX_LENGTH - mainPart.length(),
|
||||
LevelInitMessage.MAX_LENGTH - mainPart.length(),
|
||||
levelKeyPrefix.length()
|
||||
)) + mainPart;
|
||||
}
|
||||
|
||||
@@ -689,6 +689,10 @@
|
||||
"Rate Limit for Sync on Load",
|
||||
"distanthorizons.config.server.syncOnLoadRateLimit.@tooltip":
|
||||
"How many LOD sync requests per second should a client send? \nAlso limits the amount of player's requests allowed to stay in the server's queue.",
|
||||
"distanthorizons.config.server.maxDataTransferSpeed":
|
||||
"Maximum Data Transfer Speed, KB/s",
|
||||
"distanthorizons.config.server.maxDataTransferSpeed.@tooltip":
|
||||
"Maximum speed for uploading LODs to the clients, in KB/s",
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user