diff --git a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataFileHandler.java b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataFileHandler.java index ff65b0b21..2da61a3ad 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataFileHandler.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/file/fullDatafile/GeneratedFullDataFileHandler.java @@ -80,12 +80,17 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler } public void removeGenRequestIf(Function removeIf) { + HashSet removedRequests = new HashSet<>(); + this.incompleteDataSources.forEach((pos, dataSource) -> { if (removeIf.apply(pos)) { this.incompleteDataSources.remove(pos); + removedRequests.add(pos); } }); + + this.worldGenQueueRef.get().cancelGenTasks(removedRequests); } //=================// @@ -332,6 +337,7 @@ public class GeneratedFullDataFileHandler extends FullDataFileHandler } @Override + @Nullable public Consumer getChunkDataConsumer() { if (this.loadedTargetFullDataSource == null) diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/IWorldGenerationQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/IWorldGenerationQueue.java index 43b33ce12..f3d2da864 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/IWorldGenerationQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/IWorldGenerationQueue.java @@ -4,19 +4,20 @@ import com.seibel.distanthorizons.core.generation.tasks.IWorldGenTaskTracker; import com.seibel.distanthorizons.core.generation.tasks.WorldGenResult; import com.seibel.distanthorizons.core.pos.DhBlockPos2D; import com.seibel.distanthorizons.core.pos.DhLodPos; +import com.seibel.distanthorizons.core.pos.DhSectionPos; import java.io.Closeable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; public interface IWorldGenerationQueue extends Closeable { byte largestDataDetail(); CompletableFuture submitGenTask(DhLodPos pos, byte requiredDataDetail, IWorldGenTaskTracker tracker); + void cancelGenTasks(Iterable positions); + void runCurrentGenTasksUntilBusy(DhBlockPos2D targetPos); CompletableFuture startClosing(boolean cancelCurrentGeneration, boolean alsoInterruptRunning); - void close(); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java index e64eac970..d0211adc9 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldGenerationQueue.java @@ -152,6 +152,11 @@ public class WorldGenerationQueue implements IWorldGenerationQueue, IDebugRender //} } + @Override + public void cancelGenTasks(Iterable positions) + { + // TODO Cancel gen tasks properly + } //===============// // running tasks // diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java index 7669ff722..3947965ae 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/WorldRemoteGenerationQueue.java @@ -20,9 +20,7 @@ import com.seibel.distanthorizons.coreapi.util.BitShiftUtil; import io.netty.channel.ChannelException; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -34,13 +32,12 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue private final ClientNetworkState networkState; private final IDhClientLevel level; - private final ConcurrentMap waitingTasks = new ConcurrentHashMap<>(); - private final Semaphore pendingTasksSemaphore = new Semaphore(Short.MAX_VALUE); - private int pendingTasks() { return Short.MAX_VALUE - pendingTasksSemaphore.availablePermits(); } - private final Set> pendingRequests = ConcurrentHashMap.newKeySet(); - private volatile CompletableFuture generatorClosingFuture = null; + private final ConcurrentMap waitingTasks = new ConcurrentHashMap<>(); + private final Semaphore pendingTasksSemaphore = new Semaphore(Short.MAX_VALUE, true); + private int pendingTasks() { return Short.MAX_VALUE - pendingTasksSemaphore.availablePermits(); } + private final F3Screen.NestedMessage f3Message = new F3Screen.NestedMessage(this::f3Log); private final AtomicInteger finishedRequests = new AtomicInteger(); private final AtomicInteger totalRequests = new AtomicInteger(); @@ -52,12 +49,14 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue this.level = level; } - @Override public byte largestDataDetail() + @Override + public byte largestDataDetail() { return LodUtil.BLOCK_DETAIL_LEVEL; } - @Override public CompletableFuture submitGenTask(DhLodPos lodPos, byte requiredDataDetail, IWorldGenTaskTracker tracker) + @Override + public CompletableFuture submitGenTask(DhLodPos lodPos, byte requiredDataDetail, IWorldGenTaskTracker tracker) { LodUtil.assertTrue(lodPos.detailLevel == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed."); DhSectionPos sectionPos = new DhSectionPos(lodPos.detailLevel, lodPos); @@ -69,7 +68,8 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue return entry.future; } - @Override public void runCurrentGenTasksUntilBusy(DhBlockPos2D targetPos) + @Override + public void runCurrentGenTasksUntilBusy(DhBlockPos2D targetPos) { while (generatorClosingFuture == null && networkState.client().isReady() @@ -81,20 +81,42 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue } } + @Override + public void cancelGenTasks(Iterable positions) + { + for (DhSectionPos pos : positions) + { + WorldGenQueueEntry entry = waitingTasks.remove(pos); + if (entry != null) + { + if (entry.request != null) + entry.request.cancel(false); + entry.future.cancel(false); + } + } + } + private void sendNewRequest(DhBlockPos2D targetPos) { - DhSectionPos sectionPos = Objects.requireNonNull(waitingTasks.keySet().stream().reduce(null, (a, b) - -> a != null - && a.getCenter().getCenterBlockPos().distSquared(targetPos) - < b.getCenter().getCenterBlockPos().distSquared(targetPos) - ? a : b)); - WorldGenQueueEntry entry = waitingTasks.remove(sectionPos); + Map.Entry mapEntry = waitingTasks.entrySet().stream() + .filter(task -> task.getValue().request == null) + .reduce(null, (a, b) + -> a != null + && a.getKey().getCenter().getCenterBlockPos().distSquared(targetPos) + < b.getKey().getCenter().getCenterBlockPos().distSquared(targetPos) + ? a : b); + if (mapEntry == null) + { + pendingTasksSemaphore.release(); + return; + } - CompletableFuture request = this.networkState.client().sendRequest(new FullDataSourceRequestMessage(sectionPos)); - pendingRequests.add(request); - request.handle((response, throwable) -> + DhSectionPos sectionPos = mapEntry.getKey(); + WorldGenQueueEntry entry = mapEntry.getValue(); + + entry.request = this.networkState.client().sendRequest(new FullDataSourceRequestMessage(sectionPos)); + entry.request.handle((response, throwable) -> { - pendingRequests.remove(request); pendingTasksSemaphore.release(); finishedRequests.incrementAndGet(); @@ -103,11 +125,20 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue if (throwable != null) throw throwable; + waitingTasks.remove(sectionPos); LOGGER.info("FullDataSourceResponseMessage " + sectionPos); CompleteFullDataSource fullDataSource = response.getFullDataSource(sectionPos, level); + // Check is dimension has been switched - received data may no longer be relevant + if (fullDataSource == null) + throw new CancellationException(); + Consumer chunkDataConsumer = entry.tracker.getChunkDataConsumer(); + // FIXME Who decided it was a good idea to use weak references for cancellation purposes? + if (chunkDataConsumer == null) + return entry.future.cancel(false); + sectionPos.forEachChildAtLevel(LodUtil.CHUNK_DETAIL_LEVEL, childPos -> { ChunkSizedFullDataAccessor accessor = new ChunkSizedFullDataAccessor(new DhChunkPos(childPos.sectionX, childPos.sectionZ)); @@ -123,24 +154,27 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue chunkDataConsumer.accept(accessor); }); - - entry.future.complete(WorldGenResult.CreateSuccess(sectionPos)); } - catch (CancellationException | ChannelException | RateLimitedException e) + catch (ChannelException | RateLimitedException e) { if (e instanceof RateLimitedException) LOGGER.warn("Rate limited by server, re-queueing task ["+sectionPos+"]: "+e.getMessage()); finishedRequests.decrementAndGet(); - waitingTasks.put(sectionPos, entry); + } + catch (CancellationException ignored) + { + finishedRequests.decrementAndGet(); + totalRequests.decrementAndGet(); } catch (Throwable e) { LOGGER.error("Error while fetching full data source", e); failedRequests.incrementAndGet(); - entry.future.complete(WorldGenResult.CreateFail()); + return entry.future.complete(WorldGenResult.CreateFail()); } - return null; + + return entry.future.complete(WorldGenResult.CreateSuccess(sectionPos)); }); } @@ -153,24 +187,24 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue return lines.toArray(new String[0]); } - @Override public CompletableFuture startClosing(boolean cancelCurrentGeneration, boolean alsoInterruptRunning) + @Override + public CompletableFuture startClosing(boolean cancelCurrentGeneration, boolean alsoInterruptRunning) { return this.generatorClosingFuture = CompletableFuture.runAsync(() -> { while (!pendingTasksSemaphore.tryAcquire(Short.MAX_VALUE)) - { - for (CompletableFuture request : pendingRequests) - request.cancel(false); - } - - if (cancelCurrentGeneration) { for (WorldGenQueueEntry entry : this.waitingTasks.values()) + { + if (entry.request != null) + entry.request.cancel(alsoInterruptRunning); entry.future.cancel(alsoInterruptRunning); + } } }); } - @Override public void close() + @Override + public void close() { f3Message.close(); } @@ -179,6 +213,7 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue { public CompletableFuture future; public IWorldGenTaskTracker tracker; + public CompletableFuture request; public WorldGenQueueEntry(CompletableFuture future, IWorldGenTaskTracker tracker) { diff --git a/core/src/main/java/com/seibel/distanthorizons/core/generation/tasks/IWorldGenTaskTracker.java b/core/src/main/java/com/seibel/distanthorizons/core/generation/tasks/IWorldGenTaskTracker.java index d8704cc83..4f7e6bc4e 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/generation/tasks/IWorldGenTaskTracker.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/generation/tasks/IWorldGenTaskTracker.java @@ -2,6 +2,7 @@ package com.seibel.distanthorizons.core.generation.tasks; import com.seibel.distanthorizons.core.dataObjects.fullData.accessor.ChunkSizedFullDataAccessor; +import javax.annotation.Nullable; import java.util.function.Consumer; /** @@ -13,5 +14,6 @@ public interface IWorldGenTaskTracker /** Returns true if the task hasn't been garbage collected. */ boolean isMemoryAddressValid(); + @Nullable Consumer getChunkDataConsumer(); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java index 710337b95..c4b7c7240 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java @@ -29,6 +29,7 @@ import org.apache.logging.log4j.Logger; import javax.annotation.CheckForNull; import java.util.Iterator; +import java.util.Map; import java.util.Set; import java.util.concurrent.*; @@ -43,7 +44,7 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel private final ConcurrentLinkedQueue worldGenLoopingQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentMap incompleteDataSources = new ConcurrentHashMap<>(); - private final ConcurrentMap fullDataRequests = new ConcurrentHashMap<>(); + private final ConcurrentMap fullDataRequests = new ConcurrentHashMap<>(); private final AppliedConfigState rateLimitConfig = new AppliedConfigState<>(Config.Client.Advanced.Multiplayer.serverNetworkingRateLimit); @@ -91,8 +92,8 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel // If this fails, current entry is being drained and need create another one if (entry.requestCollectionSemaphore.tryAcquire()) { - fullDataRequests.put(msg.futureId, msg); - entry.requestMessages.add(msg); + fullDataRequests.put(msg.futureId, entry); + entry.requestMessages.put(msg.futureId, msg); entry.requestCollectionSemaphore.release(); break; } @@ -101,9 +102,20 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel this.eventSource.registerHandler(CancelMessage.class, msg -> { - this.fullDataRequests.remove(msg.futureId); - ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg); - serverPlayerState.pendingFullDataRequests.decrementAndGet(); + IncompleteDataSourceEntry entry = this.fullDataRequests.remove(msg.futureId); + if (entry == null) return; + FullDataSourceRequestMessage requestMessage = entry.requestMessages.remove(msg.futureId); + + remotePlayerConnectionHandler.getConnectedPlayer(msg).pendingFullDataRequests.decrementAndGet(); + + entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE); + if (entry.requestMessages.isEmpty()) + { + incompleteDataSources.remove(requestMessage.dhSectionPos); + serverside.dataFileHandler.removeGenRequestIf(pos -> pos == requestMessage.dhSectionPos); + } + + entry.requestCollectionSemaphore.release(Short.MAX_VALUE); }); } @@ -127,32 +139,32 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel { chunkToLodBuilder.tick(); - for (Iterator it = incompleteDataSources.values().iterator(); it.hasNext(); ) + for (Map.Entry mapEntry : incompleteDataSources.entrySet()) { - IncompleteDataSourceEntry entry = it.next(); - if (entry.fullDataSource == null) continue; + IncompleteDataSourceEntry entry = mapEntry.getValue(); + + if (entry.fullDataSource == null) + continue; if (entry.fullDataSource instanceof IIncompleteFullDataSource) { IIncompleteFullDataSource incompleteSource = (IIncompleteFullDataSource) entry.fullDataSource; - if (!incompleteSource.hasBeenPromoted()) continue; + if (!incompleteSource.hasBeenPromoted()) + continue; entry.fullDataSource = incompleteSource.tryPromotingToCompleteDataSource(); } LodUtil.assertTrue(entry.fullDataSource instanceof CompleteFullDataSource, "Invalid full data source"); + incompleteDataSources.remove(mapEntry.getKey()); - it.remove(); // This semaphore is intentionally acquired forever entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE); CompleteFullDataSource completeSource = (CompleteFullDataSource) entry.fullDataSource; - for (FullDataSourceRequestMessage msg : entry.requestMessages) + for (FullDataSourceRequestMessage msg : entry.requestMessages.values()) { ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg); - if (serverPlayerState == null) continue; - - // Check if cancelled - if (this.fullDataRequests.remove(msg.futureId) == null) + if (serverPlayerState == null) continue; serverPlayerState.pendingFullDataRequests.decrementAndGet(); @@ -242,7 +254,7 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel { @CheckForNull public IFullDataSource fullDataSource; - public final Set requestMessages = ConcurrentHashMap.newKeySet(); + public final ConcurrentMap requestMessages = new ConcurrentHashMap<>(); public final Semaphore requestCollectionSemaphore = new Semaphore(Short.MAX_VALUE, true); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java index 91179edbe..09f9ee576 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/NetworkEventSource.java @@ -8,6 +8,7 @@ import com.seibel.distanthorizons.core.network.messages.CancelMessage; import com.seibel.distanthorizons.core.network.messages.ExceptionMessage; import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage; import com.seibel.distanthorizons.core.network.protocol.NetworkMessage; +import io.netty.channel.ChannelException; import io.netty.channel.ChannelHandlerContext; import org.apache.logging.log4j.Logger; @@ -89,7 +90,8 @@ public abstract class NetworkEventSource CompletableFuture responseFuture = new CompletableFuture<>(); responseFuture.handle((response, throwable) -> { - pendingFutures.remove(ctx, msg.futureId); + if (!(throwable instanceof ChannelException)) + pendingFutures.remove(ctx, msg.futureId); if (throwable instanceof CancellationException) msg.sendResponse(new CancelMessage()); @@ -126,6 +128,6 @@ public abstract class NetworkEventSource public void close() { this.handlers.clear(); - completeAllFuturesExceptionally(new Exception(this.getClass().getSimpleName()+" is closed.")); + completeAllFuturesExceptionally(new ChannelException(this.getClass().getSimpleName()+" is closed.")); } } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/FullDataSourceResponseMessage.java b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/FullDataSourceResponseMessage.java index 6806a2364..d8f550fe7 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/network/messages/FullDataSourceResponseMessage.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/network/messages/FullDataSourceResponseMessage.java @@ -12,6 +12,7 @@ import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataOutputStre import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; +import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -20,6 +21,7 @@ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage private CompleteFullDataSource fullDataSource; private DhServerLevel level; + private int levelHashCode; private CompleteFullDataSourceLoader fullDataSourceLoader; private ByteBuf dataBuffer; @@ -28,6 +30,9 @@ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage { this.fullDataSource = fullDataSource; this.level = level; + + // TODO Multiverse support + this.levelHashCode = level.getLevelWrapper().getDimensionType().getDimensionName().hashCode(); } @Override @@ -39,6 +44,7 @@ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage fullDataSource.writeToStream(dhOutputStream, level); dhOutputStream.flush(); + out.writeInt(levelHashCode); out.writeByte(fullDataSource.getBinaryDataFormatVersion()); out.writeInt(outputStream.size()); out.writeBytes(outputStream.toByteArray()); @@ -48,16 +54,19 @@ public class FullDataSourceResponseMessage extends FutureTrackableNetworkMessage @Override public void decode0(ByteBuf in) { + levelHashCode = in.readInt(); byte dataVersion = in.readByte(); - this.fullDataSourceLoader = (CompleteFullDataSourceLoader) AbstractFullDataSourceLoader.getLoader(CompleteFullDataSource.TYPE_ID, dataVersion); - assert this.fullDataSourceLoader != null; - this.dataBuffer = in.readBytes(in.readInt()); } + @Nullable public CompleteFullDataSource getFullDataSource(DhSectionPos pos, IDhLevel level) throws IOException, InterruptedException { + // TODO Multiverse support + if (levelHashCode != level.getLevelWrapper().getDimensionType().getDimensionName().hashCode()) + return null; + try (ByteBufInputStream inputStream = new ByteBufInputStream(dataBuffer)) { return fullDataSourceLoader.loadData(pos, new DhDataInputStream(inputStream), level);