Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ __marimo__/
artifacts/logs/
cliagent/shell_gpt/
opms-collab/
kimi-cliagent-benchmark-latest.tar

# OS cruft
.DS_Store
Expand Down
5 changes: 4 additions & 1 deletion src/api/services/runtime_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,10 @@ async def _incident_ingest_loop(self) -> None:
continue
thread_id = wakeup["thread_id"]
event_type = str(wakeup.get("event_type", ""))
should_resume = event_type != "incident_new"
should_resume = event_type in {
"incident_new",
"incident_regressed",
}
event_payload = {
"event_id": f"evt-incident-{uuid.uuid4().hex[:6]}",
"event_type": event_type,
Expand Down
87 changes: 85 additions & 2 deletions src/app/auto_review_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import re
import subprocess
from typing import Any
from urllib.parse import quote

import httpx

from src.app.ecommerce_target import ecommerce_deploy_branch, ecommerce_repo_url
from src.app.incident_registry import set_status
from src.observability import get_logger, sanitize_text
from src.runtime_settings import load_runtime_settings
Expand All @@ -16,6 +18,9 @@
_PR_URL_RE = re.compile(
r"^https://github\.com/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+)/pull/(?P<number>\d+)"
)
_REPO_URL_RE = re.compile(
r"^https://github\.com/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+?)(?:\.git)?/?$"
)


def auto_review_merge_reload(
Expand All @@ -28,16 +33,35 @@ def auto_review_merge_reload(
commit_sha: str = "",
pr_url: str = "",
) -> dict[str, Any]:
del service, exception_type, traceback_summary, branch_name, commit_sha
del service, exception_type, traceback_summary
normalized_pr = pr_url.strip()
normalized_branch = branch_name.strip()
token = load_runtime_settings().github_token.strip()

if not normalized_pr and normalized_branch:
if not token:
return {
"ok": False,
"error": "missing_github_token",
"message": "GitHub token is required for automatic PR creation.",
}
created = _ensure_pr_for_branch(
branch_name=normalized_branch,
commit_sha=commit_sha.strip(),
fingerprint=fingerprint,
token=token,
)
if not created["ok"]:
return {"ok": False, "error": "create_pr_failed", **created}
normalized_pr = str(created.get("pr_url", "")).strip()

if not normalized_pr:
return {
"ok": False,
"error": "missing_pr_url",
"message": "Worker did not return a PR URL; falling back to manual review.",
}

token = load_runtime_settings().github_token.strip()
if not token:
return {
"ok": False,
Expand Down Expand Up @@ -66,6 +90,58 @@ def auto_review_merge_reload(
}


def _ensure_pr_for_branch(
    *,
    branch_name: str,
    commit_sha: str,
    fingerprint: str,
    token: str,
) -> dict[str, Any]:
    """Return an open PR for *branch_name*, creating one via the GitHub API if needed.

    An already-open PR for the branch is reused; otherwise a new PR is opened
    against the configured deploy branch. On success the result dict carries
    the PR's HTML URL under ``pr_url``; on failure it carries the underlying
    request/parse error fields.
    """
    repo_location = _parse_github_repo_url(ecommerce_repo_url())
    if repo_location is None:
        # The configured repo URL is not a recognizable GitHub URL; surface it verbatim.
        return {
            "ok": False,
            "stderr": f"Unsupported GitHub repo URL: {ecommerce_repo_url()}",
            "status_code": 0,
        }
    owner, repo = repo_location

    # Reuse an existing open PR for this branch when one is already up.
    encoded_head = quote(f"{owner}:{branch_name}", safe="")
    lookup = _github_request(
        "GET",
        f"https://api.github.com/repos/{owner}/{repo}/pulls?head={encoded_head}&state=open",
        token=token,
    )
    open_pulls = lookup.get("json")
    if lookup["ok"] and isinstance(open_pulls, list) and open_pulls:
        first_pull = open_pulls[0]
        reuse_url = str(first_pull.get("html_url", "")).strip()
        if reuse_url:
            return {"ok": True, "pr_url": reuse_url, "json": first_pull}

    # No usable open PR found; create one targeting the deploy branch.
    description = [
        f"Auto-generated fix for incident `{fingerprint}`.",
        "",
        f"- Branch: `{branch_name}`",
    ]
    if commit_sha:
        description.append(f"- Commit: `{commit_sha}`")
    response = _github_request(
        "POST",
        f"https://api.github.com/repos/{owner}/{repo}/pulls",
        token=token,
        json={
            "title": f"fix: resolve incident {fingerprint[:12]}",
            "head": branch_name,
            "base": ecommerce_deploy_branch(),
            "body": "\n".join(description),
        },
    )
    if not response["ok"] or not isinstance(response.get("json"), dict):
        # Propagate the raw failure so the caller can report create_pr_failed.
        return response
    return {**response, "pr_url": str(response["json"].get("html_url", "")).strip()}


def _approve_pr(pr_url: str, *, token: str) -> dict[str, Any]:
parsed = _parse_github_pr_url(pr_url)
if parsed:
Expand Down Expand Up @@ -157,6 +233,13 @@ def _parse_github_pr_url(pr_url: str) -> tuple[str, str, str] | None:
return match.group("owner"), match.group("repo"), match.group("number")


def _parse_github_repo_url(repo_url: str) -> tuple[str, str] | None:
    """Extract ``(owner, repo)`` from a GitHub repository URL, or ``None`` on no match."""
    if (match := _REPO_URL_RE.match(repo_url.strip())) is not None:
        return match.group("owner"), match.group("repo")
    return None


def _github_request(
method: str,
url: str,
Expand Down
47 changes: 43 additions & 4 deletions src/app/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import json
import os
from datetime import UTC, datetime
from typing import Any, TypedDict
from uuid import uuid4
Expand All @@ -17,6 +18,7 @@
from src.memory.working_memory import WorkingMemory
from src.observability import get_logger, sanitize_text
from src.app.runtime_event_bus import RuntimeEventBus
from src.app import incident_registry
from src.tools.call_code_worker_tool import CallCodeWorkerTool

logger = get_logger(__name__)
Expand Down Expand Up @@ -173,8 +175,12 @@ def leader_node(state: RuntimeState) -> RuntimeState:
int(existing.metadata.get("regression_count", 0) or 0) + 1
)
existing.metadata["regression_count"] = regression_count
should_dispatch = existing.status in ("completed", "failed")
if existing.status in ("completed", "failed"):
should_dispatch = existing.status in (
"pending",
"completed",
"failed",
)
if existing.status in ("pending", "completed", "failed"):
existing.status = "pending"
target_task_id = existing.task_id
leader_hint = (
Expand All @@ -183,11 +189,15 @@ def leader_node(state: RuntimeState) -> RuntimeState:
f"与日志已更新到 metadata。请重新评估是否需要再次派单。"
)

if _has_active_code_worker_call(
has_active_worker = _has_active_code_worker_call(
core_memory=state["core_memory"],
task_id=target_task_id,
):
)
if has_active_worker:
should_dispatch = False
elif existing is not None and existing.status == "in_progress":
existing.status = "pending"
should_dispatch = True

if should_dispatch:
dispatch_result = _dispatch_incident_code_worker(
Expand All @@ -197,6 +207,8 @@ def leader_node(state: RuntimeState) -> RuntimeState:
plan_items=plan_items,
)
if dispatch_result.get("ok"):
if fingerprint:
incident_registry.set_status(fingerprint, "running")
for item in plan_items:
if item.task_id == target_task_id:
item.status = "in_progress"
Expand Down Expand Up @@ -440,10 +452,37 @@ def _has_active_code_worker_call(
if str(call.get("task_id", "")).strip() != task_id:
continue
if str(call.get("status", "")).strip() == "in_progress":
if _mark_stale_code_worker_call(call):
continue
return True
return False


def _mark_stale_code_worker_call(call: dict[str, Any]) -> bool:
stale_seconds_raw = (
os.getenv("CODEX_WORKER_ACTIVE_STALE_SECONDS", "").strip() or "900"
)
try:
stale_seconds = max(int(stale_seconds_raw), 60)
except ValueError:
stale_seconds = 900
accepted_at = str(call.get("accepted_at", "")).strip()
if not accepted_at:
return False
try:
accepted_dt = datetime.fromisoformat(accepted_at)
if accepted_dt.tzinfo is None:
accepted_dt = accepted_dt.replace(tzinfo=UTC)
accepted_ts = accepted_dt.astimezone(UTC).timestamp()
except ValueError:
return False
if datetime.now(UTC).timestamp() - accepted_ts <= stale_seconds:
return False
call["status"] = "stale"
call["stale_at"] = datetime.now(UTC).isoformat(timespec="seconds")
return True


def _append_graph_activity(
core_memory: dict[str, Any],
*,
Expand Down
92 changes: 92 additions & 0 deletions tests/test_auto_review_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,98 @@ def test_auto_review_merge_requires_pr_url(monkeypatch: Any, tmp_path: Any) -> N
assert result["error"] == "missing_pr_url"


def test_auto_review_merge_creates_pr_from_branch(
    monkeypatch: Any, tmp_path: Any
) -> None:
    """End-to-end check: with no PR URL but a branch name, the auto-review flow
    creates a PR via the GitHub API, approves it, merges it, and marks the
    incident approved.

    All GitHub HTTP traffic is intercepted by a fake ``httpx.request`` so the
    exact request sequence (lookup -> create -> review -> merge) and payloads
    can be asserted.
    """
    # Point state, target repo, and deploy branch at test-controlled values.
    monkeypatch.setenv("CODE_TERMINATOR_API_STATE_ROOT", str(tmp_path))
    monkeypatch.setenv("CODE_TERMINATOR_ECOMMERCE_REPO_URL", "https://github.com/acme/repo.git")
    monkeypatch.setenv("CODE_TERMINATOR_ECOMMERCE_DEPLOY_BRANCH", "main")
    monkeypatch.setattr(
        incident_registry,
        "_REGISTRY_FILE",
        tmp_path / "incidents" / "registry.json",
    )
    save_runtime_settings(
        github_token="token-123",
        auto_review_merge_reload=True,
    )
    # Seed the incident the flow is expected to transition to "approved".
    upsert("fp-branch", status="waiting_review", service="svc", exception_type="ValueError")

    # Records every (method, url, json_body) the code under test sends.
    calls: list[tuple[str, str, dict[str, Any] | None]] = []

    class FakeResponse:
        # Minimal stand-in for httpx.Response: just status, payload, text/content.
        def __init__(self, status_code: int, payload: Any) -> None:
            self.status_code = status_code
            self._payload = payload
            self.text = "ok"
            self.content = b"ok"

        def json(self) -> Any:
            return self._payload

    def fake_request(
        method: str,
        url: str,
        *,
        headers: dict[str, str],
        json: dict[str, Any] | None = None,
        timeout: float,
    ) -> FakeResponse:
        del timeout
        # Every request must carry the configured token.
        assert headers["Authorization"] == "Bearer token-123"
        calls.append((method, url, json))
        # No open PR exists yet for the branch -> forces the create path.
        if method == "GET" and "pulls?head=" in url:
            return FakeResponse(200, [])
        # PR creation succeeds and yields the canonical PR URL.
        if method == "POST" and url.endswith("/pulls"):
            return FakeResponse(201, {"html_url": "https://github.com/acme/repo/pull/9"})
        # PR detail lookup used by the approve/merge steps.
        if method == "GET" and url.endswith("/pulls/9"):
            return FakeResponse(
                200,
                {
                    "head": {
                        "ref": "worker-fix",
                        "repo": {"name": "repo", "owner": {"login": "acme"}},
                    }
                },
            )
        # Any other call (review, merge, etc.) just succeeds.
        return FakeResponse(200, {"ok": True})

    monkeypatch.setattr("src.app.auto_review_merge.httpx.request", fake_request)

    result = auto_review_merge_reload(
        fingerprint="fp-branch",
        service="svc",
        exception_type="ValueError",
        traceback_summary="trace",
        branch_name="worker-fix",
        commit_sha="abc123",
        pr_url="",
    )

    assert result["ok"] is True
    assert result["pr_url"] == "https://github.com/acme/repo/pull/9"
    # First call: open-PR lookup with the URL-encoded "owner:branch" head filter.
    assert calls[0] == (
        "GET",
        "https://api.github.com/repos/acme/repo/pulls?head=acme%3Aworker-fix&state=open",
        None,
    )
    # Second call: PR creation with the generated title/body against "main".
    assert calls[1] == (
        "POST",
        "https://api.github.com/repos/acme/repo/pulls",
        {
            "title": "fix: resolve incident fp-branch",
            "head": "worker-fix",
            "base": "main",
            "body": "Auto-generated fix for incident `fp-branch`.\n\n- Branch: `worker-fix`\n- Commit: `abc123`",
        },
    )
    # Then the PR is approved (review) and merged, in that order.
    assert calls[2][0] == "POST"
    assert calls[2][1] == "https://api.github.com/repos/acme/repo/pulls/9/reviews"
    assert calls[3][0] == "PUT"
    assert calls[3][1] == "https://api.github.com/repos/acme/repo/pulls/9/merge"
    # The incident registry must reflect the successful merge.
    assert get("fp-branch")["status"] == "approved"


def test_auto_review_merge_approves_and_merges(
monkeypatch: Any, tmp_path: Any
) -> None:
Expand Down
Loading
Loading