Multithread full data migration

This commit is contained in:
James Seibel
2024-03-12 22:00:36 -05:00
parent 996621887c
commit c3f99835db
@@ -41,9 +41,7 @@ import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
@@ -53,10 +51,6 @@ public class NewFullDataFileHandler
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
/** how many data sources should be pulled down for migration at once */
private static final int MIGRATION_BATCH_COUNT = 20;
private static final String MIGRATION_THREAD_NAME_PREFIX = "Full Data Migration Thread: ";
protected static final int NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD = 50;
/** how many parent update tasks can be in the queue at once */
protected static final int MAX_UPDATE_TASK_COUNT = NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD * Config.Client.Advanced.MultiThreading.numberOfFileHandlerThreads.get();
@@ -64,6 +58,12 @@ public class NewFullDataFileHandler
/** indicates how long the update queue thread should wait between queuing ticks */
protected static final int UPDATE_QUEUE_THREAD_DELAY_IN_MS = 250;
/** how many data sources should be pulled down for migration at once */
private static final int MIGRATION_BATCH_COUNT = NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD;
private static final String MIGRATION_THREAD_NAME_PREFIX = "Full Data Migration Thread: ";
/** 1 minute */
private static final int MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS = 60 * 1_000;
protected final ThreadPoolExecutor migrationThreadPool;
/**
@@ -321,20 +321,27 @@ public class NewFullDataFileHandler
{
LOGGER.info("Migrating ["+dimensionName+"] - [" + progressCount + "/" + totalCount + "]...");
ArrayList<CompletableFuture<Void>> updateFutureList = new ArrayList<>();
for (int i = 0; i < legacyDataSourceList.size() && this.migrationThreadRunning.get(); i++)
{
CompleteFullDataSource legacyDataSource = legacyDataSourceList.get(i);
try
{
// convert the legacy data source to the new format
// convert the legacy data source to the new format,
// this is a relatively cheap operation
NewFullDataSource newDataSource = NewFullDataSource.createFromCompleteDataSource(legacyDataSource);
newDataSource.applyToParent = true;
this.updateDataSourceAtPos(newDataSource.getSectionPos(), newDataSource, true);
// the legacy data source can now be deleted
this.legacyFileHandler.repo.deleteWithKey(legacyDataSource.getSectionPos());
// the actual update process can be moderately expensive due to having to update
// the render data along with the full data, so running it async on the update threads gains us a good bit of speed
CompletableFuture<Void> future = this.updateDataSourceAsync(newDataSource);
updateFutureList.add(future);
future.thenRun(() ->
{
// after the update finishes the legacy data source can be safely deleted
this.legacyFileHandler.repo.deleteWithKey(legacyDataSource.getSectionPos());
});
}
catch (Exception e)
{
@@ -344,6 +351,18 @@ public class NewFullDataFileHandler
}
}
try
{
// wait for each thread to finish updating
CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(updateFutureList.toArray(new CompletableFuture[0]));
combinedFutures.get(MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e)
{
LOGGER.warn("Migration update timed out. Migration will re-try the same positions again. Error:"+e.getMessage(), e);
}
legacyDataSourceList = this.legacyFileHandler.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT);
progressCount += legacyDataSourceList.size();
}