From 6135bdf67c81f46e5c646c2fc14160aeec796213 Mon Sep 17 00:00:00 2001 From: James Seibel Date: Fri, 27 Dec 2024 08:46:32 -0600 Subject: [PATCH] Only have a single thread config This means that CPU use will be a lot more consistent since all internal thread pools share a single thread count semaphore. --- .../client/IDhApiMultiThreadingConfig.java | 34 +- .../client/DhApiMultiThreadingConfig.java | 12 +- .../core/api/internal/SharedApi.java | 2 +- .../distanthorizons/core/config/Config.java | 146 ++------- .../ThreadPresetConfigEventHandler.java | 130 +------- .../FullDataSourceProviderV2.java | 301 +++++++++--------- .../GeneratedFullDataSourceProvider.java | 2 +- .../core/generation/WorldGenerationQueue.java | 2 +- .../core/logging/f3/F3Screen.java | 8 +- .../distanthorizons/core/util/ThreadUtil.java | 13 +- .../core/util/threading/ConfigThreadPool.java | 28 +- .../util/threading/PrioritySemaphore.java | 161 ++++++++++ .../RateLimitedThreadPoolExecutor.java | 68 ++-- .../core/util/threading/ThreadPoolUtil.java | 106 +++--- .../assets/distanthorizons/lang/en_us.json | 52 +-- 15 files changed, 502 insertions(+), 563 deletions(-) create mode 100644 core/src/main/java/com/seibel/distanthorizons/core/util/threading/PrioritySemaphore.java diff --git a/api/src/main/java/com/seibel/distanthorizons/api/interfaces/config/client/IDhApiMultiThreadingConfig.java b/api/src/main/java/com/seibel/distanthorizons/api/interfaces/config/client/IDhApiMultiThreadingConfig.java index 469569e2b..9e4363162 100644 --- a/api/src/main/java/com/seibel/distanthorizons/api/interfaces/config/client/IDhApiMultiThreadingConfig.java +++ b/api/src/main/java/com/seibel/distanthorizons/api/interfaces/config/client/IDhApiMultiThreadingConfig.java @@ -26,33 +26,31 @@ import com.seibel.distanthorizons.api.interfaces.config.IDhApiConfigGroup; * Distant Horizons' threading configuration. * * @author James Seibel - * @version 2023-10-29 + * @version 2024-12-26 * @since API 1.0.0 */ public interface IDhApiMultiThreadingConfig extends IDhApiConfigGroup { /** - * Defines how many world generator threads are used to generate - * terrain outside Minecraft's vanilla render distance.
- *
- * If the number of threads is less than 1 it will be treated as a percentage - * representing how often the single thread will actively generate terrain. + * Defines how many threads Distant Horizons + * uses. + * + * @since API 4.0.0 */ - IDhApiConfigValue worldGeneratorThreads(); - - /** Defines how many file handler threads are used. */ - IDhApiConfigValue fileHandlerThreads(); + IDhApiConfigValue threadCount(); /** - * Defines how many threads are used - * to build LODs.

- * - * This includes:
- * - lighting
- * - Chunk -> LOD conversion
- * - Buffer generation
+ * Defines how many long Distant Horizons + * threads will spend running vs sleeping. + * This is helpful when reducing the CPU + * load on low end CPUs. + * 1.0 = 100% uptime + * 0.5 = 50% uptime + * 0.1 = 10% uptime + * + * @since API 4.0.0 */ - IDhApiConfigValue lodBuilderThreads(); + IDhApiConfigValue threadRuntimeRatio(); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/api/external/methods/config/client/DhApiMultiThreadingConfig.java b/core/src/main/java/com/seibel/distanthorizons/core/api/external/methods/config/client/DhApiMultiThreadingConfig.java index a16921cac..e9e827633 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/api/external/methods/config/client/DhApiMultiThreadingConfig.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/api/external/methods/config/client/DhApiMultiThreadingConfig.java @@ -33,15 +33,13 @@ public class DhApiMultiThreadingConfig implements IDhApiMultiThreadingConfig @Override - public IDhApiConfigValue worldGeneratorThreads() - { return new DhApiConfigValue(Config.Common.MultiThreading.numberOfWorldGenerationThreads); } + public IDhApiConfigValue threadCount() + { return new DhApiConfigValue(Config.Common.MultiThreading.numberOfThreads); } @Override - public IDhApiConfigValue fileHandlerThreads() - { return new DhApiConfigValue(Config.Common.MultiThreading.numberOfFileHandlerThreads); } + public IDhApiConfigValue threadRuntimeRatio() + { return new DhApiConfigValue(Config.Common.MultiThreading.threadRunTimeRatio); } + - @Override - public IDhApiConfigValue lodBuilderThreads() - { return new DhApiConfigValue(Config.Common.MultiThreading.numberOfLodBuilderThreads); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java b/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java index c44143d86..a57648beb 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/api/internal/SharedApi.java @@ -283,7 +283,7 @@ public class SharedApi } UPDATE_POS_MANAGER.maxSize = MAX_UPDATING_CHUNK_COUNT_PER_THREAD - * Config.Common.MultiThreading.numberOfLodBuilderThreads.get() + * Config.Common.MultiThreading.numberOfThreads.get() * (playerCount + 1); UpdateChunkData updateData = new UpdateChunkData(chunkWrapper, neighbourChunkList, dhLevel, lightUpdateOnly); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java index c707c9988..b22fa91a4 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/config/Config.java @@ -1340,139 +1340,29 @@ public class Config public static class MultiThreading { - public static final String THREAD_NOTE = "" - + "Multi-threading Note: \n" - + "If the total thread count in Distant Horizon's config is more threads than your CPU has cores, \n" - + "CPU performance may suffer if Distant Horizons has a lot to load or generate. \n" - + "This can be an issue when first loading into a world, when flying, and/or when generating new terrain."; - - public static final String THREAD_RUN_TIME_RATIO_NOTE = "" - + "A value between 1.0 and 0.0 that represents the percentage \n" - + "of time each thread can run before going idle. \n" - + "\n" - + "This can be used to reduce CPU usage if the thread count \n" - + "is already set to 1 for the given option, or more finely \n" - + "tune CPU performance."; - - - public static final ConfigEntry numberOfWorldGenerationThreads = new ConfigEntry.Builder() - .setServersideShortName("numberOfWorldGenerationThreads") + public static final ConfigEntry numberOfThreads = new ConfigEntry.Builder() + .setServersideShortName("numberOfThreads") .setMinDefaultMax(1, - ThreadPresetConfigEventHandler.getWorldGenDefaultThreadCount(), + ThreadPresetConfigEventHandler.getDefaultThreadCount(), Runtime.getRuntime().availableProcessors()) .comment("" - + "How many threads should be used when generating LOD \n" - + "chunks outside the normal render distance? \n" - + "\n" - + "If you experience stuttering when generating distant LODs, \n" - + "decrease this number. \n" - + "If you want to increase LOD \n" - + "generation speed, increase this number. \n" - + "\n" - + THREAD_NOTE) - .build(); - public static final ConfigEntry runTimeRatioForWorldGenerationThreads = new ConfigEntry.Builder() - .setServersideShortName("runTimeRatioForWorldGenerationThreads") - .setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getWorldGenDefaultRunTimeRatio(), 1.0) - .comment(THREAD_RUN_TIME_RATIO_NOTE) - .build(); - - public static final ConfigEntry numberOfFileHandlerThreads = new ConfigEntry.Builder() - .setServersideShortName("numberOfFileHandlerThreads") - .setMinDefaultMax(1, - ThreadPresetConfigEventHandler.getFileHandlerDefaultThreadCount(), - Runtime.getRuntime().availableProcessors()) - .comment("" - + "How many threads should be used when reading/writing LOD data to/from disk? \n" - + "\n" - + "Increasing this number will cause LODs to load in faster, \n" - + "but may cause lag when loading a new world or when \n" - + "quickly flying through existing LODs. \n" - + "\n" - + THREAD_NOTE) - .build(); - public static final ConfigEntry runTimeRatioForFileHandlerThreads = new ConfigEntry.Builder() - .setServersideShortName("runTimeRatioForFileHandlerThreads") - .setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getFileHandlerDefaultRunTimeRatio(), 1.0) - .comment(THREAD_RUN_TIME_RATIO_NOTE) - .build(); - - public static final ConfigEntry numberOfUpdatePropagatorThreads = new ConfigEntry.Builder() - .setServersideShortName("numberOfUpdatePropagatorThreads") - .setMinDefaultMax(1, - ThreadPresetConfigEventHandler.getUpdatePropagatorDefaultThreadCount(), - Runtime.getRuntime().availableProcessors()) - .comment("" - + "How many threads should be used when applying LOD updates? \n" - + "An LOD update is the operation of down-sampling a high detail LOD \n" - + "into a lower detail one.\n" - + "\n" - + "This config can have a much higher number of threads \n" - + "assigned and much lower run time ratio vs other thread pools \n" - + "because the amount of time any particular thread may run is relatively low.\n" - + "\n" - + "This is because LOD updating only only partially thread safe, \n" - + "so between 40% and 60% of the time a given thread may end up \n" - + "waiting on another thread to finish updating the same LOD it also wants\n" - + "to work on.\n" - + "\n" - + THREAD_NOTE) - .build(); - public static final ConfigEntry runTimeRatioForUpdatePropagatorThreads = new ConfigEntry.Builder() - .setServersideShortName("runTimeRatioForUpdatePropagatorThreads") - .setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getUpdatePropagatorDefaultRunTimeRatio(), 1.0) - .comment(THREAD_RUN_TIME_RATIO_NOTE) - .build(); - - public static final ConfigEntry numberOfNetworkCompressionThreads = new ConfigEntry.Builder() - .setServersideShortName("numberOfNetworkCompressionThreads") - .setMinDefaultMax(1, - ThreadPresetConfigEventHandler.getNetworkCompressionDefaultThreadCount(), - Runtime.getRuntime().availableProcessors()) - .comment("" - + "How many threads should be used when (de)compressing LODs \n" - + "that are received/sent over the network?\n" - + "\n" - + "This pool doesn't do anything in singleplayer or when connected \n" - + "to a server that doesn't support DH networking. \n" - + "\n" - + THREAD_NOTE) - .build(); - public static final ConfigEntry runTimeRatioForNetworkCompressionThreads = new ConfigEntry.Builder() - .setServersideShortName("runTimeRatioForNetworkCompressionThreads") - .setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getNetworkCompressionDefaultRunTimeRatio(), 1.0) - .comment(THREAD_RUN_TIME_RATIO_NOTE) - .build(); - - public static final ConfigEntry numberOfLodBuilderThreads = new ConfigEntry.Builder() - .setServersideShortName("numberOfLodBuilderThreads") - .setMinDefaultMax(1, - ThreadPresetConfigEventHandler.getLodBuilderDefaultThreadCount(), - Runtime.getRuntime().availableProcessors()) - .comment("" - + "How many threads should be used when building LODs? \n" - + "\n" - + "These threads run when terrain is generated, when\n" - + "certain graphics settings are changed, and when moving around the world. \n" - + "\n" - + THREAD_NOTE) - .build(); - public static final ConfigEntry runTimeRatioForLodBuilderThreads = new ConfigEntry.Builder() - .setServersideShortName("runTimeRatioForLodBuilderThreads") - .setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getLodBuilderDefaultRunTimeRatio(), 1.0) - .comment(THREAD_RUN_TIME_RATIO_NOTE) - .build(); - - public static final ConfigEntry enableLodBuilderThreadLimiting = new ConfigEntry.Builder() - .setServersideShortName("enableLodBuilderThreadLimiting") - .set(true) - .comment("" - + "Should only be disabled if deadlock occurs and LODs refuse to update. \n" - + "This will cause CPU usage to drastically increase for the Lod Builder threads. \n" - + "\n" - + "Note that if deadlock did occur restarting MC may be necessary to stop the locked threads. \n" + + "How many threads should be used by Distant Horizons? \n" + "") .build(); + public static final ConfigEntry threadRunTimeRatio = new ConfigEntry.Builder() + .setServersideShortName("threadRunTimeRatio") + .setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getDefaultRunTimeRatio(), 1.0) + .comment("" + + "A value between 1.0 and 0.0 that represents the percentage \n" + + "of time each thread can run before going idle. \n" + + "\n" + + "This can be used to reduce CPU usage if the thread count \n" + + "is already set to 1 for the given option, or more finely \n" + + "tune CPU performance. \n" + + "") + .build(); + + } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/config/eventHandlers/presets/ThreadPresetConfigEventHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/config/eventHandlers/presets/ThreadPresetConfigEventHandler.java index 842e53070..350910468 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/config/eventHandlers/presets/ThreadPresetConfigEventHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/config/eventHandlers/presets/ThreadPresetConfigEventHandler.java @@ -40,112 +40,23 @@ public class ThreadPresetConfigEventHandler extends AbstractPresetConfigEventHan private static final Logger LOGGER = LogManager.getLogger(); - - public static int getWorldGenDefaultThreadCount() { return getThreadCountByPercent(0.1); } - private final ConfigEntryWithPresetOptions worldGenThreadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfWorldGenerationThreads, + public static int getDefaultThreadCount() { return getThreadCountByPercent(0.5); } + private final ConfigEntryWithPresetOptions threadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfThreads, new HashMap() {{ - this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1); - this.put(EDhApiThreadPreset.LOW_IMPACT, getWorldGenDefaultThreadCount()); - this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.15)); - this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.25)); - this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.5)); + this.put(EDhApiThreadPreset.MINIMAL_IMPACT, getThreadCountByPercent(0.1)); + this.put(EDhApiThreadPreset.LOW_IMPACT, getThreadCountByPercent(0.25)); + this.put(EDhApiThreadPreset.BALANCED, getDefaultThreadCount()); + this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.75)); + this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(1.0)); }}); - public static double getWorldGenDefaultRunTimeRatio() { return 0.5; } - private final ConfigEntryWithPresetOptions worldGenRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.runTimeRatioForWorldGenerationThreads, - new HashMap() - {{ - this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.1); - this.put(EDhApiThreadPreset.LOW_IMPACT, getWorldGenDefaultRunTimeRatio()); - this.put(EDhApiThreadPreset.BALANCED, 0.75); - this.put(EDhApiThreadPreset.AGGRESSIVE, 1.0); - this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0); - }}); - - - public static int getFileHandlerDefaultThreadCount() { return getThreadCountByPercent(0.1); } - private final ConfigEntryWithPresetOptions fileHandlerThreadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfFileHandlerThreads, - new HashMap() - {{ - this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1); - this.put(EDhApiThreadPreset.LOW_IMPACT, getFileHandlerDefaultThreadCount()); - this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.2)); - this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.2)); - this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.5)); - }}); - public static double getFileHandlerDefaultRunTimeRatio() { return 1.0; } - private final ConfigEntryWithPresetOptions fileHandlerRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.runTimeRatioForFileHandlerThreads, + public static double getDefaultRunTimeRatio() { return 1.0; } + private final ConfigEntryWithPresetOptions threadRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.threadRunTimeRatio, new HashMap() {{ this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.5); - this.put(EDhApiThreadPreset.LOW_IMPACT, getFileHandlerDefaultRunTimeRatio()); - this.put(EDhApiThreadPreset.BALANCED, 1.0); - this.put(EDhApiThreadPreset.AGGRESSIVE, 1.0); - this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0); - }}); - - - public static int getUpdatePropagatorDefaultThreadCount() { return getThreadCountByPercent(0.10); } - private final ConfigEntryWithPresetOptions UpdatePropagatorThreadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfUpdatePropagatorThreads, - new HashMap() - {{ - this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1); - this.put(EDhApiThreadPreset.LOW_IMPACT, getUpdatePropagatorDefaultThreadCount()); - this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.25)); - this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.50)); - this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.75)); - }}); - public static double getUpdatePropagatorDefaultRunTimeRatio() { return 0.25; } - private final ConfigEntryWithPresetOptions UpdatePropagatorRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.runTimeRatioForUpdatePropagatorThreads, - new HashMap() - {{ - this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.1); - this.put(EDhApiThreadPreset.LOW_IMPACT, getUpdatePropagatorDefaultRunTimeRatio()); - this.put(EDhApiThreadPreset.BALANCED, 0.50); - this.put(EDhApiThreadPreset.AGGRESSIVE, 0.75); - this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0); - }}); - - - public static int getLodBuilderDefaultThreadCount() { return getThreadCountByPercent(0.1); } - private final ConfigEntryWithPresetOptions lodBuilderThreadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfLodBuilderThreads, - new HashMap() - {{ - this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1); - this.put(EDhApiThreadPreset.LOW_IMPACT, getLodBuilderDefaultThreadCount()); - this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.2)); - this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.4)); - this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.6)); - }}); - public static double getLodBuilderDefaultRunTimeRatio() { return 0.25; } - private final ConfigEntryWithPresetOptions lodBuilderRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.runTimeRatioForLodBuilderThreads, - new HashMap() - {{ - this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.1); - this.put(EDhApiThreadPreset.LOW_IMPACT, getLodBuilderDefaultRunTimeRatio()); - this.put(EDhApiThreadPreset.BALANCED, 0.5); - this.put(EDhApiThreadPreset.AGGRESSIVE, 0.75); - this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0); - }}); - - - public static int getNetworkCompressionDefaultThreadCount() { return getThreadCountByPercent(0.3); } - private final ConfigEntryWithPresetOptions networkCompressionThreadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfNetworkCompressionThreads, - new HashMap() - {{ - this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1); - this.put(EDhApiThreadPreset.LOW_IMPACT, getNetworkCompressionDefaultThreadCount()); - this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.4)); - this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.6)); - this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.8)); - }}); - public static double getNetworkCompressionDefaultRunTimeRatio() { return 0.5; } - private final ConfigEntryWithPresetOptions networkCompressionRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.runTimeRatioForNetworkCompressionThreads, - new HashMap() - {{ - this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.25); - this.put(EDhApiThreadPreset.LOW_IMPACT, getNetworkCompressionDefaultRunTimeRatio()); - this.put(EDhApiThreadPreset.BALANCED, 0.75); + this.put(EDhApiThreadPreset.LOW_IMPACT, 1.0); + this.put(EDhApiThreadPreset.BALANCED, getDefaultRunTimeRatio()); this.put(EDhApiThreadPreset.AGGRESSIVE, 1.0); this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0); }}); @@ -160,21 +71,8 @@ public class ThreadPresetConfigEventHandler extends AbstractPresetConfigEventHan private ThreadPresetConfigEventHandler() { // add each config used by this preset - this.configList.add(this.worldGenThreadCount); - this.configList.add(this.worldGenRunTime); - - this.configList.add(this.fileHandlerThreadCount); - this.configList.add(this.fileHandlerRunTime); - - this.configList.add(this.UpdatePropagatorThreadCount); - this.configList.add(this.UpdatePropagatorRunTime); - - this.configList.add(this.lodBuilderThreadCount); - this.configList.add(this.lodBuilderRunTime); - - this.configList.add(this.networkCompressionThreadCount); - this.configList.add(this.networkCompressionRunTime); - + this.configList.add(this.threadCount); + this.configList.add(this.threadRunTime); for (ConfigEntryWithPresetOptions config : this.configList) { @@ -228,4 +126,6 @@ public class ThreadPresetConfigEventHandler extends AbstractPresetConfigEventHan @Override protected EDhApiThreadPreset getCustomPresetEnum() { return EDhApiThreadPreset.CUSTOM; } + + } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java index 82019ac98..2327fbb36 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/FullDataSourceProviderV2.java @@ -39,6 +39,7 @@ import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo; import com.seibel.distanthorizons.core.sql.repo.FullDataSourceV2Repo; import com.seibel.distanthorizons.core.util.ThreadUtil; import com.seibel.distanthorizons.core.util.objects.DataCorruptedException; +import com.seibel.distanthorizons.core.util.threading.PrioritySemaphore; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper; import it.unimi.dsi.fastutil.longs.LongArrayList; @@ -67,14 +68,13 @@ public class FullDataSourceProviderV2 protected static final int NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD = 50; /** how many parent update tasks can be in the queue at once */ - protected static final int MAX_UPDATE_TASK_COUNT = NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD * Config.Common.MultiThreading.numberOfFileHandlerThreads.get(); + protected static final int MAX_UPDATE_TASK_COUNT = NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD * Config.Common.MultiThreading.numberOfThreads.get(); /** indicates how long the update queue thread should wait between queuing ticks */ protected static final int UPDATE_QUEUE_THREAD_DELAY_IN_MS = 250; /** how many data sources should be pulled down for migration at once */ private static final int MIGRATION_BATCH_COUNT = NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD; - private static final String MIGRATION_THREAD_NAME_PREFIX = "Full Data Migration Thread: "; /** * 5 minutes
* This should be much longer than any update should take. This is just @@ -83,8 +83,8 @@ public class FullDataSourceProviderV2 private static final int MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS = 5 * 60 * 1_000; - protected final ThreadPoolExecutor migrationThreadPool; - /** + + /** * Interrupting the migration thread pool doesn't work well and may corrupt the database * vs gracefully shutting down the thread ourselves. */ @@ -132,8 +132,16 @@ public class FullDataSourceProviderV2 String levelId = level.getLevelWrapper().getDhIdentifier(); // start migrating any legacy data sources present in the background - this.migrationThreadPool = ThreadUtil.makeRateLimitedThreadPool(1, MIGRATION_THREAD_NAME_PREFIX + "["+levelId+"]", Config.Common.MultiThreading.runTimeRatioForUpdatePropagatorThreads.get(), Thread.MIN_PRIORITY, (Semaphore) null); - this.migrationThreadPool.execute(this::convertLegacyDataSources); + ThreadPoolExecutor executor = ThreadPoolUtil.getFullDataMigrationExecutor(); + if (executor != null) + { + executor.execute(this::convertLegacyDataSources); + } + else + { + // shouldn't happen, but just in case + LOGGER.error("Unable to start migration for level: ["+levelId+"] due to missing executor."); + } // update propagation doesn't need to be run on the server since only the highest detail level is needed this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Parent Update Queue [" + levelId + "]"); @@ -339,170 +347,180 @@ public class FullDataSourceProviderV2 private void convertLegacyDataSources() { - String levelId = this.level.getLevelWrapper().getDhIdentifier(); - LOGGER.info("Attempting to migrate data sources for: ["+levelId+"]-["+this.saveDir+"]..."); - - - - //============================// - // delete unused data sources // - //============================// - - // this could be done all at once via SQL, - // but doing it in chunks prevents locking the database for long periods of time - long unusedCount = 0; - long totalDeleteCount = this.legacyFileHandler.repo.getUnusedDataSourceCount(); - if (totalDeleteCount != 0) + try { - // this should only be shown once per session but should be shown during - // either when the deletion or migration phases start - this.showMigrationStartMessage(); + String levelId = this.level.getLevelWrapper().getDhIdentifier(); + LOGGER.info("Attempting to migrate data sources for: [" + levelId + "]-[" + this.saveDir + "]..."); + this.migrationThreadRunning.set(true); - LOGGER.info("deleting [" + levelId + "] - ["+totalDeleteCount+"] unused data sources..."); - this.legacyDeletionCount = totalDeleteCount; - ArrayList unusedDataPosList = this.legacyFileHandler.repo.getUnusedDataSourcePositionStringList(50); - while (unusedDataPosList.size() != 0) + //============================// + // delete unused data sources // + //============================// + + // this could be done all at once via SQL, + // but doing it in chunks prevents locking the database for long periods of time + long unusedCount = 0; + long totalDeleteCount = this.legacyFileHandler.repo.getUnusedDataSourceCount(); + if (totalDeleteCount != 0) { - unusedCount += unusedDataPosList.size(); - this.legacyDeletionCount -= unusedDataPosList.size(); + // this should only be shown once per session but should be shown during + // either when the deletion or migration phases start + this.showMigrationStartMessage(); - long startTime = System.currentTimeMillis(); + LOGGER.info("deleting [" + levelId + "] - [" + totalDeleteCount + "] unused data sources..."); + this.legacyDeletionCount = totalDeleteCount; - // delete batch and get next batch - this.legacyFileHandler.repo.deleteUnusedLegacyData(unusedDataPosList); - unusedDataPosList = this.legacyFileHandler.repo.getUnusedDataSourcePositionStringList(50); + ArrayList unusedDataPosList = this.legacyFileHandler.repo.getUnusedDataSourcePositionStringList(50); + while (unusedDataPosList.size() != 0) + { + unusedCount += unusedDataPosList.size(); + this.legacyDeletionCount -= unusedDataPosList.size(); + + + long startTime = System.currentTimeMillis(); + + // delete batch and get next batch + this.legacyFileHandler.repo.deleteUnusedLegacyData(unusedDataPosList); + unusedDataPosList = this.legacyFileHandler.repo.getUnusedDataSourcePositionStringList(50); + + long endStart = System.currentTimeMillis(); + long deleteTime = endStart - startTime; + LOGGER.info("Deleting [" + levelId + "] - [" + unusedCount + "/" + totalDeleteCount + "] in [" + deleteTime + "]ms ..."); + + + // a slight delay is added to prevent accidentally locking the database when deleting a lot of rows + // (that shouldn't be the case since we're using WAL journaling, but just in case) + try + { + // use the delete time so we don't make powerful computers wait super long + // and weak computers wait no time at all + Thread.sleep(deleteTime / 2); + } + catch (InterruptedException ignore) + { + } + } + LOGGER.info("Done deleting [" + levelId + "] - [" + totalDeleteCount + "] unused data sources."); - long endStart = System.currentTimeMillis(); - long deleteTime = endStart - startTime; - LOGGER.info("Deleting [" + levelId + "] - [" + unusedCount + "/" + totalDeleteCount + "] in ["+deleteTime+"]ms ..."); + } + + + + //===========// + // migration // + //===========// + + long totalMigrationCount = this.legacyFileHandler.getDataSourceMigrationCount(); + this.migrationCount = totalMigrationCount; + LOGGER.info("Found [" + totalMigrationCount + "] data sources that need migration."); + + ArrayList legacyDataSourceList = this.legacyFileHandler.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT); + if (!legacyDataSourceList.isEmpty()) + { + this.showMigrationStartMessage(); - - // a slight delay is added to prevent accidentally locking the database when deleting a lot of rows - // (that shouldn't be the case since we're using WAL journaling, but just in case) try { - // use the delete time so we don't make powerful computers wait super long - // and weak computers wait no time at all - Thread.sleep(deleteTime / 2); - } - catch (InterruptedException ignore){} - } - LOGGER.info("Done deleting [" + levelId + "] - ["+totalDeleteCount+"] unused data sources."); - - } - - - - //===========// - // migration // - //===========// - - long totalMigrationCount = this.legacyFileHandler.getDataSourceMigrationCount(); - this.migrationCount = totalMigrationCount; - LOGGER.info("Found ["+totalMigrationCount+"] data sources that need migration."); - - ArrayList legacyDataSourceList = this.legacyFileHandler.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT); - if (!legacyDataSourceList.isEmpty()) - { - this.showMigrationStartMessage(); - - try - { - // keep going until every data source has been migrated - int progressCount = 0; - while (!legacyDataSourceList.isEmpty() && this.migrationThreadRunning.get()) - { - LOGGER.info("Migrating [" + levelId + "] - [" + progressCount + "/" + totalMigrationCount + "]..."); - - ArrayList> updateFutureList = new ArrayList<>(); - for (int i = 0; i < legacyDataSourceList.size() && this.migrationThreadRunning.get(); i++) + // keep going until every data source has been migrated + int progressCount = 0; + while (!legacyDataSourceList.isEmpty() && this.migrationThreadRunning.get()) { - FullDataSourceV1 legacyDataSource = legacyDataSourceList.get(i); + LOGGER.info("Migrating [" + levelId + "] - [" + progressCount + "/" + totalMigrationCount + "]..."); + + ArrayList> updateFutureList = new ArrayList<>(); + for (int i = 0; i < legacyDataSourceList.size() && this.migrationThreadRunning.get(); i++) + { + FullDataSourceV1 legacyDataSource = legacyDataSourceList.get(i); + + try + { + // convert the legacy data source to the new format, + // this is a relatively cheap operation + FullDataSourceV2 newDataSource = FullDataSourceV2.createFromLegacyDataSourceV1(legacyDataSource); + newDataSource.applyToParent = true; + + // the actual update process can be moderately expensive due to having to update + // the render data along with the full data, so running it async on the update threads gains us a good bit of speed + CompletableFuture future = this.updateDataSourceAsync(newDataSource); + updateFutureList.add(future); + future.thenRun(() -> + { + // after the update finishes the legacy data source can be safely deleted + this.legacyFileHandler.repo.deleteWithKey(legacyDataSource.getPos()); + + try + { + newDataSource.close(); + } + catch (Exception ignore) + { + } + }); + } + catch (Exception e) + { + Long migrationPos = legacyDataSource.getPos(); + LOGGER.warn("Unexpected issue migrating data source at pos " + migrationPos + ". Error: " + e.getMessage(), e); + this.legacyFileHandler.markMigrationFailed(migrationPos); + } + } + try { - // convert the legacy data source to the new format, - // this is a relatively cheap operation - FullDataSourceV2 newDataSource = FullDataSourceV2.createFromLegacyDataSourceV1(legacyDataSource); - newDataSource.applyToParent = true; - - // the actual update process can be moderately expensive due to having to update - // the render data along with the full data, so running it async on the update threads gains us a good bit of speed - CompletableFuture future = this.updateDataSourceAsync(newDataSource); - updateFutureList.add(future); - future.thenRun(() -> - { - // after the update finishes the legacy data source can be safely deleted - this.legacyFileHandler.repo.deleteWithKey(legacyDataSource.getPos()); - - try - { - newDataSource.close(); - } - catch (Exception ignore) { } - }); + // wait for each thread to finish updating + CompletableFuture combinedFutures = CompletableFuture.allOf(updateFutureList.toArray(new CompletableFuture[0])); + combinedFutures.get(MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS); } - catch (Exception e) + catch (InterruptedException | TimeoutException e) { - Long migrationPos = legacyDataSource.getPos(); - LOGGER.warn("Unexpected issue migrating data source at pos " + migrationPos + ". Error: " + e.getMessage(), e); - this.legacyFileHandler.markMigrationFailed(migrationPos); + LOGGER.warn("Migration update timed out after [" + MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS + "] milliseconds. Migration will re-try the same positions again in a moment..", e); } + catch (ExecutionException e) + { + LOGGER.warn("Migration update failed. Migration will re-try the same positions again. Error:" + e.getMessage(), e); + } + + legacyDataSourceList = this.legacyFileHandler.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT); + + progressCount += legacyDataSourceList.size(); + this.migrationCount -= legacyDataSourceList.size(); } - - - try - { - // wait for each thread to finish updating - CompletableFuture combinedFutures = CompletableFuture.allOf(updateFutureList.toArray(new CompletableFuture[0])); - combinedFutures.get(MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS); - } - catch (InterruptedException | TimeoutException e) - { - LOGGER.warn("Migration update timed out after [" + MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS + "] milliseconds. Migration will re-try the same positions again in a moment..", e); - } - catch (ExecutionException e) - { - LOGGER.warn("Migration update failed. Migration will re-try the same positions again. Error:" + e.getMessage(), e); - } - - legacyDataSourceList = this.legacyFileHandler.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT); - - progressCount += legacyDataSourceList.size(); - this.migrationCount -= legacyDataSourceList.size(); } - } - catch (Exception e) - { - LOGGER.info("migration stopped due to error for: ["+levelId+"]-["+this.saveDir+"], error: ["+e.getMessage()+"].", e); - this.showMigrationEndMessage(false); - this.migrationStoppedWithError = true; - } - finally - { - if (this.migrationThreadRunning.get()) + catch (Exception e) { - LOGGER.info("migration complete for: ["+levelId+"]-["+this.saveDir+"]."); - this.showMigrationEndMessage(true); - this.migrationCount = 0; - } - else - { - LOGGER.info("migration stopped for: ["+levelId+"]-["+this.saveDir+"]."); + LOGGER.info("migration stopped due to error for: [" + levelId + "]-[" + this.saveDir + "], error: [" + e.getMessage() + "].", e); this.showMigrationEndMessage(false); this.migrationStoppedWithError = true; } + finally + { + if (this.migrationThreadRunning.get()) + { + LOGGER.info("migration complete for: [" + levelId + "]-[" + this.saveDir + "]."); + this.showMigrationEndMessage(true); + this.migrationCount = 0; + } + else + { + LOGGER.info("migration stopped for: [" + levelId + "]-[" + this.saveDir + "]."); + this.showMigrationEndMessage(false); + this.migrationStoppedWithError = true; + } + } + } + else + { + LOGGER.info("No migration necessary."); } } - else + finally { - LOGGER.info("No migration necessary."); + this.migrationThreadRunning.set(false); } - - this.migrationThreadRunning.set(false); } public long getLegacyDeletionCount() { return this.legacyDeletionCount; } @@ -657,7 +675,6 @@ public class FullDataSourceProviderV2 this.legacyFileHandler.close(); this.migrationThreadRunning.set(false); - this.migrationThreadPool.shutdown(); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java index 6e2a77d00..91f74da6b 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataSourceProvider.java @@ -212,7 +212,7 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im } - int maxQueueCount = MAX_WORLD_GEN_REQUESTS_PER_THREAD * Config.Common.MultiThreading.numberOfWorldGenerationThreads.get(); + int maxQueueCount = MAX_WORLD_GEN_REQUESTS_PER_THREAD * Config.Common.MultiThreading.numberOfThreads.get(); if (this.delayedFullDataSourceSaveCache.getUnsavedCount() >= maxQueueCount) { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java index f0c9811f2..e236674c8 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java @@ -236,7 +236,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb return true; } - int worldGenThreadCount = Math.max(Config.Common.MultiThreading.numberOfWorldGenerationThreads.get(), 1); + int worldGenThreadCount = Math.max(Config.Common.MultiThreading.numberOfThreads.get(), 1); int maxWorldGenTaskCount = worldGenThreadCount * MAX_QUEUED_TASKS_PER_THREAD; return executor.getQueue().size() > maxWorldGenTaskCount; } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java b/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java index a9f9374d9..29fa1ca94 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/logging/f3/F3Screen.java @@ -83,7 +83,6 @@ public class F3Screen ThreadPoolExecutor fileHandlerPool = ThreadPoolUtil.getFileHandlerExecutor(); ThreadPoolExecutor updatePool = ThreadPoolUtil.getUpdatePropagatorExecutor(); ThreadPoolExecutor lodBuilderPool = ThreadPoolUtil.getChunkToLodBuilderExecutor(); - ThreadPoolExecutor bufferBuilderPool = ThreadPoolUtil.getBufferBuilderExecutor(); AbstractDhWorld world = SharedApi.getAbstractDhWorld(); Iterable levelIterator = world.getAllLoadedLevels(); @@ -120,7 +119,6 @@ public class F3Screen messageList.add(getThreadPoolStatString("File Handler", fileHandlerPool)); messageList.add(getThreadPoolStatString("Update Propagator", updatePool)); messageList.add(getThreadPoolStatString("LOD Builder", lodBuilderPool)); - messageList.add(getThreadPoolStatString("Buffer Builder", bufferBuilderPool)); messageList.add(""); } @@ -191,6 +189,12 @@ public class F3Screen { RateLimitedThreadPoolExecutor rateLimitedPool = ((RateLimitedThreadPoolExecutor) pool); + // active threads + int activeThreadCount = rateLimitedPool.semaphoresAcquired.get(); + int threadCount = ThreadPoolUtil.getThreadCount(); + message += ", Active: "+activeThreadCount+"/"+threadCount; + + // thread runtime String runTimeAvgStr; double runTimeAvgInMs = rateLimitedPool.getAverageRunTimeInMs(); if (!Double.isNaN(runTimeAvgInMs)) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/ThreadUtil.java b/core/src/main/java/com/seibel/distanthorizons/core/util/ThreadUtil.java index eb82a916c..5b1fdc144 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/ThreadUtil.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/ThreadUtil.java @@ -22,6 +22,7 @@ package com.seibel.distanthorizons.core.util; import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener; import com.seibel.distanthorizons.core.config.types.ConfigEntry; import com.seibel.distanthorizons.core.util.threading.DhThreadFactory; +import com.seibel.distanthorizons.core.util.threading.PrioritySemaphore; import com.seibel.distanthorizons.core.util.threading.RateLimitedThreadPoolExecutor; import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil; import com.seibel.distanthorizons.coreapi.ModInfo; @@ -53,7 +54,7 @@ public class ThreadUtil // rate limited pool // //===================// - public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, DhThreadFactory threadFactory, ConfigEntry runTimeRatioConfigEntry, Semaphore activeThreadCountSemaphore) + public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, DhThreadFactory threadFactory, ConfigEntry runTimeRatioConfigEntry, PrioritySemaphore activeThreadCountSemaphore, int priority) { // remove the old listener if one exists if (THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.containsKey(threadFactory.threadName)) @@ -70,7 +71,7 @@ public class ThreadUtil } - RateLimitedThreadPoolExecutor executor = makeRateLimitedThreadPool(poolSize, runTimeRatioConfigEntry.get(), threadFactory, activeThreadCountSemaphore); + RateLimitedThreadPoolExecutor executor = makeRateLimitedThreadPool(poolSize, runTimeRatioConfigEntry.get(), threadFactory, activeThreadCountSemaphore, priority); ConfigChangeListener changeListener = new ConfigChangeListener<>(runTimeRatioConfigEntry, (newRunTimeRatio) -> { executor.runTimeRatio = newRunTimeRatio; }); THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.put(threadFactory.threadName, changeListener); @@ -80,13 +81,13 @@ public class ThreadUtil /** should only be used if there isn't a config controlling the run time ratio of this thread pool */ - public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, String name, Double runTimeRatio, int threadPriority, Semaphore activeThreadCountSemaphore) + public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, String name, Double runTimeRatio, int threadPriority, PrioritySemaphore activeThreadCountSemaphore, int priority) { - return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, new DhThreadFactory(name, threadPriority, false), activeThreadCountSemaphore); + return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, new DhThreadFactory(name, threadPriority, false), activeThreadCountSemaphore, priority); } - public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, Double runTimeRatio, DhThreadFactory threadFactory, Semaphore activeThreadCountSemaphore) + public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, Double runTimeRatio, DhThreadFactory threadFactory, PrioritySemaphore activeThreadCountSemaphore, int priority) { - return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, threadFactory, activeThreadCountSemaphore); + return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, threadFactory, activeThreadCountSemaphore, priority); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ConfigThreadPool.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ConfigThreadPool.java index 108d41415..b26210eb3 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ConfigThreadPool.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ConfigThreadPool.java @@ -23,7 +23,8 @@ import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener; import com.seibel.distanthorizons.core.config.types.ConfigEntry; import com.seibel.distanthorizons.core.util.ThreadUtil; -import java.util.concurrent.Semaphore; +import java.util.List; +import java.util.Queue; /** * Handles thread pools with config values for their @@ -32,11 +33,12 @@ import java.util.concurrent.Semaphore; public class ConfigThreadPool { /** Caution must be used to prevent deadlock */ - private final Semaphore activeThreadCountSemaphore; + private final PrioritySemaphore threadSemaphore; + /** higher numbers run first */ + private final int priority; public RateLimitedThreadPoolExecutor executor = null; private int threadCount = 0; - public int getThreadCount() { return this.threadCount; } public final DhThreadFactory threadFactory; @@ -50,10 +52,11 @@ public class ConfigThreadPool // constructor // //=============// - public ConfigThreadPool(DhThreadFactory threadFactory, ConfigEntry threadCountConfig, ConfigEntry runTimeRatioConfig, Semaphore activeThreadCountSemaphore) + public ConfigThreadPool(DhThreadFactory threadFactory, ConfigEntry threadCountConfig, ConfigEntry runTimeRatioConfig, PrioritySemaphore threadSemaphore, int priority) { this.threadFactory = threadFactory; - this.activeThreadCountSemaphore = activeThreadCountSemaphore; + this.threadSemaphore = threadSemaphore; + this.priority = priority; this.threadCountConfig = threadCountConfig; this.threadCountConfigListener = new ConfigChangeListener<>(threadCountConfig, @@ -72,14 +75,24 @@ public class ConfigThreadPool public void setThreadPoolSize(int threadPoolSize) { + Queue incompleteRunnableQueue = null; if (this.executor != null) { // close the previous thread pool if one exists - this.executor.shutdown(); + this.executor.shutdown(); // don't do shutdown now since we don't want to throw any interrupt exceptions + incompleteRunnableQueue = this.executor.getQueue(); } this.threadCount = threadPoolSize; - this.executor = ThreadUtil.makeRateLimitedThreadPool(this.threadCount, this.threadFactory, this.runTimeRatioConfig, this.activeThreadCountSemaphore); + this.executor = ThreadUtil.makeRateLimitedThreadPool(this.threadCount, this.threadFactory, this.runTimeRatioConfig, this.threadSemaphore, this.priority); + + if (incompleteRunnableQueue != null) + { + for (Runnable runnable : incompleteRunnableQueue) + { + this.executor.execute(runnable); + } + } } /** @@ -90,7 +103,6 @@ public class ConfigThreadPool { if (this.executor != null) { - //LOGGER.info("Stopping thread pool"); this.executor.shutdownNow(); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PrioritySemaphore.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PrioritySemaphore.java new file mode 100644 index 000000000..592f44ac5 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/PrioritySemaphore.java @@ -0,0 +1,161 @@ +package com.seibel.distanthorizons.core.util.threading; + +import org.jetbrains.annotations.NotNull; + +import java.util.Comparator; +import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.locks.ReentrantLock; + +/** + * For use with {@link RateLimitedThreadPoolExecutor} + */ +public class PrioritySemaphore +{ + public int maxPermitCount; + public int currentPermitCount; + + private final PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); + private final ReentrantLock lock = new ReentrantLock(); + + private final Random random = new Random(); + + + + //=============// + // constructor // + //=============// + + public PrioritySemaphore(int permits) + { + this.maxPermitCount = permits; + this.currentPermitCount = this.maxPermitCount; + } + + + + //==================// + // permit acquiring // + //==================// + + /** Similar to {@link Semaphore#acquire()} */ + public void acquire(RateLimitedThreadPoolExecutor executor) throws InterruptedException + { + Thread thread = Thread.currentThread(); + this.lock.lock(); + try + { + if (this.currentPermitCount > 0) + { + // a permit is available, + // this thread can run normally + this.currentPermitCount--; + + // return to prevent running the thread's wait() method below + return; + } + } + finally + { + this.lock.unlock(); + } + + + // no permit is available + // this has to be outside the try-finally to prevent holding the lock while waiting + synchronized (thread) + { + // random value between -5 and +5 is used to prevent task starvation + // while still allowing higher priority tasks to run sooner + int priority = executor.priority + this.random.nextInt(11) - 5; + + // this thread will be run when a permit is available + thread.wait(); + this.queue.put(new ThreadWithPriority(thread,priority)); + } + } + + /** Similar to {@link Semaphore#release()} */ + public void release() + { + this.lock.lock(); + try + { + // wake up the nex thread if one is queued + if (!this.queue.isEmpty()) + { + Thread nextThread = this.queue.poll().thread; + synchronized (nextThread) + { + nextThread.notify(); + } + } + else + { + this.currentPermitCount++; + // don't increase past the max allowed (this can happen when changing the max permit count) + this.currentPermitCount = Math.min(this.currentPermitCount, this.maxPermitCount); + } + } + finally + { + this.lock.unlock(); + } + } + + + + //=================// + // permit changing // + //=================// + + public void changePermitCount(int val) + { + // find the max number of permits to increase by + int permitCountIncrease = Math.max(0, val - this.maxPermitCount); + + this.lock.lock(); + try + { + this.currentPermitCount += permitCountIncrease; + this.maxPermitCount = val; + } + finally + { + this.lock.unlock(); + } + } + + public int availablePermits() { return this.currentPermitCount; } + + + + //================// + // helper classes // + //================// + + /** simple sortable container to track a thread and it's priority */ + private static class ThreadWithPriority implements Comparable + { + private final Thread thread; + private final int priority; + + public ThreadWithPriority(Thread thread, int priority) + { + this.thread = thread; + this.priority = priority; + } + + @Override + public int compareTo(@NotNull ThreadWithPriority other) + { + // highest number first + return Integer.compare(other.priority, this.priority); + } + + } + + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java index 5d63f4ac8..0b4161d9a 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/RateLimitedThreadPoolExecutor.java @@ -22,6 +22,7 @@ package com.seibel.distanthorizons.core.util.threading; import com.seibel.distanthorizons.core.logging.DhLoggerBuilder; import com.seibel.distanthorizons.core.util.objects.RollingAverage; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.concurrent.*; @@ -31,7 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; * Can be used to more finely control CPU usage and * reduce CPU usage if only 1 thread is already assigned. */ -public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor +public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements Comparable { private static final Logger LOGGER = DhLoggerBuilder.getLogger(); /** logs include the thread name by default which can help diagnose deadlocks */ @@ -45,13 +46,17 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor /** How long it took this thread to run its last task */ private final ThreadLocal lastRunDurationNanoTimeRef = ThreadLocal.withInitial(() -> -1L); - private Runnable onTerminatedEventHandler = null; - - /** if null the thread pool will run independently of other pools */ + /** + * Does nothing if {@link RateLimitedThreadPoolExecutor#threadSemaphore} + * is null.
+ * Higher numbers have higher priority. + */ + public final int priority; + /** if null this thread pool will run independently of other pools */ @Nullable - private final Semaphore activeThreadCountSemaphore; + public final PrioritySemaphore threadSemaphore; /** will always be zero if no semaphore is present */ - private final AtomicInteger semaphoresAcquired = new AtomicInteger(0); + public final AtomicInteger semaphoresAcquired = new AtomicInteger(0); private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200); @@ -61,8 +66,9 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor // constructors // //==============// - public RateLimitedThreadPoolExecutor(int corePoolSize, double runTimeRatio, ThreadFactory threadFactory) { this(corePoolSize, runTimeRatio, threadFactory, null); } - public RateLimitedThreadPoolExecutor(int corePoolSize, double runTimeRatio, ThreadFactory threadFactory, @Nullable Semaphore activeThreadCountSemaphore) + public RateLimitedThreadPoolExecutor( + int corePoolSize, double runTimeRatio, ThreadFactory threadFactory, + @Nullable PrioritySemaphore prioritySemaphore, int priority) { super(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS, @@ -70,7 +76,8 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor threadFactory); this.runTimeRatio = runTimeRatio; - this.activeThreadCountSemaphore = activeThreadCountSemaphore; + this.priority = priority; + this.threadSemaphore = prioritySemaphore; } @@ -96,17 +103,17 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor catch (InterruptedException ignored) { } } - if (this.activeThreadCountSemaphore != null) + if (this.threadSemaphore != null) { try { // Warning, this can cause deadlock if one thread calls another. - this.activeThreadCountSemaphore.acquire(); + this.threadSemaphore.acquire(this); this.semaphoresAcquired.getAndAdd(1); if (LOG_SEMAPHORE_ACTIONS) { - LOGGER.debug("acquired, available count: ["+this.activeThreadCountSemaphore.availablePermits()+"]"); + LOGGER.debug("acquired, available count: ["+this.threadSemaphore.availablePermits()+"]"); } } catch (InterruptedException ignore) { } @@ -123,14 +130,14 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor this.lastRunDurationNanoTimeRef.set(System.nanoTime() - this.runStartNanoTimeRef.get()); - if (this.activeThreadCountSemaphore != null) + if (this.threadSemaphore != null) { - this.activeThreadCountSemaphore.release(); + this.threadSemaphore.release(); this.semaphoresAcquired.getAndAdd(-1); if (LOG_SEMAPHORE_ACTIONS) { - LOGGER.debug("released, available count: ["+this.activeThreadCountSemaphore.availablePermits()+"]"); + LOGGER.debug("released, available count: ["+this.threadSemaphore.availablePermits()+"]"); } } } @@ -139,20 +146,13 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor protected void terminated() { super.terminated(); - if (this.onTerminatedEventHandler != null) - { - this.onTerminatedEventHandler.run(); - } // release all held semaphores (shouldn't normally be necessary, but just in case) - if (this.activeThreadCountSemaphore != null) + if (this.threadSemaphore != null) { - int semaphoresAcquired = this.semaphoresAcquired.getAndSet(0); - this.activeThreadCountSemaphore.release(semaphoresAcquired); - if (LOG_SEMAPHORE_ACTIONS) { - LOGGER.info("terminated, released ["+semaphoresAcquired+"], available count: ["+this.activeThreadCountSemaphore.availablePermits()+"]"); + LOGGER.info("terminated, released ["+this.semaphoresAcquired+"], available count: ["+this.threadSemaphore.availablePermits()+"]"); } } } @@ -160,13 +160,25 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor //==============// - // custom logic // + // running time // //==============// - /** only one event handler can be present at a time */ - public void setOnTerminatedEventHandler(Runnable runnable) { this.onTerminatedEventHandler = runnable; } - /** will return Nan if nothing has been submitted yet */ public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); } + + + //============// + // comparison // + //============// + + @Override + public int compareTo(@NotNull RateLimitedThreadPoolExecutor other) + { + // highest number first + return Integer.compare(other.priority, this.priority); + } + + + } \ No newline at end of file diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java index 711a923df..e90a043a3 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/threading/ThreadPoolUtil.java @@ -24,7 +24,6 @@ import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener; import com.seibel.distanthorizons.core.util.ThreadUtil; import org.jetbrains.annotations.Nullable; -import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; /** @@ -73,31 +72,24 @@ public class ThreadPoolUtil - //======================// - // worker threads pools // - //======================// + public static final String FULL_DATA_MIGRATION_THREAD_NAME = "Full Data Migration"; + private static ThreadPoolExecutor fullDataMigrationThreadPool; + @Nullable + public static ThreadPoolExecutor getFullDataMigrationExecutor() { return fullDataMigrationThreadPool; } - // worker thread pools are generally related with LOD building - // and all share an underlying number of threads. - // WARNING: great care should be used when setting up these threads since deadlock can occur if they are handled poorly. public static final DhThreadFactory CHUNK_TO_LOD_BUILDER_THREAD_FACTORY = new DhThreadFactory("LOD Builder - Chunk to Lod Builder", Thread.MIN_PRIORITY, false); private static ConfigThreadPool chunkToLodBuilderThreadPool; @Nullable public static ThreadPoolExecutor getChunkToLodBuilderExecutor() { return (chunkToLodBuilderThreadPool != null) ? chunkToLodBuilderThreadPool.executor : null; } - public static final DhThreadFactory BUFFER_BUILDER_THREAD_FACTORY = new DhThreadFactory("LOD Builder - Buffer Builder", Thread.MIN_PRIORITY, false); - private static ConfigThreadPool bufferBuilderThreadPool; - @Nullable - public static ThreadPoolExecutor getBufferBuilderExecutor() { return (bufferBuilderThreadPool != null) ? bufferBuilderThreadPool.executor : null; } - /** how many total worker threads can be used */ - private static int workerThreadSemaphoreCount = Config.Common.MultiThreading.numberOfLodBuilderThreads.get(); - public static int getWorkerThreadCount() { return workerThreadSemaphoreCount; } - - private static Semaphore workerThreadSemaphore = null; - private static ConfigChangeListener workerThreadSemaphoreConfigListener = null; + /** how many total threads can be used */ + private static int threadSemaphoreCount = Config.Common.MultiThreading.numberOfThreads.get(); + public static int getThreadCount() { return threadSemaphoreCount; } + private static PrioritySemaphore threadSemaphore = null; + private static ConfigChangeListener threadSemaphoreConfigListener = null; @@ -107,43 +99,40 @@ public class ThreadPoolUtil public static void setupThreadPools() { - // standalone threads // + // create thread semaphore + threadSemaphoreCount = Config.Common.MultiThreading.numberOfThreads.get(); + threadSemaphore = new PrioritySemaphore(threadSemaphoreCount); + threadSemaphoreConfigListener = new ConfigChangeListener<>(Config.Common.MultiThreading.numberOfThreads, (val) -> + { + threadSemaphore.changePermitCount(val); + threadSemaphoreCount = val; + }); - fileHandlerThreadPool = new ConfigThreadPool(FILE_HANDLER_THREAD_FACTORY, Config.Common.MultiThreading.numberOfFileHandlerThreads, Config.Common.MultiThreading.runTimeRatioForFileHandlerThreads, null); - updatePropagatorThreadPool = new ConfigThreadPool(UPDATE_PROPAGATOR_THREAD_FACTORY, Config.Common.MultiThreading.numberOfUpdatePropagatorThreads, Config.Common.MultiThreading.runTimeRatioForUpdatePropagatorThreads, null); - worldGenThreadPool = new ConfigThreadPool(WORLD_GEN_THREAD_FACTORY, Config.Common.MultiThreading.numberOfWorldGenerationThreads, Config.Common.MultiThreading.runTimeRatioForWorldGenerationThreads, null); - networkCompressionThreadPool = new ConfigThreadPool(NETWORK_COMPRESSION_THREAD_FACTORY, Config.Common.MultiThreading.numberOfNetworkCompressionThreads, Config.Common.MultiThreading.runTimeRatioForNetworkCompressionThreads, null); + + + // thread pools + chunkToLodBuilderThreadPool = new ConfigThreadPool(CHUNK_TO_LOD_BUILDER_THREAD_FACTORY, + Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio, + threadSemaphore, 3); // We want to make sure any chunk changes are found + fileHandlerThreadPool = new ConfigThreadPool(FILE_HANDLER_THREAD_FACTORY, + Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio, + threadSemaphore, 2); // loading in new LODs is second highest priority + updatePropagatorThreadPool = new ConfigThreadPool(UPDATE_PROPAGATOR_THREAD_FACTORY, + Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio, + threadSemaphore, 1); // update propagation needs to be slightly higher priority than world gen + worldGenThreadPool = new ConfigThreadPool(WORLD_GEN_THREAD_FACTORY, + Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio, + threadSemaphore, 0); // higher priorities mean the threads will run first + networkCompressionThreadPool = new ConfigThreadPool(NETWORK_COMPRESSION_THREAD_FACTORY, + Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio, + threadSemaphore, 0); // networking can probably have similar priority to world gen since they work similarly + + + + // single thread pools cleanupThreadPool = ThreadUtil.makeSingleThreadPool(CLEANUP_THREAD_NAME); beaconCullingThreadPool = ThreadUtil.makeSingleThreadPool(BEACON_CULLING_THREAD_NAME); - - - - // worker threads // - - // create thread semaphore - if (Config.Common.MultiThreading.enableLodBuilderThreadLimiting.get()) - { - workerThreadSemaphoreCount = Config.Common.MultiThreading.numberOfLodBuilderThreads.get(); - workerThreadSemaphore = new Semaphore(workerThreadSemaphoreCount); - - workerThreadSemaphoreConfigListener = new ConfigChangeListener<>(Config.Common.MultiThreading.numberOfLodBuilderThreads, (val) -> - { - int changePermit = val - workerThreadSemaphoreCount; - if (changePermit > 0) - { - workerThreadSemaphore.release(changePermit); - } - else - { - workerThreadSemaphore.acquireUninterruptibly(changePermit * -1); - } - workerThreadSemaphoreCount = workerThreadSemaphoreCount + changePermit; - }); - } - - // create thread pools - chunkToLodBuilderThreadPool = new ConfigThreadPool(CHUNK_TO_LOD_BUILDER_THREAD_FACTORY, Config.Common.MultiThreading.numberOfLodBuilderThreads, Config.Common.MultiThreading.runTimeRatioForLodBuilderThreads, workerThreadSemaphore); - bufferBuilderThreadPool = new ConfigThreadPool(BUFFER_BUILDER_THREAD_FACTORY, Config.Common.MultiThreading.numberOfLodBuilderThreads, Config.Common.MultiThreading.runTimeRatioForLodBuilderThreads, workerThreadSemaphore); + fullDataMigrationThreadPool = ThreadUtil.makeSingleThreadPool(FULL_DATA_MIGRATION_THREAD_NAME); } @@ -156,18 +145,15 @@ public class ThreadPoolUtil networkCompressionThreadPool.shutdownExecutorService(); cleanupThreadPool.shutdown(); beaconCullingThreadPool.shutdown(); + fullDataMigrationThreadPool.shutdown(); + chunkToLodBuilderThreadPool.shutdownExecutorService(); + threadSemaphore = null; - // worker threads - ThreadPoolUtil.chunkToLodBuilderThreadPool.shutdownExecutorService(); - ThreadPoolUtil.bufferBuilderThreadPool.shutdownExecutorService(); - - workerThreadSemaphore = null; - - if (workerThreadSemaphoreConfigListener != null) + if (threadSemaphoreConfigListener != null) { - workerThreadSemaphoreConfigListener.close(); - workerThreadSemaphoreConfigListener = null; + threadSemaphoreConfigListener.close(); + threadSemaphoreConfigListener = null; } } diff --git a/core/src/main/resources/assets/distanthorizons/lang/en_us.json b/core/src/main/resources/assets/distanthorizons/lang/en_us.json index 3d9c271b5..1c57e7379 100644 --- a/core/src/main/resources/assets/distanthorizons/lang/en_us.json +++ b/core/src/main/resources/assets/distanthorizons/lang/en_us.json @@ -609,53 +609,13 @@ "distanthorizons.config.common.multiThreading": "Multi-Threading", - "distanthorizons.config.common.multiThreading.numberOfWorldGenerationThreads": - "NO. of world generation threads", - "distanthorizons.config.common.multiThreading.numberOfWorldGenerationThreads.@tooltip": - "How many threads should be used when generating LODs \n outside the normal render distance?\n\nIf you experience stuttering when generating distant LODs, \ndecrease this number. If you want to increase LOD \ngeneration speed, increase this number.", - "distanthorizons.config.common.multiThreading.runTimeRatioForWorldGenerationThreads": - "Runtime % for world generation threads", + "distanthorizons.config.common.multiThreading.numberOfThreads": + "NO. of threads", + "distanthorizons.config.common.multiThreading.numberOfThreads.@tooltip": + "How many threads DH will use.", + "distanthorizons.config.common.multiThreading.threadRunTimeRatio": + "Runtime % for threads", - "distanthorizons.config.common.multiThreading.numberOfBufferBuilderThreads": - "NO. of buffer builder threads", - "distanthorizons.config.common.multiThreading.numberOfBufferBuilderThreads.@tooltip": - "The number of threads used when building geometry data. \nCan only be between 1 and your CPU's processor count.", - "distanthorizons.config.common.multiThreading.runTimeRatioForBufferBuilderThreads": - "Runtime % for buffer builder threads", - - "distanthorizons.config.common.multiThreading.numberOfFileHandlerThreads": - "NO. of file handler threads", - "distanthorizons.config.common.multiThreading.numberOfFileHandlerThreads.@tooltip": - "The number of threads used when reading/writing LOD data to/from the disk. \nCan only be between 1 and your CPU's processor count.", - "distanthorizons.config.common.multiThreading.runTimeRatioForFileHandlerThreads": - "Runtime % for file handler threads", - - "distanthorizons.config.common.multiThreading.numberOfUpdatePropagatorThreads": - "NO. of update propagator threads", - "distanthorizons.config.common.multiThreading.numberOfUpdatePropagatorThreads.@tooltip": - "How many threads should be used when applying LOD updates? \nAn LOD update is the operation of down-sampling a high detail LOD \ninto a lower detail one. \n\nThis config can have a much higher number of threads \nassigned and much lower run time ratio vs other thread pools \nbecause the amount of time any particular thread may run is relatively low.", - "distanthorizons.config.common.multiThreading.runTimeRatioForUpdatePropagatorThreads": - "Runtime % for update propagator threads", - - "distanthorizons.config.common.multiThreading.numberOfNetworkCompressionThreads": - "NO. of Network Compression threads", - "distanthorizons.config.common.multiThreading.numberOfNetworkCompressionThreads.@tooltip": - "How many threads should be used when (de)compressing LODs \nthat are received/sent over the network?", - "distanthorizons.config.common.multiThreading.runTimeRatioForNetworkCompressionThreads": - "Runtime % for Network Compression threads", - - "distanthorizons.config.common.multiThreading.numberOfLodBuilderThreads": - "NO. of LOD builder threads", - "distanthorizons.config.common.multiThreading.numberOfLodBuilderThreads.@tooltip": - "The number of threads used when building LODs. \nThese threads run when terrain is generated, when \ncertain graphics settings are changed, and when moving around the world.", - "distanthorizons.config.common.multiThreading.runTimeRatioForLodBuilderThreads": - "Runtime % for LOD builder threads", - - "distanthorizons.config.common.multiThreading.enableLodBuilderThreadLimiting": - "Enable LOD builder thread limiting", - "distanthorizons.config.common.multiThreading.enableLodBuilderThreadLimiting.@tooltip": - "Should only be disabled if deadlock occurs and LODs refuse to update. \nThis will cause CPU usage to drastically increase for the Lod Builder threads. \nNote that if a deadlock did occur restarting MC may be necessary to stop the locked threads.", - "distanthorizons.config.common.logging":