Only have a single thread config

This means that CPU use will be a lot more consistent since all internal thread pools share a single thread count semaphore.
This commit is contained in:
James Seibel
2024-12-27 08:46:32 -06:00
parent 81654123d8
commit 6135bdf67c
15 changed files with 502 additions and 563 deletions
@@ -26,33 +26,31 @@ import com.seibel.distanthorizons.api.interfaces.config.IDhApiConfigGroup;
* Distant Horizons' threading configuration.
*
* @author James Seibel
* @version 2023-10-29
* @version 2024-12-26
* @since API 1.0.0
*/
public interface IDhApiMultiThreadingConfig extends IDhApiConfigGroup
{
/**
* Defines how many world generator threads are used to generate
* terrain outside Minecraft's vanilla render distance. <br>
* <br>
* If the number of threads is less than 1 it will be treated as a percentage
* representing how often the single thread will actively generate terrain.
* Defines how many threads Distant Horizons
* uses.
*
* @since API 4.0.0
*/
IDhApiConfigValue<Integer> worldGeneratorThreads();
/** Defines how many file handler threads are used. */
IDhApiConfigValue<Integer> fileHandlerThreads();
IDhApiConfigValue<Integer> threadCount();
/**
* Defines how many threads are used
* to build LODs. <br><br>
*
* This includes: <br>
* - lighting <br>
* - Chunk -> LOD conversion <br>
* - Buffer generation <br>
* Defines how many long Distant Horizons
* threads will spend running vs sleeping.
* This is helpful when reducing the CPU
* load on low end CPUs.
* 1.0 = 100% uptime
* 0.5 = 50% uptime
* 0.1 = 10% uptime
*
* @since API 4.0.0
*/
IDhApiConfigValue<Integer> lodBuilderThreads();
IDhApiConfigValue<Double> threadRuntimeRatio();
}
@@ -33,15 +33,13 @@ public class DhApiMultiThreadingConfig implements IDhApiMultiThreadingConfig
@Override
public IDhApiConfigValue<Integer> worldGeneratorThreads()
{ return new DhApiConfigValue<Integer, Integer>(Config.Common.MultiThreading.numberOfWorldGenerationThreads); }
public IDhApiConfigValue<Integer> threadCount()
{ return new DhApiConfigValue<Integer, Integer>(Config.Common.MultiThreading.numberOfThreads); }
@Override
public IDhApiConfigValue<Integer> fileHandlerThreads()
{ return new DhApiConfigValue<Integer, Integer>(Config.Common.MultiThreading.numberOfFileHandlerThreads); }
public IDhApiConfigValue<Double> threadRuntimeRatio()
{ return new DhApiConfigValue<Double, Double>(Config.Common.MultiThreading.threadRunTimeRatio); }
@Override
public IDhApiConfigValue<Integer> lodBuilderThreads()
{ return new DhApiConfigValue<Integer, Integer>(Config.Common.MultiThreading.numberOfLodBuilderThreads); }
}
@@ -283,7 +283,7 @@ public class SharedApi
}
UPDATE_POS_MANAGER.maxSize = MAX_UPDATING_CHUNK_COUNT_PER_THREAD
* Config.Common.MultiThreading.numberOfLodBuilderThreads.get()
* Config.Common.MultiThreading.numberOfThreads.get()
* (playerCount + 1);
UpdateChunkData updateData = new UpdateChunkData(chunkWrapper, neighbourChunkList, dhLevel, lightUpdateOnly);
@@ -1340,139 +1340,29 @@ public class Config
public static class MultiThreading
{
public static final String THREAD_NOTE = ""
+ "Multi-threading Note: \n"
+ "If the total thread count in Distant Horizon's config is more threads than your CPU has cores, \n"
+ "CPU performance may suffer if Distant Horizons has a lot to load or generate. \n"
+ "This can be an issue when first loading into a world, when flying, and/or when generating new terrain.";
public static final String THREAD_RUN_TIME_RATIO_NOTE = ""
+ "A value between 1.0 and 0.0 that represents the percentage \n"
+ "of time each thread can run before going idle. \n"
+ "\n"
+ "This can be used to reduce CPU usage if the thread count \n"
+ "is already set to 1 for the given option, or more finely \n"
+ "tune CPU performance.";
public static final ConfigEntry<Integer> numberOfWorldGenerationThreads = new ConfigEntry.Builder<Integer>()
.setServersideShortName("numberOfWorldGenerationThreads")
public static final ConfigEntry<Integer> numberOfThreads = new ConfigEntry.Builder<Integer>()
.setServersideShortName("numberOfThreads")
.setMinDefaultMax(1,
ThreadPresetConfigEventHandler.getWorldGenDefaultThreadCount(),
ThreadPresetConfigEventHandler.getDefaultThreadCount(),
Runtime.getRuntime().availableProcessors())
.comment(""
+ "How many threads should be used when generating LOD \n"
+ "chunks outside the normal render distance? \n"
+ "\n"
+ "If you experience stuttering when generating distant LODs, \n"
+ "decrease this number. \n"
+ "If you want to increase LOD \n"
+ "generation speed, increase this number. \n"
+ "\n"
+ THREAD_NOTE)
.build();
public static final ConfigEntry<Double> runTimeRatioForWorldGenerationThreads = new ConfigEntry.Builder<Double>()
.setServersideShortName("runTimeRatioForWorldGenerationThreads")
.setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getWorldGenDefaultRunTimeRatio(), 1.0)
.comment(THREAD_RUN_TIME_RATIO_NOTE)
.build();
public static final ConfigEntry<Integer> numberOfFileHandlerThreads = new ConfigEntry.Builder<Integer>()
.setServersideShortName("numberOfFileHandlerThreads")
.setMinDefaultMax(1,
ThreadPresetConfigEventHandler.getFileHandlerDefaultThreadCount(),
Runtime.getRuntime().availableProcessors())
.comment(""
+ "How many threads should be used when reading/writing LOD data to/from disk? \n"
+ "\n"
+ "Increasing this number will cause LODs to load in faster, \n"
+ "but may cause lag when loading a new world or when \n"
+ "quickly flying through existing LODs. \n"
+ "\n"
+ THREAD_NOTE)
.build();
public static final ConfigEntry<Double> runTimeRatioForFileHandlerThreads = new ConfigEntry.Builder<Double>()
.setServersideShortName("runTimeRatioForFileHandlerThreads")
.setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getFileHandlerDefaultRunTimeRatio(), 1.0)
.comment(THREAD_RUN_TIME_RATIO_NOTE)
.build();
public static final ConfigEntry<Integer> numberOfUpdatePropagatorThreads = new ConfigEntry.Builder<Integer>()
.setServersideShortName("numberOfUpdatePropagatorThreads")
.setMinDefaultMax(1,
ThreadPresetConfigEventHandler.getUpdatePropagatorDefaultThreadCount(),
Runtime.getRuntime().availableProcessors())
.comment(""
+ "How many threads should be used when applying LOD updates? \n"
+ "An LOD update is the operation of down-sampling a high detail LOD \n"
+ "into a lower detail one.\n"
+ "\n"
+ "This config can have a much higher number of threads \n"
+ "assigned and much lower run time ratio vs other thread pools \n"
+ "because the amount of time any particular thread may run is relatively low.\n"
+ "\n"
+ "This is because LOD updating only only partially thread safe, \n"
+ "so between 40% and 60% of the time a given thread may end up \n"
+ "waiting on another thread to finish updating the same LOD it also wants\n"
+ "to work on.\n"
+ "\n"
+ THREAD_NOTE)
.build();
public static final ConfigEntry<Double> runTimeRatioForUpdatePropagatorThreads = new ConfigEntry.Builder<Double>()
.setServersideShortName("runTimeRatioForUpdatePropagatorThreads")
.setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getUpdatePropagatorDefaultRunTimeRatio(), 1.0)
.comment(THREAD_RUN_TIME_RATIO_NOTE)
.build();
public static final ConfigEntry<Integer> numberOfNetworkCompressionThreads = new ConfigEntry.Builder<Integer>()
.setServersideShortName("numberOfNetworkCompressionThreads")
.setMinDefaultMax(1,
ThreadPresetConfigEventHandler.getNetworkCompressionDefaultThreadCount(),
Runtime.getRuntime().availableProcessors())
.comment(""
+ "How many threads should be used when (de)compressing LODs \n"
+ "that are received/sent over the network?\n"
+ "\n"
+ "This pool doesn't do anything in singleplayer or when connected \n"
+ "to a server that doesn't support DH networking. \n"
+ "\n"
+ THREAD_NOTE)
.build();
public static final ConfigEntry<Double> runTimeRatioForNetworkCompressionThreads = new ConfigEntry.Builder<Double>()
.setServersideShortName("runTimeRatioForNetworkCompressionThreads")
.setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getNetworkCompressionDefaultRunTimeRatio(), 1.0)
.comment(THREAD_RUN_TIME_RATIO_NOTE)
.build();
public static final ConfigEntry<Integer> numberOfLodBuilderThreads = new ConfigEntry.Builder<Integer>()
.setServersideShortName("numberOfLodBuilderThreads")
.setMinDefaultMax(1,
ThreadPresetConfigEventHandler.getLodBuilderDefaultThreadCount(),
Runtime.getRuntime().availableProcessors())
.comment(""
+ "How many threads should be used when building LODs? \n"
+ "\n"
+ "These threads run when terrain is generated, when\n"
+ "certain graphics settings are changed, and when moving around the world. \n"
+ "\n"
+ THREAD_NOTE)
.build();
public static final ConfigEntry<Double> runTimeRatioForLodBuilderThreads = new ConfigEntry.Builder<Double>()
.setServersideShortName("runTimeRatioForLodBuilderThreads")
.setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getLodBuilderDefaultRunTimeRatio(), 1.0)
.comment(THREAD_RUN_TIME_RATIO_NOTE)
.build();
public static final ConfigEntry<Boolean> enableLodBuilderThreadLimiting = new ConfigEntry.Builder<Boolean>()
.setServersideShortName("enableLodBuilderThreadLimiting")
.set(true)
.comment(""
+ "Should only be disabled if deadlock occurs and LODs refuse to update. \n"
+ "This will cause CPU usage to drastically increase for the Lod Builder threads. \n"
+ "\n"
+ "Note that if deadlock did occur restarting MC may be necessary to stop the locked threads. \n"
+ "How many threads should be used by Distant Horizons? \n"
+ "")
.build();
public static final ConfigEntry<Double> threadRunTimeRatio = new ConfigEntry.Builder<Double>()
.setServersideShortName("threadRunTimeRatio")
.setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getDefaultRunTimeRatio(), 1.0)
.comment(""
+ "A value between 1.0 and 0.0 that represents the percentage \n"
+ "of time each thread can run before going idle. \n"
+ "\n"
+ "This can be used to reduce CPU usage if the thread count \n"
+ "is already set to 1 for the given option, or more finely \n"
+ "tune CPU performance. \n" +
"")
.build();
}
@@ -40,112 +40,23 @@ public class ThreadPresetConfigEventHandler extends AbstractPresetConfigEventHan
private static final Logger LOGGER = LogManager.getLogger();
public static int getWorldGenDefaultThreadCount() { return getThreadCountByPercent(0.1); }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Integer> worldGenThreadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfWorldGenerationThreads,
public static int getDefaultThreadCount() { return getThreadCountByPercent(0.5); }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Integer> threadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfThreads,
new HashMap<EDhApiThreadPreset, Integer>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1);
this.put(EDhApiThreadPreset.LOW_IMPACT, getWorldGenDefaultThreadCount());
this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.15));
this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.25));
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.5));
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, getThreadCountByPercent(0.1));
this.put(EDhApiThreadPreset.LOW_IMPACT, getThreadCountByPercent(0.25));
this.put(EDhApiThreadPreset.BALANCED, getDefaultThreadCount());
this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.75));
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(1.0));
}});
public static double getWorldGenDefaultRunTimeRatio() { return 0.5; }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Double> worldGenRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.runTimeRatioForWorldGenerationThreads,
new HashMap<EDhApiThreadPreset, Double>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.1);
this.put(EDhApiThreadPreset.LOW_IMPACT, getWorldGenDefaultRunTimeRatio());
this.put(EDhApiThreadPreset.BALANCED, 0.75);
this.put(EDhApiThreadPreset.AGGRESSIVE, 1.0);
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0);
}});
public static int getFileHandlerDefaultThreadCount() { return getThreadCountByPercent(0.1); }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Integer> fileHandlerThreadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfFileHandlerThreads,
new HashMap<EDhApiThreadPreset, Integer>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1);
this.put(EDhApiThreadPreset.LOW_IMPACT, getFileHandlerDefaultThreadCount());
this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.2));
this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.2));
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.5));
}});
public static double getFileHandlerDefaultRunTimeRatio() { return 1.0; }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Double> fileHandlerRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.runTimeRatioForFileHandlerThreads,
public static double getDefaultRunTimeRatio() { return 1.0; }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Double> threadRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.threadRunTimeRatio,
new HashMap<EDhApiThreadPreset, Double>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.5);
this.put(EDhApiThreadPreset.LOW_IMPACT, getFileHandlerDefaultRunTimeRatio());
this.put(EDhApiThreadPreset.BALANCED, 1.0);
this.put(EDhApiThreadPreset.AGGRESSIVE, 1.0);
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0);
}});
public static int getUpdatePropagatorDefaultThreadCount() { return getThreadCountByPercent(0.10); }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Integer> UpdatePropagatorThreadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfUpdatePropagatorThreads,
new HashMap<EDhApiThreadPreset, Integer>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1);
this.put(EDhApiThreadPreset.LOW_IMPACT, getUpdatePropagatorDefaultThreadCount());
this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.25));
this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.50));
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.75));
}});
public static double getUpdatePropagatorDefaultRunTimeRatio() { return 0.25; }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Double> UpdatePropagatorRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.runTimeRatioForUpdatePropagatorThreads,
new HashMap<EDhApiThreadPreset, Double>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.1);
this.put(EDhApiThreadPreset.LOW_IMPACT, getUpdatePropagatorDefaultRunTimeRatio());
this.put(EDhApiThreadPreset.BALANCED, 0.50);
this.put(EDhApiThreadPreset.AGGRESSIVE, 0.75);
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0);
}});
public static int getLodBuilderDefaultThreadCount() { return getThreadCountByPercent(0.1); }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Integer> lodBuilderThreadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfLodBuilderThreads,
new HashMap<EDhApiThreadPreset, Integer>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1);
this.put(EDhApiThreadPreset.LOW_IMPACT, getLodBuilderDefaultThreadCount());
this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.2));
this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.4));
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.6));
}});
public static double getLodBuilderDefaultRunTimeRatio() { return 0.25; }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Double> lodBuilderRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.runTimeRatioForLodBuilderThreads,
new HashMap<EDhApiThreadPreset, Double>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.1);
this.put(EDhApiThreadPreset.LOW_IMPACT, getLodBuilderDefaultRunTimeRatio());
this.put(EDhApiThreadPreset.BALANCED, 0.5);
this.put(EDhApiThreadPreset.AGGRESSIVE, 0.75);
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0);
}});
public static int getNetworkCompressionDefaultThreadCount() { return getThreadCountByPercent(0.3); }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Integer> networkCompressionThreadCount = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.numberOfNetworkCompressionThreads,
new HashMap<EDhApiThreadPreset, Integer>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 1);
this.put(EDhApiThreadPreset.LOW_IMPACT, getNetworkCompressionDefaultThreadCount());
this.put(EDhApiThreadPreset.BALANCED, getThreadCountByPercent(0.4));
this.put(EDhApiThreadPreset.AGGRESSIVE, getThreadCountByPercent(0.6));
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, getThreadCountByPercent(0.8));
}});
public static double getNetworkCompressionDefaultRunTimeRatio() { return 0.5; }
private final ConfigEntryWithPresetOptions<EDhApiThreadPreset, Double> networkCompressionRunTime = new ConfigEntryWithPresetOptions<>(Config.Common.MultiThreading.runTimeRatioForNetworkCompressionThreads,
new HashMap<EDhApiThreadPreset, Double>()
{{
this.put(EDhApiThreadPreset.MINIMAL_IMPACT, 0.25);
this.put(EDhApiThreadPreset.LOW_IMPACT, getNetworkCompressionDefaultRunTimeRatio());
this.put(EDhApiThreadPreset.BALANCED, 0.75);
this.put(EDhApiThreadPreset.LOW_IMPACT, 1.0);
this.put(EDhApiThreadPreset.BALANCED, getDefaultRunTimeRatio());
this.put(EDhApiThreadPreset.AGGRESSIVE, 1.0);
this.put(EDhApiThreadPreset.I_PAID_FOR_THE_WHOLE_CPU, 1.0);
}});
@@ -160,21 +71,8 @@ public class ThreadPresetConfigEventHandler extends AbstractPresetConfigEventHan
private ThreadPresetConfigEventHandler()
{
// add each config used by this preset
this.configList.add(this.worldGenThreadCount);
this.configList.add(this.worldGenRunTime);
this.configList.add(this.fileHandlerThreadCount);
this.configList.add(this.fileHandlerRunTime);
this.configList.add(this.UpdatePropagatorThreadCount);
this.configList.add(this.UpdatePropagatorRunTime);
this.configList.add(this.lodBuilderThreadCount);
this.configList.add(this.lodBuilderRunTime);
this.configList.add(this.networkCompressionThreadCount);
this.configList.add(this.networkCompressionRunTime);
this.configList.add(this.threadCount);
this.configList.add(this.threadRunTime);
for (ConfigEntryWithPresetOptions<EDhApiThreadPreset, ?> config : this.configList)
{
@@ -228,4 +126,6 @@ public class ThreadPresetConfigEventHandler extends AbstractPresetConfigEventHan
@Override
protected EDhApiThreadPreset getCustomPresetEnum() { return EDhApiThreadPreset.CUSTOM; }
}
@@ -39,6 +39,7 @@ import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
import com.seibel.distanthorizons.core.sql.repo.FullDataSourceV2Repo;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import com.seibel.distanthorizons.core.util.objects.DataCorruptedException;
import com.seibel.distanthorizons.core.util.threading.PrioritySemaphore;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import it.unimi.dsi.fastutil.longs.LongArrayList;
@@ -67,14 +68,13 @@ public class FullDataSourceProviderV2
protected static final int NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD = 50;
/** how many parent update tasks can be in the queue at once */
protected static final int MAX_UPDATE_TASK_COUNT = NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD * Config.Common.MultiThreading.numberOfFileHandlerThreads.get();
protected static final int MAX_UPDATE_TASK_COUNT = NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD * Config.Common.MultiThreading.numberOfThreads.get();
/** indicates how long the update queue thread should wait between queuing ticks */
protected static final int UPDATE_QUEUE_THREAD_DELAY_IN_MS = 250;
/** how many data sources should be pulled down for migration at once */
private static final int MIGRATION_BATCH_COUNT = NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD;
private static final String MIGRATION_THREAD_NAME_PREFIX = "Full Data Migration Thread: ";
/**
* 5 minutes <br>
* This should be much longer than any update should take. This is just
@@ -83,8 +83,8 @@ public class FullDataSourceProviderV2
private static final int MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS = 5 * 60 * 1_000;
protected final ThreadPoolExecutor migrationThreadPool;
/**
/**
* Interrupting the migration thread pool doesn't work well and may corrupt the database
* vs gracefully shutting down the thread ourselves.
*/
@@ -132,8 +132,16 @@ public class FullDataSourceProviderV2
String levelId = level.getLevelWrapper().getDhIdentifier();
// start migrating any legacy data sources present in the background
this.migrationThreadPool = ThreadUtil.makeRateLimitedThreadPool(1, MIGRATION_THREAD_NAME_PREFIX + "["+levelId+"]", Config.Common.MultiThreading.runTimeRatioForUpdatePropagatorThreads.get(), Thread.MIN_PRIORITY, (Semaphore) null);
this.migrationThreadPool.execute(this::convertLegacyDataSources);
ThreadPoolExecutor executor = ThreadPoolUtil.getFullDataMigrationExecutor();
if (executor != null)
{
executor.execute(this::convertLegacyDataSources);
}
else
{
// shouldn't happen, but just in case
LOGGER.error("Unable to start migration for level: ["+levelId+"] due to missing executor.");
}
// update propagation doesn't need to be run on the server since only the highest detail level is needed
this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Parent Update Queue [" + levelId + "]");
@@ -339,170 +347,180 @@ public class FullDataSourceProviderV2
private void convertLegacyDataSources()
{
String levelId = this.level.getLevelWrapper().getDhIdentifier();
LOGGER.info("Attempting to migrate data sources for: ["+levelId+"]-["+this.saveDir+"]...");
//============================//
// delete unused data sources //
//============================//
// this could be done all at once via SQL,
// but doing it in chunks prevents locking the database for long periods of time
long unusedCount = 0;
long totalDeleteCount = this.legacyFileHandler.repo.getUnusedDataSourceCount();
if (totalDeleteCount != 0)
try
{
// this should only be shown once per session but should be shown during
// either when the deletion or migration phases start
this.showMigrationStartMessage();
String levelId = this.level.getLevelWrapper().getDhIdentifier();
LOGGER.info("Attempting to migrate data sources for: [" + levelId + "]-[" + this.saveDir + "]...");
this.migrationThreadRunning.set(true);
LOGGER.info("deleting [" + levelId + "] - ["+totalDeleteCount+"] unused data sources...");
this.legacyDeletionCount = totalDeleteCount;
ArrayList<String> unusedDataPosList = this.legacyFileHandler.repo.getUnusedDataSourcePositionStringList(50);
while (unusedDataPosList.size() != 0)
//============================//
// delete unused data sources //
//============================//
// this could be done all at once via SQL,
// but doing it in chunks prevents locking the database for long periods of time
long unusedCount = 0;
long totalDeleteCount = this.legacyFileHandler.repo.getUnusedDataSourceCount();
if (totalDeleteCount != 0)
{
unusedCount += unusedDataPosList.size();
this.legacyDeletionCount -= unusedDataPosList.size();
// this should only be shown once per session but should be shown during
// either when the deletion or migration phases start
this.showMigrationStartMessage();
long startTime = System.currentTimeMillis();
LOGGER.info("deleting [" + levelId + "] - [" + totalDeleteCount + "] unused data sources...");
this.legacyDeletionCount = totalDeleteCount;
// delete batch and get next batch
this.legacyFileHandler.repo.deleteUnusedLegacyData(unusedDataPosList);
unusedDataPosList = this.legacyFileHandler.repo.getUnusedDataSourcePositionStringList(50);
ArrayList<String> unusedDataPosList = this.legacyFileHandler.repo.getUnusedDataSourcePositionStringList(50);
while (unusedDataPosList.size() != 0)
{
unusedCount += unusedDataPosList.size();
this.legacyDeletionCount -= unusedDataPosList.size();
long startTime = System.currentTimeMillis();
// delete batch and get next batch
this.legacyFileHandler.repo.deleteUnusedLegacyData(unusedDataPosList);
unusedDataPosList = this.legacyFileHandler.repo.getUnusedDataSourcePositionStringList(50);
long endStart = System.currentTimeMillis();
long deleteTime = endStart - startTime;
LOGGER.info("Deleting [" + levelId + "] - [" + unusedCount + "/" + totalDeleteCount + "] in [" + deleteTime + "]ms ...");
// a slight delay is added to prevent accidentally locking the database when deleting a lot of rows
// (that shouldn't be the case since we're using WAL journaling, but just in case)
try
{
// use the delete time so we don't make powerful computers wait super long
// and weak computers wait no time at all
Thread.sleep(deleteTime / 2);
}
catch (InterruptedException ignore)
{
}
}
LOGGER.info("Done deleting [" + levelId + "] - [" + totalDeleteCount + "] unused data sources.");
long endStart = System.currentTimeMillis();
long deleteTime = endStart - startTime;
LOGGER.info("Deleting [" + levelId + "] - [" + unusedCount + "/" + totalDeleteCount + "] in ["+deleteTime+"]ms ...");
}
//===========//
// migration //
//===========//
long totalMigrationCount = this.legacyFileHandler.getDataSourceMigrationCount();
this.migrationCount = totalMigrationCount;
LOGGER.info("Found [" + totalMigrationCount + "] data sources that need migration.");
ArrayList<FullDataSourceV1> legacyDataSourceList = this.legacyFileHandler.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT);
if (!legacyDataSourceList.isEmpty())
{
this.showMigrationStartMessage();
// a slight delay is added to prevent accidentally locking the database when deleting a lot of rows
// (that shouldn't be the case since we're using WAL journaling, but just in case)
try
{
// use the delete time so we don't make powerful computers wait super long
// and weak computers wait no time at all
Thread.sleep(deleteTime / 2);
}
catch (InterruptedException ignore){}
}
LOGGER.info("Done deleting [" + levelId + "] - ["+totalDeleteCount+"] unused data sources.");
}
//===========//
// migration //
//===========//
long totalMigrationCount = this.legacyFileHandler.getDataSourceMigrationCount();
this.migrationCount = totalMigrationCount;
LOGGER.info("Found ["+totalMigrationCount+"] data sources that need migration.");
ArrayList<FullDataSourceV1> legacyDataSourceList = this.legacyFileHandler.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT);
if (!legacyDataSourceList.isEmpty())
{
this.showMigrationStartMessage();
try
{
// keep going until every data source has been migrated
int progressCount = 0;
while (!legacyDataSourceList.isEmpty() && this.migrationThreadRunning.get())
{
LOGGER.info("Migrating [" + levelId + "] - [" + progressCount + "/" + totalMigrationCount + "]...");
ArrayList<CompletableFuture<Void>> updateFutureList = new ArrayList<>();
for (int i = 0; i < legacyDataSourceList.size() && this.migrationThreadRunning.get(); i++)
// keep going until every data source has been migrated
int progressCount = 0;
while (!legacyDataSourceList.isEmpty() && this.migrationThreadRunning.get())
{
FullDataSourceV1 legacyDataSource = legacyDataSourceList.get(i);
LOGGER.info("Migrating [" + levelId + "] - [" + progressCount + "/" + totalMigrationCount + "]...");
ArrayList<CompletableFuture<Void>> updateFutureList = new ArrayList<>();
for (int i = 0; i < legacyDataSourceList.size() && this.migrationThreadRunning.get(); i++)
{
FullDataSourceV1 legacyDataSource = legacyDataSourceList.get(i);
try
{
// convert the legacy data source to the new format,
// this is a relatively cheap operation
FullDataSourceV2 newDataSource = FullDataSourceV2.createFromLegacyDataSourceV1(legacyDataSource);
newDataSource.applyToParent = true;
// the actual update process can be moderately expensive due to having to update
// the render data along with the full data, so running it async on the update threads gains us a good bit of speed
CompletableFuture<Void> future = this.updateDataSourceAsync(newDataSource);
updateFutureList.add(future);
future.thenRun(() ->
{
// after the update finishes the legacy data source can be safely deleted
this.legacyFileHandler.repo.deleteWithKey(legacyDataSource.getPos());
try
{
newDataSource.close();
}
catch (Exception ignore)
{
}
});
}
catch (Exception e)
{
Long migrationPos = legacyDataSource.getPos();
LOGGER.warn("Unexpected issue migrating data source at pos " + migrationPos + ". Error: " + e.getMessage(), e);
this.legacyFileHandler.markMigrationFailed(migrationPos);
}
}
try
{
// convert the legacy data source to the new format,
// this is a relatively cheap operation
FullDataSourceV2 newDataSource = FullDataSourceV2.createFromLegacyDataSourceV1(legacyDataSource);
newDataSource.applyToParent = true;
// the actual update process can be moderately expensive due to having to update
// the render data along with the full data, so running it async on the update threads gains us a good bit of speed
CompletableFuture<Void> future = this.updateDataSourceAsync(newDataSource);
updateFutureList.add(future);
future.thenRun(() ->
{
// after the update finishes the legacy data source can be safely deleted
this.legacyFileHandler.repo.deleteWithKey(legacyDataSource.getPos());
try
{
newDataSource.close();
}
catch (Exception ignore) { }
});
// wait for each thread to finish updating
CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(updateFutureList.toArray(new CompletableFuture[0]));
combinedFutures.get(MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
}
catch (Exception e)
catch (InterruptedException | TimeoutException e)
{
Long migrationPos = legacyDataSource.getPos();
LOGGER.warn("Unexpected issue migrating data source at pos " + migrationPos + ". Error: " + e.getMessage(), e);
this.legacyFileHandler.markMigrationFailed(migrationPos);
LOGGER.warn("Migration update timed out after [" + MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS + "] milliseconds. Migration will re-try the same positions again in a moment..", e);
}
catch (ExecutionException e)
{
LOGGER.warn("Migration update failed. Migration will re-try the same positions again. Error:" + e.getMessage(), e);
}
legacyDataSourceList = this.legacyFileHandler.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT);
progressCount += legacyDataSourceList.size();
this.migrationCount -= legacyDataSourceList.size();
}
try
{
// wait for each thread to finish updating
CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(updateFutureList.toArray(new CompletableFuture[0]));
combinedFutures.get(MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
}
catch (InterruptedException | TimeoutException e)
{
LOGGER.warn("Migration update timed out after [" + MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS + "] milliseconds. Migration will re-try the same positions again in a moment..", e);
}
catch (ExecutionException e)
{
LOGGER.warn("Migration update failed. Migration will re-try the same positions again. Error:" + e.getMessage(), e);
}
legacyDataSourceList = this.legacyFileHandler.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT);
progressCount += legacyDataSourceList.size();
this.migrationCount -= legacyDataSourceList.size();
}
}
catch (Exception e)
{
LOGGER.info("migration stopped due to error for: ["+levelId+"]-["+this.saveDir+"], error: ["+e.getMessage()+"].", e);
this.showMigrationEndMessage(false);
this.migrationStoppedWithError = true;
}
finally
{
if (this.migrationThreadRunning.get())
catch (Exception e)
{
LOGGER.info("migration complete for: ["+levelId+"]-["+this.saveDir+"].");
this.showMigrationEndMessage(true);
this.migrationCount = 0;
}
else
{
LOGGER.info("migration stopped for: ["+levelId+"]-["+this.saveDir+"].");
LOGGER.info("migration stopped due to error for: [" + levelId + "]-[" + this.saveDir + "], error: [" + e.getMessage() + "].", e);
this.showMigrationEndMessage(false);
this.migrationStoppedWithError = true;
}
finally
{
if (this.migrationThreadRunning.get())
{
LOGGER.info("migration complete for: [" + levelId + "]-[" + this.saveDir + "].");
this.showMigrationEndMessage(true);
this.migrationCount = 0;
}
else
{
LOGGER.info("migration stopped for: [" + levelId + "]-[" + this.saveDir + "].");
this.showMigrationEndMessage(false);
this.migrationStoppedWithError = true;
}
}
}
else
{
LOGGER.info("No migration necessary.");
}
}
else
finally
{
LOGGER.info("No migration necessary.");
this.migrationThreadRunning.set(false);
}
this.migrationThreadRunning.set(false);
}
public long getLegacyDeletionCount() { return this.legacyDeletionCount; }
@@ -657,7 +675,6 @@ public class FullDataSourceProviderV2
this.legacyFileHandler.close();
this.migrationThreadRunning.set(false);
this.migrationThreadPool.shutdown();
}
}
@@ -212,7 +212,7 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
}
int maxQueueCount = MAX_WORLD_GEN_REQUESTS_PER_THREAD * Config.Common.MultiThreading.numberOfWorldGenerationThreads.get();
int maxQueueCount = MAX_WORLD_GEN_REQUESTS_PER_THREAD * Config.Common.MultiThreading.numberOfThreads.get();
if (this.delayedFullDataSourceSaveCache.getUnsavedCount() >= maxQueueCount)
{
@@ -236,7 +236,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
return true;
}
int worldGenThreadCount = Math.max(Config.Common.MultiThreading.numberOfWorldGenerationThreads.get(), 1);
int worldGenThreadCount = Math.max(Config.Common.MultiThreading.numberOfThreads.get(), 1);
int maxWorldGenTaskCount = worldGenThreadCount * MAX_QUEUED_TASKS_PER_THREAD;
return executor.getQueue().size() > maxWorldGenTaskCount;
}
@@ -83,7 +83,6 @@ public class F3Screen
ThreadPoolExecutor fileHandlerPool = ThreadPoolUtil.getFileHandlerExecutor();
ThreadPoolExecutor updatePool = ThreadPoolUtil.getUpdatePropagatorExecutor();
ThreadPoolExecutor lodBuilderPool = ThreadPoolUtil.getChunkToLodBuilderExecutor();
ThreadPoolExecutor bufferBuilderPool = ThreadPoolUtil.getBufferBuilderExecutor();
AbstractDhWorld world = SharedApi.getAbstractDhWorld();
Iterable<? extends IDhLevel> levelIterator = world.getAllLoadedLevels();
@@ -120,7 +119,6 @@ public class F3Screen
messageList.add(getThreadPoolStatString("File Handler", fileHandlerPool));
messageList.add(getThreadPoolStatString("Update Propagator", updatePool));
messageList.add(getThreadPoolStatString("LOD Builder", lodBuilderPool));
messageList.add(getThreadPoolStatString("Buffer Builder", bufferBuilderPool));
messageList.add("");
}
@@ -191,6 +189,12 @@ public class F3Screen
{
RateLimitedThreadPoolExecutor rateLimitedPool = ((RateLimitedThreadPoolExecutor) pool);
// active threads
int activeThreadCount = rateLimitedPool.semaphoresAcquired.get();
int threadCount = ThreadPoolUtil.getThreadCount();
message += ", Active: "+activeThreadCount+"/"+threadCount;
// thread runtime
String runTimeAvgStr;
double runTimeAvgInMs = rateLimitedPool.getAverageRunTimeInMs();
if (!Double.isNaN(runTimeAvgInMs))
@@ -22,6 +22,7 @@ package com.seibel.distanthorizons.core.util;
import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
import com.seibel.distanthorizons.core.util.threading.DhThreadFactory;
import com.seibel.distanthorizons.core.util.threading.PrioritySemaphore;
import com.seibel.distanthorizons.core.util.threading.RateLimitedThreadPoolExecutor;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.coreapi.ModInfo;
@@ -53,7 +54,7 @@ public class ThreadUtil
// rate limited pool //
//===================//
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, DhThreadFactory threadFactory, ConfigEntry<Double> runTimeRatioConfigEntry, Semaphore activeThreadCountSemaphore)
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, DhThreadFactory threadFactory, ConfigEntry<Double> runTimeRatioConfigEntry, PrioritySemaphore activeThreadCountSemaphore, int priority)
{
// remove the old listener if one exists
if (THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.containsKey(threadFactory.threadName))
@@ -70,7 +71,7 @@ public class ThreadUtil
}
RateLimitedThreadPoolExecutor executor = makeRateLimitedThreadPool(poolSize, runTimeRatioConfigEntry.get(), threadFactory, activeThreadCountSemaphore);
RateLimitedThreadPoolExecutor executor = makeRateLimitedThreadPool(poolSize, runTimeRatioConfigEntry.get(), threadFactory, activeThreadCountSemaphore, priority);
ConfigChangeListener<Double> changeListener = new ConfigChangeListener<>(runTimeRatioConfigEntry, (newRunTimeRatio) -> { executor.runTimeRatio = newRunTimeRatio; });
THREAD_CHANGE_LISTENERS_BY_THREAD_NAME.put(threadFactory.threadName, changeListener);
@@ -80,13 +81,13 @@ public class ThreadUtil
/** should only be used if there isn't a config controlling the run time ratio of this thread pool */
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, String name, Double runTimeRatio, int threadPriority, Semaphore activeThreadCountSemaphore)
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, String name, Double runTimeRatio, int threadPriority, PrioritySemaphore activeThreadCountSemaphore, int priority)
{
return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, new DhThreadFactory(name, threadPriority, false), activeThreadCountSemaphore);
return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, new DhThreadFactory(name, threadPriority, false), activeThreadCountSemaphore, priority);
}
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, Double runTimeRatio, DhThreadFactory threadFactory, Semaphore activeThreadCountSemaphore)
public static RateLimitedThreadPoolExecutor makeRateLimitedThreadPool(int poolSize, Double runTimeRatio, DhThreadFactory threadFactory, PrioritySemaphore activeThreadCountSemaphore, int priority)
{
return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, threadFactory, activeThreadCountSemaphore);
return new RateLimitedThreadPoolExecutor(poolSize, runTimeRatio, threadFactory, activeThreadCountSemaphore, priority);
}
@@ -23,7 +23,8 @@ import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import java.util.concurrent.Semaphore;
import java.util.List;
import java.util.Queue;
/**
* Handles thread pools with config values for their
@@ -32,11 +33,12 @@ import java.util.concurrent.Semaphore;
public class ConfigThreadPool
{
/** Caution must be used to prevent deadlock */
private final Semaphore activeThreadCountSemaphore;
private final PrioritySemaphore threadSemaphore;
/** higher numbers run first */
private final int priority;
public RateLimitedThreadPoolExecutor executor = null;
private int threadCount = 0;
public int getThreadCount() { return this.threadCount; }
public final DhThreadFactory threadFactory;
@@ -50,10 +52,11 @@ public class ConfigThreadPool
// constructor //
//=============//
public ConfigThreadPool(DhThreadFactory threadFactory, ConfigEntry<Integer> threadCountConfig, ConfigEntry<Double> runTimeRatioConfig, Semaphore activeThreadCountSemaphore)
public ConfigThreadPool(DhThreadFactory threadFactory, ConfigEntry<Integer> threadCountConfig, ConfigEntry<Double> runTimeRatioConfig, PrioritySemaphore threadSemaphore, int priority)
{
this.threadFactory = threadFactory;
this.activeThreadCountSemaphore = activeThreadCountSemaphore;
this.threadSemaphore = threadSemaphore;
this.priority = priority;
this.threadCountConfig = threadCountConfig;
this.threadCountConfigListener = new ConfigChangeListener<>(threadCountConfig,
@@ -72,14 +75,24 @@ public class ConfigThreadPool
public void setThreadPoolSize(int threadPoolSize)
{
Queue<Runnable> incompleteRunnableQueue = null;
if (this.executor != null)
{
// close the previous thread pool if one exists
this.executor.shutdown();
this.executor.shutdown(); // don't do shutdown now since we don't want to throw any interrupt exceptions
incompleteRunnableQueue = this.executor.getQueue();
}
this.threadCount = threadPoolSize;
this.executor = ThreadUtil.makeRateLimitedThreadPool(this.threadCount, this.threadFactory, this.runTimeRatioConfig, this.activeThreadCountSemaphore);
this.executor = ThreadUtil.makeRateLimitedThreadPool(this.threadCount, this.threadFactory, this.runTimeRatioConfig, this.threadSemaphore, this.priority);
if (incompleteRunnableQueue != null)
{
for (Runnable runnable : incompleteRunnableQueue)
{
this.executor.execute(runnable);
}
}
}
/**
@@ -90,7 +103,6 @@ public class ConfigThreadPool
{
if (this.executor != null)
{
//LOGGER.info("Stopping thread pool");
this.executor.shutdownNow();
}
@@ -0,0 +1,161 @@
package com.seibel.distanthorizons.core.util.threading;
import org.jetbrains.annotations.NotNull;
import java.util.Comparator;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;
/**
* For use with {@link RateLimitedThreadPoolExecutor}
*/
public class PrioritySemaphore
{
public int maxPermitCount;
public int currentPermitCount;
private final PriorityBlockingQueue<ThreadWithPriority> queue = new PriorityBlockingQueue<>();
private final ReentrantLock lock = new ReentrantLock();
private final Random random = new Random();
//=============//
// constructor //
//=============//
public PrioritySemaphore(int permits)
{
this.maxPermitCount = permits;
this.currentPermitCount = this.maxPermitCount;
}
//==================//
// permit acquiring //
//==================//
/** Similar to {@link Semaphore#acquire()} */
public void acquire(RateLimitedThreadPoolExecutor executor) throws InterruptedException
{
Thread thread = Thread.currentThread();
this.lock.lock();
try
{
if (this.currentPermitCount > 0)
{
// a permit is available,
// this thread can run normally
this.currentPermitCount--;
// return to prevent running the thread's wait() method below
return;
}
}
finally
{
this.lock.unlock();
}
// no permit is available
// this has to be outside the try-finally to prevent holding the lock while waiting
synchronized (thread)
{
// random value between -5 and +5 is used to prevent task starvation
// while still allowing higher priority tasks to run sooner
int priority = executor.priority + this.random.nextInt(11) - 5;
// this thread will be run when a permit is available
thread.wait();
this.queue.put(new ThreadWithPriority(thread,priority));
}
}
/** Similar to {@link Semaphore#release()} */
public void release()
{
this.lock.lock();
try
{
// wake up the nex thread if one is queued
if (!this.queue.isEmpty())
{
Thread nextThread = this.queue.poll().thread;
synchronized (nextThread)
{
nextThread.notify();
}
}
else
{
this.currentPermitCount++;
// don't increase past the max allowed (this can happen when changing the max permit count)
this.currentPermitCount = Math.min(this.currentPermitCount, this.maxPermitCount);
}
}
finally
{
this.lock.unlock();
}
}
//=================//
// permit changing //
//=================//
public void changePermitCount(int val)
{
// find the max number of permits to increase by
int permitCountIncrease = Math.max(0, val - this.maxPermitCount);
this.lock.lock();
try
{
this.currentPermitCount += permitCountIncrease;
this.maxPermitCount = val;
}
finally
{
this.lock.unlock();
}
}
public int availablePermits() { return this.currentPermitCount; }
//================//
// helper classes //
//================//
/** simple sortable container to track a thread and it's priority */
private static class ThreadWithPriority implements Comparable<ThreadWithPriority>
{
private final Thread thread;
private final int priority;
public ThreadWithPriority(Thread thread, int priority)
{
this.thread = thread;
this.priority = priority;
}
@Override
public int compareTo(@NotNull ThreadWithPriority other)
{
// highest number first
return Integer.compare(other.priority, this.priority);
}
}
}
@@ -22,6 +22,7 @@ package com.seibel.distanthorizons.core.util.threading;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.util.objects.RollingAverage;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.*;
@@ -31,7 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* Can be used to more finely control CPU usage and
* reduce CPU usage if only 1 thread is already assigned.
*/
public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor
public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor implements Comparable<RateLimitedThreadPoolExecutor>
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
/** logs include the thread name by default which can help diagnose deadlocks */
@@ -45,13 +46,17 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor
/** How long it took this thread to run its last task */
private final ThreadLocal<Long> lastRunDurationNanoTimeRef = ThreadLocal.withInitial(() -> -1L);
private Runnable onTerminatedEventHandler = null;
/** if null the thread pool will run independently of other pools */
/**
* Does nothing if {@link RateLimitedThreadPoolExecutor#threadSemaphore}
* is null. <br>
* Higher numbers have higher priority.
*/
public final int priority;
/** if null this thread pool will run independently of other pools */
@Nullable
private final Semaphore activeThreadCountSemaphore;
public final PrioritySemaphore threadSemaphore;
/** will always be zero if no semaphore is present */
private final AtomicInteger semaphoresAcquired = new AtomicInteger(0);
public final AtomicInteger semaphoresAcquired = new AtomicInteger(0);
private final RollingAverage runTimeInMsRollingAverage = new RollingAverage(200);
@@ -61,8 +66,9 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor
// constructors //
//==============//
public RateLimitedThreadPoolExecutor(int corePoolSize, double runTimeRatio, ThreadFactory threadFactory) { this(corePoolSize, runTimeRatio, threadFactory, null); }
public RateLimitedThreadPoolExecutor(int corePoolSize, double runTimeRatio, ThreadFactory threadFactory, @Nullable Semaphore activeThreadCountSemaphore)
public RateLimitedThreadPoolExecutor(
int corePoolSize, double runTimeRatio, ThreadFactory threadFactory,
@Nullable PrioritySemaphore prioritySemaphore, int priority)
{
super(corePoolSize, corePoolSize,
0L, TimeUnit.MILLISECONDS,
@@ -70,7 +76,8 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor
threadFactory);
this.runTimeRatio = runTimeRatio;
this.activeThreadCountSemaphore = activeThreadCountSemaphore;
this.priority = priority;
this.threadSemaphore = prioritySemaphore;
}
@@ -96,17 +103,17 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor
catch (InterruptedException ignored) { }
}
if (this.activeThreadCountSemaphore != null)
if (this.threadSemaphore != null)
{
try
{
// Warning, this can cause deadlock if one thread calls another.
this.activeThreadCountSemaphore.acquire();
this.threadSemaphore.acquire(this);
this.semaphoresAcquired.getAndAdd(1);
if (LOG_SEMAPHORE_ACTIONS)
{
LOGGER.debug("acquired, available count: ["+this.activeThreadCountSemaphore.availablePermits()+"]");
LOGGER.debug("acquired, available count: ["+this.threadSemaphore.availablePermits()+"]");
}
}
catch (InterruptedException ignore) { }
@@ -123,14 +130,14 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor
this.lastRunDurationNanoTimeRef.set(System.nanoTime() - this.runStartNanoTimeRef.get());
if (this.activeThreadCountSemaphore != null)
if (this.threadSemaphore != null)
{
this.activeThreadCountSemaphore.release();
this.threadSemaphore.release();
this.semaphoresAcquired.getAndAdd(-1);
if (LOG_SEMAPHORE_ACTIONS)
{
LOGGER.debug("released, available count: ["+this.activeThreadCountSemaphore.availablePermits()+"]");
LOGGER.debug("released, available count: ["+this.threadSemaphore.availablePermits()+"]");
}
}
}
@@ -139,20 +146,13 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor
protected void terminated()
{
super.terminated();
if (this.onTerminatedEventHandler != null)
{
this.onTerminatedEventHandler.run();
}
// release all held semaphores (shouldn't normally be necessary, but just in case)
if (this.activeThreadCountSemaphore != null)
if (this.threadSemaphore != null)
{
int semaphoresAcquired = this.semaphoresAcquired.getAndSet(0);
this.activeThreadCountSemaphore.release(semaphoresAcquired);
if (LOG_SEMAPHORE_ACTIONS)
{
LOGGER.info("terminated, released ["+semaphoresAcquired+"], available count: ["+this.activeThreadCountSemaphore.availablePermits()+"]");
LOGGER.info("terminated, released ["+this.semaphoresAcquired+"], available count: ["+this.threadSemaphore.availablePermits()+"]");
}
}
}
@@ -160,13 +160,25 @@ public class RateLimitedThreadPoolExecutor extends ThreadPoolExecutor
//==============//
// custom logic //
// running time //
//==============//
/** only one event handler can be present at a time */
public void setOnTerminatedEventHandler(Runnable runnable) { this.onTerminatedEventHandler = runnable; }
/** will return Nan if nothing has been submitted yet */
public double getAverageRunTimeInMs() { return this.runTimeInMsRollingAverage.getAverage(); }
//============//
// comparison //
//============//
@Override
public int compareTo(@NotNull RateLimitedThreadPoolExecutor other)
{
// highest number first
return Integer.compare(other.priority, this.priority);
}
}
@@ -24,7 +24,6 @@ import com.seibel.distanthorizons.core.config.listeners.ConfigChangeListener;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
/**
@@ -73,31 +72,24 @@ public class ThreadPoolUtil
//======================//
// worker threads pools //
//======================//
public static final String FULL_DATA_MIGRATION_THREAD_NAME = "Full Data Migration";
private static ThreadPoolExecutor fullDataMigrationThreadPool;
@Nullable
public static ThreadPoolExecutor getFullDataMigrationExecutor() { return fullDataMigrationThreadPool; }
// worker thread pools are generally related with LOD building
// and all share an underlying number of threads.
// WARNING: great care should be used when setting up these threads since deadlock can occur if they are handled poorly.
public static final DhThreadFactory CHUNK_TO_LOD_BUILDER_THREAD_FACTORY = new DhThreadFactory("LOD Builder - Chunk to Lod Builder", Thread.MIN_PRIORITY, false);
private static ConfigThreadPool chunkToLodBuilderThreadPool;
@Nullable
public static ThreadPoolExecutor getChunkToLodBuilderExecutor() { return (chunkToLodBuilderThreadPool != null) ? chunkToLodBuilderThreadPool.executor : null; }
public static final DhThreadFactory BUFFER_BUILDER_THREAD_FACTORY = new DhThreadFactory("LOD Builder - Buffer Builder", Thread.MIN_PRIORITY, false);
private static ConfigThreadPool bufferBuilderThreadPool;
@Nullable
public static ThreadPoolExecutor getBufferBuilderExecutor() { return (bufferBuilderThreadPool != null) ? bufferBuilderThreadPool.executor : null; }
/** how many total worker threads can be used */
private static int workerThreadSemaphoreCount = Config.Common.MultiThreading.numberOfLodBuilderThreads.get();
public static int getWorkerThreadCount() { return workerThreadSemaphoreCount; }
private static Semaphore workerThreadSemaphore = null;
private static ConfigChangeListener<Integer> workerThreadSemaphoreConfigListener = null;
/** how many total threads can be used */
private static int threadSemaphoreCount = Config.Common.MultiThreading.numberOfThreads.get();
public static int getThreadCount() { return threadSemaphoreCount; }
private static PrioritySemaphore threadSemaphore = null;
private static ConfigChangeListener<Integer> threadSemaphoreConfigListener = null;
@@ -107,43 +99,40 @@ public class ThreadPoolUtil
public static void setupThreadPools()
{
// standalone threads //
// create thread semaphore
threadSemaphoreCount = Config.Common.MultiThreading.numberOfThreads.get();
threadSemaphore = new PrioritySemaphore(threadSemaphoreCount);
threadSemaphoreConfigListener = new ConfigChangeListener<>(Config.Common.MultiThreading.numberOfThreads, (val) ->
{
threadSemaphore.changePermitCount(val);
threadSemaphoreCount = val;
});
fileHandlerThreadPool = new ConfigThreadPool(FILE_HANDLER_THREAD_FACTORY, Config.Common.MultiThreading.numberOfFileHandlerThreads, Config.Common.MultiThreading.runTimeRatioForFileHandlerThreads, null);
updatePropagatorThreadPool = new ConfigThreadPool(UPDATE_PROPAGATOR_THREAD_FACTORY, Config.Common.MultiThreading.numberOfUpdatePropagatorThreads, Config.Common.MultiThreading.runTimeRatioForUpdatePropagatorThreads, null);
worldGenThreadPool = new ConfigThreadPool(WORLD_GEN_THREAD_FACTORY, Config.Common.MultiThreading.numberOfWorldGenerationThreads, Config.Common.MultiThreading.runTimeRatioForWorldGenerationThreads, null);
networkCompressionThreadPool = new ConfigThreadPool(NETWORK_COMPRESSION_THREAD_FACTORY, Config.Common.MultiThreading.numberOfNetworkCompressionThreads, Config.Common.MultiThreading.runTimeRatioForNetworkCompressionThreads, null);
// thread pools
chunkToLodBuilderThreadPool = new ConfigThreadPool(CHUNK_TO_LOD_BUILDER_THREAD_FACTORY,
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
threadSemaphore, 3); // We want to make sure any chunk changes are found
fileHandlerThreadPool = new ConfigThreadPool(FILE_HANDLER_THREAD_FACTORY,
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
threadSemaphore, 2); // loading in new LODs is second highest priority
updatePropagatorThreadPool = new ConfigThreadPool(UPDATE_PROPAGATOR_THREAD_FACTORY,
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
threadSemaphore, 1); // update propagation needs to be slightly higher priority than world gen
worldGenThreadPool = new ConfigThreadPool(WORLD_GEN_THREAD_FACTORY,
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
threadSemaphore, 0); // higher priorities mean the threads will run first
networkCompressionThreadPool = new ConfigThreadPool(NETWORK_COMPRESSION_THREAD_FACTORY,
Config.Common.MultiThreading.numberOfThreads, Config.Common.MultiThreading.threadRunTimeRatio,
threadSemaphore, 0); // networking can probably have similar priority to world gen since they work similarly
// single thread pools
cleanupThreadPool = ThreadUtil.makeSingleThreadPool(CLEANUP_THREAD_NAME);
beaconCullingThreadPool = ThreadUtil.makeSingleThreadPool(BEACON_CULLING_THREAD_NAME);
// worker threads //
// create thread semaphore
if (Config.Common.MultiThreading.enableLodBuilderThreadLimiting.get())
{
workerThreadSemaphoreCount = Config.Common.MultiThreading.numberOfLodBuilderThreads.get();
workerThreadSemaphore = new Semaphore(workerThreadSemaphoreCount);
workerThreadSemaphoreConfigListener = new ConfigChangeListener<>(Config.Common.MultiThreading.numberOfLodBuilderThreads, (val) ->
{
int changePermit = val - workerThreadSemaphoreCount;
if (changePermit > 0)
{
workerThreadSemaphore.release(changePermit);
}
else
{
workerThreadSemaphore.acquireUninterruptibly(changePermit * -1);
}
workerThreadSemaphoreCount = workerThreadSemaphoreCount + changePermit;
});
}
// create thread pools
chunkToLodBuilderThreadPool = new ConfigThreadPool(CHUNK_TO_LOD_BUILDER_THREAD_FACTORY, Config.Common.MultiThreading.numberOfLodBuilderThreads, Config.Common.MultiThreading.runTimeRatioForLodBuilderThreads, workerThreadSemaphore);
bufferBuilderThreadPool = new ConfigThreadPool(BUFFER_BUILDER_THREAD_FACTORY, Config.Common.MultiThreading.numberOfLodBuilderThreads, Config.Common.MultiThreading.runTimeRatioForLodBuilderThreads, workerThreadSemaphore);
fullDataMigrationThreadPool = ThreadUtil.makeSingleThreadPool(FULL_DATA_MIGRATION_THREAD_NAME);
}
@@ -156,18 +145,15 @@ public class ThreadPoolUtil
networkCompressionThreadPool.shutdownExecutorService();
cleanupThreadPool.shutdown();
beaconCullingThreadPool.shutdown();
fullDataMigrationThreadPool.shutdown();
chunkToLodBuilderThreadPool.shutdownExecutorService();
threadSemaphore = null;
// worker threads
ThreadPoolUtil.chunkToLodBuilderThreadPool.shutdownExecutorService();
ThreadPoolUtil.bufferBuilderThreadPool.shutdownExecutorService();
workerThreadSemaphore = null;
if (workerThreadSemaphoreConfigListener != null)
if (threadSemaphoreConfigListener != null)
{
workerThreadSemaphoreConfigListener.close();
workerThreadSemaphoreConfigListener = null;
threadSemaphoreConfigListener.close();
threadSemaphoreConfigListener = null;
}
}
@@ -609,53 +609,13 @@
"distanthorizons.config.common.multiThreading":
"Multi-Threading",
"distanthorizons.config.common.multiThreading.numberOfWorldGenerationThreads":
"NO. of world generation threads",
"distanthorizons.config.common.multiThreading.numberOfWorldGenerationThreads.@tooltip":
"How many threads should be used when generating LODs \n outside the normal render distance?\n\nIf you experience stuttering when generating distant LODs, \ndecrease this number. If you want to increase LOD \ngeneration speed, increase this number.",
"distanthorizons.config.common.multiThreading.runTimeRatioForWorldGenerationThreads":
"Runtime % for world generation threads",
"distanthorizons.config.common.multiThreading.numberOfThreads":
"NO. of threads",
"distanthorizons.config.common.multiThreading.numberOfThreads.@tooltip":
"How many threads DH will use.",
"distanthorizons.config.common.multiThreading.threadRunTimeRatio":
"Runtime % for threads",
"distanthorizons.config.common.multiThreading.numberOfBufferBuilderThreads":
"NO. of buffer builder threads",
"distanthorizons.config.common.multiThreading.numberOfBufferBuilderThreads.@tooltip":
"The number of threads used when building geometry data. \nCan only be between 1 and your CPU's processor count.",
"distanthorizons.config.common.multiThreading.runTimeRatioForBufferBuilderThreads":
"Runtime % for buffer builder threads",
"distanthorizons.config.common.multiThreading.numberOfFileHandlerThreads":
"NO. of file handler threads",
"distanthorizons.config.common.multiThreading.numberOfFileHandlerThreads.@tooltip":
"The number of threads used when reading/writing LOD data to/from the disk. \nCan only be between 1 and your CPU's processor count.",
"distanthorizons.config.common.multiThreading.runTimeRatioForFileHandlerThreads":
"Runtime % for file handler threads",
"distanthorizons.config.common.multiThreading.numberOfUpdatePropagatorThreads":
"NO. of update propagator threads",
"distanthorizons.config.common.multiThreading.numberOfUpdatePropagatorThreads.@tooltip":
"How many threads should be used when applying LOD updates? \nAn LOD update is the operation of down-sampling a high detail LOD \ninto a lower detail one. \n\nThis config can have a much higher number of threads \nassigned and much lower run time ratio vs other thread pools \nbecause the amount of time any particular thread may run is relatively low.",
"distanthorizons.config.common.multiThreading.runTimeRatioForUpdatePropagatorThreads":
"Runtime % for update propagator threads",
"distanthorizons.config.common.multiThreading.numberOfNetworkCompressionThreads":
"NO. of Network Compression threads",
"distanthorizons.config.common.multiThreading.numberOfNetworkCompressionThreads.@tooltip":
"How many threads should be used when (de)compressing LODs \nthat are received/sent over the network?",
"distanthorizons.config.common.multiThreading.runTimeRatioForNetworkCompressionThreads":
"Runtime % for Network Compression threads",
"distanthorizons.config.common.multiThreading.numberOfLodBuilderThreads":
"NO. of LOD builder threads",
"distanthorizons.config.common.multiThreading.numberOfLodBuilderThreads.@tooltip":
"The number of threads used when building LODs. \nThese threads run when terrain is generated, when \ncertain graphics settings are changed, and when moving around the world.",
"distanthorizons.config.common.multiThreading.runTimeRatioForLodBuilderThreads":
"Runtime % for LOD builder threads",
"distanthorizons.config.common.multiThreading.enableLodBuilderThreadLimiting":
"Enable LOD builder thread limiting",
"distanthorizons.config.common.multiThreading.enableLodBuilderThreadLimiting.@tooltip":
"Should only be disabled if deadlock occurs and LODs refuse to update. \nThis will cause CPU usage to drastically increase for the Lod Builder threads. \nNote that if a deadlock did occur restarting MC may be necessary to stop the locked threads.",
"distanthorizons.config.common.logging":