clean up DhDataInputStream

This commit is contained in:
James Seibel
2025-11-24 13:51:48 -06:00
parent ed7511ff6a
commit 3349e5b898
5 changed files with 39 additions and 68 deletions
@@ -69,8 +69,7 @@ public class FullDataSourceV1DTO implements IBaseDTO<Long>
/** @return a stream for the data contained in this DTO. */
public DhDataInputStream getInputStream() throws IOException
{
InputStream inputStream = new ByteArrayInputStream(this.dataArray);
DhDataInputStream compressedStream = new DhDataInputStream(inputStream, EDhApiDataCompressionMode.LZ4); // LZ4 was used by DH before 2.1.0 and as such must be used until the render data format is changed to record the compressor
DhDataInputStream compressedStream = DhDataInputStream.create(this.dataArray, EDhApiDataCompressionMode.LZ4); // LZ4 was used by DH before 2.1.0 and as such must be used until the render data format is changed to record the compressor
return compressedStream;
}
@@ -354,17 +354,7 @@ public class FullDataSourceV2DTO
ByteArrayList inputCompressedDataByteArray, LongArrayList[] outputDataLongArray,
EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream;
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray()));
}
else
{
byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
}
try (DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
try (DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray.toByteArray(), compressionModeEnum))
{
// read the data
int dataArrayLength = FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH;
@@ -569,17 +559,8 @@ public class FullDataSourceV2DTO
maxZ = FullDataSourceV2.WIDTH-1;
}
ByteArrayInputStream byteArrayInputStream;
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray()));
}
else
{
byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
}
try (DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
try (DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray.toByteArray(), compressionModeEnum))
{
// 1. column counts, preallocate
for (int x = minX; x < maxX; x++)
@@ -725,17 +706,7 @@ public class FullDataSourceV2DTO
}
private static void readBlobToGenerationSteps(ByteArrayList inputCompressedDataByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream;
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray()));
}
else
{
byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
}
try(DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
try(DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray.toByteArray(), compressionModeEnum))
{
compressedIn.readFully(outputByteArray.elements(), 0, FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH);
}
@@ -768,17 +739,7 @@ public class FullDataSourceV2DTO
}
private static void readBlobToWorldCompressionMode(ByteArrayList inputCompressedDataByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream;
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray()));
}
else
{
byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
}
try(DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
try(DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray.toByteArray(), compressionModeEnum))
{
compressedIn.readFully(outputByteArray.elements(), 0, FullDataSourceV2.WIDTH * FullDataSourceV2.WIDTH);
}
@@ -808,17 +769,7 @@ public class FullDataSourceV2DTO
}
private static FullDataPointIdMap readBlobToDataMapping(ByteArrayList inputCompressedDataByteArray, long pos, @NotNull ILevelWrapper levelWrapper, EDhApiDataCompressionMode compressionModeEnum) throws IOException, InterruptedException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream;
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray()));
}
else
{
byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
}
try (DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
try (DhDataInputStream compressedIn = DhDataInputStream.create(inputCompressedDataByteArray.toByteArray(), compressionModeEnum))
{
FullDataPointIdMap mapping = FullDataPointIdMap.deserialize(compressedIn, pos, levelWrapper);
return mapping;
@@ -572,9 +572,15 @@ public class FullDataSourceV2Repo extends AbstractDhRepo<Long, FullDataSourceV2D
EDhApiDataCompressionMode compressionModeEnum = EDhApiDataCompressionMode.getFromValue(compressionModeEnumValue);
// decompress the data
try(DhDataInputStream compressedIn = new DhDataInputStream(result.getBinaryStream("ColumnGenerationStep"), compressionModeEnum))
try
{
putAllBytes(compressedIn, outputByteArray);
ByteArrayList byteArrayList = new ByteArrayList();
putAllBytes(result.getBinaryStream("ColumnGenerationStep"), byteArrayList);
try(DhDataInputStream compressedIn = DhDataInputStream.create(byteArrayList.toByteArray(), compressionModeEnum))
{
putAllBytes(compressedIn, outputByteArray);
}
}
catch (IOException e)
{
@@ -20,9 +20,11 @@
package com.seibel.distanthorizons.core.util.objects.dataStreams;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdInputStream;
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import net.jpountz.lz4.LZ4FrameInputStream;
import org.apache.logging.log4j.LogManager;
import com.seibel.distanthorizons.core.logging.DhLogger;
@@ -48,23 +50,38 @@ public class DhDataInputStream extends DataInputStream
private static final DhLogger LOGGER = new DhLoggerBuilder().build();
public DhDataInputStream(InputStream stream, EDhApiDataCompressionMode compressionMode) throws IOException
{
super(warpStream(new BufferedInputStream(stream), compressionMode));
public static DhDataInputStream create(byte[] byteArray, EDhApiDataCompressionMode compressionMode) throws IOException
{
// Z_Std handling compression outside the stream provides a significant performance boost
ByteArrayInputStream byteArrayInputStream;
if (compressionMode == EDhApiDataCompressionMode.Z_STD)
{
byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(byteArray));
}
else
{
byteArrayInputStream = new ByteArrayInputStream(byteArray);
}
return new DhDataInputStream(byteArrayInputStream, compressionMode);
}
private static InputStream warpStream(InputStream stream, EDhApiDataCompressionMode compressionMode) throws IOException
private DhDataInputStream(ByteArrayInputStream stream, EDhApiDataCompressionMode compressionMode) throws IOException
{
super(warpStream(stream, compressionMode));
}
private static InputStream warpStream(ByteArrayInputStream stream, EDhApiDataCompressionMode compressionMode) throws IOException
{
try
{
switch (compressionMode)
{
case Z_STD:
// ZStd compression should be handled before this point
// just return the stream
case UNCOMPRESSED:
return stream;
case LZ4:
return new LZ4FrameInputStream(stream);
case Z_STD:
//return new ZstdInputStream(stream, RecyclingBufferPool.INSTANCE);
return stream;
case LZMA2:
// using an array cache significantly reduces GC pressure
ResettableArrayCache arrayCache = LZMA_RESETTABLE_ARRAY_CACHE_GETTER.get();
+1 -3
View File
@@ -77,9 +77,7 @@ public class VarintTest
// read stream
byte[] byteArray = byteArrayOutputStream.toByteArray();
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArray);
try (DhDataInputStream inputStream = new DhDataInputStream(byteArrayInputStream, EDhApiDataCompressionMode.UNCOMPRESSED))
try (DhDataInputStream inputStream = DhDataInputStream.create(byteArrayOutputStream.toByteArray(), EDhApiDataCompressionMode.UNCOMPRESSED))
{
int encodedValue = VarintUtil.readVarint(inputStream);
int decodedValue = VarintUtil.zigzagDecode(encodedValue);