Merge branch 'NSizedMultiplayerTest'

This commit is contained in:
s809
2024-12-04 23:38:42 +05:00
15 changed files with 567 additions and 361 deletions
@@ -31,7 +31,7 @@ public final class ModInfo
public static final String DEDICATED_SERVER_INITIAL_PATH = "dedicated_server_initial";
/** Incremented every time any packets are added, changed or removed, with a few exceptions. */
public static final int PROTOCOL_VERSION = 7;
public static final int PROTOCOL_VERSION = 8;
public static final String WRAPPER_PACKET_PATH = "message";
/** The internal mod name */
@@ -66,7 +66,7 @@ public class SharedApi
private static final UpdateChunkPosManager UPDATE_POS_MANAGER = new UpdateChunkPosManager();
/** how many chunks can be queued for updating per thread, used to prevent updates from infinitely pilling up if the user flies around extremely fast */
private static final int MAX_UPDATING_CHUNK_COUNT_PER_THREAD = 500;
private static final int MAX_UPDATING_CHUNK_COUNT_PER_THREAD = 1_000;
/** how many milliseconds must pass before an overloaded message can be sent in chat or the log */
private static final int MIN_MS_BETWEEN_OVERLOADED_LOG_MESSAGE = 30_000;
@@ -1340,6 +1340,7 @@ public class Config
.build();
public static final ConfigEntry<Integer> numberOfUpdatePropagatorThreads = new ConfigEntry.Builder<Integer>()
.setServersideShortName("numberOfUpdatePropagatorThreads")
.setMinDefaultMax(1,
ThreadPresetConfigEventHandler.getUpdatePropagatorDefaultThreadCount(),
Runtime.getRuntime().availableProcessors())
@@ -1360,6 +1361,7 @@ public class Config
+ THREAD_NOTE)
.build();
public static final ConfigEntry<Double> runTimeRatioForUpdatePropagatorThreads = new ConfigEntry.Builder<Double>()
.setServersideShortName("runTimeRatioForUpdatePropagatorThreads")
.setMinDefaultMax(0.01, ThreadPresetConfigEventHandler.getUpdatePropagatorDefaultRunTimeRatio(), 1.0)
.comment(THREAD_RUN_TIME_RATIO_NOTE)
.build();
@@ -21,13 +21,13 @@ package com.seibel.distanthorizons.core.file.fullDatafile;
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
import com.seibel.distanthorizons.core.api.internal.ClientApi;
import com.seibel.distanthorizons.core.api.internal.SharedApi;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV1;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.file.AbstractDataSourceHandler;
import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
@@ -40,7 +40,6 @@ import com.seibel.distanthorizons.core.sql.repo.FullDataSourceV2Repo;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import com.seibel.distanthorizons.core.util.objects.DataCorruptedException;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.world.EWorldEnvironment;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.apache.logging.log4j.Logger;
@@ -137,15 +136,8 @@ public class FullDataSourceProviderV2
this.migrationThreadPool.execute(this::convertLegacyDataSources);
// update propagation doesn't need to be run on the server since only the highest detail level is needed
if (SharedApi.getEnvironment() != EWorldEnvironment.SERVER_ONLY)
{
this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Parent Update Queue ["+levelId+"]");
this.updateQueueProcessor.execute(this::runUpdateQueue);
}
else
{
this.updateQueueProcessor = null;
}
this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Parent Update Queue [" + levelId + "]");
this.updateQueueProcessor.execute(this::runUpdateQueue);
}
@@ -604,7 +596,8 @@ public class FullDataSourceProviderV2
public int getMaxPossibleRetrievalPositionCountForPos(Long pos) { return -1; }
/** @return true if the position was queued, false if not */
public boolean queuePositionForRetrieval(Long genPos) { return false; }
@Nullable
public CompletableFuture<WorldGenResult> queuePositionForRetrieval(long genPos, boolean allowAboveMaxGenRequests) { return null; }
/** does nothing if the given position isn't present in the queue */
public void removeRetrievalRequestIf(DhSectionPos.ICancelablePrimitiveLongConsumer removeIf) { }
@@ -632,8 +625,6 @@ public class FullDataSourceProviderV2
@Nullable
public Long getTimestampForPos(long pos)
{ return this.repo.getTimestampForPos(pos); }
public Map<Long, Long> getTimestampsForRange(byte detailLevel, int startPosX, int startPosZ, int endPosX, int endPosZ)
{ return this.repo.getTimestampsForRange(detailLevel, startPosX, startPosZ, endPosX, endPosZ); }
@@ -38,6 +38,7 @@ import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.coreapi.util.BitShiftUtil;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.awt.*;
@@ -60,10 +61,10 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
* TODO this should be dynamically allocated based on CPU load
* and abilities.
*/
public static final int MAX_WORLD_GEN_REQUESTS_PER_THREAD = 20;
public static final int MAX_WORLD_GEN_REQUESTS_PER_THREAD = 20;
private final AtomicReference<IFullDataSourceRetrievalQueue> worldGenQueueRef = new AtomicReference<>(null);
protected final AtomicReference<IFullDataSourceRetrievalQueue> worldGenQueueRef = new AtomicReference<>(null);
private final ArrayList<IOnWorldGenCompleteListener> onWorldGenTaskCompleteListeners = new ArrayList<>();
protected final DelayedFullDataSourceSaveCache delayedFullDataSourceSaveCache = new DelayedFullDataSourceSaveCache(this::onDataSourceSave, 5_000);
@@ -170,8 +171,7 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
}
IFullDataSourceRetrievalQueue worldGenQueue = this.worldGenQueueRef.get();
if (worldGenQueue == null)
if (this.worldGenQueueRef.get() == null)
{
// we can't queue anything if the world generator isn't set up yet
return false;
@@ -208,25 +208,47 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
return false;
}
// don't queue additional world gen requests beyond the max allotted count
return worldGenQueue.getWaitingTaskCount() < maxQueueCount;
return true;
}
@Override
public boolean queuePositionForRetrieval(Long genPos)
public CompletableFuture<WorldGenResult> queuePositionForRetrieval(long genPos, boolean allowAboveMaxGenRequests)
{
IFullDataSourceRetrievalQueue worldGenQueue = this.worldGenQueueRef.get();
if (worldGenQueue == null)
{
return false;
return null;
}
if (!allowAboveMaxGenRequests)
{
int maxQueueCount = MAX_WORLD_GEN_REQUESTS_PER_THREAD * Config.Common.MultiThreading.numberOfWorldGenerationThreads.get();
if (worldGenQueue.getWaitingTaskCount() >= maxQueueCount)
{
return null;
}
}
WorldGenTaskTracker genTaskTracker = new WorldGenTaskTracker(genPos);
CompletableFuture<WorldGenResult> worldGenFuture = worldGenQueue.submitRetrievalTask(genPos, (byte) (DhSectionPos.getDetailLevel(genPos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL), genTaskTracker);
worldGenFuture.whenComplete((genTaskResult, ex) -> this.onWorldGenTaskComplete(genTaskResult, ex));
worldGenFuture.whenComplete((genTaskResult, ex) ->
{
LOGGER.info("gen task complete ["+DhSectionPos.toString(genPos)+"]");
//this.onWorldGenTaskComplete(genTaskResult, ex);
});
return true;
return worldGenFuture;
}
@Override
protected void updateDataSourceAtPos(long updatePos, @NotNull FullDataSourceV2 inputData, boolean lockOnUpdatePos)
{
super.updateDataSourceAtPos(updatePos, inputData, lockOnUpdatePos);
//if (SharedApi.getEnvironment() != EWorldEnvironment.CLIENT_ONLY)
// LOGGER.info("updated ["+DhSectionPos.toString(updatePos)+"]");
this.onWorldGenTaskComplete(WorldGenResult.CreateSuccess(updatePos), null);
}
@Override
@@ -20,24 +20,20 @@
package com.seibel.distanthorizons.core.file.fullDatafile;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.generation.RemoteWorldRetrievalQueue;
import com.seibel.distanthorizons.core.level.IDhClientLevel;
import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.level.WorldGenModule;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.multiplayer.client.SyncOnLoadRequestQueue;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D;
import com.seibel.distanthorizons.core.util.TimerUtil;
import com.seibel.distanthorizons.coreapi.util.BitShiftUtil;
import org.apache.logging.log4j.Logger;
import com.seibel.distanthorizons.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import org.jetbrains.annotations.Nullable;
import java.io.File;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -46,10 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvider
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
private static final Timer DELAY_UPDATE_TIMER = TimerUtil.CreateTimer("Remote DataSource Visited Pos Removal Timer");
/** auto remove visited positions from the set after a given amount of time to prevent the set from growing infinitely */
private static final int VISITED_POSITION_REMOVAL_TIME_IN_MS = 20 * 60 * 1_000; // 20 minutes
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
@Nullable
private final SyncOnLoadRequestQueue syncOnLoadRequestQueue;
@@ -62,7 +55,7 @@ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvide
//=============//
public RemoteFullDataSourceProvider(
IDhClientLevel level, ISaveStructure saveStructure, @Nullable File saveDirOverride,
IDhLevel level, ISaveStructure saveStructure, @Nullable File saveDirOverride,
@Nullable SyncOnLoadRequestQueue syncOnLoadRequestQueue)
{
super(level, saveStructure, saveDirOverride);
@@ -70,24 +63,6 @@ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvide
}
@Override
public boolean queuePositionForRetrieval(Long genPos)
{
if (this.syncOnLoadRequestQueue == null)
{
return super.queuePositionForRetrieval(genPos);
}
int maxGenerationRequestDistance = this.syncOnLoadRequestQueue.networkState.sessionConfig.getMaxGenerationRequestDistance();
DhBlockPos2D targetPos = this.level.getTargetPosForGeneration();
if (targetPos == null || DhSectionPos.getChebyshevSignedBlockDistance(genPos, targetPos) / 16 > maxGenerationRequestDistance)
{
return false;
}
return super.queuePositionForRetrieval(genPos);
}
//==================//
// override methods //
@@ -110,62 +85,32 @@ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvide
}
//===========================//
// request timestamp updates //
// from server //
//===========================//
// get the timestamp for every maximum detail position in this section
int posToMinimumDetailScale = BitShiftUtil.powerOfTwo(DhSectionPos.getDetailLevel(pos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL);
Map<Long, Long> timestamps = this.getTimestampsForRange(
DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL,
DhSectionPos.getX(pos) * posToMinimumDetailScale,
DhSectionPos.getZ(pos) * posToMinimumDetailScale,
(DhSectionPos.getX(pos) + 1) * posToMinimumDetailScale,
(DhSectionPos.getZ(pos) + 1) * posToMinimumDetailScale
);
DhSectionPos.forEachChildAtDetailLevel(pos, DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, childPos ->
Long timestamp = this.getTimestampForPos(pos);
if (timestamp != null)
{
int maxSyncOnLoadDistance = this.syncOnLoadRequestQueue.networkState.sessionConfig.getMaxSyncOnLoadDistance();
DhBlockPos2D targetPos = this.level.getTargetPosForGeneration();
if (targetPos == null || DhSectionPos.getChebyshevSignedBlockDistance(childPos, targetPos) / 16 > maxSyncOnLoadDistance)
{
return;
}
if (!this.visitedPositions.add(childPos))
{
return;
}
this.queueVisitedPositionForRemoval(childPos);
// check if the server has newer versions of these LODs
Long subTimestamp = timestamps.get(childPos);
if (subTimestamp != null)
{
this.syncOnLoadRequestQueue.submitRequest(childPos, subTimestamp, this.delayedFullDataSourceSaveCache::queueDataSourceForUpdateAndSave);
}
});
this.syncOnLoadRequestQueue.submitRequest(pos, timestamp, this.delayedFullDataSourceSaveCache::queueDataSourceForUpdateAndSave);
}
return super.get(pos);
}
/** this is done to prevent infinite set growth */
private void queueVisitedPositionForRemoval(long pos)
@Override
public CompletableFuture<WorldGenResult> queuePositionForRetrieval(long genPos, boolean allowAboveMaxGenRequests)
{
TimerTask timerTask = new TimerTask()
RemoteWorldRetrievalQueue worldGenQueue = (RemoteWorldRetrievalQueue) this.worldGenQueueRef.get();
if (worldGenQueue == null)
{
@Override
public void run()
{
RemoteFullDataSourceProvider.this.visitedPositions.remove(pos);
}
};
try
{
DELAY_UPDATE_TIMER.schedule(timerTask, VISITED_POSITION_REMOVAL_TIME_IN_MS);
return null;
}
catch (IllegalStateException ignore) { /* shouldn't happen, but there have been issues like this in the past */ }
return super.queuePositionForRetrieval(genPos, worldGenQueue.isPosCloserThanFarthestWaiting(new DhBlockPos2D(MC_CLIENT.getPlayerBlockPos()), genPos));
}
@@ -7,11 +7,14 @@ import com.seibel.distanthorizons.core.level.DhClientLevel;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.multiplayer.client.AbstractFullDataNetworkRequestQueue;
import com.seibel.distanthorizons.core.multiplayer.client.ClientNetworkState;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.util.LodUtil;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class RemoteWorldRetrievalQueue extends AbstractFullDataNetworkRequestQueue implements IFullDataSourceRetrievalQueue, IDebugRenderable
@@ -39,7 +42,7 @@ public class RemoteWorldRetrievalQueue extends AbstractFullDataNetworkRequestQue
public void startAndSetTargetPos(DhBlockPos2D targetPos) { super.tick(targetPos); }
@Override
public byte lowestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL; }
public byte lowestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL + 12; } // TODO should be the same as what the server's update propgator can provide
@Override
public byte highestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL; }
@@ -47,9 +50,23 @@ public class RemoteWorldRetrievalQueue extends AbstractFullDataNetworkRequestQue
public CompletableFuture<WorldGenResult> submitRetrievalTask(long sectionPos, byte requiredDataDetail, IWorldGenTaskTracker tracker)
{
return super.submitRequest(sectionPos, tracker.getDataSourceConsumer())
.thenApply(retrievalSuccess -> retrievalSuccess
? WorldGenResult.CreateSuccess(sectionPos)
: WorldGenResult.CreateFail());
.thenApply(requestResult ->
{
switch (requestResult)
{
case SUCCEEDED:
return WorldGenResult.CreateSuccess(sectionPos);
case FAILED:
return WorldGenResult.CreateFail();
case REQUIRES_SPLITTING:
List<CompletableFuture<WorldGenResult>> childFutures = new ArrayList<>(4);
DhSectionPos.forEachChild(sectionPos, childPos -> childFutures.add(this.submitRetrievalTask(childPos, requiredDataDetail, tracker)));
return WorldGenResult.CreateSplit(childFutures);
}
LodUtil.assertNotReach();
return WorldGenResult.CreateFail();
});
}
@Override
@@ -479,8 +479,8 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
// getters / setters //
//===================//
public int getWaitingTaskCount() { return this.waitingTasks.size(); }
public int getInProgressTaskCount() { return this.inProgressGenTasksByLodPos.size(); }
@Override public int getWaitingTaskCount() { return this.waitingTasks.size(); }
@Override public int getInProgressTaskCount() { return this.inProgressGenTasksByLodPos.size(); }
@Override
public byte lowestDataDetail() { return this.lowestDataDetail; }
@@ -492,7 +492,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
@Override
public void setEstimatedTotalTaskCount(int newEstimate) { this.estimatedTotalTaskCount = newEstimate; }
public void addDebugMenuStringsToList(List<String> messageList) { }
@Override public void addDebugMenuStringsToList(List<String> messageList) { }
@@ -500,7 +500,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
// shutdown //
//==========//
public CompletableFuture<Void> startClosingAsync(boolean cancelCurrentGeneration, boolean alsoInterruptRunning)
@Override public CompletableFuture<Void> startClosingAsync(boolean cancelCurrentGeneration, boolean alsoInterruptRunning)
{
LOGGER.info("Closing world gen queue");
this.queueingThread.shutdownNow();
@@ -4,9 +4,9 @@ import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.logging.f3.F3Screen;
import com.seibel.distanthorizons.core.multiplayer.server.FullDataSourceRequestHandler;
import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerState;
import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerStateManager;
import com.seibel.distanthorizons.core.network.exceptions.RequestOutOfRangeException;
@@ -17,7 +17,6 @@ import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartialUpdateMessage;
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D;
@@ -27,24 +26,16 @@ import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;
public abstract class AbstractDhServerLevel extends AbstractDhLevel implements IDhServerLevel
{
protected static final Logger LOGGER = DhLoggerBuilder.getLogger();
private static final ConfigBasedLogger NETWORK_LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Common.Logging.logNetworkEvent.get());
/** 1 Mebibyte minus 576 bytes for other info */
public static final int FULL_DATA_SPLIT_SIZE_IN_BYTES = 1_048_000;
public final ServerLevelModule serverside;
protected final IServerLevelWrapper serverLevelWrapper;
@@ -58,9 +49,9 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
*/
protected final ConcurrentLinkedQueue<IServerPlayerWrapper> worldGenPlayerCenteringQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentMap<Long, DataSourceRequestGroup> requestGroupByPos = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, DataSourceRequestGroup> requestGroupByFutureId = new ConcurrentHashMap<>();
private final FullDataSourceRequestHandler requestHandler = new FullDataSourceRequestHandler(this);
private final boolean NSizedGenerationSupported = false;
//=============//
@@ -104,52 +95,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
@Override
public void serverTick()
{
// Send finished data source requests
for (Map.Entry<Long, DataSourceRequestGroup> entry : this.requestGroupByPos.entrySet())
{
DataSourceRequestGroup requestGroup = entry.getValue();
if (requestGroup.fullDataSource == null)
{
continue;
}
NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Fulfilled request group [" + entry.getKey() + "]");
// Make this group unavailable for adding into
this.requestGroupByPos.remove(entry.getKey());
requestGroup.requestRemoveSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
continue;
}
CompletableFuture.runAsync(() ->
{
Objects.requireNonNull(this.beaconBeamRepo);
try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(entry.getKey())))
{
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
{
this.requestGroupByFutureId.remove(msg.futureId);
ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer());
if (serverPlayerState == null)
{
continue;
}
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> {
msg.sendResponse(new FullDataSourceResponseMessage(payload));
serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release();
});
}
}
}, executor);
}
this.requestHandler.tick();
}
@Override
@@ -188,9 +134,8 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
{
serverPlayerState.networkSession.registerHandler(FullDataSourceRequestMessage.class, (message) ->
{
if (!this.messagePlayerInThisLevel(message))
if (!this.validatePlayerInCurrentLevel(message))
{
// we can't handle players in other levels, don't continue
return;
}
@@ -206,7 +151,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
message.sendResponse(new RequestOutOfRangeException("Distance too large: " + distanceFromPlayer + " > " + Config.Server.maxGenerationRequestDistance.get()));
return;
}
this.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet);
this.requestHandler.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet);
}
else
{
@@ -215,131 +160,20 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
message.sendResponse(new RequestOutOfRangeException("Distance too large: " + distanceFromPlayer + " > " + Config.Server.maxSyncOnLoadRequestDistance.get()));
return;
}
this.queueLodSyncForRequestMessage(serverPlayerState, message, rateLimiterSet);
this.requestHandler.queueLodSyncForRequestMessage(serverPlayerState, message, rateLimiterSet);
}
});
serverPlayerState.networkSession.registerHandler(CancelMessage.class, msg ->
{
DataSourceRequestGroup requestGroup = this.requestGroupByFutureId.remove(msg.futureId);
if (requestGroup == null)
{
return;
}
// If this fails, the group is being removed and completing cancellation is not necessary
if (requestGroup.requestRemoveSemaphore.tryAcquire())
{
// Prevent adding requests in case the group will be removed by this cancellation
requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
requestGroup.requestRemoveSemaphore.release();
serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release();
FullDataSourceRequestMessage requestMessage = requestGroup.requestMessages.remove(msg.futureId);
if (requestGroup.requestMessages.isEmpty())
{
NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Cancelled request group [" + DhSectionPos.toString(requestMessage.sectionPos) + "].");
this.requestGroupByPos.remove(requestMessage.sectionPos);
this.serverside.fullDataFileHandler.removeRetrievalRequestIf(pos -> pos == requestMessage.sectionPos);
}
else
{
requestGroup.requestAddSemaphore.release(Short.MAX_VALUE);
}
}
this.requestHandler.cancelRequest(msg.futureId);
});
}
private void queueLodSyncForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
{
if (!serverPlayerState.sessionConfig.getSynchronizeOnLoad())
{
message.sendResponse(new RequestRejectedException("Operation is disabled in config."));
return;
}
if (!rateLimiterSet.syncOnLoginRateLimiter.tryAcquire(message))
{
return;
}
// the client timestamp will be null if we want to retrieve the LOD regardless of when it was last updated
long clientTimestamp = (message.clientTimestamp != null) ? message.clientTimestamp : -1;
// the server timestamp will be null if no LOD data exists for this position
Long serverTimestamp = this.serverside.fullDataFileHandler.getTimestampForPos(message.sectionPos);
if (serverTimestamp == null
|| serverTimestamp <= clientTimestamp)
{
// either no data exists to sync, or the client is already up to date
rateLimiterSet.syncOnLoginRateLimiter.release();
message.sendResponse(new FullDataSourceResponseMessage(null));
return;
}
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
// shouldn't normally happen, but just in case
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
return;
}
this.serverside.fullDataFileHandler.getAsync(message.sectionPos).thenAcceptAsync(fullDataSource ->
{
Objects.requireNonNull(this.beaconBeamRepo);
try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos)))
{
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> {
message.sendResponse(new FullDataSourceResponseMessage(payload));
rateLimiterSet.syncOnLoginRateLimiter.release();
});
}
}, executor);
}
private void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
{
if (!serverPlayerState.sessionConfig.isDistantGenerationEnabled())
{
message.sendResponse(new RequestRejectedException("Operation is disabled in config."));
return;
}
if (!rateLimiterSet.generationRequestRateLimiter.tryAcquire(message))
{
return;
}
while (true)
{
DataSourceRequestGroup requestGroup = this.requestGroupByPos.computeIfAbsent(message.sectionPos, pos ->
{
DataSourceRequestGroup newGroup = new DataSourceRequestGroup();
this.tryFulfillDataSourceRequestGroup(newGroup, pos);
NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Created request group for pos [" + DhSectionPos.toString(pos) + "].");
return newGroup;
});
// If this fails, loop until either a permit is acquired or the group is removed to create another one
if (!requestGroup.requestAddSemaphore.tryAcquire())
{
Thread.yield();
continue;
}
this.requestGroupByFutureId.put(message.futureId, requestGroup);
requestGroup.requestMessages.put(message.futureId, message);
requestGroup.requestAddSemaphore.release();
break;
}
}
/** May send an error message in response if the message is a {@link AbstractTrackableMessage} */
private <T extends AbstractNetworkMessage> boolean messagePlayerInThisLevel(T message)
private <T extends AbstractNetworkMessage> boolean validatePlayerInCurrentLevel(T message)
{
if (!(message instanceof ILevelRelatedMessage))
{
@@ -383,35 +217,12 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
// world gen //
//===========//
public boolean isNSizedGenerationSupported() { return this.NSizedGenerationSupported; }
@Override
public void onWorldGenTaskComplete(long pos)
{
DataSourceRequestGroup requestGroup = this.requestGroupByPos.get(pos);
if (requestGroup != null)
{
requestGroup.worldGenTaskComplete = true;
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
}
}
private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos)
{
this.serverside.fullDataFileHandler.getAsync(pos).thenAccept(fullDataSource ->
{
if (this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps))
{
requestGroup.fullDataSource = fullDataSource;
}
else if (requestGroup.worldGenTaskComplete)
{
// If the returned data source is not fully generated, try reading it again
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
}
else
{
this.serverside.fullDataFileHandler.queuePositionForRetrieval(pos);
}
});
this.requestHandler.onWorldGenTaskComplete(pos);
}
@@ -551,31 +362,4 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
LOGGER.info("Closed DHLevel for [" + this.getLevelWrapper() + "].");
}
//================//
// helper classes //
//================//
private static class DataSourceRequestGroup
{
public final ConcurrentMap<Long, FullDataSourceRequestMessage> requestMessages = new ConcurrentHashMap<>();
/** If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps */
boolean worldGenTaskComplete = false;
@CheckForNull
public FullDataSourceV2 fullDataSource = null;
/**
* These two Semaphores are used to prevent all threads from locking on the group after it being fulfilled,
* as opposed to ReentrantReadWriteLocks which would allow the locking thread continue using it anyway. <br>
* Short.MAX_VALUE is chosen as a large enough number so non-exclusive accesses never block each other.
*/
public final Semaphore requestAddSemaphore = new Semaphore(Short.MAX_VALUE, true);
/** @see DataSourceRequestGroup#requestAddSemaphore */
public final Semaphore requestRemoveSemaphore = new Semaphore(Short.MAX_VALUE, true);
}
}
@@ -10,6 +10,7 @@ import com.seibel.distanthorizons.core.logging.ConfigBasedSpamLogger;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.exceptions.RequestOutOfRangeException;
import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException;
import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException;
import com.seibel.distanthorizons.core.network.session.SessionClosedException;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage;
@@ -33,6 +34,7 @@ import java.io.IOException;
import java.util.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@@ -105,30 +107,44 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
// request submitting //
//====================//
public CompletableFuture<Boolean> submitRequest(long sectionPos, Consumer<FullDataSourceV2> dataSourceConsumer)
public CompletableFuture<RequestResult> submitRequest(long sectionPos, Consumer<FullDataSourceV2> dataSourceConsumer)
{ return this.submitRequest(sectionPos, null, dataSourceConsumer); }
public CompletableFuture<Boolean> submitRequest(long sectionPos, @Nullable Long clientTimestamp, Consumer<FullDataSourceV2> dataSourceConsumer)
public CompletableFuture<RequestResult> submitRequest(long sectionPos, @Nullable Long clientTimestamp, Consumer<FullDataSourceV2> dataSourceConsumer)
{
LodUtil.assertTrue(DhSectionPos.getDetailLevel(sectionPos) == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed.");
RequestQueueEntry entry = new RequestQueueEntry(dataSourceConsumer, clientTimestamp);
entry.future.whenComplete((success, throwable) ->
AtomicBoolean added = new AtomicBoolean(false);
RequestQueueEntry entry = this.waitingTasksBySectionPos.compute(sectionPos, (k, existingQueueEntry) ->
{
this.waitingTasksBySectionPos.remove(sectionPos);
if (throwable instanceof CancellationException)
if (existingQueueEntry != null)
{
return;
return existingQueueEntry;
}
this.finishedRequests.incrementAndGet();
if (!success || throwable != null)
RequestQueueEntry newEntry = new RequestQueueEntry(dataSourceConsumer, clientTimestamp);
newEntry.future.whenComplete((requestResult, throwable) ->
{
this.failedRequests.incrementAndGet();
}
this.waitingTasksBySectionPos.remove(sectionPos);
if (requestResult != RequestResult.REQUIRES_SPLITTING)
{
this.finishedRequests.incrementAndGet();
}
if ((requestResult == null || requestResult == RequestResult.FAILED)
|| (throwable != null && !(throwable instanceof CancellationException)))
{
this.failedRequests.incrementAndGet();
}
});
added.set(true);
return newEntry;
});
this.waitingTasksBySectionPos.put(sectionPos, entry);
if (!added.get())
{
return CompletableFuture.completedFuture(RequestResult.FAILED);
}
return entry.future;
}
@@ -164,7 +180,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
{
Map.Entry<Long, RequestQueueEntry> mapEntry = this.waitingTasksBySectionPos.entrySet().stream()
.filter(task -> task.getValue().networkDataSourceFuture == null)
.min(Comparator.comparingInt(task -> DhSectionPos.getChebyshevSignedBlockDistance(task.getKey(), targetPos)))
.min(Comparator.comparingInt(x -> DhSectionPos.getChebyshevSignedBlockDistance(x.getKey(), targetPos)))
.orElse(null);
if (mapEntry == null)
@@ -190,7 +206,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
CompletableFuture<FullDataSourceResponseMessage> dataSourceFuture = this.networkState.getSession().sendRequest(
new FullDataSourceRequestMessage(this.level.getLevelWrapper(), sectionPos, offsetEntryTimestamp),
FullDataSourceResponseMessage.class
);
);
entry.networkDataSourceFuture = dataSourceFuture;
dataSourceFuture.handle((response, throwable) ->
{
@@ -213,6 +229,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
LOGGER.warn("Unable to handle FullDataPayload - getNetworkCompressionExecutor() is null");
return null;
}
CompletableFuture.runAsync(() ->
{
try
@@ -233,6 +250,10 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
LodUtil.assertTrue(this.changedOnly, "Received empty data source response for not changes-only request");
}
}
catch (SectionRequiresSplittingException ignored)
{
return entry.future.complete(RequestResult.REQUIRES_SPLITTING);
}
catch (SessionClosedException | CancellationException ignored)
{
return entry.future.cancel(false);
@@ -240,7 +261,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
catch (RequestRejectedException e)
{
LOGGER.info("Request rejected by the server: " + e.getMessage());
return entry.future.complete(false);
return entry.future.complete(RequestResult.FAILED);
}
catch (RateLimitedException e)
{
@@ -272,15 +293,28 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
}
else
{
return entry.future.complete(false);
return entry.future.complete(RequestResult.FAILED);
}
}
return entry.future.complete(true);
return entry.future.complete(RequestResult.SUCCEEDED);
});
}
public boolean isPosCloserThanFarthestWaiting(DhBlockPos2D targetPos, long pos)
{
Long farthestPos = this.waitingTasksBySectionPos
.keySet().stream()
.max(Comparator.comparingInt(x -> DhSectionPos.getChebyshevSignedBlockDistance(x, targetPos)))
.orElse(null);
if (farthestPos == null)
{
return true;
}
return DhSectionPos.getChebyshevSignedBlockDistance(pos, targetPos) <= DhSectionPos.getChebyshevSignedBlockDistance(farthestPos, targetPos);
}
//=========================================//
@@ -387,7 +421,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
protected static class RequestQueueEntry
{
/** encapsulates the entire request, including client side queuing and the actual server request */
public final CompletableFuture<Boolean> future = new CompletableFuture<>();
public final CompletableFuture<RequestResult> future = new CompletableFuture<>();
public final Consumer<FullDataSourceV2> dataSourceConsumer;
/** will be null if we want to retrieve the LOD regardless of when it was last updated */
@Nullable
@@ -417,6 +451,13 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
}
public enum RequestResult
{
SUCCEEDED,
REQUIRES_SPLITTING,
FAILED,
}
}
@@ -0,0 +1,109 @@
package com.seibel.distanthorizons.core.multiplayer.server;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage;
import javax.annotation.CheckForNull;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
class DataSourceRequestGroup
{
public final long pos;
/**
* If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps
*/
private boolean worldGenTaskComplete = false;
void markWorldGenTaskComplete()
{
this.worldGenTaskComplete = true;
}
boolean isWorldGenTaskComplete()
{
return this.worldGenTaskComplete;
}
@CheckForNull
public FullDataSourceV2 fullDataSource = null;
public final ConcurrentMap<Long, RequestData> requestMessages = new ConcurrentHashMap<>();
public final Semaphore pendingAdditionSemaphore = new Semaphore(Short.MAX_VALUE, true);
public final AtomicBoolean isClosed = new AtomicBoolean();
DataSourceRequestGroup(long pos)
{
this.pos = pos;
}
public boolean tryClose()
{
if (!this.isClosed.compareAndSet(false, true))
{
return false;
}
this.pendingAdditionSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
return true;
}
public boolean tryAddRequest(RequestData requestData)
{
if (!this.pendingAdditionSemaphore.tryAcquire())
{
return false;
}
this.requestMessages.put(requestData.futureId(), requestData);
this.pendingAdditionSemaphore.release();
return true;
}
public RequestData tryRemoveRequest(long requestId, IHangingRequestTransferConsumer hangingRequestTransferConsumer)
{
RequestData removed = this.requestMessages.remove(requestId);
if (this.requestMessages.isEmpty() && this.tryClose())
{
hangingRequestTransferConsumer.accept(this.requestMessages.values());
}
return removed;
}
static class RequestData
{
public final ServerPlayerState serverPlayerState;
public final ServerPlayerState.RateLimiterSet rateLimiterSet;
public final FullDataSourceRequestMessage message;
public long futureId() { return this.message.futureId; }
public long sectionPos() { return this.message.sectionPos; }
RequestData(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
{
this.serverPlayerState = serverPlayerState;
this.rateLimiterSet = rateLimiterSet;
this.message = message;
}
}
/**
* While closing this group, some requests may slip through and end up lost. <br>
* This is a workaround that allows the caller to transfer these requests to a new group.
*/
@FunctionalInterface
interface IHangingRequestTransferConsumer extends Consumer<Collection<RequestData>> { }
}
@@ -0,0 +1,264 @@
package com.seibel.distanthorizons.core.multiplayer.server;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.file.fullDatafile.GeneratedFullDataSourceProvider;
import com.seibel.distanthorizons.core.level.AbstractDhServerLevel;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException;
import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import org.apache.logging.log4j.LogManager;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
public class FullDataSourceRequestHandler
{
private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Common.Logging.logNetworkEvent.get());
private final AbstractDhServerLevel serverLevel;
private String getLevelIdentifier() { return this.serverLevel.getLevelWrapper().getDhIdentifier(); }
private GeneratedFullDataSourceProvider fullDataSourceProvider() { return this.serverLevel.serverside.fullDataFileHandler; }
private List<BeaconBeamDTO> getAllBeamsForPos(long pos) { return this.serverLevel.beaconBeamRepo.getAllBeamsForPos(pos); }
private final ConcurrentMap<Long, DataSourceRequestGroup> requestGroupsByPos = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, DataSourceRequestGroup> requestGroupsByFutureId = new ConcurrentHashMap<>();
public FullDataSourceRequestHandler(AbstractDhServerLevel serverLevel)
{
this.serverLevel = serverLevel;
}
//==================//
// network handling //
//==================//
public void queueLodSyncForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
{
if (!serverPlayerState.sessionConfig.getSynchronizeOnLoad())
{
message.sendResponse(new RequestRejectedException("Operation is disabled in config."));
return;
}
if (!rateLimiterSet.syncOnLoginRateLimiter.tryAcquire(message))
{
return;
}
// the client timestamp will be null if we want to retrieve the LOD regardless of when it was last updated
long clientTimestamp = (message.clientTimestamp != null) ? message.clientTimestamp : -1;
// the server timestamp will be null if no LOD data exists for this position
Long serverTimestamp = this.fullDataSourceProvider().getTimestampForPos(message.sectionPos);
if (serverTimestamp == null
|| serverTimestamp <= clientTimestamp)
{
// either no data exists to sync, or the client is already up to date
rateLimiterSet.syncOnLoginRateLimiter.release();
message.sendResponse(new FullDataSourceResponseMessage(null));
return;
}
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
// shouldn't normally happen, but just in case
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
return;
}
this.fullDataSourceProvider().getAsync(message.sectionPos).thenAcceptAsync(fullDataSource ->
{
try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.getAllBeamsForPos(message.sectionPos)))
{
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () ->
{
message.sendResponse(new FullDataSourceResponseMessage(payload));
rateLimiterSet.syncOnLoginRateLimiter.release();
});
}
}, executor);
}
public void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
{
if (!serverPlayerState.sessionConfig.isDistantGenerationEnabled())
{
message.sendResponse(new RequestRejectedException("Operation is disabled in config."));
return;
}
if (!rateLimiterSet.generationRequestRateLimiter.tryAcquire(message))
{
return;
}
this.doQueueWorldGenForRequestMessage(new DataSourceRequestGroup.RequestData(serverPlayerState, message, rateLimiterSet));
}
private void doQueueWorldGenForRequestMessage(DataSourceRequestGroup.RequestData requestData)
{
while (true)
{
DataSourceRequestGroup requestGroup = this.requestGroupsByPos.computeIfAbsent(requestData.sectionPos(), pos ->
{
DataSourceRequestGroup newGroup = new DataSourceRequestGroup(pos);
this.tryFulfillDataSourceRequestGroup(newGroup, pos);
LOGGER.debug("[" + this.getLevelIdentifier() + "] Created request group for pos [" + DhSectionPos.toString(pos) + "].");
return newGroup;
});
// If this fails, loop until either a permit is acquired or the group is removed to create another one
if (!requestGroup.tryAddRequest(requestData))
{
Thread.yield();
continue;
}
this.requestGroupsByFutureId.put(requestData.futureId(), requestGroup);
break;
}
}
public void cancelRequest(long requestId)
{
DataSourceRequestGroup requestGroup = this.requestGroupsByFutureId.remove(requestId);
if (requestGroup == null)
{
return;
}
DataSourceRequestGroup.RequestData removedRequest = requestGroup.tryRemoveRequest(requestId, requestsToTransfer ->
{
LOGGER.debug("[" + this.getLevelIdentifier() + "] Cancelled request group [" + DhSectionPos.toString(requestGroup.pos) + "].");
this.requestGroupsByPos.remove(requestGroup.pos);
if (!requestsToTransfer.isEmpty())
{
for (DataSourceRequestGroup.RequestData requestToTransfer : requestsToTransfer)
{
this.doQueueWorldGenForRequestMessage(requestToTransfer);
}
}
else
{
this.fullDataSourceProvider().removeRetrievalRequestIf(pos -> pos == requestGroup.pos);
}
});
if (removedRequest != null)
{
removedRequest.rateLimiterSet.generationRequestRateLimiter.release();
}
}
public void tick()
{
// Send finished data source requests
for (Map.Entry<Long, DataSourceRequestGroup> entry : this.requestGroupsByPos.entrySet())
{
DataSourceRequestGroup requestGroup = entry.getValue();
if (requestGroup.fullDataSource == null)
{
continue;
}
LOGGER.debug("[" + this.getLevelIdentifier() + "] Fulfilled request group [" + entry.getKey() + "]");
// Make this group unavailable for adding into
this.requestGroupsByPos.remove(entry.getKey());
if (!requestGroup.tryClose())
{
continue;
}
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
continue;
}
CompletableFuture.runAsync(() ->
{
try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.getAllBeamsForPos(entry.getKey())))
{
for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values())
{
this.requestGroupsByFutureId.remove(requestData.futureId());
requestData.serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> {
requestData.message.sendResponse(new FullDataSourceResponseMessage(payload));
requestData.rateLimiterSet.generationRequestRateLimiter.release();
});
}
}
}, executor);
}
}
private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos)
{
this.fullDataSourceProvider().getAsync(pos).thenAccept(fullDataSource ->
{
if (this.fullDataSourceProvider().isFullyGenerated(fullDataSource.columnGenerationSteps))
{
LOGGER.info("sending - complete [" + DhSectionPos.toString(pos) + "]");
requestGroup.fullDataSource = fullDataSource;
}
else if (!this.serverLevel.isNSizedGenerationSupported() && DhSectionPos.getDetailLevel(pos) > DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL)
{
// Make this group unavailable for adding into
this.requestGroupsByPos.remove(pos);
if (!requestGroup.tryClose())
{
return;
}
for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values())
{
this.requestGroupsByFutureId.remove(requestData.futureId());
requestData.rateLimiterSet.generationRequestRateLimiter.release();
requestData.message.sendResponse(new SectionRequiresSplittingException());
}
}
else if (requestGroup.isWorldGenTaskComplete())
{
LOGGER.info("sending - retry [" + DhSectionPos.toString(pos) + "]");
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
}
else
{
LOGGER.info("sending - queueing [" + DhSectionPos.toString(pos) + "]");
this.fullDataSourceProvider().queuePositionForRetrieval(pos, true);
}
});
}
public void onWorldGenTaskComplete(long pos)
{
DataSourceRequestGroup requestGroup = this.requestGroupsByPos.get(pos);
if (requestGroup != null)
{
requestGroup.markWorldGenTaskComplete();
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
}
}
}
@@ -0,0 +1,29 @@
/*
* This file is part of the Distant Horizons mod
* licensed under the GNU LGPL v3 License.
*
* Copyright (C) 2020-2023 James Seibel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.seibel.distanthorizons.core.network.exceptions;
/** Fired if the current section is not fully generated and underlying generator does not support N-sized generation. */
public class SectionRequiresSplittingException extends Exception
{
public SectionRequiresSplittingException() { this("Section requires splitting"); }
public SectionRequiresSplittingException(String message) { super(message); }
}
@@ -23,6 +23,7 @@ import com.google.common.base.MoreObjects;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.exceptions.RequestOutOfRangeException;
import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException;
import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException;
import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage;
import io.netty.buffer.ByteBuf;
@@ -38,6 +39,7 @@ public class ExceptionMessage extends AbstractTrackableMessage
this.add(RateLimitedException.class);
this.add(RequestOutOfRangeException.class);
this.add(RequestRejectedException.class);
this.add(SectionRequiresSplittingException.class);
}};
public Exception exception;
@@ -482,7 +482,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable
}
long pos = this.missingGenerationPos.removeLong(i);
boolean positionQueued = this.fullDataSourceProvider.queuePositionForRetrieval(pos);
boolean positionQueued = (this.fullDataSourceProvider.queuePositionForRetrieval(pos, false) != null);
if (!positionQueued)
{
// shouldn't normally happen, but just in case