feat(incident): full incident pipeline from log ingest to worker dispatch#4
Conversation
- create dedicated incident PlanItem with structured details + metadata - write full traceback/path/method/trace_id into metadata - dedupe by fingerprint: regressed incidents update existing item, bump regression_count, reset status to pending if needed - pass concise hint to leader.on_user_message so LLM still decides whether to dispatch via call_code_worker
- _parse_detail_payload allows incident_* fields from PlanItem.details JSON - _resolve_assignment reads incident context from PlanItem.metadata first - load_github_token from runtime settings (with env var fallback) - assignment dict now carries incident_context and github_token for downstream
scripts/probe_incident.py 直接构造一条 incident_new 事件, 跑完 leader_node 后检查 PlanItem.metadata 是否填全 12 个 incident 字段, 确认 leader-task.json/md 里 incident_context 落盘正确。 测试结果: - 12 metadata keys 全部到位 - traceback 完整保留 (无截断) - github_token 不落盘 (内存传递) - leader 通过 LLM 自主调用 call_code_worker 派单
There was a problem hiding this comment.
Pull request overview
This PR implements an end-to-end “incident pipeline” that ingests exception logs, fingerprints/dedupes incidents, materializes incident-aware plan items (with rich metadata), dispatches work to the code worker, and adds review + deployment watcher loops plus control APIs.
Changes:
- Add log tail ingestion (offset persisted), fingerprinting, registry/state transitions, and wakeup/dedupe logic for incident events.
- Extend planning/runtime to carry incident metadata through
PlanItem.metadataand into worker dispatch artifacts. - Add operational loops and endpoints for review notifications/feedback and approved-change deployment watching.
Reviewed changes
Copilot reviewed 18 out of 19 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| web/package-lock.json | Lockfile updates for web dependencies. |
| src/tools/call_code_worker_tool.py | Worker dispatch bundle extended to include incident context; workflow updates captured. |
| src/prompts/templates/leader.md | Leader policy updated to enforce a fixed collaboration repo and incident-handling behavior. |
| src/app/state.py | Add incident event types and PlanItem.metadata. |
| src/app/review_bridge.py | Feishu notification sender + admin feedback handler for incident review gating. |
| src/app/incidents.py | Incremental JSONL log tailer with persisted offset. |
| src/app/incident_wakeup.py | Sliding-window dedupe + new/regression wakeup payload builder. |
| src/app/incident_registry.py | JSON-backed incident registry with status/occurrence tracking. |
| src/app/incident_fingerprinter.py | 7-dimension fingerprint generator. |
| src/app/graph.py | Incident events materialize/update plan items with full metadata. |
| src/app/gitops.py | Git fetch/pull helpers for deploy watcher. |
| src/app/deploy_watcher.py | Approved-incident deploy loop: pull, reload, health check, verify window, resolve. |
| src/api/services/runtime_service.py | Background tasks for ingest/deploy watcher + auto-review notification on worker completion. |
| src/api/routes/agent_control.py | New /api/agent/* endpoints for health/incidents/actions/review feedback. |
| src/api/app.py | Router wiring for agent control endpoints. |
| src/agents/worker.py | Worker workflow update schema expanded; docker bind mount adjustments; runtime token passthrough behavior. |
| src/agents/leader_events.py | Tool schema strictness adjusted for leader tool-calling. |
| scripts/probe_incident.py | E2E probe script for incident→plan→dispatch→bundle validation. |
| ecommerce-platform/scripts/run_local_reload_stack.sh | Fix env var forwarding loop in reload script. |
Files not reviewed (1)
- web/package-lock.json: Language not supported
Comments suppressed due to low confidence (1)
src/api/services/runtime_service.py:105
stop_background_tasks()only cancels_hook_pump_taskand returns early if it’s done/None, leaving_ingest_taskand_deploy_watcher_taskrunning. This can leak background loops across shutdown/restart and cause duplicate ingestion/deploy attempts. Extend shutdown to always cancel/await all background tasks, and avoid returning before handling the new tasks.
async def stop_background_tasks(self) -> None:
task = self._hook_pump_task
self._hook_pump_task = None
if task is None or task.done():
return
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def _load() -> dict[str, Any]: | ||
| if not _REGISTRY_FILE.exists(): | ||
| return {} | ||
| try: | ||
| return json.loads(_REGISTRY_FILE.read_text(encoding="utf-8")) | ||
| except Exception: | ||
| return {} | ||
|
|
||
|
|
||
| def _save(registry: dict[str, Any]) -> None: | ||
| _REGISTRY_FILE.parent.mkdir(parents=True, exist_ok=True) | ||
| _REGISTRY_FILE.write_text( | ||
| json.dumps(registry, ensure_ascii=False, indent=2), | ||
| encoding="utf-8", | ||
| ) |
There was a problem hiding this comment.
The registry is persisted via read-modify-write on a single JSON file (_load() then _save()), but there’s no locking/atomic update. With multiple background loops/endpoints updating incidents concurrently (ingest, deploy_watcher, admin feedback), this can drop updates or corrupt the file. Consider adding a file lock (fcntl/portalocker), writing atomically via temp+rename, or centralizing updates in a single task with in-memory state.
| exception_type = str(log_record.get("exception_type", "")).strip() | ||
| path = _normalize_path(str(log_record.get("path", ""))) | ||
| traceback_text = str(log_record.get("traceback", "")) | ||
| error_msg = _normalize_message(str(log_record.get("error", ""))) |
There was a problem hiding this comment.
generate_fingerprint() normalizes log_record.get("error"), but the rest of the incident pipeline (e.g., _build_payload in incident_wakeup.py) expects error_message / message. If logs don’t have an error field (as in the probe payload), the fingerprint will miss an important dimension and may collide more often than intended. Consider falling back to error_message/message when error is empty.
| error_msg = _normalize_message(str(log_record.get("error", ""))) | |
| raw_error_msg = ( | |
| log_record.get("error") | |
| or log_record.get("error_message") | |
| or log_record.get("message") | |
| or "" | |
| ) | |
| error_msg = _normalize_message(str(raw_error_msg)) |
| with _LOG_FILE.open("r", encoding="utf-8", errors="replace") as f: | ||
| f.seek(offset) | ||
| for line in f: | ||
| new_offset += len(line.encode("utf-8")) | ||
| line = line.strip() |
There was a problem hiding this comment.
Offset persistence is tracking byte counts (len(line.encode('utf-8'))) but the file is opened in text mode and repositioned via f.seek(offset). In text mode, seek()/tell() semantics are not guaranteed to match raw byte offsets (and UTF-8 multibyte/newline translation can drift), which can cause skipped/duplicated records. Use binary mode with byte offsets, or persist f.tell() values from the same file handle mode.
| with _LOG_FILE.open("r", encoding="utf-8", errors="replace") as f: | |
| f.seek(offset) | |
| for line in f: | |
| new_offset += len(line.encode("utf-8")) | |
| line = line.strip() | |
| with _LOG_FILE.open("rb") as f: | |
| f.seek(offset) | |
| for raw_line in f: | |
| new_offset = f.tell() | |
| line = raw_line.decode("utf-8", errors="replace").strip() |
| FEISHU_WEBHOOK_URL = os.getenv( | ||
| "FEISHU_WEBHOOK_URL", | ||
| "https://open.feishu.cn/open-apis/bot/v2/hook/3c3066b5-f4a7-4b77-a3dd-7c76b98661ad", | ||
| ) |
There was a problem hiding this comment.
FEISHU_WEBHOOK_URL falls back to a concrete webhook URL in source control. This is effectively a secret/credential and also makes non-prod deployments accidentally post to the same bot. Please remove the hard-coded default and require the env var (or load it from a secret manager / runtime settings).
| # 第一步:git fetch + pull | ||
| git_fetch() | ||
| pull_result = git_pull() | ||
|
|
There was a problem hiding this comment.
_handle_approved() calls git_pull() without specifying a branch, so it will always pull whatever git_pull()’s default is (currently a hard-coded feature branch). For an approval/deploy pipeline this should be explicit and derived from the approved PR/base branch (or configuration), otherwise you can deploy the wrong code even after approval.
| # 只处理这两类事件 | ||
| _INGEST_EVENTS = {"service_exception"} | ||
| _INGEST_STATUS_THRESHOLD = 500 |
There was a problem hiding this comment.
The comment says “只处理这两类事件”, but _INGEST_EVENTS only contains service_exception and is unused (the logic below also handles service_request). Either remove _INGEST_EVENTS or use it to drive the filter so the configuration/comment stays consistent.
| @@ -224,8 +262,31 @@ def _resolve_assignment( | |||
| "local_repo_path": local_repo_path, | |||
| "work_content": work_content, | |||
| "acceptance_criteria": acceptance_criteria, | |||
| "incident_context": incident_context, | |||
| "github_token": github_token, | |||
There was a problem hiding this comment.
_resolve_assignment() now loads and returns github_token, but nothing in this module consumes it (it’s not written to the bundle and isn’t passed into the worker execution path). Either wire it through to the worker invocation (while keeping it out of persisted artifacts) or remove it to avoid a false sense that GitHub auth is being provided.
| try: | ||
| response = httpx.post( | ||
| FEISHU_WEBHOOK_URL, | ||
| json=payload, | ||
| timeout=10.0, | ||
| ) |
There was a problem hiding this comment.
send_review_notification() uses the synchronous httpx.post(...). This function is invoked from async code (RuntimeService._dispatch_hook_event), so it can block the event loop while waiting on the webhook. Consider switching to httpx.AsyncClient (awaited) or running the sync request in a thread executor to avoid stalling hook processing.
| if str(payload.get("status")) == "completed": | ||
| try: | ||
| details_raw = payload.get("details", "{}") | ||
| details = json.loads(details_raw) if isinstance(details_raw, str) else {} | ||
| workflow_updates = details.get("workflow_updates", {}) | ||
| if thread_id.startswith("incident::"): | ||
| fingerprint = thread_id.replace("incident::", "") | ||
| from src.app.incident_registry import get as get_incident | ||
| from src.app.review_bridge import send_review_notification | ||
|
|
||
| entry = get_incident(fingerprint) | ||
| if entry and entry.get("status") not in ("resolved", "suppressed"): | ||
| send_review_notification( | ||
| fingerprint=fingerprint, | ||
| service=entry.get("service", ""), | ||
| exception_type=entry.get("exception_type", ""), | ||
| traceback_summary=entry.get("sample_traceback", ""), | ||
| branch_name=str(workflow_updates.get("branch_name", "")), | ||
| commit_sha=str(workflow_updates.get("commit_sha", "")), | ||
| pr_url=str(workflow_updates.get("pr_url", "")), | ||
| ) |
There was a problem hiding this comment.
This code path calls send_review_notification() inline on the hook-pump task when a worker completes. Since send_review_notification() is synchronous and does network I/O, it can delay/serialize hook processing for all threads. Consider making the notification async or dispatching it via asyncio.create_task / executor so hook delivery stays responsive, and handle/report notification failures separately from hook handling.
| def git_pull(branch: str = "feature/incident-ingest") -> dict[str, Any]: | ||
| before_sha = _current_sha() | ||
| try: | ||
| result = subprocess.run( | ||
| ["git", "pull", "--ff-only", "origin", branch], | ||
| cwd=_REPO_ROOT, |
There was a problem hiding this comment.
git_pull() defaults to pulling origin feature/incident-ingest. That makes the deploy path branch-dependent and likely wrong once fixes land on the default branch (e.g., main). Prefer defaulting to the repo’s configured default branch, reading the target branch from runtime/incident metadata, or making it an explicit required argument/env var so deployments don’t silently track an outdated feature branch.
背景
实现任务书 Phase 1-5 的完整 incident 主线,加上 worker 任务质量要求——把仓库地址、log 信息、完整 traceback、GitHub 认证信息全部送达 worker。
改动概览
19 个文件,+1455 / -31。
Phase 1:日志接入 + fingerprint + 唤醒去重
src/app/incidents.py— 增量 tail jsonl,持久化 offsetsrc/app/incident_fingerprinter.py— 7 维 fingerprint 生成src/app/incident_registry.py— incident 状态机src/app/incident_wakeup.py— 滑动窗口去重,新错误 / 复发判定src/app/state.py— EventType 扩展incident_new/incident_regressed,PlanItem 增加metadata字段Phase 3-5:worker schema / 飞书 review / 自动 deploy
src/agents/worker.py—workflow_updates增加 branch / commit / pr 字段src/app/review_bridge.py— 飞书 CLI 通知 + 反馈轮询src/app/deploy_watcher.py+src/app/gitops.py— PR / check 监听 +git pull --ff-only+ 健康检查 + 复发观察窗口src/api/routes/agent_control.py—/api/agent/*端点src/api/services/runtime_service.py— incident ingest loop + planner wake scheduler + feedback poller + deploy watcher 后台任务数据完整性修复(本周末重点)
src/app/incident_wakeup.py的_build_payload— 不再截断 traceback,新增 path / method / error_message / sample_recordsrc/app/graph.py— incident 分支不再压缩成字符串走 LLM,直接物化为带完整 metadata 的 PlanItem;同 fingerprint 复发场景更新现有项,bump regression_countsrc/tools/call_code_worker_tool.py— 从 PlanItem.metadata 读 incident 上下文,从 runtime settings 读github_token,组装到 worker bundle:json 落盘 + md 可读;token 仅内存传递,不落盘验证
运行 e2e 探针脚本:
uv run python scripts/probe_incident.py输出显示 12 个 incident metadata 字段全填,traceback 完整保留(无截断),PlanItem 状态从 pending 推进到 in_progress(LLM 自主调用
call_code_worker)。确认 token 不落盘:
grep -r "ghp_\|github_token" .code-terminator/worker-jobs/零输出。
测试套件:13 passed。
uv run pytest tests/test_api_integration.py tests/test_hook_pump.py tests/test_main_runtime.py tests/test_list_plan_tool.py已知未覆盖
test_graph_smoke.py与 chat_response 模式不兼容,与本 PR 无关test_worker_runtime_config.py4 个 fail 来自 4/25 docker mount / proxy 修复在 Mac 上的预期偏差,与本 PR 无关