diff --git a/skills/storm/orchestrator/README.md b/skills/storm/orchestrator/README.md new file mode 100644 index 00000000000..6676dba6a7a --- /dev/null +++ b/skills/storm/orchestrator/README.md @@ -0,0 +1,9 @@ +# Orchestrator + +Long-running task automation framework for OpenClaw agents. + +Turn blocking operations into background daemons with instant response + completion callbacks. + +**Core idea:** Agent responds in 0.1s, loop runs independently, wakes agent when done. + +See [SKILL.md](./SKILL.md) for usage. diff --git a/skills/storm/orchestrator/SKILL.md b/skills/storm/orchestrator/SKILL.md new file mode 100644 index 00000000000..2059ef16259 --- /dev/null +++ b/skills/storm/orchestrator/SKILL.md @@ -0,0 +1,421 @@ +--- +name: orchestrator +description: "Long-running task automation framework. Use when: (1) tasks take >30s, (2) multi-step workflows with dependencies, (3) parallel execution needed, (4) you want instant response while work happens in background. NOT for: quick edits (<10s), single-shot commands, or simple tool calls." +metadata: + openclaw: + emoji: "πŸ”" + requires: + anyBins: ["python3"] +--- + +# Orchestrator + +Turn blocking operations into background daemons with instant response + completion callbacks. + +## The Problem + +Agents block on long tasks. You wait. Can't do anything else. + +**Before:** +``` +You: "Analyze this large dataset" +Agent: *analyzing... analyzing... analyzing...* (5 minutes) +You: 😴 *can't ask anything else* +``` + +**After:** +``` +You: "Analyze this large dataset" +Agent: "βœ… Started loop-001" (0.1 seconds) +You: "What's the weather?" *continues working* +Agent: "Sunny, 72Β°F" +Loop: *analyzing in background...* +Agent: "πŸ”” Analysis complete! [results]" +``` + +## Quick Start + +### 1. 
Create a loop + +```bash +cd ~/clawd +python3 orchestrator/create_loop.py \ + --id data-analysis \ + --mission "Process multiple datasets in parallel" \ + --tasks '[ + {"id": "t1", "name": "Analyze dataset A", "worker": "claude", "command": "Summarize trends in dataset A"}, + {"id": "t2", "name": "Analyze dataset B", "worker": "claude", "command": "Summarize trends in dataset B"} + ]' +``` + +This creates `~/clawd/loops/data-analysis/` with: +- `manifest.json` β€” loop state + config +- `inbox.jsonl` β€” commands from you β†’ loop +- `outbox.jsonl` β€” events from loop β†’ you +- `context.md` β€” mission + context layers + +### 2. Start the loop + +```bash +python3 orchestrator/launcher.py start data-analysis +``` + +Loop runs as independent daemon. You can close terminal, agent can restart, loop keeps running. + +### 3. Monitor progress + +```bash +# Check status +python3 orchestrator/launcher.py status data-analysis + +# Read manifest +cat ~/clawd/loops/data-analysis/manifest.json | jq '.status, .tasks[].status' + +# Watch outbox (live feed) +tail -f ~/clawd/loops/data-analysis/outbox.jsonl +``` + +### 4. Add tasks dynamically + +```bash +echo '{"type": "ADD_TASK", "task": {"id": "t3", "name": "Check API status", "worker": "shell", "command": "curl https://api.example.com/health"}}' \ + >> ~/clawd/loops/data-analysis/inbox.jsonl +``` + +### 5. Stop the loop + +```bash +python3 orchestrator/launcher.py stop data-analysis +``` + +## Core Concepts + +### Protocol: JSONL Files + +Communication happens via append-only JSONL files. Simple, debuggable, crash-safe. 
+ +**Inbox** (you β†’ loop): +- `START` β€” begin execution +- `ADD_TASK` β€” add new task +- `CANCEL` β€” cancel a task +- `DECIDE` β€” answer a NEED_DECISION question +- `SHUTDOWN` β€” graceful stop + +**Outbox** (loop β†’ you): +- `TASK_STARTED` β€” task began +- `TASK_DONE` β€” task completed (success/fail) +- `PROGRESS` β€” update from running task +- `NEED_DECISION` β€” loop needs your input +- `ERROR` β€” something broke +- `ALL_DONE` β€” all tasks complete + +### Workers + +Four types of workers execute tasks: + +| Worker | Use For | Example | +|--------|---------|---------| +| `shell` | Shell commands, scripts | `docker system df`, `git status` | +| `opencode` | Code editing, builds | `opencode run "Fix bug #123"` | +| `claude` | Analysis, research, writing | "Summarize this dataset" | +| `python` | Inline Python scripts | Quick calculations, JSON processing | + +Each worker runs in a subprocess with timeout + retry support. + +### Context Layers + +Context flows through 4 layers (L0 β†’ L3): + +| Layer | Purpose | Example | +|-------|---------|---------| +| **L0** | Project overview | "Data Processing Pipeline" | +| **L1** | Loop mission | "Build 8-deliverable pipeline" | +| **L2** | Progress log | Auto-accumulated from completed tasks | +| **L3** | Task-specific | Individual task instructions | + +**L2 auto-accumulation:** +Each completed task adds: +- Task name + status +- Key outputs (stdout snippets, file changes) +- Decisions made + +Later tasks see earlier results automatically β€” no manual context passing. + +### Sub-Agents (auto_judge) + +When `auto_judge: true`, loop delegates decisions to sub-agents before escalating to you: + +| Role | Model | Purpose | +|------|-------|---------| +| **JUDGE** | sonnet | Simple yes/no decisions | +| **LOG_ANALYST** | gemini | Parse logs, extract errors | +| **REVIEWER** | sonnet | Code review, validation | +| **REPORTER** | gemini | Summarize results | + +**Decision flow:** +1. 
Task encounters `NEED_DECISION` +2. Loop asks JUDGE sub-agent first +3. If JUDGE is confident β†’ proceed +4. If JUDGE is uncertain β†’ escalate to you + +Saves your attention for hard problems. + +## Advanced Features + +### Dependencies (DAG Execution) + +Tasks can depend on other tasks. Loop builds a dependency graph and executes in order. + +```yaml +tasks: + - id: fetch + name: "Fetch data" + worker: shell + command: "curl https://api.example.com/data > data.json" + + - id: analyze + name: "Analyze data" + worker: python + command: | + import json + data = json.load(open('data.json')) + print(f"Found {len(data)} records") + depends_on: + - fetch # Waits for 'fetch' to complete + + - id: report + name: "Generate report" + worker: claude + command: "Summarize the analysis results" + depends_on: + - analyze +``` + +**Parallel execution:** Tasks without dependencies run concurrently (up to `max_concurrent_workers`). + +### Validation + +Tasks can validate their own output: + +```yaml +- id: build + name: "Build Docker image" + worker: shell + command: "docker build -t myapp ." + validate: + worker: shell + command: "docker images myapp:latest --format '{{.Size}}' | grep -v '0B'" +``` + +If validation fails, loop marks task as failed and optionally retries. + +### Retry & Timeout + +```yaml +- id: flaky-api + name: "Call flaky API" + worker: shell + command: "curl https://flaky.example.com/data" + timeout: 30 # Seconds + retry: 3 # Attempts + on_fail: warn # continue | warn | abort (default: abort) +``` + +### Dynamic Task Management + +Add tasks mid-flight: + +```bash +# Add a new task +echo '{"type": "ADD_TASK", "task": {...}}' >> inbox.jsonl + +# Cancel a running/pending task +echo '{"type": "CANCEL", "task_id": "t5"}' >> inbox.jsonl + +# Update a pending task +echo '{"type": "UPDATE_TASK", "task_id": "t3", "updates": {"timeout": 120}}' >> inbox.jsonl +``` + +Loop processes inbox events every 3 seconds. 
## YAML Config Reference + +Full task spec: + +```yaml +id: unique-task-id # Required: unique identifier +name: "Human-readable name" # Required: for logs/reports +worker: shell # Required: shell | opencode | claude | python +command: "echo hello" # Required: what to execute +workdir: ~/project # Optional: working directory (default: loop dir) +timeout: 300 # Optional: seconds (default: 600) +retry: 2 # Optional: attempts (default: 0) +depends_on: # Optional: list of task IDs + - other-task-id +validate: # Optional: validation step + worker: shell + command: "test -f output.txt" +on_fail: abort # Optional: abort | warn | continue +model: "openai/o3" # Optional: for 'claude' worker (default: sonnet) +``` + +Top-level loop config: + +```yaml +mission: "High-level goal" # Optional but recommended +max_concurrent_workers: 4 # Optional (default: 3) +auto_judge: true # Optional (default: true in loop.py) +notify_on_complete: true # Optional: wake agent when done +context_layers: # Optional: structured context + L0_PROJECT: "Project name" + L1_MISSION: "Loop mission" +``` + +## Examples + +See `examples/` directory: +- **simple.yaml** β€” Two parallel checks (holidays + disk) β€” proven in loop-pilot-001 +- **stress-test.yaml** β€” 10 tasks with various delays β€” proven in loop-stress-001 + +Load an example: + +```bash +python3 orchestrator/create_loop.py --from-yaml examples/simple.yaml --id my-loop +python3 orchestrator/launcher.py start my-loop +``` + +## Troubleshooting + +### Loop doesn't wake agent when done + +**Symptom:** Tasks complete but no completion notification. + +**Cause:** `openclaw system event` command bug (sends doctor output instead of event). + +**Workaround:** Check `outbox.jsonl` manually: +```bash +tail ~/clawd/loops/<loop-id>/outbox.jsonl | grep ALL_DONE +``` + +Track: https://github.com/openclaw/openclaw/issues/XXXX + +### Task fails immediately + +**Debug:** +1. Check task status: `jq '.tasks[] | select(.id=="t1")' manifest.json` +2. 
Read worker logs: `cat workers/t1.log` +3. Check outbox: `grep '"task_id":"t1"' outbox.jsonl` + +Common causes: +- Timeout too short +- Missing dependencies (command not found) +- Wrong workdir (file paths broken) + +### Loop zombie (runs forever) + +**Kill the daemon:** +```bash +python3 orchestrator/launcher.py stop <loop-id> +# If that fails: +cat ~/clawd/loops/<loop-id>/loop.pid | xargs kill -9 +``` + +### Sub-agent judgment is wrong + +Turn off auto_judge and handle decisions manually: + +```bash +# In manifest.json, set: +"auto_judge": false +``` + +Loop will escalate all NEED_DECISION events to you via outbox. + +## Best Practices + +1. **Start small** β€” Test with 2-3 tasks before building complex pipelines +2. **Use dependencies** β€” Don't rely on task execution order (it's parallel!) +3. **Set realistic timeouts** β€” Allow 2-3x expected duration +4. **Validate critical steps** β€” Add validation for builds, deployments, API calls +5. **Monitor L2 context size** β€” If it grows huge, summarize completed phases +6. **One loop per workflow** β€” Don't reuse loop IDs (create fresh ones) + +## Integration with Agent Workflow + +**When agent receives a long-running request:** + +1. Agent creates loop (0.1s) +2. Agent replies: "βœ… Started loop-XXX, running in background" +3. User continues working (agent responsive) +4. Loop sends wake event when done +5. 
Agent reports results + +**Agent turn is minimal:** +- Write inbox.jsonl (add tasks) +- Read manifest.json (check status) +- NO polling, NO sleep, NO blocking + +**Loop handles:** +- Task execution +- Dependency resolution +- Retries +- Sub-agent consultation +- Wake events + +## Architecture + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Agent Session β”‚ ← Responds instantly +β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ Creates loop + ↓ +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Loop Daemon β”‚ ← Runs independently +β”‚ (launcher.py) β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ Spawns workers + ↓ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ Shell β”‚ Claude β”‚ Python β”‚ ← Execute tasks + β””β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + ↓ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ openclaw system β”‚ ← Wake agent + β”‚ event β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +**File Protocol:** +``` +loops// +β”œβ”€β”€ manifest.json ← State (loop status, task statuses) +β”œβ”€β”€ inbox.jsonl ← Commands (agent β†’ loop) +β”œβ”€β”€ outbox.jsonl ← Events (loop β†’ agent) +β”œβ”€β”€ context.md ← Mission + layers +└── workers/ ← Task logs + β”œβ”€β”€ t1.log + └── t2.log +``` + +## Requirements + +- Python 3.9+ +- OpenClaw 2026.1.10+ +- Standard library only (no pip installs) + +Workers require their respective tools: +- `opencode` worker β†’ OpenCode installed +- `claude` worker β†’ Claude available via sessions_spawn +- Shell/Python workers β†’ always available + +## License + +MIT β€” Use freely, modify as needed, share improvements. + +--- + +**Questions? Issues?** Open a PR or issue on the skills repo. 
Built with ❀️ by Storm ([@jshstorm](https://t.me/jshstorm)) diff --git a/skills/storm/orchestrator/examples/simple.yaml b/skills/storm/orchestrator/examples/simple.yaml new file mode 100644 index 00000000000..e28304ef8a3 --- /dev/null +++ b/skills/storm/orchestrator/examples/simple.yaml @@ -0,0 +1,25 @@ +# Simple Example: Check holidays + disk space in parallel + +mission: | + Check US holidays in 2026 and monitor Docker disk usage. + Two independent tasks that can run in parallel. + +context_layers: + L0_PROJECT: "Orchestrator simple example" + L1_MISSION: "Parallel checks for holidays and disk" + +tasks: + - id: t1 + name: "Check US holidays 2026" + worker: claude + command: "List all US federal holidays in 2026 with dates" + timeout: 60 + + - id: t2 + name: "Check Docker disk usage" + worker: shell + command: "docker system df -v" + timeout: 30 + +max_concurrent_workers: 2 +auto_judge: true diff --git a/skills/storm/orchestrator/examples/stress-test.yaml b/skills/storm/orchestrator/examples/stress-test.yaml new file mode 100644 index 00000000000..3ee9fe97dab --- /dev/null +++ b/skills/storm/orchestrator/examples/stress-test.yaml @@ -0,0 +1,79 @@ +# Stress Test Example: 10 tasks with dynamic additions/cancellations +# Based on loop-stress-001 successful test + +mission: | + Stress test the orchestrator with: + - 10 parallel tasks (10s-60s delays) + - Dynamic task additions during execution + - Task cancellations + - Agent remains responsive throughout + +context_layers: + L0_PROJECT: "Orchestrator Stress Test" + L1_MISSION: "Validate parallel execution + dynamic management" + +tasks: + # Initial 10 tasks + - id: t1 + name: "Task 1" + worker: shell + command: "sleep 10 && echo 'Task 1 done'" + timeout: 20 + + - id: t2 + name: "Task 2" + worker: shell + command: "sleep 15 && echo 'Task 2 done'" + timeout: 30 + + - id: t3 + name: "Task 3" + worker: shell + command: "sleep 20 && echo 'Task 3 done'" + timeout: 40 + + - id: t4 + name: "Task 4" + worker: shell + 
command: "sleep 25 && echo 'Task 4 done'" + timeout: 50 + + - id: t5 + name: "Task 5" + worker: shell + command: "sleep 30 && echo 'Task 5 done'" + timeout: 60 + + - id: t6 + name: "Task 6" + worker: claude + command: "What's 2+2?" + timeout: 60 + + - id: t7 + name: "Task 7" + worker: shell + command: "docker system df" + timeout: 30 + + - id: t8 + name: "Task 8" + worker: shell + command: "date && uptime" + timeout: 10 + + - id: t9 + name: "Task 9" + worker: shell + command: "sleep 45 && echo 'Task 9 done'" + timeout: 90 + + - id: t10 + name: "Task 10" + worker: shell + command: "sleep 60 && echo 'Task 10 done'" + timeout: 120 + +max_concurrent_workers: 5 +auto_judge: false # Manual decision for stress test observation +notify_on_complete: true diff --git a/skills/storm/orchestrator/scripts/agents.py b/skills/storm/orchestrator/scripts/agents.py new file mode 100644 index 00000000000..9a40e963995 --- /dev/null +++ b/skills/storm/orchestrator/scripts/agents.py @@ -0,0 +1,174 @@ +""" +μ„œλΈŒμ—μ΄μ „νŠΈ λ§€λ‹ˆμ € β€” 루프가 νŒλ‹¨/뢄석을 μœ„μž„ +openclaw system event + sessions_spawn νŒ¨ν„΄ ν™œμš© +""" +import subprocess +import json +import time +from pathlib import Path +from typing import Optional +from protocol import now_ts + + +class AgentRole: + """μ„œλΈŒμ—μ΄μ „νŠΈ μ—­ν•  프리셋""" + JUDGE = { + "name": "judge", + "model": "sonnet", + "description": "루프 νŒλ‹¨ 처리 β€” NEED_DECISION 응닡", + "system": ( + "λ„ˆλŠ” μ˜€μΌ€μŠ€νŠΈλ ˆμ΄ν„° λ£¨ν”„μ˜ νŒλ‹¨ 담당이닀. " + "manifest.jsonκ³Ό context.mdλ₯Ό 기반으둜 기술적 νŒλ‹¨μ„ 내렀라. " + "νŒλ‹¨ κ²°κ³ΌλŠ” λ°˜λ“œμ‹œ JSON으둜 좜λ ₯ν•˜λΌ: " + '{{"decision": "approve|reject|modify", "reason": "...", "detail": "..."}}' + ), + } + LOG_ANALYST = { + "name": "log-analyst", + "model": "gemini", + "description": "μ›Œμ»€ 둜그 뢄석 β€” 성곡/μ‹€νŒ¨/이슈 μš”μ•½", + "system": ( + "λ„ˆλŠ” 둜그 뢄석가닀. μ›Œμ»€ μ‹€ν–‰ 둜그λ₯Ό 읽고 ν•΅μ‹¬λ§Œ μš”μ•½ν•˜λΌ. 
" + "좜λ ₯ ν˜•μ‹: 1) 성곡/μ‹€νŒ¨ μ—¬λΆ€ 2) μ£Όμš” 이슈 3) λ‹€μŒ μ•‘μ…˜ μ œμ•ˆ" + ), + } + REVIEWER = { + "name": "reviewer", + "model": "sonnet", + "description": "μ½”λ“œ 리뷰 β€” μƒμ„±λœ 파일 ν’ˆμ§ˆ 체크", + "system": ( + "λ„ˆλŠ” μ½”λ“œ 리뷰어닀. νŒŒμΌμ„ 읽고 버그, λ³΄μ•ˆ 이슈, κ°œμ„ μ μ„ 찾아라. " + "심각도: CRITICAL / WARNING / INFO 둜 λΆ„λ₯˜. κ°„κ²°ν•˜κ²Œ." + ), + } + REPORTER = { + "name": "reporter", + "model": "gemini", + "description": "λ³΄κ³ μ„œ μž‘μ„± β€” 진행상황 μš”μ•½", + "system": ( + "λ„ˆλŠ” λ³΄κ³ μ„œ μž‘μ„±μžλ‹€. manifest와 contextλ₯Ό 읽고 " + "진행상황을 ν•œκ΅­μ–΄λ‘œ κ°„κ²°ν•˜κ²Œ μš”μ•½ν•˜λΌ. μŠ€ν†°λ‹˜κ»˜ 보고할 ν˜•μ‹μœΌλ‘œ." + ), + } + + +class SubAgentManager: + """μ„œλΈŒμ—μ΄μ „νŠΈ 호좜 관리""" + + def __init__(self, loop_dir: Path): + self.loop_dir = loop_dir + self.results_dir = loop_dir / "agent_results" + self.results_dir.mkdir(exist_ok=True) + + def call_agent( + self, + role: dict, + task_prompt: str, + context_files: list[str] = None, + timeout: int = 120, + ) -> dict: + """ + μ„œλΈŒμ—μ΄μ „νŠΈ 동기 호좜 (openclaw sessions_spawn λŒ€μ‹  claude -p μ‚¬μš©) + + Returns: + {"success": bool, "output": str, "elapsed": float} + """ + # μ»¨ν…μŠ€νŠΈ 파일 λ‚΄μš© μˆ˜μ§‘ + file_context = "" + for fpath in (context_files or []): + p = Path(fpath) if Path(fpath).is_absolute() else self.loop_dir / fpath + if p.exists(): + content = p.read_text(errors="replace")[:5000] + file_context += f"\n### {p.name}\n```\n{content}\n```\n" + + full_prompt = f"""{role['system']} + +## μ»¨ν…μŠ€νŠΈ +{file_context} + +## μš”μ²­ +{task_prompt} +""" + + result_file = self.results_dir / f"{role['name']}-{int(time.time())}.txt" + + start = time.time() + try: + proc = subprocess.run( + [ + "claude", "-p", + "--model", role.get("model_full", f"anthropic/claude-sonnet-4-5"), + "--output-format", "text", + full_prompt, + ], + capture_output=True, + text=True, + timeout=timeout, + cwd=str(self.loop_dir), + ) + elapsed = time.time() - start + output = proc.stdout.strip() if proc.returncode == 0 else proc.stderr.strip() 
+ + # κ²°κ³Ό μ €μž₯ + result_file.write_text(output) + + return { + "success": proc.returncode == 0, + "output": output, + "elapsed": round(elapsed, 1), + "result_file": str(result_file), + } + except subprocess.TimeoutExpired: + return { + "success": False, + "output": f"Timeout after {timeout}s", + "elapsed": timeout, + } + except Exception as e: + return { + "success": False, + "output": str(e), + "elapsed": time.time() - start, + } + + def judge(self, context: str, options: list[str], ref: str) -> dict: + """NEED_DECISION νŒλ‹¨ μš”μ²­""" + prompt = f"""νŒλ‹¨ μš”μ²­ (ref: {ref}) + +상황: {context} +선택지: {json.dumps(options, ensure_ascii=False)} + +manifest.jsonκ³Ό context.mdλ₯Ό μ°Έκ³ ν•˜μ—¬ μ΅œμ„ μ˜ νŒλ‹¨μ„ 내렀라. +""" + return self.call_agent( + role=AgentRole.JUDGE, + task_prompt=prompt, + context_files=["manifest.json", "context.md"], + ) + + def analyze_log(self, task_id: str, log_file: str) -> dict: + """μ›Œμ»€ 둜그 뢄석""" + prompt = f"νƒœμŠ€ν¬ {task_id}의 μ‹€ν–‰ 둜그λ₯Ό λΆ„μ„ν•˜λΌ." + return self.call_agent( + role=AgentRole.LOG_ANALYST, + task_prompt=prompt, + context_files=[log_file], + ) + + def review_files(self, file_paths: list[str], task_name: str) -> dict: + """μƒμ„±λœ 파일 리뷰""" + prompt = f"'{task_name}' νƒœμŠ€ν¬λ‘œ 생성/μˆ˜μ •λœ νŒŒμΌλ“€μ„ λ¦¬λ·°ν•˜λΌ." + return self.call_agent( + role=AgentRole.REVIEWER, + task_prompt=prompt, + context_files=file_paths, + ) + + def summarize_progress(self) -> dict: + """전체 진행상황 μš”μ•½""" + prompt = "ν˜„μž¬ λ£¨ν”„μ˜ 전체 진행상황을 μš”μ•½ν•˜λΌ. μ™„λ£Œ/진행쀑/λŒ€κΈ° νƒœμŠ€ν¬ 수, μ£Όμš” 이슈, μ˜ˆμƒ 남은 μ‹œκ°„." 
+ return self.call_agent( + role=AgentRole.REPORTER, + task_prompt=prompt, + context_files=["manifest.json", "context.md"], + ) diff --git a/skills/storm/orchestrator/scripts/bridge.py b/skills/storm/orchestrator/scripts/bridge.py new file mode 100644 index 00000000000..145ce8abf72 --- /dev/null +++ b/skills/storm/orchestrator/scripts/bridge.py @@ -0,0 +1,134 @@ +""" +OpenClaw λΈŒλ¦Ώμ§€ β€” 루프 ↔ μŸˆλΉ„μŠ€ μ—°κ²° +""" +import subprocess +import json +from pathlib import Path +from typing import Optional, List +from protocol import Message, OutboxType, make_msg, now_ts + + +class OpenClawBridge: + """루프가 μŸˆλΉ„μŠ€λ₯Ό 깨우고 λ©”μ‹œμ§€λ₯Ό κ΅ν™˜ν•˜λŠ” λΈŒλ¦Ώμ§€""" + + def __init__(self, loop_dir: Path, wake_events: Optional[List[str]] = None): + self.loop_dir = loop_dir + self.inbox_path = loop_dir / "inbox.jsonl" + self.outbox_path = loop_dir / "outbox.jsonl" + self.context_path = loop_dir / "context.md" + self.wake_events = wake_events or [ + OutboxType.NEED_DECISION.value, + OutboxType.ERROR.value, + OutboxType.ALL_DONE.value, + ] + self._inbox_offset = 0 # λ§ˆμ§€λ§‰μœΌλ‘œ 읽은 μœ„μΉ˜ + + # 파일 μ΄ˆκΈ°ν™” + for p in [self.inbox_path, self.outbox_path]: + if not p.exists(): + p.touch() + if not self.context_path.exists(): + self.context_path.write_text(f"# Loop Context\nCreated: {now_ts()}\n\n") + + # ─── Outbox (루프 β†’ μŸˆλΉ„μŠ€) ─── + + def emit(self, msg: Message): + """outbox에 λ©”μ‹œμ§€ μΆ”κ°€ + ν•„μš”μ‹œ μŸˆλΉ„μŠ€ 깨움""" + with open(self.outbox_path, "a") as f: + f.write(msg.to_json() + "\n") + + # context.md에도 기둝 + self._append_context(f"루프", f"[{msg.type}] {json.dumps(msg.data, ensure_ascii=False)[:200]}") + + # wake 이벀트면 μŸˆλΉ„μŠ€ 깨움 + if msg.type in self.wake_events: + self._wake_jarvis(msg) + + def emit_task_done(self, task_id: str, summary: str): + self.emit(make_msg(OutboxType.TASK_DONE.value, task_id=task_id, summary=summary)) + + def emit_task_started(self, task_id: str, name: str): + self.emit(make_msg(OutboxType.TASK_STARTED.value, task_id=task_id, 
name=name)) + + def emit_progress(self, task_id: str, pct: int, msg: str): + self.emit(make_msg(OutboxType.PROGRESS.value, task_id=task_id, pct=pct, msg=msg)) + + def emit_need_decision(self, ref: str, context: str, options: list[str] = None): + self.emit(make_msg( + OutboxType.NEED_DECISION.value, + ref=ref, context=context, options=options or [] + )) + + def emit_error(self, task_id: str, msg: str, log_file: str = ""): + self.emit(make_msg(OutboxType.ERROR.value, task_id=task_id, msg=msg, log=log_file)) + + def emit_all_done(self, summary: str): + self.emit(make_msg(OutboxType.ALL_DONE.value, summary=summary)) + + # ─── Inbox (μŸˆλΉ„μŠ€ β†’ 루프) ─── + + def read_inbox(self) -> list[Message]: + """μƒˆ inbox λ©”μ‹œμ§€ 읽기 (이전에 읽은 이후 μΆ”κ°€λœ κ²ƒλ§Œ)""" + if not self.inbox_path.exists(): + return [] + + messages = [] + with open(self.inbox_path, "r") as f: + f.seek(self._inbox_offset) + for line in f: + line = line.strip() + if line: + try: + messages.append(Message.from_json(line)) + except Exception: + pass + self._inbox_offset = f.tell() + return messages + + # ─── Context ─── + + def _append_context(self, who: str, text: str): + ts = now_ts().split("T")[1][:5] # HH:MM + with open(self.context_path, "a") as f: + f.write(f"\n## [{ts}] {who}\n{text}\n") + + def append_context_jarvis(self, text: str): + """μŸˆλΉ„μŠ€κ°€ context에 기둝할 λ•Œ""" + self._append_context("μŸˆλΉ„μŠ€", text) + + # ─── Wake ─── + + def _wake_jarvis(self, msg: Message, max_retries: int = 3): + """openclaw system event둜 μŸˆλΉ„μŠ€ 깨움 (μž¬μ‹œλ„ 포함)""" + loop_id = self.loop_dir.name + summary = msg.data.get("summary", msg.data.get("context", msg.data.get("msg", ""))) + text = f"[{loop_id}] {msg.type}: {summary}"[:500] + + for attempt in range(max_retries): + try: + result = subprocess.run( + ["openclaw", "system", "event", "--text", text, "--mode", "now"], + capture_output=True, text=True, timeout=15, + ) + if result.returncode == 0: + print(f"[Bridge] Wake sent (attempt {attempt+1}): {text[:80]}") + 
return + else: + print(f"[Bridge] Wake attempt {attempt+1} failed: {result.stderr[:100]}") + except subprocess.TimeoutExpired: + print(f"[Bridge] Wake attempt {attempt+1} timed out") + except Exception as e: + print(f"[Bridge] Wake attempt {attempt+1} error: {e}") + + import time + time.sleep(2) + + # λͺ¨λ“  μž¬μ‹œλ„ μ‹€νŒ¨ β†’ κ²°κ³Ό νŒŒμΌμ— 기둝 (μŸˆλΉ„μŠ€κ°€ λ‹€μŒ 턴에 확인) + wake_fail_path = self.loop_dir / "wake_pending.json" + wake_fail_path.write_text(json.dumps({ + "ts": now_ts(), + "type": msg.type, + "text": text, + "retries_exhausted": True, + }, ensure_ascii=False)) + print(f"[Bridge] Wake failed after {max_retries} retries, saved to wake_pending.json") diff --git a/skills/storm/orchestrator/scripts/create_loop.py b/skills/storm/orchestrator/scripts/create_loop.py new file mode 100644 index 00000000000..5b765cd223a --- /dev/null +++ b/skills/storm/orchestrator/scripts/create_loop.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +""" +루프 생성 헬퍼 β€” μŸˆλΉ„μŠ€κ°€ 호좜 +manifest.json + 디렉토리 ꡬ쑰 생성 ν›„ 루프 ν”„λ‘œμ„ΈμŠ€ μ‹œμž‘ +""" +import json +import sys +from pathlib import Path +from datetime import datetime + +sys.path.insert(0, str(Path(__file__).parent)) +from protocol import LoopManifest, Task, InboxType, now_ts, make_msg + + +def create_loop( + loop_id: str, + description: str, + tasks: list[dict], + workdir: str = "", + config: dict = None, +) -> Path: + """ + 루프 디렉토리 생성 + manifest μž‘μ„± + + Args: + loop_id: 루프 ID (예: loop-us-pipeline-20260226) + description: μ„€λͺ… + tasks: [{"id": "t001", "name": "...", "worker": "shell", "command": "..."}] + workdir: κΈ°λ³Έ μž‘μ—… 디렉토리 + config: 루프 μ„€μ • (tick_interval, max_concurrent_workers λ“±) + + Returns: + μƒμ„±λœ 루프 디렉토리 경둜 + """ + loops_dir = Path.home() / "clawd" / "loops" + loop_dir = loops_dir / loop_id + loop_dir.mkdir(parents=True, exist_ok=True) + (loop_dir / "workers").mkdir(exist_ok=True) + + task_objs = [] + for t in tasks: + task = Task( + id=t["id"], + name=t["name"], + worker=t.get("worker", 
"shell"), + command=t.get("command", ""), + workdir=t.get("workdir", workdir), + depends_on=t.get("depends_on", []), + ) + task_objs.append(task) + + manifest = LoopManifest( + id=loop_id, + created=now_ts(), + description=description, + workdir=workdir, + tasks=task_objs, + config=config or { + "tick_interval": 5, + "wake_on": ["NEED_DECISION", "ERROR", "ALL_DONE"], + "max_concurrent_workers": 3, + }, + ) + + manifest_path = loop_dir / "manifest.json" + manifest_path.write_text(json.dumps(manifest.to_dict(), ensure_ascii=False, indent=2)) + + # inbox에 START λ©”μ‹œμ§€ + inbox_path = loop_dir / "inbox.jsonl" + start_msg = make_msg(InboxType.START.value) + inbox_path.write_text(start_msg.to_json() + "\n") + + # context.md μ΄ˆκΈ°ν™” + context_path = loop_dir / "context.md" + context_path.write_text(f"# {loop_id}\n{description}\nCreated: {now_ts()}\n\n") + + print(f"βœ… Loop created: {loop_dir}") + print(f" Tasks: {len(task_objs)}") + print(f" Start: python3 ~/clawd/orchestrator/loop.py {loop_dir}") + + return loop_dir + + +if __name__ == "__main__": + # CLI μ‚¬μš© μ˜ˆμ‹œ + if len(sys.argv) > 1 and sys.argv[1] == "--example": + create_loop( + loop_id="loop-test-001", + description="ν…ŒμŠ€νŠΈ 루프", + tasks=[ + {"id": "t001", "name": "Hello", "worker": "shell", "command": "echo hello && sleep 2 && echo done"}, + {"id": "t002", "name": "World", "worker": "shell", "command": "echo world", "depends_on": ["t001"]}, + ], + ) diff --git a/skills/storm/orchestrator/scripts/launcher.py b/skills/storm/orchestrator/scripts/launcher.py new file mode 100644 index 00000000000..69f43799157 --- /dev/null +++ b/skills/storm/orchestrator/scripts/launcher.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +""" +루프 런처 β€” λ‚΄ ν”„λ‘œμ„ΈμŠ€ νŠΈλ¦¬μ™€ μ™„μ „ λΆ„λ¦¬ν•˜μ—¬ 루프 μ‹€ν–‰ +nohup + double-fork둜 데λͺ¬ν™” +""" +import os +import sys +import json +import subprocess +from pathlib import Path + +ORCHESTRATOR_DIR = Path(__file__).parent +LOOPS_DIR = Path.home() / "clawd" / "loops" + + +def 
launch(loop_id: str) -> dict: + """ + 루프λ₯Ό 독립 ν”„λ‘œμ„ΈμŠ€λ‘œ μ‹œμž‘. + μŸˆλΉ„μŠ€μ˜ exec ν„΄κ³Ό μ™„μ „ 뢄리됨. + + Returns: + {"success": bool, "pid": int, "log": str} + """ + loop_dir = LOOPS_DIR / loop_id + if not loop_dir.exists(): + return {"success": False, "error": f"Loop dir not found: {loop_dir}"} + + manifest_path = loop_dir / "manifest.json" + if not manifest_path.exists(): + return {"success": False, "error": "manifest.json not found"} + + log_path = loop_dir / "loop.log" + pid_path = loop_dir / "loop.pid" + loop_script = ORCHESTRATOR_DIR / "loop.py" + + # 이미 μ‹€ν–‰ 쀑인지 확인 + if pid_path.exists(): + old_pid = int(pid_path.read_text().strip()) + try: + os.kill(old_pid, 0) # ν”„λ‘œμ„ΈμŠ€ 쑴재 확인 + return {"success": False, "error": f"Already running (pid={old_pid})"} + except OSError: + pid_path.unlink() # stale pid 정리 + + # nohup + disown으둜 μ™„μ „ 독립 ν”„λ‘œμ„ΈμŠ€ μ‹œμž‘ (macOS ν˜Έν™˜) + cmd = ( + f'nohup python3 "{loop_script}" "{loop_dir}" ' + f'> "{log_path}" 2>&1 & echo $!' + ) + + result = subprocess.run( + cmd, shell=True, capture_output=True, text=True, timeout=5 + ) + + if result.returncode != 0: + return {"success": False, "error": result.stderr.strip()} + + pid = int(result.stdout.strip()) + pid_path.write_text(str(pid)) + + return { + "success": True, + "pid": pid, + "loop_id": loop_id, + "log": str(log_path), + } + + +def stop(loop_id: str) -> dict: + """루프 쀑지""" + loop_dir = LOOPS_DIR / loop_id + pid_path = loop_dir / "loop.pid" + + if not pid_path.exists(): + return {"success": False, "error": "No pid file β€” not running?"} + + pid = int(pid_path.read_text().strip()) + try: + os.kill(pid, 15) # SIGTERM + pid_path.unlink() + return {"success": True, "pid": pid, "action": "stopped"} + except OSError as e: + pid_path.unlink() + return {"success": False, "error": str(e)} + + +def status(loop_id: str) -> dict: + """루프 μƒνƒœ 확인""" + loop_dir = LOOPS_DIR / loop_id + if not loop_dir.exists(): + return {"error": f"Loop not found: {loop_id}"} + + 
manifest = json.loads((loop_dir / "manifest.json").read_text()) + + # ν”„λ‘œμ„ΈμŠ€ μ‚΄μ•„μžˆλŠ”μ§€ + pid_path = loop_dir / "loop.pid" + running = False + pid = None + if pid_path.exists(): + pid = int(pid_path.read_text().strip()) + try: + os.kill(pid, 0) + running = True + except OSError: + running = False + + tasks = manifest.get("tasks", []) + summary = {} + for t in tasks: + s = t.get("status", "unknown") + summary[s] = summary.get(s, 0) + 1 + + return { + "loop_id": loop_id, + "process_alive": running, + "pid": pid, + "loop_status": manifest.get("status", "unknown"), + "tasks": summary, + "total": len(tasks), + } + + +def list_loops() -> list[dict]: + """λͺ¨λ“  루프 λͺ©λ‘""" + if not LOOPS_DIR.exists(): + return [] + results = [] + for d in sorted(LOOPS_DIR.iterdir()): + if d.is_dir() and (d / "manifest.json").exists(): + results.append(status(d.name)) + return results + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("Usage:") + print(" python3 launcher.py start ") + print(" python3 launcher.py stop ") + print(" python3 launcher.py status ") + print(" python3 launcher.py list") + sys.exit(1) + + action = sys.argv[1] + + if action == "list": + for l in list_loops(): + alive = "🟒" if l.get("process_alive") else "⚫" + print(f"{alive} {l['loop_id']}: {l['loop_status']} β€” {l.get('tasks',{})}") + sys.exit(0) + + if len(sys.argv) < 3: + print("Error: loop_id required") + sys.exit(1) + + loop_id = sys.argv[2] + + if action == "start": + r = launch(loop_id) + print(json.dumps(r, indent=2)) + elif action == "stop": + r = stop(loop_id) + print(json.dumps(r, indent=2)) + elif action == "status": + r = status(loop_id) + print(json.dumps(r, indent=2)) + else: + print(f"Unknown action: {action}") diff --git a/skills/storm/orchestrator/scripts/loop.py b/skills/storm/orchestrator/scripts/loop.py new file mode 100644 index 00000000000..7a2b0e5f13b --- /dev/null +++ b/skills/storm/orchestrator/scripts/loop.py @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 +""" 
+μ˜€μΌ€μŠ€νŠΈλ ˆμ΄ν„° 메인 루프 +μŸˆλΉ„μŠ€κ°€ μƒμ„±ν•œ νƒœμŠ€ν¬λ₯Ό λΉ„λ™κΈ°λ‘œ μ‹€ν–‰ν•˜κ³  κ²°κ³Όλ₯Ό λ³΄κ³ ν•œλ‹€. + +Usage: + python3 loop.py + python3 loop.py ~/clawd/loops/loop-us-pipeline-20260226-1630 +""" +import asyncio +import json +import signal +import sys +from pathlib import Path + +# orchestrator λͺ¨λ“ˆ 경둜 +sys.path.insert(0, str(Path(__file__).parent)) + +from protocol import ( + LoopManifest, Task, TaskStatus, + InboxType, OutboxType, Message, now_ts +) +from workers import WorkerManager +from bridge import OpenClawBridge +from agents import SubAgentManager + + +class OrchestratorLoop: + def __init__(self, loop_dir: Path): + self.loop_dir = loop_dir + self.manifest_path = loop_dir / "manifest.json" + self.manifest: LoopManifest = self._load_manifest() + self.bridge = OpenClawBridge( + loop_dir, + wake_events=self.manifest.config.get("wake_on", [ + OutboxType.NEED_DECISION.value, + OutboxType.ERROR.value, + OutboxType.ALL_DONE.value, + ]) + ) + self.workers = WorkerManager( + loop_dir, + max_concurrent=self.manifest.config.get("max_concurrent_workers", 3) + ) + self.agents = SubAgentManager(loop_dir) + self.auto_judge = self.manifest.config.get("auto_judge", True) + self.running = False + self.tick_interval = self.manifest.config.get("tick_interval", 5) + + def _load_manifest(self) -> LoopManifest: + if not self.manifest_path.exists(): + print(f"[Loop] manifest.json not found in {self.loop_dir}") + sys.exit(1) + data = json.loads(self.manifest_path.read_text()) + return LoopManifest.from_dict(data) + + def _save_manifest(self): + self.manifest_path.write_text( + json.dumps(self.manifest.to_dict(), ensure_ascii=False, indent=2) + ) + + def _get_task(self, task_id: str) -> Task | None: + for t in self.manifest.tasks: + if t.id == task_id: + return t + return None + + # ─── μŠ€μΌ€μ€„λ§ ─── + + def _get_ready_tasks(self) -> list[Task]: + """μ‹€ν–‰ κ°€λŠ₯ν•œ νƒœμŠ€ν¬ (pending + μ˜μ‘΄μ„± μΆ©μ‘±)""" + done_ids = {t.id for t in self.manifest.tasks if 
t.status == TaskStatus.DONE.value} + ready = [] + for t in self.manifest.tasks: + if t.status != TaskStatus.PENDING.value: + continue + if all(dep in done_ids for dep in t.depends_on): + ready.append(t) + return ready + + # ─── Inbox 처리 ─── + + async def _handle_inbox(self): + messages = self.bridge.read_inbox() + for msg in messages: + await self._handle_inbox_msg(msg) + + async def _handle_inbox_msg(self, msg: Message): + t = msg.type + data = msg.data + + if t == InboxType.START.value: + self.manifest.status = "running" + print(f"[Loop] START received") + + elif t == InboxType.DECIDE.value: + ref = data.get("ref", "") + # νŒλ‹¨ 결과둜 blocked νƒœμŠ€ν¬ ν•΄μ œ + for task in self.manifest.tasks: + if task.status == TaskStatus.BLOCKED.value: + task.status = TaskStatus.PENDING.value + print(f"[Loop] Task {task.id} unblocked by decision {ref}") + + elif t == InboxType.ADD_TASK.value: + task_data = data.get("task", {}) + task = Task(**task_data) + self.manifest.tasks.append(task) + print(f"[Loop] Task added: {task.id} - {task.name}") + + elif t == InboxType.UPDATE_TASK.value: + task_id = data.get("task_id", "") + updates = data.get("updates", {}) + task = self._get_task(task_id) + if task: + for k, v in updates.items(): + if hasattr(task, k): + setattr(task, k, v) + + elif t == InboxType.CANCEL.value: + task_id = data.get("task_id", "") + await self.workers.kill_task(task_id) + task = self._get_task(task_id) + if task: + task.status = TaskStatus.CANCELLED.value + + elif t == InboxType.SHUTDOWN.value: + print(f"[Loop] SHUTDOWN received") + self.running = False + await self.workers.kill_all() + + self._save_manifest() + + # ─── 메인 ν‹± ─── + + async def _tick(self): + """ν•œ 번의 루프 ν‹±""" + # 1. inbox 처리 + await self._handle_inbox() + + if not self.running: + return + + # 2. 
μ™„λ£Œλœ μ›Œμ»€ μˆ˜μ§‘ + completed = await self.workers.poll_all() + for task_id, rc, log_tail in completed: + task = self._get_task(task_id) + if not task: + continue + + if rc == 0: + # 검증 단계 + if task.validate: + valid = await self._run_validate(task) + if not valid: + await self._handle_task_failure(task, "validation failed") + continue + + summary = log_tail.splitlines()[-1] if log_tail.strip() else "μ™„λ£Œ" + task.result_summary = summary[:200] + self.bridge.emit_task_done(task_id, task.result_summary) + print(f"[Loop] βœ… {task_id} done: {task.result_summary[:80]}") + else: + await self._handle_task_failure(task, log_tail[-500:] if log_tail else f"exit {rc}") + + # 3. μƒˆ νƒœμŠ€ν¬ μ‹œμž‘ + ready = self._get_ready_tasks() + for task in ready: + if not self.workers.can_start(): + break + wp = await self.workers.start_task(task) + self.bridge.emit_task_started(task.id, task.name) + print(f"[Loop] πŸš€ {task.id} started: {task.name} (pid={task.pid})") + + # 4. μ„œλΈŒμ—μ΄μ „νŠΈ μžλ™ νŒλ‹¨ (NEED_DECISION이 있고 auto_judgeκ°€ μΌœμ Έμžˆμ„ λ•Œ) + if self.auto_judge: + blocked = [t for t in self.manifest.tasks if t.status == TaskStatus.BLOCKED.value] + # outboxμ—μ„œ 아직 미응닡인 NEED_DECISION 확인 + # (κ°„λž΅ν™”: blocked νƒœμŠ€ν¬κ°€ 있으면 μ„œλΈŒμ—μ΄μ „νŠΈμ— μœ„μž„) + for task in blocked: + await self._auto_judge_task(task) + + # 5. 
전체 μ™„λ£Œ 체크 + statuses = [t.status for t in self.manifest.tasks] + if all(s in (TaskStatus.DONE.value, TaskStatus.CANCELLED.value, TaskStatus.FAILED.value) for s in statuses): + done_count = statuses.count(TaskStatus.DONE.value) + fail_count = statuses.count(TaskStatus.FAILED.value) + self.manifest.status = "done" + self.bridge.emit_all_done(f"{done_count} done, {fail_count} failed, total {len(statuses)}") + self.running = False + + self._save_manifest() + + # ─── 검증 + μ‹€νŒ¨ 처리 + μžλ™ νŒλ‹¨ ─── + + async def _run_validate(self, task: Task) -> bool: + """νƒœμŠ€ν¬ μ™„λ£Œ ν›„ 검증 λͺ…λ Ή μ‹€ν–‰""" + try: + proc = await asyncio.create_subprocess_shell( + task.validate, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=task.workdir or None, + ) + await asyncio.wait_for(proc.wait(), timeout=30) + if proc.returncode == 0: + print(f"[Loop] βœ” {task.id} validation passed") + return True + else: + print(f"[Loop] ✘ {task.id} validation failed (exit {proc.returncode})") + return False + except Exception as e: + print(f"[Loop] ✘ {task.id} validation error: {e}") + return False + + async def _handle_task_failure(self, task: Task, error: str): + """νƒœμŠ€ν¬ μ‹€νŒ¨ 처리 β€” μž¬μ‹œλ„ / μŠ€ν‚΅ / μŸˆλΉ„μŠ€ 깨움""" + task.error = error[:500] + + if task.on_fail == "retry" and task.retry > 0: + task.retry -= 1 + task.status = TaskStatus.PENDING.value + task.error = "" + print(f"[Loop] πŸ”„ {task.id} retrying ({task.max_retry - task.retry}/{task.max_retry})") + return + + if task.on_fail == "skip": + task.status = TaskStatus.CANCELLED.value + print(f"[Loop] ⏭ {task.id} skipped") + return + + # κΈ°λ³Έ: wake jarvis + task.status = TaskStatus.FAILED.value + self.bridge.emit_error(task.id, error[:200], f"workers/{task.id}.log") + print(f"[Loop] ❌ {task.id} failed: {error[:80]}") + + async def _auto_judge_task(self, task: Task): + """μ„œλΈŒμ—μ΄μ „νŠΈλ‘œ μžλ™ νŒλ‹¨""" + print(f"[Loop] πŸ€– Auto-judging blocked task: {task.id}") + + result = self.agents.judge( + 
context=f"νƒœμŠ€ν¬ '{task.name}'이 λΈ”λ‘œν‚Ήλ¨. μ—λŸ¬: {task.error}", + options=["proceed", "skip", "escalate"], + ref=task.id, + ) + + if result["success"]: + output = result["output"] + print(f"[Loop] πŸ€– Judge result for {task.id}: {output[:100]}") + + # νŒλ‹¨ κ²°κ³Όλ₯Ό context에 기둝 + self.bridge._append_context( + "μ„œλΈŒμ—μ΄μ „νŠΈ(judge)", + f"νƒœμŠ€ν¬ {task.id} νŒλ‹¨: {output[:300]}" + ) + + # JSON νŒŒμ‹± μ‹œλ„ + try: + import re + json_match = re.search(r'\{.*\}', output, re.DOTALL) + if json_match: + decision = json.loads(json_match.group()) + d = decision.get("decision", "") + if d == "approve" or d == "proceed": + task.status = TaskStatus.PENDING.value + task.error = "" + print(f"[Loop] πŸ€– {task.id} β†’ unblocked by judge") + elif d == "reject" or d == "skip": + task.status = TaskStatus.CANCELLED.value + print(f"[Loop] πŸ€– {task.id} β†’ skipped by judge") + else: + # escalate to jarvis + self.bridge.emit_need_decision( + ref=task.id, + context=f"μ„œλΈŒμ—μ΄μ „νŠΈ νŒλ‹¨ λΆˆν™•μ‹€: {output[:200]}", + options=["approve", "reject", "modify"] + ) + except Exception: + # νŒŒμ‹± μ‹€νŒ¨ β†’ μŸˆλΉ„μŠ€μ— μ—μŠ€μ»¬λ ˆμ΄μ…˜ + self.bridge.emit_need_decision( + ref=task.id, + context=f"μžλ™ νŒλ‹¨ μ‹€νŒ¨, μˆ˜λ™ νŒλ‹¨ ν•„μš”: {output[:200]}", + options=["approve", "reject"] + ) + else: + # μ„œλΈŒμ—μ΄μ „νŠΈ 호좜 μ‹€νŒ¨ β†’ μŸˆλΉ„μŠ€μ— μ—μŠ€μ»¬λ ˆμ΄μ…˜ + self.bridge.emit_need_decision( + ref=task.id, + context=f"μ„œλΈŒμ—μ΄μ „νŠΈ 호좜 μ‹€νŒ¨: {result['output'][:200]}", + options=["approve", "reject"] + ) + + # ─── μ‹€ν–‰ ─── + + async def run(self): + """메인 루프 μ‹€ν–‰""" + self.running = True + self.manifest.status = "running" + self._save_manifest() + + print(f"[Loop] === {self.manifest.id} ===") + print(f"[Loop] Tasks: {len(self.manifest.tasks)}") + print(f"[Loop] Tick interval: {self.tick_interval}s") + print(f"[Loop] Max workers: {self.workers.max_concurrent}") + print(f"[Loop] Starting...\n") + + # μ‹œκ·Έλ„ 핸듀링 + loop = asyncio.get_event_loop() + 
        for sig in (signal.SIGTERM, signal.SIGINT):
            # NOTE(review): add_signal_handler raises NotImplementedError on Windows
            # event loops β€” presumably this only ever runs on POSIX; confirm.
            loop.add_signal_handler(sig, lambda: asyncio.create_task(self._shutdown()))

        while self.running:
            try:
                await self._tick()
                await asyncio.sleep(self.tick_interval)
            except Exception as e:
                # Keep the loop alive on tick errors: report and retry next tick
                print(f"[Loop] Tick error: {e}")
                self.bridge.emit_error("loop", str(e))
                await asyncio.sleep(self.tick_interval)

        # Cleanup
        await self.workers.kill_all()
        self.manifest.status = "done"
        self._save_manifest()
        print(f"\n[Loop] === Finished ===")

    async def _shutdown(self):
        """Signal-handler target: stop the loop and terminate all workers."""
        print(f"\n[Loop] Shutdown signal received")
        self.running = False
        await self.workers.kill_all()


async def main():
    """CLI entry point: validate the loop directory argument and run the loop."""
    if len(sys.argv) < 2:
        print("Usage: python3 loop.py ")
        print("  e.g. python3 loop.py ~/clawd/loops/loop-test-001")
        sys.exit(1)

    loop_dir = Path(sys.argv[1]).expanduser().resolve()
    if not loop_dir.exists():
        print(f"Error: {loop_dir} does not exist")
        sys.exit(1)

    orch = OrchestratorLoop(loop_dir)
    await orch.run()


if __name__ == "__main__":
    asyncio.run(main())
diff --git a/skills/storm/orchestrator/scripts/protocol.py b/skills/storm/orchestrator/scripts/protocol.py
new file mode 100644
index 00000000000..a8841f2a338
--- /dev/null
+++ b/skills/storm/orchestrator/scripts/protocol.py
@@ -0,0 +1,136 @@
"""
Orchestrator communication protocol.
JSONL-based messaging between Jarvis and the loop.
"""
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Optional, List, Dict, Any
from datetime import datetime
import json


# ─── Event types ───

class InboxType(str, Enum):
    """Jarvis β†’ loop"""
    START = "START"
    DECIDE = "DECIDE"
    ADD_TASK = "ADD_TASK"
    UPDATE_TASK = "UPDATE_TASK"
    CANCEL = "CANCEL"
    SHUTDOWN = "SHUTDOWN"


class OutboxType(str, Enum):
    """Loop β†’ Jarvis"""
    TASK_STARTED = "TASK_STARTED"
    TASK_DONE = "TASK_DONE"
    PROGRESS = "PROGRESS"
    NEED_DECISION = "NEED_DECISION"
    ERROR = "ERROR"
    ALL_DONE = "ALL_DONE"
    LOG = "LOG"


class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"
    CANCELLED = "cancelled"
    BLOCKED = "blocked"  # waiting for a decision


class WorkerType(str, Enum):
    SHELL = "shell"        # plain shell command
    OPENCODE = "opencode"  # opencode run
    CLAUDE = "claude"      # claude -p
    PYTHON = "python"      # python script


# ─── Data models ───

@dataclass
class Task:
    id: str
    name: str
    worker: str = "shell"
    command: str = ""
    workdir: str = ""
    depends_on: List[str] = field(default_factory=list)
    status: str = TaskStatus.PENDING.value
    pid: Optional[int] = None
    result_summary: str = ""
    error: str = ""
    started_at: Optional[str] = None
    finished_at: Optional[str] = None
    # Extension fields
    timeout: int = 300              # worker timeout (seconds)
    retry: int = 0                  # remaining retry count
    max_retry: int = 0              # maximum retries
    validate: Optional[str] = None  # validation command (shell), success = exit 0
    on_fail: str = "wake"           # on failure: wake (wake Jarvis), retry, skip

    def to_dict(self):
        # Drop None/empty fields; dataclass defaults restore them on reload
        return {k: v for k, v in asdict(self).items() if v is not None and v != "" and v != []}


@dataclass
class LoopManifest:
    id: str
    created: str
    status: str = "idle"  # idle, running, paused, done, error
    description: str = ""
    workdir: str = ""
    tasks: List[Task] = field(default_factory=list)
    config: Dict[str, Any] = field(default_factory=lambda: {
        "tick_interval": 5,
        "wake_on": ["NEED_DECISION", "ERROR", "ALL_DONE"],
        "max_concurrent_workers": 3,
    })

    def to_dict(self):
        d = asdict(self)
        # Re-serialize tasks through Task.to_dict to drop empty fields
        d["tasks"] = [t.to_dict() if isinstance(t, Task) else t for t in self.tasks]
        return d

    @classmethod
    def from_dict(cls, d: dict) -> "LoopManifest":
        """Rebuild a manifest (and its Task list) from parsed JSON."""
        tasks = [Task(**t) if isinstance(t, dict) else t for t in d.get("tasks", [])]
        config = d.get("config", {})
        return cls(
            id=d["id"],
            created=d["created"],
            status=d.get("status", "idle"),
            description=d.get("description", ""),
            workdir=d.get("workdir", ""),
            tasks=tasks,
            config=config,
        )


# ─── JSONL messages ───

@dataclass
class Message:
    ts: str
    type: str
    data: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        # NOTE(review): a data key named "ts" or "type" would clobber the envelope
        # fields here β€” assumed never to occur; verify against message producers.
        return json.dumps({"ts": self.ts, "type": self.type, **self.data}, ensure_ascii=False)

    @classmethod
    def from_json(cls, line: str) -> "Message":
        """Parse one JSONL line; everything besides ts/type becomes the payload."""
        d = json.loads(line.strip())
        ts = d.pop("ts")
        typ = d.pop("type")
        return cls(ts=ts, type=typ, data=d)


def now_ts() -> str:
    """Local-timezone ISO-8601 timestamp with second precision."""
    return datetime.now().astimezone().isoformat(timespec="seconds")


def make_msg(typ: str, **kwargs) -> Message:
    """Build a Message of the given type, stamped with the current time."""
    return Message(ts=now_ts(), type=typ, data=kwargs)
diff --git a/skills/storm/orchestrator/scripts/workers.py b/skills/storm/orchestrator/scripts/workers.py
new file mode 100644
index 00000000000..01b06a79d2c
--- /dev/null
+++ b/skills/storm/orchestrator/scripts/workers.py
@@ -0,0 +1,160 @@
"""
Worker management β€” asynchronous job execution via subprocess.
"""
import asyncio
import os
from pathlib import Path
from typing import Optional, Callable, Awaitable
from protocol import Task, TaskStatus, now_ts


class WorkerProcess:
    """Runs a single task as a subprocess, logging to workers/&lt;task_id&gt;.log."""

    def __init__(self, task: Task, loop_dir: Path):
        self.task = task
        self.loop_dir = loop_dir
        self.process: Optional[asyncio.subprocess.Process] = None
        self.log_path = loop_dir / "workers" / f"{task.id}.log"
        self.log_path.parent.mkdir(parents=True, exist_ok=True)

    def _build_command(self) -> str:
        """Build the shell command line according to the task's worker type."""
        t = self.task
        if t.worker == "shell":
            return t.command
        elif t.worker == "opencode":
            # NOTE(review): t.command is interpolated into a double-quoted shell
            # string β€” quotes/backticks in the command would break or inject; the
            # claude branch below avoids this with a prompt file.
            cmd = f'opencode run'
            if t.workdir:
                cmd += f' --dir {t.workdir}'
            cmd += f' "{t.command}"'
            return cmd
        elif t.worker == "claude":
            # Pass the prompt via a temp file (avoids shell-escaping issues)
            prompt_file = self.loop_dir / "workers" / f"{t.id}_prompt.txt"
            prompt_file.write_text(t.command)
            cmd = f'cat "{prompt_file}" | claude -p --allowedTools "Bash,Read,Write,Edit"'
            return cmd
elif t.worker == "python": + return f'python3 {t.command}' + else: + return t.command + + async def start(self) -> int: + """μ›Œμ»€ ν”„λ‘œμ„ΈμŠ€ μ‹œμž‘, PID λ°˜ν™˜""" + cmd = self._build_command() + workdir = self.task.workdir or None + + log_file = open(self.log_path, "w") + + self.process = await asyncio.create_subprocess_shell( + cmd, + stdout=log_file, + stderr=asyncio.subprocess.STDOUT, + cwd=workdir, + env={**os.environ, "FORCE_COLOR": "0", "NO_COLOR": "1"}, + ) + + self.task.pid = self.process.pid + self.task.status = TaskStatus.RUNNING.value + self.task.started_at = now_ts() + return self.process.pid + + async def poll(self) -> Optional[int]: + """μ’…λ£Œ μ—¬λΆ€ 확인. μ‹€ν–‰ 쀑이면 None, λλ‚¬μœΌλ©΄ return code""" + if self.process is None: + return None + return self.process.returncode + + async def wait(self) -> int: + """μ™„λ£Œ λŒ€κΈ°""" + if self.process is None: + return -1 + return await self.process.wait() + + async def kill(self): + """κ°•μ œ μ’…λ£Œ""" + if self.process and self.process.returncode is None: + self.process.terminate() + try: + await asyncio.wait_for(self.process.wait(), timeout=5) + except asyncio.TimeoutError: + self.process.kill() + + def read_log_tail(self, lines: int = 20) -> str: + """둜그 λ§ˆμ§€λ§‰ N쀄""" + if not self.log_path.exists(): + return "" + text = self.log_path.read_text(errors="replace") + return "\n".join(text.splitlines()[-lines:]) + + +class WorkerManager: + """λ™μ‹œ μ‹€ν–‰ μ›Œμ»€ ν’€ 관리""" + + def __init__(self, loop_dir: Path, max_concurrent: int = 3): + self.loop_dir = loop_dir + self.max_concurrent = max_concurrent + self.active: dict[str, WorkerProcess] = {} # task_id β†’ WorkerProcess + + @property + def running_count(self) -> int: + return len(self.active) + + def can_start(self) -> bool: + return self.running_count < self.max_concurrent + + async def start_task(self, task: Task) -> WorkerProcess: + """νƒœμŠ€ν¬ μ›Œμ»€ μ‹œμž‘""" + wp = WorkerProcess(task, self.loop_dir) + await wp.start() + self.active[task.id] = 
wp + # νƒ€μž„μ•„μ›ƒ μ„€μ • + if task.timeout > 0: + asyncio.get_event_loop().call_later( + task.timeout, lambda tid=task.id: asyncio.ensure_future(self._timeout_task(tid)) + ) + return wp + + async def _timeout_task(self, task_id: str): + """νƒ€μž„μ•„μ›ƒ 초과 μ‹œ κ°•μ œ μ’…λ£Œ""" + if task_id in self.active: + wp = self.active[task_id] + if await wp.poll() is None: # 아직 μ‹€ν–‰ 쀑 + print(f"[Worker] ⏰ {task_id} timed out after {wp.task.timeout}s") + await self.kill_task(task_id) + wp.task.status = TaskStatus.FAILED.value + wp.task.error = f"timeout after {wp.task.timeout}s" + + async def poll_all(self) -> list[tuple[str, int, str]]: + """ + λͺ¨λ“  ν™œμ„± μ›Œμ»€ 폴링. + μ™„λ£Œλœ μ›Œμ»€ 리슀트 λ°˜ν™˜: [(task_id, return_code, log_tail)] + """ + completed = [] + for task_id, wp in list(self.active.items()): + rc = await wp.poll() + if rc is not None: + log_tail = wp.read_log_tail(30) + completed.append((task_id, rc, log_tail)) + wp.task.finished_at = now_ts() + if rc == 0: + wp.task.status = TaskStatus.DONE.value + else: + wp.task.status = TaskStatus.FAILED.value + wp.task.error = f"exit code {rc}" + del self.active[task_id] + return completed + + async def kill_task(self, task_id: str): + """νŠΉμ • μ›Œμ»€ κ°•μ œ μ’…λ£Œ""" + if task_id in self.active: + await self.active[task_id].kill() + self.active[task_id].task.status = TaskStatus.CANCELLED.value + self.active[task_id].task.finished_at = now_ts() + del self.active[task_id] + + async def kill_all(self): + """λͺ¨λ“  μ›Œμ»€ μ’…λ£Œ""" + for task_id in list(self.active.keys()): + await self.kill_task(task_id)