Fix switching dimensions (request cancellation is broken)

This commit is contained in:
s809
2023-08-04 13:06:56 +05:00
parent 1788c18d59
commit 76b226b865
18 changed files with 189 additions and 180 deletions
@@ -8,8 +8,6 @@ import com.seibel.distanthorizons.core.level.IDhClientLevel;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.logging.f3.F3Screen;
import com.seibel.distanthorizons.core.multiplayer.ClientNetworkState;
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.messages.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.FullDataSourceResponseMessage;
@@ -25,10 +23,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@@ -77,7 +72,7 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
@Override public void runCurrentGenTasksUntilBusy(DhBlockPos2D targetPos)
{
while (generatorClosingFuture == null
&& networkState.eventSource.parent.isReady()
&& networkState.client().isReady()
&& !waitingTasks.isEmpty()
&& pendingTasks() < this.networkState.config.fullDataRequestRateLimit
&& pendingTasksSemaphore.tryAcquire())
@@ -95,7 +90,7 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
? a : b));
WorldGenQueueEntry entry = waitingTasks.remove(sectionPos);
CompletableFuture<FullDataSourceResponseMessage> request = this.networkState.eventSource.parent.sendRequest(new FullDataSourceRequestMessage(sectionPos));
CompletableFuture<FullDataSourceResponseMessage> request = this.networkState.client().sendRequest(new FullDataSourceRequestMessage(sectionPos));
pendingRequests.add(request);
request.handle((response, throwable) ->
{
@@ -131,14 +126,11 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
entry.future.complete(WorldGenResult.CreateSuccess(sectionPos));
}
catch (RateLimitedException e)
{
LOGGER.warn("Rate limited by server, re-queueing task: "+sectionPos, e);
finishedRequests.decrementAndGet();
waitingTasks.put(sectionPos, entry);
}
catch (ChannelException e)
catch (CancellationException | ChannelException | RateLimitedException e)
{
if (e instanceof RateLimitedException)
LOGGER.warn("Rate limited by server, re-queueing task ["+sectionPos+"]: "+e.getMessage());
finishedRequests.decrementAndGet();
waitingTasks.put(sectionPos, entry);
}
@@ -7,6 +7,7 @@ import com.seibel.distanthorizons.core.file.fullDatafile.RemoteFullDataFileHandl
import com.seibel.distanthorizons.core.file.structure.AbstractSaveStructure;
import com.seibel.distanthorizons.core.generation.WorldRemoteGenerationQueue;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.multiplayer.ClientNetworkState;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.pos.DhBlockPos;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
@@ -34,9 +35,9 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
private static class WorldGenState extends WorldGenModule.WorldGenState
{
WorldGenState(IDhClientLevel level, NetworkClient client)
WorldGenState(IDhClientLevel level, ClientNetworkState networkState)
{
this.worldGenerationQueue = new WorldRemoteGenerationQueue(client, level);
this.worldGenerationQueue = new WorldRemoteGenerationQueue(networkState, level);
}
}
@@ -46,7 +47,7 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
public final RemoteFullDataFileHandler dataFileHandler;
@CheckForNull
private final NetworkClient networkClient;
private final ClientNetworkState networkState;
public final WorldGenModule worldGenModule;
@@ -55,13 +56,13 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
// constructor //
//=============//
public DhClientLevel(AbstractSaveStructure saveStructure, IClientLevelWrapper clientLevelWrapper, @Nullable NetworkClient networkClient)
public DhClientLevel(AbstractSaveStructure saveStructure, IClientLevelWrapper clientLevelWrapper, @Nullable ClientNetworkState networkState)
{
this.levelWrapper = clientLevelWrapper;
this.saveStructure = saveStructure;
this.dataFileHandler = new RemoteFullDataFileHandler(this, saveStructure);
this.networkClient = networkClient;
this.networkState = networkState;
this.worldGenModule = new WorldGenModule(dataFileHandler, this);
clientside = new ClientLevelModule(this);
@@ -82,13 +83,13 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
public void doWorldGen()
{
boolean isClientUsable = networkClient != null && !networkClient.isClosed();
boolean isClientUsable = networkState != null && !networkState.client().isClosed();
boolean shouldDoWorldGen = isClientUsable && clientside.isRendering();
boolean isWorldGenRunning = worldGenModule.isWorldGenRunning();
if (shouldDoWorldGen && !isWorldGenRunning)
{
// start world gen
worldGenModule.startWorldGen(this.dataFileHandler, new WorldGenState(this, this.networkClient));
worldGenModule.startWorldGen(this.dataFileHandler, new WorldGenState(this, this.networkState));
}
else if (!shouldDoWorldGen && isWorldGenRunning)
{
@@ -6,12 +6,11 @@ import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedF
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IIncompleteFullDataSource;
import com.seibel.distanthorizons.core.file.fullDatafile.GeneratedFullDataFileHandler;
import com.seibel.distanthorizons.core.file.fullDatafile.IFullDataSourceProvider;
import com.seibel.distanthorizons.core.file.structure.AbstractSaveStructure;
import com.seibel.distanthorizons.core.multiplayer.RemotePlayer;
import com.seibel.distanthorizons.core.multiplayer.ServerPlayerState;
import com.seibel.distanthorizons.core.multiplayer.RemotePlayerConnectionHandler;
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkServer;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.messages.CancelMessage;
@@ -22,7 +21,6 @@ import com.seibel.distanthorizons.core.pos.DhLodPos;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.world.DhServerWorld;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper;
@@ -41,7 +39,7 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
private final IServerLevelWrapper serverLevelWrapper;
private final RemotePlayerConnectionHandler remotePlayerConnectionHandler;
private final ChildNetworkEventSource<NetworkServer> eventSource;
private final ScopedNetworkEventSource<NetworkServer> eventSource;
private final ConcurrentLinkedQueue<IServerPlayerWrapper> worldGenLoopingQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentMap<DhSectionPos, IncompleteDataSourceEntry> incompleteDataSources = new ConcurrentHashMap<>();
@@ -60,7 +58,7 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
LOGGER.info("Started DHLevel for {} with saves at {}", serverLevelWrapper, saveStructure);
this.remotePlayerConnectionHandler = remotePlayerConnectionHandler;
this.eventSource = new ChildNetworkEventSource<>(remotePlayerConnectionHandler.eventSource);
this.eventSource = new ScopedNetworkEventSource<>(remotePlayerConnectionHandler.server());
this.registerNetworkHandlers();
}
@@ -68,15 +66,15 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
{
this.eventSource.registerHandler(FullDataSourceRequestMessage.class, msg ->
{
RemotePlayer remotePlayer = remotePlayerConnectionHandler.getConnectedPlayer(msg);
if (remotePlayer.serverPlayer.getLevel() != this.serverLevelWrapper)
ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg);
if (serverPlayerState.serverPlayer.getLevel() != this.serverLevelWrapper)
return;
LOGGER.info("FullDataSourceRequestMessage received at pos ({}, {}) with detail level {}", msg.dhSectionPos.sectionX, msg.dhSectionPos.sectionZ, msg.dhSectionPos.sectionDetailLevel);
if (remotePlayer.pendingFullDataRequests.incrementAndGet() > rateLimitConfig.get())
if (serverPlayerState.pendingFullDataRequests.incrementAndGet() > rateLimitConfig.get())
{
remotePlayer.pendingFullDataRequests.decrementAndGet();
serverPlayerState.pendingFullDataRequests.decrementAndGet();
msg.sendResponse(new RateLimitedException("Max concurrent requests: "+rateLimitConfig.get()));
return;
}
@@ -104,8 +102,8 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
this.eventSource.registerHandler(CancelMessage.class, msg ->
{
this.fullDataRequests.remove(msg.futureId);
RemotePlayer remotePlayer = remotePlayerConnectionHandler.getConnectedPlayer(msg);
remotePlayer.pendingFullDataRequests.decrementAndGet();
ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg);
serverPlayerState.pendingFullDataRequests.decrementAndGet();
});
}
@@ -150,14 +148,14 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
CompleteFullDataSource completeSource = (CompleteFullDataSource) entry.fullDataSource;
for (FullDataSourceRequestMessage msg : entry.requestMessages)
{
RemotePlayer remotePlayer = remotePlayerConnectionHandler.getConnectedPlayer(msg);
if (remotePlayer == null) continue;
ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg);
if (serverPlayerState == null) continue;
// Check if cancelled
if (this.fullDataRequests.remove(msg.futureId) == null)
continue;
remotePlayer.pendingFullDataRequests.decrementAndGet();
serverPlayerState.pendingFullDataRequests.decrementAndGet();
msg.sendResponse(new FullDataSourceResponseMessage(completeSource, this));
}
}
@@ -1,32 +1,60 @@
package com.seibel.distanthorizons.core.multiplayer;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.network.messages.AckMessage;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.messages.PlayerUUIDMessage;
import com.seibel.distanthorizons.core.network.messages.RemotePlayerConfigMessage;
import org.apache.logging.log4j.Logger;
import java.io.Closeable;
import java.util.UUID;
public class ClientNetworkState implements Closeable
{
public final ChildNetworkEventSource<NetworkClient> eventSource;
public RemotePlayer.Payload config = new RemotePlayer.Payload();
protected static final Logger LOGGER = DhLoggerBuilder.getLogger();
public ClientNetworkState(NetworkClient networkClient)
private final ScopedNetworkEventSource<NetworkClient> eventSource;
private final UUID playerUUID;
public MultiplayerConfig config = new MultiplayerConfig();
public NetworkClient client() { return this.eventSource.parent; }
public ClientNetworkState(NetworkClient networkClient, UUID playerUUID)
{
this.eventSource = new ChildNetworkEventSource<>(networkClient);
this.eventSource = new ScopedNetworkEventSource<>(networkClient);
this.playerUUID = playerUUID;
this.registerNetworkHandlers();
this.client().startConnecting();
}
private void registerNetworkHandlers()
{
eventSource.registerHandler(RemotePlayerConfigMessage.class, msg -> {
this.config = msg.payload;
this.client().registerHandler(HelloMessage.class, helloMessage ->
{
LOGGER.info("Connected to server: "+helloMessage.getChannelContext().channel().remoteAddress());
this.client().<AckMessage>sendRequest(new PlayerUUIDMessage(playerUUID))
.thenCompose(ack -> this.client().<RemotePlayerConfigMessage>sendRequest(new RemotePlayerConfigMessage(new MultiplayerConfig()
{{
fullDataRequestRateLimit = Config.Client.Advanced.Multiplayer.serverNetworkingRateLimit.get();
}})))
.thenAccept(msg -> {
this.config = msg.payload;
})
.exceptionally(throwable -> {
LOGGER.error("Error while fetching server's config", throwable);
return null;
});
});
}
public void close()
{
this.eventSource.close();
this.client().close();
}
}
@@ -0,0 +1,26 @@
package com.seibel.distanthorizons.core.multiplayer;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import io.netty.buffer.ByteBuf;
public class MultiplayerConfig implements INetworkObject
{
public int renderDistance;
public int fullDataRequestRateLimit;
@Override
public void encode(ByteBuf out)
{
out.writeInt(this.renderDistance);
out.writeInt(this.fullDataRequestRateLimit);
}
@Override
public void decode(ByteBuf in)
{
this.renderDistance = in.readInt();
this.fullDataRequestRateLimit = in.readInt();
}
}
@@ -1,43 +0,0 @@
package com.seibel.distanthorizons.core.multiplayer;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.atomic.AtomicInteger;
public class RemotePlayer
{
public IServerPlayerWrapper serverPlayer;
public Payload payload;
public ChannelHandlerContext channelContext;
public final AtomicInteger pendingFullDataRequests = new AtomicInteger();
public RemotePlayer(IServerPlayerWrapper serverPlayer) { this.serverPlayer = serverPlayer; }
public static class Payload implements INetworkObject
{
public int renderDistance;
public int fullDataRequestRateLimit;
@Override
public void encode(ByteBuf out)
{
out.writeInt(this.renderDistance);
out.writeInt(this.fullDataRequestRateLimit);
}
@Override
public void decode(ByteBuf in)
{
this.renderDistance = in.readInt();
this.fullDataRequestRateLimit = in.readInt();
}
}
}
@@ -2,27 +2,30 @@ package com.seibel.distanthorizons.core.multiplayer;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkServer;
import com.seibel.distanthorizons.core.network.messages.AckMessage;
import com.seibel.distanthorizons.core.network.messages.CloseMessage;
import com.seibel.distanthorizons.core.network.messages.CloseEvent;
import com.seibel.distanthorizons.core.network.messages.PlayerUUIDMessage;
import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import io.netty.channel.ChannelHandlerContext;
import java.io.Closeable;
import java.util.HashMap;
import java.util.UUID;
public class RemotePlayerConnectionHandler
public class RemotePlayerConnectionHandler implements Closeable
{
public final ChildNetworkEventSource<NetworkServer> eventSource;
private final HashMap<UUID, RemotePlayer> playersByUUID = new HashMap<>();
private final BiMap<ChannelHandlerContext, RemotePlayer> playersByConnection = HashBiMap.create();
private final ScopedNetworkEventSource<NetworkServer> eventSource;
private final HashMap<UUID, ServerPlayerState> playersByUUID = new HashMap<>();
private final BiMap<ChannelHandlerContext, ServerPlayerState> playersByConnection = HashBiMap.create();
public NetworkServer server() { return this.eventSource.parent; }
public RemotePlayerConnectionHandler(NetworkServer networkServer)
{
this.eventSource = new ChildNetworkEventSource<>(networkServer);
this.eventSource = new ScopedNetworkEventSource<>(networkServer);
this.registerNetworkHandlers();
}
@@ -31,7 +34,7 @@ public class RemotePlayerConnectionHandler
this.eventSource.registerHandler(PlayerUUIDMessage.class, playerUUIDMessage ->
{
ChannelHandlerContext channelContext = playerUUIDMessage.getChannelContext();
RemotePlayer dhPlayer = this.playersByUUID.get(playerUUIDMessage.playerUUID);
ServerPlayerState dhPlayer = this.playersByUUID.get(playerUUIDMessage.playerUUID);
if (dhPlayer == null)
{
@@ -51,9 +54,9 @@ public class RemotePlayerConnectionHandler
playerUUIDMessage.sendResponse(new AckMessage());
});
this.eventSource.registerHandler(CloseMessage.class, closeMessage ->
this.eventSource.registerHandler(CloseEvent.class, closeEvent ->
{
RemotePlayer dhPlayer = this.playersByConnection.remove(closeMessage.getChannelContext());
ServerPlayerState dhPlayer = this.playersByConnection.remove(closeEvent.getChannelContext());
if (dhPlayer != null)
{
dhPlayer.channelContext = null;
@@ -61,29 +64,29 @@ public class RemotePlayerConnectionHandler
});
}
public Iterable<RemotePlayer> getConnectedPlayers()
public Iterable<ServerPlayerState> getConnectedPlayers()
{
return playersByConnection.values();
}
public RemotePlayer getConnectedPlayer(NetworkMessage msg)
public ServerPlayerState getConnectedPlayer(NetworkMessage msg)
{
return playersByConnection.get(msg.getChannelContext());
}
public RemotePlayer getPlayer(IServerPlayerWrapper serverPlayer)
public ServerPlayerState getPlayer(IServerPlayerWrapper serverPlayer)
{
return playersByUUID.get(serverPlayer.getUUID());
}
public void mcPlayerJoined(IServerPlayerWrapper serverPlayer)
public void registerJoinedPlayer(IServerPlayerWrapper serverPlayer)
{
this.playersByUUID.put(serverPlayer.getUUID(), new RemotePlayer(serverPlayer));
this.playersByUUID.put(serverPlayer.getUUID(), new ServerPlayerState(serverPlayer));
}
public void mcPlayerLeft(IServerPlayerWrapper serverPlayer)
public void unregisterLeftPlayer(IServerPlayerWrapper serverPlayer)
{
RemotePlayer dhPlayer = this.playersByUUID.remove(serverPlayer.getUUID());
ServerPlayerState dhPlayer = this.playersByUUID.remove(serverPlayer.getUUID());
ChannelHandlerContext channelContext = this.playersByConnection.inverse().remove(dhPlayer);
if (channelContext != null)
{
@@ -91,4 +94,11 @@ public class RemotePlayerConnectionHandler
}
}
@Override
public void close()
{
this.eventSource.close();
this.server().close();
}
}
@@ -0,0 +1,20 @@
package com.seibel.distanthorizons.core.multiplayer;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.atomic.AtomicInteger;
public class ServerPlayerState
{
public IServerPlayerWrapper serverPlayer;
public MultiplayerConfig config;
public ChannelHandlerContext channelContext;
public final AtomicInteger pendingFullDataRequests = new AtomicInteger();
public ServerPlayerState(IServerPlayerWrapper serverPlayer) { this.serverPlayer = serverPlayer; }
}
@@ -1,7 +1,7 @@
package com.seibel.distanthorizons.core.network;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.CloseMessage;
import com.seibel.distanthorizons.core.network.messages.CloseEvent;
import com.seibel.distanthorizons.core.network.messages.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
@@ -78,9 +78,9 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
this.connectionState = EConnectionState.CLOSE_WAIT;
});
this.registerHandler(CloseMessage.class, closeMessage ->
this.registerHandler(CloseEvent.class, closeEvent ->
{
LOGGER.info("Disconnected from server: "+closeMessage.getChannelContext().channel().remoteAddress());
LOGGER.info("Disconnected from server: "+ closeEvent.getChannelContext().channel().remoteAddress());
if (this.connectionState == EConnectionState.CLOSE_WAIT)
{
this.close();
@@ -2,6 +2,7 @@ package com.seibel.distanthorizons.core.network;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.google.common.collect.Tables;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.CancelMessage;
import com.seibel.distanthorizons.core.network.messages.ExceptionMessage;
@@ -10,19 +11,19 @@ import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
import io.netty.channel.ChannelHandlerContext;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
public abstract class NetworkEventSource
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
protected final Map<Class<? extends NetworkMessage>, Set<Consumer<NetworkMessage>>> handlers = new HashMap<>();
private final Table<ChannelHandlerContext, Long, CompletableFuture<FutureTrackableNetworkMessage>> pendingFutures = HashBasedTable.create();
protected final ConcurrentMap<Class<? extends NetworkMessage>, Set<Consumer<NetworkMessage>>> handlers = new ConcurrentHashMap<>();
private final Table<ChannelHandlerContext, Long, CompletableFuture<FutureTrackableNetworkMessage>> pendingFutures = Tables.synchronizedTable(HashBasedTable.create());
protected boolean hasHandler(Class<? extends NetworkMessage> handlerClass)
{
@@ -84,6 +85,7 @@ public abstract class NetworkEventSource
protected <TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(ChannelHandlerContext ctx, FutureTrackableNetworkMessage msg)
{
msg.futureId |= (long) ctx.hashCode() << 32;
msg.setChannelContext(ctx);
CompletableFuture<TResponse> responseFuture = new CompletableFuture<>();
responseFuture.handle((response, throwable) -> {
@@ -105,14 +107,20 @@ public abstract class NetworkEventSource
}
protected final void completeAllFuturesExceptionally(ChannelHandlerContext ctx, Throwable cause) {
for (CompletableFuture<FutureTrackableNetworkMessage> futureData : pendingFutures.row(ctx).values())
futureData.completeExceptionally(cause);
pendingFutures.row(ctx).clear();
synchronized (pendingFutures)
{
for (CompletableFuture<FutureTrackableNetworkMessage> futureData : pendingFutures.row(ctx).values())
futureData.completeExceptionally(cause);
pendingFutures.row(ctx).clear();
}
}
protected final void completeAllFuturesExceptionally(Throwable cause) {
for (ChannelHandlerContext ctx : pendingFutures.rowKeySet())
this.completeAllFuturesExceptionally(ctx, cause);
synchronized (pendingFutures)
{
for (ChannelHandlerContext ctx : pendingFutures.rowKeySet())
this.completeAllFuturesExceptionally(ctx, cause);
}
}
public void close()
@@ -1,9 +1,8 @@
package com.seibel.distanthorizons.core.network;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.AckMessage;
import com.seibel.distanthorizons.core.network.messages.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.CloseMessage;
import com.seibel.distanthorizons.core.network.messages.CloseEvent;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.MessageHandler;
@@ -66,12 +65,12 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
channelContext.writeAndFlush(new HelloMessage());
});
this.registerHandler(CloseMessage.class, closeMessage ->
this.registerHandler(CloseEvent.class, closeEvent ->
{
Channel channel = closeMessage.getChannelContext().channel();
Channel channel = closeEvent.getChannelContext().channel();
LOGGER.info("Client disconnected: "+channel.remoteAddress());
this.completeAllFuturesExceptionally(closeMessage.getChannelContext(), channel.closeFuture().cause());
this.completeAllFuturesExceptionally(closeEvent.getChannelContext(), channel.closeFuture().cause());
});
}
@@ -5,19 +5,15 @@ import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
import java.util.function.Consumer;
/** Provides a way to register network message handlers which are expected to be removed later. */
public final class ChildNetworkEventSource<TParent extends NetworkEventSource> extends NetworkEventSource
public final class ScopedNetworkEventSource<TParent extends NetworkEventSource> extends NetworkEventSource
{
public final TParent parent;
private boolean isClosed = false;
public ChildNetworkEventSource(TParent parent)
public ScopedNetworkEventSource(TParent parent)
{
this.parent = parent;
}
public ChildNetworkEventSource(ChildNetworkEventSource<TParent> child)
{
this.parent = child.parent;
}
@Override
public <T extends NetworkMessage> void registerHandler(Class<T> handlerClass, Consumer<T> handlerImplementation)
@@ -7,7 +7,7 @@ import io.netty.buffer.ByteBuf;
* This is not a "real" message, and only used to indicate a disconnection.
* To send a "disconnect reason" message, use {@link CloseReasonMessage}.
*/
public class CloseMessage extends NetworkMessage
public class CloseEvent extends NetworkMessage
{
@Override
public void encode(ByteBuf out) { throw new UnsupportedOperationException("CloseMessage is not a real message, and must not be sent."); }
@@ -1,23 +1,23 @@
package com.seibel.distanthorizons.core.network.messages;
import com.seibel.distanthorizons.core.multiplayer.MultiplayerConfig;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import com.seibel.distanthorizons.core.multiplayer.RemotePlayer;
import io.netty.buffer.ByteBuf;
public class RemotePlayerConfigMessage extends FutureTrackableNetworkMessage
{
public RemotePlayer.Payload payload;
public MultiplayerConfig payload;
public RemotePlayerConfigMessage() { }
public RemotePlayerConfigMessage(RemotePlayer.Payload payload) { this.payload = payload; }
public RemotePlayerConfigMessage(MultiplayerConfig payload) { this.payload = payload; }
@Override
public void encode0(ByteBuf out) { this.payload.encode(out); }
@Override
public void decode0(ByteBuf in) { this.payload = INetworkObject.decode(new RemotePlayer.Payload(), in); }
public void decode0(ByteBuf in) { this.payload = INetworkObject.decode(new MultiplayerConfig(), in); }
}
@@ -6,6 +6,7 @@ import io.netty.buffer.ByteBuf;
public abstract class FutureTrackableNetworkMessage extends NetworkMessage
{
private static int lastId = 0;
// Only low 32 bits are sent (high bits are used for identifying a channel this request was sent from by remote peer)
public long futureId = lastId++;
public void sendResponse(FutureTrackableNetworkMessage responseMessage)
@@ -1,14 +1,13 @@
package com.seibel.distanthorizons.core.network.protocol;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.CloseMessage;
import com.seibel.distanthorizons.core.network.messages.CloseEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ChannelHandler.Sharable
@@ -34,7 +33,7 @@ public class MessageHandler extends SimpleChannelInboundHandler<NetworkMessage>
@Override
public void channelInactive(@NotNull ChannelHandlerContext channelContext)
{
this.channelRead0(channelContext, new CloseMessage());
this.channelRead0(channelContext, new CloseEvent());
}
}
@@ -5,9 +5,8 @@ import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.file.structure.ClientOnlySaveStructure;
import com.seibel.distanthorizons.core.level.DhClientLevel;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.multiplayer.ClientNetworkState;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.network.messages.*;
import com.seibel.distanthorizons.core.multiplayer.RemotePlayer;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import com.seibel.distanthorizons.core.util.objects.EventLoop;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
@@ -26,9 +25,8 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
private final ConcurrentHashMap<IClientLevelWrapper, DhClientLevel> levels;
public final ClientOnlySaveStructure saveStructure;
@CheckForNull
private final NetworkClient networkClient;
private final ClientNetworkState networkState;
// TODO why does this executor have 2 threads?
public ExecutorService dhTickerThread = ThreadUtil.makeSingleThreadPool("DH Client World Ticker Thread", 2);
@@ -50,34 +48,16 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
if (Config.Client.Advanced.Multiplayer.enableServerNetworking.get())
{
// TODO server specific configs
this.networkClient = new NetworkClient(MC_CLIENT.getCurrentServerIp(), 25049);
this.registerNetworkHandlers();
NetworkClient networkClient = new NetworkClient(MC_CLIENT.getCurrentServerIp(), 25049);
this.networkState = new ClientNetworkState(networkClient, MC_CLIENT.getPlayerUUID());
}
else
{
this.networkClient = null;
this.networkState = null;
}
LOGGER.info("Started DhWorld of type "+this.environment);
}
private void registerNetworkHandlers() {
assert this.networkClient != null;
this.networkClient.registerHandler(HelloMessage.class, helloMessage ->
{
LOGGER.info("Connected to server: "+helloMessage.getChannelContext().channel().remoteAddress());
this.networkClient.<AckMessage>sendRequest(new PlayerUUIDMessage(MC_CLIENT.getPlayerUUID()))
.thenCompose(ack -> this.networkClient.<RemotePlayerConfigMessage>sendRequest(new RemotePlayerConfigMessage(new RemotePlayer.Payload()
{{
fullDataRequestRateLimit = Config.Client.Advanced.Multiplayer.serverNetworkingRateLimit.get();
}})))
.exceptionally(throwable -> {
LOGGER.error("Error while fetching server's config", throwable);
return null;
});
});
}
@@ -102,7 +82,7 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
return null;
}
return new DhClientLevel(this.saveStructure, clientLevelWrapper, networkClient);
return new DhClientLevel(this.saveStructure, clientLevelWrapper, networkState);
});
}
@@ -144,8 +124,6 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
public void doWorldGen() {
this.levels.values().forEach(DhClientLevel::doWorldGen);
if (networkClient != null && networkClient.isInitialState())
networkClient.startConnecting();
}
@Override
@@ -157,9 +135,9 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
@Override
public void close()
{
if (this.networkClient != null)
if (this.networkState != null)
{
this.networkClient.close();
this.networkState.close();
}
@@ -5,7 +5,7 @@ import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.file.structure.LocalSaveStructure;
import com.seibel.distanthorizons.core.level.DhServerLevel;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.multiplayer.RemotePlayer;
import com.seibel.distanthorizons.core.multiplayer.ServerPlayerState;
import com.seibel.distanthorizons.core.multiplayer.RemotePlayerConnectionHandler;
import com.seibel.distanthorizons.core.network.NetworkServer;
import com.seibel.distanthorizons.core.network.messages.RemotePlayerConfigMessage;
@@ -23,7 +23,6 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
private final HashMap<IServerLevelWrapper, DhServerLevel> levels;
public final LocalSaveStructure saveStructure;
private final NetworkServer networkServer;
private final RemotePlayerConnectionHandler remotePlayerConnectionHandler;
private final AppliedConfigState<Integer> rateLimitConfig = new AppliedConfigState<>(Config.Client.Advanced.Multiplayer.serverNetworkingRateLimit);
@@ -37,18 +36,18 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
this.levels = new HashMap<>();
// TODO move to global payload once server specific configs are implemented
this.networkServer = new NetworkServer(25049);
this.registerNetworkHandlers();
NetworkServer networkServer = new NetworkServer(25049);
this.remotePlayerConnectionHandler = new RemotePlayerConnectionHandler(networkServer);
this.registerNetworkHandlers();
LOGGER.info("Started "+DhServerWorld.class.getSimpleName()+" of type "+this.environment);
}
private void registerNetworkHandlers()
{
this.networkServer.registerHandler(RemotePlayerConfigMessage.class, remotePlayerConfigMessage ->
this.remotePlayerConnectionHandler.server().registerHandler(RemotePlayerConfigMessage.class, remotePlayerConfigMessage ->
{
this.remotePlayerConnectionHandler.getConnectedPlayer(remotePlayerConfigMessage).payload = remotePlayerConfigMessage.payload;
this.remotePlayerConnectionHandler.getConnectedPlayer(remotePlayerConfigMessage).config = remotePlayerConfigMessage.payload;
remotePlayerConfigMessage.payload.fullDataRequestRateLimit = Math.min(rateLimitConfig.get(), remotePlayerConfigMessage.payload.fullDataRequestRateLimit);
remotePlayerConfigMessage.sendResponse(remotePlayerConfigMessage);
@@ -57,21 +56,18 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
public void addPlayer(IServerPlayerWrapper serverPlayer)
{
this.remotePlayerConnectionHandler.mcPlayerJoined(serverPlayer);
this.remotePlayerConnectionHandler.registerJoinedPlayer(serverPlayer);
this.getLevel(serverPlayer.getLevel()).addPlayer(serverPlayer);
}
public void removePlayer(IServerPlayerWrapper serverPlayer)
{
this.getLevel(serverPlayer.getLevel()).removePlayer(serverPlayer);
this.remotePlayerConnectionHandler.mcPlayerLeft(serverPlayer);
this.remotePlayerConnectionHandler.unregisterLeftPlayer(serverPlayer);
}
public void changePlayerLevel(IServerPlayerWrapper player, IServerLevelWrapper origin, IServerLevelWrapper dest)
{
this.getLevel(origin).removePlayer(player);
this.getLevel(dest).addPlayer(player);
RemotePlayer remotePlayer = this.remotePlayerConnectionHandler.getPlayer(player);
remotePlayer.channelContext.writeAndFlush(new RemotePlayerConfigMessage(remotePlayer.payload));
}
@Override
@@ -124,10 +120,10 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
if (rateLimitConfig.pollNewValue())
{
for (RemotePlayer remotePlayer : this.remotePlayerConnectionHandler.getConnectedPlayers())
for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers())
{
remotePlayer.payload.fullDataRequestRateLimit = rateLimitConfig.get();
remotePlayer.channelContext.writeAndFlush(new RemotePlayerConfigMessage(remotePlayer.payload));
serverPlayerState.config.fullDataRequestRateLimit = rateLimitConfig.get();
serverPlayerState.channelContext.writeAndFlush(new RemotePlayerConfigMessage(serverPlayerState.config));
}
}
}
@@ -145,7 +141,7 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
@Override
public void close()
{
this.networkServer.close();
this.remotePlayerConnectionHandler.close();
for (DhServerLevel level : this.levels.values())
{