[skip ci] Incomplete

This commit is contained in:
s809
2024-05-21 22:54:57 +05:00
parent 40d019d7e8
commit 44205664b5
50 changed files with 535 additions and 1768 deletions
@@ -191,7 +191,7 @@ public class DhApi
* Returns the network protocol version.
* @since API 1.0.0
*/
public static int getNetworkProtocolVersion() { return ModInfo.PROTOCOL_VERSION; }
public static int getNetworkProtocolVersion() { return ModInfo.OLD_PROTOCOL_VERSION; }
// methods //
@@ -202,4 +202,4 @@ public class DhApi
*/
public static boolean isDhThread() { return Thread.currentThread().getName().startsWith(ModInfo.THREAD_NAME_PREFIX); }
}
}
@@ -32,10 +32,7 @@ public final class ModInfo
// region Protocol versions
// Incremented every time any packets are added, changed or removed, with a few exceptions.
/** Netty protocol version. */
public static final int PROTOCOL_VERSION = 3;
/** Plugin channel protocol version. */
public static final int PLUGIN_PROTOCOL_VERSION = 1;
public static final int PROTOCOL_VERSION = 1;
public static final String PLUGIN_CHANNEL_PATH = "plugin_channel";
public static final String WRAPPER_PACKET_PATH = "wrapper";
// endregion
@@ -8,9 +8,9 @@ import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.multiplayer.client.ClientNetworkState;
import com.seibel.distanthorizons.core.network.messages.plugin.PluginCloseEvent;
import com.seibel.distanthorizons.core.network.messages.plugin.CurrentLevelKeyMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.PluginHelloMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.base.HelloMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.ServerConnectInfoMessage;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelHandler;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelSession;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IClientLevelWrapper;
import io.netty.buffer.ByteBuf;
@@ -26,7 +26,7 @@ public class ClientPluginChannelApi implements AutoCloseable
private static final IMinecraftClientWrapper MC = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
private static final IKeyedClientLevelManager KEYED_CLIENT_LEVEL_MANAGER = SingletonInjector.INSTANCE.get(IKeyedClientLevelManager.class);
private final PluginChannelHandler channelHandler = new PluginChannelHandler();
private final PluginChannelSession channelHandler = new PluginChannelSession();
private final Consumer<IClientLevelWrapper> levelUnloadHandler;
private final Consumer<IServerKeyedClientLevel> multiverseLevelLoadHandler;
@@ -55,7 +55,7 @@ public class ClientPluginChannelApi implements AutoCloseable
public void onJoin(@Nullable ClientNetworkState networkState)
{
this.networkState = networkState;
this.channelHandler.sendMessageClient(new PluginHelloMessage());
this.channelHandler.sendMessageClient(new HelloMessage());
}
private void onCurrentLevelKeyMessage(CurrentLevelKeyMessage msg)
@@ -84,7 +84,7 @@ public class ClientPluginChannelApi implements AutoCloseable
{
if (this.networkState != null)
{
this.networkState.getClient().resetAndConnectTo(
this.networkState.getSession().resetAndConnectTo(
msg.ipOverride != null
? msg.ipOverride
: MC.getCurrentServerIp().split(":")[0],
@@ -104,7 +104,7 @@ public class ClientPluginChannelApi implements AutoCloseable
public void handlePacket(ByteBuf buffer)
{
this.channelHandler.decodeAndHandle(buffer, null);
this.channelHandler.decodeAndHandle(buffer);
}
@Override
@@ -886,48 +886,6 @@ public class Config
+ "")
.build();
/**
* Server port. <br>
* Do not use this to retrieve the current server's port;
* Instead, use {@link com.seibel.distanthorizons.core.network.netty.NettyServer#port NettyServer#port}.
*/
public static ConfigEntry<Integer> serverPort = new ConfigEntry.Builder<Integer>()
.setServersideShortName("serverPort")
.setAppearance(EConfigEntryAppearance.ONLY_IN_FILE)
.setMinDefaultMax(1, 25049, 65535)
.comment(""
+ "The port used by the server to transceive Distant Horizons data.\n"
+ "Note: This port must be TCP."
+ "")
.build();
public static ConfigEntry<String> connectIpOverride = new ConfigEntry.Builder<String>()
.setServersideShortName("connectIpOverride")
.setAppearance(EConfigEntryAppearance.ONLY_IN_FILE)
.set("")
.comment(""
+ "Overrides the IP address sent to the client to transceive Distant Horizons data.\n"
+ "Leave this field empty to let the client use the IP used to connect to the server."
+ "")
.build();
public static ConfigEntry<Integer> connectPortOverride = new ConfigEntry.Builder<Integer>()
.setServersideShortName("connectPortOverride")
.setAppearance(EConfigEntryAppearance.ONLY_IN_FILE)
.setMinDefaultMax(0, 0, 65535)
.comment(""
+ "Overrides the port sent to the client to transceive Distant Horizons data.\n"
+ "Set this field to 0 to use the value assigned to serverPort."
+ "")
.build();
public static ConfigEntry<Boolean> enableConnectOverridesInLan = new ConfigEntry.Builder<Boolean>()
.setServersideShortName("enableConnectOverridesInLan")
.setAppearance(EConfigEntryAppearance.ONLY_IN_FILE)
.set(false)
.comment(""
+ "Controls whether the connect overrides are sent to the clients originating from LAN. \n"
+ "Unless you have a good reason to do otherwise, leave this disabled. \n"
+ "")
.build();
public static ConfigEntry<Integer> rateLimitHitTolerance = new ConfigEntry.Builder<Integer>()
.setServersideShortName("rateLimitHitTolerance")
@@ -9,8 +9,8 @@ import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.multiplayer.client.AbstractFullDataRequestQueue;
import com.seibel.distanthorizons.core.multiplayer.client.ClientNetworkState;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.generation.GenTaskPriorityRequestMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.generation.GenTaskPriorityResponseMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.generation.GenTaskPriorityRequestMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.generation.GenTaskPriorityResponseMessage;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
@@ -92,7 +92,7 @@ public class WorldRemoteGenerationQueue extends AbstractFullDataRequestQueue imp
return;
};
CompletableFuture<GenTaskPriorityResponseMessage> request = this.networkState.getClient().sendRequest(new GenTaskPriorityRequestMessage(posList, this.level), GenTaskPriorityResponseMessage.class);
CompletableFuture<GenTaskPriorityResponseMessage> request = this.networkState.getSession().sendRequest(new GenTaskPriorityRequestMessage(posList, this.level), GenTaskPriorityResponseMessage.class);
this.genTaskPriorityRequest = request;
request.handleAsync((response, throwable) -> {
try
@@ -33,7 +33,7 @@ import com.seibel.distanthorizons.core.multiplayer.client.ClientNetworkState;
import com.seibel.distanthorizons.core.multiplayer.client.FullDataRefreshQueue;
import com.seibel.distanthorizons.core.network.netty.NettyClient;
import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.FullDataPartialUpdateMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.FullDataPartialUpdateMessage;
import com.seibel.distanthorizons.core.network.netty.NettyMessage;
import com.seibel.distanthorizons.core.pos.DhBlockPos;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
@@ -102,7 +102,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
this.networkState = networkState;
if (networkState != null)
{
this.eventSource = new ScopedNetworkEventSource<>(networkState.getClient());
this.eventSource = new ScopedNetworkEventSource<>(networkState.getSession());
this.dataRefreshQueue = new FullDataRefreshQueue(this, networkState);
this.registerNetworkHandlers();
}
@@ -177,7 +177,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
boolean isClientUsable = false, isAllowedDimension = false;
if (networkState != null)
{
isClientUsable = !networkState.getClient().isClosed();
isClientUsable = !networkState.getSession().isClosed();
isAllowedDimension = MC_CLIENT.getWrappedClientLevel() == this.levelWrapper || networkState.config.generateMultipleDimensions;
}
@@ -28,12 +28,12 @@ import com.seibel.distanthorizons.core.multiplayer.server.RemotePlayerConnection
import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.netty.NettyServer;
import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException;
import com.seibel.distanthorizons.core.network.messages.netty.base.CancelMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.generation.GenTaskPriorityRequestMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.generation.GenTaskPriorityResponseMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.FullDataPartialUpdateMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.base.CancelMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.generation.GenTaskPriorityRequestMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.generation.GenTaskPriorityResponseMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.FullDataPartialUpdateMessage;
import com.seibel.distanthorizons.core.network.netty.NettyMessage;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
@@ -11,8 +11,8 @@ import com.seibel.distanthorizons.core.logging.f3.F3Screen;
import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
@@ -181,7 +181,7 @@ public abstract class AbstractFullDataRequestQueue implements IDebugRenderable,
DhSectionPos sectionPos = mapEntry.getKey();
RequestQueueEntry entry = mapEntry.getValue();
CompletableFuture<FullDataSourceResponseMessage> request = this.networkState.getClient().sendRequest(new FullDataSourceRequestMessage(this.level.getLevelWrapper(), sectionPos, entry.updateTimestamp), FullDataSourceResponseMessage.class);
CompletableFuture<FullDataSourceResponseMessage> request = this.networkState.getSession().sendRequest(new FullDataSourceRequestMessage(this.level.getLevelWrapper(), sectionPos, entry.updateTimestamp), FullDataSourceResponseMessage.class);
entry.request = request;
request.handleAsync((response, throwable) ->
{
@@ -8,11 +8,12 @@ import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfig;
import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfigChangeListener;
import com.seibel.distanthorizons.core.network.netty.NettyClient;
import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.messages.netty.base.AckMessage;
import com.seibel.distanthorizons.core.network.messages.netty.base.NettyCloseEvent;
import com.seibel.distanthorizons.core.network.messages.netty.base.HelloMessage;
import com.seibel.distanthorizons.core.network.messages.netty.session.PlayerUUIDMessage;
import com.seibel.distanthorizons.core.network.messages.netty.session.RemotePlayerConfigMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.base.AckMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.base.NettyCloseEvent;
import com.seibel.distanthorizons.core.network.messages.plugin.base.HelloMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.session.PlayerUUIDMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.session.RemotePlayerConfigMessage;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelSession;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import org.apache.logging.log4j.LogManager;
@@ -26,7 +27,7 @@ public class ClientNetworkState implements Closeable
() -> Config.Client.Advanced.Logging.logNetworkEvent.get());
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
private final NettyClient client = new NettyClient();
private final PluginChannelSession session;
private final UUID playerUUID = MC_CLIENT.getPlayerUUID();
@@ -41,33 +42,35 @@ public class ClientNetworkState implements Closeable
* Returns the client used by this instance. <p>
* If you need to subscribe to any packet events, create an instance of {@link ScopedNetworkEventSource} using the returned instance.
*/
public NettyClient getClient() { return this.client; }
public PluginChannelSession getSession() { return this.session; }
/**
* Constructs a new instance.
*/
public ClientNetworkState()
public ClientNetworkState(PluginChannelSession session)
{
this.client.registerHandler(HelloMessage.class, helloMessage ->
this.session = session;
this.session.registerHandler(HelloMessage.class, helloMessage ->
{
LOGGER.info("Connected to server: "+helloMessage.getConnection().getRemoteAddress());
this.getClient().sendRequest(new PlayerUUIDMessage(this.playerUUID), AckMessage.class)
.thenAccept(ack -> this.getClient().sendMessage(new RemotePlayerConfigMessage(new MultiplayerConfig())))
this.getSession().sendRequest(new PlayerUUIDMessage(this.playerUUID), AckMessage.class)
.thenAccept(ack -> this.getSession().sendMessage(new RemotePlayerConfigMessage(new MultiplayerConfig())))
.exceptionally(throwable -> {
LOGGER.error("Error while fetching server's config", throwable);
return null;
});
});
this.client.registerHandler(RemotePlayerConfigMessage.class, msg ->
this.session.registerHandler(RemotePlayerConfigMessage.class, msg ->
{
LOGGER.info("Connection config has been changed: " + msg.payload);
this.config = (MultiplayerConfig) msg.payload;
this.configReceived = true;
});
this.client.registerHandler(NettyCloseEvent.class, msg ->
this.session.registerHandler(NettyCloseEvent.class, msg ->
{
this.configReceived = false;
});
@@ -76,29 +79,29 @@ public class ClientNetworkState implements Closeable
private void onConfigChanged()
{
this.configReceived = false;
this.getClient().sendMessage(new RemotePlayerConfigMessage(new MultiplayerConfig()));
this.getSession().sendMessage(new RemotePlayerConfigMessage(new MultiplayerConfig()));
}
private String[] f3Log()
{
if (!this.client.isInitialized())
if (!this.session.isInitialized())
{
return new String[]{"Did not receive connection info yet..."};
}
if (!this.client.isClosed())
if (!this.session.isClosed())
{
return new String[]{
this.client.getRemoteAddress() != null
this.session.getRemoteAddress() != null
? (this.isReady() ? "Connected to server" : "Connecting to server...")
: MessageFormat.format("Disconnected, attempts left: {0} / {1}", this.client.getReconnectionAttemptsLeft(), NettyClient.RECONNECTION_ATTEMPTS)
: MessageFormat.format("Disconnected, attempts left: {0} / {1}", this.session.getReconnectionAttemptsLeft(), NettyClient.RECONNECTION_ATTEMPTS)
};
}
else
{
return new String[]{
this.client.getCloseReason() != null
? "Disconnected: " + this.client.getCloseReason().getMessage()
this.session.getCloseReason() != null
? "Disconnected: " + this.session.getCloseReason().getMessage()
: "Disconnected (check logs for more information)"
};
}
@@ -109,6 +112,6 @@ public class ClientNetworkState implements Closeable
{
this.f3Message.close();
this.configChangeListener.close();
this.client.close();
this.session.close();
}
}
}
@@ -1,142 +1,43 @@
package com.seibel.distanthorizons.core.multiplayer.server;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
import com.seibel.distanthorizons.core.level.DhServerLevel;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfig;
import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfigChangeListener;
import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException;
import com.seibel.distanthorizons.core.network.messages.netty.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.messages.netty.base.AckMessage;
import com.seibel.distanthorizons.core.network.messages.netty.base.NettyCloseEvent;
import com.seibel.distanthorizons.core.network.messages.netty.session.PlayerUUIDMessage;
import com.seibel.distanthorizons.core.network.messages.netty.session.RemotePlayerConfigMessage;
import com.seibel.distanthorizons.core.network.netty.INettyConnection;
import com.seibel.distanthorizons.core.network.messages.plugin.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.netty.NettyMessage;
import com.seibel.distanthorizons.core.network.netty.NettyServer;
import com.seibel.distanthorizons.core.network.netty.TrackableNettyMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableNettyMessage;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import io.netty.buffer.ByteBuf;
import org.apache.logging.log4j.LogManager;
import org.jetbrains.annotations.Nullable;
import java.io.Closeable;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static com.seibel.distanthorizons.core.config.Config.Client.Advanced.Multiplayer.ServerNetworking;
public class RemotePlayerConnectionHandler implements Closeable
{
private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Client.Advanced.Logging.logNetworkEvent.get());
private static final ConfigEntry<Boolean> GENERATE_MULTIPLE_DIMENSIONS_CONFIG = Config.Client.Advanced.Multiplayer.ServerNetworking.generateMultipleDimensions;
private final NettyServer server = new NettyServer(ServerNetworking.serverPort.get());
private final ConcurrentHashMap<UUID, ServerPlayerState> playersByUUID = new ConcurrentHashMap<>();
private final BiMap<INettyConnection, ServerPlayerState> playersByConnection = Maps.synchronizedBiMap(HashBiMap.create());
private final MultiplayerConfigChangeListener configChangeListener = new MultiplayerConfigChangeListener(this::onConfigChanged);
private final ConcurrentMap<IServerPlayerWrapper, ServerPlayerState> connectedPlayers = new ConcurrentHashMap<>();
private final ConfigChangeListener<Integer> portChangeListener = new ConfigChangeListener<>(ServerNetworking.serverPort, this::onServerPortChanged);
private final ConfigChangeListener<String> connectIpOverrideChangeListener = new ConfigChangeListener<>(ServerNetworking.connectIpOverride, this::onConnectOverridesChanged);
private final ConfigChangeListener<Integer> connectPortOverrideChangeListener = new ConfigChangeListener<>(ServerNetworking.connectPortOverride, this::onConnectOverridesChanged);
public NettyServer server() { return this.server; }
public RemotePlayerConnectionHandler()
{
this.server.registerHandler(PlayerUUIDMessage.class, playerUUIDMessage ->
{
INettyConnection connection = playerUUIDMessage.getConnection();
ServerPlayerState serverPlayerState = this.playersByUUID.get(playerUUIDMessage.playerUUID);
if (serverPlayerState == null)
{
connection.disconnect("Player is not logged in.");
return;
}
if (serverPlayerState.connection != null)
{
connection.disconnect("Another connection is already in use.");
return;
}
serverPlayerState.connection = connection;
this.playersByConnection.put(connection, serverPlayerState);
playerUUIDMessage.sendResponse(new AckMessage());
});
this.server.registerHandler(RemotePlayerConfigMessage.class, this.connectedPlayersOnly((remotePlayerConfigMessage, serverPlayerState) ->
{
serverPlayerState.config.clientConfig = (MultiplayerConfig) remotePlayerConfigMessage.payload;
serverPlayerState.connection.sendMessage(new RemotePlayerConfigMessage(serverPlayerState.config));
}));
this.server.registerHandler(NettyCloseEvent.class, closeEvent ->
{
ServerPlayerState dhPlayer = this.playersByConnection.remove(closeEvent.getConnection());
if (dhPlayer != null)
{
dhPlayer.clearRateLimiterSets();
dhPlayer.connection = null;
}
});
}
public void handlePluginMessage(IServerPlayerWrapper player, ByteBuf buffer)
{
this.playersByUUID.get(player.getUUID()).pluginChannelHandler.decodeAndHandle(buffer, player);
this.connectedPlayers.get(player).connection.decodeAndHandle(buffer);
}
private void onConfigChanged()
{
for (ServerPlayerState serverPlayerState : this.getConnectedPlayers())
{
serverPlayerState.connection.sendMessage(new RemotePlayerConfigMessage(serverPlayerState.config));
}
}
private void onServerPortChanged(int ignored)
{
LOGGER.warn("Server port change requires a server restart to take effect.");
}
private void onConnectOverridesChanged(@Nullable Object ignored)
{
for (ServerPlayerState playerState : this.playersByUUID.values())
{
playerState.sendConnectInfo();
}
}
public <T extends NettyMessage> Consumer<T> connectedPlayersOnly(BiConsumer<T, ServerPlayerState> next)
{
return msg ->
{
ServerPlayerState serverPlayerState = this.getConnectedPlayer(msg);
if (serverPlayerState != null)
{
next.accept(msg, serverPlayerState);
}
};
}
public <T extends NettyMessage> Consumer<T> currentLevelOnly(DhServerLevel level, BiConsumer<T, ServerPlayerState> next)
{
return this.connectedPlayersOnly((msg, serverPlayerState) ->
return (msg) ->
{
LodUtil.assertTrue(msg instanceof ILevelRelatedMessage, "Received message does not implement " + ILevelRelatedMessage.class.getSimpleName() + ": " + msg.getClass().getSimpleName());
@@ -164,49 +65,27 @@ public class RemotePlayerConnectionHandler implements Closeable
}
next.accept(msg, serverPlayerState);
});
}
public Iterable<ServerPlayerState> getConnectedPlayers()
{
synchronized (this.playersByConnection)
{
return new ArrayList<>(this.playersByConnection.values());
}
}
@Nullable
public ServerPlayerState getConnectedPlayer(NettyMessage msg)
{
return this.playersByConnection.get(msg.getConnection());
};
}
public void registerJoinedPlayer(IServerPlayerWrapper serverPlayer)
{
this.playersByUUID.put(serverPlayer.getUUID(), new ServerPlayerState(serverPlayer, this.server.port));
this.connectedPlayers.put(serverPlayer, new ServerPlayerState(serverPlayer));
}
public void unregisterLeftPlayer(IServerPlayerWrapper serverPlayer)
{
ServerPlayerState playerState = this.playersByUUID.remove(serverPlayer.getUUID());
playerState.close();
INettyConnection connection = this.playersByConnection.inverse().remove(playerState);
if (connection != null)
ServerPlayerState playerState = this.connectedPlayers.remove(serverPlayer);
if (playerState != null)
{
connection.disconnect("You have logged out.");
playerState.close();
}
}
@Override
public void close()
{
this.portChangeListener.close();
this.connectIpOverrideChangeListener.close();
this.connectPortOverrideChangeListener.close();
this.configChangeListener.close();
this.server().close();
}
}
@@ -3,22 +3,21 @@ package com.seibel.distanthorizons.core.multiplayer.server;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.level.DhServerLevel;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfig;
import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfigChangeListener;
import com.seibel.distanthorizons.core.network.messages.plugin.session.RemotePlayerConfigMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.PluginCloseEvent;
import com.seibel.distanthorizons.core.network.messages.plugin.PluginHelloMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.ServerConnectInfoMessage;
import com.seibel.distanthorizons.core.network.netty.INettyConnection;
import com.seibel.distanthorizons.core.network.messages.plugin.base.HelloMessage;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.generation.GenTaskPriorityRequestMessage;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelHandler;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.generation.GenTaskPriorityRequestMessage;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelSession;
import com.seibel.distanthorizons.core.util.ratelimiting.SupplierBasedRateAndConcurrencyLimiter;
import com.seibel.distanthorizons.core.util.ratelimiting.SupplierBasedRateLimiter;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import org.apache.logging.log4j.LogManager;
import org.jetbrains.annotations.NotNull;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import static com.seibel.distanthorizons.core.config.Config.Client.Advanced.Multiplayer.ServerNetworking;
@@ -29,19 +28,12 @@ public class ServerPlayerState
() -> Config.Client.Advanced.Logging.logNetworkEvent.get());
public final IServerPlayerWrapper serverPlayer;
public INettyConnection connection;
private final int serverPort;
public final PluginChannelHandler pluginChannelHandler = new PluginChannelHandler();
public final PluginChannelSession connection = new PluginChannelSession();
private final MultiplayerConfigChangeListener configChangeListener = new MultiplayerConfigChangeListener(this::onConfigChanged);
@NotNull
public ConstrainedMultiplayerConfig config = new ConstrainedMultiplayerConfig();
public final SupplierBasedRateLimiter<Void> rateLimitKickTrigger = new SupplierBasedRateLimiter<>(
() -> ServerNetworking.rateLimitHitTolerance.get(),
ignored -> this.connection.disconnect("You have been repeatedly exceeding rate/concurrency limits.")
);
private final ConcurrentHashMap<DhServerLevel, RateLimiterSet> rateLimiterSets = new ConcurrentHashMap<>();
public RateLimiterSet getRateLimiterSet(DhServerLevel level)
{
@@ -52,37 +44,38 @@ public class ServerPlayerState
this.rateLimiterSets.clear();
}
public ServerPlayerState(IServerPlayerWrapper serverPlayer, int serverPort)
public ServerPlayerState(IServerPlayerWrapper serverPlayer)
{
this.serverPlayer = serverPlayer;
this.serverPort = serverPort;
this.pluginChannelHandler.registerHandler(PluginHelloMessage.class, msg -> {
this.sendConnectInfo();
this.connection.registerHandler(RemotePlayerConfigMessage.class, remotePlayerConfigMessage ->
{
this.config.clientConfig = (MultiplayerConfig) remotePlayerConfigMessage.payload;
this.connection.sendMessage(new RemotePlayerConfigMessage(this.config));
});
this.pluginChannelHandler.registerHandler(PluginCloseEvent.class, event -> {
this.connection.registerHandler(HelloMessage.class, msg -> {
this.initializeLodSession();
});
this.connection.registerHandler(PluginCloseEvent.class, event -> {
// Noop
});
}
public void sendConnectInfo()
public void initializeLodSession()
{
String ipOverride = ServerNetworking.connectIpOverride.get();
int portOverride = ServerNetworking.connectPortOverride.get();
InetAddress ip = ((InetSocketAddress) this.serverPlayer.getRemoteAddress()).getAddress();
boolean isLanPlayer = !ServerNetworking.enableConnectOverridesInLan.get() && (ip.isLinkLocalAddress() || ip.isSiteLocalAddress());
this.pluginChannelHandler.sendMessageServer(this.serverPlayer, new ServerConnectInfoMessage(
!isLanPlayer && !ipOverride.isEmpty() ? ipOverride : null,
!isLanPlayer && portOverride != 0 ? portOverride : this.serverPort
));
}
public void close()
{
this.pluginChannelHandler.close();
this.configChangeListener.close();
this.connection.close();
}
private void onConfigChanged()
{
this.connection.sendMessage(new RemotePlayerConfigMessage(this.config));
}
@@ -92,7 +85,6 @@ public class ServerPlayerState
() -> ServerNetworking.generationRequestRCLimit.get(),
msg -> {
msg.sendResponse(new RateLimitedException("Full data request rate/concurrency limit: " + ServerPlayerState.this.config.getFullDataRequestConcurrencyLimit()));
ServerPlayerState.this.rateLimitKickTrigger.tryAcquire(null);
}
);
@@ -100,7 +92,6 @@ public class ServerPlayerState
() -> ServerNetworking.genTaskPriorityRequestRateLimit.get(),
msg -> {
msg.sendResponse(new RateLimitedException("Generation task priority check rate limit: " + ServerPlayerState.this.config.getFullDataRequestConcurrencyLimit()));
ServerPlayerState.this.rateLimitKickTrigger.tryAcquire(null);
}
);
@@ -108,7 +99,6 @@ public class ServerPlayerState
() -> ServerNetworking.loginDataSyncRCLimit.get(),
msg -> {
msg.sendResponse(new RateLimitedException("Data sync rate/concurrency limit: " + ServerPlayerState.this.config.getLoginDataSyncRCLimit()));
ServerPlayerState.this.rateLimitKickTrigger.tryAcquire(null);
}
);
@@ -21,87 +21,196 @@ package com.seibel.distanthorizons.core.network;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.network.messages.AbstractMessageRegistry;
import com.seibel.distanthorizons.core.network.messages.ICloseEvent;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import com.seibel.distanthorizons.core.network.messages.plugin.base.CancelMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.base.ExceptionMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.PluginCloseEvent;
import com.seibel.distanthorizons.core.network.messages.plugin.PluginMessageRegistry;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelSession;
import com.seibel.distanthorizons.core.network.plugin.TrackableMessage;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandlerContext;
import org.apache.logging.log4j.LogManager;
import java.io.InvalidClassException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
public abstract class NetworkEventSource<TMessage extends INetworkObject>
public abstract class NetworkEventSource
{
private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Client.Advanced.Logging.logNetworkEvent.get());
protected final ConcurrentMap<Class<? extends TMessage>, Set<Consumer<TMessage>>> handlers = new ConcurrentHashMap<>();
protected final ConcurrentMap<Class<? extends PluginChannelMessage>, Set<Consumer<PluginChannelMessage>>> handlers = new ConcurrentHashMap<>();
private final ConcurrentMap<PluginChannelSession, ConcurrentMap<Long, FutureResponseData>> pendingFutures = new ConcurrentHashMap<>();
protected final AbstractMessageRegistry<TMessage> messageRegistry;
public NetworkEventSource(AbstractMessageRegistry<TMessage> messageRegistry)
protected boolean hasHandler(Class<? extends PluginChannelMessage> handlerClass)
{
this.messageRegistry = messageRegistry;
return this.handlers.containsKey(handlerClass);
}
protected final void handleMessage(TMessage message)
protected void handleMessage(PluginChannelMessage message)
{
boolean handled = false;
Set<Consumer<TMessage>> handlerList = this.handlers.get(message.getClass());
Set<Consumer<PluginChannelMessage>> handlerList = this.handlers.get(message.getClass());
if (handlerList != null)
{
for (Consumer<TMessage> handler : handlerList)
for (Consumer<PluginChannelMessage> handler : handlerList)
{
handled = true;
handler.accept(message);
}
}
handled |= this.tryHandleMessage(message);
if (message instanceof TrackableMessage)
{
TrackableMessage trackableMessage = (TrackableMessage) message;
ConcurrentMap<Long, FutureResponseData> subMap = this.pendingFutures.get(message.getConnection());
if (subMap != null)
{
FutureResponseData responseData = subMap.get(trackableMessage.futureId);
if (responseData != null)
{
handled = true;
if (message instanceof ExceptionMessage)
{
responseData.future.completeExceptionally(((ExceptionMessage) message).exception);
}
else if (message.getClass() != responseData.responseClass)
{
responseData.future.completeExceptionally(new InvalidClassException("Response with invalid type: expected " + responseData.responseClass.getSimpleName() + ", got:" + message));
}
else
{
responseData.future.complete(trackableMessage);
}
}
}
}
if (!handled && ModInfo.IS_DEV_BUILD)
if (!handled && ModInfo.IS_DEV_BUILD && message.warnWhenUnhandled())
{
LOGGER.warn("Unhandled message: " + message);
}
}
protected boolean tryHandleMessage(TMessage message)
protected void addNewConnection(PluginChannelSession connection)
{
// By default, messages are handled only by their direct handlers.
return false;
this.pendingFutures.put(connection, new ConcurrentHashMap<>());
}
public <T extends TMessage> void registerHandler(Class<T> handlerClass, Consumer<T> handlerImplementation)
public <T extends PluginChannelMessage> void registerHandler(Class<T> handlerClass, Consumer<T> handlerImplementation)
{
//noinspection unchecked
this.handlers.computeIfAbsent(handlerClass, missingHandlerClass ->
{
// Will throw if the handler class is not found and not a CloseEvent
if (!ICloseEvent.class.isAssignableFrom(missingHandlerClass))
// Will throw if the handler class is not found
if (handlerClass != PluginCloseEvent.class)
{
this.messageRegistry.getMessageId(handlerClass);
PluginMessageRegistry.INSTANCE.getMessageId(handlerClass);
}
return ConcurrentHashMap.newKeySet();
return new HashSet<>();
})
.add((Consumer<TMessage>) handlerImplementation);
.add((Consumer<PluginChannelMessage>) handlerImplementation);
}
protected boolean hasHandler(Class<? extends TMessage> handlerClass)
{
return this.handlers.containsKey(handlerClass);
}
protected <T extends TMessage> void removeHandler(Class<T> handlerClass, Consumer<T> handlerImplementation)
protected <T extends PluginChannelMessage> void removeHandler(Class<T> handlerClass, Consumer<T> handlerImplementation)
{
this.handlers.computeIfAbsent(handlerClass, missingHandlerClass -> new HashSet<>())
.remove(handlerImplementation);
}
protected <TResponse extends TrackableMessage> CompletableFuture<TResponse> createRequest(PluginChannelSession connection, TrackableMessage msg, Class<TResponse> responseClass)
{
msg.setConnection(connection);
CompletableFuture<TResponse> responseFuture = new CompletableFuture<>();
responseFuture.whenComplete((response, throwable) ->
{
if (!(throwable instanceof ChannelException))
{
ConcurrentMap<Long, FutureResponseData> subMap = this.pendingFutures.get(connection);
if (subMap != null)
{
subMap.remove(msg.futureId);
}
}
if (throwable instanceof CancellationException)
{
msg.sendResponse(new CancelMessage());
}
});
ConcurrentMap<Long, FutureResponseData> subMap = this.pendingFutures.get(connection);
if (subMap == null)
{
// Was deleted before adding
responseFuture.completeExceptionally(connection.getCloseReason());
return responseFuture;
}
subMap.put(msg.futureId, new FutureResponseData(responseClass, responseFuture));
if (!this.pendingFutures.containsKey(connection))
{
// Was deleted while adding
// Note: removal from subMap will happen in whenComplete above
responseFuture.completeExceptionally(connection.getCloseReason());
return responseFuture;
}
// If passed until here, cancelling is up to the cleaning side
return responseFuture;
}
protected final void completeAllFuturesExceptionally(PluginChannelSession connection, Throwable cause)
{
ConcurrentMap<Long, FutureResponseData> map = this.pendingFutures.remove(connection);
if (map == null)
{
return;
}
for (FutureResponseData responseData : map.values())
{
responseData.future.completeExceptionally(cause);
}
}
protected final void completeAllFuturesExceptionally(Throwable cause)
{
for (PluginChannelSession connection : this.pendingFutures.keySet())
{
this.completeAllFuturesExceptionally(connection, cause);
}
}
public void close()
{
this.handlers.clear();
this.completeAllFuturesExceptionally(new ChannelException(this.getClass().getSimpleName() + " is closed."));
}
}
private static class FutureResponseData
{
public final Class<? extends TrackableMessage> responseClass;
public final CompletableFuture<TrackableMessage> future;
private <T extends TrackableMessage> FutureResponseData(Class<T> responseClass, CompletableFuture<T> future)
{
this.responseClass = responseClass;
//noinspection unchecked
this.future = (CompletableFuture<TrackableMessage>) future;
}
}
}
@@ -19,24 +19,24 @@
package com.seibel.distanthorizons.core.network;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import java.util.function.Consumer;
/** Provides a way to register network message handlers which are expected to be removed later. */
public final class ScopedNetworkEventSource<TParent extends NetworkEventSource<TMessage>, TMessage extends INetworkObject> extends NetworkEventSource<TMessage>
public final class ScopedNetworkEventSource extends NetworkEventSource
{
public final TParent parent;
public final NetworkEventSource parent;
private boolean isClosed = false;
public ScopedNetworkEventSource(TParent parent)
public ScopedNetworkEventSource(NetworkEventSource parent)
{
super(parent.messageRegistry);
this.parent = parent;
}
@Override
public <T extends TMessage> void registerHandler(Class<T> handlerClass, Consumer<T> handlerImplementation)
public <T extends PluginChannelMessage> void registerHandler(Class<T> handlerClass, Consumer<T> handlerImplementation)
{
if (this.isClosed)
{
@@ -55,9 +55,9 @@ public final class ScopedNetworkEventSource<TParent extends NetworkEventSource<T
public void close()
{
this.isClosed = true;
for (Class<? extends TMessage> handlerClass : this.handlers.keySet())
for (Class<? extends PluginChannelMessage> handlerClass : this.handlers.keySet())
{
this.parent.removeHandler(handlerClass, this::handleMessage);
}
}
}
}
@@ -1,75 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
public abstract class AbstractMessageRegistry<TMessage extends INetworkObject>
{
private final Map<Integer, Supplier<? extends TMessage>> idToSupplier = new HashMap<>();
private final BiMap<Class<? extends TMessage>, Integer> classToId = HashBiMap.create();
protected <T extends TMessage> void registerMessage(Class<T> clazz, Supplier<T> supplier)
{
int id = this.idToSupplier.size() + 1;
this.idToSupplier.put(id, supplier);
this.classToId.put(clazz, id);
}
public TMessage createMessage(int messageId) throws IllegalArgumentException
{
try
{
return this.idToSupplier.get(messageId).get();
}
catch (NullPointerException e)
{
throw new IllegalArgumentException("Invalid message ID: " + messageId);
}
}
@SuppressWarnings("unchecked")
public int getMessageId(TMessage message)
{
return this.getMessageId((Class<? extends TMessage>) message.getClass());
}
public int getMessageId(Class<? extends TMessage> messageClass)
{
try
{
return this.classToId.get(messageClass);
}
catch (NullPointerException e)
{
throw new IllegalArgumentException("Message does not have ID assigned to it: " + messageClass.getSimpleName());
}
}
}
@@ -1,14 +0,0 @@
package com.seibel.distanthorizons.core.network.messages;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import io.netty.buffer.ByteBuf;
public interface ICloseEvent extends INetworkObject
{
@Override
default void encode(ByteBuf out) { throw new UnsupportedOperationException(this.getClass().getSimpleName() + " is not a real message, and cannot be sent."); }
@Override
default void decode(ByteBuf in) { throw new UnsupportedOperationException(this.getClass().getSimpleName() + " is not a real message, and cannot be received."); }
}
@@ -0,0 +1,118 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.seibel.distanthorizons.core.network.messages.plugin.CurrentLevelKeyMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.base.HelloMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.PluginMessageRegistry;
import com.seibel.distanthorizons.core.network.messages.plugin.base.AckMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.base.CancelMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.base.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.base.ExceptionMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.FullDataPartialUpdateMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.generation.GenTaskPriorityRequestMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.fullData.generation.GenTaskPriorityResponseMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.session.PlayerUUIDMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.session.RemotePlayerConfigMessage;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
public class PluginMessageRegistry
{
public static final PluginMessageRegistry INSTANCE = new PluginMessageRegistry();
private final Map<Integer, Supplier<? extends PluginChannelMessage>> idToSupplier = new HashMap<>();
private final BiMap<Class<? extends PluginChannelMessage>, Integer> classToId = HashBiMap.create();
private PluginMessageRegistry()
{
// Note: Messages must have parameterless constructors
// Always sent by the client
this.registerMessage(HelloMessage.class, HelloMessage::new);
this.registerMessage(CloseReasonMessage.class, CloseReasonMessage::new);
// Multiverse support
this.registerMessage(CurrentLevelKeyMessage.class, CurrentLevelKeyMessage::new);
// Core
this.registerMessage(AckMessage.class, AckMessage::new);
this.registerMessage(CancelMessage.class, CancelMessage::new);
this.registerMessage(ExceptionMessage.class, ExceptionMessage::new);
// ID & config
this.registerMessage(PlayerUUIDMessage.class, PlayerUUIDMessage::new);
this.registerMessage(RemotePlayerConfigMessage.class, RemotePlayerConfigMessage::new);
// Full data requests & updates
this.registerMessage(FullDataSourceRequestMessage.class, FullDataSourceRequestMessage::new);
this.registerMessage(FullDataSourceResponseMessage.class, FullDataSourceResponseMessage::new);
this.registerMessage(FullDataPartialUpdateMessage.class, FullDataPartialUpdateMessage::new);
}
protected <T extends PluginChannelMessage> void registerMessage(Class<T> clazz, Supplier<T> supplier)
{
int id = this.idToSupplier.size() + 1;
this.idToSupplier.put(id, supplier);
this.classToId.put(clazz, id);
}
public PluginChannelMessage createMessage(int messageId) throws IllegalArgumentException
{
try
{
return this.idToSupplier.get(messageId).get();
}
catch (NullPointerException e)
{
throw new IllegalArgumentException("Invalid message ID: " + messageId);
}
}
@SuppressWarnings("unchecked")
public int getMessageId(PluginChannelMessage message)
{
return this.gePluginChannelMessageId(message.getClass());
}
public int gePluginChannelMessageId(Class<? extends PluginChannelMessage> messageClass)
{
try
{
return this.classToId.get(messageClass);
}
catch (NullPointerException e)
{
throw new IllegalArgumentException("Message does not have ID assigned to it: " + messageClass.getSimpleName());
}
}
}
@@ -1,46 +0,0 @@
package com.seibel.distanthorizons.core.network.messages.netty;
import com.seibel.distanthorizons.core.network.messages.AbstractMessageRegistry;
import com.seibel.distanthorizons.core.network.messages.netty.base.*;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.FullDataPartialUpdateMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.generation.GenTaskPriorityRequestMessage;
import com.seibel.distanthorizons.core.network.messages.netty.fullData.generation.GenTaskPriorityResponseMessage;
import com.seibel.distanthorizons.core.network.messages.netty.session.PlayerUUIDMessage;
import com.seibel.distanthorizons.core.network.messages.netty.session.RemotePlayerConfigMessage;
import com.seibel.distanthorizons.core.network.netty.NettyMessage;
public class NettyMessageRegistry extends AbstractMessageRegistry<NettyMessage>
{
public static final NettyMessageRegistry INSTANCE = new NettyMessageRegistry();
private NettyMessageRegistry()
{
// Note: Messages must have parameterless constructors
// Opening & closing connection
// These messages should be compatible with any previous protocol versions
this.registerMessage(HelloMessage.class, HelloMessage::new);
this.registerMessage(CloseReasonMessage.class, CloseReasonMessage::new);
// Core
this.registerMessage(AckMessage.class, AckMessage::new);
this.registerMessage(CancelMessage.class, CancelMessage::new);
this.registerMessage(ExceptionMessage.class, ExceptionMessage::new);
// ID & config
this.registerMessage(PlayerUUIDMessage.class, PlayerUUIDMessage::new);
this.registerMessage(RemotePlayerConfigMessage.class, RemotePlayerConfigMessage::new);
// Full data requests & updates
this.registerMessage(FullDataSourceRequestMessage.class, FullDataSourceRequestMessage::new);
this.registerMessage(FullDataSourceResponseMessage.class, FullDataSourceResponseMessage::new);
this.registerMessage(FullDataPartialUpdateMessage.class, FullDataPartialUpdateMessage::new);
// Generation task prioritization
this.registerMessage(GenTaskPriorityRequestMessage.class, GenTaskPriorityRequestMessage::new);
this.registerMessage(GenTaskPriorityResponseMessage.class, GenTaskPriorityResponseMessage::new);
}
}
@@ -1,43 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.base;
import com.seibel.distanthorizons.core.network.netty.NettyMessage;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.buffer.ByteBuf;
public class HelloMessage extends NettyMessage
{
public int version = ModInfo.PROTOCOL_VERSION;
@Override
public void encode(ByteBuf out) { out.writeInt(this.version); }
@Override
public void decode(ByteBuf in) { this.version = in.readInt(); }
@Override public String toString()
{
return super.toString("version=" + this.version);
}
}
@@ -1,32 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.base;
import com.seibel.distanthorizons.core.network.messages.ICloseEvent;
import com.seibel.distanthorizons.core.network.netty.NettyMessage;
/**
* This is not a "real" message, and only used to indicate a disconnection.
* To send a "disconnect reason" message, use {@link CloseReasonMessage}.
*/
public class NettyCloseEvent extends NettyMessage implements ICloseEvent
{
}
@@ -1,68 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.fullData.generation;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.network.messages.netty.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.netty.TrackableNettyMessage;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.List;
public class GenTaskPriorityRequestMessage extends TrackableNettyMessage implements ILevelRelatedMessage
{
public List<DhSectionPos> posList = new ArrayList<>();
private String levelName;
@Override
public String getLevelName() { return this.levelName; }
public GenTaskPriorityRequestMessage() { }
public GenTaskPriorityRequestMessage(List<DhSectionPos> posList, IDhLevel level)
{
this.posList = posList;
// TODO Multiverse support
this.levelName = level.getLevelWrapper().getDimensionType().getDimensionName();
}
@Override
protected void encode0(ByteBuf out)
{
this.writeString(this.levelName, out);
this.writeCollection(out, this.posList);
}
@Override
protected void decode0(ByteBuf in)
{
this.levelName = this.readString(in);
this.readCollection(in, this.posList, DhSectionPos::zero);
}
@Override public String toString()
{
return super.toString("posList=" + this.posList);
}
}
@@ -1,56 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.fullData.generation;
import com.seibel.distanthorizons.core.network.netty.TrackableNettyMessage;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import io.netty.buffer.ByteBuf;
import java.util.HashMap;
import java.util.Map;
public class GenTaskPriorityResponseMessage extends TrackableNettyMessage
{
public Map<DhSectionPos, Integer> posList = new HashMap<>();
public GenTaskPriorityResponseMessage() { }
public GenTaskPriorityResponseMessage(Map<DhSectionPos, Integer> posList)
{
this.posList = posList;
}
@Override
protected void encode0(ByteBuf out)
{
this.writeCollection(out, this.posList.entrySet());
}
@Override
protected void decode0(ByteBuf in)
{
this.readMap(in, this.posList, DhSectionPos::zero, () -> 0);
}
@Override public String toString()
{
return super.toString("posList=" + this.posList);
}
}
@@ -1,4 +1,4 @@
package com.seibel.distanthorizons.core.network.messages.netty;
package com.seibel.distanthorizons.core.network.messages.plugin;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
@@ -2,10 +2,17 @@ package com.seibel.distanthorizons.core.network.messages.plugin;
import com.seibel.distanthorizons.core.network.messages.ICloseEvent;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import io.netty.buffer.ByteBuf;
/**
* This is not a "real" message, and only used to indicate a disconnection.
*/
public class PluginCloseEvent extends PluginChannelMessage implements ICloseEvent
public class PluginCloseEvent extends PluginChannelMessage implements ICloseEvent, INetworkObject
{
}
@Override
public void encode(ByteBuf out) { throw new UnsupportedOperationException(this.getClass().getSimpleName() + " is not a real message, and cannot be sent."); }
@Override
public void decode(ByteBuf in) { throw new UnsupportedOperationException(this.getClass().getSimpleName() + " is not a real message, and cannot be received."); }
}
@@ -1,21 +0,0 @@
package com.seibel.distanthorizons.core.network.messages.plugin;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
import io.netty.buffer.ByteBuf;
/** Serves as a trigger for the server to send the first world change. */
public class PluginHelloMessage extends PluginChannelMessage
{
@Override
public void encode(ByteBuf out)
{
}
@Override
public void decode(ByteBuf in)
{
}
}
@@ -1,20 +0,0 @@
package com.seibel.distanthorizons.core.network.messages.plugin;
import com.seibel.distanthorizons.core.network.messages.AbstractMessageRegistry;
import com.seibel.distanthorizons.core.network.messages.netty.base.NettyCloseEvent;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
public class PluginMessageRegistry extends AbstractMessageRegistry<PluginChannelMessage>
{
public static final PluginMessageRegistry INSTANCE = new PluginMessageRegistry();
private PluginMessageRegistry()
{
// Note: Messages must have parameterless constructors
this.registerMessage(PluginHelloMessage.class, PluginHelloMessage::new);
this.registerMessage(CurrentLevelKeyMessage.class, CurrentLevelKeyMessage::new);
this.registerMessage(ServerConnectInfoMessage.class, ServerConnectInfoMessage::new);
}
}
@@ -17,16 +17,17 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.base;
package com.seibel.distanthorizons.core.network.messages.plugin.base;
import com.seibel.distanthorizons.core.network.netty.TrackableNettyMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableNettyMessage;
import io.netty.buffer.ByteBuf;
/**
* Simple empty response message.
* This message is not sent automatically.
*/
public class AckMessage extends TrackableNettyMessage
public class AckMessage extends TrackableMessage
{
public AckMessage() { }
@@ -36,4 +37,4 @@ public class AckMessage extends TrackableNettyMessage
@Override
public void decode0(ByteBuf in) { }
}
}
@@ -17,12 +17,13 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.base;
package com.seibel.distanthorizons.core.network.messages.plugin.base;
import com.seibel.distanthorizons.core.network.netty.TrackableNettyMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableNettyMessage;
import io.netty.buffer.ByteBuf;
public class CancelMessage extends TrackableNettyMessage
public class CancelMessage extends TrackableMessage
{
public CancelMessage() { }
@@ -35,4 +36,4 @@ public class CancelMessage extends TrackableNettyMessage
public void decode0(ByteBuf in)
{
}
}
}
@@ -17,12 +17,13 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.base;
package com.seibel.distanthorizons.core.network.messages.plugin.base;
import com.seibel.distanthorizons.core.network.netty.NettyMessage;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
import io.netty.buffer.ByteBuf;
public class CloseReasonMessage extends NettyMessage
public class CloseReasonMessage extends PluginChannelMessage
{
public String reason;
@@ -43,4 +44,4 @@ public class CloseReasonMessage extends NettyMessage
return super.toString("reason='" + this.reason + '\'');
}
}
}
@@ -17,19 +17,20 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.base;
package com.seibel.distanthorizons.core.network.messages.plugin.base;
import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException;
import com.seibel.distanthorizons.core.network.exceptions.InvalidSectionPosException;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException;
import com.seibel.distanthorizons.core.network.netty.TrackableNettyMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableNettyMessage;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.List;
public class ExceptionMessage extends TrackableNettyMessage
public class ExceptionMessage extends TrackableMessage
{
private static final List<Class<? extends Exception>> exceptionMap = new ArrayList<Class<? extends Exception>>()
{{
@@ -66,4 +67,4 @@ public class ExceptionMessage extends TrackableNettyMessage
return super.toString("exception=" + this.exception);
}
}
}
@@ -0,0 +1,24 @@
package com.seibel.distanthorizons.core.network.messages.plugin.base;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.buffer.ByteBuf;
/** Serves as a trigger for the server to send the first world change. */
public class HelloMessage extends PluginChannelMessage
{
public short version = ModInfo.PROTOCOL_VERSION;
@Override
public void encode(ByteBuf out)
{
out.writeShort(this.version);
}
@Override
public void decode(ByteBuf in)
{
this.version = in.readShort();
}
}
@@ -17,13 +17,14 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.fullData;
package com.seibel.distanthorizons.core.network.messages.plugin.fullData;
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.network.messages.netty.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.netty.NettyMessage;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
@@ -31,7 +32,7 @@ import io.netty.buffer.ByteBuf;
import java.io.IOException;
public class FullDataPartialUpdateMessage extends NettyMessage implements ILevelRelatedMessage
public class FullDataPartialUpdateMessage extends PluginChannelMessage implements ILevelRelatedMessage
{
private String levelName;
@Override
@@ -17,10 +17,11 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.fullData;
package com.seibel.distanthorizons.core.network.messages.plugin.fullData;
import com.seibel.distanthorizons.core.network.messages.netty.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.netty.TrackableNettyMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableNettyMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
@@ -28,7 +29,7 @@ import io.netty.buffer.ByteBuf;
import javax.annotation.Nullable;
public class FullDataSourceRequestMessage extends TrackableNettyMessage implements ILevelRelatedMessage
public class FullDataSourceRequestMessage extends TrackableMessage implements ILevelRelatedMessage
{
private String levelName;
@@ -17,12 +17,13 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.fullData;
package com.seibel.distanthorizons.core.network.messages.plugin.fullData;
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.network.netty.TrackableNettyMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableNettyMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import io.netty.buffer.ByteBuf;
@@ -34,7 +35,7 @@ import java.io.IOException;
* Response message, containing the requested full data source,
* or nothing if requested in updates-only mode and the data was not updated.
*/
public class FullDataSourceResponseMessage extends TrackableNettyMessage
public class FullDataSourceResponseMessage extends TrackableMessage
{
@Nullable
public FullDataSourceV2DTO dataSourceDto;
@@ -56,7 +57,8 @@ public class FullDataSourceResponseMessage extends TrackableNettyMessage
}
}
@Override public void encode0(ByteBuf out)
@Override
public void encode0(ByteBuf out)
{
if (this.writeOptional(out, this.dataSourceDto))
{
@@ -17,14 +17,15 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.session;
package com.seibel.distanthorizons.core.network.messages.plugin.session;
import com.seibel.distanthorizons.core.network.netty.TrackableNettyMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableMessage;
import com.seibel.distanthorizons.core.network.plugin.TrackableNettyMessage;
import io.netty.buffer.ByteBuf;
import java.util.UUID;
public class PlayerUUIDMessage extends TrackableNettyMessage
public class PlayerUUIDMessage extends TrackableMessage
{
public UUID playerUUID;
@@ -46,4 +47,4 @@ public class PlayerUUIDMessage extends TrackableNettyMessage
return super.toString("playerUUID=" + this.playerUUID);
}
}
}
@@ -17,30 +17,25 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.messages.netty.session;
package com.seibel.distanthorizons.core.network.messages.plugin.session;
import com.seibel.distanthorizons.core.multiplayer.config.AbstractMultiplayerConfig;
import com.seibel.distanthorizons.core.multiplayer.config.MultiplayerConfig;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import com.seibel.distanthorizons.core.network.netty.NettyMessage;
import io.netty.buffer.ByteBuf;
public class RemotePlayerConfigMessage extends NettyMessage
public class RemotePlayerConfigMessage extends PluginChannelMessage
{
public AbstractMultiplayerConfig payload;
public RemotePlayerConfigMessage() { }
public RemotePlayerConfigMessage(AbstractMultiplayerConfig payload) { this.payload = payload; }
public AbstractMultiplayerConfig payload;
@Override
public void encode(ByteBuf out) { this.payload.encode(out); }
public RemotePlayerConfigMessage() { }
public RemotePlayerConfigMessage(AbstractMultiplayerConfig payload) { this.payload = payload; }
@Override
public void decode(ByteBuf in) { this.payload = INetworkObject.readToObject(new MultiplayerConfig(), in); }
@Override
public void encode(ByteBuf out) { this.payload.encode(out); }
@Override public String toString()
{
return super.toString("payload=" + this.payload);
}
@Override
public void decode(ByteBuf in) { this.payload = INetworkObject.readToObject(new MultiplayerConfig(), in); }
}
}
@@ -1,224 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.netty;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.network.messages.netty.base.NettyCloseEvent;
import com.seibel.distanthorizons.core.network.messages.netty.base.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.netty.base.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.netty.NettyMessageHandler;
import com.seibel.distanthorizons.core.network.protocol.netty.NettyChannelInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.logging.log4j.LogManager;
import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class NettyClient extends NettyEventSource implements INettyConnection, AutoCloseable
{
private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Client.Advanced.Logging.logNetworkEvent.get());
private enum EConnectionState
{
INITIAL,
OPEN,
RECONNECTING,
GOT_CLOSE_REASON,
CLOSED
}
private static final Set<EConnectionState> closedStates = EnumSet.of(
EConnectionState.GOT_CLOSE_REASON,
EConnectionState.CLOSED
);
private final AtomicReference<EConnectionState> connectionState = new AtomicReference<>(EConnectionState.INITIAL);
/** Indicates whether the client is working. */
public boolean isInitialized() { return this.connectionState.get() != EConnectionState.INITIAL; }
/** Indicates whether the client is closed(-ing) and should not be used. */
public boolean isClosed() { return closedStates.contains(this.connectionState.get()); }
private static final int RECONNECTION_DELAY_SEC = 5;
public static final int RECONNECTION_ATTEMPTS = 3;
private InetSocketAddress address;
@Nullable
private Throwable closeReason;
@Override
@Nullable
public Throwable getCloseReason() { return this.closeReason; }
private final EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("DH-Network - Client Thread"));
private final Bootstrap clientBootstrap = new Bootstrap()
.group(this.workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new NettyChannelInitializer(new NettyMessageHandler(
(ctx, msg) -> {
msg.setConnection(this);
this.handleMessage(msg);
},
ctx -> this.addNewConnection(this),
(ctx, closeReason) -> this.closeReason = closeReason
)));
private Channel channel;
private final AtomicInteger reconnectionAttemptsLeft = new AtomicInteger(RECONNECTION_ATTEMPTS);
/** Returns the amount of reconnections the client will attempt to perform before giving up. */
public int getReconnectionAttemptsLeft() { return this.reconnectionAttemptsLeft.get(); }
public NettyClient()
{
this.registerHandler(CloseReasonMessage.class, closeReasonMessage ->
{
String fullCloseText = "[Server] " + closeReasonMessage.reason;
LOGGER.warn(fullCloseText);
this.closeReason = new Exception(fullCloseText);
this.connectionState.set(EConnectionState.GOT_CLOSE_REASON);
});
this.registerHandler(NettyCloseEvent.class, closeEvent ->
{
LOGGER.info("Disconnected from server: "+this.getRemoteAddress());
if (this.connectionState.get() == EConnectionState.GOT_CLOSE_REASON)
{
this.close();
}
});
}
public void resetAndConnectTo(String host, int port)
{
if (this.connectionState.getAndUpdate(state -> state != EConnectionState.CLOSED ? EConnectionState.INITIAL : state) == EConnectionState.CLOSED)
{
return;
}
// Remove IPv6 brackets
host = host.replaceAll("[\\[\\]]", "");
this.address = new InetSocketAddress(host, port);
if (this.channel != null)
{
this.channel.close().syncUninterruptibly();
}
this.reconnectionAttemptsLeft.set(RECONNECTION_ATTEMPTS);
this.connect();
}
public void connect()
{
if (!this.connectionState.compareAndSet(EConnectionState.INITIAL, EConnectionState.OPEN))
{
return;
}
LOGGER.info("Connecting to server: " + this.address);
ChannelFuture connectFuture = this.clientBootstrap.connect(this.address);
this.channel = connectFuture.channel();
connectFuture.addListener((ChannelFuture channelFuture) ->
{
if (!channelFuture.isSuccess())
{
LOGGER.info("Connection failed: " + channelFuture.cause());
return;
}
this.sendMessage(new HelloMessage());
});
this.channel.closeFuture().addListener((ChannelFuture channelFuture) ->
{
this.completeAllFuturesExceptionally(channelFuture.cause() != null
? channelFuture.cause()
: new ChannelException("Channel is closed."));
if (this.connectionState.get() == EConnectionState.OPEN)
{
int reconnectionAttemptsLeft = this.reconnectionAttemptsLeft.decrementAndGet();
LOGGER.info("Reconnection attempts left: [" + this.reconnectionAttemptsLeft.get() + "] of [" + RECONNECTION_ATTEMPTS + "].");
if (reconnectionAttemptsLeft != 0)
{
if (this.connectionState.compareAndSet(EConnectionState.OPEN, EConnectionState.RECONNECTING))
{
this.workerGroup.schedule(() ->
{
if (this.connectionState.compareAndSet(EConnectionState.RECONNECTING, EConnectionState.INITIAL))
{
this.connect();
}
}, RECONNECTION_DELAY_SEC, TimeUnit.SECONDS);
}
}
else
{
this.connectionState.compareAndSet(EConnectionState.OPEN, EConnectionState.GOT_CLOSE_REASON);
}
}
});
}
@Override
public ChannelHandlerContext getChannelContext()
{
if (this.channel == null)
{
return null;
}
return this.channel.pipeline().context(NettyMessageHandler.class);
}
@Override
public NettyEventSource getRequestHandler()
{
return this;
}
@Override
public void close()
{
EConnectionState stateChangeResult = this.connectionState.getAndSet(EConnectionState.CLOSED);
if (stateChangeResult == EConnectionState.CLOSED || stateChangeResult == EConnectionState.INITIAL)
{
return;
}
this.channel.close().syncUninterruptibly();
this.workerGroup.shutdownGracefully();
super.close();
}
}
@@ -1,163 +0,0 @@
package com.seibel.distanthorizons.core.network.netty;
import com.seibel.distanthorizons.core.network.NetworkEventSource;
import com.seibel.distanthorizons.core.network.messages.netty.base.NettyCloseEvent;
import com.seibel.distanthorizons.core.network.messages.netty.NettyMessageRegistry;
import com.seibel.distanthorizons.core.network.messages.netty.base.CancelMessage;
import com.seibel.distanthorizons.core.network.messages.netty.base.ExceptionMessage;
import io.netty.channel.ChannelException;
import java.io.InvalidClassException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
public abstract class NettyEventSource extends NetworkEventSource<NettyMessage>
{
private final ConcurrentMap<INettyConnection, ConcurrentMap<Long, FutureResponseData>> pendingFutures = new ConcurrentHashMap<>();
public NettyEventSource()
{
super(NettyMessageRegistry.INSTANCE);
}
@Override
public <T extends NettyMessage> void registerHandler(Class<T> handlerClass, Consumer<T> handlerImplementation)
{
if (handlerClass != NettyCloseEvent.class)
{
// Will throw if the handler class is not found
this.messageRegistry.getMessageId(handlerClass);
}
super.registerHandler(handlerClass, handlerImplementation);
}
@Override
protected boolean tryHandleMessage(NettyMessage message)
{
if (message instanceof TrackableNettyMessage)
{
TrackableNettyMessage trackableMessage = (TrackableNettyMessage) message;
ConcurrentMap<Long, FutureResponseData> subMap = this.pendingFutures.get(message.getConnection());
if (subMap != null)
{
FutureResponseData responseData = subMap.get(trackableMessage.futureId);
if (responseData != null)
{
if (message instanceof ExceptionMessage)
{
responseData.future.completeExceptionally(((ExceptionMessage) message).exception);
}
else if (message.getClass() != responseData.responseClass)
{
responseData.future.completeExceptionally(new InvalidClassException("Response with invalid type: expected " + responseData.responseClass.getSimpleName() + ", got:" + message));
}
else
{
responseData.future.complete(trackableMessage);
}
return true;
}
}
}
// Still return true if message should be silent when unhandled
return !message.warnWhenUnhandled();
}
protected void addNewConnection(INettyConnection connection)
{
this.pendingFutures.put(connection, new ConcurrentHashMap<>());
}
public <TResponse extends TrackableNettyMessage> CompletableFuture<TResponse> createRequest(INettyConnection connection, TrackableNettyMessage msg, Class<TResponse> responseClass)
{
msg.setConnection(connection);
CompletableFuture<TResponse> responseFuture = new CompletableFuture<>();
responseFuture.whenComplete((response, throwable) ->
{
if (!(throwable instanceof ChannelException))
{
ConcurrentMap<Long, FutureResponseData> subMap = this.pendingFutures.get(connection);
if (subMap != null)
{
subMap.remove(msg.futureId);
}
}
if (throwable instanceof CancellationException)
{
msg.sendResponse(new CancelMessage());
}
});
ConcurrentMap<Long, FutureResponseData> subMap = this.pendingFutures.get(connection);
if (subMap == null)
{
// Was deleted before adding
responseFuture.completeExceptionally(connection.getCloseReason());
return responseFuture;
}
subMap.put(msg.futureId, new FutureResponseData(responseClass, responseFuture));
if (!this.pendingFutures.containsKey(connection))
{
// Was deleted while adding
// Note: removal from subMap will happen in whenComplete above
responseFuture.completeExceptionally(connection.getCloseReason());
return responseFuture;
}
// If passed until here, cancelling is up to the cleaning side
return responseFuture;
}
protected final void completeAllFuturesExceptionally(INettyConnection connection, Throwable cause)
{
ConcurrentMap<Long, FutureResponseData> map = this.pendingFutures.remove(connection);
if (map == null)
{
return;
}
for (FutureResponseData responseData : map.values())
{
responseData.future.completeExceptionally(cause);
}
}
protected final void completeAllFuturesExceptionally(Throwable cause)
{
for (INettyConnection connection : this.pendingFutures.keySet())
{
this.completeAllFuturesExceptionally(connection, cause);
}
}
@Override
public void close()
{
super.close();
this.completeAllFuturesExceptionally(new ChannelException(this.getClass().getSimpleName() + " is closed."));
}
private static class FutureResponseData
{
public final Class<? extends TrackableNettyMessage> responseClass;
public final CompletableFuture<TrackableNettyMessage> future;
private <T extends TrackableNettyMessage> FutureResponseData(Class<T> responseClass, CompletableFuture<T> future)
{
this.responseClass = responseClass;
//noinspection unchecked
this.future = (CompletableFuture<TrackableNettyMessage>) future;
}
}
}
@@ -1,58 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.netty;
import com.seibel.distanthorizons.core.network.netty.INettyConnection;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import javax.annotation.Nullable;
public abstract class NettyMessage implements INetworkObject
{
private INettyConnection connection = null;
public boolean warnWhenUnhandled() { return true; }
public INettyConnection getConnection()
{
return this.connection;
}
public void setConnection(INettyConnection connection)
{
if (this.connection != null)
{
throw new IllegalStateException("Channel context cannot be changed after initial setting.");
}
this.connection = connection;
}
@Override
public String toString()
{
return this.toString(" ");
}
protected String toString(@Nullable String extraData)
{
return this.getClass().getSimpleName() + "{" + extraData + '}';
}
}
@@ -1,177 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.netty;
import com.google.common.collect.MapMaker;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.network.messages.netty.base.NettyCloseEvent;
import com.seibel.distanthorizons.core.network.messages.netty.base.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.netty.NettyMessageHandler;
import com.seibel.distanthorizons.core.network.protocol.netty.NettyChannelInitializer;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.logging.log4j.LogManager;
import javax.annotation.Nullable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
public class NettyServer extends NettyEventSource implements AutoCloseable
{
private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Client.Advanced.Logging.logNetworkEvent.get());
public final int port;
private final EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("DH-Network - Server Boss Thread"));
private final EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("DH-Network - Server Worker Thread"));
private final AtomicBoolean isClosed = new AtomicBoolean();
private final ConcurrentMap<ChannelHandlerContext, INettyConnection> connections = new MapMaker().weakKeys().weakValues().makeMap();
public NettyServer(int port)
{
this.port = port;
LOGGER.info("Starting server on port "+port);
this.registerHandlers();
this.bind();
}
private void registerHandlers()
{
this.registerHandler(HelloMessage.class, helloMessage ->
{
INettyConnection connection = helloMessage.getConnection();
LOGGER.info("Client connected: "+connection.getRemoteAddress());
if (helloMessage.version != ModInfo.PROTOCOL_VERSION)
{
try
{
String disconnectReason = "Version mismatch. Server version: ["+ModInfo.PROTOCOL_VERSION+"], client version: ["+helloMessage.version+"].";
LOGGER.info("Disconnecting the client ["+connection.getRemoteAddress()+"]: "+disconnectReason);
connection.disconnect(disconnectReason);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
return;
}
connection.sendMessage(new HelloMessage());
});
this.registerHandler(NettyCloseEvent.class, closeEvent ->
{
INettyConnection connection = closeEvent.getConnection();
LOGGER.info("Client disconnected: "+connection.getRemoteAddress());
this.completeAllFuturesExceptionally(closeEvent.getConnection(), connection.getCloseReason());
});
}
private void bind()
{
ServerBootstrap bootstrap = new ServerBootstrap()
.group(this.bossGroup, this.workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new NettyChannelInitializer(new NettyMessageHandler(
(ctx, msg) -> {
msg.setConnection(this.connections.computeIfAbsent(ctx, Connection::new));
this.handleMessage(msg);
},
ctx -> this.addNewConnection(this.connections.computeIfAbsent(ctx, Connection::new)),
(ctx, closeReason) -> ((Connection) this.connections.computeIfAbsent(ctx, Connection::new)).closeReason = closeReason
)));
ChannelFuture bindFuture = bootstrap.bind(this.port);
bindFuture.addListener((ChannelFuture channelFuture) ->
{
if (!channelFuture.isSuccess())
{
throw new RuntimeException("Failed to bind: " + channelFuture.cause());
}
LOGGER.info("Server is started on port "+this.port);
});
Channel channel = bindFuture.channel();
channel.closeFuture().addListener(future -> this.close());
}
@Override
public void close()
{
if (!this.isClosed.compareAndSet(false, true))
{
return;
}
LOGGER.info("Shutting down the network server.");
this.workerGroup.shutdownGracefully().syncUninterruptibly();
this.bossGroup.shutdownGracefully().syncUninterruptibly();
LOGGER.info("Network server has been closed.");
super.close();
}
public class Connection implements INettyConnection
{
private final ChannelHandlerContext channelContext;
@Nullable
private Throwable closeReason;
@Override
@Nullable
public Throwable getCloseReason() { return this.closeReason; }
public Connection(ChannelHandlerContext channelContext)
{
this.channelContext = channelContext;
}
@Override
public ChannelHandlerContext getChannelContext()
{
return this.channelContext;
}
@Override
public NettyEventSource getRequestHandler()
{
return NettyServer.this;
}
}
}
@@ -5,5 +5,21 @@ import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapp
public abstract class PluginChannelMessage implements INetworkObject
{
public IServerPlayerWrapper serverPlayer;
}
public PluginChannelSession session = null;
public boolean warnWhenUnhandled() { return true; }
public PluginChannelSession getConnection()
{
return this.session;
}
public void setConnection(PluginChannelSession connection)
{
if (this.session != null)
{
throw new IllegalStateException("Session cannot be changed after initialization.");
}
this.session = connection;
}
}
@@ -6,25 +6,32 @@ import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.network.NetworkEventSource;
import com.seibel.distanthorizons.core.network.messages.plugin.PluginCloseEvent;
import com.seibel.distanthorizons.core.network.messages.plugin.PluginMessageRegistry;
import com.seibel.distanthorizons.core.network.protocol.plugin.PluginMessageDecoder;
import com.seibel.distanthorizons.core.network.protocol.plugin.PluginMessageEncoder;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IPluginPacketSender;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandlerContext;
import org.apache.logging.log4j.LogManager;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
public class PluginChannelHandler extends NetworkEventSource<PluginChannelMessage>
public class PluginChannelSession extends NetworkEventSource
{
/**
* 4 MiB should be enough for any transferred data. <br>
* Currently largest transferred data is DH full data sections, which usually don't exceed 1-2 MiB in size.
*/
private static final int MAX_MESSAGE_LENGTH = 4194304;
private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Client.Advanced.Logging.logNetworkEvent.get());
private final PluginMessageDecoder messageDecoder = new PluginMessageDecoder();
private final PluginMessageEncoder messageEncoder = new PluginMessageEncoder();
private final IPluginPacketSender packetSender = SingletonInjector.INSTANCE.get(IPluginPacketSender.class);
/**
@@ -33,15 +40,22 @@ public class PluginChannelHandler extends NetworkEventSource<PluginChannelMessag
* to allow multi-compat servers.
*/
private final AtomicBoolean isClosed = new AtomicBoolean();
@Nullable
private IServerPlayerWrapper serverPlayer;
public PluginChannelHandler()
public PluginChannelSession()
{
super(PluginMessageRegistry.INSTANCE);
}
public PluginChannelSession(@Nullable IServerPlayerWrapper serverPlayer)
{
super(PluginMessageRegistry.INSTANCE);
this.serverPlayer = serverPlayer;
}
public void decodeAndHandle(ByteBuf byteBuf, @Nullable IServerPlayerWrapper serverPlayer)
public void decodeAndHandle(ByteBuf byteBuf)
{
if (this.isClosed.get())
{
@@ -50,16 +64,17 @@ public class PluginChannelHandler extends NetworkEventSource<PluginChannelMessag
try
{
ArrayList<Object> messages = new ArrayList<>();
this.messageDecoder.decode(byteBuf, messages);
for (Object msgObj : messages)
int version = byteBuf.readShort();
if (version != ModInfo.PROTOCOL_VERSION)
{
PluginChannelMessage msg = (PluginChannelMessage) msgObj;
msg.serverPlayer = serverPlayer;
this.handleMessage(msg);
return;
}
PluginChannelMessage msg = PluginMessageRegistry.INSTANCE.createMessage(byteBuf.readUnsignedShort());
msg.decode(byteBuf);
msg.serverPlayer = this.serverPlayer;
this.handleMessage(msg);
}
catch (Throwable e)
{
@@ -71,13 +86,31 @@ public class PluginChannelHandler extends NetworkEventSource<PluginChannelMessag
}
}
public void sendMessageClient(PluginChannelMessage message)
<TResponse extends TrackableMessage> CompletableFuture<TResponse> sendRequest(TrackableMessage msg, Class<TResponse> responseClass)
{
this.packetSender.sendPluginPacketClient(buffer -> this.messageEncoder.encode(message, buffer));
CompletableFuture<TResponse> responseFuture = this.createRequest(this, msg, responseClass);
this.sendMessage(msg);
return responseFuture;
}
public void sendMessageServer(@Nullable IServerPlayerWrapper serverPlayer, PluginChannelMessage message)
public void sendMessage(PluginChannelMessage message)
{
this.packetSender.sendPluginPacketServer(serverPlayer, buffer -> this.messageEncoder.encode(message, buffer));
LOGGER.debug("Sending message: " + message);
Consumer<ByteBuf> encoder = buffer -> {
buffer.writeShort(ModInfo.PROTOCOL_VERSION);
buffer.writeShort(PluginMessageRegistry.INSTANCE.getMessageId(message));
message.encode(buffer);
};
if (this.serverPlayer != null)
{
this.packetSender.sendPluginPacketServer(this.serverPlayer, encoder);
}
else
{
this.packetSender.sendPluginPacketClient(encoder);
}
}
@Override
@@ -99,4 +132,4 @@ public class PluginChannelHandler extends NetworkEventSource<PluginChannelMessag
super.close();
}
}
}
@@ -1,63 +1,42 @@
package com.seibel.distanthorizons.core.network.netty;
package com.seibel.distanthorizons.core.network.plugin;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.network.messages.netty.base.CloseReasonMessage;
import io.netty.channel.ChannelException;
import com.seibel.distanthorizons.core.network.messages.plugin.base.CloseReasonMessage;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IPluginPacketSender;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import org.apache.logging.log4j.LogManager;
import javax.annotation.Nullable;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
public interface INettyConnection
public class PluginChannelSessionAAAAA
{
private static final IPluginPacketSender packetSender = SingletonInjector.INSTANCE.get(IPluginPacketSender.class);
public final IServerPlayerWrapper serverPlayer;
public boolean isClosed = false;
public PluginChannelSessionAAAAA(IServerPlayerWrapper serverPlayer)
{
this.serverPlayer = serverPlayer;
}
ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Client.Advanced.Logging.logNetworkEvent.get());
@Nullable
ChannelHandlerContext getChannelContext();
NettyEventSource getRequestHandler();
@Nullable
Throwable getCloseReason();
@Nullable
default SocketAddress getRemoteAddress()
public void sendMessage(PluginChannelMessage message)
{
ChannelHandlerContext ctx = this.getChannelContext();
if (ctx == null)
{
return null;
}
return ctx.channel().remoteAddress();
}
default CompletableFuture<Void> sendMessage(NettyMessage message)
{
LOGGER.debug("Sending message: " + message);
this.LOGGER.debug("Sending message: " + message);
CompletableFuture<Void> future = new CompletableFuture<>();
ChannelHandlerContext ctx = this.getChannelContext();
if (ctx == null)
if (this.serverPlayer != null)
{
future.completeExceptionally(new ChannelException("Channel is closed."));
return future;
packetSender.sendPluginPacketServer(this.serverPlayer, message);
}
ctx.writeAndFlush(message).addListener(writeFuture ->
{
if (writeFuture.cause() != null)
{
future.completeExceptionally(writeFuture.cause());
}
else
{
future.complete(null);
}
});
return future;
}
@@ -88,4 +67,4 @@ public interface INettyConnection
.addListener(ChannelFutureListener.CLOSE);
}
}
}
@@ -17,20 +17,19 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.netty;
package com.seibel.distanthorizons.core.network.plugin;
import com.google.common.collect.MapMaker;
import com.seibel.distanthorizons.core.api.internal.SharedApi;
import com.seibel.distanthorizons.core.network.messages.netty.base.ExceptionMessage;
import com.seibel.distanthorizons.core.network.messages.plugin.base.ExceptionMessage;
import com.seibel.distanthorizons.core.world.EWorldEnvironment;
import io.netty.buffer.ByteBuf;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class TrackableNettyMessage extends NettyMessage
public abstract class TrackableMessage extends PluginChannelMessage
{
private static final AtomicInteger lastId = new AtomicInteger();
// 32 bits - Context ID (not transmitted)
@@ -40,16 +39,16 @@ public abstract class TrackableNettyMessage extends NettyMessage
| ((Objects.requireNonNull(SharedApi.getEnvironment()) == EWorldEnvironment.Server_Only ? 1 : 0) << 31);
private static final AtomicInteger lastContextId = new AtomicInteger();
private static final ConcurrentMap<INettyConnection, Integer> connectionToIdMap = new MapMaker().weakKeys().makeMap();
private static final ConcurrentMap<PluginChannelSessionAAAAA, Integer> connectionToIdMap = new MapMaker().weakKeys().makeMap();
public void sendResponse(TrackableNettyMessage responseMessage)
public void sendResponse(TrackableMessage responseMessage)
{
responseMessage.futureId = this.futureId;
this.getConnection().sendMessage(responseMessage);
this.session.sendMessage(responseMessage);
}
@Override
public void setConnection(INettyConnection connection)
public void setConnection(PluginChannelSession connection)
{
super.setConnection(connection);
this.futureId |= (long) connectionToIdMap.computeIfAbsent(connection, k -> lastContextId.getAndIncrement()) << 32;
@@ -91,18 +90,4 @@ public abstract class TrackableNettyMessage extends NettyMessage
protected abstract void encode0(ByteBuf out) throws Exception;
protected abstract void decode0(ByteBuf in) throws Exception;
@Override
public String toString()
{
return this.toString(null);
}
@Override protected String toString(@Nullable String extraData)
{
return super.toString(
"futureId=" + this.futureId +
(extraData != null ? ", " + extraData : "")
);
}
}
}
@@ -1,45 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.protocol;
import com.seibel.distanthorizons.core.network.messages.AbstractMessageRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class MessageDecoder<TMessage extends INetworkObject> extends ByteToMessageDecoder
{
private final AbstractMessageRegistry<TMessage> messageRegistry;
public MessageDecoder(AbstractMessageRegistry<TMessage> messageRegistry)
{
this.messageRegistry = messageRegistry;
}
@Override
protected void decode(ChannelHandlerContext channelContext, ByteBuf inputByteBuf, List<Object> outputDecodedObjectList)
{
TMessage message = this.messageRegistry.createMessage(inputByteBuf.readUnsignedShort());
outputDecodedObjectList.add(INetworkObject.readToObject(message, inputByteBuf));
}
}
@@ -1,44 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.protocol;
import com.seibel.distanthorizons.core.network.messages.AbstractMessageRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MessageEncoder<TMessage extends INetworkObject> extends MessageToByteEncoder<TMessage>
{
private final AbstractMessageRegistry<TMessage> messageRegistry;
public MessageEncoder(AbstractMessageRegistry<TMessage> messageRegistry, Class<TMessage> messageClass)
{
super(messageClass);
this.messageRegistry = messageRegistry;
}
@Override
protected void encode(ChannelHandlerContext channelContext, TMessage message, ByteBuf outputByteBuf) throws IllegalArgumentException
{
outputByteBuf.writeShort(this.messageRegistry.getMessageId(message));
message.encode(outputByteBuf);
}
}
@@ -1,65 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.protocol.netty;
import com.seibel.distanthorizons.core.network.messages.netty.NettyMessageRegistry;
import com.seibel.distanthorizons.core.network.netty.NettyMessage;
import com.seibel.distanthorizons.core.network.protocol.MessageDecoder;
import com.seibel.distanthorizons.core.network.protocol.MessageEncoder;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.flush.FlushConsolidationHandler;
import org.jetbrains.annotations.NotNull;
/** Used when creating a network channel */
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel>
{
/**
* 4 MiB should be enough for any transferred data. <br>
* Currently largest transferred data is DH full data sections, which usually don't exceed 1-2 MiB in size.
*/
private static final int MAX_MESSAGE_LENGTH = 4194304;
private final NettyMessageHandler messageHandler;
public NettyChannelInitializer(NettyMessageHandler messageHandler) { this.messageHandler = messageHandler; }
@Override
public void initChannel(@NotNull SocketChannel socketChannel)
{
ChannelPipeline pipeline = socketChannel.pipeline();
// Encoder
pipeline.addLast(new FlushConsolidationHandler(256, true));
pipeline.addLast(new LengthFieldPrepender(Integer.BYTES));
pipeline.addLast(new MessageEncoder<>(NettyMessageRegistry.INSTANCE, NettyMessage.class));
pipeline.addLast(new NettyOutboundExceptionRouter());
// Decoder
pipeline.addLast(new LengthFieldBasedFrameDecoder(MAX_MESSAGE_LENGTH, 0, Integer.BYTES, 0, Integer.BYTES));
pipeline.addLast(new MessageDecoder<>(NettyMessageRegistry.INSTANCE));
// Handler
pipeline.addLast(this.messageHandler);
}
}
@@ -1,93 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.protocol.netty;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.network.messages.netty.base.NettyCloseEvent;
import com.seibel.distanthorizons.core.network.netty.NettyMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.logging.log4j.LogManager;
import org.jetbrains.annotations.NotNull;
import java.net.SocketException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ChannelHandler.Sharable
public class NettyMessageHandler extends SimpleChannelInboundHandler<NettyMessage>
{
private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Client.Advanced.Logging.logNetworkEvent.get());
private final BiConsumer<ChannelHandlerContext, NettyMessage> messageConsumer;
private final Consumer<ChannelHandlerContext> channelActiveConsumer;
private final BiConsumer<ChannelHandlerContext, Throwable> exceptionConsumer;
public NettyMessageHandler(
BiConsumer<ChannelHandlerContext, NettyMessage> messageConsumer,
Consumer<ChannelHandlerContext> channelActiveConsumer,
BiConsumer<ChannelHandlerContext, Throwable> exceptionConsumer)
{
this.messageConsumer = messageConsumer;
this.channelActiveConsumer = channelActiveConsumer;
this.exceptionConsumer = exceptionConsumer;
}
@Override
protected void channelRead0(ChannelHandlerContext channelContext, NettyMessage message)
{
LOGGER.debug("Received message: " + message);
this.messageConsumer.accept(channelContext, message);
}
@Override
public void channelActive(@NotNull ChannelHandlerContext ctx) throws Exception
{
super.channelActive(ctx);
this.channelActiveConsumer.accept(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{
if (cause instanceof SocketException)
{
LOGGER.info("Exception caught in channel: [" + ctx.name() + "]: " + cause.getMessage());
}
else
{
LOGGER.error("Exception caught in channel: [" + ctx.name() + "].", cause);
this.exceptionConsumer.accept(ctx, cause);
}
ctx.close();
}
@Override
public void channelInactive(@NotNull ChannelHandlerContext channelContext) throws Exception
{
super.channelInactive(channelContext);
this.channelRead0(channelContext, new NettyCloseEvent());
}
}
@@ -1,36 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.protocol.netty;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
public class NettyOutboundExceptionRouter extends ChannelOutboundHandlerAdapter
{
@Override
public void write(ChannelHandlerContext channelContext, Object messageObj, ChannelPromise promise) throws Exception
{
promise.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
super.write(channelContext, messageObj, promise);
}
}
@@ -1,33 +0,0 @@
package com.seibel.distanthorizons.core.network.protocol.plugin;
import com.seibel.distanthorizons.core.network.messages.AbstractMessageRegistry;
import com.seibel.distanthorizons.core.network.messages.plugin.PluginMessageRegistry;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import com.seibel.distanthorizons.core.network.protocol.MessageDecoder;
import com.seibel.distanthorizons.core.network.protocol.MessageEncoder;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
public class PluginMessageDecoder extends MessageDecoder<PluginChannelMessage>
{
public PluginMessageDecoder()
{
super(PluginMessageRegistry.INSTANCE);
}
public void decode(ByteBuf inputByteBuf, List<Object> outputDecodedObjectList)
{
int version = inputByteBuf.readShort();
if (version != ModInfo.PLUGIN_PROTOCOL_VERSION)
{
return;
}
super.decode(null, inputByteBuf, outputDecodedObjectList);
}
}
@@ -1,22 +0,0 @@
package com.seibel.distanthorizons.core.network.protocol.plugin;
import com.seibel.distanthorizons.core.network.messages.plugin.PluginMessageRegistry;
import com.seibel.distanthorizons.core.network.plugin.PluginChannelMessage;
import com.seibel.distanthorizons.core.network.protocol.MessageEncoder;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.buffer.ByteBuf;
public class PluginMessageEncoder extends MessageEncoder<PluginChannelMessage>
{
public PluginMessageEncoder()
{
super(PluginMessageRegistry.INSTANCE, PluginChannelMessage.class);
}
public void encode(PluginChannelMessage pluginChannelMessage, ByteBuf outputByteBuf) throws IllegalArgumentException
{
outputByteBuf.writeShort(ModInfo.PLUGIN_PROTOCOL_VERSION);
super.encode(null, pluginChannelMessage, outputByteBuf);
}
}