diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index 6db4eed42c0f3..06011f990ed61 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -99,6 +99,9 @@ clickbench_partitioned: ClickBench queries against partitioned (100 files) parquet
 clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet w/ filter_pushdown enabled
 clickbench_extended: ClickBench "inspired" queries against a single parquet (DataFusion specific)
 
+# Sorted Data Benchmarks (ORDER BY Optimization)
+data_sorted_clickbench: ClickBench queries on pre-sorted data with a declared sort order (tests the sort-elimination optimization)
+
 # H2O.ai Benchmarks (Group By, Join, Window)
 h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
 h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
@@ -322,6 +325,9 @@ main() {
             compile_profile)
                 data_tpch "1"
                 ;;
+            data_sorted_clickbench)
+                data_sorted_clickbench
+                ;;
             *)
                 echo "Error: unknown benchmark '$BENCHMARK' for data generation"
                 usage
@@ -505,6 +511,9 @@ main() {
             compile_profile)
                 run_compile_profile "${PROFILE_ARGS[@]}"
                 ;;
+            data_sorted_clickbench)
+                run_data_sorted_clickbench
+                ;;
             *)
                 echo "Error: unknown benchmark '$BENCHMARK' for run"
                 usage
@@ -1197,6 +1206,86 @@ compare_benchmarks() {
 }
 
+# Sorted Data Benchmark Functions
+
+# Creates sorted ClickBench data from hits_0.parquet (partitioned dataset).
+# The data is sorted by EventTime in ascending order.
+# hits_0.parquet (~150MB) is used instead of the full hits.parquet (~14GB) to keep data preparation fast.
+data_sorted_clickbench() {
+    SORTED_FILE="${DATA_DIR}/hits_0_sorted.parquet"
+    ORIGINAL_FILE="${DATA_DIR}/hits_partitioned/hits_0.parquet"
+
+    echo "Creating sorted ClickBench dataset from hits_0.parquet..."
+
+    # Check if the partitioned data exists
+    if [ ! -f "${ORIGINAL_FILE}" ]; then
+        echo "hits_partitioned/hits_0.parquet not found. Running data_clickbench_partitioned first..."
+        data_clickbench_partitioned
+    fi
+
+    # Check if the sorted file already exists
+    if [ -f "${SORTED_FILE}" ]; then
+        echo "Sorted hits_0.parquet already exists at ${SORTED_FILE}"
+        return 0
+    fi
+
+    echo "Sorting hits_0.parquet by EventTime (this takes ~10 seconds)..."
+
+    # Ensure the virtual environment exists and has pyarrow
+    if [ ! -d "$VIRTUAL_ENV" ]; then
+        echo "Creating virtual environment at $VIRTUAL_ENV..."
+        python3 -m venv "$VIRTUAL_ENV"
+    fi
+
+    # Activate the virtual environment
+    source "$VIRTUAL_ENV/bin/activate"
+
+    # Install pyarrow if it is not already available
+    if ! python3 -c "import pyarrow" 2>/dev/null; then
+        echo "Installing pyarrow (this may take a minute)..."
+        pip install --quiet pyarrow
+    fi
+
+    # Use the standalone Python script to sort
+    python3 "${SCRIPT_DIR}"/sort_clickbench.py "${ORIGINAL_FILE}" "${SORTED_FILE}"
+    local result=$?
+
+    # Deactivate the virtual environment
+    deactivate
+
+    if [ $result -eq 0 ]; then
+        echo "✓ Successfully created sorted ClickBench dataset"
+        return 0
+    else
+        echo "✗ Error: Failed to create sorted dataset"
+        return 1
+    fi
+}
+
+# Runs the sorted data benchmark against the pre-sorted file, informing
+# DataFusion of the sort order
+run_data_sorted_clickbench() {
+    RESULTS_FILE="${RESULTS_DIR}/data_sorted_clickbench.json"
+    echo "RESULTS_FILE: ${RESULTS_FILE}"
+    echo "Running sorted data benchmark with sort order optimization..."
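+
+    # For reference, the typical flow dispatches through main() above:
+    #   ./bench.sh data data_sorted_clickbench   # create the sorted copy of hits_0.parquet
+    #   ./bench.sh run data_sorted_clickbench    # run the sorted_data queries against it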
+
+    # Ensure sorted data exists
+    data_sorted_clickbench
+
+    # Run the benchmark with --sorted-by so DataFusion is informed about the sort order
+    debug_run $CARGO_COMMAND --bin dfbench -- clickbench \
+        --iterations 5 \
+        --path "${DATA_DIR}/hits_0_sorted.parquet" \
+        --queries-path "${SCRIPT_DIR}/queries/clickbench/queries/sorted_data" \
+        --sorted-by "EventTime" \
+        --sort-order "ASC" \
+        -o "${RESULTS_FILE}" \
+        ${QUERY_ARG}
+}
+
 setup_venv() {
     python3 -m venv "$VIRTUAL_ENV"
     PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt
diff --git a/benchmarks/queries/clickbench/queries/sorted_data/q0.sql b/benchmarks/queries/clickbench/queries/sorted_data/q0.sql
new file mode 100644
index 0000000000000..1170a383bcb22
--- /dev/null
+++ b/benchmarks/queries/clickbench/queries/sorted_data/q0.sql
@@ -0,0 +1,3 @@
+-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
+-- set datafusion.execution.parquet.binary_as_string = true
+SELECT * FROM hits ORDER BY "EventTime" DESC LIMIT 10;
diff --git a/benchmarks/sort_clickbench.py b/benchmarks/sort_clickbench.py
new file mode 100644
index 0000000000000..df1c9c5ef88a5
--- /dev/null
+++ b/benchmarks/sort_clickbench.py
@@ -0,0 +1,253 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Sort ClickBench data by EventTime for the sorted-data / reverse-scan benchmark.
+Supports a configurable row group size and compression settings.
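+
+Example invocation (these are the paths bench.sh uses by default; adjust to your layout):
+    python3 benchmarks/sort_clickbench.py \
+        benchmarks/data/hits_partitioned/hits_0.parquet \
+        benchmarks/data/hits_0_sorted.parquet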
+"""
+
+import sys
+import argparse
+from pathlib import Path
+
+try:
+    import pyarrow.parquet as pq
+    import pyarrow.compute as pc
+except ImportError:
+    print("Error: pyarrow is not installed.")
+    print("Please install it with: pip install pyarrow")
+    sys.exit(1)
+
+
+def sort_clickbench_data(
+    input_path: str,
+    output_path: str,
+    row_group_size: int = 1024 * 1024,  # 1M rows when called directly
+    compression: str = 'snappy',
+    compression_level=None,  # optional codec-specific level (e.g. for zstd)
+    verify: bool = True
+):
+    """Sort a parquet file by the EventTime column and rewrite it with tuned settings."""
+
+    input_file = Path(input_path)
+    output_file = Path(output_path)
+
+    if not input_file.exists():
+        print(f"Error: Input file not found: {input_file}")
+        sys.exit(1)
+
+    if output_file.exists():
+        print(f"Sorted file already exists: {output_file}")
+        if verify:
+            verify_sorted_file(output_file)
+        return
+
+    try:
+        print(f"Reading {input_file.name}...")
+        table = pq.read_table(str(input_file))
+
+        print(f"Original table has {len(table):,} rows")
+        print("Sorting by EventTime...")
+
+        # Sort the table by EventTime
+        sorted_indices = pc.sort_indices(table, sort_keys=[("EventTime", "ascending")])
+        sorted_table = pc.take(table, sorted_indices)
+
+        print(f"Sorted table has {len(sorted_table):,} rows")
+
+        # Verify the sort (only materialize EventTime when verification is requested)
+        if verify:
+            event_times = sorted_table.column('EventTime').to_pylist()
+            if event_times:
+                print(f"First EventTime: {event_times[0]}")
+                print(f"Last EventTime: {event_times[-1]}")
+                # Verify ascending order on a prefix of the data
+                is_sorted = all(event_times[i] <= event_times[i + 1] for i in range(min(1000, len(event_times) - 1)))
+                print(f"Sort verification (first 1000 rows): {'✓ PASS' if is_sorted else '✗ FAIL'}")
+
+        print(f"Writing sorted file to {output_file}...")
+        print(f"  Row group size: {row_group_size:,} rows")
+        print(f"  Compression: {compression}")
+
+        # Write the sorted table with tuned settings
+        pq.write_table(
+            sorted_table,
+            str(output_file),
+            compression=compression,
+            use_dictionary=True,
+            write_statistics=True,
+            # Row group size controls the granularity of statistics-based pruning
+            row_group_size=row_group_size,
+            # Data page size (1MB is a reasonable default)
+            data_page_size=1024 * 1024,
+            # Write standard (non-INT96) timestamps with microsecond precision
+            use_deprecated_int96_timestamps=False,
+            coerce_timestamps='us',
+            # Batch size for writing
+            write_batch_size=min(row_group_size, 1024 * 64),
+            # None means the codec's default level
+            compression_level=compression_level,
+        )
+
+        # Report results
+        input_size_mb = input_file.stat().st_size / (1024**2)
+        output_size_mb = output_file.stat().st_size / (1024**2)
+
+        # Read metadata to verify row groups
+        parquet_file = pq.ParquetFile(str(output_file))
+        num_row_groups = parquet_file.num_row_groups
+
+        print(f"\n✓ Successfully created sorted file!")
+        print(f"  Input: {input_size_mb:.1f} MB")
+        print(f"  Output: {output_size_mb:.1f} MB")
+        print(f"  Compression ratio: {input_size_mb/output_size_mb:.2f}x")
+        print(f"\nRow Group Statistics:")
+        print(f"  Total row groups: {num_row_groups}")
+        print(f"  Total rows: {len(sorted_table):,}")
+
+        # Show row group details
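+        # Row-group layout is the interesting part for the sorted-data benchmark:
+        # after sorting, each row group covers a disjoint EventTime range, so
+        # statistics-based pruning and the dynamic TopK filter pushdown exercised
+        # by the sorted_data queries should be able to skip most of the file.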
+        for i in range(min(3, num_row_groups)):
+            rg_metadata = parquet_file.metadata.row_group(i)
+            print(f"  Row group {i}: {rg_metadata.num_rows:,} rows, {rg_metadata.total_byte_size / 1024**2:.1f} MB")
+
+        if num_row_groups > 3:
+            print(f"  ... and {num_row_groups - 3} more row groups")
+
+        avg_rows_per_group = len(sorted_table) / num_row_groups if num_row_groups > 0 else 0
+        print(f"  Average rows per group: {avg_rows_per_group:,.0f}")
+
+    except Exception as e:
+        print(f"Error: {e}")
+        import traceback
+        traceback.print_exc()
+        sys.exit(1)
+
+
+def verify_sorted_file(file_path: Path):
+    """Verify that a parquet file is sorted by EventTime."""
+    try:
+        print(f"Verifying sorted file: {file_path}")
+        parquet_file = pq.ParquetFile(str(file_path))
+
+        num_row_groups = parquet_file.num_row_groups
+        file_size_mb = file_path.stat().st_size / (1024**2)
+
+        print(f"  File size: {file_size_mb:.1f} MB")
+        print(f"  Row groups: {num_row_groups}")
+
+        # Read the first and last row groups to spot-check the sort
+        first_rg = parquet_file.read_row_group(0)
+        last_rg = parquet_file.read_row_group(num_row_groups - 1)
+
+        first_time = first_rg.column('EventTime')[0].as_py()
+        last_time = last_rg.column('EventTime')[-1].as_py()
+
+        print(f"  First EventTime: {first_time}")
+        print(f"  Last EventTime: {last_time}")
+        print(f"  Sorted: {'✓ YES' if first_time <= last_time else '✗ NO'}")
+
+    except Exception as e:
+        print(f"Error during verification: {e}")
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description='Sort a ClickBench parquet file by EventTime',
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+  # Basic usage (64K rows per row group, zstd compression)
+  %(prog)s input.parquet output.parquet
+
+  # Custom row group size (2M rows)
+  %(prog)s input.parquet output.parquet --row-group-size 2097152
+
+  # Use snappy compression
+  %(prog)s input.parquet output.parquet --compression snappy
+
+  # Verify an existing file
+  %(prog)s --verify-only output.parquet
+        """
+    )
+
+    parser.add_argument(
+        'input',
+        nargs='?',
+        help='Input parquet file path'
+    )
+    parser.add_argument(
+        'output',
+        nargs='?',
+        help='Output sorted parquet file path'
+    )
+    parser.add_argument(
+        '--row-group-size',
+        type=int,
+        default=64 * 1024,  # 64K rows
+        help='Number of rows per row group (default: 65536 = 64K)'
+    )
+    parser.add_argument(
+        '--compression',
+        choices=['snappy', 'gzip', 'brotli', 'lz4', 'zstd', 'none'],
+        default='zstd',
+        help='Compression codec (default: zstd)'
+    )
+    parser.add_argument(
+        '--compression-level',
+        type=int,
+        default=3,
+        help='Compression level for codecs that support one (default: 3, used with zstd)'
+    )
+    parser.add_argument(
+        '--no-verify',
+        action='store_true',
+        help='Skip sort verification'
+    )
+    parser.add_argument(
+        '--verify-only',
+        action='store_true',
+        help='Only verify an existing sorted file (no sorting)'
+    )
+
+    args = parser.parse_args()
+
+    if args.verify_only:
+        if not args.input:
+            parser.error("--verify-only requires an input file")
+        verify_sorted_file(Path(args.input))
+        return
+
+    if not args.input or not args.output:
+        parser.error("input and output paths are required")
+
+    sort_clickbench_data(
+        args.input,
+        args.output,
+        row_group_size=args.row_group_size,
+        compression=args.compression,
+        # Only forward the level to codecs that accept one
+        compression_level=args.compression_level if args.compression in ('zstd', 'gzip', 'brotli') else None,
+        verify=not args.no_verify
+    )
+
+
+if __name__ == '__main__':
+    if len(sys.argv) == 1:
+        print("Usage: python3 sort_clickbench.py <input.parquet> <output.parquet>")
+        print("Run with --help for more options")
+        sys.exit(1)
+
+    main()
\ No newline at end of file
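For reference, when --sorted-by is passed, the clickbench.rs changes below register the table by issuing DDL along these lines (the location shown is the path bench.sh passes by default; the column and direction come from --sorted-by / --sort-order):

    CREATE EXTERNAL TABLE hits
    STORED AS PARQUET
    LOCATION 'benchmarks/data/hits_0_sorted.parquet'
    WITH ORDER ("EventTime" ASC)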
diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs
index a550503390c54..bb54045dd042c 100644
--- a/benchmarks/src/clickbench.rs
+++ b/benchmarks/src/clickbench.rs
@@ -78,6 +78,16 @@ pub struct RunOpt {
     /// If present, write results json here
     #[structopt(parse(from_os_str), short = "o", long = "output")]
     output_path: Option<PathBuf>,
+
+    /// Column name that the data is sorted by (e.g., "EventTime").
+    /// If specified, DataFusion is informed of this sort order by registering
+    /// the table with a CREATE EXTERNAL TABLE ... WITH ORDER statement.
+    #[structopt(long = "sorted-by")]
+    sorted_by: Option<String>,
+
+    /// Sort order: ASC or DESC (default: ASC)
+    #[structopt(long = "sort-order", default_value = "ASC")]
+    sort_order: String,
 }
 
 /// Get the SQL file path
@@ -125,6 +135,18 @@ impl RunOpt {
         // configure parquet options
         let mut config = self.common.config()?;
+
+        // CRITICAL: if sorted_by is specified, force target_partitions=1 so the
+        // file is not split into multiple partitions and the declared sort order
+        // is preserved; this isolates the benefit of the pre-sorted data.
+        if self.sorted_by.is_some() {
+            println!("⚠️  Forcing target_partitions=1 to preserve sort order");
+            println!(
+                "⚠️  (this isolates the performance benefit of the sorted data being compared)"
+            );
+            config = config.with_target_partitions(1);
+        }
+
         {
             let parquet_options = &mut config.options_mut().execution.parquet;
             // The hits_partitioned dataset specifies string columns
@@ -136,10 +158,24 @@
                 parquet_options.pushdown_filters = true;
                 parquet_options.reorder_filters = true;
             }
+
+            if self.sorted_by.is_some() {
+                // To compare the dynamic TopK optimization on sorted data, we
+                // assume filter pushdown is enabled in this case as well.
+                parquet_options.pushdown_filters = true;
+                parquet_options.reorder_filters = true;
+            }
         }
 
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
+
+        // Debug: print the target_partitions actually in use
+        println!(
+            "📊 Session config target_partitions: {}",
+            ctx.state().config().target_partitions()
+        );
+
         self.register_hits(&ctx).await?;
         let mut benchmark_run = BenchmarkRun::new();
@@ -214,17 +250,54 @@ impl RunOpt {
     }
 
     /// Registers the `hits.parquet` as a table named `hits`
+    /// If sorted_by is specified, uses CREATE EXTERNAL TABLE ... WITH ORDER instead
     async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
-        let options = Default::default();
         let path = self.path.as_os_str().to_str().unwrap();
-        ctx.register_parquet("hits", path, options)
-            .await
-            .map_err(|e| {
-                DataFusionError::Context(
-                    format!("Registering 'hits' as {path}"),
-                    Box::new(e),
-                )
-            })
+
+        // If sorted_by is specified, use CREATE EXTERNAL TABLE with WITH ORDER
+        if let Some(ref sort_column) = self.sorted_by {
+            println!(
+                "Registering table with sort order: {} {}",
+                sort_column, self.sort_order
+            );
+
+            // Quote the column name unless the caller already supplied a quoted identifier
+            let escaped_column = if sort_column.contains('"') {
+                sort_column.clone()
+            } else {
+                format!("\"{sort_column}\"")
+            };
+
+            // Build the CREATE EXTERNAL TABLE DDL with a WITH ORDER clause;
+            // the schema is inferred from the Parquet file
+            let create_table_sql = format!(
+                "CREATE EXTERNAL TABLE hits \
+                 STORED AS PARQUET \
+                 LOCATION '{}' \
+                 WITH ORDER ({} {})",
+                path,
+                escaped_column,
+                self.sort_order.to_uppercase()
+            );
+
+            println!("Executing: {create_table_sql}");
+
+            // Execute the CREATE EXTERNAL TABLE statement
+            ctx.sql(&create_table_sql).await?.collect().await?;
+
+            Ok(())
+        } else {
+            // Original registration without a sort order
+            let options = Default::default();
+            ctx.register_parquet("hits", path, options)
+                .await
+                .map_err(|e| {
+                    DataFusionError::Context(
+                        format!("Registering 'hits' as {path}"),
+                        Box::new(e),
+                    )
+                })
+        }
     }
 
     fn iterations(&self) -> usize {