diff --git a/core/src/main/java/com/seibel/distanthorizons/core/dataObjects/fullData/FullDataPointIdMap.java b/core/src/main/java/com/seibel/distanthorizons/core/dataObjects/fullData/FullDataPointIdMap.java index 589d327f1..fa9a49b2a 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/dataObjects/fullData/FullDataPointIdMap.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/dataObjects/fullData/FullDataPointIdMap.java @@ -161,7 +161,7 @@ public class FullDataPointIdMap } - public String serialize(ILevelWrapper levelWrapper) { return this.biome.serialize(levelWrapper) + SEPARATOR_STRING + this.blockState.serialize(); } + public String serialize(ILevelWrapper levelWrapper) { return this.biome.serialize(levelWrapper) + SEPARATOR_STRING + this.blockState.serialize(levelWrapper); } public static Entry deserialize(String str, ILevelWrapper levelWrapper) throws IOException, InterruptedException { @@ -178,7 +178,7 @@ public class FullDataPointIdMap } IBiomeWrapper biome = WRAPPER_FACTORY.deserializeBiomeWrapper(stringArray[0], levelWrapper); - IBlockStateWrapper blockState = WRAPPER_FACTORY.deserializeBlockStateWrapper(stringArray[1]); + IBlockStateWrapper blockState = WRAPPER_FACTORY.deserializeBlockStateWrapper(stringArray[1], levelWrapper); return new Entry(biome, blockState); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataFileHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataFileHandler.java index b1ddbda49..348529f13 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataFileHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataFileHandler.java @@ -43,6 +43,18 @@ public class FullDataFileHandler implements IFullDataSourceProvider private final ConcurrentHashMap unloadedFiles = new ConcurrentHashMap<>(); private final ConcurrentHashMap fileBySectionPos = new ConcurrentHashMap<>(); public void ForEachFile(Consumer consumer) { this.fileBySectionPos.values().forEach(consumer); } + public Map getLoadStates(Iterable posList) + { + HashMap map = new HashMap<>(); + for (DhSectionPos pos : posList) + { + map.put(pos, + fileBySectionPos.containsKey(pos) ? 3 // Loaded + : unloadedFiles.containsKey(pos) ? 2 // Unloaded + : 1); // Not generated + } + return map; + } private LinkedList> onUpdatedListeners = new LinkedList<>(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java index 0daaec15f..c70a0b05f 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java @@ -11,6 +11,8 @@ import com.seibel.distanthorizons.core.multiplayer.ClientNetworkState; import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException; import com.seibel.distanthorizons.core.network.messages.FullDataSourceRequestMessage; import com.seibel.distanthorizons.core.network.messages.FullDataSourceResponseMessage; +import com.seibel.distanthorizons.core.network.messages.GenTaskPriorityRequestMessage; +import com.seibel.distanthorizons.core.network.messages.GenTaskPriorityResponseMessage; import com.seibel.distanthorizons.core.pos.DhBlockPos2D; import com.seibel.distanthorizons.core.pos.DhChunkPos; import com.seibel.distanthorizons.core.pos.DhLodPos; @@ -25,9 +27,11 @@ import org.apache.logging.log4j.Logger; import javax.annotation.CheckForNull; import java.awt.*; import java.util.*; +import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.stream.Collectors; public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebugRenderable { @@ -42,6 +46,9 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug private final Semaphore pendingTasksSemaphore = new Semaphore(Short.MAX_VALUE, true); private int pendingTasks() { return Short.MAX_VALUE - pendingTasksSemaphore.availablePermits(); } + private CompletableFuture genTaskPriorityRequest = CompletableFuture.completedFuture(null); + private final Semaphore genTaskPriorityRequestSemaphore = new Semaphore(1, true); + private final F3Screen.NestedMessage f3Message = new F3Screen.NestedMessage(this::f3Log); private final AtomicInteger finishedRequests = new AtomicInteger(); private final AtomicInteger totalRequests = new AtomicInteger(); @@ -73,17 +80,62 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug return entry.future; } + private int posDistanceSquared(DhBlockPos2D targetPos, DhSectionPos pos) + { + return (int) pos.getCenter().getCenterBlockPos().distSquared(targetPos); + } + @Override public void runCurrentGenTasksUntilBusy(DhBlockPos2D targetPos) { - while (generatorClosingFuture == null - && networkState.client().isReady() - && !waitingTasks.isEmpty() + if (generatorClosingFuture != null || !networkState.getClient().isReady()) return; + + while (!waitingTasks.isEmpty() && pendingTasks() < this.networkState.config.fullDataRequestRateLimit && pendingTasksSemaphore.tryAcquire()) { sendNewRequest(targetPos); } + + if (genTaskPriorityRequestSemaphore.tryAcquire()) { + List posList = waitingTasks.entrySet().stream() + .filter(task -> task.getValue().request == null && task.getValue().priority == 0) + .sorted((x, y) -> posDistanceSquared(targetPos, x.getKey()) - posDistanceSquared(targetPos, y.getKey())) + .limit(this.networkState.config.fullDataRequestRateLimit) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + if (posList.isEmpty()) { + genTaskPriorityRequestSemaphore.release(); + return; + }; + + CompletableFuture request = this.networkState.getClient().sendRequest(new GenTaskPriorityRequestMessage(posList)); + genTaskPriorityRequest = request; + request.handleAsync((response, throwable) -> { + try + { + if (throwable != null) + throw throwable; + + for (Map.Entry mapEntry : response.posList.entrySet()) + { + WorldGenQueueEntry entry = waitingTasks.get(mapEntry.getKey()); + if (entry != null) + entry.priority = mapEntry.getValue(); + } + } + catch (ChannelException | CancellationException ignored) + { + } + catch (Throwable e) + { + LOGGER.error("Error while fetching gen task priorities", e); + } + + genTaskPriorityRequestSemaphore.release(); + return null; + }); + } } @Override @@ -106,10 +158,10 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug Map.Entry mapEntry = waitingTasks.entrySet().stream() .filter(task -> task.getValue().request == null) .reduce(null, (a, b) - -> a != null - && a.getKey().getCenter().getCenterBlockPos().distSquared(targetPos) - < b.getKey().getCenter().getCenterBlockPos().distSquared(targetPos) - ? a : b); + -> a == null + || b.getValue().priority > a.getValue().priority + || posDistanceSquared(targetPos, b.getKey()) < posDistanceSquared(targetPos, a.getKey()) + ? b : a); if (mapEntry == null) { pendingTasksSemaphore.release(); @@ -119,69 +171,71 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug DhSectionPos sectionPos = mapEntry.getKey(); WorldGenQueueEntry entry = mapEntry.getValue(); - entry.request = this.networkState.client().sendRequest(new FullDataSourceRequestMessage(sectionPos)); - entry.request.handle((response, throwable) -> - { - entry.request = null; - pendingTasksSemaphore.release(); - finishedRequests.incrementAndGet(); + CompletableFuture request = this.networkState.getClient().sendRequest(new FullDataSourceRequestMessage(sectionPos)); + entry.request = request; + request.handleAsync((response, throwable) -> + { + pendingTasksSemaphore.release(); + finishedRequests.incrementAndGet(); + + try + { + if (throwable != null) + throw throwable; + + waitingTasks.remove(sectionPos); + LOGGER.debug("FullDataSourceResponseMessage " + sectionPos); + CompleteFullDataSource fullDataSource = response.getFullDataSource(sectionPos, level); + + // FIXME Add dimension context to request instead + // Check is dimension has been switched - received data may no longer be relevant + if (fullDataSource == null) + throw new CancellationException(); + + Consumer chunkDataConsumer = entry.tracker.getChunkDataConsumer(); + + // FIXME Why keeping a reference in first place + if (chunkDataConsumer == null) + return entry.future.cancel(false); + + sectionPos.forEachChildAtLevel(LodUtil.CHUNK_DETAIL_LEVEL, childPos -> { + ChunkSizedFullDataAccessor accessor = new ChunkSizedFullDataAccessor(new DhChunkPos(childPos.sectionX, childPos.sectionZ)); - try - { - if (throwable != null) - throw throwable; - - waitingTasks.remove(sectionPos); - LOGGER.debug("FullDataSourceResponseMessage " + sectionPos); - CompleteFullDataSource fullDataSource = response.getFullDataSource(sectionPos, level); - - // Check is dimension has been switched - received data may no longer be relevant - if (fullDataSource == null) - throw new CancellationException(); - - Consumer chunkDataConsumer = entry.tracker.getChunkDataConsumer(); - - // FIXME Who decided it was a good idea to use weak references for cancellation purposes? - if (chunkDataConsumer == null) - return entry.future.cancel(false); - - sectionPos.forEachChildAtLevel(LodUtil.CHUNK_DETAIL_LEVEL, childPos -> { - ChunkSizedFullDataAccessor accessor = new ChunkSizedFullDataAccessor(new DhChunkPos(childPos.sectionX, childPos.sectionZ)); - - int detailLevelDifference = sectionPos.sectionDetailLevel - childPos.sectionDetailLevel; - int childRelativeX = childPos.sectionX - sectionPos.sectionX * BitShiftUtil.powerOfTwo(detailLevelDifference); - int childRelativeZ = childPos.sectionZ - sectionPos.sectionZ * BitShiftUtil.powerOfTwo(detailLevelDifference); - - fullDataSource.subView( - LodUtil.CHUNK_WIDTH, - childRelativeX * LodUtil.CHUNK_WIDTH, - childRelativeZ * LodUtil.CHUNK_WIDTH - ).shadowCopyTo(accessor); - - chunkDataConsumer.accept(accessor); - }); - } - catch (ChannelException | RateLimitedException e) - { - if (e instanceof RateLimitedException) - LOGGER.warn("Rate limited by server, re-queueing task ["+sectionPos+"]: "+e.getMessage()); - - finishedRequests.decrementAndGet(); - } - catch (CancellationException ignored) - { - finishedRequests.decrementAndGet(); - totalRequests.decrementAndGet(); - } - catch (Throwable e) - { - LOGGER.error("Error while fetching full data source", e); - failedRequests.incrementAndGet(); - return entry.future.complete(WorldGenResult.CreateFail()); - } + int detailLevelDifference = sectionPos.sectionDetailLevel - childPos.sectionDetailLevel; + int childRelativeX = childPos.sectionX - sectionPos.sectionX * BitShiftUtil.powerOfTwo(detailLevelDifference); + int childRelativeZ = childPos.sectionZ - sectionPos.sectionZ * BitShiftUtil.powerOfTwo(detailLevelDifference); - return entry.future.complete(WorldGenResult.CreateSuccess(sectionPos)); + fullDataSource.subView( + LodUtil.CHUNK_WIDTH, + childRelativeX * LodUtil.CHUNK_WIDTH, + childRelativeZ * LodUtil.CHUNK_WIDTH + ).shadowCopyTo(accessor); + + chunkDataConsumer.accept(accessor); }); + } + catch (ChannelException | RateLimitedException e) + { + if (e instanceof RateLimitedException) + LOGGER.warn("Rate limited by server, re-queueing task [" + sectionPos + "]: " + e.getMessage()); + + entry.request = null; + finishedRequests.decrementAndGet(); + } + catch (CancellationException ignored) + { + finishedRequests.decrementAndGet(); + totalRequests.decrementAndGet(); + } + catch (Throwable e) + { + LOGGER.error("Error while fetching full data source", e); + failedRequests.incrementAndGet(); + return entry.future.complete(WorldGenResult.CreateFail()); + } + + return entry.future.complete(WorldGenResult.CreateSuccess(sectionPos)); + }); } private String[] f3Log() @@ -197,6 +251,11 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug public CompletableFuture startClosing(boolean cancelCurrentGeneration, boolean alsoInterruptRunning) { return this.generatorClosingFuture = CompletableFuture.runAsync(() -> { + while (!genTaskPriorityRequestSemaphore.tryAcquire()) + { + genTaskPriorityRequest.cancel(false); + } + while (!pendingTasksSemaphore.tryAcquire(Short.MAX_VALUE)) { for (WorldGenQueueEntry entry : this.waitingTasks.values()) @@ -231,8 +290,12 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug { public final CompletableFuture future; public final IWorldGenTaskTracker tracker; + + // Higher value = higher priority. + // Priority of 0 is reserved for unassigned value + public int priority = 0; @CheckForNull - public CompletableFuture request; + public CompletableFuture request; public WorldGenQueueEntry(CompletableFuture future, IWorldGenTaskTracker tracker) { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java index 24fe4253d..38055b266 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java @@ -8,7 +8,6 @@ import com.seibel.distanthorizons.core.file.structure.AbstractSaveStructure; import com.seibel.distanthorizons.core.generation.WorldRemoteGenerationQueue; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.multiplayer.ClientNetworkState; -import com.seibel.distanthorizons.core.network.NetworkClient; import com.seibel.distanthorizons.core.pos.DhBlockPos; import com.seibel.distanthorizons.core.pos.DhBlockPos2D; import com.seibel.distanthorizons.core.pos.DhSectionPos; @@ -83,7 +82,7 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel public void doWorldGen() { - boolean isClientUsable = networkState != null && !networkState.client().isClosed(); + boolean isClientUsable = networkState != null && !networkState.getClient().isClosed(); boolean shouldDoWorldGen = isClientUsable && clientside.isRendering(); boolean isWorldGenRunning = worldGenModule.isWorldGenRunning(); if (shouldDoWorldGen && !isWorldGenRunning) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java index 06e47faa7..cc98e20eb 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java @@ -13,9 +13,7 @@ import com.seibel.distanthorizons.core.multiplayer.RemotePlayerConnectionHandler import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource; import com.seibel.distanthorizons.core.network.NetworkServer; import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException; -import com.seibel.distanthorizons.core.network.messages.CancelMessage; -import com.seibel.distanthorizons.core.network.messages.FullDataSourceRequestMessage; -import com.seibel.distanthorizons.core.network.messages.FullDataSourceResponseMessage; +import com.seibel.distanthorizons.core.network.messages.*; import com.seibel.distanthorizons.core.pos.DhBlockPos2D; import com.seibel.distanthorizons.core.pos.DhLodPos; import com.seibel.distanthorizons.core.pos.DhSectionPos; @@ -65,6 +63,12 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel private void registerNetworkHandlers() { + // TODO implement transparent message handling restriction by level + // workaround: +// ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg); +// if (serverPlayerState.serverPlayer.getLevel() != this.serverLevelWrapper) +// return; + this.eventSource.registerHandler(FullDataSourceRequestMessage.class, msg -> { ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg); @@ -100,6 +104,16 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel } }); + this.eventSource.registerHandler(GenTaskPriorityRequestMessage.class, msg -> { + ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg); + if (serverPlayerState.serverPlayer.getLevel() != this.serverLevelWrapper) + return; + + msg.sendResponse(new GenTaskPriorityResponseMessage( + this.serverside.dataFileHandler.getLoadStates(msg.posList) + )); + }); + this.eventSource.registerHandler(CancelMessage.class, msg -> { IncompleteDataSourceEntry entry = this.fullDataRequests.remove(msg.futureId); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/ClientNetworkState.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/ClientNetworkState.java index ce05ac121..41744ecfd 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/ClientNetworkState.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/ClientNetworkState.java @@ -2,6 +2,7 @@ package com.seibel.distanthorizons.core.multiplayer; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; +import com.seibel.distanthorizons.core.network.IClientRequestHandler; import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource; import com.seibel.distanthorizons.core.network.NetworkClient; import com.seibel.distanthorizons.core.network.messages.AckMessage; @@ -17,29 +18,40 @@ public class ClientNetworkState implements Closeable { protected static final Logger LOGGER = DhLoggerBuilder.getLogger(); - private final ScopedNetworkEventSource eventSource; + private final NetworkClient client; private final UUID playerUUID; public MultiplayerConfig config = new MultiplayerConfig(); - public NetworkClient client() { return this.eventSource.parent; } + /** + * Returns the client used by this instance.

+ * If you need to subscribe to any packet events, create an instance of {@link ScopedNetworkEventSource} using the returned instance. + */ + public IClientRequestHandler getClient() { return this.client; } + /** + * Constructs a new instance. + * + * @param networkClient Client to use. It is assumed that this client will be at full control by this instance. + * @param playerUUID UUID of a player connected + */ public ClientNetworkState(NetworkClient networkClient, UUID playerUUID) { - this.eventSource = new ScopedNetworkEventSource<>(networkClient); + this.client = networkClient; this.playerUUID = playerUUID; this.registerNetworkHandlers(); - this.client().startConnecting(); + this.client.startConnecting(); } private void registerNetworkHandlers() { - this.client().registerHandler(HelloMessage.class, helloMessage -> + this.client.registerHandler(HelloMessage.class, helloMessage -> { LOGGER.info("Connected to server: "+helloMessage.getChannelContext().channel().remoteAddress()); - this.client().sendRequest(new PlayerUUIDMessage(playerUUID)) - .thenCompose(ack -> this.client().sendRequest(new RemotePlayerConfigMessage(new MultiplayerConfig() + this.getClient().sendRequest(new PlayerUUIDMessage(playerUUID)) + .thenCompose(ack -> this.getClient().sendRequest(new RemotePlayerConfigMessage(new MultiplayerConfig() {{ + renderDistance = Config.Client.Advanced.Graphics.Quality.lodChunkRenderDistance.get(); fullDataRequestRateLimit = Config.Client.Advanced.Multiplayer.serverNetworkingRateLimit.get(); }}))) .thenAccept(msg -> { @@ -54,7 +66,6 @@ public class ClientNetworkState implements Closeable public void close() { - this.eventSource.close(); - this.client().close(); + this.client.close(); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/IClientRequestHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/network/IClientRequestHandler.java new file mode 100644 index 000000000..a91b03798 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/IClientRequestHandler.java @@ -0,0 +1,19 @@ +package com.seibel.distanthorizons.core.network; + +import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage; + +import java.util.concurrent.CompletableFuture; + +public interface IClientRequestHandler +{ + /** Indicates whether the client is initialized and not started connecting yet. */ + boolean isInitialState(); + /** Indicates whether the client is closed(-ing) and should not be used. */ + boolean isClosed(); + /** Indicates whether the connection is established and first message is sent. */ + boolean isReady(); + + /** Sends a new request. */ + CompletableFuture sendRequest(FutureTrackableNetworkMessage msg); +} + diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java index b79d5997f..8eb4e5a19 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkClient.java @@ -19,7 +19,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -public class NetworkClient extends NetworkEventSource implements AutoCloseable +public class NetworkClient extends NetworkEventSource implements IClientRequestHandler, AutoCloseable { private static final Logger LOGGER = DhLoggerBuilder.getLogger(); @@ -102,6 +102,8 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable // FIXME sometimes this causes the MC connection to crash // this might happen if the URL can't be converted to a IP (IE UnknownHostException) ChannelFuture connectFuture = this.clientBootstrap.connect(this.address); + this.channel = connectFuture.channel(); + connectFuture.addListener((ChannelFuture channelFuture) -> { if (!channelFuture.isSuccess()) @@ -114,7 +116,6 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable ready = true; }); - this.channel = connectFuture.channel(); this.channel.closeFuture().addListener((ChannelFuture channelFuture) -> { ready = false; diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java index 09f9ee576..0d6daaa23 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java @@ -66,7 +66,10 @@ public abstract class NetworkEventSource if (!handled) { - LOGGER.warn("Unhandled message type: " + message.getClass().getSimpleName()); + String error = "Unhandled message type: " + message.getClass().getSimpleName(); + if (message instanceof FutureTrackableNetworkMessage) + error += ", future id: " + ((FutureTrackableNetworkMessage) message).futureId; + LOGGER.warn(error); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/CloseReasonMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/CloseReasonMessage.java index 2a87bb4af..0e7e9b640 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/CloseReasonMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/CloseReasonMessage.java @@ -14,9 +14,9 @@ public class CloseReasonMessage extends NetworkMessage public CloseReasonMessage(String reason) { this.reason = reason; } @Override - public void encode(ByteBuf out) { INetworkObject.encodeString(this.reason, out); } + public void encode(ByteBuf out) { encodeString(this.reason, out); } @Override - public void decode(ByteBuf in) { this.reason = INetworkObject.decodeString(in); } + public void decode(ByteBuf in) { this.reason = decodeString(in); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/ExceptionMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/ExceptionMessage.java index b27fba497..4496fd2df 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/ExceptionMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/ExceptionMessage.java @@ -27,13 +27,13 @@ public class ExceptionMessage extends FutureTrackableNetworkMessage @Override protected void encode0(ByteBuf out) { out.writeInt(exceptionMap.indexOf(exception.getClass())); - INetworkObject.encodeString(exception.getMessage(), out); + encodeString(exception.getMessage(), out); } @Override protected void decode0(ByteBuf in) throws Exception { int id = in.readInt(); - String message = INetworkObject.decodeString(in); + String message = decodeString(in); exception = exceptionMap.get(id).getDeclaredConstructor(String.class).newInstance(message); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/FullDataSourceRequestMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/FullDataSourceRequestMessage.java index 1185f265e..e05f518d5 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/FullDataSourceRequestMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/FullDataSourceRequestMessage.java @@ -1,14 +1,10 @@ package com.seibel.distanthorizons.core.network.messages; -import com.google.common.collect.MapMaker; -import com.seibel.distanthorizons.core.level.DhClientLevel; import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage; import com.seibel.distanthorizons.core.network.protocol.INetworkObject; import com.seibel.distanthorizons.core.pos.DhSectionPos; import io.netty.buffer.ByteBuf; -import java.util.concurrent.ConcurrentMap; - public class FullDataSourceRequestMessage extends FutureTrackableNetworkMessage { public DhSectionPos dhSectionPos; @@ -28,6 +24,6 @@ public class FullDataSourceRequestMessage extends FutureTrackableNetworkMessage @Override public void decode0(ByteBuf in) { - dhSectionPos = INetworkObject.decode(new DhSectionPos((byte)0, 0, 0), in); + dhSectionPos = INetworkObject.decodeStatic(DhSectionPos.zero(), in); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/GenTaskPriorityRequestMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/GenTaskPriorityRequestMessage.java new file mode 100644 index 000000000..45be3c077 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/GenTaskPriorityRequestMessage.java @@ -0,0 +1,31 @@ +package com.seibel.distanthorizons.core.network.messages; + +import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage; +import com.seibel.distanthorizons.core.pos.DhSectionPos; +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.List; + +public class GenTaskPriorityRequestMessage extends FutureTrackableNetworkMessage +{ + public List posList = new ArrayList<>(); + + public GenTaskPriorityRequestMessage() { } + public GenTaskPriorityRequestMessage(List posList) + { + this.posList = posList; + } + + @Override + protected void encode0(ByteBuf out) + { + encodeCollection(out, posList); + } + + @Override + protected void decode0(ByteBuf in) + { + decodeCollection(in, posList, DhSectionPos::zero); + } +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/GenTaskPriorityResponseMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/GenTaskPriorityResponseMessage.java new file mode 100644 index 000000000..706309699 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/GenTaskPriorityResponseMessage.java @@ -0,0 +1,32 @@ +package com.seibel.distanthorizons.core.network.messages; + +import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage; +import com.seibel.distanthorizons.core.pos.DhSectionPos; +import io.netty.buffer.ByteBuf; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class GenTaskPriorityResponseMessage extends FutureTrackableNetworkMessage +{ + public Map posList = new HashMap<>(); + + public GenTaskPriorityResponseMessage() { } + public GenTaskPriorityResponseMessage(Map posList) + { + this.posList = posList; + } + + @Override + protected void encode0(ByteBuf out) + { + encodeCollection(out, posList.entrySet()); + } + + @Override + protected void decode0(ByteBuf in) + { + decodeMap(in, posList, DhSectionPos::zero, () -> 0); + } +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/RemotePlayerConfigMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/RemotePlayerConfigMessage.java index dbc874f7c..ace46ac98 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/RemotePlayerConfigMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/RemotePlayerConfigMessage.java @@ -18,6 +18,6 @@ public class RemotePlayerConfigMessage extends FutureTrackableNetworkMessage public void encode0(ByteBuf out) { this.payload.encode(out); } @Override - public void decode0(ByteBuf in) { this.payload = INetworkObject.decode(new MultiplayerConfig(), in); } + public void decode0(ByteBuf in) { this.payload = INetworkObject.decodeStatic(new MultiplayerConfig(), in); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/FutureTrackableNetworkMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/FutureTrackableNetworkMessage.java index fb9a6d873..6c3eb8d5e 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/FutureTrackableNetworkMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/FutureTrackableNetworkMessage.java @@ -3,11 +3,13 @@ package com.seibel.distanthorizons.core.network.protocol; import com.seibel.distanthorizons.core.network.messages.ExceptionMessage; import io.netty.buffer.ByteBuf; +import java.util.concurrent.atomic.AtomicInteger; + public abstract class FutureTrackableNetworkMessage extends NetworkMessage { - private static int lastId = 0; + private static final AtomicInteger lastId = new AtomicInteger(); // Only low 32 bits are sent (high bits are used for identifying a channel this request was sent from by remote peer) - public long futureId = lastId++; + public long futureId = lastId.incrementAndGet(); public void sendResponse(FutureTrackableNetworkMessage responseMessage) { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/INetworkObject.java b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/INetworkObject.java index 087e05b75..f70d5f94a 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/INetworkObject.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/INetworkObject.java @@ -3,6 +3,10 @@ package com.seibel.distanthorizons.core.network.protocol; import io.netty.buffer.ByteBuf; import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.*; public interface INetworkObject { @@ -10,22 +14,109 @@ public interface INetworkObject void decode(ByteBuf in); - static T decode(T obj, ByteBuf inputByteBuf) + static T decodeStatic(T obj, ByteBuf inputByteBuf) { obj.decode(inputByteBuf); return obj; } - static void encodeString(String inputString, ByteBuf outputByteBuf) + default void encodeString(String inputString, ByteBuf outputByteBuf) { - outputByteBuf.writeShort(inputString.length()); - outputByteBuf.writeBytes(inputString.getBytes(StandardCharsets.UTF_8)); + byte[] bytes = inputString.getBytes(StandardCharsets.UTF_8); + outputByteBuf.writeShort(bytes.length); + outputByteBuf.writeBytes(bytes); } - static String decodeString(ByteBuf inputByteBuf) + default String decodeString(ByteBuf inputByteBuf) { int length = inputByteBuf.readShort(); return inputByteBuf.readBytes(length).toString(StandardCharsets.UTF_8); } + default void encodeCollection(ByteBuf outputByteBuf, Collection collection) + { + outputByteBuf.writeInt(collection.size()); + + Codec codec = null; + for (T item : collection) + { + if (codec == null) + codec = Codec.getCodec(item.getClass()); + codec.encode.accept(item, outputByteBuf); + } + } + + default void decodeCollection(ByteBuf inputByteBuf, Collection collection, Supplier innerValueConstructor) + { + int size = inputByteBuf.readInt(); + + Codec codec = null; + for (int i = 0; i < size; i++) + { + T item = innerValueConstructor.get(); + + if (codec == null) + codec = Codec.getCodec(item.getClass()); + item = (T) codec.decode.apply(item, inputByteBuf); + + collection.add(item); + } + } + + default void decodeMap(ByteBuf inputByteBuf, Map map, Supplier keySupplier, Supplier valueSupplier) + { + ArrayList> entryList = new ArrayList<>(); + + decodeCollection(inputByteBuf, entryList, () -> new AbstractMap.SimpleEntry<>(keySupplier.get(), valueSupplier.get())); + for (Map.Entry entry : entryList) + map.put(entry.getKey(), entry.getValue()); + } + + /** Should only be used for non-editable classes; + * otherwise, you may want to implement {@link INetworkObject} and use its method where applicable. */ + class Codec + { + private static final ConcurrentMap, Codec> codecMap = new ConcurrentHashMap, Codec>() + {{ + // Primitives must be added manually here + put(Integer.class, new Codec((obj, out) -> out.writeInt((int)obj), (obj, in) -> in.readInt())); + + put(INetworkObject.class, new Codec(INetworkObject::encode, INetworkObject::decodeStatic)); + put(Map.Entry.class, new Codec( + (obj, out) -> { + Map.Entry entry = (Entry) obj; + getCodec(entry.getKey().getClass()).encode.accept(entry.getKey(), out); + getCodec(entry.getValue().getClass()).encode.accept(entry.getValue(), out); + }, + (obj, in) -> { + Map.Entry entry = (Entry) obj; + return new SimpleEntry<>( + getCodec(entry.getKey().getClass()).decode.apply(entry.getKey(), in), + getCodec(entry.getValue().getClass()).decode.apply(entry.getValue(), in) + ); + } + )); + }}; + + public final BiConsumer encode; + public final BiFunction decode; + + public Codec(BiConsumer encode, BiFunction decode) + { + this.encode = (BiConsumer) encode; + this.decode = (BiFunction) decode; + } + + public static Codec getCodec(Class clazz) { + return codecMap.computeIfAbsent(clazz, ignored -> { + for (Map.Entry, Codec> entry : codecMap.entrySet()) + { + if (entry.getKey().isAssignableFrom(clazz)) + return entry.getValue(); + } + + throw new AssertionError("Class has no compatible codec: "+clazz.getSimpleName()); + }); + } + } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageDecoder.java b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageDecoder.java index 25521b413..114c745d4 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageDecoder.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageDecoder.java @@ -12,7 +12,7 @@ public class MessageDecoder extends ByteToMessageDecoder protected void decode(ChannelHandlerContext channelContext, ByteBuf inputByteBuf, List outputDecodedObjectList) { NetworkMessage message = MessageRegistry.INSTANCE.createMessage(inputByteBuf.readShort()); - outputDecodedObjectList.add(INetworkObject.decode(message, inputByteBuf)); + outputDecodedObjectList.add(INetworkObject.decodeStatic(message, inputByteBuf)); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageRegistry.java b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageRegistry.java index e1dfa41d2..f34f093e4 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageRegistry.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/protocol/MessageRegistry.java @@ -21,18 +21,27 @@ public class MessageRegistry { // Note: Messages must have parameterless constructors - // Keep messages below intact so client/server can disconnect if version does not match + // Opening & closing connection + // These messages should be compatible with any previous protocol versions this.registerMessage(HelloMessage.class, HelloMessage::new); this.registerMessage(CloseReasonMessage.class, CloseReasonMessage::new); - // Define your messages after this line + // Core this.registerMessage(AckMessage.class, AckMessage::new); this.registerMessage(CancelMessage.class, CancelMessage::new); this.registerMessage(ExceptionMessage.class, ExceptionMessage::new); + + // ID & config this.registerMessage(PlayerUUIDMessage.class, PlayerUUIDMessage::new); this.registerMessage(RemotePlayerConfigMessage.class, RemotePlayerConfigMessage::new); + + // Full data requests this.registerMessage(FullDataSourceRequestMessage.class, FullDataSourceRequestMessage::new); this.registerMessage(FullDataSourceResponseMessage.class, FullDataSourceResponseMessage::new); + + // Generation task prioritization + this.registerMessage(GenTaskPriorityRequestMessage.class, GenTaskPriorityRequestMessage::new); + this.registerMessage(GenTaskPriorityResponseMessage.class, GenTaskPriorityResponseMessage::new); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java b/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java index 28e395df6..cc467042c 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/pos/DhSectionPos.java @@ -49,6 +49,8 @@ public class DhSectionPos implements INetworkObject + public static DhSectionPos zero() { return new DhSectionPos((byte) 0, 0, 0); }; + public DhSectionPos(byte sectionDetailLevel, int sectionX, int sectionZ) { this.sectionDetailLevel = sectionDetailLevel; diff --git a/core/src/main/java/com/seibel/distanthorizons/core/wrapperInterfaces/IWrapperFactory.java b/core/src/main/java/com/seibel/distanthorizons/core/wrapperInterfaces/IWrapperFactory.java index 0eeaf4efd..437d5b739 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/wrapperInterfaces/IWrapperFactory.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/wrapperInterfaces/IWrapperFactory.java @@ -39,7 +39,7 @@ public interface IWrapperFactory extends IBindable { AbstractBatchGenerationEnvironmentWrapper createBatchGenerator(IDhLevel targetLevel); IBiomeWrapper deserializeBiomeWrapper(String str, ILevelWrapper levelWrapper) throws IOException; - IBlockStateWrapper deserializeBlockStateWrapper(String str) throws IOException; + IBlockStateWrapper deserializeBlockStateWrapper(String str, ILevelWrapper levelWrapper) throws IOException; IBlockStateWrapper getAirBlockStateWrapper(); /** diff --git a/core/src/main/java/com/seibel/distanthorizons/core/wrapperInterfaces/block/IBlockStateWrapper.java b/core/src/main/java/com/seibel/distanthorizons/core/wrapperInterfaces/block/IBlockStateWrapper.java index 5df9357b4..d8ae596d4 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/wrapperInterfaces/block/IBlockStateWrapper.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/wrapperInterfaces/block/IBlockStateWrapper.java @@ -1,11 +1,12 @@ package com.seibel.distanthorizons.core.wrapperInterfaces.block; import com.seibel.distanthorizons.api.interfaces.block.IDhApiBlockStateWrapper; +import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper; /** A Minecraft version independent way of handling Blocks. */ public interface IBlockStateWrapper extends IDhApiBlockStateWrapper { - String serialize(); + String serialize(ILevelWrapper levelWrapper); /** * Returning a value of 0 means the block is completely transparent.