Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 76 additions & 10 deletions benchmark_reporting_tools/post_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse, urlunparse

from typing import Any
import httpx

LARGE_ASSET_DIRECT_UPLOAD_THRESHOLD_BYTES = 10 * 1024 * 1024

@dataclasses.dataclass(kw_only=True)
class BenchmarkMetadata:
Expand Down Expand Up @@ -424,6 +425,8 @@ def build_submission_payload(

for query_name in query_names:
times = raw_times[query_name]
if times is None:
times = []
is_failed = query_name in failed_queries

# Look up validation result for this query (keys are lowercase e.g. "q1")
Expand Down Expand Up @@ -525,6 +528,50 @@ def build_http_client(api_url: str, api_key: str, timeout: float) -> httpx.Async
timeout=timeout,
)

async def _s3_presigned_put(
    upload_url: str,
    required_headers: dict[str, Any],
    content: bytes,
    timeout: float,
) -> tuple[int, str]:
    """PUT *content* to a presigned S3 URL and return ``(status_code, body_text)``.

    A dedicated one-shot client is opened here because the presigned URL
    targets S3 directly, not the benchmark API that the shared client is
    configured against (base URL, auth headers).
    """
    # Presign endpoints may hand back non-string header values; S3 expects strings.
    normalized_headers = {str(name): str(value) for name, value in required_headers.items()}
    async with httpx.AsyncClient(timeout=timeout) as transfer_client:
        reply = await transfer_client.put(
            upload_url, headers=normalized_headers, content=content
        )
        return reply.status_code, reply.text


async def _upload_asset_presigned(
    client: httpx.AsyncClient,
    content: bytes,
    filename: str,
    title: str,
    media_type: str,
    timeout: float,
) -> int:
    """Upload an asset via the three-step presigned-URL flow.

    1. Ask the API for a presigned S3 upload URL for *filename*.
    2. PUT the raw bytes straight to S3 (bypassing the API server).
    3. Notify the API that the upload completed so it registers the asset.

    Returns the ``asset_id`` assigned by the API.

    Raises:
        RuntimeError: if any of the three steps returns an unexpected status.
    """
    presign_resp = await client.post(
        "/api/assets/upload-url/",
        json={"original_filename": filename, "media_type": media_type},
    )
    if presign_resp.status_code not in (200, 201):
        raise RuntimeError(f"Failed to get upload URL: {presign_resp.status_code} {presign_resp.text}")

    details = presign_resp.json()
    destination_url = details["upload_url"]
    s3_key = details["s3_key"]
    # required_headers may be absent or null in the presign response.
    extra_headers = details.get("required_headers") or {}

    put_status, put_body = await _s3_presigned_put(destination_url, extra_headers, content, timeout)
    if put_status not in (200, 204):
        raise RuntimeError(f"S3 PUT failed: {put_status} {put_body}")

    finish_resp = await client.post(
        "/api/assets/complete-upload/",
        json={"s3_key": s3_key, "title": title, "media_type": media_type},
    )
    if finish_resp.status_code != 201:
        raise RuntimeError(f"Complete upload failed: {finish_resp.status_code} {finish_resp.text}")
    return finish_resp.json()["asset_id"]


async def upload_log_files(
benchmark_dir: Path,
Expand All @@ -546,6 +593,10 @@ async def upload_log_files(
List of asset IDs from the uploaded files
"""
log_files = sorted(benchmark_dir.glob("*.log"))
log_files.extend(sorted(benchmark_dir.glob("*.nsys-rep")))
metrics_dir = benchmark_dir / "metrics"
if metrics_dir.is_dir():
log_files.extend(sorted(metrics_dir.glob("*.json")))
if not log_files:
return []

Expand All @@ -558,15 +609,26 @@ async def _upload_one(log_file: Path) -> int:
async with semaphore:
print(f" Uploading {log_file.name}...", file=sys.stderr)
content = log_file.read_bytes()
response = await client.post(
"/api/assets/upload/",
files={"file": (log_file.name, content, "text/plain")},
data={"title": log_file.name, "media_type": "text/plain"},
)
if response.status_code >= 400:
raise RuntimeError(f"Failed to upload {log_file.name}: {response.status_code} {response.text}")
result = response.json()
asset_id = result["asset_id"]
if log_file.suffix == ".json":
media_type = "application/json"
elif log_file.suffix == ".nsys-rep":
media_type = "application/octet-stream"
else:
media_type = "text/plain"

if len(content) > LARGE_ASSET_DIRECT_UPLOAD_THRESHOLD_BYTES:
print(f" Using presigned upload for {log_file.name} ({len(content) // (1024 * 1024)} MiB)...", file=sys.stderr)
asset_id = await _upload_asset_presigned(client, content, log_file.name, log_file.name, media_type, timeout)
else:
response = await client.post(
"/api/assets/upload/",
files={"file": (log_file.name, content, media_type)},
data={"title": log_file.name, "media_type": media_type},
)
if response.status_code >= 400:
raise RuntimeError(f"Failed to upload {log_file.name}: {response.status_code} {response.text}")
asset_id = response.json()["asset_id"]

print(f" Uploaded {log_file.name} (asset_id={asset_id})", file=sys.stderr)
return asset_id

Expand Down Expand Up @@ -736,6 +798,10 @@ async def process_benchmark_dir(
if upload_logs:
if dry_run:
log_files = sorted(benchmark_dir.glob("*.log"))
log_files.extend(sorted(benchmark_dir.glob("*.nsys-rep")))
metrics_dir = benchmark_dir / "metrics"
if metrics_dir.is_dir():
log_files.extend(sorted(metrics_dir.glob("*.json")))
print(
f" [DRY RUN] Would upload {len(log_files)} log file(s): {[f.name for f in log_files]}", file=sys.stderr
)
Expand Down
16 changes: 15 additions & 1 deletion presto/scripts/run_benchmark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ OPTIONS:
stored inside a directory under the --output-dir path with a name matching the tag name.
Tags must contain only alphanumeric and underscore characters.
-p, --profile Enable profiling of benchmark queries.
--profile-script-path Path to a custom profiler functions script. Defaults to ./profiler_functions.sh.
--skip-drop-cache Skip dropping system caches before each benchmark query (dropped by default).
-m, --metrics Collect detailed metrics from Presto REST API after each query.
Metrics are stored in query-specific directories.
Expand Down Expand Up @@ -154,6 +155,15 @@ parse_args() {
PROFILE=true
shift
;;
--profile-script-path)
if [[ -n $2 ]]; then
PROFILE_SCRIPT_PATH=$2
shift 2
else
echo "Error: --profile-script-path requires a value"
exit 1
fi
;;
--skip-drop-cache)
SKIP_DROP_CACHE=true
shift
Expand Down Expand Up @@ -236,7 +246,11 @@ if [[ -n ${TAG} ]]; then
fi

if [[ "${PROFILE}" == "true" ]]; then
PYTEST_ARGS+=("--profile --profile-script-path $(readlink -f ./profiler_functions.sh)")
if [[ -z "${PROFILE_SCRIPT_PATH}" ]]; then
PROFILE_SCRIPT_PATH="$(readlink -f ./profiler_functions.sh)"
fi
PYTEST_ARGS+=("--profile --profile-script-path ${PROFILE_SCRIPT_PATH}")

fi

if [[ "${METRICS}" == "true" ]]; then
Expand Down
2 changes: 1 addition & 1 deletion presto/slurm/presto-nvl72/defaults.env
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ unset _vt_path
: "${HIVE_METASTORE_SOURCE:=/mnt/data/tpch-rs/HIVE-METASTORE-MG-260313}"

# --- SLURM node defaults (cluster-specific) ---
: "${DEFAULT_NODELIST:=presto-gb200-gcn-[01-13,15-16]}"
: "${DEFAULT_NODELIST:=presto-gb200-gcn-[01-16]}"
: "${DEFAULT_SINGLE_NODE:=presto-gb200-gcn-01}"
133 changes: 116 additions & 17 deletions presto/slurm/presto-nvl72/functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ function run_coord_image {
srun -w $COORD --ntasks=1 --overlap \
--container-image=${coord_image} \
--container-remap-root \
--export=ALL,JAVA_HOME=/usr/lib/jvm/jre-17-openjdk \
--container-env=JAVA_HOME=/usr/lib/jvm/jre-17-openjdk \
--container-env=PATH=/usr/lib/jvm/jre-17-openjdk/bin:$PATH \
--export=ALL \
--container-mounts=${VT_ROOT}:/workspace,\
${coord_data}:/var/lib/presto/data,\
${CONFIGS}/etc_common:/opt/presto-server/etc,\
Expand All @@ -110,9 +108,7 @@ ${VT_ROOT}/.hive_metastore:/var/lib/presto/data/hive/metastore${extra_mounts} \
srun -w $COORD --ntasks=1 --overlap \
--container-remap-root \
--container-image=${coord_image} \
--export=ALL,JAVA_HOME=/usr/lib/jvm/jre-17-openjdk \
--container-env=JAVA_HOME=/usr/lib/jvm/jre-17-openjdk \
--container-env=PATH=/usr/lib/jvm/jre-17-openjdk/bin:$PATH \
--export=ALL \
--container-mounts=${VT_ROOT}:/workspace,\
${coord_data}:/var/lib/presto/data,\
${CONFIGS}/etc_common:/opt/presto-server/etc,\
Expand Down Expand Up @@ -178,8 +174,8 @@ function run_worker {
validate_environment_preconditions LOGS CONFIGS VT_ROOT COORD CUDF_LIB DATA

local gpu_id=$1 image=$2 node=$3 worker_id=$4
# Assign NUMA node based on GPU ID: GPUs 0-3 → node 0, GPUs 4-7 → node 1, etc.
local numa_node=$((gpu_id / 4))
# Assign NUMA node based on GPU ID: GPUs 0-1 → node 0, GPUs 2-3 → node 1, etc.
local numa_node=$((gpu_id / 2))
echo "running worker ${worker_id} with image ${image} on node ${node} with gpu_id ${gpu_id} numa_node ${numa_node}"

local worker_image="${IMAGE_DIR}/${image}.sqsh"
Expand All @@ -199,6 +195,54 @@ function run_worker {
mkdir -p ${worker_data}/hive/data/user_data
mkdir -p ${VT_ROOT}/.hive_metastore

local vt_cufile_log_dir="/var/log/cufile"
local vt_cufile_log="${vt_cufile_log_dir}/cufile_worker_${worker_id}.log"

local gds_mounts=""
# Append a host path to the gds_mounts container-mount list as "PATH:PATH".
#   $1 - host path; must exist on the node, otherwise the job aborts.
#   $2 - optional read-only flag: "1" appends ":ro" to the mount spec.
# NOTE(review): mutates the enclosing scope's gds_mounts via bash dynamic
# scoping — callers must have declared gds_mounts before invoking this.
function add_gds_sys_path {
local path="${1:?Path argument missing}"
local read_only="${2:-0}"

# System file path must exist
if [[ ! -e ${path} ]]; then
echo "${path} required by GDS does not exist"
exit 1
fi

# If gds_mounts is not empty, append a comma
[[ -n "${gds_mounts}" ]] && gds_mounts+=","

# Append path
gds_mounts+="${path}:${path}"
if [[ "${read_only}" == "1" ]]; then
gds_mounts+=":ro"
fi
}

if [[ "${ENABLE_GDS}" == "1" ]]; then
# Add GDS-required system paths
add_gds_sys_path "/run/udev" 1
add_gds_sys_path "/dev/infiniband"
add_gds_sys_path "/etc/cufile.json" 1
for dev in /dev/nvidia-fs*; do
# If file exists, append the path, otherwise, exit the loop
[[ -e "${dev}" ]] || continue
add_gds_sys_path "${dev}"
done
fi

local nsys_bin=""
local nsys_launch_opts=""
local vt_nsys_report_dir="/var/log/nsys"
if [[ "${ENABLE_NSYS}" == "1" && "${worker_id}" == "0" ]]; then
nsys_bin="/opt/nvidia/nsight-systems-cli/2025.5.1/bin/nsys"
nsys_launch_opts="-t nvtx,cuda"
# nsys_launch_opts="-t nvtx,cuda,osrt,ucx \
# --cuda-memory-usage=true \
# --cuda-um-cpu-page-faults=true \
# --cuda-um-gpu-page-faults=true"
fi

# The parent SLURM job allocates --gres=gpu:NUM_GPUS_PER_NODE so all GPU kernel
# capabilities are already set up for the job cgroup. Do NOT use --gres=gpu:1
# on the step: it restricts the step's cgroup to one GPU and then nvidia-container-cli
Expand All @@ -212,6 +256,8 @@ function run_worker {
# compat library with the host driver so cudaMallocAsync works.
# CUDA_VISIBLE_DEVICES=${gpu_id} inside the container restricts each worker to
# its assigned GPU while still allowing the CUDA driver to enumerate all devices.
# export GLOG_vmodule=IntraNodeTransferRegistry=3,ExchangeOperator=3
# export GLOG_logtostderr=1
srun -N1 -w $node --ntasks=1 --overlap \
--container-image=${worker_image} \
--container-remap-root \
Expand All @@ -224,19 +270,67 @@ ${worker_hive}:/opt/presto-server/etc/catalog/hive.properties,\
${worker_data}:/var/lib/presto/data,\
${DATA}:/var/lib/presto/data/hive/data/user_data,\
${VT_ROOT}/.hive_metastore:/var/lib/presto/data/hive/metastore,\
${LOGS}:${vt_cufile_log_dir},\
${LOGS}:${vt_nsys_report_dir},\
/usr/lib/aarch64-linux-gnu/libcuda.so.580.105.08:/usr/local/cuda-13.0/compat/libcuda.so.1,\
/usr/lib/aarch64-linux-gnu/libnvidia-ml.so.580.105.08:/usr/local/lib/libnvidia-ml.so.1 \
--container-env=LD_LIBRARY_PATH="$CUDF_LIB:$LD_LIBRARY_PATH" \
--container-env=GLOG_vmodule=IntraNodeTransferRegistry=3,ExchangeOperator=3 \
--container-env=GLOG_logtostderr=1 \
/usr/lib/aarch64-linux-gnu/libnvidia-ml.so.580.105.08:/usr/local/lib/libnvidia-ml.so.1\
${gds_mounts:+,${gds_mounts}} \
-- /bin/bash -c "
export LD_LIBRARY_PATH=\"${CUDF_LIB}:${LD_LIBRARY_PATH}\"
if [[ '${ENABLE_GDS}' == '1' ]]; then
export KVIKIO_COMPAT_MODE=OFF
export CUFILE_LOGFILE_PATH=${vt_cufile_log}
export CUFILE_LOGGING_LEVEL=INFO
fi
if [[ '${VARIANT_TYPE}' == 'gpu' ]]; then export CUDA_VISIBLE_DEVICES=${gpu_id}; fi
echo \"Worker ${worker_id}: CUDA_VISIBLE_DEVICES=\${CUDA_VISIBLE_DEVICES:-none}, NUMA_NODE=${numa_node}\"
if [[ '${USE_NUMA}' == '1' ]]; then
numactl --cpubind=${numa_node} --membind=${numa_node} /usr/bin/presto_server --etc-dir=/opt/presto-server/etc
echo \"Worker ${worker_id}: ENABLE_GDS=\${ENABLE_GDS:-unset}\"
echo \"Worker ${worker_id}: ENABLE_NSYS=\${ENABLE_NSYS:-unset}\"
echo \"Worker ${worker_id}: KVIKIO_COMPAT_MODE=\${KVIKIO_COMPAT_MODE:-unset}\"
echo \"Worker ${worker_id}: CUFILE_LOGFILE_PATH=\${CUFILE_LOGFILE_PATH:-unset}\"

if [[ -n '${nsys_bin}' ]]; then
(
echo \"Worker ${worker_id}: nsys subshell started\"
if [[ -n '${QUERIES:-}' ]]; then
IFS=',' read -ra qlist <<< '${QUERIES}'
else
qlist=({1..22})
fi
for qnum in \"\${qlist[@]}\"; do
qid=\"Q\${qnum}\"
while [[ ! -f ${vt_nsys_report_dir}/.nsys_start_token_\${qid} ]]; do
read -t 2 -r _ <<< '' || true
done
echo \"Worker ${worker_id}: start token found for \${qid}\"
rm ${vt_nsys_report_dir}/.nsys_start_token_\${qid}
${nsys_bin} start -o ${vt_nsys_report_dir}/nsys_worker_${worker_id}_\${qid} -f true; echo \"Worker ${worker_id}: nsys start exit code: \$?\"
echo \"Worker ${worker_id}: post-start token created for \${qid}\"
touch ${vt_nsys_report_dir}/.nsys_started_token_\${qid}

while [[ ! -f ${vt_nsys_report_dir}/.nsys_stop_token_\${qid} ]]; do
read -t 2 -r _ <<< '' || true
done
echo \"Worker ${worker_id}: stop token found for \${qid}\"
rm ${vt_nsys_report_dir}/.nsys_stop_token_\${qid}
${nsys_bin} stop; echo \"Worker ${worker_id}: nsys stop exit code: \$?\"
done
echo \"Worker ${worker_id}: nsys subshell done, all queries profiled\"
) &

echo \"Worker ${worker_id}: Nsight System program at ${nsys_bin}\"
echo \"Worker ${worker_id}: running nsys launch\"
${nsys_bin} launch ${nsys_launch_opts} /usr/bin/presto_server --etc-dir=/opt/presto-server/etc
echo \"Worker ${worker_id}: nsys launch exited with code: \$?\"
else
/usr/bin/presto_server --etc-dir=/opt/presto-server/etc
fi" > ${LOGS}/worker_${worker_id}.log 2>&1 &
if [[ '${USE_NUMA}' == '1' ]]; then
numactl --cpubind=${numa_node} --membind=${numa_node} /usr/bin/presto_server --etc-dir=/opt/presto-server/etc
else
/usr/bin/presto_server --etc-dir=/opt/presto-server/etc
fi
fi

" > ${LOGS}/worker_${worker_id}.log 2>&1 &
}

function copy_hive_metastore {
Expand Down Expand Up @@ -274,6 +368,11 @@ function run_queries {
[ $# -ne 2 ] && echo_error "$0 expected two arguments for '<iterations>' and '<scale_factor>'"
local num_iterations=$1
local scale_factor=$2
local extra_args=()
[[ "${ENABLE_METRICS}" == "1" ]] && extra_args+=("-m")
[[ "${ENABLE_NSYS}" == "1" ]] && extra_args+=("-p" "--profile-script-path" "/workspace/presto/slurm/presto-nvl72/profiler_functions.sh")
[[ -n "${QUERIES:-}" ]] && extra_args+=("-q" "${QUERIES}")

source "${SCRIPT_DIR}/defaults.env"
# We currently skip dropping cache because it requires docker (not available on the cluster).
run_coord_image "export PORT=$PORT; \
Expand All @@ -282,7 +381,7 @@ function run_queries {
export MINIFORGE_HOME=/workspace/miniforge3; \
export HOME=/workspace; \
cd /workspace/presto/scripts; \
./run_benchmark.sh -b tpch -s tpchsf${scale_factor} -i ${num_iterations} \
./run_benchmark.sh -b tpch -s tpchsf${scale_factor} -i ${num_iterations} ${extra_args[*]} \
--hostname ${COORD} --port $PORT -o /workspace/presto/slurm/presto-nvl72/result_dir --skip-drop-cache; \
echo 'Validating query results...'; \
MINIFORGE_HOME=/workspace/miniforge3 /workspace/scripts/run_py_script.sh \
Expand Down
Loading