Conversation
| vdata = per_query_validation.get(vkey) | ||
| validation_result = ( | ||
| { | ||
| "status": "expected-failure" if vdata["status"] == "xfail" else vdata["status"], |
If you're able to, I'd recommend standardizing on "expected-failure" as the string to represent an expected failure, rather than converting between xfail and expected-failure.
But if other parts of this project already use xfail then nvm.
| # Handle failed queries that may not have times | ||
| for query_name, error_info in failed_queries.items(): | ||
| if query_name not in raw_times: | ||
| vkey = "q" + query_name.lstrip("Q").lower() |
If the comment above is accurate ("failed queries that may not have times") won't this be "not-validated" by definition?
If a query fails before producing a result file then it should fall back to "not-validated". I've added a comment to clarify.
| try: | ||
| validation_results = json.loads(validation_results_path.read_text()) | ||
| except (json.JSONDecodeError, FileNotFoundError) as e: | ||
| print(f" Warning: could not load validation results: {e}", file=sys.stderr) |
Is continuing here the right behavior?
I'd recommend either raising here or recording some kind of error status and ensuring that the process exits with a non-zero status code.
I'm now propagating the exception and exiting if the JSON is malformed.
| """ | ||
| Validate TPC-H query results against expected parquet files. | ||
| Validation logic is ported from cudf_polars's assert_tpch_result_equal |
I think vendoring the code here is the right call, at least for now. Maybe longer term we can try to find a home for this that both velox-testing and cudf-polars can depend on.
So a request to our future selves: try to remember to fix issues in both places.
| - Schema (dtypes) is NOT checked — Presto may produce different parquet | ||
| types than polars for the same logical values. | ||
| - Decimal columns are cast to Float64 before comparison (same as polars). | ||
| - Floating-point values are compared with rel_tol=1e-5, abs_tol=1e-8. |
Nice, I think cudf-polars only validates with abs_tol=1e-2.
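For reference, the tolerance semantics described in the docstring above can be sketched with the stdlib's math.isclose (the constant and function names here are illustrative, not taken from the script):

```python
import math

# Tolerances stated in the docstring above: rel_tol=1e-5, abs_tol=1e-8.
REL_TOL = 1e-5
ABS_TOL = 1e-8

def floats_match(a: float, b: float) -> bool:
    """True when a and b agree within the relative OR absolute tolerance."""
    return math.isclose(a, b, rel_tol=REL_TOL, abs_tol=ABS_TOL)
```

math.isclose passes when either tolerance is satisfied, so the absolute tolerance only matters for values near zero.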
| # sort_by entries: (column_name, descending) | ||
| # --------------------------------------------------------------------------- | ||
| QUERY_CONFIG: dict[str, dict] = { |
Is velox-testing running mypy / some other type checker?
| QUERY_CONFIG: dict[str, dict] = { | |
| class SortLimit(TypedDict): | |
| sort_by: list[tuple[str, bool]] | None | |
| limit: int | None | |
| xfail_if_empty: bool | |
| QUERY_CONFIG: dict[str, SortLimit] = { |
Ah, I guess xfail_if_empty is only sometimes there. There is xfail_if_empty: typing.NotRequired[bool] for that.
Done. mypy isn't used in this repo (to my knowledge). We use some other checkers as part of the pre-commit hooks, but I'm not sure whether any of them apply type-checking rules.
| def _polars_assert_frame_equal(left: pl.DataFrame, right: pl.DataFrame, **kwargs: Any) -> None: | ||
| """Call polars.testing.assert_frame_equal, handling rel_tol/abs_tol API differences.""" | ||
| try: |
FWIW, I think requiring a new enough polars is reasonable here (cudf-polars needs to support older versions).
| def _reconcile_presto_col_names(result: pl.DataFrame, expected: pl.DataFrame) -> pl.DataFrame: | ||
| """ | ||
| Rename Presto's anonymous aggregate columns (_col0, _col1, ...) to match |
Maybe out of scope for this PR, but does presto offer the option to rename these anonymous columns in the query? When implementing validation in cudf-polars, we had some similar issues that we decided to fix by adjusting the query.
We could rename the columns in the query, but I'd rather leave the SQL alone. I think it's easy enough to keep the original SQL and just reconcile them here.
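The reconciliation discussed here might look something like the following positional rename (a sketch on plain name lists; the real helper operates on polars DataFrames):

```python
import re

_ANONYMOUS = re.compile(r"^_col\d+$")  # Presto's anonymous aggregate columns

def reconcile_col_names(actual_cols: list[str], expected_cols: list[str]) -> list[str]:
    """Positionally rename _colN columns to the expected names; leave
    explicitly named columns untouched so real mismatches still surface."""
    if len(actual_cols) != len(expected_cols):
        return actual_cols  # let the later column check report the mismatch
    return [
        expected if _ANONYMOUS.match(actual) else actual
        for actual, expected in zip(actual_cols, expected_cols)
    ]
```

Only anonymous names are rewritten, so a genuinely wrong column name still fails the subsequent column check.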
| polars_kwargs: dict[str, Any] = { | ||
| "check_row_order": True, | ||
| "check_column_order": True, | ||
| "check_dtypes": False, # Presto types may differ from polars types |
This feels slightly risky. Do you worry at all about unexpectedly getting different dtypes here?
On the cudf-polars side, we handled a similar issue by explicitly listing the casts required to match duckdb: https://github.com/rapidsai/cudf/blob/9ba0eb36f55712dae230ebb1b40b7fa1326fe147/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsh.py#L44-L105. For tpc-h this wasn't too bad. For something like tpc-ds it might not be tenable.
On the other hand, for both cudf-polars and velox-cudf, we do compare against the CPU engine. So it's not like we're completely ignoring dtype validation.
I've turned the dtype detection on and added a duckdb->presto mapping of types. As far as I can tell, it's just occasional int32 vs int64 differences between the two.
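The mapping mentioned in this reply could be as small as a set of acceptable dtype pairs (the names and pairs below are illustrative, not the actual table from the PR):

```python
# Dtype pairs that DuckDB (reference) and Presto (actual) may legitimately
# disagree on while holding identical values.
EQUIVALENT_DTYPES: set[frozenset[str]] = {
    frozenset({"int32", "int64"}),
    frozenset({"float32", "float64"}),
}

def dtypes_compatible(actual: str, expected: str) -> bool:
    """Exact match, or a pair explicitly allowed by the equivalence table."""
    return actual == expected or frozenset({actual, expected}) in EQUIVALENT_DTYPES
```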
| query_id: str, | ||
| actual: pl.DataFrame, | ||
| expected: pl.DataFrame, | ||
| ) -> tuple[str, str | None]: |
I'd recommend encoding the status as a Literal or enum. And if possible use the same values as the API expects (expected-failure).
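A sketch of the Literal-based status suggested here, using the same strings the API expects (the classify helper is a hypothetical name, not from the PR):

```python
from typing import Literal

ValidationStatus = Literal["passed", "failed", "expected-failure", "not-validated"]

def classify(passed: bool, expected_to_fail: bool) -> ValidationStatus:
    """Map a comparison outcome onto one of the four allowed statuses."""
    if passed:
        return "passed"
    return "expected-failure" if expected_to_fail else "failed"
```

With a Literal return type, a type checker such as mypy will flag any misspelled status string at the return site.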
TomAugspurger
left a comment
Looks good. Just one note about a recent change to assert_tpch_result_equal in cudf-polars.
| right: pl.DataFrame, | ||
| *, | ||
| sort_by: list[tuple[str, bool]], | ||
| limit: int | None = None, |
@Matt711 just added a new nulls_last keyword to support tpc-ds: rapidsai/cudf@109634d. I'd recommend trying to adopt that now, or making an issue to do it in the future.
| --skip-drop-cache Skip dropping system caches before each benchmark query (dropped by default). | ||
| -m, --metrics Collect detailed metrics from Presto REST API after each query. | ||
| Metrics are stored in query-specific directories. | ||
| --expected-dir Path to a directory containing expected TPC-H parquet files. |
Can we consistently use --reference-results-dir as done in the integration test script?
| VALIDATE_SCRIPT="${SCRIPT_DIR}/../../benchmark_reporting_tools/validate_results.py" | ||
| # Determine the expected directory. | ||
| # If --expected-dir was not provided, auto-detect from ${PRESTO_DATA_DIR}/sf${SCALE_FACTOR}_expected |
Why is the reference results directory placed under PRESTO_DATA_DIR (which is supposed to be for the datasets)?
Since the reference results are tied to the source data, the convention so far has been that the two should live together. This is overridable, but the default location for reference results is the source data directory plus the _expected suffix.
To that end, I've rewritten this section so that it prioritizes an explicit path parameter, and otherwise defaults to looking in the same data directory the source data was taken from, with that "_expected" suffix. If an explicit path is provided but there are no reference results, it's an error; if no path is specified and there are no implicit reference results, it's "not-validated".
the convention so far has been that the two should live together.
Where is this convention defined? I don't think any of the existing scripts does this?
| EXPECTED_DIR_EXPLICIT=false | ||
| if [[ -n ${EXPECTED_DIR} ]]; then | ||
| EXPECTED_DIR_EXPLICIT=true | ||
| elif [[ -n ${BENCHMARK_EXPECTED_BASE_DIR:-${PRESTO_DATA_DIR}} ]]; then | ||
| BENCHMARK_RESULT_JSON="${ACTUAL_OUTPUT_DIR}/benchmark_result.json" | ||
| SCALE_FACTOR_FROM_DATA="$(python3 -c " | ||
| import json | ||
| try: | ||
| d = json.load(open('${BENCHMARK_RESULT_JSON}')) | ||
| sf = d.get('context', {}).get('scale_factor') | ||
| if sf is not None: | ||
| sf = float(sf) | ||
| print(int(sf) if sf == int(sf) else sf) | ||
| except Exception: | ||
| pass | ||
| " 2>/dev/null)" | ||
| if [[ -n ${SCALE_FACTOR_FROM_DATA} ]]; then | ||
| EXPECTED_DIR="${BENCHMARK_EXPECTED_BASE_DIR:-${PRESTO_DATA_DIR}}/sf${SCALE_FACTOR_FROM_DATA}_expected" | ||
| fi | ||
| fi |
Can all these be moved into the python validation script?
| """ | ||
| Validate TPC-H query results against expected parquet files. | ||
| Validation logic is ported from cudf_polars's assert_tpch_result_equal |
We seem to have created divergence here. Why are we not re-using the validation logic in test_utils.py?
We intentionally want to use the same comparison logic as cudf-polars. I don't know if we want the integration tests to use the same validation logic, but I do think we want to make sure that we are using the same logic as polars. Ideally, we want to pull this out as shared code between multiple projects.
I think there is an open question here about how we want to consolidate the verification logic going forward, but I think for now it would be fine to leave our existing integration test logic alone and use the polars logic for benchmark verification as we do now. We can adjust later as necessary.
The validation logic should be semantically the same in both projects, so what is the reason for the change? Using the same module for validation across projects would be ideal, but I don't think code duplication is the best way to get there.
mattgara
left a comment
LGTM. Awesome, I've been waiting for a Polars verification utility; it will really help with validation of larger runs!
| times = raw_times[query_name] | ||
| is_failed = query_name in failed_queries | ||
| # Look up validation result for this query (keys are lowercase e.g. "q1") |
nit: This loop logic plus that around lines 444 look to be nearly identical, consider pulling out a shared base?
I've updated this PR to deduplicate the code between the Polars validation and what already existed in integration tests. Current output looks like: Case with failures:
@misiugodfrey can you walk me through what those two outputs are saying? IIUC, it's that with the previous code there's some tests somewhere (where?) running something and validating the results, and all the tests passed validation. But with the new validation code, some of those tests are failing with validation errors? Based on the traceback, it looks like the values are different, so it's a good thing there's a validation error there (but the tests or implementation will need to be updated?). Or am I misreading things?
@TomAugspurger I should clarify, the two cases I posted above were for the sake of showing the new output format. The "failing" case was a set of parquet files I intentionally changed to fail to test that path. So far everything seems to validate correctly (tested up to 3k). |
paul-aiyedun
left a comment
Changes overall make sense to me. However, I had a number of questions.
| # PRESTO_EXPECTED_RESULTS_DIR env var is the implicit fallback (warning if missing). | ||
| # Explicit --reference-results-dir was already validated before the benchmark ran. | ||
| if [[ -n ${PRESTO_EXPECTED_RESULTS_DIR} && ! -d ${PRESTO_EXPECTED_RESULTS_DIR} ]]; then | ||
| echo "[Validation] Warning: PRESTO_EXPECTED_RESULTS_DIR not found: ${PRESTO_EXPECTED_RESULTS_DIR}; validation skipped." |
Unless I am missing something, we still seem to proceed with validation in this case. Should we exit after this line?
We proceeded to the next step, where the script would output more details about how the directory does not exist and that validation was skipped. I think you are right, though, that an early exit is a better idea here: the extra output is effectively redundant, since we are already stating that we are skipping due to a missing directory. I've changed this to an early exit.
| # Compute the actual output directory (mirrors pytest's --output-dir / --tag logic). | ||
| ACTUAL_OUTPUT_DIR="${OUTPUT_DIR:-$(pwd)/benchmark_output}" | ||
| if [[ -n ${TAG} ]]; then | ||
| ACTUAL_OUTPUT_DIR="${ACTUAL_OUTPUT_DIR}/${TAG}" | ||
| fi |
Consider moving this logic into the validate_results.py script and reusing the same function that sets this.
I've refactored this to use the same logic.
| VALIDATE_REQUIREMENTS="${SCRIPT_DIR}/../../benchmark_reporting_tools/requirements.txt" | ||
| echo "[Validation] Running validation: ${RESULTS_DIR} vs ${PRESTO_EXPECTED_RESULTS_DIR:-<not set>}" | ||
| pip install -q -r "${VALIDATE_REQUIREMENTS}" | ||
| python "${VALIDATE_SCRIPT}" "${VALIDATE_ARGS[@]}" |
Did you consider using run_py_script.sh?
Switching to use the script.
| def _get_validation_result(query_name): | ||
| # Look up validation result for this query (keys are lowercase e.g. "q1") | ||
| vkey = "q" + query_name.lstrip("Q").lower() | ||
| vdata = per_query_validation.get(vkey) |
What happens if per_query_validation is an empty dictionary (from line 400)?
If per_query_validation is empty then per_query_validation.get(vkey) should return None for each query and the status will be returned as "not-validated".
| print(f" Node count: {payload['node_count']}", file=sys.stderr) | ||
| print(f" Query logs: {len(payload['query_logs'])}", file=sys.stderr) | ||
| print(f" Validation status: {payload['validation_status']}", file=sys.stderr) | ||
| xfail_queries = [ |
What does the x prefix mean here?
The "x" prefix was short for "expected". As in this is an "expected failure". This convention was set in the Benchmarking DB where one of the possible validation states is "XFAIL" for this particular case.
| # 1 & 2. Column reconciliation and validation | ||
| actual = _reconcile_col_names(actual, expected) | ||
| if list(actual.columns) != list(expected.columns): | ||
| extra = set(actual.columns) - set(expected.columns) | ||
| missing = set(expected.columns) - set(actual.columns) | ||
| raise AssertionError( | ||
| f"Column name mismatch — extra: {extra}, missing: {missing}\n" | ||
| f" actual: {list(actual.columns)}\n" | ||
| f" expected: {list(expected.columns)}" | ||
| ) |
Why are we concerned about column names for this validation?
This is because the Polars validation did this. If we are unconcerned with the column names I could remove this, but I would rather we keep as close to them as we can.
Perhaps, @TomAugspurger can speak to why this is done for Polars, but I believe the TPC-H specification states that column names are optional
2.1.3.4 (a) Columns appear in the order specified by the SELECT list of either the functional query definition or an
approved variant. Column headings are optional.
Also, it is possible to have queries without clear column names. For instance, Q18 projects sum(l_quantity) without an alias and so, the column name would be query engine defined.
Having the column names is helpful for debugging, and simplifies the rest of the validation logic (which can now assume column names match) and error reporting (it's easier to say and read "the data type of column '<name>' doesn't match" rather than something about a positional index or left_name=... right_name=...)
the column name would be query engine defined
We made minor changes to our polars expressions to match (e.g. the .alias("sum(l_quantity)") to match duckdb here). Nothing too onerous.
| # Decimal → float64 | ||
| if _is_decimal_like(actual[col]): | ||
| actual[col] = pd.to_numeric(actual[col], errors="coerce") | ||
| if _is_decimal_like(expected[col]): |
Instead of repeated if statements, can we have this function do normalization for one dataframe at a time (similar to the normalize_rows function that existed before)?
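The suggested one-frame-at-a-time shape could look like this pure-Python sketch (the real code operates on pandas columns via pd.to_numeric; the helper names are hypothetical):

```python
from datetime import date, datetime
from decimal import Decimal

def normalize_value(v):
    """Normalize one cell: Decimal -> float, date -> midnight datetime."""
    if isinstance(v, Decimal):
        return float(v)
    if isinstance(v, date) and not isinstance(v, datetime):
        return datetime(v.year, v.month, v.day)
    return v

def normalize_rows(rows):
    """Apply the same normalization to a whole frame, one frame at a time,
    so both sides go through identical code instead of paired if statements."""
    return [[normalize_value(v) for v in row] for row in rows]
```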
| # Safety net: compare as Timestamps when one side is a date | ||
| # string (e.g. Presto '1995-03-05') and the other is a | ||
| # Timestamp object that slipped through _normalize_dtypes. | ||
| try: | ||
| if pd.Timestamp(v1) == pd.Timestamp(v2): | ||
| continue | ||
| except Exception: | ||
| pass |
Why is the column type not checked here?
I'll add a check.
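With the requested type check added, the safety net might look like this (a sketch using stdlib datetime.fromisoformat; the real code uses pd.Timestamp):

```python
from datetime import datetime

def timestamps_equal(v1, v2) -> bool:
    """Only compare when exactly one side is a date string and the other a
    datetime; anything else is rejected rather than coerced blindly."""
    if isinstance(v1, datetime) and isinstance(v2, str):
        v1, v2 = v2, v1  # normalize to (str, datetime)
    if not (isinstance(v1, str) and isinstance(v2, datetime)):
        return False
    try:
        return datetime.fromisoformat(v1) == v2
    except ValueError:
        return False
```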
| if not sort_by: | ||
| # No ORDER BY (or unparsable) — sort both sides and compare | ||
| _assert_frames_equal(_sort_for_comparison(actual), _sort_for_comparison(expected)) | ||
| return |
Is this check sufficient for queries with limits?
I'm not sure there's more that we can do in this case, as a LIMIT without a sort_by means any limit-sized subset of the data is valid.
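One thing that could still be checked in the LIMIT-without-ORDER-BY case is that the actual rows form a valid multiset subset of the expected rows (a sketch of that idea; this is not in the PR):

```python
from collections import Counter

def is_valid_limit_subset(actual_rows, expected_rows, limit):
    """With LIMIT but no ORDER BY, any limit-sized multiset subset of the
    expected result is a legal answer, so check size and containment only."""
    if len(actual_rows) != min(limit, len(expected_rows)):
        return False
    actual = Counter(map(tuple, actual_rows))
    expected = Counter(map(tuple, expected_rows))
    # Every actual row (with multiplicity) must exist in the expected rows.
    return all(expected[row] >= n for row, n in actual.items())
```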
| Extract ORDER BY information from SQL using sqlglot. | ||
| Returns: | ||
| (sort_by, nulls_last) where sort_by is [(col_name, descending), ...] |
Why are we concerned about nulls_last?
It's a DuckDB thing. DuckDB defaults to ASC → NULLS LAST, DESC → NULLS FIRST, and we need to track this to know where we should expect nulls in ORDER BY columns.
Why do we only get it for the first sorted column?
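For reference, DuckDB's default null placement described above (ASC sorts NULLS LAST, DESC sorts NULLS FIRST) can be emulated per column like this (a sketch, not code from the PR):

```python
def sort_like_duckdb(values, descending=False):
    """DuckDB defaults: NULL compares as largest, so it sorts last in
    ascending order and first in descending order."""
    non_null = sorted((v for v in values if v is not None), reverse=descending)
    nulls = [None] * sum(v is None for v in values)
    return nulls + non_null if descending else non_null + nulls
```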
misiugodfrey
left a comment
Addressed recent feedback.
| DuckDB reference) and benchmark validation (comparing result parquet files | ||
| against expected parquet files). | ||
| Comparison behaviour |
I'll strip the module docstring down and let the function docstring contain the details.
common/testing is not a Python project, so I don't think requirements.txt should be here. The dependencies should probably be managed by the project that uses the shared modules/files.
| ACTUAL_OUTPUT_DIR="${OUTPUT_DIR:-$(pwd)/benchmark_output}" | ||
| [[ -n ${TAG} ]] && ACTUAL_OUTPUT_DIR="${ACTUAL_OUTPUT_DIR}/${TAG}" |
We are not using the same python function per #275 (comment).
| if limit is None: | ||
| # ORDER BY, no LIMIT — sort by non-float cols for tie-breaking | ||
| _assert_frames_equal(_sort_for_comparison(actual), _sort_for_comparison(expected)) |
Why are we sorting the results in this case?
Summary
The reference results directory is resolved in priority order:
a. --reference-results-dir (explicit; missing directory is a fatal error)
b. PRESTO_EXPECTED_RESULTS_DIR env var (implicit; missing directory is a warning, validation skipped)
c. Auto-detection from benchmark_result.json (fallback; missing → not-validated)
The following updates to validation were added based on the Polars validation scripts:
Validation status priority: failed > not-validated > xfail > passed