diff --git a/.gitignore b/.gitignore index 01fdc81..3c9b5a6 100644 --- a/.gitignore +++ b/.gitignore @@ -213,6 +213,7 @@ __marimo__/ artifacts/logs/ cliagent/shell_gpt/ opms-collab/ +kimi-cliagent-benchmark-latest.tar # OS cruft .DS_Store diff --git a/src/api/services/runtime_service.py b/src/api/services/runtime_service.py index b4c23c5..4eed450 100644 --- a/src/api/services/runtime_service.py +++ b/src/api/services/runtime_service.py @@ -515,7 +515,10 @@ async def _incident_ingest_loop(self) -> None: continue thread_id = wakeup["thread_id"] event_type = str(wakeup.get("event_type", "")) - should_resume = event_type != "incident_new" + should_resume = event_type in { + "incident_new", + "incident_regressed", + } event_payload = { "event_id": f"evt-incident-{uuid.uuid4().hex[:6]}", "event_type": event_type, diff --git a/src/app/auto_review_merge.py b/src/app/auto_review_merge.py index 8274605..f0bfdab 100644 --- a/src/app/auto_review_merge.py +++ b/src/app/auto_review_merge.py @@ -4,9 +4,11 @@ import re import subprocess from typing import Any +from urllib.parse import quote import httpx +from src.app.ecommerce_target import ecommerce_deploy_branch, ecommerce_repo_url from src.app.incident_registry import set_status from src.observability import get_logger, sanitize_text from src.runtime_settings import load_runtime_settings @@ -16,6 +18,9 @@ _PR_URL_RE = re.compile( r"^https://github\.com/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+)/pull/(?P<number>\d+)" ) +_REPO_URL_RE = re.compile( + r"^https://github\.com/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+?)(?:\.git)?/?$" +) def auto_review_merge_reload( @@ -28,8 +33,28 @@ def auto_review_merge_reload( commit_sha: str = "", pr_url: str = "", ) -> dict[str, Any]: - del service, exception_type, traceback_summary, branch_name, commit_sha + del service, exception_type, traceback_summary normalized_pr = pr_url.strip() + normalized_branch = branch_name.strip() + token = load_runtime_settings().github_token.strip() + + if not normalized_pr and normalized_branch: + if not 
token: + return { + "ok": False, + "error": "missing_github_token", + "message": "GitHub token is required for automatic PR creation.", + } + created = _ensure_pr_for_branch( + branch_name=normalized_branch, + commit_sha=commit_sha.strip(), + fingerprint=fingerprint, + token=token, + ) + if not created["ok"]: + return {"ok": False, "error": "create_pr_failed", **created} + normalized_pr = str(created.get("pr_url", "")).strip() + if not normalized_pr: return { "ok": False, @@ -37,7 +62,6 @@ def auto_review_merge_reload( "message": "Worker did not return a PR URL; falling back to manual review.", } - token = load_runtime_settings().github_token.strip() if not token: return { "ok": False, @@ -66,6 +90,58 @@ def auto_review_merge_reload( } +def _ensure_pr_for_branch( + *, + branch_name: str, + commit_sha: str, + fingerprint: str, + token: str, +) -> dict[str, Any]: + parsed = _parse_github_repo_url(ecommerce_repo_url()) + if parsed is None: + return { + "ok": False, + "stderr": f"Unsupported GitHub repo URL: {ecommerce_repo_url()}", + "status_code": 0, + } + owner, repo = parsed + head_query = quote(f"{owner}:{branch_name}", safe="") + existing = _github_request( + "GET", + f"https://api.github.com/repos/{owner}/{repo}/pulls?head={head_query}&state=open", + token=token, + ) + if existing["ok"] and isinstance(existing.get("json"), list): + pulls = existing["json"] + if pulls: + pr_url = str(pulls[0].get("html_url", "")).strip() + if pr_url: + return {"ok": True, "pr_url": pr_url, "json": pulls[0]} + + body_lines = [ + f"Auto-generated fix for incident `{fingerprint}`.", + "", + f"- Branch: `{branch_name}`", + ] + if commit_sha: + body_lines.append(f"- Commit: `{commit_sha}`") + created = _github_request( + "POST", + f"https://api.github.com/repos/{owner}/{repo}/pulls", + token=token, + json={ + "title": f"fix: resolve incident {fingerprint[:12]}", + "head": branch_name, + "base": ecommerce_deploy_branch(), + "body": "\n".join(body_lines), + }, + ) + if not created["ok"] 
or not isinstance(created.get("json"), dict): + return created + pr_url = str(created["json"].get("html_url", "")).strip() + return {**created, "pr_url": pr_url} + + def _approve_pr(pr_url: str, *, token: str) -> dict[str, Any]: parsed = _parse_github_pr_url(pr_url) if parsed: @@ -157,6 +233,13 @@ def _parse_github_pr_url(pr_url: str) -> tuple[str, str, str] | None: return match.group("owner"), match.group("repo"), match.group("number") +def _parse_github_repo_url(repo_url: str) -> tuple[str, str] | None: + match = _REPO_URL_RE.match(repo_url.strip()) + if match is None: + return None + return match.group("owner"), match.group("repo") + + def _github_request( method: str, url: str, diff --git a/src/app/graph.py b/src/app/graph.py index 91c9452..b01078a 100644 --- a/src/app/graph.py +++ b/src/app/graph.py @@ -2,6 +2,7 @@ import asyncio import json +import os from datetime import UTC, datetime from typing import Any, TypedDict from uuid import uuid4 @@ -17,6 +18,7 @@ from src.memory.working_memory import WorkingMemory from src.observability import get_logger, sanitize_text from src.app.runtime_event_bus import RuntimeEventBus +from src.app import incident_registry from src.tools.call_code_worker_tool import CallCodeWorkerTool logger = get_logger(__name__) @@ -173,8 +175,12 @@ def leader_node(state: RuntimeState) -> RuntimeState: int(existing.metadata.get("regression_count", 0) or 0) + 1 ) existing.metadata["regression_count"] = regression_count - should_dispatch = existing.status in ("completed", "failed") - if existing.status in ("completed", "failed"): + should_dispatch = existing.status in ( + "pending", + "completed", + "failed", + ) + if existing.status in ("pending", "completed", "failed"): existing.status = "pending" target_task_id = existing.task_id leader_hint = ( @@ -183,11 +189,15 @@ def leader_node(state: RuntimeState) -> RuntimeState: f"与日志已更新到 metadata。请重新评估是否需要再次派单。" ) - if _has_active_code_worker_call( + has_active_worker = 
_has_active_code_worker_call( core_memory=state["core_memory"], task_id=target_task_id, - ): + ) + if has_active_worker: should_dispatch = False + elif existing is not None and existing.status == "in_progress": + existing.status = "pending" + should_dispatch = True if should_dispatch: dispatch_result = _dispatch_incident_code_worker( @@ -197,6 +207,8 @@ def leader_node(state: RuntimeState) -> RuntimeState: plan_items=plan_items, ) if dispatch_result.get("ok"): + if fingerprint: + incident_registry.set_status(fingerprint, "running") for item in plan_items: if item.task_id == target_task_id: item.status = "in_progress" @@ -440,10 +452,37 @@ def _has_active_code_worker_call( if str(call.get("task_id", "")).strip() != task_id: continue if str(call.get("status", "")).strip() == "in_progress": + if _mark_stale_code_worker_call(call): + continue return True return False +def _mark_stale_code_worker_call(call: dict[str, Any]) -> bool: + stale_seconds_raw = ( + os.getenv("CODEX_WORKER_ACTIVE_STALE_SECONDS", "").strip() or "900" + ) + try: + stale_seconds = max(int(stale_seconds_raw), 60) + except ValueError: + stale_seconds = 900 + accepted_at = str(call.get("accepted_at", "")).strip() + if not accepted_at: + return False + try: + accepted_dt = datetime.fromisoformat(accepted_at) + if accepted_dt.tzinfo is None: + accepted_dt = accepted_dt.replace(tzinfo=UTC) + accepted_ts = accepted_dt.astimezone(UTC).timestamp() + except ValueError: + return False + if datetime.now(UTC).timestamp() - accepted_ts <= stale_seconds: + return False + call["status"] = "stale" + call["stale_at"] = datetime.now(UTC).isoformat(timespec="seconds") + return True + + def _append_graph_activity( core_memory: dict[str, Any], *, diff --git a/tests/test_auto_review_merge.py b/tests/test_auto_review_merge.py index 9f71aea..6257ea0 100644 --- a/tests/test_auto_review_merge.py +++ b/tests/test_auto_review_merge.py @@ -22,6 +22,98 @@ def test_auto_review_merge_requires_pr_url(monkeypatch: Any, tmp_path: 
Any) -> N assert result["error"] == "missing_pr_url" +def test_auto_review_merge_creates_pr_from_branch( + monkeypatch: Any, tmp_path: Any +) -> None: + monkeypatch.setenv("CODE_TERMINATOR_API_STATE_ROOT", str(tmp_path)) + monkeypatch.setenv("CODE_TERMINATOR_ECOMMERCE_REPO_URL", "https://github.com/acme/repo.git") + monkeypatch.setenv("CODE_TERMINATOR_ECOMMERCE_DEPLOY_BRANCH", "main") + monkeypatch.setattr( + incident_registry, + "_REGISTRY_FILE", + tmp_path / "incidents" / "registry.json", + ) + save_runtime_settings( + github_token="token-123", + auto_review_merge_reload=True, + ) + upsert("fp-branch", status="waiting_review", service="svc", exception_type="ValueError") + + calls: list[tuple[str, str, dict[str, Any] | None]] = [] + + class FakeResponse: + def __init__(self, status_code: int, payload: Any) -> None: + self.status_code = status_code + self._payload = payload + self.text = "ok" + self.content = b"ok" + + def json(self) -> Any: + return self._payload + + def fake_request( + method: str, + url: str, + *, + headers: dict[str, str], + json: dict[str, Any] | None = None, + timeout: float, + ) -> FakeResponse: + del timeout + assert headers["Authorization"] == "Bearer token-123" + calls.append((method, url, json)) + if method == "GET" and "pulls?head=" in url: + return FakeResponse(200, []) + if method == "POST" and url.endswith("/pulls"): + return FakeResponse(201, {"html_url": "https://github.com/acme/repo/pull/9"}) + if method == "GET" and url.endswith("/pulls/9"): + return FakeResponse( + 200, + { + "head": { + "ref": "worker-fix", + "repo": {"name": "repo", "owner": {"login": "acme"}}, + } + }, + ) + return FakeResponse(200, {"ok": True}) + + monkeypatch.setattr("src.app.auto_review_merge.httpx.request", fake_request) + + result = auto_review_merge_reload( + fingerprint="fp-branch", + service="svc", + exception_type="ValueError", + traceback_summary="trace", + branch_name="worker-fix", + commit_sha="abc123", + pr_url="", + ) + + assert result["ok"] is 
True + assert result["pr_url"] == "https://github.com/acme/repo/pull/9" + assert calls[0] == ( + "GET", + "https://api.github.com/repos/acme/repo/pulls?head=acme%3Aworker-fix&state=open", + None, + ) + assert calls[1] == ( + "POST", + "https://api.github.com/repos/acme/repo/pulls", + { + "title": "fix: resolve incident fp-branch", + "head": "worker-fix", + "base": "main", + "body": "Auto-generated fix for incident `fp-branch`.\n\n- Branch: `worker-fix`\n- Commit: `abc123`", + }, + ) + assert calls[2][0] == "POST" + assert calls[2][1] == "https://api.github.com/repos/acme/repo/pulls/9/reviews" + assert calls[3][0] == "PUT" + assert calls[3][1] == "https://api.github.com/repos/acme/repo/pulls/9/merge" + assert get("fp-branch")["status"] == "approved" + + def test_auto_review_merge_approves_and_merges( monkeypatch: Any, tmp_path: Any ) -> None: diff --git a/tests/test_incident_auto_dispatch.py b/tests/test_incident_auto_dispatch.py index 589e3b2..9ad770b 100644 --- a/tests/test_incident_auto_dispatch.py +++ b/tests/test_incident_auto_dispatch.py @@ -1,11 +1,12 @@ from __future__ import annotations import json +from datetime import UTC, datetime, timedelta from pathlib import Path from typing import Any from unittest.mock import patch -from src.app.graph import leader_node +from src.app.graph import _has_active_code_worker_call, leader_node from src.app.runtime_event_bus import RuntimeEventBus from src.app.state import EventEnvelope @@ -74,9 +75,11 @@ def test_incident_new_directly_dispatches_code_worker_bundle( with patch( "src.tools.call_code_worker_tool.CallCodeWorkerTool._run_real_worker_and_emit_hook", return_value=None, - ): + ), patch("src.app.graph.incident_registry.set_status") as set_status: result = leader_node(state) + set_status.assert_called_once_with("0ab1a7d9d8e49df7", "running") + plan_items = result["plan_items"] assert len(plan_items) == 1 assert plan_items[0]["task_id"] == "incident-0ab1a7d9d8e4" @@ -120,7 +123,7 @@ def 
test_incident_new_does_not_duplicate_active_code_worker( with patch( "src.tools.call_code_worker_tool.CallCodeWorkerTool._run_real_worker_and_emit_hook", return_value=None, - ): + ), patch("src.app.graph.incident_registry.set_status"): first = leader_node(state) second = leader_node( { @@ -133,3 +136,64 @@ def test_incident_new_does_not_duplicate_active_code_worker( calls = workflow["code_worker_calls"] assert len(calls) == 1 assert second["event_log"][-1]["payload"]["event"] == "code_worker_already_active" + + +def test_stale_code_worker_call_does_not_block_dispatch(monkeypatch: Any) -> None: + monkeypatch.setenv("CODEX_WORKER_ACTIVE_STALE_SECONDS", "60") + accepted_at = (datetime.now(UTC) - timedelta(minutes=10)).isoformat( + timespec="seconds" + ) + call = { + "task_id": "incident-0ab1a7d9d8e4", + "subworker_id": "subworker-old", + "status": "in_progress", + "accepted_at": accepted_at, + } + core_memory = {"workflow": {"code_worker_calls": [call]}} + + assert not _has_active_code_worker_call( + core_memory=core_memory, + task_id="incident-0ab1a7d9d8e4", + ) + assert call["status"] == "stale" + + +def test_incident_new_redispatches_when_existing_worker_is_stale( + monkeypatch: Any, + tmp_path: Path, +) -> None: + monkeypatch.setenv("CODEX_WORKER_ACTIVE_STALE_SECONDS", "60") + monkeypatch.setenv( + "CODE_TERMINATOR_ECOMMERCE_REPO_URL", + "https://github.com/KKK985429/ecommerce-platform-demo.git", + ) + + state = _incident_state(tmp_path) + with patch( + "src.tools.call_code_worker_tool.CallCodeWorkerTool._run_real_worker_and_emit_hook", + return_value=None, + ), patch("src.app.graph.incident_registry.set_status"): + first = leader_node(state) + + stale_accepted_at = (datetime.now(UTC) - timedelta(minutes=10)).isoformat( + timespec="seconds" + ) + first["core_memory"]["workflow"]["code_worker_calls"][0][ + "accepted_at" + ] = stale_accepted_at + + with patch( + "src.tools.call_code_worker_tool.CallCodeWorkerTool._run_real_worker_and_emit_hook", + return_value=None, 
+ ), patch("src.app.graph.incident_registry.set_status"): + second = leader_node( + { + **first, + "current_event": _incident_state(tmp_path)["current_event"], + } + ) + + calls = second["core_memory"]["workflow"]["code_worker_calls"] + assert len(calls) == 2 + assert calls[0]["status"] == "stale" + assert calls[1]["status"] == "in_progress"