Split up full data source provider into multiple classes

This commit is contained in:
James Seibel
2025-11-04 07:46:06 -06:00
parent bf05965015
commit 4d4d8fd8e9
16 changed files with 1412 additions and 1268 deletions
@@ -27,7 +27,7 @@ import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.FullDataPointIdMap;
import com.seibel.distanthorizons.core.dataObjects.transformers.FullDataOcclusionCuller;
import com.seibel.distanthorizons.core.dataObjects.transformers.LodDataBuilder;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pooling.AbstractPhantomArrayList;
@@ -1,377 +0,0 @@
package com.seibel.distanthorizons.core.file;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.GeneratedFullDataSourceProvider;
import com.seibel.distanthorizons.core.file.fullDatafile.RemoteFullDataSourceProvider;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
import com.seibel.distanthorizons.core.sql.dto.IBaseDTO;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.objects.DataCorruptedException;
import com.seibel.distanthorizons.core.util.threading.PositionalLockProvider;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.logging.DhLogger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
* @see FullDataSourceProviderV2
* @see RemoteFullDataSourceProvider
* @see GeneratedFullDataSourceProvider
*/
public abstract class AbstractDataSourceHandler
<TDataSource extends IDataSource,
TDTO extends IBaseDTO<Long>,
TRepo extends AbstractDhRepo<Long, TDTO>,
TDhLevel extends IDhLevel>
implements AutoCloseable
{
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
private static final Set<String> CORRUPT_DATA_ERRORS_LOGGED = Collections.newSetFromMap(new ConcurrentHashMap<>());
/**
* The highest numerical detail level possible.
* Used when determining which positions to update.
*
* @see AbstractDataSourceHandler#MIN_SECTION_DETAIL_LEVEL
*/
public static final byte TOP_SECTION_DETAIL_LEVEL = DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL + LodUtil.REGION_DETAIL_LEVEL;
/**
* The lowest numerical detail level possible.
*
* @see AbstractDataSourceHandler#TOP_SECTION_DETAIL_LEVEL
*/
public static final byte MIN_SECTION_DETAIL_LEVEL = DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL;
protected final PositionalLockProvider updateLockProvider = new PositionalLockProvider();
/**
* generally just used for debugging,
* keeps track of which positions are currently locked.
*/
public final Set<Long> lockedPosSet = ConcurrentHashMap.newKeySet();
public final ConcurrentHashMap<Long, AtomicInteger> queuedUpdateCountsByPos = new ConcurrentHashMap<>();
protected final ReentrantLock closeLock = new ReentrantLock();
protected volatile boolean isShutdown = false;
protected final TDhLevel level;
protected final File saveDir;
public final TRepo repo;
public final ArrayList<IDataSourceUpdateListenerFunc<TDataSource>> dateSourceUpdateListeners = new ArrayList<>();
//=============//
// constructor //
//=============//
public AbstractDataSourceHandler(TDhLevel level, ISaveStructure saveStructure) { this(level, saveStructure, null); }
public AbstractDataSourceHandler(TDhLevel level, ISaveStructure saveStructure, @Nullable File saveDirOverride)
{
this.level = level;
this.saveDir = (saveDirOverride == null) ? saveStructure.getSaveFolder(level.getLevelWrapper()) : saveDirOverride;
this.repo = this.createRepo();
}
//==================//
// abstract methods //
//==================//
/** When this is called the parent folders should be created */
protected abstract TRepo createRepo();
protected abstract TDataSource createDataSourceFromDto(TDTO dto) throws InterruptedException, IOException, DataCorruptedException;
protected abstract TDTO createDtoFromDataSource(TDataSource dataSource);
protected abstract TDataSource makeEmptyDataSource(long pos);
//==============//
// data reading //
//==============//
/**
* Returns the {@link TDataSource} for the given section position. <Br>
* The returned data source may be null if repo is in the process of shutting down. <Br> <Br>
*
* This call is concurrent. I.e. it supports being called by multiple threads at the same time.
*/
public CompletableFuture<TDataSource> getAsync(long pos)
{
AbstractExecutorService executor = ThreadPoolUtil.getFileHandlerExecutor();
if (executor == null || executor.isTerminated())
{
return CompletableFuture.completedFuture(null);
}
try
{
return CompletableFuture.supplyAsync(() -> this.get(pos), executor);
}
catch (RejectedExecutionException ignore)
{
// the thread pool was probably shut down because it's size is being changed, just wait a sec and it should be back
return CompletableFuture.completedFuture(null);
}
}
/**
* Should only be used in internal file handler methods where we are already running on a file handler thread.
* Can return null if the repo is in the process of being shut down
* @see AbstractDataSourceHandler#getAsync(long)
*/
@Nullable
public TDataSource get(long pos)
{
try(TDTO dto = this.repo.getByKey(pos))
{
if (dto == null)
{
return this.makeEmptyDataSource(pos);
}
try
{
// load from database
return this.createDataSourceFromDto(dto);
}
catch (DataCorruptedException e)
{
this.tryLogCorruptedDataError(DhSectionPos.toString(pos), e);
this.repo.deleteWithKey(pos);
}
}
catch (InterruptedException ignore) { }
catch (IOException e)
{
LOGGER.warn("File read Error for pos ["+DhSectionPos.toString(pos)+"], error: "+e.getMessage(), e);
}
// an error occurred
return null;
}
protected void tryLogCorruptedDataError(String whereClause, Exception e)
{
// there's a rare issue where the exception doesn't
// have a message, which can cause problems
String message = (e.getMessage() == null) ? e.getMessage() : "No Error message for exception ["+e.getClass().getSimpleName()+"]";
// Only log each message type once.
// This is done to prevent logging "No compression mode with the value [2]" 10,000 times
// if the user is migrating from a nightly build and used ZStd.
if (CORRUPT_DATA_ERRORS_LOGGED.add(message))
{
LOGGER.warn("Corrupted data found at [" + whereClause + "]. Data at will be deleted so it can be re-generated to prevent issues. Future errors with this same message won't be logged. Error: [" + message + "].", e);
}
}
//===============//
// data updating //
//===============//
/**
* Can be used if the same thread is already handling IO and/or LOD generation.
* Otherwise the async version {@link AbstractDataSourceHandler#updateDataSourceAsync(FullDataSourceV2)} may be a better choice.
*/
public void updateDataSource(@NotNull FullDataSourceV2 inputDataSource)
{ this.updateDataSourceAtPos(inputDataSource.getPos(), inputDataSource, true); }
/**
* Can be used if you don't want to lock the current thread
* Otherwise the sync version {@link AbstractDataSourceHandler#updateDataSource(FullDataSourceV2)} may be a better choice.
*/
public CompletableFuture<Void> updateDataSourceAsync(@NotNull FullDataSourceV2 inputDataSource)
{
AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
if (executor == null || executor.isTerminated())
{
return CompletableFuture.completedFuture(null);
}
try
{
this.markUpdateStart(inputDataSource.getPos());
return CompletableFuture.runAsync(() ->
{
try
{
this.updateDataSourceAtPos(inputDataSource.getPos(), inputDataSource, true);
}
catch (Exception e)
{
LOGGER.error("Unexpected error in async data source update at pos: ["+DhSectionPos.toString(inputDataSource.getPos())+"], error: ["+e.getMessage()+"].", e);
}
finally
{
this.markUpdateEnd(inputDataSource.getPos());
}
}, executor);
}
catch (RejectedExecutionException ignore)
{
// can happen if the executor was shutdown while this task was queued
this.markUpdateEnd(inputDataSource.getPos());
return CompletableFuture.completedFuture(null);
}
}
/**
* After this method returns the inputData will be written to file.
*
* @param updatePos the position to update
*/
protected void updateDataSourceAtPos(long updatePos, @NotNull FullDataSourceV2 inputData, boolean lockOnUpdatePos)
{
boolean methodLocked = false;
// a lock is necessary to prevent two threads from writing to the same position at once,
// if that happens only the second update will apply and the LOD will end up with hole(s)
ReentrantLock updateLock = this.updateLockProvider.getLock(updatePos);
try
{
if (lockOnUpdatePos)
{
methodLocked = true;
updateLock.lock();
this.lockedPosSet.add(updatePos);
}
// get or create the data source
try (TDataSource recipientDataSource = this.get(updatePos))
{
if (recipientDataSource != null)
{
boolean dataModified = recipientDataSource.update(inputData);
if (dataModified)
{
// save the updated data to the database
try (TDTO dto = this.createDtoFromDataSource(recipientDataSource))
{
this.repo.save(dto);
}
for (IDataSourceUpdateListenerFunc<TDataSource> listener : this.dateSourceUpdateListeners)
{
if (listener != null)
{
listener.OnDataSourceUpdated(recipientDataSource);
}
}
}
}
}
}
catch (Exception e)
{
LOGGER.error("Error updating pos ["+DhSectionPos.toString(updatePos)+"], error: "+e.getMessage(), e);
}
finally
{
if (methodLocked)
{
updateLock.unlock();
this.lockedPosSet.remove(updatePos);
}
}
}
//==================//
// debugger methods //
//==================//
/** used for debugging to track which positions are queued for updating */
private void markUpdateStart(long dataSourcePos)
{
this.queuedUpdateCountsByPos.compute(dataSourcePos, (pos, atomicCount) ->
{
if (atomicCount == null)
{
atomicCount = new AtomicInteger(0);
}
atomicCount.incrementAndGet();
return atomicCount;
});
}
/** used for debugging to track which positions are queued for updating */
private void markUpdateEnd(long dataSourcePos)
{
this.queuedUpdateCountsByPos.compute(dataSourcePos, (pos, atomicCount) ->
{
if (atomicCount != null && atomicCount.decrementAndGet() <= 0)
{
atomicCount = null;
}
return atomicCount;
});
}
//=========//
// cleanup //
//=========//
@Override
public void close()
{
try
{
this.closeLock.lock();
this.isShutdown = true;
// wait a moment so any queued saves can finish queuing,
// otherwise we might not see everything that needs saving and attempt to use a closed repo
Thread.sleep(200);
LOGGER.info("Closing [" + this.getClass().getSimpleName() + "] for level: [" + this.level + "].");
this.repo.close();
}
catch (InterruptedException ignore) { }
finally
{
this.closeLock.unlock();
}
}
//================//
// helper classes //
//================//
@FunctionalInterface
public interface IDataSourceUpdateListenerFunc<TDataSource>
{
void OnDataSourceUpdated(TDataSource updatedFullDataSource);
}
}
@@ -1,814 +0,0 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.file.fullDatafile;
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
import com.seibel.distanthorizons.core.api.internal.ClientApi;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV1;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.file.AbstractDataSourceHandler;
import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.logging.f3.F3Screen;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
import com.seibel.distanthorizons.core.sql.repo.FullDataSourceV2Repo;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import com.seibel.distanthorizons.core.util.objects.DataCorruptedException;
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import com.seibel.distanthorizons.core.logging.DhLogger;
import org.jetbrains.annotations.Nullable;
import java.awt.*;
import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
/**
* Handles reading/writing {@link FullDataSourceV2}
* to and from the database.
*/
public class FullDataSourceProviderV2
extends AbstractDataSourceHandler<FullDataSourceV2, FullDataSourceV2DTO, FullDataSourceV2Repo, IDhLevel>
implements IDebugRenderable
{
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
protected static final int NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD = 5;
/** how many parent update tasks can be in the queue at once */
protected static int getMaxUpdateTaskCount() { return NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD* Config.Common.MultiThreading.numberOfThreads.get(); }
/** indicates how long the update queue thread should wait between queuing ticks */
protected static final int UPDATE_QUEUE_THREAD_DELAY_IN_MS = 250;
/** how many data sources should be pulled down for migration at once */
private static final int MIGRATION_BATCH_COUNT = NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD;
/**
* 5 minutes <br>
* This should be much longer than any update should take. This is just
* to make sure the thread doesn't get stuck.
*/
private static final int MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS = 5 * 60 * 1_000;
/**
* Interrupting the migration thread pool doesn't work well and may corrupt the database
* vs gracefully shutting down the thread ourselves.
*/
protected final AtomicBoolean migrationThreadRunning = new AtomicBoolean(true);
protected final FullDataSourceProviderV1<IDhLevel> legacyFileHandler;
protected boolean migrationStartMessageQueued = false;
protected long legacyDeletionCount = -1;
protected long migrationCount = -1;
protected boolean migrationStoppedWithError = false;
/**
* Tracks which positions are currently being updated
* to prevent duplicate concurrent updates.
*/
public final Set<Long> updatingPosSet = ConcurrentHashMap.newKeySet();
// TODO only run thread if modifications happened recently
/**
* This isn't in {@link AbstractDataSourceHandler} since we only want to update
* the newest version of the full data, so if we have providers for either
* render data or old full data, we don't want to update them. <br><br>
*
* Will be null on the dedicated server since updates don't need to be propagated,
* only the highest detail level is needed.
*/
@Nullable
private final ThreadPoolExecutor updateQueueProcessor;
//=============//
// constructor //
//=============//
public FullDataSourceProviderV2(IDhLevel level, ISaveStructure saveStructure) { this(level, saveStructure, null); }
public FullDataSourceProviderV2(IDhLevel level, ISaveStructure saveStructure, @Nullable File saveDirOverride)
{
super(level, saveStructure, saveDirOverride);
this.legacyFileHandler = new FullDataSourceProviderV1<>(level, saveStructure, saveDirOverride);
DebugRenderer.register(this, Config.Client.Advanced.Debugging.DebugWireframe.showFullDataUpdateStatus);
String levelId = level.getLevelWrapper().getDhIdentifier();
// start migrating any legacy data sources present in the background
ThreadPoolExecutor executor = ThreadPoolUtil.getFullDataMigrationExecutor();
if (executor != null)
{
executor.execute(this::convertLegacyDataSources);
}
else
{
// shouldn't happen, but just in case
LOGGER.error("Unable to start migration for level: ["+levelId+"] due to missing executor.");
}
// update propagation doesn't need to be run on the server since only the highest detail level is needed
this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Parent Update Queue [" + levelId + "]");
this.updateQueueProcessor.execute(this::runUpdateQueue);
}
//====================//
// Abstract overrides //
//====================//
@Override
protected FullDataSourceV2Repo createRepo()
{
try
{
return new FullDataSourceV2Repo(AbstractDhRepo.DEFAULT_DATABASE_TYPE, new File(this.saveDir.getPath() + File.separator + ISaveStructure.DATABASE_NAME));
}
catch (SQLException e)
{
// should only happen if there is an issue with the database (it's locked or the folder path is missing)
// or the database update failed
throw new RuntimeException(e);
}
}
@Override
protected FullDataSourceV2DTO createDtoFromDataSource(FullDataSourceV2 dataSource)
{
try
{
// when creating new data use the compressor currently selected in the config
EDhApiDataCompressionMode compressionModeEnum = Config.Common.LodBuilding.dataCompression.get();
return FullDataSourceV2DTO.CreateFromDataSource(dataSource, compressionModeEnum);
}
catch (IOException e)
{
LOGGER.warn("Unable to create DTO, error: "+e.getMessage(), e);
return null;
}
}
@Override
protected FullDataSourceV2 createDataSourceFromDto(FullDataSourceV2DTO dto) throws InterruptedException, IOException, DataCorruptedException
{ return dto.createDataSource(this.level.getLevelWrapper()); }
@Override
protected FullDataSourceV2 makeEmptyDataSource(long pos)
{ return FullDataSourceV2.createEmpty(pos); }
//================//
// parent updates //
//================//
private void runUpdateQueue()
{
while (!Thread.interrupted())
{
try
{
Thread.sleep(UPDATE_QUEUE_THREAD_DELAY_IN_MS);
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getUpdatePropagatorExecutor();
if (executor == null || executor.isTerminated())
{
continue;
}
// TODO it might be worth skipping this logic if no parent updates happened
// update positions closest to the player (if not on a server)
// to make world gen appear faster
DhBlockPos targetBlockPos = DhBlockPos.ZERO;
if (MC_CLIENT != null && MC_CLIENT.playerExists())
{
targetBlockPos = MC_CLIENT.getPlayerBlockPos();
}
this.runParentUpdates(executor, targetBlockPos);
if (Config.Common.LodBuilding.Experimental.upsampleLowerDetailLodsToFillHoles.get())
{
this.runChildUpdates(executor, targetBlockPos);
}
}
catch (InterruptedException ignored)
{
Thread.currentThread().interrupt();
}
catch (Exception e)
{
LOGGER.error("Unexpected error in the parent update queue thread. Error: " + e.getMessage(), e);
}
}
LOGGER.info("Update thread ["+Thread.currentThread().getName()+"] terminated.");
}
/** will always apply updates */
private void runParentUpdates(PriorityTaskPicker.Executor executor, DhBlockPos targetBlockPos)
{
int maxUpdateTaskCount = getMaxUpdateTaskCount();
// queue parent updates
if (executor.getQueueSize() < maxUpdateTaskCount
&& this.updatingPosSet.size() < maxUpdateTaskCount)
{
// get the positions that need to be applied to their parents
LongArrayList parentUpdatePosList = this.repo.getPositionsToUpdate(targetBlockPos.getX(), targetBlockPos.getZ(), maxUpdateTaskCount);
// combine updates together based on their parent
HashMap<Long, HashSet<Long>> updatePosByParentPos = new HashMap<>();
for (Long pos : parentUpdatePosList)
{
updatePosByParentPos.compute(DhSectionPos.getParentPos(pos), (parentPos, updatePosSet) ->
{
if (updatePosSet == null)
{
updatePosSet = new HashSet<>();
}
updatePosSet.add(pos);
return updatePosSet;
});
}
// queue the updates
for (Long parentUpdatePos : updatePosByParentPos.keySet())
{
// stop if there are already a bunch of updates queued
if (this.updatingPosSet.size() > maxUpdateTaskCount
|| executor.getQueueSize() > maxUpdateTaskCount
|| !this.updatingPosSet.add(parentUpdatePos))
{
break;
}
try
{
executor.execute(() ->
{
ReentrantLock parentWriteLock = this.updateLockProvider.getLock(parentUpdatePos);
boolean parentLocked = false;
try
{
//LOGGER.info("updating parent: "+parentUpdatePos);
// Locking the parent before the children should prevent deadlocks.
// TryLock is used instead of lock so this thread can handle a different update.
if (parentWriteLock.tryLock())
{
parentLocked = true;
this.lockedPosSet.add(parentUpdatePos);
try (FullDataSourceV2 parentDataSource = this.get(parentUpdatePos))
{
// will return null if the file handler is shutting down
if (parentDataSource != null)
{
// apply each child pos to the parent
for (Long childPos : updatePosByParentPos.get(parentUpdatePos))
{
ReentrantLock childReadLock = this.updateLockProvider.getLock(childPos);
try
{
childReadLock.lock();
this.lockedPosSet.add(childPos);
try (FullDataSourceV2 childDataSource = this.get(childPos))
{
// can return null when the file handler is being shut down
if (childDataSource != null)
{
parentDataSource.update(childDataSource);
}
}
}
catch (Exception e)
{
LOGGER.error("Unexpected in parent update propagation for parent pos: ["+DhSectionPos.toString(parentUpdatePos)+"], child pos: [" + DhSectionPos.toString(parentUpdatePos) + "], Error: [" + e.getMessage() + "].", e);
}
finally
{
childReadLock.unlock();
this.lockedPosSet.remove(childPos);
}
}
if (DhSectionPos.getDetailLevel(parentUpdatePos) < TOP_SECTION_DETAIL_LEVEL)
{
parentDataSource.applyToParent = true;
}
this.updateDataSourceAtPos(parentUpdatePos, parentDataSource, false);
for (Long childPos : updatePosByParentPos.get(parentUpdatePos))
{
this.repo.setApplyToParent(childPos, false);
}
}
}
}
}
finally
{
if (parentLocked)
{
parentWriteLock.unlock();
this.lockedPosSet.remove(parentUpdatePos);
}
this.updatingPosSet.remove(parentUpdatePos);
}
});
}
catch (RejectedExecutionException ignore)
{ /* the executor was shut down, it should be back up shortly and able to accept new jobs */ }
catch (Exception e)
{
this.updatingPosSet.remove(parentUpdatePos);
throw e;
}
}
}
}
/** stops if it finds any LOD data */
private void runChildUpdates(PriorityTaskPicker.Executor executor, DhBlockPos targetBlockPos)
{
int maxUpdateTaskCount = getMaxUpdateTaskCount();
// queue child updates
if (executor.getQueueSize() < maxUpdateTaskCount
&& this.updatingPosSet.size() < maxUpdateTaskCount)
{
// get the positions that need to be applied to their children
LongArrayList childUpdatePosList = this.repo.getChildPositionsToUpdate(targetBlockPos.getX(), targetBlockPos.getZ(), maxUpdateTaskCount);
// queue the updates
for (long parentUpdatePos : childUpdatePosList)
{
// stop if there are already a bunch of updates queued
if (this.updatingPosSet.size() > maxUpdateTaskCount
|| executor.getQueueSize() > maxUpdateTaskCount)
{
break;
}
// skip already updating positions
if (!this.updatingPosSet.add(parentUpdatePos))
{
continue;
}
try
{
executor.execute(() ->
{
ReentrantLock parentReadLock = this.updateLockProvider.getLock(parentUpdatePos);
boolean parentLocked = false;
try
{
//LOGGER.info("updating parent: "+parentUpdatePos);
// Locking the parent before the children should prevent deadlocks.
// TryLock is used instead of lock so this thread can handle a different update.
if (parentReadLock.tryLock())
{
parentLocked = true;
this.lockedPosSet.add(parentUpdatePos);
try (FullDataSourceV2 parentDataSource = this.get(parentUpdatePos))
{
// will return null if the file handler is shutting down
if (parentDataSource != null)
{
// apply parent to each child
for (int i = 0; i < 4; i++)
{
long childPos = DhSectionPos.getChildByIndex(parentUpdatePos, i);
ReentrantLock childWriteLock = this.updateLockProvider.getLock(childPos);
try
{
childWriteLock.lock();
this.lockedPosSet.add(childPos);
try (FullDataSourceV2 childDataSource = this.get(childPos))
{
// will return null if the file handler is shutting down
if (childDataSource != null)
{
childDataSource.update(parentDataSource);
// don't propagate child updates past the bottom of the tree
if (DhSectionPos.getDetailLevel(childPos) != DhSectionPos.SECTION_BLOCK_DETAIL_LEVEL)
{
childDataSource.applyToChildren = true;
}
this.updateDataSourceAtPos(childPos, childDataSource, false);
}
}
}
catch (Exception e)
{
LOGGER.error("Unexpected in child update propagation for parent pos: ["+DhSectionPos.toString(parentUpdatePos)+"], child pos: [" + DhSectionPos.toString(parentUpdatePos) + "], Error: [" + e.getMessage() + "].", e);
}
finally
{
childWriteLock.unlock();
this.lockedPosSet.remove(childPos);
}
}
this.repo.setApplyToChild(parentUpdatePos, false);
}
}
}
}
finally
{
if (parentLocked)
{
parentReadLock.unlock();
this.lockedPosSet.remove(parentUpdatePos);
}
this.updatingPosSet.remove(parentUpdatePos);
}
});
}
catch (RejectedExecutionException ignore)
{ /* the executor was shut down, it should be back up shortly and able to accept new jobs */ }
catch (Exception e)
{
this.updatingPosSet.remove(parentUpdatePos);
throw e;
}
}
}
}
//=======================//
// data source migration //
//=======================//
private void convertLegacyDataSources()
{
try
{
String levelId = this.level.getLevelWrapper().getDhIdentifier();
LOGGER.info("Attempting to migrate data sources for: [" + levelId + "]-[" + this.saveDir + "]...");
this.migrationThreadRunning.set(true);
//============================//
// delete unused data sources //
//============================//
// this could be done all at once via SQL,
// but doing it in chunks prevents locking the database for long periods of time
long unusedCount = 0;
long totalDeleteCount = this.legacyFileHandler.repo.getUnusedDataSourceCount();
if (totalDeleteCount != 0)
{
// this should only be shown once per session but should be shown during
// either when the deletion or migration phases start
this.showMigrationStartMessage();
LOGGER.info("deleting [" + levelId + "] - [" + totalDeleteCount + "] unused data sources...");
this.legacyDeletionCount = totalDeleteCount;
ArrayList<String> unusedDataPosList = this.legacyFileHandler.repo.getUnusedDataSourcePositionStringList(50);
while (unusedDataPosList.size() != 0)
{
unusedCount += unusedDataPosList.size();
this.legacyDeletionCount -= unusedDataPosList.size();
long startTime = System.currentTimeMillis();
// delete batch and get next batch
this.legacyFileHandler.repo.deleteUnusedLegacyData(unusedDataPosList);
unusedDataPosList = this.legacyFileHandler.repo.getUnusedDataSourcePositionStringList(50);
long endStart = System.currentTimeMillis();
long deleteTime = endStart - startTime;
LOGGER.info("Deleting [" + levelId + "] - [" + unusedCount + "/" + totalDeleteCount + "] in [" + deleteTime + "]ms ...");
// a slight delay is added to prevent accidentally locking the database when deleting a lot of rows
// (that shouldn't be the case since we're using WAL journaling, but just in case)
try
{
// use the delete time so we don't make powerful computers wait super long
// and weak computers wait no time at all
Thread.sleep(deleteTime / 2);
}
catch (InterruptedException ignore)
{
}
}
LOGGER.info("Done deleting [" + levelId + "] - [" + totalDeleteCount + "] unused data sources.");
}
//===========//
// migration //
//===========//
long totalMigrationCount = this.legacyFileHandler.getDataSourceMigrationCount();
this.migrationCount = totalMigrationCount;
LOGGER.info("Found [" + totalMigrationCount + "] data sources that need migration.");
ArrayList<FullDataSourceV1> legacyDataSourceList = this.legacyFileHandler.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT);
if (!legacyDataSourceList.isEmpty())
{
this.showMigrationStartMessage();
try
{
// keep going until every data source has been migrated
int progressCount = 0;
while (!legacyDataSourceList.isEmpty() && this.migrationThreadRunning.get())
{
NumberFormat numFormat = F3Screen.NUMBER_FORMAT;
LOGGER.info("Migrating [" + levelId + "] - [" + numFormat.format(progressCount) + "/" + numFormat.format(totalMigrationCount) + "]...");
ArrayList<CompletableFuture<Void>> updateFutureList = new ArrayList<>();
for (int i = 0; i < legacyDataSourceList.size() && this.migrationThreadRunning.get(); i++)
{
FullDataSourceV1 legacyDataSource = legacyDataSourceList.get(i);
try
{
// convert the legacy data source to the new format,
// this is a relatively cheap operation
FullDataSourceV2 newDataSource = FullDataSourceV2.createFromLegacyDataSourceV1(legacyDataSource);
newDataSource.applyToParent = true;
// the actual update process can be moderately expensive due to having to update
// the render data along with the full data, so running it async on the update threads gains us a good bit of speed
CompletableFuture<Void> future = this.updateDataSourceAsync(newDataSource);
updateFutureList.add(future);
future.thenRun(() ->
{
// after the update finishes the legacy data source can be safely deleted
this.legacyFileHandler.repo.deleteWithKey(legacyDataSource.getPos());
newDataSource.close();
});
}
catch (Exception e)
{
Long migrationPos = legacyDataSource.getPos();
LOGGER.warn("Unexpected issue migrating data source at pos [" + DhSectionPos.toString(migrationPos) + "]. Error: " + e.getMessage(), e);
this.legacyFileHandler.markMigrationFailed(migrationPos);
}
}
try
{
// wait for each thread to finish updating
CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(updateFutureList.toArray(new CompletableFuture[0]));
combinedFutures.get(MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
}
catch (InterruptedException | TimeoutException e)
{
LOGGER.warn("Migration update timed out after [" + MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS + "] milliseconds. Migration will re-try the same positions again in a moment.", e);
}
catch (ExecutionException e)
{
LOGGER.warn("Migration update failed. Migration will re-try the same positions again. Error:" + e.getMessage(), e);
}
legacyDataSourceList = this.legacyFileHandler.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT);
progressCount += legacyDataSourceList.size();
this.migrationCount -= legacyDataSourceList.size();
}
}
catch (Exception e)
{
LOGGER.info("migration stopped due to error for: [" + levelId + "]-[" + this.saveDir + "], error: [" + e.getMessage() + "].", e);
this.showMigrationEndMessage(false);
this.migrationStoppedWithError = true;
}
finally
{
if (this.migrationThreadRunning.get())
{
LOGGER.info("migration complete for: [" + levelId + "]-[" + this.saveDir + "].");
this.showMigrationEndMessage(true);
this.migrationCount = 0;
}
else
{
LOGGER.info("migration stopped for: [" + levelId + "]-[" + this.saveDir + "].");
this.showMigrationEndMessage(false);
this.migrationStoppedWithError = true;
}
}
}
else
{
LOGGER.info("No migration necessary.");
}
}
finally
{
this.migrationThreadRunning.set(false);
}
}
public long getLegacyDeletionCount() { return this.legacyDeletionCount; }
public long getTotalMigrationCount() { return this.migrationCount; }
public boolean getMigrationStoppedWithError() { return this.migrationStoppedWithError; }
private void showMigrationStartMessage()
{
if (this.migrationStartMessageQueued)
{
return;
}
this.migrationStartMessageQueued = true;
String levelId = this.level.getLevelWrapper().getDhIdentifier();
ClientApi.INSTANCE.showChatMessageNextFrame(
"Old Distant Horizons data is being migrated for ["+levelId+"]. \n" +
"While migrating LODs may load slowly \n" +
"and DH world gen will be disabled. \n" +
"You can see migration progress in the F3 menu."
);
}
private void showMigrationEndMessage(boolean success)
{
String levelId = this.level.getLevelWrapper().getDhIdentifier();
if (success)
{
ClientApi.INSTANCE.showChatMessageNextFrame("Distant Horizons data migration for ["+levelId+"] completed.");
}
else
{
ClientApi.INSTANCE.showChatMessageNextFrame(
"Distant Horizons data migration for ["+levelId+"] stopped. \n" +
"Some data may not have been migrated."
);
}
}
//=======================//
// retrieval (world gen) //
//=======================//
/**
* Returns true if this provider can generate or retrieve
* {@link FullDataSourceV2}'s that aren't currently in the database.
*/
public boolean canRetrieveMissingDataSources()
{
// the base handler just handles basic reading/writing
// to the database and as such can't retrieve anything else.
return false;
}
/**
* Returns false if this provider isn't accepting new requests,
* this can be due to having a full queue or some other
* limiting factor. <br><br>
*
* Note: when overriding make sure to add: <br>
* <code>
* if (!super.canQueueRetrieval()) <br>
* { <br>
* return false; <br>
* } <br>
* </code>
* to the beginning of your override.
* Otherwise, parent retrieval limits will be ignored.
*/
public boolean canQueueRetrieval()
{
// Retrieval shouldn't happen while an unknown number of
// legacy data sources are present.
// If retrieval was allowed we might run into concurrency issues.
return !this.migrationThreadRunning.get();
}
/**
* @return null if this provider can't generate any positions and
* an empty array if all positions were generated
*/
@Nullable
public LongArrayList getPositionsToRetrieve(Long pos) { return null; }
/** @return true if the position was queued, false if not */
@Nullable
public CompletableFuture<WorldGenResult> queuePositionForRetrieval(Long genPos) { return null; }
/** does nothing if the given position isn't present in the queue */
public void removeRetrievalRequestIf(DhSectionPos.ICancelablePrimitiveLongConsumer removeIf) { }
public void clearRetrievalQueue() { }
/** Can be used to display how many total retrieval requests might be available. */
public void setTotalRetrievalPositionCount(int newCount) { }
/** Can be used to display how many total chunk retrieval requests should be available. */
public void setEstimatedRemainingRetrievalChunkCount(int newCount) { }
public boolean fileExists(long pos) { return this.repo.getDataSizeInBytes(pos) > 0; }
//========================//
// multiplayer networking //
//========================//
@Nullable
public Long getTimestampForPos(long pos)
{ return this.repo.getTimestampForPos(pos); }
//===========//
// overrides //
//===========//
@Override
public void debugRender(DebugRenderer renderer)
{
this.lockedPosSet
.forEach((pos) -> { renderer.renderBox(new DebugRenderer.Box(pos, -32f, 74f, 0.15f, Color.PINK)); });
this.queuedUpdateCountsByPos
.forEach((pos, updateCountRef) -> { renderer.renderBox(new DebugRenderer.Box(pos, -32f, 80f + (updateCountRef.get() * 16f), 0.20f, Color.WHITE)); });
this.updatingPosSet
.forEach((pos) -> { renderer.renderBox(new DebugRenderer.Box(pos, -32f, 80f, 0.20f, Color.MAGENTA)); });
}
@Override
public void close()
{
super.close();
if (this.updateQueueProcessor != null)
{
this.updateQueueProcessor.shutdownNow();
}
this.legacyFileHandler.close();
this.migrationThreadRunning.set(false);
}
}
@@ -23,6 +23,8 @@ import com.seibel.distanthorizons.api.enums.worldGeneration.EDhApiWorldGeneratio
import com.seibel.distanthorizons.core.api.internal.SharedApi;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataUpdatePropagatorV2;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.generation.DhLightingEngine;
import com.seibel.distanthorizons.core.generation.IFullDataSourceRetrievalQueue;
@@ -41,7 +43,6 @@ import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import com.seibel.distanthorizons.core.logging.DhLogger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -80,7 +81,16 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
//=============//
public GeneratedFullDataSourceProvider(IDhLevel level, ISaveStructure saveStructure) { super(level, saveStructure); }
public GeneratedFullDataSourceProvider(IDhLevel level, ISaveStructure saveStructure, @Nullable File saveDirOverride) { super(level, saveStructure, saveDirOverride); }
public GeneratedFullDataSourceProvider(IDhLevel level, ISaveStructure saveStructure, @Nullable File saveDirOverride)
{
super(level, saveStructure, saveDirOverride);
this.addDataSourceUpdateListener((@NotNull FullDataSourceV2 updatedData) ->
{
this.onWorldGenTaskComplete(WorldGenResult.CreateSuccess(updatedData.getPos()), null);
});
}
@@ -177,7 +187,7 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
{
boolean oldQueueExists = this.worldGenQueueRef.compareAndSet(null, newWorldGenQueue);
LodUtil.assertTrue(oldQueueExists, "previous world gen queue is still here!");
LOGGER.info("Set world gen queue for level [" + this.level.getLevelWrapper().getDhIdentifier() + "].");
LOGGER.info("Set world gen queue for level [" + this.levelId + "].");
}
@Override
@@ -213,7 +223,7 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
PriorityTaskPicker.Executor renderLoadExecutor = ThreadPoolUtil.getRenderLoadingExecutor();
if (renderLoadExecutor == null
|| renderLoadExecutor.getQueueSize() >= getMaxUpdateTaskCount() / 2)
|| renderLoadExecutor.getQueueSize() >= FullDataUpdatePropagatorV2.getMaxPropagateTaskCount() / 2)
{
// don't queue additional world gen requests if the render loader handler is overwhelmed,
// otherwise LODs may not load in properly
@@ -222,7 +232,7 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
PriorityTaskPicker.Executor fileHandlerExecutor = ThreadPoolUtil.getFileHandlerExecutor();
if (fileHandlerExecutor == null
|| fileHandlerExecutor.getQueueSize() >= getMaxUpdateTaskCount() / 2)
|| fileHandlerExecutor.getQueueSize() >= FullDataUpdatePropagatorV2.getMaxPropagateTaskCount() / 2)
{
// don't queue additional world gen requests if the file handler is overwhelmed,
// otherwise LODs may not load in properly
@@ -294,17 +304,6 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
return worldGenFuture;
}
@Override
protected void updateDataSourceAtPos(long updatePos, @NotNull FullDataSourceV2 inputData, boolean lockOnUpdatePos)
{
super.updateDataSourceAtPos(updatePos, inputData, lockOnUpdatePos);
//if (SharedApi.getEnvironment() != EWorldEnvironment.CLIENT_ONLY)
// LOGGER.info("updated ["+DhSectionPos.toString(updatePos)+"]");
this.onWorldGenTaskComplete(WorldGenResult.CreateSuccess(updatePos), null);
}
@Override
public void removeRetrievalRequestIf(DhSectionPos.ICancelablePrimitiveLongConsumer removeIf)
{
@@ -0,0 +1,7 @@
package com.seibel.distanthorizons.core.file.fullDatafile;
@FunctionalInterface
public interface IDataSourceUpdateListenerFunc<TDataSource>
{
void OnDataSourceUpdated(TDataSource updatedFullDataSource);
}
@@ -1,4 +1,4 @@
package com.seibel.distanthorizons.core.file.fullDatafile;
package com.seibel.distanthorizons.core.file.fullDatafile.V1;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV1;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
@@ -43,10 +43,10 @@ public class FullDataSourceProviderV1<TDhLevel extends IDhLevel>
// constructor //
//=============//
public FullDataSourceProviderV1(TDhLevel level, ISaveStructure saveStructure, @Nullable File saveDirOverride)
public FullDataSourceProviderV1(TDhLevel level, File saveDir)
{
this.level = level;
this.saveDir = (saveDirOverride == null) ? saveStructure.getSaveFolder(level.getLevelWrapper()) : saveDirOverride;
this.saveDir = saveDir;
if (!this.saveDir.exists() && !this.saveDir.mkdirs())
{
LOGGER.warn("Unable to create full data folder, file saving may fail.");
@@ -0,0 +1,346 @@
package com.seibel.distanthorizons.core.file.fullDatafile.V2;
import com.seibel.distanthorizons.core.api.internal.ClientApi;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV1;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.V1.FullDataSourceProviderV1;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.logging.f3.F3Screen;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import java.io.File;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class DataMigratorV1 implements IDebugRenderable, AutoCloseable
{
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
/** how many data sources should be pulled down for migration at once */
private static final int MIGRATION_BATCH_COUNT = FullDataUpdatePropagatorV2.NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD;
/**
* 5 minutes <br>
* This should be much longer than any update should take. This is just
* to make sure the thread doesn't get stuck.
*/
private static final int MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS = 5 * 60 * 1_000;
private final FullDataUpdaterV2 dataUpdater;
private boolean migrationStartMessageQueued = false;
private long legacyDeletionCount = -1;
private long migrationCount = -1;
private boolean migrationStoppedWithError = false;
/**
* Interrupting the migration thread pool doesn't work well and may corrupt the database
* vs gracefully shutting down the thread ourselves.
*/
public final AtomicBoolean migrationThreadRunning = new AtomicBoolean(true);
private final FullDataSourceProviderV1<IDhLevel> v1DataSourceProvider;
private final String levelId;
private final File saveDir;
//=============//
// constructor //
//=============//
public DataMigratorV1(
FullDataUpdaterV2 dataUpdater,
IDhLevel level, String levelId, File saveDir)
{
this.dataUpdater = dataUpdater;
this.saveDir = saveDir;
this.v1DataSourceProvider = new FullDataSourceProviderV1<>(level, saveDir);
this.levelId = levelId;
// start migrating any legacy data sources present in the background
ThreadPoolExecutor executor = ThreadPoolUtil.getFullDataMigrationExecutor();
if (executor != null)
{
executor.execute(this::convertLegacyDataSources);
}
else
{
// shouldn't happen, but just in case
LOGGER.error("Unable to start migration for level: ["+this.levelId+"] due to missing executor.");
}
}
//=======================//
// data source migration //
//=======================//
private void convertLegacyDataSources()
{
try
{
LOGGER.debug("Attempting to migrate data sources for: [" + this.levelId + "]-[" + this.saveDir + "]...");
this.migrationThreadRunning.set(true);
//============================//
// delete unused data sources //
//============================//
// this could be done all at once via SQL,
// but doing it in chunks prevents locking the database for long periods of time
long unusedCount = 0;
long totalDeleteCount = this.v1DataSourceProvider.repo.getUnusedDataSourceCount();
if (totalDeleteCount != 0)
{
// this should only be shown once per session but should be shown during
// either when the deletion or migration phases start
this.showMigrationStartMessage();
LOGGER.info("deleting [" + this.levelId + "] - [" + totalDeleteCount + "] unused data sources...");
this.legacyDeletionCount = totalDeleteCount;
ArrayList<String> unusedDataPosList = this.v1DataSourceProvider.repo.getUnusedDataSourcePositionStringList(50);
while (unusedDataPosList.size() != 0)
{
unusedCount += unusedDataPosList.size();
this.legacyDeletionCount -= unusedDataPosList.size();
long startTime = System.currentTimeMillis();
// delete batch and get next batch
this.v1DataSourceProvider.repo.deleteUnusedLegacyData(unusedDataPosList);
unusedDataPosList = this.v1DataSourceProvider.repo.getUnusedDataSourcePositionStringList(50);
long endStart = System.currentTimeMillis();
long deleteTime = endStart - startTime;
LOGGER.info("Deleting [" + this.levelId + "] - [" + unusedCount + "/" + totalDeleteCount + "] in [" + deleteTime + "]ms ...");
// a slight delay is added to prevent accidentally locking the database when deleting a lot of rows
// (that shouldn't be the case since we're using WAL journaling, but just in case)
try
{
// use the delete time so we don't make powerful computers wait super long
// and weak computers wait no time at all
Thread.sleep(deleteTime / 2);
}
catch (InterruptedException ignore)
{
}
}
LOGGER.info("Done deleting [" + this.levelId + "] - [" + totalDeleteCount + "] unused data sources.");
}
//===========//
// migration //
//===========//
long totalMigrationCount = this.v1DataSourceProvider.getDataSourceMigrationCount();
this.migrationCount = totalMigrationCount;
LOGGER.debug("Found [" + totalMigrationCount + "] data sources that need migration.");
ArrayList<FullDataSourceV1> legacyDataSourceList = this.v1DataSourceProvider.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT);
if (!legacyDataSourceList.isEmpty())
{
this.showMigrationStartMessage();
try
{
// keep going until every data source has been migrated
int progressCount = 0;
while (!legacyDataSourceList.isEmpty() && this.migrationThreadRunning.get())
{
NumberFormat numFormat = F3Screen.NUMBER_FORMAT;
LOGGER.info("Migrating [" + this.levelId + "] - [" + numFormat.format(progressCount) + "/" + numFormat.format(totalMigrationCount) + "]...");
ArrayList<CompletableFuture<Void>> updateFutureList = new ArrayList<>();
for (int i = 0; i < legacyDataSourceList.size() && this.migrationThreadRunning.get(); i++)
{
FullDataSourceV1 legacyDataSource = legacyDataSourceList.get(i);
try
{
// convert the legacy data source to the new format,
// this is a relatively cheap operation
FullDataSourceV2 newDataSource = FullDataSourceV2.createFromLegacyDataSourceV1(legacyDataSource);
newDataSource.applyToParent = true;
// the actual update process can be moderately expensive due to having to update
// the render data along with the full data, so running it async on the update threads gains us a good bit of speed
CompletableFuture<Void> future = this.dataUpdater.updateDataSourceAsync(newDataSource);
updateFutureList.add(future);
future.thenRun(() ->
{
// after the update finishes the legacy data source can be safely deleted
this.v1DataSourceProvider.repo.deleteWithKey(legacyDataSource.getPos());
newDataSource.close();
});
}
catch (Exception e)
{
long migrationPos = legacyDataSource.getPos();
LOGGER.warn("Unexpected issue migrating data source at pos [" + DhSectionPos.toString(migrationPos) + "]. Error: " + e.getMessage(), e);
this.v1DataSourceProvider.markMigrationFailed(migrationPos);
}
}
try
{
// wait for each thread to finish updating
CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(updateFutureList.toArray(new CompletableFuture[0]));
combinedFutures.get(MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
}
catch (InterruptedException | TimeoutException e)
{
LOGGER.warn("Migration update timed out after [" + MIGRATION_MAX_UPDATE_TIMEOUT_IN_MS + "] milliseconds. Migration will re-try the same positions again in a moment.", e);
}
catch (ExecutionException e)
{
LOGGER.warn("Migration update failed. Migration will re-try the same positions again. Error:" + e.getMessage(), e);
}
legacyDataSourceList = this.v1DataSourceProvider.getDataSourcesToMigrate(MIGRATION_BATCH_COUNT);
progressCount += legacyDataSourceList.size();
this.migrationCount -= legacyDataSourceList.size();
}
}
catch (Exception e)
{
LOGGER.info("migration stopped due to error for: [" + this.levelId + "]-[" + this.saveDir + "], error: [" + e.getMessage() + "].", e);
this.showMigrationEndMessage(false);
this.migrationStoppedWithError = true;
}
finally
{
if (this.migrationThreadRunning.get())
{
LOGGER.info("migration complete for: [" + this.levelId + "]-[" + this.saveDir + "].");
this.showMigrationEndMessage(true);
this.migrationCount = 0;
}
else
{
LOGGER.info("migration stopped for: [" + this.levelId + "]-[" + this.saveDir + "].");
this.showMigrationEndMessage(false);
this.migrationStoppedWithError = true;
}
}
}
else
{
LOGGER.info("No migration necessary.");
}
}
finally
{
this.migrationThreadRunning.set(false);
}
}
private void showMigrationStartMessage()
{
if (this.migrationStartMessageQueued)
{
return;
}
this.migrationStartMessageQueued = true;
ClientApi.INSTANCE.showChatMessageNextFrame(
"Old Distant Horizons data is being migrated for ["+this.levelId+"]. \n" +
"While migrating LODs may load slowly \n" +
"and DH world gen will be disabled. \n" +
"You can see migration progress in the F3 menu."
);
}
private void showMigrationEndMessage(boolean success)
{
if (success)
{
ClientApi.INSTANCE.showChatMessageNextFrame("Distant Horizons data migration for ["+this.levelId+"] completed.");
}
else
{
ClientApi.INSTANCE.showChatMessageNextFrame(
"Distant Horizons data migration for ["+this.levelId+"] stopped. \n" +
"Some data may not have been migrated."
);
}
}
//===========//
// debugging //
//===========//
public void addDebugMenuStringsToList(List<String> messageList)
{
// migration
boolean migrationErrored = this.migrationStoppedWithError;
if (!migrationErrored)
{
long legacyDeletionCount = this.legacyDeletionCount;
if (legacyDeletionCount > 0)
{
messageList.add(" Migrating - Deleting #: " + F3Screen.NUMBER_FORMAT.format(legacyDeletionCount));
}
long migrationCount = this.migrationCount;
if (migrationCount > 0)
{
messageList.add(" Migrating - Conversion #: " + F3Screen.NUMBER_FORMAT.format(migrationCount));
}
}
else
{
messageList.add(" Migration Failed");
}
}
//===========//
// overrides //
//===========//
@Override
public void debugRender(DebugRenderer renderer)
{
// nothing currently needed
}
@Override
public void close()
{
//LOGGER.info("Closing [" + this.getClass().getSimpleName() + "] for level: [" + this.levelId + "].");
}
}
@@ -0,0 +1,396 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.file.fullDatafile.V2;
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.IDataSourceUpdateListenerFunc;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
import com.seibel.distanthorizons.core.sql.repo.FullDataSourceV2Repo;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.objects.DataCorruptedException;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
/**
* Handles reading/writing {@link FullDataSourceV2}
* to and from the database.
*/
public class FullDataSourceProviderV2 implements IDebugRenderable, AutoCloseable
{
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
private static final Set<String> CORRUPT_DATA_ERRORS_LOGGED = Collections.newSetFromMap(new ConcurrentHashMap<>());
/**
* The highest numerical detail level possible.
* Used when determining which positions to update.
*
* @see FullDataSourceProviderV2#MIN_SECTION_DETAIL_LEVEL
*/
public static final byte TOP_SECTION_DETAIL_LEVEL
= DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL
+ LodUtil.REGION_DETAIL_LEVEL; // TODO how big do we need to go?
/**
* The lowest numerical detail level possible.
*
* @see FullDataSourceProviderV2#TOP_SECTION_DETAIL_LEVEL
*/
public static final byte MIN_SECTION_DETAIL_LEVEL = DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL;
protected final ReentrantLock closeLock = new ReentrantLock();
protected volatile boolean isShutdown = false;
protected final File saveDir;
public final FullDataSourceV2Repo repo;
protected final IDhLevel level;
protected final String levelId;
private final FullDataUpdaterV2 dataUpdater;
private final FullDataUpdatePropagatorV2 updatePropagator;
private final DataMigratorV1 dataMigratorV1;
//=============//
// constructor //
//=============//
public FullDataSourceProviderV2(IDhLevel level, ISaveStructure saveStructure) { this(level, saveStructure, null); }
public FullDataSourceProviderV2(IDhLevel level, ISaveStructure saveStructure, @Nullable File saveDirOverride)
{
this.saveDir = (saveDirOverride == null) ? saveStructure.getSaveFolder(level.getLevelWrapper()) : saveDirOverride;
this.repo = this.createRepo();
this.level = level;
this.levelId = this.level.getLevelWrapper().getDhIdentifier();
this.dataUpdater = new FullDataUpdaterV2(this, this.levelId);
this.updatePropagator = new FullDataUpdatePropagatorV2(this, this.dataUpdater, this.levelId);
this.dataMigratorV1 = new DataMigratorV1(this.dataUpdater, this.level, this.levelId, this.saveDir);
DebugRenderer.register(this, Config.Client.Advanced.Debugging.DebugWireframe.showFullDataUpdateStatus);
}
private FullDataSourceV2Repo createRepo()
{
try
{
return new FullDataSourceV2Repo(AbstractDhRepo.DEFAULT_DATABASE_TYPE, new File(this.saveDir.getPath() + File.separator + ISaveStructure.DATABASE_NAME));
}
catch (SQLException e)
{
// should only happen if there is an issue with the database (it's locked or the folder path is missing)
// or the database update failed
throw new RuntimeException(e);
}
}
//====================//
// Abstract overrides //
//====================//
public FullDataSourceV2DTO createDtoFromDataSource(FullDataSourceV2 dataSource)
{
try
{
// when creating new data use the compressor currently selected in the config
EDhApiDataCompressionMode compressionModeEnum = Config.Common.LodBuilding.dataCompression.get();
return FullDataSourceV2DTO.CreateFromDataSource(dataSource, compressionModeEnum);
}
catch (IOException e)
{
LOGGER.warn("Unable to create DTO, error: "+e.getMessage(), e);
return null;
}
}
protected FullDataSourceV2 createDataSourceFromDto(FullDataSourceV2DTO dto) throws InterruptedException, IOException, DataCorruptedException
{ return dto.createDataSource(this.level.getLevelWrapper()); }
protected FullDataSourceV2 makeEmptyDataSource(long pos)
{ return FullDataSourceV2.createEmpty(pos); }
//=================//
// event listeners //
//=================//
public void addDataSourceUpdateListener(IDataSourceUpdateListenerFunc<FullDataSourceV2> listener)
{
synchronized (this.dataUpdater)
{
this.dataUpdater.dateSourceUpdateListeners.add(listener);
}
}
public void removeDataSourceUpdateListener(IDataSourceUpdateListenerFunc<FullDataSourceV2> listener)
{
synchronized (this.dataUpdater)
{
this.dataUpdater.dateSourceUpdateListeners.add(listener);
}
}
//=========================//
// basic DataSource getter //
//=========================//
/**
* Returns the {@link FullDataSourceV2} for the given section position. <Br>
* The returned data source may be null if repo is in the process of shutting down. <Br> <Br>
*
* This call is concurrent. I.e. it supports being called by multiple threads at the same time.
*/
public CompletableFuture<FullDataSourceV2> getAsync(long pos)
{
AbstractExecutorService executor = ThreadPoolUtil.getFileHandlerExecutor();
if (executor == null || executor.isTerminated())
{
return CompletableFuture.completedFuture(null);
}
try
{
return CompletableFuture.supplyAsync(() -> this.get(pos), executor);
}
catch (RejectedExecutionException ignore)
{
// the thread pool was probably shut down because it's size is being changed, just wait a sec and it should be back
return CompletableFuture.completedFuture(null);
}
}
/**
* Should only be used in internal file handler methods where we are already running on a file handler thread.
* Can return null if the repo is in the process of being shut down
* @see FullDataSourceProviderV2#getAsync(long)
*/
@Nullable
public FullDataSourceV2 get(long pos)
{
try(FullDataSourceV2DTO dto = this.repo.getByKey(pos))
{
if (dto == null)
{
return this.makeEmptyDataSource(pos);
}
try
{
// load from database
return this.createDataSourceFromDto(dto);
}
catch (DataCorruptedException e)
{
this.tryLogCorruptedDataError(DhSectionPos.toString(pos), e);
this.repo.deleteWithKey(pos);
}
}
catch (InterruptedException ignore) { }
catch (IOException e)
{
LOGGER.warn("File read Error for pos ["+DhSectionPos.toString(pos)+"], error: "+e.getMessage(), e);
}
// an error occurred
return null;
}
protected void tryLogCorruptedDataError(String whereClause, Exception e)
{
// there's a rare issue where the exception doesn't
// have a message, which can cause problems
String message = (e.getMessage() == null) ? e.getMessage() : "No Error message for exception ["+e.getClass().getSimpleName()+"]";
// Only log each message type once.
// This is done to prevent logging "No compression mode with the value [2]" 10,000 times
// if the user is migrating from a nightly build and used ZStd.
if (CORRUPT_DATA_ERRORS_LOGGED.add(message))
{
LOGGER.warn("Corrupted data found at [" + whereClause + "]. Data at will be deleted so it can be re-generated to prevent issues. Future errors with this same message won't be logged. Error: [" + message + "].", e);
}
}
//=======================//
// retrieval (world gen) //
//=======================//
/**
* Returns true if this provider can generate or retrieve
* {@link FullDataSourceV2}'s that aren't currently in the database.
*/
public boolean canRetrieveMissingDataSources()
{
// the base handler just handles basic reading/writing
// to the database and as such can't retrieve anything else.
return false;
}
/**
* Returns false if this provider isn't accepting new requests,
* this can be due to having a full queue or some other
* limiting factor. <br><br>
*
* Note: when overriding make sure to add: <br>
* <code>
* if (!super.canQueueRetrieval()) <br>
* { <br>
* return false; <br>
* } <br>
* </code>
* to the beginning of your override.
* Otherwise, parent retrieval limits will be ignored.
*/
public boolean canQueueRetrieval()
{
// Retrieval shouldn't happen while an unknown number of
// legacy data sources are present.
// If retrieval was allowed we might run into concurrency issues.
return !this.dataMigratorV1.migrationThreadRunning.get();
}
/**
* @return null if this provider can't generate any positions and
* an empty array if all positions were generated
*/
@Nullable
public LongArrayList getPositionsToRetrieve(Long pos) { return null; }
/** @return true if the position was queued, false if not */
@Nullable
public CompletableFuture<WorldGenResult> queuePositionForRetrieval(Long genPos) { return null; }
/** does nothing if the given position isn't present in the queue */
public void removeRetrievalRequestIf(DhSectionPos.ICancelablePrimitiveLongConsumer removeIf) { }
public void clearRetrievalQueue() { }
/** Can be used to display how many total retrieval requests might be available. */
public void setTotalRetrievalPositionCount(int newCount) { }
/** Can be used to display how many total chunk retrieval requests should be available. */
public void setEstimatedRemainingRetrievalChunkCount(int newCount) { }
//
// TODO
//
public CompletableFuture<Void> updateDataSourceAsync(@NotNull FullDataSourceV2 inputData)
{
return this.dataUpdater.updateDataSourceAsync(inputData);
}
//========================//
// multiplayer networking //
//========================//
@Nullable
public Long getTimestampForPos(long pos)
{ return this.repo.getTimestampForPos(pos); }
//===========//
// debugging //
//===========//
public void addDebugMenuStringsToList(List<String> messageList)
{
this.dataMigratorV1.addDebugMenuStringsToList(messageList);
}
//===========//
// overrides //
//===========//
@Override
public void debugRender(DebugRenderer renderer)
{
this.dataUpdater.debugRender(renderer);
this.updatePropagator.debugRender(renderer);
this.dataMigratorV1.debugRender(renderer);
}
@Override
public void close()
{
try
{
LOGGER.debug("Closing [" + this.getClass().getSimpleName() + "] for level: [" + this.levelId + "].");
this.closeLock.lock();
this.isShutdown = true;
this.dataUpdater.close();
this.updatePropagator.close();
this.dataMigratorV1.close();
// wait a moment so any queued saves can finish queuing,
// otherwise we might not see everything that needs saving and attempt to use a closed repo
Thread.sleep(200);
this.repo.close();
}
catch (InterruptedException ignore) { }
finally
{
this.closeLock.unlock();
}
}
}
@@ -0,0 +1,408 @@
package com.seibel.distanthorizons.core.file.fullDatafile.V2;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import com.seibel.distanthorizons.core.util.threading.PriorityTaskPicker;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.jetbrains.annotations.Nullable;
import java.awt.*;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantLock;
public class FullDataUpdatePropagatorV2 implements IDebugRenderable, AutoCloseable
{
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
/** indicates how long the update queue thread should wait between queuing ticks */
protected static final int PROPAGATE_QUEUE_THREAD_DELAY_IN_MS = 250;
public static final int NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD = 5;
/** how many parent update tasks can be in the queue at once */
public static int getMaxPropagateTaskCount()
{ return NUMBER_OF_PARENT_UPDATE_TASKS_PER_THREAD * Config.Common.MultiThreading.numberOfThreads.get(); }
private final FullDataSourceProviderV2 provider;
private final FullDataUpdaterV2 dataUpdater;
/**
* Tracks which positions are currently being updated
* to prevent duplicate concurrent updates.
*/
private final Set<Long> updatingPosSet = ConcurrentHashMap.newKeySet();
// TODO only run thread if modifications happened recently
/**
* Will be null on the dedicated server since updates don't need to be propagated,
* only the highest detail level is needed.
*/
@Nullable
public final ThreadPoolExecutor updateQueueProcessor;
private final String levelId;
//=============//
// constructor //
//=============//
public FullDataUpdatePropagatorV2(FullDataSourceProviderV2 provider, FullDataUpdaterV2 dataUpdater, String levelId)
{
this.provider = provider;
this.dataUpdater = dataUpdater;
this.levelId = levelId;
// update propagation doesn't need to be run on the server since only the highest detail level is needed
this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Update Propagate Queue [" + this.levelId + "]");
this.updateQueueProcessor.execute(this::runUpdateQueue);
}
//================//
// parent updates //
//================//
private void runUpdateQueue()
{
while (!Thread.interrupted())
{
try
{
Thread.sleep(PROPAGATE_QUEUE_THREAD_DELAY_IN_MS);
PriorityTaskPicker.Executor executor = ThreadPoolUtil.getUpdatePropagatorExecutor();
if (executor == null || executor.isTerminated())
{
continue;
}
// TODO it might be worth skipping this logic if no parent updates happened
// update positions closest to the player (if not on a server)
// to make world gen appear faster
DhBlockPos targetBlockPos = DhBlockPos.ZERO;
if (MC_CLIENT != null
&& MC_CLIENT.playerExists())
{
targetBlockPos = MC_CLIENT.getPlayerBlockPos();
}
this.runParentUpdates(executor, targetBlockPos);
if (Config.Common.LodBuilding.Experimental.upsampleLowerDetailLodsToFillHoles.get())
{
this.runChildUpdates(executor, targetBlockPos);
}
}
catch (InterruptedException ignored)
{
Thread.currentThread().interrupt();
}
catch (Exception e)
{
LOGGER.error("Unexpected error in the parent update queue thread. Error: " + e.getMessage(), e);
}
}
LOGGER.info("Update thread ["+Thread.currentThread().getName()+"] terminated.");
}
/** will always apply updates */
private void runParentUpdates(PriorityTaskPicker.Executor executor, DhBlockPos targetBlockPos)
{
int maxUpdateTaskCount = getMaxPropagateTaskCount();
// queue parent updates
if (executor.getQueueSize() < maxUpdateTaskCount
&& this.updatingPosSet.size() < maxUpdateTaskCount)
{
// get the positions that need to be applied to their parents
LongArrayList parentUpdatePosList = this.provider.repo.getPositionsToUpdate(targetBlockPos.getX(), targetBlockPos.getZ(), maxUpdateTaskCount);
// combine updates together based on their parent
HashMap<Long, HashSet<Long>> updatePosByParentPos = new HashMap<>();
for (Long pos : parentUpdatePosList)
{
updatePosByParentPos.compute(DhSectionPos.getParentPos(pos), (parentPos, updatePosSet) ->
{
if (updatePosSet == null)
{
updatePosSet = new HashSet<>();
}
updatePosSet.add(pos);
return updatePosSet;
});
}
// queue the updates
for (Long parentUpdatePos : updatePosByParentPos.keySet())
{
// stop if there are already a bunch of updates queued
if (this.updatingPosSet.size() > maxUpdateTaskCount
|| executor.getQueueSize() > maxUpdateTaskCount
|| !this.updatingPosSet.add(parentUpdatePos))
{
break;
}
try
{
executor.execute(() ->
{
ReentrantLock parentWriteLock = this.dataUpdater.updateLockProvider.getLock(parentUpdatePos);
boolean parentLocked = false;
try
{
//LOGGER.info("updating parent: "+parentUpdatePos);
// Locking the parent before the children should prevent deadlocks.
// TryLock is used instead of lock so this thread can handle a different update.
if (parentWriteLock.tryLock())
{
parentLocked = true;
this.dataUpdater.lockedPosSet.add(parentUpdatePos);
try (FullDataSourceV2 parentDataSource = this.provider.get(parentUpdatePos))
{
// will return null if the file handler is shutting down
if (parentDataSource != null)
{
// apply each child pos to the parent
for (Long childPos : updatePosByParentPos.get(parentUpdatePos))
{
ReentrantLock childReadLock = this.dataUpdater.updateLockProvider.getLock(childPos);
try
{
childReadLock.lock();
this.dataUpdater.lockedPosSet.add(childPos);
try (FullDataSourceV2 childDataSource = this.provider.get(childPos))
{
// can return null when the file handler is being shut down
if (childDataSource != null)
{
parentDataSource.updateFromChunk(childDataSource);
}
}
}
catch (Exception e)
{
LOGGER.error("Unexpected in parent update propagation for parent pos: ["+DhSectionPos.toString(parentUpdatePos)+"], child pos: [" + DhSectionPos.toString(parentUpdatePos) + "], Error: [" + e.getMessage() + "].", e);
}
finally
{
childReadLock.unlock();
this.dataUpdater.lockedPosSet.remove(childPos);
}
}
if (DhSectionPos.getDetailLevel(parentUpdatePos) < FullDataSourceProviderV2.TOP_SECTION_DETAIL_LEVEL)
{
parentDataSource.applyToParent = true;
}
this.dataUpdater.updateDataSource(parentDataSource, false);
for (Long childPos : updatePosByParentPos.get(parentUpdatePos))
{
this.provider.repo.setApplyToParent(childPos, false);
}
}
}
}
}
finally
{
if (parentLocked)
{
parentWriteLock.unlock();
this.dataUpdater.lockedPosSet.remove(parentUpdatePos);
}
this.updatingPosSet.remove(parentUpdatePos);
}
});
}
catch (RejectedExecutionException ignore)
{ /* the executor was shut down, it should be back up shortly and able to accept new jobs */ }
catch (Exception e)
{
this.updatingPosSet.remove(parentUpdatePos);
throw e;
}
}
}
}
/** stops if it finds any LOD data */
private void runChildUpdates(PriorityTaskPicker.Executor executor, DhBlockPos targetBlockPos)
{
int maxUpdateTaskCount = getMaxPropagateTaskCount();
// queue child updates
if (executor.getQueueSize() < maxUpdateTaskCount
&& this.updatingPosSet.size() < maxUpdateTaskCount)
{
// get the positions that need to be applied to their children
LongArrayList childUpdatePosList = this.provider.repo.getChildPositionsToUpdate(targetBlockPos.getX(), targetBlockPos.getZ(), maxUpdateTaskCount);
// queue the updates
for (long parentUpdatePos : childUpdatePosList)
{
// stop if there are already a bunch of updates queued
if (this.updatingPosSet.size() > maxUpdateTaskCount
|| executor.getQueueSize() > maxUpdateTaskCount)
{
break;
}
// skip already updating positions
if (!this.updatingPosSet.add(parentUpdatePos))
{
continue;
}
try
{
executor.execute(() ->
{
ReentrantLock parentReadLock = this.dataUpdater.updateLockProvider.getLock(parentUpdatePos);
boolean parentLocked = false;
try
{
//LOGGER.info("updating parent: "+parentUpdatePos);
// Locking the parent before the children should prevent deadlocks.
// TryLock is used instead of lock so this thread can handle a different update.
if (parentReadLock.tryLock())
{
parentLocked = true;
this.dataUpdater.lockedPosSet.add(parentUpdatePos);
try (FullDataSourceV2 parentDataSource = this.provider.get(parentUpdatePos))
{
// will return null if the file handler is shutting down
if (parentDataSource != null)
{
// apply parent to each child
for (int i = 0; i < 4; i++)
{
long childPos = DhSectionPos.getChildByIndex(parentUpdatePos, i);
ReentrantLock childWriteLock = this.dataUpdater.updateLockProvider.getLock(childPos);
try
{
childWriteLock.lock();
this.dataUpdater.lockedPosSet.add(childPos);
try (FullDataSourceV2 childDataSource = this.provider.get(childPos))
{
// will return null if the file handler is shutting down
if (childDataSource != null)
{
childDataSource.updateFromChunk(parentDataSource);
// don't propagate child updates past the bottom of the tree
if (DhSectionPos.getDetailLevel(childPos) != DhSectionPos.SECTION_BLOCK_DETAIL_LEVEL)
{
childDataSource.applyToChildren = true;
}
this.dataUpdater.updateDataSource(childDataSource, false);
}
}
}
catch (Exception e)
{
LOGGER.error("Unexpected in child update propagation for parent pos: ["+DhSectionPos.toString(parentUpdatePos)+"], child pos: [" + DhSectionPos.toString(parentUpdatePos) + "], Error: [" + e.getMessage() + "].", e);
}
finally
{
childWriteLock.unlock();
this.dataUpdater.lockedPosSet.remove(childPos);
}
}
this.provider.repo.setApplyToChild(parentUpdatePos, false);
}
}
}
}
finally
{
if (parentLocked)
{
parentReadLock.unlock();
this.dataUpdater.lockedPosSet.remove(parentUpdatePos);
}
this.updatingPosSet.remove(parentUpdatePos);
}
});
}
catch (RejectedExecutionException ignore)
{ /* the executor was shut down, it should be back up shortly and able to accept new jobs */ }
catch (Exception e)
{
this.updatingPosSet.remove(parentUpdatePos);
throw e;
}
}
}
}
//===========//
// overrides //
//===========//
@Override
public void debugRender(DebugRenderer renderer)
{
this.updatingPosSet
.forEach((pos) -> { renderer.renderBox(new DebugRenderer.Box(pos, -32f, 80f, 0.20f, Color.MAGENTA)); });
}
@Override
public void close()
{
try
{
//LOGGER.info("Closing [" + this.getClass().getSimpleName() + "] for level: [" + this.levelId + "].");
if (this.updateQueueProcessor != null)
{
this.updateQueueProcessor.shutdownNow();
}
// wait a moment so any queued saves can finish queuing,
// otherwise we might not see everything that needs saving and attempt to use a closed repo
Thread.sleep(200);
}
catch (InterruptedException ignore) { }
}
}
@@ -0,0 +1,216 @@
package com.seibel.distanthorizons.core.file.fullDatafile.V2;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.IDataSourceUpdateListenerFunc;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.util.threading.PositionalLockProvider;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import org.jetbrains.annotations.NotNull;
import java.awt.*;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
public class FullDataUpdaterV2 implements IDebugRenderable, AutoCloseable
{
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
private final FullDataSourceProviderV2 provider;
protected final PositionalLockProvider updateLockProvider = new PositionalLockProvider();
/**
* generally just used for debugging,
* keeps track of which positions are currently locked.
*/
public final Set<Long> lockedPosSet = ConcurrentHashMap.newKeySet();
private final ConcurrentHashMap<Long, AtomicInteger> queuedUpdateCountsByPos = new ConcurrentHashMap<>();
public final ArrayList<IDataSourceUpdateListenerFunc<FullDataSourceV2>> dateSourceUpdateListeners = new ArrayList<>();
private final String levelId;
//=============//
// constructor //
//=============//
public FullDataUpdaterV2(FullDataSourceProviderV2 provider, String levelId)
{
this.provider = provider;
this.levelId = levelId;
}
//===============//
// data updating //
//===============//
/**
* Can be used if you don't want to lock the current thread
* Otherwise the sync version {@link FullDataUpdaterV2#updateDataSource(FullDataSourceV2, boolean)} may be a better choice.
*/
public CompletableFuture<Void> updateDataSourceAsync(@NotNull FullDataSourceV2 inputDataSource)
{
AbstractExecutorService executor = ThreadPoolUtil.getChunkToLodBuilderExecutor();
if (executor == null || executor.isTerminated())
{
return CompletableFuture.completedFuture(null);
}
try
{
this.markUpdateStart(inputDataSource.getPos());
return CompletableFuture.runAsync(() ->
{
try
{
this.updateDataSource(inputDataSource, true);
}
catch (Exception e)
{
LOGGER.error("Unexpected error in async data source update at pos: ["+ DhSectionPos.toString(inputDataSource.getPos())+"], error: ["+e.getMessage()+"].", e);
}
finally
{
this.markUpdateEnd(inputDataSource.getPos());
}
}, executor);
}
catch (RejectedExecutionException ignore)
{
// can happen if the executor was shutdown while this task was queued
this.markUpdateEnd(inputDataSource.getPos());
return CompletableFuture.completedFuture(null);
}
}
/** After this method returns the inputData will be written to file. */
public void updateDataSource(@NotNull FullDataSourceV2 inputData, boolean lockOnUpdatePos)
{
long updatePos = inputData.getPos();
boolean methodLocked = false;
// a lock is necessary to prevent two threads from writing to the same position at once,
// if that happens only the second update will apply and the LOD will end up with hole(s)
ReentrantLock updateLock = this.updateLockProvider.getLock(updatePos);
try
{
if (lockOnUpdatePos)
{
methodLocked = true;
updateLock.lock();
this.lockedPosSet.add(updatePos);
}
// get or create the data source
try (FullDataSourceV2 recipientDataSource = this.provider.get(updatePos))
{
if (recipientDataSource != null)
{
boolean dataModified = recipientDataSource.updateFromChunk(inputData);
if (dataModified)
{
// save the updated data to the database
try (FullDataSourceV2DTO dto = this.provider.createDtoFromDataSource(recipientDataSource))
{
this.provider.repo.save(dto);
}
for (IDataSourceUpdateListenerFunc<FullDataSourceV2> listener : this.dateSourceUpdateListeners)
{
if (listener != null)
{
listener.OnDataSourceUpdated(recipientDataSource);
}
}
}
}
}
}
catch (Exception e)
{
LOGGER.error("Error updating pos ["+DhSectionPos.toString(updatePos)+"], error: "+e.getMessage(), e);
}
finally
{
if (methodLocked)
{
updateLock.unlock();
this.lockedPosSet.remove(updatePos);
}
}
}
//==================//
// debugger methods //
//==================//
/** used for debugging to track which positions are queued for updating */
private void markUpdateStart(long dataSourcePos)
{
this.queuedUpdateCountsByPos.compute(dataSourcePos, (pos, atomicCount) ->
{
if (atomicCount == null)
{
atomicCount = new AtomicInteger(0);
}
atomicCount.incrementAndGet();
return atomicCount;
});
}
/** used for debugging to track which positions are queued for updating */
private void markUpdateEnd(long dataSourcePos)
{
this.queuedUpdateCountsByPos.compute(dataSourcePos, (pos, atomicCount) ->
{
if (atomicCount != null && atomicCount.decrementAndGet() <= 0)
{
atomicCount = null;
}
return atomicCount;
});
}
//===========//
// overrides //
//===========//
@Override
public void debugRender(DebugRenderer renderer)
{
this.lockedPosSet
.forEach((pos) -> { renderer.renderBox(new DebugRenderer.Box(pos, -32f, 74f, 0.15f, Color.PINK)); });
this.queuedUpdateCountsByPos
.forEach((pos, updateCountRef) -> { renderer.renderBox(new DebugRenderer.Box(pos, -32f, 80f + (updateCountRef.get() * 16f), 0.20f, Color.WHITE)); });
}
@Override
public void close()
{
//LOGGER.info("Closing [" + this.getClass().getSimpleName() + "] for level: [" + this.levelId + "].");
}
}
@@ -2,10 +2,9 @@ package com.seibel.distanthorizons.core.level;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.logging.f3.F3Screen;
import com.seibel.distanthorizons.core.multiplayer.server.FullDataSourceRequestHandler;
import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerState;
import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerStateManager;
@@ -297,27 +296,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
@Override
public void addDebugMenuStringsToList(List<String> messageList)
{
// migration
boolean migrationErrored = this.serverside.fullDataFileHandler.getMigrationStoppedWithError();
if (!migrationErrored)
{
long legacyDeletionCount = this.serverside.fullDataFileHandler.getLegacyDeletionCount();
if (legacyDeletionCount > 0)
{
messageList.add(" Migrating - Deleting #: " + F3Screen.NUMBER_FORMAT.format(legacyDeletionCount));
}
long migrationCount = this.serverside.fullDataFileHandler.getTotalMigrationCount();
if (migrationCount > 0)
{
messageList.add(" Migrating - Conversion #: " + F3Screen.NUMBER_FORMAT.format(migrationCount));
}
}
else
{
messageList.add(" Migration Failed");
}
// world gen
this.serverside.fullDataFileHandler.addDebugMenuStringsToList(messageList);
this.serverside.worldGenModule.addDebugMenuStringsToList(messageList);
}
@@ -22,8 +22,8 @@ package com.seibel.distanthorizons.core.level;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.file.AbstractDataSourceHandler;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.IDataSourceUpdateListenerFunc;
import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D;
import com.seibel.distanthorizons.core.render.LodQuadTree;
@@ -39,7 +39,7 @@ import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
public class ClientLevelModule implements Closeable, AbstractDataSourceHandler.IDataSourceUpdateListenerFunc<FullDataSourceV2>
public class ClientLevelModule implements Closeable, IDataSourceUpdateListenerFunc<FullDataSourceV2>
{
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
@@ -69,7 +69,7 @@ public class ClientLevelModule implements Closeable, AbstractDataSourceHandler.I
this.clientLevel = clientLevel;
this.fullDataSourceProvider = this.clientLevel.getFullDataProvider();
this.fullDataSourceProvider.dateSourceUpdateListeners.add(this);
this.fullDataSourceProvider.addDataSourceUpdateListener(this);
}
@@ -161,7 +161,8 @@ public class ClientLevelModule implements Closeable, AbstractDataSourceHandler.I
// data handling //
//===============//
public CompletableFuture<Void> updateDataSourcesAsync(FullDataSourceV2 data) { return this.clientLevel.getFullDataProvider().updateDataSourceAsync(data); }
public CompletableFuture<Void> updateDataSourcesAsync(FullDataSourceV2 data)
{ return this.clientLevel.getFullDataProvider().updateDataSourceAsync(data); }
@Override
public void OnDataSourceUpdated(FullDataSourceV2 updatedFullDataSource)
{
@@ -195,7 +196,7 @@ public class ClientLevelModule implements Closeable, AbstractDataSourceHandler.I
}
}
this.fullDataSourceProvider.dateSourceUpdateListeners.remove(this);
this.fullDataSourceProvider.removeDataSourceUpdateListener(this);
}
@@ -23,7 +23,7 @@ import com.google.common.cache.CacheBuilder;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.RemoteFullDataSourceProvider;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.generation.RemoteWorldRetrievalQueue;
@@ -68,7 +68,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
public final ClientLevelModule clientside;
public final IClientLevelWrapper levelWrapper;
public final ISaveStructure saveStructure;
public final RemoteFullDataSourceProvider dataFileHandler;
public final RemoteFullDataSourceProvider remoteDataSourceProvider;
@CheckForNull
private final ClientNetworkState networkState;
@@ -130,12 +130,12 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
this.syncOnLoadRequestQueue = null;
}
this.dataFileHandler = new RemoteFullDataSourceProvider(this, saveStructure, fullDataSaveDirOverride, this.syncOnLoadRequestQueue);
this.worldGenModule = new WorldGenModule(this, this.dataFileHandler, () -> new WorldGenState(this, networkState));
this.remoteDataSourceProvider = new RemoteFullDataSourceProvider(this, saveStructure, fullDataSaveDirOverride, this.syncOnLoadRequestQueue);
this.worldGenModule = new WorldGenModule(this, this.remoteDataSourceProvider, () -> new WorldGenState(this, networkState));
this.clientside = new ClientLevelModule(this);
this.createAndSetSupportingRepos(this.dataFileHandler.repo.databaseFile);
this.createAndSetSupportingRepos(this.remoteDataSourceProvider.repo.databaseFile);
this.runRepoReliantSetup();
this.clientside.startRenderer();
@@ -283,7 +283,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
public CompletableFuture<Void> updateDataSourcesAsync(FullDataSourceV2 data) { return this.clientside.updateDataSourcesAsync(data); }
@Override
public FullDataSourceProviderV2 getFullDataProvider() { return this.dataFileHandler; }
public FullDataSourceProviderV2 getFullDataProvider() { return this.remoteDataSourceProvider; }
@Override
public ISaveStructure getSaveStructure() { return this.saveStructure; }
@@ -321,24 +321,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
messageList.add("["+dimName+"] rendering: "+(rendering ? "yes" : "no"));
boolean migrationErrored = this.dataFileHandler.getMigrationStoppedWithError();
if (!migrationErrored)
{
long legacyDeletionCount = this.dataFileHandler.getLegacyDeletionCount();
if (legacyDeletionCount > 0)
{
messageList.add(" Migrating - Deleting #: " + legacyDeletionCount);
}
long migrationCount = this.dataFileHandler.getTotalMigrationCount();
if (migrationCount > 0)
{
messageList.add(" Migrating - Conversion #: " + migrationCount);
}
}
else
{
messageList.add(" Migration Failed");
}
this.remoteDataSourceProvider.addDebugMenuStringsToList(messageList);
// world gen
@@ -378,7 +361,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
this.levelWrapper.setDhLevel(null);
this.clientside.close();
super.close();
this.dataFileHandler.close();
this.remoteDataSourceProvider.close();
LOGGER.info("Closed [" + DhClientLevel.class.getSimpleName() + "] for [" + this.levelWrapper + "]");
}
@@ -22,7 +22,7 @@ package com.seibel.distanthorizons.core.level;
import com.seibel.distanthorizons.core.api.internal.ClientApi;
import com.seibel.distanthorizons.core.api.internal.ServerApi;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.GeneratedFullDataSourceProvider;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.pos.DhChunkPos;
@@ -26,7 +26,7 @@ import com.seibel.distanthorizons.core.dataObjects.render.CachedColumnRenderSour
import com.seibel.distanthorizons.core.dataObjects.render.ColumnRenderSource;
import com.seibel.distanthorizons.core.dataObjects.render.bufferBuilding.LodBufferContainer;
import com.seibel.distanthorizons.core.enums.EDhDirection;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.level.IDhClientLevel;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
@@ -31,7 +31,7 @@ import com.seibel.distanthorizons.core.dataObjects.render.bufferBuilding.LodQuad
import com.seibel.distanthorizons.core.dataObjects.transformers.FullDataToRenderDataTransformer;
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.enums.EDhDirection;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.fullDatafile.V2.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.level.IDhClientLevel;
import com.seibel.distanthorizons.core.logging.DhLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;