diff --git a/backend/app/utils/single_agent_worker.py b/backend/app/utils/single_agent_worker.py index acd4b6ba4..21cd22807 100644 --- a/backend/app/utils/single_agent_worker.py +++ b/backend/app/utils/single_agent_worker.py @@ -12,18 +12,13 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -import datetime import logging -from camel.agents.chat_agent import AsyncStreamingChatAgentResponse -from camel.societies.workforce.prompts import PROCESS_TASK_PROMPT from camel.societies.workforce.single_agent_worker import ( SingleAgentWorker as BaseSingleAgentWorker, ) -from camel.societies.workforce.utils import TaskResult -from camel.tasks.task import Task, TaskState, is_task_result_insufficient +from camel.tasks.task import Task, TaskState from camel.utils.context_utils import ContextUtility -from colorama import Fore from app.agent.listen_chat_agent import ListenChatAgent @@ -36,7 +31,7 @@ def __init__( description: str, worker: ListenChatAgent, use_agent_pool: bool = True, - pool_initial_size: int = 0, # Changed from 1 to 0 to avoid pre-creating clones that waste CDP resources + pool_initial_size: int = 0, pool_max_size: int = 10, auto_scale_pool: bool = True, use_structured_output_handler: bool = True, @@ -64,278 +59,41 @@ def __init__( context_utility=context_utility, enable_workflow_memory=enable_workflow_memory, ) - self.worker = worker # change type hint + self.worker = worker # narrow type hint + # Track current task id for process_task_id injection + self._current_task_id: str | None = None async def _process_task( self, task: Task, dependencies: list[Task], stream_callback=None ) -> TaskState: - r"""Processes a task with its dependencies using an efficient agent - management system. - - This method asynchronously processes a given task, considering its - dependencies, by sending a generated prompt to a worker agent. - Uses an agent pool for efficiency when enabled, or falls back to - cloning when pool is disabled. - - Args: - task (Task): The task to process, which includes necessary details - like content and type. - dependencies (List[Task]): Tasks that the given task depends on. - - Returns: - TaskState: `TaskState.DONE` if processed successfully, otherwise - `TaskState.FAILED`. - """ - # Log task details before getting agent (for clone tracking) task_content_preview = ( task.content[:100] + "..." if len(task.content) > 100 else task.content ) logger.debug( - f"[TASK REQUEST] Requesting agent for task_id={task.id}, content_preview='{task_content_preview}'" + f"[TASK REQUEST] task_id={task.id}, " + f"content_preview='{task_content_preview}'" ) + # Store task id so _get_worker_agent can inject process_task_id + self._current_task_id = task.id + try: + return await super()._process_task( + task, dependencies, stream_callback + ) + finally: + self._current_task_id = None - # Get agent efficiently (from pool or by cloning) - worker_agent = await self._get_worker_agent() - worker_agent.process_task_id = task.id # type: ignore rewrite line - + async def _get_worker_agent(self): + agent = await super()._get_worker_agent() + # Inject eigent-specific process_task_id + if self._current_task_id and hasattr(agent, "process_task_id"): + agent.process_task_id = self._current_task_id logger.info( "Starting task processing", extra={ - "task_id": task.id, - "worker_agent_id": worker_agent.agent_id, - "dependencies_count": len(dependencies), + "task_id": self._current_task_id, + "worker_agent_id": agent.agent_id, }, ) - - response_content = "" - final_response = None - try: - dependency_tasks_info = self._get_dep_tasks_info(dependencies) - prompt = PROCESS_TASK_PROMPT.format( - content=task.content, - parent_task_content=task.parent.content if task.parent else "", - dependency_tasks_info=dependency_tasks_info, - additional_info=task.additional_info, - ) - - if self.use_structured_output_handler and self.structured_handler: - # Use structured output handler for prompt-based extraction - enhanced_prompt = self.structured_handler.generate_structured_prompt( - base_prompt=prompt, - schema=TaskResult, - examples=[ - { - "content": "I have successfully completed the task...", - "failed": False, - } - ], - additional_instructions="Ensure you provide a clear " - "description of what was done and whether the task " - "succeeded or failed.", - ) - response = await worker_agent.astep(enhanced_prompt) - - # Handle streaming response - if isinstance(response, AsyncStreamingChatAgentResponse): - # With stream_accumulate=False, we need to accumulate delta content - accumulated_content = "" - last_chunk = None - chunk_count = 0 - async for chunk in response: - chunk_count += 1 - last_chunk = chunk - if chunk.msg and chunk.msg.content: - accumulated_content += chunk.msg.content - logger.info( - f"Streaming complete: {chunk_count} chunks, content_length={len(accumulated_content)}" - ) - response_content = accumulated_content - # Store usage info from last chunk for later use - response._last_chunk_info = ( - last_chunk.info if last_chunk else {} - ) - else: - # Regular ChatAgentResponse - response_content = ( - response.msg.content if response.msg else "" - ) - - task_result = ( - self.structured_handler.parse_structured_response( - response_text=response_content, - schema=TaskResult, - fallback_values={ - "content": "Task processing failed", - "failed": True, - }, - ) - ) - else: - # Use native structured output if supported - response = await worker_agent.astep( - prompt, response_format=TaskResult - ) - - # Handle streaming response for native output (shouldn't happen now but keep for safety) - if isinstance(response, AsyncStreamingChatAgentResponse): - task_result = None - # With stream_accumulate=False, we need to accumulate delta content - accumulated_content = "" - last_chunk = None - async for chunk in response: - last_chunk = chunk - if chunk.msg: - if chunk.msg.content: - accumulated_content += chunk.msg.content - if chunk.msg.parsed: - task_result = chunk.msg.parsed - response_content = accumulated_content - # Store usage info from last chunk for later use - response._last_chunk_info = ( - last_chunk.info if last_chunk else {} - ) - # If no parsed result found in streaming, create fallback - if task_result is None: - task_result = TaskResult( - content="Failed to parse streaming response", - failed=True, - ) - else: - # Regular ChatAgentResponse - task_result = response.msg.parsed - response_content = ( - response.msg.content if response.msg else "" - ) - - # Get token usage from the response - if isinstance(response, AsyncStreamingChatAgentResponse): - # For streaming responses, get info from last chunk captured during iteration - chunk_info = getattr(response, "_last_chunk_info", {}) - usage_info = chunk_info.get("usage") or chunk_info.get( - "token_usage" - ) - else: - usage_info = response.info.get("usage") or response.info.get( - "token_usage" - ) - total_tokens = ( - usage_info.get("total_tokens", 0) if usage_info else 0 - ) - - # collect conversation from working agent to - # accumulator for workflow memory - # Only transfer memory if workflow memory is enabled - if self.enable_workflow_memory: - accumulator = self._get_conversation_accumulator() - - # transfer all memory records from working agent to accumulator - try: - # retrieve all context records from the working agent - work_records = worker_agent.memory.retrieve() - - # write these records to the accumulator's memory - memory_records = [ - record.memory_record for record in work_records - ] - accumulator.memory.write_records(memory_records) - - logger.debug( - f"Transferred {len(memory_records)} memory records to accumulator" - ) - - except Exception as e: - logger.warning( - f"Failed to transfer conversation to accumulator: {e}" - ) - - except Exception as e: - logger.error( - f"Error processing task {task.id}: {type(e).__name__}: {e}" - ) - # Store error information in task result - task.result = f"{type(e).__name__}: {e!s}" - return TaskState.FAILED - finally: - # Return agent to pool or let it be garbage collected - await self._return_worker_agent(worker_agent) - - # Populate additional_info with worker attempt details - if task.additional_info is None: - task.additional_info = {} - - # Create worker attempt details with descriptive keys - # Use final_response if available (streaming), otherwise use response - response_for_info = ( - final_response if final_response is not None else response - ) - worker_attempt_details = { - "agent_id": getattr( - worker_agent, "agent_id", worker_agent.role_name - ), - "original_worker_id": getattr( - self.worker, "agent_id", self.worker.role_name - ), - "timestamp": str(datetime.datetime.now()), - "description": f"Attempt by " - f"{getattr(worker_agent, 'agent_id', worker_agent.role_name)} " - f"(from pool/clone of " - f"{getattr(self.worker, 'agent_id', self.worker.role_name)}) " - f"to process task: {task.content}", - "response_content": response_content[:50], - "tool_calls": str( - response_for_info.info.get("tool_calls", []) - if response_for_info and hasattr(response_for_info, "info") - else [] - )[:50], - "total_tokens": total_tokens, - } - - # Store the worker attempt in additional_info - if "worker_attempts" not in task.additional_info: - task.additional_info["worker_attempts"] = [] - task.additional_info["worker_attempts"].append(worker_attempt_details) - - # Store the actual token usage for this specific task - task.additional_info["token_usage"] = {"total_tokens": total_tokens} - - print(f"======\n{Fore.GREEN}Response from {self}:{Fore.RESET}") - - logger.info(f"Response from {self}:") - - if not self.use_structured_output_handler: - # Handle native structured output parsing - if task_result is None: - logger.error( - "Error in worker step execution: Invalid task result" - ) - print( - f"{Fore.RED}Error in worker step execution: Invalid task result{Fore.RESET}" - ) - task_result = TaskResult( - content="Failed to generate valid task result.", - failed=True, - ) - - color = Fore.RED if task_result.failed else Fore.GREEN # type: ignore[union-attr] - print( - f"\n{color}{task_result.content}{Fore.RESET}\n======", # type: ignore[union-attr] - ) - - if task_result.failed: # type: ignore[union-attr] - logger.error(f"{task_result.content}") # type: ignore[union-attr] - else: - logger.info(f"{task_result.content}") # type: ignore[union-attr] - - task.result = task_result.content # type: ignore[union-attr] - - if task_result.failed: # type: ignore[union-attr] - return TaskState.FAILED - - if is_task_result_insufficient(task): - logger.warning( - f"Task {task.id}: Content validation failed - task marked as failed" - ) - return TaskState.FAILED - return TaskState.DONE + return agent diff --git a/backend/tests/app/utils/test_single_agent_worker.py b/backend/tests/app/utils/test_single_agent_worker.py index 129e9b95d..c71970196 100644 --- a/backend/tests/app/utils/test_single_agent_worker.py +++ b/backend/tests/app/utils/test_single_agent_worker.py @@ -15,9 +15,9 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from camel.agents.chat_agent import AsyncStreamingChatAgentResponse -from camel.societies.workforce.utils import TaskResult -from camel.tasks import Task +from camel.societies.workforce.single_agent_worker import ( + SingleAgentWorker as BaseSingleAgentWorker, +) from camel.tasks.task import TaskState from app.agent.listen_chat_agent import ListenChatAgent @@ -28,7 +28,7 @@ class TestSingleAgentWorker: """Test cases for SingleAgentWorker class.""" - def test_single_agent_worker_initialization(self): + def test_initialization(self): """Test SingleAgentWorker initialization.""" mock_worker = MagicMock(spec=ListenChatAgent) mock_worker.role_name = "test_worker" @@ -41,645 +41,106 @@ def test_single_agent_worker_initialization(self): use_agent_pool=True, pool_initial_size=2, pool_max_size=5, - auto_scale_pool=True, - use_structured_output_handler=True, ) assert worker.worker is mock_worker assert worker.use_agent_pool is True - assert worker.use_structured_output_handler is True - # Pool configuration is managed by the AgentPool, not as individual attributes - assert worker.agent_pool is not None # Pool should be created - assert worker.use_structured_output_handler is True + assert worker.agent_pool is not None - @pytest.mark.asyncio - async def test_process_task_success_with_structured_output(self): - """Test _process_task with successful structured output.""" - mock_worker = MagicMock(spec=ListenChatAgent) - mock_worker.role_name = "test_worker" - mock_worker.agent_id = "worker_123" - mock_worker.agent_name = "test_worker" - - worker = SingleAgentWorker( - description="Test worker", - worker=mock_worker, - use_structured_output_handler=True, - ) - - # Mock the structured handler - mock_structured_handler = MagicMock() - worker.structured_handler = mock_structured_handler - - # Create test task - task = Task(content="Test task content", id="test_task_123") - dependencies = [] - - # Mock worker agent retrieval and return - mock_worker_agent = AsyncMock() - mock_worker_agent.role_name = "pooled_worker" - mock_worker_agent.agent_id = "pooled_worker_123" - - # Mock response - mock_response = MagicMock() - mock_response.msg.content = "Task completed successfully" - mock_response.info = {"usage": {"total_tokens": 100}} - - mock_worker_agent.astep.return_value = mock_response - - # Mock structured output parsing - mock_task_result = TaskResult( - content="Task completed successfully", failed=False - ) - mock_structured_handler.parse_structured_response.return_value = ( - mock_task_result - ) - mock_structured_handler.generate_structured_prompt.return_value = ( - "Enhanced prompt" - ) - - with ( - patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent") as mock_return_agent, - patch.object( - worker, "_get_dep_tasks_info", return_value="No dependencies" - ), - ): - result = await worker._process_task(task, dependencies) - - assert result == TaskState.DONE - assert task.result == "Task completed successfully" - assert "worker_attempts" in task.additional_info - assert len(task.additional_info["worker_attempts"]) == 1 - - attempt = task.additional_info["worker_attempts"][0] - assert attempt["agent_id"] == "pooled_worker_123" - assert attempt["total_tokens"] == 100 - - mock_return_agent.assert_called_once_with(mock_worker_agent) - - @pytest.mark.asyncio - async def test_process_task_success_with_native_structured_output(self): - """Test _process_task with successful native structured output.""" - mock_worker = MagicMock(spec=ListenChatAgent) - mock_worker.role_name = "test_worker" - mock_worker.agent_id = "worker_123" - mock_worker.agent_name = "test_worker" - - worker = SingleAgentWorker( - description="Test worker", - worker=mock_worker, - use_structured_output_handler=False, # Use native structured output - ) - - # Create test task - task = Task(content="Test task content", id="test_task_123") - dependencies = [] - - # Mock worker agent - mock_worker_agent = AsyncMock() - mock_worker_agent.role_name = "pooled_worker" - mock_worker_agent.agent_id = "pooled_worker_123" - - # Mock response with parsed result - mock_response = MagicMock() - mock_response.msg.content = "Task completed successfully" - mock_response.msg.parsed = TaskResult( - content="Task completed successfully", failed=False - ) - mock_response.info = {"usage": {"total_tokens": 75}} - - mock_worker_agent.astep.return_value = mock_response - - with ( - patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent") as mock_return_agent, - patch.object( - worker, "_get_dep_tasks_info", return_value="No dependencies" - ), - ): - result = await worker._process_task(task, dependencies) - - assert result == TaskState.DONE - assert task.result == "Task completed successfully" - - # Verify native structured output was used - mock_worker_agent.astep.assert_called_once() - call_args = mock_worker_agent.astep.call_args - assert "response_format" in call_args.kwargs - assert call_args.kwargs["response_format"] == TaskResult - - mock_return_agent.assert_called_once_with(mock_worker_agent) - - @pytest.mark.skip(reason="Complex streaming response mock - needs fixing") - @pytest.mark.asyncio - async def test_process_task_with_streaming_response(self): - """Test _process_task with streaming response.""" - mock_worker = MagicMock(spec=ListenChatAgent) - mock_worker.role_name = "test_worker" - mock_worker.agent_id = "test_agent_123" - mock_worker.agent_name = "test_worker" - - worker = SingleAgentWorker( - description="Test worker", - worker=mock_worker, - use_structured_output_handler=True, - ) - - # Mock structured handler - mock_structured_handler = MagicMock() - worker.structured_handler = mock_structured_handler - - task = Task(content="Test task content", id="test_task_123") - dependencies = [] - - # Mock worker agent - mock_worker_agent = AsyncMock() - mock_worker_agent.role_name = "streaming_worker" - mock_worker_agent.agent_id = "streaming_worker_123" - - # Create mock streaming response - mock_streaming_response = MagicMock( - spec=AsyncStreamingChatAgentResponse - ) - - # Mock the async iteration - create async generator - async def async_chunks(): - chunk1 = MagicMock() - chunk1.msg.content = "Partial response" - yield chunk1 - chunk2 = MagicMock() - chunk2.msg.content = "Complete response" - yield chunk2 - - mock_streaming_response.__aiter__ = lambda self: async_chunks() - - mock_worker_agent.astep.return_value = mock_streaming_response - - # Mock structured parsing - mock_task_result = TaskResult( - content="Complete response", failed=False - ) - mock_structured_handler.parse_structured_response.return_value = ( - mock_task_result - ) - mock_structured_handler.generate_structured_prompt.return_value = ( - "Enhanced prompt" - ) - - with ( - patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent") as mock_return_agent, - patch.object( - worker, "_get_dep_tasks_info", return_value="No dependencies" - ), - ): - result = await worker._process_task(task, dependencies) - - assert result == TaskState.DONE - assert task.result == "Complete response" - mock_return_agent.assert_called_once_with(mock_worker_agent) - - @pytest.mark.asyncio - async def test_process_task_failure_exception(self): - """Test _process_task handles exceptions properly.""" - mock_worker = MagicMock(spec=ListenChatAgent) - mock_worker.role_name = "test_worker" - mock_worker.agent_id = "test_agent_123" - mock_worker.agent_name = "test_worker" - - worker = SingleAgentWorker( - description="Test worker", worker=mock_worker - ) - - task = Task(content="Test task content", id="test_task_123") - dependencies = [] - - # Mock worker agent that raises exception - mock_worker_agent = AsyncMock() - mock_worker_agent.astep.side_effect = Exception("Processing error") - - with ( - patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent") as mock_return_agent, - patch.object( - worker, "_get_dep_tasks_info", return_value="No dependencies" - ), - ): - result = await worker._process_task(task, dependencies) - - assert result == TaskState.FAILED - assert "Exception: Processing error" in task.result - mock_return_agent.assert_called_once_with(mock_worker_agent) - - @pytest.mark.asyncio - async def test_process_task_with_failed_task_result(self): - """Test _process_task when task result indicates failure.""" + def test_inherits_from_base_class(self): + """Test that SingleAgentWorker inherits from BaseSingleAgentWorker.""" mock_worker = MagicMock(spec=ListenChatAgent) - mock_worker.role_name = "test_worker" mock_worker.agent_id = "test_agent_123" mock_worker.agent_name = "test_worker" + worker = SingleAgentWorker(description="Test", worker=mock_worker) - worker = SingleAgentWorker( - description="Test worker", - worker=mock_worker, - use_structured_output_handler=True, - ) - - # Mock structured handler - mock_structured_handler = MagicMock() - worker.structured_handler = mock_structured_handler - - task = Task(content="Test task content", id="test_task_123") - dependencies = [] - - # Mock worker agent - mock_worker_agent = AsyncMock() - mock_response = MagicMock() - mock_response.msg.content = "Task failed" - mock_response.info = {"usage": {"total_tokens": 25}} - mock_worker_agent.astep.return_value = mock_response - - # Mock failed task result - mock_task_result = TaskResult( - content="Task failed due to error", failed=True - ) - mock_structured_handler.parse_structured_response.return_value = ( - mock_task_result - ) - mock_structured_handler.generate_structured_prompt.return_value = ( - "Enhanced prompt" - ) - - with ( - patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent") as mock_return_agent, - patch.object( - worker, "_get_dep_tasks_info", return_value="No dependencies" - ), - ): - result = await worker._process_task(task, dependencies) - - assert result == TaskState.FAILED - assert task.result == "Task failed due to error" - mock_return_agent.assert_called_once_with(mock_worker_agent) + assert isinstance(worker, BaseSingleAgentWorker) @pytest.mark.asyncio - async def test_process_task_with_dependencies(self): - """Test _process_task with task dependencies.""" + async def test_get_worker_agent_injects_process_task_id(self): + """Test that _get_worker_agent sets process_task_id on the agent.""" mock_worker = MagicMock(spec=ListenChatAgent) mock_worker.role_name = "test_worker" - mock_worker.agent_id = "test_agent_123" + mock_worker.agent_id = "worker_123" mock_worker.agent_name = "test_worker" worker = SingleAgentWorker( description="Test worker", worker=mock_worker, - use_structured_output_handler=False, ) - # Create main task and dependencies - main_task = Task(content="Main task", id="main_123") - dep_task1 = Task(content="Dependency 1", id="dep_1") - dep_task2 = Task(content="Dependency 2", id="dep_2") - dependencies = [dep_task1, dep_task2] - - # Mock worker agent - mock_worker_agent = AsyncMock() - mock_response = MagicMock() - mock_response.msg.content = "Task completed with dependencies" - mock_response.msg.parsed = TaskResult( - content="Task completed with dependencies", failed=False - ) - mock_response.info = {"usage": {"total_tokens": 120}} - mock_worker_agent.astep.return_value = mock_response + mock_agent = MagicMock(spec=ListenChatAgent) + mock_agent.agent_id = "pooled_agent_1" + mock_agent.process_task_id = "" - with ( - patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent") as mock_return_agent, - patch.object( - worker, - "_get_dep_tasks_info", - return_value="Dependencies: dep_1, dep_2", - ) as mock_get_deps, + with patch.object( + BaseSingleAgentWorker, + "_get_worker_agent", + new_callable=AsyncMock, + return_value=mock_agent, ): - result = await worker._process_task(main_task, dependencies) + worker._current_task_id = "task-42" + agent = await worker._get_worker_agent() - assert result == TaskState.DONE - assert main_task.result == "Task completed with dependencies" - - # Verify dependencies were processed - mock_get_deps.assert_called_once_with(dependencies) - mock_return_agent.assert_called_once_with(mock_worker_agent) + assert agent.process_task_id == "task-42" + assert agent is mock_agent @pytest.mark.asyncio - async def test_process_task_with_parent_task(self): - """Test _process_task with parent task context.""" + async def test_process_task_delegates_to_base(self): + """Test that _process_task delegates to base class.""" mock_worker = MagicMock(spec=ListenChatAgent) mock_worker.role_name = "test_worker" - mock_worker.agent_id = "test_agent_123" + mock_worker.agent_id = "worker_123" mock_worker.agent_name = "test_worker" worker = SingleAgentWorker( description="Test worker", worker=mock_worker, - use_structured_output_handler=False, ) - # Create parent and child task - parent_task = Task(content="Parent task", id="parent_123") - child_task = Task(content="Child task", id="child_123") - child_task.parent = parent_task + from camel.tasks.task import Task - # Mock worker agent - mock_worker_agent = AsyncMock() - mock_response = MagicMock() - mock_response.msg.content = "Child task completed" - mock_response.msg.parsed = TaskResult( - content="Child task completed", failed=False - ) - mock_response.info = {"usage": {"total_tokens": 80}} - mock_worker_agent.astep.return_value = mock_response + task = Task(content="Test task", id="task-1") - with ( - patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent") as mock_return_agent, - patch.object( - worker, "_get_dep_tasks_info", return_value="No dependencies" - ), - ): - result = await worker._process_task(child_task, []) + with patch.object( + BaseSingleAgentWorker, + "_process_task", + new_callable=AsyncMock, + return_value=TaskState.DONE, + ) as mock_base_process: + result = await worker._process_task(task, []) assert result == TaskState.DONE - assert child_task.result == "Child task completed" - - # Verify the prompt included parent task context - call_args = mock_worker_agent.astep.call_args - prompt = call_args[0][0] # First positional argument - assert "Parent task" in prompt - - mock_return_agent.assert_called_once_with(mock_worker_agent) + mock_base_process.assert_called_once_with(task, [], None) + # _current_task_id should be cleaned up + assert worker._current_task_id is None @pytest.mark.asyncio - async def test_process_task_content_validation_failure(self): - """Test _process_task when content validation fails.""" + async def test_process_task_cleans_up_on_failure(self): + """Test that _current_task_id is cleaned up even on failure.""" mock_worker = MagicMock(spec=ListenChatAgent) mock_worker.role_name = "test_worker" - mock_worker.agent_id = "test_agent_123" + mock_worker.agent_id = "worker_123" mock_worker.agent_name = "test_worker" worker = SingleAgentWorker( description="Test worker", worker=mock_worker, - use_structured_output_handler=False, - ) - - task = Task(content="Test task content", id="test_task_123") - - # Mock worker agent - mock_worker_agent = AsyncMock() - mock_response = MagicMock() - mock_response.msg.content = "Task completed" - mock_response.msg.parsed = TaskResult( - content="Task completed", failed=False - ) - mock_response.info = {"usage": {"total_tokens": 50}} - mock_worker_agent.astep.return_value = mock_response - - with ( - patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent") as mock_return_agent, - patch.object( - worker, "_get_dep_tasks_info", return_value="No dependencies" - ), - patch( - "app.utils.single_agent_worker.is_task_result_insufficient", - return_value=True, - ), - ): - result = await worker._process_task(task, []) - - assert result == TaskState.FAILED - mock_return_agent.assert_called_once_with(mock_worker_agent) - - def test_worker_inherits_from_base_class(self): - """Test that SingleAgentWorker inherits from BaseSingleAgentWorker.""" - from camel.societies.workforce.single_agent_worker import ( - SingleAgentWorker as BaseSingleAgentWorker, - ) - - mock_worker = MagicMock(spec=ListenChatAgent) - mock_worker.agent_id = "test_agent_123" - mock_worker.agent_name = "test_worker" - worker = SingleAgentWorker(description="Test", worker=mock_worker) - - assert isinstance(worker, BaseSingleAgentWorker) - - -@pytest.mark.integration -class TestSingleAgentWorkerIntegration: - """Integration tests for SingleAgentWorker.""" - - @pytest.mark.asyncio - async def test_worker_with_multiple_tasks(self): - """Test worker processing multiple tasks in sequence.""" - mock_worker = MagicMock(spec=ListenChatAgent) - mock_worker.role_name = "integration_worker" - mock_worker.agent_id = "test_agent_123" - mock_worker.agent_name = "integration_worker" - - worker = SingleAgentWorker( - description="Integration test worker", - worker=mock_worker, - use_structured_output_handler=False, ) - # Create multiple tasks - tasks = [Task(content=f"Task {i}", id=f"task_{i}") for i in range(3)] + from camel.tasks.task import Task - # Mock worker agent for all tasks - mock_worker_agent = AsyncMock() - - def mock_astep(prompt, **kwargs): - mock_response = MagicMock() - mock_response.msg.content = f"Completed: {prompt[:20]}..." - mock_response.msg.parsed = TaskResult( - content=f"Completed: {prompt[:20]}...", failed=False - ) - mock_response.info = {"usage": {"total_tokens": 60}} - return mock_response - - mock_worker_agent.astep.side_effect = mock_astep + task = Task(content="Test task", id="task-1") with ( patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent"), - patch.object( - worker, "_get_dep_tasks_info", return_value="No dependencies" + BaseSingleAgentWorker, + "_process_task", + new_callable=AsyncMock, + side_effect=RuntimeError("boom"), ), + pytest.raises(RuntimeError, match="boom"), ): - # Process all tasks - results = [] - for task in tasks: - result = await worker._process_task(task, []) - results.append(result) - - # All tasks should succeed - assert all(result == TaskState.DONE for result in results) - - # Each task should have results - for task in tasks: - assert task.result is not None - assert "Completed:" in task.result - assert "worker_attempts" in task.additional_info - - -@pytest.mark.model_backend -class TestSingleAgentWorkerWithLLM: - """Tests that require LLM backend (marked for selective running).""" + await worker._process_task(task, []) - @pytest.mark.asyncio - async def test_worker_with_real_agent(self): - """Test SingleAgentWorker with real ListenChatAgent.""" - # This test would use real agent instances and LLM calls - # Marked as model_backend test for selective execution - assert True # Placeholder - - @pytest.mark.very_slow - async def test_worker_full_workflow_integration(self): - """Test SingleAgentWorker in full workflow context (very slow test).""" - # This test would run complete workflow with real agents - # Marked as very_slow for execution only in full test mode - assert True # Placeholder - - -@pytest.mark.unit -class TestSingleAgentWorkerErrorCases: - """Test error cases and edge conditions for SingleAgentWorker.""" - - @pytest.mark.asyncio - async def test_process_task_with_none_response(self): - """Test _process_task when agent returns None response.""" - mock_worker = MagicMock(spec=ListenChatAgent) - mock_worker.agent_id = "test_agent_123" - mock_worker.agent_name = "test_worker" - worker = SingleAgentWorker( - description="Test", - worker=mock_worker, - use_structured_output_handler=False, - ) - - task = Task(content="Test task", id="test_123") - - # Mock worker agent returning None - mock_worker_agent = AsyncMock() - mock_worker_agent.astep.return_value = None - - with ( - patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent") as mock_return_agent, - patch.object( - worker, "_get_dep_tasks_info", return_value="No dependencies" - ), - ): - result = await worker._process_task(task, []) - - # Should handle None response gracefully - assert result == TaskState.FAILED - mock_return_agent.assert_called_once_with(mock_worker_agent) - - @pytest.mark.asyncio - async def test_process_task_with_malformed_response(self): - """Test _process_task with malformed response structure.""" - mock_worker = MagicMock(spec=ListenChatAgent) - mock_worker.agent_id = "test_agent_123" - mock_worker.agent_name = "test_worker" - worker = SingleAgentWorker( - description="Test", - worker=mock_worker, - use_structured_output_handler=False, - ) - - task = Task(content="Test task", id="test_123") - - # Mock worker agent with malformed response - mock_worker_agent = AsyncMock() - mock_response = MagicMock() - mock_response.msg = None # Missing msg attribute - mock_response.info = {} - mock_worker_agent.astep.return_value = mock_response - - with ( - patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent") as mock_return_agent, - patch.object( - worker, "_get_dep_tasks_info", return_value="No dependencies" - ), - ): - # Should handle malformed response and likely raise exception - result = await worker._process_task(task, []) - - # Depending on implementation, this might fail or handle gracefully - assert result in [TaskState.FAILED, TaskState.DONE] - mock_return_agent.assert_called_once_with(mock_worker_agent) - - @pytest.mark.asyncio - async def test_process_task_with_missing_usage_info(self): - """Test _process_task when usage information is missing.""" - mock_worker = MagicMock(spec=ListenChatAgent) - mock_worker.agent_id = "test_agent_123" - mock_worker.role_name = "test_worker" - mock_worker.agent_name = "test_worker" - worker = SingleAgentWorker( - description="Test", - worker=mock_worker, - use_structured_output_handler=False, - ) - - task = Task(content="Test task", id="test_123") - - # Mock worker agent with missing usage info - mock_worker_agent = AsyncMock() - mock_response = MagicMock() - mock_response.msg.content = "Task completed" - mock_response.msg.parsed = TaskResult( - content="Task completed", failed=False - ) - mock_response.info = {} # Missing usage information - mock_worker_agent.astep.return_value = mock_response - - with ( - patch.object( - worker, "_get_worker_agent", return_value=mock_worker_agent - ), - patch.object(worker, "_return_worker_agent") as mock_return_agent, - patch.object( - worker, "_get_dep_tasks_info", return_value="No dependencies" - ), - ): - result = await worker._process_task(task, []) - - assert result == TaskState.DONE - assert task.additional_info["token_usage"]["total_tokens"] == 0 - mock_return_agent.assert_called_once_with(mock_worker_agent) + assert worker._current_task_id is None