proof-of-concept n-sized multiplayer request support

This commit is contained in:
James Seibel
2024-10-27 16:14:56 -05:00
parent 2dd83d182f
commit 23b2a62db2
9 changed files with 89 additions and 45 deletions
@@ -66,7 +66,7 @@ public class SharedApi
private static final UpdateChunkPosManager UPDATE_POS_MANAGER = new UpdateChunkPosManager();
/** how many chunks can be queued for updating per thread, used to prevent updates from infinitely pilling up if the user flies around extremely fast */
private static final int MAX_UPDATING_CHUNK_COUNT_PER_THREAD = 500;
private static final int MAX_UPDATING_CHUNK_COUNT_PER_THREAD = 1_000;
/** how many milliseconds must pass before an overloaded message can be sent in chat or the log */
private static final int MIN_MS_BETWEEN_OVERLOADED_LOG_MESSAGE = 30_000;
@@ -28,6 +28,7 @@ import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSour
import com.seibel.distanthorizons.core.dependencyInjection.SingletonInjector;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.file.AbstractDataSourceHandler;
import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
@@ -137,15 +138,15 @@ public class FullDataSourceProviderV2
this.migrationThreadPool.execute(this::convertLegacyDataSources);
// update propagation doesn't need to be run on the server since only the highest detail level is needed
if (SharedApi.getEnvironment() != EWorldEnvironment.SERVER_ONLY)
{
//if (SharedApi.getEnvironment() != EWorldEnvironment.SERVER_ONLY) // TODO
//{
this.updateQueueProcessor = ThreadUtil.makeSingleThreadPool("Parent Update Queue ["+dimensionName+"]");
this.updateQueueProcessor.execute(this::runUpdateQueue);
}
else
{
this.updateQueueProcessor = null;
}
//}
//else
//{
// this.updateQueueProcessor = null;
//}
}
@@ -604,7 +605,8 @@ public class FullDataSourceProviderV2
public int getMaxPossibleRetrievalPositionCountForPos(Long pos) { return -1; }
/** @return true if the position was queued, false if not */
public boolean queuePositionForRetrieval(Long genPos) { return false; }
@Nullable
public CompletableFuture<WorldGenResult> queuePositionForRetrieval(Long genPos) { return null; }
/** does nothing if the given position isn't present in the queue */
public void removeRetrievalRequestIf(DhSectionPos.ICancelablePrimitiveLongConsumer removeIf) { }
@@ -632,8 +634,6 @@ public class FullDataSourceProviderV2
@Nullable
public Long getTimestampForPos(long pos)
{ return this.repo.getTimestampForPos(pos); }
public Map<Long, Long> getTimestampsForRange(byte detailLevel, int startPosX, int startPosZ, int endPosX, int endPosZ)
{ return this.repo.getTimestampsForRange(detailLevel, startPosX, startPosZ, endPosX, endPosZ); }
@@ -20,6 +20,7 @@
package com.seibel.distanthorizons.core.file.fullDatafile;
import com.seibel.distanthorizons.api.enums.worldGeneration.EDhApiWorldGenerationStep;
import com.seibel.distanthorizons.core.api.internal.SharedApi;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
@@ -35,9 +36,11 @@ import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.world.EWorldEnvironment;
import com.seibel.distanthorizons.coreapi.util.BitShiftUtil;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.awt.*;
@@ -214,19 +217,34 @@ public class GeneratedFullDataSourceProvider extends FullDataSourceProviderV2 im
}
@Override
public boolean queuePositionForRetrieval(Long genPos)
public CompletableFuture<WorldGenResult> queuePositionForRetrieval(Long genPos)
{
IFullDataSourceRetrievalQueue worldGenQueue = this.worldGenQueueRef.get();
if (worldGenQueue == null)
{
return false;
return null;
}
WorldGenTaskTracker genTaskTracker = new WorldGenTaskTracker(genPos);
CompletableFuture<WorldGenResult> worldGenFuture = worldGenQueue.submitRetrievalTask(genPos, (byte) (DhSectionPos.getDetailLevel(genPos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL), genTaskTracker);
worldGenFuture.whenComplete((genTaskResult, ex) -> this.onWorldGenTaskComplete(genTaskResult, ex));
worldGenFuture.whenComplete((genTaskResult, ex) ->
{
LOGGER.info("gen task complete ["+DhSectionPos.toString(genPos)+"]");
//this.onWorldGenTaskComplete(genTaskResult, ex);
});
return true;
return worldGenFuture;
}
@Override
protected void updateDataSourceAtPos(long updatePos, @NotNull FullDataSourceV2 inputData, boolean lockOnUpdatePos)
{
super.updateDataSourceAtPos(updatePos, inputData, lockOnUpdatePos);
//if (SharedApi.getEnvironment() != EWorldEnvironment.CLIENT_ONLY)
// LOGGER.info("updated ["+DhSectionPos.toString(updatePos)+"]");
this.onWorldGenTaskComplete(WorldGenResult.CreateSuccess(updatePos), null);
}
@Override
@@ -81,35 +81,43 @@ public class RemoteFullDataSourceProvider extends GeneratedFullDataSourceProvide
}
//===========================//
// request timestamp updates //
// from server //
//===========================//
// get the timestamp for every maximum detail position in this section
int posToMinimumDetailScale = BitShiftUtil.powerOfTwo(DhSectionPos.getDetailLevel(pos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL);
Map<Long, Long> timestamps = this.getTimestampsForRange(
DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL,
DhSectionPos.getX(pos) * posToMinimumDetailScale,
DhSectionPos.getZ(pos) * posToMinimumDetailScale,
(DhSectionPos.getX(pos) + 1) * posToMinimumDetailScale,
(DhSectionPos.getZ(pos) + 1) * posToMinimumDetailScale
);
// TODO
//// get the timestamp for every maximum detail position in this section
//int posToMinimumDetailScale = BitShiftUtil.powerOfTwo(DhSectionPos.getDetailLevel(pos) - DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL);
//Map<Long, Long> timestamps = this.getTimestampsForRange(
// DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL,
// DhSectionPos.getX(pos) * posToMinimumDetailScale,
// DhSectionPos.getZ(pos) * posToMinimumDetailScale,
// (DhSectionPos.getX(pos) + 1) * posToMinimumDetailScale,
// (DhSectionPos.getZ(pos) + 1) * posToMinimumDetailScale
//);
//
//DhSectionPos.forEachChildAtDetailLevel(pos, DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, childPos ->
//{
// if (!this.visitedPositions.add(childPos))
// {
// return;
// }
//
// // check if the server has newer versions of these LODs
// Long subTimestamp = timestamps.get(childPos);
// if (subTimestamp != null)
// {
// this.syncOnLoadRequestQueue.submitRequest(childPos, subTimestamp, this.delayedFullDataSourceSaveCache::queueDataSourceForUpdateAndSave);
// }
//});
DhSectionPos.forEachChildAtDetailLevel(pos, DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, childPos ->
Long timestamp = this.getTimestampForPos(pos);
if (timestamp != null)
{
if (!this.visitedPositions.add(childPos))
{
return;
}
// check if the server has newer versions of these LODs
Long subTimestamp = timestamps.get(childPos);
if (subTimestamp != null)
{
this.syncOnLoadRequestQueue.submitRequest(childPos, subTimestamp, this.delayedFullDataSourceSaveCache::queueDataSourceForUpdateAndSave);
}
});
this.syncOnLoadRequestQueue.submitRequest(pos, timestamp, this.delayedFullDataSourceSaveCache::queueDataSourceForUpdateAndSave);
}
return super.get(pos);
}
@@ -39,7 +39,7 @@ public class RemoteWorldRetrievalQueue extends AbstractFullDataNetworkRequestQue
public void startAndSetTargetPos(DhBlockPos2D targetPos) { super.tick(targetPos); }
@Override
public byte lowestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL; }
public byte lowestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL + 12; } // TODO should be the same as what the server's update propgator can provide
@Override
public byte highestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL; }
@@ -143,6 +143,7 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
// Assert that the data at least can fill in 1 single ChunkSizedFullDataAccessor
LodUtil.assertTrue(DhSectionPos.getDetailLevel(pos) > requiredDataDetail + LodUtil.CHUNK_DETAIL_LEVEL);
LOGGER.info("queueing gen ["+DhSectionPos.toString(pos)+"]");
CompletableFuture<WorldGenResult> future = new CompletableFuture<>();
this.waitingTasks.put(pos, new WorldGenTask(pos, requiredDataDetail, tracker, future));
@@ -282,6 +283,8 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
{
//LOGGER.trace("Unable to start task: "+closestTask.pos+", skipping. Task position may have already been generated.");
}
//LOGGER.info("started gen ["+DhSectionPos.toString(closestTask.pos)+"]");
}
else
{
@@ -318,6 +321,8 @@ public class WorldGenerationQueue implements IFullDataSourceRetrievalQueue, IDeb
// send the child futures to the future recipient, to notify them of the new tasks
closestTask.future.complete(WorldGenResult.CreateSplit(childFutures));
//LOGGER.info("split ["+DhSectionPos.toString(sectionPos)+"]");
// return true so we attempt to generate again
return true;
}
@@ -197,6 +197,8 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
ServerPlayerState.RateLimiterSet rateLimiterSet = serverPlayerState.getRateLimiterSet(this);
LOGGER.info("received message ["+DhSectionPos.toString(message.sectionPos)+"]");
if (message.clientTimestamp == null)
{
this.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet);
@@ -281,7 +283,8 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
Objects.requireNonNull(this.beaconBeamRepo);
try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos)))
{
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> {
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () ->
{
message.sendResponse(new FullDataSourceResponseMessage(payload));
rateLimiterSet.syncOnLoginRateLimiter.release();
});
@@ -388,15 +391,19 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
{
if (this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps))
{
LOGGER.info("sending - complete ["+DhSectionPos.toString(pos)+"]");
requestGroup.fullDataSource = fullDataSource;
}
else if (requestGroup.worldGenTaskComplete)
{
// If the returned data source is not fully generated, try reading it again
LOGGER.info("sending - retry ["+DhSectionPos.toString(pos)+"]");
// If the returned data source is not fully generated, try reading it again // can wait for a while if waiting for worldgen and/or update propagation
try { Thread.sleep(250); } catch (InterruptedException ignore) {}
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
}
else
{
LOGGER.info("sending - queueing ["+DhSectionPos.toString(pos)+"]");
this.serverside.fullDataFileHandler.queuePositionForRetrieval(pos);
}
});
@@ -19,7 +19,6 @@ import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.util.TimerUtil;
import com.seibel.distanthorizons.core.util.objects.DataCorruptedException;
import com.seibel.distanthorizons.core.util.ratelimiting.SupplierBasedRateLimiter;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
@@ -109,20 +108,25 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
{ return this.submitRequest(sectionPos, null, dataSourceConsumer); }
public CompletableFuture<Boolean> submitRequest(long sectionPos, @Nullable Long clientTimestamp, Consumer<FullDataSourceV2> dataSourceConsumer)
{
LodUtil.assertTrue(DhSectionPos.getDetailLevel(sectionPos) == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed.");
//LodUtil.assertTrue(DhSectionPos.getDetailLevel(sectionPos) == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed.");
RequestQueueEntry entry = new RequestQueueEntry(dataSourceConsumer, clientTimestamp);
entry.future.whenComplete((success, throwable) ->
{
LOGGER.info("received ["+DhSectionPos.toString(sectionPos)+"]");
this.waitingTasksBySectionPos.remove(sectionPos);
this.finishedRequests.incrementAndGet();
if (!success || throwable != null)
if ((success == null || !success)
|| throwable != null)
{
this.failedRequests.incrementAndGet();
}
});
LOGGER.info("asking server for ["+DhSectionPos.toString(sectionPos)+"]");
this.waitingTasksBySectionPos.put(sectionPos, entry);
return entry.future;
}
@@ -157,7 +161,8 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
}
private void sendNextRequest(DhBlockPos2D targetPos)
{
Map.Entry<Long, RequestQueueEntry> mapEntry = this.waitingTasksBySectionPos.entrySet().stream()
Map.Entry<Long, RequestQueueEntry> mapEntry = this.waitingTasksBySectionPos
.entrySet().stream()
.filter(task -> task.getValue().networkDataSourceFuture == null)
.min(Comparator.comparingInt(x -> posDistanceSquared(targetPos, x.getKey())))
.orElse(null);
@@ -201,6 +206,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
LOGGER.warn("Unable to handle FullDataPayload - getNetworkCompressionExecutor() is null");
return null;
}
CompletableFuture.runAsync(() ->
{
try
@@ -484,7 +484,7 @@ public class LodRenderSection implements IDebugRenderable, AutoCloseable
}
long pos = this.missingGenerationPos.removeLong(i);
boolean positionQueued = this.fullDataSourceProvider.queuePositionForRetrieval(pos);
boolean positionQueued = (this.fullDataSourceProvider.queuePositionForRetrieval(pos) != null);
if (!positionQueued)
{
// shouldn't normally happen, but just in case