Use dedicated thread pool for data compression

This commit is contained in:
s809
2024-07-08 23:07:58 +05:00
parent 48a8cdc365
commit 84e90a7a9b
6 changed files with 120 additions and 75 deletions
@@ -1093,6 +1093,24 @@ public class Config
+ "")
.build();
public static final ConfigEntry<Integer> numberOfNetworkCompressionThreads = new ConfigEntry.Builder<Integer>()
.setServersideShortName("numberOfNetworkCompressionThreads")
.setMinDefaultMax(1,
ThreadPresetConfigEventHandler.getNetworkCompressionDefaultThreadCount(),
Runtime.getRuntime().availableProcessors())
.comment(""
+ "How many threads should be used when building LODs? \n"
+ "\n"
+ "These threads run when terrain is generated, when\n"
+ "certain graphics settings are changed, and when moving around the world. \n"
+ "\n"
+ THREAD_NOTE)
.build();
public static final ConfigEntry<Double> runTimeRatioForNetworkCompressionThreads = new ConfigEntry.Builder<Double>()
.setServersideShortName("runTimeRatioForNetworkCompressionThreads")
.setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getNetworkCompressionDefaultRunTimeRatio(), 1.0)
.comment(THREAD_RUN_TIME_RATIO_NOTE)
.build();
}
public static class GpuBuffers
@@ -128,6 +128,28 @@ public class ThreadPresetConfigEventHandler extends AbstractPresetConfigEventHan
}});
public static int getNetworkCompressionDefaultThreadCount() { return getThreadCountByPercent(0.1); }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Integer> networkCompressionThreadCount = new ConfigEntryWithPresetOptions<>(Config.Client.Advanced.MultiThreading.numberOfNetworkCompressionThreads,
new HashMap<EDhApiThreadPreset, Integer>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1);
this.put(EDhApiThreadPreset.LOW_IMPACT, getNetworkCompressionDefaultThreadCount());
this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.2));
this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.4));
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.6));
}});
public static double getNetworkCompressionDefaultRunTimeRatio() { return 0.25; }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Double> networkCompressionRunTime = new ConfigEntryWithPresetOptions<>(Config.Client.Advanced.MultiThreading.runTimeRatioForNetworkCompressionThreads,
new HashMap<EDhApiThreadPreset, Double>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.1);
this.put(EDhApiThreadPreset.LOW_IMPACT, getNetworkCompressionDefaultRunTimeRatio());
this.put(EDhApiThreadPreset.BALANCED, 0.5);
this.put(EDhApiThreadPreset.AGGRESSIVE, 0.75);
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0);
}});
//==============//
// constructors //
@@ -149,6 +171,9 @@ public class ThreadPresetConfigEventHandler extends AbstractPresetConfigEventHan
this.configList.add(this.lodBuilderThreadCount);
this.configList.add(this.lodBuilderRunTime);
this.configList.add(this.networkCompressionThreadCount);
this.configList.add(this.networkCompressionRunTime);
for (ConfigEntryWithPresetOptions<EDhApiThreadPreset, ?> config : this.configList)
{
@@ -38,6 +38,7 @@ import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper;
@@ -145,11 +146,17 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
return;
}
this.serverside.fullDataFileHandler.getAsync(msg.sectionPos).thenAccept(fullDataSource ->
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
return;
}
this.serverside.fullDataFileHandler.getAsync(msg.sectionPos).thenAcceptAsync(fullDataSource ->
{
rateLimiterSet.loginDataSyncRCLimiter.release();
msg.sendResponse(new FullDataSourceResponseMessage(fullDataSource));
});
}, executor);
}
}));
@@ -241,20 +248,29 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
// This semaphore is intentionally acquired forever
entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(entry.fullDataSource);
for (FullDataSourceRequestMessage msg : entry.requestMessages.values())
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
this.fullDataRequests.remove(msg.futureId);
ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer());
if (serverPlayerState == null)
{
continue;
}
serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release();
msg.sendResponse(response);
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
continue;
}
CompletableFuture.runAsync(() ->
{
FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(entry.fullDataSource);
for (FullDataSourceRequestMessage msg : entry.requestMessages.values())
{
this.fullDataRequests.remove(msg.futureId);
ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer());
if (serverPlayerState == null)
{
continue;
}
serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release();
msg.sendResponse(response);
}
}, executor);
}
}
@@ -266,22 +282,32 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
return this.getFullDataProvider().updateDataSourceAsync(data);
}
FullDataPartialUpdateMessage updateMessage = new FullDataPartialUpdateMessage(this.serverLevelWrapper, data);
for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers())
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
if (!serverPlayerState.config.isRealTimeUpdatesEnabled())
{
continue;
}
Vec3d playerPosition = serverPlayerState.serverPlayer().getPosition();
int distanceFromPlayer = DhSectionPos.getManhattanBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16;
if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() &&
distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius())
{
serverPlayerState.session.sendMessage(updateMessage);
}
LOGGER.warn("Unable to send FullDataPartialUpdateMessage - getNetworkCompressionExecutor() is null");
return this.getFullDataProvider().updateDataSourceAsync(data);
}
CompletableFuture.runAsync(() ->
{
FullDataPartialUpdateMessage updateMessage = new FullDataPartialUpdateMessage(this.serverLevelWrapper, data);
for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers())
{
if (!serverPlayerState.config.isRealTimeUpdatesEnabled())
{
continue;
}
Vec3d playerPosition = serverPlayerState.serverPlayer().getPosition();
int distanceFromPlayer = DhSectionPos.getManhattanBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16;
if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() &&
distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius())
{
serverPlayerState.session.sendMessage(updateMessage);
}
}
}, executor);
return this.getFullDataProvider().updateDataSourceAsync(data);
}
@@ -20,7 +20,6 @@
package com.seibel.distanthorizons.core.network.messages.fullData;
import com.google.common.base.MoreObjects;
import com.google.common.base.Suppliers;
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
@@ -30,11 +29,8 @@ import com.seibel.distanthorizons.core.network.INetworkObject;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
import io.netty.buffer.ByteBuf;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.jetbrains.annotations.Nullable;
import java.io.IOException;
import java.util.function.Supplier;
public class FullDataPartialUpdateMessage extends NetworkMessage implements ILevelRelatedMessage
{
@@ -42,35 +38,18 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev
@Override
public String getLevelName() { return this.levelName; }
// Encode only
@Nullable
private final FullDataSourceV2 fullDataSource;
private final Supplier<FullDataSourceV2DTO> dataSourceDtoSupplier = Suppliers.memoize(this::createDataSourceDto);
// Decode only
@Nullable
public FullDataSourceV2DTO dataSourceDto;
public FullDataPartialUpdateMessage() { this.fullDataSource = null; }
public FullDataPartialUpdateMessage(ILevelWrapper level, @NonNull FullDataSourceV2 fullDataSource)
public FullDataPartialUpdateMessage() { }
public FullDataPartialUpdateMessage(ILevelWrapper level, FullDataSourceV2 fullDataSource)
{
this.levelName = level.getDimensionName();
this.fullDataSource = fullDataSource;
}
@Override
public boolean warnWhenUnhandled() { return false; }
private FullDataSourceV2DTO createDataSourceDto()
{
try
{
assert this.fullDataSource != null;
EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get();
return FullDataSourceV2DTO.CreateFromDataSource(this.fullDataSource, compressionMode);
this.dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode);
}
catch (IOException e)
{
@@ -78,11 +57,15 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev
}
}
@Override
public boolean warnWhenUnhandled() { return false; }
@Override
public void encode(ByteBuf out)
{
this.writeString(this.levelName, out);
this.dataSourceDtoSupplier.get().encode(out);
this.dataSourceDto.encode(out);
}
@Override
@@ -98,7 +81,6 @@ public class FullDataPartialUpdateMessage extends NetworkMessage implements ILev
{
return super.toStringHelper()
.add("levelName", this.levelName)
.add("fullDataSource", this.fullDataSource)
.add("dataSourceDto", this.dataSourceDto);
}
@@ -20,7 +20,6 @@
package com.seibel.distanthorizons.core.network.messages.fullData;
import com.google.common.base.MoreObjects;
import com.google.common.base.Suppliers;
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
@@ -31,7 +30,6 @@ import io.netty.buffer.ByteBuf;
import org.jetbrains.annotations.Nullable;
import java.io.IOException;
import java.util.function.Supplier;
/**
* Response message, containing the requested full data source,
@@ -39,28 +37,19 @@ import java.util.function.Supplier;
*/
public class FullDataSourceResponseMessage extends TrackableMessage
{
// Encode only
@Nullable
private final FullDataSourceV2 fullDataSource;
private final Supplier<FullDataSourceV2DTO> dataSourceDtoSupplier = Suppliers.memoize(this::createDataSourceDto);
// Decode only
@Nullable
public FullDataSourceV2DTO dataSourceDto;
public FullDataSourceResponseMessage() { this(null); }
public FullDataSourceResponseMessage() { }
public FullDataSourceResponseMessage(@Nullable FullDataSourceV2 fullDataSource)
{
this.fullDataSource = fullDataSource;
}
private FullDataSourceV2DTO createDataSourceDto()
{
try
{
assert this.fullDataSource != null;
EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get();
return FullDataSourceV2DTO.CreateFromDataSource(this.fullDataSource, compressionMode);
if (fullDataSource != null)
{
EDhApiDataCompressionMode compressionMode = Config.Client.Advanced.LodBuilding.dataCompression.get();
this.dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode);
}
}
catch (IOException e)
{
@@ -71,9 +60,9 @@ public class FullDataSourceResponseMessage extends TrackableMessage
@Override
public void encode0(ByteBuf out)
{
if (this.writeOptional(out, this.fullDataSource))
if (this.writeOptional(out, this.dataSourceDto))
{
this.dataSourceDtoSupplier.get().encode(out);
this.dataSourceDto.encode(out);
}
}
@@ -88,7 +77,6 @@ public class FullDataSourceResponseMessage extends TrackableMessage
public MoreObjects.ToStringHelper toStringHelper()
{
return super.toStringHelper()
.add("fullDataSource", this.fullDataSource)
.add("dataSourceDto", this.dataSourceDto);
}
@@ -66,6 +66,11 @@ public class ThreadPoolUtil
@Nullable
public static ThreadPoolExecutor getCleanupExecutor() { return cleanupThreadPool; }
public static final DhThreadFactory NETWORK_COMPRESSION_THREAD_FACTORY = new DhThreadFactory("Network Compression", Thread.MIN_PRIORITY);
private static ConfigThreadPool networkCompressionThreadPool;
@Nullable
public static ThreadPoolExecutor getNetworkCompressionExecutor() { return networkCompressionThreadPool.executor; }
//======================//
@@ -112,6 +117,7 @@ public class ThreadPoolUtil
fileHandlerThreadPool = new ConfigThreadPool(FILE_HANDLER_THREAD_FACTORY, Config.Client.Advanced.MultiThreading.numberOfFileHandlerThreads, Config.Client.Advanced.MultiThreading.runTimeRatioForFileHandlerThreads, null);
updatePropagatorThreadPool = new ConfigThreadPool(UPDATE_PROPAGATOR_THREAD_FACTORY, Config.Client.Advanced.MultiThreading.numberOfUpdatePropagatorThreads, Config.Client.Advanced.MultiThreading.runTimeRatioForUpdatePropagatorThreads, null);
worldGenThreadPool = new ConfigThreadPool(WORLD_GEN_THREAD_FACTORY, Config.Client.Advanced.MultiThreading.numberOfWorldGenerationThreads, Config.Client.Advanced.MultiThreading.runTimeRatioForWorldGenerationThreads, null);
networkCompressionThreadPool = new ConfigThreadPool(NETWORK_COMPRESSION_THREAD_FACTORY, Config.Client.Advanced.MultiThreading.numberOfNetworkCompressionThreads, Config.Client.Advanced.MultiThreading.runTimeRatioForNetworkCompressionThreads, null);
bufferUploaderThreadPool = ThreadUtil.makeSingleThreadPool(BUFFER_UPLOADER_THREAD_NAME);
cleanupThreadPool = ThreadUtil.makeSingleThreadPool(CLEANUP_THREAD_NAME);