Minimal ping-pong but less rigid impl

DhClientWorld#close is never fired smh
This commit is contained in:
s809
2023-06-29 20:58:43 +05:00
parent e660c466f8
commit 2fc76e2842
11 changed files with 192 additions and 102 deletions
@@ -1,8 +1,8 @@
package com.seibel.distanthorizons.core.network;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.util.NetworkUtil;
import com.seibel.distanthorizons.core.network.protocol.MessageHandlerSide;
import com.seibel.distanthorizons.core.network.protocol.DhNetworkChannelInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -10,12 +10,13 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.logging.log4j.Logger;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class NetworkClient implements Closeable {
public class NetworkClient extends NetworkEventSource implements AutoCloseable {
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
private enum State {
OPEN,
RECONNECT,
@@ -27,30 +28,29 @@ public class NetworkClient implements Closeable {
// TODO move to config of some sort
private final String host;
private final int port;
private State state = State.OPEN;
private final Bootstrap clientBootstrap;
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final Bootstrap clientBootstrap = new Bootstrap()
.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new DhNetworkChannelInitializer(messageHandler));
private State state = State.OPEN;
private Channel channel;
public NetworkClient(String host, int port) {
this.host = host;
this.port = port;
clientBootstrap = new Bootstrap()
.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(NetworkUtil.getChannelInitializer(MessageHandlerSide.CLIENT));
connect();
}
private void connect() {
LOGGER.info("Connecting to {}:{}", host, port);
ChannelFuture connectFuture = clientBootstrap.connect(host, port);
connectFuture.addListener((ChannelFuture channelFuture) -> {
if (!channelFuture.isSuccess()) return;
channel = channelFuture.channel();
channel.writeAndFlush(new HelloMessage());
});
@@ -61,6 +61,14 @@ public class NetworkClient implements Closeable {
workerGroup.schedule(this::connect, state == State.RECONNECT ? 0 : FAILURE_RECONNECT_DELAY_SEC, TimeUnit.SECONDS);
state = State.OPEN;
});
registerHandler(HelloMessage.class, (msg, ctx) -> {
LOGGER.info("Connected");
});
registerDisconnectHandler(ctx -> {
LOGGER.info("Disconnected");
});
}
/** Kills the current connection, triggering auto-reconnection immediately. */
@@ -70,8 +78,12 @@ public class NetworkClient implements Closeable {
}
@Override
public void close() throws IOException {
public void close() {
if (closeReason != null)
LOGGER.error(closeReason);
state = State.CLOSED;
workerGroup.shutdownGracefully();
workerGroup.shutdownGracefully().syncUninterruptibly();
channel.close().syncUninterruptibly();
}
}
@@ -0,0 +1,36 @@
package com.seibel.distanthorizons.core.network;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.messages.Message;
import com.seibel.distanthorizons.core.network.protocol.MessageHandler;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.channel.ChannelHandlerContext;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public abstract class NetworkEventSource implements AutoCloseable {
protected final MessageHandler messageHandler = new MessageHandler();
protected String closeReason = null;
public NetworkEventSource() {
registerHandler(HelloMessage.class, (msg, ctx) -> {
if (msg.version != ModInfo.PROTOCOL_VERSION) {
try {
closeReason = "Protocol version mismatch";
close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
}
public <T extends Message> void registerHandler(Class<T> clazz, BiConsumer<T, ChannelHandlerContext> handler) {
messageHandler.registerHandler(clazz, handler);
}
public void registerDisconnectHandler(Consumer<ChannelHandlerContext> handler) {
messageHandler.registerDisconnectHandler(handler);
}
}
@@ -1,45 +1,67 @@
package com.seibel.distanthorizons.core.network;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.*;
import com.seibel.distanthorizons.core.util.NetworkUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.apache.logging.log4j.Logger;
import java.io.Closeable;
import java.io.IOException;
public class NetworkServer extends NetworkEventSource implements AutoCloseable {
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
public class NetworkServer implements Closeable {
// TODO move to config of some sort
private final int port;
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
Channel channel;
public NetworkServer(int port) {
this.port = port;
ServerBootstrap b = new ServerBootstrap()
LOGGER.info("Starting server on port {}", port);
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(NetworkUtil.getChannelInitializer(MessageHandlerSide.SERVER));
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new DhNetworkChannelInitializer(messageHandler));
b.bind(port)
.addListener((ChannelFuture channelFuture) -> {
if (!channelFuture.isSuccess())
throw new RuntimeException("Failed to bind: " + channelFuture);
})
.channel().closeFuture().addListener(future -> close());
ChannelFuture bindFuture = bootstrap.bind(port);
bindFuture.addListener((ChannelFuture channelFuture) -> {
if (!channelFuture.isSuccess())
throw new RuntimeException("Failed to bind: " + channelFuture.cause());
LOGGER.info("Server is ready");
});
channel = bindFuture.channel();
channel.closeFuture().addListener(future -> close());
registerHandler(HelloMessage.class, (msg, ctx) -> {
LOGGER.info("Client connected: {}", ctx.channel().remoteAddress());
ctx.channel().writeAndFlush(new HelloMessage());
});
registerDisconnectHandler(ctx -> {
LOGGER.info("Client disconnected: {}", ctx.channel().remoteAddress());
});
}
@Override
public void close() throws IOException {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
public void close() {
if (closeReason != null)
LOGGER.error(closeReason);
bossGroup.shutdownGracefully().syncUninterruptibly();
workerGroup.shutdownGracefully().syncUninterruptibly();
channel.close().syncUninterruptibly();
}
}
@@ -7,8 +7,6 @@ import io.netty.channel.ChannelHandlerContext;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
// This message is critical to maintain backwards compatibility
// as it's used to receive version before everything else.
public class HelloMessage extends Message {
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
@@ -23,14 +21,4 @@ public class HelloMessage extends Message {
public void decode(ChannelHandlerContext ctx, ByteBuf in) {
version = in.readInt();
}
@Override
public void handle_Server(ChannelHandlerContext ctx) {
LOGGER.log(Level.INFO, "Client version: " + version);
}
@Override
public void handle_Client(ChannelHandlerContext ctx) {
LOGGER.log(Level.INFO, "Server version: " + version);
}
}
@@ -8,12 +8,5 @@ public abstract class Message {
public abstract void encode(ChannelHandlerContext ctx, ByteBuf out);
public abstract void decode(ChannelHandlerContext ctx, ByteBuf in);
public void handle_Server(ChannelHandlerContext ctx) {
throw new UnsupportedOperationException();
}
public void handle_Client(ChannelHandlerContext ctx) {
throw new UnsupportedOperationException();
}
}
@@ -0,0 +1,31 @@
package com.seibel.distanthorizons.core.network.protocol;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import org.jetbrains.annotations.NotNull;
public class DhNetworkChannelInitializer extends ChannelInitializer<SocketChannel> {
private final MessageHandler messageHandler;
public DhNetworkChannelInitializer(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
@Override
public void initChannel(@NotNull SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// Encoding
pipeline.addLast(new LengthFieldBasedFrameDecoder(Short.MAX_VALUE, 0, Short.BYTES, 0, Short.BYTES));
pipeline.addLast(new MessageDecoder());
// Encoder
pipeline.addLast(new LengthFieldPrepender(Short.BYTES));
pipeline.addLast(new MessageEncoder());
pipeline.addLast(messageHandler);
}
}
@@ -1,27 +1,53 @@
package com.seibel.distanthorizons.core.network.protocol;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.messages.Message;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ChannelHandler.Sharable
public class MessageHandler extends SimpleChannelInboundHandler<Message> {
private final MessageHandlerSide side;
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
public MessageHandler(MessageHandlerSide side) {
this.side = side;
private Map<Class<? extends Message>, List<BiConsumer<Message, ChannelHandlerContext>>> handlers = new HashMap<>();
private List<Consumer<ChannelHandlerContext>> disconnectHandlers = new LinkedList<>();
public <T extends Message> void registerHandler(Class<T> clazz, BiConsumer<T, ChannelHandlerContext> handler) {
handlers.computeIfAbsent(clazz, k -> new LinkedList<>())
.add((BiConsumer<Message, ChannelHandlerContext>) handler);
}
public void registerDisconnectHandler(Consumer<ChannelHandlerContext> handler) {
disconnectHandlers.add(handler);
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Message msg) {
switch (side) {
case CLIENT:
msg.handle_Client(ctx);
break;
case SERVER:
msg.handle_Server(ctx);
break;
default:
throw new IllegalStateException("Invalid handler side");
protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
List<BiConsumer<Message, ChannelHandlerContext>> handlerList = handlers.get(msg.getClass());
if (handlerList == null) {
LOGGER.warn("Unhandled message type: {}", msg.getClass().getSimpleName());
return;
}
for (BiConsumer<Message, ChannelHandlerContext> handler : handlerList)
handler.accept(msg, ctx);
}
@Override
public void channelInactive(@NotNull ChannelHandlerContext ctx) {
for (Consumer<ChannelHandlerContext> handler : disconnectHandlers)
handler.accept(ctx);
}
}
@@ -14,13 +14,13 @@ public class MessageRegistry {
registerMessage(1, HelloMessage.class, HelloMessage::new);
}};
private final Map<Integer, Supplier<Message>> idToSupplier = new HashMap<>();
private final Map<Class<?>, Integer> classToId = new HashMap<>();
private final Map<Integer, Supplier<? extends Message>> idToSupplier = new HashMap<>();
private final Map<Class<? extends Message>, Integer> classToId = new HashMap<>();
private MessageRegistry() { }
public <T extends Message> void registerMessage(int id, Class<T> clazz, Supplier<T> supplier) {
idToSupplier.put(id, (Supplier<Message>) supplier);
idToSupplier.put(id, supplier);
classToId.put(clazz, id);
}
@@ -1,30 +0,0 @@
package com.seibel.distanthorizons.core.util;
import com.seibel.distanthorizons.core.network.protocol.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import org.jetbrains.annotations.NotNull;
public class NetworkUtil {
public static ChannelInitializer<SocketChannel> getChannelInitializer(MessageHandlerSide side) {
return new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(@NotNull SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// Encoding
pipeline.addLast(new LengthFieldBasedFrameDecoder(Short.MAX_VALUE, 0, Short.BYTES, 0, Short.BYTES));
pipeline.addLast(new MessageDecoder());
// Encoder
pipeline.addLast(new LengthFieldPrepender(Short.BYTES));
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new MessageHandler(side));
}
};
}
}
@@ -1,11 +1,13 @@
package com.seibel.distanthorizons.core.world;
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.file.structure.ClientOnlySaveStructure;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.level.DhClientLevel;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import com.seibel.distanthorizons.core.util.objects.EventLoop;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IClientLevelWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
@@ -16,6 +18,8 @@ import java.util.concurrent.ExecutorService;
public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
{
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
private final ConcurrentHashMap<IClientLevelWrapper, DhClientLevel> levels;
public final ClientOnlySaveStructure saveStructure;
@@ -32,8 +36,10 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
super(EWorldEnvironment.Client_Only);
this.saveStructure = new ClientOnlySaveStructure();
this.levels = new ConcurrentHashMap<>();
this.networkClient = new NetworkClient("127.0.0.1", 25049);
this.levels = new ConcurrentHashMap<>();
// TODO server specific configs
this.networkClient = new NetworkClient(MC_CLIENT.getCurrentServerIp(), 25049);
LOGGER.info("Started DhWorld of type "+this.environment);
}
@@ -105,6 +111,8 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
@Override
public void close()
{
this.networkClient.close();
this.saveAndFlush().join();
for (DhClientLevel dhClientLevel : this.levels.values())
{
@@ -25,6 +25,8 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
this.saveStructure = new LocalSaveStructure();
this.levels = new HashMap<>();
// TODO move to global config once server specific configs are implemented
this.networkServer = new NetworkServer(25049);
LOGGER.info("Started "+DhServerWorld.class.getSimpleName()+" of type "+this.environment);
@@ -90,6 +92,8 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
@Override
public void close()
{
this.networkServer.close();
for (DhServerLevel level : this.levels.values())
{
LOGGER.info("Unloading level " + level.getLevelWrapper().getDimensionType().getDimensionName());