Finer control over message handling (untested)

This commit is contained in:
s809
2023-07-19 12:19:39 +05:00
parent 9a2799e83b
commit 02acfaa3ed
9 changed files with 149 additions and 38 deletions
@@ -1,12 +1,50 @@
package com.seibel.distanthorizons.core.file.fullDatafile;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
import com.seibel.distanthorizons.core.file.structure.AbstractSaveStructure;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import io.netty.channel.ChannelHandlerContext;
import java.io.File;
import java.util.concurrent.CompletableFuture;
public class RemoteFullDataFileHandler extends FullDataFileHandler
{
public RemoteFullDataFileHandler(IDhLevel level, AbstractSaveStructure saveStructure) { super(level, saveStructure); }
private final Multimap<ChannelHandlerContext, ChunkRequest> chunkRequests = HashMultimap.create();
public RemoteFullDataFileHandler(IDhLevel level, AbstractSaveStructure saveStructure, ChildNetworkEventSource<NetworkClient> eventSource) {
super(level, saveStructure);
this.registerNetworkHandlers(eventSource);
}
private void registerNetworkHandlers(ChildNetworkEventSource<NetworkClient> eventSource) {
//eventSource.registerHandler();
}
@Override
public CompletableFuture<IFullDataSource> read(DhSectionPos pos) {
// TODO read and force update somehow instead ????
return super.read(pos).handle((fullDataSource, throwable) -> {
if (fullDataSource == null) {
}
return fullDataSource;
});
}
@Override
public void close() {
super.close();
}
private static class ChunkRequest {
}
}
@@ -216,14 +216,14 @@ public class SubDimensionLevelMatcher implements AutoCloseable
IClientLevelWrapper clientLevelWrapper = null;
if (clientLevelWrapper == null)
{
// TODO level shouldn't be null, continuing would probably cause a null pointer crash
// TODO Sub dimension level matcher is incomplete
LOGGER.info(this.getClass().getSimpleName() + " implementation incomplete. Unable to get LOD data file from generic folder without [" + IClientLevelWrapper.class.getSimpleName() + "].");
break;
}
IDhLevel tempLevel = new DhClientLevel(new ClientOnlySaveStructure(), clientLevelWrapper);
IDhLevel tempLevel = null; // new DhClientLevel(new ClientOnlySaveStructure(), clientLevelWrapper, ???);
IFullDataSourceProvider fileHandler = new FullDataFileHandler(tempLevel, tempLevel.getSaveStructure());
CompletableFuture<IFullDataSource> testDataSource = fileHandler.read(new DhSectionPos(playerChunkPos));
IFullDataSource lodDataSource = testDataSource.get();
IFullDataSource lodDataSource = testDataSource.get();
// convert the data source into a raw LOD data array
@@ -5,6 +5,8 @@ import com.seibel.distanthorizons.core.file.fullDatafile.IFullDataSourceProvider
import com.seibel.distanthorizons.core.file.fullDatafile.RemoteFullDataFileHandler;
import com.seibel.distanthorizons.core.file.structure.AbstractSaveStructure;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.pos.DhBlockPos;
import com.seibel.distanthorizons.core.wrapperInterfaces.block.IBlockStateWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IProfilerWrapper;
@@ -30,11 +32,11 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
// constructor //
//=============//
public DhClientLevel(AbstractSaveStructure saveStructure, IClientLevelWrapper clientLevelWrapper)
public DhClientLevel(AbstractSaveStructure saveStructure, IClientLevelWrapper clientLevelWrapper, ChildNetworkEventSource<NetworkClient> eventSource)
{
this.levelWrapper = clientLevelWrapper;
this.saveStructure = saveStructure;
dataFileHandler = new RemoteFullDataFileHandler(this, saveStructure);
dataFileHandler = new RemoteFullDataFileHandler(this, saveStructure, eventSource);
clientside = new ClientLevelModule(this);
clientside.startRenderer();
LOGGER.info("Started DHLevel for "+this.levelWrapper+" with saves at "+this.saveStructure);
@@ -0,0 +1,43 @@
package com.seibel.distanthorizons.core.network;
import com.seibel.distanthorizons.core.network.protocol.INetworkMessage;
import io.netty.channel.ChannelHandlerContext;
import java.util.function.BiConsumer;
/** Provides a way to register network message handlers which are expected to be removed later. */
public final class ChildNetworkEventSource<TParent extends NetworkEventSource> extends NetworkEventSource
{
private final TParent parent;
private boolean isClosed = false;
public ChildNetworkEventSource(TParent parent)
{
this.parent = parent;
}
public ChildNetworkEventSource(ChildNetworkEventSource<TParent> child)
{
this(child.parent);
}
@Override public <T extends INetworkMessage> void registerHandler(Class<T> handlerClass, BiConsumer<T, ChannelHandlerContext> handlerImplementation)
{
if (isClosed) return;
if (!this.hasHandler(handlerClass))
{
parent.registerHandler(handlerClass, this::handleMessage);
}
super.registerHandler(handlerClass, handlerImplementation);
}
@Override public void close()
{
isClosed = true;
for (Class<? extends INetworkMessage> handlerClass : this.handlers.keySet())
{
parent.removeHandler(handlerClass, this::handleMessage);
}
}
}
@@ -4,6 +4,7 @@ import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.CloseMessage;
import com.seibel.distanthorizons.core.network.messages.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.MessageHandler;
import com.seibel.distanthorizons.core.network.protocol.NetworkChannelInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@@ -41,7 +42,7 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
.group(this.workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new NetworkChannelInitializer(this.messageHandler));
.handler(new NetworkChannelInitializer(new MessageHandler(this::handleMessage)));
private EConnectionState connectionState;
private Channel channel;
@@ -132,7 +133,7 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
this.connectionState = EConnectionState.RECONNECT_FORCE;
this.channel.disconnect();
}
@Override
public void close()
{
@@ -144,6 +145,8 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
this.connectionState = EConnectionState.CLOSED;
this.workerGroup.shutdownGracefully().syncUninterruptibly();
this.channel.close().syncUninterruptibly();
super.close();
}
}
@@ -3,26 +3,48 @@ package com.seibel.distanthorizons.core.network;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.AckMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.MessageHandler;
import io.netty.channel.ChannelHandlerContext;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public abstract class NetworkEventSource implements AutoCloseable
public abstract class NetworkEventSource
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
protected final Map<Class<? extends INetworkMessage>, Set<BiConsumer<INetworkMessage, ChannelHandlerContext>>> handlers = new HashMap<>();
protected final MessageHandler messageHandler = new MessageHandler();
protected boolean hasHandler(Class<? extends INetworkMessage> handlerClass)
{
return this.handlers.containsKey(handlerClass);
}
public <T extends INetworkMessage> void registerHandler(Class<T> clazz, BiConsumer<T, ChannelHandlerContext> handler) { this.messageHandler.registerHandler(clazz, handler); }
protected void handleMessage(INetworkMessage message, ChannelHandlerContext channelContext)
{
Set<BiConsumer<INetworkMessage, ChannelHandlerContext>> handlerList = this.handlers.get(message.getClass());
if (handlerList == null || handlerList.isEmpty())
{
LOGGER.warn("Unhandled message type: " + message.getClass().getSimpleName());
return;
}
for (BiConsumer<INetworkMessage, ChannelHandlerContext> handler : handlerList)
{
handler.accept(message, channelContext);
}
}
public <T extends INetworkMessage> void registerHandler(Class<T> handlerClass, BiConsumer<T, ChannelHandlerContext> handlerImplementation)
{
this.handlers.computeIfAbsent(handlerClass, missingHandlerClass -> new HashSet<>())
.add((BiConsumer<INetworkMessage, ChannelHandlerContext>) handlerImplementation);
}
public <T extends INetworkMessage> void registerAckHandler(Class<T> clazz, Consumer<ChannelHandlerContext> handler)
{
this.messageHandler.registerHandler(AckMessage.class, (ackMessage, channelContext) ->
this.registerHandler(AckMessage.class, (ackMessage, channelContext) ->
{
if (ackMessage.messageType == clazz)
{
@@ -31,4 +53,14 @@ public abstract class NetworkEventSource implements AutoCloseable
});
}
protected <T extends INetworkMessage> void removeHandler(Class<T> handlerClass, BiConsumer<T, ChannelHandlerContext> handlerImplementation)
{
this.handlers.computeIfAbsent(handlerClass, missingHandlerClass -> new HashSet<>())
.remove(handlerImplementation);
}
public void close()
{
this.handlers.clear();
}
}
@@ -5,6 +5,7 @@ import com.seibel.distanthorizons.core.network.messages.AckMessage;
import com.seibel.distanthorizons.core.network.messages.CloseReasonMessage;
import com.seibel.distanthorizons.core.network.messages.CloseMessage;
import com.seibel.distanthorizons.core.network.messages.HelloMessage;
import com.seibel.distanthorizons.core.network.protocol.MessageHandler;
import com.seibel.distanthorizons.core.network.protocol.NetworkChannelInitializer;
import com.seibel.distanthorizons.coreapi.ModInfo;
import io.netty.bootstrap.ServerBootstrap;
@@ -74,7 +75,7 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
.group(this.bossGroup, this.workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new NetworkChannelInitializer(this.messageHandler));
.childHandler(new NetworkChannelInitializer(new MessageHandler(this::handleMessage)));
ChannelFuture bindFuture = bootstrap.bind(this.port);
bindFuture.addListener((ChannelFuture channelFuture) ->
@@ -111,6 +112,8 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
this.workerGroup.shutdownGracefully().syncUninterruptibly();
this.bossGroup.shutdownGracefully().syncUninterruptibly();
LOGGER.info("Network server has been closed.");
super.close();
}
}
@@ -19,35 +19,24 @@ public class MessageHandler extends SimpleChannelInboundHandler<INetworkMessage>
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
private final Map<Class<? extends INetworkMessage>, List<BiConsumer<INetworkMessage, ChannelHandlerContext>>> handlers = new HashMap<>();
private final BiConsumer<INetworkMessage, ChannelHandlerContext> messageConsumer;
public <T extends INetworkMessage> void registerHandler(Class<T> handlerClass, BiConsumer<T, ChannelHandlerContext> handlerImplementation)
public MessageHandler(BiConsumer<INetworkMessage, ChannelHandlerContext> messageConsumer)
{
this.handlers.computeIfAbsent(handlerClass, missingHandlerClass -> new LinkedList<>())
.add((BiConsumer<INetworkMessage, ChannelHandlerContext>) handlerImplementation);
this.messageConsumer = messageConsumer;
}
@Override
protected void channelRead0(ChannelHandlerContext channelContext, INetworkMessage message)
{
LOGGER.trace("Received message: "+message.getClass().getSimpleName());
List<BiConsumer<INetworkMessage, ChannelHandlerContext>> handlerList = this.handlers.get(message.getClass());
if (handlerList == null)
{
LOGGER.warn("Unhandled message type: "+message.getClass().getSimpleName());
return;
}
for (BiConsumer<INetworkMessage, ChannelHandlerContext> handler : handlerList)
{
handler.accept(message, channelContext);
}
LOGGER.trace("Received message: " + message.getClass().getSimpleName());
this.messageConsumer.accept(message, channelContext);
}
@Override
public void channelInactive(@NotNull ChannelHandlerContext channelContext) { this.channelRead0(channelContext, new CloseMessage()); }
public void channelInactive(@NotNull ChannelHandlerContext channelContext)
{
this.channelRead0(channelContext, new CloseMessage());
}
}
@@ -4,6 +4,7 @@ import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.file.structure.ClientOnlySaveStructure;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.level.DhClientLevel;
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.network.messages.*;
import com.seibel.distanthorizons.core.network.messages.PlayerUUIDMessage;
@@ -85,7 +86,7 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
return null;
}
return new DhClientLevel(this.saveStructure, clientLevelWrapper);
return new DhClientLevel(this.saveStructure, clientLevelWrapper, new ChildNetworkEventSource<>(networkClient));
});
}