"Fix" buffer release errors in FullDataPayload

This commit is contained in:
s809
2024-08-23 14:16:20 +05:00
parent 81e6f55dbf
commit f17c3fa267
3 changed files with 46 additions and 49 deletions
@@ -294,22 +294,20 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
}
CompletableFuture.runAsync(() ->
{
try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource))
FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource);
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
{
for (FullDataSourceRequestMessage msg : requestGroup.requestMessages.values())
this.requestGroupsByFutureId.remove(msg.futureId);
ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer());
if (serverPlayerState == null)
{
this.requestGroupsByFutureId.remove(msg.futureId);
ServerPlayerState serverPlayerState = this.remotePlayerConnectionHandler.getConnectedPlayer(msg.serverPlayer());
if (serverPlayerState == null)
{
continue;
}
serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release();
payload.acceptInChunkMessages(FULL_DATA_CHUNK_SIZE, msg.getSession()::sendMessage);
msg.sendResponse(new FullDataSourceResponseMessage(payload.retain()));
continue;
}
serverPlayerState.getRateLimiterSet(this).fullDataRequestConcurrencyLimiter.release();
payload.acceptInChunkMessages(FULL_DATA_CHUNK_SIZE, msg.getSession()::sendMessage);
msg.sendResponse(new FullDataSourceResponseMessage(payload));
}
}, executor);
}
@@ -331,28 +329,26 @@ public class DhServerLevel extends AbstractDhLevel implements IDhServerLevel
}
CompletableFuture.runAsync(() ->
{
try (FullDataPayload payload = new FullDataPayload(data))
FullDataPayload payload = new FullDataPayload(data);
for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers())
{
for (ServerPlayerState serverPlayerState : this.remotePlayerConnectionHandler.getConnectedPlayers())
if (serverPlayerState.serverPlayer().getLevel() != this.serverLevelWrapper)
{
if (serverPlayerState.serverPlayer().getLevel() != this.serverLevelWrapper)
{
continue;
}
if (!serverPlayerState.config.isRealTimeUpdatesEnabled())
{
continue;
}
Vec3d playerPosition = serverPlayerState.serverPlayer().getPosition();
int distanceFromPlayer = DhSectionPos.getManhattanBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16;
if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() &&
distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius())
{
payload.acceptInChunkMessages(FULL_DATA_CHUNK_SIZE, serverPlayerState.session::sendMessage);
serverPlayerState.session.sendMessage(new FullDataPartialUpdateMessage(this.serverLevelWrapper, payload.retain()));
}
continue;
}
if (!serverPlayerState.config.isRealTimeUpdatesEnabled())
{
continue;
}
Vec3d playerPosition = serverPlayerState.serverPlayer().getPosition();
int distanceFromPlayer = DhSectionPos.getManhattanBlockDistance(data.getPos(), new DhBlockPos2D((int) playerPosition.x, (int) playerPosition.z)) / 16;
if (distanceFromPlayer >= serverPlayerState.serverPlayer().getViewDistance() &&
distanceFromPlayer <= serverPlayerState.config.getRenderDistanceRadius())
{
payload.acceptInChunkMessages(FULL_DATA_CHUNK_SIZE, serverPlayerState.session::sendMessage);
serverPlayerState.session.sendMessage(new FullDataPartialUpdateMessage(this.serverLevelWrapper, payload));
}
}
}, executor);
@@ -75,7 +75,7 @@ public interface INetworkObject
default String readString(ByteBuf inputByteBuf)
{
int length = inputByteBuf.readUnsignedShort();
return inputByteBuf.readBytes(length).toString(StandardCharsets.UTF_8);
return inputByteBuf.readSlice(length).toString(StandardCharsets.UTF_8);
}
default <T> void writeCollection(ByteBuf outputByteBuf, Collection<T> collection)
@@ -6,21 +6,26 @@ import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.dataObjects.fullData.sources.FullDataSourceV2;
import com.seibel.distanthorizons.core.network.INetworkObject;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.util.TimerUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.jetbrains.annotations.NotNull;
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
public class FullDataPayload implements INetworkObject, Closeable
public class FullDataPayload implements INetworkObject
{
private static final AtomicInteger lastBufferId = new AtomicInteger();
// Reference counting is unreliable here for some reason so this is a "fix"
private static final Timer bufferCleanupTimer = TimerUtil.CreateTimer("FullDataBufferCleanupTimer");
public int dtoBufferId;
public ByteBuf dtoBuffer;
@@ -39,6 +44,15 @@ public class FullDataPayload implements INetworkObject, Closeable
this.dtoBuffer = ByteBufAllocator.DEFAULT.buffer();
dataSourceDto.encode(this.dtoBuffer);
bufferCleanupTimer.schedule(new TimerTask()
{
@Override
public void run()
{
FullDataPayload.this.dtoBuffer.release();
}
}, 5000L);
}
catch (IOException e)
{
@@ -64,24 +78,11 @@ public class FullDataPayload implements INetworkObject, Closeable
}
}
public FullDataPayload retain()
{
this.dtoBuffer.retain();
return this;
}
@Override public void close()
{
this.dtoBuffer.release();
}
@Override
public void encode(ByteBuf out)
{
out.writeInt(this.dtoBufferId);
this.dtoBuffer.release();
}
@Override