Skip to content

Commit 4acaa8a

Browse files
authored
Merge branch 'main' into dev_parquet
2 parents 4f8937f + 5363e61 commit 4acaa8a

1 file changed

Lines changed: 206 additions & 0 deletions

File tree

src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,80 @@
3838

3939
namespace paimon::test {
4040

41+
class ControlledMockFileBatchReader : public MockFileBatchReader {
42+
public:
43+
ControlledMockFileBatchReader(const std::shared_ptr<arrow::Array>& data,
44+
const std::shared_ptr<arrow::DataType>& file_schema,
45+
int32_t read_batch_size,
46+
std::vector<std::pair<uint64_t, uint64_t>> read_ranges,
47+
bool need_prefetch, Status set_read_ranges_status = Status::OK())
48+
: MockFileBatchReader(data, file_schema, read_batch_size),
49+
read_ranges_(std::move(read_ranges)),
50+
need_prefetch_(need_prefetch),
51+
set_read_ranges_status_(std::move(set_read_ranges_status)) {}
52+
53+
Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
54+
bool* need_prefetch) const override {
55+
*need_prefetch = need_prefetch_;
56+
return read_ranges_;
57+
}
58+
59+
Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) override {
60+
if (!set_read_ranges_status_.ok()) {
61+
return set_read_ranges_status_;
62+
}
63+
return MockFileBatchReader::SetReadRanges(read_ranges);
64+
}
65+
66+
private:
67+
std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
68+
bool need_prefetch_ = true;
69+
Status set_read_ranges_status_;
70+
};
71+
72+
class ControlledMockFormatReaderBuilder : public ReaderBuilder {
73+
public:
74+
ControlledMockFormatReaderBuilder(const std::shared_ptr<arrow::Array>& data,
75+
const std::shared_ptr<arrow::DataType>& schema,
76+
int32_t read_batch_size,
77+
std::vector<std::pair<uint64_t, uint64_t>> read_ranges,
78+
bool need_prefetch,
79+
std::vector<Status> set_read_ranges_statuses)
80+
: data_(data),
81+
schema_(schema),
82+
read_batch_size_(read_batch_size),
83+
read_ranges_(std::move(read_ranges)),
84+
need_prefetch_(need_prefetch),
85+
set_read_ranges_statuses_(std::move(set_read_ranges_statuses)) {}
86+
87+
ReaderBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) override {
88+
return this;
89+
}
90+
91+
Result<std::unique_ptr<FileBatchReader>> Build(
92+
const std::shared_ptr<InputStream>& path) const override {
93+
size_t index = build_count_++;
94+
Status set_read_ranges_status = index < set_read_ranges_statuses_.size()
95+
? set_read_ranges_statuses_[index]
96+
: Status::OK();
97+
return std::make_unique<ControlledMockFileBatchReader>(
98+
data_, schema_, read_batch_size_, read_ranges_, need_prefetch_, set_read_ranges_status);
99+
}
100+
101+
Result<std::unique_ptr<FileBatchReader>> Build(const std::string& path) const override {
102+
return Status::Invalid("do not support build reader with path in mock format");
103+
}
104+
105+
private:
106+
std::shared_ptr<arrow::Array> data_;
107+
std::shared_ptr<arrow::DataType> schema_;
108+
int32_t read_batch_size_ = 0;
109+
std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
110+
bool need_prefetch_ = true;
111+
std::vector<Status> set_read_ranges_statuses_;
112+
mutable size_t build_count_ = 0;
113+
};
114+
41115
struct TestParam {
42116
std::string file_format;
43117
PrefetchCacheMode cache_mode;
@@ -347,6 +421,27 @@ TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRanges) {
347421
ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2);
348422
}
349423

424+
TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRangesDisablePrefetchByAdaptiveStrategy) {
425+
auto data_array = PrepareArray(200);
426+
int32_t batch_size = 10;
427+
int32_t prefetch_max_parallel_num = 1;
428+
ControlledMockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size,
429+
/*read_ranges=*/{{0, 100}},
430+
/*need_prefetch=*/true,
431+
/*set_read_ranges_statuses=*/{});
432+
433+
ASSERT_OK_AND_ASSIGN(
434+
auto reader,
435+
PrefetchFileBatchReaderImpl::Create(
436+
/*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size,
437+
/*prefetch_batch_count=*/2,
438+
/*enable_adaptive_prefetch_strategy=*/true, executor_,
439+
/*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
440+
CacheConfig(), GetDefaultPool()));
441+
442+
ASSERT_FALSE(reader->NeedPrefetch());
443+
}
444+
350445
TEST_F(PrefetchFileBatchReaderImplTest, SetReadRanges) {
351446
auto data_array = PrepareArray(400);
352447
int32_t batch_size = 30;
@@ -384,6 +479,117 @@ TEST_F(PrefetchFileBatchReaderImplTest, SetReadRanges) {
384479
ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2);
385480
}
386481

482+
TEST_F(PrefetchFileBatchReaderImplTest, SetReadRangesReturnErrorWhenPushDownFailed) {
483+
auto data_array = PrepareArray(100);
484+
int32_t batch_size = 10;
485+
int32_t prefetch_max_parallel_num = 2;
486+
ControlledMockFormatReaderBuilder reader_builder(
487+
data_array, data_type_, batch_size,
488+
/*read_ranges=*/{{0, 50}, {50, 100}},
489+
/*need_prefetch=*/true,
490+
/*set_read_ranges_statuses=*/{Status::OK(), Status::IOError("set read ranges failed")});
491+
492+
ASSERT_OK_AND_ASSIGN(
493+
auto reader,
494+
PrefetchFileBatchReaderImpl::Create(
495+
/*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size,
496+
prefetch_max_parallel_num * 2,
497+
/*enable_adaptive_prefetch_strategy=*/false, executor_,
498+
/*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
499+
CacheConfig(), GetDefaultPool()));
500+
501+
auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
502+
prefetch_reader->need_prefetch_ = true;
503+
504+
Status status = prefetch_reader->SetReadRanges({{0, 50}, {50, 100}});
505+
ASSERT_FALSE(status.ok());
506+
ASSERT_TRUE(status.IsIOError());
507+
}
508+
509+
TEST_F(PrefetchFileBatchReaderImplTest, NeedInitCacheNeverMode) {
510+
auto data_array = PrepareArray(10);
511+
int32_t batch_size = 5;
512+
int32_t prefetch_max_parallel_num = 1;
513+
MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
514+
ASSERT_OK_AND_ASSIGN(
515+
auto reader,
516+
PrefetchFileBatchReaderImpl::Create(
517+
/*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size,
518+
prefetch_max_parallel_num * 2,
519+
/*enable_adaptive_prefetch_strategy=*/false, executor_,
520+
/*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::NEVER,
521+
CacheConfig(), GetDefaultPool()));
522+
523+
auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
524+
ASSERT_FALSE(prefetch_reader->NeedInitCache());
525+
}
526+
527+
TEST_F(PrefetchFileBatchReaderImplTest, WorkloopSetReadStatusWhenCacheInitFailed) {
528+
auto data_array = PrepareArray(10);
529+
int32_t batch_size = 5;
530+
int32_t prefetch_max_parallel_num = 1;
531+
MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
532+
CacheConfig invalid_cache_config(
533+
/*buffer_size_limit=*/512 * 1024,
534+
/*range_size_limit=*/static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1,
535+
/*hole_size_limit=*/8 * 1024,
536+
/*pre_buffer_limit=*/128 * 1024);
537+
538+
ASSERT_OK_AND_ASSIGN(
539+
auto reader,
540+
PrefetchFileBatchReaderImpl::Create(
541+
/*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size,
542+
prefetch_max_parallel_num * 2,
543+
/*enable_adaptive_prefetch_strategy=*/false, executor_,
544+
/*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
545+
invalid_cache_config, GetDefaultPool()));
546+
547+
auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
548+
prefetch_reader->Workloop();
549+
550+
Status status = prefetch_reader->GetReadStatus();
551+
ASSERT_FALSE(status.ok());
552+
ASSERT_TRUE(status.IsInvalid());
553+
}
554+
555+
TEST_F(PrefetchFileBatchReaderImplTest, DoReadBatchReturnOkWhenShutdown) {
556+
auto data_array = PrepareArray(10);
557+
int32_t batch_size = 5;
558+
int32_t prefetch_max_parallel_num = 1;
559+
MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
560+
ASSERT_OK_AND_ASSIGN(
561+
auto reader,
562+
PrefetchFileBatchReaderImpl::Create(
563+
/*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size,
564+
prefetch_max_parallel_num * 2,
565+
/*enable_adaptive_prefetch_strategy=*/false, executor_,
566+
/*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
567+
CacheConfig(), GetDefaultPool()));
568+
569+
auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
570+
prefetch_reader->is_shutdown_ = true;
571+
ASSERT_OK(prefetch_reader->DoReadBatch(/*reader_idx=*/0));
572+
}
573+
574+
TEST_F(PrefetchFileBatchReaderImplTest, DoReadBatchReturnOkWhenNoCurrentReadRange) {
575+
auto data_array = PrepareArray(10);
576+
int32_t batch_size = 5;
577+
int32_t prefetch_max_parallel_num = 1;
578+
MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
579+
ASSERT_OK_AND_ASSIGN(
580+
auto reader,
581+
PrefetchFileBatchReaderImpl::Create(
582+
/*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size,
583+
prefetch_max_parallel_num * 2,
584+
/*enable_adaptive_prefetch_strategy=*/false, executor_,
585+
/*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
586+
CacheConfig(), GetDefaultPool()));
587+
588+
auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
589+
prefetch_reader->read_ranges_in_group_ = {{}};
590+
ASSERT_OK(prefetch_reader->DoReadBatch(/*reader_idx=*/0));
591+
}
592+
387593
TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLargeBatchSize) {
388594
auto data_array = PrepareArray(101);
389595
int32_t batch_size = 150;

0 commit comments

Comments
 (0)