From da24cfc96fff32b2827fe2803818701cb862686d Mon Sep 17 00:00:00 2001 From: Abhishek Bansal Date: Sun, 25 Jan 2026 01:08:08 +0530 Subject: [PATCH 1/5] GH-48846: [C++] Optimize ReadMessage to read metadata and body in one go --- cpp/src/arrow/ipc/message.cc | 50 +++++++++++++++++++ cpp/src/arrow/ipc/message.h | 20 ++++++++ cpp/src/arrow/ipc/read_write_test.cc | 72 ++++++++++++++++++++++------ cpp/src/arrow/ipc/reader.cc | 12 +++-- 4 files changed, 136 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 8be09956f10..bcf6243bf7c 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -421,6 +421,56 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le } } +Result> ReadMessage(const int64_t offset, + const int32_t metadata_length, + const int64_t body_length, + io::RandomAccessFile* file) { + std::unique_ptr result; + auto listener = std::make_shared(&result); + MessageDecoder decoder(listener); + + if (metadata_length < decoder.next_required_size()) { + return Status::Invalid("metadata_length should be at least ", + decoder.next_required_size()); + } + + ARROW_ASSIGN_OR_RAISE(auto metadata, + file->ReadAt(offset, metadata_length + body_length)); + if (metadata->size() < metadata_length) { + return Status::Invalid("Expected to read ", metadata_length, + " metadata bytes at offset ", offset, " but got ", + metadata->size()); + } + + ARROW_RETURN_NOT_OK(decoder.Consume(SliceBuffer(metadata, 0, metadata_length))); + + switch (decoder.state()) { + case MessageDecoder::State::INITIAL: + return result; + case MessageDecoder::State::METADATA_LENGTH: + return Status::Invalid("metadata length is missing. File offset: ", offset, + ", metadata length: ", metadata_length); + case MessageDecoder::State::METADATA: + return Status::Invalid("flatbuffer size ", decoder.next_required_size(), + " invalid. 
File offset: ", offset, + ", metadata length: ", metadata_length); + case MessageDecoder::State::BODY: { + auto body = SliceBuffer(metadata, metadata_length, body_length); + if (body->size() < decoder.next_required_size()) { + return Status::IOError("Expected to be able to read ", + decoder.next_required_size(), + " bytes for message body, got ", body->size()); + } + RETURN_NOT_OK(decoder.Consume(body)); + return result; + } + case MessageDecoder::State::EOS: + return Status::Invalid("Unexpected empty message in IPC file format"); + default: + return Status::Invalid("Unexpected state: ", decoder.state()); + } +} + Future> ReadMessageAsync(int64_t offset, int32_t metadata_length, int64_t body_length, io::RandomAccessFile* file, diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 1cd72ce993e..e36a8859b9e 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -469,6 +469,26 @@ Result> ReadMessage( const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader = {}); +/// \brief Read encapsulated RPC message from position in file +/// +/// Read a length-prefixed message flatbuffer starting at the indicated file +/// offset. +/// +/// The metadata_length includes at least the length prefix and the flatbuffer +/// +/// \param[in] offset the position in the file where the message starts. The +/// first 4 bytes after the offset are the message length +/// \param[in] metadata_length the size in bytes of the length prefix and flatbuffer +/// \param[in] body_length the number of bytes for the message body +/// \param[in] file the seekable file interface to read from +/// \return the message read + +ARROW_EXPORT +Result> ReadMessage(const int64_t offset, + const int32_t metadata_length, + const int64_t body_length, + io::RandomAccessFile* file); + /// \brief Read encapsulated RPC message from cached buffers /// /// The buffers should contain an entire message.
Partial reads are not handled. diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 9f7df541bd7..e872abb41f8 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -552,9 +552,15 @@ class TestIpcRoundTrip : public ::testing::TestWithParam, ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, options_)); - ASSERT_OK_AND_ASSIGN(std::unique_ptr message, + ASSERT_OK_AND_ASSIGN(std::unique_ptr message1, ReadMessage(0, metadata_length, mmap_.get())); - ASSERT_EQ(expected_version, message->metadata_version()); + ASSERT_EQ(expected_version, message1->metadata_version()); + + ASSERT_OK_AND_ASSIGN(auto message2, + ReadMessage(0, metadata_length, body_length, mmap_.get())); + ASSERT_EQ(expected_version, message2->metadata_version()); + + ASSERT_TRUE(message1->Equals(*message2)); } }; @@ -613,6 +619,27 @@ TEST(TestReadMessage, CorruptedSmallInput) { ASSERT_EQ(nullptr, message); } +TEST(TestReadMessage, ReadBodyWithLength) { + // Test the optimized ReadMessage(offset, meta_len, body_len, file) overload + std::shared_ptr batch; + ASSERT_OK(MakeIntRecordBatch(&batch)); + + ASSERT_OK_AND_ASSIGN(auto stream, io::BufferOutputStream::Create(0)); + int32_t metadata_length; + int64_t body_length; + ASSERT_OK(WriteRecordBatch(*batch, 0, stream.get(), &metadata_length, &body_length, + IpcWriteOptions::Defaults())); + + ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish()); + io::BufferReader reader(buffer); + + ASSERT_OK_AND_ASSIGN(auto message, + ReadMessage(0, metadata_length, body_length, &reader)); + + ASSERT_EQ(body_length, message->body_length()); + ASSERT_TRUE(message->Verify()); +} + TEST(TestMetadata, GetMetadataVersion) { ASSERT_EQ(MetadataVersion::V1, ipc::internal::GetMetadataVersion( flatbuf::MetadataVersion::MetadataVersion_V1)); @@ -1094,7 +1121,7 @@ TEST_F(RecursionLimits, ReadLimit) { &schema)); ASSERT_OK_AND_ASSIGN(std::unique_ptr message, - 
ReadMessage(0, metadata_length, mmap_.get())); + ReadMessage(0, metadata_length, body_length, mmap_.get())); io::BufferReader reader(message->body()); @@ -1119,7 +1146,7 @@ TEST_F(RecursionLimits, StressLimit) { &schema)); ASSERT_OK_AND_ASSIGN(std::unique_ptr message, - ReadMessage(0, metadata_length, mmap_.get())); + ReadMessage(0, metadata_length, body_length, mmap_.get())); DictionaryMemo empty_memo; @@ -3018,25 +3045,38 @@ void GetReadRecordBatchReadRanges( auto read_ranges = tracked->get_read_ranges(); - // there are 3 read IOs before reading body: - // 1) read magic and footer length IO - // 2) read footer IO - // 3) read record batch metadata IO - EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size()); const int32_t magic_size = static_cast(strlen(ipc::internal::kArrowMagicBytes)); // read magic and footer length IO auto file_end_size = magic_size + sizeof(int32_t); auto footer_length_offset = buffer->size() - file_end_size; auto footer_length = bit_util::FromLittleEndian( util::SafeLoadAs(buffer->data() + footer_length_offset)); + + // read magic and footer length IO EXPECT_EQ(read_ranges[0].length, file_end_size); // read footer IO EXPECT_EQ(read_ranges[1].length, footer_length); - // read record batch metadata. The exact size is tricky to determine but it doesn't - // matter for this test and it should be smaller than the footer. 
- EXPECT_LE(read_ranges[2].length, footer_length); - for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) { - EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]); + + // there are 3 read IOs before reading body: + // 1) read magic and footer length IO + // 2) read footer IO + // 3) read record batch metadata IO + if (included_fields.empty()) { + EXPECT_EQ(read_ranges.size(), 3); + + int64_t total_body = 0; + for (auto len : expected_body_read_lengths) total_body += len; + + EXPECT_GT(read_ranges[2].length, total_body); + } else { + EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size()); + + // read record batch metadata. The exact size is tricky to determine but it doesn't + // matter for this test and it should be smaller than the footer. + EXPECT_LE(read_ranges[2].length, footer_length); + for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) { + EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]); + } } } @@ -3186,7 +3226,9 @@ class PreBufferingTest : public ::testing::TestWithParam { metadata_reads++; } } - ASSERT_EQ(metadata_reads, reader_->num_record_batches() - num_indices_pre_buffered); + // With ReadMessage optimization, non-prebuffered reads fetch metadata and body + // in a single large read, so we no longer see small metadata-only reads here.
+ ASSERT_EQ(metadata_reads, 0); ASSERT_EQ(data_reads, reader_->num_record_batches()); } diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index a47a6290723..908a223a57d 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -1236,9 +1236,15 @@ Result> ReadMessageFromBlock( const FileBlock& block, io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader) { RETURN_NOT_OK(CheckAligned(block)); - ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length, - file, fields_loader)); - return CheckBodyLength(std::move(message), block); + if (fields_loader) { + ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length, + file, fields_loader)); + return CheckBodyLength(std::move(message), block); + } else { + ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length, + block.body_length, file)); + return CheckBodyLength(std::move(message), block); + } } Future> ReadMessageFromBlockAsync( From a0e7d11627ce044d86b2c6d71f8c209b53b848b0 Mon Sep 17 00:00:00 2001 From: Abhishek Bansal Date: Tue, 3 Feb 2026 13:40:03 +0530 Subject: [PATCH 2/5] GH-48846: [C++][IPC] Fix heap-buffer-overflow in ReadMessage body slicing --- cpp/src/arrow/ipc/message.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index bcf6243bf7c..a69c246a6b4 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -455,7 +455,8 @@ Result> ReadMessage(const int64_t offset, " invalid. 
File offset: ", offset, ", metadata length: ", metadata_length); case MessageDecoder::State::BODY: { - auto body = SliceBuffer(metadata, metadata_length, body_length); + auto body = SliceBuffer(metadata, metadata_length, + std::min(body_length, metadata->size() - metadata_length)); if (body->size() < decoder.next_required_size()) { return Status::IOError("Expected to be able to read ", decoder.next_required_size(), From e1058fcd56a460e6c50a8dd24859d5188379a9bc Mon Sep 17 00:00:00 2001 From: Abhishek Bansal Date: Sat, 7 Feb 2026 19:29:22 +0530 Subject: [PATCH 3/5] chore: trigger CI From d78c3837f7cbc9f3675961bf9ea2f6f4d25c26db Mon Sep 17 00:00:00 2001 From: Abhishek Bansal Date: Mon, 16 Feb 2026 22:15:01 +0530 Subject: [PATCH 4/5] GH-48846: [C++][IPC] Refactor ReadMessage overloads with common helper and fix documentation --- cpp/src/arrow/ipc/message.cc | 89 +++++++++++----------------- cpp/src/arrow/ipc/message.h | 6 +- cpp/src/arrow/ipc/read_write_test.cc | 26 ++++++-- 3 files changed, 61 insertions(+), 60 deletions(-) diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index a69c246a6b4..2acf2bb0bfc 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -363,9 +364,13 @@ Result> ReadMessage(std::shared_ptr metadata, } } -Result> ReadMessage(int64_t offset, int32_t metadata_length, - io::RandomAccessFile* file, - const FieldsLoaderFunction& fields_loader) { +// Common helper for the two ReadMessage overloads that take a file + offset. +// When body_length is provided, metadata and body are read in a single IO. +// When body_length is absent, metadata is read first, then the body is read +// separately. 
+static Result> ReadMessageInternal( + int64_t offset, int32_t metadata_length, std::optional body_length, + io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader) { std::unique_ptr result; auto listener = std::make_shared(&result); MessageDecoder decoder(listener); @@ -375,15 +380,22 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le decoder.next_required_size()); } - // TODO(GH-48846): we should take a body_length just like ReadMessageAsync - // and read metadata + body in one go. - ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length)); + // When body_length is known, read metadata + body in one IO call. + // Otherwise, read only metadata first. + std::shared_ptr metadata; + if (body_length.has_value()) { + ARROW_ASSIGN_OR_RAISE(metadata, file->ReadAt(offset, metadata_length + *body_length)); + } else { + ARROW_ASSIGN_OR_RAISE(metadata, file->ReadAt(offset, metadata_length)); + } + if (metadata->size() < metadata_length) { return Status::Invalid("Expected to read ", metadata_length, " metadata bytes at offset ", offset, " but got ", metadata->size()); } - ARROW_RETURN_NOT_OK(decoder.Consume(metadata)); + + ARROW_RETURN_NOT_OK(decoder.Consume(SliceBuffer(metadata, 0, metadata_length))); switch (decoder.state()) { case MessageDecoder::State::INITIAL: @@ -398,14 +410,23 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le case MessageDecoder::State::BODY: { std::shared_ptr body; if (fields_loader) { + // Selective field loading: allocate a body buffer and read only the + // requested field ranges into it. 
ARROW_ASSIGN_OR_RAISE( body, AllocateBuffer(decoder.next_required_size(), default_memory_pool())); RETURN_NOT_OK(ReadFieldsSubset(offset, metadata_length, file, fields_loader, - metadata, decoder.next_required_size(), body)); + SliceBuffer(metadata, 0, metadata_length), + decoder.next_required_size(), body)); + } else if (body_length.has_value()) { + // Body was already read as part of the combined IO; just slice it out. + body = SliceBuffer(metadata, metadata_length, + std::min(*body_length, metadata->size() - metadata_length)); } else { + // Body length was unknown; do a separate IO to read the body. ARROW_ASSIGN_OR_RAISE( body, file->ReadAt(offset + metadata_length, decoder.next_required_size())); } + if (body->size() < decoder.next_required_size()) { return Status::IOError("Expected to be able to read ", decoder.next_required_size(), @@ -421,55 +442,17 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le } } +Result> ReadMessage(int64_t offset, int32_t metadata_length, + io::RandomAccessFile* file, + const FieldsLoaderFunction& fields_loader) { + return ReadMessageInternal(offset, metadata_length, std::nullopt, file, fields_loader); +} + Result> ReadMessage(const int64_t offset, const int32_t metadata_length, const int64_t body_length, io::RandomAccessFile* file) { - std::unique_ptr result; - auto listener = std::make_shared(&result); - MessageDecoder decoder(listener); - - if (metadata_length < decoder.next_required_size()) { - return Status::Invalid("metadata_length should be at least ", - decoder.next_required_size()); - } - - ARROW_ASSIGN_OR_RAISE(auto metadata, - file->ReadAt(offset, metadata_length + body_length)); - if (metadata->size() < metadata_length) { - return Status::Invalid("Expected to read ", metadata_length, - " metadata bytes at offset ", offset, " but got ", - metadata->size()); - } - - ARROW_RETURN_NOT_OK(decoder.Consume(SliceBuffer(metadata, 0, metadata_length))); - - switch (decoder.state()) { - case MessageDecoder::State::INITIAL: 
- return result; - case MessageDecoder::State::METADATA_LENGTH: - return Status::Invalid("metadata length is missing. File offset: ", offset, - ", metadata length: ", metadata_length); - case MessageDecoder::State::METADATA: - return Status::Invalid("flatbuffer size ", decoder.next_required_size(), - " invalid. File offset: ", offset, - ", metadata length: ", metadata_length); - case MessageDecoder::State::BODY: { - auto body = SliceBuffer(metadata, metadata_length, - std::min(body_length, metadata->size() - metadata_length)); - if (body->size() < decoder.next_required_size()) { - return Status::IOError("Expected to be able to read ", - decoder.next_required_size(), - " bytes for message body, got ", body->size()); - } - RETURN_NOT_OK(decoder.Consume(body)); - return result; - } - case MessageDecoder::State::EOS: - return Status::Invalid("Unexpected empty message in IPC file format"); - default: - return Status::Invalid("Unexpected state: ", decoder.state()); - } + return ReadMessageInternal(offset, metadata_length, body_length, file, {}); } Future> ReadMessageAsync(int64_t offset, int32_t metadata_length, diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index e36a8859b9e..df80b0eba25 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -449,7 +449,7 @@ class ARROW_EXPORT MessageReader { // org::apache::arrow::flatbuf::RecordBatch*) using FieldsLoaderFunction = std::function; -/// \brief Read encapsulated RPC message from position in file +/// \brief Read encapsulated IPC message from position in file /// /// Read a length-prefixed message flatbuffer starting at the indicated file /// offset. 
If the message has a body with non-zero length, it will also be @@ -469,7 +469,7 @@ Result> ReadMessage( const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader = {}); -/// \brief Read encapsulated RPC message from position in file +/// \brief Read encapsulated IPC message from position in file /// /// Read a length-prefixed message flatbuffer starting at the indicated file /// offset. @@ -489,7 +489,7 @@ Result> ReadMessage(const int64_t offset, const int64_t body_length, io::RandomAccessFile* file); -/// \brief Read encapsulated RPC message from cached buffers +/// \brief Read encapsulated IPC message from cached buffers /// /// The buffers should contain an entire message. Partial reads are not handled. /// diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index e872abb41f8..86cd0e06ab0 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -3052,23 +3052,41 @@ void GetReadRecordBatchReadRanges( auto footer_length = bit_util::FromLittleEndian( util::SafeLoadAs(buffer->data() + footer_length_offset)); + // there are at least 2 read IOs before reading body: + // 1) read magic and footer length IO + // 2) footer IO + EXPECT_GE(read_ranges.size(), 2); + // read magic and footer length IO EXPECT_EQ(read_ranges[0].length, file_end_size); // read footer IO EXPECT_EQ(read_ranges[1].length, footer_length); - // there are 3 read IOs before reading body: - // 1) read magic and footer length IO - // 2) read footer IO - // 3) read record batch metadata IO if (included_fields.empty()) { + // When no fields are explicitly included, the reader optimizes by + // reading metadata and the entire body in a single IO. 
+ // Thus, there are exactly 3 read IOs in total: + // 1) magic and footer length + // 2) footer + // 3) record batch metadata + body EXPECT_EQ(read_ranges.size(), 3); int64_t total_body = 0; for (auto len : expected_body_read_lengths) total_body += len; + // In the optimized path (included_fields is empty), the 3rd read operation + // fetches both the message metadata (flatbuffer) and the entire message body + // in one contiguous block. Therefore, its length must at least exceed the + // total body length by the size of the metadata. EXPECT_GT(read_ranges[2].length, total_body); + EXPECT_LE(read_ranges[2].length, total_body + footer_length); } else { + // When fields are filtered, we see 3 initial reads followed by N body reads + // (one for each field/buffer range): + // 1) magic and footer length + // 2) footer + // 3) record batch metadata + // 4) individual body buffer reads EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size()); // read record batch metadata. The exact size is tricky to determine but it doesn't From ff6a9e5a68203007e2f11c69415b58d3011bc441 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 17 Feb 2026 12:20:12 +0100 Subject: [PATCH 5/5] Cosmetic style changes --- cpp/src/arrow/ipc/message.cc | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 2acf2bb0bfc..c21eb913c38 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -382,12 +382,8 @@ static Result> ReadMessageInternal( // When body_length is known, read metadata + body in one IO call. // Otherwise, read only metadata first. 
- std::shared_ptr metadata; - if (body_length.has_value()) { - ARROW_ASSIGN_OR_RAISE(metadata, file->ReadAt(offset, metadata_length + *body_length)); - } else { - ARROW_ASSIGN_OR_RAISE(metadata, file->ReadAt(offset, metadata_length)); - } + ARROW_ASSIGN_OR_RAISE(std::shared_ptr metadata, + file->ReadAt(offset, metadata_length + body_length.value_or(0))); if (metadata->size() < metadata_length) { return Status::Invalid("Expected to read ", metadata_length, @@ -445,14 +441,16 @@ static Result> ReadMessageInternal( Result> ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader) { - return ReadMessageInternal(offset, metadata_length, std::nullopt, file, fields_loader); + return ReadMessageInternal(offset, metadata_length, /*body_length=*/std::nullopt, file, + fields_loader); } Result> ReadMessage(const int64_t offset, const int32_t metadata_length, const int64_t body_length, io::RandomAccessFile* file) { - return ReadMessageInternal(offset, metadata_length, body_length, file, {}); + return ReadMessageInternal(offset, metadata_length, body_length, file, + /*fields_loader=*/{}); } Future> ReadMessageAsync(int64_t offset, int32_t metadata_length,