Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 132 additions & 11 deletions headroom/perf/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
log = logging.getLogger(__name__)

LOG_DIR = _paths.log_dir()
DEFAULT_SLOW_OPTIMIZATION_MS = 500.0

# Matches: 2026-03-07 13:38:31,009 - headroom.proxy - INFO - [hr_...] PERF model=... ...
_PERF_RE = re.compile(
Expand Down Expand Up @@ -536,15 +537,31 @@ def format_report(report: PerfReport) -> str:
# Optimization latency
opt_times = [r.optimization_ms for r in records if r.optimization_ms > 0]
if opt_times:
avg_opt = sum(opt_times) / len(opt_times)
max_opt = max(opt_times)
overhead = build_overhead_summary(report)
opt_stats = overhead["optimization_ms"]
lines.append("Optimization Overhead")
lines.append("-" * 40)
lines.append(f" Average: {avg_opt:.0f}ms")
lines.append(f" Max: {max_opt:.0f}ms")
slow = [t for t in opt_times if t > 500]
if slow:
lines.append(f" >500ms: {len(slow)} requests")
lines.append(f" Average: {opt_stats['average_ms']:.0f}ms")
lines.append(
" p50/p95/p99: "
f"{opt_stats['p50_ms']:.0f} / "
f"{opt_stats['p95_ms']:.0f} / "
f"{opt_stats['p99_ms']:.0f}ms"
)
lines.append(f" Max: {opt_stats['max_ms']:.0f}ms")
if opt_stats["slow_request_count"]:
lines.append(
f" >{overhead['slow_threshold_ms']:.0f}ms: "
f"{opt_stats['slow_request_count']} requests "
f"({opt_stats['slow_request_pct']:.1f}%)"
)
if overhead["stage_breakdown"]:
lines.append(" Slowest stages:")
for stage in overhead["stage_breakdown"][:3]:
lines.append(
f" {stage['stage']}: total {stage['total_ms']:.0f}ms, "
f"p95 {stage['p95_ms']:.0f}ms, max {stage['max_ms']:.0f}ms"
)
lines.append("")

# Throughput
Expand Down Expand Up @@ -723,6 +740,102 @@ def _percentile(data: list[float], pct: float) -> float:
return sorted_data[lower]


def _latency_stats(values: list[float]) -> dict[str, float | int]:
if not values:
return {
"count": 0,
"average_ms": 0.0,
"p50_ms": 0.0,
"p95_ms": 0.0,
"p99_ms": 0.0,
"max_ms": 0.0,
}
return {
"count": len(values),
"average_ms": round(sum(values) / len(values), 2),
"p50_ms": round(_percentile(values, 0.50), 2),
"p95_ms": round(_percentile(values, 0.95), 2),
"p99_ms": round(_percentile(values, 0.99), 2),
"max_ms": round(max(values), 2),
}


def _slowest_stage(record: PerfRecord) -> tuple[str | None, float]:
stages = [(name, value) for name, value in record.stages.items() if value > 0]
if not stages:
return None, 0.0
return max(stages, key=lambda item: item[1])


def build_overhead_summary(
report: PerfReport,
*,
slow_threshold_ms: float = DEFAULT_SLOW_OPTIMIZATION_MS,
limit: int = 10,
) -> dict:
"""Aggregate optimization latency and stage-level bottlenecks."""
records = report.perf_records
opt_times = [r.optimization_ms for r in records if r.optimization_ms > 0]
total_times = [r.total_ms for r in records if r.total_ms > 0]
slow_records = [r for r in records if r.optimization_ms > slow_threshold_ms]

stage_values: dict[str, list[float]] = {}
for record in records:
for stage, value in record.stages.items():
if value > 0:
stage_values.setdefault(stage, []).append(value)

stage_breakdown = []
for stage, values in stage_values.items():
stats = _latency_stats(values)
stage_breakdown.append(
{
"stage": stage,
**stats,
"total_ms": round(sum(values), 2),
}
)
stage_breakdown.sort(key=lambda row: row["total_ms"], reverse=True)

top_slow_requests = []
for record in sorted(
[r for r in records if r.optimization_ms > 0],
key=lambda r: r.optimization_ms,
reverse=True,
)[:limit]:
slowest_stage, slowest_stage_ms = _slowest_stage(record)
top_slow_requests.append(
{
"timestamp": record.timestamp,
"request_id": record.request_id,
"model": record.model,
"client": record.client,
"optimization_ms": record.optimization_ms,
"total_ms": record.total_ms,
"tokens_before": record.tokens_before,
"tokens_after": record.tokens_after,
"tokens_saved": record.tokens_saved,
"slowest_stage": slowest_stage,
"slowest_stage_ms": slowest_stage_ms,
"transforms": record.transforms,
}
)

return {
"slow_threshold_ms": slow_threshold_ms,
"optimization_ms": {
**_latency_stats(opt_times),
"slow_request_count": len(slow_records),
"slow_request_pct": round(len(slow_records) / len(records) * 100, 1)
if records
else 0.0,
},
"total_ms": _latency_stats(total_times),
"stage_breakdown": stage_breakdown,
"top_slow_requests": top_slow_requests,
}


def calculate_throughput(report: PerfReport) -> dict:
records = report.perf_records
parsed_records = []
Expand Down Expand Up @@ -902,6 +1015,7 @@ def build_perf_summary(report: PerfReport) -> dict:
"cache_hit_pct": cache_hit_pct,
"by_model": by_model,
"by_transform": by_transform,
"overhead": build_overhead_summary(report),
"throughput": calculate_throughput(report),
"log_files_read": report.log_files_read,
"total_lines_parsed": report.total_lines_parsed,
Expand Down Expand Up @@ -1027,11 +1141,18 @@ def _generate_recommendations(report: PerfReport) -> list[str]:
)

# Optimization latency
slow = [r for r in report.perf_records if r.optimization_ms > 500]
if len(slow) > len(report.perf_records) * 0.2:
overhead = build_overhead_summary(report)
opt_stats = overhead["optimization_ms"]
slow_count = int(opt_stats["slow_request_count"])
if slow_count > len(report.perf_records) * 0.2:
stage_hint = ""
if overhead["stage_breakdown"]:
stage_hint = f"; top stage: {overhead['stage_breakdown'][0]['stage']}"
recs.append(
f"{len(slow)} requests took >500ms for optimization — "
"consider reducing transform pipeline"
f"{slow_count} requests took >{overhead['slow_threshold_ms']:.0f}ms "
"for optimization"
f"{stage_hint} — consider reducing heavy transforms or lowering "
"HEADROOM_COMPRESSION_TIMEOUT_SECONDS"
)

if report.router_records:
Expand Down
46 changes: 46 additions & 0 deletions tests/test_cli_perf_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
PerfRecord,
PerfReport,
TransformRecord,
build_overhead_summary,
build_perf_summary,
perf_records_as_dicts,
)
Expand Down Expand Up @@ -116,6 +117,49 @@ def test_build_perf_summary_empty_report_no_zero_division():
assert summary["savings_pct"] == 0.0
assert summary["cache_hit_pct"] == 0.0
assert summary["by_model"] == []
assert summary["overhead"]["optimization_ms"]["count"] == 0


def test_build_overhead_summary_attributes_slow_stages():
report = PerfReport(
perf_records=[
PerfRecord(
timestamp="2026-06-05 10:00:00,000",
request_id="fast",
model="gpt-5",
tokens_before=1000,
tokens_after=500,
tokens_saved=500,
optimization_ms=100.0,
total_ms=300.0,
stages={"cache_align": 10.0, "content_router": 90.0},
),
PerfRecord(
timestamp="2026-06-05 10:01:00,000",
request_id="slow",
model="gpt-5",
tokens_before=1000,
tokens_after=500,
tokens_saved=500,
optimization_ms=700.0,
total_ms=900.0,
stages={"kompress": 650.0, "content_router": 40.0},
),
]
)

overhead = build_overhead_summary(report, slow_threshold_ms=500.0)

assert overhead["optimization_ms"]["count"] == 2
assert overhead["optimization_ms"]["average_ms"] == 400.0
assert overhead["optimization_ms"]["p50_ms"] == 400.0
assert overhead["optimization_ms"]["p95_ms"] == 670.0
assert overhead["optimization_ms"]["p99_ms"] == 694.0
assert overhead["optimization_ms"]["slow_request_count"] == 1
assert overhead["stage_breakdown"][0]["stage"] == "kompress"
assert overhead["stage_breakdown"][0]["total_ms"] == 650.0
assert overhead["top_slow_requests"][0]["request_id"] == "slow"
assert overhead["top_slow_requests"][0]["slowest_stage"] == "kompress"


def test_perf_records_as_dicts_roundtrips_fields():
Expand Down Expand Up @@ -144,6 +188,7 @@ def test_perf_json_format(runner, monkeypatch):
assert data["savings_pct"] == 50.0
assert "by_model" in data
assert data["total_requests"] == 2
assert data["overhead"]["optimization_ms"]["p95_ms"] == 11.8


def test_perf_json_raw_is_array(runner, monkeypatch):
Expand Down Expand Up @@ -214,6 +259,7 @@ def test_perf_text_default_unchanged(runner, monkeypatch):
result = runner.invoke(main, ["perf"])
assert result.exit_code == 0, result.output
assert "Headroom Performance Report" in result.output
assert "p50/p95/p99" in result.output


def test_perf_rejects_unknown_format(runner, monkeypatch):
Expand Down
Loading