Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,80 @@

namespace paimon::test {

// Mock file reader whose GenReadRanges() output and SetReadRanges() outcome are fixed at
// construction time, so tests can script both without touching real file data.
class ControlledMockFileBatchReader : public MockFileBatchReader {
 public:
    using RangeList = std::vector<std::pair<uint64_t, uint64_t>>;

    // data / file_schema / read_batch_size are forwarded unchanged to MockFileBatchReader.
    // read_ranges and need_prefetch are what GenReadRanges() reports back.
    // set_read_ranges_status, when not OK, is returned by SetReadRanges() instead of
    // delegating to the base class.
    ControlledMockFileBatchReader(const std::shared_ptr<arrow::Array>& data,
                                  const std::shared_ptr<arrow::DataType>& file_schema,
                                  int32_t read_batch_size, RangeList read_ranges,
                                  bool need_prefetch,
                                  Status set_read_ranges_status = Status::OK())
        : MockFileBatchReader(data, file_schema, read_batch_size),
          read_ranges_(std::move(read_ranges)),
          need_prefetch_(need_prefetch),
          set_read_ranges_status_(std::move(set_read_ranges_status)) {}

    // Returns the ranges given at construction and writes the stored prefetch flag into
    // *need_prefetch.
    Result<RangeList> GenReadRanges(bool* need_prefetch) const override {
        *need_prefetch = need_prefetch_;
        return read_ranges_;
    }

    // Delegates to the base mock unless a failure status was injected, in which case that
    // status is returned and the base class is never called.
    Status SetReadRanges(const RangeList& read_ranges) override {
        if (set_read_ranges_status_.ok()) {
            return MockFileBatchReader::SetReadRanges(read_ranges);
        }
        return set_read_ranges_status_;
    }

 private:
    RangeList read_ranges_;
    bool need_prefetch_ = true;
    Status set_read_ranges_status_;
};

// ReaderBuilder test double that produces ControlledMockFileBatchReader instances.
//
// Every reader built from an InputStream shares the same data, schema, batch size, read
// ranges and prefetch flag. The i-th Build() call (0-based) injects
// set_read_ranges_statuses[i] as that reader's SetReadRanges() result; calls beyond the end
// of the vector get Status::OK().
class ControlledMockFormatReaderBuilder : public ReaderBuilder {
 public:
    ControlledMockFormatReaderBuilder(const std::shared_ptr<arrow::Array>& data,
                                      const std::shared_ptr<arrow::DataType>& schema,
                                      int32_t read_batch_size,
                                      std::vector<std::pair<uint64_t, uint64_t>> read_ranges,
                                      bool need_prefetch,
                                      std::vector<Status> set_read_ranges_statuses)
        : data_(data),
          schema_(schema),
          read_batch_size_(read_batch_size),
          read_ranges_(std::move(read_ranges)),
          need_prefetch_(need_prefetch),
          set_read_ranges_statuses_(std::move(set_read_ranges_statuses)) {}

    // The memory pool is ignored by this mock; returns this builder unchanged.
    ReaderBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) override {
        return this;
    }

    // Builds a new controlled mock reader. The input stream itself is not read.
    Result<std::unique_ptr<FileBatchReader>> Build(
        const std::shared_ptr<InputStream>& path) const override {
        // Post-increment so the first call uses index 0.
        const size_t index = build_count_++;
        Status set_read_ranges_status = index < set_read_ranges_statuses_.size()
                                            ? set_read_ranges_statuses_[index]
                                            : Status::OK();
        return std::make_unique<ControlledMockFileBatchReader>(
            data_, schema_, read_batch_size_, read_ranges_, need_prefetch_, set_read_ranges_status);
    }

    // Building from a path string is not supported by this mock; always returns Invalid.
    Result<std::unique_ptr<FileBatchReader>> Build(const std::string& path) const override {
        return Status::Invalid("do not support build reader with path in mock format");
    }

 private:
    std::shared_ptr<arrow::Array> data_;
    std::shared_ptr<arrow::DataType> schema_;
    int32_t read_batch_size_ = 0;
    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
    bool need_prefetch_ = true;
    std::vector<Status> set_read_ranges_statuses_;
    // Number of stream-based Build() calls so far; mutable because Build() is const.
    // NOTE(review): plain size_t with no synchronisation -- assumes Build() is never invoked
    // concurrently on the same builder; confirm against PrefetchFileBatchReaderImpl::Create.
    mutable size_t build_count_ = 0;
};

struct TestParam {
std::string file_format;
PrefetchCacheMode cache_mode;
Expand Down Expand Up @@ -347,6 +421,27 @@ TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRanges) {
ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2);
}

// Creates a prefetch reader with the adaptive strategy enabled and read ranges initialized
// up front, on top of a mock that reports need_prefetch=true with the single range {0, 100}.
// The reader is expected to report NeedPrefetch() == false afterwards.
TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRangesDisablePrefetchByAdaptiveStrategy) {
    const int32_t batch_size = 10;
    const int32_t prefetch_max_parallel_num = 1;
    auto data_array = PrepareArray(200);

    std::vector<std::pair<uint64_t, uint64_t>> mock_read_ranges = {{0, 100}};
    ControlledMockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size,
                                                     std::move(mock_read_ranges),
                                                     /*need_prefetch=*/true,
                                                     /*set_read_ranges_statuses=*/{});

    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
            batch_size,
            /*prefetch_batch_count=*/2,
            /*enable_adaptive_prefetch_strategy=*/true, executor_,
            /*initialize_read_ranges=*/true,
            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
            GetDefaultPool()));

    const bool still_needs_prefetch = reader->NeedPrefetch();
    ASSERT_FALSE(still_needs_prefetch);
}

TEST_F(PrefetchFileBatchReaderImplTest, SetReadRanges) {
auto data_array = PrepareArray(400);
int32_t batch_size = 30;
Expand Down Expand Up @@ -384,6 +479,117 @@ TEST_F(PrefetchFileBatchReaderImplTest, SetReadRanges) {
ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2);
}

// Verifies that SetReadRanges() on the prefetch reader surfaces an IOError when one of the
// underlying readers rejects the ranges. The builder injects OK for the first reader it
// builds and IOError for the second.
TEST_F(PrefetchFileBatchReaderImplTest, SetReadRangesReturnErrorWhenPushDownFailed) {
    auto data_array = PrepareArray(100);
    int32_t batch_size = 10;
    // NOTE(review): presumably one underlying reader is built per parallel slot, so two
    // slots are needed to reach the failing second status -- confirm against Create().
    int32_t prefetch_max_parallel_num = 2;
    ControlledMockFormatReaderBuilder reader_builder(
        data_array, data_type_, batch_size,
        /*read_ranges=*/{{0, 50}, {50, 100}},
        /*need_prefetch=*/true,
        /*set_read_ranges_statuses=*/{Status::OK(), Status::IOError("set read ranges failed")});

    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
            batch_size,
            /*prefetch_batch_count=*/prefetch_max_parallel_num * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false,
            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
            GetDefaultPool()));

    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
    // Fail the test cleanly instead of dereferencing a null pointer if the cast fails.
    ASSERT_NE(prefetch_reader, nullptr);
    // Set the flag directly rather than relying on how Create() left it.
    prefetch_reader->need_prefetch_ = true;

    Status status = prefetch_reader->SetReadRanges({{0, 50}, {50, 100}});
    ASSERT_FALSE(status.ok());
    ASSERT_TRUE(status.IsIOError());
}

// Verifies that a reader created with PrefetchCacheMode::NEVER reports that no cache
// initialization is needed.
TEST_F(PrefetchFileBatchReaderImplTest, NeedInitCacheNeverMode) {
    auto data_array = PrepareArray(10);
    int32_t batch_size = 5;
    int32_t prefetch_max_parallel_num = 1;
    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
            batch_size,
            /*prefetch_batch_count=*/prefetch_max_parallel_num * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false,
            /*prefetch_cache_mode=*/PrefetchCacheMode::NEVER, CacheConfig(),
            GetDefaultPool()));

    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
    // Fail the test cleanly instead of dereferencing a null pointer if the cast fails.
    ASSERT_NE(prefetch_reader, nullptr);
    ASSERT_FALSE(prefetch_reader->NeedInitCache());
}

// Runs Workloop() directly on a reader configured with an out-of-range cache setting and
// checks that the failure is recorded as an Invalid read status.
TEST_F(PrefetchFileBatchReaderImplTest, WorkloopSetReadStatusWhenCacheInitFailed) {
    auto data_array = PrepareArray(10);
    int32_t batch_size = 5;
    int32_t prefetch_max_parallel_num = 1;
    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
    // range_size_limit is one past the uint32_t maximum.
    // NOTE(review): presumably this is what cache initialization rejects -- confirm.
    CacheConfig invalid_cache_config(
        /*buffer_size_limit=*/512 * 1024,
        /*range_size_limit=*/static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1,
        /*hole_size_limit=*/8 * 1024,
        /*pre_buffer_limit=*/128 * 1024);

    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
            batch_size,
            /*prefetch_batch_count=*/prefetch_max_parallel_num * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false,
            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, invalid_cache_config,
            GetDefaultPool()));

    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
    // Fail the test cleanly instead of dereferencing a null pointer if the cast fails.
    ASSERT_NE(prefetch_reader, nullptr);
    prefetch_reader->Workloop();

    Status status = prefetch_reader->GetReadStatus();
    ASSERT_FALSE(status.ok());
    ASSERT_TRUE(status.IsInvalid());
}

// Verifies that DoReadBatch() returns OK when the reader has already been flagged as shut
// down.
TEST_F(PrefetchFileBatchReaderImplTest, DoReadBatchReturnOkWhenShutdown) {
    auto data_array = PrepareArray(10);
    int32_t batch_size = 5;
    int32_t prefetch_max_parallel_num = 1;
    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
            batch_size,
            /*prefetch_batch_count=*/prefetch_max_parallel_num * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false,
            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
            GetDefaultPool()));

    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
    // Fail the test cleanly instead of dereferencing a null pointer if the cast fails.
    ASSERT_NE(prefetch_reader, nullptr);
    // Set the shutdown flag directly on the reader before calling DoReadBatch().
    prefetch_reader->is_shutdown_ = true;
    ASSERT_OK(prefetch_reader->DoReadBatch(/*reader_idx=*/0));
}

// Verifies that DoReadBatch() returns OK when reader 0 has no read range to work on.
TEST_F(PrefetchFileBatchReaderImplTest, DoReadBatchReturnOkWhenNoCurrentReadRange) {
    auto data_array = PrepareArray(10);
    int32_t batch_size = 5;
    int32_t prefetch_max_parallel_num = 1;
    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
            batch_size,
            /*prefetch_batch_count=*/prefetch_max_parallel_num * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false,
            /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
            GetDefaultPool()));

    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
    // Fail the test cleanly instead of dereferencing a null pointer if the cast fails.
    ASSERT_NE(prefetch_reader, nullptr);
    // NOTE(review): presumably a single group holding an empty range list, so reader 0 has
    // nothing current to read -- confirm against the member's declared type.
    prefetch_reader->read_ranges_in_group_ = {{}};
    ASSERT_OK(prefetch_reader->DoReadBatch(/*reader_idx=*/0));
}

TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLargeBatchSize) {
auto data_array = PrepareArray(101);
int32_t batch_size = 150;
Expand Down
Loading