Post-relog updates

This commit is contained in:
s809
2024-01-27 19:45:30 +05:00
parent 7e2019abd4
commit 61c516df1d
20 changed files with 746 additions and 371 deletions
@@ -737,7 +737,7 @@ public class Config
// deprecated and not implemented, can be made public if we ever re-implement it
@Deprecated
private static ConfigEntry<EGenerationPriority> generationPriority = new ConfigEntry.Builder<EGenerationPriority>()
private static final ConfigEntry<EGenerationPriority> generationPriority = new ConfigEntry.Builder<EGenerationPriority>()
.set(EGenerationPriority.NEAR_FIRST)
.comment(""
+ "In what priority should fake chunks be generated outside the vanilla render distance? \n"
@@ -871,8 +871,7 @@ public class Config
+ "")
.build();
/** Disabled, previous implementation is too terrible to continue using it. */
private static ConfigEntry<Boolean> enablePostRelogUpdate = new ConfigEntry.Builder<Boolean>()
public static ConfigEntry<Boolean> enablePostRelogUpdate = new ConfigEntry.Builder<Boolean>()
.setServersideShortName("enablePostRelogUpdate")
.set(false)
.comment(""
@@ -903,7 +902,7 @@ public class Config
.setServersideShortName("fullDataRequestConcurrencyLimit")
.setMinDefaultMax(1, 20, 100)
.comment(""
+ "Limits the amount of sent/processed LOD requests concurrently on server, per player. \n"
+ "Limits the amount of sent/processed LOD *generation* requests concurrently on server, per player. \n"
+ "")
.build();
@@ -923,11 +922,19 @@ public class Config
+ "")
.build();
public static ConfigEntry<Integer> postRelogUpdateConcurrencyLimit = new ConfigEntry.Builder<Integer>()
.setServersideShortName("postRelogUpdateConcurrencyLimit")
.setMinDefaultMax(1, 50, 100)
.comment(""
+ "Limits the amount of sent/processed LOD *update* requests concurrently on server, per player. \n"
+ "")
.build();
/**
* Intentionally disabled.
* @see #enablePostRelogUpdate
*/
private static ConfigEntry<Integer> fullDataChangeSummaryRequestRateLimit = new ConfigEntry.Builder<Integer>()
private static final ConfigEntry<Integer> fullDataChangeSummaryRequestRateLimit = new ConfigEntry.Builder<Integer>()
.setServersideShortName("fullDataChangeSummaryRequestRateLimit")
.setMinDefaultMax(1, 20, 100)
.comment(""
@@ -1085,7 +1092,7 @@ public class Config
// deprecated and not implemented, can be made public if we ever re-implement it
@Deprecated
private static ConfigEntry<EBufferRebuildTimes> rebuildTimes = new ConfigEntry.Builder<EBufferRebuildTimes>()
private static final ConfigEntry<EBufferRebuildTimes> rebuildTimes = new ConfigEntry.Builder<EBufferRebuildTimes>()
.set(EBufferRebuildTimes.NORMAL)
.comment(""
+ "How frequently should vertex buffers (geometry) be rebuilt and sent to the GPU? \n"
@@ -57,13 +57,13 @@ public class FullDataFileHandler extends AbstractDataSourceHandler<IFullDataSour
for (DhSectionPos pos : posList)
{
map.put(pos,
unsavedDataSourceBySectionPos.containsKey(pos) ? 3 // Loaded
this.unsavedDataSourceBySectionPos.containsKey(pos) ? 3 // Loaded
: this.fileExists(pos) ? 2 // Unloaded
: 1); // Not generated
}
return map;
}
protected boolean fileExists(DhSectionPos pos) { return this.repo.existsWithPrimaryKey(pos.serialize()); }
public boolean fileExists(DhSectionPos pos) { return this.repo.existsWithPrimaryKey(pos.serialize()); }
//=============//
@@ -19,17 +19,53 @@
package com.seibel.distanthorizons.core.file.fullDatafile;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IIncompleteFullDataSource;
import com.seibel.distanthorizons.core.file.structure.AbstractSaveStructure;
import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.level.IDhClientLevel;
import com.seibel.distanthorizons.core.multiplayer.client.FullDataRefreshQueue;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import org.jetbrains.annotations.Nullable;
import java.io.File;
import java.util.Objects;
public class RemoteFullDataFileHandler extends GeneratedFullDataFileHandler
{
public RemoteFullDataFileHandler(IDhLevel level, AbstractSaveStructure saveStructure, @Nullable File saveDirOverride)
@Nullable
private final FullDataRefreshQueue dataRefreshQueue;
public RemoteFullDataFileHandler(IDhClientLevel level, AbstractSaveStructure saveStructure, @Nullable File saveDirOverride, @Nullable FullDataRefreshQueue dataRefreshQueue)
{
super(level, saveStructure, saveDirOverride);
this.dataRefreshQueue = dataRefreshQueue;
}
@Override
public IFullDataSource get(DhSectionPos pos)
{
IFullDataSource fullDataSource = super.get(pos);
if (fullDataSource instanceof IIncompleteFullDataSource || this.dataRefreshQueue == null)
{
return fullDataSource;
}
pos.forEachChildAtLevel(DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, childPos -> {
int checksum = Objects.requireNonNull(this.repo.getChecksumForSection(childPos));
this.dataRefreshQueue.submitRequest(childPos, this.level::updateDataSourcesWithChunkData, checksum);
});
return fullDataSource;
}
@Override
public void close()
{
if (this.dataRefreshQueue != null)
{
this.dataRefreshQueue.close();
}
super.close();
}
}
@@ -3,71 +3,50 @@ package com.seibel.distanthorizons.core.generation;
import com.google.common.base.Stopwatch;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource;
import com.seibel.distanthorizons.core.generation.tasks.IWorldGenTaskTracker;
import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult;
import com.seibel.distanthorizons.core.level.IDhClientLevel;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.logging.f3.F3Screen;
import com.seibel.distanthorizons.core.multiplayer.client.AbstractFullDataRequestQueue;
import com.seibel.distanthorizons.core.multiplayer.client.ClientNetworkState;
import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.messages.fullData.generation.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.generation.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.generation.priority.GenTaskPriorityRequestMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.generation.priority.GenTaskPriorityResponseMessage;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.util.LodUtil;
import io.netty.channel.ChannelException;
import org.apache.logging.log4j.Logger;
import javax.annotation.CheckForNull;
import java.awt.*;
import java.time.Duration;
import java.util.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebugRenderable
public class WorldRemoteGenerationQueue extends AbstractFullDataRequestQueue implements IWorldGenerationQueue, IDebugRenderable
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
private static final long SHUTDOWN_TIMEOUT_SECONDS = 5;
private final ClientNetworkState networkState;
private final IDhClientLevel level;
// Used to prevent requests for section very far away, as result of request list not completely filled.
// Kinda a hack, since queue is not notified when file handler is done with feeding sections to generate
private static final ConfigEntry<Integer> REQUEST_BEGIN_DELAY = Config.Client.Advanced.Multiplayer.ServerNetworking.fullDataRequestBeginDelay;
private final Stopwatch requestBeginStopwatch = Stopwatch.createStarted();
private volatile CompletableFuture<Void> generatorClosingFuture = null;
private final ConcurrentMap<DhSectionPos, WorldGenQueueEntry> waitingTasks = new ConcurrentHashMap<>();
private final Semaphore pendingTasksSemaphore = new Semaphore(Short.MAX_VALUE, true);
private CompletableFuture<?> genTaskPriorityRequest = CompletableFuture.completedFuture(null);
private final Semaphore genTaskPriorityRequestSemaphore = new Semaphore(1, true);
private final F3Screen.NestedMessage f3Message = new F3Screen.NestedMessage(this::f3Log);
private final AtomicInteger finishedRequests = new AtomicInteger();
private final AtomicInteger failedRequests = new AtomicInteger();
private final Set<DhSectionPos> alreadyGeneratedPosHashSet = ConcurrentHashMap.newKeySet();
@Override
protected int getRequestConcurrencyLimit() { return this.networkState.config.fullDataRequestConcurrencyLimit; }
@Override
protected String getQueueName() { return "World Remote Generation Queue"; }
public WorldRemoteGenerationQueue(ClientNetworkState networkState, IDhClientLevel level)
{
this.networkState = networkState;
this.level = level;
DebugRenderer.register(this, Config.Client.Advanced.Debugging.DebugWireframe.showWorldGenQueue);
super(networkState, level, false, Config.Client.Advanced.Debugging.DebugWireframe.showWorldGenQueue);
}
@@ -85,65 +64,53 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug
@Override
public CompletableFuture<WorldGenResult> submitGenTask(DhSectionPos sectionPos, byte requiredDataDetail, IWorldGenTaskTracker tracker)
{
LodUtil.assertTrue(sectionPos.getDetailLevel() == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed.");
// check if this is a duplicate generation task
if (this.alreadyGeneratedPosHashSet.contains(sectionPos))
{
// temporary solution to prevent generating the same section multiple times
LOGGER.trace("Duplicate generation section " + sectionPos + ". Skipping...");
return CompletableFuture.completedFuture(WorldGenResult.CreateFail());
}
this.alreadyGeneratedPosHashSet.add(sectionPos);
WorldGenQueueEntry entry = new WorldGenQueueEntry(new CompletableFuture<>(), tracker);
waitingTasks.put(sectionPos, entry);
return entry.future;
}
private int posDistanceSquared(DhBlockPos2D targetPos, DhSectionPos pos)
{
return (int) pos.getCenterBlockPos().distSquared(targetPos);
return super.submitRequest(sectionPos, tracker.getChunkDataConsumer())
.thenApply(result -> result
? WorldGenResult.CreateSuccess(sectionPos)
: WorldGenResult.CreateFail());
}
@Override
public void startGenerationQueueAndSetTargetPos(DhBlockPos2D targetPos)
{
if (generatorClosingFuture != null || !networkState.getClient().isReady()) return;
if (requestBeginStopwatch.elapsed(TimeUnit.SECONDS) < REQUEST_BEGIN_DELAY.get()) return;
while (getWaitingTaskCount() > getInProgressTaskCount()
&& getInProgressTaskCount() < this.networkState.config.fullDataRequestConcurrencyLimit
&& pendingTasksSemaphore.tryAcquire())
if (this.requestBeginStopwatch.elapsed(TimeUnit.SECONDS) < REQUEST_BEGIN_DELAY.get())
{
sendNewRequest(targetPos);
return;
}
if (!super.tick(targetPos))
{
return;
}
if (genTaskPriorityRequestSemaphore.tryAcquire()) {
List<DhSectionPos> posList = waitingTasks.entrySet().stream()
if (this.genTaskPriorityRequestSemaphore.tryAcquire()) {
List<DhSectionPos> posList = this.waitingTasks.entrySet().stream()
.filter(task -> task.getValue().request == null && task.getValue().priority == 0)
.sorted((x, y) -> posDistanceSquared(targetPos, x.getKey()) - posDistanceSquared(targetPos, y.getKey()))
.sorted((x, y) -> this.posDistanceSquared(targetPos, x.getKey()) - this.posDistanceSquared(targetPos, y.getKey()))
.limit(this.networkState.config.genTaskPriorityRequestRateLimit)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
if (posList.isEmpty()) {
genTaskPriorityRequestSemaphore.release();
this.genTaskPriorityRequestSemaphore.release();
return;
};
CompletableFuture<GenTaskPriorityResponseMessage> request = this.networkState.getClient().sendRequest(new GenTaskPriorityRequestMessage(posList), GenTaskPriorityResponseMessage.class);
genTaskPriorityRequest = request;
this.genTaskPriorityRequest = request;
request.handleAsync((response, throwable) -> {
try
{
if (throwable != null)
{
throw throwable;
}
for (Map.Entry<DhSectionPos, Integer> mapEntry : response.posList.entrySet())
{
WorldGenQueueEntry entry = waitingTasks.get(mapEntry.getKey());
RequestQueueEntry entry = this.waitingTasks.get(mapEntry.getKey());
if (entry != null)
{
entry.priority = mapEntry.getValue();
}
}
}
catch (ChannelException | CancellationException | RateLimitedException ignored)
@@ -154,7 +121,7 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug
LOGGER.error("Error while fetching gen task priorities", e);
}
genTaskPriorityRequestSemaphore.release();
this.genTaskPriorityRequestSemaphore.release();
return null;
});
}
@@ -163,168 +130,28 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug
@Override
public void cancelGenTasks(Iterable<DhSectionPos> positions)
{
for (DhSectionPos pos : positions)
{
WorldGenQueueEntry entry = waitingTasks.remove(pos);
if (entry != null)
{
entry.future.cancel(false);
if (entry.request != null)
entry.request.cancel(false);
alreadyGeneratedPosHashSet.remove(pos);
}
}
super.cancelRequests(positions);
}
private void sendNewRequest(DhBlockPos2D targetPos)
{
Map.Entry<DhSectionPos, WorldGenQueueEntry> mapEntry = waitingTasks.entrySet().stream()
.filter(task -> task.getValue().request == null)
.reduce(null, (a, b)
-> a == null
|| b.getValue().priority > a.getValue().priority
|| (b.getValue().priority == a.getValue().priority && posDistanceSquared(targetPos, b.getKey()) < posDistanceSquared(targetPos, a.getKey()))
? b : a);
if (mapEntry == null)
{
pendingTasksSemaphore.release();
return;
}
DhSectionPos sectionPos = mapEntry.getKey();
WorldGenQueueEntry entry = mapEntry.getValue();
CompletableFuture<FullDataSourceResponseMessage> request = this.networkState.getClient().sendRequest(new FullDataSourceRequestMessage(level.getLevelWrapper(), sectionPos), FullDataSourceResponseMessage.class);
entry.request = request;
request.handleAsync((response, throwable) ->
{
pendingTasksSemaphore.release();
finishedRequests.incrementAndGet();
try
{
if (throwable != null)
throw throwable;
waitingTasks.remove(sectionPos);
LOGGER.debug("FullDataSourceResponseMessage " + sectionPos);
CompleteFullDataSource fullDataSource = response.getFullDataSource(sectionPos, level);
Consumer<ChunkSizedFullDataAccessor> chunkDataConsumer = entry.tracker.getChunkDataConsumer();
// FIXME Why keeping a reference in first place
if (chunkDataConsumer == null)
return entry.future.cancel(false);
fullDataSource.splitIntoChunkSizedAccessors(chunkDataConsumer);
response.getFullDataSourceLoader().returnPooledDataSource(fullDataSource);
}
catch (InvalidLevelException ignored)
{
// We're too late
}
catch (ChannelException | RateLimitedException e)
{
if (e instanceof RateLimitedException)
LOGGER.warn("Rate limited by server, re-queueing task [" + sectionPos + "]: " + e.getMessage());
entry.request = null;
finishedRequests.decrementAndGet();
}
catch (CancellationException ignored)
{
finishedRequests.decrementAndGet();
}
catch (Throwable e)
{
LOGGER.error("Error while fetching full data source", e);
failedRequests.incrementAndGet();
return entry.future.complete(WorldGenResult.CreateFail());
}
return entry.future.complete(WorldGenResult.CreateSuccess(sectionPos));
});
}
private String[] f3Log()
{
ArrayList<String> lines = new ArrayList<>();
lines.add("World Remote Generation Queue ["+level.getClientLevelWrapper().getDimensionType().getDimensionName()+"]");
lines.add("Requests: "+this.finishedRequests+" / "+(this.getWaitingTaskCount() + this.finishedRequests.get())+" (failed: "+ this.failedRequests+", rate limit: "+this.networkState.config.fullDataRequestConcurrencyLimit +")");
return lines.toArray(new String[0]);
}
@Override
public int getWaitingTaskCount() { return this.waitingTasks.size(); }
@Override
public int getInProgressTaskCount() { return Short.MAX_VALUE - pendingTasksSemaphore.availablePermits(); }
@Override
public CompletableFuture<Void> startClosing(boolean cancelCurrentGeneration, boolean alsoInterruptRunning)
{
return this.generatorClosingFuture = CompletableFuture.runAsync(() -> {
return CompletableFuture.allOf(super.startClosing(alsoInterruptRunning), CompletableFuture.runAsync(() -> {
Stopwatch stopwatch = Stopwatch.createStarted();
do
{
if (genTaskPriorityRequest.cancel(false))
genTaskPriorityRequestSemaphore.release();
}
while (!genTaskPriorityRequestSemaphore.tryAcquire() && stopwatch.elapsed(TimeUnit.SECONDS) < SHUTDOWN_TIMEOUT_SECONDS);
do
{
for (WorldGenQueueEntry entry : this.waitingTasks.values())
if (this.genTaskPriorityRequest.cancel(false))
{
entry.future.cancel(alsoInterruptRunning);
if (entry.request != null && entry.request.cancel(alsoInterruptRunning))
pendingTasksSemaphore.release();
this.genTaskPriorityRequestSemaphore.release();
}
}
while (!pendingTasksSemaphore.tryAcquire(Short.MAX_VALUE) && stopwatch.elapsed(TimeUnit.SECONDS) < SHUTDOWN_TIMEOUT_SECONDS);
while (!this.genTaskPriorityRequestSemaphore.tryAcquire() && stopwatch.elapsed(TimeUnit.SECONDS) < SHUTDOWN_TIMEOUT_SECONDS);
if (stopwatch.elapsed(TimeUnit.SECONDS) >= SHUTDOWN_TIMEOUT_SECONDS)
LOGGER.warn("Generation queue for " + level.getLevelWrapper() + " did not shutdown in " + SHUTDOWN_TIMEOUT_SECONDS + " seconds! Some unfinished tasks might be left hanging.");
});
}
@Override
public void close()
{
f3Message.close();
DebugRenderer.unregister(this, Config.Client.Advanced.Debugging.DebugWireframe.showWorldGenQueue);
}
@Override
public void debugRender(DebugRenderer r)
{
for (Map.Entry<DhSectionPos, WorldGenQueueEntry> mapEntry : waitingTasks.entrySet())
{
r.renderBox(new DebugRenderer.Box(mapEntry.getKey(), -32f, 64f, 0.05f,
mapEntry.getValue().request != null ? Color.red
: mapEntry.getValue().priority == 3 ? Color.orange
: mapEntry.getValue().priority == 2 ? Color.cyan
: mapEntry.getValue().priority == 1 ? Color.blue
: Color.gray
));
}
}
private static class WorldGenQueueEntry
{
public final CompletableFuture<WorldGenResult> future;
public final IWorldGenTaskTracker tracker;
// Higher value = higher priority.
// Priority of 0 is reserved for unassigned value
public int priority = 0;
@CheckForNull
public CompletableFuture<?> request;
public WorldGenQueueEntry(CompletableFuture<WorldGenResult> future, IWorldGenTaskTracker tracker)
{
this.future = future;
this.tracker = tracker;
}
{
LOGGER.warn("Priority request queue for " + this.level.getLevelWrapper() + " did not shutdown in " + SHUTDOWN_TIMEOUT_SECONDS + " seconds! It might be left hanging.");
}
}));
}
}
@@ -29,6 +29,7 @@ import com.seibel.distanthorizons.core.file.structure.AbstractSaveStructure;
import com.seibel.distanthorizons.core.generation.WorldRemoteGenerationQueue;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.multiplayer.client.ClientNetworkState;
import com.seibel.distanthorizons.core.multiplayer.client.FullDataRefreshQueue;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.messages.fullData.updates.FullDataPartialUpdateMessage;
@@ -73,11 +74,15 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
private final ClientNetworkState networkState;
@Nullable
private final ScopedNetworkEventSource<NetworkClient> eventSource;
public final WorldGenModule worldGenModule;
public final AppliedConfigState<Boolean> worldGeneratorEnabledConfig;
@Nullable
private final FullDataRefreshQueue dataRefreshQueue;
//=============//
// constructor //
//=============//
@@ -91,21 +96,24 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
}
this.levelWrapper = clientLevelWrapper;
this.saveStructure = saveStructure;
this.dataFileHandler = new RemoteFullDataFileHandler(this, saveStructure, fullDataSaveDirOverride);
this.worldGeneratorEnabledConfig = new AppliedConfigState<>(Config.Client.Advanced.WorldGenerator.enableDistantGeneration);
this.networkState = networkState;
this.worldGenModule = new WorldGenModule(dataFileHandler, this);
if (networkState != null)
{
this.eventSource = new ScopedNetworkEventSource<>(networkState.getClient());
this.dataRefreshQueue = new FullDataRefreshQueue(this, networkState);
this.registerNetworkHandlers();
}
else
{
this.eventSource = null;
this.dataRefreshQueue = null;
}
this.dataFileHandler = new RemoteFullDataFileHandler(this, saveStructure, fullDataSaveDirOverride, this.dataRefreshQueue);
this.worldGeneratorEnabledConfig = new AppliedConfigState<>(Config.Client.Advanced.WorldGenerator.enableDistantGeneration);
this.worldGenModule = new WorldGenModule(this.dataFileHandler, this);
this.clientside = new ClientLevelModule(this);
if (enableRendering)
{
@@ -123,7 +131,10 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
try
{
ChunkSizedFullDataAccessor fullDataAccessor = msg.getFullDataSource(this);
if (fullDataAccessor == null) return;
if (fullDataAccessor == null)
{
return;
}
this.updateDataSourcesWithChunkData(fullDataAccessor);
}
@@ -145,6 +156,11 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
{
this.chunkToLodBuilder.tick();
this.clientside.clientTick();
if (this.dataRefreshQueue != null)
{
this.dataRefreshQueue.tick(new DhBlockPos2D(MC_CLIENT.getPlayerBlockPos()));
}
}
catch (Exception e)
{
@@ -152,37 +168,38 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
}
}
@Override
public void doWorldGen()
{
boolean isClientUsable = networkState != null && !networkState.getClient().isClosed();
boolean shouldDoWorldGen = isClientUsable && this.networkState.config.distantGenerationEnabled && clientside.isRendering();
boolean isWorldGenRunning = worldGenModule.isWorldGenRunning();
boolean isClientUsable = this.networkState != null && !this.networkState.getClient().isClosed();
boolean shouldDoWorldGen = isClientUsable && this.networkState.config.distantGenerationEnabled && this.clientside.isRendering();
boolean isWorldGenRunning = this.worldGenModule.isWorldGenRunning();
if (shouldDoWorldGen && !isWorldGenRunning)
{
// start world gen
worldGenModule.startWorldGen(this.dataFileHandler, new WorldGenState(this, this.networkState));
this.worldGenModule.startWorldGen(this.dataFileHandler, new WorldGenState(this, this.networkState));
}
else if (!shouldDoWorldGen && isWorldGenRunning)
{
// stop world gen
worldGenModule.stopWorldGen(this.dataFileHandler);
this.worldGenModule.stopWorldGen(this.dataFileHandler);
}
if (worldGenModule.isWorldGenRunning())
if (this.worldGenModule.isWorldGenRunning())
{
ClientLevelModule.ClientRenderState renderState = clientside.ClientRenderStateRef.get();
ClientLevelModule.ClientRenderState renderState = this.clientside.ClientRenderStateRef.get();
if (renderState != null && renderState.quadtree != null)
{
dataFileHandler.removeGenRequestIf(p -> !renderState.quadtree.isSectionPosInBounds(p));
this.dataFileHandler.removeGenRequestIf(p -> !renderState.quadtree.isSectionPosInBounds(p));
}
worldGenModule.worldGenTick(new DhBlockPos2D(MC_CLIENT.getPlayerBlockPos()));
this.worldGenModule.worldGenTick(new DhBlockPos2D(MC_CLIENT.getPlayerBlockPos()));
}
}
@Override
public void render(Mat4f mcModelViewMatrix, Mat4f mcProjectionMatrix, float partialTicks, IProfilerWrapper profiler)
{
clientside.render(mcModelViewMatrix, mcProjectionMatrix, partialTicks, profiler);
this.clientside.render(mcModelViewMatrix, mcProjectionMatrix, partialTicks, profiler);
}
@@ -192,35 +209,37 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
//================//
@Override
public int computeBaseColor(DhBlockPos pos, IBiomeWrapper biome, IBlockStateWrapper block) { return levelWrapper.computeBaseColor(pos, biome, block); }
public int computeBaseColor(DhBlockPos pos, IBiomeWrapper biome, IBlockStateWrapper block) { return this.levelWrapper.computeBaseColor(pos, biome, block); }
@Override
public IClientLevelWrapper getClientLevelWrapper() { return levelWrapper; }
public IClientLevelWrapper getClientLevelWrapper() { return this.levelWrapper; }
@Override
public void clearRenderCache()
{
clientside.clearRenderCache();
this.clientside.clearRenderCache();
}
@Override
public ILevelWrapper getLevelWrapper() { return levelWrapper; }
public ILevelWrapper getLevelWrapper() { return this.levelWrapper; }
@Override
public void updateDataSourcesWithChunkData(ChunkSizedFullDataAccessor data) { this.clientside.updateDataSourcesWithChunkData(data); }
@Override
public int getMinY() { return levelWrapper.getMinHeight(); }
public int getMinY() { return this.levelWrapper.getMinHeight(); }
@Override
public void close()
{
if (worldGenModule != null)
worldGenModule.close();
clientside.close();
if (this.worldGenModule != null)
{
this.worldGenModule.close();
}
this.clientside.close();
super.close();
dataFileHandler.close();
LOGGER.info("Closed " + DhClientLevel.class.getSimpleName() + " for " + levelWrapper);
this.dataFileHandler.close();
LOGGER.info("Closed " + DhClientLevel.class.getSimpleName() + " for " + this.levelWrapper);
}
//=======================//
@@ -230,13 +249,13 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
@Override
public IFullDataSourceProvider getFileHandler()
{
return dataFileHandler;
return this.dataFileHandler;
}
@Override
public AbstractSaveStructure getSaveStructure()
{
return saveStructure;
return this.saveStructure;
}
@Override
@@ -253,6 +272,6 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
0.2, 32f
)
);
clientside.reloadPos(pos);
this.clientside.reloadPos(pos);
}
}
@@ -56,6 +56,7 @@ import java.util.concurrent.*;
public class DhServerLevel extends DhLevel implements IDhServerLevel
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
public final ServerLevelModule serverside;
private final IServerLevelWrapper serverLevelWrapper;
@@ -89,36 +90,73 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
private void registerNetworkHandlers()
{
this.eventSource.registerHandler(FullDataSourceRequestMessage.class, remotePlayerConnectionHandler.currentLevelOnly(this, (msg, serverPlayerState) ->
this.eventSource.registerHandler(FullDataSourceRequestMessage.class, this.remotePlayerConnectionHandler.currentLevelOnly(this, (msg, serverPlayerState) ->
{
if (!serverPlayerState.config.isDistantGenerationEnabled())
if (msg.checksum == null)
{
msg.sendResponse(new RequestRejectedException("Operation is disabled from config."));
return;
}
if (!serverPlayerState.fullDataRequestConcurrencyLimiter.tryAcquire(msg))
return;
while (true)
{
IncompleteDataSourceEntry entry = incompleteDataSources.computeIfAbsent(msg.dhSectionPos, pos -> {
IncompleteDataSourceEntry newEntry = new IncompleteDataSourceEntry();
this.trySetGeneratedDataSourceToEntry(newEntry, pos);
return newEntry;
});
// If this fails, current entry is being drained and need to create another one
if (entry.requestCollectionSemaphore.tryAcquire())
// Normal generation
if (!serverPlayerState.config.isDistantGenerationEnabled())
{
fullDataRequests.put(msg.futureId, entry);
entry.requestMessages.put(msg.futureId, msg);
entry.requestCollectionSemaphore.release();
break;
msg.sendResponse(new RequestRejectedException("Operation is disabled from config."));
return;
}
if (!serverPlayerState.fullDataRequestConcurrencyLimiter.tryAcquire(msg))
{
return;
}
while (true)
{
IncompleteDataSourceEntry entry = this.incompleteDataSources.computeIfAbsent(msg.sectionPos, pos ->
{
IncompleteDataSourceEntry newEntry = new IncompleteDataSourceEntry();
this.trySetGeneratedDataSourceToEntry(newEntry, pos);
return newEntry;
});
// If this fails, current entry is being drained and need to create another one
if (entry.requestCollectionSemaphore.tryAcquire())
{
this.fullDataRequests.put(msg.futureId, entry);
entry.requestMessages.put(msg.futureId, msg);
entry.requestCollectionSemaphore.release();
break;
}
}
}
else
{
// Post-relog update
if (!serverPlayerState.config.isPostRelogUpdateEnabled())
{
msg.sendResponse(new RequestRejectedException("Operation is disabled from config."));
return;
}
if (!serverPlayerState.postRelogUpdateRequestConcurrencyLimiter.tryAcquire(msg))
{
return;
}
Integer serverChecksum = this.serverside.dataFileHandler.repo.getChecksumForSection(msg.sectionPos);
if (serverChecksum == null || serverChecksum.equals(msg.checksum))
{
serverPlayerState.postRelogUpdateRequestConcurrencyLimiter.release();
msg.sendResponse(new FullDataSourceResponseMessage(null, this));
return;
}
this.serverside.dataFileHandler.getAsync(msg.sectionPos).thenAccept(fullDataSource ->
{
serverPlayerState.postRelogUpdateRequestConcurrencyLimiter.release();
msg.sendResponse(new FullDataSourceResponseMessage((CompleteFullDataSource) fullDataSource, this));
});
}
}));
this.eventSource.registerHandler(GenTaskPriorityRequestMessage.class, remotePlayerConnectionHandler.currentLevelOnly(this, (msg, serverPlayerState) ->
this.eventSource.registerHandler(GenTaskPriorityRequestMessage.class, this.remotePlayerConnectionHandler.currentLevelOnly(this, (msg, serverPlayerState) ->
{
msg.sendResponse(new GenTaskPriorityResponseMessage(
this.serverside.dataFileHandler.getLoadStates(msg.posList.stream()
@@ -130,18 +168,23 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
this.eventSource.registerHandler(CancelMessage.class, msg ->
{
IncompleteDataSourceEntry entry = this.fullDataRequests.remove(msg.futureId);
if (entry == null) return;
if (entry == null)
{
return;
}
FullDataSourceRequestMessage requestMessage = entry.requestMessages.remove(msg.futureId);
ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg);
ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg);
if (serverPlayerState != null)
{
serverPlayerState.fullDataRequestConcurrencyLimiter.release();
}
entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
if (entry.requestMessages.isEmpty())
{
incompleteDataSources.remove(requestMessage.dhSectionPos);
serverside.dataFileHandler.removeGenRequestIf(pos -> pos == requestMessage.dhSectionPos);
this.incompleteDataSources.remove(requestMessage.sectionPos);
this.serverside.dataFileHandler.removeGenRequestIf(pos -> pos == requestMessage.sectionPos);
}
entry.requestCollectionSemaphore.release(Short.MAX_VALUE);
@@ -158,20 +201,23 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
this.worldGenLoopingQueue.remove(serverPlayer);
}
@Override
public void serverTick()
{
chunkToLodBuilder.tick();
this.chunkToLodBuilder.tick();
// Send finished data source requests
for (Map.Entry<DhSectionPos, IncompleteDataSourceEntry> mapEntry : incompleteDataSources.entrySet())
for (Map.Entry<DhSectionPos, IncompleteDataSourceEntry> mapEntry : this.incompleteDataSources.entrySet())
{
IncompleteDataSourceEntry entry = mapEntry.getValue();
if (entry.fullDataSource == null || entry.fullDataSource instanceof IIncompleteFullDataSource)
{
continue;
}
LodUtil.assertTrue(entry.fullDataSource instanceof CompleteFullDataSource, "Invalid full data source");
incompleteDataSources.remove(mapEntry.getKey());
this.incompleteDataSources.remove(mapEntry.getKey());
// This semaphore is intentionally acquired forever
entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
@@ -181,9 +227,11 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
{
this.fullDataRequests.remove(msg.futureId);
ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg);
ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg);
if (serverPlayerState == null)
{
continue;
}
serverPlayerState.fullDataRequestConcurrencyLimiter.release();
msg.sendResponse(new FullDataSourceResponseMessage(completeSource, this));
@@ -191,22 +239,30 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
}
// Send updated chunks after delay
for (Map.Entry<DhChunkPos, ChunkUpdateData> chunkUpdateEntry : chunkUpdatesToSend.entrySet())
for (Map.Entry<DhChunkPos, ChunkUpdateData> chunkUpdateEntry : this.chunkUpdatesToSend.entrySet())
{
ChunkUpdateData chunkUpdateData = chunkUpdateEntry.getValue();
if (System.currentTimeMillis() < chunkUpdateData.time + CHUNK_UPDATE_SEND_DELAY)
continue;
chunkUpdatesToSend.remove(chunkUpdateEntry.getKey());
for (ServerPlayerState serverPlayerState : remotePlayerConnectionHandler.getConnectedPlayers())
{
if (!serverPlayerState.config.isRealTimeUpdatesEnabled()) continue;
continue;
}
this.chunkUpdatesToSend.remove(chunkUpdateEntry.getKey());
for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers())
{
if (!serverPlayerState.config.isRealTimeUpdatesEnabled())
{
continue;
}
double distanceFromPlayer = chunkUpdateData.accessor.chunkPos.distance(new DhChunkPos(serverPlayerState.serverPlayer.getPosition()));
if (distanceFromPlayer < serverPlayerState.serverPlayer.getViewDistance() ||
distanceFromPlayer > serverPlayerState.config.getRenderDistanceRadius()) return;
distanceFromPlayer > serverPlayerState.config.getRenderDistanceRadius())
{
return;
}
serverPlayerState.connection.sendMessage(new FullDataPartialUpdateMessage(chunkUpdateData.accessor, this));
}
@@ -218,10 +274,14 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
{
CompletableFuture<ChunkSizedFullDataAccessor> future = super.updateChunkAsync(chunk);
if (future == null)
{
return null;
}
if (!Config.Client.Advanced.Multiplayer.ServerNetworking.enableRealTimeUpdates.get())
{
return future;
}
future.thenAccept(chunkSizedFullDataAccessor ->
{
@@ -240,37 +300,42 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
}
@Override
public int getMinY() { return getLevelWrapper().getMinHeight(); }
public int getMinY()
{
return this.getLevelWrapper().getMinHeight();
}
@Override
public void close()
{
super.close();
serverside.close();
LOGGER.info("Closed DHLevel for {}", getLevelWrapper());
this.serverside.close();
LOGGER.info("Closed DHLevel for {}", this.getLevelWrapper());
}
@Override
public void doWorldGen()
{
boolean shouldDoWorldGen = true; //todo;
boolean isWorldGenRunning = serverside.worldGenModule.isWorldGenRunning();
boolean isWorldGenRunning = this.serverside.worldGenModule.isWorldGenRunning();
if (shouldDoWorldGen && !isWorldGenRunning)
{
// start world gen
serverside.worldGenModule.startWorldGen(serverside.dataFileHandler, new ServerLevelModule.WorldGenState(this));
this.serverside.worldGenModule.startWorldGen(this.serverside.dataFileHandler, new ServerLevelModule.WorldGenState(this));
}
else if (!shouldDoWorldGen && isWorldGenRunning)
{
// stop world gen
serverside.worldGenModule.stopWorldGen(serverside.dataFileHandler);
this.serverside.worldGenModule.stopWorldGen(this.serverside.dataFileHandler);
}
if (serverside.worldGenModule.isWorldGenRunning())
if (this.serverside.worldGenModule.isWorldGenRunning())
{
IServerPlayerWrapper firstPlayer = this.worldGenLoopingQueue.peek();
if (firstPlayer == null)
{
return;
}
// Put first player in back before removing from front, so it can be removed by other thread without blocking
// - if it gets removed, remove() below will remove the item we just put instead
@@ -278,23 +343,32 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
this.worldGenLoopingQueue.remove(firstPlayer);
Vec3d position = firstPlayer.getPosition();
serverside.worldGenModule.worldGenTick(new DhBlockPos2D((int) position.x, (int) position.z));
this.serverside.worldGenModule.worldGenTick(new DhBlockPos2D((int) position.x, (int) position.z));
}
}
@Override
public IServerLevelWrapper getServerLevelWrapper() { return serverLevelWrapper; }
public IServerLevelWrapper getServerLevelWrapper()
{
return this.serverLevelWrapper;
}
@Override
public ILevelWrapper getLevelWrapper() { return getServerLevelWrapper(); }
public ILevelWrapper getLevelWrapper()
{
return this.getServerLevelWrapper();
}
@Override
public IFullDataSourceProvider getFileHandler() { return serverside.dataFileHandler; }
public IFullDataSourceProvider getFileHandler()
{
return this.serverside.dataFileHandler;
}
@Override
public AbstractSaveStructure getSaveStructure()
{
return serverside.saveStructure;
return this.serverside.saveStructure;
}
@Override
@@ -304,9 +378,13 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
{
this.serverside.dataFileHandler.getAsync(pos).thenAccept(fullDataSource -> {
if (fullDataSource instanceof IIncompleteFullDataSource)
{
fullDataSource = ((IIncompleteFullDataSource) fullDataSource).tryPromotingToCompleteDataSource();
}
if (fullDataSource instanceof CompleteFullDataSource)
{
entry.fullDataSource = fullDataSource;
}
});
}
@@ -314,7 +392,10 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
public void onWorldGenTaskComplete(DhSectionPos pos)
{
IncompleteDataSourceEntry entry = this.incompleteDataSources.get(pos);
if (entry == null) return;
if (entry == null)
{
return;
}
this.trySetGeneratedDataSourceToEntry(entry, pos);
}
@@ -38,6 +38,8 @@ public interface IDhLevel extends AutoCloseable
*/
ILevelWrapper getLevelWrapper();
void updateDataSourcesWithChunkData(ChunkSizedFullDataAccessor data);
@Nullable
CompletableFuture<ChunkSizedFullDataAccessor> updateChunkAsync(IChunkWrapper chunk);
@@ -0,0 +1,289 @@
package com.seibel.distanthorizons.core.multiplayer.client;
import com.google.common.base.Stopwatch;
import com.seibel.distanthorizons.core.config.types.ConfigEntry;
import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource;
import com.seibel.distanthorizons.core.level.IDhClientLevel;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.logging.f3.F3Screen;
import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.messages.fullData.generation.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.generation.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.render.renderer.DebugRenderer;
import com.seibel.distanthorizons.core.render.renderer.IDebugRenderable;
import com.seibel.distanthorizons.core.util.LodUtil;
import io.netty.channel.ChannelException;
import org.apache.logging.log4j.Logger;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import java.awt.*;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
public abstract class AbstractFullDataRequestQueue implements IDebugRenderable, AutoCloseable
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
protected static final long SHUTDOWN_TIMEOUT_SECONDS = 5;
public final ClientNetworkState networkState;
protected final IDhClientLevel level;
private final boolean changedOnly;
private volatile CompletableFuture<Void> closingFuture = null;
protected final ConcurrentMap<DhSectionPos, RequestQueueEntry> waitingTasks = new ConcurrentHashMap<>();
private final Semaphore pendingTasksSemaphore = new Semaphore(Short.MAX_VALUE, true);
private final F3Screen.NestedMessage f3Message = new F3Screen.NestedMessage(this::f3Log);
private final AtomicInteger finishedRequests = new AtomicInteger();
private final AtomicInteger failedRequests = new AtomicInteger();
private final ConfigEntry<Boolean> showDebugWireframeConfig;
private final Set<DhSectionPos> alreadyRequestedPositions = ConcurrentHashMap.newKeySet();
protected abstract int getRequestConcurrencyLimit();
protected abstract String getQueueName();
public AbstractFullDataRequestQueue(ClientNetworkState networkState, IDhClientLevel level, boolean changedOnly, ConfigEntry<Boolean> showDebugWireframeConfig)
{
this.networkState = networkState;
this.level = level;
this.changedOnly = changedOnly;
this.showDebugWireframeConfig = showDebugWireframeConfig;
DebugRenderer.register(this, this.showDebugWireframeConfig);
}
public CompletableFuture<Boolean> submitRequest(DhSectionPos sectionPos, Consumer<ChunkSizedFullDataAccessor> chunkDataConsumer)
{
return this.submitRequest(sectionPos, chunkDataConsumer, null);
}
public CompletableFuture<Boolean> submitRequest(DhSectionPos sectionPos, Consumer<ChunkSizedFullDataAccessor> chunkDataConsumer, @Nullable Integer currentChecksum)
{
LodUtil.assertTrue(sectionPos.getDetailLevel() == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed.");
// check if this is a duplicate task
if (this.alreadyRequestedPositions.contains(sectionPos))
{
// temporary solution to prevent requesting the same section multiple times
LOGGER.trace("Duplicate section " + sectionPos + ". Skipping...");
return CompletableFuture.completedFuture(false);
}
this.alreadyRequestedPositions.add(sectionPos);
RequestQueueEntry entry = new RequestQueueEntry(chunkDataConsumer, currentChecksum);
this.waitingTasks.put(sectionPos, entry);
return entry.future;
}
protected int posDistanceSquared(DhBlockPos2D targetPos, DhSectionPos pos)
{
return (int) pos.getCenterBlockPos().distSquared(targetPos);
}
public synchronized boolean tick(DhBlockPos2D targetPos)
{
if (this.closingFuture != null || !this.networkState.getClient().isReady())
{
return false;
}
while (this.getWaitingTaskCount() > this.getInProgressTaskCount()
&& this.getInProgressTaskCount() < this.getRequestConcurrencyLimit()
&& this.pendingTasksSemaphore.tryAcquire())
{
this.sendNewRequest(targetPos);
}
return true;
}
public void cancelRequests(Iterable<DhSectionPos> positions)
{
for (DhSectionPos pos : positions)
{
RequestQueueEntry entry = this.waitingTasks.remove(pos);
if (entry != null)
{
entry.future.cancel(false);
if (entry.request != null)
{
entry.request.cancel(false);
}
this.alreadyRequestedPositions.remove(pos);
}
}
}
private void sendNewRequest(DhBlockPos2D targetPos)
{
Map.Entry<DhSectionPos, RequestQueueEntry> mapEntry = this.waitingTasks.entrySet().stream()
.filter(task -> task.getValue().request == null)
.reduce(null, (a, b)
-> a == null
|| b.getValue().priority > a.getValue().priority
|| (b.getValue().priority == a.getValue().priority && this.posDistanceSquared(targetPos, b.getKey()) < this.posDistanceSquared(targetPos, a.getKey()))
? b : a);
if (mapEntry == null)
{
this.pendingTasksSemaphore.release();
return;
}
DhSectionPos sectionPos = mapEntry.getKey();
RequestQueueEntry entry = mapEntry.getValue();
CompletableFuture<FullDataSourceResponseMessage> request = this.networkState.getClient().sendRequest(new FullDataSourceRequestMessage(this.level.getLevelWrapper(), sectionPos, entry.currentChecksum), FullDataSourceResponseMessage.class);
entry.request = request;
request.handleAsync((response, throwable) ->
{
this.pendingTasksSemaphore.release();
this.finishedRequests.incrementAndGet();
try
{
if (throwable != null)
{
throw throwable;
}
this.waitingTasks.remove(sectionPos);
LOGGER.debug("FullDataSourceResponseMessage " + sectionPos);
CompleteFullDataSource fullDataSource = response.getFullDataSource(sectionPos, this.level);
if (fullDataSource != null)
{
fullDataSource.splitIntoChunkSizedAccessors(entry.chunkDataConsumer);
response.getFullDataSourceLoader().returnPooledDataSource(fullDataSource);
}
else
{
LodUtil.assertTrue(this.changedOnly, "Received empty data source response for not changed-only request");
}
}
catch (InvalidLevelException ignored)
{
// We're too late
}
catch (ChannelException | RateLimitedException e)
{
if (e instanceof RateLimitedException)
{
LOGGER.warn("Rate limited by server, re-queueing task [" + sectionPos + "]: " + e.getMessage());
}
entry.request = null;
this.finishedRequests.decrementAndGet();
}
catch (CancellationException ignored)
{
this.finishedRequests.decrementAndGet();
}
catch (Throwable e)
{
LOGGER.error("Error while fetching full data source", e);
this.failedRequests.incrementAndGet();
return entry.future.complete(false);
}
return entry.future.complete(true);
});
}
private String[] f3Log()
{
ArrayList<String> lines = new ArrayList<>();
lines.add(this.getQueueName() + " [" + this.level.getClientLevelWrapper().getDimensionType().getDimensionName() + "]");
lines.add("Requests: " + this.finishedRequests + " / " + (this.getWaitingTaskCount() + this.finishedRequests.get()) + " (failed: " + this.failedRequests + ", rate limit: " + this.getRequestConcurrencyLimit() + ")");
return lines.toArray(new String[0]);
}
public int getWaitingTaskCount() { return this.waitingTasks.size(); }
public int getInProgressTaskCount() { return Short.MAX_VALUE - this.pendingTasksSemaphore.availablePermits(); }
public CompletableFuture<Void> startClosing(boolean alsoInterruptRunning)
{
return this.closingFuture = CompletableFuture.runAsync(() -> {
Stopwatch stopwatch = Stopwatch.createStarted();
do
{
for (RequestQueueEntry entry : this.waitingTasks.values())
{
entry.future.cancel(alsoInterruptRunning);
if (entry.request != null && entry.request.cancel(alsoInterruptRunning))
{
this.pendingTasksSemaphore.release();
}
}
}
while (!this.pendingTasksSemaphore.tryAcquire(Short.MAX_VALUE) && stopwatch.elapsed(TimeUnit.SECONDS) < SHUTDOWN_TIMEOUT_SECONDS);
if (stopwatch.elapsed(TimeUnit.SECONDS) >= SHUTDOWN_TIMEOUT_SECONDS)
{
LOGGER.warn(this.getQueueName() + " for " + this.level.getLevelWrapper() + " did not shutdown in " + SHUTDOWN_TIMEOUT_SECONDS + " seconds! Some unfinished tasks might be left hanging.");
}
});
}
@Override
public void close()
{
this.f3Message.close();
DebugRenderer.unregister(this, this.showDebugWireframeConfig);
}
@Override
public void debugRender(DebugRenderer r)
{
for (Map.Entry<DhSectionPos, RequestQueueEntry> mapEntry : this.waitingTasks.entrySet())
{
r.renderBox(new DebugRenderer.Box(mapEntry.getKey(), -32f, 64f, 0.05f,
mapEntry.getValue().request != null ? Color.red
: mapEntry.getValue().priority == 3 ? Color.orange
: mapEntry.getValue().priority == 2 ? Color.cyan
: mapEntry.getValue().priority == 1 ? Color.blue
: Color.gray
));
}
}
protected static class RequestQueueEntry
{
public final CompletableFuture<Boolean> future = new CompletableFuture<>();
public final Consumer<ChunkSizedFullDataAccessor> chunkDataConsumer;
@Nullable
public final Integer currentChecksum;
// Higher value = higher priority.
// Priority of 0 is reserved for unassigned value
public int priority = 0;
@CheckForNull
public CompletableFuture<?> request;
public RequestQueueEntry(
Consumer<ChunkSizedFullDataAccessor> chunkDataConsumer,
@Nullable
Integer currentChecksum)
{
this.chunkDataConsumer = chunkDataConsumer;
this.currentChecksum = currentChecksum;
}
}
}
@@ -0,0 +1,30 @@
package com.seibel.distanthorizons.core.multiplayer.client;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.level.IDhClientLevel;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
public class FullDataRefreshQueue extends AbstractFullDataRequestQueue
{
public FullDataRefreshQueue(IDhClientLevel level, ClientNetworkState networkState)
{
super(networkState, level, true, Config.Client.Advanced.Debugging.DebugWireframe.showWorldGenQueue);
}
@Override
protected int getRequestConcurrencyLimit() { return this.networkState.config.postRelogUpdateConcurrencyLimit; }
@Override
protected String getQueueName() { return "Data Refresh Queue"; }
@Override
public boolean tick(DhBlockPos2D targetPos)
{
if (!this.networkState.config.postRelogUpdateEnabled)
{
return false;
}
return super.tick(targetPos);
}
}
@@ -11,6 +11,7 @@ public abstract class AbstractMultiplayerConfig implements INetworkObject
public abstract int getGenTaskPriorityRequestRateLimit();
public abstract boolean isRealTimeUpdatesEnabled();
public abstract boolean isPostRelogUpdateEnabled();
public abstract int getPostRelogUpdateConcurrencyLimit();
@Override
public void encode(ByteBuf out)
@@ -21,6 +22,7 @@ public abstract class AbstractMultiplayerConfig implements INetworkObject
out.writeInt(this.getGenTaskPriorityRequestRateLimit());
out.writeBoolean(this.isRealTimeUpdatesEnabled());
out.writeBoolean(this.isPostRelogUpdateEnabled());
out.writeInt(this.getPostRelogUpdateConcurrencyLimit());
}
}
@@ -8,22 +8,25 @@ public class MultiplayerConfig extends AbstractMultiplayerConfig
// IMPORTANT: Once you added/removed config fields, modify MultiplayerConfigChangeListener accordingly.
public int renderDistanceRadius = Config.Client.Advanced.Graphics.Quality.lodChunkRenderDistanceRadius.get();
@Override public int getRenderDistanceRadius() { return renderDistanceRadius; }
@Override public int getRenderDistanceRadius() { return this.renderDistanceRadius; }
public boolean distantGenerationEnabled = Config.Client.Advanced.WorldGenerator.enableDistantGeneration.get();
@Override public boolean isDistantGenerationEnabled() { return distantGenerationEnabled; }
@Override public boolean isDistantGenerationEnabled() { return this.distantGenerationEnabled; }
public int fullDataRequestConcurrencyLimit = Config.Client.Advanced.Multiplayer.ServerNetworking.fullDataRequestConcurrencyLimit.get();
@Override public int getFullDataRequestConcurrencyLimit() { return fullDataRequestConcurrencyLimit; }
@Override public int getFullDataRequestConcurrencyLimit() { return this.fullDataRequestConcurrencyLimit; }
public int genTaskPriorityRequestRateLimit = Config.Client.Advanced.Multiplayer.ServerNetworking.genTaskPriorityRequestRateLimit.get();
@Override public int getGenTaskPriorityRequestRateLimit() { return genTaskPriorityRequestRateLimit; }
@Override public int getGenTaskPriorityRequestRateLimit() { return this.genTaskPriorityRequestRateLimit; }
public boolean realTimeUpdatesEnabled = Config.Client.Advanced.Multiplayer.ServerNetworking.enableRealTimeUpdates.get();
@Override public boolean isRealTimeUpdatesEnabled() { return realTimeUpdatesEnabled; }
@Override public boolean isRealTimeUpdatesEnabled() { return this.realTimeUpdatesEnabled; }
public boolean postRelogUpdateEnabled = false; // Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate.get();
@Override public boolean isPostRelogUpdateEnabled() { return postRelogUpdateEnabled; }
public boolean postRelogUpdateEnabled = Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate.get();
@Override public boolean isPostRelogUpdateEnabled() { return this.postRelogUpdateEnabled; }
public int postRelogUpdateConcurrencyLimit = Config.Client.Advanced.Multiplayer.ServerNetworking.postRelogUpdateConcurrencyLimit.get();
@Override public int getPostRelogUpdateConcurrencyLimit() { return this.postRelogUpdateConcurrencyLimit; }
@Override
public void decode(ByteBuf in)
@@ -34,17 +37,19 @@ public class MultiplayerConfig extends AbstractMultiplayerConfig
this.genTaskPriorityRequestRateLimit = in.readInt();
this.realTimeUpdatesEnabled = in.readBoolean();
this.postRelogUpdateEnabled = in.readBoolean();
this.postRelogUpdateConcurrencyLimit = in.readInt();
}
@Override public String toString()
{
return "MultiplayerConfig{" +
"renderDistance=" + renderDistanceRadius +
", distantGenerationEnabled=" + distantGenerationEnabled +
", fullDataRequestConcurrencyLimit=" + fullDataRequestConcurrencyLimit +
", genTaskPriorityRequestRateLimit=" + genTaskPriorityRequestRateLimit +
", realTimeUpdatesEnabled=" + realTimeUpdatesEnabled +
", postRelogUpdatesEnabled=" + postRelogUpdateEnabled +
"renderDistance=" + this.renderDistanceRadius +
", distantGenerationEnabled=" + this.distantGenerationEnabled +
", fullDataRequestConcurrencyLimit=" + this.fullDataRequestConcurrencyLimit +
", genTaskPriorityRequestRateLimit=" + this.genTaskPriorityRequestRateLimit +
", realTimeUpdatesEnabled=" + this.realTimeUpdatesEnabled +
", postRelogUpdatesEnabled=" + this.postRelogUpdateEnabled +
", postRelogUpdateConcurrencyLimit=" + this.postRelogUpdateConcurrencyLimit +
'}';
}
@@ -16,7 +16,8 @@ public class MultiplayerConfigChangeListener implements Closeable
Config.Client.Advanced.Multiplayer.ServerNetworking.fullDataRequestConcurrencyLimit,
Config.Client.Advanced.Multiplayer.ServerNetworking.genTaskPriorityRequestRateLimit,
Config.Client.Advanced.Multiplayer.ServerNetworking.enableRealTimeUpdates,
//Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate
Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate,
Config.Client.Advanced.Multiplayer.ServerNetworking.postRelogUpdateConcurrencyLimit,
};
private final ArrayList<ConfigChangeListener> changeListeners = new ArrayList<>();
@@ -24,15 +25,19 @@ public class MultiplayerConfigChangeListener implements Closeable
public MultiplayerConfigChangeListener(Runnable runnable)
{
for (ConfigEntry entry : CONFIG_ENTRIES)
changeListeners.add(new ConfigChangeListener(entry, ignored -> runnable.run()));
{
this.changeListeners.add(new ConfigChangeListener(entry, ignored -> runnable.run()));
}
}
@Override
public void close()
{
for (ConfigChangeListener changeListener : changeListeners)
for (ConfigChangeListener changeListener : this.changeListeners)
{
changeListener.close();
changeListeners.clear();
}
this.changeListeners.clear();
}
}
@@ -27,7 +27,7 @@ public class ServerPlayerState
public final SupplierBasedConcurrencyLimiter<FullDataSourceRequestMessage> fullDataRequestConcurrencyLimiter = new SupplierBasedConcurrencyLimiter<>(
() -> ServerNetworking.fullDataRequestConcurrencyLimit.get(),
msg -> {
msg.sendResponse(new RateLimitedException("Max concurrent full data requests: " + config.getFullDataRequestConcurrencyLimit()));
msg.sendResponse(new RateLimitedException("Max concurrent full data requests: " + this.config.getFullDataRequestConcurrencyLimit()));
this.rateLimitKickTrigger.tryAcquire(null);
}
);
@@ -35,15 +35,22 @@ public class ServerPlayerState
public final SupplierBasedRateLimiter<GenTaskPriorityRequestMessage> genTaskPriorityRequestRateLimiter = new SupplierBasedRateLimiter<>(
() -> ServerNetworking.genTaskPriorityRequestRateLimit.get(),
msg -> {
// Shouldn't be called, but it's here just in case
msg.sendResponse(new RateLimitedException("Max section checks per second: " + config.getFullDataRequestConcurrencyLimit()));
msg.sendResponse(new RateLimitedException("Max section checks per second: " + this.config.getFullDataRequestConcurrencyLimit()));
this.rateLimitKickTrigger.tryAcquire(null);
}
);
public final SupplierBasedConcurrencyLimiter<FullDataSourceRequestMessage> postRelogUpdateRequestConcurrencyLimiter = new SupplierBasedConcurrencyLimiter<>(
() -> ServerNetworking.postRelogUpdateConcurrencyLimit.get(),
msg -> {
msg.sendResponse(new RateLimitedException("Max concurrent post-relog update requests: " + this.config.getPostRelogUpdateConcurrencyLimit()));
this.rateLimitKickTrigger.tryAcquire(null);
}
);
public ServerPlayerState(IServerPlayerWrapper serverPlayer) { this.serverPlayer = serverPlayer; }
public ServerPlayerState(IServerPlayerWrapper serverPlayer) { this.serverPlayer = serverPlayer; }
}
@@ -15,36 +15,42 @@ public class ServersideMultiplayerConfig extends AbstractMultiplayerConfig
@Override
public int getRenderDistanceRadius()
{
return Math.min(clientConfig.renderDistanceRadius, Config.Client.Advanced.Graphics.Quality.lodChunkRenderDistanceRadius.get());
return Math.min(this.clientConfig.renderDistanceRadius, Config.Client.Advanced.Graphics.Quality.lodChunkRenderDistanceRadius.get());
}
@Override
public boolean isDistantGenerationEnabled()
{
return clientConfig.distantGenerationEnabled && Config.Client.Advanced.WorldGenerator.enableDistantGeneration.get();
return this.clientConfig.distantGenerationEnabled && Config.Client.Advanced.WorldGenerator.enableDistantGeneration.get();
}
@Override
public int getFullDataRequestConcurrencyLimit()
{
return Math.min(clientConfig.fullDataRequestConcurrencyLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.fullDataRequestConcurrencyLimit.get());
return Math.min(this.clientConfig.fullDataRequestConcurrencyLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.fullDataRequestConcurrencyLimit.get());
}
@Override
public int getGenTaskPriorityRequestRateLimit()
{
return Math.min(clientConfig.genTaskPriorityRequestRateLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.genTaskPriorityRequestRateLimit.get());
return Math.min(this.clientConfig.genTaskPriorityRequestRateLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.genTaskPriorityRequestRateLimit.get());
}
@Override
public boolean isRealTimeUpdatesEnabled()
{
return clientConfig.realTimeUpdatesEnabled && Config.Client.Advanced.Multiplayer.ServerNetworking.enableRealTimeUpdates.get();
return this.clientConfig.realTimeUpdatesEnabled && Config.Client.Advanced.Multiplayer.ServerNetworking.enableRealTimeUpdates.get();
}
@Override
public boolean isPostRelogUpdateEnabled() {
return false; // clientConfig.postRelogUpdateEnabled && Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate.get();
return this.clientConfig.postRelogUpdateEnabled && Config.Client.Advanced.Multiplayer.ServerNetworking.enablePostRelogUpdate.get();
}
@Override
public int getPostRelogUpdateConcurrencyLimit()
{
return Math.min(this.clientConfig.postRelogUpdateConcurrencyLimit, Config.Client.Advanced.Multiplayer.ServerNetworking.postRelogUpdateConcurrencyLimit.get());
}
@Override
@@ -6,12 +6,16 @@ import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
public interface IConnection
{
Logger LOGGER = LogManager.getLogger();
ChannelHandlerContext getChannelContext();
NetworkEventSource getRequestHandler();
@@ -22,6 +26,7 @@ public interface IConnection
default CompletableFuture<Void> sendMessage(NetworkMessage message)
{
LOGGER.trace("Sending message: " + message);
CompletableFuture<Void> future = new CompletableFuture<>();
ChannelHandlerContext ctx = this.getChannelContext();
@@ -52,7 +57,9 @@ public interface IConnection
this.sendMessage(msg).whenComplete((ignored, throwable) ->
{
if (throwable != null)
{
responseFuture.completeExceptionally(throwable);
}
});
return responseFuture;
}
@@ -43,7 +43,6 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
// TODO move to the config
private final int port;
private final EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("DH-Network - Server Boss Thread"));
@@ -129,7 +128,9 @@ public class NetworkServer extends NetworkEventSource implements AutoCloseable
public void close()
{
if (!this.isClosed.compareAndSet(false, true))
{
return;
}
LOGGER.info("Shutting down the network server.");
this.workerGroup.shutdownGracefully().syncUninterruptibly();
@@ -26,50 +26,56 @@ import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
import io.netty.buffer.ByteBuf;
import javax.annotation.Nullable;
public class FullDataSourceRequestMessage extends FutureTrackableNetworkMessage implements ILevelRelatedMessage
{
public DhSectionPos dhSectionPos;
private int levelHashCode;
@Override public int getLevelHashCode() { return levelHashCode; }
public boolean changedOnly;
public DhSectionPos sectionPos;
/** Only present when requesting for changes. */
@Nullable
public Integer checksum;
@Override
public int getLevelHashCode() { return this.levelHashCode; }
public FullDataSourceRequestMessage() {}
public FullDataSourceRequestMessage(ILevelWrapper levelWrapper, DhSectionPos dhSectionPos)
public FullDataSourceRequestMessage(ILevelWrapper levelWrapper, DhSectionPos sectionPos, @Nullable Integer checksum)
{
// TODO Multiverse support
this.levelHashCode = levelWrapper.getDimensionType().getDimensionName().hashCode();
this.dhSectionPos = dhSectionPos;
}
public FullDataSourceRequestMessage(ILevelWrapper levelWrapper, DhSectionPos dhSectionPos, boolean changedOnly)
{
this(levelWrapper, dhSectionPos);
this.changedOnly = true;
this.sectionPos = sectionPos;
this.checksum = checksum;
}
@Override
public void encode0(ByteBuf out)
{
out.writeInt(levelHashCode);
dhSectionPos.encode(out);
out.writeBoolean(changedOnly);
out.writeInt(this.levelHashCode);
this.sectionPos.encode(out);
if (this.encodeOptional(out, this.checksum))
{
out.writeInt(this.checksum);
}
}
@Override
public void decode0(ByteBuf in)
{
levelHashCode = in.readInt();
dhSectionPos = INetworkObject.decodeStatic(DhSectionPos.zero(), in);
changedOnly = in.readBoolean();
this.levelHashCode = in.readInt();
this.sectionPos = INetworkObject.decodeStatic(DhSectionPos.zero(), in);
this.checksum = this.decodeOptional(in, in::readInt);
}
@Override
public String toString()
{
return super.toString(
"dhSectionPos=" + dhSectionPos +
", levelHashCode=" + levelHashCode
"dhSectionPos=" + this.sectionPos +
", levelHashCode=" + this.levelHashCode +
", checksum=" + this.checksum
);
}
@@ -31,20 +31,33 @@ import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataOutputStre
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* Response message, containing the requested full data source,
* or nothing if requested in updates-only mode and the data was not updated. <br>
* Decoded full data source is not cached, since it's intended for a single use.
*/
public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage
{
// Transmitted data
@Nullable
private ByteBuf dataBuffer;
// Used only when encoding
@Nullable
private CompleteFullDataSource fullDataSource;
private DhServerLevel level;
// Used only when decoding
private CompleteFullDataSourceLoader fullDataSourceLoader;
public CompleteFullDataSourceLoader getFullDataSourceLoader() { return fullDataSourceLoader; }
private ByteBuf dataBuffer;
public CompleteFullDataSourceLoader getFullDataSourceLoader() { return this.fullDataSourceLoader; }
public FullDataSourceResponseMessage() {}
public FullDataSourceResponseMessage(CompleteFullDataSource fullDataSource, DhServerLevel level)
public FullDataSourceResponseMessage(@Nullable CompleteFullDataSource fullDataSource, DhServerLevel level)
{
this.fullDataSource = fullDataSource;
this.level = level;
@@ -53,41 +66,54 @@ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage
@Override
public void encode0(ByteBuf out) throws IOException
{
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream())
if (this.encodeOptional(out, this.fullDataSource))
{
DhDataOutputStream dhOutputStream = new DhDataOutputStream(outputStream);
fullDataSource.writeToStream(dhOutputStream, level);
dhOutputStream.flush();
out.writeByte(fullDataSource.getDataFormatVersion());
out.writeInt(outputStream.size());
out.writeBytes(outputStream.toByteArray());
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream())
{
DhDataOutputStream dhOutputStream = new DhDataOutputStream(outputStream);
this.fullDataSource.writeToStream(dhOutputStream, this.level);
dhOutputStream.flush();
out.writeByte(this.fullDataSource.getDataFormatVersion());
out.writeInt(outputStream.size());
out.writeBytes(outputStream.toByteArray());
}
}
}
@Override
public void decode0(ByteBuf in)
{
byte dataVersion = in.readByte();
this.fullDataSourceLoader = (CompleteFullDataSourceLoader) AbstractFullDataSourceLoader.getLoader(CompleteFullDataSource.DATA_TYPE_NAME, dataVersion);
this.dataBuffer = in.readBytes(in.readInt());
this.dataBuffer = this.decodeOptional(in, () ->
{
byte dataVersion = in.readByte();
this.fullDataSourceLoader = (CompleteFullDataSourceLoader) AbstractFullDataSourceLoader.getLoader(CompleteFullDataSource.DATA_TYPE_NAME, dataVersion);
return in.readBytes(in.readInt());
});
}
public CompleteFullDataSource getFullDataSource(DhSectionPos pos, IDhLevel level) throws IOException, InterruptedException
@Nullable
public synchronized CompleteFullDataSource getFullDataSource(DhSectionPos pos, IDhLevel level) throws IOException, InterruptedException
{
try (ByteBufInputStream inputStream = new ByteBufInputStream(dataBuffer))
if (this.dataBuffer == null)
{
return fullDataSourceLoader.loadData(pos, new DhDataInputStream(inputStream), level);
return null;
}
try (ByteBufInputStream inputStream = new ByteBufInputStream(this.dataBuffer))
{
return this.fullDataSourceLoader.loadData(pos, new DhDataInputStream(inputStream), level);
}
finally
{
dataBuffer.release();
this.dataBuffer.release();
}
}
@Override public String toString()
@Override
public String toString()
{
return super.toString("dataBuffer=" + dataBuffer);
return super.toString("dataBuffer=" + this.dataBuffer);
}
}
@@ -22,6 +22,7 @@ package com.seibel.distanthorizons.core.sql;
import com.seibel.distanthorizons.api.enums.worldGeneration.EDhApiWorldGenerationStep;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import javax.annotation.Nullable;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
@@ -164,5 +165,22 @@ public abstract class AbstractDataSourceRepo extends AbstractDhRepo<DataSourceDt
return maxDetailLevel + DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL;
}
/**
* Returns the checksum of a given section pos, if it's present in the table.
*/
@Nullable
public Integer getChecksumForSection(DhSectionPos pos)
{
Map<String, Object> resultMap = this.queryDictionaryFirst("SELECT Checksum FROM DhFullData WHERE DhSectionPos = '" + pos.serialize() + "';");
if (resultMap == null || resultMap.get("Checksum") == null)
{
return null;
}
else
{
return (int) resultMap.get("Checksum");
}
}
}
@@ -27,6 +27,7 @@ public class SupplierBasedConcurrencyLimiter<T>
{
if (this.pendingTasks.incrementAndGet() > this.maxConcurrentTasksSupplier.get())
{
this.pendingTasks.decrementAndGet();
this.onFailureConsumer.accept(context);
return false;
}