diff --git a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java index 91b1402ef..42328cd88 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/level/DhServerLevel.java @@ -66,8 +66,8 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel private final RemotePlayerConnectionHandler remotePlayerConnectionHandler; private final ConcurrentLinkedQueue worldGenLoopingQueue = new ConcurrentLinkedQueue<>(); - private final ConcurrentMap incompleteDataSources = new ConcurrentHashMap<>(); - private final ConcurrentMap fullDataRequests = new ConcurrentHashMap<>(); + private final ConcurrentMap requestGroupsByPos = new ConcurrentHashMap<>(); + private final ConcurrentMap requestGroupsByFutureId = new ConcurrentHashMap<>(); public DhServerLevel(AbstractSaveStructure saveStructure, IServerLevelWrapper serverLevelWrapper, RemotePlayerConnectionHandler remotePlayerConnectionHandler) { @@ -107,18 +107,18 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel while (true) { - IncompleteDataSourceEntry entry = this.incompleteDataSources.computeIfAbsent(msg.sectionPos, pos -> + DataSourceRequestGroup requestGroup = this.requestGroupsByPos.computeIfAbsent(msg.sectionPos, pos -> { - IncompleteDataSourceEntry newEntry = new IncompleteDataSourceEntry(); - this.trySetGeneratedDataSourceToEntry(newEntry, pos); - return newEntry; + DataSourceRequestGroup newGroup = new DataSourceRequestGroup(); + this.tryFulfillDataSourceRequestGroup(newGroup, pos); + return newGroup; }); - // If this fails, current entry is being drained and need to create another one - if (entry.requestCollectionSemaphore.tryAcquire()) + // If this fails, current group is being drained and need to create another one + if (requestGroup.requestAddSemaphore.tryAcquire()) { - this.fullDataRequests.put(msg.futureId, entry); - entry.requestMessages.put(msg.futureId, msg); - entry.requestCollectionSemaphore.release(); + this.requestGroupsByFutureId.put(msg.futureId, requestGroup); + requestGroup.requestMessages.put(msg.futureId, msg); + requestGroup.requestAddSemaphore.release(); break; } } @@ -162,23 +162,30 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel serverPlayerState.session.registerHandler(CancelMessage.class, msg -> { - IncompleteDataSourceEntry entry = this.fullDataRequests.remove(msg.futureId); - if (entry == null) + DataSourceRequestGroup requestGroup = this.requestGroupsByFutureId.remove(msg.futureId); + if (requestGroup == null) { return; } - FullDataSourceRequestMessage requestMessage = entry.requestMessages.remove(msg.futureId); + FullDataSourceRequestMessage requestMessage = requestGroup.requestMessages.remove(msg.futureId); serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release(); - entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - if (entry.requestMessages.isEmpty()) + if (requestGroup.requestRemoveSemaphore.tryAcquire()) { - this.incompleteDataSources.remove(requestMessage.sectionPos); - this.serverside.fullDataFileHandler.removeRetrievalRequestIf(pos -> pos == requestMessage.sectionPos); + // Prevent adding requests in case request will be removed by cancellation + requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE); + + if (requestGroup.requestMessages.isEmpty()) + { + this.requestGroupsByPos.remove(requestMessage.sectionPos); + this.serverside.fullDataFileHandler.removeRetrievalRequestIf(pos -> pos == requestMessage.sectionPos); + } + else + { + requestGroup.requestAddSemaphore.release(Short.MAX_VALUE); + } } - - entry.requestCollectionSemaphore.release(Short.MAX_VALUE); }); } @@ -234,19 +241,20 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel this.chunkToLodBuilder.tick(); // Send finished data source requests - for (Map.Entry mapEntry : this.incompleteDataSources.entrySet()) + for (Map.Entry entry : this.requestGroupsByPos.entrySet()) { - IncompleteDataSourceEntry entry = mapEntry.getValue(); + DataSourceRequestGroup requestGroup = entry.getValue(); - if (entry.fullDataSource == null) + if (requestGroup.fullDataSource == null) { continue; } - this.incompleteDataSources.remove(mapEntry.getKey()); + // Prevent adding or removing requests + requestGroup.requestRemoveSemaphore.acquireUninterruptibly(Short.MAX_VALUE); + requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE); - // This semaphore is intentionally acquired forever - entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE); + this.requestGroupsByPos.remove(entry.getKey()); ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor(); if (executor == null) @@ -256,10 +264,10 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel } CompletableFuture.runAsync(() -> { - FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(entry.fullDataSource); - for (FullDataSourceRequestMessage msg : entry.requestMessages.values()) + FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(requestGroup.fullDataSource); + for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values()) { - this.fullDataRequests.remove(msg.futureId); + this.requestGroupsByFutureId.remove(msg.futureId); ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer()); if (serverPlayerState == null) @@ -387,12 +395,12 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel @Override public boolean hasSkyLight() { return this.serverLevelWrapper.hasSkyLight(); } - private void trySetGeneratedDataSourceToEntry(IncompleteDataSourceEntry entry, long pos) + private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos) { this.serverside.fullDataFileHandler.getAsync(pos).thenAccept(fullDataSource -> { if (this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps)) { - entry.fullDataSource = fullDataSource; + requestGroup.fullDataSource = fullDataSource; } else { @@ -404,10 +412,10 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel @Override public void onWorldGenTaskComplete(long pos) { - IncompleteDataSourceEntry entry = this.incompleteDataSources.get(pos); - if (entry != null) + DataSourceRequestGroup requestGroup = this.requestGroupsByPos.get(pos); + if (requestGroup != null) { - this.trySetGeneratedDataSourceToEntry(entry, pos); + this.tryFulfillDataSourceRequestGroup(requestGroup, pos); } } @@ -424,12 +432,15 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel messageList.add("["+dimName+"]"); } - private static class IncompleteDataSourceEntry + private static class DataSourceRequestGroup { + public final ConcurrentMap requestMessages = new ConcurrentHashMap<>(); + @CheckForNull public FullDataSourceV2 fullDataSource; - public final ConcurrentMap requestMessages = new ConcurrentHashMap<>(); - public final Semaphore requestCollectionSemaphore = new Semaphore(Short.MAX_VALUE, true); + + public final Semaphore requestAddSemaphore = new Semaphore(Short.MAX_VALUE, true); + public final Semaphore requestRemoveSemaphore = new Semaphore(Short.MAX_VALUE, true); } } \ No newline at end of file