Handle lz4 compression #30
base: main
Conversation
Codecov Report
@@ Coverage Diff @@
## main #30 +/- ##
=======================================
Coverage ? 77.52%
=======================================
Files ? 32
Lines ? 1477
Branches ? 0
=======================================
Hits ? 1145
Misses ? 332
Partials ? 0
There are still a few things missing and some room for improvement, but I suggest merging this ASAP to avoid further conflicts (I just resolved the ones that appeared after merging #29 and had to rework the compression in the serialization part). For now, this PR is just to get something working.
Can you add tests which only test the compression/decompression of a buffer?
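A minimal round-trip sketch of such a test, using the liblz4 frame API directly (the doctest framework and the buffer contents are assumptions, not code from this PR):

#include <cstdint>
#include <vector>

#include <lz4frame.h>

#include <doctest/doctest.h>  // assumption: the project's existing test framework

TEST_CASE("lz4 compression round trip on a single buffer")
{
    const std::vector<uint8_t> original(4096, 0x42);

    // Compress the buffer into a single LZ4 frame.
    std::vector<uint8_t> compressed(LZ4F_compressFrameBound(original.size(), nullptr));
    const size_t compressed_size = LZ4F_compressFrame(
        compressed.data(), compressed.size(),
        original.data(), original.size(), nullptr);
    REQUIRE_FALSE(LZ4F_isError(compressed_size));
    compressed.resize(compressed_size);

    // Decompress it back and compare with the original bytes.
    LZ4F_dctx* dctx = nullptr;
    REQUIRE_FALSE(LZ4F_isError(LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION)));
    std::vector<uint8_t> decompressed(original.size());
    size_t dst_size = decompressed.size();
    size_t src_size = compressed.size();
    const size_t status = LZ4F_decompress(dctx, decompressed.data(), &dst_size, compressed.data(), &src_size, nullptr);
    LZ4F_freeDecompressionContext(dctx);
    REQUIRE_FALSE(LZ4F_isError(status));
    CHECK(decompressed == original);
}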
 * @param compression Optional: The compression type to use for record batch bodies.
 */
- chunk_serializer(chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream);
+ chunk_serializer(chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream, std::optional<org::apache::arrow::flatbuf::CompressionType> compression = std::nullopt);
Create a sparrow-ipc enum to keep public signatures free from flatbuffers
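A possible shape for that (hypothetical names; the exact spelling of the generated flatbuffers enumerators depends on the flatc options used):

// Public header: no flatbuffers include needed.
namespace sparrow_ipc
{
    enum class compression_type
    {
        lz4_frame,
        zstd
    };
}

// Internal .cpp only, where Message_generated.h may be included.
org::apache::arrow::flatbuf::CompressionType to_flatbuf(sparrow_ipc::compression_type type)
{
    switch (type)
    {
        case sparrow_ipc::compression_type::lz4_frame:
            return org::apache::arrow::flatbuf::CompressionType::LZ4_FRAME;
        case sparrow_ipc::compression_type::zstd:
            return org::apache::arrow::flatbuf::CompressionType::ZSTD;
    }
    throw std::invalid_argument("unknown compression type");
}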
auto validity_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers);

const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(validity_buffer_span, record_batch.length());

auto offset_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers);
auto data_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers);
I think that trying not to have one branch for compression and another for no compression leads to code complexity and a lack of visibility.
IMO, we should have two functions: one which handles compressed data and another for when there is no compression.
Thanks to this separation, you will not have get_and_decompress_buffer with this strange behavior.
get_and_decompress_buffer should be transformed into a get_uncompressed_data which returns a std::vector<uint8_t>, and the caller moves the result buffers into a vector of buffers.
And you keep the original code for uncompressed data.
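A sketch of the proposed split (hypothetical signatures, just to illustrate the suggestion):

// Compressed path: decompression has to materialize bytes, so return an
// owning buffer; the caller moves it into its vector of kept-alive buffers.
std::vector<uint8_t> get_uncompressed_data(
    const org::apache::arrow::flatbuf::RecordBatch& record_batch,
    std::span<const uint8_t> body,
    size_t buffer_index,
    org::apache::arrow::flatbuf::CompressionType compression);

// Uncompressed path: keeps the original zero-copy code and returns a view into the body.
std::span<const uint8_t> get_buffer_view(
    const org::apache::arrow::flatbuf::RecordBatch& record_batch,
    std::span<const uint8_t> body,
    size_t buffer_index);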
(same for deserialize_primitive_array.hpp, btw)
if (compression.has_value())
{
    // If compressed, the body size is the sum of compressed buffer sizes + original size prefixes + padding
    auto [compressed_body, compressed_buffers] = generate_compressed_body_and_buffers(record_batch, compression.value());
We don't want to compress the data just to calculate the size of the message.
I saw that LZ4F_compressFrameBound can give the maximum size of the compressed buffer; I think this should be used instead.
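For reference, that function does exist in liblz4's frame API; a reservation helper based on it could look like this (sketch):

#include <lz4frame.h>

// Worst-case size of the LZ4 frame produced from `uncompressed_size` input
// bytes, usable for reserving memory up front. Note it is an upper bound,
// not the exact compressed size.
size_t lz4_frame_size_bound(size_t uncompressed_size)
{
    return LZ4F_compressFrameBound(uncompressed_size, nullptr);
}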
No, you want the exact size, not the maximum size (which is going to be some trivial calculation such as uncompressed size + K).
This function is used for the memory reservation, not for the message header.
Can you know the compressed size without compressing the data first?
No, you can't.
We will have an issue with the fill_buffers function in flatbuffer_utils.cpp.
In this function we create the flatbuffer::Buffer entries, which hold the offset and size of each buffer in the body.
As the sizes of the buffers are unknown before the data is compressed, you can't create the record_batch message.
It means that we have to compress the buffers before creating the message and keep the compressed buffers in memory.
Once all the buffers are compressed, we can finally create and send the record_batch message.
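Roughly, the flow would have to become something like this (a sketch with hypothetical helper names, not code from this PR):

// Phase 1: compress every buffer first, recording final offsets and sizes.
std::vector<std::vector<uint8_t>> compressed_buffers;
std::vector<org::apache::arrow::flatbuf::Buffer> buffer_metadata;
int64_t offset = 0;
for (const auto& buffer : buffers_of(record_batch))  // hypothetical accessor
{
    std::vector<uint8_t> compressed = compress_buffer(buffer, compression);  // hypothetical helper
    const auto size = static_cast<int64_t>(compressed.size());
    buffer_metadata.emplace_back(offset, size);  // flatbuf::Buffer{offset, length}
    offset += align_to_8(size);  // hypothetical 8-byte padding helper
    compressed_buffers.push_back(std::move(compressed));
}

// Phase 2: only now can fill_buffers build the RecordBatch message, since the
// flatbuffer::Buffer entries need the compressed offsets and sizes; afterwards
// the kept-alive compressed_buffers are written out as the body.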
BTW, a test where we try to deserialize our serialized data with compression is missing. It should not work, because of what I said in the previous message.
I'm wondering whether we should split the code execution into two different branches for compressed vs uncompressed buffers when we create record batch messages.
Trying to keep the same code path seems to lead to code complexity without much benefit.
> I'm wondering whether we should split the code execution into two different branches for compressed vs uncompressed buffers when we create record batch messages.

No, this is really a bad idea, because you don't want to write buffers as compressed when they are not compressible (see my other comments about this).
#include <sparrow/record_batch.hpp>

#include "Message_generated.h"
For the record, in Arrow C++ we ensure that flatbuffers headers (and any other dependency) are not exposed through public Arrow headers.
memory_output_stream stream(buffer);
any_output_stream astream(stream);
- serialize_record_batch(rb, astream);
+ serialize_record_batch(rb, astream, m_compression);
Side note: this concatenates all output buffers as a single chunk even though we have chunked_memory_output_stream, which would avoid such copies. It is a bit of a waste.
if (data.empty())
{
    return {};
}
Hmm, this should never happen according to the Flatbuffers spec. Did you encounter this situation somewhere?
if (compression.has_value())
{
    // If compressed, the body size is the sum of compressed buffer sizes + original size prefixes + padding
    auto [compressed_body, compressed_buffers] = generate_compressed_body_and_buffers(record_batch, compression.value());
    actual_body_size = compressed_body.size();
}
else
{
    // If not compressed, the body size is the sum of uncompressed buffer sizes with padding
    actual_body_size = static_cast<std::size_t>(calculate_body_size(record_batch));
}
The same method should be able to handle both the compressed and uncompressed case, especially as you want to transmit non-compressible data uncompressed anyway.
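This is also what the Arrow IPC format itself supports: when body compression is declared, each buffer is prefixed with its uncompressed length as a little-endian int64, and a prefix of -1 marks a buffer whose bytes were left uncompressed because compression did not pay off. A per-buffer sketch (hypothetical helper names; assumes a little-endian host):

#include <cstdint>
#include <cstring>
#include <span>
#include <vector>

std::vector<uint8_t> encode_body_buffer(std::span<const uint8_t> raw,
                                        org::apache::arrow::flatbuf::CompressionType codec)
{
    std::vector<uint8_t> compressed = compress_buffer(raw, codec);  // hypothetical helper
    const bool keep_compressed = compressed.size() < raw.size();

    // Arrow IPC: 8-byte little-endian uncompressed length prefix; -1 means
    // the payload that follows is stored uncompressed.
    const int64_t prefix = keep_compressed ? static_cast<int64_t>(raw.size()) : int64_t{-1};
    std::vector<uint8_t> out(sizeof(prefix));
    std::memcpy(out.data(), &prefix, sizeof(prefix));

    if (keep_compressed)
    {
        out.insert(out.end(), compressed.begin(), compressed.end());
    }
    else
    {
        out.insert(out.end(), raw.begin(), raw.end());
    }
    return out;
}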
src/serialize_utils.cpp (outdated)
// Write original size (8 bytes) followed by compressed data
compressed_body.insert(compressed_body.end(), reinterpret_cast<const uint8_t*>(&original_size), reinterpret_cast<const uint8_t*>(&original_size) + sizeof(int64_t));
compressed_body.insert(compressed_body.end(), compressed_buffer_data.begin(), compressed_buffer_data.end());

// Add padding to the compressed data
compressed_body.insert(compressed_body.end(), padding_needed, 0);
This is going to resize and copy the data multiple times. Why not reuse the chunked memory output stream instead?
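A sketch of streaming the pieces directly, assuming the stream exposes a write(const uint8_t*, size_t)-style method (hypothetical API):

// Write the prefix, payload, and padding straight to the stream instead of
// growing a temporary vector.
const int64_t original_size = static_cast<int64_t>(uncompressed.size());
astream.write(reinterpret_cast<const uint8_t*>(&original_size), sizeof(original_size));
astream.write(compressed_buffer_data.data(), compressed_buffer_data.size());

static constexpr uint8_t zeros[8] = {};
astream.write(zeros, padding_needed);  // padding_needed < 8 for 8-byte alignment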
Only handling the lz4 codec for now (zstd will be handled in a follow-up PR).