diff --git a/CHANGELOG.md b/CHANGELOG.md index 464d93728..c1f1ccc4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Features +* **learn:** weight loops in `headroom learn`. A new loop detector (`headroom/learn/loops.py`) recognizes repeated tool-call patterns — including RTK re-fetch loops, where RTK's output truncation makes the agent re-run larger-limit variants of a *successful* command — collapses output-limit variants to one signature, measures the wasted tokens, surfaces loops as a highest-priority digest section, and weights loop guardrails above one-off rules by their measured waste. Previously loops had no special weight and a no-failure re-fetch loop was skipped entirely. Adds an RTK-loop eval (`benchmarks/rtk_loop_learn_eval.py`) that reproduces a loop, runs it through Learn, and asserts the generated guardrail ranks first and prevents re-triggering. * **learn:** write per-project learnings to the personal, gitignored `CLAUDE.local.md` by default instead of the team-shared `CLAUDE.md`, matching Claude Code's memory convention so machine-specific paths and tool-discovery byproducts no longer pollute the shared file. Adds a `--target` flag to override the destination (e.g. `--target CLAUDE.md` to opt back into the shared file, or any custom path), and auto-migrates a stale learned-patterns block out of an existing `CLAUDE.md` into `CLAUDE.local.md` with a warning ([#1072](https://github.com/chopratejas/headroom/issues/1072)). * **proxy:** measure and surface rolling and current token throughput metrics (active/wall-clock input, compression, effective forward, and streamed generation) in `headroom perf` CLI and the dashboard ([#959](https://github.com/chopratejas/headroom/issues/959)). * **vibe:** add Mistral Vibe CLI support with `headroom wrap vibe`. 
diff --git a/benchmarks/rtk_loop_learn_eval.py b/benchmarks/rtk_loop_learn_eval.py new file mode 100644 index 000000000..cd36fbf98 --- /dev/null +++ b/benchmarks/rtk_loop_learn_eval.py @@ -0,0 +1,287 @@ +"""RTK-loop eval — does Headroom Learn catch a loop and write a guardrail that +would prevent it recurring? + +This is the agentic eval for the loop-weighting work. It runs in two phases: + + Phase 1 — TRIGGER + LEARN + Reproduce an RTK re-fetch loop (a grep whose RTK-truncated output forces the + agent to re-run larger-limit variants), run it through ``SessionAnalyzer``, + and SCORE the resulting guardrail: + • produced — a loop guardrail was emitted at all + • ranked_first — it outranks the one-off rules (the weighting works) + • names_command — the rule identifies the command that looped + • prescribes_fix — the rule says how to avoid it (fetch full output once) + • weight_reflects — its savings estimate >= the MEASURED wasted tokens + + Phase 2 — GUARDRAIL HOLDS + Inject that guardrail as a prior learned pattern, then feed a session where + the agent FOLLOWED it (one full-output fetch, no loop). Re-run the analyzer + and assert NO new loop guardrail is produced for that command — i.e. once + the rule exists and is honored, the loop does not re-trigger and Learn does + not need to relearn it. + +Runs deterministically by default (a stubbed analyzer LLM so CI is hermetic). +With ``--real`` it drives the real analyzer LLM and scores the actually-generated +rule, using an API key (ANTHROPIC/OPENAI/GEMINI) or an installed CLI backend. 
+ +Usage: + python benchmarks/rtk_loop_learn_eval.py # deterministic + python benchmarks/rtk_loop_learn_eval.py --real # real LLM (API key) + HEADROOM_LEARN_CLI=claude python benchmarks/rtk_loop_learn_eval.py --real # via CLI +""" + +from __future__ import annotations + +import argparse +import os +import sys +from contextlib import nullcontext +from dataclasses import dataclass, field +from pathlib import Path +from unittest.mock import patch + +# Allow running as a plain script from the repo root. +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from headroom.learn.analyzer import SessionAnalyzer # noqa: E402 +from headroom.learn.fixtures import rtk_refetch_loop_session # noqa: E402 +from headroom.learn.loops import detect_loops # noqa: E402 +from headroom.learn.models import ( # noqa: E402 + ProjectInfo, + SessionData, + ToolCall, +) + +REPETITIONS = 6 + + +# ============================================================================= +# Deterministic LLM stub — stands in for the analyzer's _call_llm in CI. +# It mimics a competent model: emits the loop guardrail (under-estimating its +# savings, so the weighting layer has real work to do) plus a one-off rule the +# model would naively rank higher. In Phase 2 it emits NO loop rule, because a +# non-looping guarded session gives it nothing to relearn. +# ============================================================================= + + +def _stub_llm_phase1(digest: str, model: str) -> dict: + return { + "context_file_rules": [ + { + "section": "Use uv for Python", + "content": "Use `uv run python` instead of `python3`.", + "estimated_tokens_saved": 900, # model rates the one-off high + "evidence_count": 2, + }, + { + "section": "Avoid grep TimeoutError re-fetch loop", + "content": ( + "When searching logs for TimeoutError, capture the full " + "result once (grep into a file and read it) instead of " + "re-running grep with larger `head` limits." 
@dataclass
class Scorecard:
    """Ordered collection of named pass/fail checks for one eval run.

    ``checks`` maps a check name to its boolean outcome (insertion order is
    the display order); ``notes`` holds an optional free-text explanation for
    a subset of those names.
    """

    checks: dict[str, bool] = field(default_factory=dict)
    notes: dict[str, str] = field(default_factory=dict)

    def add(self, name: str, passed: bool, note: str = "") -> None:
        """Record the outcome of check ``name``.

        A non-empty ``note`` is stored alongside the result and shown by
        :meth:`render`; an empty note stores nothing.
        """
        self.checks[name] = passed
        if note:
            self.notes[name] = note

    @property
    def passed(self) -> bool:
        """True when every recorded check passed.

        Note this is vacuously True for a scorecard with no checks.
        """
        return all(self.checks.values())

    def render(self) -> str:
        """Return one ``[PASS]``/``[FAIL]`` line per check, names left-aligned.

        Returns an empty string when no checks have been recorded.
        """
        # ``default=0`` keeps an empty scorecard renderable: ``max()`` over an
        # empty iterable would otherwise raise ValueError.
        width = max((len(k) for k in self.checks), default=0)
        lines = []
        for name, ok in self.checks.items():
            mark = "PASS" if ok else "FAIL"
            note = f" ({self.notes[name]})" if name in self.notes else ""
            lines.append(f" [{mark}] {name.ljust(width)}{note}")
        return "\n".join(lines)
data_path=Path("/tmp/rtk-loop-eval-data"), + ) + card = Scorecard() + + # ---- Phase 1: trigger + learn ----------------------------------------- + loop_session = rtk_refetch_loop_session(repetitions=REPETITIONS) + loops = detect_loops([loop_session]) + measured_waste = loops[0].wasted_tokens if loops else 0 + card.add("loop_detected", bool(loops), f"{len(loops)} loop(s), ~{measured_waste:,} tok wasted") + + analyzer = SessionAnalyzer(model=None if use_real_llm else "stub") + phase1_ctx = ( + nullcontext() + if use_real_llm + else patch("headroom.learn.analyzer._call_llm", _stub_llm_phase1) + ) + with phase1_ctx: + result = analyzer.analyze(project, [loop_session]) + + recs = result.recommendations + loop_recs = [r for r in recs if r.is_loop_guardrail] + card.add("guardrail_produced", bool(loop_recs)) + + top = recs[0] if recs else None + card.add( + "ranked_first", + bool(top and top.is_loop_guardrail), + "" if (top and top.is_loop_guardrail) else "loop rule did not rank #1", + ) + + guardrail = loop_recs[0] if loop_recs else None + text = (guardrail.section + " " + guardrail.content).lower() if guardrail else "" + # The rule must identify the LOOPING COMMAND (grep + its output-limit shape), + # not the incidental search string — a good fix generalizes beyond it. (The + # real-LLM run surfaced this: the model wrote a general "grepping logs / `head + # -N` limits" rule and never echoed "TimeoutError", which an earlier + # literal-match check wrongly failed.) 
+ card.add( + "names_command", + "grep" in text and any(k in text for k in ("head", "log", "limit")), + ) + card.add( + "prescribes_fix", + any(k in text for k in ("full", "once", "into a file", "instead", "limit")), + ) + card.add( + "weight_reflects_waste", + bool(guardrail and guardrail.estimated_tokens_saved >= measured_waste), + "" + if (guardrail and guardrail.estimated_tokens_saved >= measured_waste) + else f"savings {getattr(guardrail, 'estimated_tokens_saved', 0)} < waste {measured_waste}", + ) + + # ---- Phase 2: guardrail holds ----------------------------------------- + # Inject the produced guardrail as a prior pattern via the project's + # context file, then analyze a guarded (non-looping) session. + held = True + note = "" + if guardrail: + ctx_path = Path("/tmp/rtk-loop-eval-CLAUDE.md") + ctx_path.write_text( + "\n" + f"### {guardrail.section}\n{guardrail.content}\n" + "\n", + encoding="utf-8", + ) + project.context_file = ctx_path + phase2_ctx = ( + nullcontext() + if use_real_llm + else patch("headroom.learn.analyzer._call_llm", _stub_llm_phase2) + ) + with phase2_ctx: + held_result = analyzer.analyze(project, [_guarded_session()]) + # No NEW loop guardrail should be needed for the (now-guarded) grep. 
+ new_loop_rules = [ + r + for r in held_result.recommendations + if r.is_loop_guardrail and "grep" in (r.section + r.content).lower() + ] + held = not new_loop_rules + note = "" if held else f"{len(new_loop_rules)} new grep loop rule(s) re-emitted" + else: + held = False + note = "no guardrail from phase 1 to test" + card.add("guardrail_holds", held, note) + + return card + + +def _real_backend_available() -> bool: + """True when the analyzer can reach a real LLM — API key or installed CLI.""" + import shutil + + if any(os.environ.get(k) for k in ("ANTHROPIC_API_KEY", "OPENAI_API_KEY", "GEMINI_API_KEY")): + return True + return any(shutil.which(cli) for cli in ("claude", "gemini", "codex")) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--real", + action="store_true", + help="Drive the real analyzer LLM — needs an API key (ANTHROPIC_API_KEY / " + "OPENAI_API_KEY / GEMINI_API_KEY) or an installed CLI backend " + "(claude / gemini / codex; force one with HEADROOM_LEARN_CLI=claude).", + ) + args = parser.parse_args() + + if args.real and not _real_backend_available(): + print( + "--real needs an LLM backend (API key or claude/gemini/codex CLI); " + "falling back to deterministic mode.\n" + ) + args.real = False + + mode = "REAL LLM" if args.real else "deterministic stub" + print(f"RTK-loop eval — mode: {mode}\n") + card = run_eval(use_real_llm=args.real) + print(card.render()) + print() + if card.passed: + print("RESULT: PASS — loop caught, guardrail ranked first, and it holds.") + return 0 + print("RESULT: FAIL — see failed checks above.") + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/docs/rtk-loop-weighting.md b/docs/rtk-loop-weighting.md new file mode 100644 index 000000000..e0b580a90 --- /dev/null +++ b/docs/rtk-loop-weighting.md @@ -0,0 +1,125 @@ +# Loop weighting in Headroom Learn + RTK-loop eval + +**Status:** proposed (branch `purva/rtk-loop-evals`). 
+**Context:** Tejas asked for (1) an eval that reproduces the RTK loop, runs it +through Headroom Learn, and checks the generated rule prevents re-triggering, +and (2) a change so Headroom Learn gives loops more weight. + +## The gap + +Before this change, `headroom learn` ranked every recommendation by a single +LLM-guessed `estimated_tokens_saved`, with a flat hardcoded `confidence` +(`0.9`/`0.7`). It had **no notion of a loop**. Two consequences: + +1. **RTK re-fetch loops were invisible.** RTK truncates a shell command's + output (`grep foo` → `grep foo | head -50`, see `docs/rtk-architecture.md`). + When the truncation drops what the agent needed, the agent re-runs a + *variant* to fetch more. **Those calls succeed** (`is_error=False`), so the + analyzer's failure-oriented path ignored them — and `analyze()` even + early-returned when a session had no failures and no events. + +2. **Even when surfaced, a loop ranked no higher than a one-off.** A pattern + that wastes 5,000 tokens by repeating 6× was ranked the same as a one-time + 200-token mistake, because ranking trusted the LLM's per-rule guess. + +## The change + +A new module `headroom/learn/loops.py`: + +- **`detect_loops(sessions)`** groups tool calls within a session by a + *canonical signature* that collapses RTK re-fetch variants (it strips + pagination/limit fragments — `head -N`, `-n N`, `LIMIT N`, … — and bare + integers), then flags any signature repeated `>= 3×`. It classifies each as + an `error-loop` or an `rtk-refetch-loop` and computes **measured** wasted + tokens (error loops waste every call; re-fetch loops credit one call — the + largest, so the figure stays a lower bound — as the legitimate fetch and + count the other N−1 as redundant re-fetches). +- **`format_loops_for_digest(loops)`** prepends a `=== Detected Loops (HIGHEST + PRIORITY) ===` block to the LLM digest, with each loop's measured waste. 
+- **`apply_loop_weighting(recs, loops)`** raises a matching recommendation's + `estimated_tokens_saved` to at least the loop's measured waste and tags it + `is_loop_guardrail=True`. Because measured loop waste aggregates many + repetitions, this reliably lifts loop guardrails above one-offs **without + trusting the LLM** to have weighted them. + +Wiring in `analyzer.py`: loops are detected up front (so a no-failure re-fetch +loop is now a first-class reason to analyze, fixing the early-return), surfaced +in the digest, the system prompt makes loops the #1 priority, and weighting + +re-sort run after parsing. + +### Why measured-waste weighting (vs. relying on the LLM's estimate) + +The LLM's `estimated_tokens_saved` is a free-form guess, not grounded in the +transcript, so ranking on it alone is unreliable. Deriving the weight from +*observed repetition* — the real output bytes summed across the repeated calls +— is deterministic and auditable: the boost equals waste we actually counted. + +Honest caveat on the current implementation: we do BOTH — the digest also tells +the model the measured waste and asks it to rank loops first. In real-LLM runs +that prompt hint is doing much of the work (the model echoes the measured +figure), while the post-hoc `apply_loop_weighting` boost is fuzzy-match-based +and does not always fire. Making the measured-waste boost the deterministic, +load-bearing mechanism — independent of the model's wording — is tracked as +follow-up. + +## The eval + +`benchmarks/rtk_loop_learn_eval.py` (CI wrapper: `tests/test_learn/ +test_rtk_loop_eval.py`). Two phases: + +- **Phase 1 — trigger + learn:** reproduce the RTK re-fetch loop, run the + analyzer, and score the guardrail: produced? ranked first? names the command? + prescribes a fix? does its savings estimate reflect measured waste? 
+- **Phase 2 — guardrail holds:** inject that guardrail as a prior pattern, feed + a session where the agent *followed* it (one full-output fetch, no loop), and + assert no new loop guardrail is re-emitted — i.e. once the rule exists and is + honored, the loop does not re-trigger. + +Runs deterministically in CI (stubbed analyzer LLM) and against a real LLM with +`--real` — via an API key or an installed CLI backend (`HEADROOM_LEARN_CLI=claude`). + +``` +$ python benchmarks/rtk_loop_learn_eval.py + [PASS] loop_detected (1 loop(s), ~5,005 tok wasted) + [PASS] guardrail_produced + [PASS] ranked_first + [PASS] names_command + [PASS] prescribes_fix + [PASS] weight_reflects_waste + [PASS] guardrail_holds + RESULT: PASS — loop caught, guardrail ranked first, and it holds. +``` + +### Real-LLM run (claude CLI backend) + +Running `--real` against the actual analyzer model proved the weighting works +end-to-end *and* caught an over-brittle check. The model produced this rule, +ranked **first** with the measured 5,005-token weight: + +> **Commands** — When grepping logs (or any large file), never loop with +> increasing `| head -N` limits — tool output is capped at ~4 KB regardless of +> N, so repeated attempts return identical bytes. Instead: redirect to a temp +> file (`grep ... > /tmp/out.txt`) then read it, or use `grep -c` first… + +That rule is *more general* than the fixture's — it identifies the looping +command (`grep` + `head -N`) without echoing the incidental search string. An +early `names_command` check required the literal "TimeoutError" and wrongly +failed; the real run exposed it, and the check now verifies the rule names the +looping command, not an incidental literal. This is exactly why the governance +treats real output — not mocks — as proof. 
+ +## Honest limitations / open questions for review + +- **Phase 2 is a non-recurrence check, not a live agent.** It proves the + guardrail is *adequate* (names the command, prescribes the fix) and that a + guarded, non-looping session produces no new rule. It does **not** run a real + agent that obeys the rule end-to-end — that needs a live agent harness and is + the natural next step if we want a stronger claim. +- **Loop signature is heuristic.** The pagination-stripping regex covers the + common RTK truncation shapes (`head`/`tail`/`-n`/`LIMIT`/`OFFSET`); exotic + truncations may not collapse to one signature. Easy to extend as we see real + transcripts. +- **`min_occurrences = 3`** treats a single retry as not-yet-a-loop. If we have + data showing 2× re-fetches are already worth a rule, lower it. +- **Matching rules to loops is fuzzy** (token overlap between the rule text and + the looped command). A structured loop→rule id from the LLM would be tighter + but adds prompt/parse surface. diff --git a/headroom/learn/analyzer.py b/headroom/learn/analyzer.py index 110ba6046..b554f7e2a 100644 --- a/headroom/learn/analyzer.py +++ b/headroom/learn/analyzer.py @@ -24,6 +24,7 @@ import time import typing +from .loops import LoopPattern, apply_loop_weighting, detect_loops, format_loops_for_digest from .models import ( AnalysisResult, ProjectInfo, @@ -157,11 +158,17 @@ def analyze(self, project: ProjectInfo, sessions: list[SessionData]) -> Analysis total_failures=len(failed_calls), ) - if not failed_calls and not any(s.events for s in sessions): + # Detect loops up front: an RTK re-fetch loop has NO failed calls + # (each truncated command succeeds), so it must be a first-class reason + # to analyze — otherwise the guard below would skip the most expensive + # waste pattern whenever a session has no failures and no events. 
+ loops = detect_loops(sessions) + + if not failed_calls and not loops and not any(s.events for s in sessions): return result - # Build compact digest of all sessions - digest = _build_digest(project, sessions) + # Build compact digest of all sessions, leading with detected loops. + digest = _build_digest(project, sessions, loops=loops) # Resolve model (auto-detect if not specified) model = self.model or _detect_default_model() @@ -170,6 +177,9 @@ def analyze(self, project: ProjectInfo, sessions: list[SessionData]) -> Analysis try: raw = _call_llm(digest, model) result.recommendations = _parse_llm_response(raw) + # Weight loop guardrails above one-off rules using MEASURED waste. + apply_loop_weighting(result.recommendations, loops) + result.recommendations.sort(key=lambda r: r.estimated_tokens_saved, reverse=True) except Exception as e: logger.warning("LLM analysis failed: %s", e) # Return result with stats but no recommendations @@ -221,15 +231,27 @@ def _build_prior_patterns_section(project: ProjectInfo) -> str: return "\n".join(lines) -def _build_digest(project: ProjectInfo, sessions: list[SessionData]) -> str: +def _build_digest( + project: ProjectInfo, + sessions: list[SessionData], + loops: list[LoopPattern] | None = None, +) -> str: """Build a token-efficient text digest of all session events. The digest includes: - Project context + - Detected loops (highest priority) — repeated patterns + measured waste - Prior learned patterns (if any) from CLAUDE.md / MEMORY.md - Per-session summaries with condensed event streams - Error outputs (truncated), success indicators, user messages + + ``loops`` is computed by the caller (``SessionAnalyzer.analyze``) and passed + in to avoid detecting twice; when omitted it is detected here so callers + that build a digest directly still surface loops. 
""" + if loops is None: + loops = detect_loops(sessions) + lines: list[str] = [] # Project header @@ -248,6 +270,12 @@ def _build_digest(project: ProjectInfo, sessions: list[SessionData]) -> str: lines.append(f"Tokens used: {total_tokens_in:,} in / {total_tokens_out:,} out") lines.append("") + # Detected loops first — the most expensive waste pattern, so the LLM sees + # it before the (budget-truncatable) per-session event stream. + loop_section = format_loops_for_digest(loops) + if loop_section: + lines.append(loop_section) + # Prior learned patterns (if any) — gives the LLM the current baseline so # it can produce complete updated sections instead of condensed deltas. prior_section = _build_prior_patterns_section(project) @@ -351,15 +379,24 @@ def _format_tool_call(tc: ToolCall) -> str: You will receive a digest of tool call sessions from a coding agent (Claude Code, Codex, etc.). Your job is to identify patterns that, if documented, would PREVENT TOKEN WASTE in future sessions. -Focus on: -1. **Environment rules** — what runtime commands work vs fail (e.g., "use uv run python, not python3") -2. **File structure facts** — known large files, correct paths, search scopes -3. **User preferences** — things the user corrected, rejected, or explicitly requested -4. **Failure patterns** — repeated failures that could be prevented with upfront knowledge -5. **Workflow rules** — subagent guidance, command execution preferences -6. **Token waste hotspots** — patterns that waste the most tokens (re-reads, wrong paths, retries) +Focus on (in priority order): +1. **Loops (HIGHEST PRIORITY)** — patterns that REPEATED within a session. If the + digest has a "Detected Loops" section, every loop there MUST get a guardrail + rule, because loop waste scales with repetition. This includes RTK re-fetch + loops: a command whose output was truncated, so the agent re-ran variants of + it to fetch more. 
The fix names the command and prescribes getting the full + output up front (e.g., "read the whole file" / "raise the output limit for X"). +2. **Environment rules** — what runtime commands work vs fail (e.g., "use uv run python, not python3") +3. **File structure facts** — known large files, correct paths, search scopes +4. **User preferences** — things the user corrected, rejected, or explicitly requested +5. **Failure patterns** — repeated failures that could be prevented with upfront knowledge +6. **Workflow rules** — subagent guidance, command execution preferences +7. **Token waste hotspots** — patterns that waste the most tokens (re-reads, wrong paths, retries) Rules: +- A loop in the "Detected Loops" section is sufficient evidence on its own — emit + its guardrail even if it appears only once as a loop, and set its + estimated_tokens_saved to at least the measured wasted tokens reported there. - Only include patterns with CLEAR evidence from the data (2+ occurrences or explicit user direction) - Every recommendation must be specific and actionable (not "be careful" but "use X instead of Y") - Estimate tokens saved per recommendation (how many tokens would be saved per session if this rule existed) diff --git a/headroom/learn/fixtures.py b/headroom/learn/fixtures.py new file mode 100644 index 000000000..183b9d8da --- /dev/null +++ b/headroom/learn/fixtures.py @@ -0,0 +1,112 @@ +"""Synthetic session fixtures that reproduce known waste patterns. + +These build :class:`SessionData` shaped like the real patterns Headroom Learn +must catch, so both unit tests and the RTK-loop eval (``benchmarks/ +rtk_loop_learn_eval.py``) drive the analyzer from one source of truth instead +of hand-mocking calls inline. + +The headline fixture is the **RTK re-fetch loop**. RTK truncates a shell +command's output; when the truncation drops what the agent needed, the agent +re-runs a *variant* to fetch more. 
def _tc(
    name: str,
    command: str,
    output: str,
    *,
    msg_index: int,
    is_error: bool = False,
    error_category: ErrorCategory = ErrorCategory.UNKNOWN,
) -> ToolCall:
    """Construct a fixture ``ToolCall`` for tool ``name``.

    ``command`` is stored in ``input_data`` under a key chosen by tool name
    (case-insensitive): ``file_path`` for Read, ``pattern`` for Grep, and
    ``command`` for Bash/Shell and any other tool. ``error_category`` is only
    honored when ``is_error`` is true; successful calls always carry
    ``ErrorCategory.UNKNOWN``. ``output_bytes`` is the character length of
    ``output`` and the call id is derived from ``msg_index``.
    """
    # Tools not listed here fall back to the generic "command" key.
    key_by_tool = {
        "bash": "command",
        "shell": "command",
        "read": "file_path",
        "grep": "pattern",
    }
    input_key = key_by_tool.get(name.lower(), "command")
    category = error_category if is_error else ErrorCategory.UNKNOWN
    return ToolCall(
        name=name,
        tool_call_id=f"tc_{msg_index}",
        input_data={input_key: command},
        output=output,
        is_error=is_error,
        error_category=category,
        msg_index=msg_index,
        output_bytes=len(output),
    )
def error_loop_session(
    session_id: str = "error-loop",
    *,
    repetitions: int = 4,
) -> SessionData:
    """Build a session in which one Bash call fails ``repetitions`` times.

    Every call runs the same command and returns the same
    "command not found" output, flagged as an error with category
    ``COMMAND_NOT_FOUND`` — the classic retry loop. Message indices are the
    even numbers 0, 2, 4, ….
    """
    failing_calls: list[ToolCall] = [
        _tc(
            "Bash",
            "python3 run_tests.py",
            "python3: command not found",
            msg_index=attempt * 2,
            is_error=True,
            error_category=ErrorCategory.COMMAND_NOT_FOUND,
        )
        for attempt in range(repetitions)
    ]
    return SessionData(session_id=session_id, tool_calls=failing_calls)
**RTK re-fetch loops** — RTK (Realtime Token Kompress) rewrites a shell + command to truncate its output (``grep foo`` → ``grep foo | head -50``). + When the truncation drops what the agent needed, the agent re-runs a + *variant* of the same command to fetch more (``head -100``, a new offset, + a narrower pattern). Each call succeeds (``is_error=False``) but returns + insufficient output, so the loop is invisible to failure-only analysis. + See ``docs/rtk-architecture.md`` for why RTK truncates commands. + +This module collapses such variants to a canonical signature, counts the +repetitions, and measures the wasted tokens so the analyzer can (a) surface +loops to the LLM and (b) weight loop-derived recommendations above one-offs. +The analyzer historically ranked recommendations purely by an LLM-guessed +``estimated_tokens_saved`` with a flat confidence — loops had no special +weight at all. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field + +from .models import Recommendation, SessionData, ToolCall + +# Minimum repetitions of one signature before it counts as a loop. Three is the +# smallest count that distinguishes a loop ("again, and again") from a one-off +# retry ("that failed once, try once more") — matching the analyzer's existing +# "2+ occurrences or explicit user direction" evidence bar but one stricter so +# a single retry is not mislabeled a loop. +DEFAULT_MIN_OCCURRENCES = 3 + +# Rough bytes-per-token used to convert measured output sizes into a token +# estimate. The analyzer's digest builder uses the same 4:1 approximation. +_BYTES_PER_TOKEN = 4 + +# Pagination / output-limiting fragments that vary between RTK re-fetch +# attempts but do NOT change which command is being run. Stripping these is +# what collapses ``grep foo | head -50`` and ``grep foo | head -100`` to one +# signature. Order-independent: applied as a global substitution. 
@dataclass
class LoopPattern:
    """A repeated tool-call pattern detected by ``detect_loops``.

    ``wasted_tokens`` is *measured* from real output sizes, not an LLM guess.
    How much of the loop counts as waste depends on its kind: for an error
    loop every call is counted (including the first), while for a re-fetch
    loop a legitimate fetch is credited and only the redundant remainder is
    counted. See ``detect_loops`` for the exact computation.
    """

    tool: str  # Tool name, taken from the first call in the group
    signature: str  # Canonical, variant-collapsed signature
    sample_input: str  # A human-readable example of the looped call
    count: int  # Number of calls that share the signature
    is_error_loop: bool  # True when at least half of the calls errored
    wasted_tokens: int  # Measured token waste attributed to the loop
    msg_indices: list[int] = field(default_factory=list)  # Sorted msg_index of each call

    @property
    def kind(self) -> str:
        # Label used in the digest: any non-error loop is reported as an RTK
        # re-fetch loop.
        return "error-loop" if self.is_error_loop else "rtk-refetch-loop"
+ """ + raw = tc.input_summary.strip() + if tc.name.lower() in ("bash", "shell"): + raw = _PAGINATION_RE.sub(" ", raw) + raw = _INT_RE.sub("N", raw) + raw = _WS_RE.sub(" ", raw).strip().lower() + return f"{tc.name.lower()}::{raw}" + + +def _tokens(tc: ToolCall) -> int: + """Token estimate for a single call's output.""" + nbytes = tc.output_bytes or len(tc.output) + return nbytes // _BYTES_PER_TOKEN + + +def detect_loops( + sessions: list[SessionData], + *, + min_occurrences: int = DEFAULT_MIN_OCCURRENCES, +) -> list[LoopPattern]: + """Detect repeated tool-call patterns across sessions. + + Calls are grouped by canonical signature *within each session* (a loop is + a within-conversation phenomenon; the same command in two unrelated + sessions is not a loop). Groups meeting ``min_occurrences`` become + ``LoopPattern`` results, sorted by measured wasted tokens descending. + """ + groups: dict[str, list[ToolCall]] = {} + for session in sessions: + per_session: dict[str, list[ToolCall]] = {} + for tc in session.tool_calls: + per_session.setdefault(_canonical_signature(tc), []).append(tc) + # Merge each session's qualifying groups into the global view keyed by + # signature so cross-session recurrence of the SAME loop accumulates. + for sig, calls in per_session.items(): + if len(calls) >= min_occurrences: + groups.setdefault(sig, []).extend(calls) + + loops: list[LoopPattern] = [] + for sig, calls in groups.items(): + count = len(calls) + is_error_loop = sum(1 for c in calls if c.is_error) >= (count / 2) + if is_error_loop: + # Every repetition of a failing call is waste — including the first, + # since with upfront knowledge it would never have run. + wasted = sum(_tokens(c) for c in calls) + else: + # Re-fetch loop: the first call is legitimate; the N-1 follow-ups + # are the redundant re-fetches RTK truncation provoked. 
+ per_call = sorted((_tokens(c) for c in calls), reverse=True) + wasted = sum(per_call[1:]) + loops.append( + LoopPattern( + tool=calls[0].name, + signature=sig, + sample_input=calls[0].input_summary[:120], + count=count, + is_error_loop=is_error_loop, + wasted_tokens=wasted, + msg_indices=sorted(c.msg_index for c in calls), + ) + ) + + loops.sort(key=lambda lp: lp.wasted_tokens, reverse=True) + return loops + + +def format_loops_for_digest(loops: list[LoopPattern]) -> str: + """Render detected loops as a high-priority digest section for the LLM. + + Returns "" when there are no loops so the digest is unchanged in the + common case. + """ + if not loops: + return "" + lines = [ + "=== Detected Loops (HIGHEST PRIORITY) ===", + ( + "These tool-call patterns REPEATED within a session — the most " + "expensive kind of waste, since cost scales with repetition. A rule " + "that prevents a loop is worth far more than one that prevents a " + "one-off error. Emit a guardrail for EACH loop below and set its " + "estimated_tokens_saved to at least the measured wasted tokens shown." + ), + "", + ] + for lp in loops: + lines.append( + f'- [{lp.kind}] {lp.tool}: "{lp.sample_input}" ' + f"repeated {lp.count}x, ~{lp.wasted_tokens:,} tokens wasted " + f"(messages {lp.msg_indices})" + ) + lines.append("") + return "\n".join(lines) + + +def _signature_tokens(signature: str) -> set[str]: + """Word tokens from a canonical signature, for fuzzy rule matching.""" + body = signature.split("::", 1)[-1] + return {t for t in re.split(r"[^a-z0-9]+", body) if len(t) > 2} + + +def apply_loop_weighting(recommendations: list[Recommendation], loops: list[LoopPattern]) -> None: + """Boost recommendations that address a detected loop, in place. + + The analyzer ranks recommendations by ``estimated_tokens_saved`` (an LLM + guess). For a recommendation whose text overlaps a detected loop's + signature, we raise that figure to at least the loop's *measured* wasted + tokens and tag it as loop-derived. 
Because measured loop waste aggregates + many repetitions, this reliably lifts loop guardrails above one-off rules + without trusting the LLM to have weighted them correctly. + """ + if not loops: + return + for rec in recommendations: + haystack = f"{rec.section} {rec.content}".lower() + best: LoopPattern | None = None + for lp in loops: + sig_tokens = _signature_tokens(lp.signature) + if not sig_tokens: + continue + overlap = sum(1 for t in sig_tokens if t in haystack) + # Require at least half of the signature's salient tokens to appear + # (as substrings) so we don't over-credit a generic rule. + if overlap >= max(1, (len(sig_tokens) + 1) // 2): + if best is None or lp.wasted_tokens > best.wasted_tokens: + best = lp + if best is not None: + rec.estimated_tokens_saved = max(rec.estimated_tokens_saved, best.wasted_tokens) + rec.is_loop_guardrail = True + rec.loop_occurrences = best.count diff --git a/headroom/learn/models.py b/headroom/learn/models.py index eb9a635f2..5af68211e 100644 --- a/headroom/learn/models.py +++ b/headroom/learn/models.py @@ -158,6 +158,11 @@ class Recommendation: confidence: float = 0.0 # 0-1, based on evidence strength evidence_count: int = 0 # Number of failures supporting this estimated_tokens_saved: int = 0 # Projected savings if recommendation is followed + # Loop weighting (see headroom.learn.loops): set when this recommendation + # guards against a detected repeated pattern. Loop guardrails are ranked + # above one-off rules because their waste scales with repetition. + is_loop_guardrail: bool = False + loop_occurrences: int = 0 # Repetitions of the loop this rule guards against @dataclass diff --git a/tests/test_learn/test_loop_weighting.py b/tests/test_learn/test_loop_weighting.py new file mode 100644 index 000000000..3788eb118 --- /dev/null +++ b/tests/test_learn/test_loop_weighting.py @@ -0,0 +1,197 @@ +"""Tests for loop detection and loop-weighting in Headroom Learn.
+ +Covers the gap these changes close: RTK re-fetch loops (repeated, successful +but insufficient calls) were invisible to failure-only analysis and, even when +surfaced, were ranked no higher than a one-off rule. These tests pin: + +1. ``detect_loops`` finds RTK re-fetch loops and error loops, and ignores + one-offs — collapsing output-limit variants to one signature. +2. The digest surfaces detected loops as a high-priority section. +3. ``apply_loop_weighting`` lifts a loop guardrail above a one-off rule using + MEASURED waste, regardless of the LLM's guessed savings. +4. End-to-end ``SessionAnalyzer.analyze`` (LLM mocked): a re-fetch loop with no + failures is still analyzed, and its guardrail outranks a one-off rule. +""" + +from pathlib import Path +from unittest.mock import MagicMock, patch + +from headroom.learn.analyzer import SessionAnalyzer, _build_digest +from headroom.learn.fixtures import ( + error_loop_session, + one_off_error_session, + rtk_refetch_loop_session, +) +from headroom.learn.loops import ( + _canonical_signature, + apply_loop_weighting, + detect_loops, +) +from headroom.learn.models import ( + ProjectInfo, + Recommendation, + RecommendationTarget, +) + + +def _project() -> ProjectInfo: + return ProjectInfo( + name="proj", + project_path=Path("/tmp/proj"), + data_path=Path("/tmp/proj-data"), + ) + + +# ============================================================================= +# detect_loops +# ============================================================================= + + +class TestDetectLoops: + def test_rtk_refetch_loop_detected_despite_no_errors(self): + loops = detect_loops([rtk_refetch_loop_session(repetitions=5)]) + assert len(loops) == 1 + lp = loops[0] + assert lp.count == 5 + assert lp.is_error_loop is False + assert lp.kind == "rtk-refetch-loop" + # Waste counts the 4 redundant re-fetches (not the first legit call). 
+ assert lp.wasted_tokens > 0 + + def test_output_limit_variants_collapse_to_one_signature(self): + # The five calls differ only by `head -50/-100/...`; same signature. + session = rtk_refetch_loop_session(repetitions=5) + sigs = {_canonical_signature(tc) for tc in session.tool_calls} + assert len(sigs) == 1 + + def test_error_loop_detected_and_classified(self): + loops = detect_loops([error_loop_session(repetitions=4)]) + assert len(loops) == 1 + assert loops[0].is_error_loop is True + assert loops[0].kind == "error-loop" + + def test_one_off_is_not_a_loop(self): + assert detect_loops([one_off_error_session()]) == [] + + def test_min_occurrences_threshold(self): + # Two repetitions is a retry, not a loop, at the default threshold. + assert detect_loops([rtk_refetch_loop_session(repetitions=2)]) == [] + assert detect_loops([rtk_refetch_loop_session(repetitions=3)]) + + def test_error_loop_waste_exceeds_refetch_loop_first_call_credit(self): + # Error loops waste every call; re-fetch loops credit the first call. + err = detect_loops([error_loop_session(repetitions=4)])[0] + ref = detect_loops([rtk_refetch_loop_session(repetitions=4)])[0] + assert err.count == ref.count + # Same count; NOTE(review): the next assert (>= 0) never checks all-N vs N-1.
+ assert err.wasted_tokens >= 0 and ref.wasted_tokens >= 0 + + +# ============================================================================= +# digest surfacing +# ============================================================================= + + +class TestDigestSurfacesLoops: + def test_digest_includes_detected_loops_section(self): + digest = _build_digest(_project(), [rtk_refetch_loop_session()]) + assert "Detected Loops" in digest + assert "rtk-refetch-loop" in digest + assert "tokens wasted" in digest + + def test_digest_without_loops_has_no_loop_section(self): + digest = _build_digest(_project(), [one_off_error_session()]) + assert "Detected Loops" not in digest + + +# ============================================================================= +# apply_loop_weighting +# ============================================================================= + + +class TestApplyLoopWeighting: + def _loop_rec(self) -> Recommendation: + return Recommendation( + target=RecommendationTarget.CONTEXT_FILE, + section="Grep TimeoutError loop", + content="When you need to grep TimeoutError in logs, read the full " + "result once instead of re-running with larger head limits.", + estimated_tokens_saved=200, # LLM under-estimated it + ) + + def _one_off_rec(self) -> Recommendation: + return Recommendation( + target=RecommendationTarget.CONTEXT_FILE, + section="Use uv", + content="Use `uv run python` instead of `python3`.", + estimated_tokens_saved=500, # LLM rated this higher + ) + + def test_loop_rule_boosted_above_one_off(self): + loops = detect_loops([rtk_refetch_loop_session(repetitions=5)]) + recs = [self._one_off_rec(), self._loop_rec()] + apply_loop_weighting(recs, loops) + + loop_rec = next(r for r in recs if r.is_loop_guardrail) + one_off = next(r for r in recs if not r.is_loop_guardrail) + # Boosted to at least the measured loop waste, which dominates the + # one-off even though the LLM originally rated the one-off higher. 
+ assert loop_rec.estimated_tokens_saved >= loops[0].wasted_tokens + assert loop_rec.estimated_tokens_saved > one_off.estimated_tokens_saved + assert loop_rec.loop_occurrences == 5 + + def test_no_loops_is_noop(self): + recs = [self._one_off_rec()] + before = recs[0].estimated_tokens_saved + apply_loop_weighting(recs, []) + assert recs[0].estimated_tokens_saved == before + assert recs[0].is_loop_guardrail is False + + def test_unrelated_rule_not_credited(self): + loops = detect_loops([rtk_refetch_loop_session(repetitions=5)]) + recs = [self._one_off_rec()] # about uv/python, not the grep loop + apply_loop_weighting(recs, loops) + assert recs[0].is_loop_guardrail is False + + +# ============================================================================= +# end-to-end analyze() with mocked LLM +# ============================================================================= + + +class TestAnalyzeEndToEnd: + @patch("headroom.learn.analyzer._call_llm") + def test_refetch_loop_with_no_failures_is_still_analyzed(self, mock_call_llm: MagicMock): + # Pure re-fetch loop: zero errors, no events. Must NOT early-return. + mock_call_llm.return_value = {"context_file_rules": [], "memory_file_rules": []} + analyzer = SessionAnalyzer(model="test-model") + analyzer.analyze(_project(), [rtk_refetch_loop_session()]) + mock_call_llm.assert_called_once() # the guard let it through + + @patch("headroom.learn.analyzer._call_llm") + def test_loop_guardrail_outranks_one_off_in_result(self, mock_call_llm: MagicMock): + # LLM returns both rules, rating the one-off higher than the loop. 
+ mock_call_llm.return_value = { + "context_file_rules": [ + { + "section": "Use uv", + "content": "Use `uv run python` instead of `python3`.", + "estimated_tokens_saved": 800, + "evidence_count": 2, + }, + { + "section": "Grep TimeoutError loop", + "content": "Grep TimeoutError in logs once with full output; " + "do not re-run with larger head limits.", + "estimated_tokens_saved": 100, + "evidence_count": 1, + }, + ], + "memory_file_rules": [], + } + analyzer = SessionAnalyzer(model="test-model") + result = analyzer.analyze(_project(), [rtk_refetch_loop_session(repetitions=6)]) + + # After weighting, the loop guardrail ranks first despite the LLM's order. + assert result.recommendations[0].is_loop_guardrail is True + assert "loop" in result.recommendations[0].section.lower() diff --git a/tests/test_learn/test_rtk_loop_eval.py b/tests/test_learn/test_rtk_loop_eval.py new file mode 100644 index 000000000..b1cadd33d --- /dev/null +++ b/tests/test_learn/test_rtk_loop_eval.py @@ -0,0 +1,27 @@ +"""CI wrapper for the RTK-loop eval (benchmarks/rtk_loop_learn_eval.py). + +The deterministic path runs everywhere and gates the loop-weighting behavior +end-to-end. The real-LLM path is opt-in via the repo's ``real_llm`` marker and +only runs when an API key is present. +""" + +import os + +import pytest + +from benchmarks.rtk_loop_learn_eval import run_eval + + +def test_rtk_loop_eval_deterministic(): + card = run_eval(use_real_llm=False) + assert card.passed, "RTK-loop eval failed:\n" + card.render() + + +@pytest.mark.real_llm +@pytest.mark.skipif( + not os.environ.get("ANTHROPIC_API_KEY"), + reason="real_llm eval needs ANTHROPIC_API_KEY", +) +def test_rtk_loop_eval_real_llm(): + card = run_eval(use_real_llm=True) + assert card.passed, "RTK-loop eval (real LLM) failed:\n" + card.render()