Skip to content

Commit 273f7d0

Browse files
authored
feat(worker): run workflow function in a DeterministicEventLoop (#51)
<!-- Describe what has changed in this PR --> **What changed?** * add WorkflowInstance * workflow class object is constructed once per WorkflowEngine for maintaining state. * workflow coroutine is created once on first decision processing inside the DeterministicEventLoop * remove not useful unite tests TODO * workflow completion logic isn't correct * relay logic isn't correct <!-- Tell your future self why have you made these changes --> **Why?** From user perspective, workflow is just a coroutine which should be able to use any **event loop** APIs. This isn't correct because most I/O are non-deterministic. Instead, workflow coroutines are running in a DeterministicEventLoop which is a minimal implementation that forbids all I/O operations. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit Test <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** Signed-off-by: Shijie Sheng <[email protected]>
1 parent fa49df2 commit 273f7d0

File tree

5 files changed

+40
-313
lines changed

5 files changed

+40
-313
lines changed

cadence/_internal/workflow/workflow_engine.py

Lines changed: 25 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
import asyncio
22
import logging
33
from dataclasses import dataclass
4-
from typing import Callable, Any
4+
from typing import Any, Optional
55

66
from cadence._internal.workflow.context import Context
77
from cadence._internal.workflow.decisions_helper import DecisionsHelper
88
from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator
9+
from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop
910
from cadence._internal.workflow.statemachine.decision_manager import DecisionManager
11+
from cadence._internal.workflow.workflow_intance import WorkflowInstance
1012
from cadence.api.v1.decision_pb2 import Decision
1113
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
12-
from cadence.workflow import WorkflowInfo
14+
from cadence.workflow import WorkflowDefinition, WorkflowInfo
1315

1416
logger = logging.getLogger(__name__)
1517

@@ -20,23 +22,16 @@ class DecisionResult:
2022

2123

2224
class WorkflowEngine:
23-
def __init__(self, info: WorkflowInfo, workflow_definition=None):
24-
self._workflow_definition = workflow_definition
25-
self._workflow_instance = None
26-
if workflow_definition:
27-
self._workflow_instance = workflow_definition.cls()
25+
def __init__(self, info: WorkflowInfo, workflow_definition: WorkflowDefinition):
26+
self._workflow_instance = WorkflowInstance(workflow_definition)
2827
self._decision_manager = DecisionManager()
2928
self._decisions_helper = DecisionsHelper()
3029
self._context = Context(info, self._decisions_helper, self._decision_manager)
31-
self._is_workflow_complete = False
30+
self._loop = DeterministicEventLoop() # type: ignore
31+
self._task: Optional[asyncio.Task] = None
3232

3333
def process_decision(
3434
self, decision_task: PollForDecisionTaskResponse
35-
) -> DecisionResult:
36-
return asyncio.run(self._process_decision(decision_task))
37-
38-
async def _process_decision(
39-
self, decision_task: PollForDecisionTaskResponse
4035
) -> DecisionResult:
4136
"""
4237
Process a decision task and generate decisions using DecisionEventsIterator.
@@ -71,14 +66,11 @@ async def _process_decision(
7166
)
7267

7368
# Process decision events using iterator-driven approach
74-
await self._process_decision_events(events_iterator, decision_task)
69+
self._process_decision_events(events_iterator, decision_task)
7570

7671
# Collect all pending decisions from state machines
7772
decisions = self._decision_manager.collect_pending_decisions()
7873

79-
# Close decider's event loop
80-
self._close_event_loop()
81-
8274
# Log decision task completion with metrics (matches Java ReplayDecisionTaskHandler)
8375
logger.debug(
8476
"Decision task completed",
@@ -111,7 +103,10 @@ async def _process_decision(
111103
# Re-raise the exception so the handler can properly handle the failure
112104
raise
113105

114-
async def _process_decision_events(
106+
def is_done(self) -> bool:
107+
return self._task is not None and self._task.done()
108+
109+
def _process_decision_events(
115110
self,
116111
events_iterator: DecisionEventsIterator,
117112
decision_task: PollForDecisionTaskResponse,
@@ -129,13 +124,9 @@ async def _process_decision_events(
129124
events_iterator: The DecisionEventsIterator for structured event processing
130125
decision_task: The original decision task
131126
"""
132-
# Track if we processed any decision events
133-
processed_any_decision_events = False
134127

135128
# Check if there are any decision events to process
136129
for decision_events in events_iterator:
137-
processed_any_decision_events = True
138-
139130
# Log decision events batch processing (matches Go client patterns)
140131
logger.debug(
141132
"Processing decision events batch",
@@ -212,68 +203,10 @@ async def _process_decision_events(
212203
exc_info=True,
213204
)
214205

215-
# Phase 3: Execute workflow logic if not in replay mode
216-
if not decision_events.is_replay() and not self._is_workflow_complete:
217-
await self._execute_workflow_function(decision_task)
218-
219-
# If no decision events were processed but we have history, fall back to direct processing
220-
# This handles edge cases where the iterator doesn't find decision events
221-
if (
222-
not processed_any_decision_events
223-
and decision_task.history
224-
and hasattr(decision_task.history, "events")
225-
):
226-
logger.debug(
227-
"No decision events found by iterator, falling back to direct history processing",
228-
extra={
229-
"workflow_id": self._context.info().workflow_id,
230-
"history_events_count": len(decision_task.history.events)
231-
if decision_task.history
232-
else 0,
233-
},
234-
)
235-
self._fallback_process_workflow_history(decision_task.history)
236-
if not self._is_workflow_complete:
237-
await self._execute_workflow_function(decision_task)
238-
239-
def _fallback_process_workflow_history(self, history) -> None:
240-
"""
241-
Fallback method to process workflow history events directly.
242-
243-
This is used when DecisionEventsIterator doesn't find decision events,
244-
maintaining backward compatibility.
206+
# Phase 3: Execute workflow logic
207+
self._execute_workflow_once(decision_task)
245208

246-
Args:
247-
history: The workflow history from the decision task
248-
"""
249-
if not history or not hasattr(history, "events"):
250-
return
251-
252-
logger.debug(
253-
"Processing history events in fallback mode",
254-
extra={
255-
"workflow_id": self._context.info().workflow_id,
256-
"events_count": len(history.events),
257-
},
258-
)
259-
260-
for event in history.events:
261-
try:
262-
# Process through state machines (DecisionsHelper now delegates to DecisionManager)
263-
self._decision_manager.handle_history_event(event)
264-
except Exception as e:
265-
logger.warning(
266-
"Error processing history event in fallback mode",
267-
extra={
268-
"workflow_id": self._context.info().workflow_id,
269-
"event_type": getattr(event, "event_type", "unknown"),
270-
"event_id": getattr(event, "event_id", None),
271-
"error_type": type(e).__name__,
272-
},
273-
exc_info=True,
274-
)
275-
276-
async def _execute_workflow_function(
209+
def _execute_workflow_once(
277210
self, decision_task: PollForDecisionTaskResponse
278211
) -> None:
279212
"""
@@ -285,45 +218,18 @@ async def _execute_workflow_function(
285218
decision_task: The decision task containing workflow context
286219
"""
287220
try:
288-
# Execute the workflow function from the workflow instance
289-
if self._workflow_definition is None or self._workflow_instance is None:
290-
logger.warning(
291-
"No workflow definition or instance available",
292-
extra={
293-
"workflow_type": self._context.info().workflow_type,
294-
"workflow_id": self._context.info().workflow_id,
295-
"run_id": self._context.info().workflow_run_id,
296-
},
297-
)
298-
return
299-
300-
# Get the workflow run method from the instance
301-
workflow_func = self._workflow_definition.get_run_method(
302-
self._workflow_instance
303-
)
304-
305221
# Extract workflow input from history
306-
workflow_input = self._extract_workflow_input(decision_task)
307-
308-
# Execute workflow function
309-
result = await self._execute_workflow_function_once(
310-
workflow_func, workflow_input
311-
)
312-
313-
# Check if workflow is complete
314-
if result is not None:
315-
self._is_workflow_complete = True
316-
# Log workflow completion (matches Go client patterns)
317-
logger.info(
318-
"Workflow execution completed",
319-
extra={
320-
"workflow_type": self._context.info().workflow_type,
321-
"workflow_id": self._context.info().workflow_id,
322-
"run_id": self._context.info().workflow_run_id,
323-
"completion_type": "success",
324-
},
222+
if self._task is None:
223+
workflow_input = self._extract_workflow_input(decision_task)
224+
self._task = self._loop.create_task(
225+
self._workflow_instance.run(workflow_input)
325226
)
326227

228+
# signal the loop to stop after the first run
229+
self._loop.stop()
230+
# this starts the loop and runs once then stops with cleanup
231+
self._loop.run_forever()
232+
327233
except Exception as e:
328234
logger.error(
329235
"Error executing workflow function",
@@ -373,39 +279,3 @@ def _extract_workflow_input(
373279

374280
logger.warning("No WorkflowExecutionStarted event found in history")
375281
return None
376-
377-
async def _execute_workflow_function_once(
378-
self, workflow_func: Callable, workflow_input: Any
379-
) -> Any:
380-
"""
381-
Execute the workflow function once (not during replay).
382-
383-
Args:
384-
workflow_func: The workflow function to execute
385-
workflow_input: The input data for the workflow function
386-
387-
Returns:
388-
The result of the workflow function execution
389-
"""
390-
logger.debug(f"Executing workflow function with input: {workflow_input}")
391-
result = workflow_func(workflow_input)
392-
393-
# If the workflow function is async, await it properly
394-
if asyncio.iscoroutine(result):
395-
result = await result
396-
397-
return result
398-
399-
def _close_event_loop(self) -> None:
400-
"""
401-
Close the decider's event loop.
402-
"""
403-
try:
404-
# Get the current event loop
405-
loop = asyncio.get_event_loop()
406-
if loop.is_running():
407-
# Schedule the loop to stop
408-
loop.call_soon_threadsafe(loop.stop)
409-
logger.debug("Scheduled event loop to stop")
410-
except Exception as e:
411-
logger.warning(f"Error closing event loop: {e}")
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from cadence.workflow import WorkflowDefinition
2+
3+
4+
class WorkflowInstance:
5+
def __init__(self, workflow_definition: WorkflowDefinition):
6+
self._definition = workflow_definition
7+
self._instance = workflow_definition.cls().__init__()
8+
9+
async def run(self, *args):
10+
run_method = self._definition.get_run_method(self._instance)
11+
return run_method(*args)

cadence/worker/_decision_task_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def __init__(
5252
self._registry = registry
5353
# Thread-safe cache to hold workflow engines keyed by (workflow_id, run_id)
5454
self._workflow_engines: Dict[Tuple[str, str], WorkflowEngine] = {}
55-
self._cache_lock = threading.RLock()
55+
self._cache_lock = threading.RLock() # TODO: reevaluate if this is still needed
5656
self._executor = executor
5757

5858
async def _handle_task_implementation(
@@ -140,7 +140,7 @@ async def _handle_task_implementation(
140140
)
141141

142142
# Clean up completed workflows from cache to prevent memory leaks
143-
if workflow_engine._is_workflow_complete:
143+
if workflow_engine.is_done():
144144
with self._cache_lock:
145145
self._workflow_engines.pop(cache_key, None)
146146
logger.debug(

0 commit comments

Comments
 (0)