Requests now work properly

Merge request tracker into event source
This commit is contained in:
s809
2023-07-23 14:39:29 +05:00
parent 0c60395426
commit d29ba9d423
11 changed files with 114 additions and 133 deletions
@@ -6,7 +6,6 @@ import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.network.future.NetworkRequestTracker;
import com.seibel.distanthorizons.core.network.messages.ChunkRequestMessage;
import com.seibel.distanthorizons.core.network.messages.ChunkResponseMessage;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
@@ -20,24 +19,22 @@ public class RemoteFullDataFileHandler extends FullDataFileHandler
protected static final Logger LOGGER = DhLoggerBuilder.getLogger();
private final NetworkClient networkClient;
private final NetworkRequestTracker<ChunkResponseMessage, DhSectionPos> chunkRequestTracker;
public RemoteFullDataFileHandler(IDhLevel level, AbstractSaveStructure saveStructure, NetworkClient networkClient) {
super(level, saveStructure);
this.networkClient = networkClient;
this.chunkRequestTracker = new NetworkRequestTracker<>(networkClient, ChunkResponseMessage.class);
}
@Override
public CompletableFuture<IFullDataSource> read(DhSectionPos pos) {
// TODO: LOD data file updating is probably incomplete
return super.read(pos).thenCompose((fullDataSource) -> {
CompletableFuture<ChunkResponseMessage> responseFuture = chunkRequestTracker.sendRequest(networkClient.getChannel(), new ChunkRequestMessage(pos))
CompletableFuture<ChunkResponseMessage> responseFuture = networkClient.<ChunkResponseMessage>sendRequest(new ChunkRequestMessage(pos))
.exceptionally(throwable -> {
LOGGER.error(throwable);
return null;
});
responseFuture.thenAccept(response -> LOGGER.info("ChunkResponseMessage "+response.dhSectionPos));
responseFuture.thenAccept(response -> LOGGER.info("ChunkResponseMessage "+pos));
FullDataMetaFile metaFile = this.getLoadOrMakeFile(pos, true);
return onDataFileUpdate(fullDataSource, metaFile, iFullDataSource -> {}, iFullDataSource -> true);
@@ -47,6 +44,5 @@ public class RemoteFullDataFileHandler extends FullDataFileHandler
@Override
public void close() {
super.close();
chunkRequestTracker.close();
}
}
@@ -4,18 +4,17 @@ import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.CloseMessage;
import com.seibel.distanthorizons.core.network.messages.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.MessageHandler;
import com.seibel.distanthorizons.core.network.protocol.NetworkChannelInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class NetworkClient extends NetworkEventSource implements AutoCloseable
@@ -99,8 +98,10 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
});
this.channel = connectFuture.channel();
this. channel.closeFuture().addListener((ChannelFuture channelFuture) ->
this.channel.closeFuture().addListener((ChannelFuture channelFuture) ->
{
this.completeAllFuturesExceptionally(channelFuture.cause());
switch (this.connectionState)
{
case OPEN:
@@ -134,8 +135,9 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
this.channel.disconnect();
}
public Channel getChannel() {
return channel;
public final <TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(FutureTrackableNetworkMessage msg)
{
return this.sendRequest(this.channel.pipeline().context(MessageHandler.class), msg);
}
@Override
@@ -1,12 +1,17 @@
package com.seibel.distanthorizons.core.network;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.messages.AckMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkMessage;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -14,7 +19,7 @@ public abstract class NetworkEventSource
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
protected final Map<Class<? extends INetworkMessage>, Set<BiConsumer<INetworkMessage, ChannelHandlerContext>>> handlers = new HashMap<>();
private final Table<ChannelHandlerContext, Integer, CompletableFuture<FutureTrackableNetworkMessage>> pendingFutures = HashBasedTable.create();
protected boolean hasHandler(Class<? extends INetworkMessage> handlerClass)
{
@@ -23,16 +28,32 @@ public abstract class NetworkEventSource
protected void handleMessage(INetworkMessage message, ChannelHandlerContext channelContext)
{
boolean handled = false;
Set<BiConsumer<INetworkMessage, ChannelHandlerContext>> handlerList = this.handlers.get(message.getClass());
if (handlerList == null || handlerList.isEmpty())
if (handlerList != null)
{
LOGGER.warn("Unhandled message type: " + message.getClass().getSimpleName());
return;
for (BiConsumer<INetworkMessage, ChannelHandlerContext> handler : handlerList)
{
handled = true;
handler.accept(message, channelContext);
}
}
for (BiConsumer<INetworkMessage, ChannelHandlerContext> handler : handlerList)
if (message instanceof FutureTrackableNetworkMessage)
{
handler.accept(message, channelContext);
FutureTrackableNetworkMessage trackableMessage = (FutureTrackableNetworkMessage)message;
CompletableFuture<FutureTrackableNetworkMessage> future = pendingFutures.remove(channelContext, trackableMessage.futureId);
if (future != null)
{
handled = true;
future.complete(trackableMessage);
}
}
if (!handled)
{
LOGGER.warn("Unhandled message type: " + message.getClass().getSimpleName());
}
}
@@ -59,8 +80,33 @@ public abstract class NetworkEventSource
.remove(handlerImplementation);
}
protected <TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(ChannelHandlerContext ctx, FutureTrackableNetworkMessage msg)
{
CompletableFuture<TResponse> responseFuture = new CompletableFuture<>();
pendingFutures.put(ctx, msg.futureId, (CompletableFuture<FutureTrackableNetworkMessage>) responseFuture);
ctx.writeAndFlush(msg).addListener(writeFuture -> {
if (writeFuture.cause() != null) {
responseFuture.completeExceptionally(writeFuture.cause());
}
});
return responseFuture;
}
protected void completeAllFuturesExceptionally(ChannelHandlerContext ctx, Throwable cause) {
for (CompletableFuture<FutureTrackableNetworkMessage> futureData : pendingFutures.row(ctx).values())
futureData.completeExceptionally(cause);
pendingFutures.row(ctx).clear();
}
protected void completeAllFuturesExceptionally(Throwable cause) {
for (ChannelHandlerContext ctx : pendingFutures.rowKeySet())
this.completeAllFuturesExceptionally(ctx, cause);
}
public void close()
{
this.handlers.clear();
completeAllFuturesExceptionally(new Exception(this.getClass().getSimpleName()+" is closed."));
}
}
@@ -5,6 +5,7 @@ import com.seibel.distanthorizons.core.network.messages.AckMessage;
import com.seibel.distanthorizons.core.network.messages.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.CloseMessage;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.MessageHandler;
import com.seibel.distanthorizons.core.network.protocol.NetworkChannelInitializer;
import com.seibel.distanthorizons.coreapi.ModInfo;
@@ -16,6 +17,8 @@ import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CompletableFuture;
public class NetworkServer extends NetworkEventSource implements AutoCloseable
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
@@ -66,6 +69,7 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
this.registerHandler(CloseMessage.class, (closeMessage, channelContext) ->
{
LOGGER.info("Client disconnected: "+channelContext.channel().remoteAddress());
this.completeAllFuturesExceptionally(channelContext, channelContext.channel().closeFuture().cause());
});
}
@@ -99,6 +103,12 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public <TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(ChannelHandlerContext ctx, FutureTrackableNetworkMessage msg)
{
return super.sendRequest(ctx, msg);
}
@Override
public void close()
{
@@ -1,8 +0,0 @@
package com.seibel.distanthorizons.core.network.future;
import com.seibel.distanthorizons.core.network.protocol.INetworkMessage;
public interface IFutureTrackableNetworkMessage<TKey extends Comparable<TKey>> extends INetworkMessage
{
TKey getRequestKey();
}
@@ -1,72 +0,0 @@
package com.seibel.distanthorizons.core.network.future;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkEventSource;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundInvoker;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public class NetworkRequestTracker
<TResponse extends IFutureTrackableNetworkMessage<TKey>, TKey extends Comparable<TKey>>
implements AutoCloseable
{
private final ChildNetworkEventSource<?> eventSource;
private final Set<Channel> knownChannels = new HashSet<>();
private final Table<Channel, TKey, CompletableFuture<TResponse>> pendingFutures = HashBasedTable.create();
public NetworkRequestTracker(
NetworkEventSource eventSource,
Class<TResponse> responseClass)
{
this.eventSource = new ChildNetworkEventSource<>(eventSource);
registerNetworkHandlers(responseClass);
}
private void registerNetworkHandlers(Class<TResponse> responseClass)
{
this.eventSource.registerHandler(responseClass, (msg, ctx) -> {
CompletableFuture<TResponse> future = pendingFutures.remove(ctx.channel(), msg.getRequestKey());
if (future != null) {
future.complete(msg);
}
});
}
public CompletableFuture<TResponse> sendRequest(Channel channel, IFutureTrackableNetworkMessage<TKey> msg)
{
if (knownChannels.add(channel))
channel.closeFuture().addListener(closeFuture -> completeAllExceptionally(channel, closeFuture.cause()));
CompletableFuture<TResponse> responseFuture = new CompletableFuture<>();
pendingFutures.put(channel, msg.getRequestKey(), responseFuture);
channel.writeAndFlush(msg).addListener(writeFuture -> {
if (writeFuture.cause() != null) {
responseFuture.completeExceptionally(writeFuture.cause());
}
});
return responseFuture;
}
private void completeAllExceptionally(Channel channel, Throwable cause) {
for (CompletableFuture<TResponse> responseFuture : pendingFutures.row(channel).values())
responseFuture.completeExceptionally(cause);
pendingFutures.row(channel).clear();
}
@Override public void close()
{
this.eventSource.close();
for (Channel channel : pendingFutures.rowKeySet())
this.completeAllExceptionally(channel, new Exception(this.getClass().getSimpleName()+" is closed."));
}
}
@@ -1,11 +1,11 @@
package com.seibel.distanthorizons.core.network.messages;
import com.seibel.distanthorizons.core.network.future.IFutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import io.netty.buffer.ByteBuf;
public class ChunkRequestMessage implements IFutureTrackableNetworkMessage<DhSectionPos>
public class ChunkRequestMessage extends FutureTrackableNetworkMessage
{
public DhSectionPos dhSectionPos;
@@ -15,16 +15,14 @@ public class ChunkRequestMessage implements IFutureTrackableNetworkMessage<DhSec
this.dhSectionPos = dhSectionPos;
}
@Override public DhSectionPos getRequestKey() { return dhSectionPos; }
@Override
public void encode(ByteBuf out)
public void encode0(ByteBuf out)
{
dhSectionPos.encode(out);
}
@Override
public void decode(ByteBuf in)
public void decode0(ByteBuf in)
{
dhSectionPos = INetworkObject.decode(new DhSectionPos((byte)0, 0, 0), in);
}
@@ -1,31 +1,21 @@
package com.seibel.distanthorizons.core.network.messages;
import com.seibel.distanthorizons.core.network.future.IFutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import io.netty.buffer.ByteBuf;
public class ChunkResponseMessage implements IFutureTrackableNetworkMessage<DhSectionPos>
public class ChunkResponseMessage extends FutureTrackableNetworkMessage
{
public DhSectionPos dhSectionPos;
public ChunkResponseMessage() {}
public ChunkResponseMessage(DhSectionPos dhSectionPos) {
this.dhSectionPos = dhSectionPos;
}
@Override public DhSectionPos getRequestKey() { return dhSectionPos; }
@Override
public void encode(ByteBuf out)
public void encode0(ByteBuf out)
{
dhSectionPos.encode(out);
}
@Override
public void decode(ByteBuf in)
public void decode0(ByteBuf in)
{
dhSectionPos = INetworkObject.decode(new DhSectionPos((byte)0, 0, 0), in);
}
}
@@ -0,0 +1,30 @@
package com.seibel.distanthorizons.core.network.protocol;
import io.netty.buffer.ByteBuf;
public abstract class FutureTrackableNetworkMessage implements INetworkMessage
{
private static int lastId = 0;
public int futureId = lastId++;
public static FutureTrackableNetworkMessage makeResponse(FutureTrackableNetworkMessage requestMessage, FutureTrackableNetworkMessage responseMessage)
{
responseMessage.futureId = requestMessage.futureId;
return responseMessage;
}
@Override public final void encode(ByteBuf out)
{
out.writeInt(futureId);
this.encode0(out);
}
@Override public final void decode(ByteBuf in)
{
futureId = in.readInt();
this.decode0(in);
}
protected abstract void encode0(ByteBuf out);
protected abstract void decode0(ByteBuf in);
}
@@ -21,7 +21,7 @@ import java.util.function.Consumer;
* @author Leetom
* @version 2022-11-6
*/
public class DhSectionPos implements Comparable<DhSectionPos>, INetworkObject
public class DhSectionPos implements INetworkObject
{
/**
* The lowest detail level a Section position can hold.
@@ -241,18 +241,6 @@ public class DhSectionPos implements Comparable<DhSectionPos>, INetworkObject
this.sectionZ == that.sectionZ;
}
@Override public int compareTo(@NotNull DhSectionPos o)
{
if (this.sectionDetailLevel != o.sectionDetailLevel)
return this.sectionDetailLevel - o.sectionDetailLevel;
if (this.sectionX != o.sectionX)
return this.sectionX - o.sectionX;
if (this.sectionZ != o.sectionZ)
return this.sectionZ - o.sectionZ;
return 0;
}
@Override
public int hashCode()
{
@@ -11,6 +11,7 @@ import com.seibel.distanthorizons.core.network.NetworkServer;
import com.seibel.distanthorizons.core.network.messages.*;
import com.seibel.distanthorizons.core.network.messages.ChunkRequestMessage;
import com.seibel.distanthorizons.core.network.objects.RemotePlayer;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
@@ -108,7 +109,7 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
level.serverside.worldGenTick(new DhBlockPos2D(msg.dhSectionPos.sectionX, msg.dhSectionPos.sectionZ));
// Send chunk response message back
ctx.writeAndFlush(new ChunkResponseMessage(msg.dhSectionPos));
ctx.writeAndFlush(FutureTrackableNetworkMessage.makeResponse(msg, new ChunkResponseMessage()));
});
}