Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 25 additions & 155 deletions cadence/_internal/workflow/workflow_engine.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import asyncio
import logging
from dataclasses import dataclass
from typing import Callable, Any
from typing import Any, Optional

from cadence._internal.workflow.context import Context
from cadence._internal.workflow.decisions_helper import DecisionsHelper
from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator
from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop
from cadence._internal.workflow.statemachine.decision_manager import DecisionManager
from cadence._internal.workflow.workflow_intance import WorkflowInstance
from cadence.api.v1.decision_pb2 import Decision
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
from cadence.workflow import WorkflowInfo
from cadence.workflow import WorkflowDefinition, WorkflowInfo

logger = logging.getLogger(__name__)

Expand All @@ -20,23 +22,16 @@ class DecisionResult:


class WorkflowEngine:
def __init__(self, info: WorkflowInfo, workflow_definition=None):
self._workflow_definition = workflow_definition
self._workflow_instance = None
if workflow_definition:
self._workflow_instance = workflow_definition.cls()
def __init__(self, info: WorkflowInfo, workflow_definition: WorkflowDefinition):
self._workflow_instance = WorkflowInstance(workflow_definition)
self._decision_manager = DecisionManager()
self._decisions_helper = DecisionsHelper()
self._context = Context(info, self._decisions_helper, self._decision_manager)
self._is_workflow_complete = False
self._loop = DeterministicEventLoop() # type: ignore
self._task: Optional[asyncio.Task] = None

def process_decision(
self, decision_task: PollForDecisionTaskResponse
) -> DecisionResult:
return asyncio.run(self._process_decision(decision_task))

async def _process_decision(
self, decision_task: PollForDecisionTaskResponse
) -> DecisionResult:
"""
Process a decision task and generate decisions using DecisionEventsIterator.
Expand Down Expand Up @@ -71,14 +66,11 @@ async def _process_decision(
)

# Process decision events using iterator-driven approach
await self._process_decision_events(events_iterator, decision_task)
self._process_decision_events(events_iterator, decision_task)

# Collect all pending decisions from state machines
decisions = self._decision_manager.collect_pending_decisions()

# Close decider's event loop
self._close_event_loop()

# Log decision task completion with metrics (matches Java ReplayDecisionTaskHandler)
logger.debug(
"Decision task completed",
Expand Down Expand Up @@ -111,7 +103,10 @@ async def _process_decision(
# Re-raise the exception so the handler can properly handle the failure
raise

async def _process_decision_events(
def is_done(self) -> bool:
return self._task is not None and self._task.done()

def _process_decision_events(
self,
events_iterator: DecisionEventsIterator,
decision_task: PollForDecisionTaskResponse,
Expand All @@ -129,13 +124,9 @@ async def _process_decision_events(
events_iterator: The DecisionEventsIterator for structured event processing
decision_task: The original decision task
"""
# Track if we processed any decision events
processed_any_decision_events = False

# Check if there are any decision events to process
for decision_events in events_iterator:
processed_any_decision_events = True

# Log decision events batch processing (matches Go client patterns)
logger.debug(
"Processing decision events batch",
Expand Down Expand Up @@ -212,68 +203,10 @@ async def _process_decision_events(
exc_info=True,
)

# Phase 3: Execute workflow logic if not in replay mode
if not decision_events.is_replay() and not self._is_workflow_complete:
await self._execute_workflow_function(decision_task)

# If no decision events were processed but we have history, fall back to direct processing
# This handles edge cases where the iterator doesn't find decision events
if (
not processed_any_decision_events
and decision_task.history
and hasattr(decision_task.history, "events")
):
logger.debug(
"No decision events found by iterator, falling back to direct history processing",
extra={
"workflow_id": self._context.info().workflow_id,
"history_events_count": len(decision_task.history.events)
if decision_task.history
else 0,
},
)
self._fallback_process_workflow_history(decision_task.history)
if not self._is_workflow_complete:
await self._execute_workflow_function(decision_task)

def _fallback_process_workflow_history(self, history) -> None:
"""
Fallback method to process workflow history events directly.

This is used when DecisionEventsIterator doesn't find decision events,
maintaining backward compatibility.
# Phase 3: Execute workflow logic
self._execute_workflow_once(decision_task)

Args:
history: The workflow history from the decision task
"""
if not history or not hasattr(history, "events"):
return

logger.debug(
"Processing history events in fallback mode",
extra={
"workflow_id": self._context.info().workflow_id,
"events_count": len(history.events),
},
)

for event in history.events:
try:
# Process through state machines (DecisionsHelper now delegates to DecisionManager)
self._decision_manager.handle_history_event(event)
except Exception as e:
logger.warning(
"Error processing history event in fallback mode",
extra={
"workflow_id": self._context.info().workflow_id,
"event_type": getattr(event, "event_type", "unknown"),
"event_id": getattr(event, "event_id", None),
"error_type": type(e).__name__,
},
exc_info=True,
)

async def _execute_workflow_function(
def _execute_workflow_once(
self, decision_task: PollForDecisionTaskResponse
) -> None:
"""
Expand All @@ -285,45 +218,18 @@ async def _execute_workflow_function(
decision_task: The decision task containing workflow context
"""
try:
# Execute the workflow function from the workflow instance
if self._workflow_definition is None or self._workflow_instance is None:
logger.warning(
"No workflow definition or instance available",
extra={
"workflow_type": self._context.info().workflow_type,
"workflow_id": self._context.info().workflow_id,
"run_id": self._context.info().workflow_run_id,
},
)
return

# Get the workflow run method from the instance
workflow_func = self._workflow_definition.get_run_method(
self._workflow_instance
)

# Extract workflow input from history
workflow_input = self._extract_workflow_input(decision_task)

# Execute workflow function
result = await self._execute_workflow_function_once(
workflow_func, workflow_input
)

# Check if workflow is complete
if result is not None:
self._is_workflow_complete = True
# Log workflow completion (matches Go client patterns)
logger.info(
"Workflow execution completed",
extra={
"workflow_type": self._context.info().workflow_type,
"workflow_id": self._context.info().workflow_id,
"run_id": self._context.info().workflow_run_id,
"completion_type": "success",
},
if self._task is None:
workflow_input = self._extract_workflow_input(decision_task)
self._task = self._loop.create_task(
self._workflow_instance.run(workflow_input)
)

# signal the loop to stop after the first run
self._loop.stop()
# this starts the loop and runs once then stops with cleanup
self._loop.run_forever()

except Exception as e:
logger.error(
"Error executing workflow function",
Expand Down Expand Up @@ -373,39 +279,3 @@ def _extract_workflow_input(

logger.warning("No WorkflowExecutionStarted event found in history")
return None

async def _execute_workflow_function_once(
self, workflow_func: Callable, workflow_input: Any
) -> Any:
"""
Execute the workflow function once (not during replay).

Args:
workflow_func: The workflow function to execute
workflow_input: The input data for the workflow function

Returns:
The result of the workflow function execution
"""
logger.debug(f"Executing workflow function with input: {workflow_input}")
result = workflow_func(workflow_input)

# If the workflow function is async, await it properly
if asyncio.iscoroutine(result):
result = await result

return result

def _close_event_loop(self) -> None:
"""
Close the decider's event loop.
"""
try:
# Get the current event loop
loop = asyncio.get_event_loop()
if loop.is_running():
# Schedule the loop to stop
loop.call_soon_threadsafe(loop.stop)
logger.debug("Scheduled event loop to stop")
except Exception as e:
logger.warning(f"Error closing event loop: {e}")
11 changes: 11 additions & 0 deletions cadence/_internal/workflow/workflow_intance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from cadence.workflow import WorkflowDefinition


class WorkflowInstance:
def __init__(self, workflow_definition: WorkflowDefinition):
self._definition = workflow_definition
self._instance = workflow_definition.cls().__init__()

async def run(self, *args):
run_method = self._definition.get_run_method(self._instance)
return run_method(*args)
4 changes: 2 additions & 2 deletions cadence/worker/_decision_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__(
self._registry = registry
# Thread-safe cache to hold workflow engines keyed by (workflow_id, run_id)
self._workflow_engines: Dict[Tuple[str, str], WorkflowEngine] = {}
self._cache_lock = threading.RLock()
self._cache_lock = threading.RLock() # TODO: reevaluate if this is still needed
self._executor = executor

async def _handle_task_implementation(
Expand Down Expand Up @@ -140,7 +140,7 @@ async def _handle_task_implementation(
)

# Clean up completed workflows from cache to prevent memory leaks
if workflow_engine._is_workflow_complete:
if workflow_engine.is_done():
with self._cache_lock:
self._workflow_engines.pop(cache_key, None)
logger.debug(
Expand Down
Loading