Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](status) Change the return type for block_compression #47566

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 92 additions & 38 deletions be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class Lz4BlockCompression : public BlockCompressionCodec {
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size, output->size);
if (decompressed_len < 0) {
return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len);
return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len);
}
output->size = decompressed_len;
return Status::OK();
Expand Down Expand Up @@ -458,8 +458,8 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
&input_size, nullptr);
if (LZ4F_isError(lres)) {
decompress_failed = true;
return Status::InvalidArgument("Fail to do LZ4F decompress, res={}",
LZ4F_getErrorName(lres));
return Status::InternalError("Fail to do LZ4F decompress, res={}",
LZ4F_getErrorName(lres));
} else if (input_size != input.size) {
decompress_failed = true;
return Status::InvalidArgument(
Expand Down Expand Up @@ -635,7 +635,10 @@ class Lz4HCBlockCompression : public BlockCompressionCodec {
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size, output->size);
if (decompressed_len < 0) {
return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len);
return Status::InvalidArgument(
"destination buffer is not large enough or the source stream is detected "
"malformed, fail to do LZ4 decompress, error={}",
decompressed_len);
}
output->size = decompressed_len;
return Status::OK();
Expand Down Expand Up @@ -854,8 +857,12 @@ class ZlibBlockCompression : public BlockCompressionCodec {
Slice s(*output);

auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size);
if (zres != Z_OK) {
return Status::InvalidArgument("Fail to do ZLib compress, error={}", zError(zres));
if (zres == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded(fmt::format(
"ZLib compression failed due to memory allocationerror.error = {}, res = {} ",
zError(zres), zres)));
} else if (zres != Z_OK) {
return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres));
}
output->resize(s.size);
return Status::OK();
Expand All @@ -871,9 +878,12 @@ class ZlibBlockCompression : public BlockCompressionCodec {
zstrm.zfree = Z_NULL;
zstrm.opaque = Z_NULL;
auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION);
if (zres != Z_OK) {
return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
if (zres == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded(
"Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres));
} else if (zres != Z_OK) {
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
}
// we assume that output is e
zstrm.next_out = (Bytef*)output->data();
Expand All @@ -888,16 +898,19 @@ class ZlibBlockCompression : public BlockCompressionCodec {

zres = deflate(&zstrm, flush);
if (zres != Z_OK && zres != Z_STREAM_END) {
return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
}
}

output->resize(zstrm.total_out);
zres = deflateEnd(&zstrm);
if (zres != Z_OK) {
return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}",
zError(zres), zres);
if (zres == Z_DATA_ERROR) {
return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres),
zres);
} else if (zres != Z_OK) {
return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
zError(zres), zres);
}
return Status::OK();
}
Expand All @@ -906,8 +919,13 @@ class ZlibBlockCompression : public BlockCompressionCodec {
size_t input_size = input.size;
auto zres =
::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
if (zres != Z_OK) {
if (zres == Z_DATA_ERROR) {
return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres));
} else if (zres == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}",
zError(zres)));
} else if (zres != Z_OK) {
return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should be a catch-all branch here that covers every remaining `zres != Z_OK` case. Please also check the other changes in this PR for the same problem.

return Status::OK();
}
Expand All @@ -932,8 +950,14 @@ class Bzip2BlockCompression : public BlockCompressionCodec {
uint32_t size = output->size();
auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data,
input.size, 9, 0, 0);
if (bzres != BZ_OK) {
return Status::InternalError("Fail to do Bzip2 compress, ret={}", bzres);
if (bzres == BZ_MEM_ERROR) {
throw Exception(
Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres));
} else if (bzres == BZ_PARAM_ERROR) {
return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres);
} else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
bzres != BZ_STREAM_END && bzres != BZ_OK) {
return Status::InternalError("Failed to init bz2. status code: {}", bzres);
}
output->resize(size);
return Status::OK();
Expand All @@ -947,7 +971,12 @@ class Bzip2BlockCompression : public BlockCompressionCodec {
bz_stream bzstrm;
bzero(&bzstrm, sizeof(bzstrm));
int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0);
if (bzres != BZ_OK) {
if (bzres == BZ_PARAM_ERROR) {
return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
} else if (bzres == BZ_MEM_ERROR) {
throw Exception(
Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres));
} else if (bzres != BZ_OK) {
return Status::InternalError("Failed to init bz2. status code: {}", bzres);
}
// we assume that output is e
Expand All @@ -962,15 +991,20 @@ class Bzip2BlockCompression : public BlockCompressionCodec {
int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN;

bzres = BZ2_bzCompress(&bzstrm, flush);
if (bzres != BZ_OK && bzres != BZ_STREAM_END) {
return Status::InternalError("Fail to do bzip2 stream compress, res={}", bzres);
if (bzres == BZ_PARAM_ERROR) {
return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
} else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
bzres != BZ_STREAM_END && bzres != BZ_OK) {
return Status::InternalError("Failed to init bz2. status code: {}", bzres);
}
}

size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32;
output->resize(total_out);
bzres = BZ2_bzCompressEnd(&bzstrm);
if (bzres != BZ_OK) {
if (bzres == BZ_PARAM_ERROR) {
return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
} else if (bzres != BZ_OK) {
return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
}
return Status::OK();
Expand Down Expand Up @@ -1102,14 +1136,14 @@ class ZstdBlockCompression : public BlockCompressionCodec {

if (ZSTD_isError(ret)) {
compress_failed = true;
return Status::InvalidArgument("ZSTD_compressStream2 error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
return Status::InternalError("ZSTD_compressStream2 error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}

// ret is ZSTD hint for needed output buffer size
if (ret > 0 && out_buf.pos == out_buf.size) {
compress_failed = true;
return Status::InvalidArgument("ZSTD_compressStream2 output buffer full");
return Status::InternalError("ZSTD_compressStream2 output buffer full");
}

finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
Expand Down Expand Up @@ -1146,8 +1180,8 @@ class ZstdBlockCompression : public BlockCompressionCodec {
input.size);
if (ZSTD_isError(ret)) {
decompress_failed = true;
return Status::InvalidArgument("ZSTD_decompressDCtx error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
return Status::InternalError("ZSTD_decompressDCtx error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}

// set decompressed size for caller
Expand Down Expand Up @@ -1239,8 +1273,12 @@ class GzipBlockCompression : public ZlibBlockCompression {
int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
8, Z_DEFAULT_STRATEGY);

if (zres != Z_OK) {
return Status::InvalidArgument("Fail to init zlib compress");
if (zres == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded(
"Fail to init ZLib compress, error={}, res={}", zError(zres), zres));
} else if (zres != Z_OK) {
return Status::InternalError("Fail to init ZLib compress, error={}, res={}",
zError(zres), zres);
}

z_strm.next_in = (Bytef*)input.get_data();
Expand All @@ -1250,14 +1288,16 @@ class GzipBlockCompression : public ZlibBlockCompression {

zres = deflate(&z_strm, Z_FINISH);
if (zres != Z_OK && zres != Z_STREAM_END) {
return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
}

output->resize(z_strm.total_out);
zres = deflateEnd(&z_strm);
if (zres != Z_OK) {
if (zres == Z_DATA_ERROR) {
return Status::InvalidArgument("Fail to end zlib compress");
} else if (zres != Z_OK) {
return Status::InternalError("Fail to end zlib compress");
}
return Status::OK();
}
Expand All @@ -1273,10 +1313,14 @@ class GzipBlockCompression : public ZlibBlockCompression {
zstrm.opaque = Z_NULL;
auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
8, Z_DEFAULT_STRATEGY);
if (zres != Z_OK) {
return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
if (zres == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded(
"Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres));
} else if (zres != Z_OK) {
return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}",
zError(zres), zres);
}

// we assume that output is e
zstrm.next_out = (Bytef*)output->data();
zstrm.avail_out = output->size();
Expand All @@ -1290,16 +1334,19 @@ class GzipBlockCompression : public ZlibBlockCompression {

zres = deflate(&zstrm, flush);
if (zres != Z_OK && zres != Z_STREAM_END) {
return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres);
}
}

output->resize(zstrm.total_out);
zres = deflateEnd(&zstrm);
if (zres != Z_OK) {
if (zres == Z_DATA_ERROR) {
return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}",
zError(zres), zres);
} else if (zres != Z_OK) {
return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
zError(zres), zres);
}
return Status::OK();
}
Expand All @@ -1312,7 +1359,7 @@ class GzipBlockCompression : public ZlibBlockCompression {

int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
if (ret != Z_OK) {
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
return Status::InternalError("Fail to init ZLib decompress, error={}, res={}",
zError(ret), ret);
}

Expand All @@ -1327,6 +1374,13 @@ class GzipBlockCompression : public ZlibBlockCompression {
ret = inflate(&z_strm, Z_FINISH);
if (ret != Z_OK && ret != Z_STREAM_END) {
(void)inflateEnd(&z_strm);
if (ret == Z_MEM_ERROR) {
throw Exception(Status::MemoryLimitExceeded(
"Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret));
} else if (ret == Z_DATA_ERROR) {
return Status::InvalidArgument(
"Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret);
}
return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
zError(ret), ret);
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/functions/function_compress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class FunctionCompress : public IFunction {
}

// Z_MEM_ERROR and Z_BUF_ERROR are already handled in compress, making sure st is always Z_OK
auto st = compression_codec->compress(data, &compressed_str);
auto st = RETURN_IF_ERROR(compression_codec->compress(data, &compressed_str));
col_data.resize(col_data.size() + 4 + compressed_str.size());

std::memcpy(col_data.data() + idx, &length, sizeof(length));
Expand Down Expand Up @@ -184,7 +184,8 @@ class FunctionUncompress : public IFunction {
uncompressed_slice = Slice(col_data.data() + idx, length.value);

Slice compressed_data(data.data + 4, data.size - 4);
auto st = compression_codec->decompress(compressed_data, &uncompressed_slice);
auto st = RETURN_IF_ERROR(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to modify this.

compression_codec->decompress(compressed_data, &uncompressed_slice));

if (!st.ok()) { // is not a legal compressed string
col_data.resize(col_data.size() - length.value); // remove compressed_data
Expand Down