diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java index 6ed422cc6..bd553ed78 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java @@ -1,8 +1,8 @@ package com.seibel.distanthorizons.core.network; +import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.network.messages.HelloMessage; -import com.seibel.distanthorizons.core.util.NetworkUtil; -import com.seibel.distanthorizons.core.network.protocol.MessageHandlerSide; +import com.seibel.distanthorizons.core.network.protocol.DhNetworkChannelInitializer; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -10,12 +10,13 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.logging.log4j.Logger; -import java.io.Closeable; -import java.io.IOException; import java.util.concurrent.TimeUnit; -public class NetworkClient implements Closeable { +public class NetworkClient extends NetworkEventSource implements AutoCloseable { + private static final Logger LOGGER = DhLoggerBuilder.getLogger(); + private enum State { OPEN, RECONNECT, @@ -27,30 +28,29 @@ public class NetworkClient implements Closeable { // TODO move to config of some sort private final String host; private final int port; - private State state = State.OPEN; - private final Bootstrap clientBootstrap; private final EventLoopGroup workerGroup = new NioEventLoopGroup(); + private final Bootstrap clientBootstrap = new Bootstrap() + .group(workerGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new DhNetworkChannelInitializer(messageHandler)); + + private State state = State.OPEN; private Channel channel; public NetworkClient(String host, int port) { this.host = host; this.port = port; - - clientBootstrap = new Bootstrap() - .group(workerGroup) - .channel(NioSocketChannel.class) - .option(ChannelOption.SO_KEEPALIVE, true) - .handler(NetworkUtil.getChannelInitializer(MessageHandlerSide.CLIENT)); connect(); } private void connect() { + LOGGER.info("Connecting to {}:{}", host, port); + ChannelFuture connectFuture = clientBootstrap.connect(host, port); connectFuture.addListener((ChannelFuture channelFuture) -> { if (!channelFuture.isSuccess()) return; - - channel = channelFuture.channel(); channel.writeAndFlush(new HelloMessage()); }); @@ -61,6 +61,14 @@ public class NetworkClient implements Closeable { workerGroup.schedule(this::connect, state == State.RECONNECT ? 0 : FAILURE_RECONNECT_DELAY_SEC, TimeUnit.SECONDS); state = State.OPEN; }); + + registerHandler(HelloMessage.class, (msg, ctx) -> { + LOGGER.info("Connected"); + }); + + registerDisconnectHandler(ctx -> { + LOGGER.info("Disconnected"); + }); } /** Kills the current connection, triggering auto-reconnection immediately. */ @@ -70,8 +78,12 @@ public class NetworkClient implements Closeable { } @Override - public void close() throws IOException { + public void close() { + if (closeReason != null) + LOGGER.error(closeReason); + state = State.CLOSED; - workerGroup.shutdownGracefully(); + workerGroup.shutdownGracefully().syncUninterruptibly(); + channel.close().syncUninterruptibly(); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java new file mode 100644 index 000000000..7729eafac --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java @@ -0,0 +1,36 @@ +package com.seibel.distanthorizons.core.network; + +import com.seibel.distanthorizons.core.network.messages.HelloMessage; +import com.seibel.distanthorizons.core.network.messages.Message; +import com.seibel.distanthorizons.core.network.protocol.MessageHandler; +import com.seibel.distanthorizons.coreapi.ModInfo; +import io.netty.channel.ChannelHandlerContext; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public abstract class NetworkEventSource implements AutoCloseable { + protected final MessageHandler messageHandler = new MessageHandler(); + protected String closeReason = null; + + public NetworkEventSource() { + registerHandler(HelloMessage.class, (msg, ctx) -> { + if (msg.version != ModInfo.PROTOCOL_VERSION) { + try { + closeReason = "Protocol version mismatch"; + close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + } + + public void registerHandler(Class clazz, BiConsumer handler) { + messageHandler.registerHandler(clazz, handler); + } + + public void registerDisconnectHandler(Consumer handler) { + messageHandler.registerDisconnectHandler(handler); + } +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java index 3149d7e1a..48fc1b7c5 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java @@ -1,45 +1,67 @@ package com.seibel.distanthorizons.core.network; +import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; +import com.seibel.distanthorizons.core.network.messages.HelloMessage; import com.seibel.distanthorizons.core.network.protocol.*; -import com.seibel.distanthorizons.core.util.NetworkUtil; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import org.apache.logging.log4j.Logger; -import java.io.Closeable; -import java.io.IOException; +public class NetworkServer extends NetworkEventSource implements AutoCloseable { + private static final Logger LOGGER = DhLoggerBuilder.getLogger(); -public class NetworkServer implements Closeable { // TODO move to config of some sort private final int port; - EventLoopGroup bossGroup = new NioEventLoopGroup(1); - EventLoopGroup workerGroup = new NioEventLoopGroup(); + private final EventLoopGroup bossGroup = new NioEventLoopGroup(1); + private final EventLoopGroup workerGroup = new NioEventLoopGroup(); + Channel channel; public NetworkServer(int port) { this.port = port; - ServerBootstrap b = new ServerBootstrap() + LOGGER.info("Starting server on port {}", port); + + ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) - .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(NetworkUtil.getChannelInitializer(MessageHandlerSide.SERVER)); + .handler(new LoggingHandler(LogLevel.DEBUG)) + .childHandler(new DhNetworkChannelInitializer(messageHandler)); - b.bind(port) - .addListener((ChannelFuture channelFuture) -> { - if (!channelFuture.isSuccess()) - throw new RuntimeException("Failed to bind: " + channelFuture); - }) - .channel().closeFuture().addListener(future -> close()); + ChannelFuture bindFuture = bootstrap.bind(port); + bindFuture.addListener((ChannelFuture channelFuture) -> { + if (!channelFuture.isSuccess()) + throw new RuntimeException("Failed to bind: " + channelFuture.cause()); + + LOGGER.info("Server is ready"); + }); + + channel = bindFuture.channel(); + channel.closeFuture().addListener(future -> close()); + + registerHandler(HelloMessage.class, (msg, ctx) -> { + LOGGER.info("Client connected: {}", ctx.channel().remoteAddress()); + ctx.channel().writeAndFlush(new HelloMessage()); + }); + + registerDisconnectHandler(ctx -> { + LOGGER.info("Client disconnected: {}", ctx.channel().remoteAddress()); + }); } @Override - public void close() throws IOException { - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); + public void close() { + if (closeReason != null) + LOGGER.error(closeReason); + + bossGroup.shutdownGracefully().syncUninterruptibly(); + workerGroup.shutdownGracefully().syncUninterruptibly(); + channel.close().syncUninterruptibly(); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/HelloMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/HelloMessage.java index c790e72a9..c29490b23 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/HelloMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/HelloMessage.java @@ -7,8 +7,6 @@ import io.netty.channel.ChannelHandlerContext; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; -// This message is critical to maintain backwards compatibility -// as it's used to receive version before everything else. public class HelloMessage extends Message { private static final Logger LOGGER = DhLoggerBuilder.getLogger(); @@ -23,14 +21,4 @@ public class HelloMessage extends Message { public void decode(ChannelHandlerContext ctx, ByteBuf in) { version = in.readInt(); } - - @Override - public void handle_Server(ChannelHandlerContext ctx) { - LOGGER.log(Level.INFO, "Client version: " + version); - } - - @Override - public void handle_Client(ChannelHandlerContext ctx) { - LOGGER.log(Level.INFO, "Server version: " + version); - } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/Message.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/Message.java index ac665966f..decb24d7e 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/Message.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/Message.java @@ -8,12 +8,5 @@ public abstract class Message { public abstract void encode(ChannelHandlerContext ctx, ByteBuf out); public abstract void decode(ChannelHandlerContext ctx, ByteBuf in); - - public void handle_Server(ChannelHandlerContext ctx) { - throw new UnsupportedOperationException(); - } - public void handle_Client(ChannelHandlerContext ctx) { - throw new UnsupportedOperationException(); - } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/DhNetworkChannelInitializer.java b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/DhNetworkChannelInitializer.java new file mode 100644 index 000000000..fa102b35d --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/DhNetworkChannelInitializer.java @@ -0,0 +1,31 @@ +package com.seibel.distanthorizons.core.network.protocol; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import org.jetbrains.annotations.NotNull; + +public class DhNetworkChannelInitializer extends ChannelInitializer { + private final MessageHandler messageHandler; + + public DhNetworkChannelInitializer(MessageHandler messageHandler) { + this.messageHandler = messageHandler; + } + + @Override + public void initChannel(@NotNull SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // Encoding + pipeline.addLast(new LengthFieldBasedFrameDecoder(Short.MAX_VALUE, 0, Short.BYTES, 0, Short.BYTES)); + pipeline.addLast(new MessageDecoder()); + + // Encoder + pipeline.addLast(new LengthFieldPrepender(Short.BYTES)); + pipeline.addLast(new MessageEncoder()); + + pipeline.addLast(messageHandler); + } +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageHandler.java index 19ed385d7..90e84f1b6 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageHandler.java @@ -1,27 +1,53 @@ package com.seibel.distanthorizons.core.network.protocol; +import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; +import com.seibel.distanthorizons.core.network.messages.HelloMessage; import com.seibel.distanthorizons.core.network.messages.Message; +import com.seibel.distanthorizons.coreapi.ModInfo; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +@ChannelHandler.Sharable public class MessageHandler extends SimpleChannelInboundHandler { - private final MessageHandlerSide side; + private static final Logger LOGGER = DhLoggerBuilder.getLogger(); - public MessageHandler(MessageHandlerSide side) { - this.side = side; + private Map, List>> handlers = new HashMap<>(); + private List> disconnectHandlers = new LinkedList<>(); + + public void registerHandler(Class clazz, BiConsumer handler) { + handlers.computeIfAbsent(clazz, k -> new LinkedList<>()) + .add((BiConsumer) handler); + } + + public void registerDisconnectHandler(Consumer handler) { + disconnectHandlers.add(handler); } @Override - public void channelRead0(ChannelHandlerContext ctx, Message msg) { - switch (side) { - case CLIENT: - msg.handle_Client(ctx); - break; - case SERVER: - msg.handle_Server(ctx); - break; - default: - throw new IllegalStateException("Invalid handler side"); + protected void channelRead0(ChannelHandlerContext ctx, Message msg) { + List> handlerList = handlers.get(msg.getClass()); + if (handlerList == null) { + LOGGER.warn("Unhandled message type: {}", msg.getClass().getSimpleName()); + return; } + + for (BiConsumer handler : handlerList) + handler.accept(msg, ctx); + } + + @Override + public void channelInactive(@NotNull ChannelHandlerContext ctx) { + for (Consumer handler : disconnectHandlers) + handler.accept(ctx); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageRegistry.java b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageRegistry.java index e02ba0b20..f8e5b8061 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageRegistry.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageRegistry.java @@ -14,13 +14,13 @@ public class MessageRegistry { registerMessage(1, HelloMessage.class, HelloMessage::new); }}; - private final Map> idToSupplier = new HashMap<>(); - private final Map, Integer> classToId = new HashMap<>(); + private final Map> idToSupplier = new HashMap<>(); + private final Map, Integer> classToId = new HashMap<>(); private MessageRegistry() { } public void registerMessage(int id, Class clazz, Supplier supplier) { - idToSupplier.put(id, (Supplier) supplier); + idToSupplier.put(id, supplier); classToId.put(clazz, id); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/NetworkUtil.java b/core/src/main/java/com/seibel/distanthorizons/core/util/NetworkUtil.java deleted file mode 100644 index a67d68bb1..000000000 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/NetworkUtil.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.seibel.distanthorizons.core.util; - -import com.seibel.distanthorizons.core.network.protocol.*; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.codec.LengthFieldPrepender; -import org.jetbrains.annotations.NotNull; - -public class NetworkUtil { - public static ChannelInitializer getChannelInitializer(MessageHandlerSide side) { - return new ChannelInitializer() { - @Override - public void initChannel(@NotNull SocketChannel ch) { - ChannelPipeline pipeline = ch.pipeline(); - - // Encoding - pipeline.addLast(new LengthFieldBasedFrameDecoder(Short.MAX_VALUE, 0, Short.BYTES, 0, Short.BYTES)); - pipeline.addLast(new MessageDecoder()); - - // Encoder - pipeline.addLast(new LengthFieldPrepender(Short.BYTES)); - pipeline.addLast(new MessageEncoder()); - - pipeline.addLast(new MessageHandler(side)); - } - }; - } -} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/world/DhClientWorld.java b/core/src/main/java/com/seibel/distanthorizons/core/world/DhClientWorld.java index cebe0fec3..5ec9d3320 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/world/DhClientWorld.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/world/DhClientWorld.java @@ -1,11 +1,13 @@ package com.seibel.distanthorizons.core.world; +import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector; import com.seibel.distanthorizons.core.file.structure.ClientOnlySaveStructure; import com.seibel.distanthorizons.core.level.IDhLevel; import com.seibel.distanthorizons.core.level.DhClientLevel; import com.seibel.distanthorizons.core.network.NetworkClient; import com.seibel.distanthorizons.core.util.ThreadUtil; import com.seibel.distanthorizons.core.util.objects.EventLoop; +import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.IClientLevelWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper; @@ -16,6 +18,8 @@ import java.util.concurrent.ExecutorService; public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld { + private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class); + private final ConcurrentHashMap levels; public final ClientOnlySaveStructure saveStructure; @@ -32,8 +36,10 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld super(EWorldEnvironment.Client_Only); this.saveStructure = new ClientOnlySaveStructure(); - this.levels = new ConcurrentHashMap<>(); - this.networkClient = new NetworkClient("127.0.0.1", 25049); + this.levels = new ConcurrentHashMap<>(); + + // TODO server specific configs + this.networkClient = new NetworkClient(MC_CLIENT.getCurrentServerIp(), 25049); LOGGER.info("Started DhWorld of type "+this.environment); } @@ -105,6 +111,8 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld @Override public void close() { + this.networkClient.close(); + this.saveAndFlush().join(); for (DhClientLevel dhClientLevel : this.levels.values()) { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/world/DhServerWorld.java b/core/src/main/java/com/seibel/distanthorizons/core/world/DhServerWorld.java index c32dcbe3d..dda84d78e 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/world/DhServerWorld.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/world/DhServerWorld.java @@ -25,6 +25,8 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld this.saveStructure = new LocalSaveStructure(); this.levels = new HashMap<>(); + + // TODO move to global config once server specific configs are implemented this.networkServer = new NetworkServer(25049); LOGGER.info("Started "+DhServerWorld.class.getSimpleName()+" of type "+this.environment); @@ -90,6 +92,8 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld @Override public void close() { + this.networkServer.close(); + for (DhServerLevel level : this.levels.values()) { LOGGER.info("Unloading level " + level.getLevelWrapper().getDimensionType().getDimensionName());