Fix future id collisions between c<->s

(cause of occasional hangs on disconnection)
Add packet trace logging
This commit is contained in:
s809
2023-09-19 17:28:26 +05:00
parent 6f4e105542
commit 2cfc2c81c8
19 changed files with 214 additions and 46 deletions
@@ -23,4 +23,12 @@ public class MultiplayerConfig implements INetworkObject
this.fullDataRequestRateLimit = in.readInt();
}
@Override public String toString()
{
return "MultiplayerConfig{" +
"renderDistance=" + renderDistance +
", fullDataRequestRateLimit=" + fullDataRequestRateLimit +
'}';
}
}
@@ -75,7 +75,7 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
.group(this.workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new NetworkChannelInitializer(new MessageHandler(this::handleMessage)));
.handler(new NetworkChannelInitializer(new MessageHandler(this::handleMessage, this::addNewContext)));
private EConnectionState connectionState = EConnectionState.INITIAL;
private Channel channel;
@@ -45,7 +45,7 @@ public abstract class NetworkEventSource
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
protected final ConcurrentMap<Class<? extends NetworkMessage>, Set<Consumer<NetworkMessage>>> handlers = new ConcurrentHashMap<>();
private final Table<ChannelHandlerContext, Long, CompletableFuture<FutureTrackableNetworkMessage>> pendingFutures = Tables.synchronizedTable(HashBasedTable.create());
private final ConcurrentMap<ChannelHandlerContext, ConcurrentMap<Long, CompletableFuture<FutureTrackableNetworkMessage>>> pendingFutures = new ConcurrentHashMap<>();
protected boolean hasHandler(Class<? extends NetworkMessage> handlerClass)
{
@@ -57,9 +57,6 @@ public abstract class NetworkEventSource
{
boolean handled = false;
if (message instanceof FutureTrackableNetworkMessage)
((FutureTrackableNetworkMessage) message).futureId |= (long) message.getChannelContext().hashCode() << 32;
Set<Consumer<NetworkMessage>> handlerList = this.handlers.get(message.getClass());
if (handlerList != null)
{
@@ -73,27 +70,34 @@ public abstract class NetworkEventSource
if (message instanceof FutureTrackableNetworkMessage)
{
FutureTrackableNetworkMessage trackableMessage = (FutureTrackableNetworkMessage)message;
CompletableFuture<FutureTrackableNetworkMessage> future = pendingFutures.get(message.getChannelContext(), trackableMessage.futureId);
if (future != null)
ConcurrentMap<Long, CompletableFuture<FutureTrackableNetworkMessage>> subMap = pendingFutures.get(message.getChannelContext());
if (subMap != null)
{
handled = true;
if (message instanceof ExceptionMessage)
future.completeExceptionally(((ExceptionMessage) message).exception);
else
future.complete(trackableMessage);
CompletableFuture<FutureTrackableNetworkMessage> future = subMap.get(trackableMessage.futureId);
if (future != null)
{
handled = true;
if (message instanceof ExceptionMessage)
future.completeExceptionally(((ExceptionMessage) message).exception);
else
future.complete(trackableMessage);
}
}
}
if (!handled)
{
String error = "Unhandled message type: " + message.getClass().getSimpleName();
if (message instanceof FutureTrackableNetworkMessage)
error += ", future id: " + ((FutureTrackableNetworkMessage) message).futureId;
String error = "Unhandled message: " + message;
LOGGER.warn(error);
}
}
protected void addNewContext(ChannelHandlerContext ctx)
{
this.pendingFutures.put(ctx, new ConcurrentHashMap<>());
}
public <T extends NetworkMessage> void registerHandler(Class<T> handlerClass, Consumer<T> handlerImplementation)
{
this.handlers.computeIfAbsent(handlerClass, missingHandlerClass ->
@@ -115,20 +119,37 @@ public abstract class NetworkEventSource
protected <TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(ChannelHandlerContext ctx, FutureTrackableNetworkMessage msg)
{
msg.futureId |= (long) ctx.hashCode() << 32;
msg.setChannelContext(ctx);
CompletableFuture<TResponse> responseFuture = new CompletableFuture<>();
responseFuture.handle((response, throwable) -> {
if (!(throwable instanceof ChannelException))
pendingFutures.remove(ctx, msg.futureId);
{
ConcurrentMap<Long, CompletableFuture<FutureTrackableNetworkMessage>> subMap = pendingFutures.get(ctx);
if (subMap != null)
subMap.remove(msg.futureId);
}
if (throwable instanceof CancellationException)
msg.sendResponse(new CancelMessage());
return null;
});
pendingFutures.put(ctx, msg.futureId, (CompletableFuture<FutureTrackableNetworkMessage>) responseFuture);
ConcurrentMap<Long, CompletableFuture<FutureTrackableNetworkMessage>> subMap = pendingFutures.get(ctx);
if (subMap == null) {
// Was deleted before adding
responseFuture.completeExceptionally(ctx.channel().closeFuture().cause());
return responseFuture;
}
//noinspection unchecked
subMap.put(msg.futureId, (CompletableFuture<FutureTrackableNetworkMessage>) responseFuture);
if (!pendingFutures.containsKey(ctx)) {
// Was deleted while adding
responseFuture.completeExceptionally(ctx.channel().closeFuture().cause());
return responseFuture;
}
// If passed until here, cancelling is up to the cleaning side
ctx.writeAndFlush(msg).addListener(writeFuture -> {
if (writeFuture.cause() != null) {
@@ -138,21 +159,19 @@ public abstract class NetworkEventSource
return responseFuture;
}
protected final void completeAllFuturesExceptionally(ChannelHandlerContext ctx, Throwable cause) {
synchronized (pendingFutures)
{
for (CompletableFuture<FutureTrackableNetworkMessage> futureData : pendingFutures.row(ctx).values())
futureData.completeExceptionally(cause);
pendingFutures.row(ctx).clear();
}
protected final void completeAllFuturesExceptionally(ChannelHandlerContext ctx, Throwable cause)
{
ConcurrentMap<Long, CompletableFuture<FutureTrackableNetworkMessage>> map = pendingFutures.remove(ctx);
if (map == null) return;
for (CompletableFuture<FutureTrackableNetworkMessage> future : map.values())
future.completeExceptionally(cause);
}
protected final void completeAllFuturesExceptionally(Throwable cause) {
synchronized (pendingFutures)
{
for (ChannelHandlerContext ctx : pendingFutures.rowKeySet())
this.completeAllFuturesExceptionally(ctx, cause);
}
protected final void completeAllFuturesExceptionally(Throwable cause)
{
for (ChannelHandlerContext ctx : pendingFutures.keySet())
this.completeAllFuturesExceptionally(ctx, cause);
}
public void close()
@@ -99,7 +99,7 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
.group(this.bossGroup, this.workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new NetworkChannelInitializer(new MessageHandler(this::handleMessage)));
.childHandler(new NetworkChannelInitializer(new MessageHandler(this::handleMessage, this::addNewContext)));
ChannelFuture bindFuture = bootstrap.bind(this.port);
bindFuture.addListener((ChannelFuture channelFuture) ->
@@ -35,5 +35,10 @@ public class CloseReasonMessage extends NetworkMessage
@Override
public void decode(ByteBuf in) { this.reason = decodeString(in); }
@Override public String toString()
{
return super.toString("reason='" + reason + '\'');
}
}
@@ -58,4 +58,10 @@ public class ExceptionMessage extends FutureTrackableNetworkMessage
String message = decodeString(in);
exception = exceptionMap.get(id).getDeclaredConstructor(String.class).newInstance(message);
}
@Override public String toString()
{
return super.toString("exception=" + exception);
}
}
@@ -35,4 +35,9 @@ public class HelloMessage extends NetworkMessage
@Override
public void decode(ByteBuf in) { this.version = in.readInt(); }
@Override public String toString()
{
return super.toString("version=" + version);
}
}
@@ -48,4 +48,11 @@ public class FullDataSourceRequestMessage extends FutureTrackableNetworkMessage
{
dhSectionPos = INetworkObject.decodeStatic(DhSectionPos.zero(), in);
}
@Override
public String toString()
{
return super.toString("dhSectionPos=" + dhSectionPos);
}
}
@@ -95,4 +95,13 @@ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage
dataBuffer.release();
}
}
@Override public String toString()
{
return super.toString(
"levelHashCode=" + levelHashCode +
", dataBuffer=" + dataBuffer
);
}
}
@@ -47,4 +47,10 @@ public class GenTaskPriorityRequestMessage extends FutureTrackableNetworkMessage
{
decodeCollection(in, posList, DhSectionPos::zero);
}
@Override public String toString()
{
return super.toString("posList=" + posList);
}
}
@@ -47,4 +47,10 @@ public class GenTaskPriorityResponseMessage extends FutureTrackableNetworkMessag
{
decodeMap(in, posList, DhSectionPos::zero, () -> 0);
}
@Override public String toString()
{
return super.toString("posList=" + posList);
}
}
@@ -61,4 +61,12 @@ public class FullDataChangeSummaryRequestMessage extends FutureTrackableNetworkM
return levelWrapper.getDimensionType().getDimensionName().hashCode() == levelHashCode;
}
@Override public String toString()
{
return super.toString(
"checksums=" + checksums +
", levelHashCode=" + levelHashCode
);
}
}
@@ -51,4 +51,10 @@ public class FullDataChangeSummaryResponseMessage extends FutureTrackableNetwork
{
decodeCollection(in, changedPosList, DhSectionPos::zero);
}
@Override public String toString()
{
return super.toString("changedPosList=" + changedPosList);
}
}
@@ -23,6 +23,7 @@ import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedF
import com.seibel.distanthorizons.core.level.DhServerLevel;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
import com.seibel.distanthorizons.core.pos.DhChunkPos;
import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataInputStream;
import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataOutputStream;
@@ -33,7 +34,7 @@ import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class FullDataPartialUpdateMessage extends FutureTrackableNetworkMessage
public class FullDataPartialUpdateMessage extends NetworkMessage
{
private ChunkSizedFullDataAccessor fullDataAccessor;
private DhServerLevel level;
@@ -52,8 +53,7 @@ public class FullDataPartialUpdateMessage extends FutureTrackableNetworkMessage
this.levelHashCode = level.getLevelWrapper().getDimensionType().getDimensionName().hashCode();
}
@Override
public void encode0(ByteBuf out) throws IOException
public void encode(ByteBuf out)
{
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream())
{
@@ -69,10 +69,13 @@ public class FullDataPartialUpdateMessage extends FutureTrackableNetworkMessage
out.writeInt(outputStream.size());
out.writeBytes(outputStream.toByteArray());
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
@Override
public void decode0(ByteBuf in)
public void decode(ByteBuf in)
{
levelHashCode = in.readInt();
@@ -99,4 +102,14 @@ public class FullDataPartialUpdateMessage extends FutureTrackableNetworkMessage
dataBuffer.release();
}
}
@Override public String toString()
{
return super.toString(
"levelHashCode=" + levelHashCode +
", chunkPos=" + chunkPos +
", dataBuffer=" + dataBuffer
);
}
}
@@ -42,4 +42,9 @@ public class PlayerUUIDMessage extends FutureTrackableNetworkMessage
@Override
public void decode0(ByteBuf in) { this.playerUUID = new UUID(in.readLong(), in.readLong()); }
@Override public String toString()
{
return super.toString("playerUUID=" + playerUUID);
}
}
@@ -37,4 +37,9 @@ public class RemotePlayerConfigMessage extends FutureTrackableNetworkMessage
@Override
public void decode0(ByteBuf in) { this.payload = INetworkObject.decodeStatic(new MultiplayerConfig(), in); }
@Override public String toString()
{
return super.toString("payload=" + payload);
}
}
@@ -19,16 +19,30 @@
package com.seibel.distanthorizons.core.network.protocol;
import com.google.common.collect.MapMaker;
import com.seibel.distanthorizons.core.api.internal.SharedApi;
import com.seibel.distanthorizons.core.network.messages.base.ExceptionMessage;
import com.seibel.distanthorizons.core.world.EWorldEnvironment;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class FutureTrackableNetworkMessage extends NetworkMessage
{
private static final AtomicInteger lastId = new AtomicInteger();
// Only low 32 bits are sent (high bits are used for identifying a channel this request was sent from by remote peer)
public long futureId = lastId.incrementAndGet();
// 32 bits - Context ID (not transmitted)
// 1 bit - Requesting side (client - 0, server - 1)
// 31 bits - Request ID
public long futureId = lastId.incrementAndGet()
| ((Objects.requireNonNull(SharedApi.getEnvironment()) == EWorldEnvironment.Server_Only ? 1 : 0) << 31);
private static final AtomicInteger lastContextId = new AtomicInteger();
private static final ConcurrentMap<ChannelHandlerContext, Integer> contextIds = new MapMaker().weakKeys().makeMap();
public void sendResponse(FutureTrackableNetworkMessage responseMessage)
{
@@ -36,6 +50,13 @@ public abstract class FutureTrackableNetworkMessage extends NetworkMessage
getChannelContext().writeAndFlush(responseMessage);
}
@Override
public void setChannelContext(ChannelHandlerContext channelContext)
{
super.setChannelContext(channelContext);
this.futureId |= (long) contextIds.computeIfAbsent(channelContext, k -> lastContextId.incrementAndGet()) << 32;
}
public void sendResponse(Exception e)
{
sendResponse(new ExceptionMessage(e));
@@ -69,4 +90,19 @@ public abstract class FutureTrackableNetworkMessage extends NetworkMessage
protected abstract void encode0(ByteBuf out) throws Exception;
protected abstract void decode0(ByteBuf in) throws Exception;
@Override
public String toString()
{
return toString(null);
}
protected String toString(@Nullable String extraData)
{
return super.toString(
"futureId=" + futureId +
(extraData != null ? ", " + extraData : "")
);
}
}
@@ -19,11 +19,11 @@
package com.seibel.distanthorizons.core.network.protocol;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.base.CloseEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
@@ -32,26 +32,36 @@ import java.util.function.Consumer;
@ChannelHandler.Sharable
public class MessageHandler extends SimpleChannelInboundHandler<NetworkMessage>
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
private static final Logger LOGGER = LogManager.getLogger();
private final Consumer<NetworkMessage> messageConsumer;
private final Consumer<ChannelHandlerContext> channelActiveConsumer;
public MessageHandler(Consumer<NetworkMessage> messageConsumer)
public MessageHandler(Consumer<NetworkMessage> messageConsumer, Consumer<ChannelHandlerContext> channelActiveConsumer)
{
this.messageConsumer = messageConsumer;
this.channelActiveConsumer = channelActiveConsumer;
}
@Override
protected void channelRead0(ChannelHandlerContext channelContext, NetworkMessage message)
{
LOGGER.trace("Received message: " + message.getClass().getSimpleName());
message.setChannelContext(channelContext);
LOGGER.trace("Received message: " + message);
this.messageConsumer.accept(message);
}
@Override
public void channelInactive(@NotNull ChannelHandlerContext channelContext)
public void channelActive(@NotNull ChannelHandlerContext ctx) throws Exception
{
super.channelActive(ctx);
this.channelActiveConsumer.accept(ctx);
}
@Override
public void channelInactive(@NotNull ChannelHandlerContext channelContext) throws Exception
{
super.channelInactive(channelContext);
this.channelRead0(channelContext, new CloseEvent());
}
@@ -20,6 +20,9 @@
package com.seibel.distanthorizons.core.network.protocol;
import io.netty.channel.ChannelHandlerContext;
import org.jetbrains.annotations.NotNull;
import javax.annotation.Nullable;
public abstract class NetworkMessage implements INetworkObject
{
@@ -36,5 +39,16 @@ public abstract class NetworkMessage implements INetworkObject
throw new IllegalStateException("Channel context cannot be changed after initial setting.");
this.channelContext = channelContext;
}
@Override public String toString()
{
return toString("");
}
protected String toString(@NotNull String extraData)
{
return this.getClass().getSimpleName() + "{" + extraData + '}';
}
}