diff --git a/plugins/automation/runtime_data/state.py b/plugins/automation/runtime_data/state.py index 60ea1cf..6da8f20 100644 --- a/plugins/automation/runtime_data/state.py +++ b/plugins/automation/runtime_data/state.py @@ -83,21 +83,23 @@ def __init__( def get(self, path: str, default: Any = None) -> Any: """ - Get value from context using dot notation path. + Get value from context using dot notation path with array indexing. Supports: - inputs.key - steps.step_id.result + - steps.step_id.result.tasks[0].items - steps.step_id.status - outputs.key Args: - path: Dot-notation path (e.g., "steps.step1.result") + path: Dot-notation path with array indexing (e.g., "steps.step1.result.tasks[0].items") default: Default value if path not found Returns: Value at path or default """ + parts = path.split(".") if not parts: return default @@ -106,6 +108,10 @@ def get(self, path: str, default: Any = None) -> Any: if parts[0] == "inputs": obj = self.inputs parts = parts[1:] + elif parts[0] in self.inputs: + # Direct access to input variable with nested path (e.g., "chunk.items") + obj = self.inputs[parts[0]] + parts = parts[1:] elif parts[0] == "steps": if len(parts) < 2: return default @@ -130,14 +136,41 @@ def get(self, path: str, default: Any = None) -> Any: else: return default - # Navigate remaining path + # Navigate remaining path with array indexing support for part in parts: - if isinstance(obj, dict): - obj = obj.get(part) - if obj is None: + # Check for array indexing like "tasks[0]" + if "[" in part and part.endswith("]"): + # Split into key and index + key, index_str = part[:-1].split("[", 1) + try: + index = int(index_str) + except ValueError: + return default + + # Get the array/list + if isinstance(obj, dict): + obj = obj.get(key) + if obj is None: + return default + else: + return default + + # Index into the array + if isinstance(obj, list): + if 0 <= index < len(obj): + obj = obj[index] + else: + return default + else: return default else: - return default + # Regular dictionary access + if isinstance(obj, dict): + obj = obj.get(part) + if obj is None: + return default + else: + return default return obj if obj is not None else default diff --git a/plugins/automation/tests/test_ai_split_operation.py b/plugins/automation/tests/test_ai_split_operation.py deleted file mode 100644 index f32e75b..0000000 --- a/plugins/automation/tests/test_ai_split_operation.py +++ /dev/null @@ -1,377 +0,0 @@ -"""Tests for AI-powered split operation.""" - -import pytest -import json -from unittest.mock import AsyncMock, MagicMock, patch - -from plugins.automation.workflows.steps.operations import AISplitOperation - - -class TestAISplitOperation: - """Test AI-powered split operation.""" - - def test_validation_success(self): - """Test validation with valid config.""" - config = {"max_tasks": 5, "min_tasks": 2} - inputs = {"goal": "Understand authentication"} - - operation = AISplitOperation(config, inputs) - assert operation.validate() is None - - def test_validation_missing_goal(self): - """Test validation fails without goal.""" - config = {} - inputs = {} - - operation = AISplitOperation(config, inputs) - error = operation.validate() - assert error is not None - assert "goal" in error.lower() - - def test_validation_invalid_task_counts(self): - """Test validation fails when max_tasks < min_tasks.""" - config = {"max_tasks": 2, "min_tasks": 5} - inputs = {"goal": "Test"} - - operation = AISplitOperation(config, inputs) - error = operation.validate() - assert error is not None - assert "max_tasks" in error 
- - def test_build_split_prompt_basic(self): - """Test prompt building with basic inputs.""" - config = {"min_tasks": 2, "max_tasks": 5} - inputs = {"goal": "Understand authentication"} - - operation = AISplitOperation(config, inputs) - prompt = operation._build_split_prompt( - goal="Understand authentication", - codebase_path=None, - constraints=None, - context="", - min_tasks=2, - max_tasks=5 - ) - - assert "Understand authentication" in prompt - assert "2-5" in prompt - assert "JSON" in prompt - - def test_build_split_prompt_comprehensive(self): - """Test prompt building with all optional fields.""" - config = {} - inputs = {"goal": "Test"} - - operation = AISplitOperation(config, inputs) - prompt = operation._build_split_prompt( - goal="Understand security", - codebase_path="/path/to/code", - constraints="Focus on backend only", - context="Legacy system", - min_tasks=3, - max_tasks=8 - ) - - assert "Understand security" in prompt - assert "/path/to/code" in prompt - assert "Focus on backend only" in prompt - assert "Legacy system" in prompt - - @pytest.mark.asyncio - async def test_execute_with_valid_ai_response(self): - """Test execution with valid AI JSON response.""" - config = {"model": "haiku", "max_tasks": 5, "min_tasks": 2} - inputs = {"goal": "Understand auth", "codebase_path": "/code"} - - operation = AISplitOperation(config, inputs) - - # Mock AI response - mock_ai_response = { - "reasoning": "Split into login, session, and security", - "tasks": [ - { - "title": "Login Flow", - "query": "Trace user login", - "type": "flow", - "priority": "high", - "estimated_complexity": "moderate" - }, - { - "title": "Session Management", - "query": "Find session handling", - "type": "implementation", - "priority": "high", - "estimated_complexity": "complex" - } - ] - } - - # Mock the AI call - with patch.object(operation, '_call_ai_for_split', new_callable=AsyncMock) as mock_call: - mock_call.return_value = mock_ai_response - - result = await operation.execute() - - assert result["task_count"] == 2 - assert len(result["tasks"]) == 2 - assert result["reasoning"] == "Split into login, session, and security" - assert result["tasks"][0]["title"] == "Login Flow" - assert result["tasks"][0]["index"] == 0 - assert result["metadata"]["goal"] == "Understand auth" - - @pytest.mark.asyncio - async def test_execute_with_too_many_tasks(self): - """Test execution truncates when AI generates too many tasks.""" - config = {"max_tasks": 3, "min_tasks": 2} - inputs = {"goal": "Test"} - - operation = AISplitOperation(config, inputs) - - # Mock AI response with too many tasks - mock_ai_response = { - "reasoning": "Detailed split", - "tasks": [ - {"title": f"Task {i}", "query": f"Q{i}", "type": "question", "priority": "medium"} - for i in range(10) # Generate 10 tasks - ] - } - - with patch.object(operation, '_call_ai_for_split', new_callable=AsyncMock) as mock_call: - mock_call.return_value = mock_ai_response - - result = await operation.execute() - - # Should be truncated to max_tasks - assert result["task_count"] == 3 - assert len(result["tasks"]) == 3 - - @pytest.mark.asyncio - async def test_call_ai_for_split_with_clean_json(self): - """Test AI call with clean JSON response.""" - config = {} - inputs = {"goal": "Test"} - operation = AISplitOperation(config, inputs) - - mock_response = json.dumps({ - "reasoning": "Test reasoning", - "tasks": [ - {"title": "Task 1", "query": "Q1", "type": "question", "priority": "high"} - ] - }) - - with patch('utils.agent.cli_executor.CLIExecutor') as MockExecutor: - 
mock_executor = MockExecutor.return_value - mock_executor.execute = AsyncMock(return_value=mock_response) - - result = await operation._call_ai_for_split("test prompt", "haiku") - - assert result["reasoning"] == "Test reasoning" - assert len(result["tasks"]) == 1 - - @pytest.mark.asyncio - async def test_call_ai_for_split_with_json_in_text(self): - """Test AI call when JSON is embedded in text.""" - config = {} - inputs = {"goal": "Test"} - operation = AISplitOperation(config, inputs) - - # AI response with explanation before/after JSON - mock_response = """Here's my analysis: - - { - "reasoning": "Embedded JSON", - "tasks": [ - {"title": "Task", "query": "Q", "type": "question", "priority": "high"} - ] - } - - Hope this helps!""" - - with patch('utils.agent.cli_executor.CLIExecutor') as MockExecutor: - mock_executor = MockExecutor.return_value - mock_executor.execute = AsyncMock(return_value=mock_response) - - result = await operation._call_ai_for_split("test prompt", "haiku") - - assert result["reasoning"] == "Embedded JSON" - assert len(result["tasks"]) == 1 - - @pytest.mark.asyncio - async def test_call_ai_for_split_fallback_on_error(self): - """Test fallback split when AI call fails.""" - config = {} - inputs = {"goal": "Test authentication"} - operation = AISplitOperation(config, inputs) - - with patch('utils.agent.cli_executor.CLIExecutor') as MockExecutor: - mock_executor = MockExecutor.return_value - mock_executor.execute = AsyncMock(side_effect=Exception("API error")) - - result = await operation._call_ai_for_split("test prompt", "haiku") - - # Should return fallback split - assert "Fallback split" in result["reasoning"] - assert len(result["tasks"]) == 3 # Default fallback has 3 tasks - assert "Test authentication" in result["tasks"][0]["query"] - - @pytest.mark.asyncio - async def test_call_ai_for_split_fallback_on_invalid_json(self): - """Test fallback when AI returns invalid JSON.""" - config = {} - inputs = {"goal": "Test"} - operation = AISplitOperation(config, inputs) - - # AI returns text without JSON - mock_response = "I cannot parse this request as JSON" - - with patch('utils.agent.cli_executor.CLIExecutor') as MockExecutor: - mock_executor = MockExecutor.return_value - mock_executor.execute = AsyncMock(return_value=mock_response) - - result = await operation._call_ai_for_split("test prompt", "haiku") - - # Should use fallback - assert "Fallback split" in result["reasoning"] - assert len(result["tasks"]) >= 3 - - def test_create_fallback_split(self): - """Test fallback split generation.""" - config = {} - inputs = {"goal": "Understand database operations"} - operation = AISplitOperation(config, inputs) - - result = operation._create_fallback_split("Some error") - - assert "Fallback split" in result["reasoning"] - assert len(result["tasks"]) == 3 - assert "database operations" in result["tasks"][0]["query"] - assert all("title" in task for task in result["tasks"]) - assert all("query" in task for task in result["tasks"]) - assert all("type" in task for task in result["tasks"]) - - def test_parse_ai_response_valid(self): - """Test parsing valid AI response.""" - config = {} - inputs = {"goal": "Test"} - operation = AISplitOperation(config, inputs) - - ai_response = { - "reasoning": "Test", - "tasks": [ - {"title": "T1", "query": "Q1", "type": "question", "priority": "high"}, - {"title": "T2", "query": "Q2", "type": "implementation", "priority": "medium"} - ] - } - - tasks = operation._parse_ai_response(ai_response, min_tasks=2, max_tasks=5) - - assert len(tasks) == 2 - 
assert tasks[0]["index"] == 0 - assert tasks[1]["index"] == 1 - - @pytest.mark.asyncio - async def test_execute_with_constraints(self): - """Test execution with constraints.""" - config = {"max_tasks": 5} - inputs = { - "goal": "Understand system", - "constraints": "Backend only, focus on security" - } - - operation = AISplitOperation(config, inputs) - - mock_response = { - "reasoning": "Focused on backend security", - "tasks": [ - {"title": "Security", "query": "Security aspects", "type": "question", "priority": "high"} - ] - } - - with patch.object(operation, '_call_ai_for_split', new_callable=AsyncMock) as mock_call: - mock_call.return_value = mock_response - - result = await operation.execute() - - # Verify constraints were included in the call - call_args = mock_call.call_args[0][0] # Get prompt - assert "Backend only, focus on security" in call_args - - -class TestAISplitIntegration: - """Integration tests for AI split with CLIExecutor.""" - - @pytest.mark.asyncio - @pytest.mark.integration - async def test_full_ai_split_execution(self): - """ - Integration test with real CLIExecutor (mocked). - - This tests the full flow including CLIExecutor integration. - """ - config = {"model": "haiku", "max_tasks": 5, "min_tasks": 3} - inputs = { - "goal": "Understand authentication flow in the application", - "codebase_path": "/path/to/code", - "constraints": "Focus on security and user management" - } - - operation = AISplitOperation(config, inputs) - - # Mock CLIExecutor at a higher level - mock_cli_response = json.dumps({ - "reasoning": "Authentication involves login, session, and security. Splitting into focused areas.", - "tasks": [ - { - "title": "User Login Flow", - "query": "Trace the user login process from form submission to session creation", - "type": "flow", - "priority": "high", - "estimated_complexity": "complex" - }, - { - "title": "Session Management", - "query": "Investigate how user sessions are stored and validated", - "type": "implementation", - "priority": "high", - "estimated_complexity": "moderate" - }, - { - "title": "Security Mechanisms", - "query": "Find password hashing, encryption, and security measures", - "type": "structure", - "priority": "high", - "estimated_complexity": "moderate" - }, - { - "title": "User Management", - "query": "Explore user creation, updates, and permission management", - "type": "implementation", - "priority": "medium", - "estimated_complexity": "simple" - } - ] - }) - - with patch('utils.agent.cli_executor.CLIExecutor') as MockExecutor: - mock_executor = MockExecutor.return_value - mock_executor.execute = AsyncMock(return_value=mock_cli_response) - - result = await operation.execute() - - # Verify execution - assert mock_executor.execute.called - prompt = mock_executor.execute.call_args[0][0] - assert "Understand authentication flow" in prompt - assert "security" in prompt - - # Verify results - assert result["task_count"] == 4 - assert len(result["tasks"]) == 4 - assert "Authentication involves" in result["reasoning"] - - # Verify task structure - first_task = result["tasks"][0] - assert first_task["title"] == "User Login Flow" - assert first_task["type"] == "flow" - assert first_task["priority"] == "high" - assert "index" in first_task diff --git a/plugins/automation/tests/test_loop_step.py b/plugins/automation/tests/test_loop_step.py new file mode 100644 index 0000000..f655742 --- /dev/null +++ b/plugins/automation/tests/test_loop_step.py @@ -0,0 +1,307 @@ +"""Tests for Loop Step.""" + +import pytest +from 
plugins.automation.workflows.engine import WorkflowEngine +from plugins.automation.workflows.definition import WorkflowDefinition + + +class TestLoopStep: + """Tests for loop step functionality.""" + + @pytest.mark.asyncio + async def test_loop_basic(self): + """Test basic loop execution.""" + yaml_str = """ +workflow: + name: test-loop + + inputs: + numbers: + type: array + required: true + + steps: + - id: process_numbers + type: loop + items: "{{ inputs.numbers }}" + item_var: num + steps: + - id: double + type: transform + config: + operation: aggregate + function: sum + inputs: + items: ["{{ num }}", "{{ num }}"] +""" + + workflow = WorkflowDefinition.from_yaml(yaml_str) + engine = WorkflowEngine() + + result = await engine.execute(workflow, {"numbers": [1, 2, 3]}) + + assert result.status.value == "completed" + assert "process_numbers" in result.step_results + + loop_result = result.step_results["process_numbers"].result + assert loop_result["iterations"] == 3 + assert loop_result["successful"] == 3 + assert loop_result["failed"] == 0 + + @pytest.mark.asyncio + async def test_loop_empty_list(self): + """Test loop with empty list.""" + yaml_str = """ +workflow: + name: test-loop-empty + + inputs: + items: + type: array + required: true + + steps: + - id: process_items + type: loop + items: "{{ inputs.items }}" + item_var: item + steps: + - id: noop + type: transform + config: + operation: aggregate + function: count + inputs: + items: ["{{ item }}"] +""" + + workflow = WorkflowDefinition.from_yaml(yaml_str) + engine = WorkflowEngine() + + result = await engine.execute(workflow, {"items": []}) + + assert result.status.value == "completed" + loop_result = result.step_results["process_items"].result + assert loop_result["iterations"] == 0 + assert loop_result["successful"] == 0 + + @pytest.mark.asyncio + async def test_loop_with_error_continue(self): + """Test loop with error handling (continue).""" + yaml_str = """ +workflow: + name: test-loop-error + + inputs: + items: + type: array + required: true + + steps: + - id: process_items + type: loop + items: "{{ inputs.items }}" + item_var: item + steps: + - id: divide + type: transform + script: "result = 10 / int(inputs.get('item', 0))" + inputs: + item: "{{ item }}" + on_error: continue +""" + + workflow = WorkflowDefinition.from_yaml(yaml_str) + engine = WorkflowEngine() + + # Include 0 which will cause division by zero + result = await engine.execute(workflow, {"items": [2, 0, 5]}) + + # Loop should complete despite one failure + assert result.status.value in ["completed", "partial"] + loop_result = result.step_results["process_items"].result + assert loop_result["iterations"] == 3 + # At least 2 should succeed (2 and 5) + assert loop_result["successful"] >= 2 + + @pytest.mark.asyncio + async def test_loop_nested(self): + """Test nested loops.""" + yaml_str = """ +workflow: + name: test-nested-loop + + inputs: + matrix: + type: array + required: true + + steps: + - id: outer_loop + type: loop + items: "{{ inputs.matrix }}" + item_var: row + steps: + - id: inner_loop + type: loop + items: "{{ row }}" + item_var: cell + steps: + - id: process_cell + type: transform + config: + operation: aggregate + function: sum + inputs: + items: ["{{ cell }}"] +""" + + workflow = WorkflowDefinition.from_yaml(yaml_str) + engine = WorkflowEngine() + + result = await engine.execute(workflow, { + "matrix": [[1, 2], [3, 4], [5, 6]] + }) + + assert result.status.value == "completed" + loop_result = result.step_results["outer_loop"].result + assert 
loop_result["iterations"] == 3 # 3 rows + assert loop_result["successful"] == 3 + + @pytest.mark.asyncio + async def test_loop_with_dict_items(self): + """Test loop with dictionary items.""" + yaml_str = """ +workflow: + name: test-loop-dict + + inputs: + tasks: + type: array + required: true + + steps: + - id: process_tasks + type: loop + items: "{{ inputs.tasks }}" + item_var: task + steps: + - id: get_items + type: transform + config: + operation: aggregate + function: count + inputs: + items: "{{ task.items }}" +""" + + workflow = WorkflowDefinition.from_yaml(yaml_str) + engine = WorkflowEngine() + + result = await engine.execute(workflow, { + "tasks": [ + {"items": [1, 2, 3]}, + {"items": [4, 5]}, + {"items": [6, 7, 8, 9]} + ] + }) + + assert result.status.value == "completed" + loop_result = result.step_results["process_tasks"].result + assert loop_result["iterations"] == 3 + assert loop_result["successful"] == 3 + + @pytest.mark.asyncio + async def test_loop_validation_missing_items(self): + """Test loop validation fails without items.""" + yaml_str = """ +workflow: + name: test-loop-invalid + + steps: + - id: bad_loop + type: loop + item_var: item + steps: + - id: noop + type: transform + script: "result = 1" +""" + + workflow = WorkflowDefinition.from_yaml(yaml_str) + + # Should fail validation + errors = workflow.validate() + assert any("items" in error.lower() for error in errors) + + @pytest.mark.asyncio + async def test_loop_validation_missing_steps(self): + """Test loop validation fails without child steps.""" + yaml_str = """ +workflow: + name: test-loop-invalid + + steps: + - id: bad_loop + type: loop + items: "{{ inputs.data }}" + item_var: item +""" + + workflow = WorkflowDefinition.from_yaml(yaml_str) + + # Should fail validation + errors = workflow.validate() + assert any("at least one substep" in error.lower() for error in errors) + + @pytest.mark.asyncio + async def test_loop_item_variable_scoping(self): + """Test that loop item variable doesn't leak between iterations.""" + yaml_str = """ +workflow: + name: test-loop-scoping + + inputs: + values: + type: array + required: true + + steps: + - id: first_loop + type: loop + items: "{{ inputs.values }}" + item_var: val + steps: + - id: store_val + type: transform + script: "result = inputs['val'] * 2" + inputs: + val: "{{ val }}" + + - id: second_loop + type: loop + depends_on: [first_loop] + items: "{{ inputs.values }}" + item_var: val + steps: + - id: use_val + type: transform + script: "result = inputs['val'] * 3" + inputs: + val: "{{ val }}" +""" + + workflow = WorkflowDefinition.from_yaml(yaml_str) + engine = WorkflowEngine() + + result = await engine.execute(workflow, {"values": [1, 2, 3]}) + + assert result.status.value == "completed" + # Both loops should complete successfully + assert result.step_results["first_loop"].result["successful"] == 3 + assert result.step_results["second_loop"].result["successful"] == 3 + + +class TestLoopStepIntegration: + """Integration tests for loop step in complete workflows.""" + pass diff --git a/plugins/automation/tests/test_mapreduce_operations.py b/plugins/automation/tests/test_mapreduce_operations.py deleted file mode 100644 index 91f927e..0000000 --- a/plugins/automation/tests/test_mapreduce_operations.py +++ /dev/null @@ -1,476 +0,0 @@ -"""Tests for map-reduce exploration operations.""" - -import pytest -import tempfile -import json -from pathlib import Path -from unittest.mock import AsyncMock, patch - -from plugins.automation.workflows.steps.operations import ( - 
SplitOperation, - ExplorationOperation, - SummarizeOperation, -) - - -class TestSplitOperation: - """Test split operation for map-reduce workflows.""" - - def test_split_by_items(self): - """Test splitting one task per item.""" - config = {"operation": "split", "strategy": "by_items"} - inputs = {"items": ["task1", "task2", "task3", "task4"]} - - operation = SplitOperation(config, inputs) - assert operation.validate() is None - - @pytest.mark.asyncio - async def test_split_by_items_execution(self): - """Test split by items execution.""" - config = {"strategy": "by_items"} - inputs = {"items": ["Q1", "Q2", "Q3"]} - - operation = SplitOperation(config, inputs) - result = await operation.execute() - - assert result["task_count"] == 3 - assert len(result["tasks"]) == 3 - assert result["tasks"][0]["item"] == "Q1" - assert result["tasks"][1]["index"] == 1 - assert result["metadata"]["strategy"] == "by_items" - - @pytest.mark.asyncio - async def test_split_by_count(self): - """Test splitting into N chunks.""" - config = {"strategy": "by_count", "count": 2} - inputs = {"items": [1, 2, 3, 4, 5, 6]} - - operation = SplitOperation(config, inputs) - result = await operation.execute() - - assert result["task_count"] <= 2 - # Should have roughly equal distribution - total_items = sum(len(task["items"]) for task in result["tasks"]) - assert total_items == 6 - - @pytest.mark.asyncio - async def test_split_by_chunk_size(self): - """Test splitting by chunk size.""" - config = {"strategy": "by_chunk_size", "chunk_size": 2} - inputs = {"items": [1, 2, 3, 4, 5]} - - operation = SplitOperation(config, inputs) - result = await operation.execute() - - assert result["task_count"] == 3 # 2 + 2 + 1 - assert len(result["tasks"][0]["items"]) == 2 - assert len(result["tasks"][1]["items"]) == 2 - assert len(result["tasks"][2]["items"]) == 1 - - def test_split_validation_errors(self): - """Test split operation validation.""" - # Invalid strategy - op = SplitOperation({"strategy": "invalid"}, {}) - assert "Invalid strategy" in op.validate() - - # Missing count for by_count - op = SplitOperation({"strategy": "by_count"}, {}) - assert "requires 'count'" in op.validate() - - # Missing chunk_size for by_chunk_size - op = SplitOperation({"strategy": "by_chunk_size"}, {}) - assert "requires 'chunk_size'" in op.validate() - - @pytest.mark.asyncio - async def test_split_with_single_item(self): - """Test split with single non-list item.""" - config = {"strategy": "by_items"} - inputs = {"data": "single_task"} # Not in 'items' key - - operation = SplitOperation(config, inputs) - result = await operation.execute() - - assert result["task_count"] == 1 - assert result["tasks"][0]["item"] == "single_task" - - -class TestExplorationOperation: - """Test exploration operation for AI-powered exploration.""" - - def test_exploration_validation(self): - """Test exploration operation validation.""" - # Valid exploration type - op = ExplorationOperation({"exploration_type": "question"}, {}) - assert op.validate() is None - - # Invalid exploration type - op = ExplorationOperation({"exploration_type": "invalid"}, {}) - assert "Invalid exploration_type" in op.validate() - - @pytest.mark.asyncio - async def test_exploration_execution(self): - """Test basic exploration execution.""" - with tempfile.TemporaryDirectory() as tmpdir: - config = { - "exploration_type": "question", - "session_dir": tmpdir, - "save_to_session": True, - } - inputs = { - "task": {"item": "How does auth work?", "index": 0}, - "codebase_path": "/fake/path", - "question": "How 
does auth work?" - } - - operation = ExplorationOperation(config, inputs) - - # Mock ExploreAgent to avoid real AI calls - with patch('plugins.automation.agents.ExploreAgent') as MockAgent: - mock_agent = MockAgent.return_value - mock_agent.explore = AsyncMock(return_value="Authentication uses JWT tokens...") - - result = await operation.execute() - - assert "finding" in result - assert result["finding"]["exploration_type"] == "question" - assert result["finding"]["status"] == "completed" - assert "JWT tokens" in result["finding"]["result"] - assert result["task_info"]["index"] == 0 - assert result["metadata"]["saved_to_session"] is True - - @pytest.mark.asyncio - async def test_exploration_session_storage(self): - """Test that exploration findings are stored in session files.""" - with tempfile.TemporaryDirectory() as tmpdir: - config = { - "exploration_type": "question", - "session_dir": tmpdir, - "session_id": "test_session", - "save_to_session": True, - } - inputs = { - "task": {"item": "Test question", "index": 5}, - "question": "Test question", - } - - operation = ExplorationOperation(config, inputs) - - # Mock ExploreAgent - with patch('plugins.automation.agents.ExploreAgent') as MockAgent: - mock_agent = MockAgent.return_value - mock_agent.explore = AsyncMock(return_value="Test result") - - result = await operation.execute() - - # Check session file was created - session_file = Path(result["session_file"]) - assert session_file.exists() - - # Verify content - with open(session_file) as f: - data = json.load(f) - assert data["session_id"] == "test_session" - assert data["task_index"] == 5 - assert data["exploration_type"] == "question" - assert "finding" in data - assert data["finding"]["status"] == "completed" - - @pytest.mark.asyncio - async def test_exploration_without_session_storage(self): - """Test exploration without saving to session.""" - config = {"exploration_type": "question", "save_to_session": False} - inputs = {"task": "Simple task", "question": "Simple question"} - - operation = ExplorationOperation(config, inputs) - - # Mock ExploreAgent - with patch('plugins.automation.agents.ExploreAgent') as MockAgent: - mock_agent = MockAgent.return_value - mock_agent.explore = AsyncMock(return_value="Result") - - result = await operation.execute() - - assert result["session_file"] is None - assert result["metadata"]["saved_to_session"] is False - assert result["finding"]["status"] == "completed" - - @pytest.mark.asyncio - async def test_exploration_types(self): - """Test different exploration types call appropriate agent methods.""" - test_cases = [ - ("question", "explore", "What is this?"), - ("implementation", "find_implementation", "auth_function"), - ("structure", "analyze_structure", "auth_module"), - ("usage", "find_usage", "User"), - ("flow", "explain_flow", "login process"), - ] - - for exp_type, method_name, query in test_cases: - config = {"exploration_type": exp_type, "save_to_session": False} - inputs = { - "task": {"item": query, "index": 0}, - "codebase_path": "/code" - } - - # Add type-specific input - if exp_type == "question": - inputs["question"] = query - elif exp_type == "implementation": - inputs["feature"] = query - elif exp_type == "structure": - inputs["component"] = query - elif exp_type == "usage": - inputs["symbol"] = query - elif exp_type == "flow": - inputs["flow"] = query - - operation = ExplorationOperation(config, inputs) - - with patch('plugins.automation.agents.ExploreAgent') as MockAgent: - mock_agent = MockAgent.return_value - mock_method = 
AsyncMock(return_value=f"Result for {query}") - setattr(mock_agent, method_name, mock_method) - - result = await operation.execute() - - # Verify correct method was called - assert mock_method.called, f"Method {method_name} should have been called for {exp_type}" - assert result["finding"]["status"] == "completed" - assert query in result["finding"]["result"] - - @pytest.mark.asyncio - async def test_exploration_error_handling(self): - """Test exploration handles agent errors gracefully.""" - config = {"exploration_type": "question", "save_to_session": False} - inputs = {"task": "Test", "question": "Test question"} - - operation = ExplorationOperation(config, inputs) - - # Mock ExploreAgent to raise exception - with patch('plugins.automation.agents.ExploreAgent') as MockAgent: - mock_agent = MockAgent.return_value - mock_agent.explore = AsyncMock(side_effect=Exception("API error")) - - result = await operation.execute() - - # Should return failed status, not raise exception - assert result["finding"]["status"] == "failed" - assert "error" in result["finding"] - assert "API error" in result["finding"]["error"] - - -class TestSummarizeOperation: - """Test summarize operation for aggregating findings.""" - - def test_summarize_validation(self): - """Test summarize operation validation.""" - # Valid format - op = SummarizeOperation({"summary_format": "detailed"}, {}) - assert op.validate() is None - - # Invalid format - op = SummarizeOperation({"summary_format": "invalid"}, {}) - assert "Invalid summary_format" in op.validate() - - @pytest.mark.asyncio - async def test_summarize_from_findings(self): - """Test summarizing from finding objects.""" - config = {"summary_format": "detailed", "include_metadata": True} - findings = [ - { - "task": "Question 1", - "exploration_type": "question", - "finding": {"query": "Q1", "result": "Answer 1"}, - "task_index": 0, - }, - { - "task": "Question 2", - "exploration_type": "question", - "finding": {"query": "Q2", "result": "Answer 2"}, - "task_index": 1, - }, - ] - inputs = {"findings": findings} - - operation = SummarizeOperation(config, inputs) - result = await operation.execute() - - assert result["finding_count"] == 2 - assert result["summary"]["total_findings"] == 2 - assert len(result["summary"]["findings"]) == 2 - - @pytest.mark.asyncio - async def test_summarize_from_session_files(self): - """Test summarizing from session files.""" - with tempfile.TemporaryDirectory() as tmpdir: - session_id = "test_session_123" - - # Create mock session files - for i in range(3): - session_file = Path(tmpdir) / f"{session_id}_task_{i}.json" - data = { - "session_id": session_id, - "task_index": i, - "task": f"Task {i}", - "exploration_type": "question", - "finding": {"query": f"Q{i}", "result": f"A{i}"}, - } - with open(session_file, "w") as f: - json.dump(data, f) - - config = { - "summary_format": "structured", - "session_dir": tmpdir, - "session_id": session_id, - } - inputs = {"session_id": session_id} - - operation = SummarizeOperation(config, inputs) - result = await operation.execute() - - assert result["finding_count"] == 3 - assert len(result["session_files_read"]) == 3 - assert result["summary"]["total_findings"] == 3 - - @pytest.mark.asyncio - async def test_summarize_concise_format(self): - """Test concise summary format.""" - config = {"summary_format": "concise", "include_metadata": True} - findings = [ - { - "exploration_type": "question", - "finding": { - "query": "Test query", - "result": "Long result " * 20, - "status": "completed", - }, - } - ] 
- inputs = {"findings": findings} - - operation = SummarizeOperation(config, inputs) - result = await operation.execute() - - summary = result["summary"] - assert "key_findings" in summary - assert summary["total_findings"] == 1 - # Result should be truncated to 100 chars - assert len(summary["key_findings"][0]["result_preview"]) <= 100 - - @pytest.mark.asyncio - async def test_summarize_structured_format(self): - """Test structured summary format grouped by type.""" - config = {"summary_format": "structured"} - findings = [ - {"exploration_type": "question", "task": "Q1", "finding": {"result": "A1"}}, - {"exploration_type": "question", "task": "Q2", "finding": {"result": "A2"}}, - { - "exploration_type": "implementation", - "task": "F1", - "finding": {"result": "impl"}, - }, - ] - inputs = {"findings": findings} - - operation = SummarizeOperation(config, inputs) - result = await operation.execute() - - summary = result["summary"] - assert "by_exploration_type" in summary - assert summary["by_exploration_type"]["question"]["count"] == 2 - assert summary["by_exploration_type"]["implementation"]["count"] == 1 - - @pytest.mark.asyncio - async def test_summarize_output_file(self): - """Test writing summary to output file.""" - with tempfile.TemporaryDirectory() as tmpdir: - output_file = Path(tmpdir) / "summary.json" - - config = {"summary_format": "detailed", "output_file": str(output_file)} - findings = [{"task": "Task 1", "finding": {"result": "Result 1"}}] - inputs = {"findings": findings} - - operation = SummarizeOperation(config, inputs) - await operation.execute() - - # Verify file was created - assert output_file.exists() - - # Verify content - with open(output_file) as f: - data = json.load(f) - assert data["total_findings"] == 1 - - @pytest.mark.asyncio - async def test_summarize_empty_findings(self): - """Test summarizing with no findings.""" - config = {"summary_format": "detailed"} - inputs = {"findings": []} - - operation = SummarizeOperation(config, inputs) - result = await operation.execute() - - assert result["finding_count"] == 0 - assert "No findings" in result["summary"]["message"] - - -class TestMapReduceIntegration: - """Integration tests for full map-reduce workflow.""" - - @pytest.mark.asyncio - async def test_full_mapreduce_flow(self): - """Test complete split -> explore -> summarize flow.""" - with tempfile.TemporaryDirectory() as tmpdir: - session_id = "integration_test" - - # Step 1: Split - split_config = {"strategy": "by_items"} - split_inputs = {"items": ["Q1", "Q2", "Q3"]} - split_op = SplitOperation(split_config, split_inputs) - split_result = await split_op.execute() - - assert split_result["task_count"] == 3 - - # Step 2: Explore (simulate parallel execution) - # Mock ExploreAgent for all explorations - with patch('plugins.automation.agents.ExploreAgent') as MockAgent: - mock_agent = MockAgent.return_value - mock_agent.explore = AsyncMock(side_effect=[ - "Answer to Q1", - "Answer to Q2", - "Answer to Q3" - ]) - - exploration_results = [] - for task in split_result["tasks"]: - explore_config = { - "exploration_type": "question", - "session_dir": tmpdir, - "session_id": session_id, - "save_to_session": True, - } - explore_inputs = {"task": task, "question": task["item"]} - explore_op = ExplorationOperation(explore_config, explore_inputs) - explore_result = await explore_op.execute() - exploration_results.append(explore_result) - - assert len(exploration_results) == 3 - - # Verify all explorations completed - for result in exploration_results: - assert 
result["finding"]["status"] == "completed" - - # Step 3: Summarize - summarize_config = { - "summary_format": "structured", - "session_dir": tmpdir, - "session_id": session_id, - } - summarize_inputs = {"session_id": session_id} - summarize_op = SummarizeOperation(summarize_config, summarize_inputs) - summarize_result = await summarize_op.execute() - - # Verify final result - assert summarize_result["finding_count"] == 3 - assert len(summarize_result["session_files_read"]) == 3 - assert summarize_result["summary"]["total_findings"] == 3 diff --git a/plugins/automation/tests/test_runtime_data.py b/plugins/automation/tests/test_runtime_data.py index b84f879..ac7528c 100644 --- a/plugins/automation/tests/test_runtime_data.py +++ b/plugins/automation/tests/test_runtime_data.py @@ -259,6 +259,79 @@ def test_from_dict(self): assert "step1" in context.step_results assert context.outputs == {"final": "result"} + def test_get_array_indexing(self): + """Test getting values with array indexing.""" + context = WorkflowContext() + + step_result = StepResult( + step_id="split", + status=StepStatus.COMPLETED, + result={ + "tasks": [ + {"items": [10, 20], "chunk_index": 0}, + {"items": [30, 40], "chunk_index": 1}, + ] + }, + ) + context.set_step_result("split", step_result) + + # Test array indexing + assert context.get("steps.split.result.tasks[0]") == {"items": [10, 20], "chunk_index": 0} + assert context.get("steps.split.result.tasks[1]") == {"items": [30, 40], "chunk_index": 1} + + # Test array indexing with nested property access + assert context.get("steps.split.result.tasks[0].items") == [10, 20] + assert context.get("steps.split.result.tasks[1].items") == [30, 40] + assert context.get("steps.split.result.tasks[0].chunk_index") == 0 + + def test_get_array_indexing_out_of_bounds(self): + """Test array indexing with out of bounds index.""" + context = WorkflowContext() + + step_result = StepResult( + step_id="split", + status=StepStatus.COMPLETED, + result={"tasks": [{"items": [10, 20]}]}, + ) + context.set_step_result("split", step_result) + + # Out of bounds should return default + assert context.get("steps.split.result.tasks[5].items", "default") == "default" + + def test_get_array_indexing_invalid_index(self): + """Test array indexing with invalid index.""" + context = WorkflowContext() + + step_result = StepResult( + step_id="split", + status=StepStatus.COMPLETED, + result={"tasks": [{"items": [10, 20]}]}, + ) + context.set_step_result("split", step_result) + + # Invalid index should return default + assert context.get("steps.split.result.tasks[abc].items", "default") == "default" + + def test_resolve_template_with_array_indexing(self): + """Test resolving templates with array indexing.""" + context = WorkflowContext() + + step_result = StepResult( + step_id="split", + status=StepStatus.COMPLETED, + result={ + "tasks": [ + {"items": [10, 20]}, + {"items": [30, 40]}, + ] + }, + ) + context.set_step_result("split", step_result) + + # Resolve template with array indexing + assert context.resolve_template("{{ steps.split.result.tasks[0].items }}") == [10, 20] + assert context.resolve_template("{{ steps.split.result.tasks[1].items }}") == [30, 40] + def test_repr(self): """Test string representation.""" context = WorkflowContext( diff --git a/plugins/automation/tests/test_workflow_definition.py b/plugins/automation/tests/test_workflow_definition.py index 40c98a4..daa0122 100644 --- a/plugins/automation/tests/test_workflow_definition.py +++ b/plugins/automation/tests/test_workflow_definition.py @@ 
-326,6 +326,67 @@ def test_validate_agent_step_missing_operation(self): assert any("operation" in error.lower() and "specify" in error.lower() for error in errors) + def test_validate_transform_step_with_operation(self): + """Test validating transform step with operation (no script required).""" + yaml_str = """ +workflow: + name: "test" + + steps: + - id: step1 + type: transform + config: + operation: aggregate + function: sum + inputs: + items: [1, 2, 3] +""" + + workflow = WorkflowDefinition.from_yaml(yaml_str) + errors = workflow.validate() + + # Should not have errors - operation is valid alternative to script + assert len(errors) == 0 + + def test_validate_transform_step_with_script(self): + """Test validating transform step with script.""" + yaml_str = """ +workflow: + name: "test" + + steps: + - id: step1 + type: transform + script: "result = inputs['x'] * 2" + inputs: + x: 5 +""" + + workflow = WorkflowDefinition.from_yaml(yaml_str) + errors = workflow.validate() + + # Should not have errors - script is provided + assert len(errors) == 0 + + def test_validate_transform_step_missing_both(self): + """Test validating transform step without script or operation.""" + yaml_str = """ +workflow: + name: "test" + + steps: + - id: step1 + type: transform + inputs: + x: 5 +""" + + workflow = WorkflowDefinition.from_yaml(yaml_str) + errors = workflow.validate() + + # Should have error - needs either script or operation + assert any("script" in error.lower() or "operation" in error.lower() for error in errors) + def test_get_step(self): """Test getting step by ID.""" yaml_str = """ diff --git a/plugins/automation/workflows/definition.py b/plugins/automation/workflows/definition.py index 7222c1d..4fba016 100644 --- a/plugins/automation/workflows/definition.py +++ b/plugins/automation/workflows/definition.py @@ -329,8 +329,14 @@ def validate(self) -> List[str]: f"Loop step '{step.id}' must have at least one substep" ) elif step.type == "transform": - if not step.script: - errors.append(f"Transform step '{step.id}' must specify 'script'") + # Transform steps must have either 'script' or 'operation' + has_script = bool(step.script) + has_operation = bool(step.config.get("operation")) + if not has_script and not has_operation: + errors.append( + f"Transform step '{step.id}' must specify either 'script' or " + f"'operation' in config" + ) return errors diff --git a/plugins/automation/workflows/engine.py b/plugins/automation/workflows/engine.py index 97e5e40..b9fb278 100644 --- a/plugins/automation/workflows/engine.py +++ b/plugins/automation/workflows/engine.py @@ -10,10 +10,13 @@ from enum import Enum from typing import Any, Dict, List, Optional from dataclasses import dataclass, field +from pathlib import Path from ..runtime_data import WorkflowContext, StepResult, StepStatus from .definition import WorkflowDefinition, StepDefinition -from .steps import BaseStep, AgentStep, TransformStep +from .steps import BaseStep, AgentStep, TransformStep, LoopStep +from utils.session import SessionManager, Session +from utils.session.storage import FileSystemSessionStorage class WorkflowStatus(str, Enum): @@ -65,15 +68,36 @@ class WorkflowEngine: Executes workflows with dependency resolution, error handling, and state management. """ - def __init__(self): - """Initialize workflow engine.""" + def __init__(self, session_manager: Optional[SessionManager] = None): + """ + Initialize workflow engine. + + Args: + session_manager: Optional SessionManager for state persistence. 
+ If not provided, creates a default file-based one. + """ self.logger = logging.getLogger(__name__) + # Initialize session management + if session_manager is None: + # Create default session storage in .mcp/workflows directory + workflows_dir = Path.home() / ".mcp" / "workflows" + history_dir = workflows_dir / ".history" + storage = FileSystemSessionStorage( + sessions_dir=workflows_dir / "sessions", + history_dir=history_dir + ) + self.session_manager = SessionManager(storage) + else: + self.session_manager = session_manager + async def execute( self, workflow: WorkflowDefinition, inputs: Optional[Dict[str, Any]] = None, context: Optional[WorkflowContext] = None, + session_id: Optional[str] = None, + persist: bool = True, ) -> WorkflowExecutionResult: """ Execute a workflow. @@ -82,6 +106,8 @@ async def execute( workflow: Workflow definition inputs: Workflow inputs context: Existing context (for resume) + session_id: Optional session ID for persistence. If not provided, creates new session. + persist: Whether to persist state to session (default: True) Returns: WorkflowExecutionResult with execution outcome @@ -97,6 +123,23 @@ async def execute( # Validate inputs self._validate_inputs(workflow, inputs or {}) + # Session management + session: Optional[Session] = None + if persist: + # Create or get session + if session_id: + session = self.session_manager.get_session(session_id) + if not session: + raise ValueError(f"Session {session_id} not found") + else: + # Create new session + session = self.session_manager.create_session( + purpose=f"Workflow: {workflow.name}", + tags=["workflow", workflow.name], + ) + session_id = session.metadata.session_id + self.logger.info(f"Created session: {session_id}") + # Create or use existing context if context is None: execution_id = str(uuid.uuid4()) @@ -108,6 +151,10 @@ async def execute( else: execution_id = context.execution_id or str(uuid.uuid4()) + # Store session_id in context metadata + if session_id: + context.metadata["session_id"] = session_id + # Create execution result result = WorkflowExecutionResult( workflow_id=workflow.name, @@ -120,9 +167,13 @@ async def execute( f"Starting workflow '{workflow.name}' (execution: {execution_id})" ) + # Save initial state to session + if persist and session: + self._save_to_session(session, workflow, context, result) + try: # Execute steps - await self._execute_steps(workflow, context) + await self._execute_steps(workflow, context, session if persist else None) # Collect outputs result.outputs = context.outputs @@ -136,6 +187,15 @@ async def execute( f"Workflow '{workflow.name}' completed with status: {result.status}" ) + # Final save to session + if persist and session: + self._save_to_session(session, workflow, context, result) + from utils.session.models import SessionStatus + self.session_manager.complete_session( + session_id, + SessionStatus.COMPLETED if result.status == WorkflowStatus.COMPLETED else SessionStatus.FAILED + ) + except Exception as e: self.logger.error(f"Workflow execution failed: {e}", exc_info=True) result.status = WorkflowStatus.FAILED @@ -145,6 +205,12 @@ async def execute( result.step_results = context.step_results result.outputs = context.outputs + # Save error state to session + if persist and session: + self._save_to_session(session, workflow, context, result) + from utils.session.models import SessionStatus + self.session_manager.complete_session(session_id, SessionStatus.FAILED) + return result def _validate_inputs( @@ -165,7 +231,7 @@ def _validate_inputs( raise 
ValueError(f"Required input '{name}' not provided") async def _execute_steps( - self, workflow: WorkflowDefinition, context: WorkflowContext + self, workflow: WorkflowDefinition, context: WorkflowContext, session: Optional[Session] = None ): """ Execute workflow steps in dependency order. @@ -173,6 +239,7 @@ async def _execute_steps( Args: workflow: Workflow definition context: Execution context + session: Optional session for persistence """ # Create step instances steps = [] @@ -203,6 +270,10 @@ async def _execute_steps( # Store step result in context context.set_step_result(step.step_id, step_result) + # Save intermediate state to session + if session: + self._save_step_to_session(session, step.step_id, step_result) + # Handle step result if step_result.status == StepStatus.COMPLETED: executed.add(step.step_id) @@ -250,10 +321,12 @@ def _create_step(self, step_def: StepDefinition) -> BaseStep: return AgentStep(step_def) elif step_def.type == "transform": return TransformStep(step_def) + elif step_def.type == "loop": + return LoopStep(step_def) else: raise ValueError( f"Step type '{step_def.type}' not yet implemented. " - f"Currently supported types: 'agent', 'transform'" + f"Currently supported types: 'agent', 'transform', 'loop'" ) def _determine_status( @@ -289,3 +362,185 @@ def _determine_status( return WorkflowStatus.PARTIAL else: return WorkflowStatus.COMPLETED + + def _save_to_session( + self, + session: Session, + workflow: WorkflowDefinition, + context: WorkflowContext, + result: WorkflowExecutionResult, + ): + """ + Save workflow state to session. + + Args: + session: Session to save to + workflow: Workflow definition + context: Execution context + result: Execution result + """ + # Store workflow definition as YAML + session.set("workflow_name", workflow.name) + session.set("workflow_version", workflow.version) + + # Store execution state + session.set("execution_id", result.execution_id) + session.set("workflow_status", result.status.value) + session.set("started_at", result.started_at.isoformat() if result.started_at else None) + session.set("completed_at", result.completed_at.isoformat() if result.completed_at else None) + + # Store context + session.set("context", context.to_dict()) + + # Store outputs and error + session.set("outputs", result.outputs) + if result.error: + session.set("error", result.error) + + # Save session + self.session_manager.storage.save_session(session) + + def _save_step_to_session( + self, + session: Session, + step_id: str, + step_result: StepResult, + ): + """ + Save individual step result to session. + + Args: + session: Session to save to + step_id: Step identifier + step_result: Step execution result + """ + # Get existing step results + step_results = session.get("step_results", {}) + + # Add new step result + step_results[step_id] = step_result.to_dict() + + # Save back to session + session.set("step_results", step_results) + session.set("last_completed_step", step_id) + + # Save session + self.session_manager.storage.save_session(session) + + def load_from_session(self, session_id: str) -> Optional[tuple[WorkflowContext, Dict[str, Any]]]: + """ + Load workflow state from session. 
+ + Args: + session_id: Session identifier + + Returns: + Tuple of (WorkflowContext, workflow_metadata) if found, None otherwise + """ + session = self.session_manager.get_session(session_id) + if not session: + return None + + # Load context + context_dict = session.get("context") + if not context_dict: + return None + + context = WorkflowContext.from_dict(context_dict) + + # Load workflow metadata + metadata = { + "workflow_name": session.get("workflow_name"), + "workflow_version": session.get("workflow_version"), + "execution_id": session.get("execution_id"), + "workflow_status": session.get("workflow_status"), + "started_at": session.get("started_at"), + "completed_at": session.get("completed_at"), + "outputs": session.get("outputs", {}), + "error": session.get("error"), + "last_completed_step": session.get("last_completed_step"), + } + + return context, metadata + + async def resume_from_session( + self, + session_id: str, + workflow: WorkflowDefinition, + ) -> WorkflowExecutionResult: + """ + Resume workflow execution from a saved session. + + Args: + session_id: Session identifier to resume from + workflow: Workflow definition (must match the saved workflow) + + Returns: + WorkflowExecutionResult with execution outcome + + Raises: + ValueError: If session not found or workflow doesn't match + """ + loaded = self.load_from_session(session_id) + if not loaded: + raise ValueError(f"Session {session_id} not found or has no workflow state") + + context, metadata = loaded + + # Validate workflow matches + if metadata["workflow_name"] != workflow.name: + raise ValueError( + f"Workflow mismatch: session has '{metadata['workflow_name']}', " + f"but trying to resume with '{workflow.name}'" + ) + + self.logger.info( + f"Resuming workflow '{workflow.name}' from session {session_id} " + f"(last step: {metadata['last_completed_step']})" + ) + + # Resume execution with existing context + return await self.execute( + workflow=workflow, + inputs=context.inputs, + context=context, + session_id=session_id, + persist=True, + ) + + def list_workflow_sessions( + self, + workflow_name: Optional[str] = None, + limit: int = 100, + ) -> List[Dict[str, Any]]: + """ + List workflow sessions with optional filtering. + + Args: + workflow_name: Optional workflow name to filter by + limit: Maximum number of sessions to return + + Returns: + List of session summaries + """ + # Get all workflow sessions + sessions = self.session_manager.list_sessions( + tags=["workflow"] if not workflow_name else ["workflow", workflow_name], + limit=limit, + ) + + summaries = [] + for session in sessions: + summary = { + "session_id": session.metadata.session_id, + "workflow_name": session.get("workflow_name"), + "status": session.metadata.status.value, + "created_at": session.metadata.created_at.isoformat(), + "updated_at": session.metadata.updated_at.isoformat(), + "execution_id": session.get("execution_id"), + "last_completed_step": session.get("last_completed_step"), + "error": session.get("error"), + } + summaries.append(summary) + + return summaries diff --git a/plugins/automation/workflows/examples/ai_exploration_workflow.yaml b/plugins/automation/workflows/examples/ai_exploration_workflow.yaml deleted file mode 100644 index ef600b3..0000000 --- a/plugins/automation/workflows/examples/ai_exploration_workflow.yaml +++ /dev/null @@ -1,169 +0,0 @@ -# AI-Powered Exploration Workflow -# -# This workflow uses AI to intelligently split exploration work: -# 1. AI Split: AI analyzes the goal and decides how to break it down -# 2. 
Parallel Exploration: Execute AI-generated tasks independently -# 3. AI Summarize: Aggregate all findings into a comprehensive report -# -# The AI makes ALL the decisions about: -# - What aspects to explore -# - How many tasks to create -# - What questions to ask -# - How to organize the work - -workflow: - name: ai_exploration - version: 1.0 - description: Fully AI-driven exploration with intelligent task decomposition - - inputs: - goal: - type: string - required: true - description: High-level exploration goal (e.g., "Understand how authentication works") - - codebase_path: - type: string - required: true - description: Path to the codebase to explore - - constraints: - type: string - required: false - description: Optional constraints (e.g., "focus on backend only") - - session_dir: - type: string - required: false - default: ".mcp_sessions" - - outputs: - summary: - type: object - description: AI-generated comprehensive summary - - ai_reasoning: - type: string - description: AI's reasoning for how it split the work - - steps: - # Step 1: AI decides how to split the exploration - - id: ai_split - type: transform - config: - operation: ai_split - model: haiku # Fast model for task decomposition - max_tasks: 8 # Let AI create up to 8 parallel tasks - min_tasks: 3 # Require at least 3 focused tasks - inputs: - goal: "{{ inputs.goal }}" - codebase_path: "{{ inputs.codebase_path }}" - constraints: "{{ inputs.constraints }}" - outputs: - tasks: result.tasks - reasoning: result.reasoning - - # Step 2: Execute AI-generated explorations - # Note: In production, these would be dynamically generated based on ai_split output - # For now, we show the pattern with fixed task slots - - - id: explore_0 - type: transform - depends_on: [ai_split] - config: - operation: explore - exploration_type: "{{ steps.ai_split.result.tasks[0].type }}" - session_dir: "{{ inputs.session_dir }}" - session_id: "ai_explore_{{ context.execution_id }}" - save_to_session: true - inputs: - task: "{{ steps.ai_split.result.tasks[0] }}" - question: "{{ steps.ai_split.result.tasks[0].query }}" - codebase_path: "{{ inputs.codebase_path }}" - on_error: continue - - - id: explore_1 - type: transform - depends_on: [ai_split] - config: - operation: explore - exploration_type: "{{ steps.ai_split.result.tasks[1].type }}" - session_dir: "{{ inputs.session_dir }}" - session_id: "ai_explore_{{ context.execution_id }}" - save_to_session: true - inputs: - task: "{{ steps.ai_split.result.tasks[1] }}" - question: "{{ steps.ai_split.result.tasks[1].query }}" - codebase_path: "{{ inputs.codebase_path }}" - on_error: continue - - - id: explore_2 - type: transform - depends_on: [ai_split] - config: - operation: explore - exploration_type: "{{ steps.ai_split.result.tasks[2].type }}" - session_dir: "{{ inputs.session_dir }}" - session_id: "ai_explore_{{ context.execution_id }}" - save_to_session: true - inputs: - task: "{{ steps.ai_split.result.tasks[2] }}" - question: "{{ steps.ai_split.result.tasks[2].query }}" - codebase_path: "{{ inputs.codebase_path }}" - on_error: continue - - # Additional task slots (if AI generates more tasks) - - id: explore_3 - type: transform - depends_on: [ai_split] - config: - operation: explore - exploration_type: "{{ steps.ai_split.result.tasks[3].type }}" - session_dir: "{{ inputs.session_dir }}" - session_id: "ai_explore_{{ context.execution_id }}" - save_to_session: true - inputs: - task: "{{ steps.ai_split.result.tasks[3] }}" - question: "{{ steps.ai_split.result.tasks[3].query }}" - codebase_path: "{{ 
inputs.codebase_path }}" - on_error: continue - - - id: explore_4 - type: transform - depends_on: [ai_split] - config: - operation: explore - exploration_type: "{{ steps.ai_split.result.tasks[4].type }}" - session_dir: "{{ inputs.session_dir }}" - session_id: "ai_explore_{{ context.execution_id }}" - save_to_session: true - inputs: - task: "{{ steps.ai_split.result.tasks[4] }}" - question: "{{ steps.ai_split.result.tasks[4].query }}" - codebase_path: "{{ inputs.codebase_path }}" - on_error: continue - - # Step 3: AI aggregates and summarizes all findings - - id: summarize - type: transform - depends_on: - - explore_0 - - explore_1 - - explore_2 - - explore_3 - - explore_4 - config: - operation: summarize - session_dir: "{{ inputs.session_dir }}" - session_id: "ai_explore_{{ context.execution_id }}" - summary_format: structured - include_metadata: true - output_file: "{{ inputs.session_dir }}/ai_summary_{{ context.execution_id }}.json" - inputs: - session_id: "ai_explore_{{ context.execution_id }}" - outputs: - summary: result.summary - ai_reasoning: "{{ steps.ai_split.result.reasoning }}" - - error_handling: - default: continue # Continue even if some explorations fail diff --git a/plugins/automation/workflows/examples/demo.py b/plugins/automation/workflows/examples/demo.py deleted file mode 100755 index 7fb0627..0000000 --- a/plugins/automation/workflows/examples/demo.py +++ /dev/null @@ -1,109 +0,0 @@ -#!/usr/bin/env python3 -""" -Quick demo of AI-powered task decomposition. - -Shows how Claude intelligently breaks down exploration goals. - -Usage: - uv run python -m plugins.automation.workflows.examples.demo -""" - -import asyncio -import sys -from pathlib import Path - -# Add project root to path -project_root = Path(__file__).parent.parent.parent.parent.parent -sys.path.insert(0, str(project_root)) - -from plugins.automation.workflows.steps.operations import AISplitOperation - - -async def main(): - print("\n" + "=" * 80) - print("AI-POWERED TASK DECOMPOSITION DEMO") - print("=" * 80 + "\n") - - # Configure the AI split operation - config = { - "model": "haiku", # Fast model for task decomposition - "max_tasks": 6, # Allow up to 6 parallel tasks - "min_tasks": 3, # Require at least 3 tasks - } - - inputs = { - "goal": "Understand how the MCP workflow system orchestrates AI agents", - "codebase_path": ".", # Current directory (git root) - "focus_areas": ["architecture", "execution flow", "error handling"], - "constraints": "Focus on the core workflow engine and step execution" - } - - print("🎯 Goal:") - print(f" {inputs['goal']}\n") - - print("📍 Focus Areas:") - for area in inputs['focus_areas']: - print(f" • {area}") - print() - - print("⏳ Asking Claude to decompose this into focused exploration tasks...\n") - print("-" * 80 + "\n") - - # Create and execute operation - operation = AISplitOperation(config, inputs) - - # Validate - error = operation.validate() - if error: - print(f"❌ Validation error: {error}") - return - - try: - # Execute AI split - result = await operation.execute() - - # Display results - print("✅ AI Task Decomposition Complete!\n") - print("=" * 80) - print(f"AI REASONING:") - print("=" * 80) - print(result['reasoning']) - print() - - print("=" * 80) - print(f"GENERATED TASKS ({result['task_count']} tasks)") - print("=" * 80 + "\n") - - for task in result['tasks']: - print(f"📋 Task {task['index'] + 1}: {task['title']}") - print(f" Type: {task['type']}") - print(f" Priority: {task.get('priority', 'N/A')}") - print(f" Complexity: {task.get('estimated_complexity', 
'N/A')}") - print(f" Query: {task['query']}") - print() - - print("=" * 80) - print("METADATA") - print("=" * 80) - print(f"Goal: {result['metadata']['goal']}") - print(f"Codebase: {result['metadata'].get('codebase_path', 'N/A')}") - print(f"Model: {result['metadata'].get('model', 'haiku')}") - print(f"Max Tasks: {result['metadata'].get('max_tasks', 'N/A')}") - print(f"Min Tasks: {result['metadata'].get('min_tasks', 'N/A')}") - print() - - print("💡 Next Steps:") - print(" • These tasks can now be executed in parallel by ExploreAgent") - print(" • Each task will store findings in session files") - print(" • A summarize operation will aggregate all results") - print(" • See README.md for full workflow examples") - print() - - except Exception as e: - print(f"❌ Error during execution: {e}") - import traceback - traceback.print_exc() - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/plugins/automation/workflows/examples/demo_ai_research.py b/plugins/automation/workflows/examples/demo_ai_research.py new file mode 100644 index 0000000..da14e4e --- /dev/null +++ b/plugins/automation/workflows/examples/demo_ai_research.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +""" +AI-Powered Research Workflow Demo + +This demo showcases an AI-powered multi-agent research system inspired by +Anthropic's approach: + +1. PLAN: AI analyzes research question and decomposes into subtopics +2. EXPLORE: Multiple agents work in parallel on different subtopics +3. SYNTHESIZE: Aggregate findings into final comprehensive answer + +The AI dynamically decides: +- How many sub-agents to spawn (2-6) +- What specific topic each agent should explore +- How to best parallelize the research + +Usage: + # Uses the internal agent system (claude, codex, or copilot CLI) + uv run python -m plugins.automation.workflows.examples.demo_ai_research +""" + +import asyncio +import sys +from pathlib import Path + +# Add project root to path +project_root = Path(__file__).parent.parent.parent.parent.parent +sys.path.insert(0, str(project_root)) + +from plugins.automation.workflows.engine import WorkflowEngine +from plugins.automation.workflows.definition import WorkflowDefinition +from plugins.automation.workflows.steps.operations.schemas import ( + get_decompose_result, + get_subtopics, + get_loop_result, + get_exploration_findings, +) + + +async def main(): + print("\n" + "=" * 80) + print("AI-POWERED MULTI-AGENT RESEARCH SYSTEM") + print("=" * 80 + "\n") + + print("This demo showcases Anthropic-style multi-agent research:") + print("1. PLAN: AI decomposes research question into subtopics") + print("2. EXPLORE: Multiple agents work in parallel") + print("3. SYNTHESIZE: Aggregate findings into final answer\n") + + # Create research workflow + workflow_yaml = """ +workflow: + name: ai_research_system + version: 1.0 + description: | + Multi-agent research system with AI-powered task decomposition. + + Flow: + 1. PLAN: AI analyzes question and creates exploration tasks + 2. EXPLORE: Agents work in parallel on subtopics + 3. 
SYNTHESIZE: Aggregate findings into comprehensive answer + + inputs: + question: + type: string + required: true + description: The question to investigate + + outputs: + final_report: + type: string + description: Synthesized findings + + steps: + # PLAN PHASE: AI decomposes question into subtopics + - id: decompose_question + type: transform + config: + operation: decompose + min_subtopics: 2 + max_subtopics: 5 + cli_type: claude + model: haiku + inputs: + question: "{{ inputs.question }}" + + # EXPLORE PHASE: Multiple agents work in parallel + - id: parallel_exploration + type: loop + depends_on: [decompose_question] + items: "{{ steps.decompose_question.result.subtopics }}" + item_var: subtopic + steps: + - id: explore_subtopic + type: agent + agent: explore + operation: explore + config: + cli_type: claude + model: haiku + inputs: + question: "{{ subtopic.exploration_task }}" + + # SYNTHESIZE PHASE: Aggregate exploration count + - id: synthesize_findings + type: transform + depends_on: [parallel_exploration] + config: + operation: aggregate + function: count + inputs: + items: "{{ steps.parallel_exploration.result.results }}" +""" + + try: + print("📋 Parsing workflow definition...\n") + workflow = WorkflowDefinition.from_yaml(workflow_yaml) + + errors = workflow.validate() + if errors: + print("❌ Validation errors:") + for error in errors: + print(f" • {error}") + return + + print("✅ Workflow validated\n") + + # Example question + question = "How does the MCP workflow system handle dynamic parallelism?" + + print("=" * 80) + print("RESEARCH QUESTION") + print("=" * 80 + "\n") + print(f"❓ {question}\n") + + inputs = {"question": question} + + print("🤖 PHASE 1: AI Planning - Decomposing research question...\n") + + engine = WorkflowEngine() + + # Execute with session persistence enabled + result = await engine.execute(workflow, inputs, persist=True) + + # Show session info + print(f"\n💾 SESSION PERSISTENCE") + print(f" Execution ID: {result.execution_id}") + print(f" Workflow Status: {result.status.value}") + print(f" All intermediate results are stored in: ~/.mcp/workflows/sessions/") + + # List recent workflow sessions + sessions = engine.list_workflow_sessions(workflow_name="ai_research_system", limit=5) + if sessions: + latest_session = sessions[0] + print(f" Latest Session ID: {latest_session['session_id']}") + print(f" Last Step: {latest_session.get('last_completed_step', 'N/A')}") + print() + + # Show decomposition using schema helper functions + if "decompose_question" in result.step_results: + decompose_result = get_decompose_result( + result.step_results["decompose_question"].result + ) + if decompose_result: + subtopics = get_subtopics(decompose_result) + reasoning = decompose_result["reasoning"] + print(f"✅ AI created {len(subtopics)} research subtopics:\n") + + for idx, subtopic in enumerate(subtopics, 1): + title = subtopic["title"] + task = subtopic["exploration_task"] + importance = subtopic["importance"] + print(f" {idx}. 
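The wiring between the three phases deserves a concrete illustration: `parallel_exploration` iterates over the subtopic list produced by `decompose_question`, binds each entry to `subtopic`, and the inner agent step reads its question from `subtopic.exploration_task`. A minimal sketch of those intermediate shapes (field names follow the decompose schema in this diff; the values are made up):

```python
# Shape of steps.decompose_question.result (illustrative values only):
decompose_result = {
    "subtopic_count": 2,
    "reasoning": "Split by workflow definition vs. execution",
    "subtopics": [
        {"subtopic_id": "subtopic_1", "title": "Loop step design",
         "exploration_task": "How does the loop step schedule child steps?",
         "importance": "high", "expected_findings": "Iteration and result handling"},
        {"subtopic_id": "subtopic_2", "title": "Context templating",
         "exploration_task": "How are template expressions resolved?",
         "importance": "medium", "expected_findings": "Template resolution rules"},
    ],
}

# parallel_exploration iterates decompose_result["subtopics"]; in each
# iteration the agent step's question template binds to:
for subtopic in decompose_result["subtopics"]:
    question = subtopic["exploration_task"]  # what "{{ subtopic.exploration_task }}" resolves to
    # ...an explore agent investigates `question`...

# synthesize_findings then aggregates over steps.parallel_exploration.result.results,
# so with function "count" it simply reports how many iterations produced output.
```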
{title} [{importance}]") + print(f" → {task}") + + if reasoning: + print(f"\n💭 AI Decomposition Strategy:") + print(f" {reasoning}\n") + + # Show exploration phase using schema helper functions + print("🔍 PHASE 2: Parallel Exploration - Agents working...\n") + + if "parallel_exploration" in result.step_results: + loop_result = get_loop_result( + result.step_results["parallel_exploration"].result + ) + if loop_result: + iterations = loop_result["iterations"] + successful = loop_result["successful"] + print(f"✅ Completed {successful}/{iterations} explorations\n") + + # Show exploration results using helper function + # DEBUG: Let's see what's actually in loop_result + print(f"\n[DEBUG] Loop result keys: {loop_result.keys()}") + print(f"[DEBUG] Number of iterations: {len(loop_result['results'])}") + for i, iteration in enumerate(loop_result['results']): + print(f"[DEBUG] Iteration {i}: status={iteration.get('status')}, step_results keys={list(iteration.get('step_results', {}).keys())}") + for step_id, step_res in iteration.get('step_results', {}).items(): + print(f"[DEBUG] Step {step_id}: type={type(step_res)}, has result={hasattr(step_res, 'result')}") + if hasattr(step_res, 'result'): + print(f"[DEBUG] Result type: {type(step_res.result)}, value preview: {str(step_res.result)[:100]}") + print() + + findings = get_exploration_findings(loop_result) + print(f"📋 Exploration Findings ({len(findings)} found):\n") + + if findings: + for idx, finding in enumerate(findings, 1): + subtopic_title = ( + subtopics[idx - 1]["title"] + if idx <= len(subtopics) + else f"Subtopic {idx}" + ) + + print(f" {idx}. {subtopic_title}") + + # Handle different finding types + if isinstance(finding, str) and finding: + # String result - show preview + finding_preview = ( + finding[:300] + "..." if len(finding) > 300 else finding + ) + print(f" {finding_preview}\n") + elif isinstance(finding, dict): + # Dict result - try to extract meaningful content + # First check common exploration result keys + content = None + if 'answer' in finding: + content = finding['answer'] + elif 'result' in finding: + content = finding['result'] + elif 'finding' in finding: + content = finding['finding'] + elif 'response' in finding: + content = finding['response'] + + if content and isinstance(content, str): + content_preview = content[:300] + "..." if len(content) > 300 else content + print(f" {content_preview}\n") + elif content: + # Non-string content + content_str = str(content) + content_preview = content_str[:300] + "..." 
if len(content_str) > 300 else content_str + print(f" {content_preview}\n") + else: + # Show structure + print(f" [Dict with keys: {', '.join(finding.keys())}]") + # Show first few items for debugging + import json + dict_preview = json.dumps(finding, indent=2, default=str)[:300] + print(f" {dict_preview}...\n") + else: + print(f" [Result type: {type(finding).__name__}]\n") + else: + print(" [No findings extracted - check schema helper function]\n") + + # Show synthesis + print("📊 PHASE 3: Synthesis - Aggregating findings...\n") + + if "synthesize_findings" in result.step_results: + synth_result = result.step_results["synthesize_findings"].result + # Extract count from aggregate result + if isinstance(synth_result, dict): + count = synth_result.get("result", synth_result.get("item_count", 0)) + else: + count = synth_result + print(f"✅ Research Complete - Aggregated {count} exploration results!\n") + + # Show session persistence summary + print("=" * 80) + print("SESSION PERSISTENCE SUMMARY") + print("=" * 80 + "\n") + + if sessions: + latest = sessions[0] + print(f"📁 Session Details:") + print(f" ID: {latest['session_id']}") + print(f" Status: {latest['status']}") + print(f" Created: {latest['created_at']}") + print(f" Last Updated: {latest['updated_at']}\n") + + print("💡 You can access persisted data:") + print(f" • All step results are saved") + print(f" • Agent exploration findings are preserved") + print(f" • Context and outputs are recoverable") + print(f" • Session can be resumed or inspected later\n") + + # Show how to load session data + print("🔍 To inspect session data programmatically:") + print(f" ```python") + print(f" engine = WorkflowEngine()") + print(f" context, metadata = engine.load_from_session('{latest['session_id']}')") + print(f" print(metadata['last_completed_step'])") + print(f" print(context.step_results.keys())") + print(f" ```\n") + + print() + + except Exception as e: + print(f"❌ Error: {e}") + import traceback + + traceback.print_exc() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/plugins/automation/workflows/examples/exploration_mapreduce.yaml b/plugins/automation/workflows/examples/exploration_mapreduce.yaml deleted file mode 100644 index cdddda8..0000000 --- a/plugins/automation/workflows/examples/exploration_mapreduce.yaml +++ /dev/null @@ -1,154 +0,0 @@ -# Exploration Map-Reduce Workflow -# -# This workflow demonstrates the map-reduce pattern for AI-powered codebase exploration: -# 1. Split: Divide exploration tasks into smaller chunks -# 2. Map: Run parallel AI explorations, storing findings in session files -# 3. Reduce: Aggregate and summarize all findings -# -# Use case: Explore multiple aspects of a codebase in parallel and get a comprehensive summary - -workflow: - name: exploration_mapreduce - version: 1.0 - description: Map-reduce workflow for parallel AI exploration with session storage - - inputs: - codebase_path: - type: string - required: true - description: Path to the codebase to explore - - exploration_questions: - type: array - required: true - description: List of questions to explore about the codebase - default: - - "How does authentication work in this codebase?" - - "What are the main API endpoints?" - - "How is error handling implemented?" - - "What database operations are performed?" 
- - session_dir: - type: string - required: false - default: ".mcp_sessions" - description: Directory to store exploration session files - - summary_format: - type: string - required: false - default: "detailed" - description: Format for final summary (detailed, concise, structured) - - outputs: - summary: - type: object - description: Aggregated summary of all explorations - - session_files: - type: array - description: List of session files created during exploration - - steps: - # Step 1: Split - Divide questions into individual exploration tasks - - id: split_tasks - type: transform - config: - operation: split - strategy: by_items # One task per question - inputs: - items: "{{ inputs.exploration_questions }}" - outputs: - tasks: result.tasks - - # Step 2: Map - Explore each question in parallel (simulated) - # Note: In a real implementation, you would use a loop or parallel step - # For now, we'll show the pattern with individual steps - - - id: explore_task_0 - type: transform - depends_on: - - split_tasks - config: - operation: explore - exploration_type: question - session_dir: "{{ inputs.session_dir }}" - session_id: "exploration_{{ context.execution_id }}" - save_to_session: true - inputs: - task: "{{ steps.split_tasks.result.tasks[0] }}" - codebase_path: "{{ inputs.codebase_path }}" - outputs: - finding_0: result - - - id: explore_task_1 - type: transform - depends_on: - - split_tasks - config: - operation: explore - exploration_type: question - session_dir: "{{ inputs.session_dir }}" - session_id: "exploration_{{ context.execution_id }}" - save_to_session: true - inputs: - task: "{{ steps.split_tasks.result.tasks[1] }}" - codebase_path: "{{ inputs.codebase_path }}" - outputs: - finding_1: result - - - id: explore_task_2 - type: transform - depends_on: - - split_tasks - config: - operation: explore - exploration_type: question - session_dir: "{{ inputs.session_dir }}" - session_id: "exploration_{{ context.execution_id }}" - save_to_session: true - inputs: - task: "{{ steps.split_tasks.result.tasks[2] }}" - codebase_path: "{{ inputs.codebase_path }}" - outputs: - finding_2: result - - - id: explore_task_3 - type: transform - depends_on: - - split_tasks - config: - operation: explore - exploration_type: question - session_dir: "{{ inputs.session_dir }}" - session_id: "exploration_{{ context.execution_id }}" - save_to_session: true - inputs: - task: "{{ steps.split_tasks.result.tasks[3] }}" - codebase_path: "{{ inputs.codebase_path }}" - outputs: - finding_3: result - - # Step 3: Reduce - Aggregate all findings into a comprehensive summary - - id: summarize_findings - type: transform - depends_on: - - explore_task_0 - - explore_task_1 - - explore_task_2 - - explore_task_3 - config: - operation: summarize - session_dir: "{{ inputs.session_dir }}" - session_id: "exploration_{{ context.execution_id }}" - summary_format: "{{ inputs.summary_format }}" - include_metadata: true - output_file: "{{ inputs.session_dir }}/exploration_{{ context.execution_id }}_summary.json" - inputs: - session_id: "exploration_{{ context.execution_id }}" - outputs: - summary: result.summary - session_files: result.session_files_read - - error_handling: - default: continue # Continue with other explorations even if one fails diff --git a/plugins/automation/workflows/examples/feature_exploration.yaml b/plugins/automation/workflows/examples/feature_exploration.yaml deleted file mode 100644 index a625646..0000000 --- a/plugins/automation/workflows/examples/feature_exploration.yaml +++ /dev/null @@ -1,71 +0,0 @@ 
-workflow: - name: "feature-exploration" - version: "1.0" - description: "Comprehensive exploration of a specific feature implementation" - - inputs: - feature_name: - type: string - required: true - description: "Name of the feature to explore" - - codebase_path: - type: string - required: true - description: "Path to the codebase root directory" - - outputs: - exploration_report: - type: object - description: "Complete feature exploration report" - - steps: - # Step 1: Find the implementation - - id: find_implementation - type: agent - agent: explore - operation: find_implementation - inputs: - feature_or_function: "{{ inputs.feature_name }}" - codebase_path: "{{ inputs.codebase_path }}" - outputs: - implementation_details: result - config: - cli_type: claude - model: haiku - - # Step 2: Find where it's used - - id: find_usage - type: agent - agent: explore - operation: find_usage - inputs: - symbol: "{{ inputs.feature_name }}" - codebase_path: "{{ inputs.codebase_path }}" - depends_on: [find_implementation] - outputs: - usage_locations: result - config: - cli_type: claude - model: haiku - - # Step 3: Explain the flow - - id: explain_flow - type: agent - agent: explore - operation: explain_flow - inputs: - flow_description: "{{ inputs.feature_name }} implementation and usage" - codebase_path: "{{ inputs.codebase_path }}" - depends_on: [find_implementation, find_usage] - outputs: - flow_explanation: result - config: - cli_type: claude - model: haiku - - error_handling: - on_failure: stop - retry_policy: - max_attempts: 2 - backoff: exponential diff --git a/plugins/automation/workflows/examples/model_comparison_mapreduce.yaml b/plugins/automation/workflows/examples/model_comparison_mapreduce.yaml deleted file mode 100644 index 6285884..0000000 --- a/plugins/automation/workflows/examples/model_comparison_mapreduce.yaml +++ /dev/null @@ -1,121 +0,0 @@ -workflow: - name: "model_comparison_mapreduce" - version: "1.0.0" - description: | - Map-Reduce workflow for comparing AI model outputs. 
- Map phase: Run the same prompt with different AI models - Reduce phase: Aggregate and verify if models return similar results - - inputs: - prompt: - type: string - required: true - description: "The prompt to send to all AI models" - - models: - type: list - required: false - default: ["sonnet", "haiku"] - description: "List of AI models to compare (sonnet, haiku, opus)" - - similarity_threshold: - type: float - required: false - default: 0.75 - description: "Minimum similarity score (0-1) for results to be considered similar" - - outputs: - all_results: - type: dict - description: "Results from all models in the map phase" - - comparison_result: - type: dict - description: "Aggregated comparison analysis including similarity scores and consensus" - - models_agree: - type: boolean - description: "Whether all models produced similar results" - - steps: - # MAP PHASE: Execute same prompt with different models - - id: map_model_1 - type: agent - agent: explore - operation: explore - config: - model: "{{ inputs.models[0] | default('sonnet') }}" - cli_type: "claude" - inputs: - question: "{{ inputs.prompt }}" - retry: - max_attempts: 2 - backoff: exponential - on_error: stop - timeout: 300 - - - id: map_model_2 - type: agent - agent: explore - operation: explore - config: - model: "{{ inputs.models[1] | default('haiku') }}" - cli_type: "claude" - inputs: - question: "{{ inputs.prompt }}" - retry: - max_attempts: 2 - backoff: exponential - on_error: stop - timeout: 300 - - # Optional third model - - id: map_model_3 - type: agent - agent: explore - operation: explore - config: - model: "{{ inputs.models[2] | default('haiku') }}" - cli_type: "claude" - inputs: - question: "{{ inputs.prompt }}" - retry: - max_attempts: 2 - backoff: exponential - on_error: continue - timeout: 300 - - # REDUCE PHASE: Aggregate and compare results - - id: reduce_aggregate - type: transform - depends_on: - - map_model_1 - - map_model_2 - - map_model_3 - inputs: - model_1_result: "{{ steps.map_model_1.result }}" - model_2_result: "{{ steps.map_model_2.result }}" - model_3_result: "{{ steps.map_model_3.result }}" - prompt: "{{ inputs.prompt }}" - threshold: "{{ inputs.similarity_threshold }}" - config: - operation: "compare_results" - on_error: stop - - # Final verification and consensus building - - id: verify_consensus - type: transform - depends_on: - - reduce_aggregate - inputs: - comparison: "{{ steps.reduce_aggregate.result }}" - threshold: "{{ inputs.similarity_threshold }}" - config: - operation: "verify_consensus" - outputs: - all_results: - model_1: "{{ steps.map_model_1.result }}" - model_2: "{{ steps.map_model_2.result }}" - model_3: "{{ steps.map_model_3.result }}" - comparison_result: "{{ steps.reduce_aggregate.result }}" - models_agree: "{{ steps.reduce_aggregate.result.all_similar }}" diff --git a/plugins/automation/workflows/steps/__init__.py b/plugins/automation/workflows/steps/__init__.py index a95f6db..f09b69e 100644 --- a/plugins/automation/workflows/steps/__init__.py +++ b/plugins/automation/workflows/steps/__init__.py @@ -7,9 +7,11 @@ from .base import BaseStep from .agent_step import AgentStep from .transform_step import TransformStep +from .loop import LoopStep __all__ = [ "BaseStep", "AgentStep", "TransformStep", + "LoopStep", ] diff --git a/plugins/automation/workflows/steps/loop.py b/plugins/automation/workflows/steps/loop.py new file mode 100644 index 0000000..702b401 --- /dev/null +++ b/plugins/automation/workflows/steps/loop.py @@ -0,0 +1,252 @@ +""" +Loop Step + +Execute steps for each 
item in a collection. +""" + +import logging +from typing import Any, Dict, List +from datetime import datetime + +from ...runtime_data import WorkflowContext, StepResult, StepStatus +from ..definition import StepDefinition +from .base import BaseStep + + +class LoopStep(BaseStep): + """ + Loop step that executes child steps for each item in a collection. + + The loop step iterates over items and executes its child steps for each item, + making the current item available in the context via the item_var. + + Example YAML: + - id: process_chunks + type: loop + items: "{{ steps.split.result.tasks }}" + item_var: chunk + steps: + - id: sum_chunk + type: transform + config: + operation: aggregate + function: sum + inputs: + items: "{{ chunk.items }}" + """ + + def __init__(self, definition: StepDefinition): + """ + Initialize loop step. + + Args: + definition: Step definition with loop configuration + """ + super().__init__(definition) + self.logger = logging.getLogger(__name__) + + async def execute(self, context: WorkflowContext) -> StepResult: + """ + Execute loop step. + + Args: + context: Workflow execution context + + Returns: + StepResult with loop execution results + """ + started_at = datetime.utcnow() + + # Resolve items to iterate over + items_expr = self.definition.items + if not items_expr: + return StepResult( + step_id=self.step_id, + status=StepStatus.FAILED, + error="Loop step requires 'items' expression", + started_at=started_at, + completed_at=datetime.utcnow(), + ) + + # Resolve items from context + items = context.resolve_template(items_expr) + + if not isinstance(items, list): + return StepResult( + step_id=self.step_id, + status=StepStatus.FAILED, + error=f"Loop items must be a list, got {type(items)}", + started_at=started_at, + completed_at=datetime.utcnow(), + ) + + self.logger.info( + f"Loop step '{self.step_id}' iterating over {len(items)} items" + ) + + # Get item variable name (default: "item") + item_var = self.definition.item_var or "item" + + # Get child steps + if not self.definition.loop_steps: + return StepResult( + step_id=self.step_id, + status=StepStatus.FAILED, + error="Loop step requires child steps", + started_at=started_at, + completed_at=datetime.utcnow(), + ) + + # Execute loop iterations + iteration_results = [] + failed_iterations = 0 + + for index, item in enumerate(items): + self.logger.debug(f"Loop iteration {index + 1}/{len(items)}") + + # Create iteration context by injecting item variable + # We need to inject at the workflow-level context so templates can resolve it + # Store original value if it exists + original_value = None + had_original = False + + # Check if item_var was in inputs + if item_var in context.inputs: + original_value = context.inputs[item_var] + had_original = True + + # Inject current item into context inputs + # This makes it available via {{ item_var }} templates + context.inputs[item_var] = item + + # Also store in a temporary step result so it's accessible via steps.X pattern + # Create a pseudo step result for the iteration variable + temp_step_id = f"_loop_var_{item_var}" + if temp_step_id in context.step_results: + original_step_result = context.step_results[temp_step_id] + else: + original_step_result = None + + context.step_results[temp_step_id] = StepResult( + step_id=temp_step_id, + status=StepStatus.COMPLETED, + result=item + ) + + try: + # Execute child steps for this iteration + iteration_result = await self._execute_iteration( + index, item, context, item_var + ) + iteration_results.append(iteration_result) 
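+
+            # Tally failed iterations so the final StepResult can be marked
+            # PARTIAL when some, but not all, items were processed successfully.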
+ + if iteration_result.get("status") == "failed": + failed_iterations += 1 + + finally: + # Restore original values + if had_original: + context.inputs[item_var] = original_value + else: + context.inputs.pop(item_var, None) + + if original_step_result is not None: + context.step_results[temp_step_id] = original_step_result + else: + context.step_results.pop(temp_step_id, None) + + # Aggregate results + completed_at = datetime.utcnow() + success_count = len(iteration_results) - failed_iterations + + return StepResult( + step_id=self.step_id, + status=StepStatus.COMPLETED if failed_iterations == 0 else StepStatus.PARTIAL, + result={ + "iterations": len(items), + "successful": success_count, + "failed": failed_iterations, + "results": iteration_results, + }, + started_at=started_at, + completed_at=completed_at, + ) + + async def _execute_iteration( + self, + index: int, + item: Any, + context: WorkflowContext, + item_var: str, + ) -> Dict[str, Any]: + """ + Execute child steps for one iteration. + + Args: + index: Iteration index + item: Current item + context: Workflow context + item_var: Variable name for current item + + Returns: + Dictionary with iteration results + """ + from ..engine import WorkflowEngine + + iteration_results = {} + iteration_status = "completed" + + # Import step creation method + engine = WorkflowEngine() + + # Execute each child step + for child_def in self.definition.loop_steps: + # Create unique step ID for this iteration + iteration_step_id = f"{self.step_id}.{index}.{child_def.id}" + + self.logger.debug(f"Executing loop child step '{iteration_step_id}'") + + try: + # Create step instance + child_step = engine._create_step(child_def) + + # Override step ID to make it unique per iteration + child_step.step_id = iteration_step_id + child_step.definition.id = iteration_step_id + + # Execute child step + step_result = await child_step.execute_with_retry(context) + + # Store result + iteration_results[child_def.id] = { + "status": step_result.status.value, + "result": step_result.result, + "error": step_result.error, + } + + if step_result.status == StepStatus.FAILED: + iteration_status = "failed" + # Check error handling + if child_def.on_error == "stop": + break + + except Exception as e: + self.logger.error( + f"Error executing child step '{child_def.id}': {e}", + exc_info=True, + ) + iteration_results[child_def.id] = { + "status": "failed", + "error": str(e), + } + iteration_status = "failed" + + if child_def.on_error == "stop": + break + + return { + "index": index, + "item": item, + "status": iteration_status, + "step_results": iteration_results, + } diff --git a/plugins/automation/workflows/steps/operations/__init__.py b/plugins/automation/workflows/steps/operations/__init__.py index 59c8947..1545d35 100644 --- a/plugins/automation/workflows/steps/operations/__init__.py +++ b/plugins/automation/workflows/steps/operations/__init__.py @@ -9,10 +9,9 @@ from .aggregation import AggregateOperation from .filtering import FilterOperation from .mapping import MapOperation -from .split import SplitOperation -from .ai_split import AISplitOperation from .exploration import ExplorationOperation from .summarize import SummarizeOperation +from .decompose import DecomposeOperation # Global operation registry registry = OperationRegistry() @@ -24,11 +23,10 @@ registry.register("filter", FilterOperation) registry.register("map", MapOperation) -# Register map-reduce exploration operations -registry.register("split", SplitOperation) # Deterministic split 
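For reference, the result dictionary assembled by `LoopStep.execute` above is what later steps address as `steps.<loop_id>.result`; the research demo's `{{ steps.parallel_exploration.result.results }}` template points at its `results` list. An illustrative sketch of that shape (values made up):

```python
# Illustrative shape of a loop step's aggregated result, mirroring the
# dictionaries built in LoopStep.execute and LoopStep._execute_iteration.
loop_step_result = {
    "iterations": 2,
    "successful": 2,
    "failed": 0,
    "results": [
        {
            "index": 0,
            "item": {"title": "Loop step design"},
            "status": "completed",
            "step_results": {
                "explore_subtopic": {"status": "completed", "result": "finding text", "error": None},
            },
        },
        {
            "index": 1,
            "item": {"title": "Context templating"},
            "status": "completed",
            "step_results": {
                "explore_subtopic": {"status": "completed", "result": "finding text", "error": None},
            },
        },
    ],
}

# e.g. how many iterations succeeded:
print(loop_step_result["successful"], "of", loop_step_result["iterations"])
```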
-registry.register("ai_split", AISplitOperation) # AI-powered intelligent split +# Register AI exploration operations registry.register("explore", ExplorationOperation) registry.register("summarize", SummarizeOperation) +registry.register("decompose", DecomposeOperation) # AI-powered task decomposition __all__ = [ "BaseOperation", diff --git a/plugins/automation/workflows/steps/operations/ai_split.py b/plugins/automation/workflows/steps/operations/ai_split.py deleted file mode 100644 index bb1cfb5..0000000 --- a/plugins/automation/workflows/steps/operations/ai_split.py +++ /dev/null @@ -1,301 +0,0 @@ -"""AI-powered split operation for intelligent task decomposition.""" - -import json -import logging -from typing import Any, Dict, Optional, List - -from .base import BaseOperation - - -class AISplitOperation(BaseOperation): - """ - AI-powered task splitting operation. - - Uses an AI model to intelligently decompose a high-level exploration goal into - smaller, focused sub-tasks. The AI decides what aspects to explore, how to - organize the work, and how many parallel tasks to create. - - Unlike the deterministic split operation, this uses AI reasoning to: - - Understand the exploration goal - - Identify key areas to investigate - - Break down into logical, focused sub-tasks - - Determine optimal task granularity - - Config: - - model: AI model to use for splitting (default: haiku for speed) - - max_tasks: Maximum number of tasks to generate (default: 10) - - min_tasks: Minimum number of tasks to generate (default: 2) - - context: Additional context for the AI to consider - - Inputs: - - goal: High-level exploration goal or question - - codebase_path: Path to codebase being explored (optional, for context) - - constraints: Optional constraints (e.g., "focus on security", "prioritize performance") - - Returns: - - tasks: List of AI-generated exploration tasks - - task_count: Number of tasks created - - reasoning: AI's reasoning for the split - - metadata: Split metadata - - Example: - Input goal: "Understand how the authentication system works" - - AI-generated tasks: - 1. "Explore user login flow and session management" - 2. "Investigate password hashing and credential storage" - 3. "Analyze JWT token generation and validation" - 4. "Find OAuth integration and third-party auth providers" - 5. 
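The `registry.register(...)` calls above are the extension point for new transform operations. A hedged sketch of what registering an additional operation could look like, following the `BaseOperation` pattern used elsewhere in this diff; `word_count` and `WordCountOperation` are hypothetical, and the exact import paths may differ:

```python
# Hypothetical custom operation, not part of this changeset.
from typing import Any, Dict, Optional

from plugins.automation.workflows.steps.operations import registry
from plugins.automation.workflows.steps.operations.base import BaseOperation


class WordCountOperation(BaseOperation):
    """Count words in a text input."""

    def validate(self) -> Optional[str]:
        # Same contract as the other operations: return an error string or None.
        if "text" not in self.inputs:
            return "word_count operation requires 'text' input"
        return None

    async def execute(self) -> Dict[str, Any]:
        text = self.inputs.get("text", "")
        return {"word_count": len(text.split())}


# Make the operation available to transform steps as operation: word_count
registry.register("word_count", WordCountOperation)
```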
"Review authentication middleware and route protection" - """ - - def __init__(self, config: Dict[str, Any], inputs: Dict[str, Any]): - """Initialize AI split operation.""" - super().__init__(config, inputs) - self.logger = logging.getLogger(__name__) - - def validate(self) -> Optional[str]: - """Validate AI split configuration.""" - if "goal" not in self.inputs: - return "AI split operation requires 'goal' input" - - max_tasks = self.config.get("max_tasks", 10) - min_tasks = self.config.get("min_tasks", 2) - - if max_tasks < min_tasks: - return f"max_tasks ({max_tasks}) must be >= min_tasks ({min_tasks})" - - return None - - async def execute(self) -> Dict[str, Any]: - """Execute AI-powered task splitting.""" - goal = self.inputs.get("goal") - codebase_path = self.inputs.get("codebase_path") - constraints = self.inputs.get("constraints") - - # Get config - model = self.config.get("model", "haiku") - max_tasks = self.config.get("max_tasks", 10) - min_tasks = self.config.get("min_tasks", 2) - context = self.config.get("context", "") - - # Build prompt for AI - prompt = self._build_split_prompt( - goal=goal, - codebase_path=codebase_path, - constraints=constraints, - context=context, - min_tasks=min_tasks, - max_tasks=max_tasks - ) - - # Call AI to generate split - # TODO: Integrate with actual AI executor - # For now, return a structured placeholder that shows the pattern - ai_response = await self._call_ai_for_split(prompt, model) - - # Parse AI response into tasks - tasks = self._parse_ai_response(ai_response, min_tasks, max_tasks) - - return { - "tasks": tasks, - "task_count": len(tasks), - "reasoning": ai_response.get("reasoning", ""), - "metadata": { - "goal": goal, - "model_used": model, - "min_tasks": min_tasks, - "max_tasks": max_tasks, - "ai_reasoning": ai_response.get("reasoning", ""), - }, - } - - def _build_split_prompt( - self, - goal: str, - codebase_path: Optional[str], - constraints: Optional[str], - context: str, - min_tasks: int, - max_tasks: int - ) -> str: - """ - Build prompt for AI to generate task split. - - Args: - goal: High-level exploration goal - codebase_path: Path to codebase - constraints: Additional constraints - context: Extra context - min_tasks: Minimum tasks to generate - max_tasks: Maximum tasks to generate - - Returns: - Formatted prompt for AI - """ - prompt = f"""You are an expert at breaking down complex codebase exploration goals into focused, parallel tasks. - -**Goal:** {goal} -""" - - if codebase_path: - prompt += f"\n**Codebase:** {codebase_path}" - - if constraints: - prompt += f"\n**Constraints:** {constraints}" - - if context: - prompt += f"\n**Additional Context:** {context}" - - prompt += f""" - -Your task is to break this down into {min_tasks}-{max_tasks} focused exploration sub-tasks that can be executed in parallel. - -**Guidelines:** -1. Each task should be specific and actionable -2. Tasks should be independent and non-overlapping -3. Together, tasks should comprehensively cover the goal -4. Consider different aspects: implementation, usage, architecture, flow, edge cases -5. Use clear, specific questions or investigation targets - -**Output Format (JSON):** -{{ - "reasoning": "Brief explanation of your split strategy", - "tasks": [ - {{ - "title": "Short descriptive title", - "query": "Specific exploration question or target", - "type": "question|implementation|structure|usage|flow", - "priority": "high|medium|low", - "estimated_complexity": "simple|moderate|complex" - }}, - ... 
- ] -}} - -Generate the split now:""" - - return prompt - - async def _call_ai_for_split(self, prompt: str, model: str) -> Dict[str, Any]: - """ - Call AI model to generate task split. - - Args: - prompt: Prompt for AI - model: Model to use - - Returns: - AI response with reasoning and tasks - """ - from utils.agent import SpecializedAgent, AgentConfig, CLIType - from utils.agent.cli_executor import CLIExecutor, CLIConfig - - # Create a simple agent config for the split task - cli_config = CLIConfig( - cli_type=CLIType.CLAUDE, - model=model, - skip_permissions=True - ) - - executor = CLIExecutor(cli_config) - - try: - # Execute the prompt with the AI - response = await executor.execute(prompt) - - # Try to parse as JSON - # The AI should return JSON, but handle cases where it adds explanation - response_text = response.strip() - - # Find JSON in response (AI might add explanation before/after) - json_start = response_text.find('{') - json_end = response_text.rfind('}') + 1 - - if json_start >= 0 and json_end > json_start: - json_text = response_text[json_start:json_end] - return json.loads(json_text) - else: - # If no JSON found, create a fallback response - self.logger.warning("AI response didn't contain valid JSON, using fallback") - return self._create_fallback_split(response_text) - - except Exception as e: - self.logger.error(f"Error calling AI for split: {e}", exc_info=True) - # Return fallback split on error - return self._create_fallback_split(str(e)) - - def _create_fallback_split(self, error_context: str) -> Dict[str, Any]: - """ - Create a fallback split when AI call fails. - - Args: - error_context: Error message or context - - Returns: - Fallback split response - """ - goal = self.inputs.get("goal", "unknown goal") - - return { - "reasoning": f"Fallback split due to error. Creating basic exploration tasks for: {goal}", - "tasks": [ - { - "title": "Overview and Architecture", - "query": f"Provide an overview of {goal}", - "type": "structure", - "priority": "high", - "estimated_complexity": "moderate" - }, - { - "title": "Implementation Details", - "query": f"Explore the implementation of {goal}", - "type": "implementation", - "priority": "high", - "estimated_complexity": "complex" - }, - { - "title": "Usage and Examples", - "query": f"Find how {goal} is used in the codebase", - "type": "usage", - "priority": "medium", - "estimated_complexity": "simple" - } - ] - } - - def _parse_ai_response( - self, - ai_response: Dict[str, Any], - min_tasks: int, - max_tasks: int - ) -> List[Dict[str, Any]]: - """ - Parse and validate AI response. 
- - Args: - ai_response: Response from AI - min_tasks: Minimum tasks required - max_tasks: Maximum tasks allowed - - Returns: - List of validated tasks - """ - tasks = ai_response.get("tasks", []) - - # Validate task count - if len(tasks) < min_tasks: - self.logger.warning( - f"AI generated {len(tasks)} tasks, less than minimum {min_tasks}" - ) - elif len(tasks) > max_tasks: - self.logger.warning( - f"AI generated {len(tasks)} tasks, truncating to {max_tasks}" - ) - tasks = tasks[:max_tasks] - - # Enrich tasks with index - for i, task in enumerate(tasks): - task["index"] = i - - return tasks diff --git a/plugins/automation/workflows/steps/operations/decompose.py b/plugins/automation/workflows/steps/operations/decompose.py new file mode 100644 index 0000000..03c2d6f --- /dev/null +++ b/plugins/automation/workflows/steps/operations/decompose.py @@ -0,0 +1,253 @@ +"""Decompose Operation - AI-powered task decomposition.""" + +import json +import logging +import re +from typing import Any, Dict, Optional, List + +from .base import BaseOperation +from utils.agent.agent import SpecializedAgent, AgentConfig +from utils.agent.cli_executor import CLIType + + +class DecomposeOperation(BaseOperation): + """ + AI-powered task decomposition. + + Uses the internal agent system to analyze a question or task and decompose it into + multiple subtopics that can be explored in parallel by agents. + + Inspired by Anthropic's multi-agent research system approach. + + Config: + - min_subtopics: Minimum number of subtopics (default: 2) + - max_subtopics: Maximum number of subtopics (default: 6) + - cli_type: CLI type to use - "claude", "codex", or "copilot" (default: "copilot") + - model: Optional AI model to use (depends on CLI type) + + Inputs: + - question: The main question or task to decompose + + Returns: + - subtopics: List of subtopics with exploration tasks + - subtopic_count: Number of subtopics created + - reasoning: AI's reasoning for the decomposition + - metadata: Additional information about decomposition + + Example: + config: + operation: decompose + min_subtopics: 2 + max_subtopics: 5 + cli_type: copilot + inputs: + question: "How does the MCP workflow system work?" + # Result: 3-5 subtopics like: + # - "Workflow definition and YAML structure" + # - "Step execution and dependency resolution" + # - "Agent integration and tool registry" + """ + + def __init__(self, config: Dict[str, Any], inputs: Dict[str, Any]): + """Initialize decompose operation.""" + super().__init__(config, inputs) + self.logger = logging.getLogger(__name__) + + def validate(self) -> Optional[str]: + """Validate configuration.""" + if "question" not in self.inputs: + return "decompose operation requires 'question' input" + return None + + async def execute(self) -> Dict[str, Any]: + """ + Execute task decomposition using AI. + + Returns: + Dictionary with subtopics and metadata + """ + try: + question = self.inputs.get("question", "") + min_subtopics = self.config.get("min_subtopics", 2) + max_subtopics = self.config.get("max_subtopics", 6) + model = self.config.get("model") + cli_type = self.config.get("cli_type", "copilot") + + self.logger.info("=" * 60) + self.logger.info("Starting task decomposition operation") + self.logger.info(f"Question: {question[:100]}..." 
if len(question) > 100 else f"Question: {question}") + self.logger.info(f"Config: cli_type={cli_type}, model={model}, subtopics={min_subtopics}-{max_subtopics}") + self.logger.info("=" * 60) + + # Create a specialized agent for task decomposition + self.logger.debug("Creating specialized decompose agent") + agent_config = AgentConfig( + cli_type=CLIType(cli_type), + model=model, + session_id="decompose_operation", + skip_permissions=True, + ) + + # Create anonymous agent class inline + class DecomposeAgent(SpecializedAgent): + def get_system_prompt(self) -> str: + return """You are an expert task planner specialized in decomposing complex questions into parallelizable subtasks. + +Your role is to analyze questions and break them down into focused, independently explorable subtopics that can be investigated in parallel by different agents.""" + + agent = DecomposeAgent(agent_config) + self.logger.debug("Agent created successfully") + + # Construct prompt for AI decomposition + prompt = f"""Analyze the following question and decompose it into specific subtopics that can be explored in parallel. + +Question: +{question} + +Guidelines: +- Create between {min_subtopics} and {max_subtopics} subtopics +- Each subtopic should be focused and independently explorable +- Subtopics should cover different aspects of the main question +- Avoid overlapping or redundant subtopics +- Order subtopics by importance/logical progression + +Respond with a JSON object: +{{ + "reasoning": "Brief explanation of your decomposition strategy and why you chose this number of subtopics", + "subtopic_count": , + "subtopics": [ + {{ + "id": "subtopic_1", + "title": "Brief title", + "exploration_task": "Specific question or task for the exploration agent", + "importance": "high|medium|low", + "expected_findings": "What kind of information this subtopic should reveal" + }}, + ... 
+ ] +}} + +Make each exploration_task a clear, focused question that an agent can directly investigate.""" + + self.logger.info(f"Invoking agent for task decomposition (cli_type={cli_type})") + self.logger.debug(f"Full prompt length: {len(prompt)} characters") + + # Invoke the agent + response_text = await agent.invoke(prompt, include_history=False) + + self.logger.info(f"Received response from agent ({len(response_text)} characters)") + self.logger.debug(f"Response preview: {response_text[:200]}...") + + # Extract JSON from response + if "```json" in response_text: + self.logger.debug("Extracting JSON from ```json code block") + json_str = response_text.split("```json")[1].split("```")[0].strip() + elif "```" in response_text: + self.logger.debug("Extracting JSON from ``` code block") + json_str = response_text.split("```")[1].split("```")[0].strip() + else: + self.logger.debug("Using full response as JSON") + json_str = response_text.strip() + + # Log the raw JSON for debugging + self.logger.debug(f"Extracted JSON length: {len(json_str)} characters") + self.logger.debug(f"JSON preview: {json_str[:200]}...") + + # Parse JSON with strict=False to handle control characters + self.logger.debug("Attempting to parse JSON response") + try: + ai_result = json.loads(json_str, strict=False) + self.logger.debug("JSON parsed successfully on first attempt") + except json.JSONDecodeError as e: + # If that fails, try to clean the string + self.logger.warning(f"JSON parsing failed on first attempt: {e}") + self.logger.warning(f"Error at position {e.pos}: '{json_str[max(0, e.pos-20):e.pos+20]}'") + + # More aggressive cleaning: replace actual control characters + # Remove any actual control characters (ASCII 0-31 except tab/newline in proper contexts) + cleaned_json = json_str + + # Replace literal newlines and tabs that appear in string values + # This is a simple approach - just remove them + cleaned_json = cleaned_json.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ') + + self.logger.info("Attempting to parse cleaned JSON") + # Try parsing the cleaned version + try: + ai_result = json.loads(cleaned_json, strict=False) + self.logger.info("JSON parsed successfully after cleaning") + except json.JSONDecodeError as e2: + # Last resort: log the problematic JSON and use fallback + self.logger.error(f"Failed to parse JSON even after cleaning: {e2}") + self.logger.error(f"Error at position {e2.pos}: '{cleaned_json[max(0, e2.pos-20):e2.pos+20]}'") + self.logger.error(f"Problematic JSON (first 500 chars): {json_str[:500]}") + self.logger.error(f"Full JSON length: {len(json_str)} characters") + raise + + subtopics = ai_result.get("subtopics", []) + reasoning = ai_result.get("reasoning", "") + + self.logger.info(f"✓ Successfully decomposed into {len(subtopics)} subtopics") + self.logger.info(f"✓ AI reasoning: {reasoning[:100]}..." 
if len(reasoning) > 100 else f"✓ AI reasoning: {reasoning}") + + # Log each subtopic + for i, subtopic in enumerate(subtopics, 1): + title = subtopic.get("title", "") + importance = subtopic.get("importance", "") + self.logger.debug(f" Subtopic {i}: {title} [{importance}]") + + # Transform subtopics into exploration tasks + self.logger.debug("Transforming subtopics into exploration tasks") + exploration_tasks = [] + for subtopic in subtopics: + exploration_tasks.append({ + "subtopic_id": subtopic.get("id", f"subtopic_{len(exploration_tasks)}"), + "title": subtopic.get("title", ""), + "exploration_task": subtopic.get("exploration_task", ""), + "importance": subtopic.get("importance", "medium"), + "expected_findings": subtopic.get("expected_findings", ""), + }) + + result = { + "subtopics": exploration_tasks, + "subtopic_count": len(exploration_tasks), + "reasoning": reasoning, + "metadata": { + "original_question": question, + "cli_type": cli_type, + "model": model, + "ai_decomposition": True, + }, + } + + self.logger.info("=" * 60) + self.logger.info(f"✓ Decomposition complete: {len(exploration_tasks)} tasks created") + self.logger.info("=" * 60) + + return result + + except Exception as e: + self.logger.error("=" * 60) + self.logger.error(f"✗ Task decomposition failed: {e}", exc_info=True) + self.logger.error("=" * 60) + # Fallback: create single basic exploration task + self.logger.warning("Using fallback: creating single exploration task") + fallback_result = { + "subtopics": [{ + "subtopic_id": "fallback", + "title": "General Exploration", + "exploration_task": self.inputs.get("question", ""), + "importance": "high", + "expected_findings": "General information about the topic", + }], + "subtopic_count": 1, + "reasoning": f"Fallback due to error: {str(e)}", + "metadata": { + "original_question": self.inputs.get("question", ""), + "ai_decomposition": False, + "fallback": True, + }, + } + + self.logger.info("Fallback decomposition created with 1 task") + return fallback_result diff --git a/plugins/automation/workflows/steps/operations/exploration.py b/plugins/automation/workflows/steps/operations/exploration.py index 8c71ff8..5f9deac 100644 --- a/plugins/automation/workflows/steps/operations/exploration.py +++ b/plugins/automation/workflows/steps/operations/exploration.py @@ -64,6 +64,10 @@ def validate(self) -> Optional[str]: """Validate exploration configuration.""" exploration_type = self.config.get("exploration_type", "question") + # Skip validation for template variables (they'll be resolved at runtime) + if isinstance(exploration_type, str) and "{{" in exploration_type: + return None + valid_types = ["question", "implementation", "structure", "usage", "flow", "generic"] if exploration_type not in valid_types: return f"Invalid exploration_type '{exploration_type}'. Valid: {', '.join(valid_types)}" diff --git a/plugins/automation/workflows/steps/operations/schemas.py b/plugins/automation/workflows/steps/operations/schemas.py new file mode 100644 index 0000000..fbe5482 --- /dev/null +++ b/plugins/automation/workflows/steps/operations/schemas.py @@ -0,0 +1,235 @@ +""" +Schema definitions for workflow operation results. + +Provides typed data structures for operation outputs to make it easier +to access and validate data flowing between workflow steps. 
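Tying the decompose operation above to the result schemas below, here is a hedged sketch of invoking it directly outside a workflow (it requires a working agent CLI; the import path is taken from this diff, the question and config values are illustrative):

```python
# Hedged usage sketch: run the decompose operation on its own and read the
# DecomposeResult-shaped dictionary it returns.
import asyncio

from plugins.automation.workflows.steps.operations.decompose import DecomposeOperation


async def demo() -> None:
    operation = DecomposeOperation(
        config={"min_subtopics": 2, "max_subtopics": 4, "cli_type": "claude", "model": "haiku"},
        inputs={"question": "How does the MCP workflow system handle dynamic parallelism?"},
    )

    error = operation.validate()
    if error:
        raise ValueError(error)

    result = await operation.execute()
    print(result["reasoning"])
    for subtopic in result["subtopics"]:
        print(subtopic["title"], "->", subtopic["exploration_task"])


asyncio.run(demo())
```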
+""" + +from typing import TypedDict, List, Optional, Any, Literal + + +# Decompose Operation Schemas +class SubtopicSchema(TypedDict): + """Schema for a single subtopic from decompose operation.""" + subtopic_id: str + title: str + exploration_task: str + importance: Literal["high", "medium", "low"] + expected_findings: str + + +class DecomposeMetadata(TypedDict): + """Metadata for decompose operation.""" + original_question: str + cli_type: str + model: Optional[str] + ai_decomposition: bool + fallback: Optional[bool] + + +class DecomposeResult(TypedDict): + """Result schema for decompose operation. + + Example: + { + "subtopics": [ + { + "subtopic_id": "subtopic_1", + "title": "Workflow Definition", + "exploration_task": "How are workflows defined in YAML?", + "importance": "high", + "expected_findings": "YAML structure and validation" + } + ], + "subtopic_count": 3, + "reasoning": "Decomposed into architecture, execution, and integration", + "metadata": {...} + } + """ + subtopics: List[SubtopicSchema] + subtopic_count: int + reasoning: str + metadata: DecomposeMetadata + + +# Loop Operation Schemas +class LoopIterationResult(TypedDict): + """Result from a single loop iteration.""" + status: str # "completed", "failed", "skipped" + step_results: dict # Step results keyed by step ID + error: Optional[str] + + +class LoopResult(TypedDict): + """Result schema for loop operation. + + Example: + { + "iterations": 3, + "successful": 3, + "failed": 0, + "results": [ + { + "status": "completed", + "step_results": {...}, + "error": None + } + ] + } + """ + iterations: int + successful: int + failed: int + results: List[LoopIterationResult] + + +# Aggregate Operation Schemas +class AggregateResult(TypedDict): + """Result schema for aggregate operation. + + Example: + { + "result": 150, + "function": "sum", + "item_count": 10 + } + """ + result: Any # Number, list, or other aggregated value + function: str # "sum", "avg", "count", etc. + item_count: int + + +# Summarize Operation Schemas +class SummarizeSection(TypedDict): + """A section in a summarized report.""" + title: str + content: str + + +class SummarizeResult(TypedDict): + """Result schema for summarize operation. + + Example: + { + "summary": "Overall findings summary...", + "sections": [ + {"title": "Findings", "content": "..."}, + {"title": "Analysis", "content": "..."} + ], + "result": {...} + } + """ + summary: str + sections: Optional[List[SummarizeSection]] + result: Optional[Any] # Structured result data + + +# Helper functions for type-safe access +def get_decompose_result(step_result: Any) -> Optional[DecomposeResult]: + """ + Safely extract and validate decompose operation result. + + Args: + step_result: Raw step result from workflow execution + + Returns: + DecomposeResult dict if valid, None otherwise + + Example: + result = workflow.execute(...) + decompose = get_decompose_result(result.step_results["decompose_question"].result) + if decompose: + for subtopic in decompose["subtopics"]: + print(f"Task: {subtopic['exploration_task']}") + """ + if not isinstance(step_result, dict): + return None + + required_keys = {"subtopics", "subtopic_count", "reasoning", "metadata"} + if not all(key in step_result for key in required_keys): + return None + + return step_result # type: ignore + + +def get_loop_result(step_result: Any) -> Optional[LoopResult]: + """ + Safely extract and validate loop operation result. 
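Worth noting: these TypedDicts are static annotations only, so the helper functions perform lightweight key checks and hand back the very same dict. A tiny sketch, assuming `get_decompose_result` is imported from this module:

```python
# TypedDicts impose no runtime cost: the helper only checks that the required
# keys are present and returns the same object, so the "# type: ignore" cast
# satisfies static checkers without copying or converting anything.
raw = {"subtopics": [], "subtopic_count": 0, "reasoning": "", "metadata": {}}
typed = get_decompose_result(raw)
assert typed is raw  # same dict, now typed as DecomposeResult for mypy
```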
+ + Args: + step_result: Raw step result from workflow execution + + Returns: + LoopResult dict if valid, None otherwise + + Example: + result = workflow.execute(...) + loop = get_loop_result(result.step_results["parallel_exploration"].result) + if loop: + print(f"Completed {loop['successful']}/{loop['iterations']} iterations") + """ + if not isinstance(step_result, dict): + return None + + required_keys = {"iterations", "successful", "failed", "results"} + if not all(key in step_result for key in required_keys): + return None + + return step_result # type: ignore + + +def get_subtopics(decompose_result: DecomposeResult) -> List[SubtopicSchema]: + """ + Extract subtopics from decompose result. + + Args: + decompose_result: Decompose operation result + + Returns: + List of subtopic schemas + + Example: + subtopics = get_subtopics(decompose_result) + for i, subtopic in enumerate(subtopics, 1): + print(f"{i}. {subtopic['title']} [{subtopic['importance']}]") + print(f" Task: {subtopic['exploration_task']}") + """ + return decompose_result["subtopics"] + + +def get_exploration_findings(loop_result: LoopResult) -> List[Any]: + """ + Extract findings from parallel exploration loop. + + Args: + loop_result: Loop operation result + + Returns: + List of findings from each iteration + + Example: + findings = get_exploration_findings(loop_result) + for i, finding in enumerate(findings, 1): + print(f"Finding {i}: {finding}") + """ + findings = [] + for iteration in loop_result["results"]: + if iteration["status"] == "completed" and iteration["step_results"]: + # Get first step result from iteration + first_step = list(iteration["step_results"].values())[0] + + # Handle different result formats + if hasattr(first_step, 'result'): + # StepResult object - extract the result + result = first_step.result + + # If result is a dict with 'finding' key (from exploration operation) + if isinstance(result, dict) and 'finding' in result: + findings.append(result['finding']) + # Otherwise use the result directly (e.g., string from agent step) + else: + findings.append(result) + elif isinstance(first_step, dict): + # Already a dict - use directly + findings.append(first_step) + return findings diff --git a/plugins/automation/workflows/steps/operations/split.py b/plugins/automation/workflows/steps/operations/split.py deleted file mode 100644 index 1eb375a..0000000 --- a/plugins/automation/workflows/steps/operations/split.py +++ /dev/null @@ -1,219 +0,0 @@ -"""Split operation for dividing work into smaller tasks (Map phase).""" - -from typing import Any, Dict, Optional, List - -from .base import BaseOperation - - -class SplitOperation(BaseOperation): - """ - Split work into smaller tasks for parallel processing (Map phase of map-reduce). - - This operation divides input into multiple smaller tasks that can be processed - in parallel by downstream steps. 
- - Config: - - strategy: Split strategy (by_items, by_count, by_chunk_size, custom) - - count: Number of splits (for by_count) - - chunk_size: Size of each chunk (for by_chunk_size) - - expression: Custom split expression (for custom) - - Inputs: - - items: List or data to split - - Or a single input that will be split based on strategy - - Returns: - - tasks: List of split tasks - - task_count: Number of tasks created - - metadata: Information about the split - - Example: - # Split by count - create N tasks - config: - operation: split - strategy: by_count - count: 5 - inputs: - items: ["task1", "task2", "task3", "task4", "task5", "task6"] - # Result: 5 tasks with ~2 items each - - # Split by items - one task per item - config: - operation: split - strategy: by_items - inputs: - items: ["area1", "area2", "area3"] - # Result: 3 tasks, one for each item - - # Split by chunk size - config: - operation: split - strategy: by_chunk_size - chunk_size: 2 - inputs: - items: [1, 2, 3, 4, 5, 6] - # Result: 3 tasks with 2 items each - """ - - def validate(self) -> Optional[str]: - """Validate split configuration.""" - strategy = self.config.get("strategy", "by_items") - - valid_strategies = ["by_items", "by_count", "by_chunk_size", "custom"] - if strategy not in valid_strategies: - return f"Invalid strategy '{strategy}'. Valid: {', '.join(valid_strategies)}" - - if strategy == "by_count" and "count" not in self.config: - return "by_count strategy requires 'count' in config" - - if strategy == "by_chunk_size" and "chunk_size" not in self.config: - return "by_chunk_size strategy requires 'chunk_size' in config" - - if strategy == "custom" and "expression" not in self.config: - return "custom strategy requires 'expression' in config" - - return None - - async def execute(self) -> Dict[str, Any]: - """Execute split operation.""" - # Get items to split - if "items" in self.inputs: - items = self.inputs["items"] - else: - # Use first input value - items = next(iter(self.inputs.values()), []) - - # Ensure items is a list - if not isinstance(items, list): - items = [items] - - strategy = self.config.get("strategy", "by_items") - - # Execute split strategy - if strategy == "by_items": - tasks = self._split_by_items(items) - elif strategy == "by_count": - tasks = self._split_by_count(items) - elif strategy == "by_chunk_size": - tasks = self._split_by_chunk_size(items) - elif strategy == "custom": - tasks = self._split_custom(items) - else: - tasks = [{"items": items}] - - return { - "tasks": tasks, - "task_count": len(tasks), - "metadata": { - "strategy": strategy, - "original_item_count": len(items), - "split_config": self.config, - }, - } - - def _split_by_items(self, items: List[Any]) -> List[Dict[str, Any]]: - """ - Split into one task per item. - - Args: - items: List of items - - Returns: - List of tasks, one per item - """ - return [{"item": item, "index": i} for i, item in enumerate(items)] - - def _split_by_count(self, items: List[Any]) -> List[Dict[str, Any]]: - """ - Split into N tasks with roughly equal distribution. 
- - Args: - items: List of items - - Returns: - List of N tasks - """ - count = self.config.get("count", 1) - if count <= 0: - count = 1 - - # Calculate chunk size - chunk_size = max(1, len(items) // count) - - tasks = [] - for i in range(0, len(items), chunk_size): - chunk = items[i:i + chunk_size] - if chunk: # Only add non-empty chunks - tasks.append({ - "items": chunk, - "chunk_index": len(tasks), - "chunk_size": len(chunk), - }) - - # If we have more tasks than requested, merge the last ones - while len(tasks) > count and len(tasks) > 1: - last = tasks.pop() - tasks[-1]["items"].extend(last["items"]) - tasks[-1]["chunk_size"] = len(tasks[-1]["items"]) - - return tasks - - def _split_by_chunk_size(self, items: List[Any]) -> List[Dict[str, Any]]: - """ - Split into chunks of specified size. - - Args: - items: List of items - - Returns: - List of tasks with chunk_size items each - """ - chunk_size = self.config.get("chunk_size", 1) - if chunk_size <= 0: - chunk_size = 1 - - tasks = [] - for i in range(0, len(items), chunk_size): - chunk = items[i:i + chunk_size] - tasks.append({ - "items": chunk, - "chunk_index": len(tasks), - "chunk_size": len(chunk), - }) - - return tasks - - def _split_custom(self, items: List[Any]) -> List[Dict[str, Any]]: - """ - Split using custom expression. - - The expression should return a list of tasks. - - Args: - items: List of items - - Returns: - List of custom tasks - """ - expression = self.config.get("expression", "") - if not expression: - return [{"items": items}] - - try: - namespace = {"items": items, "len": len, "range": range} - result = eval(expression, {"__builtins__": {}}, namespace) - - if isinstance(result, list): - # Ensure each result is a dict - tasks = [] - for i, item in enumerate(result): - if isinstance(item, dict): - tasks.append(item) - else: - tasks.append({"item": item, "index": i}) - return tasks - else: - return [{"result": result}] - except Exception as e: - # Fallback to simple split on error - return [{"items": items, "error": f"Custom split failed: {str(e)}"}] diff --git a/utils/agent/cli_executor.py b/utils/agent/cli_executor.py index 1252fc4..4296a82 100644 --- a/utils/agent/cli_executor.py +++ b/utils/agent/cli_executor.py @@ -232,8 +232,10 @@ async def execute(self, prompt: str) -> str: if result.returncode != 0: error_msg = f"CLI failed with exit code {result.returncode}" if error_output: - error_msg += f"\n{error_output}" - logger.error(error_msg) + error_msg += f"\nStderr: {error_output}" + if response: + error_msg += f"\nStdout: {response}" + logger.error(f"CLI execution failed:\nCommand: {' '.join(cmd[:3])} [prompt...]\n{error_msg}") return f"Error: {error_msg}" if error_output:
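With stderr and stdout now both captured above, a failed CLI call returns a single error string that surfaces both streams. A rough sketch of that string's shape, using made-up values:

```python
# Illustrative only: the error string produced by the updated failure branch,
# given a hypothetical non-zero exit with output on both streams.
returncode = 1
error_output = "model 'haiku' not found"      # stderr (made up)
response = "partial output before failure"   # stdout (made up)

error_msg = f"CLI failed with exit code {returncode}"
if error_output:
    error_msg += f"\nStderr: {error_output}"
    if response:
        error_msg += f"\nStdout: {response}"

print(f"Error: {error_msg}")
# Error: CLI failed with exit code 1
# Stderr: model 'haiku' not found
# Stdout: partial output before failure
```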