Speed up shutdown and reduce logging

This commit is contained in:
James Seibel
2025-11-14 07:46:02 -06:00
parent 6fe0df7d0f
commit b82a59ecbc
11 changed files with 129 additions and 81 deletions
@@ -47,6 +47,7 @@ import java.io.IOException;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/** /**
@@ -76,14 +77,11 @@ public class FullDataSourceProviderV2 implements IDebugRenderable, AutoCloseable
public static final byte LEAF_SECTION_DETAIL_LEVEL = DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL; public static final byte LEAF_SECTION_DETAIL_LEVEL = DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL;
protected final ReentrantLock closeLock = new ReentrantLock();
protected volatile boolean isShutdown = false;
protected final File saveDir;
public final FullDataSourceV2Repo repo; public final FullDataSourceV2Repo repo;
protected final AtomicBoolean isShutdownRef = new AtomicBoolean(false);
protected final File saveDir;
protected final IDhLevel level; protected final IDhLevel level;
protected final String levelId; protected final String levelId;
@@ -174,6 +172,11 @@ public class FullDataSourceProviderV2 implements IDebugRenderable, AutoCloseable
*/ */
public CompletableFuture<FullDataSourceV2> getAsync(long pos) public CompletableFuture<FullDataSourceV2> getAsync(long pos)
{ {
if (this.isShutdownRef.get())
{
return CompletableFuture.completedFuture(null);
}
AbstractExecutorService executor = ThreadPoolUtil.getFileHandlerExecutor(); AbstractExecutorService executor = ThreadPoolUtil.getFileHandlerExecutor();
if (executor == null || executor.isTerminated()) if (executor == null || executor.isTerminated())
{ {
@@ -199,6 +202,11 @@ public class FullDataSourceProviderV2 implements IDebugRenderable, AutoCloseable
@Nullable @Nullable
public FullDataSourceV2 get(long pos) public FullDataSourceV2 get(long pos)
{ {
if (this.isShutdownRef.get())
{
return null;
}
try(FullDataSourceV2DTO dto = this.repo.getByKey(pos)) try(FullDataSourceV2DTO dto = this.repo.getByKey(pos))
{ {
if (dto == null) if (dto == null)
@@ -267,6 +275,11 @@ public class FullDataSourceProviderV2 implements IDebugRenderable, AutoCloseable
*/ */
public FullDataSourceV2 getAdjForDirection(long pos, EDhDirection direction) public FullDataSourceV2 getAdjForDirection(long pos, EDhDirection direction)
{ {
if (this.isShutdownRef.get())
{
return null;
}
try(FullDataSourceV2DTO dto = this.repo.getAdjByPosAndDirection(pos, direction)) try(FullDataSourceV2DTO dto = this.repo.getAdjByPosAndDirection(pos, direction))
{ {
if (dto == null) if (dto == null)
@@ -386,7 +399,14 @@ public class FullDataSourceProviderV2 implements IDebugRenderable, AutoCloseable
@Nullable @Nullable
public Long getTimestampForPos(long pos) public Long getTimestampForPos(long pos)
{ return this.repo.getTimestampForPos(pos); } {
if (this.isShutdownRef.get())
{
return null;
}
return this.repo.getTimestampForPos(pos);
}
@@ -416,28 +436,15 @@ public class FullDataSourceProviderV2 implements IDebugRenderable, AutoCloseable
@Override @Override
public void close() public void close()
{ {
try LOGGER.debug("Closing [" + this.getClass().getSimpleName() + "] for level: [" + this.levelId + "].");
{
LOGGER.debug("Closing [" + this.getClass().getSimpleName() + "] for level: [" + this.levelId + "]."); this.isShutdownRef.set(true);
this.closeLock.lock(); this.dataUpdater.close();
this.isShutdown = true; this.updatePropagator.close();
this.dataMigratorV1.close();
this.dataUpdater.close();
this.updatePropagator.close(); this.repo.close();
this.dataMigratorV1.close();
// wait a moment so any queued saves can finish queuing,
// otherwise we might not see everything that needs saving and attempt to use a closed repo
Thread.sleep(200);
this.repo.close();
}
catch (InterruptedException ignore) { }
finally
{
this.closeLock.unlock();
}
} }
@@ -23,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
public class FullDataUpdatePropagatorV2 implements IDebugRenderable, AutoCloseable public class FullDataUpdatePropagatorV2 implements IDebugRenderable, AutoCloseable
@@ -41,9 +42,6 @@ public class FullDataUpdatePropagatorV2 implements IDebugRenderable, AutoCloseab
{ return NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD * Config.Common.MultiThreading.numberOfThreads.get(); } { return NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD * Config.Common.MultiThreading.numberOfThreads.get(); }
private final FullDataSourceProviderV2 provider;
private final FullDataUpdaterV2 dataUpdater;
/** /**
* Tracks which positions are currently being updated * Tracks which positions are currently being updated
@@ -59,9 +57,14 @@ public class FullDataUpdatePropagatorV2 implements IDebugRenderable, AutoCloseab
@Nullable @Nullable
public final ThreadPoolExecutor updateQueueProcessor; public final ThreadPoolExecutor updateQueueProcessor;
private final AtomicBoolean isShutdownRef = new AtomicBoolean(false);
private final String levelId; private final String levelId;
private final FullDataSourceProviderV2 provider;
private final FullDataUpdaterV2 dataUpdater;
//=============// //=============//
// constructor // // constructor //
@@ -125,8 +128,6 @@ public class FullDataUpdatePropagatorV2 implements IDebugRenderable, AutoCloseab
LOGGER.error("Unexpected error in the parent update queue thread. Error: " + e.getMessage(), e); LOGGER.error("Unexpected error in the parent update queue thread. Error: " + e.getMessage(), e);
} }
} }
LOGGER.info("Update thread ["+Thread.currentThread().getName()+"] terminated.");
} }
/** will always apply updates */ /** will always apply updates */
private void runParentUpdates(PriorityTaskPicker.Executor executor, DhBlockPos targetBlockPos) private void runParentUpdates(PriorityTaskPicker.Executor executor, DhBlockPos targetBlockPos)
@@ -387,20 +388,10 @@ public class FullDataUpdatePropagatorV2 implements IDebugRenderable, AutoCloseab
@Override @Override
public void close() public void close()
{ {
try if (this.updateQueueProcessor != null)
{ {
//LOGGER.info("Closing [" + this.getClass().getSimpleName() + "] for level: [" + this.levelId + "]."); this.updateQueueProcessor.shutdownNow();
if (this.updateQueueProcessor != null)
{
this.updateQueueProcessor.shutdownNow();
}
// wait a moment so any queued saves can finish queuing,
// otherwise we might not see everything that needs saving and attempt to use a closed repo
Thread.sleep(200);
} }
catch (InterruptedException ignore) { }
} }
@@ -19,6 +19,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Set; import java.util.Set;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@@ -26,10 +27,6 @@ public class FullDataUpdaterV2 implements IDebugRenderable, AutoCloseable
{ {
private static final DhLogger LOGGER = new DhLoggerBuilder().build(); private static final DhLogger LOGGER = new DhLoggerBuilder().build();
private final FullDataSourceProviderV2 provider;
protected final PositionalLockProvider updateLockProvider = new PositionalLockProvider(); protected final PositionalLockProvider updateLockProvider = new PositionalLockProvider();
/** /**
* generally just used for debugging, * generally just used for debugging,
@@ -41,6 +38,9 @@ public class FullDataUpdaterV2 implements IDebugRenderable, AutoCloseable
public final ArrayList<IDataSourceUpdateListenerFunc<FullDataSourceV2>> dateSourceUpdateListeners = new ArrayList<>(); public final ArrayList<IDataSourceUpdateListenerFunc<FullDataSourceV2>> dateSourceUpdateListeners = new ArrayList<>();
private final String levelId; private final String levelId;
private final AtomicBoolean isShutdownRef = new AtomicBoolean(false);
private final FullDataSourceProviderV2 provider;
@@ -67,6 +67,11 @@ public class FullDataUpdaterV2 implements IDebugRenderable, AutoCloseable
*/ */
public CompletableFuture<Void> updateDataSourceAsync(@NotNull FullDataSourceV2 inputDataSource) public CompletableFuture<Void> updateDataSourceAsync(@NotNull FullDataSourceV2 inputDataSource)
{ {
if (this.isShutdownRef.get())
{
return CompletableFuture.completedFuture(null);
}
AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor(); AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
if (executor == null || executor.isTerminated()) if (executor == null || executor.isTerminated())
{ {
@@ -104,6 +109,12 @@ public class FullDataUpdaterV2 implements IDebugRenderable, AutoCloseable
/** After this method returns the inputData will be written to file. */ /** After this method returns the inputData will be written to file. */
public void updateDataSource(@NotNull FullDataSourceV2 inputData, boolean lockOnUpdatePos) public void updateDataSource(@NotNull FullDataSourceV2 inputData, boolean lockOnUpdatePos)
{ {
if (this.isShutdownRef.get())
{
return;
}
long updatePos = inputData.getPos(); long updatePos = inputData.getPos();
boolean methodLocked = false; boolean methodLocked = false;
@@ -231,7 +242,7 @@ public class FullDataUpdaterV2 implements IDebugRenderable, AutoCloseable
@Override @Override
public void close() public void close()
{ {
//LOGGER.info("Closing [" + this.getClass().getSimpleName() + "] for level: [" + this.levelId + "]."); this.isShutdownRef.set(true);
} }
@@ -621,19 +621,13 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
LodUtil.assertTrue(this.generatorClosingFuture != null); LodUtil.assertTrue(this.generatorClosingFuture != null);
LOGGER.info("Awaiting world generator thread pool termination..."); LOGGER.info("Shutting down world generator thread pool...");
try
AbstractExecutorService executor = ThreadPoolUtil.getWorldGenExecutor();
if (executor != null)
{ {
int waitTimeInSeconds = 3; List<Runnable> tasks = executor.shutdownNow();
AbstractExecutorService executor = ThreadPoolUtil.getWorldGenExecutor(); LOGGER.info("World generator thread pool shutdown with [" + tasks.size() + "] incomplete tasks.");
if (executor != null && !executor.awaitTermination(waitTimeInSeconds, TimeUnit.SECONDS))
{
LOGGER.warn("World generator thread pool shutdown didn't complete after [" + waitTimeInSeconds + "] seconds. Some world generator requests may still be running.");
}
}
catch (InterruptedException e)
{
LOGGER.warn("World generator thread pool shutdown interrupted! Ignoring child threads...", e);
} }
@@ -107,7 +107,6 @@ public class DhServerLevel extends AbstractDhServerLevel
{ {
super.close(); super.close();
this.serverside.close(); this.serverside.close();
LOGGER.info("Closed DHLevel for ["+this.getLevelWrapper()+"].");
} }
} }
@@ -369,7 +369,7 @@ public abstract class AbstractDhRepo<TKey, TDTO extends IBaseDTO<TKey>> implemen
if (DbConnectionClosedException.isClosedException(e)) if (DbConnectionClosedException.isClosedException(e))
{ {
throw new DbConnectionClosedException(e); return new ArrayList<>();
} }
else else
{ {
@@ -98,10 +98,6 @@ public class PriorityTaskPicker
// Clear this executor's tasks since we no longer expect anything to execute. // Clear this executor's tasks since we no longer expect anything to execute.
executor.taskQueue.clear(); executor.taskQueue.clear();
} }
else
{
throw e;
}
} }
} }
} }
@@ -10,8 +10,10 @@ import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public abstract class AbstractDhServerWorld<TDhServerLevel extends AbstractDhServerLevel> extends AbstractDhWorld implements IDhServerWorld public abstract class AbstractDhServerWorld<TDhServerLevel extends AbstractDhServerLevel> extends AbstractDhWorld implements IDhServerWorld
@@ -134,10 +136,9 @@ public abstract class AbstractDhServerWorld<TDhServerLevel extends AbstractDhSer
@Override @Override
public void close() public void close()
{ {
ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>();
for (TDhServerLevel level : this.dhLevelByLevelWrapper.values()) for (TDhServerLevel level : this.dhLevelByLevelWrapper.values())
{ {
LOGGER.info("Unloading level [" + level.getLevelWrapper().getDhIdentifier() + "].");
// level wrapper shouldn't be null, but just in case // level wrapper shouldn't be null, but just in case
IServerLevelWrapper serverLevelWrapper = level.getServerLevelWrapper(); IServerLevelWrapper serverLevelWrapper = level.getServerLevelWrapper();
if (serverLevelWrapper != null) if (serverLevelWrapper != null)
@@ -145,7 +146,23 @@ public abstract class AbstractDhServerWorld<TDhServerLevel extends AbstractDhSer
serverLevelWrapper.onUnload(); serverLevelWrapper.onUnload();
} }
level.close();
// close levels asynchronously to speed up
// shutdown on servers with a lot of levels
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
Thread closeThread = new Thread(() ->
{
level.close();
closeFuture.complete(null);
}, "level shutdown");
closeThread.start();
closeFutures.add(closeFuture);
}
// wait for all the levels to finish closing
for (CompletableFuture<Void> future : closeFutures)
{
future.join();
} }
this.dhLevelByLevelWrapper.clear(); this.dhLevelByLevelWrapper.clear();
@@ -28,9 +28,11 @@ import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
public class DhClientServerWorld extends AbstractDhServerWorld<DhClientServerLevel> implements IDhClientWorld public class DhClientServerWorld extends AbstractDhServerWorld<DhClientServerLevel> implements IDhClientWorld
@@ -143,13 +145,13 @@ public class DhClientServerWorld extends AbstractDhServerWorld<DhClientServerLev
@Override @Override
public synchronized void close() public synchronized void close()
{ {
ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>();
synchronized (this.dhLevels) synchronized (this.dhLevels)
{ {
// close each level // close each level
for (DhClientServerLevel level : this.dhLevels) for (DhClientServerLevel level : this.dhLevels)
{ {
LOGGER.info("Unloading level [" + level.getServerLevelWrapper().getDhIdentifier() + "].");
// level wrapper shouldn't be null, but just in case // level wrapper shouldn't be null, but just in case
IServerLevelWrapper serverLevelWrapper = level.getServerLevelWrapper(); IServerLevelWrapper serverLevelWrapper = level.getServerLevelWrapper();
if (serverLevelWrapper != null) if (serverLevelWrapper != null)
@@ -157,10 +159,26 @@ public class DhClientServerWorld extends AbstractDhServerWorld<DhClientServerLev
serverLevelWrapper.onUnload(); serverLevelWrapper.onUnload();
} }
level.close(); // close levels asynchronously to speed up
// shutdown on servers with a lot of levels
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
Thread closeThread = new Thread(() ->
{
level.close();
closeFuture.complete(null);
}, "level shutdown");
closeThread.start();
closeFutures.add(closeFuture);
} }
} }
// wait for all the levels to finish closing
for (CompletableFuture<Void> future : closeFutures)
{
future.join();
}
this.dhLevelByLevelWrapper.clear(); this.dhLevelByLevelWrapper.clear();
this.eventLoop.close(); this.eventLoop.close();
LOGGER.info("Closed DhWorld of type " + this.environment); LOGGER.info("Closed DhWorld of type " + this.environment);
@@ -29,7 +29,9 @@ import com.seibel.distanthorizons.core.wrapperInterfaces.world.IClientLevelWrapp
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper; import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@@ -127,14 +129,11 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
public void close() public void close()
{ {
this.networkState.close(); this.networkState.close();
this.dhTickerThread.shutdownNow(); this.dhTickerThread.shutdownNow();
ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>();
for (DhClientLevel dhClientLevel : this.levels.values()) for (DhClientLevel dhClientLevel : this.levels.values())
{ {
LOGGER.info("Unloading level [" + dhClientLevel.getLevelWrapper().getDhIdentifier() + "].");
// level wrapper shouldn't be null, but just in case // level wrapper shouldn't be null, but just in case
IClientLevelWrapper clientLevelWrapper = dhClientLevel.getClientLevelWrapper(); IClientLevelWrapper clientLevelWrapper = dhClientLevel.getClientLevelWrapper();
if (clientLevelWrapper != null) if (clientLevelWrapper != null)
@@ -142,7 +141,23 @@ public class DhClientWorld extends AbstractDhWorld implements IDhClientWorld
clientLevelWrapper.onUnload(); clientLevelWrapper.onUnload();
} }
dhClientLevel.close();
// close levels asynchronously to speed up
// shutdown on servers with a lot of levels
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
Thread closeThread = new Thread(() ->
{
dhClientLevel.close();
closeFuture.complete(null);
}, "level shutdown");
closeThread.start();
closeFutures.add(closeFuture);
}
// wait for all the levels to finish closing
for (CompletableFuture<Void> future : closeFutures)
{
future.join();
} }
this.levels.clear(); this.levels.clear();
@@ -64,7 +64,7 @@ public class DhServerWorld extends AbstractDhServerWorld<DhServerLevel>
if (this.dhLevelByLevelWrapper.containsKey(wrapper)) if (this.dhLevelByLevelWrapper.containsKey(wrapper))
{ {
LOGGER.info("Unloading level {} ", this.dhLevelByLevelWrapper.get(wrapper)); DhServerLevel level = this.dhLevelByLevelWrapper.get(wrapper);
wrapper.onUnload(); wrapper.onUnload();
this.dhLevelByLevelWrapper.remove(wrapper).close(); this.dhLevelByLevelWrapper.remove(wrapper).close();
} }