Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def cfg(path: str, default=None):
MAX_CONCURRENT_TASKS = cfg("execution.max_concurrent_tasks", 3)
TICK_INTERVAL = cfg("execution.tick_interval_sec", 2.0)
MAX_TOOL_ROUNDS = cfg("execution.max_tool_rounds", 10)
MAX_HISTORY_ROUNDS = cfg("execution.max_history_rounds", 4)
DEFAULT_MAX_TOKENS = cfg("execution.default_max_tokens", 4096)
MAX_TASK_RETRIES = cfg("execution.max_task_retries", 5)
WAVE_CHECKPOINTS = cfg("execution.wave_checkpoints", False)
Expand Down
5 changes: 4 additions & 1 deletion backend/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import httpx
from dependency_injector import containers, providers

from backend.config import cfg
from backend.db.connection import Database
from backend.services.auth import AuthService
from backend.services.oidc import OIDCService
Expand Down Expand Up @@ -54,7 +55,9 @@ class Container(containers.DeclarativeContainer):

# --- Core ---
db = providers.Singleton(Database)
http_client = providers.Singleton(httpx.AsyncClient, timeout=300.0)
http_client = providers.Singleton(
httpx.AsyncClient, timeout=cfg("execution.http_client_timeout", 300.0),
)
rag_cache = providers.Singleton(RAGIndexCache)
tool_registry = providers.Singleton(
ToolRegistry, http_client=http_client, rag_cache=rag_cache,
Expand Down
13 changes: 7 additions & 6 deletions backend/routes/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,9 @@ async def cancel_project(
now = time.time()
await db.execute_write(
"UPDATE tasks SET status = ?, updated_at = ? "
"WHERE project_id = ? AND status IN (?, ?, ?)",
"WHERE project_id = ? AND status IN (?, ?, ?, ?)",
(TaskStatus.CANCELLED, now, project_id,
TaskStatus.PENDING, TaskStatus.BLOCKED, TaskStatus.QUEUED),
TaskStatus.PENDING, TaskStatus.BLOCKED, TaskStatus.QUEUED, TaskStatus.RUNNING),
)
await db.execute_write(
"UPDATE projects SET status = ?, updated_at = ? WHERE id = ?",
Expand Down Expand Up @@ -611,9 +611,10 @@ async def get_coverage(
row = await _get_owned_project(db, project_id, current_user)
requirements = row["requirements"] or ""

# Parse requirement lines (same numbering as planner.py)
req_lines = [line.strip() for line in requirements.strip().split("\n") if line.strip()]
all_req_ids = [f"R{i + 1}" for i in range(len(req_lines))]
# Parse requirement blocks (same numbering as planner.py)
from backend.utils.json_utils import parse_requirements
req_blocks = parse_requirements(requirements)
all_req_ids = [f"R{i + 1}" for i in range(len(req_blocks))]

# Gather requirement IDs from all tasks in this project
task_rows = await db.fetchall(
Expand All @@ -629,7 +630,7 @@ async def get_coverage(
for i, req_id in enumerate(all_req_ids):
requirements_detail.append({
"id": req_id,
"text": req_lines[i],
"text": req_blocks[i],
"covered": req_id in covered,
})

Expand Down
2 changes: 2 additions & 0 deletions backend/routes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ async def review_task(
(TaskStatus.COMPLETED, time.time(), task_id),
)
elif body.action == "retry":
if row["retry_count"] >= MAX_TASK_RETRIES:
raise HTTPException(400, f"Maximum retry limit reached ({MAX_TASK_RETRIES})")
ctx = json.loads(row["context_json"]) if row["context_json"] else []
if body.feedback:
ctx.append({
Expand Down
9 changes: 8 additions & 1 deletion backend/services/claude_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import json
import logging

from backend.config import API_TIMEOUT, KNOWLEDGE_INJECTION_MAX_CHARS, MAX_TOOL_ROUNDS
from backend.config import API_TIMEOUT, KNOWLEDGE_INJECTION_MAX_CHARS, MAX_HISTORY_ROUNDS, MAX_TOOL_ROUNDS
from backend.models.enums import ModelTier
from backend.services.model_router import calculate_cost, get_model_id

Expand Down Expand Up @@ -200,6 +200,13 @@ async def run_claude_task(
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})

# Prune old rounds to prevent quadratic token growth.
# Keep the first user message (task description) + last N rounds
# (each round = assistant + user messages).
max_msgs = 1 + (MAX_HISTORY_ROUNDS * 2)
if len(messages) > max_msgs:
messages = [messages[0]] + messages[-(MAX_HISTORY_ROUNDS * 2):]

return {
"output": "\n".join(text_parts),
"prompt_tokens": total_prompt,
Expand Down
20 changes: 13 additions & 7 deletions backend/services/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,20 @@ async def _tick(self):
for project in projects:
pid = project["id"]

# Check budget
if not await self._budget.can_spend(0.001): # Minimal check
await self._progress.push_event(pid, "budget_warning", "Budget limit reached. Execution paused.")
await self._db.execute_write(
"UPDATE projects SET status = ?, updated_at = ? WHERE id = ?",
(ProjectStatus.PAUSED, time.time(), pid),
# Check budget — skip for projects with only Ollama (free) tasks remaining
if not await self._budget.can_spend(0.001):
non_ollama = await self._db.fetchone(
"SELECT COUNT(*) as cnt FROM tasks "
"WHERE project_id = ? AND model_tier != ? AND status NOT IN (?, ?, ?, ?)",
(pid, ModelTier.OLLAMA.value, *_TERMINAL),
)
continue
if non_ollama and non_ollama["cnt"] > 0:
await self._progress.push_event(pid, "budget_warning", "Budget limit reached. Execution paused.")
await self._db.execute_write(
"UPDATE projects SET status = ?, updated_at = ? WHERE id = ?",
(ProjectStatus.PAUSED, time.time(), pid),
)
continue

# Unblock tasks whose dependencies are now met
await self._update_blocked_tasks(pid)
Expand Down
13 changes: 10 additions & 3 deletions backend/services/knowledge_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Findings are project-scoped and deduplicated by content hash.
#
# Depends on: config.py, models/enums.py, db/connection.py,
# services/model_router.py
# services/model_router.py, utils/json_utils.py
# Used by: services/task_lifecycle.py

import hashlib
Expand All @@ -14,13 +14,14 @@
import uuid

from backend.config import (
API_TIMEOUT,
KNOWLEDGE_EXTRACTION_MAX_TOKENS,
KNOWLEDGE_EXTRACTION_MODEL,
KNOWLEDGE_MIN_OUTPUT_LENGTH,
)
from backend.models.enums import FindingCategory
from backend.services.model_router import calculate_cost
from backend.services.planner import _extract_json_object
from backend.utils.json_utils import extract_json_object

logger = logging.getLogger("orchestration.knowledge")

Expand Down Expand Up @@ -78,6 +79,11 @@ async def extract_knowledge(
if not output_text or len(output_text.strip()) < KNOWLEDGE_MIN_OUTPUT_LENGTH:
return []

# Skip extraction if budget is exhausted — task output is already paid for
if not await budget.can_spend(0.001):
logger.warning("Budget exhausted, skipping knowledge extraction for task %s", task_id)
return []

try:
return await _do_extract(
task_title=task_title,
Expand Down Expand Up @@ -116,6 +122,7 @@ async def _do_extract(
max_tokens=KNOWLEDGE_EXTRACTION_MAX_TOKENS,
system=_EXTRACTION_PROMPT,
messages=[{"role": "user", "content": user_msg}],
timeout=API_TIMEOUT,
)

pt = response.usage.input_tokens
Expand All @@ -139,7 +146,7 @@ async def _do_extract(
parsed = json.loads(raw)
except (json.JSONDecodeError, AttributeError):
# Fallback: extract JSON from markdown fences / trailing commas
parsed = _extract_json_object(raw)
parsed = extract_json_object(raw)

if not parsed or not isinstance(parsed, dict):
logger.debug("Could not parse knowledge extraction response: %s", raw[:200])
Expand Down
5 changes: 5 additions & 0 deletions backend/services/model_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
_warned_models: set[str] = set()


def _reset_warned_models():
    """Empty the module-level ``_warned_models`` set in place.

    Intended for test fixtures, so entries recorded in one test do not
    leak into the next. The set object itself is kept (only its contents
    are removed), so existing references to it stay valid.
    """
    _warned_models.clear()


# ---------------------------------------------------------------------------
# Model ID resolution
# ---------------------------------------------------------------------------
Expand Down
83 changes: 24 additions & 59 deletions backend/services/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
#
# Uses Claude to generate structured plans from requirements.
#
# Depends on: backend/config.py, services/model_router.py
# Depends on: backend/config.py, services/model_router.py, utils/json_utils.py
# Used by: routes/projects.py, container.py

import json
import re
import time
import uuid

Expand All @@ -16,6 +15,7 @@
from backend.exceptions import BudgetExhaustedError, NotFoundError, PlanParseError
from backend.models.enums import PlanningRigor, PlanStatus, ProjectStatus
from backend.services.model_router import calculate_cost
from backend.utils.json_utils import extract_json_object, parse_requirements

# Token estimates for budget reservation before API calls
_EST_PLANNING_INPUT_TOKENS = 2000 # system prompt (~1.5k) + requirements
Expand All @@ -28,58 +28,8 @@
PlanningRigor.L3: 8192,
}


def _strip_trailing_commas(json_str: str) -> str:
    """Remove trailing commas before ] and } that LLMs commonly produce.

    Only commas that sit outside of double-quoted string literals are
    touched, so string values that happen to contain ",}" or ",]" are
    preserved verbatim.
    E.g. [{"a":1},] -> [{"a":1}] and {"a":1,} -> {"a":1}

    Args:
        json_str: JSON-like text that may contain trailing commas.

    Returns:
        The text with every "comma, optional whitespace, closing bracket"
        sequence outside of string literals reduced to the closing bracket.
    """
    # Alternative 1 consumes a whole string literal (honouring backslash
    # escapes) and is copied through unchanged; because it is tried first,
    # commas inside strings are never seen by alternative 2.
    # Alternative 2 is the actual trailing comma: ',' + whitespace + '}' / ']'.
    pattern = r'("(?:\\.|[^"\\])*")|,\s*([}\]])'
    return re.sub(
        pattern,
        lambda m: m.group(1) or m.group(2),
        json_str,
        flags=re.DOTALL,
    )


def _extract_json_object(text: str) -> dict | None:
"""Extract the first balanced JSON object from text.

Uses brace-counting instead of a greedy regex to avoid capturing
past the actual closing brace when Claude wraps JSON in explanation.
Falls back to stripping trailing commas if strict parsing fails.
"""
start = text.find("{")
if start == -1:
return None

depth = 0
in_string = False
escape = False
for i in range(start, len(text)):
ch = text[i]
if escape:
escape = False
continue
if ch == "\\":
escape = True
continue
if ch == '"':
in_string = not in_string
continue
if in_string:
continue
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
raw = text[start:i + 1]
try:
return json.loads(raw)
except json.JSONDecodeError:
# Retry after stripping trailing commas
try:
return json.loads(_strip_trailing_commas(raw))
except json.JSONDecodeError:
return None
return None
# Backward-compat alias for external importers
_extract_json_object = extract_json_object


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -304,14 +254,15 @@ async def generate(
if owns_client:
client = anthropic.AsyncAnthropic(api_key=ANTHROPIC_API_KEY)

# Number requirements for traceability
req_lines = [line for line in requirements.strip().split("\n") if line.strip()]
if req_lines:
numbered = "\n".join(f"[R{i+1}] {line}" for i, line in enumerate(req_lines))
# Number requirements for traceability (paragraph-based splitting)
req_blocks = parse_requirements(requirements)
if req_blocks:
numbered = "\n".join(f"[R{i+1}] {block}" for i, block in enumerate(req_blocks))
else:
numbered = requirements
user_msg = f"Project: {project_name}\n\nRequirements:\n{numbered}"

response = None
try:
response = await client.messages.create(
model=PLANNING_MODEL,
Expand All @@ -337,11 +288,25 @@ async def generate(
# Try to extract JSON from the response (in case of markdown fences).
# Use a balanced-brace approach to find the outermost JSON object,
# instead of a greedy regex that could match too much.
plan_data = _extract_json_object(response_text)
plan_data = extract_json_object(response_text)
if plan_data is None:
raise PlanParseError("Failed to parse plan JSON from Claude response")

except Exception:
# Record actual API spend even if parsing failed — prevents budget leak
if response is not None and hasattr(response, "usage"):
pt = response.usage.input_tokens
ct = response.usage.output_tokens
actual_cost = calculate_cost(PLANNING_MODEL, pt, ct)
await budget.record_spend(
cost_usd=actual_cost,
prompt_tokens=pt,
completion_tokens=ct,
provider="anthropic",
model=PLANNING_MODEL,
purpose="planning",
project_id=project_id,
)
# Reset project status so it's not stuck in PLANNING
await db.execute_write(
"UPDATE projects SET status = ?, updated_at = ? WHERE id = ?",
Expand Down
31 changes: 18 additions & 13 deletions backend/services/task_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,18 @@ async def verify_task_output(
retry_count = task_row["retry_count"]
max_retries = task_row["max_retries"]
if retry_count < max_retries:
# Auto-retry with verification feedback appended to context
# Auto-retry with verification feedback appended to context.
# Sliding window: keep the most recent feedbacks up to the cap.
ctx = json.loads(task_row["context_json"]) if task_row["context_json"] else []
# Cap feedback entries to prevent unbounded context growth
non_feedbacks = [e for e in ctx if e.get("type") != "verification_feedback"]
feedbacks = [e for e in ctx if e.get("type") == "verification_feedback"]
if len(feedbacks) >= _MAX_VERIFICATION_FEEDBACKS:
ctx = [e for e in ctx if e.get("type") != "verification_feedback"]
ctx.append({
feedbacks = feedbacks[-(_MAX_VERIFICATION_FEEDBACKS - 1):]
feedbacks.append({
"type": "verification_feedback",
"content": f"Previous attempt had gaps: {v_notes}. Address these issues.",
})
ctx = non_feedbacks + feedbacks
await db.execute_write(
"UPDATE tasks SET status = ?, context_json = ?, "
"retry_count = retry_count + 1, completed_at = NULL, updated_at = ? WHERE id = ?",
Expand Down Expand Up @@ -275,16 +277,19 @@ async def forward_context(*, completed_task, output_text, db):
}

for dep in deps:
dep_task = await db.fetchone(
"SELECT context_json FROM tasks WHERE id = ?", (dep["task_id"],),
)
if dep_task:
ctx = json.loads(dep_task["context_json"]) if dep_task["context_json"] else []
ctx.append(context_entry)
await db.execute_write(
"UPDATE tasks SET context_json = ?, updated_at = ? WHERE id = ?",
(json.dumps(ctx), time.time(), dep["task_id"]),
# Wrap each read-modify-write in a transaction to prevent
# concurrent upstream completions from clobbering each other.
async with db.transaction():
dep_task = await db.fetchone(
"SELECT context_json FROM tasks WHERE id = ?", (dep["task_id"],),
)
if dep_task:
ctx = json.loads(dep_task["context_json"]) if dep_task["context_json"] else []
ctx.append(context_entry)
await db.execute_write(
"UPDATE tasks SET context_json = ?, updated_at = ? WHERE id = ?",
(json.dumps(ctx), time.time(), dep["task_id"]),
)


async def execute_task(
Expand Down
Loading