diff --git a/core/src/main/java/com/seibel/distanthorizons/core/sql/dto/FullDataSourceV2DTO.java b/core/src/main/java/com/seibel/distanthorizons/core/sql/dto/FullDataSourceV2DTO.java index 6b72ede48..10284c3a0 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/sql/dto/FullDataSourceV2DTO.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/sql/dto/FullDataSourceV2DTO.java @@ -19,6 +19,7 @@ package com.seibel.distanthorizons.core.sql.dto; +import com.github.luben.zstd.Zstd; import com.google.common.base.MoreObjects; import com.seibel.distanthorizons.api.enums.config.EDhApiDataCompressionMode; import com.seibel.distanthorizons.api.enums.config.EDhApiWorldCompressionMode; @@ -317,11 +318,7 @@ public class FullDataSourceV2DTO LongArrayList[] inputDataArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException { - // write the outputs to a stream to prep for writing to the database ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - - // normally a DhStream should be the topmost stream to prevent closing the stream accidentally, - // but since this stream will be closed immediately after writing anyway, it won't be an issue try (DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum)) { // write the data @@ -342,11 +339,14 @@ public class FullDataSourceV2DTO compressedOut.writeLong(dataColumn.getLong(y)); } } - - - // generate the checksum - compressedOut.flush(); - byteArrayOutputStream.close(); + } + + if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD) + { + outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3)); + } + else + { outputByteArray.addElements(0, byteArrayOutputStream.toByteArray()); } } @@ -354,7 +354,16 @@ public class FullDataSourceV2DTO ByteArrayList inputCompressedDataByteArray, LongArrayList[] outputDataLongArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException { - ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements()); + ByteArrayInputStream byteArrayInputStream; + if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD) + { + byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray())); + } + else + { + byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements()); + } + try (DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum)) { // read the data @@ -525,9 +534,14 @@ public class FullDataSourceV2DTO } } } - - compressedOut.flush(); - byteArrayOutputStream.close(); + } + + if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD) + { + outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3)); + } + else + { outputByteArray.addElements(0, byteArrayOutputStream.toByteArray()); } } @@ -555,7 +569,16 @@ public class FullDataSourceV2DTO maxZ = FullDataSourceV2.WIDTH-1; } - ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements()); + ByteArrayInputStream byteArrayInputStream; + if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD) + { + byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray())); + } + else + { + byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements()); + } + try (DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum)) { // 1. column counts, preallocate @@ -689,15 +712,28 @@ public class FullDataSourceV2DTO { compressedOut.writeByte(inputColumnGenStepByteArray.getByte(i)); } - - compressedOut.flush(); - byteArrayOutputStream.close(); + } + + if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD) + { + outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3)); + } + else + { outputByteArray.addElements(0, byteArrayOutputStream.toByteArray()); } } private static void readBlobToGenerationSteps(ByteArrayList inputCompressedDataByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException { - ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements()); + ByteArrayInputStream byteArrayInputStream; + if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD) + { + byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray())); + } + else + { + byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements()); + } try(DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum)) { @@ -719,15 +755,28 @@ public class FullDataSourceV2DTO { compressedOut.write(inputWorldCompressionModeByteArray.getByte(i)); } - - compressedOut.flush(); - byteArrayOutputStream.close(); + } + + if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD) + { + outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3)); + } + else + { outputByteArray.addElements(0, byteArrayOutputStream.toByteArray()); } } private static void readBlobToWorldCompressionMode(ByteArrayList inputCompressedDataByteArray, ByteArrayList outputByteArray, EDhApiDataCompressionMode compressionModeEnum) throws IOException, DataCorruptedException { - ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements()); + ByteArrayInputStream byteArrayInputStream; + if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD) + { + byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray())); + } + else + { + byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements()); + } try(DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum)) { @@ -746,15 +795,29 @@ public class FullDataSourceV2DTO try(DhDataOutputStream compressedOut = new DhDataOutputStream(byteArrayOutputStream, compressionModeEnum)) { mapping.serialize(compressedOut); - - compressedOut.flush(); - byteArrayOutputStream.close(); + } + + if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD) + { + outputByteArray.addElements(0, Zstd.compress(byteArrayOutputStream.toByteArray(), 3)); + } + else + { outputByteArray.addElements(0, byteArrayOutputStream.toByteArray()); } } - private static FullDataPointIdMap readBlobToDataMapping(ByteArrayList compressedMappingByteArray, long pos, @NotNull ILevelWrapper levelWrapper, EDhApiDataCompressionMode compressionModeEnum) throws IOException, InterruptedException, DataCorruptedException + private static FullDataPointIdMap readBlobToDataMapping(ByteArrayList inputCompressedDataByteArray, long pos, @NotNull ILevelWrapper levelWrapper, EDhApiDataCompressionMode compressionModeEnum) throws IOException, InterruptedException, DataCorruptedException { - ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedMappingByteArray.elements()); + ByteArrayInputStream byteArrayInputStream; + if (compressionModeEnum == EDhApiDataCompressionMode.Z_STD) + { + byteArrayInputStream = new ByteArrayInputStream(Zstd.decompress(inputCompressedDataByteArray.toByteArray())); + } + else + { + byteArrayInputStream = new ByteArrayInputStream(inputCompressedDataByteArray.elements()); + } + try (DhDataInputStream compressedIn = new DhDataInputStream(byteArrayInputStream, compressionModeEnum)) { FullDataPointIdMap mapping = FullDataPointIdMap.deserialize(compressedIn, pos, levelWrapper); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/objects/dataStreams/DhDataInputStream.java b/core/src/main/java/com/seibel/distanthorizons/core/util/objects/dataStreams/DhDataInputStream.java index 50941d101..ed8fcd830 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/objects/dataStreams/DhDataInputStream.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/objects/dataStreams/DhDataInputStream.java @@ -63,7 +63,8 @@ public class DhDataInputStream extends DataInputStream case LZ4: return new LZ4FrameInputStream(stream); case Z_STD: - return new ZstdInputStream(stream, RecyclingBufferPool.INSTANCE); + //return new ZstdInputStream(stream, RecyclingBufferPool.INSTANCE); + return stream; case LZMA2: // using an array cache significantly reduces GC pressure ResettableArrayCache arrayCache = LZMA_RESETTABLE_ARRAY_CACHE_GETTER.get(); diff --git a/core/src/main/java/com/seibel/distanthorizons/core/util/objects/dataStreams/DhDataOutputStream.java b/core/src/main/java/com/seibel/distanthorizons/core/util/objects/dataStreams/DhDataOutputStream.java index ed8ae8140..f5a4749d1 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/util/objects/dataStreams/DhDataOutputStream.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/util/objects/dataStreams/DhDataOutputStream.java @@ -57,7 +57,8 @@ public class DhDataOutputStream extends DataOutputStream return stream; case Z_STD: - return new ZstdOutputStream(stream, 3, true, true); + //return new ZstdOutputStream(stream, 3, true, true); + return stream; case LZ4: return new LZ4FrameOutputStream(stream, LZ4FrameOutputStream.BLOCKSIZE.SIZE_64KB, -1L,