Avoid losing requests due to concurrency

This commit is contained in:
s809
2023-07-31 17:42:14 +05:00
parent 95d721e1a3
commit 6d1f9803ce
@@ -2,8 +2,6 @@ package com.seibel.distanthorizons.core.world;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.CompleteFullDataSource;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IFullDataSource;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.interfaces.IIncompleteFullDataSource;
@@ -14,13 +12,13 @@ import com.seibel.distanthorizons.core.level.IDhLevel;
import com.seibel.distanthorizons.core.network.NetworkServer;
import com.seibel.distanthorizons.core.network.messages.*;
import com.seibel.distanthorizons.core.network.objects.RemotePlayer;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import com.seibel.distanthorizons.core.util.LodUtil;
import com.seibel.distanthorizons.core.wrapperInterfaces.misc.IServerPlayerWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.IServerLevelWrapper;
import com.seibel.distanthorizons.coreapi.util.math.Vec3d;
import io.netty.channel.ChannelHandlerContext;
import javax.annotation.CheckForNull;
@@ -38,7 +36,7 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
private final BiMap<ChannelHandlerContext, RemotePlayer> playersByConnection;
private final ConcurrentLinkedQueue<IServerPlayerWrapper> worldGenLoopingQueue = new ConcurrentLinkedQueue<>();
private ConcurrentMap<DhSectionPos, IncompleteDataSourceEntry> incompleteDataSources = new ConcurrentHashMap<>();
private final ConcurrentMap<DhSectionPos, IncompleteDataSourceEntry> incompleteDataSources = new ConcurrentHashMap<>();
@@ -106,13 +104,23 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
DhServerLevel level = this.getLevel(playersByConnection.get(msg.getChannelContext()).serverPlayer.getLevel());
GeneratedFullDataFileHandler handler = level.serverside.dataFileHandler;
incompleteDataSources.computeIfAbsent(msg.dhSectionPos, pos -> {
IncompleteDataSourceEntry entry = new IncompleteDataSourceEntry();
handler.read(msg.dhSectionPos).thenAccept(fullDataSource -> {
entry.fullDataSource = fullDataSource;
while (true)
{
IncompleteDataSourceEntry entry = incompleteDataSources.computeIfAbsent(msg.dhSectionPos, pos -> {
IncompleteDataSourceEntry newEntry = new IncompleteDataSourceEntry();
handler.read(msg.dhSectionPos).thenAccept(fullDataSource -> {
newEntry.fullDataSource = fullDataSource;
});
return newEntry;
});
return entry;
}).requestMessages.add(msg);
// If this fails, current entry is being drained and need create another one
if (entry.requestCollectionSemaphore.tryAcquire())
{
entry.requestMessages.add(msg);
entry.requestCollectionSemaphore.release();
return;
}
}
});
}
@@ -183,6 +191,8 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
for (Iterator<IncompleteDataSourceEntry> it = incompleteDataSources.values().iterator(); it.hasNext(); )
{
IncompleteDataSourceEntry entry = it.next();
if (entry.fullDataSource == null) continue;
if (entry.fullDataSource instanceof IIncompleteFullDataSource)
{
IIncompleteFullDataSource incompleteSource = (IIncompleteFullDataSource) entry.fullDataSource;
@@ -190,12 +200,13 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
entry.fullDataSource = incompleteSource.tryPromotingToCompleteDataSource();
}
if (!(entry.fullDataSource instanceof CompleteFullDataSource))
LodUtil.assertNotReach("Invalid full data source");
it.remove();
CompleteFullDataSource completeSource = (CompleteFullDataSource) entry.fullDataSource;
LodUtil.assertTrue(entry.fullDataSource instanceof CompleteFullDataSource, "Invalid full data source");
it.remove();
// This semaphore is intentionally acquired forever
entry.requestCollectionSemaphore.acquireUninterruptibly(Short.MAX_VALUE);
CompleteFullDataSource completeSource = (CompleteFullDataSource) entry.fullDataSource;
for (FullDataSourceRequestMessage msg : entry.requestMessages)
{
RemotePlayer remotePlayer = playersByConnection.get(msg.getChannelContext());
@@ -217,7 +228,7 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
}
this.worldGenLoopingQueue.add(firstPlayer);
com.seibel.distanthorizons.coreapi.util.math.Vec3d position = firstPlayer.getPosition();
Vec3d position = firstPlayer.getPosition();
level.doWorldGen(new DhBlockPos2D((int) position.x, (int) position.z));
});
}
@@ -247,7 +258,8 @@ public class DhServerWorld extends AbstractDhWorld implements IDhServerWorld
{
@CheckForNull
public IFullDataSource fullDataSource;
public Set<FullDataSourceRequestMessage> requestMessages = ConcurrentHashMap.newKeySet();
public final Set<FullDataSourceRequestMessage> requestMessages = ConcurrentHashMap.newKeySet();
public final Semaphore requestCollectionSemaphore = new Semaphore(Short.MAX_VALUE, true);
}
}