Skip to content

Commit 6799e24

Browse files
committed
Merge branch 'main' of https://github.com/apache/arrow into fix-mingw-json-test-segfault
2 parents 67a4188 + c8e069d commit 6799e24

File tree

22 files changed

+372
-121
lines changed

22 files changed

+372
-121
lines changed

cpp/src/arrow/filesystem/s3fs.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@
119119
#include "arrow/util/string.h"
120120
#include "arrow/util/task_group.h"
121121
#include "arrow/util/thread_pool.h"
122-
#include "arrow/util/value_parsing.h"
123122

124123
namespace arrow::fs {
125124

@@ -3579,9 +3578,10 @@ S3GlobalOptions S3GlobalOptions::Defaults() {
35793578
log_level = S3LogLevel::Off;
35803579
}
35813580

3582-
value = arrow::internal::GetEnvVar("ARROW_S3_THREADS").ValueOr("1");
3583-
if (uint32_t u; ::arrow::internal::ParseUnsigned(value.data(), value.size(), &u)) {
3584-
num_event_loop_threads = u;
3581+
auto maybe_num_threads =
3582+
arrow::internal::GetEnvVarInteger("ARROW_S3_THREADS", /*min_value=*/1);
3583+
if (maybe_num_threads.ok()) {
3584+
num_event_loop_threads = static_cast<int>(*maybe_num_threads);
35853585
}
35863586
return S3GlobalOptions{log_level, num_event_loop_threads};
35873587
}

cpp/src/arrow/io/interfaces.cc

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -390,23 +390,15 @@ namespace {
390390
constexpr int kDefaultNumIoThreads = 8;
391391

392392
std::shared_ptr<ThreadPool> MakeIOThreadPool() {
393-
int threads = 0;
394-
auto maybe_env_var = ::arrow::internal::GetEnvVar("ARROW_IO_THREADS");
395-
if (maybe_env_var.ok()) {
396-
auto str = *std::move(maybe_env_var);
397-
if (!str.empty()) {
398-
try {
399-
threads = std::stoi(str);
400-
} catch (...) {
401-
}
402-
if (threads <= 0) {
403-
ARROW_LOG(WARNING)
404-
<< "ARROW_IO_THREADS does not contain a valid number of threads "
405-
"(should be an integer > 0)";
406-
}
407-
}
393+
int threads = kDefaultNumIoThreads;
394+
auto maybe_num_threads = ::arrow::internal::GetEnvVarInteger(
395+
"ARROW_IO_THREADS", /*min_value=*/1, /*max_value=*/std::numeric_limits<int>::max());
396+
if (maybe_num_threads.ok()) {
397+
threads = static_cast<int>(*maybe_num_threads);
398+
} else if (!maybe_num_threads.status().IsKeyError()) {
399+
maybe_num_threads.status().Warn();
408400
}
409-
auto maybe_pool = ThreadPool::MakeEternal(threads > 0 ? threads : kDefaultNumIoThreads);
401+
auto maybe_pool = ThreadPool::MakeEternal(threads);
410402
if (!maybe_pool.ok()) {
411403
maybe_pool.status().Abort("Failed to create global IO thread pool");
412404
}

cpp/src/arrow/ipc/message.cc

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <cstddef>
2222
#include <cstdint>
2323
#include <memory>
24+
#include <optional>
2425
#include <string>
2526
#include <utility>
2627
#include <vector>
@@ -363,9 +364,13 @@ Result<std::unique_ptr<Message>> ReadMessage(std::shared_ptr<Buffer> metadata,
363364
}
364365
}
365366

366-
Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
367-
io::RandomAccessFile* file,
368-
const FieldsLoaderFunction& fields_loader) {
367+
// Common helper for the two ReadMessage overloads that take a file + offset.
368+
// When body_length is provided, metadata and body are read in a single IO.
369+
// When body_length is absent, metadata is read first, then the body is read
370+
// separately.
371+
static Result<std::unique_ptr<Message>> ReadMessageInternal(
372+
int64_t offset, int32_t metadata_length, std::optional<int64_t> body_length,
373+
io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader) {
369374
std::unique_ptr<Message> result;
370375
auto listener = std::make_shared<AssignMessageDecoderListener>(&result);
371376
MessageDecoder decoder(listener);
@@ -375,15 +380,18 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
375380
decoder.next_required_size());
376381
}
377382

378-
// TODO(GH-48846): we should take a body_length just like ReadMessageAsync
379-
// and read metadata + body in one go.
380-
ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length));
383+
// When body_length is known, read metadata + body in one IO call.
384+
// Otherwise, read only metadata first.
385+
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> metadata,
386+
file->ReadAt(offset, metadata_length + body_length.value_or(0)));
387+
381388
if (metadata->size() < metadata_length) {
382389
return Status::Invalid("Expected to read ", metadata_length,
383390
" metadata bytes at offset ", offset, " but got ",
384391
metadata->size());
385392
}
386-
ARROW_RETURN_NOT_OK(decoder.Consume(metadata));
393+
394+
ARROW_RETURN_NOT_OK(decoder.Consume(SliceBuffer(metadata, 0, metadata_length)));
387395

388396
switch (decoder.state()) {
389397
case MessageDecoder::State::INITIAL:
@@ -398,14 +406,23 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
398406
case MessageDecoder::State::BODY: {
399407
std::shared_ptr<Buffer> body;
400408
if (fields_loader) {
409+
// Selective field loading: allocate a body buffer and read only the
410+
// requested field ranges into it.
401411
ARROW_ASSIGN_OR_RAISE(
402412
body, AllocateBuffer(decoder.next_required_size(), default_memory_pool()));
403413
RETURN_NOT_OK(ReadFieldsSubset(offset, metadata_length, file, fields_loader,
404-
metadata, decoder.next_required_size(), body));
414+
SliceBuffer(metadata, 0, metadata_length),
415+
decoder.next_required_size(), body));
416+
} else if (body_length.has_value()) {
417+
// Body was already read as part of the combined IO; just slice it out.
418+
body = SliceBuffer(metadata, metadata_length,
419+
std::min(*body_length, metadata->size() - metadata_length));
405420
} else {
421+
// Body length was unknown; do a separate IO to read the body.
406422
ARROW_ASSIGN_OR_RAISE(
407423
body, file->ReadAt(offset + metadata_length, decoder.next_required_size()));
408424
}
425+
409426
if (body->size() < decoder.next_required_size()) {
410427
return Status::IOError("Expected to be able to read ",
411428
decoder.next_required_size(),
@@ -421,6 +438,21 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
421438
}
422439
}
423440

441+
Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
442+
io::RandomAccessFile* file,
443+
const FieldsLoaderFunction& fields_loader) {
444+
return ReadMessageInternal(offset, metadata_length, /*body_length=*/std::nullopt, file,
445+
fields_loader);
446+
}
447+
448+
Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
449+
const int32_t metadata_length,
450+
const int64_t body_length,
451+
io::RandomAccessFile* file) {
452+
return ReadMessageInternal(offset, metadata_length, body_length, file,
453+
/*fields_loader=*/{});
454+
}
455+
424456
Future<std::shared_ptr<Message>> ReadMessageAsync(int64_t offset, int32_t metadata_length,
425457
int64_t body_length,
426458
io::RandomAccessFile* file,

cpp/src/arrow/ipc/message.h

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ class ARROW_EXPORT MessageReader {
449449
// org::apache::arrow::flatbuf::RecordBatch*)
450450
using FieldsLoaderFunction = std::function<Status(const void*, io::RandomAccessFile*)>;
451451

452-
/// \brief Read encapsulated RPC message from position in file
452+
/// \brief Read encapsulated IPC message from position in file
453453
///
454454
/// Read a length-prefixed message flatbuffer starting at the indicated file
455455
/// offset. If the message has a body with non-zero length, it will also be
@@ -469,7 +469,27 @@ Result<std::unique_ptr<Message>> ReadMessage(
469469
const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file,
470470
const FieldsLoaderFunction& fields_loader = {});
471471

472-
/// \brief Read encapsulated RPC message from cached buffers
472+
/// \brief Read encapsulated IPC message from position in file
473+
///
474+
/// Read a length-prefixed message flatbuffer starting at the indicated file
475+
/// offset.
476+
///
477+
/// The metadata_length includes at least the length prefix and the flatbuffer
478+
///
479+
/// \param[in] offset the position in the file where the message starts. The
480+
/// first 4 bytes after the offset are the message length
481+
/// \param[in] metadata_length the total number of bytes to read from file
482+
/// \param[in] body_length the number of bytes for the message body
483+
/// \param[in] file the seekable file interface to read from
484+
/// \return the message read
485+
486+
ARROW_EXPORT
487+
Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
488+
const int32_t metadata_length,
489+
const int64_t body_length,
490+
io::RandomAccessFile* file);
491+
492+
/// \brief Read encapsulated IPC message from cached buffers
473493
///
474494
/// The buffers should contain an entire message. Partial reads are not handled.
475495
///

cpp/src/arrow/ipc/read_write_test.cc

Lines changed: 75 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -552,9 +552,15 @@ class TestIpcRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*>,
552552
ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), &metadata_length,
553553
&body_length, options_));
554554

555-
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
555+
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message1,
556556
ReadMessage(0, metadata_length, mmap_.get()));
557-
ASSERT_EQ(expected_version, message->metadata_version());
557+
ASSERT_EQ(expected_version, message1->metadata_version());
558+
559+
ASSERT_OK_AND_ASSIGN(auto message2,
560+
ReadMessage(0, metadata_length, body_length, mmap_.get()));
561+
ASSERT_EQ(expected_version, message2->metadata_version());
562+
563+
ASSERT_TRUE(message1->Equals(*message2));
558564
}
559565
};
560566

@@ -613,6 +619,27 @@ TEST(TestReadMessage, CorruptedSmallInput) {
613619
ASSERT_EQ(nullptr, message);
614620
}
615621

622+
TEST(TestReadMessage, ReadBodyWithLength) {
623+
// Test the optimized ReadMessage(offset, meta_len, body_len, file) overload
624+
std::shared_ptr<RecordBatch> batch;
625+
ASSERT_OK(MakeIntRecordBatch(&batch));
626+
627+
ASSERT_OK_AND_ASSIGN(auto stream, io::BufferOutputStream::Create(0));
628+
int32_t metadata_length;
629+
int64_t body_length;
630+
ASSERT_OK(WriteRecordBatch(*batch, 0, stream.get(), &metadata_length, &body_length,
631+
IpcWriteOptions::Defaults()));
632+
633+
ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish());
634+
io::BufferReader reader(buffer);
635+
636+
ASSERT_OK_AND_ASSIGN(auto message,
637+
ReadMessage(0, metadata_length, body_length, &reader));
638+
639+
ASSERT_EQ(body_length, message->body_length());
640+
ASSERT_TRUE(message->Verify());
641+
}
642+
616643
TEST(TestMetadata, GetMetadataVersion) {
617644
ASSERT_EQ(MetadataVersion::V1, ipc::internal::GetMetadataVersion(
618645
flatbuf::MetadataVersion::MetadataVersion_V1));
@@ -1094,7 +1121,7 @@ TEST_F(RecursionLimits, ReadLimit) {
10941121
&schema));
10951122

10961123
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
1097-
ReadMessage(0, metadata_length, mmap_.get()));
1124+
ReadMessage(0, metadata_length, body_length, mmap_.get()));
10981125

10991126
io::BufferReader reader(message->body());
11001127

@@ -1119,7 +1146,7 @@ TEST_F(RecursionLimits, StressLimit) {
11191146
&schema));
11201147

11211148
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
1122-
ReadMessage(0, metadata_length, mmap_.get()));
1149+
ReadMessage(0, metadata_length, body_length, mmap_.get()));
11231150

11241151
DictionaryMemo empty_memo;
11251152

@@ -3018,25 +3045,56 @@ void GetReadRecordBatchReadRanges(
30183045

30193046
auto read_ranges = tracked->get_read_ranges();
30203047

3021-
// there are 3 read IOs before reading body:
3022-
// 1) read magic and footer length IO
3023-
// 2) read footer IO
3024-
// 3) read record batch metadata IO
3025-
EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
30263048
const int32_t magic_size = static_cast<int>(strlen(ipc::internal::kArrowMagicBytes));
30273049
// read magic and footer length IO
30283050
auto file_end_size = magic_size + sizeof(int32_t);
30293051
auto footer_length_offset = buffer->size() - file_end_size;
30303052
auto footer_length = bit_util::FromLittleEndian(
30313053
util::SafeLoadAs<int32_t>(buffer->data() + footer_length_offset));
3054+
3055+
// there are at least 2 read IOs before reading body:
3056+
// 1) read magic and footer length IO
3057+
// 2) footer IO
3058+
EXPECT_GE(read_ranges.size(), 2);
3059+
3060+
// read magic and footer length IO
30323061
EXPECT_EQ(read_ranges[0].length, file_end_size);
30333062
// read footer IO
30343063
EXPECT_EQ(read_ranges[1].length, footer_length);
3035-
// read record batch metadata. The exact size is tricky to determine but it doesn't
3036-
// matter for this test and it should be smaller than the footer.
3037-
EXPECT_LE(read_ranges[2].length, footer_length);
3038-
for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
3039-
EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
3064+
3065+
if (included_fields.empty()) {
3066+
// When no fields are explicitly included, the reader optimizes by
3067+
// reading metadata and the entire body in a single IO.
3068+
// Thus, there are exactly 3 read IOs in total:
3069+
// 1) magic and footer length
3070+
// 2) footer
3071+
// 3) record batch metadata + body
3072+
EXPECT_EQ(read_ranges.size(), 3);
3073+
3074+
int64_t total_body = 0;
3075+
for (auto len : expected_body_read_lengths) total_body += len;
3076+
3077+
// In the optimized path (included_fields is empty), the 3rd read operation
3078+
// fetches both the message metadata (flatbuffer) and the entire message body
3079+
// in one contiguous block. Therefore, its length must at least exceed the
3080+
// total body length by the size of the metadata.
3081+
EXPECT_GT(read_ranges[2].length, total_body);
3082+
EXPECT_LE(read_ranges[2].length, total_body + footer_length);
3083+
} else {
3084+
// When fields are filtered, we see 3 initial reads followed by N body reads
3085+
// (one for each field/buffer range):
3086+
// 1) magic and footer length
3087+
// 2) footer
3088+
// 3) record batch metadata
3089+
// 4) individual body buffer reads
3090+
EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
3091+
3092+
// read record batch metadata. The exact size is tricky to determine but it doesn't
3093+
// matter for this test and it should be smaller than the footer.
3094+
EXPECT_LE(read_ranges[2].length, footer_length);
3095+
for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
3096+
EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
3097+
}
30403098
}
30413099
}
30423100

@@ -3186,7 +3244,9 @@ class PreBufferingTest : public ::testing::TestWithParam<bool> {
31863244
metadata_reads++;
31873245
}
31883246
}
3189-
ASSERT_EQ(metadata_reads, reader_->num_record_batches() - num_indices_pre_buffered);
3247+
// With ReadMessage optimization, non-prebuffered reads verify metadata and body
3248+
// in a single large read, so we no longer see small metadata-only reads here.
3249+
ASSERT_EQ(metadata_reads, 0);
31903250
ASSERT_EQ(data_reads, reader_->num_record_batches());
31913251
}
31923252

cpp/src/arrow/ipc/reader.cc

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,9 +1236,15 @@ Result<std::unique_ptr<Message>> ReadMessageFromBlock(
12361236
const FileBlock& block, io::RandomAccessFile* file,
12371237
const FieldsLoaderFunction& fields_loader) {
12381238
RETURN_NOT_OK(CheckAligned(block));
1239-
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
1240-
file, fields_loader));
1241-
return CheckBodyLength(std::move(message), block);
1239+
if (fields_loader) {
1240+
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
1241+
file, fields_loader));
1242+
return CheckBodyLength(std::move(message), block);
1243+
} else {
1244+
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
1245+
block.body_length, file));
1246+
return CheckBodyLength(std::move(message), block);
1247+
}
12421248
}
12431249

12441250
Future<std::shared_ptr<Message>> ReadMessageFromBlockAsync(

cpp/src/arrow/json/reader_test.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,11 @@
3737
// On MinGW, install a crash handler to log the exception address for debugging
3838
// intermittent segfaults. See https://github.com/apache/arrow/issues/49272
3939
#if defined(__MINGW32__) || defined(__MINGW64__)
40-
# include <cstdio>
4140
# include <windows.h>
41+
# include <cstdio>
4242
namespace {
4343
LONG WINAPI ArrowCrashHandler(EXCEPTION_POINTERS* ep) {
44-
fprintf(stderr, "CRASH: code=0x%lx addr=%p\n",
45-
ep->ExceptionRecord->ExceptionCode,
44+
fprintf(stderr, "CRASH: code=0x%lx addr=%p\n", ep->ExceptionRecord->ExceptionCode,
4645
ep->ExceptionRecord->ExceptionAddress);
4746
fflush(stderr);
4847
return EXCEPTION_CONTINUE_SEARCH;

cpp/src/arrow/testing/gtest_util.cc

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -660,21 +660,24 @@ LocaleGuard::LocaleGuard(const char* new_locale) : impl_(new Impl(new_locale)) {
660660

661661
LocaleGuard::~LocaleGuard() {}
662662

663-
EnvVarGuard::EnvVarGuard(const std::string& name, const std::string& value)
664-
: name_(name) {
665-
auto maybe_value = arrow::internal::GetEnvVar(name);
663+
EnvVarGuard::EnvVarGuard(std::string name, std::optional<std::string> value)
664+
: name_(std::move(name)) {
665+
auto maybe_value = arrow::internal::GetEnvVar(name_);
666666
if (maybe_value.ok()) {
667-
was_set_ = true;
668667
old_value_ = *std::move(maybe_value);
669668
} else {
670-
was_set_ = false;
669+
old_value_ = std::nullopt;
670+
}
671+
if (value.has_value()) {
672+
ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, *value));
673+
} else {
674+
ARROW_CHECK_OK(arrow::internal::DelEnvVar(name_));
671675
}
672-
ARROW_CHECK_OK(arrow::internal::SetEnvVar(name, value));
673676
}
674677

675678
EnvVarGuard::~EnvVarGuard() {
676-
if (was_set_) {
677-
ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, old_value_));
679+
if (old_value_.has_value()) {
680+
ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, *old_value_));
678681
} else {
679682
ARROW_CHECK_OK(arrow::internal::DelEnvVar(name_));
680683
}

0 commit comments

Comments
 (0)