From 61c516df1d9260ae4b725cfe259723f5f67e7cda Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Sat, 27 Jan 2024 19:45:30 +0500 Subject: [PATCH] Post-relog updates --- .../distanthorizons/core/config/Config.java | 19 +- .../fullDatafile/FullDataFileHandler.java | 4 +- .../RemoteFullDataFileHandler.java | 40 ++- .../WorldRemoteGenerationQueue.java | 253 +++------------ .../core/level/DhClientLevel.java | 75 +++-- .../core/level/DhServerLevel.java | 185 +++++++---- .../distanthorizons/core/level/IDhLevel.java | 2 + .../client/AbstractFullDataRequestQueue.java | 289 ++++++++++++++++++ .../client/FullDataRefreshQueue.java | 30 ++ .../config/AbstractMultiplayerConfig.java | 2 + .../multiplayer/config/MultiplayerConfig.java | 31 +- .../MultiplayerConfigChangeListener.java | 13 +- .../multiplayer/server/ServerPlayerState.java | 15 +- .../server/ServersideMultiplayerConfig.java | 18 +- .../core/network/IConnection.java | 7 + .../core/network/NetworkServer.java | 3 +- .../FullDataSourceRequestMessage.java | 46 +-- .../FullDataSourceResponseMessage.java | 66 ++-- .../core/sql/AbstractDataSourceRepo.java | 18 ++ .../SupplierBasedConcurrencyLimiter.java | 1 + 20 files changed, 746 insertions(+), 371 deletions(-) create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/FullDataRefreshQueue.java diff --git a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java index 00274d35c..211508450 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java @@ -737,7 +737,7 @@ public class Config // deprecated and not implemented, can be made public if we ever re-implement it @Deprecated - private static ConfigEntry generationPriority = new ConfigEntry.Builder() + private static final ConfigEntry generationPriority = new ConfigEntry.Builder() .set(EGenerationPriority.NEAR_FIRST) .comment("" + "In what priority should fake chunks be generated outside the vanilla render distance? \n" @@ -871,8 +871,7 @@ public class Config + "") .build(); - /** Disabled, previous implementation is too terrible to continue using it. */ - private static ConfigEntry enablePostRelogUpdate = new ConfigEntry.Builder() + public static ConfigEntry enablePostRelogUpdate = new ConfigEntry.Builder() .setServersideShortName("enablePostRelogUpdate") .set(false) .comment("" @@ -903,7 +902,7 @@ public class Config .setServersideShortName("fullDataRequestConcurrencyLimit") .setMinDefaultMax(1, 20, 100) .comment("" - + "Limits the amount of sent/processed LOD requests concurrently on server, per player. \n" + + "Limits the amount of sent/processed LOD *generation* requests concurrently on server, per player. \n" + "") .build(); @@ -923,11 +922,19 @@ public class Config + "") .build(); + public static ConfigEntry postRelogUpdateConcurrencyLimit = new ConfigEntry.Builder() + .setServersideShortName("postRelogUpdateConcurrencyLimit") + .setMinDefaultMax(1, 50, 100) + .comment("" + + "Limits the amount of sent/processed LOD *update* requests concurrently on server, per player. \n" + + "") + .build(); + /** * Intentionally disabled. * @see #enablePostRelogUpdate */ - private static ConfigEntry fullDataChangeSummaryRequestRateLimit = new ConfigEntry.Builder() + private static final ConfigEntry fullDataChangeSummaryRequestRateLimit = new ConfigEntry.Builder() .setServersideShortName("fullDataChangeSummaryRequestRateLimit") .setMinDefaultMax(1, 20, 100) .comment("" @@ -1085,7 +1092,7 @@ public class Config // deprecated and not implemented, can be made public if we ever re-implement it @Deprecated - private static ConfigEntry rebuildTimes = new ConfigEntry.Builder() + private static final ConfigEntry rebuildTimes = new ConfigEntry.Builder() .set(EBufferRebuildTimes.NORMAL) .comment("" + "How frequently should vertex buffers (geometry) be rebuilt and sent to the GPU? \n" diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataFileHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataFileHandler.java index 5535568a0..09b848987 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataFileHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataFileHandler.java @@ -57,13 +57,13 @@ public class FullDataFileHandler extends AbstractDataSourceHandler { + int checksum = Objects.requireNonNull(this.repo.getChecksumForSection(childPos)); + this.dataRefreshQueue.submitRequest(childPos, this.level::updateDataSourcesWithChunkData, checksum); + }); + + return fullDataSource; + } + + @Override + public void close() + { + if (this.dataRefreshQueue != null) + { + this.dataRefreshQueue.close(); + } + super.close(); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java index 97d143c60..ee6766f38 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java @@ -3,71 +3,50 @@ package com.seibel.distanthorizons.core.generation; import com.google.common.base.Stopwatch; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.config.types.ConfigEntry; -import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor; -import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource; import com.seibel.distanthorizons.core.generation.tasks.IWorldGenTaskTracker; import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult; import com.seibel.distanthorizons.core.level.IDhClientLevel; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; -import com.seibel.distanthorizons.core.logging.f3.F3Screen; +import com.seibel.distanthorizons.core.multiplayer.client.AbstractFullDataRequestQueue; import com.seibel.distanthorizons.core.multiplayer.client.ClientNetworkState; -import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException; import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException; -import com.seibel.distanthorizons.core.network.messages.fullData.generation.FullDataSourceRequestMessage; -import com.seibel.distanthorizons.core.network.messages.fullData.generation.FullDataSourceResponseMessage; import com.seibel.distanthorizons.core.network.messages.fullData.generation.priority.GenTaskPriorityRequestMessage; import com.seibel.distanthorizons.core.network.messages.fullData.generation.priority.GenTaskPriorityResponseMessage; import com.seibel.distanthorizons.core.pos.DhBlockPos2D; import com.seibel.distanthorizons.core.pos.DhSectionPos; -import com.seibel.distanthorizons.core.render.renderer.DebugRenderer; import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable; import com.seibel.distanthorizons.core.util.LodUtil; import io.netty.channel.ChannelException; import org.apache.logging.log4j.Logger; -import javax.annotation.CheckForNull; -import java.awt.*; -import java.time.Duration; import java.util.*; import java.util.List; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import java.util.stream.Collectors; -public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebugRenderable +public class WorldRemoteGenerationQueue extends AbstractFullDataRequestQueue implements IWorldGenerationQueue, IDebugRenderable { private static final Logger LOGGER = DhLoggerBuilder.getLogger(); - private static final long SHUTDOWN_TIMEOUT_SECONDS = 5; - - private final ClientNetworkState networkState; - private final IDhClientLevel level; - // Used to prevent requests for section very far away, as result of request list not completely filled. // Kinda a hack, since queue is not notified when file handler is done with feeding sections to generate private static final ConfigEntry REQUEST_BEGIN_DELAY = Config.Client.Advanced.Multiplayer.ServerNetworking.fullDataRequestBeginDelay; private final Stopwatch requestBeginStopwatch = Stopwatch.createStarted(); - private volatile CompletableFuture generatorClosingFuture = null; - - private final ConcurrentMap waitingTasks = new ConcurrentHashMap<>(); - private final Semaphore pendingTasksSemaphore = new Semaphore(Short.MAX_VALUE, true); - private CompletableFuture genTaskPriorityRequest = CompletableFuture.completedFuture(null); private final Semaphore genTaskPriorityRequestSemaphore = new Semaphore(1, true); - private final F3Screen.NestedMessage f3Message = new F3Screen.NestedMessage(this::f3Log); - private final AtomicInteger finishedRequests = new AtomicInteger(); - private final AtomicInteger failedRequests = new AtomicInteger(); - private final Set alreadyGeneratedPosHashSet = ConcurrentHashMap.newKeySet(); + @Override + protected int getRequestConcurrencyLimit() { return this.networkState.config.fullDataRequestConcurrencyLimit; } + + @Override + protected String getQueueName() { return "World Remote Generation Queue"; } + public WorldRemoteGenerationQueue(ClientNetworkState networkState, IDhClientLevel level) { - this.networkState = networkState; - this.level = level; - DebugRenderer.register(this, Config.Client.Advanced.Debugging.DebugWireframe.showWorldGenQueue); + super(networkState, level, false, Config.Client.Advanced.Debugging.DebugWireframe.showWorldGenQueue); } @@ -85,65 +64,53 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug @Override public CompletableFuture submitGenTask(DhSectionPos sectionPos, byte requiredDataDetail, IWorldGenTaskTracker tracker) { - LodUtil.assertTrue(sectionPos.getDetailLevel() == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed."); - - // check if this is a duplicate generation task - if (this.alreadyGeneratedPosHashSet.contains(sectionPos)) - { - // temporary solution to prevent generating the same section multiple times - LOGGER.trace("Duplicate generation section " + sectionPos + ". Skipping..."); - return CompletableFuture.completedFuture(WorldGenResult.CreateFail()); - } - this.alreadyGeneratedPosHashSet.add(sectionPos); - - WorldGenQueueEntry entry = new WorldGenQueueEntry(new CompletableFuture<>(), tracker); - waitingTasks.put(sectionPos, entry); - return entry.future; - } - - private int posDistanceSquared(DhBlockPos2D targetPos, DhSectionPos pos) - { - return (int) pos.getCenterBlockPos().distSquared(targetPos); + return super.submitRequest(sectionPos, tracker.getChunkDataConsumer()) + .thenApply(result -> result + ? WorldGenResult.CreateSuccess(sectionPos) + : WorldGenResult.CreateFail()); } @Override public void startGenerationQueueAndSetTargetPos(DhBlockPos2D targetPos) { - if (generatorClosingFuture != null || !networkState.getClient().isReady()) return; - if (requestBeginStopwatch.elapsed(TimeUnit.SECONDS) < REQUEST_BEGIN_DELAY.get()) return; - - while (getWaitingTaskCount() > getInProgressTaskCount() - && getInProgressTaskCount() < this.networkState.config.fullDataRequestConcurrencyLimit - && pendingTasksSemaphore.tryAcquire()) + if (this.requestBeginStopwatch.elapsed(TimeUnit.SECONDS) < REQUEST_BEGIN_DELAY.get()) { - sendNewRequest(targetPos); + return; + } + if (!super.tick(targetPos)) + { + return; } - if (genTaskPriorityRequestSemaphore.tryAcquire()) { - List posList = waitingTasks.entrySet().stream() + if (this.genTaskPriorityRequestSemaphore.tryAcquire()) { + List posList = this.waitingTasks.entrySet().stream() .filter(task -> task.getValue().request == null && task.getValue().priority == 0) - .sorted((x, y) -> posDistanceSquared(targetPos, x.getKey()) - posDistanceSquared(targetPos, y.getKey())) + .sorted((x, y) -> this.posDistanceSquared(targetPos, x.getKey()) - this.posDistanceSquared(targetPos, y.getKey())) .limit(this.networkState.config.genTaskPriorityRequestRateLimit) .map(Map.Entry::getKey) .collect(Collectors.toList()); if (posList.isEmpty()) { - genTaskPriorityRequestSemaphore.release(); + this.genTaskPriorityRequestSemaphore.release(); return; }; CompletableFuture request = this.networkState.getClient().sendRequest(new GenTaskPriorityRequestMessage(posList), GenTaskPriorityResponseMessage.class); - genTaskPriorityRequest = request; + this.genTaskPriorityRequest = request; request.handleAsync((response, throwable) -> { try { if (throwable != null) + { throw throwable; + } for (Map.Entry mapEntry : response.posList.entrySet()) { - WorldGenQueueEntry entry = waitingTasks.get(mapEntry.getKey()); + RequestQueueEntry entry = this.waitingTasks.get(mapEntry.getKey()); if (entry != null) + { entry.priority = mapEntry.getValue(); + } } } catch (ChannelException | CancellationException | RateLimitedException ignored) @@ -154,7 +121,7 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug LOGGER.error("Error while fetching gen task priorities", e); } - genTaskPriorityRequestSemaphore.release(); + this.genTaskPriorityRequestSemaphore.release(); return null; }); } @@ -163,168 +130,28 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug @Override public void cancelGenTasks(Iterable positions) { - for (DhSectionPos pos : positions) - { - WorldGenQueueEntry entry = waitingTasks.remove(pos); - if (entry != null) - { - entry.future.cancel(false); - if (entry.request != null) - entry.request.cancel(false); - alreadyGeneratedPosHashSet.remove(pos); - } - } + super.cancelRequests(positions); } - private void sendNewRequest(DhBlockPos2D targetPos) - { - Map.Entry mapEntry = waitingTasks.entrySet().stream() - .filter(task -> task.getValue().request == null) - .reduce(null, (a, b) - -> a == null - || b.getValue().priority > a.getValue().priority - || (b.getValue().priority == a.getValue().priority && posDistanceSquared(targetPos, b.getKey()) < posDistanceSquared(targetPos, a.getKey())) - ? b : a); - if (mapEntry == null) - { - pendingTasksSemaphore.release(); - return; - } - - DhSectionPos sectionPos = mapEntry.getKey(); - WorldGenQueueEntry entry = mapEntry.getValue(); - - CompletableFuture request = this.networkState.getClient().sendRequest(new FullDataSourceRequestMessage(level.getLevelWrapper(), sectionPos), FullDataSourceResponseMessage.class); - entry.request = request; - request.handleAsync((response, throwable) -> - { - pendingTasksSemaphore.release(); - finishedRequests.incrementAndGet(); - - try - { - if (throwable != null) - throw throwable; - - waitingTasks.remove(sectionPos); - LOGGER.debug("FullDataSourceResponseMessage " + sectionPos); - - CompleteFullDataSource fullDataSource = response.getFullDataSource(sectionPos, level); - Consumer chunkDataConsumer = entry.tracker.getChunkDataConsumer(); - - // FIXME Why keeping a reference in first place - if (chunkDataConsumer == null) - return entry.future.cancel(false); - - fullDataSource.splitIntoChunkSizedAccessors(chunkDataConsumer); - response.getFullDataSourceLoader().returnPooledDataSource(fullDataSource); - } - catch (InvalidLevelException ignored) - { - // We're too late - } - catch (ChannelException | RateLimitedException e) - { - if (e instanceof RateLimitedException) - LOGGER.warn("Rate limited by server, re-queueing task [" + sectionPos + "]: " + e.getMessage()); - - entry.request = null; - finishedRequests.decrementAndGet(); - } - catch (CancellationException ignored) - { - finishedRequests.decrementAndGet(); - } - catch (Throwable e) - { - LOGGER.error("Error while fetching full data source", e); - failedRequests.incrementAndGet(); - return entry.future.complete(WorldGenResult.CreateFail()); - } - - return entry.future.complete(WorldGenResult.CreateSuccess(sectionPos)); - }); - } - - private String[] f3Log() - { - ArrayList lines = new ArrayList<>(); - lines.add("World Remote Generation Queue ["+level.getClientLevelWrapper().getDimensionType().getDimensionName()+"]"); - lines.add("Requests: "+this.finishedRequests+" / "+(this.getWaitingTaskCount() + this.finishedRequests.get())+" (failed: "+ this.failedRequests+", rate limit: "+this.networkState.config.fullDataRequestConcurrencyLimit +")"); - return lines.toArray(new String[0]); - } - - @Override - public int getWaitingTaskCount() { return this.waitingTasks.size(); } - @Override - public int getInProgressTaskCount() { return Short.MAX_VALUE - pendingTasksSemaphore.availablePermits(); } - @Override public CompletableFuture startClosing(boolean cancelCurrentGeneration, boolean alsoInterruptRunning) { - return this.generatorClosingFuture = CompletableFuture.runAsync(() -> { + return CompletableFuture.allOf(super.startClosing(alsoInterruptRunning), CompletableFuture.runAsync(() -> { Stopwatch stopwatch = Stopwatch.createStarted(); do { - if (genTaskPriorityRequest.cancel(false)) - genTaskPriorityRequestSemaphore.release(); - } - while (!genTaskPriorityRequestSemaphore.tryAcquire() && stopwatch.elapsed(TimeUnit.SECONDS) < SHUTDOWN_TIMEOUT_SECONDS); - - do - { - for (WorldGenQueueEntry entry : this.waitingTasks.values()) + if (this.genTaskPriorityRequest.cancel(false)) { - entry.future.cancel(alsoInterruptRunning); - if (entry.request != null && entry.request.cancel(alsoInterruptRunning)) - pendingTasksSemaphore.release(); + this.genTaskPriorityRequestSemaphore.release(); } } - while (!pendingTasksSemaphore.tryAcquire(Short.MAX_VALUE) && stopwatch.elapsed(TimeUnit.SECONDS) < SHUTDOWN_TIMEOUT_SECONDS); - + while (!this.genTaskPriorityRequestSemaphore.tryAcquire() && stopwatch.elapsed(TimeUnit.SECONDS) < SHUTDOWN_TIMEOUT_SECONDS); + if (stopwatch.elapsed(TimeUnit.SECONDS) >= SHUTDOWN_TIMEOUT_SECONDS) - LOGGER.warn("Generation queue for " + level.getLevelWrapper() + " did not shutdown in " + SHUTDOWN_TIMEOUT_SECONDS + " seconds! Some unfinished tasks might be left hanging."); - }); - } - - @Override - public void close() - { - f3Message.close(); - DebugRenderer.unregister(this, Config.Client.Advanced.Debugging.DebugWireframe.showWorldGenQueue); - } - - @Override - public void debugRender(DebugRenderer r) - { - for (Map.Entry mapEntry : waitingTasks.entrySet()) - { - r.renderBox(new DebugRenderer.Box(mapEntry.getKey(), -32f, 64f, 0.05f, - mapEntry.getValue().request != null ? Color.red - : mapEntry.getValue().priority == 3 ? Color.orange - : mapEntry.getValue().priority == 2 ? Color.cyan - : mapEntry.getValue().priority == 1 ? Color.blue - : Color.gray - )); - } - } - - private static class WorldGenQueueEntry - { - public final CompletableFuture future; - public final IWorldGenTaskTracker tracker; - - // Higher value = higher priority. - // Priority of 0 is reserved for unassigned value - public int priority = 0; - @CheckForNull - public CompletableFuture request; - - public WorldGenQueueEntry(CompletableFuture future, IWorldGenTaskTracker tracker) - { - this.future = future; - this.tracker = tracker; - } + { + LOGGER.warn("Priority request queue for " + this.level.getLevelWrapper() + " did not shutdown in " + SHUTDOWN_TIMEOUT_SECONDS + " seconds! It might be left hanging."); + } + })); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java index 3ec690fba..d69addbae 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhClientLevel.java @@ -29,6 +29,7 @@ import com.seibel.distanthorizons.core.file.structure.AbstractSaveStructure; import com.seibel.distanthorizons.core.generation.WorldRemoteGenerationQueue; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.multiplayer.client.ClientNetworkState; +import com.seibel.distanthorizons.core.multiplayer.client.FullDataRefreshQueue; import com.seibel.distanthorizons.core.network.NetworkClient; import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource; import com.seibel.distanthorizons.core.network.messages.fullData.updates.FullDataPartialUpdateMessage; @@ -73,11 +74,15 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel private final ClientNetworkState networkState; @Nullable private final ScopedNetworkEventSource eventSource; + public final WorldGenModule worldGenModule; public final AppliedConfigState worldGeneratorEnabledConfig; + @Nullable + private final FullDataRefreshQueue dataRefreshQueue; + + - //=============// // constructor // //=============// @@ -91,21 +96,24 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel } this.levelWrapper = clientLevelWrapper; this.saveStructure = saveStructure; - this.dataFileHandler = new RemoteFullDataFileHandler(this, saveStructure, fullDataSaveDirOverride); - this.worldGeneratorEnabledConfig = new AppliedConfigState<>(Config.Client.Advanced.WorldGenerator.enableDistantGeneration); this.networkState = networkState; - this.worldGenModule = new WorldGenModule(dataFileHandler, this); if (networkState != null) { this.eventSource = new ScopedNetworkEventSource<>(networkState.getClient()); + this.dataRefreshQueue = new FullDataRefreshQueue(this, networkState); this.registerNetworkHandlers(); } else { this.eventSource = null; + this.dataRefreshQueue = null; } + this.dataFileHandler = new RemoteFullDataFileHandler(this, saveStructure, fullDataSaveDirOverride, this.dataRefreshQueue); + this.worldGeneratorEnabledConfig = new AppliedConfigState<>(Config.Client.Advanced.WorldGenerator.enableDistantGeneration); + this.worldGenModule = new WorldGenModule(this.dataFileHandler, this); + this.clientside = new ClientLevelModule(this); if (enableRendering) { @@ -123,7 +131,10 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel try { ChunkSizedFullDataAccessor fullDataAccessor = msg.getFullDataSource(this); - if (fullDataAccessor == null) return; + if (fullDataAccessor == null) + { + return; + } this.updateDataSourcesWithChunkData(fullDataAccessor); } @@ -145,6 +156,11 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel { this.chunkToLodBuilder.tick(); this.clientside.clientTick(); + + if (this.dataRefreshQueue != null) + { + this.dataRefreshQueue.tick(new DhBlockPos2D(MC_CLIENT.getPlayerBlockPos())); + } } catch (Exception e) { @@ -152,37 +168,38 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel } } + @Override public void doWorldGen() { - boolean isClientUsable = networkState != null && !networkState.getClient().isClosed(); - boolean shouldDoWorldGen = isClientUsable && this.networkState.config.distantGenerationEnabled && clientside.isRendering(); - boolean isWorldGenRunning = worldGenModule.isWorldGenRunning(); + boolean isClientUsable = this.networkState != null && !this.networkState.getClient().isClosed(); + boolean shouldDoWorldGen = isClientUsable && this.networkState.config.distantGenerationEnabled && this.clientside.isRendering(); + boolean isWorldGenRunning = this.worldGenModule.isWorldGenRunning(); if (shouldDoWorldGen && !isWorldGenRunning) { // start world gen - worldGenModule.startWorldGen(this.dataFileHandler, new WorldGenState(this, this.networkState)); + this.worldGenModule.startWorldGen(this.dataFileHandler, new WorldGenState(this, this.networkState)); } else if (!shouldDoWorldGen && isWorldGenRunning) { // stop world gen - worldGenModule.stopWorldGen(this.dataFileHandler); + this.worldGenModule.stopWorldGen(this.dataFileHandler); } - if (worldGenModule.isWorldGenRunning()) + if (this.worldGenModule.isWorldGenRunning()) { - ClientLevelModule.ClientRenderState renderState = clientside.ClientRenderStateRef.get(); + ClientLevelModule.ClientRenderState renderState = this.clientside.ClientRenderStateRef.get(); if (renderState != null && renderState.quadtree != null) { - dataFileHandler.removeGenRequestIf(p -> !renderState.quadtree.isSectionPosInBounds(p)); + this.dataFileHandler.removeGenRequestIf(p -> !renderState.quadtree.isSectionPosInBounds(p)); } - worldGenModule.worldGenTick(new DhBlockPos2D(MC_CLIENT.getPlayerBlockPos())); + this.worldGenModule.worldGenTick(new DhBlockPos2D(MC_CLIENT.getPlayerBlockPos())); } } @Override public void render(Mat4f mcModelViewMatrix, Mat4f mcProjectionMatrix, float partialTicks, IProfilerWrapper profiler) { - clientside.render(mcModelViewMatrix, mcProjectionMatrix, partialTicks, profiler); + this.clientside.render(mcModelViewMatrix, mcProjectionMatrix, partialTicks, profiler); } @@ -192,35 +209,37 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel //================// @Override - public int computeBaseColor(DhBlockPos pos, IBiomeWrapper biome, IBlockStateWrapper block) { return levelWrapper.computeBaseColor(pos, biome, block); } + public int computeBaseColor(DhBlockPos pos, IBiomeWrapper biome, IBlockStateWrapper block) { return this.levelWrapper.computeBaseColor(pos, biome, block); } @Override - public IClientLevelWrapper getClientLevelWrapper() { return levelWrapper; } + public IClientLevelWrapper getClientLevelWrapper() { return this.levelWrapper; } @Override public void clearRenderCache() { - clientside.clearRenderCache(); + this.clientside.clearRenderCache(); } @Override - public ILevelWrapper getLevelWrapper() { return levelWrapper; } + public ILevelWrapper getLevelWrapper() { return this.levelWrapper; } @Override public void updateDataSourcesWithChunkData(ChunkSizedFullDataAccessor data) { this.clientside.updateDataSourcesWithChunkData(data); } @Override - public int getMinY() { return levelWrapper.getMinHeight(); } + public int getMinY() { return this.levelWrapper.getMinHeight(); } @Override public void close() { - if (worldGenModule != null) - worldGenModule.close(); - clientside.close(); + if (this.worldGenModule != null) + { + this.worldGenModule.close(); + } + this.clientside.close(); super.close(); - dataFileHandler.close(); - LOGGER.info("Closed " + DhClientLevel.class.getSimpleName() + " for " + levelWrapper); + this.dataFileHandler.close(); + LOGGER.info("Closed " + DhClientLevel.class.getSimpleName() + " for " + this.levelWrapper); } //=======================// @@ -230,13 +249,13 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel @Override public IFullDataSourceProvider getFileHandler() { - return dataFileHandler; + return this.dataFileHandler; } @Override public AbstractSaveStructure getSaveStructure() { - return saveStructure; + return this.saveStructure; } @Override @@ -253,6 +272,6 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel 0.2, 32f ) ); - clientside.reloadPos(pos); + this.clientside.reloadPos(pos); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java index d5dc85ba0..be4003021 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java @@ -56,6 +56,7 @@ import java.util.concurrent.*; public class DhServerLevel extends DhLevel implements IDhServerLevel { private static final Logger LOGGER = DhLoggerBuilder.getLogger(); + public final ServerLevelModule serverside; private final IServerLevelWrapper serverLevelWrapper; @@ -89,36 +90,73 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel private void registerNetworkHandlers() { - this.eventSource.registerHandler(FullDataSourceRequestMessage.class, remotePlayerConnectionHandler.currentLevelOnly(this, (msg, serverPlayerState) -> + this.eventSource.registerHandler(FullDataSourceRequestMessage.class, this.remotePlayerConnectionHandler.currentLevelOnly(this, (msg, serverPlayerState) -> { - if (!serverPlayerState.config.isDistantGenerationEnabled()) + if (msg.checksum == null) { - msg.sendResponse(new RequestRejectedException("Operation is disabled from config.")); - return; - } - - if (!serverPlayerState.fullDataRequestConcurrencyLimiter.tryAcquire(msg)) - return; - - while (true) - { - IncompleteDataSourceEntry entry = incompleteDataSources.computeIfAbsent(msg.dhSectionPos, pos -> { - IncompleteDataSourceEntry newEntry = new IncompleteDataSourceEntry(); - this.trySetGeneratedDataSourceToEntry(newEntry, pos); - return newEntry; - }); - // If this fails, current entry is being drained and need to create another one - if (entry.requestCollectionSemaphore.tryAcquire()) + // Normal generation + + if (!serverPlayerState.config.isDistantGenerationEnabled()) { - fullDataRequests.put(msg.futureId, entry); - entry.requestMessages.put(msg.futureId, msg); - entry.requestCollectionSemaphore.release(); - break; + msg.sendResponse(new RequestRejectedException("Operation is disabled from config.")); + return; } + + if (!serverPlayerState.fullDataRequestConcurrencyLimiter.tryAcquire(msg)) + { + return; + } + + while (true) + { + IncompleteDataSourceEntry entry = this.incompleteDataSources.computeIfAbsent(msg.sectionPos, pos -> + { + IncompleteDataSourceEntry newEntry = new IncompleteDataSourceEntry(); + this.trySetGeneratedDataSourceToEntry(newEntry, pos); + return newEntry; + }); + // If this fails, current entry is being drained and need to create another one + if (entry.requestCollectionSemaphore.tryAcquire()) + { + this.fullDataRequests.put(msg.futureId, entry); + entry.requestMessages.put(msg.futureId, msg); + entry.requestCollectionSemaphore.release(); + break; + } + } + } + else + { + // Post-relog update + + if (!serverPlayerState.config.isPostRelogUpdateEnabled()) + { + msg.sendResponse(new RequestRejectedException("Operation is disabled from config.")); + return; + } + + if (!serverPlayerState.postRelogUpdateRequestConcurrencyLimiter.tryAcquire(msg)) + { + return; + } + + Integer serverChecksum = this.serverside.dataFileHandler.repo.getChecksumForSection(msg.sectionPos); + if (serverChecksum == null || serverChecksum.equals(msg.checksum)) + { + serverPlayerState.postRelogUpdateRequestConcurrencyLimiter.release(); + msg.sendResponse(new FullDataSourceResponseMessage(null, this)); + return; + } + + this.serverside.dataFileHandler.getAsync(msg.sectionPos).thenAccept(fullDataSource -> + { + serverPlayerState.postRelogUpdateRequestConcurrencyLimiter.release(); + msg.sendResponse(new FullDataSourceResponseMessage((CompleteFullDataSource) fullDataSource, this)); + }); } })); - this.eventSource.registerHandler(GenTaskPriorityRequestMessage.class, remotePlayerConnectionHandler.currentLevelOnly(this, (msg, serverPlayerState) -> + this.eventSource.registerHandler(GenTaskPriorityRequestMessage.class, this.remotePlayerConnectionHandler.currentLevelOnly(this, (msg, serverPlayerState) -> { msg.sendResponse(new GenTaskPriorityResponseMessage( this.serverside.dataFileHandler.getLoadStates(msg.posList.stream() @@ -130,18 +168,23 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel this.eventSource.registerHandler(CancelMessage.class, msg -> { IncompleteDataSourceEntry entry = this.fullDataRequests.remove(msg.futureId); - if (entry == null) return; + if (entry == null) + { + return; + } FullDataSourceRequestMessage requestMessage = entry.requestMessages.remove(msg.futureId); - ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg); + ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg); if (serverPlayerState != null) + { serverPlayerState.fullDataRequestConcurrencyLimiter.release(); + } entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE); if (entry.requestMessages.isEmpty()) { - incompleteDataSources.remove(requestMessage.dhSectionPos); - serverside.dataFileHandler.removeGenRequestIf(pos -> pos == requestMessage.dhSectionPos); + this.incompleteDataSources.remove(requestMessage.sectionPos); + this.serverside.dataFileHandler.removeGenRequestIf(pos -> pos == requestMessage.sectionPos); } entry.requestCollectionSemaphore.release(Short.MAX_VALUE); @@ -158,20 +201,23 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel this.worldGenLoopingQueue.remove(serverPlayer); } + @Override public void serverTick() { - chunkToLodBuilder.tick(); + this.chunkToLodBuilder.tick(); // Send finished data source requests - for (Map.Entry mapEntry : incompleteDataSources.entrySet()) + for (Map.Entry mapEntry : this.incompleteDataSources.entrySet()) { IncompleteDataSourceEntry entry = mapEntry.getValue(); if (entry.fullDataSource == null || entry.fullDataSource instanceof IIncompleteFullDataSource) + { continue; + } LodUtil.assertTrue(entry.fullDataSource instanceof CompleteFullDataSource, "Invalid full data source"); - incompleteDataSources.remove(mapEntry.getKey()); + this.incompleteDataSources.remove(mapEntry.getKey()); // This semaphore is intentionally acquired forever entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE); @@ -181,9 +227,11 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel { this.fullDataRequests.remove(msg.futureId); - ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg); + ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg); if (serverPlayerState == null) + { continue; + } serverPlayerState.fullDataRequestConcurrencyLimiter.release(); msg.sendResponse(new FullDataSourceResponseMessage(completeSource, this)); @@ -191,22 +239,30 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel } // Send updated chunks after delay - for (Map.Entry chunkUpdateEntry : chunkUpdatesToSend.entrySet()) + for (Map.Entry chunkUpdateEntry : this.chunkUpdatesToSend.entrySet()) { ChunkUpdateData chunkUpdateData = chunkUpdateEntry.getValue(); if (System.currentTimeMillis() < chunkUpdateData.time + CHUNK_UPDATE_SEND_DELAY) - continue; - - chunkUpdatesToSend.remove(chunkUpdateEntry.getKey()); - - for (ServerPlayerState serverPlayerState : remotePlayerConnectionHandler.getConnectedPlayers()) { - if (!serverPlayerState.config.isRealTimeUpdatesEnabled()) continue; + continue; + } + + this.chunkUpdatesToSend.remove(chunkUpdateEntry.getKey()); + + for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers()) + { + if (!serverPlayerState.config.isRealTimeUpdatesEnabled()) + { + continue; + } double distanceFromPlayer = chunkUpdateData.accessor.chunkPos.distance(new DhChunkPos(serverPlayerState.serverPlayer.getPosition())); if (distanceFromPlayer < serverPlayerState.serverPlayer.getViewDistance() || - distanceFromPlayer > serverPlayerState.config.getRenderDistanceRadius()) return; + distanceFromPlayer > serverPlayerState.config.getRenderDistanceRadius()) + { + return; + } serverPlayerState.connection.sendMessage(new FullDataPartialUpdateMessage(chunkUpdateData.accessor, this)); } @@ -218,10 +274,14 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel { CompletableFuture future = super.updateChunkAsync(chunk); if (future == null) + { return null; + } if (!Config.Client.Advanced.Multiplayer.ServerNetworking.enableRealTimeUpdates.get()) + { return future; + } future.thenAccept(chunkSizedFullDataAccessor -> { @@ -240,37 +300,42 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel } @Override - public int getMinY() { return getLevelWrapper().getMinHeight(); } + public int getMinY() + { + return this.getLevelWrapper().getMinHeight(); + } @Override public void close() { super.close(); - serverside.close(); - LOGGER.info("Closed DHLevel for {}", getLevelWrapper()); + this.serverside.close(); + LOGGER.info("Closed DHLevel for {}", this.getLevelWrapper()); } @Override public void doWorldGen() { boolean shouldDoWorldGen = true; //todo; - boolean isWorldGenRunning = serverside.worldGenModule.isWorldGenRunning(); + boolean isWorldGenRunning = this.serverside.worldGenModule.isWorldGenRunning(); if (shouldDoWorldGen && !isWorldGenRunning) { // start world gen - serverside.worldGenModule.startWorldGen(serverside.dataFileHandler, new ServerLevelModule.WorldGenState(this)); + this.serverside.worldGenModule.startWorldGen(this.serverside.dataFileHandler, new ServerLevelModule.WorldGenState(this)); } else if (!shouldDoWorldGen && isWorldGenRunning) { // stop world gen - serverside.worldGenModule.stopWorldGen(serverside.dataFileHandler); + this.serverside.worldGenModule.stopWorldGen(this.serverside.dataFileHandler); } - if (serverside.worldGenModule.isWorldGenRunning()) + if (this.serverside.worldGenModule.isWorldGenRunning()) { IServerPlayerWrapper firstPlayer = this.worldGenLoopingQueue.peek(); if (firstPlayer == null) + { return; + } // Put first player in back before removing from front, so it can be removed by other thread without blocking // - if it gets removed, remove() below will remove the item we just put instead @@ -278,23 +343,32 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel this.worldGenLoopingQueue.remove(firstPlayer); Vec3d position = firstPlayer.getPosition(); - serverside.worldGenModule.worldGenTick(new DhBlockPos2D((int) position.x, (int) position.z)); + this.serverside.worldGenModule.worldGenTick(new DhBlockPos2D((int) position.x, (int) position.z)); } } @Override - public IServerLevelWrapper getServerLevelWrapper() { return serverLevelWrapper; } + public IServerLevelWrapper getServerLevelWrapper() + { + return this.serverLevelWrapper; + } @Override - public ILevelWrapper getLevelWrapper() { return getServerLevelWrapper(); } + public ILevelWrapper getLevelWrapper() + { + return this.getServerLevelWrapper(); + } @Override - public IFullDataSourceProvider getFileHandler() { return serverside.dataFileHandler; } + public IFullDataSourceProvider getFileHandler() + { + return this.serverside.dataFileHandler; + } @Override public AbstractSaveStructure getSaveStructure() { - return serverside.saveStructure; + return this.serverside.saveStructure; } @Override @@ -304,9 +378,13 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel { this.serverside.dataFileHandler.getAsync(pos).thenAccept(fullDataSource -> { if (fullDataSource instanceof IIncompleteFullDataSource) + { fullDataSource = ((IIncompleteFullDataSource) fullDataSource).tryPromotingToCompleteDataSource(); + } if (fullDataSource instanceof CompleteFullDataSource) + { entry.fullDataSource = fullDataSource; + } }); } @@ -314,7 +392,10 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel public void onWorldGenTaskComplete(DhSectionPos pos) { IncompleteDataSourceEntry entry = this.incompleteDataSources.get(pos); - if (entry == null) return; + if (entry == null) + { + return; + } this.trySetGeneratedDataSourceToEntry(entry, pos); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/IDhLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/IDhLevel.java index b707c7166..c97be2d74 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/IDhLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/IDhLevel.java @@ -38,6 +38,8 @@ public interface IDhLevel extends AutoCloseable */ ILevelWrapper getLevelWrapper(); + void updateDataSourcesWithChunkData(ChunkSizedFullDataAccessor data); + @Nullable CompletableFuture updateChunkAsync(IChunkWrapper chunk); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java new file mode 100644 index 000000000..67755ff15 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/AbstractFullDataRequestQueue.java @@ -0,0 +1,289 @@ +package com.seibel.distanthorizons.core.multiplayer.client; + +import com.google.common.base.Stopwatch; +import com.seibel.distanthorizons.core.config.types.ConfigEntry; +import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor; +import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource; +import com.seibel.distanthorizons.core.level.IDhClientLevel; +import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; +import com.seibel.distanthorizons.core.logging.f3.F3Screen; +import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException; +import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException; +import com.seibel.distanthorizons.core.network.messages.fullData.generation.FullDataSourceRequestMessage; +import com.seibel.distanthorizons.core.network.messages.fullData.generation.FullDataSourceResponseMessage; +import com.seibel.distanthorizons.core.pos.DhBlockPos2D; +import com.seibel.distanthorizons.core.pos.DhSectionPos; +import com.seibel.distanthorizons.core.render.renderer.DebugRenderer; +import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable; +import com.seibel.distanthorizons.core.util.LodUtil; +import io.netty.channel.ChannelException; +import org.apache.logging.log4j.Logger; + +import javax.annotation.CheckForNull; +import javax.annotation.Nullable; +import java.awt.*; +import java.util.ArrayList; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +public abstract class AbstractFullDataRequestQueue implements IDebugRenderable, AutoCloseable +{ + private static final Logger LOGGER = DhLoggerBuilder.getLogger(); + + protected static final long SHUTDOWN_TIMEOUT_SECONDS = 5; + + public final ClientNetworkState networkState; + protected final IDhClientLevel level; + private final boolean changedOnly; + + private volatile CompletableFuture closingFuture = null; + + protected final ConcurrentMap waitingTasks = new ConcurrentHashMap<>(); + private final Semaphore pendingTasksSemaphore = new Semaphore(Short.MAX_VALUE, true); + + private final F3Screen.NestedMessage f3Message = new F3Screen.NestedMessage(this::f3Log); + private final AtomicInteger finishedRequests = new AtomicInteger(); + private final AtomicInteger failedRequests = new AtomicInteger(); + private final ConfigEntry showDebugWireframeConfig; + + private final Set alreadyRequestedPositions = ConcurrentHashMap.newKeySet(); + + + protected abstract int getRequestConcurrencyLimit(); + + protected abstract String getQueueName(); + + + public AbstractFullDataRequestQueue(ClientNetworkState networkState, IDhClientLevel level, boolean changedOnly, ConfigEntry showDebugWireframeConfig) + { + this.networkState = networkState; + this.level = level; + this.changedOnly = changedOnly; + this.showDebugWireframeConfig = showDebugWireframeConfig; + DebugRenderer.register(this, this.showDebugWireframeConfig); + } + + public CompletableFuture submitRequest(DhSectionPos sectionPos, Consumer chunkDataConsumer) + { + return this.submitRequest(sectionPos, chunkDataConsumer, null); + } + public CompletableFuture submitRequest(DhSectionPos sectionPos, Consumer chunkDataConsumer, @Nullable Integer currentChecksum) + { + LodUtil.assertTrue(sectionPos.getDetailLevel() == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed."); + + // check if this is a duplicate task + if (this.alreadyRequestedPositions.contains(sectionPos)) + { + // temporary solution to prevent requesting the same section multiple times + LOGGER.trace("Duplicate section " + sectionPos + ". Skipping..."); + return CompletableFuture.completedFuture(false); + } + this.alreadyRequestedPositions.add(sectionPos); + + RequestQueueEntry entry = new RequestQueueEntry(chunkDataConsumer, currentChecksum); + this.waitingTasks.put(sectionPos, entry); + return entry.future; + } + + protected int posDistanceSquared(DhBlockPos2D targetPos, DhSectionPos pos) + { + return (int) pos.getCenterBlockPos().distSquared(targetPos); + } + + public synchronized boolean tick(DhBlockPos2D targetPos) + { + if (this.closingFuture != null || !this.networkState.getClient().isReady()) + { + return false; + } + + while (this.getWaitingTaskCount() > this.getInProgressTaskCount() + && this.getInProgressTaskCount() < this.getRequestConcurrencyLimit() + && this.pendingTasksSemaphore.tryAcquire()) + { + this.sendNewRequest(targetPos); + } + + return true; + } + + public void cancelRequests(Iterable positions) + { + for (DhSectionPos pos : positions) + { + RequestQueueEntry entry = this.waitingTasks.remove(pos); + if (entry != null) + { + entry.future.cancel(false); + if (entry.request != null) + { + entry.request.cancel(false); + } + this.alreadyRequestedPositions.remove(pos); + } + } + } + + private void sendNewRequest(DhBlockPos2D targetPos) + { + Map.Entry mapEntry = this.waitingTasks.entrySet().stream() + .filter(task -> task.getValue().request == null) + .reduce(null, (a, b) + -> a == null + || b.getValue().priority > a.getValue().priority + || (b.getValue().priority == a.getValue().priority && this.posDistanceSquared(targetPos, b.getKey()) < this.posDistanceSquared(targetPos, a.getKey())) + ? b : a); + if (mapEntry == null) + { + this.pendingTasksSemaphore.release(); + return; + } + + DhSectionPos sectionPos = mapEntry.getKey(); + RequestQueueEntry entry = mapEntry.getValue(); + + CompletableFuture request = this.networkState.getClient().sendRequest(new FullDataSourceRequestMessage(this.level.getLevelWrapper(), sectionPos, entry.currentChecksum), FullDataSourceResponseMessage.class); + entry.request = request; + request.handleAsync((response, throwable) -> + { + this.pendingTasksSemaphore.release(); + this.finishedRequests.incrementAndGet(); + + try + { + if (throwable != null) + { + throw throwable; + } + + this.waitingTasks.remove(sectionPos); + LOGGER.debug("FullDataSourceResponseMessage " + sectionPos); + + CompleteFullDataSource fullDataSource = response.getFullDataSource(sectionPos, this.level); + + if (fullDataSource != null) + { + fullDataSource.splitIntoChunkSizedAccessors(entry.chunkDataConsumer); + response.getFullDataSourceLoader().returnPooledDataSource(fullDataSource); + } + else + { + LodUtil.assertTrue(this.changedOnly, "Received empty data source response for not changed-only request"); + } + } + catch (InvalidLevelException ignored) + { + // We're too late + } + catch (ChannelException | RateLimitedException e) + { + if (e instanceof RateLimitedException) + { + LOGGER.warn("Rate limited by server, re-queueing task [" + sectionPos + "]: " + e.getMessage()); + } + + entry.request = null; + this.finishedRequests.decrementAndGet(); + } + catch (CancellationException ignored) + { + this.finishedRequests.decrementAndGet(); + } + catch (Throwable e) + { + LOGGER.error("Error while fetching full data source", e); + this.failedRequests.incrementAndGet(); + return entry.future.complete(false); + } + + return entry.future.complete(true); + }); + } + + private String[] f3Log() + { + ArrayList lines = new ArrayList<>(); + lines.add(this.getQueueName() + " [" + this.level.getClientLevelWrapper().getDimensionType().getDimensionName() + "]"); + lines.add("Requests: " + this.finishedRequests + " / " + (this.getWaitingTaskCount() + this.finishedRequests.get()) + " (failed: " + this.failedRequests + ", rate limit: " + this.getRequestConcurrencyLimit() + ")"); + return lines.toArray(new String[0]); + } + + public int getWaitingTaskCount() { return this.waitingTasks.size(); } + + public int getInProgressTaskCount() { return Short.MAX_VALUE - this.pendingTasksSemaphore.availablePermits(); } + + public CompletableFuture startClosing(boolean alsoInterruptRunning) + { + return this.closingFuture = CompletableFuture.runAsync(() -> { + Stopwatch stopwatch = Stopwatch.createStarted(); + + do + { + for (RequestQueueEntry entry : this.waitingTasks.values()) + { + entry.future.cancel(alsoInterruptRunning); + if (entry.request != null && entry.request.cancel(alsoInterruptRunning)) + { + this.pendingTasksSemaphore.release(); + } + } + } + while (!this.pendingTasksSemaphore.tryAcquire(Short.MAX_VALUE) && stopwatch.elapsed(TimeUnit.SECONDS) < SHUTDOWN_TIMEOUT_SECONDS); + + if (stopwatch.elapsed(TimeUnit.SECONDS) >= SHUTDOWN_TIMEOUT_SECONDS) + { + LOGGER.warn(this.getQueueName() + " for " + this.level.getLevelWrapper() + " did not shutdown in " + SHUTDOWN_TIMEOUT_SECONDS + " seconds! Some unfinished tasks might be left hanging."); + } + }); + } + + @Override + public void close() + { + this.f3Message.close(); + DebugRenderer.unregister(this, this.showDebugWireframeConfig); + } + + @Override + public void debugRender(DebugRenderer r) + { + for (Map.Entry mapEntry : this.waitingTasks.entrySet()) + { + r.renderBox(new DebugRenderer.Box(mapEntry.getKey(), -32f, 64f, 0.05f, + mapEntry.getValue().request != null ? Color.red + : mapEntry.getValue().priority == 3 ? Color.orange + : mapEntry.getValue().priority == 2 ? Color.cyan + : mapEntry.getValue().priority == 1 ? Color.blue + : Color.gray + )); + } + } + + protected static class RequestQueueEntry + { + public final CompletableFuture future = new CompletableFuture<>(); + public final Consumer chunkDataConsumer; + @Nullable + public final Integer currentChecksum; + + // Higher value = higher priority. + // Priority of 0 is reserved for unassigned value + public int priority = 0; + @CheckForNull + public CompletableFuture request; + + public RequestQueueEntry( + Consumer chunkDataConsumer, + @Nullable + Integer currentChecksum) + { + this.chunkDataConsumer = chunkDataConsumer; + this.currentChecksum = currentChecksum; + } + + } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/FullDataRefreshQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/FullDataRefreshQueue.java new file mode 100644 index 000000000..8e92afd4c --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/FullDataRefreshQueue.java @@ -0,0 +1,30 @@ +package com.seibel.distanthorizons.core.multiplayer.client; + +import com.seibel.distanthorizons.core.config.Config; +import com.seibel.distanthorizons.core.level.IDhClientLevel; +import com.seibel.distanthorizons.core.pos.DhBlockPos2D; + +public class FullDataRefreshQueue extends AbstractFullDataRequestQueue +{ + public FullDataRefreshQueue(IDhClientLevel level, ClientNetworkState networkState) + { + super(networkState, level, true, Config.Client.Advanced.Debugging.DebugWireframe.showWorldGenQueue); + } + + @Override + protected int getRequestConcurrencyLimit() { return this.networkState.config.postRelogUpdateConcurrencyLimit; } + + @Override + protected String getQueueName() { return "Data Refresh Queue"; } + + @Override + public boolean tick(DhBlockPos2D targetPos) + { + if (!this.networkState.config.postRelogUpdateEnabled) + { + return false; + } + return super.tick(targetPos); + } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/AbstractMultiplayerConfig.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/AbstractMultiplayerConfig.java index f1ad36d78..7086c7ac3 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/AbstractMultiplayerConfig.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/AbstractMultiplayerConfig.java @@ -11,6 +11,7 @@ public abstract class AbstractMultiplayerConfig implements INetworkObject public abstract int getGenTaskPriorityRequestRateLimit(); public abstract boolean isRealTimeUpdatesEnabled(); public abstract boolean isPostRelogUpdateEnabled(); + public abstract int getPostRelogUpdateConcurrencyLimit(); @Override public void encode(ByteBuf out) @@ -21,6 +22,7 @@ public abstract class AbstractMultiplayerConfig implements INetworkObject out.writeInt(this.getGenTaskPriorityRequestRateLimit()); out.writeBoolean(this.isRealTimeUpdatesEnabled()); out.writeBoolean(this.isPostRelogUpdateEnabled()); + out.writeInt(this.getPostRelogUpdateConcurrencyLimit()); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/MultiplayerConfig.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/MultiplayerConfig.java index 631a3ce03..578e84fac 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/MultiplayerConfig.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/MultiplayerConfig.java @@ -8,22 +8,25 @@ public class MultiplayerConfig extends AbstractMultiplayerConfig // IMPORTANT: Once you added/removed config fields, modify MultiplayerConfigChangeListener accordingly. public int renderDistanceRadius = Config.Client.Advanced.Graphics.Quality.lodChunkRenderDistanceRadius.get(); - @Override public int getRenderDistanceRadius() { return renderDistanceRadius; } + @Override public int getRenderDistanceRadius() { return this.renderDistanceRadius; } public boolean distantGenerationEnabled = Config.Client.Advanced.WorldGenerator.enableDistantGeneration.get(); - @Override public boolean isDistantGenerationEnabled() { return distantGenerationEnabled; } + @Override public boolean isDistantGenerationEnabled() { return this.distantGenerationEnabled; } public int fullDataRequestConcurrencyLimit = Config.Client.Advanced.Multiplayer.ServerNetworking.fullDataRequestConcurrencyLimit.get(); - @Override public int getFullDataRequestConcurrencyLimit() { return fullDataRequestConcurrencyLimit; } + @Override public int getFullDataRequestConcurrencyLimit() { return this.fullDataRequestConcurrencyLimit; } public int genTaskPriorityRequestRateLimit = Config.Client.Advanced.Multiplayer.ServerNetworking.genTaskPriorityRequestRateLimit.get(); - @Override public int getGenTaskPriorityRequestRateLimit() { return genTaskPriorityRequestRateLimit; } + @Override public int getGenTaskPriorityRequestRateLimit() { return this.genTaskPriorityRequestRateLimit; } public boolean realTimeUpdatesEnabled = Config.Client.Advanced.Multiplayer.ServerNetworking.enableRealTimeUpdates.get(); - @Override public boolean isRealTimeUpdatesEnabled() { return realTimeUpdatesEnabled; } + @Override public boolean isRealTimeUpdatesEnabled() { return this.realTimeUpdatesEnabled; } - public boolean postRelogUpdateEnabled = false; // Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate.get(); - @Override public boolean isPostRelogUpdateEnabled() { return postRelogUpdateEnabled; } + public boolean postRelogUpdateEnabled = Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate.get(); + @Override public boolean isPostRelogUpdateEnabled() { return this.postRelogUpdateEnabled; } + + public int postRelogUpdateConcurrencyLimit = Config.Client.Advanced.Multiplayer.ServerNetworking.postRelogUpdateConcurrencyLimit.get(); + @Override public int getPostRelogUpdateConcurrencyLimit() { return this.postRelogUpdateConcurrencyLimit; } @Override public void decode(ByteBuf in) @@ -34,17 +37,19 @@ public class MultiplayerConfig extends AbstractMultiplayerConfig this.genTaskPriorityRequestRateLimit = in.readInt(); this.realTimeUpdatesEnabled = in.readBoolean(); this.postRelogUpdateEnabled = in.readBoolean(); + this.postRelogUpdateConcurrencyLimit = in.readInt(); } @Override public String toString() { return "MultiplayerConfig{" + - "renderDistance=" + renderDistanceRadius + - ", distantGenerationEnabled=" + distantGenerationEnabled + - ", fullDataRequestConcurrencyLimit=" + fullDataRequestConcurrencyLimit + - ", genTaskPriorityRequestRateLimit=" + genTaskPriorityRequestRateLimit + - ", realTimeUpdatesEnabled=" + realTimeUpdatesEnabled + - ", postRelogUpdatesEnabled=" + postRelogUpdateEnabled + + "renderDistance=" + this.renderDistanceRadius + + ", distantGenerationEnabled=" + this.distantGenerationEnabled + + ", fullDataRequestConcurrencyLimit=" + this.fullDataRequestConcurrencyLimit + + ", genTaskPriorityRequestRateLimit=" + this.genTaskPriorityRequestRateLimit + + ", realTimeUpdatesEnabled=" + this.realTimeUpdatesEnabled + + ", postRelogUpdatesEnabled=" + this.postRelogUpdateEnabled + + ", postRelogUpdateConcurrencyLimit=" + this.postRelogUpdateConcurrencyLimit + '}'; } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/MultiplayerConfigChangeListener.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/MultiplayerConfigChangeListener.java index 3a4b8ce50..76ba40c7a 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/MultiplayerConfigChangeListener.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/MultiplayerConfigChangeListener.java @@ -16,7 +16,8 @@ public class MultiplayerConfigChangeListener implements Closeable Config.Client.Advanced.Multiplayer.ServerNetworking.fullDataRequestConcurrencyLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.genTaskPriorityRequestRateLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.enableRealTimeUpdates, - //Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate + Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate, + Config.Client.Advanced.Multiplayer.ServerNetworking.postRelogUpdateConcurrencyLimit, }; private final ArrayList changeListeners = new ArrayList<>(); @@ -24,15 +25,19 @@ public class MultiplayerConfigChangeListener implements Closeable public MultiplayerConfigChangeListener(Runnable runnable) { for (ConfigEntry entry : CONFIG_ENTRIES) - changeListeners.add(new ConfigChangeListener(entry, ignored -> runnable.run())); + { + this.changeListeners.add(new ConfigChangeListener(entry, ignored -> runnable.run())); + } } @Override public void close() { - for (ConfigChangeListener changeListener : changeListeners) + for (ConfigChangeListener changeListener : this.changeListeners) + { changeListener.close(); - changeListeners.clear(); + } + this.changeListeners.clear(); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java index f8e1ac49b..3ab07416f 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServerPlayerState.java @@ -27,7 +27,7 @@ public class ServerPlayerState public final SupplierBasedConcurrencyLimiter fullDataRequestConcurrencyLimiter = new SupplierBasedConcurrencyLimiter<>( () -> ServerNetworking.fullDataRequestConcurrencyLimit.get(), msg -> { - msg.sendResponse(new RateLimitedException("Max concurrent full data requests: " + config.getFullDataRequestConcurrencyLimit())); + msg.sendResponse(new RateLimitedException("Max concurrent full data requests: " + this.config.getFullDataRequestConcurrencyLimit())); this.rateLimitKickTrigger.tryAcquire(null); } ); @@ -35,15 +35,22 @@ public class ServerPlayerState public final SupplierBasedRateLimiter genTaskPriorityRequestRateLimiter = new SupplierBasedRateLimiter<>( () -> ServerNetworking.genTaskPriorityRequestRateLimit.get(), msg -> { - // Shouldn't be called, but it's here just in case - msg.sendResponse(new RateLimitedException("Max section checks per second: " + config.getFullDataRequestConcurrencyLimit())); + msg.sendResponse(new RateLimitedException("Max section checks per second: " + this.config.getFullDataRequestConcurrencyLimit())); + this.rateLimitKickTrigger.tryAcquire(null); + } + ); + + public final SupplierBasedConcurrencyLimiter postRelogUpdateRequestConcurrencyLimiter = new SupplierBasedConcurrencyLimiter<>( + () -> ServerNetworking.postRelogUpdateConcurrencyLimit.get(), + msg -> { + msg.sendResponse(new RateLimitedException("Max concurrent post-relog update requests: " + this.config.getPostRelogUpdateConcurrencyLimit())); this.rateLimitKickTrigger.tryAcquire(null); } ); - public ServerPlayerState(IServerPlayerWrapper serverPlayer) { this.serverPlayer = serverPlayer; } + public ServerPlayerState(IServerPlayerWrapper serverPlayer) { this.serverPlayer = serverPlayer; } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServersideMultiplayerConfig.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServersideMultiplayerConfig.java index 03a3547a8..e16ccac79 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServersideMultiplayerConfig.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/server/ServersideMultiplayerConfig.java @@ -15,36 +15,42 @@ public class ServersideMultiplayerConfig extends AbstractMultiplayerConfig @Override public int getRenderDistanceRadius() { - return Math.min(clientConfig.renderDistanceRadius, Config.Client.Advanced.Graphics.Quality.lodChunkRenderDistanceRadius.get()); + return Math.min(this.clientConfig.renderDistanceRadius, Config.Client.Advanced.Graphics.Quality.lodChunkRenderDistanceRadius.get()); } @Override public boolean isDistantGenerationEnabled() { - return clientConfig.distantGenerationEnabled && Config.Client.Advanced.WorldGenerator.enableDistantGeneration.get(); + return this.clientConfig.distantGenerationEnabled && Config.Client.Advanced.WorldGenerator.enableDistantGeneration.get(); } @Override public int getFullDataRequestConcurrencyLimit() { - return Math.min(clientConfig.fullDataRequestConcurrencyLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.fullDataRequestConcurrencyLimit.get()); + return Math.min(this.clientConfig.fullDataRequestConcurrencyLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.fullDataRequestConcurrencyLimit.get()); } @Override public int getGenTaskPriorityRequestRateLimit() { - return Math.min(clientConfig.genTaskPriorityRequestRateLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.genTaskPriorityRequestRateLimit.get()); + return Math.min(this.clientConfig.genTaskPriorityRequestRateLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.genTaskPriorityRequestRateLimit.get()); } @Override public boolean isRealTimeUpdatesEnabled() { - return clientConfig.realTimeUpdatesEnabled && Config.Client.Advanced.Multiplayer.ServerNetworking.enableRealTimeUpdates.get(); + return this.clientConfig.realTimeUpdatesEnabled && Config.Client.Advanced.Multiplayer.ServerNetworking.enableRealTimeUpdates.get(); } @Override public boolean isPostRelogUpdateEnabled() { - return false; // clientConfig.postRelogUpdateEnabled && Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate.get(); + return this.clientConfig.postRelogUpdateEnabled && Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate.get(); + } + + @Override + public int getPostRelogUpdateConcurrencyLimit() + { + return Math.min(this.clientConfig.postRelogUpdateConcurrencyLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.postRelogUpdateConcurrencyLimit.get()); } @Override diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/IConnection.java b/core/src/main/java/com/seibel/distanthorizons/core/network/IConnection.java index 8a74d5c37..4cb494688 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/IConnection.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/IConnection.java @@ -6,12 +6,16 @@ import com.seibel.distanthorizons.core.network.protocol.NetworkMessage; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.net.SocketAddress; import java.util.concurrent.CompletableFuture; public interface IConnection { + Logger LOGGER = LogManager.getLogger(); + ChannelHandlerContext getChannelContext(); NetworkEventSource getRequestHandler(); @@ -22,6 +26,7 @@ public interface IConnection default CompletableFuture sendMessage(NetworkMessage message) { + LOGGER.trace("Sending message: " + message); CompletableFuture future = new CompletableFuture<>(); ChannelHandlerContext ctx = this.getChannelContext(); @@ -52,7 +57,9 @@ public interface IConnection this.sendMessage(msg).whenComplete((ignored, throwable) -> { if (throwable != null) + { responseFuture.completeExceptionally(throwable); + } }); return responseFuture; } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java index d62a26876..c3d0df0d3 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkServer.java @@ -43,7 +43,6 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable { private static final Logger LOGGER = DhLoggerBuilder.getLogger(); - // TODO move to the config private final int port; private final EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("DH-Network - Server Boss Thread")); @@ -129,7 +128,9 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable public void close() { if (!this.isClosed.compareAndSet(false, true)) + { return; + } LOGGER.info("Shutting down the network server."); this.workerGroup.shutdownGracefully().syncUninterruptibly(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/generation/FullDataSourceRequestMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/generation/FullDataSourceRequestMessage.java index b253f151a..658dce6cc 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/generation/FullDataSourceRequestMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/generation/FullDataSourceRequestMessage.java @@ -26,50 +26,56 @@ import com.seibel.distanthorizons.core.pos.DhSectionPos; import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper; import io.netty.buffer.ByteBuf; +import javax.annotation.Nullable; + public class FullDataSourceRequestMessage extends FutureTrackableNetworkMessage implements ILevelRelatedMessage { - public DhSectionPos dhSectionPos; private int levelHashCode; - @Override public int getLevelHashCode() { return levelHashCode; } - public boolean changedOnly; + + public DhSectionPos sectionPos; + + /** Only present when requesting for changes. */ + @Nullable + public Integer checksum; + + @Override + public int getLevelHashCode() { return this.levelHashCode; } public FullDataSourceRequestMessage() {} - - public FullDataSourceRequestMessage(ILevelWrapper levelWrapper, DhSectionPos dhSectionPos) + public FullDataSourceRequestMessage(ILevelWrapper levelWrapper, DhSectionPos sectionPos, @Nullable Integer checksum) { // TODO Multiverse support this.levelHashCode = levelWrapper.getDimensionType().getDimensionName().hashCode(); - this.dhSectionPos = dhSectionPos; - } - - public FullDataSourceRequestMessage(ILevelWrapper levelWrapper, DhSectionPos dhSectionPos, boolean changedOnly) - { - this(levelWrapper, dhSectionPos); - this.changedOnly = true; + this.sectionPos = sectionPos; + this.checksum = checksum; } @Override public void encode0(ByteBuf out) { - out.writeInt(levelHashCode); - dhSectionPos.encode(out); - out.writeBoolean(changedOnly); + out.writeInt(this.levelHashCode); + this.sectionPos.encode(out); + if (this.encodeOptional(out, this.checksum)) + { + out.writeInt(this.checksum); + } } @Override public void decode0(ByteBuf in) { - levelHashCode = in.readInt(); - dhSectionPos = INetworkObject.decodeStatic(DhSectionPos.zero(), in); - changedOnly = in.readBoolean(); + this.levelHashCode = in.readInt(); + this.sectionPos = INetworkObject.decodeStatic(DhSectionPos.zero(), in); + this.checksum = this.decodeOptional(in, in::readInt); } @Override public String toString() { return super.toString( - "dhSectionPos=" + dhSectionPos + - ", levelHashCode=" + levelHashCode + "dhSectionPos=" + this.sectionPos + + ", levelHashCode=" + this.levelHashCode + + ", checksum=" + this.checksum ); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/generation/FullDataSourceResponseMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/generation/FullDataSourceResponseMessage.java index 15f9c392e..52b2ef71c 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/generation/FullDataSourceResponseMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/generation/FullDataSourceResponseMessage.java @@ -31,20 +31,33 @@ import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataOutputStre import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; +import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.IOException; +/** + * Response message, containing the requested full data source, + * or nothing if requested in updates-only mode and the data was not updated.
+ * Decoded full data source is not cached, since it's intended for a single use. + */ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage { + // Transmitted data + @Nullable + private ByteBuf dataBuffer; + + // Used only when encoding + @Nullable private CompleteFullDataSource fullDataSource; private DhServerLevel level; + // Used only when decoding private CompleteFullDataSourceLoader fullDataSourceLoader; - public CompleteFullDataSourceLoader getFullDataSourceLoader() { return fullDataSourceLoader; } - private ByteBuf dataBuffer; + public CompleteFullDataSourceLoader getFullDataSourceLoader() { return this.fullDataSourceLoader; } + public FullDataSourceResponseMessage() {} - public FullDataSourceResponseMessage(CompleteFullDataSource fullDataSource, DhServerLevel level) + public FullDataSourceResponseMessage(@Nullable CompleteFullDataSource fullDataSource, DhServerLevel level) { this.fullDataSource = fullDataSource; this.level = level; @@ -53,41 +66,54 @@ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage @Override public void encode0(ByteBuf out) throws IOException { - try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) + if (this.encodeOptional(out, this.fullDataSource)) { - DhDataOutputStream dhOutputStream = new DhDataOutputStream(outputStream); - fullDataSource.writeToStream(dhOutputStream, level); - dhOutputStream.flush(); - - out.writeByte(fullDataSource.getDataFormatVersion()); - out.writeInt(outputStream.size()); - out.writeBytes(outputStream.toByteArray()); + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) + { + DhDataOutputStream dhOutputStream = new DhDataOutputStream(outputStream); + this.fullDataSource.writeToStream(dhOutputStream, this.level); + dhOutputStream.flush(); + + out.writeByte(this.fullDataSource.getDataFormatVersion()); + out.writeInt(outputStream.size()); + out.writeBytes(outputStream.toByteArray()); + } } } @Override public void decode0(ByteBuf in) { - byte dataVersion = in.readByte(); - this.fullDataSourceLoader = (CompleteFullDataSourceLoader) AbstractFullDataSourceLoader.getLoader(CompleteFullDataSource.DATA_TYPE_NAME, dataVersion); - this.dataBuffer = in.readBytes(in.readInt()); + this.dataBuffer = this.decodeOptional(in, () -> + { + byte dataVersion = in.readByte(); + this.fullDataSourceLoader = (CompleteFullDataSourceLoader) AbstractFullDataSourceLoader.getLoader(CompleteFullDataSource.DATA_TYPE_NAME, dataVersion); + return in.readBytes(in.readInt()); + }); } - public CompleteFullDataSource getFullDataSource(DhSectionPos pos, IDhLevel level) throws IOException, InterruptedException + @Nullable + public synchronized CompleteFullDataSource getFullDataSource(DhSectionPos pos, IDhLevel level) throws IOException, InterruptedException { - try (ByteBufInputStream inputStream = new ByteBufInputStream(dataBuffer)) + if (this.dataBuffer == null) { - return fullDataSourceLoader.loadData(pos, new DhDataInputStream(inputStream), level); + return null; + } + + try (ByteBufInputStream inputStream = new ByteBufInputStream(this.dataBuffer)) + { + return this.fullDataSourceLoader.loadData(pos, new DhDataInputStream(inputStream), level); } finally { - dataBuffer.release(); + this.dataBuffer.release(); } } - @Override public String toString() + @Override + public String toString() { - return super.toString("dataBuffer=" + dataBuffer); + return super.toString("dataBuffer=" + this.dataBuffer); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/sql/AbstractDataSourceRepo.java b/core/src/main/java/com/seibel/distanthorizons/core/sql/AbstractDataSourceRepo.java index 479da8595..05232d9f8 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/sql/AbstractDataSourceRepo.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/sql/AbstractDataSourceRepo.java @@ -22,6 +22,7 @@ package com.seibel.distanthorizons.core.sql; import com.seibel.distanthorizons.api.enums.worldGeneration.EDhApiWorldGenerationStep; import com.seibel.distanthorizons.core.pos.DhSectionPos; +import javax.annotation.Nullable; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Map; @@ -164,5 +165,22 @@ public abstract class AbstractDataSourceRepo extends AbstractDhRepo resultMap = this.queryDictionaryFirst("SELECT Checksum FROM DhFullData WHERE DhSectionPos = '" + pos.serialize() + "';"); + if (resultMap == null || resultMap.get("Checksum") == null) + { + return null; + } + else + { + return (int) resultMap.get("Checksum"); + } + } + } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/ratelimiting/SupplierBasedConcurrencyLimiter.java b/core/src/main/java/com/seibel/distanthorizons/core/util/ratelimiting/SupplierBasedConcurrencyLimiter.java index 27e2316c0..6326c1608 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/ratelimiting/SupplierBasedConcurrencyLimiter.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/ratelimiting/SupplierBasedConcurrencyLimiter.java @@ -27,6 +27,7 @@ public class SupplierBasedConcurrencyLimiter { if (this.pendingTasks.incrementAndGet() > this.maxConcurrentTasksSupplier.get()) { + this.pendingTasks.decrementAndGet(); this.onFailureConsumer.accept(context); return false; }