Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ clickbench_partitioned: ClickBench queries against partitioned (100 files) parqu
clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet w/ filter_pushdown enabled
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)

# Sorted Data Benchmarks (ORDER BY Optimization)
data_sorted_clickbench: ClickBench queries on pre-sorted data WITH sort order info (tests sort elimination optimization)

# H2O.ai Benchmarks (Group By, Join, Window)
h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
Expand Down Expand Up @@ -322,6 +325,9 @@ main() {
compile_profile)
data_tpch "1"
;;
data_sorted_clickbench)
data_sorted_clickbench
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for data generation"
usage
Expand Down Expand Up @@ -505,6 +511,9 @@ main() {
compile_profile)
run_compile_profile "${PROFILE_ARGS[@]}"
;;
data_sorted_clickbench)
run_data_sorted_clickbench
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for run"
usage
Expand Down Expand Up @@ -1197,6 +1206,86 @@ compare_benchmarks() {

}

# Sorted Data Benchmark Functions (Optimized for hits_0.parquet)
# Add these functions to bench.sh

# Creates sorted ClickBench data from hits_0.parquet (partitioned dataset)
# The data is sorted by EventTime in ascending order
# Using hits_0.parquet (~150MB) instead of full hits.parquet (~14GB) for faster testing
# Creates a sorted ClickBench dataset from hits_0.parquet (partitioned dataset).
# The data is sorted by EventTime in ascending order via a helper Python script
# run inside the benchmark virtualenv (pyarrow is installed on demand).
# Uses hits_0.parquet (~150MB) instead of the full hits.parquet (~14GB) so the
# sort step stays fast (~10 seconds).
#
# Globals:   DATA_DIR (read), VIRTUAL_ENV (read), SCRIPT_DIR (read)
# Outputs:   ${DATA_DIR}/hits_0_sorted.parquet
# Returns:   0 on success (or if the sorted file already exists), 1 on failure
data_sorted_clickbench() {
    local sorted_file="${DATA_DIR}/hits_0_sorted.parquet"
    local original_file="${DATA_DIR}/hits_partitioned/hits_0.parquet"

    echo "Creating sorted ClickBench dataset from hits_0.parquet..."

    # Generate the partitioned dataset first if it is not present yet.
    if [ ! -f "${original_file}" ]; then
        echo "hits_partitioned/hits_0.parquet not found. Running data_clickbench_partitioned first..."
        data_clickbench_partitioned
    fi

    # Idempotent: skip the expensive sort if the output already exists.
    if [ -f "${sorted_file}" ]; then
        echo "Sorted hits_0.parquet already exists at ${sorted_file}"
        return 0
    fi

    echo "Sorting hits_0.parquet by EventTime (this takes ~10 seconds)..."

    # Ensure the virtual environment exists so pyarrow can be installed into it.
    if [ ! -d "$VIRTUAL_ENV" ]; then
        echo "Creating virtual environment at $VIRTUAL_ENV..."
        python3 -m venv "$VIRTUAL_ENV"
    fi

    # Activate virtual environment
    source "$VIRTUAL_ENV/bin/activate"

    # Install pyarrow only when the import fails (keeps repeat runs fast).
    if ! python3 -c "import pyarrow" 2>/dev/null; then
        echo "Installing pyarrow (this may take a minute)..."
        pip install --quiet pyarrow
    fi

    # Run the sort via the standalone helper script. Use an `if ! cmd` form so a
    # non-zero exit does not abort the whole script when bench.sh runs under
    # `set -e`; the original `cmd; result=$?` form would never reach the error
    # branch because `set -e` exits on the failing command itself.
    local result=0
    if ! python3 "${SCRIPT_DIR}"/sort_clickbench.py "${original_file}" "${sorted_file}"; then
        result=1
    fi

    # Deactivate virtual environment before reporting the outcome.
    deactivate

    if [ "$result" -eq 0 ]; then
        echo "✓ Successfully created sorted ClickBench dataset"
        return 0
    else
        echo "✗ Error: Failed to create sorted dataset"
        return 1
    fi
}

# Sorted Data Benchmark Functions for bench.sh
# Add these functions to your bench.sh script

# Runs the sorted data benchmark (sorted only) with sort order information
# Runs the sorted data benchmark (sorted only) with sort order information.
#
# Globals:   RESULTS_DIR, DATA_DIR, SCRIPT_DIR, CARGO_COMMAND, QUERY_ARG (read)
#            RESULTS_FILE (written; global by convention, matching the other
#            run_* functions in this script)
# Outputs:   benchmark results JSON at ${RESULTS_DIR}/data_sorted_clickbench.json
run_data_sorted_clickbench() {
    RESULTS_FILE="${RESULTS_DIR}/data_sorted_clickbench.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running sorted data benchmark (sorted only) with sort order optimization..."

    # Ensure sorted data exists (no-op if the sorted parquet is already present)
    data_sorted_clickbench

    # Run benchmark with --sorted-by/--sort-order to inform DataFusion that the
    # input is pre-sorted by EventTime ascending (exercises sort elimination).
    # NOTE(review): $CARGO_COMMAND and ${QUERY_ARG} are intentionally unquoted so
    # they word-split into multiple arguments (QUERY_ARG may also be empty).
    debug_run $CARGO_COMMAND --bin dfbench -- clickbench \
        --iterations 5 \
        --path "${DATA_DIR}/hits_0_sorted.parquet" \
        --queries-path "${SCRIPT_DIR}/queries/clickbench/queries/sorted_data" \
        --sorted-by "EventTime" \
        --sort-order "ASC" \
        -o "${RESULTS_FILE}" \
        ${QUERY_ARG}
}

setup_venv() {
python3 -m venv "$VIRTUAL_ENV"
PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt
Expand Down
3 changes: 3 additions & 0 deletions benchmarks/queries/clickbench/queries/sorted_data/q0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
-- set datafusion.execution.parquet.binary_as_string = true
SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
253 changes: 253 additions & 0 deletions benchmarks/sort_clickbench.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
#!/usr/bin/env python3

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#!/usr/bin/env python3
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Duplicate shebang line in Python script

There are two shebang lines in the script - one at line 1 (#!/usr/bin/env python3) and another at line 20. The second shebang is likely a copy-paste artifact from merging different versions of the script. While it doesn't cause a runtime error (it's treated as a comment after line 1), it indicates accidentally committed code.

Fix in Cursor Fix in Web

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback:The Bugbot AI reviewer is correct! The shebang line should be the very first one in a script. The second line should be removed since it is interpreted as a normal comment.

"""
Sort ClickBench data by EventTime for reverse scan benchmark.
Enhanced version with configurable row group size and optimization options.
"""
Comment on lines +1 to +24
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove duplicate shebang line.

There are two shebang lines (lines 1 and 20). The second one at line 20 is invalid as shebangs must be at the beginning of the file.

 #!/usr/bin/env python3
 
 # Licensed to the Apache Software Foundation (ASF) under one
 ...
 # under the License.
 
-#!/usr/bin/env python3
 """
 Sort ClickBench data by EventTime for reverse scan benchmark.

Based on static analysis hints.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#!/usr/bin/env python3
"""
Sort ClickBench data by EventTime for reverse scan benchmark.
Enhanced version with configurable row group size and optimization options.
"""
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Sort ClickBench data by EventTime for reverse scan benchmark.
Enhanced version with configurable row group size and optimization options.
"""
🧰 Tools
🪛 Ruff (0.14.7)

1-1: Shebang is present but file is not executable

(EXE001)


20-20: Shebang is present but file is not executable

(EXE001)


20-20: Shebang should be at the beginning of the file

(EXE005)

🤖 Prompt for AI Agents
In benchmarks/sort_clickbench.py around lines 1 to 24, there is a duplicate
shebang: the file already starts with "#!/usr/bin/env python3" on line 1 but has
another shebang at line 20; remove the second shebang (line 20) so only the
first line contains the interpreter directive and leave the rest of the file
(docstring and comments) intact.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback:The CodeRabbit AI reviewer is correct! The shebang line should be the very first one in a script. The second line should be removed since it is interpreted as a normal comment.


import sys
import argparse
from pathlib import Path

# pyarrow is a third-party dependency installed into the benchmark virtualenv
# by bench.sh; fail fast with an actionable message if it is missing.
try:
    import pyarrow.parquet as pq
    import pyarrow.compute as pc
except ImportError:
    # Diagnostics belong on stderr so the non-zero exit is paired with the
    # reason even when stdout is redirected (e.g. piped into a log parser).
    print("Error: pyarrow is not installed.", file=sys.stderr)
    print("Please install it with: pip install pyarrow", file=sys.stderr)
    sys.exit(1)


def sort_clickbench_data(
input_path: str,
output_path: str,
row_group_size: int = 1024 * 1024, # 1M rows default
compression: str = 'snappy',
verify: bool = True
):
"""Sort parquet file by EventTime column with optimized settings."""

input_file = Path(input_path)
output_file = Path(output_path)

if not input_file.exists():
print(f"Error: Input file not found: {input_file}")
sys.exit(1)

if output_file.exists():
print(f"Sorted file already exists: {output_file}")
if verify:
verify_sorted_file(output_file)
return

try:
print(f"Reading {input_file.name}...")
table = pq.read_table(str(input_file))

print(f"Original table has {len(table):,} rows")
print("Sorting by EventTime...")

# Sort the table by EventTime
sorted_indices = pc.sort_indices(table, sort_keys=[("EventTime", "ascending")])
sorted_table = pc.take(table, sorted_indices)

print(f"Sorted table has {len(sorted_table):,} rows")

# Verify sort
event_times = sorted_table.column('EventTime').to_pylist()
if event_times and verify:
print(f"First EventTime: {event_times[0]}")
print(f"Last EventTime: {event_times[-1]}")
# Verify ascending order
is_sorted = all(event_times[i] <= event_times[i+1] for i in range(min(1000, len(event_times)-1)))
print(f"Sort verification (first 1000 rows): {'✓ PASS' if is_sorted else '✗ FAIL'}")

print(f"Writing sorted file to {output_file}...")
print(f" Row group size: {row_group_size:,} rows")
print(f" Compression: {compression}")

# Write sorted table with optimized settings
pq.write_table(
sorted_table,
str(output_file),
compression=compression,
use_dictionary=True,
write_statistics=True,
# Optimize row group size for better performance
row_group_size=row_group_size,
# Set data page size (1MB is good for most cases)
data_page_size=1024 * 1024,
# Use v2 data page format for better compression
use_deprecated_int96_timestamps=False,
coerce_timestamps='us', # Use microsecond precision
# Batch size for writing
write_batch_size=min(row_group_size, 1024 * 64),
# Enable compression for all columns
compression_level=None, # Use default compression level
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The --compression-level CLI option is parsed but not applied when writing the Parquet file (the writer sets compression_level=None), so the flag currently has no effect. Consider wiring this option through to the writer to avoid confusing behavior.

🤖 Was this useful? React with 👍 or 👎

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback:The Augment AI reviewer is correct! The compression level option is ignored and never used. Prevents confusion in the user why the files are not compressed with the requested level

)
Comment on lines +104 to +105
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

--compression-level CLI argument is parsed but not used.

The compression_level argument is defined in argparse (line 211-215) but never passed to pq.write_table(). Line 104 hardcodes compression_level=None.

Either remove the unused argument or wire it through:

 def sort_clickbench_data(
         input_path: str,
         output_path: str,
         row_group_size: int = 64 * 1024,
         compression: str = 'zstd',
+        compression_level: int = None,
         verify: bool = True
 ):

And in pq.write_table():

-            compression_level=None,  # Use default compression level
+            compression_level=compression_level,

And in main():

     sort_clickbench_data(
         args.input,
         args.output,
         row_group_size=args.row_group_size,
         compression=args.compression,
+        compression_level=args.compression_level,
         verify=not args.no_verify
     )

Also applies to: 210-215

🤖 Prompt for AI Agents
In benchmarks/sort_clickbench.py around lines 104 and 210-215, the CLI option
--compression-level is parsed but never used: pq.write_table() currently
hardcodes compression_level=None. Wire the parsed value through by replacing the
hardcoded None with the parsed args.compression_level (or equivalent variable)
when calling pq.write_table(), and ensure the argparse block (lines ~210-215)
either remains or is removed if you choose not to support compression; if
keeping it, validate/convert the parsed value as needed before passing it.
Ensure the function call propagates the selected compression level so the CLI
option takes effect.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback:The CodeRabbit AI reviewer is correct! The compression level option is ignored and never used. Prevents confusion in the user why the files are not compressed with the requested level


# Report results
input_size_mb = input_file.stat().st_size / (1024**2)
output_size_mb = output_file.stat().st_size / (1024**2)

# Read metadata to verify row groups
parquet_file = pq.ParquetFile(str(output_file))
num_row_groups = parquet_file.num_row_groups

print(f"\n✓ Successfully created sorted file!")
print(f" Input: {input_size_mb:.1f} MB")
print(f" Output: {output_size_mb:.1f} MB")
print(f" Compression ratio: {input_size_mb/output_size_mb:.2f}x")
print(f"\nRow Group Statistics:")
print(f" Total row groups: {num_row_groups}")
print(f" Total rows: {len(sorted_table):,}")

# Show row group details
for i in range(min(3, num_row_groups)):
rg_metadata = parquet_file.metadata.row_group(i)
print(f" Row group {i}: {rg_metadata.num_rows:,} rows, {rg_metadata.total_byte_size / 1024**2:.1f} MB")

if num_row_groups > 3:
print(f" ... and {num_row_groups - 3} more row groups")

avg_rows_per_group = len(sorted_table) / num_row_groups if num_row_groups > 0 else 0
print(f" Average rows per group: {avg_rows_per_group:,.0f}")

except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)


def verify_sorted_file(file_path: Path) -> None:
    """Verify that a parquet file is sorted by EventTime.

    Prints the file size, row-group count, and the first/last EventTime
    values. This is a cheap spot check: it compares only the first value of
    the first row group against the last value of the last row group, not a
    full-file scan.

    Args:
        file_path: Path to the parquet file to verify.
    """
    try:
        print(f"Verifying sorted file: {file_path}")
        parquet_file = pq.ParquetFile(str(file_path))

        num_row_groups = parquet_file.num_row_groups
        file_size_mb = file_path.stat().st_size / (1024**2)

        print(f"  File size: {file_size_mb:.1f} MB")
        print(f"  Row groups: {num_row_groups}")

        # An empty file has nothing to verify; bail out before indexing row
        # group 0, which would raise (and be swallowed by the except below).
        if num_row_groups == 0:
            print("  File contains no row groups; nothing to verify.")
            return

        # Read only the first and last row groups to spot-check the ordering.
        first_rg = parquet_file.read_row_group(0)
        last_rg = parquet_file.read_row_group(num_row_groups - 1)

        first_time = first_rg.column('EventTime')[0].as_py()
        last_time = last_rg.column('EventTime')[-1].as_py()

        print(f"  First EventTime: {first_time}")
        print(f"  Last EventTime: {last_time}")
        print(f"  Sorted: {'✓ YES' if first_time <= last_time else '✗ NO'}")

    except Exception as e:
        # Best-effort verification: report the problem but do not abort the
        # caller (matches the original behavior).
        print(f"Error during verification: {e}")


def main():
parser = argparse.ArgumentParser(
description='Sort ClickBench parquet file by EventTime',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Basic usage (1M rows per group)
%(prog)s input.parquet output.parquet

# Custom row group size (2M rows)
%(prog)s input.parquet output.parquet --row-group-size 2097152

# Use zstd compression
%(prog)s input.parquet output.parquet --compression zstd

# Verify existing file
%(prog)s --verify-only output.parquet
"""
)

parser.add_argument(
'input',
nargs='?',
help='Input parquet file path'
)
parser.add_argument(
'output',
nargs='?',
help='Output sorted parquet file path'
)
parser.add_argument(
'--row-group-size',
type=int,
default=64 * 1024, # 64K rows
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The examples and inline comment indicate a 1M row-group default, but the parser default here is 64K, which is inconsistent and could confuse users. Consider aligning the help text and default value to match the intended default.

🤖 Was this useful? React with 👍 or 👎

Copy link
Owner Author

@martin-augment martin-augment Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The Augment AI reviewer is correct! The usage example tells that the default value for row_group_size is not the default value of the command line interface option. They should be aligned.

help='Number of rows per row group (default: 65536 = 64K)'
)
parser.add_argument(
'--compression',
choices=['snappy', 'gzip', 'brotli', 'lz4', 'zstd', 'none'],
default='zstd',
help='Compression codec (default: zstd)'
)
parser.add_argument(
'--compression-level',
type=int,
default=3,
help='Compression level (default: 3 for zstd)'
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Command-line compression level argument is never used

The --compression-level command-line argument is parsed via argparse but never passed to sort_clickbench_data(). The call at lines 238-244 omits args.compression_level, and the function hardcodes compression_level=None when calling pq.write_table(). Users setting this flag would see no effect on compression behavior.

Fix in Cursor Fix in Web

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback:The Bugbot AI reviewer is correct! The compression level option is ignored and never used. Prevents confusion in the user why the files are not compressed with the requested level

parser.add_argument(
'--no-verify',
action='store_true',
help='Skip sort verification'
)
parser.add_argument(
'--verify-only',
action='store_true',
help='Only verify an existing sorted file (no sorting)'
)

args = parser.parse_args()

if args.verify_only:
if not args.input:
parser.error("--verify-only requires input file")
verify_sorted_file(Path(args.input))
return

if not args.input or not args.output:
parser.error("input and output paths are required")

sort_clickbench_data(
args.input,
args.output,
row_group_size=args.row_group_size,
compression=args.compression,
verify=not args.no_verify
)


if __name__ == '__main__':
if len(sys.argv) == 1:
print("Usage: python3 sort_clickbench_enhanced.py <input_file> <output_file>")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage message references sort_clickbench_enhanced.py, but the file name is sort_clickbench.py. This mismatch is user-facing and may cause confusion when following the printed instructions.

🤖 Was this useful? React with 👍 or 👎

Copy link
Owner Author

@martin-augment martin-augment Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:typo; feedback:The Augment AI reviewer is correct! The name of the script is sort_clickbench.py and the usage needs to be updated. Prevents confusion in the user trying to execute this script.

print("Run with --help for more options")
sys.exit(1)

main()
Loading