Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions backend/app/model/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,44 @@ class QuestionAnalysisResult(BaseModel):
)


class TaskAnalysisResult(BaseModel):
"""Result of combined task analysis (complexity + summary).

Merges question_confirm (complexity) and summary_task (name/summary)
into one LLM call. See issue #1427.
"""

is_complex: bool = Field(
description="True if complex task requiring tools/workforce, "
"False if simple question answerable directly."
)
task_name: str | None = Field(
default=None,
description="Short descriptive task name. For complex tasks: describe "
"the task. For simple questions: short label (e.g. 'Greeting', "
"'Fact Query').",
)
summary: str | None = Field(
default=None,
description="Concise task summary. Only when is_complex=True.",
)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add some post validation or postprocess that when is_complex=True task_name and summary need to be None.
Also cc @Wendong-Fan IMO we should still have a task_name when it's a non complex chat?


def has_valid_prefetch_data(self) -> bool:
"""Return True if prefetched summary can be used (skip summary_task).

When is_complex=True, task_name and summary must both be non-empty.
Otherwise we must fall back to calling summary_task.
"""
if not self.is_complex:
return False
return bool(
self.task_name
and self.task_name.strip()
and self.summary
and self.summary.strip()
)


McpServers = dict[Literal["mcpServers"], dict[str, dict]]


Expand Down
265 changes: 203 additions & 62 deletions backend/app/service/chat_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import asyncio
import datetime
import json
import logging
import platform
from pathlib import Path
Expand Down Expand Up @@ -43,7 +44,14 @@
from app.agent.toolkit.skill_toolkit import SkillToolkit
from app.agent.toolkit.terminal_toolkit import TerminalToolkit
from app.agent.tools import get_mcp_tools, get_toolkits
from app.model.chat import Chat, NewAgent, Status, TaskContent, sse_json
from app.model.chat import (
Chat,
NewAgent,
Status,
TaskAnalysisResult,
TaskContent,
sse_json,
)
from app.service.task import (
Action,
ActionDecomposeProgressData,
Expand Down Expand Up @@ -501,17 +509,31 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
is_complex_task: bool
if len(attaches_to_use) > 0:
is_complex_task = True
task_lock._prefetched_summary = None
task_lock._fallback_task_name = "Task"
logger.info(
"[NEW-QUESTION] Has attachments"
", treating as complex task"
)
else:
is_complex_task = await question_confirm(
analysis = await analyze_task(
question_agent, question, task_lock
)
is_complex_task = analysis.is_complex
if analysis.has_valid_prefetch_data():
task_lock._prefetched_summary = (
f"{analysis.task_name}|{analysis.summary}"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that different from the original one?
f"Task|{summary_task_content}" though I am not sure why we used the fixed Task before cc @Wendong-Fan

)
task_lock._fallback_task_name = None
else:
task_lock._prefetched_summary = None
task_lock._fallback_task_name = (
analysis.task_name
if analysis.task_name
else "Task"
)
logger.info(
"[NEW-QUESTION] question_confirm"
" result: is_complex="
"[NEW-QUESTION] analyze_task result: is_complex="
f"{is_complex_task}"
)

Expand Down Expand Up @@ -547,21 +569,29 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):

task_lock.add_conversation("assistant", answer_content)

wait_confirm_payload: dict[str, Any] = {
"content": answer_content,
"question": question,
}
if analysis.task_name:
wait_confirm_payload["task_name"] = (
analysis.task_name
)
yield sse_json(
"wait_confirm",
{"content": answer_content, "question": question},
wait_confirm_payload,
)
except Exception as e:
logger.error(f"Error generating simple answer: {e}")
yield sse_json(
"wait_confirm",
{
"content": "I encountered an error"
" while processing "
"your question.",
"question": question,
},
)
error_payload: dict[str, Any] = {
"content": "I encountered an error"
" while processing "
"your question.",
"question": question,
}
if analysis.task_name:
error_payload["task_name"] = analysis.task_name
yield sse_json("wait_confirm", error_payload)

# Clean up empty folder if it was created for this task
if (
Expand Down Expand Up @@ -736,57 +766,79 @@ async def run_decomposition():
except Exception:
pass

# Generate task summary
summary_task_agent = task_summary_agent(options)
try:
summary_task_content = await asyncio.wait_for(
summary_task(
summary_task_agent, camel_task
),
timeout=10,
)
task_lock.summary_generated = True
except TimeoutError:
logger.warning(
"summary_task timeout",
extra={
"project_id": options.project_id,
"task_id": options.task_id,
},
)
task_lock.summary_generated = True
content_preview = (
camel_task.content
if hasattr(camel_task, "content")
else ""
)
if content_preview is None:
content_preview = ""
if len(content_preview) > 80:
cp = content_preview[:80]
summary_task_content = cp + "..."
else:
summary_task_content = content_preview
summary_task_content = (
f"Task|{summary_task_content}"
)
except Exception:
# Generate task summary (use prefetched if available)
prefetched = getattr(
task_lock, "_prefetched_summary", None
)
if prefetched:
summary_task_content = prefetched
task_lock.summary_generated = True
content_preview = (
camel_task.content
if hasattr(camel_task, "content")
else ""
logger.debug(
"Using prefetched task summary from "
"analyze_task (issue #1427)"
)
if content_preview is None:
content_preview = ""
if len(content_preview) > 80:
cp = content_preview[:80]
summary_task_content = cp + "..."
else:
summary_task_content = content_preview
summary_task_content = (
f"Task|{summary_task_content}"
else:
summary_task_agent = task_summary_agent(
options
)
try:
summary_task_content = (
await asyncio.wait_for(
summary_task(
summary_task_agent,
camel_task,
),
timeout=10,
)
)
task_lock.summary_generated = True
except TimeoutError:
logger.warning(
"summary_task timeout",
extra={
"project_id": options.project_id,
"task_id": options.task_id,
},
)
task_lock.summary_generated = True
content_preview = (
camel_task.content
if hasattr(camel_task, "content")
else ""
)
if content_preview is None:
content_preview = ""
if len(content_preview) > 80:
cp = content_preview[:80]
summary_task_content = cp + "..."
else:
summary_task_content = content_preview
fallback_name = getattr(
task_lock,
"_fallback_task_name",
"Task",
)
summary_task_content = f"{fallback_name}|{summary_task_content}"
except Exception:
task_lock.summary_generated = True
content_preview = (
camel_task.content
if hasattr(camel_task, "content")
else ""
)
if content_preview is None:
content_preview = ""
if len(content_preview) > 80:
cp = content_preview[:80]
summary_task_content = cp + "..."
else:
summary_task_content = content_preview
fallback_name = getattr(
task_lock,
"_fallback_task_name",
"Task",
)
summary_task_content = f"{fallback_name}|{summary_task_content}"

state_holder["summary_task"] = summary_task_content
try:
Expand Down Expand Up @@ -1927,6 +1979,95 @@ def add_sub_tasks(
return added_tasks


async def analyze_task(
agent: ListenChatAgent,
question: str,
task_lock: TaskLock | None = None,
) -> TaskAnalysisResult:
"""Analyze user query in one LLM call: complexity + task name/summary.

Falls back to question_confirm when structured output parsing fails.
"""
context_prompt = ""
if task_lock:
context_prompt = build_conversation_context(
task_lock, header="=== Previous Conversation ==="
)

full_prompt = f"""{context_prompt}User Query: {question}

Analyze this user query and return structured JSON with these fields:
- is_complex (boolean): True if this requires tools, code execution, file operations,
multi-step planning, or creating/modifying content. False if it can be answered
directly (greetings, fact queries, clarifications, status checks).
- task_name (string, always): A short descriptive name. For complex tasks: describe
the task. For simple questions: use a short label (e.g. "Greeting", "Fact Query",
"Clarification").
- summary (string, only when is_complex): A concise summary of the task's main points.
Omit or use null when is_complex is false.

Examples of complex: "create a file", "search for X", "implement feature Y".
Examples of simple: "hello", "what is X?", "how are you?"

Return valid JSON only, no other text."""

try:
resp = agent.step(full_prompt, response_format=TaskAnalysisResult)

if not resp or not resp.msgs or len(resp.msgs) == 0:
logger.warning(
"analyze_task: no response, falling back to question_confirm"
)
is_complex = await question_confirm(agent, question, task_lock)
return TaskAnalysisResult(
is_complex=is_complex, task_name=None, summary=None
)

content = resp.msgs[0].content
parsed = getattr(resp.msgs[0], "parsed", None)

if parsed is not None and isinstance(parsed, TaskAnalysisResult):
logger.info(
"analyze_task: got structured result",
extra={"is_complex": parsed.is_complex},
)
return parsed

if content:
try:
data = json.loads(content.strip())
result = TaskAnalysisResult(
is_complex=bool(data.get("is_complex", True)),
task_name=data.get("task_name"),
summary=data.get("summary"),
)
logger.info(
"analyze_task: parsed JSON from content",
extra={"is_complex": result.is_complex},
)
return result
except (json.JSONDecodeError, TypeError):
pass

logger.warning(
"analyze_task: could not parse response, falling back to question_confirm"
)
is_complex = await question_confirm(agent, question, task_lock)
return TaskAnalysisResult(
is_complex=is_complex, task_name=None, summary=None
)

except Exception as e:
logger.warning(
f"analyze_task failed: {e}, falling back to question_confirm",
exc_info=True,
)
is_complex = await question_confirm(agent, question, task_lock)
return TaskAnalysisResult(
is_complex=is_complex, task_name=None, summary=None
)


async def question_confirm(
agent: ListenChatAgent, prompt: str, task_lock: TaskLock | None = None
) -> bool:
Expand Down
Loading
Loading