Prevent request cancellation deadlock

This commit is contained in:
s809
2024-07-09 14:47:45 +05:00
parent 84e90a7a9b
commit 9e0edd0cf3
@@ -66,8 +66,8 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
private final RemotePlayerConnectionHandler remotePlayerConnectionHandler;
private final ConcurrentLinkedQueue<IServerPlayerWrapper> worldGenLoopingQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentMap<Long, IncompleteDataSourceEntry> incompleteDataSources = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, IncompleteDataSourceEntry> fullDataRequests = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, DataSourceRequestGroup> requestGroupsByPos = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, DataSourceRequestGroup> requestGroupsByFutureId = new ConcurrentHashMap<>();
public DhServerLevel(AbstractSaveStructure saveStructure, IServerLevelWrapper serverLevelWrapper, RemotePlayerConnectionHandler remotePlayerConnectionHandler)
{
@@ -107,18 +107,18 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
while (true)
{
IncompleteDataSourceEntry entry = this.incompleteDataSources.computeIfAbsent(msg.sectionPos, pos ->
DataSourceRequestGroup requestGroup = this.requestGroupsByPos.computeIfAbsent(msg.sectionPos, pos ->
{
IncompleteDataSourceEntry newEntry = new IncompleteDataSourceEntry();
this.trySetGeneratedDataSourceToEntry(newEntry, pos);
return newEntry;
DataSourceRequestGroup newGroup = new DataSourceRequestGroup();
this.tryFulfillDataSourceRequestGroup(newGroup, pos);
return newGroup;
});
// If this fails, current entry is being drained and need to create another one
if (entry.requestCollectionSemaphore.tryAcquire())
// If this fails, current group is being drained and need to create another one
if (requestGroup.requestAddSemaphore.tryAcquire())
{
this.fullDataRequests.put(msg.futureId, entry);
entry.requestMessages.put(msg.futureId, msg);
entry.requestCollectionSemaphore.release();
this.requestGroupsByFutureId.put(msg.futureId, requestGroup);
requestGroup.requestMessages.put(msg.futureId, msg);
requestGroup.requestAddSemaphore.release();
break;
}
}
@@ -162,23 +162,30 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
serverPlayerState.session.registerHandler(CancelMessage.class, msg ->
{
IncompleteDataSourceEntry entry = this.fullDataRequests.remove(msg.futureId);
if (entry == null)
DataSourceRequestGroup requestGroup = this.requestGroupsByFutureId.remove(msg.futureId);
if (requestGroup == null)
{
return;
}
FullDataSourceRequestMessage requestMessage = entry.requestMessages.remove(msg.futureId);
FullDataSourceRequestMessage requestMessage = requestGroup.requestMessages.remove(msg.futureId);
serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release();
entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
if (entry.requestMessages.isEmpty())
if (requestGroup.requestRemoveSemaphore.tryAcquire())
{
this.incompleteDataSources.remove(requestMessage.sectionPos);
this.serverside.fullDataFileHandler.removeRetrievalRequestIf(pos -> pos == requestMessage.sectionPos);
// Prevent adding requests in case request will be removed by cancellation
requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
if (requestGroup.requestMessages.isEmpty())
{
this.requestGroupsByPos.remove(requestMessage.sectionPos);
this.serverside.fullDataFileHandler.removeRetrievalRequestIf(pos -> pos == requestMessage.sectionPos);
}
else
{
requestGroup.requestAddSemaphore.release(Short.MAX_VALUE);
}
}
entry.requestCollectionSemaphore.release(Short.MAX_VALUE);
});
}
@@ -234,19 +241,20 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
this.chunkToLodBuilder.tick();
// Send finished data source requests
for (Map.Entry<Long, IncompleteDataSourceEntry> mapEntry : this.incompleteDataSources.entrySet())
for (Map.Entry<Long, DataSourceRequestGroup> entry : this.requestGroupsByPos.entrySet())
{
IncompleteDataSourceEntry entry = mapEntry.getValue();
DataSourceRequestGroup requestGroup = entry.getValue();
if (entry.fullDataSource == null)
if (requestGroup.fullDataSource == null)
{
continue;
}
this.incompleteDataSources.remove(mapEntry.getKey());
// Prevent adding or removing requests
requestGroup.requestRemoveSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
requestGroup.requestAddSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
// This semaphore is intentionally acquired forever
entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
this.requestGroupsByPos.remove(entry.getKey());
ThreadPoolExecutor executor = ThreadPoolUtil.getNetworkCompressionExecutor();
if (executor == null)
@@ -256,10 +264,10 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
}
CompletableFuture.runAsync(() ->
{
FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(entry.fullDataSource);
for (FullDataSourceRequestMessage msg : entry.requestMessages.values())
FullDataSourceResponseMessage response = new FullDataSourceResponseMessage(requestGroup.fullDataSource);
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
{
this.fullDataRequests.remove(msg.futureId);
this.requestGroupsByFutureId.remove(msg.futureId);
ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer());
if (serverPlayerState == null)
@@ -387,12 +395,12 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
@Override
public boolean hasSkyLight() { return this.serverLevelWrapper.hasSkyLight(); }
private void trySetGeneratedDataSourceToEntry(IncompleteDataSourceEntry entry, long pos)
private void tryFulfillDataSourceRequestGroup(DataSourceRequestGroup requestGroup, long pos)
{
this.serverside.fullDataFileHandler.getAsync(pos).thenAccept(fullDataSource -> {
if (this.serverside.fullDataFileHandler.isFullyGenerated(fullDataSource.columnGenerationSteps))
{
entry.fullDataSource = fullDataSource;
requestGroup.fullDataSource = fullDataSource;
}
else
{
@@ -404,10 +412,10 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
@Override
public void onWorldGenTaskComplete(long pos)
{
IncompleteDataSourceEntry entry = this.incompleteDataSources.get(pos);
if (entry != null)
DataSourceRequestGroup requestGroup = this.requestGroupsByPos.get(pos);
if (requestGroup != null)
{
this.trySetGeneratedDataSourceToEntry(entry, pos);
this.tryFulfillDataSourceRequestGroup(requestGroup, pos);
}
}
@@ -424,12 +432,15 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
messageList.add("["+dimName+"]");
}
private static class IncompleteDataSourceEntry
private static class DataSourceRequestGroup
{
public final ConcurrentMap<Long, FullDataSourceRequestMessage> requestMessages = new ConcurrentHashMap<>();
@CheckForNull
public FullDataSourceV2 fullDataSource;
public final ConcurrentMap<Long, FullDataSourceRequestMessage> requestMessages = new ConcurrentHashMap<>();
public final Semaphore requestCollectionSemaphore = new Semaphore(Short.MAX_VALUE, true);
public final Semaphore requestAddSemaphore = new Semaphore(Short.MAX_VALUE, true);
public final Semaphore requestRemoveSemaphore = new Semaphore(Short.MAX_VALUE, true);
}
}