Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 148 additions & 6 deletions src/app/graph.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from __future__ import annotations

import asyncio
import json
from datetime import UTC, datetime
from typing import Any, TypedDict
from uuid import uuid4

from langgraph.graph import END, START, StateGraph

Expand All @@ -13,6 +16,8 @@
from src.memory.types import ArchivalRecord
from src.memory.working_memory import WorkingMemory
from src.observability import get_logger, sanitize_text
from src.app.runtime_event_bus import RuntimeEventBus
from src.tools.call_code_worker_tool import CallCodeWorkerTool

logger = get_logger(__name__)

Expand Down Expand Up @@ -119,6 +124,8 @@ def leader_node(state: RuntimeState) -> RuntimeState:

if existing is None:
incident_task_id = f"incident-{(fingerprint or 'unknown')[:12]}"
target_task_id = incident_task_id
should_dispatch = True
details_text = "\n".join(
[
f"# Incident {incident_status.upper()}",
Expand Down Expand Up @@ -166,19 +173,74 @@ def leader_node(state: RuntimeState) -> RuntimeState:
int(existing.metadata.get("regression_count", 0) or 0) + 1
)
existing.metadata["regression_count"] = regression_count
should_dispatch = existing.status in ("completed", "failed")
if existing.status in ("completed", "failed"):
existing.status = "pending"
target_task_id = existing.task_id
leader_hint = (
f"复发提醒: incident task_id={existing.task_id} 再次触发 "
f"(regression_count={regression_count})。最新 traceback "
f"与日志已更新到 metadata。请重新评估是否需要再次派单。"
)

plan_items, produced_event = leader.on_user_message(
message=leader_hint,
plan_items=plan_items,
conversation_turns=conversation_turns,
conversation_summary=state.get("conversation_summary", ""),
if _has_active_code_worker_call(
core_memory=state["core_memory"],
task_id=target_task_id,
):
should_dispatch = False

if should_dispatch:
dispatch_result = _dispatch_incident_code_worker(
core_memory=state["core_memory"],
thread_id=thread_id,
task_id=target_task_id,
plan_items=plan_items,
)
if dispatch_result.get("ok"):
for item in plan_items:
if item.task_id == target_task_id:
item.status = "in_progress"
break
_append_graph_activity(
state["core_memory"],
thread_id=thread_id,
message=(
f"Incident {target_task_id} has been dispatched to "
"the code worker."
),
kind="success",
)
else:
state["errors"].append(
f"incident code worker dispatch failed: {dispatch_result}"
)
_append_graph_activity(
state["core_memory"],
thread_id=thread_id,
message=(
f"Incident {target_task_id} code worker dispatch failed."
),
kind="error",
)
else:
dispatch_result = {
"ok": True,
"event": "code_worker_already_active",
"task_id": target_task_id,
"status": "in_progress",
}
_append_graph_activity(
state["core_memory"],
thread_id=thread_id,
message=(
f"Incident {target_task_id} already has an active code worker."
),
kind="info",
)
produced_event = EventEnvelope(
event_id=f"{current_event.event_id}-dispatch",
event_type="system",
payload=dispatch_result,
)
conversation_turns.append(
ConversationTurn(role="user", content=leader_hint)
Expand Down Expand Up @@ -307,7 +369,7 @@ def aggregate_node(state: RuntimeState) -> RuntimeState:
chat_response = ""
if isinstance(workflow, dict):
chat_response = str(workflow.get("chat_response", "")).strip()
if chat_response:
if chat_response and not state.get("worker_outputs") and not state.get("reviewer_outputs"):
turns = [ConversationTurn.model_validate(item) for item in state.get("conversation_turns", [])]
turns.append(ConversationTurn(role="assistant", content=chat_response))
state["conversation_turns"] = [turn.model_dump() for turn in turns[-24:]]
Expand Down Expand Up @@ -336,3 +398,83 @@ def build_graph(*, checkpointer: Any | None = None, store: Any | None = None):
graph_builder.add_edge("dispatch_parallel", "aggregate")
graph_builder.add_edge("aggregate", END)
return graph_builder.compile(checkpointer=checkpointer, store=store)


def _dispatch_incident_code_worker(
    *,
    core_memory: dict[str, Any],
    thread_id: str,
    task_id: str,
    plan_items: list[PlanItem],
) -> dict[str, Any]:
    """Invoke the code-worker tool for *task_id* and return its parsed result.

    Args:
        core_memory: Shared runtime memory passed through to the tool.
        thread_id: Conversation thread used for event routing.
        task_id: Incident task being dispatched.
        plan_items: Current plan, serialized for the tool call.

    Returns:
        The tool's JSON payload as a dict on success, otherwise an error
        dict carrying the raw tool output under ``"raw"``.
    """
    tool = CallCodeWorkerTool()
    raw = tool.run(
        core_memory=core_memory,
        thread_id=thread_id,
        task_id=task_id,
        plan_items=[item.model_dump() for item in plan_items],
    )
    # The tool contract is a JSON-encoded object. Only JSON/typing failures
    # are expected here; catching bare Exception (as before) would also hide
    # unrelated bugs, so narrow to the errors json.loads can actually raise.
    try:
        payload = json.loads(raw)
    except (ValueError, TypeError):
        payload = None
    if isinstance(payload, dict):
        return payload
    return {"ok": False, "error": "invalid_call_code_worker_result", "raw": raw}


def _has_active_code_worker_call(
*,
core_memory: dict[str, Any],
task_id: str,
) -> bool:
workflow = core_memory.get("workflow", {})
if not isinstance(workflow, dict):
return False
calls = workflow.get("code_worker_calls", [])
if not isinstance(calls, list):
return False
for call in calls:
if not isinstance(call, dict):
continue
if str(call.get("task_id", "")).strip() != task_id:
continue
if str(call.get("status", "")).strip() == "in_progress":
return True
return False


def _append_graph_activity(
core_memory: dict[str, Any],
*,
thread_id: str,
message: str,
kind: str = "info",
) -> None:
cleaned = " ".join(message.strip().split())
if not cleaned:
return
workflow = core_memory.setdefault("workflow", {})
if not isinstance(workflow, dict):
workflow = {}
core_memory["workflow"] = workflow
entries = workflow.setdefault("activity_log", [])
if not isinstance(entries, list):
entries = []
workflow["activity_log"] = entries
entry = {
"entry_id": f"log-{uuid4().hex[:10]}",
"message": cleaned,
"kind": kind if kind in {"info", "success", "warning", "error"} else "info",
"created_at": datetime.now(UTC).isoformat(timespec="seconds"),
}
entries.append(entry)
workflow["activity_log"] = entries[-60:]
if thread_id:
RuntimeEventBus.push(
thread_id,
{
"event_type": "log",
"payload": entry,
},
)
14 changes: 12 additions & 2 deletions src/tools/call_code_worker_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,10 @@ def _resolve_assignment(
details = target_item.details.strip()
parsed_details = self._parse_detail_payload(details)
job_root = self._resolve_job_root(workflow=workflow)
thread_stem = thread_id or "thread-unset"
job_directory = job_root / thread_stem / f"{task_id}__{subworker_id}"
thread_stem = self._safe_path_segment(thread_id or "thread-unset")
task_stem = self._safe_path_segment(task_id or "task-unset")
subworker_stem = self._safe_path_segment(subworker_id or "subworker-unset")
job_directory = job_root / thread_stem / f"{task_stem}__{subworker_stem}"
job_directory.mkdir(parents=True, exist_ok=True)

work_content = parsed_details.get("work_content", "").strip()
Expand Down Expand Up @@ -298,6 +300,14 @@ def _resolve_job_root(*, workflow: dict[str, Any]) -> Path:
return root
return (Path.cwd() / ".code-terminator" / "worker-jobs").resolve()

@staticmethod
def _safe_path_segment(value: str) -> str:
cleaned = "".join(
char if char.isalnum() or char in {"-", "_", "."} else "-"
for char in value.strip()
).strip(".-_")
return cleaned or "unset"

@staticmethod
def _parse_detail_payload(details: str) -> dict[str, str]:
if not details:
Expand Down
64 changes: 60 additions & 4 deletions tests/test_graph_smoke.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,70 @@
import asyncio
from unittest.mock import patch

from src.main import run


def _set_plan_step() -> dict:
return {
"thought": "create deterministic smoke plan",
"is_final": False,
"final_reply": "",
"workflow_updates": {"repo_url": "https://github.com/acme/demo-repo"},
"action": {
"name": "list_plan_set",
"arguments": {
"tasks": [
{
"content": "Implement smoke change",
"details": "repo_url=https://github.com/acme/demo-repo",
"assignee": "worker",
},
{
"content": "Review smoke change",
"details": "review implementation result",
"assignee": "reviewer",
},
],
"task_id": "",
"content": "",
"details": "",
"assignee": "",
},
},
}


def _finish_step() -> dict:
return {
"thought": "done",
"is_final": True,
"final_reply": "Plan ready.",
"workflow_updates": {},
"action": {
"name": "finish",
"arguments": {
"tasks": [],
"task_id": "",
"content": "",
"details": "",
"assignee": "",
},
},
}


def test_graph_smoke() -> None:
result = asyncio.run(
run(
"任务目标:在现有仓库执行(repo_url=https://github.com/acme/demo-repo,new_repo=false)。请建立 issue 与 PR 维护计划。"
steps = iter([_set_plan_step(), _finish_step()])
with patch(
"src.agents.leader_events.LeaderEventKernel._llm_react_step",
side_effect=lambda **_: next(steps),
):
result = asyncio.run(
run(
"Task goal: work in existing repo "
"(repo_url=https://github.com/acme/demo-repo, new_repo=false)."
)
)
)
assert "Worker Results" in result["final_output"]
assert "Reviewer Results" in result["final_output"]
assert len(result["worker_outputs"]) >= 1
Expand Down
Loading
Loading