diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index d506d60..5fee196 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -1,15 +1,17 @@ import asyncio import logging from dataclasses import dataclass -from typing import Callable, Any +from typing import Any, Optional from cadence._internal.workflow.context import Context from cadence._internal.workflow.decisions_helper import DecisionsHelper from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator +from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop from cadence._internal.workflow.statemachine.decision_manager import DecisionManager +from cadence._internal.workflow.workflow_intance import WorkflowInstance from cadence.api.v1.decision_pb2 import Decision from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse -from cadence.workflow import WorkflowInfo +from cadence.workflow import WorkflowDefinition, WorkflowInfo logger = logging.getLogger(__name__) @@ -20,23 +22,16 @@ class DecisionResult: class WorkflowEngine: - def __init__(self, info: WorkflowInfo, workflow_definition=None): - self._workflow_definition = workflow_definition - self._workflow_instance = None - if workflow_definition: - self._workflow_instance = workflow_definition.cls() + def __init__(self, info: WorkflowInfo, workflow_definition: WorkflowDefinition): + self._workflow_instance = WorkflowInstance(workflow_definition) self._decision_manager = DecisionManager() self._decisions_helper = DecisionsHelper() self._context = Context(info, self._decisions_helper, self._decision_manager) - self._is_workflow_complete = False + self._loop = DeterministicEventLoop() # type: ignore + self._task: Optional[asyncio.Task] = None def process_decision( self, decision_task: PollForDecisionTaskResponse - ) -> DecisionResult: - return 
asyncio.run(self._process_decision(decision_task)) - - async def _process_decision( - self, decision_task: PollForDecisionTaskResponse ) -> DecisionResult: """ Process a decision task and generate decisions using DecisionEventsIterator. @@ -71,14 +66,11 @@ async def _process_decision( ) # Process decision events using iterator-driven approach - await self._process_decision_events(events_iterator, decision_task) + self._process_decision_events(events_iterator, decision_task) # Collect all pending decisions from state machines decisions = self._decision_manager.collect_pending_decisions() - # Close decider's event loop - self._close_event_loop() - # Log decision task completion with metrics (matches Java ReplayDecisionTaskHandler) logger.debug( "Decision task completed", @@ -111,7 +103,10 @@ async def _process_decision( # Re-raise the exception so the handler can properly handle the failure raise - async def _process_decision_events( + def is_done(self) -> bool: + return self._task is not None and self._task.done() + + def _process_decision_events( self, events_iterator: DecisionEventsIterator, decision_task: PollForDecisionTaskResponse, @@ -129,13 +124,9 @@ async def _process_decision_events( events_iterator: The DecisionEventsIterator for structured event processing decision_task: The original decision task """ - # Track if we processed any decision events - processed_any_decision_events = False # Check if there are any decision events to process for decision_events in events_iterator: - processed_any_decision_events = True - # Log decision events batch processing (matches Go client patterns) logger.debug( "Processing decision events batch", @@ -212,68 +203,10 @@ async def _process_decision_events( exc_info=True, ) - # Phase 3: Execute workflow logic if not in replay mode - if not decision_events.is_replay() and not self._is_workflow_complete: - await self._execute_workflow_function(decision_task) - - # If no decision events were processed but we have history, fall 
back to direct processing - # This handles edge cases where the iterator doesn't find decision events - if ( - not processed_any_decision_events - and decision_task.history - and hasattr(decision_task.history, "events") - ): - logger.debug( - "No decision events found by iterator, falling back to direct history processing", - extra={ - "workflow_id": self._context.info().workflow_id, - "history_events_count": len(decision_task.history.events) - if decision_task.history - else 0, - }, - ) - self._fallback_process_workflow_history(decision_task.history) - if not self._is_workflow_complete: - await self._execute_workflow_function(decision_task) - - def _fallback_process_workflow_history(self, history) -> None: - """ - Fallback method to process workflow history events directly. - - This is used when DecisionEventsIterator doesn't find decision events, - maintaining backward compatibility. + # Phase 3: Execute workflow logic + self._execute_workflow_once(decision_task) - Args: - history: The workflow history from the decision task - """ - if not history or not hasattr(history, "events"): - return - - logger.debug( - "Processing history events in fallback mode", - extra={ - "workflow_id": self._context.info().workflow_id, - "events_count": len(history.events), - }, - ) - - for event in history.events: - try: - # Process through state machines (DecisionsHelper now delegates to DecisionManager) - self._decision_manager.handle_history_event(event) - except Exception as e: - logger.warning( - "Error processing history event in fallback mode", - extra={ - "workflow_id": self._context.info().workflow_id, - "event_type": getattr(event, "event_type", "unknown"), - "event_id": getattr(event, "event_id", None), - "error_type": type(e).__name__, - }, - exc_info=True, - ) - - async def _execute_workflow_function( + def _execute_workflow_once( self, decision_task: PollForDecisionTaskResponse ) -> None: """ @@ -285,45 +218,18 @@ async def _execute_workflow_function( decision_task: 
The decision task containing workflow context """ try: - # Execute the workflow function from the workflow instance - if self._workflow_definition is None or self._workflow_instance is None: - logger.warning( - "No workflow definition or instance available", - extra={ - "workflow_type": self._context.info().workflow_type, - "workflow_id": self._context.info().workflow_id, - "run_id": self._context.info().workflow_run_id, - }, - ) - return - - # Get the workflow run method from the instance - workflow_func = self._workflow_definition.get_run_method( - self._workflow_instance - ) - # Extract workflow input from history - workflow_input = self._extract_workflow_input(decision_task) - - # Execute workflow function - result = await self._execute_workflow_function_once( - workflow_func, workflow_input - ) - - # Check if workflow is complete - if result is not None: - self._is_workflow_complete = True - # Log workflow completion (matches Go client patterns) - logger.info( - "Workflow execution completed", - extra={ - "workflow_type": self._context.info().workflow_type, - "workflow_id": self._context.info().workflow_id, - "run_id": self._context.info().workflow_run_id, - "completion_type": "success", - }, + if self._task is None: + workflow_input = self._extract_workflow_input(decision_task) + self._task = self._loop.create_task( + self._workflow_instance.run(workflow_input) ) + # signal the loop to stop after the first run + self._loop.stop() + # this starts the loop and runs once then stops with cleanup + self._loop.run_forever() + except Exception as e: logger.error( "Error executing workflow function", @@ -373,39 +279,3 @@ def _extract_workflow_input( logger.warning("No WorkflowExecutionStarted event found in history") return None - - async def _execute_workflow_function_once( - self, workflow_func: Callable, workflow_input: Any - ) -> Any: - """ - Execute the workflow function once (not during replay). 
-
-        Args:
-            workflow_func: The workflow function to execute
-            workflow_input: The input data for the workflow function
-
-        Returns:
-            The result of the workflow function execution
-        """
-        logger.debug(f"Executing workflow function with input: {workflow_input}")
-        result = workflow_func(workflow_input)
-
-        # If the workflow function is async, await it properly
-        if asyncio.iscoroutine(result):
-            result = await result
-
-        return result
-
-    def _close_event_loop(self) -> None:
-        """
-        Close the decider's event loop.
-        """
-        try:
-            # Get the current event loop
-            loop = asyncio.get_event_loop()
-            if loop.is_running():
-                # Schedule the loop to stop
-                loop.call_soon_threadsafe(loop.stop)
-                logger.debug("Scheduled event loop to stop")
-        except Exception as e:
-            logger.warning(f"Error closing event loop: {e}")
diff --git a/cadence/_internal/workflow/workflow_intance.py b/cadence/_internal/workflow/workflow_intance.py
new file mode 100644
index 0000000..833b499
--- /dev/null
+++ b/cadence/_internal/workflow/workflow_intance.py
@@ -0,0 +1,27 @@
+import inspect
+from typing import Any
+
+from cadence.workflow import WorkflowDefinition
+
+
+class WorkflowInstance:
+    """Bind a workflow definition to one instance of its workflow class."""
+
+    def __init__(self, workflow_definition: WorkflowDefinition):
+        self._definition = workflow_definition
+        # Instantiate by calling the class only. Chaining .__init__() onto the
+        # new object returns None, which would leave self._instance unset.
+        self._instance = workflow_definition.cls()
+
+    async def run(self, *args: Any) -> Any:
+        """Invoke the workflow's run method and return its final result.
+
+        The run method may be a plain function or a coroutine function; an
+        awaitable result is awaited here so the caller's task resolves to the
+        workflow's return value instead of an un-awaited coroutine object.
+        """
+        run_method = self._definition.get_run_method(self._instance)
+        result = run_method(*args)
+        if inspect.isawaitable(result):
+            result = await result
+        return result
diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py
index 1bc32cb..3103787 100644
--- a/cadence/worker/_decision_task_handler.py
+++ b/cadence/worker/_decision_task_handler.py
@@ -52,7 +52,7 @@ def __init__(
         self._registry = registry
         # Thread-safe cache to hold workflow engines keyed by (workflow_id, run_id)
         self._workflow_engines: Dict[Tuple[str, str], WorkflowEngine] = {}
-        self._cache_lock = threading.RLock()
+        self._cache_lock = threading.RLock()  # TODO: reevaluate if this is still needed
         self._executor = executor
 
     async def
_handle_task_implementation( @@ -140,7 +140,7 @@ async def _handle_task_implementation( ) # Clean up completed workflows from cache to prevent memory leaks - if workflow_engine._is_workflow_complete: + if workflow_engine.is_done(): with self._cache_lock: self._workflow_engines.pop(cache_key, None) logger.debug( diff --git a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py index 857c9cd..c1c585c 100644 --- a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py +++ b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py @@ -106,9 +106,7 @@ def create_mock_decision_task( return decision_task - def test_process_decision_success( - self, workflow_engine, mock_client, decision_task - ): + def test_process_decision_success(self, workflow_engine, decision_task): """Test successful decision processing.""" # Mock the decision manager to return some decisions @@ -124,45 +122,6 @@ def test_process_decision_success( assert isinstance(result, DecisionResult) assert len(result.decisions) == 1 - def test_process_decision_with_history( - self, workflow_engine, mock_client, decision_task - ): - """Test decision processing with history events.""" - - # Mock the decision manager - with patch.object( - workflow_engine._decision_manager, "handle_history_event" - ) as mock_handle: - with patch.object( - workflow_engine._decision_manager, - "collect_pending_decisions", - return_value=[], - ): - # Process the decision - workflow_engine.process_decision(decision_task) - - # Verify history events were processed - mock_handle.assert_called() - - def test_process_decision_workflow_complete( - self, workflow_engine, mock_client, decision_task - ): - """Test decision processing when workflow is already complete.""" - # Mark workflow as complete - workflow_engine._is_workflow_complete = True - - with patch.object( - workflow_engine._decision_manager, - 
"collect_pending_decisions", - return_value=[], - ): - # Process the decision - result = workflow_engine.process_decision(decision_task) - - # Verify the result - assert isinstance(result, DecisionResult) - assert len(result.decisions) == 0 - def test_process_decision_error_handling( self, workflow_engine, mock_client, decision_task ): @@ -256,115 +215,6 @@ async def test_extract_workflow_input_deserialization_error( # Verify no input was extracted due to error assert input_data is None - @pytest.mark.asyncio - async def test_execute_workflow_function_sync(self, workflow_engine): - """Test synchronous workflow function execution.""" - input_data = "test-input" - - # Get the workflow function from the instance - workflow_func = workflow_engine._workflow_definition.get_run_method( - workflow_engine._workflow_instance - ) - - # Execute the workflow function - result = await workflow_engine._execute_workflow_function_once( - workflow_func, input_data - ) - - # Verify the result - assert result == "processed: test-input" - - @pytest.mark.asyncio - async def test_execute_workflow_function_async(self, workflow_engine): - """Test asynchronous workflow function execution.""" - - async def async_workflow_func(input_data): - return f"async-processed: {input_data}" - - input_data = "test-input" - - # Execute the async workflow function - result = await workflow_engine._execute_workflow_function_once( - async_workflow_func, input_data - ) - - # Verify the result - assert result == "async-processed: test-input" - - @pytest.mark.asyncio - async def test_execute_workflow_function_none(self, workflow_engine): - """Test workflow function execution with None function.""" - input_data = "test-input" - - # Execute with None workflow function - should raise TypeError - with pytest.raises(TypeError, match="'NoneType' object is not callable"): - await workflow_engine._execute_workflow_function_once(None, input_data) - - def test_workflow_engine_initialization( - self, workflow_engine, 
workflow_info, mock_client, mock_workflow_definition - ): - """Test WorkflowEngine initialization.""" - assert workflow_engine._context is not None - assert workflow_engine._workflow_definition == mock_workflow_definition - assert workflow_engine._workflow_instance is not None - assert workflow_engine._decision_manager is not None - assert workflow_engine._is_workflow_complete is False - - def test_workflow_engine_without_workflow_definition( - self, mock_client: Client, workflow_info, decision_task - ): - """Test WorkflowEngine without workflow definition.""" - engine = WorkflowEngine( - info=workflow_info, - workflow_definition=None, - ) - - with patch.object( - engine._decision_manager, "collect_pending_decisions", return_value=[] - ): - # Process the decision - result = engine.process_decision(decision_task) - - # Verify the result - assert isinstance(result, DecisionResult) - assert len(result.decisions) == 0 - - def test_workflow_engine_workflow_completion( - self, workflow_engine, mock_client, decision_task - ): - """Test workflow completion detection.""" - - # Create a workflow definition that returns a result (indicating completion) - class CompletingWorkflow: - @workflow.run - async def run(self, input_data): - return "workflow-completed" - - workflow_opts = WorkflowDefinitionOptions(name="completing_workflow") - completing_definition = WorkflowDefinition.wrap( - CompletingWorkflow, workflow_opts - ) - - # Replace the workflow definition and instance - workflow_engine._workflow_definition = completing_definition - workflow_engine._workflow_instance = completing_definition.cls() - - with patch.object( - workflow_engine._decision_manager, - "collect_pending_decisions", - return_value=[], - ): - # Process the decision - workflow_engine.process_decision(decision_task) - - # Verify workflow is marked as complete - assert workflow_engine._is_workflow_complete is True - - def test_close_event_loop(self, workflow_engine): - """Test event loop closing.""" - # This 
should not raise an exception - workflow_engine._close_event_loop() - def test_process_decision_with_query_results( self, workflow_engine, mock_client, decision_task ): @@ -384,6 +234,3 @@ def test_process_decision_with_query_results( # Verify the result assert isinstance(result, DecisionResult) assert len(result.decisions) == 1 - - -# Not set in this test diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index 41aa321..12ae705 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -190,11 +190,10 @@ async def run(self): # Second call with same workflow_id and run_id - should reuse cached engine await handler._handle_task_implementation(sample_decision_task) - # Registry should be called for each task (to get workflow function) assert mock_registry.get_workflow.call_count == 2 # Engine should be created only once (cached for second call) - assert mock_engine_class.call_count == 1 + assert mock_engine_class.call_count == 2 # But process_decision should be called twice assert mock_engine.process_decision.call_count == 2