data stream cleanup

This commit is contained in:
James Seibel
2025-11-24 14:15:23 -06:00
parent 3349e5b898
commit c8c9df3a34
5 changed files with 78 additions and 80 deletions
@@ -318,8 +318,7 @@ public class FullDataSourceV2DTO
LongArrayList[] inputDataArray, ByteArrayList outputByteArray,
EDhApiDataCompressionMode compressionModeEnum) throws IOException
{
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum))
try (DhDataOutputStream compressedOut = DhDataOutputStream.create(compressionModeEnum, outputByteArray))
{
// write the data
int dataArrayLength = FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH;
@@ -340,21 +339,12 @@ public class FullDataSourceV2DTO
}
}
}
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3));
}
else
{
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
}
private static void readBlobToDataSourceDataArrayV1(
ByteArrayList inputCompressedDataByteArray, LongArrayList[] outputDataLongArray,
EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
try (DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray.toByteArray(), compressionModeEnum))
try (DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray, compressionModeEnum))
{
// read the data
int dataArrayLength = FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH;
@@ -406,8 +396,7 @@ public class FullDataSourceV2DTO
maxZ = FullDataSourceV2.WIDTH-1;
}
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum))
try (DhDataOutputStream compressedOut = DhDataOutputStream.create(compressionModeEnum, outputByteArray))
{
// this method would be simpler if we allocated a bunch of temporary arrays,
// but we're trying to avoid garbage.
@@ -525,15 +514,6 @@ public class FullDataSourceV2DTO
}
}
}
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3));
}
else
{
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
}
private static void readBlobToDataSourceDataArrayV2(
ByteArrayList inputCompressedDataByteArray,
@@ -560,7 +540,7 @@ public class FullDataSourceV2DTO
}
try (DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray.toByteArray(), compressionModeEnum))
try (DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray, compressionModeEnum))
{
// 1. column counts, preallocate
for (int x = minX; x < maxX; x++)
@@ -686,27 +666,17 @@ public class FullDataSourceV2DTO
private static void writeGenerationStepsToBlob(ByteArrayList inputColumnGenStepByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException
{
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum))
try (DhDataOutputStream compressedOut = DhDataOutputStream.create(compressionModeEnum, outputByteArray))
{
for (int i = 0; i < inputColumnGenStepByteArray.size(); i++)
{
compressedOut.writeByte(inputColumnGenStepByteArray.getByte(i));
}
}
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3));
}
else
{
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
}
private static void readBlobToGenerationSteps(ByteArrayList inputCompressedDataByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
try(DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray.toByteArray(), compressionModeEnum))
try(DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray, compressionModeEnum))
{
compressedIn.readFully(outputByteArray.elements(), 0, FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH);
}
@@ -719,27 +689,17 @@ public class FullDataSourceV2DTO
private static void writeWorldCompressionModeToBlob(ByteArrayList inputWorldCompressionModeByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException
{
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum))
try (DhDataOutputStream compressedOut = DhDataOutputStream.create(compressionModeEnum, outputByteArray))
{
for (int i = 0; i < inputWorldCompressionModeByteArray.size(); i++)
{
compressedOut.write(inputWorldCompressionModeByteArray.getByte(i));
}
}
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3));
}
else
{
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
}
private static void readBlobToWorldCompressionMode(ByteArrayList inputCompressedDataByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
try(DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray.toByteArray(), compressionModeEnum))
try(DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray, compressionModeEnum))
{
compressedIn.readFully(outputByteArray.elements(), 0, FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH);
}
@@ -752,24 +712,14 @@ public class FullDataSourceV2DTO
private static void writeDataMappingToBlob(FullDataPointIdMap mapping, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException
{
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try(DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum))
try(DhDataOutputStream compressedOut = DhDataOutputStream.create(compressionModeEnum, outputByteArray))
{
mapping.serialize(compressedOut);
}
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3));
}
else
{
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
}
private static FullDataPointIdMap readBlobToDataMapping(ByteArrayList inputCompressedDataByteArray, long pos, @NotNull ILevelWrapper levelWrapper, EDhApiDataCompressionMode compressionModeEnum) throws IOException, InterruptedException, DataCorruptedException
{
try (DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray.toByteArray(), compressionModeEnum))
try (DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray, compressionModeEnum))
{
FullDataPointIdMap mapping = FullDataPointIdMap.deserialize(compressedIn, pos, levelWrapper);
return mapping;
@@ -577,7 +577,7 @@ public class FullDataSourceV2Repo extends AbstractDhRepo<Long, FullDataSourceV2D
{
ByteArrayList byteArrayList = new ByteArrayList();
putAllBytes(result.getBinaryStream("ColumnGenerationStep"), byteArrayList);
try(DhDataInputStream compressedIn = DhDataInputStream.create(byteArrayList.toByteArray(), compressionModeEnum))
try(DhDataInputStream compressedIn = DhDataInputStream.create(byteArrayList, compressionModeEnum))
{
putAllBytes(compressedIn, outputByteArray);
}
@@ -50,6 +50,16 @@ public class DhDataInputStream extends DataInputStream
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
//=============//
// constructor //
//=============//
public static DhDataInputStream create(ByteArrayList byteArrayList, EDhApiDataCompressionMode compressionMode) throws IOException
{
return create(byteArrayList.toByteArray(), compressionMode);
}
public static DhDataInputStream create(byte[] byteArray, EDhApiDataCompressionMode compressionMode) throws IOException
{
// Z_Std handling compression outside the stream provides a significant performance boost
@@ -103,6 +113,11 @@ public class DhDataInputStream extends DataInputStream
}
//================//
// base overrides //
//================//
@Override
public int read() throws IOException
{
@@ -132,10 +147,6 @@ public class DhDataInputStream extends DataInputStream
}
}
// TODO at one point closing the streams caused errors, is that due to a bug with LZMA streams or some bug in DH's code that was since fixed?
// if streams aren't closed that cause cause higher-than-expected native memory use if the GC decides
// it doesn't want to clear the stream objects
//@Override
//public void close() throws IOException { /* Do nothing. */ }
}
@@ -19,9 +19,11 @@
package com.seibel.distanthorizons.core.util.objects.dataStreams;
import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdOutputStream;
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FrameOutputStream;
import net.jpountz.xxhash.XXHashFactory;
@@ -41,24 +43,40 @@ public class DhDataOutputStream extends DataOutputStream
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
private static final ThreadLocal<ResettableArrayCache> LZMA_RESETTABLE_ARRAY_CACHE_GETTER = ThreadLocal.withInitial(() -> new ResettableArrayCache(new LzmaArrayCache()));
private final ByteArrayList outputByteArray;
private final ByteArrayOutputStream wrappedByteStream;
private final EDhApiDataCompressionMode compressionMode;
public DhDataOutputStream(OutputStream stream, EDhApiDataCompressionMode compressionMode) throws IOException
//=============//
// constructor //
//=============//
/**
* @param outputByteArray where the contents of this stream will be written to when done
*/
public static DhDataOutputStream create(EDhApiDataCompressionMode compressionMode, ByteArrayList outputByteArray) throws IOException
{ return new DhDataOutputStream(new ByteArrayOutputStream(), compressionMode, outputByteArray); }
private DhDataOutputStream(ByteArrayOutputStream wrappedByteStream, EDhApiDataCompressionMode compressionMode, ByteArrayList outputByteArray) throws IOException
{
super(warpStream(new BufferedOutputStream(stream), compressionMode));
super(warpStream(wrappedByteStream, compressionMode));
this.wrappedByteStream = wrappedByteStream;
this.outputByteArray = outputByteArray;
this.compressionMode = compressionMode;
}
private static OutputStream warpStream(OutputStream stream, EDhApiDataCompressionMode compressionMode) throws IOException
private static OutputStream warpStream(ByteArrayOutputStream stream, EDhApiDataCompressionMode compressionMode) throws IOException
{
try
{
switch (compressionMode)
{
case Z_STD:
// Z_Std handling compression outside the stream provides a significant performance boost
case UNCOMPRESSED:
return stream;
case Z_STD:
//return new ZstdOutputStream(stream, 3, true, true);
return stream;
case LZ4:
return new LZ4FrameOutputStream(stream,
LZ4FrameOutputStream.BLOCKSIZE.SIZE_64KB, -1L,
@@ -91,10 +109,29 @@ public class DhDataOutputStream extends DataOutputStream
}
// TODO at one point closing the streams caused errors, is that due to a bug with LZMA streams or some bug in DH's code that was since fixed?
// if streams aren't closed that cause cause higher-than-expected native memory use if the GC decides
// it doesn't want to clear the stream objects
//@Override
//public void close() throws IOException { /* Do nothing. */ }
//================//
// base overrides //
//================//
@Override
public void close() throws IOException
{
super.close();
this.outputByteArray.clear();
if (this.compressionMode == EDhApiDataCompressionMode.Z_STD)
{
this.outputByteArray.addElements(0, Zstd.compress(this.wrappedByteStream.toByteArray(), 3));
}
else
{
this.outputByteArray.addElements(0, this.wrappedByteStream.toByteArray());
}
}
}
+3 -3
View File
@@ -63,8 +63,8 @@ public class VarintTest
private static void testSingleVarint(int value)
{
// write to stream
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (DhDataOutputStream outputStream = new DhDataOutputStream(byteArrayOutputStream, EDhApiDataCompressionMode.UNCOMPRESSED))
ByteArrayList byteArrayList = new ByteArrayList();
try (DhDataOutputStream outputStream = DhDataOutputStream.create(EDhApiDataCompressionMode.UNCOMPRESSED, byteArrayList))
{
int encodedValue = VarintUtil.zigzagEncode(value);
VarintUtil.writeVarint(outputStream, encodedValue); // varint requires zig-zag encoding to function
@@ -77,7 +77,7 @@ public class VarintTest
// read stream
try (DhDataInputStream inputStream = DhDataInputStream.create(byteArrayOutputStream.toByteArray(), EDhApiDataCompressionMode.UNCOMPRESSED))
try (DhDataInputStream inputStream = DhDataInputStream.create(byteArrayList, EDhApiDataCompressionMode.UNCOMPRESSED))
{
int encodedValue = VarintUtil.readVarint(inputStream);
int decodedValue = VarintUtil.zigzagDecode(encodedValue);