Abstract away ChannelHandlerContext from user code

This commit is contained in:
s809
2023-11-27 23:46:05 +05:00
parent f36bffa4b7
commit 166cc55e3c
13 changed files with 195 additions and 124 deletions
@@ -582,7 +582,7 @@ public class Config
.comment(""
+ "How bright LOD colors are. \n"
+ "\n"
+ "0 = black \n"f
+ "0 = black \n"
+ "1 = normal \n"
+ "2 = near white")
.build();
@@ -280,7 +280,7 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
if (distanceFromPlayer < serverPlayerState.serverPlayer.getViewDistance() ||
distanceFromPlayer > serverPlayerState.config.getRenderDistanceRadius()) return;
serverPlayerState.channelContext.writeAndFlush(new FullDataPartialUpdateMessage(chunkUpdateData.accessor, this));
serverPlayerState.connection.sendMessage(new FullDataPartialUpdateMessage(chunkUpdateData.accessor, this));
}
}
}
@@ -48,7 +48,7 @@ public class ClientNetworkState implements Closeable
{
this.client.registerHandler(HelloMessage.class, helloMessage ->
{
LOGGER.info("Connected to server: "+helloMessage.getChannelContext().channel().remoteAddress());
LOGGER.info("Connected to server: "+helloMessage.getConnection().getRemoteAddress());
this.getClient().sendRequest(new PlayerUUIDMessage(playerUUID), AckMessage.class)
.thenAccept(ack -> this.getClient().sendMessage(new RemotePlayerConfigMessage(new MultiplayerConfig())))
@@ -5,16 +5,16 @@ import com.google.common.collect.HashBiMap;
import com.seibel.distanthorizons.core.level.DhServerLevel;
import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfig;
import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfigChangeListener;
import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.IConnection;
import com.seibel.distanthorizons.core.network.NetworkServer;
import com.seibel.distanthorizons.core.network.messages.base.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.messages.base.AckMessage;
import com.seibel.distanthorizons.core.network.messages.base.CloseEvent;
import com.seibel.distanthorizons.core.network.messages.base.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.messages.session.PlayerUUIDMessage;
import com.seibel.distanthorizons.core.network.messages.session.RemotePlayerConfigMessage;
import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import io.netty.channel.ChannelHandlerContext;
import org.jetbrains.annotations.Nullable;
import java.io.Closeable;
@@ -27,7 +27,7 @@ public class RemotePlayerConnectionHandler implements Closeable
{
private final ScopedNetworkEventSource<NetworkServer> eventSource;
private final HashMap<UUID, ServerPlayerState> playersByUUID = new HashMap<>();
private final BiMap<ChannelHandlerContext, ServerPlayerState> playersByConnection = HashBiMap.create();
private final BiMap<IConnection, ServerPlayerState> playersByConnection = HashBiMap.create();
private final MultiplayerConfigChangeListener configChangeListener = new MultiplayerConfigChangeListener(this::onConfigChanged);
@@ -43,23 +43,23 @@ public class RemotePlayerConnectionHandler implements Closeable
{
this.eventSource.registerHandler(PlayerUUIDMessage.class, playerUUIDMessage ->
{
ChannelHandlerContext channelContext = playerUUIDMessage.getChannelContext();
ServerPlayerState dhPlayer = this.playersByUUID.get(playerUUIDMessage.playerUUID);
IConnection connection = playerUUIDMessage.getConnection();
ServerPlayerState serverPlayerState = this.playersByUUID.get(playerUUIDMessage.playerUUID);
if (dhPlayer == null)
if (serverPlayerState == null)
{
this.eventSource.parent.disconnectClient(channelContext, "Player is not logged in.");
connection.disconnect("Player is not logged in.");
return;
}
if (dhPlayer.channelContext != null)
if (serverPlayerState.connection != null)
{
this.eventSource.parent.disconnectClient(channelContext, "Another connection is already in use.");
connection.disconnect("Another connection is already in use.");
return;
}
dhPlayer.channelContext = channelContext;
this.playersByConnection.put(channelContext, dhPlayer);
serverPlayerState.connection = connection;
this.playersByConnection.put(connection, serverPlayerState);
playerUUIDMessage.sendResponse(new AckMessage());
});
@@ -67,15 +67,15 @@ public class RemotePlayerConnectionHandler implements Closeable
this.eventSource.registerHandler(RemotePlayerConfigMessage.class, this.connectedPlayersOnly((remotePlayerConfigMessage, serverPlayerState) ->
{
serverPlayerState.config.clientConfig = (MultiplayerConfig) remotePlayerConfigMessage.payload;
serverPlayerState.channelContext.writeAndFlush(new RemotePlayerConfigMessage(serverPlayerState.config));
serverPlayerState.connection.sendMessage(new RemotePlayerConfigMessage(serverPlayerState.config));
}));
this.eventSource.registerHandler(CloseEvent.class, closeEvent ->
{
ServerPlayerState dhPlayer = this.playersByConnection.remove(closeEvent.getChannelContext());
ServerPlayerState dhPlayer = this.playersByConnection.remove(closeEvent.getConnection());
if (dhPlayer != null)
{
dhPlayer.channelContext = null;
dhPlayer.connection = null;
}
});
}
@@ -83,7 +83,7 @@ public class RemotePlayerConnectionHandler implements Closeable
private void onConfigChanged()
{
for (ServerPlayerState serverPlayerState : this.getConnectedPlayers())
serverPlayerState.channelContext.writeAndFlush(new RemotePlayerConfigMessage(serverPlayerState.config));
serverPlayerState.connection.sendMessage(new RemotePlayerConfigMessage(serverPlayerState.config));
}
public <T extends NetworkMessage> Consumer<T> connectedPlayersOnly(BiConsumer<T, ServerPlayerState> next)
@@ -118,16 +118,7 @@ public class RemotePlayerConnectionHandler implements Closeable
@Nullable
public ServerPlayerState getConnectedPlayer(NetworkMessage msg)
{
return playersByConnection.get(msg.getChannelContext());
}
@Nullable
public ServerPlayerState getConnectedPlayer(IServerPlayerWrapper serverPlayer)
{
ServerPlayerState player = playersByUUID.get(serverPlayer.getUUID());
if (player == null || player.channelContext == null)
return null;
return player;
return playersByConnection.get(msg.getConnection());
}
public void registerJoinedPlayer(IServerPlayerWrapper serverPlayer)
@@ -138,11 +129,9 @@ public class RemotePlayerConnectionHandler implements Closeable
public void unregisterLeftPlayer(IServerPlayerWrapper serverPlayer)
{
ServerPlayerState dhPlayer = this.playersByUUID.remove(serverPlayer.getUUID());
ChannelHandlerContext channelContext = this.playersByConnection.inverse().remove(dhPlayer);
if (channelContext != null)
{
this.eventSource.parent.disconnectClient(channelContext, "You are being disconnected.");
}
IConnection connection = this.playersByConnection.inverse().remove(dhPlayer);
if (connection != null)
connection.disconnect("You are being disconnected.");
}
@Override
@@ -1,14 +1,14 @@
package com.seibel.distanthorizons.core.multiplayer.server;
import com.seibel.distanthorizons.core.network.IConnection;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.atomic.AtomicInteger;
public class ServerPlayerState
{
public IServerPlayerWrapper serverPlayer;
public ChannelHandlerContext channelContext;
public IConnection connection;
public ServersideMultiplayerConfig config = new ServersideMultiplayerConfig();
public final AtomicInteger pendingFullDataRequests = new AtomicInteger();
@@ -0,0 +1,58 @@
package com.seibel.distanthorizons.core.network;
import com.seibel.distanthorizons.core.network.messages.base.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
public interface IConnection
{
ChannelHandlerContext getChannelContext();
NetworkEventSource getRequestHandler();
default SocketAddress getRemoteAddress()
{
return this.getChannelContext().channel().remoteAddress();
}
default CompletableFuture<Void> sendMessage(NetworkMessage message)
{
CompletableFuture<Void> future = new CompletableFuture<>();
this.getChannelContext().writeAndFlush(message).addListener(writeFuture ->
{
if (writeFuture.cause() != null)
{
future.completeExceptionally(writeFuture.cause());
}
else
{
future.complete(null);
}
});
return future;
}
default <TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(FutureTrackableNetworkMessage msg, Class<TResponse> responseClass)
{
return this.getRequestHandler().sendRequest(this, msg, responseClass);
}
default void disconnect(String reason)
{
this.getChannelContext().channel().config().setAutoRead(false);
this.getChannelContext().writeAndFlush(new CloseReasonMessage(reason))
.addListener(ChannelFutureListener.CLOSE);
}
default Throwable getCloseReason()
{
return this.getChannelContext().channel().closeFuture().cause();
}
}
@@ -23,7 +23,6 @@ import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.base.CloseEvent;
import com.seibel.distanthorizons.core.network.messages.base.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.base.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.MessageHandler;
import com.seibel.distanthorizons.core.network.protocol.NetworkChannelInitializer;
import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
@@ -36,10 +35,9 @@ import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class NetworkClient extends NetworkEventSource implements AutoCloseable
public class NetworkClient extends NetworkEventSource implements IConnection, AutoCloseable
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
@@ -75,7 +73,13 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
.group(this.workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new NetworkChannelInitializer(new MessageHandler(this::handleMessage, this::addNewContext)));
.handler(new NetworkChannelInitializer(new MessageHandler(
(ctx, msg) -> {
msg.setConnection(this);
this.handleMessage(msg);
},
ctx -> this.addNewConnection(this)
)));
private EConnectionState connectionState = EConnectionState.INITIAL;
private Channel channel;
@@ -99,7 +103,7 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
this.registerHandler(CloseEvent.class, closeEvent ->
{
LOGGER.info("Disconnected from server: "+ closeEvent.getChannelContext().channel().remoteAddress());
LOGGER.info("Disconnected from server: "+this.getRemoteAddress());
if (this.connectionState == EConnectionState.CLOSING)
{
this.close();
@@ -158,17 +162,19 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
});
}
public final void sendMessage(NetworkMessage msg)
@Override
public ChannelHandlerContext getChannelContext()
{
this.channel.writeAndFlush(msg);
return this.channel.pipeline().context(MessageHandler.class);
}
public final <TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(FutureTrackableNetworkMessage msg, Class<TResponse> responseClass)
@Override
public NetworkEventSource getRequestHandler()
{
return this.sendRequest(this.channel.pipeline().context(MessageHandler.class), msg, responseClass);
return this;
}
@Override
@Override
public void close()
{
if (this.connectionState == EConnectionState.CLOSED)
@@ -178,7 +184,7 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
this.connectionState = EConnectionState.CLOSED;
this.channel.close().syncUninterruptibly();
this.workerGroup.shutdownGracefully().syncUninterruptibly();
this.workerGroup.shutdownGracefully();
super.close();
}
@@ -27,7 +27,6 @@ import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMe
import com.seibel.distanthorizons.core.network.protocol.MessageRegistry;
import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandlerContext;
import org.apache.logging.log4j.Logger;
import java.io.InvalidClassException;
@@ -43,7 +42,7 @@ public abstract class NetworkEventSource
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
protected final ConcurrentMap<Class<? extends NetworkMessage>, Set<Consumer<NetworkMessage>>> handlers = new ConcurrentHashMap<>();
private final ConcurrentMap<ChannelHandlerContext, ConcurrentMap<Long, FutureResponseData>> pendingFutures = new ConcurrentHashMap<>();
private final ConcurrentMap<IConnection, ConcurrentMap<Long, FutureResponseData>> pendingFutures = new ConcurrentHashMap<>();
protected boolean hasHandler(Class<? extends NetworkMessage> handlerClass)
{
@@ -68,7 +67,7 @@ public abstract class NetworkEventSource
if (message instanceof FutureTrackableNetworkMessage)
{
FutureTrackableNetworkMessage trackableMessage = (FutureTrackableNetworkMessage)message;
ConcurrentMap<Long, FutureResponseData> subMap = pendingFutures.get(message.getChannelContext());
ConcurrentMap<Long, FutureResponseData> subMap = pendingFutures.get(message.getConnection());
if (subMap != null)
{
FutureResponseData responseData = subMap.get(trackableMessage.futureId);
@@ -90,9 +89,9 @@ public abstract class NetworkEventSource
LOGGER.warn("Unhandled message: " + message);
}
protected void addNewContext(ChannelHandlerContext ctx)
protected void addNewConnection(IConnection connection)
{
this.pendingFutures.put(ctx, new ConcurrentHashMap<>());
this.pendingFutures.put(connection, new ConcurrentHashMap<>());
}
public <T extends NetworkMessage> void registerHandler(Class<T> handlerClass, Consumer<T> handlerImplementation)
@@ -115,50 +114,51 @@ public abstract class NetworkEventSource
}
protected <TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(ChannelHandlerContext ctx, FutureTrackableNetworkMessage msg, Class<TResponse> responseClass)
protected <TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(IConnection connection, FutureTrackableNetworkMessage msg, Class<TResponse> responseClass)
{
msg.setChannelContext(ctx);
msg.setConnection(connection);
CompletableFuture<TResponse> responseFuture = new CompletableFuture<>();
responseFuture.handle((response, throwable) -> {
responseFuture.whenComplete((response, throwable) ->
{
if (!(throwable instanceof ChannelException))
{
ConcurrentMap<Long, FutureResponseData> subMap = pendingFutures.get(ctx);
ConcurrentMap<Long, FutureResponseData> subMap = pendingFutures.get(connection);
if (subMap != null)
subMap.remove(msg.futureId);
}
if (throwable instanceof CancellationException)
msg.sendResponse(new CancelMessage());
return null;
});
ConcurrentMap<Long, FutureResponseData> subMap = pendingFutures.get(ctx);
if (subMap == null) {
ConcurrentMap<Long, FutureResponseData> subMap = pendingFutures.get(connection);
if (subMap == null)
{
// Was deleted before adding
responseFuture.completeExceptionally(ctx.channel().closeFuture().cause());
responseFuture.completeExceptionally(connection.getCloseReason());
return responseFuture;
}
subMap.put(msg.futureId, new FutureResponseData(responseClass, responseFuture));
if (!pendingFutures.containsKey(ctx)) {
if (!pendingFutures.containsKey(connection))
{
// Was deleted while adding
responseFuture.completeExceptionally(ctx.channel().closeFuture().cause());
responseFuture.completeExceptionally(connection.getCloseReason());
return responseFuture;
}
// If passed until here, cancelling is up to the cleaning side
ctx.writeAndFlush(msg).addListener(writeFuture -> {
if (writeFuture.cause() != null) {
responseFuture.completeExceptionally(writeFuture.cause());
}
connection.sendMessage(msg).whenComplete((ignored, throwable) ->
{
if (throwable != null)
responseFuture.completeExceptionally(throwable);
});
return responseFuture;
}
protected final void completeAllFuturesExceptionally(ChannelHandlerContext ctx, Throwable cause)
protected final void completeAllFuturesExceptionally(IConnection connection, Throwable cause)
{
ConcurrentMap<Long, FutureResponseData> map = pendingFutures.remove(ctx);
ConcurrentMap<Long, FutureResponseData> map = pendingFutures.remove(connection);
if (map == null) return;
for (FutureResponseData responseData : map.values())
@@ -167,8 +167,8 @@ public abstract class NetworkEventSource
protected final void completeAllFuturesExceptionally(Throwable cause)
{
for (ChannelHandlerContext ctx : pendingFutures.keySet())
this.completeAllFuturesExceptionally(ctx, cause);
for (IConnection connection : pendingFutures.keySet())
this.completeAllFuturesExceptionally(connection, cause);
}
public void close()
@@ -19,13 +19,13 @@
package com.seibel.distanthorizons.core.network;
import com.google.common.collect.MapMaker;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.base.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.base.CloseEvent;
import com.seibel.distanthorizons.core.network.messages.base.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.MessageHandler;
import com.seibel.distanthorizons.core.network.protocol.NetworkChannelInitializer;
import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
@@ -35,7 +35,8 @@ import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
public class NetworkServer extends NetworkEventSource implements AutoCloseable
{
@@ -46,7 +47,9 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private boolean isClosed = false;
private final AtomicBoolean isClosed = new AtomicBoolean();
private final ConcurrentMap<ChannelHandlerContext, IConnection> connections = new MapMaker().weakKeys().weakValues().makeMap();
@@ -63,16 +66,16 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
{
this.registerHandler(HelloMessage.class, helloMessage ->
{
ChannelHandlerContext channelContext = helloMessage.getChannelContext();
LOGGER.info("Client connected: "+channelContext.channel().remoteAddress());
IConnection connection = helloMessage.getConnection();
LOGGER.info("Client connected: "+connection.getRemoteAddress());
if (helloMessage.version != ModInfo.PROTOCOL_VERSION)
{
try
{
String disconnectReason = "Version mismatch. Server version: ["+ModInfo.PROTOCOL_VERSION+"], client version: ["+helloMessage.version+"].";
LOGGER.info("Disconnecting the client ["+channelContext.name()+"]: "+disconnectReason);
this.disconnectClient(channelContext, disconnectReason);
LOGGER.info("Disconnecting the client ["+connection.getRemoteAddress()+"]: "+disconnectReason);
connection.disconnect(disconnectReason);
}
catch (Exception e)
{
@@ -81,15 +84,14 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
return;
}
channelContext.writeAndFlush(new HelloMessage());
connection.sendMessage(new HelloMessage());
});
this.registerHandler(CloseEvent.class, closeEvent ->
{
Channel channel = closeEvent.getChannelContext().channel();
LOGGER.info("Client disconnected: "+channel.remoteAddress());
this.completeAllFuturesExceptionally(closeEvent.getChannelContext(), channel.closeFuture().cause());
IConnection connection = closeEvent.getConnection();
LOGGER.info("Client disconnected: "+connection.getRemoteAddress());
this.completeAllFuturesExceptionally(closeEvent.getConnection(), connection.getCloseReason());
});
}
@@ -99,7 +101,13 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
.group(this.bossGroup, this.workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new NetworkChannelInitializer(new MessageHandler(this::handleMessage, this::addNewContext)));
.childHandler(new NetworkChannelInitializer(new MessageHandler(
(ctx, msg) -> {
msg.setConnection(this.connections.computeIfAbsent(ctx, Connection::new));
this.handleMessage(msg);
},
ctx -> this.addNewConnection(this.connections.computeIfAbsent(ctx, Connection::new))
)));
ChannelFuture bindFuture = bootstrap.bind(this.port);
bindFuture.addListener((ChannelFuture channelFuture) ->
@@ -116,27 +124,11 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
channel.closeFuture().addListener(future -> this.close());
}
public void disconnectClient(ChannelHandlerContext ctx, String reason)
{
ctx.channel().config().setAutoRead(false);
ctx.writeAndFlush(new CloseReasonMessage(reason))
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public <TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(ChannelHandlerContext ctx, FutureTrackableNetworkMessage msg, Class<TResponse> responseClass)
{
return super.sendRequest(ctx, msg, responseClass);
}
@Override
public void close()
{
if (this.isClosed)
{
if (!this.isClosed.compareAndSet(false, true))
return;
}
this.isClosed = true;
LOGGER.info("Shutting down the network server.");
this.workerGroup.shutdownGracefully().syncUninterruptibly();
@@ -146,4 +138,26 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
super.close();
}
public class Connection implements IConnection
{
private final ChannelHandlerContext channelContext;
public Connection(ChannelHandlerContext channelContext)
{
this.channelContext = channelContext;
}
@Override
public ChannelHandlerContext getChannelContext()
{
return this.channelContext;
}
@Override
public NetworkEventSource getRequestHandler()
{
return NetworkServer.this;
}
}
}
@@ -21,10 +21,10 @@ package com.seibel.distanthorizons.core.network.protocol;
import com.google.common.collect.MapMaker;
import com.seibel.distanthorizons.core.api.internal.SharedApi;
import com.seibel.distanthorizons.core.network.IConnection;
import com.seibel.distanthorizons.core.network.messages.base.ExceptionMessage;
import com.seibel.distanthorizons.core.world.EWorldEnvironment;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import javax.annotation.Nullable;
import java.util.Objects;
@@ -41,19 +41,19 @@ public abstract class FutureTrackableNetworkMessage extends NetworkMessage
| ((Objects.requireNonNull(SharedApi.getEnvironment()) == EWorldEnvironment.Server_Only ? 1 : 0) << 31);
private static final AtomicInteger lastContextId = new AtomicInteger();
private static final ConcurrentMap<ChannelHandlerContext, Integer> contextIds = new MapMaker().weakKeys().makeMap();
private static final ConcurrentMap<IConnection, Integer> connectionToIdMap = new MapMaker().weakKeys().makeMap();
public void sendResponse(FutureTrackableNetworkMessage responseMessage)
{
responseMessage.futureId = futureId;
getChannelContext().writeAndFlush(responseMessage);
this.getConnection().sendMessage(responseMessage);
}
@Override
public void setChannelContext(ChannelHandlerContext channelContext)
public void setConnection(IConnection connection)
{
super.setChannelContext(channelContext);
this.futureId |= (long) contextIds.computeIfAbsent(channelContext, k -> lastContextId.getAndIncrement()) << 32;
super.setConnection(connection);
this.futureId |= (long) connectionToIdMap.computeIfAbsent(connection, k -> lastContextId.getAndIncrement()) << 32;
}
public void sendResponse(Exception e)
@@ -109,8 +109,10 @@ public interface INetworkObject
map.put(entry.getKey(), entry.getValue());
}
/** Should only be used for non-editable classes;
* otherwise, you may want to implement {@link INetworkObject} and use its method where applicable. */
/**
* Should only be used for non-editable classes;
* otherwise, you may want to implement {@link INetworkObject} and use its method where applicable.
*/
class Codec
{
private static final ConcurrentMap<Class<?>, Codec> codecMap = new ConcurrentHashMap<Class<?>, Codec>()
@@ -27,6 +27,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ChannelHandler.Sharable
@@ -34,10 +35,10 @@ public class MessageHandler extends SimpleChannelInboundHandler<NetworkMessage>
{
private static final Logger LOGGER = LogManager.getLogger();
private final Consumer<NetworkMessage> messageConsumer;
private final BiConsumer<ChannelHandlerContext, NetworkMessage> messageConsumer;
private final Consumer<ChannelHandlerContext> channelActiveConsumer;
public MessageHandler(Consumer<NetworkMessage> messageConsumer, Consumer<ChannelHandlerContext> channelActiveConsumer)
public MessageHandler(BiConsumer<ChannelHandlerContext, NetworkMessage> messageConsumer, Consumer<ChannelHandlerContext> channelActiveConsumer)
{
this.messageConsumer = messageConsumer;
this.channelActiveConsumer = channelActiveConsumer;
@@ -46,9 +47,8 @@ public class MessageHandler extends SimpleChannelInboundHandler<NetworkMessage>
@Override
protected void channelRead0(ChannelHandlerContext channelContext, NetworkMessage message)
{
message.setChannelContext(channelContext);
LOGGER.trace("Received message: " + message);
this.messageConsumer.accept(message);
this.messageConsumer.accept(channelContext, message);
}
@Override
@@ -19,33 +19,35 @@
package com.seibel.distanthorizons.core.network.protocol;
import io.netty.channel.ChannelHandlerContext;
import org.jetbrains.annotations.NotNull;
import com.seibel.distanthorizons.core.network.IConnection;
import javax.annotation.Nullable;
public abstract class NetworkMessage implements INetworkObject
{
private ChannelHandlerContext channelContext = null;
private IConnection connection = null;
public boolean warnWhenUnhandled() { return true; }
public ChannelHandlerContext getChannelContext()
public IConnection getConnection()
{
return channelContext;
return connection;
}
public void setChannelContext(ChannelHandlerContext channelContext)
public void setConnection(IConnection connection)
{
if (this.channelContext != null)
if (this.connection != null)
throw new IllegalStateException("Channel context cannot be changed after initial setting.");
this.channelContext = channelContext;
this.connection = connection;
}
@Override public String toString()
@Override
public String toString()
{
return toString("");
}
protected String toString(@NotNull String extraData)
protected String toString(@Nullable String extraData)
{
return this.getClass().getSimpleName() + "{" + extraData + '}';
}