Everything I've done so far (not working/finished)

This commit is contained in:
s809
2023-08-01 22:31:16 +05:00
parent 2251cd4c25
commit 1788c18d59
12 changed files with 136 additions and 46 deletions
@@ -1,6 +1,5 @@
package com.seibel.distanthorizons.core.generation;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource;
import com.seibel.distanthorizons.core.generation.tasks.IWorldGenTaskTracker;
@@ -8,12 +7,12 @@ import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult;
import com.seibel.distanthorizons.core.level.IDhClientLevel;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.logging.f3.F3Screen;
import com.seibel.distanthorizons.core.multiplayer.ClientNetworkState;
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.messages.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.RemotePlayerConfigMessage;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.pos.DhChunkPos;
import com.seibel.distanthorizons.core.pos.DhLodPos;
@@ -25,6 +24,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -36,27 +36,25 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
private final ChildNetworkEventSource<NetworkClient> eventSource;
private final ClientNetworkState networkState;
private final IDhClientLevel level;
private final ConcurrentMap<DhSectionPos, WorldGenQueueEntry> waitingTasks = new ConcurrentHashMap<>();
private final Semaphore pendingTasksSemaphore = new Semaphore(Short.MAX_VALUE);
private int pendingTasks() { return Short.MAX_VALUE - pendingTasksSemaphore.availablePermits(); }
private int maxConcurrentRequests = 0;
private final Set<CompletableFuture<FullDataSourceResponseMessage>> pendingRequests = ConcurrentHashMap.newKeySet();
private volatile CompletableFuture<Void> generatorClosingFuture = null;
private final F3Screen.NestedMessage f3Message = new F3Screen.NestedMessage(this::f3Log);
private final AtomicInteger finishedRequests = new AtomicInteger();
private final AtomicInteger totalRequests = new AtomicInteger();
private final AtomicInteger failedRequests = new AtomicInteger();
public WorldRemoteGenerationQueue(NetworkClient networkClient, IDhClientLevel level)
public WorldRemoteGenerationQueue(ClientNetworkState networkState, IDhClientLevel level)
{
this.eventSource = new ChildNetworkEventSource<>(networkClient);
this.networkState = networkState;
this.level = level;
eventSource.registerHandler(RemotePlayerConfigMessage.class, msg -> {
maxConcurrentRequests = Math.min(msg.payload.fullDataRequestRateLimit, Config.Client.Advanced.Multiplayer.serverNetworkingRateLimit.get());
});
}
@Override public byte largestDataDetail()
@@ -78,15 +76,18 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
@Override public void runCurrentGenTasksUntilBusy(DhBlockPos2D targetPos)
{
while (eventSource.parent.isReady() && pendingTasks() < maxConcurrentRequests && !waitingTasks.isEmpty())
while (generatorClosingFuture == null
&& networkState.eventSource.parent.isReady()
&& !waitingTasks.isEmpty()
&& pendingTasks() < this.networkState.config.fullDataRequestRateLimit
&& pendingTasksSemaphore.tryAcquire())
{
sendNewRequest(targetPos);
}
}
private void sendNewRequest(DhBlockPos2D targetPos)
{
if (!pendingTasksSemaphore.tryAcquire())
return;
DhSectionPos sectionPos = Objects.requireNonNull(waitingTasks.keySet().stream().reduce(null, (a, b)
-> a != null
&& a.getCenter().getCenterBlockPos().distSquared(targetPos)
@@ -94,9 +95,11 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
? a : b));
WorldGenQueueEntry entry = waitingTasks.remove(sectionPos);
eventSource.parent.<FullDataSourceResponseMessage>sendRequest(new FullDataSourceRequestMessage(sectionPos))
.handle((response, throwable) ->
CompletableFuture<FullDataSourceResponseMessage> request = this.networkState.eventSource.parent.sendRequest(new FullDataSourceRequestMessage(sectionPos));
pendingRequests.add(request);
request.handle((response, throwable) ->
{
pendingRequests.remove(request);
pendingTasksSemaphore.release();
finishedRequests.incrementAndGet();
@@ -154,26 +157,29 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
ArrayList<String> lines = new ArrayList<>();
lines.add("World Remote Generation Queue ["+level.getClientLevelWrapper().getDimensionType().getDimensionName()+"]");
lines.add(" Requests: "+this.finishedRequests+" / "+this.totalRequests +" (failed: "+ this.failedRequests+")");
lines.add(" Pending: "+this.pendingTasks()+" / "+this.maxConcurrentRequests);
lines.add(" Pending: "+this.pendingTasks()+" / "+this.networkState.config.fullDataRequestRateLimit);
return lines.toArray(new String[0]);
}
@Override public CompletableFuture<Void> startClosing(boolean cancelCurrentGeneration, boolean alsoInterruptRunning)
{
pendingTasksSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
if (cancelCurrentGeneration)
{
for (WorldGenQueueEntry entry : this.waitingTasks.values())
entry.future.cancel(alsoInterruptRunning);
}
return CompletableFuture.completedFuture(null);
return this.generatorClosingFuture = CompletableFuture.runAsync(() -> {
while (!pendingTasksSemaphore.tryAcquire(Short.MAX_VALUE))
{
for (CompletableFuture<FullDataSourceResponseMessage> request : pendingRequests)
request.cancel(false);
}
if (cancelCurrentGeneration)
{
for (WorldGenQueueEntry entry : this.waitingTasks.values())
entry.future.cancel(alsoInterruptRunning);
}
});
}
@Override public void close()
{
eventSource.close();
f3Message.close();
}
@@ -1,7 +1,5 @@
package com.seibel.distanthorizons.core.level;
import com.seibel.distanthorizons.core.config.AppliedConfigState;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor;
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.file.fullDatafile.IFullDataSourceProvider;
@@ -50,8 +48,6 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
@CheckForNull
private final NetworkClient networkClient;
public final WorldGenModule worldGenModule;
// TODO maybe use some other value?
public final AppliedConfigState<Boolean> worldGeneratorEnabledConfig;
@@ -67,7 +63,6 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
this.networkClient = networkClient;
this.worldGenModule = new WorldGenModule(dataFileHandler, this);
this.worldGeneratorEnabledConfig = new AppliedConfigState<>(Config.Client.Advanced.WorldGenerator.enableDistantGeneration);
clientside = new ClientLevelModule(this);
clientside.startRenderer();
@@ -87,9 +82,8 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
public void doWorldGen()
{
worldGeneratorEnabledConfig.pollNewValue();
boolean isClientUsable = networkClient != null && !networkClient.isClosed();
boolean shouldDoWorldGen = worldGeneratorEnabledConfig.get() && isClientUsable && clientside.isRendering();
boolean shouldDoWorldGen = isClientUsable && clientside.isRendering();
boolean isWorldGenRunning = worldGenModule.isWorldGenRunning();
if (shouldDoWorldGen && !isWorldGenRunning)
{
@@ -14,6 +14,7 @@ import com.seibel.distanthorizons.core.multiplayer.RemotePlayerConnectionHandler
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkServer;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.messages.CancelMessage;
import com.seibel.distanthorizons.core.network.messages.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
@@ -44,6 +45,7 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
private final ConcurrentLinkedQueue<IServerPlayerWrapper> worldGenLoopingQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentMap<DhSectionPos, IncompleteDataSourceEntry> incompleteDataSources = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, FullDataSourceRequestMessage> fullDataRequests = new ConcurrentHashMap<>();
private final AppliedConfigState<Integer> rateLimitConfig = new AppliedConfigState<>(Config.Client.Advanced.Multiplayer.serverNetworkingRateLimit);
@@ -91,12 +93,20 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
// If this fails, current entry is being drained and need create another one
if (entry.requestCollectionSemaphore.tryAcquire())
{
fullDataRequests.put(msg.futureId, msg);
entry.requestMessages.add(msg);
entry.requestCollectionSemaphore.release();
break;
}
}
});
this.eventSource.registerHandler(CancelMessage.class, msg ->
{
this.fullDataRequests.remove(msg.futureId);
RemotePlayer remotePlayer = remotePlayerConnectionHandler.getConnectedPlayer(msg);
remotePlayer.pendingFullDataRequests.decrementAndGet();
});
}
public void addPlayer(IServerPlayerWrapper serverPlayer)
@@ -142,8 +152,12 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
{
RemotePlayer remotePlayer = remotePlayerConnectionHandler.getConnectedPlayer(msg);
if (remotePlayer == null) continue;
remotePlayer.pendingFullDataRequests.decrementAndGet();
// Check if cancelled
if (this.fullDataRequests.remove(msg.futureId) == null)
continue;
remotePlayer.pendingFullDataRequests.decrementAndGet();
msg.sendResponse(new FullDataSourceResponseMessage(completeSource, this));
}
}
@@ -0,0 +1,32 @@
package com.seibel.distanthorizons.core.multiplayer;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.network.ChildNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.network.messages.RemotePlayerConfigMessage;
import java.io.Closeable;
public class ClientNetworkState implements Closeable
{
public final ChildNetworkEventSource<NetworkClient> eventSource;
public RemotePlayer.Payload config = new RemotePlayer.Payload();
public ClientNetworkState(NetworkClient networkClient)
{
this.eventSource = new ChildNetworkEventSource<>(networkClient);
this.registerNetworkHandlers();
}
private void registerNetworkHandlers()
{
eventSource.registerHandler(RemotePlayerConfigMessage.class, msg -> {
this.config = msg.payload;
});
}
public void close()
{
this.eventSource.close();
}
}
@@ -71,6 +71,11 @@ public class RemotePlayerConnectionHandler
return playersByConnection.get(msg.getChannelContext());
}
public RemotePlayer getPlayer(IServerPlayerWrapper serverPlayer)
{
return playersByUUID.get(serverPlayer.getUUID());
}
public void mcPlayerJoined(IServerPlayerWrapper serverPlayer)
{
this.playersByUUID.put(serverPlayer.getUUID(), new RemotePlayer(serverPlayer));
@@ -47,9 +47,9 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
public boolean isInitialState() { return this.connectionState == EConnectionState.INITIAL; }
/** Indicates whether the client is closed(-ing) and should not be used. */
public boolean isClosed() { return closedStates.contains(this.connectionState); }
private boolean isReady;
private boolean ready;
/** Indicates whether the connection is established and first message is sent. */
public boolean isReady() { return isReady; }
public boolean isReady() { return ready; }
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final Bootstrap clientBootstrap = new Bootstrap()
@@ -111,13 +111,13 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
}
channel.writeAndFlush(new HelloMessage());
isReady = true;
ready = true;
});
this.channel = connectFuture.channel();
this.channel.closeFuture().addListener((ChannelFuture channelFuture) ->
{
isReady = false;
ready = false;
this.completeAllFuturesExceptionally(channelFuture.cause() != null
? channelFuture.cause()
: new ChannelException("Channel is closed."));
@@ -3,6 +3,7 @@ package com.seibel.distanthorizons.core.network;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.messages.CancelMessage;
import com.seibel.distanthorizons.core.network.messages.ExceptionMessage;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
@@ -13,6 +14,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -20,7 +22,7 @@ public abstract class NetworkEventSource
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
protected final Map<Class<? extends NetworkMessage>, Set<Consumer<NetworkMessage>>> handlers = new HashMap<>();
private final Table<ChannelHandlerContext, Integer, CompletableFuture<FutureTrackableNetworkMessage>> pendingFutures = HashBasedTable.create();
private final Table<ChannelHandlerContext, Long, CompletableFuture<FutureTrackableNetworkMessage>> pendingFutures = HashBasedTable.create();
protected boolean hasHandler(Class<? extends NetworkMessage> handlerClass)
{
@@ -32,6 +34,9 @@ public abstract class NetworkEventSource
{
boolean handled = false;
if (message instanceof FutureTrackableNetworkMessage)
((FutureTrackableNetworkMessage) message).futureId |= (long) message.getChannelContext().hashCode() << 32;
Set<Consumer<NetworkMessage>> handlerList = this.handlers.get(message.getClass());
if (handlerList != null)
{
@@ -45,7 +50,7 @@ public abstract class NetworkEventSource
if (message instanceof FutureTrackableNetworkMessage)
{
FutureTrackableNetworkMessage trackableMessage = (FutureTrackableNetworkMessage)message;
CompletableFuture<FutureTrackableNetworkMessage> future = pendingFutures.remove(message.getChannelContext(), trackableMessage.futureId);
CompletableFuture<FutureTrackableNetworkMessage> future = pendingFutures.get(message.getChannelContext(), trackableMessage.futureId);
if (future != null)
{
handled = true;
@@ -78,7 +83,17 @@ public abstract class NetworkEventSource
protected <TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(ChannelHandlerContext ctx, FutureTrackableNetworkMessage msg)
{
msg.futureId |= (long) ctx.hashCode() << 32;
CompletableFuture<TResponse> responseFuture = new CompletableFuture<>();
responseFuture.handle((response, throwable) -> {
pendingFutures.remove(ctx, msg.futureId);
if (throwable instanceof CancellationException)
msg.sendResponse(new CancelMessage());
return null;
});
pendingFutures.put(ctx, msg.futureId, (CompletableFuture<FutureTrackableNetworkMessage>) responseFuture);
ctx.writeAndFlush(msg).addListener(writeFuture -> {
@@ -28,7 +28,6 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private Channel channel;
private boolean isClosed = false;
@@ -95,8 +94,8 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
LOGGER.info("Server is started on port "+this.port);
});
this.channel = bindFuture.channel();
this.channel.closeFuture().addListener(future -> this.close());
Channel channel = bindFuture.channel();
channel.closeFuture().addListener(future -> this.close());
}
public void disconnectClient(ChannelHandlerContext ctx, String reason)
@@ -0,0 +1,19 @@
package com.seibel.distanthorizons.core.network.messages;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import io.netty.buffer.ByteBuf;
public class CancelMessage extends FutureTrackableNetworkMessage
{
public CancelMessage() { }
@Override
public void encode0(ByteBuf out)
{
}
@Override
public void decode0(ByteBuf in)
{
}
}
@@ -6,7 +6,7 @@ import io.netty.buffer.ByteBuf;
public abstract class FutureTrackableNetworkMessage extends NetworkMessage
{
private static int lastId = 0;
public int futureId = lastId++;
public long futureId = lastId++;
public void sendResponse(FutureTrackableNetworkMessage responseMessage)
{
@@ -23,7 +23,7 @@ public abstract class FutureTrackableNetworkMessage extends NetworkMessage
{
try
{
out.writeInt(futureId);
out.writeInt((int)futureId);
this.encode0(out);
}
catch (Exception e)
@@ -27,6 +27,7 @@ public class MessageRegistry
// Define your messages after this line
this.registerMessage(AckMessage.class, AckMessage::new);
this.registerMessage(CancelMessage.class, CancelMessage::new);
this.registerMessage(ExceptionMessage.class, ExceptionMessage::new);
this.registerMessage(PlayerUUIDMessage.class, PlayerUUIDMessage::new);
this.registerMessage(RemotePlayerConfigMessage.class, RemotePlayerConfigMessage::new);
@@ -48,6 +48,8 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
{
this.networkServer.registerHandler(RemotePlayerConfigMessage.class, remotePlayerConfigMessage ->
{
this.remotePlayerConnectionHandler.getConnectedPlayer(remotePlayerConfigMessage).payload = remotePlayerConfigMessage.payload;
remotePlayerConfigMessage.payload.fullDataRequestRateLimit = Math.min(rateLimitConfig.get(), remotePlayerConfigMessage.payload.fullDataRequestRateLimit);
remotePlayerConfigMessage.sendResponse(remotePlayerConfigMessage);
});
@@ -67,6 +69,9 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
{
this.getLevel(origin).removePlayer(player);
this.getLevel(dest).addPlayer(player);
RemotePlayer remotePlayer = this.remotePlayerConnectionHandler.getPlayer(player);
remotePlayer.channelContext.writeAndFlush(new RemotePlayerConfigMessage(remotePlayer.payload));
}
@Override