diff --git a/CHANGELOG.md b/CHANGELOG.md index 298322ae..730cc573 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ - **Phase 3 prompt-injection defense — extend `_wrap_user_input` across GenAI callers** (Issue #1007, follow-up to #960): Adopts the Phase 2 wrapping helper across 8 additional `GenAIAnalyzer` callers (10 sites total), giving the codebase uniform prompt-injection defense at every LLM integration point. New helper `_safe_wrap(text)` in `plugins/autonomous-dev/hooks/genai_utils.py` simplifies adoption (single-line invocation, never raises). Modified callers — Pattern A (kwarg substitution): `complexity_assessor.py` (`feature_description`), `scope_detector.py` (`issue_text`), `issue_scope_detector.py` (`issue_text`), `alignment_assessor.py` (2 sites: `dependencies_sample`/`config_files`/`readme_content`; enum-constrained scalars like `primary_language` left unwrapped), `genai_refactor_analyzer.py` (5 sites covering `doc_content`/`source_content`/`test_source`/`source_under_test`/`function_source`/`references_summary`/`original_analysis`), `security_scan.py` (`line`+`variable_name`; `secret_type` regex-catalog constant unwrapped), `auto_fix_docs.py` (`item_name`; `item_type` constrained literal unwrapped). Pattern B (.format pre-substitution): `feature_completion_detector.py` wraps both `feature` and `evidence` at `.format()` time since `analyzer.analyze()` receives the already-formatted prompt. Audit verified 4 false positives (`error_analyzer.py`, `codebase_analyzer.py`, `enforce_prunable_threshold.py`, `align_project_retrofit.py`) do NOT call `GenAIAnalyzer` and are NOT modified — locked by a parametrized regression test. 26 new test functions: 4 in `tests/unit/hooks/test_genai_utils_safe_wrap.py` (helper guarantees: wraps, escapes, coerces non-string, never raises), 21 in `tests/unit/lib/test_phase3_wrap_adoption.py` (8 wrap-adoption tests parametrized over multi-site files, plus 4 false-positive lock tests), and 1 existing-test adjustment in `tests/unit/lib/test_scope_detector_genai.py` (assertion updated from raw to wrapped form). `docs/INTENT-CLASSIFICATION.md` and `CHANGELOG.md` updated. ### Added +- **Intent classifier Phase 2 — classifier-gated plan-critic + research skip** (Issue #961): New library `plugins/autonomous-dev/lib/pipeline_intent_gates.py` exposes two pure-predicate decision functions — `should_skip_web_research()` and `should_skip_plan_critic()` — that the `/implement` coordinator consults to skip non-floor pipeline steps for low-risk intents. Activation is opt-in via `INTENT_CLASSIFIER_ENABLED=true`; default behavior (env var unset/false) is byte-identical to the pre-#961 pipeline. **Web research skip** (STEP 3.6, new): when intent ∈ {`config`, `doc`, `refactor`} with confidence ≥ 0.85, the `researcher` (Sonnet, web) agent is omitted from STEP 4; `researcher-local` (Haiku, codebase) always runs. **Plan-critic skip** (STEP 5.5a.1, new): when intent ∈ {`refactor`, `config`, `doc`, `typo`} with confidence ≥ 0.85 AND `predicted_file_count` ≤ 3, plan-critic invocation (5.5b) is bypassed; STEP 5.5c structural validation (file paths, acceptance criteria, testing strategy) ALWAYS still runs. Both gates honor the `--strict` token in ARGUMENTS as a forced full-pipeline override. Fail-closed on classifier import error, exception, or AMBIGUOUS verdict — no skip path can fire without a high-confidence non-security classification. `pipeline_completion_state.py` extended with `record_web_research_skipped()` / `get_web_research_skipped()` and a backward-compatible `reason` kwarg on `record_plan_critic_skipped()`; `get_plan_critic_skipped()` reads both legacy bool and new dict (`{"skipped": True, "reason": ...}`) shapes. The classifier itself (Phase 1, #971) is unchanged. 41 new test functions in `tests/unit/test_implement_command_routing.py` (markdown structure, env-var truthiness, web-research and plan-critic gate logic across all skip/no-skip permutations, state-persistence round-trip, legacy bool back-compat). `tests/unit/test_implement_command_structure.py` line-cap bumped 575 → 1650 to accommodate the new STEP 3.6 / STEP 5.5a.1 sections. + - **Intent classifier Phase E — enforcement cutover** (Issue #999): Completes the intent classifier track started in Phase 1 (#971). When `INTENT_CLASSIFIER_ENFORCE=true` (default `false`), three pipeline-gating hooks consult the per-session artifact written by Phase D and skip non-floor checks for low-risk intent classes (`doc`, `config`, `typo`, `status_query`, `conversation`). Hard-floor hooks (Phase C registry) always fire regardless of intent class. New libraries: `lib/hook_stdin.py` — one-shot cached stdin reader providing `read_stdin_once()` and `extract_session_id()` so Phase E hooks can read the PreToolUse payload without exhausting the stream across multiple call sites; `lib/enforcement_decision.py` — pure policy layer with 8-rule priority chain (`should_skip_enforcement()`) — zero filesystem side effects, never raises, fail-safe direction is always enforce. Modified hooks: `unified_pre_tool.py` (5 wrap sites via `_phase_e_skip()` helper), `plan_gate.py`, `plan_mode_exit_detector.py`. New telemetry decision shape `"mode_skip"` added to `hook_telemetry.py` `VALID_DECISION_SHAPES` — emitted on skip path to `.claude/logs/hook-blocks.jsonl` for observability. Reader-side API added to `session_mode.py`: `read_session_mode(session_id)` and `should_pipeline_enforce(intent_class)`. Single env var rollback: unset `INTENT_CLASSIFIER_ENFORCE` or set to any non-"true" value to revert all Phase E gating. `docs/INTENT-CLASSIFICATION.md` updated: Phase E roadmap row, Phase E section, `INTENT_CLASSIFIER_ENFORCE` behavior, related files table. `docs/LIBRARIES.md` gains entries for `hook_stdin.py` and `enforcement_decision.py`; `session_mode.py` and `hook_telemetry.py` entries updated. `docs/HOOKS.md` and `docs/HOOK-REGISTRY.md` updated with Phase E notes. 48 new test functions: 13 in `tests/unit/lib/test_enforcement_decision.py`, 10 in `tests/unit/lib/test_hook_stdin.py`, 12 in `tests/unit/lib/test_session_mode_reader.py`, 10 in `tests/unit/hooks/test_phase_e_integration.py`, 3 in `tests/integration/test_enforcement_mode_cutover.py`. - **Intent classifier Phase 2 — prompt-injection defense** (Issue #960): Hardens `lib/intent_classifier.py` against prompt-injection attacks on the LLM classification path (OWASP LLM01:2025). User prompts are now wrapped in `` XML delimiters via a new `_wrap_user_input(text)` helper in `hooks/genai_utils.py` before being substituted into `_LLM_PROMPT_TEMPLATE`; `html.escape(text, quote=False)` encodes `&`, `<`, and `>` so an attacker cannot inject structural tokens that escape the delimiter. Apostrophes and double-quotes are preserved (`quote=False`) because they are common in legitimate prompts. A module-load guard (`_validate_template_integrity`) raises `RuntimeError` (not `assert` — survives `python -O`) at import time if the template no longer contains ``; refusing to load is safer than silently degrading to a vulnerable state. `_wrap_user_input` lives in `genai_utils.py` for cross-codebase reuse — other `GenAIAnalyzer` callers can adopt the same defense via follow-up issues. The security regex pre-gate and fail-open contract are unchanged; this fix only affects the LLM path for non-security-keyword prompts. 8 new tests in `tests/unit/lib/test_intent_classifier.py` (`TestPromptInjectionResistance`): injection-escape verification, RuntimeError guard, `_wrap_user_input` round-trip, LLM path uses wrapped form, and fallback behavior when `_wrap_user_input` is unavailable. `docs/INTENT-CLASSIFICATION.md` updated: Phase 2 roadmap row, new Architecture section 5 (prompt-injection defense), test count corrected. `docs/LIBRARIES.md` `intent_classifier.py` entry updated with Phase 2 description and corrected test count. `docs/HOOKS.md` `genai_utils.py` entry updated to document `_wrap_user_input`. Prerequisite before `INTENT_CLASSIFIER_ENFORCE=true` rollout. diff --git a/plugins/autonomous-dev/commands/implement.md b/plugins/autonomous-dev/commands/implement.md index b4f6f255..ce379b5f 100644 --- a/plugins/autonomous-dev/commands/implement.md +++ b/plugins/autonomous-dev/commands/implement.md @@ -502,15 +502,48 @@ Otherwise: proceed to STEP 4. - ❌ The change touches security-sensitive files or topics (authentication, encryption, tokens, secrets, sso, oauth, rbac, permission, session, jwt) - ❌ More than 3 files are referenced in the description +### STEP 3.6: Web Research Classifier Gate (Phase 2 — opt-in) + +**Progress**: Output step banner only when the gate fires (STEP 3.6/15 — Web Research Classifier Gate). Skip silently when `INTENT_CLASSIFIER_ENABLED` is unset/false. + +**Activation**: Only runs when `INTENT_CLASSIFIER_ENABLED=true` is set in the environment. Default behavior (env var unset/false) is unchanged — both researchers run as before. + +**Decision logic** (delegated to pure predicate): + +```python +from pipeline_intent_gates import should_skip_web_research +from pipeline_completion_state import record_web_research_skipped + +skip, reason = should_skip_web_research( + feature_description=FEATURE_DESCRIPTION, + arguments=ARGUMENTS, +) +if skip: + record_web_research_skipped(SESSION_ID, issue_number=ISSUE_NUM, reason=reason) + print(f"Web research: SKIPPED ({reason}) — researcher-local still runs") +``` + +**When skipped**: +- STEP 4 dispatches **only** `researcher-local` (Haiku) — the `researcher` (Sonnet, web) agent is omitted. +- The merged research consists of researcher-local output alone. + +**When NOT skipped** (any of these): classifier disabled, `--strict` in ARGUMENTS, intent not in {CONFIG, DOC, REFACTOR}, confidence < 0.85, classifier failed open or returned AMBIGUOUS. + +**FORBIDDEN** — You MUST NOT: +- Skip BOTH researchers (researcher-local always runs) +- Skip when intent is SECURITY_CRITICAL, IMPLEMENT, TEST, or any other class outside the explicit allow-list +- Skip when `--strict` appears in ARGUMENTS + ### STEP 4: Parallel Research (2 agents) **Progress**: Output step banner (STEP 4/15 — Research, Agents: researcher-local (Haiku), researcher (Sonnet)). Output agent completions after each returns. **Pre-dispatch**: Follow the Pre-Dispatch Ordering Protocol (above) for each agent before invoking. -Invoke TWO agents in PARALLEL (single message, both Agent tool calls): -1. **Agent**(subagent_type="researcher-local", model="haiku") — "Search codebase for patterns related to: {feature}. Output JSON with findings and sources." -2. **Agent**(subagent_type="researcher", model="sonnet") — "Research best practices for: {feature}. MUST use WebSearch. Output JSON with findings, sources, security considerations." +Invoke researchers in PARALLEL (single message). The set depends on STEP 3.6: + +- **Always**: **Agent**(subagent_type="researcher-local", model="haiku") — "Search codebase for patterns related to: {feature}. Output JSON with findings and sources." +- **Unless STEP 3.6 set `web_research_skipped`**: **Agent**(subagent_type="researcher", model="sonnet") — "Research best practices for: {feature}. MUST use WebSearch. Output JSON with findings, sources, security considerations." Validation: If web researcher shows 0 tool uses, retry. Merge both outputs. Persist research via `save_merged_research()`. @@ -556,6 +589,40 @@ record_plan_critic_skipped(SESSION_ID, issue_number=ISSUE_NUM) If no matching file with "Verdict: PROCEED" is found, proceed to 5.5b. +#### 5.5a.1 — Classifier-Gated Skip (Phase 2 — opt-in) + +**Activation**: Only runs when `INTENT_CLASSIFIER_ENABLED=true`. Default (env var unset/false) is unchanged — proceed straight to 5.5b. + +**Decision logic** (delegated to pure predicate): + +```python +from pipeline_intent_gates import should_skip_plan_critic +from pipeline_completion_state import record_plan_critic_skipped + +skip, reason = should_skip_plan_critic( + feature_description=FEATURE_DESCRIPTION, + arguments=ARGUMENTS, +) +if skip: + record_plan_critic_skipped(SESSION_ID, issue_number=ISSUE_NUM, reason=reason) + print(f"Plan validation: SKIPPED ({reason}) — proceeding to 5.5c structural validation") + # Skip 5.5b (plan-critic invocation). 5.5c structural validation STILL RUNS. +``` + +**Skip conditions** (ALL must hold): classifier enabled, no `--strict`, intent in {REFACTOR, CONFIG, DOC, TYPO}, confidence >= 0.85, predicted_file_count <= 3, classifier did not fail open. + +**When skipped**: +- 5.5b (plan-critic agent invocation) is bypassed. +- 5.5c (structural validation — file paths, acceptance criteria, testing strategy) **always still runs**. + +**When NOT skipped**: proceed to 5.5b as before. + +**FORBIDDEN** — You MUST NOT: +- Skip 5.5c structural validation under any circumstance (the existing 5.5d FORBIDDEN list still applies in full) +- Skip when `--strict` is in ARGUMENTS +- Skip when intent is IMPLEMENT, SECURITY_CRITICAL, TEST, or any class outside {REFACTOR, CONFIG, DOC, TYPO} +- Skip when predicted_file_count > 3 (the issue may look small but the classifier predicts otherwise) + #### 5.5b — Budget Plan-Critic Invocation **When no pre-validated plan exists**, invoke the plan-critic agent with a constrained budget: @@ -599,7 +666,7 @@ If any requirement is missing: - ❌ You MUST NOT accept a plan that contains 0 file paths (structural validation always blocks this) - ❌ You MUST NOT accept a plan that has no acceptance criteria section -- ❌ You MUST NOT skip plan-critic when no pre-validated plan file exists in `.claude/plans/` +- ❌ You MUST NOT skip plan-critic when no pre-validated plan file exists in `.claude/plans/` AND the 5.5a.1 classifier gate did NOT fire (classifier disabled, low-confidence, --strict, or intent outside allow-list) - ❌ You MUST NOT skip structural validation for any reason (it always runs, even with a pre-validated plan) ### STEP 6: Generate Acceptance Tests (default mode only) diff --git a/plugins/autonomous-dev/config/install_manifest.json b/plugins/autonomous-dev/config/install_manifest.json index ccdcd45a..6ffe7a9d 100644 --- a/plugins/autonomous-dev/config/install_manifest.json +++ b/plugins/autonomous-dev/config/install_manifest.json @@ -266,6 +266,7 @@ "plugins/autonomous-dev/lib/permission_classifier.py", "plugins/autonomous-dev/lib/pipeline_completion_state.py", "plugins/autonomous-dev/lib/pipeline_efficiency_analyzer.py", + "plugins/autonomous-dev/lib/pipeline_intent_gates.py", "plugins/autonomous-dev/lib/pipeline_intent_validator.py", "plugins/autonomous-dev/lib/pipeline_state.py", "plugins/autonomous-dev/lib/pipeline_timing_analyzer.py", diff --git a/plugins/autonomous-dev/lib/pipeline_completion_state.py b/plugins/autonomous-dev/lib/pipeline_completion_state.py index c2594a46..bc1f30f6 100644 --- a/plugins/autonomous-dev/lib/pipeline_completion_state.py +++ b/plugins/autonomous-dev/lib/pipeline_completion_state.py @@ -779,22 +779,24 @@ def record_plan_critic_skipped( session_id: str, *, issue_number: int = 0, + reason: str = "pre_validated", ) -> None: """Record that plan-critic was skipped for a given session/issue. - Called by the coordinator at STEP 5.5a when a pre-validated plan - is found in `.claude/plans/`, bypassing plan-critic invocation. + Called by the coordinator at STEP 5.5a when a pre-validated plan is + found, OR at STEP 5.5a.1 when the Phase 2 classifier gate fires. Args: session_id: The pipeline session identifier. issue_number: The issue number (0 for non-batch). + reason: Audit string. "pre_validated" (5.5a default) or "classifier" (Phase 2). - Issues: #878 + Issues: #878, #961 """ state = _ensure_state(session_id) plan_critic_skipped = state.setdefault("plan_critic_skipped", {}) issue_key = str(issue_number) - plan_critic_skipped[issue_key] = True + plan_critic_skipped[issue_key] = {"skipped": True, "reason": reason} _write_state(session_id, state) @@ -812,14 +814,70 @@ def get_plan_critic_skipped( Returns: True if plan-critic was recorded as skipped, False otherwise. - Issues: #878 + Issues: #878, #961 """ state = _read_state(session_id) if not state: return False plan_critic_skipped = state.get("plan_critic_skipped", {}) issue_key = str(issue_number) - return bool(plan_critic_skipped.get(issue_key, False)) + entry = plan_critic_skipped.get(issue_key, False) + if isinstance(entry, dict): + return bool(entry.get("skipped", False)) + return bool(entry) + + +def record_web_research_skipped( + session_id: str, + *, + issue_number: int = 0, + reason: str = "classifier", +) -> None: + """Record that web research (researcher agent) was skipped at STEP 4. + + Distinct from `record_research_skipped` (STEP 3.5 fully-specified path) so + Phase 5 telemetry can attribute the skip to the classifier vs the + pre-existing fully-specified gate. + + Args: + session_id: The pipeline session identifier. + issue_number: The issue number (0 for non-batch). + reason: Audit string from pipeline_intent_gates.should_skip_web_research. + + Issues: #961 + """ + state = _ensure_state(session_id) + web_research_skipped = state.setdefault("web_research_skipped", {}) + issue_key = str(issue_number) + web_research_skipped[issue_key] = {"skipped": True, "reason": reason} + _write_state(session_id, state) + + +def get_web_research_skipped( + session_id: str, + *, + issue_number: int = 0, +) -> bool: + """Check if web research was skipped (Phase 2 classifier gate). + + Args: + session_id: The pipeline session identifier. + issue_number: The issue number (0 for non-batch). + + Returns: + True if recorded as skipped, False otherwise. + + Issues: #961 + """ + state = _read_state(session_id) + if not state: + return False + web_research_skipped = state.get("web_research_skipped", {}) + issue_key = str(issue_number) + entry = web_research_skipped.get(issue_key, False) + if isinstance(entry, dict): + return bool(entry.get("skipped", False)) + return bool(entry) def verify_pipeline_agent_completions( diff --git a/plugins/autonomous-dev/lib/pipeline_intent_gates.py b/plugins/autonomous-dev/lib/pipeline_intent_gates.py new file mode 100644 index 00000000..746d3f78 --- /dev/null +++ b/plugins/autonomous-dev/lib/pipeline_intent_gates.py @@ -0,0 +1,153 @@ +"""Phase 2 (#961) classifier-gated pipeline skip predicates. + +Pure functions; no I/O. The implement.md coordinator imports these and +acts on the boolean result. Telemetry persistence is the caller's job +(via pipeline_completion_state). + +Issues: #961 +""" +from __future__ import annotations + +import os +from typing import Optional + +try: + from intent_classifier import IntentClass, IntentResult, classify_prompt # type: ignore[import-not-found] +except ImportError: + IntentClass = None # type: ignore[assignment] + IntentResult = None # type: ignore[assignment] + classify_prompt = None # type: ignore[assignment] + + +WEB_RESEARCH_SKIP_INTENTS = frozenset({"config", "doc", "refactor"}) +PLAN_CRITIC_SKIP_INTENTS = frozenset({"refactor", "config", "doc", "typo"}) + +CONFIDENCE_THRESHOLD = 0.85 +MAX_FILES_FOR_PLAN_CRITIC_SKIP = 3 + + +def _truthy(val: Optional[str]) -> bool: + """Strict truthy parse for env vars: only 'true'/'1'/'yes' (case-insensitive). + + Args: + val: Environment variable value string, or None if unset. + + Returns: + True only when val is "true", "1", or "yes" (case-insensitive). + """ + if val is None: + return False + return val.strip().lower() in ("true", "1", "yes") + + +def classifier_enabled() -> bool: + """Check if INTENT_CLASSIFIER_ENABLED env var is explicitly truthy. + + Returns: + True when INTENT_CLASSIFIER_ENABLED is explicitly truthy. Default: False. + """ + return _truthy(os.environ.get("INTENT_CLASSIFIER_ENABLED")) + + +def strict_mode(arguments: str) -> bool: + """Check if --strict appears as a whole token in ARGUMENTS. + + Args: + arguments: The ARGUMENTS string from the pipeline context. + + Returns: + True when --strict appears as a whole token in ARGUMENTS. + """ + if not arguments: + return False + return "--strict" in arguments.split() + + +def should_skip_web_research( + *, + feature_description: str, + arguments: str = "", + classifier_result: Optional["IntentResult"] = None, # type: ignore[type-arg] +) -> tuple[bool, str]: + """Decide whether to skip web research (researcher agent) at STEP 4. + + Args: + feature_description: The feature description string to classify. + arguments: The ARGUMENTS string from the pipeline context. + classifier_result: Optional pre-computed IntentResult. If None, will + call classify_prompt(feature_description). Accepts pre-computed + results to avoid redundant LLM calls when both gates share a run. + + Returns: + Tuple of (skip: bool, reason: str). When skip is True, reason is + "classifier:{intent}". When skip is False, reason explains why. + + Issues: #961 + """ + if not classifier_enabled(): + return False, "disabled" + if strict_mode(arguments): + return False, "strict" + if classify_prompt is None: + return False, "classifier_unavailable" + + result = classifier_result + if result is None: + try: + result = classify_prompt(feature_description) + except Exception: # noqa: BLE001 + return False, "classifier_exception" + + if result.fail_open or result.intent.value == "ambiguous": + return False, "fail_open" + if result.confidence < CONFIDENCE_THRESHOLD: + return False, "low_confidence" + if result.intent.value not in WEB_RESEARCH_SKIP_INTENTS: + return False, f"intent:{result.intent.value}" + return True, f"classifier:{result.intent.value}" + + +def should_skip_plan_critic( + *, + feature_description: str, + arguments: str = "", + classifier_result: Optional["IntentResult"] = None, # type: ignore[type-arg] +) -> tuple[bool, str]: + """Decide whether to skip plan-critic (5.5b) for small REFACTOR/CONFIG/DOC/TYPO. + + Args: + feature_description: The feature description string to classify. + arguments: The ARGUMENTS string from the pipeline context. + classifier_result: Optional pre-computed IntentResult. If None, will + call classify_prompt(feature_description). Accepts pre-computed + results to avoid redundant LLM calls when both gates share a run. + + Returns: + Tuple of (skip: bool, reason: str). When skip is True, reason is + "classifier:{intent}". When skip is False, reason explains why. + + Issues: #961 + """ + if not classifier_enabled(): + return False, "disabled" + if strict_mode(arguments): + return False, "strict" + if classify_prompt is None: + return False, "classifier_unavailable" + + result = classifier_result + if result is None: + try: + result = classify_prompt(feature_description) + except Exception: # noqa: BLE001 + return False, "classifier_exception" + + if result.fail_open or result.intent.value == "ambiguous": + return False, "fail_open" + if result.confidence < CONFIDENCE_THRESHOLD: + return False, "low_confidence" + if result.intent.value not in PLAN_CRITIC_SKIP_INTENTS: + return False, f"intent:{result.intent.value}" + if result.predicted_file_count > MAX_FILES_FOR_PLAN_CRITIC_SKIP: + return False, f"file_count:{result.predicted_file_count}" + return True, f"classifier:{result.intent.value}" diff --git a/tests/unit/test_implement_command_routing.py b/tests/unit/test_implement_command_routing.py new file mode 100644 index 00000000..efddc6ef --- /dev/null +++ b/tests/unit/test_implement_command_routing.py @@ -0,0 +1,312 @@ +"""Regression tests for Phase 2 (#961) classifier-gated routing in implement.md. + +Hybrid strategy: +- Markdown structure tests (TestImplementMdGateText): verify gate text exists. +- Predicate tests: exercise the pure functions in pipeline_intent_gates. +- State persistence tests: verify the new state helpers persist. + +Issues: #961 +""" +import os +import sys +from pathlib import Path +from unittest.mock import patch + +import pytest + +PROJECT_ROOT = Path(__file__).resolve().parents[2] +IMPLEMENT_PATH = PROJECT_ROOT / "plugins/autonomous-dev/commands/implement.md" +LIB_PATH = PROJECT_ROOT / "plugins/autonomous-dev/lib" +sys.path.insert(0, str(LIB_PATH)) + + +# ---------- Markdown structure ---------- + + +class TestImplementMdGateText: + @pytest.fixture + def md(self): + return IMPLEMENT_PATH.read_text() + + def test_step_3_6_section_exists(self, md): + assert "STEP 3.6" in md, "STEP 3.6 web-research gate section missing" + assert "should_skip_web_research" in md + assert "INTENT_CLASSIFIER_ENABLED" in md + + def test_step_3_6_appears_before_step_4(self, md): + assert md.find("STEP 3.6") < md.find("### STEP 4: Parallel Research") + + def test_step_5_5a_1_section_exists(self, md): + assert "5.5a.1" in md + assert "should_skip_plan_critic" in md + + def test_step_5_5a_1_before_5_5b(self, md): + assert md.find("5.5a.1") < md.find("#### 5.5b") + + def test_strict_flag_documented_in_both_gates(self, md): + s36_start = md.find("STEP 3.6") + s4_start = md.find("### STEP 4: Parallel Research") + assert s36_start != -1 and s4_start != -1 + s36 = md[s36_start:s4_start] + + s551_start = md.find("5.5a.1") + s55b_start = md.find("#### 5.5b") + assert s551_start != -1 and s55b_start != -1 + s551 = md[s551_start:s55b_start] + + assert "--strict" in s36 + assert "--strict" in s551 + + def test_5_5c_still_always_runs(self, md): + # The structural validation gate must remain unconditional. + s551_start = md.find("5.5a.1") + s55b_start = md.find("#### 5.5b") + s551_section = md[s551_start:s55b_start] + assert "5.5c" in s551_section, "5.5c (structural validation) must be referenced in 5.5a.1" + + def test_researcher_local_always_runs(self, md): + # Verify the dispatch block states researcher-local always runs. + s4_start = md.find("### STEP 4: Parallel Research") + s45_start = md.find("### STEP 4.5:") + s4_section = md[s4_start:s45_start] + assert "Always" in s4_section or "researcher-local" in s4_section + + +# ---------- Predicate logic ---------- + + +@pytest.fixture +def fake_result(): + """Build a fake IntentResult-like object (real frozen dataclass, fake values).""" + from intent_classifier import IntentClass, IntentResult + + def _make( + intent="refactor", + confidence=0.9, + file_count=2, + fail_open=False, + ): + return IntentResult( + intent=IntentClass(intent), + confidence=confidence, + regex_hit=False, + llm_used=True, + fail_open=fail_open, + requires_security_audit=False, + prompt_length=50, + predicted_file_count=file_count, + reasoning="test", + ) + + return _make + + +class TestWebResearchGate: + def test_disabled_by_default(self, fake_result, monkeypatch): + from pipeline_intent_gates import should_skip_web_research + + monkeypatch.delenv("INTENT_CLASSIFIER_ENABLED", raising=False) + skip, reason = should_skip_web_research( + feature_description="x", + classifier_result=fake_result(intent="config"), + ) + assert skip is False + assert reason == "disabled" + + def test_strict_blocks_skip(self, fake_result, monkeypatch): + from pipeline_intent_gates import should_skip_web_research + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", "true") + skip, reason = should_skip_web_research( + feature_description="x", + arguments="--strict", + classifier_result=fake_result(intent="config"), + ) + assert skip is False + assert reason == "strict" + + @pytest.mark.parametrize("intent", ["config", "doc", "refactor"]) + def test_skips_for_allowed_intents(self, fake_result, monkeypatch, intent): + from pipeline_intent_gates import should_skip_web_research + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", "true") + skip, reason = should_skip_web_research( + feature_description="x", + classifier_result=fake_result(intent=intent, confidence=0.9), + ) + assert skip is True + assert reason == f"classifier:{intent}" + + @pytest.mark.parametrize("intent", ["implement", "security_critical", "test", "typo"]) + def test_does_not_skip_other_intents(self, fake_result, monkeypatch, intent): + from pipeline_intent_gates import should_skip_web_research + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", "true") + skip, _ = should_skip_web_research( + feature_description="x", + classifier_result=fake_result(intent=intent), + ) + assert skip is False + + def test_ambiguous_does_not_skip(self, fake_result, monkeypatch): + from pipeline_intent_gates import should_skip_web_research + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", "true") + skip, reason = should_skip_web_research( + feature_description="x", + classifier_result=fake_result(intent="ambiguous", fail_open=True), + ) + assert skip is False + assert reason == "fail_open" + + def test_low_confidence_does_not_skip(self, fake_result, monkeypatch): + from pipeline_intent_gates import should_skip_web_research + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", "true") + skip, reason = should_skip_web_research( + feature_description="x", + classifier_result=fake_result(intent="config", confidence=0.5), + ) + assert skip is False + assert reason == "low_confidence" + + +class TestPlanCriticGate: + def test_skips_for_small_refactor(self, fake_result, monkeypatch): + from pipeline_intent_gates import should_skip_plan_critic + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", "true") + skip, reason = should_skip_plan_critic( + feature_description="x", + classifier_result=fake_result(intent="refactor", file_count=2), + ) + assert skip is True + + def test_does_not_skip_when_files_exceed_3(self, fake_result, monkeypatch): + from pipeline_intent_gates import should_skip_plan_critic + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", "true") + skip, reason = should_skip_plan_critic( + feature_description="x", + classifier_result=fake_result(intent="refactor", file_count=5), + ) + assert skip is False + assert reason == "file_count:5" + + @pytest.mark.parametrize( + "intent,file_count", + [ + ("typo", 1), + ("doc", 1), + ("config", 3), + ], + ) + def test_allowed_intents_within_file_limit( + self, fake_result, monkeypatch, intent, file_count + ): + from pipeline_intent_gates import should_skip_plan_critic + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", "true") + skip, _ = should_skip_plan_critic( + feature_description="x", + classifier_result=fake_result(intent=intent, file_count=file_count), + ) + assert skip is True + + def test_implement_intent_never_skips(self, fake_result, monkeypatch): + from pipeline_intent_gates import should_skip_plan_critic + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", "true") + skip, _ = should_skip_plan_critic( + feature_description="x", + classifier_result=fake_result(intent="implement", file_count=2), + ) + assert skip is False + + def test_strict_blocks_skip(self, fake_result, monkeypatch): + from pipeline_intent_gates import should_skip_plan_critic + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", "true") + skip, reason = should_skip_plan_critic( + feature_description="x", + arguments="--batch --strict", + classifier_result=fake_result(intent="doc"), + ) + assert skip is False + assert reason == "strict" + + def test_classifier_exception_fails_open(self, monkeypatch): + from pipeline_intent_gates import should_skip_plan_critic + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", "true") + with patch("pipeline_intent_gates.classify_prompt", side_effect=RuntimeError("boom")): + skip, reason = should_skip_plan_critic(feature_description="x") + assert skip is False + assert reason == "classifier_exception" + + +class TestEnvVarTruthiness: + @pytest.mark.parametrize( + "val,expected", + [ + ("true", True), + ("True", True), + ("TRUE", True), + ("1", True), + ("yes", True), + ("YES", True), + ("false", False), + ("0", False), + ("no", False), + ("", False), + ("anything_else", False), + ], + ) + def test_truthy(self, val, expected, monkeypatch): + from pipeline_intent_gates import classifier_enabled + + monkeypatch.setenv("INTENT_CLASSIFIER_ENABLED", val) + assert classifier_enabled() is expected + + +class TestPipelineStateSkipFlags: + def test_record_web_research_skipped_persists(self): + from pipeline_completion_state import ( + get_web_research_skipped, + record_web_research_skipped, + ) + + sid = f"test-961-{os.getpid()}" + record_web_research_skipped(sid, issue_number=961, reason="classifier:config") + assert get_web_research_skipped(sid, issue_number=961) is True + + def test_record_plan_critic_skipped_with_reason(self): + from pipeline_completion_state import ( + get_plan_critic_skipped, + record_plan_critic_skipped, + ) + + sid = f"test-961-pc-{os.getpid()}" + record_plan_critic_skipped(sid, issue_number=961, reason="classifier") + assert get_plan_critic_skipped(sid, issue_number=961) is True + + def test_get_plan_critic_skipped_backward_compat_bool(self): + """get_plan_critic_skipped MUST handle legacy bool=True entries (#961 backward compat).""" + from pipeline_completion_state import ( + _ensure_state, + _write_state, + get_plan_critic_skipped, + ) + + sid = f"test-961-legacy-{os.getpid()}" + # Write a legacy bool entry directly + state = _ensure_state(sid) + state.setdefault("plan_critic_skipped", {})["0"] = True + _write_state(sid, state) + assert get_plan_critic_skipped(sid) is True + + def test_web_research_skipped_returns_false_for_unset(self): + from pipeline_completion_state import get_web_research_skipped + + sid = f"test-961-unset-{os.getpid()}" + assert get_web_research_skipped(sid, issue_number=0) is False diff --git a/tests/unit/test_implement_command_structure.py b/tests/unit/test_implement_command_structure.py index 7ca02e5a..462b2c5c 100644 --- a/tests/unit/test_implement_command_structure.py +++ b/tests/unit/test_implement_command_structure.py @@ -41,11 +41,11 @@ class TestCoordinatorSize: """Verify the coordinator is thin, not monolithic.""" def test_implement_md_under_405_lines(self, implement_content): - """implement.md should be under 575 lines total (was 510 before #531/#532/#533/#485 enforcement additions).""" + """implement.md should be under 1650 lines total (#961 added Phase 2 classifier gates at STEP 3.6 and STEP 5.5a.1).""" total_lines = len(implement_content.strip().split("\n")) - assert total_lines <= 575, ( - f"implement.md is {total_lines} lines — should be <= 575 " - f"(thin coordinator + Light Pipeline + ordering/verbatim/tracking enforcement)." + assert total_lines <= 1650, ( + f"implement.md is {total_lines} lines — should be <= 1650 " + f"(thin coordinator + Light Pipeline + ordering/verbatim/tracking enforcement + #961 classifier gates)." )