From 820e2283068e3b933b4fbe74c4d939310614ff07 Mon Sep 17 00:00:00 2001 From: lzyy2024 <2972013149@qq.com> Date: Thu, 6 Feb 2025 23:42:29 +0800 Subject: [PATCH 1/4] start --- be/src/util/block_compression.cpp | 124 +++++++++++++++++++++--------- 1 file changed, 86 insertions(+), 38 deletions(-) diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 7a0aacd4252dec..2ca8f571862d6d 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -187,7 +187,7 @@ class Lz4BlockCompression : public BlockCompressionCodec { auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { - return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len); + return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len); } output->size = decompressed_len; return Status::OK(); @@ -458,8 +458,8 @@ class Lz4fBlockCompression : public BlockCompressionCodec { &input_size, nullptr); if (LZ4F_isError(lres)) { decompress_failed = true; - return Status::InvalidArgument("Fail to do LZ4F decompress, res={}", - LZ4F_getErrorName(lres)); + return Status::InternalError("Fail to do LZ4F decompress, res={}", + LZ4F_getErrorName(lres)); } else if (input_size != input.size) { decompress_failed = true; return Status::InvalidArgument( @@ -635,7 +635,10 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { - return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len); + return Status::InvalidArgument( + "destination buffer is not large enough or the source stream is detected " + "malformed, fail to do LZ4 decompress, error={}", + decompressed_len); } output->size = decompressed_len; return Status::OK(); @@ -854,8 +857,12 @@ class ZlibBlockCompression : public BlockCompressionCodec { Slice s(*output); auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size); - if (zres != Z_OK) { - return Status::InvalidArgument("Fail to do ZLib compress, error={}", zError(zres)); + if (zres == Z_MEM_ERROR) { + return Status::MemoryLimitExceeded(fmt::format( + "ZLib compression failed due to memory allocationerror.error = {}, res = {} ", + zError(zres), zres)); + } else if (zres != Z_OK) { + return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres)); } output->resize(s.size); return Status::OK(); @@ -871,9 +878,12 @@ class ZlibBlockCompression : public BlockCompressionCodec { zstrm.zfree = Z_NULL; zstrm.opaque = Z_NULL; auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION); - if (zres != Z_OK) { - return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", - zError(zres), zres); + if (zres == Z_MEM_ERROR) { + return Status::MemoryLimitExceeded("Fail to do ZLib stream compress, error={}, res={}", + zError(zres), zres); + } else if (zres != Z_OK) { + return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + zError(zres), zres); } // we assume that output is e zstrm.next_out = (Bytef*)output->data(); @@ -888,16 +898,19 @@ class ZlibBlockCompression : public BlockCompressionCodec { zres = deflate(&zstrm, flush); if (zres != Z_OK && zres != Z_STREAM_END) { - return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", - zError(zres), zres); + return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + zError(zres), zres); } } output->resize(zstrm.total_out); zres = deflateEnd(&zstrm); - if (zres != Z_OK) { - return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", - zError(zres), zres); + if (zres == Z_DATA_ERROR) { + return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres), + zres); + } else if (zres == Z_STREAM_ERROR) { + return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", + zError(zres), zres); } return Status::OK(); } @@ -906,8 +919,13 @@ class ZlibBlockCompression : public BlockCompressionCodec { size_t input_size = input.size; auto zres = ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); - if (zres != Z_OK) { + if (zres == Z_DATA_ERROR) { return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres)); + } else if (zres == Z_MEM_ERROR) { + return Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}", + zError(zres)); + } else if (zres == Z_BUF_ERROR) { + return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres)); } return Status::OK(); } @@ -932,7 +950,11 @@ class Bzip2BlockCompression : public BlockCompressionCodec { uint32_t size = output->size(); auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data, input.size, 9, 0, 0); - if (bzres != BZ_OK) { + if (bzres == BZ_MEM_ERROR) { + return Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres); + } else if (bzres == BZ_PARAM_ERROR) { + return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres); + } else if (bzres == BZ_OUTBUFF_FULL || bzres == BZ_SEQUENCE_ERROR) { return Status::InternalError("Fail to do Bzip2 compress, ret={}", bzres); } output->resize(size); @@ -947,8 +969,12 @@ class Bzip2BlockCompression : public BlockCompressionCodec { bz_stream bzstrm; bzero(&bzstrm, sizeof(bzstrm)); int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0); - if (bzres != BZ_OK) { + if (bzres == BZ_PARAM_ERROR) { + return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); + } else if (bzres == BZ_CONFIG_ERROR) { return Status::InternalError("Failed to init bz2. status code: {}", bzres); + } else if (bzres == BZ_MEM_ERROR) { + return Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres); } // we assume that output is e bzstrm.next_out = (char*)output->data(); @@ -962,16 +988,18 @@ class Bzip2BlockCompression : public BlockCompressionCodec { int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN; bzres = BZ2_bzCompress(&bzstrm, flush); - if (bzres != BZ_OK && bzres != BZ_STREAM_END) { - return Status::InternalError("Fail to do bzip2 stream compress, res={}", bzres); + if (bzres == BZ_PARAM_ERROR) { + return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); + } else if (bzres == BZ_CONFIG_ERROR || bzres == BZ_SEQUENCE_ERROR) { + return Status::InternalError("Failed to init bz2. status code: {}", bzres); } } size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32; output->resize(total_out); bzres = BZ2_bzCompressEnd(&bzstrm); - if (bzres != BZ_OK) { - return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres); + if (bzres == BZ_PARAM_ERROR) { + return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres); } return Status::OK(); } @@ -1102,14 +1130,14 @@ class ZstdBlockCompression : public BlockCompressionCodec { if (ZSTD_isError(ret)) { compress_failed = true; - return Status::InvalidArgument("ZSTD_compressStream2 error: {}", - ZSTD_getErrorString(ZSTD_getErrorCode(ret))); + return Status::InternalError("ZSTD_compressStream2 error: {}", + ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } // ret is ZSTD hint for needed output buffer size if (ret > 0 && out_buf.pos == out_buf.size) { compress_failed = true; - return Status::InvalidArgument("ZSTD_compressStream2 output buffer full"); + return Status::InternalError("ZSTD_compressStream2 output buffer full"); } finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size); @@ -1146,8 +1174,8 @@ class ZstdBlockCompression : public BlockCompressionCodec { input.size); if (ZSTD_isError(ret)) { decompress_failed = true; - return Status::InvalidArgument("ZSTD_decompressDCtx error: {}", - ZSTD_getErrorString(ZSTD_getErrorCode(ret))); + return Status::InternalError("ZSTD_decompressDCtx error: {}", + ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } // set decompressed size for caller @@ -1239,8 +1267,12 @@ class GzipBlockCompression : public ZlibBlockCompression { int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, 8, Z_DEFAULT_STRATEGY); - if (zres != Z_OK) { - return Status::InvalidArgument("Fail to init zlib compress"); + if (zres == Z_MEM_ERROR) { + return Status::MemoryLimitExceeded("Fail to init ZLib compress, error={}, res={}", + zError(zres), zres); + } else if (zres == Z_STREAM_ERROR) { + return Status::InternalError("Fail to init ZLib compress, error={}, res={}", + zError(zres), zres); } z_strm.next_in = (Bytef*)input.get_data(); @@ -1250,14 +1282,16 @@ class GzipBlockCompression : public ZlibBlockCompression { zres = deflate(&z_strm, Z_FINISH); if (zres != Z_OK && zres != Z_STREAM_END) { - return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", - zError(zres), zres); + return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + zError(zres), zres); } output->resize(z_strm.total_out); zres = deflateEnd(&z_strm); - if (zres != Z_OK) { + if (zres == Z_DATA_ERROR) { return Status::InvalidArgument("Fail to end zlib compress"); + } else if (zres == Z_STREAM_ERROR) { + return Status::InternalError("Fail to end zlib compress"); } return Status::OK(); } @@ -1273,10 +1307,14 @@ class GzipBlockCompression : public ZlibBlockCompression { zstrm.opaque = Z_NULL; auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, 8, Z_DEFAULT_STRATEGY); - if (zres != Z_OK) { - return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", - zError(zres), zres); + if (zres == Z_MEM_ERROR) { + return Status::MemoryLimitExceeded( + "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres); + } else if (zres == Z_STREAM_ERROR) { + return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}", + zError(zres), zres); } + // we assume that output is e zstrm.next_out = (Bytef*)output->data(); zstrm.avail_out = output->size(); @@ -1290,16 +1328,19 @@ class GzipBlockCompression : public ZlibBlockCompression { zres = deflate(&zstrm, flush); if (zres != Z_OK && zres != Z_STREAM_END) { - return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", - zError(zres), zres); + return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + zError(zres), zres); } } output->resize(zstrm.total_out); zres = deflateEnd(&zstrm); - if (zres != Z_OK) { + if (zres == Z_DATA_ERROR) { return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", zError(zres), zres); + } else if (zres == Z_STREAM_ERROR) { + return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", + zError(zres), zres); } return Status::OK(); } @@ -1312,7 +1353,7 @@ class GzipBlockCompression : public ZlibBlockCompression { int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC); if (ret != Z_OK) { - return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", + return Status::InternalError("Fail to init ZLib decompress, error={}, res={}", zError(ret), ret); } @@ -1327,6 +1368,13 @@ class GzipBlockCompression : public ZlibBlockCompression { ret = inflate(&z_strm, Z_FINISH); if (ret != Z_OK && ret != Z_STREAM_END) { (void)inflateEnd(&z_strm); + if (ret == Z_MEM_ERROR) { + return Status::MemoryLimitExceeded( + "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret); + } else if (ret == Z_DATA_ERROR) { + return Status::InvalidArgument( + "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret); + } return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret); } From 20824637a9173d75da600915b573bb2145914da9 Mon Sep 17 00:00:00 2001 From: lzyy2024 <2972013149@qq.com> Date: Fri, 7 Feb 2025 16:05:43 +0800 Subject: [PATCH 2/4] cover all situation --- be/src/util/block_compression.cpp | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 2ca8f571862d6d..6f8dd32307b5c6 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -908,7 +908,7 @@ class ZlibBlockCompression : public BlockCompressionCodec { if (zres == Z_DATA_ERROR) { return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres), zres); - } else if (zres == Z_STREAM_ERROR) { + } else if (zres != Z_OK) { return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", zError(zres), zres); } @@ -924,7 +924,7 @@ class ZlibBlockCompression : public BlockCompressionCodec { } else if (zres == Z_MEM_ERROR) { return Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}", zError(zres)); - } else if (zres == Z_BUF_ERROR) { + } else if (zres != Z_OK) { return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres)); } return Status::OK(); @@ -954,8 +954,9 @@ class Bzip2BlockCompression : public BlockCompressionCodec { return Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres); } else if (bzres == BZ_PARAM_ERROR) { return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres); - } else if (bzres == BZ_OUTBUFF_FULL || bzres == BZ_SEQUENCE_ERROR) { - return Status::InternalError("Fail to do Bzip2 compress, ret={}", bzres); + } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK && + bzres != BZ_STREAM_END && bzres != BZ_OK) { + return Status::InternalError("Failed to init bz2. status code: {}", bzres); } output->resize(size); return Status::OK(); @@ -971,10 +972,10 @@ class Bzip2BlockCompression : public BlockCompressionCodec { int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0); if (bzres == BZ_PARAM_ERROR) { return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); - } else if (bzres == BZ_CONFIG_ERROR) { - return Status::InternalError("Failed to init bz2. status code: {}", bzres); } else if (bzres == BZ_MEM_ERROR) { return Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres); + } else if (bzres != BZ_OK) { + return Status::InternalError("Failed to init bz2. status code: {}", bzres); } // we assume that output is e bzstrm.next_out = (char*)output->data(); @@ -990,7 +991,8 @@ class Bzip2BlockCompression : public BlockCompressionCodec { bzres = BZ2_bzCompress(&bzstrm, flush); if (bzres == BZ_PARAM_ERROR) { return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); - } else if (bzres == BZ_CONFIG_ERROR || bzres == BZ_SEQUENCE_ERROR) { + } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK && + bzres != BZ_STREAM_END && bzres != BZ_OK) { return Status::InternalError("Failed to init bz2. status code: {}", bzres); } } @@ -1000,6 +1002,8 @@ class Bzip2BlockCompression : public BlockCompressionCodec { bzres = BZ2_bzCompressEnd(&bzstrm); if (bzres == BZ_PARAM_ERROR) { return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres); + } else if (bzres != BZ_OK) { + return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres); } return Status::OK(); } @@ -1270,7 +1274,7 @@ class GzipBlockCompression : public ZlibBlockCompression { if (zres == Z_MEM_ERROR) { return Status::MemoryLimitExceeded("Fail to init ZLib compress, error={}, res={}", zError(zres), zres); - } else if (zres == Z_STREAM_ERROR) { + } else if (zres != Z_OK) { return Status::InternalError("Fail to init ZLib compress, error={}, res={}", zError(zres), zres); } @@ -1290,7 +1294,7 @@ class GzipBlockCompression : public ZlibBlockCompression { zres = deflateEnd(&z_strm); if (zres == Z_DATA_ERROR) { return Status::InvalidArgument("Fail to end zlib compress"); - } else if (zres == Z_STREAM_ERROR) { + } else if (zres != Z_OK) { return Status::InternalError("Fail to end zlib compress"); } return Status::OK(); @@ -1310,7 +1314,7 @@ class GzipBlockCompression : public ZlibBlockCompression { if (zres == Z_MEM_ERROR) { return Status::MemoryLimitExceeded( "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres); - } else if (zres == Z_STREAM_ERROR) { + } else if (zres != Z_OK) { return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres); } @@ -1338,7 +1342,7 @@ class GzipBlockCompression : public ZlibBlockCompression { if (zres == Z_DATA_ERROR) { return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", zError(zres), zres); - } else if (zres == Z_STREAM_ERROR) { + } else if (zres != Z_OK) { return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", zError(zres), zres); } From cb73b4e80e339d37c0c5cbd9d22beb1102278ce1 Mon Sep 17 00:00:00 2001 From: lzyy2024 <2972013149@qq.com> Date: Fri, 7 Feb 2025 20:18:50 +0800 Subject: [PATCH 3/4] throw memoryLimit --- be/src/util/block_compression.cpp | 30 ++++++++++++---------- be/src/vec/functions/function_compress.cpp | 5 ++-- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 6f8dd32307b5c6..fcb2f3fae08033 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -858,9 +858,9 @@ class ZlibBlockCompression : public BlockCompressionCodec { auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size); if (zres == Z_MEM_ERROR) { - return Status::MemoryLimitExceeded(fmt::format( + throw Exception(Status::MemoryLimitExceeded(fmt::format( "ZLib compression failed due to memory allocationerror.error = {}, res = {} ", - zError(zres), zres)); + zError(zres), zres))); } else if (zres != Z_OK) { return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres)); } @@ -879,8 +879,8 @@ class ZlibBlockCompression : public BlockCompressionCodec { zstrm.opaque = Z_NULL; auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION); if (zres == Z_MEM_ERROR) { - return Status::MemoryLimitExceeded("Fail to do ZLib stream compress, error={}, res={}", - zError(zres), zres); + throw Exception(Status::MemoryLimitExceeded( + "Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres)); } else if (zres != Z_OK) { return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres); @@ -922,8 +922,8 @@ class ZlibBlockCompression : public BlockCompressionCodec { if (zres == Z_DATA_ERROR) { return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres)); } else if (zres == Z_MEM_ERROR) { - return Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}", - zError(zres)); + throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}", + zError(zres))); } else if (zres != Z_OK) { return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres)); } @@ -951,7 +951,8 @@ class Bzip2BlockCompression : public BlockCompressionCodec { auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data, input.size, 9, 0, 0); if (bzres == BZ_MEM_ERROR) { - return Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres); + throw Exception( + Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres)); } else if (bzres == BZ_PARAM_ERROR) { return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres); } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK && @@ -973,7 +974,8 @@ class Bzip2BlockCompression : public BlockCompressionCodec { if (bzres == BZ_PARAM_ERROR) { return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); } else if (bzres == BZ_MEM_ERROR) { - return Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres); + throw Exception( + Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres)); } else if (bzres != BZ_OK) { return Status::InternalError("Failed to init bz2. status code: {}", bzres); } @@ -1272,8 +1274,8 @@ class GzipBlockCompression : public ZlibBlockCompression { 8, Z_DEFAULT_STRATEGY); if (zres == Z_MEM_ERROR) { - return Status::MemoryLimitExceeded("Fail to init ZLib compress, error={}, res={}", - zError(zres), zres); + throw Exception(Status::MemoryLimitExceeded( + "Fail to init ZLib compress, error={}, res={}", zError(zres), zres)); } else if (zres != Z_OK) { return Status::InternalError("Fail to init ZLib compress, error={}, res={}", zError(zres), zres); @@ -1312,8 +1314,8 @@ class GzipBlockCompression : public ZlibBlockCompression { auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, 8, Z_DEFAULT_STRATEGY); if (zres == Z_MEM_ERROR) { - return Status::MemoryLimitExceeded( - "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres); + throw Exception(Status::MemoryLimitExceeded( + "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres)); } else if (zres != Z_OK) { return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres); @@ -1373,8 +1375,8 @@ class GzipBlockCompression : public ZlibBlockCompression { if (ret != Z_OK && ret != Z_STREAM_END) { (void)inflateEnd(&z_strm); if (ret == Z_MEM_ERROR) { - return Status::MemoryLimitExceeded( - "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret); + throw Exception(Status::MemoryLimitExceeded( + "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret)); } else if (ret == Z_DATA_ERROR) { return Status::InvalidArgument( "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret); diff --git a/be/src/vec/functions/function_compress.cpp b/be/src/vec/functions/function_compress.cpp index 4c175a5fd44379..066e6c08f59634 100644 --- a/be/src/vec/functions/function_compress.cpp +++ b/be/src/vec/functions/function_compress.cpp @@ -102,7 +102,7 @@ class FunctionCompress : public IFunction { } // Z_MEM_ERROR and Z_BUF_ERROR are already handled in compress, making sure st is always Z_OK - auto st = compression_codec->compress(data, &compressed_str); + auto st = RETURN_IF_ERROR(compression_codec->compress(data, &compressed_str)); col_data.resize(col_data.size() + 4 + compressed_str.size()); std::memcpy(col_data.data() + idx, &length, sizeof(length)); @@ -184,7 +184,8 @@ class FunctionUncompress : public IFunction { uncompressed_slice = Slice(col_data.data() + idx, length.value); Slice compressed_data(data.data + 4, data.size - 4); - auto st = compression_codec->decompress(compressed_data, &uncompressed_slice); + auto st = RETURN_IF_ERROR( + compression_codec->decompress(compressed_data, &uncompressed_slice)); if (!st.ok()) { // is not a legal compressed string col_data.resize(col_data.size() - length.value); // remove compressed_data From 3cd851ba57144518369395453a3ac1ea90f4b016 Mon Sep 17 00:00:00 2001 From: lzyy2024 <2972013149@qq.com> Date: Fri, 7 Feb 2025 20:50:16 +0800 Subject: [PATCH 4/4] update --- be/src/vec/functions/function_compress.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/be/src/vec/functions/function_compress.cpp b/be/src/vec/functions/function_compress.cpp index 066e6c08f59634..b645e944bfecc8 100644 --- a/be/src/vec/functions/function_compress.cpp +++ b/be/src/vec/functions/function_compress.cpp @@ -102,7 +102,7 @@ class FunctionCompress : public IFunction { } // Z_MEM_ERROR and Z_BUF_ERROR are already handled in compress, making sure st is always Z_OK - auto st = RETURN_IF_ERROR(compression_codec->compress(data, &compressed_str)); + RETURN_IF_ERROR(compression_codec->compress(data, &compressed_str)); col_data.resize(col_data.size() + 4 + compressed_str.size()); std::memcpy(col_data.data() + idx, &length, sizeof(length)); @@ -184,8 +184,7 @@ class FunctionUncompress : public IFunction { uncompressed_slice = Slice(col_data.data() + idx, length.value); Slice compressed_data(data.data + 4, data.size - 4); - auto st = RETURN_IF_ERROR( - compression_codec->decompress(compressed_data, &uncompressed_slice)); + auto st = compression_codec->decompress(compressed_data, &uncompressed_slice); if (!st.ok()) { // is not a legal compressed string col_data.resize(col_data.size() - length.value); // remove compressed_data