Move request handling to another class and rewrite group locking logic

This commit is contained in:
s809
2024-11-13 18:19:22 +05:00
parent 15b2d56d8c
commit 3d11a208d7
3 changed files with 385 additions and 253 deletions
@@ -4,21 +4,18 @@ import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.file.fullDatafile.FullDataSourceProviderV2;
import com.seibel.distanthorizons.core.file.structure.ISaveStructure;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.logging.f3.F3Screen;
import com.seibel.distanthorizons.core.multiplayer.server.FullDataSourceRequestHandler;
import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerState;
import com.seibel.distanthorizons.core.multiplayer.server.ServerPlayerStateManager;
import com.seibel.distanthorizons.core.network.exceptions.InvalidLevelException;
import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException;
import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException;
import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage;
import com.seibel.distanthorizons.core.network.messages.AbstractTrackableMessage;
import com.seibel.distanthorizons.core.network.messages.ILevelRelatedMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataPartialUpdateMessage;
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.requests.CancelMessage;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.pos.blockPos.DhBlockPos2D;
@@ -28,21 +25,16 @@ import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;
public abstract class AbstractDhServerLevel extends AbstractDhLevel implements IDhServerLevel
{
protected static final Logger LOGGER = DhLoggerBuilder.getLogger();
private static final ConfigBasedLogger NETWORK_LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Common.Logging.logNetworkEvent.get());
public final ServerLevelModule serverside;
protected final IServerLevelWrapper serverLevelWrapper;
@@ -56,11 +48,9 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
*/
protected final ConcurrentLinkedQueue<IServerPlayerWrapper> worldGenPlayerCenteringQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentMap<Long, DataSourceRequestGroup> requestGroupByPos = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, DataSourceRequestGroup> requestGroupByFutureId = new ConcurrentHashMap<>();
private final FullDataSourceRequestHandler requestHandler = new FullDataSourceRequestHandler(this);
private final boolean generatorSupportsNSizedGeneration = false;
private final boolean NSizedGenerationSupported = false;
//=============//
@@ -104,52 +94,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
@Override
public void serverTick()
{
// Send finished data source requests
for (Map.Entry<Long, DataSourceRequestGroup> entry : this.requestGroupByPos.entrySet())
{
DataSourceRequestGroup requestGroup = entry.getValue();
if (requestGroup.fullDataSource == null)
{
continue;
}
NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Fulfilled request group [" + entry.getKey() + "]");
// Make this group unavailable for adding into
this.requestGroupByPos.remove(entry.getKey());
requestGroup.requestRemoveSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
continue;
}
CompletableFuture.runAsync(() ->
{
Objects.requireNonNull(this.beaconBeamRepo);
try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(entry.getKey())))
{
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
{
this.requestGroupByFutureId.remove(msg.futureId);
ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer());
if (serverPlayerState == null)
{
continue;
}
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> {
msg.sendResponse(new FullDataSourceResponseMessage(payload));
serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release();
});
}
}
}, executor);
}
this.requestHandler.tick();
}
@Override
@@ -188,149 +133,35 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
{
serverPlayerState.networkSession.registerHandler(FullDataSourceRequestMessage.class, (message) ->
{
if (!this.messagePlayerInThisLevel(message))
if (!this.validatePlayerInCurrentLevel(message))
{
// we can't handle players in other levels, don't continue
return;
}
ServerPlayerState.RateLimiterSet rateLimiterSet = serverPlayerState.getRateLimiterSet(this);
LOGGER.info("received message ["+DhSectionPos.toString(message.sectionPos)+"]");
if (message.clientTimestamp == null)
{
this.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet);
this.requestHandler.queueWorldGenForRequestMessage(serverPlayerState, message, rateLimiterSet);
}
else
{
this.queueLodSyncForRequestMessage(serverPlayerState, message, rateLimiterSet);
this.requestHandler.queueLodSyncForRequestMessage(serverPlayerState, message, rateLimiterSet);
}
});
serverPlayerState.networkSession.registerHandler(CancelMessage.class, msg ->
{
DataSourceRequestGroup requestGroup = this.requestGroupByFutureId.remove(msg.futureId);
if (requestGroup == null)
{
return;
}
// If this fails, the group is being removed and completing cancellation is not necessary
if (requestGroup.requestRemoveSemaphore.tryAcquire())
{
// Prevent adding requests in case the group will be removed by this cancellation
requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
requestGroup.requestRemoveSemaphore.release();
serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release();
FullDataSourceRequestMessage requestMessage = requestGroup.requestMessages.remove(msg.futureId);
if (requestGroup.requestMessages.isEmpty())
{
NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Cancelled request group [" + DhSectionPos.toString(requestMessage.sectionPos) + "].");
this.requestGroupByPos.remove(requestMessage.sectionPos);
this.serverside.fullDataFileHandler.removeRetrievalRequestIf(pos -> pos == requestMessage.sectionPos);
}
else
{
requestGroup.requestAddSemaphore.release(Short.MAX_VALUE);
}
}
this.requestHandler.cancelRequest(msg.futureId);
});
}
private void queueLodSyncForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
{
if (!serverPlayerState.sessionConfig.getSynchronizeOnLoad())
{
message.sendResponse(new RequestRejectedException("Operation is disabled in config."));
return;
}
if (!rateLimiterSet.syncOnLoginRateLimiter.tryAcquire(message))
{
return;
}
// the client timestamp will be null if we want to retrieve the LOD regardless of when it was last updated
long clientTimestamp = (message.clientTimestamp != null) ? message.clientTimestamp : -1;
// the server timestamp will be null if no LOD data exists for this position
Long serverTimestamp = this.serverside.fullDataFileHandler.getTimestampForPos(message.sectionPos);
if (serverTimestamp == null
|| serverTimestamp <= clientTimestamp)
{
// either no data exists to sync, or the client is already up to date
rateLimiterSet.syncOnLoginRateLimiter.release();
message.sendResponse(new FullDataSourceResponseMessage(null));
return;
}
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
// shouldn't normally happen, but just in case
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
return;
}
this.serverside.fullDataFileHandler.getAsync(message.sectionPos).thenAcceptAsync(fullDataSource ->
{
Objects.requireNonNull(this.beaconBeamRepo);
try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.beaconBeamRepo.getAllBeamsForPos(message.sectionPos)))
{
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () ->
{
message.sendResponse(new FullDataSourceResponseMessage(payload));
rateLimiterSet.syncOnLoginRateLimiter.release();
});
}
}, executor);
}
private void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
{
if (!serverPlayerState.sessionConfig.isDistantGenerationEnabled())
{
message.sendResponse(new RequestRejectedException("Operation is disabled in config."));
return;
}
if (!rateLimiterSet.generationRequestRateLimiter.tryAcquire(message))
{
return;
}
while (true)
{
DataSourceRequestGroup requestGroup = this.requestGroupByPos.computeIfAbsent(message.sectionPos, pos ->
{
DataSourceRequestGroup newGroup = new DataSourceRequestGroup();
this.tryFulfillDataSourceRequestGroup(newGroup, pos);
NETWORK_LOGGER.debug("[" + this.serverLevelWrapper.getDhIdentifier() + "] Created request group for pos [" + DhSectionPos.toString(pos) + "].");
return newGroup;
});
// If this fails, loop until either a permit is acquired or the group is removed to create another one
if (!requestGroup.requestAddSemaphore.tryAcquire())
{
Thread.yield();
continue;
}
this.requestGroupByFutureId.put(message.futureId, requestGroup);
requestGroup.requestMessages.put(message.futureId, message);
requestGroup.requestAddSemaphore.release();
break;
}
}
/** May send an error message in response if the message is a {@link AbstractTrackableMessage} */
private <T extends AbstractNetworkMessage> boolean messagePlayerInThisLevel(T message)
private <T extends AbstractNetworkMessage> boolean validatePlayerInCurrentLevel(T message)
{
if (!(message instanceof ILevelRelatedMessage))
{
@@ -374,57 +205,12 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
// world gen //
//===========//
public boolean isNSizedGenerationSupported() { return this.NSizedGenerationSupported; }
@Override
public void onWorldGenTaskComplete(long pos)
{
DataSourceRequestGroup requestGroup = this.requestGroupByPos.get(pos);
if (requestGroup != null)
{
requestGroup.worldGenTaskComplete = true;
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
}
}
private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos)
{
this.serverside.fullDataFileHandler.getAsync(pos).thenAccept(fullDataSource ->
{
if (this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps))
{
LOGGER.info("sending - complete ["+DhSectionPos.toString(pos)+"]");
requestGroup.fullDataSource = fullDataSource;
}
else if (!this.generatorSupportsNSizedGeneration && DhSectionPos.getDetailLevel(pos) > DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL)
{
// Make this group unavailable for adding into
this.requestGroupByPos.remove(pos);
requestGroup.requestRemoveSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
{
this.requestGroupByFutureId.remove(msg.futureId);
ServerPlayerState serverPlayerState = this.serverPlayerStateManager.getConnectedPlayer(msg.serverPlayer());
if (serverPlayerState != null)
{
serverPlayerState.getRateLimiterSet(this).generationRequestRateLimiter.release();
}
msg.sendResponse(new SectionRequiresSplittingException());
}
}
else if (requestGroup.worldGenTaskComplete)
{
LOGGER.info("sending - retry ["+DhSectionPos.toString(pos)+"]");
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
}
else
{
LOGGER.info("sending - queueing ["+DhSectionPos.toString(pos)+"]");
this.serverside.fullDataFileHandler.queuePositionForRetrieval(pos);
}
});
this.requestHandler.onWorldGenTaskComplete(pos);
}
@@ -564,31 +350,4 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
LOGGER.info("Closed DHLevel for [" + this.getLevelWrapper() + "].");
}
//================//
// helper classes //
//================//
private static class DataSourceRequestGroup
{
public final ConcurrentMap<Long, FullDataSourceRequestMessage> requestMessages = new ConcurrentHashMap<>();
/** If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps */
boolean worldGenTaskComplete = false;
@CheckForNull
public FullDataSourceV2 fullDataSource = null;
/**
* These two Semaphores are used to prevent all threads from locking on the group after it being fulfilled,
* as opposed to ReentrantReadWriteLocks which would allow the locking thread continue using it anyway. <br>
* Short.MAX_VALUE is chosen as a large enough number so non-exclusive accesses never block each other.
*/
public final Semaphore requestAddSemaphore = new Semaphore(Short.MAX_VALUE, true);
/** @see DataSourceRequestGroup#requestAddSemaphore */
public final Semaphore requestRemoveSemaphore = new Semaphore(Short.MAX_VALUE, true);
}
}
@@ -0,0 +1,109 @@
package com.seibel.distanthorizons.core.multiplayer.server;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage;
import javax.annotation.CheckForNull;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
class DataSourceRequestGroup
{
public final long pos;
/**
* If this variable is true, we definitely know that generation is complete and there's no need for checking column gen steps
*/
private boolean worldGenTaskComplete = false;
void markWorldGenTaskComplete()
{
this.worldGenTaskComplete = true;
}
boolean isWorldGenTaskComplete()
{
return this.worldGenTaskComplete;
}
@CheckForNull
public FullDataSourceV2 fullDataSource = null;
public final ConcurrentMap<Long, RequestData> requestMessages = new ConcurrentHashMap<>();
public final Semaphore pendingAdditionSemaphore = new Semaphore(Short.MAX_VALUE, true);
public final AtomicBoolean isClosed = new AtomicBoolean();
DataSourceRequestGroup(long pos)
{
this.pos = pos;
}
public boolean tryClose()
{
if (!this.isClosed.compareAndSet(false, true))
{
return false;
}
this.pendingAdditionSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
return true;
}
public boolean tryAddRequest(RequestData requestData)
{
if (!this.pendingAdditionSemaphore.tryAcquire())
{
return false;
}
this.requestMessages.put(requestData.futureId(), requestData);
this.pendingAdditionSemaphore.release();
return true;
}
public RequestData tryRemoveRequest(long requestId, HangingRequestTransferConsumer hangingRequestTransferConsumer)
{
RequestData removed = this.requestMessages.remove(requestId);
if (this.requestMessages.isEmpty() && this.tryClose())
{
hangingRequestTransferConsumer.accept(this.requestMessages.values());
}
return removed;
}
static class RequestData
{
public final ServerPlayerState serverPlayerState;
public final ServerPlayerState.RateLimiterSet rateLimiterSet;
public final FullDataSourceRequestMessage message;
public long futureId() { return this.message.futureId; }
public long sectionPos() { return this.message.sectionPos; }
RequestData(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
{
this.serverPlayerState = serverPlayerState;
this.rateLimiterSet = rateLimiterSet;
this.message = message;
}
}
/**
* While closing this group, some requests may slip through and end up lost. <br>
* This is a workaround that allows the caller to transfer these requests to a new group.
*/
@FunctionalInterface
interface HangingRequestTransferConsumer extends Consumer<Collection<RequestData>> { }
}
@@ -0,0 +1,264 @@
package com.seibel.distanthorizons.core.multiplayer.server;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.file.fullDatafile.GeneratedFullDataSourceProvider;
import com.seibel.distanthorizons.core.level.AbstractDhServerLevel;
import com.seibel.distanthorizons.core.logging.ConfigBasedLogger;
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
import com.seibel.distanthorizons.core.network.exceptions.RequestRejectedException;
import com.seibel.distanthorizons.core.network.exceptions.SectionRequiresSplittingException;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import org.apache.logging.log4j.LogManager;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
public class FullDataSourceRequestHandler
{
private static final ConfigBasedLogger LOGGER = new ConfigBasedLogger(LogManager.getLogger(),
() -> Config.Common.Logging.logNetworkEvent.get());
private final AbstractDhServerLevel serverLevel;
private String getLevelIdentifier() { return this.serverLevel.getLevelWrapper().getDhIdentifier(); }
private GeneratedFullDataSourceProvider fullDataSourceProvider() { return this.serverLevel.serverside.fullDataFileHandler; }
private List<BeaconBeamDTO> getAllBeamsForPos(long pos) { return this.serverLevel.beaconBeamRepo.getAllBeamsForPos(pos); }
private final ConcurrentMap<Long, DataSourceRequestGroup> requestGroupsByPos = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, DataSourceRequestGroup> requestGroupsByFutureId = new ConcurrentHashMap<>();
public FullDataSourceRequestHandler(AbstractDhServerLevel serverLevel)
{
this.serverLevel = serverLevel;
}
//==================//
// network handling //
//==================//
public void queueLodSyncForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
{
if (!serverPlayerState.sessionConfig.getSynchronizeOnLoad())
{
message.sendResponse(new RequestRejectedException("Operation is disabled in config."));
return;
}
if (!rateLimiterSet.syncOnLoginRateLimiter.tryAcquire(message))
{
return;
}
// the client timestamp will be null if we want to retrieve the LOD regardless of when it was last updated
long clientTimestamp = (message.clientTimestamp != null) ? message.clientTimestamp : -1;
// the server timestamp will be null if no LOD data exists for this position
Long serverTimestamp = this.fullDataSourceProvider().getTimestampForPos(message.sectionPos);
if (serverTimestamp == null
|| serverTimestamp <= clientTimestamp)
{
// either no data exists to sync, or the client is already up to date
rateLimiterSet.syncOnLoginRateLimiter.release();
message.sendResponse(new FullDataSourceResponseMessage(null));
return;
}
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
// shouldn't normally happen, but just in case
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
return;
}
this.fullDataSourceProvider().getAsync(message.sectionPos).thenAcceptAsync(fullDataSource ->
{
try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.getAllBeamsForPos(message.sectionPos)))
{
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () ->
{
message.sendResponse(new FullDataSourceResponseMessage(payload));
rateLimiterSet.syncOnLoginRateLimiter.release();
});
}
}, executor);
}
public void queueWorldGenForRequestMessage(ServerPlayerState serverPlayerState, FullDataSourceRequestMessage message, ServerPlayerState.RateLimiterSet rateLimiterSet)
{
if (!serverPlayerState.sessionConfig.isDistantGenerationEnabled())
{
message.sendResponse(new RequestRejectedException("Operation is disabled in config."));
return;
}
if (!rateLimiterSet.generationRequestRateLimiter.tryAcquire(message))
{
return;
}
this.doQueueWorldGenForRequestMessage(new DataSourceRequestGroup.RequestData(serverPlayerState, message, rateLimiterSet));
}
private void doQueueWorldGenForRequestMessage(DataSourceRequestGroup.RequestData requestData)
{
while (true)
{
DataSourceRequestGroup requestGroup = this.requestGroupsByPos.computeIfAbsent(requestData.sectionPos(), pos ->
{
DataSourceRequestGroup newGroup = new DataSourceRequestGroup(pos);
this.tryFulfillDataSourceRequestGroup(newGroup, pos);
LOGGER.debug("[" + this.getLevelIdentifier() + "] Created request group for pos [" + DhSectionPos.toString(pos) + "].");
return newGroup;
});
// If this fails, loop until either a permit is acquired or the group is removed to create another one
if (!requestGroup.tryAddRequest(requestData))
{
Thread.yield();
continue;
}
this.requestGroupsByFutureId.put(requestData.futureId(), requestGroup);
break;
}
}
public void cancelRequest(long requestId)
{
DataSourceRequestGroup requestGroup = this.requestGroupsByFutureId.remove(requestId);
if (requestGroup == null)
{
return;
}
DataSourceRequestGroup.RequestData removedRequest = requestGroup.tryRemoveRequest(requestId, requestsToTransfer ->
{
LOGGER.debug("[" + this.getLevelIdentifier() + "] Cancelled request group [" + DhSectionPos.toString(requestGroup.pos) + "].");
this.requestGroupsByPos.remove(requestGroup.pos);
if (!requestsToTransfer.isEmpty())
{
for (DataSourceRequestGroup.RequestData requestToTransfer : requestsToTransfer)
{
this.doQueueWorldGenForRequestMessage(requestToTransfer);
}
}
else
{
this.fullDataSourceProvider().removeRetrievalRequestIf(pos -> pos == requestGroup.pos);
}
});
if (removedRequest != null)
{
removedRequest.rateLimiterSet.generationRequestRateLimiter.release();
}
}
public void tick()
{
// Send finished data source requests
for (Map.Entry<Long, DataSourceRequestGroup> entry : this.requestGroupsByPos.entrySet())
{
DataSourceRequestGroup requestGroup = entry.getValue();
if (requestGroup.fullDataSource == null)
{
continue;
}
LOGGER.debug("[" + this.getLevelIdentifier() + "] Fulfilled request group [" + entry.getKey() + "]");
// Make this group unavailable for adding into
this.requestGroupsByPos.remove(entry.getKey());
if (!requestGroup.tryClose())
{
continue;
}
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
{
LOGGER.warn("Unable to send FullDataSourceResponseMessage - getNetworkCompressionExecutor() is null");
continue;
}
CompletableFuture.runAsync(() ->
{
try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.getAllBeamsForPos(entry.getKey())))
{
for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values())
{
this.requestGroupsByFutureId.remove(requestData.futureId());
requestData.serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> {
requestData.message.sendResponse(new FullDataSourceResponseMessage(payload));
requestData.rateLimiterSet.generationRequestRateLimiter.release();
});
}
}
}, executor);
}
}
private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos)
{
this.fullDataSourceProvider().getAsync(pos).thenAccept(fullDataSource ->
{
if (this.fullDataSourceProvider().isFullyGenerated(fullDataSource.columnGenerationSteps))
{
LOGGER.info("sending - complete [" + DhSectionPos.toString(pos) + "]");
requestGroup.fullDataSource = fullDataSource;
}
else if (!this.serverLevel.isNSizedGenerationSupported() && DhSectionPos.getDetailLevel(pos) > DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL)
{
// Make this group unavailable for adding into
this.requestGroupsByPos.remove(pos);
if (!requestGroup.tryClose())
{
return;
}
for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values())
{
this.requestGroupsByFutureId.remove(requestData.futureId());
requestData.rateLimiterSet.generationRequestRateLimiter.release();
requestData.message.sendResponse(new SectionRequiresSplittingException());
}
}
else if (requestGroup.isWorldGenTaskComplete())
{
LOGGER.info("sending - retry [" + DhSectionPos.toString(pos) + "]");
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
}
else
{
LOGGER.info("sending - queueing [" + DhSectionPos.toString(pos) + "]");
this.fullDataSourceProvider().queuePositionForRetrieval(pos);
}
});
}
public void onWorldGenTaskComplete(long pos)
{
DataSourceRequestGroup requestGroup = this.requestGroupsByPos.get(pos);
if (requestGroup != null)
{
requestGroup.markWorldGenTaskComplete();
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
}
}
}