From 166cc55e3c870a7c125fa2334255948a7c69a48c Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Mon, 27 Nov 2023 23:46:05 +0500 Subject: [PATCH] Abstract away ChannelHandlerContext from user code --- .../distanthorizons/core/config/Config.java | 2 +- .../core/level/DhServerLevel.java | 2 +- .../client/ClientNetworkState.java | 2 +- .../server/RemotePlayerConnectionHandler.java | 51 +++++-------- .../multiplayer/server/ServerPlayerState.java | 4 +- .../core/network/IConnection.java | 58 ++++++++++++++ .../core/network/NetworkClient.java | 28 ++++--- .../core/network/NetworkEventSource.java | 48 ++++++------ .../core/network/NetworkServer.java | 76 +++++++++++-------- .../FutureTrackableNetworkMessage.java | 12 +-- .../core/network/protocol/INetworkObject.java | 6 +- .../core/network/protocol/MessageHandler.java | 8 +- .../core/network/protocol/NetworkMessage.java | 22 +++--- 13 files changed, 195 insertions(+), 124 deletions(-) create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/network/IConnection.java diff --git a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java index c9fa0536c..b5711adb6 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java @@ -582,7 +582,7 @@ public class Config .comment("" + "How bright LOD colors are. \n" + "\n" - + "0 = black \n"f + + "0 = black \n" + "1 = normal \n" + "2 = near white") .build(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java index b19d2fb09..bd0066c86 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java @@ -280,7 +280,7 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel if (distanceFromPlayer < serverPlayerState.serverPlayer.getViewDistance() || distanceFromPlayer > serverPlayerState.config.getRenderDistanceRadius()) return; - serverPlayerState.channelContext.writeAndFlush(new FullDataPartialUpdateMessage(chunkUpdateData.accessor, this)); + serverPlayerState.connection.sendMessage(new FullDataPartialUpdateMessage(chunkUpdateData.accessor, this)); } } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java index 4870863b5..9883b393a 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java @@ -48,7 +48,7 @@ public class ClientNetworkState implements Closeable { this.client.registerHandler(HelloMessage.class, helloMessage -> { - LOGGER.info("Connected to server: "+helloMessage.getChannelContext().channel().remoteAddress()); + LOGGER.info("Connected to server: "+helloMessage.getConnection().getRemoteAddress()); this.getClient().sendRequest(new PlayerUUIDMessage(playerUUID), AckMessage.class) .thenAccept(ack -> this.getClient().sendMessage(new RemotePlayerConfigMessage(new MultiplayerConfig()))) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/RemotePlayerConnectionHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/RemotePlayerConnectionHandler.java index e8828d75a..56f5870d6 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/RemotePlayerConnectionHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/RemotePlayerConnectionHandler.java @@ -5,16 +5,16 @@ import com.google.common.collect.HashBiMap; import com.seibel.distanthorizons.core.level.DhServerLevel; import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfig; import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfigChangeListener; -import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource; +import com.seibel.distanthorizons.core.network.IConnection; import com.seibel.distanthorizons.core.network.NetworkServer; -import com.seibel.distanthorizons.core.network.messages.base.ILevelRelatedMessage; +import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource; import com.seibel.distanthorizons.core.network.messages.base.AckMessage; import com.seibel.distanthorizons.core.network.messages.base.CloseEvent; +import com.seibel.distanthorizons.core.network.messages.base.ILevelRelatedMessage; import com.seibel.distanthorizons.core.network.messages.session.PlayerUUIDMessage; import com.seibel.distanthorizons.core.network.messages.session.RemotePlayerConfigMessage; import com.seibel.distanthorizons.core.network.protocol.NetworkMessage; import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper; -import io.netty.channel.ChannelHandlerContext; import org.jetbrains.annotations.Nullable; import java.io.Closeable; @@ -27,7 +27,7 @@ public class RemotePlayerConnectionHandler implements Closeable { private final ScopedNetworkEventSource eventSource; private final HashMap playersByUUID = new HashMap<>(); - private final BiMap playersByConnection = HashBiMap.create(); + private final BiMap playersByConnection = HashBiMap.create(); private final MultiplayerConfigChangeListener configChangeListener = new MultiplayerConfigChangeListener(this::onConfigChanged); @@ -43,23 +43,23 @@ public class RemotePlayerConnectionHandler implements Closeable { this.eventSource.registerHandler(PlayerUUIDMessage.class, playerUUIDMessage -> { - ChannelHandlerContext channelContext = playerUUIDMessage.getChannelContext(); - ServerPlayerState dhPlayer = this.playersByUUID.get(playerUUIDMessage.playerUUID); + IConnection connection = playerUUIDMessage.getConnection(); + ServerPlayerState serverPlayerState = this.playersByUUID.get(playerUUIDMessage.playerUUID); - if (dhPlayer == null) + if (serverPlayerState == null) { - this.eventSource.parent.disconnectClient(channelContext, "Player is not logged in."); + connection.disconnect("Player is not logged in."); return; } - if (dhPlayer.channelContext != null) + if (serverPlayerState.connection != null) { - this.eventSource.parent.disconnectClient(channelContext, "Another connection is already in use."); + connection.disconnect("Another connection is already in use."); return; } - dhPlayer.channelContext = channelContext; - this.playersByConnection.put(channelContext, dhPlayer); + serverPlayerState.connection = connection; + this.playersByConnection.put(connection, serverPlayerState); playerUUIDMessage.sendResponse(new AckMessage()); }); @@ -67,15 +67,15 @@ public class RemotePlayerConnectionHandler implements Closeable this.eventSource.registerHandler(RemotePlayerConfigMessage.class, this.connectedPlayersOnly((remotePlayerConfigMessage, serverPlayerState) -> { serverPlayerState.config.clientConfig = (MultiplayerConfig) remotePlayerConfigMessage.payload; - serverPlayerState.channelContext.writeAndFlush(new RemotePlayerConfigMessage(serverPlayerState.config)); + serverPlayerState.connection.sendMessage(new RemotePlayerConfigMessage(serverPlayerState.config)); })); this.eventSource.registerHandler(CloseEvent.class, closeEvent -> { - ServerPlayerState dhPlayer = this.playersByConnection.remove(closeEvent.getChannelContext()); + ServerPlayerState dhPlayer = this.playersByConnection.remove(closeEvent.getConnection()); if (dhPlayer != null) { - dhPlayer.channelContext = null; + dhPlayer.connection = null; } }); } @@ -83,7 +83,7 @@ public class RemotePlayerConnectionHandler implements Closeable private void onConfigChanged() { for (ServerPlayerState serverPlayerState : this.getConnectedPlayers()) - serverPlayerState.channelContext.writeAndFlush(new RemotePlayerConfigMessage(serverPlayerState.config)); + serverPlayerState.connection.sendMessage(new RemotePlayerConfigMessage(serverPlayerState.config)); } public Consumer connectedPlayersOnly(BiConsumer next) @@ -118,16 +118,7 @@ public class RemotePlayerConnectionHandler implements Closeable @Nullable public ServerPlayerState getConnectedPlayer(NetworkMessage msg) { - return playersByConnection.get(msg.getChannelContext()); - } - - @Nullable - public ServerPlayerState getConnectedPlayer(IServerPlayerWrapper serverPlayer) - { - ServerPlayerState player = playersByUUID.get(serverPlayer.getUUID()); - if (player == null || player.channelContext == null) - return null; - return player; + return playersByConnection.get(msg.getConnection()); } public void registerJoinedPlayer(IServerPlayerWrapper serverPlayer) @@ -138,11 +129,9 @@ public class RemotePlayerConnectionHandler implements Closeable public void unregisterLeftPlayer(IServerPlayerWrapper serverPlayer) { ServerPlayerState dhPlayer = this.playersByUUID.remove(serverPlayer.getUUID()); - ChannelHandlerContext channelContext = this.playersByConnection.inverse().remove(dhPlayer); - if (channelContext != null) - { - this.eventSource.parent.disconnectClient(channelContext, "You are being disconnected."); - } + IConnection connection = this.playersByConnection.inverse().remove(dhPlayer); + if (connection != null) + connection.disconnect("You are being disconnected."); } @Override diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java index c2fde5ef5..3429107a2 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java @@ -1,14 +1,14 @@ package com.seibel.distanthorizons.core.multiplayer.server; +import com.seibel.distanthorizons.core.network.IConnection; import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper; -import io.netty.channel.ChannelHandlerContext; import java.util.concurrent.atomic.AtomicInteger; public class ServerPlayerState { public IServerPlayerWrapper serverPlayer; - public ChannelHandlerContext channelContext; + public IConnection connection; public ServersideMultiplayerConfig config = new ServersideMultiplayerConfig(); public final AtomicInteger pendingFullDataRequests = new AtomicInteger(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/IConnection.java b/core/src/main/java/com/seibel/distanthorizons/core/network/IConnection.java new file mode 100644 index 000000000..bc94845dc --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/IConnection.java @@ -0,0 +1,58 @@ +package com.seibel.distanthorizons.core.network; + +import com.seibel.distanthorizons.core.network.messages.base.CloseReasonMessage; +import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage; +import com.seibel.distanthorizons.core.network.protocol.NetworkMessage; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; + +import java.net.SocketAddress; +import java.util.concurrent.CompletableFuture; + +public interface IConnection +{ + ChannelHandlerContext getChannelContext(); + NetworkEventSource getRequestHandler(); + + default SocketAddress getRemoteAddress() + { + return this.getChannelContext().channel().remoteAddress(); + } + + default CompletableFuture sendMessage(NetworkMessage message) + { + CompletableFuture future = new CompletableFuture<>(); + + this.getChannelContext().writeAndFlush(message).addListener(writeFuture -> + { + if (writeFuture.cause() != null) + { + future.completeExceptionally(writeFuture.cause()); + } + else + { + future.complete(null); + } + }); + + return future; + } + + default CompletableFuture sendRequest(FutureTrackableNetworkMessage msg, Class responseClass) + { + return this.getRequestHandler().sendRequest(this, msg, responseClass); + } + + default void disconnect(String reason) + { + this.getChannelContext().channel().config().setAutoRead(false); + this.getChannelContext().writeAndFlush(new CloseReasonMessage(reason)) + .addListener(ChannelFutureListener.CLOSE); + } + + default Throwable getCloseReason() + { + return this.getChannelContext().channel().closeFuture().cause(); + } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java index 5891b24bd..813ab7ca0 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java @@ -23,7 +23,6 @@ import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.network.messages.base.CloseEvent; import com.seibel.distanthorizons.core.network.messages.base.CloseReasonMessage; import com.seibel.distanthorizons.core.network.messages.base.HelloMessage; -import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage; import com.seibel.distanthorizons.core.network.protocol.MessageHandler; import com.seibel.distanthorizons.core.network.protocol.NetworkChannelInitializer; import com.seibel.distanthorizons.core.network.protocol.NetworkMessage; @@ -36,10 +35,9 @@ import org.apache.logging.log4j.Logger; import java.net.InetSocketAddress; import java.util.EnumSet; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -public class NetworkClient extends NetworkEventSource implements AutoCloseable +public class NetworkClient extends NetworkEventSource implements IConnection, AutoCloseable { private static final Logger LOGGER = DhLoggerBuilder.getLogger(); @@ -75,7 +73,13 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable .group(this.workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) - .handler(new NetworkChannelInitializer(new MessageHandler(this::handleMessage, this::addNewContext))); + .handler(new NetworkChannelInitializer(new MessageHandler( + (ctx, msg) -> { + msg.setConnection(this); + this.handleMessage(msg); + }, + ctx -> this.addNewConnection(this) + ))); private EConnectionState connectionState = EConnectionState.INITIAL; private Channel channel; @@ -99,7 +103,7 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable this.registerHandler(CloseEvent.class, closeEvent -> { - LOGGER.info("Disconnected from server: "+ closeEvent.getChannelContext().channel().remoteAddress()); + LOGGER.info("Disconnected from server: "+this.getRemoteAddress()); if (this.connectionState == EConnectionState.CLOSING) { this.close(); @@ -158,17 +162,19 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable }); } - public final void sendMessage(NetworkMessage msg) + @Override + public ChannelHandlerContext getChannelContext() { - this.channel.writeAndFlush(msg); + return this.channel.pipeline().context(MessageHandler.class); } - public final CompletableFuture sendRequest(FutureTrackableNetworkMessage msg, Class responseClass) + @Override + public NetworkEventSource getRequestHandler() { - return this.sendRequest(this.channel.pipeline().context(MessageHandler.class), msg, responseClass); + return this; } - @Override + @Override public void close() { if (this.connectionState == EConnectionState.CLOSED) @@ -178,7 +184,7 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable this.connectionState = EConnectionState.CLOSED; this.channel.close().syncUninterruptibly(); - this.workerGroup.shutdownGracefully().syncUninterruptibly(); + this.workerGroup.shutdownGracefully(); super.close(); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java index 5b53ba7ff..77d3834fd 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java @@ -27,7 +27,6 @@ import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMe import com.seibel.distanthorizons.core.network.protocol.MessageRegistry; import com.seibel.distanthorizons.core.network.protocol.NetworkMessage; import io.netty.channel.ChannelException; -import io.netty.channel.ChannelHandlerContext; import org.apache.logging.log4j.Logger; import java.io.InvalidClassException; @@ -43,7 +42,7 @@ public abstract class NetworkEventSource { private static final Logger LOGGER = DhLoggerBuilder.getLogger(); protected final ConcurrentMap, Set>> handlers = new ConcurrentHashMap<>(); - private final ConcurrentMap> pendingFutures = new ConcurrentHashMap<>(); + private final ConcurrentMap> pendingFutures = new ConcurrentHashMap<>(); protected boolean hasHandler(Class handlerClass) { @@ -68,7 +67,7 @@ public abstract class NetworkEventSource if (message instanceof FutureTrackableNetworkMessage) { FutureTrackableNetworkMessage trackableMessage = (FutureTrackableNetworkMessage)message; - ConcurrentMap subMap = pendingFutures.get(message.getChannelContext()); + ConcurrentMap subMap = pendingFutures.get(message.getConnection()); if (subMap != null) { FutureResponseData responseData = subMap.get(trackableMessage.futureId); @@ -90,9 +89,9 @@ public abstract class NetworkEventSource LOGGER.warn("Unhandled message: " + message); } - protected void addNewContext(ChannelHandlerContext ctx) + protected void addNewConnection(IConnection connection) { - this.pendingFutures.put(ctx, new ConcurrentHashMap<>()); + this.pendingFutures.put(connection, new ConcurrentHashMap<>()); } public void registerHandler(Class handlerClass, Consumer handlerImplementation) @@ -115,50 +114,51 @@ public abstract class NetworkEventSource } - protected CompletableFuture sendRequest(ChannelHandlerContext ctx, FutureTrackableNetworkMessage msg, Class responseClass) + protected CompletableFuture sendRequest(IConnection connection, FutureTrackableNetworkMessage msg, Class responseClass) { - msg.setChannelContext(ctx); + msg.setConnection(connection); CompletableFuture responseFuture = new CompletableFuture<>(); - responseFuture.handle((response, throwable) -> { + responseFuture.whenComplete((response, throwable) -> + { if (!(throwable instanceof ChannelException)) { - ConcurrentMap subMap = pendingFutures.get(ctx); + ConcurrentMap subMap = pendingFutures.get(connection); if (subMap != null) subMap.remove(msg.futureId); } if (throwable instanceof CancellationException) msg.sendResponse(new CancelMessage()); - - return null; }); - ConcurrentMap subMap = pendingFutures.get(ctx); - if (subMap == null) { + ConcurrentMap subMap = pendingFutures.get(connection); + if (subMap == null) + { // Was deleted before adding - responseFuture.completeExceptionally(ctx.channel().closeFuture().cause()); + responseFuture.completeExceptionally(connection.getCloseReason()); return responseFuture; } subMap.put(msg.futureId, new FutureResponseData(responseClass, responseFuture)); - if (!pendingFutures.containsKey(ctx)) { + if (!pendingFutures.containsKey(connection)) + { // Was deleted while adding - responseFuture.completeExceptionally(ctx.channel().closeFuture().cause()); + responseFuture.completeExceptionally(connection.getCloseReason()); return responseFuture; } // If passed until here, cancelling is up to the cleaning side - ctx.writeAndFlush(msg).addListener(writeFuture -> { - if (writeFuture.cause() != null) { - responseFuture.completeExceptionally(writeFuture.cause()); - } + connection.sendMessage(msg).whenComplete((ignored, throwable) -> + { + if (throwable != null) + responseFuture.completeExceptionally(throwable); }); return responseFuture; } - protected final void completeAllFuturesExceptionally(ChannelHandlerContext ctx, Throwable cause) + protected final void completeAllFuturesExceptionally(IConnection connection, Throwable cause) { - ConcurrentMap map = pendingFutures.remove(ctx); + ConcurrentMap map = pendingFutures.remove(connection); if (map == null) return; for (FutureResponseData responseData : map.values()) @@ -167,8 +167,8 @@ public abstract class NetworkEventSource protected final void completeAllFuturesExceptionally(Throwable cause) { - for (ChannelHandlerContext ctx : pendingFutures.keySet()) - this.completeAllFuturesExceptionally(ctx, cause); + for (IConnection connection : pendingFutures.keySet()) + this.completeAllFuturesExceptionally(connection, cause); } public void close() diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java index 003b192eb..ab24e7516 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java @@ -19,13 +19,13 @@ package com.seibel.distanthorizons.core.network; +import com.google.common.collect.MapMaker; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; -import com.seibel.distanthorizons.core.network.messages.base.CloseReasonMessage; import com.seibel.distanthorizons.core.network.messages.base.CloseEvent; import com.seibel.distanthorizons.core.network.messages.base.HelloMessage; -import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage; import com.seibel.distanthorizons.core.network.protocol.MessageHandler; import com.seibel.distanthorizons.core.network.protocol.NetworkChannelInitializer; +import com.seibel.distanthorizons.core.network.protocol.NetworkMessage; import com.seibel.distanthorizons.coreapi.ModInfo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; @@ -35,7 +35,8 @@ import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.apache.logging.log4j.Logger; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; public class NetworkServer extends NetworkEventSource implements AutoCloseable { @@ -46,7 +47,9 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable private final EventLoopGroup bossGroup = new NioEventLoopGroup(1); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); - private boolean isClosed = false; + private final AtomicBoolean isClosed = new AtomicBoolean(); + + private final ConcurrentMap connections = new MapMaker().weakKeys().weakValues().makeMap(); @@ -63,16 +66,16 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable { this.registerHandler(HelloMessage.class, helloMessage -> { - ChannelHandlerContext channelContext = helloMessage.getChannelContext(); - LOGGER.info("Client connected: "+channelContext.channel().remoteAddress()); + IConnection connection = helloMessage.getConnection(); + LOGGER.info("Client connected: "+connection.getRemoteAddress()); if (helloMessage.version != ModInfo.PROTOCOL_VERSION) { try { String disconnectReason = "Version mismatch. Server version: ["+ModInfo.PROTOCOL_VERSION+"], client version: ["+helloMessage.version+"]."; - LOGGER.info("Disconnecting the client ["+channelContext.name()+"]: "+disconnectReason); - this.disconnectClient(channelContext, disconnectReason); + LOGGER.info("Disconnecting the client ["+connection.getRemoteAddress()+"]: "+disconnectReason); + connection.disconnect(disconnectReason); } catch (Exception e) { @@ -81,15 +84,14 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable return; } - channelContext.writeAndFlush(new HelloMessage()); + connection.sendMessage(new HelloMessage()); }); this.registerHandler(CloseEvent.class, closeEvent -> { - Channel channel = closeEvent.getChannelContext().channel(); - LOGGER.info("Client disconnected: "+channel.remoteAddress()); - - this.completeAllFuturesExceptionally(closeEvent.getChannelContext(), channel.closeFuture().cause()); + IConnection connection = closeEvent.getConnection(); + LOGGER.info("Client disconnected: "+connection.getRemoteAddress()); + this.completeAllFuturesExceptionally(closeEvent.getConnection(), connection.getCloseReason()); }); } @@ -99,7 +101,13 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable .group(this.bossGroup, this.workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) - .childHandler(new NetworkChannelInitializer(new MessageHandler(this::handleMessage, this::addNewContext))); + .childHandler(new NetworkChannelInitializer(new MessageHandler( + (ctx, msg) -> { + msg.setConnection(this.connections.computeIfAbsent(ctx, Connection::new)); + this.handleMessage(msg); + }, + ctx -> this.addNewConnection(this.connections.computeIfAbsent(ctx, Connection::new)) + ))); ChannelFuture bindFuture = bootstrap.bind(this.port); bindFuture.addListener((ChannelFuture channelFuture) -> @@ -116,27 +124,11 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable channel.closeFuture().addListener(future -> this.close()); } - public void disconnectClient(ChannelHandlerContext ctx, String reason) - { - ctx.channel().config().setAutoRead(false); - ctx.writeAndFlush(new CloseReasonMessage(reason)) - .addListener(ChannelFutureListener.CLOSE); - } - - @Override - public CompletableFuture sendRequest(ChannelHandlerContext ctx, FutureTrackableNetworkMessage msg, Class responseClass) - { - return super.sendRequest(ctx, msg, responseClass); - } - @Override public void close() { - if (this.isClosed) - { + if (!this.isClosed.compareAndSet(false, true)) return; - } - this.isClosed = true; LOGGER.info("Shutting down the network server."); this.workerGroup.shutdownGracefully().syncUninterruptibly(); @@ -146,4 +138,26 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable super.close(); } + public class Connection implements IConnection + { + private final ChannelHandlerContext channelContext; + + public Connection(ChannelHandlerContext channelContext) + { + this.channelContext = channelContext; + } + + @Override + public ChannelHandlerContext getChannelContext() + { + return this.channelContext; + } + + @Override + public NetworkEventSource getRequestHandler() + { + return NetworkServer.this; + } + + } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/FutureTrackableNetworkMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/FutureTrackableNetworkMessage.java index a6e437e2d..ce267f67e 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/FutureTrackableNetworkMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/FutureTrackableNetworkMessage.java @@ -21,10 +21,10 @@ package com.seibel.distanthorizons.core.network.protocol; import com.google.common.collect.MapMaker; import com.seibel.distanthorizons.core.api.internal.SharedApi; +import com.seibel.distanthorizons.core.network.IConnection; import com.seibel.distanthorizons.core.network.messages.base.ExceptionMessage; import com.seibel.distanthorizons.core.world.EWorldEnvironment; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; import javax.annotation.Nullable; import java.util.Objects; @@ -41,19 +41,19 @@ public abstract class FutureTrackableNetworkMessage extends NetworkMessage | ((Objects.requireNonNull(SharedApi.getEnvironment()) == EWorldEnvironment.Server_Only ? 1 : 0) << 31); private static final AtomicInteger lastContextId = new AtomicInteger(); - private static final ConcurrentMap contextIds = new MapMaker().weakKeys().makeMap(); + private static final ConcurrentMap connectionToIdMap = new MapMaker().weakKeys().makeMap(); public void sendResponse(FutureTrackableNetworkMessage responseMessage) { responseMessage.futureId = futureId; - getChannelContext().writeAndFlush(responseMessage); + this.getConnection().sendMessage(responseMessage); } @Override - public void setChannelContext(ChannelHandlerContext channelContext) + public void setConnection(IConnection connection) { - super.setChannelContext(channelContext); - this.futureId |= (long) contextIds.computeIfAbsent(channelContext, k -> lastContextId.getAndIncrement()) << 32; + super.setConnection(connection); + this.futureId |= (long) connectionToIdMap.computeIfAbsent(connection, k -> lastContextId.getAndIncrement()) << 32; } public void sendResponse(Exception e) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/INetworkObject.java b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/INetworkObject.java index eb4e2e38f..63a2cc476 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/INetworkObject.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/INetworkObject.java @@ -109,8 +109,10 @@ public interface INetworkObject map.put(entry.getKey(), entry.getValue()); } - /** Should only be used for non-editable classes; - * otherwise, you may want to implement {@link INetworkObject} and use its method where applicable. */ + /** + * Should only be used for non-editable classes; + * otherwise, you may want to implement {@link INetworkObject} and use its method where applicable. + */ class Codec { private static final ConcurrentMap, Codec> codecMap = new ConcurrentHashMap, Codec>() diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageHandler.java index 3967b86b6..55122118d 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageHandler.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; +import java.util.function.BiConsumer; import java.util.function.Consumer; @ChannelHandler.Sharable @@ -34,10 +35,10 @@ public class MessageHandler extends SimpleChannelInboundHandler { private static final Logger LOGGER = LogManager.getLogger(); - private final Consumer messageConsumer; + private final BiConsumer messageConsumer; private final Consumer channelActiveConsumer; - public MessageHandler(Consumer messageConsumer, Consumer channelActiveConsumer) + public MessageHandler(BiConsumer messageConsumer, Consumer channelActiveConsumer) { this.messageConsumer = messageConsumer; this.channelActiveConsumer = channelActiveConsumer; @@ -46,9 +47,8 @@ public class MessageHandler extends SimpleChannelInboundHandler @Override protected void channelRead0(ChannelHandlerContext channelContext, NetworkMessage message) { - message.setChannelContext(channelContext); LOGGER.trace("Received message: " + message); - this.messageConsumer.accept(message); + this.messageConsumer.accept(channelContext, message); } @Override diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/NetworkMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/NetworkMessage.java index 4c5fb44c3..0c51455ee 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/NetworkMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/NetworkMessage.java @@ -19,33 +19,35 @@ package com.seibel.distanthorizons.core.network.protocol; -import io.netty.channel.ChannelHandlerContext; -import org.jetbrains.annotations.NotNull; +import com.seibel.distanthorizons.core.network.IConnection; + +import javax.annotation.Nullable; public abstract class NetworkMessage implements INetworkObject { - private ChannelHandlerContext channelContext = null; + private IConnection connection = null; public boolean warnWhenUnhandled() { return true; } - public ChannelHandlerContext getChannelContext() + public IConnection getConnection() { - return channelContext; + return connection; } - public void setChannelContext(ChannelHandlerContext channelContext) + public void setConnection(IConnection connection) { - if (this.channelContext != null) + if (this.connection != null) throw new IllegalStateException("Channel context cannot be changed after initial setting."); - this.channelContext = channelContext; + this.connection = connection; } - @Override public String toString() + @Override + public String toString() { return toString(""); } - protected String toString(@NotNull String extraData) + protected String toString(@Nullable String extraData) { return this.getClass().getSimpleName() + "{" + extraData + '}'; }