From 84e90a7a9b741b313fff5d9b19641d1d9ddddd53 Mon Sep 17 00:00:00 2001 From: s809 <43530948+s809@users.noreply.github.com> Date: Mon, 8 Jul 2024 23:07:58 +0500 Subject: [PATCH] Use dedicated thread pool for data compression --- .../distanthorizons/core/config/Config.java | 18 ++++ .../ThreadPresetConfigEventHandler.java | 25 ++++++ .../core/level/DhServerLevel.java | 82 ++++++++++++------- .../FullDataPartialUpdateMessage.java | 36 ++------ .../FullDataSourceResponseMessage.java | 28 ++----- .../core/util/threading/ThreadPoolUtil.java | 6 ++ 6 files changed, 120 insertions(+), 75 deletions(-) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java index 2f72f424d..9233e59af 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java @@ -1093,6 +1093,24 @@ public class Config + "") .build(); + public static final ConfigEntry numberOfNetworkCompressionThreads = new ConfigEntry.Builder() + .setServersideShortName("numberOfNetworkCompressionThreads") + .setMinDefaultMax(1, + ThreadPresetConfigEventHandler.getNetworkCompressionDefaultThreadCount(), + Runtime.getRuntime().availableProcessors()) + .comment("" + + "How many threads should be used when building LODs? \n" + + "\n" + + "These threads run when terrain is generated, when\n" + + "certain graphics settings are changed, and when moving around the world. \n" + + "\n" + + THREAD_NOTE) + .build(); + public static final ConfigEntry runTimeRatioForNetworkCompressionThreads = new ConfigEntry.Builder() + .setServersideShortName("runTimeRatioForNetworkCompressionThreads") + .setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getNetworkCompressionDefaultRunTimeRatio(), 1.0) + .comment(THREAD_RUN_TIME_RATIO_NOTE) + .build(); } public static class GpuBuffers diff --git a/core/src/main/java/com/seibel/distanthorizons/core/config/eventHandlers/presets/ThreadPresetConfigEventHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/config/eventHandlers/presets/ThreadPresetConfigEventHandler.java index 54d27d419..bcdbb45c9 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/config/eventHandlers/presets/ThreadPresetConfigEventHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/config/eventHandlers/presets/ThreadPresetConfigEventHandler.java @@ -128,6 +128,28 @@ public class ThreadPresetConfigEventHandler extends AbstractPresetConfigEventHan }}); + public static int getNetworkCompressionDefaultThreadCount() { return getThreadCountByPercent(0.1); } + private final ConfigEntryWithPresetOptions networkCompressionThreadCount = new ConfigEntryWithPresetOptions<>(Config.Client.Advanced.MultiThreading.numberOfNetworkCompressionThreads, + new HashMap() + {{ + this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1); + this.put(EDhApiThreadPreset.LOW_IMPACT, getNetworkCompressionDefaultThreadCount()); + this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.2)); + this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.4)); + this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.6)); + }}); + public static double getNetworkCompressionDefaultRunTimeRatio() { return 0.25; } + private final ConfigEntryWithPresetOptions networkCompressionRunTime = new ConfigEntryWithPresetOptions<>(Config.Client.Advanced.MultiThreading.runTimeRatioForNetworkCompressionThreads, + new HashMap() + {{ + this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.1); + this.put(EDhApiThreadPreset.LOW_IMPACT, getNetworkCompressionDefaultRunTimeRatio()); + this.put(EDhApiThreadPreset.BALANCED, 0.5); + this.put(EDhApiThreadPreset.AGGRESSIVE, 0.75); + this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0); + }}); + + //==============// // constructors // @@ -149,6 +171,9 @@ public class ThreadPresetConfigEventHandler extends AbstractPresetConfigEventHan this.configList.add(this.lodBuilderThreadCount); this.configList.add(this.lodBuilderRunTime); + this.configList.add(this.networkCompressionThreadCount); + this.configList.add(this.networkCompressionRunTime); + for (ConfigEntryWithPresetOptions config : this.configList) { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java index 30f19b99f..91b1402ef 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java @@ -38,6 +38,7 @@ import com.seibel.distanthorizons.core.pos.DhBlockPos2D; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.pos.DhSectionPos; import com.seibel.distanthorizons.core.util.LodUtil; +import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper; @@ -145,11 +146,17 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel return; } - this.serverside.fullDataFileHandler.getAsync(msg.sectionPos).thenAccept(fullDataSource -> + ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + if (executor == null) + { + LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); + return; + } + this.serverside.fullDataFileHandler.getAsync(msg.sectionPos).thenAcceptAsync(fullDataSource -> { rateLimiterSet.loginDataSyncRCLimiter.release(); msg.sendResponse(new FullDataSourceResponseMessage(fullDataSource)); - }); + }, executor); } })); @@ -241,20 +248,29 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel // This semaphore is intentionally acquired forever entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(entry.fullDataSource); - for (FullDataSourceRequestMessage msg : entry.requestMessages.values()) + ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + if (executor == null) { - this.fullDataRequests.remove(msg.futureId); - - ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer()); - if (serverPlayerState == null) - { - continue; - } - - serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release(); - msg.sendResponse(response); + LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null"); + continue; } + CompletableFuture.runAsync(() -> + { + FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(entry.fullDataSource); + for (FullDataSourceRequestMessage msg : entry.requestMessages.values()) + { + this.fullDataRequests.remove(msg.futureId); + + ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer()); + if (serverPlayerState == null) + { + continue; + } + + serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release(); + msg.sendResponse(response); + } + }, executor); } } @@ -266,22 +282,32 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel return this.getFullDataProvider().updateDataSourceAsync(data); } - FullDataPartialUpdateMessage updateMessage = new FullDataPartialUpdateMessage(this.serverLevelWrapper, data); - for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers()) + ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); + if (executor == null) { - if (!serverPlayerState.config.isRealTimeUpdatesEnabled()) - { - continue; - } - - Vec3d playerPosition = serverPlayerState.serverPlayer().getPosition(); - int distanceFromPlayer = DhSectionPos.getManhattanBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16; - if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() && - distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius()) - { - serverPlayerState.session.sendMessage(updateMessage); - } + LOGGER.warn("Unable to send FullDataPartialUpdateMessage - getNetworkCompressionExecutor() is null"); + return this.getFullDataProvider().updateDataSourceAsync(data); } + CompletableFuture.runAsync(() -> + { + FullDataPartialUpdateMessage updateMessage = new FullDataPartialUpdateMessage(this.serverLevelWrapper, data); + for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers()) + { + if (!serverPlayerState.config.isRealTimeUpdatesEnabled()) + { + continue; + } + + Vec3d playerPosition = serverPlayerState.serverPlayer().getPosition(); + int distanceFromPlayer = DhSectionPos.getManhattanBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16; + if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() && + distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius()) + { + serverPlayerState.session.sendMessage(updateMessage); + } + } + }, executor); + return this.getFullDataProvider().updateDataSourceAsync(data); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java index bc419b869..05b0a4a39 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataPartialUpdateMessage.java @@ -20,7 +20,6 @@ package com.seibel.distanthorizons.core.network.messages.fullData; import com.google.common.base.MoreObjects; -import com.google.common.base.Suppliers; import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; @@ -30,11 +29,8 @@ import com.seibel.distanthorizons.core.network.INetworkObject; import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper; import io.netty.buffer.ByteBuf; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.jetbrains.annotations.Nullable; import java.io.IOException; -import java.util.function.Supplier; public class FullDataPartialUpdateMessage extends NetworkMessage implements ILevelRelatedMessage { @@ -42,35 +38,18 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev @Override public String getLevelName() { return this.levelName; } - // Encode only - @Nullable - private final FullDataSourceV2 fullDataSource; - private final Supplier dataSourceDtoSupplier = Suppliers.memoize(this::createDataSourceDto); - - // Decode only - @Nullable public FullDataSourceV2DTO dataSourceDto; - public FullDataPartialUpdateMessage() { this.fullDataSource = null; } - public FullDataPartialUpdateMessage(ILevelWrapper level, @NonNull FullDataSourceV2 fullDataSource) + public FullDataPartialUpdateMessage() { } + public FullDataPartialUpdateMessage(ILevelWrapper level, FullDataSourceV2 fullDataSource) { this.levelName = level.getDimensionName(); - this.fullDataSource = fullDataSource; - } - - - @Override - public boolean warnWhenUnhandled() { return false; } - - - private FullDataSourceV2DTO createDataSourceDto() - { + try { - assert this.fullDataSource != null; EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get(); - return FullDataSourceV2DTO.CreateFromDataSource(this.fullDataSource, compressionMode); + this.dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode); } catch (IOException e) { @@ -78,11 +57,15 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev } } + + @Override + public boolean warnWhenUnhandled() { return false; } + @Override public void encode(ByteBuf out) { this.writeString(this.levelName, out); - this.dataSourceDtoSupplier.get().encode(out); + this.dataSourceDto.encode(out); } @Override @@ -98,7 +81,6 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev { return super.toStringHelper() .add("levelName", this.levelName) - .add("fullDataSource", this.fullDataSource) .add("dataSourceDto", this.dataSourceDto); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java index 04715d91f..9d8b30908 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/fullData/FullDataSourceResponseMessage.java @@ -20,7 +20,6 @@ package com.seibel.distanthorizons.core.network.messages.fullData; import com.google.common.base.MoreObjects; -import com.google.common.base.Suppliers; import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode; import com.seibel.distanthorizons.core.config.Config; import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2; @@ -31,7 +30,6 @@ import io.netty.buffer.ByteBuf; import org.jetbrains.annotations.Nullable; import java.io.IOException; -import java.util.function.Supplier; /** * Response message, containing the requested full data source, @@ -39,28 +37,19 @@ import java.util.function.Supplier; */ public class FullDataSourceResponseMessage extends TrackableMessage { - // Encode only - @Nullable - private final FullDataSourceV2 fullDataSource; - private final Supplier dataSourceDtoSupplier = Suppliers.memoize(this::createDataSourceDto); - - // Decode only @Nullable public FullDataSourceV2DTO dataSourceDto; - public FullDataSourceResponseMessage() { this(null); } + public FullDataSourceResponseMessage() { } public FullDataSourceResponseMessage(@Nullable FullDataSourceV2 fullDataSource) - { - this.fullDataSource = fullDataSource; - } - - private FullDataSourceV2DTO createDataSourceDto() { try { - assert this.fullDataSource != null; - EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get(); - return FullDataSourceV2DTO.CreateFromDataSource(this.fullDataSource, compressionMode); + if (fullDataSource != null) + { + EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get(); + this.dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode); + } } catch (IOException e) { @@ -71,9 +60,9 @@ public class FullDataSourceResponseMessage extends TrackableMessage @Override public void encode0(ByteBuf out) { - if (this.writeOptional(out, this.fullDataSource)) + if (this.writeOptional(out, this.dataSourceDto)) { - this.dataSourceDtoSupplier.get().encode(out); + this.dataSourceDto.encode(out); } } @@ -88,7 +77,6 @@ public class FullDataSourceResponseMessage extends TrackableMessage public MoreObjects.ToStringHelper toStringHelper() { return super.toStringHelper() - .add("fullDataSource", this.fullDataSource) .add("dataSourceDto", this.dataSourceDto); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java index e4f786a84..63b200dfc 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java @@ -66,6 +66,11 @@ public class ThreadPoolUtil @Nullable public static ThreadPoolExecutor getCleanupExecutor() { return cleanupThreadPool; } + public static final DhThreadFactory NETWORK_COMPRESSION_THREAD_FACTORY = new DhThreadFactory("Network Compression", Thread.MIN_PRIORITY); + private static ConfigThreadPool networkCompressionThreadPool; + @Nullable + public static ThreadPoolExecutor getNetworkCompressionExecutor() { return networkCompressionThreadPool.executor; } + //======================// @@ -112,6 +117,7 @@ public class ThreadPoolUtil fileHandlerThreadPool = new ConfigThreadPool(FILE_HANDLER_THREAD_FACTORY, Config.Client.Advanced.MultiThreading.numberOfFileHandlerThreads, Config.Client.Advanced.MultiThreading.runTimeRatioForFileHandlerThreads, null); updatePropagatorThreadPool = new ConfigThreadPool(UPDATE_PROPAGATOR_THREAD_FACTORY, Config.Client.Advanced.MultiThreading.numberOfUpdatePropagatorThreads, Config.Client.Advanced.MultiThreading.runTimeRatioForUpdatePropagatorThreads, null); worldGenThreadPool = new ConfigThreadPool(WORLD_GEN_THREAD_FACTORY, Config.Client.Advanced.MultiThreading.numberOfWorldGenerationThreads, Config.Client.Advanced.MultiThreading.runTimeRatioForWorldGenerationThreads, null); + networkCompressionThreadPool = new ConfigThreadPool(NETWORK_COMPRESSION_THREAD_FACTORY, Config.Client.Advanced.MultiThreading.numberOfNetworkCompressionThreads, Config.Client.Advanced.MultiThreading.runTimeRatioForNetworkCompressionThreads, null); bufferUploaderThreadPool = ThreadUtil.makeSingleThreadPool(BUFFER_UPLOADER_THREAD_NAME); cleanupThreadPool = ThreadUtil.makeSingleThreadPool(CLEANUP_THREAD_NAME);