Somewhat proper request cancellation

This commit is contained in:
s809
2023-08-04 21:04:34 +05:00
parent 76b226b865
commit 5767668efa
8 changed files with 131 additions and 59 deletions
@@ -80,12 +80,17 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler
}
public void removeGenRequestIf(Function<DhSectionPos, Boolean> removeIf) {
HashSet<DhSectionPos> removedRequests = new HashSet<>();
this.incompleteDataSources.forEach((pos, dataSource) ->
{
if (removeIf.apply(pos)) {
this.incompleteDataSources.remove(pos);
removedRequests.add(pos);
}
});
this.worldGenQueueRef.get().cancelGenTasks(removedRequests);
}
//=================//
@@ -332,6 +337,7 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler
}
@Override
@Nullable
public Consumer<ChunkSizedFullDataAccessor> getChunkDataConsumer()
{
if (this.loadedTargetFullDataSource == null)
@@ -4,19 +4,20 @@ import com.seibel.distanthorizons.core.generation.tasks.IWorldGenTaskTracker;
import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.pos.DhLodPos;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public interface IWorldGenerationQueue extends Closeable
{
byte largestDataDetail();
CompletableFuture<WorldGenResult> submitGenTask(DhLodPos pos, byte requiredDataDetail, IWorldGenTaskTracker tracker);
void cancelGenTasks(Iterable<DhSectionPos> positions);
void runCurrentGenTasksUntilBusy(DhBlockPos2D targetPos);
CompletableFuture<Void> startClosing(boolean cancelCurrentGeneration, boolean alsoInterruptRunning);
void close();
}
@@ -152,6 +152,11 @@ public class WorldGenerationQueue implements IWorldGenerationQueue, IDebugRender
//}
}
@Override
public void cancelGenTasks(Iterable<DhSectionPos> positions)
{
// TODO Cancel gen tasks properly
}
//===============//
// running tasks //
@@ -20,9 +20,7 @@ import com.seibel.distanthorizons.coreapi.util.BitShiftUtil;
import io.netty.channel.ChannelException;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@@ -34,13 +32,12 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
private final ClientNetworkState networkState;
private final IDhClientLevel level;
private final ConcurrentMap<DhSectionPos, WorldGenQueueEntry> waitingTasks = new ConcurrentHashMap<>();
private final Semaphore pendingTasksSemaphore = new Semaphore(Short.MAX_VALUE);
private int pendingTasks() { return Short.MAX_VALUE - pendingTasksSemaphore.availablePermits(); }
private final Set<CompletableFuture<FullDataSourceResponseMessage>> pendingRequests = ConcurrentHashMap.newKeySet();
private volatile CompletableFuture<Void> generatorClosingFuture = null;
private final ConcurrentMap<DhSectionPos, WorldGenQueueEntry> waitingTasks = new ConcurrentHashMap<>();
private final Semaphore pendingTasksSemaphore = new Semaphore(Short.MAX_VALUE, true);
private int pendingTasks() { return Short.MAX_VALUE - pendingTasksSemaphore.availablePermits(); }
private final F3Screen.NestedMessage f3Message = new F3Screen.NestedMessage(this::f3Log);
private final AtomicInteger finishedRequests = new AtomicInteger();
private final AtomicInteger totalRequests = new AtomicInteger();
@@ -52,12 +49,14 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
this.level = level;
}
@Override public byte largestDataDetail()
@Override
public byte largestDataDetail()
{
return LodUtil.BLOCK_DETAIL_LEVEL;
}
@Override public CompletableFuture<WorldGenResult> submitGenTask(DhLodPos lodPos, byte requiredDataDetail, IWorldGenTaskTracker tracker)
@Override
public CompletableFuture<WorldGenResult> submitGenTask(DhLodPos lodPos, byte requiredDataDetail, IWorldGenTaskTracker tracker)
{
LodUtil.assertTrue(lodPos.detailLevel == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed.");
DhSectionPos sectionPos = new DhSectionPos(lodPos.detailLevel, lodPos);
@@ -69,7 +68,8 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
return entry.future;
}
@Override public void runCurrentGenTasksUntilBusy(DhBlockPos2D targetPos)
@Override
public void runCurrentGenTasksUntilBusy(DhBlockPos2D targetPos)
{
while (generatorClosingFuture == null
&& networkState.client().isReady()
@@ -81,20 +81,42 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
}
}
@Override
public void cancelGenTasks(Iterable<DhSectionPos> positions)
{
for (DhSectionPos pos : positions)
{
WorldGenQueueEntry entry = waitingTasks.remove(pos);
if (entry != null)
{
if (entry.request != null)
entry.request.cancel(false);
entry.future.cancel(false);
}
}
}
private void sendNewRequest(DhBlockPos2D targetPos)
{
DhSectionPos sectionPos = Objects.requireNonNull(waitingTasks.keySet().stream().reduce(null, (a, b)
-> a != null
&& a.getCenter().getCenterBlockPos().distSquared(targetPos)
< b.getCenter().getCenterBlockPos().distSquared(targetPos)
? a : b));
WorldGenQueueEntry entry = waitingTasks.remove(sectionPos);
Map.Entry<DhSectionPos, WorldGenQueueEntry> mapEntry = waitingTasks.entrySet().stream()
.filter(task -> task.getValue().request == null)
.reduce(null, (a, b)
-> a != null
&& a.getKey().getCenter().getCenterBlockPos().distSquared(targetPos)
< b.getKey().getCenter().getCenterBlockPos().distSquared(targetPos)
? a : b);
if (mapEntry == null)
{
pendingTasksSemaphore.release();
return;
}
CompletableFuture<FullDataSourceResponseMessage> request = this.networkState.client().sendRequest(new FullDataSourceRequestMessage(sectionPos));
pendingRequests.add(request);
request.handle((response, throwable) ->
DhSectionPos sectionPos = mapEntry.getKey();
WorldGenQueueEntry entry = mapEntry.getValue();
entry.request = this.networkState.client().sendRequest(new FullDataSourceRequestMessage(sectionPos));
entry.request.handle((response, throwable) ->
{
pendingRequests.remove(request);
pendingTasksSemaphore.release();
finishedRequests.incrementAndGet();
@@ -103,11 +125,20 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
if (throwable != null)
throw throwable;
waitingTasks.remove(sectionPos);
LOGGER.info("FullDataSourceResponseMessage " + sectionPos);
CompleteFullDataSource fullDataSource = response.getFullDataSource(sectionPos, level);
// Check is dimension has been switched - received data may no longer be relevant
if (fullDataSource == null)
throw new CancellationException();
Consumer<ChunkSizedFullDataAccessor> chunkDataConsumer = entry.tracker.getChunkDataConsumer();
// FIXME Who decided it was a good idea to use weak references for cancellation purposes?
if (chunkDataConsumer == null)
return entry.future.cancel(false);
sectionPos.forEachChildAtLevel(LodUtil.CHUNK_DETAIL_LEVEL, childPos -> {
ChunkSizedFullDataAccessor accessor = new ChunkSizedFullDataAccessor(new DhChunkPos(childPos.sectionX, childPos.sectionZ));
@@ -123,24 +154,27 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
chunkDataConsumer.accept(accessor);
});
entry.future.complete(WorldGenResult.CreateSuccess(sectionPos));
}
catch (CancellationException | ChannelException | RateLimitedException e)
catch (ChannelException | RateLimitedException e)
{
if (e instanceof RateLimitedException)
LOGGER.warn("Rate limited by server, re-queueing task ["+sectionPos+"]: "+e.getMessage());
finishedRequests.decrementAndGet();
waitingTasks.put(sectionPos, entry);
}
catch (CancellationException ignored)
{
finishedRequests.decrementAndGet();
totalRequests.decrementAndGet();
}
catch (Throwable e)
{
LOGGER.error("Error while fetching full data source", e);
failedRequests.incrementAndGet();
entry.future.complete(WorldGenResult.CreateFail());
return entry.future.complete(WorldGenResult.CreateFail());
}
return null;
return entry.future.complete(WorldGenResult.CreateSuccess(sectionPos));
});
}
@@ -153,24 +187,24 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
return lines.toArray(new String[0]);
}
@Override public CompletableFuture<Void> startClosing(boolean cancelCurrentGeneration, boolean alsoInterruptRunning)
@Override
public CompletableFuture<Void> startClosing(boolean cancelCurrentGeneration, boolean alsoInterruptRunning)
{
return this.generatorClosingFuture = CompletableFuture.runAsync(() -> {
while (!pendingTasksSemaphore.tryAcquire(Short.MAX_VALUE))
{
for (CompletableFuture<FullDataSourceResponseMessage> request : pendingRequests)
request.cancel(false);
}
if (cancelCurrentGeneration)
{
for (WorldGenQueueEntry entry : this.waitingTasks.values())
{
if (entry.request != null)
entry.request.cancel(alsoInterruptRunning);
entry.future.cancel(alsoInterruptRunning);
}
}
});
}
@Override public void close()
@Override
public void close()
{
f3Message.close();
}
@@ -179,6 +213,7 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue
{
public CompletableFuture<WorldGenResult> future;
public IWorldGenTaskTracker tracker;
public CompletableFuture<FullDataSourceResponseMessage> request;
public WorldGenQueueEntry(CompletableFuture<WorldGenResult> future, IWorldGenTaskTracker tracker)
{
@@ -2,6 +2,7 @@ package com.seibel.distanthorizons.core.generation.tasks;
import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor;
import javax.annotation.Nullable;
import java.util.function.Consumer;
/**
@@ -13,5 +14,6 @@ public interface IWorldGenTaskTracker
/** Returns true if the task hasn't been garbage collected. */
boolean isMemoryAddressValid();
@Nullable
Consumer<ChunkSizedFullDataAccessor> getChunkDataConsumer();
}
@@ -29,6 +29,7 @@ import org.apache.logging.log4j.Logger;
import javax.annotation.CheckForNull;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
@@ -43,7 +44,7 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
private final ConcurrentLinkedQueue<IServerPlayerWrapper> worldGenLoopingQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentMap<DhSectionPos, IncompleteDataSourceEntry> incompleteDataSources = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, FullDataSourceRequestMessage> fullDataRequests = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, IncompleteDataSourceEntry> fullDataRequests = new ConcurrentHashMap<>();
private final AppliedConfigState<Integer> rateLimitConfig = new AppliedConfigState<>(Config.Client.Advanced.Multiplayer.serverNetworkingRateLimit);
@@ -91,8 +92,8 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
// If this fails, current entry is being drained and need create another one
if (entry.requestCollectionSemaphore.tryAcquire())
{
fullDataRequests.put(msg.futureId, msg);
entry.requestMessages.add(msg);
fullDataRequests.put(msg.futureId, entry);
entry.requestMessages.put(msg.futureId, msg);
entry.requestCollectionSemaphore.release();
break;
}
@@ -101,9 +102,20 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
this.eventSource.registerHandler(CancelMessage.class, msg ->
{
this.fullDataRequests.remove(msg.futureId);
ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg);
serverPlayerState.pendingFullDataRequests.decrementAndGet();
IncompleteDataSourceEntry entry = this.fullDataRequests.remove(msg.futureId);
if (entry == null) return;
FullDataSourceRequestMessage requestMessage = entry.requestMessages.remove(msg.futureId);
remotePlayerConnectionHandler.getConnectedPlayer(msg).pendingFullDataRequests.decrementAndGet();
entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
if (entry.requestMessages.isEmpty())
{
incompleteDataSources.remove(requestMessage.dhSectionPos);
serverside.dataFileHandler.removeGenRequestIf(pos -> pos == requestMessage.dhSectionPos);
}
entry.requestCollectionSemaphore.release(Short.MAX_VALUE);
});
}
@@ -127,32 +139,32 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
{
chunkToLodBuilder.tick();
for (Iterator<IncompleteDataSourceEntry> it = incompleteDataSources.values().iterator(); it.hasNext(); )
for (Map.Entry<DhSectionPos, IncompleteDataSourceEntry> mapEntry : incompleteDataSources.entrySet())
{
IncompleteDataSourceEntry entry = it.next();
if (entry.fullDataSource == null) continue;
IncompleteDataSourceEntry entry = mapEntry.getValue();
if (entry.fullDataSource == null)
continue;
if (entry.fullDataSource instanceof IIncompleteFullDataSource)
{
IIncompleteFullDataSource incompleteSource = (IIncompleteFullDataSource) entry.fullDataSource;
if (!incompleteSource.hasBeenPromoted()) continue;
if (!incompleteSource.hasBeenPromoted())
continue;
entry.fullDataSource = incompleteSource.tryPromotingToCompleteDataSource();
}
LodUtil.assertTrue(entry.fullDataSource instanceof CompleteFullDataSource, "Invalid full data source");
incompleteDataSources.remove(mapEntry.getKey());
it.remove();
// This semaphore is intentionally acquired forever
entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
CompleteFullDataSource completeSource = (CompleteFullDataSource) entry.fullDataSource;
for (FullDataSourceRequestMessage msg : entry.requestMessages)
for (FullDataSourceRequestMessage msg : entry.requestMessages.values())
{
ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg);
if (serverPlayerState == null) continue;
// Check if cancelled
if (this.fullDataRequests.remove(msg.futureId) == null)
if (serverPlayerState == null)
continue;
serverPlayerState.pendingFullDataRequests.decrementAndGet();
@@ -242,7 +254,7 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
{
@CheckForNull
public IFullDataSource fullDataSource;
public final Set<FullDataSourceRequestMessage> requestMessages = ConcurrentHashMap.newKeySet();
public final ConcurrentMap<Long, FullDataSourceRequestMessage> requestMessages = new ConcurrentHashMap<>();
public final Semaphore requestCollectionSemaphore = new Semaphore(Short.MAX_VALUE, true);
}
}
@@ -8,6 +8,7 @@ import com.seibel.distanthorizons.core.network.messages.CancelMessage;
import com.seibel.distanthorizons.core.network.messages.ExceptionMessage;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.NetworkMessage;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandlerContext;
import org.apache.logging.log4j.Logger;
@@ -89,7 +90,8 @@ public abstract class NetworkEventSource
CompletableFuture<TResponse> responseFuture = new CompletableFuture<>();
responseFuture.handle((response, throwable) -> {
pendingFutures.remove(ctx, msg.futureId);
if (!(throwable instanceof ChannelException))
pendingFutures.remove(ctx, msg.futureId);
if (throwable instanceof CancellationException)
msg.sendResponse(new CancelMessage());
@@ -126,6 +128,6 @@ public abstract class NetworkEventSource
public void close()
{
this.handlers.clear();
completeAllFuturesExceptionally(new Exception(this.getClass().getSimpleName()+" is closed."));
completeAllFuturesExceptionally(new ChannelException(this.getClass().getSimpleName()+" is closed."));
}
}
@@ -12,6 +12,7 @@ import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataOutputStre
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -20,6 +21,7 @@ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage
private CompleteFullDataSource fullDataSource;
private DhServerLevel level;
private int levelHashCode;
private CompleteFullDataSourceLoader fullDataSourceLoader;
private ByteBuf dataBuffer;
@@ -28,6 +30,9 @@ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage
{
this.fullDataSource = fullDataSource;
this.level = level;
// TODO Multiverse support
this.levelHashCode = level.getLevelWrapper().getDimensionType().getDimensionName().hashCode();
}
@Override
@@ -39,6 +44,7 @@ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage
fullDataSource.writeToStream(dhOutputStream, level);
dhOutputStream.flush();
out.writeInt(levelHashCode);
out.writeByte(fullDataSource.getBinaryDataFormatVersion());
out.writeInt(outputStream.size());
out.writeBytes(outputStream.toByteArray());
@@ -48,16 +54,19 @@ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage
@Override
public void decode0(ByteBuf in)
{
levelHashCode = in.readInt();
byte dataVersion = in.readByte();
this.fullDataSourceLoader = (CompleteFullDataSourceLoader) AbstractFullDataSourceLoader.getLoader(CompleteFullDataSource.TYPE_ID, dataVersion);
assert this.fullDataSourceLoader != null;
this.dataBuffer = in.readBytes(in.readInt());
}
@Nullable
public CompleteFullDataSource getFullDataSource(DhSectionPos pos, IDhLevel level) throws IOException, InterruptedException
{
// TODO Multiverse support
if (levelHashCode != level.getLevelWrapper().getDimensionType().getDimensionName().hashCode())
return null;
try (ByteBufInputStream inputStream = new ByteBufInputStream(dataBuffer))
{
return fullDataSourceLoader.loadData(pos, new DhDataInputStream(inputStream), level);