Manually close compression streams to try reducing GC reliance

This commit is contained in:
James Seibel
2025-09-29 17:21:01 -05:00
parent d33be490a7
commit 12a885aa6e
5 changed files with 96 additions and 84 deletions
@@ -9,6 +9,7 @@ import com.seibel.distanthorizons.core.sql.dto.FullDataSourceV1DTO;
import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
import com.seibel.distanthorizons.core.sql.repo.FullDataSourceV1Repo;
import com.seibel.distanthorizons.core.util.objects.DataCorruptedException;
import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataInputStream;
import com.seibel.distanthorizons.core.util.threading.ThreadPoolUtil;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.apache.logging.log4j.Logger;
@@ -79,7 +80,10 @@ public class FullDataSourceProviderV1<TDhLevel extends IDhLevel>
protected FullDataSourceV1 createDataSourceFromDto(FullDataSourceV1DTO dto) throws InterruptedException, IOException, DataCorruptedException
{
FullDataSourceV1 dataSource = FullDataSourceV1.createEmpty(dto.pos);
dataSource.populateFromStream(dto, dto.getInputStream(), this.level);
try (DhDataInputStream inputStream = dto.getInputStream())
{
dataSource.populateFromStream(dto, inputStream, this.level);
}
return dataSource;
}
@@ -237,63 +237,63 @@ public class FullDataSourceV2DTO
// normally a DhStream should be the topmost stream to prevent closing the stream accidentally,
// but since this stream will be closed immediately after writing anyway, it won't be an issue
DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum);
// write the data
int dataArrayLength = FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH;
for (int xz = 0; xz < dataArrayLength; xz++)
try (DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum))
{
LongArrayList dataColumn = inputDataArray[xz];
// write column length
short columnLength = (dataColumn != null) ? (short) dataColumn.size() : 0;
// a short is used instead of an int because at most we store 4096 vertical slices and a
// short fits that with less wasted spaces vs an int (short has max value of 32,767 vs int's max of 2 billion)
compressedOut.writeShort(columnLength);
// write column data (will be skipped if no data was present)
for (int y = 0; y < columnLength; y++)
// write the data
int dataArrayLength = FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH;
for (int xz = 0; xz < dataArrayLength; xz++)
{
compressedOut.writeLong(dataColumn.getLong(y));
LongArrayList dataColumn = inputDataArray[xz];
// write column length
short columnLength = (dataColumn != null) ? (short) dataColumn.size() : 0;
// a short is used instead of an int because at most we store 4096 vertical slices and a
// short fits that with less wasted spaces vs an int (short has max value of 32,767 vs int's max of 2 billion)
compressedOut.writeShort(columnLength);
// write column data (will be skipped if no data was present)
for (int y = 0; y < columnLength; y++)
{
compressedOut.writeLong(dataColumn.getLong(y));
}
}
// generate the checksum
compressedOut.flush();
byteArrayOutputStream.close();
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
// generate the checksum
compressedOut.flush();
byteArrayOutputStream.close();
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
private static void readBlobToDataSourceDataArray(ByteArrayList inputCompressedDataByteArray, LongArrayList[] outputDataLongArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum);
// read the data
int dataArrayLength = FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH;
for (int xz = 0; xz < dataArrayLength; xz++)
try (DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
{
// read the column length
short dataColumnLength = compressedIn.readShort(); // separate variables are used for debugging and in case validation wants to be added later
if (dataColumnLength < 0)
// read the data
int dataArrayLength = FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH;
for (int xz = 0; xz < dataArrayLength; xz++)
{
throw new DataCorruptedException("Read DataSource Blob data at index ["+xz+"], column length ["+dataColumnLength+"] should be greater than zero.");
}
LongArrayList dataColumn = outputDataLongArray[xz];
ListUtil.clearAndSetSize(dataColumn, dataColumnLength);
// read column data (will be skipped if no data was present)
for (int y = 0; y < dataColumnLength; y++)
{
long dataPoint = compressedIn.readLong();
if (VALIDATE_INPUT_DATAPOINTS)
// read the column length
short dataColumnLength = compressedIn.readShort(); // separate variables are used for debugging and in case validation wants to be added later
if (dataColumnLength < 0)
{
FullDataPointUtil.validateDatapoint(dataPoint);
throw new DataCorruptedException("Read DataSource Blob data at index [" + xz + "], column length [" + dataColumnLength + "] should be greater than zero.");
}
LongArrayList dataColumn = outputDataLongArray[xz];
ListUtil.clearAndSetSize(dataColumn, dataColumnLength);
// read column data (will be skipped if no data was present)
for (int y = 0; y < dataColumnLength; y++)
{
long dataPoint = compressedIn.readLong();
if (VALIDATE_INPUT_DATAPOINTS)
{
FullDataPointUtil.validateDatapoint(dataPoint);
}
dataColumn.set(y, dataPoint);
}
dataColumn.set(y, dataPoint);
}
}
}
@@ -302,23 +302,23 @@ public class FullDataSourceV2DTO
private static void writeGenerationStepsToBlob(ByteArrayList inputColumnGenStepByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException
{
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum);
for (int i = 0; i < inputColumnGenStepByteArray.size(); i++)
try (DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum))
{
compressedOut.writeByte(inputColumnGenStepByteArray.getByte(i));
for (int i = 0; i < inputColumnGenStepByteArray.size(); i++)
{
compressedOut.writeByte(inputColumnGenStepByteArray.getByte(i));
}
compressedOut.flush();
byteArrayOutputStream.close();
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
compressedOut.flush();
byteArrayOutputStream.close();
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
private static void readBlobToGenerationSteps(ByteArrayList inputCompressedDataByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum);
try
try(DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
{
compressedIn.readFully(outputByteArray.elements(), 0, FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH);
}
@@ -332,23 +332,23 @@ public class FullDataSourceV2DTO
private static void writeWorldCompressionModeToBlob(ByteArrayList inputWorldCompressionModeByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException
{
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum);
for (int i = 0; i < inputWorldCompressionModeByteArray.size(); i++)
try (DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum))
{
compressedOut.write(inputWorldCompressionModeByteArray.getByte(i));
for (int i = 0; i < inputWorldCompressionModeByteArray.size(); i++)
{
compressedOut.write(inputWorldCompressionModeByteArray.getByte(i));
}
compressedOut.flush();
byteArrayOutputStream.close();
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
compressedOut.flush();
byteArrayOutputStream.close();
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
private static void readBlobToWorldCompressionMode(ByteArrayList inputCompressedDataByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum);
try
try(DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
{
compressedIn.readFully(outputByteArray.elements(), 0, FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH);
}
@@ -362,21 +362,23 @@ public class FullDataSourceV2DTO
private static void writeDataMappingToBlob(FullDataPointIdMap mapping, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException
{
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum);
mapping.serialize(compressedOut);
compressedOut.flush();
byteArrayOutputStream.close();
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
try(DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum))
{
mapping.serialize(compressedOut);
compressedOut.flush();
byteArrayOutputStream.close();
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
}
private static FullDataPointIdMap readBlobToDataMapping(ByteArrayList compressedMappingByteArray, long pos, @NotNull ILevelWrapper levelWrapper, EDhApiDataCompressionMode compressionModeEnum) throws IOException, InterruptedException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedMappingByteArray.elements());
DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum);
FullDataPointIdMap mapping = FullDataPointIdMap.deserialize(compressedIn, pos, levelWrapper);
return mapping;
try (DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
{
FullDataPointIdMap mapping = FullDataPointIdMap.deserialize(compressedIn, pos, levelWrapper);
return mapping;
}
}
@@ -410,10 +410,9 @@ public class FullDataSourceV2Repo extends AbstractDhRepo<Long, FullDataSourceV2D
byte compressionModeEnumValue = result.getByte("CompressionMode");
EDhApiDataCompressionMode compressionModeEnum = EDhApiDataCompressionMode.getFromValue(compressionModeEnumValue);
try
// decompress the data
try(DhDataInputStream compressedIn = new DhDataInputStream(result.getBinaryStream("ColumnGenerationStep"), compressionModeEnum))
{
// decompress the data
DhDataInputStream compressedIn = new DhDataInputStream(result.getBinaryStream("ColumnGenerationStep"), compressionModeEnum);
putAllBytes(compressedIn, outputByteArray);
}
catch (IOException e)
@@ -113,7 +113,10 @@ public class DhDataInputStream extends DataInputStream
}
}
@Override
public void close() throws IOException { /* Do nothing. */ }
// TODO at one point closing the streams caused errors, is that due to a bug with LZMA streams or some bug in DH's code that was since fixed?
// if streams aren't closed that cause cause higher-than-expected native memory use if the GC decides
// it doesn't want to clear the stream objects
//@Override
//public void close() throws IOException { /* Do nothing. */ }
}
@@ -88,7 +88,11 @@ public class DhDataOutputStream extends DataOutputStream
}
}
@Override
public void close() throws IOException { /* Do nothing. */ }
// TODO at one point closing the streams caused errors, is that due to a bug with LZMA streams or some bug in DH's code that was since fixed?
// if streams aren't closed that cause cause higher-than-expected native memory use if the GC decides
// it doesn't want to clear the stream objects
//@Override
//public void close() throws IOException { /* Do nothing. */ }
}