Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9c65635
修改state字段,新增incident字段
Apr 25, 2026
746f50f
修改状态字段
Apr 25, 2026
6cb7a59
修改leader处理路由,接收线上告警
Apr 25, 2026
e2753e7
feat: add incident fingerprinting and wakeup modules
Apr 25, 2026
774c0c1
feature add runtime_service
Apr 25, 2026
e632bc4
第一阶段调整完成,感知电商平台报错,唤醒leader处理
Apr 25, 2026
87393b7
feature add router to get_error
Apr 25, 2026
c760825
feat: extend worker schema with branch/commit/pr fields (phase 3)
Apr 25, 2026
c3cfcb0
feat: add feishu review notification (phase 4)
Apr 25, 2026
aaac2e1
feat: add review feedback endpoint (phase 4)
Apr 25, 2026
3fd01a9
feat: add deploy watcher and gitops (phase 5)
Apr 25, 2026
1e25511
chore: update package-lock
Apr 25, 2026
312f1e3
fix: filter service_request records without traceback
Apr 25, 2026
f8baed1
fix: filter empty traceback, regression detection verified
Apr 25, 2026
bfb1251
feat: connect doubao model, fix docker mount, add default repo_url, f…
Apr 25, 2026
924fa47
fix: remove duplicate incident policy, fix repo creation conflict
Apr 26, 2026
7466a60
fix: preserve full traceback and request context in incident payload
Apr 26, 2026
aa330d8
feat: materialize incident payload into PlanItem with full metadata
Apr 26, 2026
cff5631
feat: extend worker dispatch to read incident metadata + github_token
Apr 26, 2026
4900e36
test: add e2e probe script for incident → worker context propagation
Apr 26, 2026
5076ee4
Merge branch 'main' into feature/incident-ingest
xubinrui Apr 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ecommerce-platform/scripts/run_local_reload_stack.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ start_service() {
local port="$3"
local service_name="$4"
shift 4
local extra_env=("$@")
local logfile="$LOG_DIR/$name.log"
local pidfile="$PID_DIR/$name.pid"
if [[ -f "$pidfile" ]] && kill -0 "$(cat "$pidfile")" 2>/dev/null; then
Expand All @@ -89,7 +88,7 @@ start_service() {
fi

local env_prefix=""
for kv in "${extra_env[@]}"; do
for kv in "$@"; do
env_prefix+="export ${kv}; "
done
nohup bash -lc "
Expand Down
191 changes: 191 additions & 0 deletions scripts/probe_incident.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""Manually inject an incident → feed a user message so the Leader calls call_code_worker → inspect the worker bundle."""

from __future__ import annotations

import asyncio
import json
import sys
import time
from pathlib import Path
from unittest.mock import patch

# Make the repository root importable when this script is run directly
# (e.g. `python scripts/probe_incident.py`) rather than as a module.
_ROOT = Path(__file__).resolve().parents[1]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))

from src.app.graph import build_graph
from src.app.state import EventEnvelope


def _incident_task_id(plan_items: list[dict]) -> str:
for item in plan_items:
meta = item.get("metadata") or {}
if meta.get("incident_fingerprint"):
return str(item.get("task_id", "")).strip()
for item in plan_items:
tid = str(item.get("task_id", "")).strip()
if tid.startswith("incident-"):
return tid
return ""


def _noop_real_worker(*_args: object, **_kwargs: object) -> None:
"""不写 Docker:bundle 已在 CallCodeWorkerTool.run 里落盘,此处跳过容器。"""
return


def _find_latest_bundle_json(after_mtime: float | None = None) -> Path | None:
    """Return the most recently modified ``leader-task.json`` under the
    ``.code-terminator/worker-jobs`` tree.

    Args:
        after_mtime: when given, files last modified before this POSIX
            timestamp are ignored.

    Returns:
        Path of the newest matching bundle file, or ``None`` when the tree
        does not exist or nothing matches.
    """
    root = _ROOT / ".code-terminator" / "worker-jobs"
    if not root.is_dir():
        return None
    # Stat each candidate exactly once and remember the mtime: the original
    # re-statted inside the max() key, which could raise an unguarded OSError
    # if a file disappeared between the two passes.
    stamped: list[tuple[float, Path]] = []
    for path in root.rglob("leader-task.json"):
        try:
            mtime = path.stat().st_mtime
        except OSError:
            # File vanished or is unreadable mid-scan; skip it.
            continue
        if after_mtime is not None and mtime < after_mtime:
            continue
        stamped.append((mtime, path))
    if not stamped:
        return None
    return max(stamped, key=lambda pair: pair[0])[1]


def main() -> None:
    """End-to-end probe: inject a synthetic incident into the graph, let the
    Leader materialize a plan item, then ask it to dispatch via
    call_code_worker and verify the on-disk worker bundle content."""
    graph = build_graph()
    # Synthetic order-service failure record shaped like what the real log
    # ingest would capture (traceback, trace id, request context).
    fake_record = {
        "service": "order-service",
        "exception_type": "KeyError",
        "traceback": (
            "Traceback (most recent call last):\n"
            ' File "services/order/service.py", line 88, in _coupon_discount\n'
            " return COUPON_DISCOUNTS[code]\n"
            "KeyError: 'FLASH50'"
        ),
        "trace_id": "trace-probe-001",
        "path": "/api/v1/orders",
        "method": "POST",
        "status_code": 500,
        "error_message": "'FLASH50'",
    }
    fingerprint = "probe-fp-001"
    thread_id = f"incident::{fingerprint}"
    # Minimal graph state carrying a single incident_new event; every other
    # channel starts empty so only the incident-handling path is exercised.
    initial_state = {
        "task": "probe incident",
        "conversation_turns": [],
        "conversation_summary": "",
        "task_units": [],
        "worker_outputs": [],
        "reviewer_outputs": [],
        "final_output": "",
        "errors": [],
        "core_memory": {"workflow": {"thread_id": thread_id}},
        "plan_items": [],
        "event_log": [],
        "current_event": EventEnvelope(
            event_id="evt-probe-001",
            event_type="incident_new",
            payload={
                "fingerprint": fingerprint,
                "thread_id": thread_id,
                "service": fake_record["service"],
                "exception_type": fake_record["exception_type"],
                "traceback": fake_record["traceback"],
                "traceback_summary": fake_record["traceback"][:400],
                "trace_id": fake_record["trace_id"],
                "path": fake_record["path"],
                "method": fake_record["method"],
                "status_code": fake_record["status_code"],
                "error_message": fake_record["error_message"],
                "occurrence_count": 2,
                "wake_reason": "incident_new",
                "sample_record": fake_record,
                "incident_entry": {},
            },
        ).model_dump(),
        "dispatch_queue": [],
    }

    # Phase 1: run the graph on the incident event and show what plan items
    # (and incident metadata) the Leader materialized.
    final = asyncio.run(graph.ainvoke(initial_state))
    print("=== after incident: plan_items ===")
    for item in final["plan_items"]:
        print(item.get("task_id"), "|", item.get("status"))
        meta = item.get("metadata", {})
        print(" metadata keys:", list(meta.keys()))
        print(" fingerprint:", meta.get("incident_fingerprint"))
        print(" service:", meta.get("incident_service"))
        print(" traceback_len:", len(str(meta.get("incident_traceback", ""))))

    task_id = _incident_task_id(final["plan_items"])
    if not task_id:
        print("ERROR: no incident plan task_id; cannot ask for call_code_worker.")
        return

    # Phase 2: feed a user message that instructs the Leader to dispatch the
    # incident task to a worker via the call_code_worker tool.
    dispatch_msg = (
        f"请基于该 task_id={task_id} 调用 call_code_worker 工具派单给 worker,"
        "使用当前计划项中的 details 与 metadata。"
    )
    state2 = {
        **final,
        "current_event": EventEnvelope(
            event_id="evt-probe-dispatch",
            event_type="user_input",
            payload={"message": dispatch_msg},
        ).model_dump(),
    }

    from src.tools.call_code_worker_tool import CallCodeWorkerTool

    # Only consider bundles written after (roughly) this point in time; the
    # 2s slack absorbs filesystem mtime granularity.
    bundle_cutoff = time.time() - 2.0
    # Patch out the real Docker worker: the bundle files are written before
    # that hook runs, which is all this probe needs to inspect.
    with patch.object(
        CallCodeWorkerTool,
        "_run_real_worker_and_emit_hook",
        _noop_real_worker,
    ):
        final2 = asyncio.run(graph.ainvoke(state2))

    print("\n=== after dispatch message: final_output (head) ===")
    out = str(final2.get("final_output", ""))
    print((out[:800] + "…") if len(out) > 800 else out)

    # Prefer a bundle written during this run; fall back to the newest one
    # overall in case of clock skew.
    bundle_json = _find_latest_bundle_json(after_mtime=bundle_cutoff)
    if bundle_json is None:
        bundle_json = _find_latest_bundle_json()

    if bundle_json is None or not bundle_json.is_file():
        print("\nERROR: leader-task.json not found under .code-terminator/worker-jobs/")
        print("Leader may not have invoked call_code_worker; check LLM / API keys.")
        return

    md_path = bundle_json.with_suffix(".md")
    print(f"\n=== bundle dir ===\n{bundle_json.parent}\n")
    print(f"leader-task.json: {bundle_json}")
    print(f"leader-task.md: {md_path}")

    # Phase 3: verify the incident context survived into the JSON bundle.
    data = json.loads(bundle_json.read_text(encoding="utf-8"))
    ic = data.get("incident_context") or {}
    print("\n=== leader-task.json incident_context ===")
    if not ic:
        print("(empty or missing)")
    else:
        for k in sorted(ic.keys()):
            v = ic[k]
            # Truncate long values for display only.
            preview = str(v) if len(str(v)) <= 120 else str(v)[:117] + "..."
            print(f" {k}: {preview}")
        tb = str(ic.get("traceback", ""))
        print(f"\n traceback full length: {len(tb)} chars")

    # Phase 4: verify the markdown bundle carries an Incident Context section.
    md_text = md_path.read_text(encoding="utf-8") if md_path.is_file() else ""
    print("\n=== leader-task.md: Incident Context section ===")
    if "## Incident Context" in md_text:
        start = md_text.index("## Incident Context")
        end = md_text.find("\n## ", start + 1)
        chunk = md_text[start:] if end == -1 else md_text[start:end]
        print(chunk.strip())
    else:
        print("MISSING: no '## Incident Context' heading in leader-task.md")


# Entry point when invoked as a standalone script.
if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion src/agents/leader_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@
"or execution work is needed. If you are fully ready to reply to the user, "
"do not call any tool and answer in markdown directly."
),
"strict": True,
"strict": False,
"parameters": {
"type": "object",
"additionalProperties": False,
Expand Down
21 changes: 18 additions & 3 deletions src/agents/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
"properties": {
"repo_url": {"type": "string"},
"collaboration_target": {"type": "string"},
"branch_name": {"type": "string"},
"commit_sha": {"type": "string"},
"pr_url": {"type": "string"},
"base_branch": {"type": "string"},
"feishu_message_hint": {"type": "string"},
},
"required": [],
},
Expand All @@ -73,7 +78,7 @@ class WorkerCodexConfig:
docker_image: str
container_workspace: str
codex_bin: str
host_node_root: str
host_node_root: str | None
container_host_node_root: str
model: str
timeout_seconds: int
Expand Down Expand Up @@ -546,8 +551,8 @@ def execute_leader_assignment(
"run",
"--rm",
"-i",
"-v",
f"{host_job_dir}:{container_workspace}",
"--mount",
f"type=bind,source={host_job_dir},target={container_workspace}",
"-w",
container_workspace,
"--entrypoint",
Expand Down Expand Up @@ -853,6 +858,16 @@ def _normalize_worker_workflow_updates(value: Any) -> dict[str, str]:
normalized["repo_url"] = repo_url
if collaboration_target:
normalized["collaboration_target"] = collaboration_target
for key in (
"branch_name",
"commit_sha",
"pr_url",
"base_branch",
"feishu_message_hint",
):
val = str(value.get(key, "")).strip()
if val:
normalized[key] = val
return normalized


Expand Down
2 changes: 2 additions & 0 deletions src/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from src.api.routes.health import router as health_router
from src.api.routes.history import router as history_router
from src.api.routes.settings import router as settings_router
from src.api.routes.agent_control import router as agent_control_router
from src.observability import setup_logging


Expand Down Expand Up @@ -39,6 +40,7 @@ async def _shutdown() -> None:
app.include_router(chat_router, prefix="/api")
app.include_router(history_router, prefix="/api")
app.include_router(settings_router, prefix="/api")
app.include_router(agent_control_router, prefix="/api")
return app


Expand Down
103 changes: 103 additions & 0 deletions src/api/routes/agent_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from __future__ import annotations

from collections import Counter
from typing import Any

from fastapi import APIRouter, Depends

from src.api.deps import get_runtime_service
from src.api.services.runtime_service import RuntimeService
from src.app.incident_registry import all_entries, get, set_status
from src.app.incidents import _load_offset, _LOG_FILE

router = APIRouter(prefix="/agent", tags=["agent"])


@router.get("/health")
def agent_health(service: RuntimeService = Depends(get_runtime_service)) -> dict[str, Any]:
    """Summarize agent-side health: ingest log state, incident counts, and
    the leader/planner role status."""
    entries = all_entries()
    # Resolve the leader status with a single lookup; getattr degrades to
    # "unknown" for a missing entry or any value lacking a .status attribute
    # (the original did two .get() calls plus a hasattr probe).
    planner_active = getattr(service._role_status.get("leader"), "status", "unknown")
    return {
        # NOTE(review): reported unconditionally today; should reflect the
        # actual ingest enablement flag (CODE_TERMINATOR_AGENT_ENABLE_INGEST)
        # and whether the ingest task is running — TODO confirm.
        "ingest_enabled": True,
        "log_file": str(_LOG_FILE),
        "log_file_exists": _LOG_FILE.exists(),
        "last_log_offset": _load_offset(),
        "incident_total": len(entries),
        "incident_by_status": _count_by_status(entries),
        "planner_active": planner_active,
    }


@router.get("/incidents")
def list_incidents() -> dict[str, Any]:
    """Return every registered incident together with a total count."""
    snapshot = all_entries()
    return {"total": len(snapshot), "incidents": snapshot}


@router.post("/incidents")
def incident_action(body: dict[str, Any]) -> dict[str, Any]:
    """Apply an admin action to the incident registry.

    Supported actions:
        - action=suppress, fingerprint=xxx → suppress the incident
        - action=resolve,  fingerprint=xxx → manually mark it resolved
        - action=rescan                    → force a rescan (clear the log offset)

    Returns an ``{"ok": bool, ...}`` payload; unknown actions yield ok=False.
    """
    action = str(body.get("action", "")).strip()
    fingerprint = str(body.get("fingerprint", "")).strip()

    if action == "suppress" and fingerprint:
        set_status(fingerprint, "suppressed")
        return {"ok": True, "action": "suppress", "fingerprint": fingerprint}

    if action == "resolve" and fingerprint:
        set_status(fingerprint, "resolved")
        return {"ok": True, "action": "resolve", "fingerprint": fingerprint}

    if action == "rescan":
        # Imported lazily so unrelated actions never touch the ingest module.
        from src.app.incidents import _OFFSET_FILE

        # missing_ok avoids the exists()/unlink() TOCTOU race if the ingest
        # loop removes or rewrites the offset file concurrently.
        _OFFSET_FILE.unlink(missing_ok=True)
        return {
            "ok": True,
            "action": "rescan",
            "message": "offset cleared, next ingest cycle will rescan",
        }

    return {"ok": False, "error": f"unknown action: {action}"}


@router.post("/review/feedback")
def review_feedback(body: dict[str, Any]) -> dict[str, Any]:
    """Accept admin review feedback and forward it to the review bridge.

    Expected body shapes:
        {"action": "approve",  "incident_id": "2380f29e"}
        {"action": "reject",   "incident_id": "2380f29e", "reason": "..."}
        {"action": "suppress", "incident_id": "2380f29e"}
    """
    from src.app.review_bridge import handle_admin_feedback

    fields = {key: str(body.get(key, "")).strip() for key in ("action", "incident_id", "reason")}

    if not (fields["action"] and fields["incident_id"]):
        return {"ok": False, "error": "action 和 incident_id 都是必填项"}

    return handle_admin_feedback(
        action=fields["action"],
        incident_id=fields["incident_id"],
        reason=fields["reason"],
    )


def _count_by_status(entries: list[dict[str, Any]]) -> dict[str, int]:
counts: dict[str, int] = {}
for entry in entries:
status = str(entry.get("status", "unknown"))
counts[status] = counts.get(status, 0) + 1
return counts
Loading
Loading