31 commits
e8588b1
reverse parquet draft version
zhuqi-lucas Nov 20, 2025
2cf2e31
Support limit pushdown for reverse scan
zhuqi-lucas Nov 20, 2025
2f73a4a
Support row group level cache
zhuqi-lucas Nov 21, 2025
f546a7f
Merge branch 'main' into reverse_parquet
zhuqi-lucas Nov 21, 2025
07ef9a2
fix
zhuqi-lucas Nov 21, 2025
c123a37
fmt
zhuqi-lucas Nov 21, 2025
46cfd89
add more test
zhuqi-lucas Nov 21, 2025
99e50de
Add metrics for reverse scan row groups.
zhuqi-lucas Nov 21, 2025
dbcf598
fix
zhuqi-lucas Nov 21, 2025
12f74d6
Merge branch 'main' into reverse_parquet
zhuqi-lucas Nov 21, 2025
2cfd73e
optimize code
zhuqi-lucas Nov 21, 2025
3479672
Merge branch 'main' into reverse_parquet
zhuqi-lucas Nov 21, 2025
98fac46
Add more comments
zhuqi-lucas Nov 21, 2025
92b6487
fmt
zhuqi-lucas Nov 21, 2025
475bf3d
Merge branch 'main' into reverse_parquet
zhuqi-lucas Nov 21, 2025
8ccca52
add enable/disable option
zhuqi-lucas Nov 22, 2025
5d63557
Merge branch 'main' into reverse_parquet
zhuqi-lucas Nov 22, 2025
a2b44a5
fix slt
zhuqi-lucas Nov 22, 2025
b9f8199
fix proto
zhuqi-lucas Nov 22, 2025
3502c10
fix
zhuqi-lucas Nov 22, 2025
fca325c
Merge branch 'main' into reverse_parquet
zhuqi-lucas Nov 22, 2025
9af4fba
Update doc
zhuqi-lucas Nov 22, 2025
7a63ddd
Add reverse files testing in slt
zhuqi-lucas Nov 22, 2025
dfb29d7
Merge branch 'main' into reverse_parquet
zhuqi-lucas Nov 22, 2025
52c9b30
Merge branch 'main' into reverse_parquet
zhuqi-lucas Nov 24, 2025
bb31251
simple test
zhuqi-lucas Nov 24, 2025
45089c5
Change to sort pushdown architecture
zhuqi-lucas Nov 24, 2025
d15994b
Merge branch 'main' into reverse_parquet
zhuqi-lucas Nov 24, 2025
9452005
fix
zhuqi-lucas Nov 24, 2025
d338461
make code easy
zhuqi-lucas Nov 24, 2025
2d77d77
proto fix
zhuqi-lucas Nov 24, 2025
3 changes: 2 additions & 1 deletion Cargo.lock


9 changes: 9 additions & 0 deletions datafusion/common/src/config.rs
@@ -836,6 +836,15 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

/// Enable sort pushdown optimization for sorted Parquet files.
/// Currently, this optimization only supports reversing the scan order.
/// When a query requires ordering that can be satisfied by reversing
/// the file's natural ordering, row groups and batches are read in
/// reverse order to eliminate sort operations.
/// Note: This buffers one row group at a time (typically ~128MB).
/// Default: true
pub enable_sort_pushdown: bool, default = true
}
}

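For reference, a minimal sketch of how the new option interacts with a sorted table. The `with_parquet_reverse_scan` helper and the `WITH ORDER` / `ORDER BY ... DESC` pattern are taken from the tests later in this diff; the table name, file path, and surrounding function are hypothetical, and the exact config key path is not spelled out here:

```rust
use datafusion::prelude::*;

// Sketch: run a query that can use the reverse scan, and build a second
// session with the optimization disabled via the helper this PR adds.
async fn reverse_scan_example() -> datafusion::error::Result<()> {
    // Default session: enable_sort_pushdown = true, so the DESC query below
    // can be answered by reading row groups and batches in reverse order.
    let ctx = SessionContext::new();

    // Opt out for a session (e.g. to avoid buffering ~one row group at a time).
    let config = SessionConfig::new().with_parquet_reverse_scan(false);
    let ctx_no_reverse = SessionContext::new_with_config(config);

    // The optimization only applies when the table declares a sort order
    // that the query reverses, e.g.:
    ctx.sql(
        "CREATE EXTERNAL TABLE t (id INT NOT NULL)
         STORED AS PARQUET
         WITH ORDER (id ASC NULLS LAST)
         LOCATION './t.parquet'",
    )
    .await?
    .collect()
    .await?;

    let _batches = ctx
        .sql("SELECT * FROM t ORDER BY id DESC LIMIT 10")
        .await?
        .collect()
        .await?;

    let _ = ctx_no_reverse; // this session would plan a SortExec instead
    Ok(())
}
```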
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -209,6 +209,7 @@ impl ParquetOptions {
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
max_predicate_cache_size: _,
enable_sort_pushdown: _,
} = self;

let mut builder = WriterProperties::builder()
@@ -475,6 +476,7 @@ mod tests {
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
enable_sort_pushdown: true,
}
}

@@ -590,6 +592,7 @@
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
enable_sort_pushdown: true,
},
column_specific_options,
key_value_metadata,
268 changes: 268 additions & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -1175,3 +1175,271 @@ async fn explain_analyze_hash_join() {
);
}
}

#[tokio::test]
async fn parquet_explain_analyze_reverse_scan_metrics() {
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;

let temp_dir = TempDir::new().unwrap();
let parquet_path = temp_dir.path().join("reverse_scan_test.parquet");

// Create test data with multiple row groups
// Each row group will have 5 rows, total 15 rows = 3 row groups
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("value", DataType::Int32, false),
]));

let file = File::create(&parquet_path).unwrap();
let props = WriterProperties::builder()
.set_max_row_group_size(5)
.build();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();

// Write 3 row groups: [1-5], [6-10], [11-15]
for group_start in [1, 6, 11] {
let ids = Int32Array::from(vec![
group_start,
group_start + 1,
group_start + 2,
group_start + 3,
group_start + 4,
]);
let values = Int32Array::from(vec![
group_start * 100,
(group_start + 1) * 100,
(group_start + 2) * 100,
(group_start + 3) * 100,
(group_start + 4) * 100,
]);

let batch =
RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(values)])
.unwrap();
writer.write(&batch).unwrap();
}
writer.close().unwrap();

let ctx = SessionContext::new();

// Register table with ORDER BY clause to enable reverse scan optimization
let sql = format!(
"CREATE EXTERNAL TABLE reverse_scan_test (
id INT NOT NULL,
value INT NOT NULL
)
STORED AS PARQUET
WITH ORDER (id ASC NULLS LAST)
LOCATION '{}'",
parquet_path.to_str().unwrap()
);
ctx.sql(&sql).await.unwrap().collect().await.unwrap();

// Test 1: Reverse scan with LIMIT 10
// With 3 row groups of 5 rows each, LIMIT 10 requires reading 2 row groups
// Expected: row_groups_reversed=2, batches_reversed=2
// (last row group gives 5 rows, second-to-last gives 5 more rows = 10 total)
let sql = "EXPLAIN ANALYZE SELECT * FROM reverse_scan_test ORDER BY id DESC LIMIT 10";
let actual = execute_to_batches(&ctx, sql).await;
let formatted = arrow::util::pretty::pretty_format_batches(&actual)
.unwrap()
.to_string();

// Verify the reverse scan optimization was applied
assert_contains!(&formatted, "output_ordering=[id@0 DESC]");

// Verify reverse scan metrics with LIMIT
// After the bugfix in ReversedParquetStreamWithLimit::finalize_current_row_group,
// these metrics should be correctly reported (previously they were 0)
assert_metrics!(&formatted, "DataSourceExec", "row_groups_reversed=2");
assert_metrics!(&formatted, "DataSourceExec", "batches_reversed=2");
assert_metrics!(&formatted, "DataSourceExec", "output_rows=10");

// Test 2: Full reverse scan (no LIMIT)
// Expected: row_groups_reversed=3, batches_reversed=3
// (all 3 row groups need to be reversed)
let sql = "EXPLAIN ANALYZE SELECT * FROM reverse_scan_test ORDER BY id DESC";
let actual = execute_to_batches(&ctx, sql).await;
let formatted = arrow::util::pretty::pretty_format_batches(&actual)
.unwrap()
.to_string();

// Verify reverse scan metrics without LIMIT
assert_metrics!(&formatted, "DataSourceExec", "row_groups_reversed=3");
assert_metrics!(&formatted, "DataSourceExec", "batches_reversed=3");
assert_metrics!(&formatted, "DataSourceExec", "output_rows=15");

// Verify that reverse_time metric exists and is non-zero
assert_contains!(&formatted, "reverse_time=");

// Test 3: Verify data correctness with LIMIT
let sql = "SELECT * FROM reverse_scan_test ORDER BY id DESC LIMIT 10";
let actual = execute_to_batches(&ctx, sql).await;

// Collect all rows from all batches
let total_rows: usize = actual.iter().map(|b| b.num_rows()).sum();
assert_eq!(
total_rows, 10,
"Should return exactly 10 rows with LIMIT 10"
);

// Collect all id values
let mut all_ids = Vec::new();
for batch in &actual {
let id_col = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
for i in 0..id_col.len() {
all_ids.push(id_col.value(i));
}
}

// Should get ids from 15 down to 6 (10 rows total in descending order)
assert_eq!(all_ids.len(), 10);
assert_eq!(all_ids[0], 15, "First row should be id=15");
assert_eq!(all_ids[9], 6, "Last row should be id=6");

// Verify all values are in descending order
for i in 0..all_ids.len() - 1 {
assert!(
all_ids[i] > all_ids[i + 1],
"IDs should be in descending order: {} > {}",
all_ids[i],
all_ids[i + 1]
);
}
}

#[tokio::test]
async fn parquet_explain_analyze_disable_reverse_scan_metrics() {
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;

let temp_dir = TempDir::new().unwrap();
let parquet_path = temp_dir.path().join("reverse_scan_test.parquet");

// Create test data with multiple row groups
// Each row group will have 5 rows, total 15 rows = 3 row groups
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("value", DataType::Int32, false),
]));

let file = File::create(&parquet_path).unwrap();
let props = WriterProperties::builder()
.set_max_row_group_size(5)
.build();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();

// Write 3 row groups: [1-5], [6-10], [11-15]
for group_start in [1, 6, 11] {
let ids = Int32Array::from(vec![
group_start,
group_start + 1,
group_start + 2,
group_start + 3,
group_start + 4,
]);
let values = Int32Array::from(vec![
group_start * 100,
(group_start + 1) * 100,
(group_start + 2) * 100,
(group_start + 3) * 100,
(group_start + 4) * 100,
]);

let batch =
RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(values)])
.unwrap();
writer.write(&batch).unwrap();
}
writer.close().unwrap();

// Create session context with reverse scan DISABLED
let session_config = SessionContext::new()
.task_ctx()
.session_config()
.clone()
.with_parquet_reverse_scan(false);
let ctx = SessionContext::new_with_config(session_config);

// Register table with ORDER BY clause
let sql = format!(
"CREATE EXTERNAL TABLE reverse_scan_test (
id INT NOT NULL,
value INT NOT NULL
)
STORED AS PARQUET
WITH ORDER (id ASC NULLS LAST)
LOCATION '{}'",
parquet_path.to_str().unwrap()
);
ctx.sql(&sql).await.unwrap().collect().await.unwrap();

// Test: Query with ORDER BY DESC and LIMIT
// Since reverse scan is DISABLED, this should use SortExec instead
let sql = "EXPLAIN ANALYZE SELECT * FROM reverse_scan_test ORDER BY id DESC LIMIT 10";
let actual = execute_to_batches(&ctx, sql).await;
let formatted = arrow::util::pretty::pretty_format_batches(&actual)
.unwrap()
.to_string();

// ========== Key Assertions: Verify reverse scan was NOT applied ==========

// 1. Should have a SortExec node (reverse scan optimization not applied)
assert_contains!(&formatted, "SortExec:");

// 2. DataSourceExec should have FORWARD ordering (ASC), not reversed
assert_contains!(&formatted, "output_ordering=[id@0 ASC");

// 3. Reverse scan metrics should all be 0 (no reversal happened)
assert_metrics!(&formatted, "DataSourceExec", "row_groups_reversed=0");
assert_metrics!(&formatted, "DataSourceExec", "batches_reversed=0");

// 4. DataSourceExec should still output rows (just in forward order)
// The SortExec will handle the reversal
assert_metrics!(&formatted, "DataSourceExec", "output_rows=15");

// Test data correctness: result should still be correct (SortExec handles it)
let sql = "SELECT * FROM reverse_scan_test ORDER BY id DESC LIMIT 10";
let actual = execute_to_batches(&ctx, sql).await;

// Collect all rows from all batches
let total_rows: usize = actual.iter().map(|b| b.num_rows()).sum();
assert_eq!(
total_rows, 10,
"Should return exactly 10 rows with LIMIT 10"
);

// Collect all id values
let mut all_ids = Vec::new();
for batch in &actual {
let id_col = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
for i in 0..id_col.len() {
all_ids.push(id_col.value(i));
}
}

// Should get ids from 15 down to 6 (10 rows total in descending order)
// Even though reverse scan is disabled, SortExec should produce correct results
assert_eq!(all_ids.len(), 10);
assert_eq!(all_ids[0], 15, "First row should be id=15");
assert_eq!(all_ids[9], 6, "Last row should be id=6");

// Verify all values are in descending order
for i in 0..all_ids.len() - 1 {
assert!(
all_ids[i] > all_ids[i + 1],
"IDs should be in descending order: {} > {}",
all_ids[i],
all_ids[i + 1]
);
}
}
24 changes: 24 additions & 0 deletions datafusion/datasource-parquet/src/metrics.rs
@@ -76,6 +76,13 @@ pub struct ParquetFileMetrics {
/// number of rows that were stored in the cache after evaluating predicates
/// reused for the output.
pub predicate_cache_records: Count,

/// Number of row groups reversed during reverse scan optimization
pub row_groups_reversed: Count,
/// Number of batches reversed within row groups
pub batches_reversed: Count,
/// Total time spent reversing batches
pub reverse_time: Time,
}

impl ParquetFileMetrics {
@@ -162,6 +169,20 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.counter("predicate_cache_records", partition);

// Reverse scan metrics
let row_groups_reversed = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.with_type(MetricType::SUMMARY)
.counter("row_groups_reversed", partition);

let batches_reversed = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("batches_reversed", partition);

let reverse_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("reverse_time", partition);

Self {
files_ranges_pruned_statistics,
predicate_evaluation_errors,
@@ -179,6 +200,9 @@
scan_efficiency_ratio,
predicate_cache_inner_records,
predicate_cache_records,
row_groups_reversed,
batches_reversed,
reverse_time,
}
}
}
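To make the intent of the three new fields concrete, here is a hedged sketch of how a reversal path could drive them. `reverse_record_batch` and `reverse_with_metrics` are hypothetical helpers written for illustration; only the `Count`/`Time` fields themselves come from this diff, and the PR's actual stream may implement the reversal differently:

```rust
use arrow::array::UInt32Array;
use arrow::compute::take;
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::metrics::{Count, Time};

/// Hypothetical helper: reverse the rows of one decoded batch using the
/// `take` kernel with descending indices.
fn reverse_record_batch(batch: &RecordBatch) -> RecordBatch {
    let indices = UInt32Array::from_iter_values((0..batch.num_rows() as u32).rev());
    let columns = batch
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices, None).expect("take"))
        .collect();
    RecordBatch::try_new(batch.schema(), columns).expect("same schema")
}

/// Hypothetical call site: time only the reversal work and bump the
/// per-batch counter; `row_groups_reversed` would be incremented once
/// per finished row group elsewhere in the stream.
fn reverse_with_metrics(
    batch: &RecordBatch,
    batches_reversed: &Count,
    reverse_time: &Time,
) -> RecordBatch {
    let timer = reverse_time.timer(); // accumulates into reverse_time on drop
    let reversed = reverse_record_batch(batch);
    drop(timer);
    batches_reversed.add(1);
    reversed
}
```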