diff --git a/.github/workflows/velox-benchmark.yml b/.github/workflows/velox-benchmark.yml index 398c9b8c..c0015a37 100644 --- a/.github/workflows/velox-benchmark.yml +++ b/.github/workflows/velox-benchmark.yml @@ -95,10 +95,10 @@ jobs: source .venv/bin/activate pip install -q -r benchmark_data_tools/requirements.txt python benchmark_data_tools/generate_data_files.py \ - --benchmark-type tpch \ - --data-dir-path "${TPCH_DATA_DIR}" \ - --scale-factor 0.01 \ - --convert-decimals-to-floats + tpch \ + --output "${TPCH_DATA_DIR}" \ + --scale 0.01 \ + --use-float-type - name: Run TPC-H GPU benchmarks env: diff --git a/benchmark_data_tools/generate_data_files.py b/benchmark_data_tools/generate_data_files.py index 5fb3eade..511cb64f 100644 --- a/benchmark_data_tools/generate_data_files.py +++ b/benchmark_data_tools/generate_data_files.py @@ -1,5 +1,26 @@ +#!/usr/bin/env python3 # SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 +""" +Generate TPC-H datasets with specific partition configurations. + +Configuration for partition counts based on scale factor. +These partition counts give us roughly 100,000,000 rows per file. + +The total number of rows is determined by the scale factor and the table multipliers. +Table sizes (from Fig. 2 in https://www.tpc.org/TPC_Documents_Current_Versions/pdf/TPC-H_v3.0.1.pdf): +Table Multiplier +part 200000 +partsupp 800000 +supplier 10000 +customer 150000 +lineitem 6000000 +orders 1500000 + +With the remaining tables (nation, region) being constant. 
+The number of partitions equals: + max(1, ceil(SF * multiplier / 100_000_000)) +""" import argparse import json @@ -7,149 +28,297 @@ import os import shutil import subprocess -from concurrent.futures import ThreadPoolExecutor +import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path import duckdb from duckdb_utils import init_benchmark_tables, is_decimal_column -from rewrite_parquet import process_dir +# Table multipliers (rows per scale factor) +TABLE_MULTIPLIERS = { + "customer": 150000, + "lineitem": 6000000, + "nation": 0, # constant size, always 1 partition + "orders": 1500000, + "part": 200000, + "partsupp": 800000, + "region": 0, # constant size, always 1 partition + "supplier": 10000, +} + +# Target rows per partition file +TARGET_ROWS_PER_FILE = 100_000_000 + +# Per-table parquet row group byte defaults +# These give approximately 1,000,000 rows per row group (maximum). +PARQUET_ROW_GROUP_BYTES_DEFAULTS = { + "customer": 165000000, + "lineitem": 68700000, + "nation": 5000, + "orders": 99000000, + "part": 69000000, + "partsupp": 147000000, + "region": 5000, + "supplier": 154000000, +} + +# Default: disable compression for certain columns to match cudf-polars defaults at sf3k +UNCOMPRESSED_COLUMN_OVERRIDES = ( + "c_mktsegment,c_nationkey,l_commitdate,l_discount,l_quantity,l_receiptdate," + "l_shipdate,l_shipinstruct,l_shipmode,l_tax,n_nationkey,n_regionkey,o_orderdate," + "o_orderpriority,o_shippriority,p_brand,p_container,p_mfgr,p_size,p_type," + "r_regionkey,s_nationkey" +) + +DELTA_BINARY_PACKED = "DELTA_BINARY_PACKED" +DELTA_LENGTH_BYTE_ARRAY = "DELTA_LENGTH_BYTE_ARRAY" +PLAIN = "PLAIN" +RLE_DICTIONARY = "RLE_DICTIONARY" +DEFAULT_COLUMN_ENCODINGS = { + "l_comment": DELTA_LENGTH_BYTE_ARRAY, + "ps_comment": DELTA_LENGTH_BYTE_ARRAY, + "l_extendedprice": DELTA_BINARY_PACKED, + "l_partkey": DELTA_BINARY_PACKED, + "o_comment": DELTA_LENGTH_BYTE_ARRAY, + "l_orderkey": DELTA_BINARY_PACKED, + "o_orderkey": 
DELTA_BINARY_PACKED, + "o_totalprice": DELTA_BINARY_PACKED, + "o_custkey": DELTA_BINARY_PACKED, + "ps_supplycost": PLAIN, + "c_comment": DELTA_LENGTH_BYTE_ARRAY, + "ps_partkey": DELTA_BINARY_PACKED, + "l_suppkey": DELTA_BINARY_PACKED, + "p_name": DELTA_LENGTH_BYTE_ARRAY, + "p_partkey": DELTA_BINARY_PACKED, + "c_custkey": DELTA_BINARY_PACKED, + "c_address": DELTA_LENGTH_BYTE_ARRAY, + "c_acctbal": DELTA_BINARY_PACKED, + "ps_availqty": PLAIN, + "ps_suppkey": DELTA_BINARY_PACKED, + "p_comment": DELTA_LENGTH_BYTE_ARRAY, + "l_receiptdate": PLAIN, + "l_shipdate": PLAIN, + "l_commitdate": PLAIN, + "c_name": DELTA_LENGTH_BYTE_ARRAY, + "c_phone": DELTA_LENGTH_BYTE_ARRAY, + "l_linestatus": PLAIN, + "o_orderdate": PLAIN, + "s_suppkey": DELTA_BINARY_PACKED, + "l_returnflag": PLAIN, + "o_clerk": PLAIN, + "s_address": DELTA_LENGTH_BYTE_ARRAY, + "s_acctbal": DELTA_BINARY_PACKED, + "s_comment": DELTA_LENGTH_BYTE_ARRAY, + "s_name": DELTA_LENGTH_BYTE_ARRAY, + "s_phone": DELTA_LENGTH_BYTE_ARRAY, + "l_quantity": PLAIN, + "l_shipinstruct": PLAIN, + "l_shipmode": PLAIN, + "l_discount": PLAIN, + "l_tax": PLAIN, + "o_orderstatus": PLAIN, + "o_orderpriority": PLAIN, + "o_shippriority": PLAIN, + "c_nationkey": PLAIN, + "c_mktsegment": PLAIN, + "p_size": PLAIN, + "n_nationkey": DELTA_BINARY_PACKED, + "n_comment": DELTA_LENGTH_BYTE_ARRAY, + "p_container": PLAIN, + "n_name": DELTA_LENGTH_BYTE_ARRAY, + "r_regionkey": DELTA_BINARY_PACKED, + "r_comment": DELTA_LENGTH_BYTE_ARRAY, + "r_name": DELTA_LENGTH_BYTE_ARRAY, + "n_regionkey": PLAIN, + "p_mfgr": PLAIN, + "s_nationkey": PLAIN, + "p_brand": PLAIN, + "p_type": PLAIN, + "p_retailprice": PLAIN, + "l_linenumber": PLAIN, +} + +DEFAULT_DISABLE_DICTIONARY_ENCODING_COLUMNS = [ + "l_comment", + "ps_comment", + "l_extendedprice", + "l_partkey", + "o_comment", + "l_orderkey", + "o_orderkey", + "o_totalprice", + "c_comment", + "ps_partkey", + "l_suppkey", + "p_name", + "p_partkey", + "c_custkey", + "c_address", + "c_acctbal", + "p_comment", + "c_name", + 
"c_phone", + "s_suppkey", + "s_address", + "s_acctbal", + "s_comment", + "s_name", + "s_phone", + "n_nationkey", + "n_comment", + "n_name", + "r_regionkey", + "r_comment", + "r_name", +] + + +def calculate_partitions(scale: int, multiplier: int) -> int: + """ + Calculate partition count for a table at a given scale. + Uses ceiling division: ceil(a/b) = (a + b - 1) / b + """ + if multiplier == 0: + # Constant-size tables (nation, region) always get 1 partition + return 1 + + total_rows = scale * multiplier + partitions = math.ceil(total_rows / TARGET_ROWS_PER_FILE) + + # Ensure at least 1 partition + return max(1, partitions) + + +def generate_partition( + table: str, + num_parts: int, + part: int, + scale: int, + format: str, + output_base: Path, + temp_root: Path, + parquet_row_group_bytes_override: int | None, + use_upstream_compression: bool, + use_upstream_encoding: bool, + parquet_version: str, + decimal_column_type: str, + date_column_type: str, + nationkey_type: str, + regionkey_type: str, + use_upstream_disable_dictionary_encoding: bool, + no_delta_length_byte_array: bool, +) -> tuple[str, int, float]: + """ + Generate a single partition. 
+ + Returns: + Tuple of (table, part, elapsed_seconds) + """ + start_time = time.time() + + # Create a temporary directory within the output directory + temp_dir = temp_root / f"{table}-part-{part}-{os.getpid()}" + temp_dir.mkdir(parents=True, exist_ok=True) + + # Determine parquet row group bytes: use override if set, otherwise per-table default + if parquet_row_group_bytes_override is not None: + row_group_bytes = parquet_row_group_bytes_override + else: + row_group_bytes = PARQUET_ROW_GROUP_BYTES_DEFAULTS[table] + + print(f" Generating partition {part} of {num_parts} for {table}...") -def generate_partition(table, partition, raw_data_path, scale_factor, num_partitions, verbose, approx_row_group_bytes): - if verbose: - print(f"Generating '{table}' partition: {partition}") - Path(f"{raw_data_path}/part-{partition}").mkdir(parents=True, exist_ok=True) - command = [ + # Build the command with optional flags + cmd = [ "tpchgen-cli", - "-T", - table, "-s", - str(scale_factor), + str(scale), + "--tables", + table, + f"--format={format}", "--output-dir", - str(f"{raw_data_path}/part-{partition}"), + str(temp_dir), "--parts", - str(num_partitions), + str(num_parts), "--part", - str(partition), - "--format", - "parquet", + str(part), "--parquet-row-group-bytes", - str(approx_row_group_bytes), + str(row_group_bytes), + "--num-threads", + "1", ] - try: - subprocess.run(command, check=True) - except subprocess.CalledProcessError as e: - print(f"Error generating TPC-H data: {e}") - -def generate_data_files(args): - if os.path.exists(args.data_dir_path): - shutil.rmtree(args.data_dir_path) - Path(f"{args.data_dir_path}").mkdir(parents=True, exist_ok=True) + # Add uncompressed column overrides unless using upstream compression + if not use_upstream_compression and UNCOMPRESSED_COLUMN_OVERRIDES: + cmd.append(f"--uncompressed-column-overrides={UNCOMPRESSED_COLUMN_OVERRIDES}") + + # Add column encoding overrides unless using upstream encoding + # Use DELTA_LENGTH_BYTE_ARRAY for 
string columns (better compression than default RLE_DICTIONARY) + if not use_upstream_encoding: + decimal_columns_with_delta = {"c_acctbal", "l_extendedprice", "o_totalprice", "s_acctbal"} + + for col, encoding in DEFAULT_COLUMN_ENCODINGS.items(): + if decimal_column_type == "f64" and col in decimal_columns_with_delta and encoding == DELTA_BINARY_PACKED: + encoding = PLAIN + elif encoding == DELTA_LENGTH_BYTE_ARRAY and no_delta_length_byte_array: + encoding = PLAIN + + cmd.append(f"--column-encoding={col}={encoding}") + + # Add disable dictionary encoding columns if specified + if not use_upstream_disable_dictionary_encoding: + cmd.append(f"--disable-dictionary-encoding={','.join(DEFAULT_DISABLE_DICTIONARY_ENCODING_COLUMNS)}") + + # Add column type flags + cmd.extend( + [ + "--decimal-column-type", + decimal_column_type, + "--date-column-type", + date_column_type, + "--nationkey-type", + nationkey_type, + "--regionkey-type", + regionkey_type, + ] + ) - # tpchgen is much faster, but is exclusive to generating tpch data. Use duckdb as a fallback. 
- if args.benchmark_type == "tpch" and not args.use_duckdb: - if args.verbose: - print("generating with tpchgen") - generate_data_files_with_tpchgen(args) - else: - if args.verbose: - print("generating with duckdb") - generate_data_files_with_duckdb(args) + # Add parquet version + cmd.extend(["--parquet-version", parquet_version]) + subprocess.run(cmd, check=True) -def generate_data_files_with_tpchgen(args): - tables_sf_ratio = get_table_sf_ratios(args.scale_factor, args.max_rows_per_file) + # Move the generated file to the final location with the desired name + table_dir = output_base / table + table_dir.mkdir(parents=True, exist_ok=True) - if args.convert_decimals_to_floats: - raw_data_path = args.data_dir_path + "-temp" - if os.path.exists(raw_data_path): - shutil.rmtree(raw_data_path) - else: - raw_data_path = args.data_dir_path - - max_partitions = 1 - with ThreadPoolExecutor(args.num_threads) as executor: - futures = [] - - for table, num_partitions in tables_sf_ratio.items(): - if args.verbose: - print(f"Generating TPC-H data for table '{table}' with {num_partitions} partitions") - for partition in range(1, num_partitions + 1): - futures.append( - executor.submit( - generate_partition, - table, - partition, - raw_data_path, - args.scale_factor, - num_partitions, - args.verbose, - args.approx_row_group_bytes, - ) - ) - max_partitions = num_partitions if num_partitions > max_partitions else max_partitions + # The file will be named table/table.1.format in the temp directory + src_file = temp_dir / table / f"{table}.{part}.{format}" + dst_file = table_dir / f"part.{part - 1}.{format}" - for future in futures: - future.result() + shutil.move(str(src_file), str(dst_file)) + shutil.rmtree(temp_dir) - rearrange_directory(raw_data_path, max_partitions) + elapsed = time.time() - start_time + print(f" Finished partition {part - 1} of {num_parts} for {table} in {elapsed:.1f}s") - if args.verbose: - print(f"Raw data created at: {raw_data_path}") + return (table, part, 
elapsed) - if args.convert_decimals_to_floats: - process_dir(raw_data_path, args.data_dir_path, args.num_threads, args.verbose, args.convert_decimals_to_floats) - if not args.keep_original_dataset: - shutil.rmtree(raw_data_path) - write_metadata(args) +def build_job_list(partition_config: dict[str, int]) -> list[tuple[str, int, int]]: + """ + Build a flat list of all (table, num_parts, part) jobs. - -# This dictionary maps each table to the number of partitions it should have based on it's -# expected file size relative to the SF. -# We generate a small sample benchmark (sf-0.01) to sample the ratio of how many rows are generated. -def get_table_sf_ratios(scale_factor, max_rows): - int_scale_factor = int(scale_factor) - int_scale_factor = 1 if int_scale_factor < 1 else int_scale_factor - tables_sf_ratio = {} - init_benchmark_tables("tpch", 0.01) - tables = duckdb.sql("SHOW TABLES").fetchall() - for table in tables: - stripped_table = table[0].strip("'") - num_rows = duckdb.sql(f"SELECT COUNT (*) FROM {stripped_table}").fetchall() - tables_sf_ratio[stripped_table] = math.ceil(int_scale_factor / (max_rows / (int(num_rows[0][0]) * 100))) - return tables_sf_ratio - - -def rearrange_directory(raw_data_path, num_partitions): - # When we generate partitioned data it will have the form ///.parquet. - # We want to re-arrange it to have the form //-.parquet - tables = os.listdir(f"{raw_data_path}/part-1") - - for table in tables: - Path(f"{raw_data_path}/{table}").mkdir(parents=True, exist_ok=True) - - # Move the partitioned data into the new directory structure. 
- for partition in range(1, num_partitions + 1): - for table in tables: - part_file_path = f"{raw_data_path}/part-{partition}/{table}/{table}.{partition}.parquet" - if os.path.exists(part_file_path): - shutil.move(part_file_path, f"{raw_data_path}/{table}/{table}-{partition}.parquet") - part_dir_path = f"{raw_data_path}/part-{partition}" - for dir_name in os.listdir(part_dir_path): - os.rmdir(f"{part_dir_path}/{dir_name}") - os.rmdir(part_dir_path) - - -def write_metadata(args): - with open(f"{args.data_dir_path}/metadata.json", "w") as file: - metadata = { - "scale_factor": args.scale_factor, - "approx_row_group_bytes": args.approx_row_group_bytes, - } - json.dump(metadata, file, indent=2) - file.write("\n") + Returns: + List of (table, num_parts, part) tuples + """ + jobs = [] + for table, num_parts in partition_config.items(): + for part in range(1, num_parts + 1): + jobs.append((table, num_parts, part)) + return jobs def generate_data_files_with_duckdb(args): @@ -191,20 +360,18 @@ def get_column_projection(column_metadata, convert_decimals_to_floats): return projection -if __name__ == "__main__": +def parse_args(args: list[str] | None = None) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Generate benchmark parquet data files for a given scale factor. " "Only the TPC-H and TPC-DS benchmarks are currently supported." 
) - parser.add_argument( - "-b", - "--benchmark-type", - type=str, - required=True, - choices=["tpch", "tpcds"], - help="The type of benchmark to generate data for.", + + subparsers = parser.add_subparsers( + dest="benchmark_type", help="Generate benchmark data files for a given benchmark type" ) - parser.add_argument( + + tpcds_parser = subparsers.add_parser("tpcds", help="Generate TPC-DS data files") + tpcds_parser.add_argument( "-d", "--data-dir-path", type=str, @@ -212,10 +379,10 @@ def get_column_projection(column_metadata, convert_decimals_to_floats): help="The path to the directory that will contain the benchmark data files. " "This directory will be created if it does not already exist.", ) - parser.add_argument( + tpcds_parser.add_argument( "-s", "--scale-factor", type=float, required=True, help="The scale factor of the generated dataset." ) - parser.add_argument( + tpcds_parser.add_argument( "-c", "--convert-decimals-to-floats", action="store_true", @@ -223,28 +390,17 @@ def get_column_projection(column_metadata, convert_decimals_to_floats): default=False, help="Convert all decimal columns to float column type.", ) - parser.add_argument( - "--use-duckdb", action="store_true", required=False, default=False, help="Use duckdb instead of tpchgen" - ) - parser.add_argument( - "-j", - "--num-threads", - type=int, - required=False, - default=4, - help="Number of threads to generate data with tpchgen", - ) - parser.add_argument( + tpcds_parser.add_argument( "-v", "--verbose", action="store_true", required=False, default=False, help="Extra verbose logging" ) - parser.add_argument( + tpcds_parser.add_argument( "--max-rows-per-file", type=int, required=False, default=100_000_000, help="Limit number of rows in each file (creates more partitions)", ) - parser.add_argument( + tpcds_parser.add_argument( "-k", "--keep-original-dataset", action="store_true", @@ -252,13 +408,241 @@ def get_column_projection(column_metadata, convert_decimals_to_floats): default=False, 
help="Keep the original dataset that was generated before transformations", ) - parser.add_argument( + tpcds_parser.add_argument( "--approx-row-group-bytes", type=int, required=False, default=128 * 1024 * 1024, help="Approximate row group size in bytes. 128MB by default.", ) - args = parser.parse_args() - generate_data_files(args) + # subcommand for tpch + + tpch_parser = subparsers.add_parser("tpch", help="Generate TPC-H data files") + + tpch_parser.add_argument( + "-s", + "--scale", + type=int, + default=1000, + help="Scale factor (any positive integer; default: 1000)", + ) + tpch_parser.add_argument( + "-f", + "--format", + choices=["parquet", "tbl"], + default="parquet", + help="Output format: parquet or tbl (default: parquet)", + ) + tpch_parser.add_argument( + "-o", + "--output", + type=Path, + default=Path("tpch-data"), + help="Base output directory (default: tpch-data)", + ) + tpch_parser.add_argument( + "-j", + "--jobs", + type=int, + default=None, + help="Number of parallel jobs (default: number of CPU threads)", + ) + tpch_parser.add_argument( + "--parquet-row-group-bytes", + type=int, + default=None, + help="Override parquet row group size in bytes for all tables", + ) + tpch_parser.add_argument( + "--use-upstream-compression", + action="store_true", + help="Use upstream default compression (compress all columns)", + ) + tpch_parser.add_argument( + "--use-upstream-encoding", + action="store_true", + help="Use upstream default encoding (PLAIN/RLE_DICTIONARY for strings)", + ) + tpch_parser.add_argument( + "--use-upstream-parquet-version", + action="store_true", + help="Use upstream Parquet version (v1 instead of v2)", + ) + tpch_parser.add_argument( + "--use-float-type", + action="store_true", + help="Use f64 for decimal columns (instead of decimal128)", + ) + tpch_parser.add_argument( + "--use-timestamp-type", + action="store_true", + help="Use timestamp_ms for date columns (instead of date32)", + ) + tpch_parser.add_argument( + "--use-large-ids", + 
action="store_true", + help="Use i64 for nationkey/regionkey columns (instead of i32)", + ) + tpch_parser.add_argument( + "--use-upstream-disable-dictionary-encoding", + action="store_true", + help="Use upstream default disable dictionary encoding (disable dictionary encoding for all columns)", + ) + tpch_parser.add_argument( + "--no-delta-length-byte-array", + action="store_true", + help="Use PLAIN encoding (instead of DELTA_LENGTH_BYTE_ARRAY). Some engines don't support DELTA_LENGTH_BYTE_ARRAY.", + ) + + return parser.parse_args(args) + + +def generate_data_files_with_tpchgen(args): + # Determine column types based on flags + decimal_column_type = "f64" if args.use_float_type else "decimal128" + date_column_type = "timestamp_ms" if args.use_timestamp_type else "date32" + nationkey_type = "i64" if args.use_large_ids else "i32" + regionkey_type = "i64" if args.use_large_ids else "i32" + parquet_version = "v1" if args.use_upstream_parquet_version else "v2" + + # Calculate partition counts dynamically for each table + partition_config = {} + for table, multiplier in TABLE_MULTIPLIERS.items(): + partition_config[table] = calculate_partitions(args.scale, multiplier) + + # Display the calculated partition configuration + print(f"Scale factor: {args.scale}") + print("Partition configuration:") + for table in [ + "customer", + "lineitem", + "nation", + "orders", + "part", + "partsupp", + "region", + "supplier", + ]: + print(f" {table}: {partition_config[table]} partition(s)") + + # Create the base output directory and temporary directory + output_base = args.output + output_base.mkdir(parents=True, exist_ok=True) + temp_root = output_base / ".tmp" + temp_root.mkdir(parents=True, exist_ok=True) + + # Build job list + jobs = build_job_list(partition_config) + total_jobs = len(jobs) + print(f"Generating {total_jobs} total partitions across all tables using {args.jobs} threads...") + + try: + # Generate all partitions in parallel across all tables + with 
ThreadPoolExecutor(max_workers=args.jobs) as executor: + futures = [] + for table, num_parts, part in jobs: + future = executor.submit( + generate_partition, + table=table, + num_parts=num_parts, + part=part, + scale=args.scale, + format=args.format, + output_base=output_base, + temp_root=temp_root, + parquet_row_group_bytes_override=args.parquet_row_group_bytes, + use_upstream_compression=args.use_upstream_compression, + use_upstream_encoding=args.use_upstream_encoding, + parquet_version=parquet_version, + decimal_column_type=decimal_column_type, + date_column_type=date_column_type, + nationkey_type=nationkey_type, + regionkey_type=regionkey_type, + use_upstream_disable_dictionary_encoding=args.use_upstream_disable_dictionary_encoding, + no_delta_length_byte_array=args.no_delta_length_byte_array, + ) + futures.append(future) + + # Wait for all futures to complete and handle exceptions + for future in as_completed(futures): + try: + future.result() + except Exception as e: + print(f"Error generating partition: {e}") + raise + + finally: + # Cleanup temporary files + print("Cleaning up temporary files...") + if temp_root.exists(): + shutil.rmtree(temp_root) + + print("TPC-H data generation complete!") + print(f"Data has been generated in: {output_base}") + + # Generate metadata.json + script_dir = Path(__file__).parent + inspect_script = script_dir / "inspect_tpch_parquet.py" + + print("Generating metadata.json...") + + options = { + "scale_factor": args.scale, + "format": args.format, + "output_base_dir": str(output_base), + "threads": args.jobs, + "use_upstream_compression": args.use_upstream_compression, + "use_upstream_encoding": args.use_upstream_encoding, + "parquet_version": parquet_version, + "nationkey_type": nationkey_type, + "regionkey_type": regionkey_type, + "decimal_column_type": decimal_column_type, + "date_column_type": date_column_type, + "parquet_row_group_bytes_customer": PARQUET_ROW_GROUP_BYTES_DEFAULTS["customer"], + 
"parquet_row_group_bytes_lineitem": PARQUET_ROW_GROUP_BYTES_DEFAULTS["lineitem"], + "parquet_row_group_bytes_nation": PARQUET_ROW_GROUP_BYTES_DEFAULTS["nation"], + "parquet_row_group_bytes_orders": PARQUET_ROW_GROUP_BYTES_DEFAULTS["orders"], + "parquet_row_group_bytes_part": PARQUET_ROW_GROUP_BYTES_DEFAULTS["part"], + "parquet_row_group_bytes_partsupp": PARQUET_ROW_GROUP_BYTES_DEFAULTS["partsupp"], + "parquet_row_group_bytes_region": PARQUET_ROW_GROUP_BYTES_DEFAULTS["region"], + "parquet_row_group_bytes_supplier": PARQUET_ROW_GROUP_BYTES_DEFAULTS["supplier"], + } + + subprocess.run( + [ + sys.executable, + str(inspect_script), + str(output_base), + "--output", + "json", + "--output-file", + str(output_base / "metadata.json"), + "--options", + json.dumps(options), + ], + check=True, + ) + + print(f"Metadata written to: {output_base / 'metadata.json'}") + + +def main(args=None): + parsed = parse_args(args) + if os.path.exists(args.data_dir_path): + shutil.rmtree(args.data_dir_path) + Path(f"{parsed.data_dir_path}").mkdir(parents=True, exist_ok=True) + + # tpchgen is much faster, but is exclusive to generating tpch data. Use duckdb as a fallback. + if parsed.benchmark_type == "tpch" and not parsed.use_duckdb: + if parsed.verbose: + print("generating with tpchgen") + generate_data_files_with_tpchgen(parsed) + else: + if parsed.verbose: + print("generating with duckdb") + generate_data_files_with_duckdb(parsed) + + +if __name__ == "__main__": + main() diff --git a/benchmark_data_tools/inspect_tpch_parquet.py b/benchmark_data_tools/inspect_tpch_parquet.py new file mode 100644 index 00000000..0699fe90 --- /dev/null +++ b/benchmark_data_tools/inspect_tpch_parquet.py @@ -0,0 +1,558 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "pyarrow", +# ] +# /// + +import argparse +import csv +import dataclasses +import datetime +import functools +import json +import statistics +import sys +from concurrent.futures import ThreadPoolExecutor +from decimal import Decimal +from pathlib import Path +from typing import Any, Literal + +import pyarrow as pa +import pyarrow.parquet as pq + +# TODO: think about how to capture things like compression ratio. + + +def _convert_value(v: Any) -> Any: + """Convert non-JSON-serializable values.""" + if isinstance(v, bytes): + return v.decode("utf-8", errors="replace") + if isinstance(v, (datetime.date, datetime.datetime)): + return v.isoformat() + if isinstance(v, Decimal): + sign, digits, exponent = v.as_tuple() + return { + "type": "decimal", + "value": str(v), + "sign": sign, + "digits": digits, + "exponent": exponent, + } + return v + + +@dataclasses.dataclass +class ColumnStats: + min: Any # Type depends on column type + max: Any # Type depends on column type + null_count: int + num_values: int + + def serialize(self) -> dict[str, Any]: + """Convert to JSON-serializable dict.""" + result = dataclasses.asdict(self) + result["min"] = _convert_value(result["min"]) + result["max"] = _convert_value(result["max"]) + return result + + +@dataclasses.dataclass +class ColumnInfo: + """ + Metadata for a single column in a parquet table. + + Parameters + ---------- + name + The name of the column. + physical_type + The physical type of the column. + is_stats_set + Whether the stats are set for the column. + stats + The stats for the column. + + These statistics are aggregated over all the row groups + and partitions in the table. The stats are aggregated + as you'd expect: + + - minimum is the minimum value of the column over all row groups. + - maximum is the maximum value of the column over all row groups. 
+ - null_count is the sum of the null counts over all row groups. + - num_values is the sum of the number of values over all row groups. + + encodings + The encodings used for the column. + compression + The compression used for the column. + total_compressed_size + The total size of the compressed column data in bytes. + total_uncompressed_size + The total size of the uncompressed column data in bytes. + + """ + + name: str + physical_type: str + is_stats_set: bool + stats: ColumnStats | None # Aggregated over row groups + encodings: list[str] + compression: str + total_compressed_size: int # sum over all the rgs + total_uncompressed_size: int # sum over all the rgs + + def serialize(self) -> dict[str, Any]: + """Convert to JSON-serializable dict.""" + result = dataclasses.asdict(self) + result["stats"] = self.stats.serialize() if self.stats else None + return result + + +@dataclasses.dataclass +class TableInfo: + """ + Summary statistics for a parquet table. + + Parameters + ---------- + partition_count + The number of partitions (files) in the table. + row_group_count + The total number of row groups in the table. + row_count + The total number of rows in the table. + total_bytes + The total size of the table in bytes. + avg_rows_per_partition + The average number of rows per partition. + min_rows_per_partition + The fewest rows in any partition. + max_rows_per_partition + The most rows in any partition. + avg_bytes_per_partition + The average size of a partition in bytes. + min_bytes_per_partition + The smallest partition in bytes. + max_bytes_per_partition + The largest partition in bytes. + avg_rows_per_row_group + The average number of rows per row group. + min_rows_per_row_group + The fewest rows in any row group. + max_rows_per_row_group + The most rows in any row group. + avg_bytes_per_row_group + The average size of a row group in bytes. + min_bytes_per_row_group + The smallest row group in bytes. 
+    max_bytes_per_row_group
+        The largest row group in bytes.
+    parquet_format_version
+        The version of the parquet format used.
+    created_by
+        The software that created the parquet file.
+    columns
+        A dictionary mapping column names to ColumnInfo.
+    """
+
+    table_schema: pa.Schema
+    # global table stats
+    partition_count: int
+    row_group_count: int
+    row_count: int
+    # NOTE(review): derived from row-group total_byte_size, which pyarrow documents
+    # as the *uncompressed* data size — this is not on-disk file size. Confirm intended.
+    total_bytes: int
+    parquet_format_version: Literal[1, 2]
+    created_by: str
+
+    # partitions summary stats
+    avg_rows_per_partition: int
+    min_rows_per_partition: int
+    max_rows_per_partition: int
+
+    avg_bytes_per_partition: int
+    min_bytes_per_partition: int
+    max_bytes_per_partition: int
+
+    # row groups summary stats
+    avg_rows_per_row_group: int
+    min_rows_per_row_group: int
+    max_rows_per_row_group: int
+
+    avg_bytes_per_row_group: int
+    min_bytes_per_row_group: int
+    max_bytes_per_row_group: int
+
+    columns: dict[str, ColumnInfo]
+
+    @classmethod
+    def build(cls, file_path: Path) -> "TableInfo":
+        """Scan the parquet table at ``file_path`` (a partition directory or a
+        single ``.parquet`` file) and aggregate per-partition, per-row-group and
+        per-column metadata into a TableInfo. Files are read in parallel via a
+        thread pool; column stats are merged across all files/row groups."""
+        ds = pq.ParquetDataset(file_path)
+
+        rg_flat = [rg for fragment in ds.fragments for rg in fragment.row_groups]
+
+        # Per-partition stats
+        rows_per_partition = [sum(rg.num_rows for rg in fragment.row_groups) for fragment in ds.fragments]
+        bytes_per_partition = [sum(rg.total_byte_size for rg in fragment.row_groups) for fragment in ds.fragments]
+
+        # Per-row-group stats
+        rows_per_rg = [rg.num_rows for rg in rg_flat]
+        bytes_per_rg = [rg.total_byte_size for rg in rg_flat]
+
+        # Get file metadata from the first file
+        # (glob yields nothing when file_path is itself a file, hence the fallback)
+        parquet_files = list(file_path.glob("*.parquet"))
+        if not parquet_files:
+            parquet_files = [file_path]  # Single file case
+
+        first_pf = pq.ParquetFile(parquet_files[0])
+        file_metadata = first_pf.metadata
+        # NOTE(review): only "2.6" maps to version 2; pyarrow can also report
+        # "2.4", which this classifies as 1 — confirm that is intended.
+        parquet_format_version: Literal[1, 2] = 2 if file_metadata.format_version == "2.6" else 1
+        created_by = file_metadata.created_by or ""
+
+        # Collect column metadata aggregated across all files/row groups
+        columns: dict[str, ColumnInfo] = {}
+        column_aggregates: dict[str, dict[str, Any]] = {}
+
+        # Process files in parallel; executor.map preserves input order.
+        with ThreadPoolExecutor() as executor:
+            results = list(
+                executor.map(
+                    functools.partial(process_parquet_file, table_schema=ds.schema),
+                    parquet_files,
+                )
+            )
+
+        # Merge results from all files. The first file's aggregate dict is
+        # adopted wholesale; later files fold their counters/sets into it.
+        for file_aggregates in results:
+            for col_name, file_agg in file_aggregates.items():
+                if col_name not in column_aggregates:
+                    column_aggregates[col_name] = file_agg
+                else:
+                    agg = column_aggregates[col_name]
+                    agg["encodings"].update(file_agg["encodings"])
+                    agg["total_compressed_size"] += file_agg["total_compressed_size"]
+                    agg["total_uncompressed_size"] += file_agg["total_uncompressed_size"]
+                    agg["min_values"].extend(file_agg["min_values"])
+                    agg["max_values"].extend(file_agg["max_values"])
+                    agg["null_count"] += file_agg["null_count"]
+                    agg["num_values"] += file_agg["num_values"]
+
+        # Build ColumnInfo objects from aggregates
+        for col_name, agg in column_aggregates.items():
+            # Compute aggregated stats; only meaningful when stats were set and
+            # min/max values were collected (decimals are skipped upstream).
+            col_stats: ColumnStats | None = None
+            if agg["is_stats_set"] and agg["min_values"] and agg["max_values"]:
+                col_stats = ColumnStats(
+                    min=min(agg["min_values"]),
+                    max=max(agg["max_values"]),
+                    null_count=agg["null_count"],
+                    num_values=agg["num_values"],
+                )
+
+            columns[col_name] = ColumnInfo(
+                name=col_name,
+                physical_type=agg["physical_type"],
+                is_stats_set=agg["is_stats_set"],
+                stats=col_stats,
+                encodings=sorted(agg["encodings"]),
+                compression=agg["compression"],
+                total_compressed_size=agg["total_compressed_size"],
+                total_uncompressed_size=agg["total_uncompressed_size"],
+            )
+
+        return cls(
+            table_schema=ds.schema,
+            partition_count=len(ds.fragments),
+            row_group_count=len(rg_flat),
+            row_count=sum(rows_per_rg),
+            total_bytes=sum(bytes_per_rg),
+            parquet_format_version=parquet_format_version,
+            created_by=created_by,
+            avg_rows_per_partition=int(statistics.mean(rows_per_partition)) if rows_per_partition else 0,
+            min_rows_per_partition=min(rows_per_partition, default=0),
+            max_rows_per_partition=max(rows_per_partition, default=0),
+            avg_bytes_per_partition=int(statistics.mean(bytes_per_partition)) if bytes_per_partition else 0,
+            min_bytes_per_partition=min(bytes_per_partition, default=0),
+            max_bytes_per_partition=max(bytes_per_partition, default=0),
+            avg_rows_per_row_group=int(statistics.mean(rows_per_rg)) if rows_per_rg else 0,
+            min_rows_per_row_group=min(rows_per_rg, default=0),
+            max_rows_per_row_group=max(rows_per_rg, default=0),
+            avg_bytes_per_row_group=int(statistics.mean(bytes_per_rg)) if bytes_per_rg else 0,
+            min_bytes_per_row_group=min(bytes_per_rg, default=0),
+            max_bytes_per_row_group=max(bytes_per_rg, default=0),
+            columns=columns,
+        )
+
+    def serialize(self) -> dict[str, Any]:
+        """Convert to JSON-serializable dict."""
+        return {
+            "partition_count": self.partition_count,
+            "row_group_count": self.row_group_count,
+            "row_count": self.row_count,
+            "total_bytes": self.total_bytes,
+            "parquet_format_version": self.parquet_format_version,
+            "created_by": self.created_by,
+            "table_schema": [
+                {
+                    "name": field.name,
+                    "type": str(field.type),
+                    "nullable": field.nullable,
+                }
+                for field in self.table_schema
+            ],
+            "avg_rows_per_partition": self.avg_rows_per_partition,
+            "min_rows_per_partition": self.min_rows_per_partition,
+            "max_rows_per_partition": self.max_rows_per_partition,
+            "avg_bytes_per_partition": self.avg_bytes_per_partition,
+            "min_bytes_per_partition": self.min_bytes_per_partition,
+            "max_bytes_per_partition": self.max_bytes_per_partition,
+            "avg_rows_per_row_group": self.avg_rows_per_row_group,
+            "min_rows_per_row_group": self.min_rows_per_row_group,
+            "max_rows_per_row_group": self.max_rows_per_row_group,
+            "avg_bytes_per_row_group": self.avg_bytes_per_row_group,
+            "min_bytes_per_row_group": self.min_bytes_per_row_group,
+            "max_bytes_per_row_group": self.max_bytes_per_row_group,
+            "columns": {name: col.serialize() for name, col in self.columns.items()},
+        }
+
+
+@dataclasses.dataclass
+class Metadata:
+    # options: free-form run options passed on the CLI (--options JSON)
+    options: dict[str, Any]
+    # tables: per-table metadata, keyed by table name
+    tables: dict[str, TableInfo]
+
+    def serialize(self) -> dict[str, Any]:
+        """Convert to a JSON-serializable dict, delegating each table to
+        TableInfo.serialize()."""
+        # NOTE(review): asdict() first recurses into self.tables (deep-copying
+        # TableInfo fields, incl. pa.Schema) before the result is overwritten
+        # below — wasted work at best; verify deepcopy of pyarrow objects is safe.
+        result = dataclasses.asdict(self)
+        result["tables"] = {table: info.serialize() for table, info in self.tables.items()}
+        return result
+
+
+def process_parquet_file(pf_path: Path, table_schema: pa.Schema) -> dict[str, dict[str, Any]]:
+    """Process a single parquet file and return its column aggregates.
+
+    Returns a dict keyed by column path; each value holds physical type,
+    compression, the set of encodings seen, size counters, and collected
+    min/max/null statistics across every row group of the file.
+    """
+    file_aggregates: dict[str, dict[str, Any]] = {}
+    pf = pq.ParquetFile(pf_path)
+    metadata = pf.metadata
+
+    for rg_idx in range(metadata.num_row_groups):
+        rg = metadata.row_group(rg_idx)
+
+        for col_idx in range(rg.num_columns):
+            col = rg.column(col_idx)
+            col_name = col.path_in_schema
+
+            if col_name not in file_aggregates:
+                # First sighting: physical_type/compression/is_stats_set are
+                # taken from this row group and assumed uniform for the column.
+                file_aggregates[col_name] = {
+                    "physical_type": str(col.physical_type),
+                    "compression": str(col.compression),
+                    "encodings": set(),
+                    "total_compressed_size": 0,
+                    "total_uncompressed_size": 0,
+                    "is_stats_set": col.is_stats_set,
+                    "min_values": [],
+                    "max_values": [],
+                    "null_count": 0,
+                    "num_values": 0,
+                }
+
+            agg = file_aggregates[col_name]
+            agg["encodings"].update(str(e) for e in col.encodings)
+            agg["total_compressed_size"] += col.total_compressed_size
+            agg["total_uncompressed_size"] += col.total_uncompressed_size
+
+            # Aggregate stats if available
+            if col.is_stats_set and col.statistics is not None:
+                stats = col.statistics
+                if stats.has_min_max:
+                    # For Decimal types, pyarrow might throw a not implemented error here :/
+                    if not pa.types.is_decimal128(table_schema.field(col_name).type):
+                        agg["min_values"].append(stats.min)
+                        agg["max_values"].append(stats.max)
+                # null/value counts accumulate whenever stats exist, even if
+                # min/max were skipped (e.g. decimal columns)
+                agg["null_count"] += stats.null_count
+                agg["num_values"] += stats.num_values
+
+    return file_aggregates
+
+
+def inspect_table_text(table_name: str, info: TableInfo, show_schema: bool = True, show_sizes: bool = True):
+    """Print metadata for a parquet table in text format."""
+    print(f"\n{table_name}")
+    print("=" * 80)
+
+    if show_sizes:
+        print(f"  Partitions: 
{info.partition_count}") + print(f" Row groups: {info.row_group_count}") + print(f" Total rows: {info.row_count:,}") + print(f" Total bytes: {info.total_bytes:,}") + + print("\n Partition stats:") + print( + f" Rows: avg={info.avg_rows_per_partition:,} min={info.min_rows_per_partition:,} max={info.max_rows_per_partition:,}" + ) + print( + f" Bytes: avg={info.avg_bytes_per_partition:,} min={info.min_bytes_per_partition:,} max={info.max_bytes_per_partition:,}" + ) + + print("\n Row group stats:") + print( + f" Rows: avg={info.avg_rows_per_row_group:,} min={info.min_rows_per_row_group:,} max={info.max_rows_per_row_group:,}" + ) + print( + f" Bytes: avg={info.avg_bytes_per_row_group:,} min={info.min_bytes_per_row_group:,} max={info.max_bytes_per_row_group:,}" + ) + + if show_schema: + if show_sizes: + print() + print(" Schema:") + print(" " + "-" * 40) + for i, field in enumerate(info.table_schema): + print(f" {i}: {field.name} ({field.type}, nullable={field.nullable})") + + +def main(): + parser = argparse.ArgumentParser(description="Inspect TPC-H parquet files") + parser.add_argument("data_dir", type=Path, help="Path to TPC-H data directory") + parser.add_argument( + "--schema/--no-schema", + dest="show_schema", + action=argparse.BooleanOptionalAction, + default=True, + help="Print schema information (default: True)", + ) + parser.add_argument( + "--sizes/--no-sizes", + dest="show_sizes", + action=argparse.BooleanOptionalAction, + default=True, + help="Print partition/row group size statistics (default: True)", + ) + parser.add_argument( + "-o", + "--output", + choices=["text", "csv", "json"], + default="text", + help="Output format (default: text)", + ) + parser.add_argument( + "--output-file", + type=Path, + default=None, + help="Write output to file instead of stdout (only used with --output json)", + ) + parser.add_argument( + "--options", + type=json.loads, + default={}, + help="Options to include in the metadata (default: {})", + ) + args = parser.parse_args() 
+ + data_dir = args.data_dir + if not data_dir.exists(): + print(f"Error: Directory {data_dir} does not exist", file=sys.stderr) + sys.exit(1) + if args.output == "text": + print(f"Inspecting {data_dir}") + + tables = [ + "customer", + "lineitem", + "nation", + "orders", + "part", + "partsupp", + "region", + "supplier", + ] + + # Collect all table info + table_infos: list[tuple[str, TableInfo]] = [] + for table in tables: + # Check for partitioned data (directory) or unpartitioned data (single file) + table_dir = data_dir / table + table_file = data_dir / f"{table}.parquet" + + if table_dir.exists(): + table_path = table_dir + elif table_file.exists(): + table_path = table_file + else: + if args.output == "text": + print(f"Warning: {table} not found (checked {table_dir} and {table_file})") + continue + + info = TableInfo.build(table_path) + table_infos.append((table, info)) + + if args.output == "text": + for table, info in table_infos: + inspect_table_text(table, info, show_schema=args.show_schema, show_sizes=args.show_sizes) + elif args.output == "csv": + writer = csv.writer(sys.stdout) + if args.show_schema: + writer.writerow(["table_name", "name", "type", "nullable"]) + for table, info in table_infos: + for field in info.table_schema: + writer.writerow([table, field.name, str(field.type), field.nullable]) + else: + writer.writerow( + [ + "table_name", + "partition_count", + "row_group_count", + "row_count", + "total_bytes", + "avg_rows_per_partition", + "min_rows_per_partition", + "max_rows_per_partition", + "avg_bytes_per_partition", + "min_bytes_per_partition", + "max_bytes_per_partition", + "avg_rows_per_row_group", + "min_rows_per_row_group", + "max_rows_per_row_group", + "avg_bytes_per_row_group", + "min_bytes_per_row_group", + "max_bytes_per_row_group", + ] + ) + for table, info in table_infos: + writer.writerow( + [ + table, + info.partition_count, + info.row_group_count, + info.row_count, + info.total_bytes, + info.avg_rows_per_partition, + 
+                        info.min_rows_per_partition,
+                        info.max_rows_per_partition,
+                        info.avg_bytes_per_partition,
+                        info.min_bytes_per_partition,
+                        info.max_bytes_per_partition,
+                        info.avg_rows_per_row_group,
+                        info.min_rows_per_row_group,
+                        info.max_rows_per_row_group,
+                        info.avg_bytes_per_row_group,
+                        info.min_bytes_per_row_group,
+                        info.max_bytes_per_row_group,
+                    ]
+                )
+    elif args.output == "json":
+        # Wrap the per-table info plus the CLI-supplied --options blob.
+        metadata = Metadata(options=args.options, tables=dict(table_infos))
+        output_data = metadata.serialize()
+        if args.output_file:
+            with open(args.output_file, "w") as f:
+                json.dump(output_data, f, indent=2)
+        else:
+            json.dump(output_data, sys.stdout, indent=2)
+            print()  # Add newline after JSON
+
+
+if __name__ == "__main__":
+    main()
diff --git a/benchmark_data_tools/requirements.txt b/benchmark_data_tools/requirements.txt
index 2f90c5d4..ddf400ef 100644
--- a/benchmark_data_tools/requirements.txt
+++ b/benchmark_data_tools/requirements.txt
@@ -7,6 +7,6 @@ presto-python-client==0.8.4
 requests==2.32.4
 six==1.17.0
 urllib3==2.5.0
-tpchgen-cli==2.0.1
+tpchgen-cli @ git+https://github.com/TomAugspurger/tpchgen-rs@tom/sync-upstream-clean#subdirectory=tpchgen-cli
 pyarrow==21.0.0
 pytest==9.0.2