Base is pretty much finished

This commit is contained in:
s809
2023-07-08 12:25:20 +05:00
parent 62ff9606c1
commit 83ab7e46b3
11 changed files with 85 additions and 23 deletions
@@ -5,6 +5,7 @@ import com.seibel.distanthorizons.core.network.messages.CloseMessage;
import com.seibel.distanthorizons.core.network.messages.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.DhNetworkChannelInitializer;
import com.seibel.distanthorizons.core.network.protocol.MessageHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -29,6 +30,7 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable {
}
private static final int FAILURE_RECONNECT_DELAY_SEC = 5;
private static final int FAILURE_RECONNECT_ATTEMPTS = 5;
// TODO move to config of some sort
private final InetSocketAddress address;
@@ -42,6 +44,7 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable {
private State state;
private Channel channel;
private int reconnectAttempts = FAILURE_RECONNECT_ATTEMPTS;
public NetworkClient(String host, int port) {
this.address = new InetSocketAddress(host, port);
@@ -62,6 +65,8 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable {
registerHandler(CloseMessage.class, (msg, ctx) -> {
LOGGER.info("Disconnected from server: {}", ctx.channel().remoteAddress());
if (state == State.CLOSE_WAIT)
close();
});
}
@@ -71,21 +76,31 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable {
ChannelFuture connectFuture = clientBootstrap.connect(address);
connectFuture.addListener((ChannelFuture channelFuture) -> {
if (!channelFuture.isSuccess()) return;
if (!channelFuture.isSuccess()) {
LOGGER.warn("Connection failed: {0}", channelFuture.cause());
return;
}
channel.writeAndFlush(new HelloMessage());
});
channel = connectFuture.channel();
channel.closeFuture().addListener((ChannelFuture channelFuture) -> {
switch (state) {
case CLOSE_WAIT:
close();
break;
case OPEN:
reconnectAttempts--;
LOGGER.info("Reconnection attempts left: {} of {}", reconnectAttempts, FAILURE_RECONNECT_ATTEMPTS);
if (reconnectAttempts == 0) {
state = State.CLOSE_WAIT;
return;
}
state = State.RECONNECT;
workerGroup.schedule(this::connect, FAILURE_RECONNECT_DELAY_SEC, TimeUnit.SECONDS);
break;
case RECONNECT_FORCE:
LOGGER.info("Reconnecting forcefully.");
reconnectAttempts = FAILURE_RECONNECT_ATTEMPTS;
state = State.RECONNECT;
workerGroup.schedule(this::connect, 0, TimeUnit.SECONDS);
break;
@@ -104,6 +119,7 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable {
if (closeReason != null)
LOGGER.error(closeReason);
if (state == State.CLOSED) return;
state = State.CLOSED;
workerGroup.shutdownGracefully().syncUninterruptibly();
channel.close().syncUninterruptibly();
@@ -6,10 +6,7 @@ import com.seibel.distanthorizons.core.network.messages.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.DhNetworkChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
@@ -25,6 +22,7 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable {
private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private Channel channel;
private boolean isClosed = false;
public NetworkServer(int port) {
this.port = port;
@@ -67,7 +65,7 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable {
public void disconnectClient(ChannelHandlerContext ctx, String reason) {
ctx.channel().config().setAutoRead(false);
ctx.writeAndFlush(new CloseReasonMessage(reason))
.addListener(future -> ctx.close());
.addListener(ChannelFutureListener.CLOSE);
}
@Override
@@ -75,8 +73,12 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable {
if (closeReason != null)
LOGGER.error(closeReason);
bossGroup.shutdownGracefully().syncUninterruptibly();
if (isClosed) return;
isClosed = true;
LOGGER.info("Shutting down the server.");
workerGroup.shutdownGracefully().syncUninterruptibly();
channel.close().syncUninterruptibly();
bossGroup.shutdownGracefully().syncUninterruptibly();
LOGGER.info("Server is closed.");
}
}
@@ -8,6 +8,11 @@ import io.netty.buffer.ByteBuf;
public class LodConfigMessage implements INetworkMessage {
public DhRemotePlayer.Config config;
public LodConfigMessage() { }
public LodConfigMessage(DhRemotePlayer.Config config) {
this.config = config;
}
@Override
public void encode(ByteBuf out) {
config.encode(out);
@@ -1,10 +1,11 @@
package com.seibel.distanthorizons.core.network.protocol;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
public class DhNetworkChannelInitializer extends ChannelInitializer<SocketChannel> {
@@ -18,14 +19,17 @@ public class DhNetworkChannelInitializer extends ChannelInitializer<SocketChanne
public void initChannel(@NotNull SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// Encoding
pipeline.addLast(new LengthFieldBasedFrameDecoder(Short.MAX_VALUE, 0, Short.BYTES, 0, Short.BYTES));
pipeline.addLast(new MessageDecoder());
// Encoder
pipeline.addLast(new LengthFieldPrepender(Short.BYTES));
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new OutboundExceptionRouter());
// Decoder
pipeline.addLast(new LengthFieldBasedFrameDecoder(Short.MAX_VALUE, 0, Short.BYTES, 0, Short.BYTES));
pipeline.addLast(new MessageDecoder());
// Handler
pipeline.addLast(messageHandler);
pipeline.addLast(new ExceptionHandler());
}
}
@@ -0,0 +1,16 @@
package com.seibel.distanthorizons.core.network.protocol;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.logging.log4j.Logger;
public class ExceptionHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.error("Exception caught on channel", cause);
ctx.close();
}
}
@@ -1,8 +1,10 @@
package com.seibel.distanthorizons.core.network.protocol;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.logging.log4j.Logger;
import java.util.List;
@@ -1,8 +1,10 @@
package com.seibel.distanthorizons.core.network.protocol;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.logging.log4j.Logger;
public class MessageEncoder extends MessageToByteEncoder<INetworkMessage> {
@Override
@@ -27,6 +27,8 @@ public class MessageHandler extends SimpleChannelInboundHandler<INetworkMessage>
@Override
protected void channelRead0(ChannelHandlerContext ctx, INetworkMessage msg) {
LOGGER.trace("Received message: {}", msg.getClass().getSimpleName());
List<BiConsumer<INetworkMessage, ChannelHandlerContext>> handlerList = handlers.get(msg.getClass());
if (handlerList == null) {
LOGGER.warn("Unhandled message type: {}", msg.getClass().getSimpleName());
@@ -0,0 +1,14 @@
package com.seibel.distanthorizons.core.network.protocol;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
public class OutboundExceptionRouter extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
promise.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
super.write(ctx, msg, promise);
}
}
@@ -56,7 +56,7 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
// TODO Proper config handling
networkClient.registerAckHandler(PlayerUUIDMessage.class, ctx -> {
ctx.writeAndFlush(new LodConfigMessage());
ctx.writeAndFlush(new LodConfigMessage(new DhRemotePlayer.Config()));
});
networkClient.registerHandler(LodConfigMessage.class, (msg, ctx) -> {
@@ -72,11 +72,12 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
});
networkServer.registerHandler(LodConfigMessage.class, (msg, ctx) -> {
// TODO Take notice of received config
// TODO Take notice of received config and possibly echo back a constrained version
ctx.writeAndFlush(new AckMessage(LodConfigMessage.class));
});
networkServer.registerHandler(RequestChunksMessage.class, (msg, ctx) -> {
LOGGER.info("RequestChunksMessage");
// hasReceivedChunkRequest should be false somewhere ???
// to avoid sending updates until client says at least something about its state
});
@@ -89,10 +90,8 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
public void removePlayer(IServerPlayerWrapper serverPlayer) {
DhRemotePlayer dhPlayer = playersByUUID.remove(serverPlayer.getUUID());
ChannelHandlerContext ctx = playersByConnection.inverse().remove(dhPlayer);
if (ctx != null) {
ctx.writeAndFlush(new CloseReasonMessage("You are being disconnected."))
.addListener(future -> ctx.close());
}
if (ctx != null)
networkServer.disconnectClient(ctx, "You are being disconnected.");
}
@Override