proof-of-concept block Zstd compression

This commit is contained in:
James Seibel
2025-11-24 12:40:49 -06:00
parent 8516e8f9ab
commit ed7511ff6a
3 changed files with 94 additions and 29 deletions
@@ -19,6 +19,7 @@
package com.seibel.distanthorizons.core.sql.dto;
import com.github.luben.zstd.Zstd;
import com.google.common.base.MoreObjects;
import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode;
import com.seibel.distanthorizons.api.enums.config.EDhApiWorldCompressionMode;
@@ -317,11 +318,7 @@ public class FullDataSourceV2DTO
LongArrayList[] inputDataArray, ByteArrayList outputByteArray,
EDhApiDataCompressionMode compressionModeEnum) throws IOException
{
// write the outputs to a stream to prep for writing to the database
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
// normally a DhStream should be the topmost stream to prevent closing the stream accidentally,
// but since this stream will be closed immediately after writing anyway, it won't be an issue
try (DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum))
{
// write the data
@@ -342,11 +339,14 @@ public class FullDataSourceV2DTO
compressedOut.writeLong(dataColumn.getLong(y));
}
}
// generate the checksum
compressedOut.flush();
byteArrayOutputStream.close();
}
// Z_STD is compressed here, on the complete buffer, with a single Zstd.compress call
// (second argument 3 is the compression level). The DhDataOutputStream Z_STD case shown in
// this change returns the stream unwrapped, so the buffered bytes are still raw at this point.
// NOTE(review): addElements(0, ...) inserts at index 0 - presumably outputByteArray is empty here; confirm.
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3));
}
else
{
// every other mode: copy the buffer contents as-is
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
}
@@ -354,7 +354,16 @@ public class FullDataSourceV2DTO
ByteArrayList inputCompressedDataByteArray, LongArrayList[] outputDataLongArray,
EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
// Pick the byte source for the reader: Z_STD blobs are decompressed in full before any stream is created.
ByteArrayInputStream byteArrayInputStream;
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
// toByteArray() hands Zstd a copy holding only the list's elements
byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray()));
}
else
{
// NOTE(review): elements() is the list's backing array, which can be longer than size() -
// confirm the stream wrappers tolerate trailing bytes after the payload.
byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
}
try (DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
{
// read the data
@@ -525,9 +534,14 @@ public class FullDataSourceV2DTO
}
}
}
compressedOut.flush();
byteArrayOutputStream.close();
}
// Z_STD: compress the fully buffered payload in one Zstd.compress call at level 3;
// any other mode copies the buffer contents unchanged.
// NOTE(review): both branches insert at index 0 of outputByteArray - presumably the list
// is empty on entry; verify against the caller.
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3));
}
else
{
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
}
@@ -555,7 +569,16 @@ public class FullDataSourceV2DTO
maxZ = FullDataSourceV2.WIDTH-1;
}
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
// Z_STD input is decompressed as one whole blob up front; other modes stream straight from the list's array.
ByteArrayInputStream byteArrayInputStream;
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
// a trimmed copy (toByteArray) is passed so Zstd only sees the stored bytes
byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray()));
}
else
{
// NOTE(review): elements() exposes the backing array, whose length may exceed size() - confirm this is intended.
byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
}
try (DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
{
// 1. column counts, preallocate
@@ -689,15 +712,28 @@ public class FullDataSourceV2DTO
{
compressedOut.writeByte(inputColumnGenStepByteArray.getByte(i));
}
compressedOut.flush();
byteArrayOutputStream.close();
}
// Final step of the write: move the buffered bytes into outputByteArray.
// For Z_STD the buffer is compressed here in one shot (level 3) instead of by a stream wrapper.
// NOTE(review): addElements(0, ...) prepends if outputByteArray already holds data - confirm it is empty here.
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3));
}
else
{
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
}
/**
 * Reads a stored generation-step blob. Only the head of the method is visible here:
 * it selects the byte source for the {@link DhDataInputStream} based on the compression mode.
 *
 * @param inputCompressedDataByteArray the stored blob; for Z_STD it is decompressed in full before reading
 * @param outputByteArray destination list (presumably filled by the part of the method not shown - confirm)
 * @param compressionModeEnum how the blob was compressed when it was written
 */
private static void readBlobToGenerationSteps(ByteArrayList inputCompressedDataByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
ByteArrayInputStream byteArrayInputStream;
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
// whole-blob decompression; toByteArray() passes only the list's elements
byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray()));
}
else
{
// NOTE(review): elements() is the backing array and may be longer than size() - confirm trailing bytes are harmless.
byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
}
try(DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
{
@@ -719,15 +755,28 @@ public class FullDataSourceV2DTO
{
compressedOut.write(inputWorldCompressionModeByteArray.getByte(i));
}
compressedOut.flush();
byteArrayOutputStream.close();
}
// Z_STD output is produced by compressing the complete buffer once with Zstd.compress (level 3);
// all other modes copy the buffer unchanged.
// NOTE(review): insertion happens at index 0 - presumably outputByteArray starts empty; confirm.
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3));
}
else
{
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
}
/**
 * Reads a stored world-compression-mode blob. Only the head of the method is visible here:
 * it chooses what bytes the {@link DhDataInputStream} will read from.
 *
 * @param inputCompressedDataByteArray the stored blob; decompressed as a whole first when the mode is Z_STD
 * @param outputByteArray destination list (presumably populated further down, outside this view - confirm)
 * @param compressionModeEnum compression mode the blob was written with
 */
private static void readBlobToWorldCompressionMode(ByteArrayList inputCompressedDataByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
ByteArrayInputStream byteArrayInputStream;
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
// Zstd gets a trimmed copy of the list and returns the fully decompressed payload
byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray()));
}
else
{
// NOTE(review): unlike the branch above, this reads the raw backing array (elements()),
// which can extend past size() - verify that is acceptable.
byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
}
try(DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
{
@@ -746,15 +795,29 @@ public class FullDataSourceV2DTO
try(DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum))
{
mapping.serialize(compressedOut);
compressedOut.flush();
byteArrayOutputStream.close();
}
// The serialized bytes are now fully buffered. Z_STD compresses them here in a single
// Zstd.compress call (level 3); other modes copy the buffer as-is.
// NOTE(review): addElements(0, ...) inserts at the front - presumably outputByteArray is empty; confirm.
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3));
}
else
{
outputByteArray.addElements(0, byteArrayOutputStream.toByteArray());
}
}
private static FullDataPointIdMap readBlobToDataMapping(ByteArrayList compressedMappingByteArray, long pos, @NotNull ILevelWrapper levelWrapper, EDhApiDataCompressionMode compressionModeEnum) throws IOException, InterruptedException, DataCorruptedException
private static FullDataPointIdMap readBlobToDataMapping(ByteArrayList inputCompressedDataByteArray, long pos, @NotNull ILevelWrapper levelWrapper, EDhApiDataCompressionMode compressionModeEnum) throws IOException, InterruptedException, DataCorruptedException
{
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedMappingByteArray.elements());
// Source bytes for the mapping reader: Z_STD blobs are decompressed in full before the stream is built.
ByteArrayInputStream byteArrayInputStream;
if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD)
{
// toByteArray() gives Zstd a copy containing only the list's elements
byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray()));
}
else
{
// NOTE(review): elements() returns the backing array, possibly longer than size() - confirm trailing bytes are tolerated.
byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements());
}
try (DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum))
{
FullDataPointIdMap mapping = FullDataPointIdMap.deserialize(compressedIn, pos, levelWrapper);
@@ -63,7 +63,8 @@ public class DhDataInputStream extends DataInputStream
case LZ4:
return new LZ4FrameInputStream(stream);
case Z_STD:
return new ZstdInputStream(stream, RecyclingBufferPool.INSTANCE);
// Streaming Zstd decoding is disabled: the stream is handed back unwrapped because the
// FullDataSourceV2DTO readers in this change call Zstd.decompress(...) on the whole blob first.
// NOTE(review): any other caller that opens a DhDataInputStream with Z_STD would now read
// undecoded bytes - confirm there are none.
//return new ZstdInputStream(stream, RecyclingBufferPool.INSTANCE);
return stream;
case LZMA2:
// using an array cache significantly reduces GC pressure
ResettableArrayCache arrayCache = LZMA_RESETTABLE_ARRAY_CACHE_GETTER.get();
@@ -57,7 +57,8 @@ public class DhDataOutputStream extends DataOutputStream
return stream;
case Z_STD:
return new ZstdOutputStream(stream, 3, true, true);
// Streaming Zstd encoding is disabled: the stream is handed back unwrapped because the
// FullDataSourceV2DTO writers in this change call Zstd.compress(..., 3) on the finished buffer.
// NOTE(review): any other caller that opens a DhDataOutputStream with Z_STD would now write
// uncompressed bytes - confirm there are none.
//return new ZstdOutputStream(stream, 3, true, true);
return stream;
case LZ4:
return new LZ4FrameOutputStream(stream,
LZ4FrameOutputStream.BLOCKSIZE.SIZE_64KB, -1L,