Replace pooled buffers with unpooled

This commit is contained in:
s809
2025-08-07 17:55:22 +05:00
parent a05bd307f9
commit 674fc30e77
8 changed files with 47 additions and 103 deletions
@@ -261,8 +261,7 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
} }
LodUtil.assertTrue(this.beaconBeamRepo != null, "beaconBeamRepo should not be null"); LodUtil.assertTrue(this.beaconBeamRepo != null, "beaconBeamRepo should not be null");
try (FullDataPayload payload = new FullDataPayload(data, this.beaconBeamRepo.getAllBeamsForPos(data.getPos()))) FullDataPayload payload = new FullDataPayload(data, this.beaconBeamRepo.getAllBeamsForPos(data.getPos()));
{
for (ServerPlayerState serverPlayerState : this.serverPlayerStateManager.getReadyPlayers()) for (ServerPlayerState serverPlayerState : this.serverPlayerStateManager.getReadyPlayers())
{ {
if (serverPlayerState.getServerPlayer().getLevel() != this.serverLevelWrapper) if (serverPlayerState.getServerPlayer().getLevel() != this.serverLevelWrapper)
@@ -285,7 +284,6 @@ public abstract class AbstractDhServerLevel extends AbstractDhLevel implements I
}); });
} }
} }
}
}); });
} }
@@ -164,7 +164,7 @@ public class DhClientLevel extends AbstractDhLevel implements IDhClientLevel
} }
try(FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSourceAndReleaseBuffer(message.payload)) try (FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSource(message.payload))
{ {
boolean isSameLevel = message.isSameLevelAs(this.levelWrapper); boolean isSameLevel = message.isSameLevelAs(this.levelWrapper);
NETWORK_LOGGER.debug("Buffer {} isSameLevel: {}", message.payload.dtoBufferId, isSameLevel); NETWORK_LOGGER.debug("Buffer {} isSameLevel: {}", message.payload.dtoBufferId, isSameLevel);
@@ -256,7 +256,7 @@ public abstract class AbstractFullDataNetworkRequestQueue implements IDebugRende
if (response.payload != null) if (response.payload != null)
{ {
FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSourceAndReleaseBuffer(response.payload); FullDataSourceV2DTO dataSourceDto = this.networkState.fullDataPayloadReceiver.decodeDataSource(response.payload);
// set application flags based on the received detail level, // set application flags based on the received detail level,
// this is needed so the data sources propagate correctly // this is needed so the data sources propagate correctly
@@ -9,18 +9,17 @@ import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMe
import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO; import com.seibel.distanthorizons.core.sql.dto.BeaconBeamDTO;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/** /**
* @see FullDataSplitMessage * @see FullDataSplitMessage
*/ */
public class FullDataPayload implements INetworkObject, AutoCloseable public class FullDataPayload implements INetworkObject
{ {
private static final AtomicInteger lastBufferId = new AtomicInteger(); private static final AtomicInteger lastBufferId = new AtomicInteger();
@@ -47,7 +46,7 @@ public class FullDataPayload implements INetworkObject, AutoCloseable
EDhApiDataCompressionMode compressionMode = Config.Common.LodBuilding.dataCompression.get(); EDhApiDataCompressionMode compressionMode = Config.Common.LodBuilding.dataCompression.get();
try (FullDataSourceV2DTO dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode)) try (FullDataSourceV2DTO dataSourceDto = FullDataSourceV2DTO.CreateFromDataSource(fullDataSource, compressionMode))
{ {
this.dtoBuffer = ByteBufAllocator.DEFAULT.buffer(); this.dtoBuffer = Unpooled.buffer();
dataSourceDto.encode(this.dtoBuffer); dataSourceDto.encode(this.dtoBuffer);
} }
} }
@@ -85,12 +84,6 @@ public class FullDataPayload implements INetworkObject, AutoCloseable
// base overrides // // base overrides //
//================// //================//
@Override
public void close()
{
this.dtoBuffer.release();
}
@Override @Override
public String toString() public String toString()
{ {
@@ -9,8 +9,8 @@ import com.seibel.distanthorizons.core.network.INetworkObject;
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage; import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage;
import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO; import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV2DTO;
import com.seibel.distanthorizons.core.util.LodUtil; import com.seibel.distanthorizons.core.util.LodUtil;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import java.util.Objects; import java.util.Objects;
@@ -24,15 +24,7 @@ public class FullDataPayloadReceiver implements AutoCloseable
private final ConcurrentMap<Integer, CompositeByteBuf> buffersById = CacheBuilder.newBuilder() private final ConcurrentMap<Integer, CompositeByteBuf> buffersById = CacheBuilder.newBuilder()
.expireAfterAccess(10, TimeUnit.SECONDS) .expireAfterAccess(10, TimeUnit.SECONDS)
.removalListener((RemovalNotification<Integer, CompositeByteBuf> notification) -> .<Integer, CompositeByteBuf>build().asMap();
{
// If an entry was replaced without removing, the buffer has to be released manually
if (notification.getCause() != RemovalCause.REPLACED)
{
Objects.requireNonNull(notification.getValue()).release();
}
})
.build().asMap();
@Override @Override
public void close() public void close()
@@ -46,13 +38,7 @@ public class FullDataPayloadReceiver implements AutoCloseable
{ {
if (message.isFirst) if (message.isFirst)
{ {
if (composite != null) composite = Unpooled.compositeBuffer();
{
composite.release();
LOGGER.debug("Released existing full data buffer [" + message.bufferId + "]");
}
composite = ByteBufAllocator.DEFAULT.compositeBuffer();
LOGGER.debug("Created new full data buffer [" + message.bufferId + "]: [" + composite + "]"); LOGGER.debug("Created new full data buffer [" + message.bufferId + "]: [" + composite + "]");
} }
else if (composite == null) else if (composite == null)
@@ -67,7 +53,7 @@ public class FullDataPayloadReceiver implements AutoCloseable
}); });
} }
public FullDataSourceV2DTO decodeDataSourceAndReleaseBuffer(FullDataPayload payload) public FullDataSourceV2DTO decodeDataSource(FullDataPayload payload)
{ {
CompositeByteBuf compositeByteBuffer = this.buffersById.get(payload.dtoBufferId); CompositeByteBuf compositeByteBuffer = this.buffersById.get(payload.dtoBufferId);
LodUtil.assertTrue(compositeByteBuffer != null); LodUtil.assertTrue(compositeByteBuffer != null);
@@ -38,12 +38,6 @@ public class FullDataPayloadSender implements AutoCloseable
public void close() public void close()
{ {
this.tickTimerTask.cancel(); this.tickTimerTask.cancel();
PendingTransfer pendingTransfer;
while ((pendingTransfer = this.transferQueue.poll()) != null)
{
pendingTransfer.close();
}
} }
@@ -78,36 +72,25 @@ public class FullDataPayloadSender implements AutoCloseable
if (pendingTransfer.buffer.readableBytes() == 0) if (pendingTransfer.buffer.readableBytes() == 0)
{ {
pendingTransfer.sendFinalMessage.run(); pendingTransfer.sendFinalMessage.run();
pendingTransfer.close();
this.transferQueue.poll(); this.transferQueue.poll();
} }
} }
} }
private static class PendingTransfer implements AutoCloseable private static class PendingTransfer
{ {
public final int bufferId; public final int bufferId;
public final ByteBuf buffer; public final ByteBuf buffer;
public final Runnable sendFinalMessage; public final Runnable sendFinalMessage;
private final AtomicBoolean isClosed = new AtomicBoolean();
private PendingTransfer(FullDataPayload payload, Runnable sendFinalMessage) private PendingTransfer(FullDataPayload payload, Runnable sendFinalMessage)
{ {
this.bufferId = payload.dtoBufferId; this.bufferId = payload.dtoBufferId;
this.buffer = payload.dtoBuffer.retainedDuplicate().readerIndex(0); this.buffer = payload.dtoBuffer.duplicate().readerIndex(0);
this.sendFinalMessage = sendFinalMessage; this.sendFinalMessage = sendFinalMessage;
} }
@Override
public void close()
{
if (this.isClosed.compareAndSet(false, true))
{
this.buffer.release();
}
}
} }
} }
@@ -119,8 +119,7 @@ public class FullDataSourceRequestHandler
} }
// send the found data source to client // send the found data source to client
try (FullDataPayload payload = new FullDataPayload(fullDataSource, this.getAllBeamsForPos(message.sectionPos))) FullDataPayload payload = new FullDataPayload(fullDataSource, this.getAllBeamsForPos(message.sectionPos));
{
fullDataSource.close(); fullDataSource.close();
serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () -> serverPlayerState.fullDataPayloadSender.sendInChunks(payload, () ->
@@ -129,7 +128,6 @@ public class FullDataSourceRequestHandler
rateLimiterSet.syncOnLoginRateLimiter.release(); rateLimiterSet.syncOnLoginRateLimiter.release();
}); });
} }
}
catch (Exception e) catch (Exception e)
{ {
LOGGER.error("Unexpected issue sending request for pos [" + DhSectionPos.toString(message.sectionPos) + "], error: [" + e.getMessage() + "].", e); LOGGER.error("Unexpected issue sending request for pos [" + DhSectionPos.toString(message.sectionPos) + "], error: [" + e.getMessage() + "].", e);
@@ -245,8 +243,7 @@ public class FullDataSourceRequestHandler
} }
CompletableFuture.runAsync(() -> CompletableFuture.runAsync(() ->
{ {
try (FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.getAllBeamsForPos(entry.getKey()))) FullDataPayload payload = new FullDataPayload(requestGroup.fullDataSource, this.getAllBeamsForPos(entry.getKey()));
{
requestGroup.fullDataSource.close(); requestGroup.fullDataSource.close();
for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values()) for (DataSourceRequestGroup.RequestData requestData : requestGroup.requestMessages.values())
@@ -258,7 +255,6 @@ public class FullDataSourceRequestHandler
requestData.rateLimiterSet.generationRequestRateLimiter.release(); requestData.rateLimiterSet.generationRequestRateLimiter.release();
}); });
} }
}
}, executor); }, executor);
} }
} }
@@ -22,8 +22,8 @@ package com.seibel.distanthorizons.core.network.messages.fullData;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload; import com.seibel.distanthorizons.core.multiplayer.fullData.FullDataPayload;
import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage; import com.seibel.distanthorizons.core.network.messages.AbstractNetworkMessage;
import com.seibel.distanthorizons.core.util.TimerUtil;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Timer; import java.util.Timer;
@@ -34,16 +34,10 @@ import java.util.Timer;
*/ */
public class FullDataSplitMessage extends AbstractNetworkMessage public class FullDataSplitMessage extends AbstractNetworkMessage
{ {
private static final long BUFFER_RELEASE_DELAY_MS = 5000L;
public int bufferId; public int bufferId;
public ByteBuf buffer; public ByteBuf buffer;
public boolean isFirst; public boolean isFirst;
// Reference counting is unreliable here for some reason so this is a "fix"
private static final Timer bufferReleaseTimer = TimerUtil.CreateTimer("FullDataBufferCleanupTimer");
private boolean releaseScheduled = false;
//==============// //==============//
// constructors // // constructors //
@@ -72,12 +66,6 @@ public class FullDataSplitMessage extends AbstractNetworkMessage
out.writeBytes(this.buffer.readerIndex(0)); out.writeBytes(this.buffer.readerIndex(0));
out.writeBoolean(this.isFirst); out.writeBoolean(this.isFirst);
if (!this.releaseScheduled)
{
bufferReleaseTimer.schedule(TimerUtil.createTimerTask(this.buffer::release), BUFFER_RELEASE_DELAY_MS);
this.releaseScheduled = true;
}
} }
@Override @Override
@@ -86,7 +74,7 @@ public class FullDataSplitMessage extends AbstractNetworkMessage
this.bufferId = in.readInt(); this.bufferId = in.readInt();
int bufferSize = in.readInt(); int bufferSize = in.readInt();
this.buffer = in.readBytes(bufferSize); this.buffer = Unpooled.copiedBuffer(in.readSlice(bufferSize));
this.isFirst = in.readBoolean(); this.isFirst = in.readBoolean();
} }