Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 158 additions & 22 deletions benchmark_reporting_tools/post_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

../benchmark-root/
├── benchmark.json # optional
├── benchmark_result.json
├── configs # optional
│ ├── coordinator.config
│ └── worker.config
├── logs # optional
│ └── slurm-4575179.out
└── result_dir
└── benchmark_result.json
└── logs # optional
└── slurm-4575179.out

Usage:
python benchmark_reporting_tools/post_results.py /path/to/benchmark/dir \
Expand Down Expand Up @@ -65,10 +64,12 @@ class BenchmarkMetadata:
timestamp: datetime  # run timestamp; serialized via isoformat() elsewhere in this class
execution_number: int
n_workers: int
# NOTE(review): this defaulted field sits before non-default fields (scale_factor,
# gpu_count, num_drivers, gpu_name, engine below). In a plain @dataclass that raises
# TypeError at class-definition time unless kw_only=True (or field(kw_only=True)) is
# used — confirm the decorator arguments in the full file, which are outside this diff.
node_count: int | None = None
scale_factor: int
gpu_count: int
num_drivers: int
# NOTE(review): same ordering concern — worker_image/image_digest have defaults but
# gpu_name and engine after them do not; verify against the full class definition.
worker_image: str | None = None
image_digest: str | None = None
gpu_name: str
engine: str

Expand All @@ -81,6 +82,29 @@ def from_file(cls, file_path: Path) -> "BenchmarkMetadata":

return cls(**data)

@classmethod
def from_results_context(cls, context: dict) -> "BenchmarkMetadata":
    """Build a BenchmarkMetadata from the ``context`` dict embedded in benchmark_result.json.

    Required keys: timestamp, engine, kind, n_workers, scale_factor, num_drivers
    (plus gpu_count/gpu_name for non-CPU engines). Optional keys fall back to
    defaults: benchmark -> "tpch", execution_number -> 1, node_count/worker_image/
    image_digest -> None.

    Raises KeyError on a missing required key and ValueError on a malformed
    timestamp or non-integer numeric field.
    """
    engine = context["engine"]
    # Engines whose name contains "cpu" report no GPU hardware.
    cpu_engine = "cpu" in engine
    # Normalize a trailing "Z" (Zulu/UTC) suffix so fromisoformat accepts it.
    run_at = datetime.fromisoformat(context["timestamp"].replace("Z", "+00:00"))
    return cls(
        kind=context["kind"],
        benchmark=context.get("benchmark", "tpch"),
        timestamp=run_at,
        execution_number=context.get("execution_number", 1),
        n_workers=int(context["n_workers"]),
        node_count=int(context["node_count"]) if "node_count" in context else None,
        scale_factor=int(context["scale_factor"]),
        gpu_count=int(context["gpu_count"]) if not cpu_engine else 0,
        gpu_name=context["gpu_name"] if not cpu_engine else "N/A",
        num_drivers=int(context["num_drivers"]),
        worker_image=context.get("worker_image"),
        image_digest=context.get("image_digest"),
        engine=engine,
    )

def serialize(self) -> dict:
out = dataclasses.asdict(self)
out["timestamp"] = out["timestamp"].isoformat()
Expand Down Expand Up @@ -195,8 +219,9 @@ def parse_args() -> argparse.Namespace:
)
parser.add_argument(
"--identifier-hash",
help="Unique identifier hash for software environment (e.g. a container image digest).",
required=True,
help="Unique identifier hash for software environment (e.g. a container image digest). "
"If omitted, the image_digest from benchmark_result.json context is used.",
default=None,
)
parser.add_argument(
"--version",
Expand Down Expand Up @@ -235,6 +260,26 @@ def parse_args() -> argparse.Namespace:
help="Benchmark definition name",
required=True,
)
parser.add_argument(
"--velox-branch",
default=None,
help="Velox branch used to build the worker image",
)
parser.add_argument(
"--velox-repo",
default=None,
help="Velox repository used to build the worker image",
)
parser.add_argument(
"--presto-branch",
default=None,
help="Presto branch used to build the worker image",
)
parser.add_argument(
"--presto-repo",
default=None,
help="Presto repository used to build the worker image",
)
parser.add_argument(
"--concurrency-streams",
help="Number of concurrency streams to use for the benchmark run",
Expand All @@ -260,7 +305,13 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--execution-number", help="Execution number of the benchmark run", type=int, default=1)
parser.add_argument(
"--n-workers",
help="Number of workers in the benchmark run",
help="Number of GPU workers in the benchmark run",
type=int,
default=None,
)
parser.add_argument(
"--node-count",
help="Number of cluster nodes in the benchmark run",
type=int,
default=None,
)
Expand Down Expand Up @@ -329,6 +380,11 @@ def build_submission_payload(
is_official: bool,
asset_ids: list[int] | None = None,
concurrency_streams: int = 1,
velox_branch: str | None = None,
velox_repo: str | None = None,
presto_branch: str | None = None,
presto_repo: str | None = None,
validation_results: dict | None = None,
) -> dict:
"""Build a BenchmarkSubmission payload from parsed dataclasses.

Expand Down Expand Up @@ -364,10 +420,24 @@ def build_submission_payload(
# Sort query names for consistent ordering (Q1, Q2, ..., Q22)
query_names = sorted(raw_times.keys(), key=lambda x: int(x[1:]))

per_query_validation = (validation_results or {}).get("queries", {})

for query_name in query_names:
times = raw_times[query_name]
is_failed = query_name in failed_queries

# Look up validation result for this query (keys are lowercase e.g. "q1")
vkey = "q" + query_name.lstrip("Q").lower()
vdata = per_query_validation.get(vkey)
validation_result = (
{
"status": "expected-failure" if vdata["status"] == "xfail" else vdata["status"],
"message": vdata.get("message"),
}
if vdata
else {"status": "not-validated"}
)

# Each execution becomes a separate query log entry
for exec_idx, runtime_ms in enumerate(times):
if is_failed:
Expand All @@ -384,6 +454,7 @@ def build_submission_payload(
"extra_info": {
"execution_number": exec_idx + 1,
},
"validation_result": validation_result,
}
)
execution_order += 1
Expand Down Expand Up @@ -426,13 +497,21 @@ def build_submission_payload(
"commit_hash": commit_hash,
},
"run_at": benchmark_metadata.timestamp.isoformat(),
"node_count": benchmark_metadata.n_workers,
"node_count": benchmark_metadata.node_count,
"gpu_count": benchmark_metadata.gpu_count,
"query_logs": query_logs,
"concurrency_streams": concurrency_streams,
"engine_config": engine_config.serialize() if engine_config else {},
"engine_config": {
**(engine_config.serialize() if engine_config else {}),
"velox_branch": velox_branch,
"velox_repo": velox_repo,
"presto_branch": presto_branch,
"presto_repo": presto_repo,
},
"extra_info": extra_info,
"is_official": is_official,
"asset_ids": asset_ids,
"validation_status": "expected-failure" if (validation_results or {}).get("overall_status") == "xfail" else (validation_results or {}).get("overall_status", "not-validated"),
}


Expand Down Expand Up @@ -526,11 +605,16 @@ async def process_benchmark_dir(
benchmark_definition_name: str,
# all the optional arguments for when benchmark.json is not present.
concurrency_streams: int = 1,
velox_branch: str | None = None,
velox_repo: str | None = None,
presto_branch: str | None = None,
presto_repo: str | None = None,
kind: str | None = None,
benchmark: str | None = None,
timestamp: str | None = None,
execution_number: int = 1,
n_workers: int | None = None,
node_count: int | None = None,
scale_factor: int | None = None,
gpu_count: int | None = None,
gpu_name: str | None = None,
Expand All @@ -544,15 +628,39 @@ async def process_benchmark_dir(
"""
print(f"\nProcessing: {benchmark_dir}", file=sys.stderr)

# Load metadata, results, and config
# Load results file — also used as the primary metadata source via its context.
result_file = benchmark_dir / "benchmark_result.json"
try:
result_data = json.loads(result_file.read_text())
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f" Error loading results: {e}", file=sys.stderr)
return 1

# benchmark.json is only optionally written out.
# We give preference to getting this from the user CLI options,
# falling back to
context = result_data.get("context", {})

# Determine metadata source: context > benchmark.json > CLI args.
benchmark_json_path = benchmark_dir / "benchmark.json"

if not benchmark_json_path.exists():
# Resolve identifier_hash: CLI arg > context image_digest > "unknown"
resolved_identifier_hash = identifier_hash or context.get("image_digest") or "unknown"
if resolved_identifier_hash == "unknown":
print(" Warning: image_digest not found in benchmark_result.json context and --identifier-hash not provided; using 'unknown'", file=sys.stderr)

if "kind" in context:
print(" Loading metadata from benchmark_result.json context...", file=sys.stderr)
try:
benchmark_metadata = BenchmarkMetadata.from_results_context(context)
except (KeyError, ValueError) as e:
print(f" Error loading metadata from results context: {e}", file=sys.stderr)
return 1
elif benchmark_json_path.exists():
print(" Loading metadata from benchmark.json...", file=sys.stderr)
try:
benchmark_metadata = BenchmarkMetadata.from_file(benchmark_json_path)
except (ValueError, json.JSONDecodeError, FileNotFoundError) as e:
print(f" Error loading metadata: {e}", file=sys.stderr)
return 1
else:
missing_args = []
if kind is None:
missing_args.append("kind")
Expand All @@ -562,6 +670,8 @@ async def process_benchmark_dir(
missing_args.append("timestamp")
if n_workers is None:
missing_args.append("n_workers")
if node_count is None:
missing_args.append("node_count")
if scale_factor is None:
missing_args.append("scale_factor")
if gpu_count is None:
Expand All @@ -585,28 +695,35 @@ async def process_benchmark_dir(
timestamp=datetime.fromisoformat(timestamp.replace("Z", "+00:00")), # type: ignore[union-attr]
execution_number=execution_number,
n_workers=n_workers, # type: ignore[arg-type]
node_count=node_count, # type: ignore[arg-type]
scale_factor=scale_factor, # type: ignore[arg-type]
gpu_count=gpu_count, # type: ignore[arg-type]
gpu_name=gpu_name, # type: ignore[arg-type]
num_drivers=num_drivers, # type: ignore[arg-type]
worker_image=worker_image,
engine=engine_name, # type: ignore[arg-type]
)
else:
try:
benchmark_metadata = BenchmarkMetadata.from_file(benchmark_dir / "benchmark.json")
except (ValueError, json.JSONDecodeError, FileNotFoundError) as e:
print(f" Error loading metadata: {e}", file=sys.stderr)
return 1

try:
results = BenchmarkResults.from_file(
benchmark_dir / "result_dir" / "benchmark_result.json", benchmark_name=benchmark_metadata.benchmark
benchmark_dir / "benchmark_result.json", benchmark_name=benchmark_metadata.benchmark
)
except (ValueError, json.JSONDecodeError, FileNotFoundError) as e:
print(f" Error loading results: {e}", file=sys.stderr)
return 1

validation_results_path = benchmark_dir / "validation_results.json"
if validation_results_path.exists():
print(" Loading validation results...", file=sys.stderr)
try:
validation_results = json.loads(validation_results_path.read_text())
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f" Warning: could not load validation results: {e}", file=sys.stderr)
validation_results = None
else:
print(" No validation results found.", file=sys.stderr)
validation_results = None

if (benchmark_dir / "configs").exists():
print(" Loading engine config...", file=sys.stderr)
engine_config = EngineConfig.from_dir(benchmark_dir / "configs")
Expand Down Expand Up @@ -640,12 +757,17 @@ async def process_benchmark_dir(
storage_configuration_name=storage_configuration_name,
cache_state=cache_state,
engine_name=engine_name,
identifier_hash=identifier_hash,
identifier_hash=resolved_identifier_hash,
version=version,
commit_hash=commit_hash,
is_official=is_official,
asset_ids=asset_ids,
concurrency_streams=concurrency_streams,
velox_branch=velox_branch,
velox_repo=velox_repo,
presto_branch=presto_branch,
presto_repo=presto_repo,
validation_results=validation_results,
)
except Exception as e:
print(f" Error building payload: {e}", file=sys.stderr)
Expand All @@ -657,6 +779,15 @@ async def process_benchmark_dir(
print(f" Identifier hash: {payload['query_engine']['identifier_hash']}", file=sys.stderr)
print(f" Node count: {payload['node_count']}", file=sys.stderr)
print(f" Query logs: {len(payload['query_logs'])}", file=sys.stderr)
print(f" Validation status: {payload['validation_status']}", file=sys.stderr)
xfail_queries = [
ql["query_name"]
for ql in payload["query_logs"]
if ql.get("validation_result", {}).get("status") == "xfail"
]
if xfail_queries:
unique_xfail = sorted(set(xfail_queries), key=lambda x: int(x))
print(f" XFailed queries: {unique_xfail}", file=sys.stderr)

if dry_run:
print("\n [DRY RUN] Payload:", file=sys.stderr)
Expand Down Expand Up @@ -721,11 +852,16 @@ async def main() -> int:
timeout=args.timeout,
upload_logs=args.upload_logs,
benchmark_definition_name=args.benchmark_name,
velox_branch=args.velox_branch,
velox_repo=args.velox_repo,
presto_branch=args.presto_branch,
presto_repo=args.presto_repo,
kind=args.kind,
benchmark=args.benchmark,
timestamp=args.timestamp,
execution_number=args.execution_number,
n_workers=args.n_workers,
node_count=args.node_count,
scale_factor=args.scale_factor,
gpu_count=args.gpu_count,
gpu_name=args.gpu_name,
Expand Down
2 changes: 2 additions & 0 deletions benchmark_reporting_tools/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
httpx>=0.28.1
polars>=1.0
Loading