Skip to content

Commit fa49df2

Browse files
authored
feat(worker): add executor to decision handler (#49)
<!-- Describe what has changed in this PR --> **What changed?** * add threadpool executor to decision handler <!-- Tell your future self why have you made these changes --> **Why?** Each workflow engine needs to be running in a separate thread with a deterministic event loop. The loop hook will be in the next PR <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit Test <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** Signed-off-by: Shijie Sheng <[email protected]>
1 parent fa66474 commit fa49df2

File tree

7 files changed

+48
-42
lines changed

7 files changed

+48
-42
lines changed

cadence/_internal/workflow/workflow_engine.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@ def __init__(self, info: WorkflowInfo, workflow_definition=None):
3030
self._context = Context(info, self._decisions_helper, self._decision_manager)
3131
self._is_workflow_complete = False
3232

33-
async def process_decision(
33+
def process_decision(
34+
self, decision_task: PollForDecisionTaskResponse
35+
) -> DecisionResult:
36+
return asyncio.run(self._process_decision(decision_task))
37+
38+
async def _process_decision(
3439
self, decision_task: PollForDecisionTaskResponse
3540
) -> DecisionResult:
3641
"""

cadence/worker/_decision.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
from concurrent.futures import ThreadPoolExecutor
23
from typing import Optional
34

45
from cadence.api.v1.service_worker_pb2 import (
@@ -24,8 +25,11 @@ def __init__(
2425
permits = asyncio.Semaphore(
2526
options["max_concurrent_decision_task_execution_size"]
2627
)
28+
executor = ThreadPoolExecutor(
29+
max_workers=options["max_concurrent_decision_task_execution_size"]
30+
)
2731
self._decision_handler = DecisionTaskHandler(
28-
client, task_list, registry, **options
32+
client, task_list, registry, executor=executor, **options
2933
)
3034
self._poller = Poller[PollForDecisionTaskResponse](
3135
options["decision_task_pollers"], permits, self._poll, self._execute

cadence/worker/_decision_task_handler.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import asyncio
2+
from concurrent.futures import ThreadPoolExecutor
13
import logging
24
import threading
3-
from typing import Dict, Tuple
5+
from typing import Dict, Optional, Tuple
46

57
from cadence._internal.workflow.history_event_iterator import iterate_history_events
68
from cadence.api.v1.common_pb2 import Payload
@@ -33,6 +35,7 @@ def __init__(
3335
task_list: str,
3436
registry: Registry,
3537
identity: str = "unknown",
38+
executor: Optional[ThreadPoolExecutor] = None,
3639
**options,
3740
):
3841
"""
@@ -50,6 +53,7 @@ def __init__(
5053
# Thread-safe cache to hold workflow engines keyed by (workflow_id, run_id)
5154
self._workflow_engines: Dict[Tuple[str, str], WorkflowEngine] = {}
5255
self._cache_lock = threading.RLock()
56+
self._executor = executor
5357

5458
async def _handle_task_implementation(
5559
self, task: PollForDecisionTaskResponse
@@ -131,7 +135,9 @@ async def _handle_task_implementation(
131135
)
132136
self._workflow_engines[cache_key] = workflow_engine
133137

134-
decision_result = await workflow_engine.process_decision(task)
138+
decision_result = await asyncio.get_running_loop().run_in_executor(
139+
self._executor, workflow_engine.process_decision, task
140+
)
135141

136142
# Clean up completed workflows from cache to prevent memory leaks
137143
if workflow_engine._is_workflow_complete:

tests/cadence/_internal/workflow/test_workflow_engine_integration.py

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ def create_mock_decision_task(
106106

107107
return decision_task
108108

109-
@pytest.mark.asyncio
110-
async def test_process_decision_success(
109+
def test_process_decision_success(
111110
self, workflow_engine, mock_client, decision_task
112111
):
113112
"""Test successful decision processing."""
@@ -119,14 +118,13 @@ async def test_process_decision_success(
119118
return_value=[Mock()],
120119
):
121120
# Process the decision
122-
result = await workflow_engine.process_decision(decision_task)
121+
result = workflow_engine.process_decision(decision_task)
123122

124123
# Verify the result
125124
assert isinstance(result, DecisionResult)
126125
assert len(result.decisions) == 1
127126

128-
@pytest.mark.asyncio
129-
async def test_process_decision_with_history(
127+
def test_process_decision_with_history(
130128
self, workflow_engine, mock_client, decision_task
131129
):
132130
"""Test decision processing with history events."""
@@ -141,13 +139,12 @@ async def test_process_decision_with_history(
141139
return_value=[],
142140
):
143141
# Process the decision
144-
await workflow_engine.process_decision(decision_task)
142+
workflow_engine.process_decision(decision_task)
145143

146144
# Verify history events were processed
147145
mock_handle.assert_called()
148146

149-
@pytest.mark.asyncio
150-
async def test_process_decision_workflow_complete(
147+
def test_process_decision_workflow_complete(
151148
self, workflow_engine, mock_client, decision_task
152149
):
153150
"""Test decision processing when workflow is already complete."""
@@ -160,14 +157,13 @@ async def test_process_decision_workflow_complete(
160157
return_value=[],
161158
):
162159
# Process the decision
163-
result = await workflow_engine.process_decision(decision_task)
160+
result = workflow_engine.process_decision(decision_task)
164161

165162
# Verify the result
166163
assert isinstance(result, DecisionResult)
167164
assert len(result.decisions) == 0
168165

169-
@pytest.mark.asyncio
170-
async def test_process_decision_error_handling(
166+
def test_process_decision_error_handling(
171167
self, workflow_engine, mock_client, decision_task
172168
):
173169
"""Test decision processing error handling."""
@@ -179,7 +175,7 @@ async def test_process_decision_error_handling(
179175
side_effect=Exception("Test error"),
180176
):
181177
# Process the decision
182-
result = await workflow_engine.process_decision(decision_task)
178+
result = workflow_engine.process_decision(decision_task)
183179

184180
# Verify error handling - should return empty decisions
185181
assert isinstance(result, DecisionResult)
@@ -314,8 +310,7 @@ def test_workflow_engine_initialization(
314310
assert workflow_engine._decision_manager is not None
315311
assert workflow_engine._is_workflow_complete is False
316312

317-
@pytest.mark.asyncio
318-
async def test_workflow_engine_without_workflow_definition(
313+
def test_workflow_engine_without_workflow_definition(
319314
self, mock_client: Client, workflow_info, decision_task
320315
):
321316
"""Test WorkflowEngine without workflow definition."""
@@ -328,14 +323,13 @@ async def test_workflow_engine_without_workflow_definition(
328323
engine._decision_manager, "collect_pending_decisions", return_value=[]
329324
):
330325
# Process the decision
331-
result = await engine.process_decision(decision_task)
326+
result = engine.process_decision(decision_task)
332327

333328
# Verify the result
334329
assert isinstance(result, DecisionResult)
335330
assert len(result.decisions) == 0
336331

337-
@pytest.mark.asyncio
338-
async def test_workflow_engine_workflow_completion(
332+
def test_workflow_engine_workflow_completion(
339333
self, workflow_engine, mock_client, decision_task
340334
):
341335
"""Test workflow completion detection."""
@@ -361,7 +355,7 @@ async def run(self, input_data):
361355
return_value=[],
362356
):
363357
# Process the decision
364-
await workflow_engine.process_decision(decision_task)
358+
workflow_engine.process_decision(decision_task)
365359

366360
# Verify workflow is marked as complete
367361
assert workflow_engine._is_workflow_complete is True
@@ -371,8 +365,7 @@ def test_close_event_loop(self, workflow_engine):
371365
# This should not raise an exception
372366
workflow_engine._close_event_loop()
373367

374-
@pytest.mark.asyncio
375-
async def test_process_decision_with_query_results(
368+
def test_process_decision_with_query_results(
376369
self, workflow_engine, mock_client, decision_task
377370
):
378371
"""Test decision processing with query results."""
@@ -386,7 +379,7 @@ async def test_process_decision_with_query_results(
386379
return_value=mock_decisions,
387380
):
388381
# Process the decision
389-
result = await workflow_engine.process_decision(decision_task)
382+
result = workflow_engine.process_decision(decision_task)
390383

391384
# Verify the result
392385
assert isinstance(result, DecisionResult)

tests/cadence/worker/test_decision_task_handler.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ async def run(self):
105105
mock_engine._is_workflow_complete = False # Add missing attribute
106106
mock_decision_result = Mock(spec=DecisionResult)
107107
mock_decision_result.decisions = [Decision()]
108-
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
108+
mock_engine.process_decision = Mock(return_value=mock_decision_result)
109109

110110
with patch(
111111
"cadence.worker._decision_task_handler.WorkflowEngine",
@@ -178,7 +178,7 @@ async def run(self):
178178
mock_engine._is_workflow_complete = False # Add missing attribute
179179
mock_decision_result = Mock(spec=DecisionResult)
180180
mock_decision_result.decisions = []
181-
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
181+
mock_engine.process_decision = Mock(return_value=mock_decision_result)
182182

183183
with patch(
184184
"cadence.worker._decision_task_handler.WorkflowEngine",
@@ -245,7 +245,7 @@ async def run(self):
245245
mock_engine._is_workflow_complete = False # Add missing attribute
246246
mock_decision_result = Mock(spec=DecisionResult)
247247
mock_decision_result.decisions = []
248-
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
248+
mock_engine.process_decision = Mock(return_value=mock_decision_result)
249249

250250
with patch(
251251
"cadence.worker._decision_task_handler.WorkflowEngine",
@@ -428,7 +428,7 @@ async def run(self):
428428
mock_engine._is_workflow_complete = False # Add missing attribute
429429
mock_decision_result = Mock(spec=DecisionResult)
430430
mock_decision_result.decisions = []
431-
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
431+
mock_engine.process_decision = Mock(return_value=mock_decision_result)
432432

433433
with patch(
434434
"cadence.worker._decision_task_handler.WorkflowEngine",

tests/cadence/worker/test_decision_task_handler_integration.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def create_mock_decision_task(
9999

100100
@pytest.mark.asyncio
101101
async def test_handle_decision_task_success(
102-
self, decision_task_handler, mock_client
102+
self, decision_task_handler: DecisionTaskHandler, mock_client
103103
):
104104
"""Test successful decision task handling."""
105105
# Create a mock decision task
@@ -110,7 +110,7 @@ async def test_handle_decision_task_success(
110110
mock_engine = Mock()
111111
# Create a proper Decision object
112112
decision = Decision()
113-
mock_engine.process_decision = AsyncMock(
113+
mock_engine.process_decision = Mock(
114114
return_value=Mock(
115115
decisions=[decision], # Proper Decision object
116116
)
@@ -182,7 +182,7 @@ async def test_workflow_engine_creation_each_task(
182182
"cadence.worker._decision_task_handler.WorkflowEngine"
183183
) as mock_engine_class:
184184
mock_engine = Mock()
185-
mock_engine.process_decision = AsyncMock(
185+
mock_engine.process_decision = Mock(
186186
return_value=Mock(
187187
decisions=[],
188188
)
@@ -211,9 +211,7 @@ async def test_decision_task_failure_handling(
211211
"cadence.worker._decision_task_handler.WorkflowEngine"
212212
) as mock_engine_class:
213213
mock_engine = Mock()
214-
mock_engine.process_decision = AsyncMock(
215-
side_effect=Exception("Test error")
216-
)
214+
mock_engine.process_decision = Mock(side_effect=Exception("Test error"))
217215
mock_engine_class.return_value = mock_engine
218216

219217
# Handle the decision task - this should catch the exception

tests/cadence/worker/test_task_handler_integration.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ async def run(self):
8484
mock_engine._is_workflow_complete = False # Add missing attribute
8585
mock_decision_result = Mock(spec=DecisionResult)
8686
mock_decision_result.decisions = []
87-
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
87+
mock_engine.process_decision = Mock(return_value=mock_decision_result)
8888

8989
with patch(
9090
"cadence.worker._decision_task_handler.WorkflowEngine",
@@ -117,7 +117,7 @@ async def run(self):
117117
# Mock workflow engine to raise an error
118118
mock_engine = Mock(spec=WorkflowEngine)
119119
mock_engine._is_workflow_complete = False # Add missing attribute
120-
mock_engine.process_decision = AsyncMock(
120+
mock_engine.process_decision = Mock(
121121
side_effect=RuntimeError("Workflow processing failed")
122122
)
123123

@@ -157,7 +157,7 @@ async def run(self):
157157
mock_engine._is_workflow_complete = False # Add missing attribute
158158
mock_decision_result = Mock(spec=DecisionResult)
159159
mock_decision_result.decisions = []
160-
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
160+
mock_engine.process_decision = Mock(return_value=mock_decision_result)
161161

162162
# Track if context is activated
163163
context_activated = False
@@ -229,7 +229,7 @@ async def run(self):
229229
mock_decision_result = Mock(spec=DecisionResult)
230230
mock_decision_result.decisions = []
231231

232-
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
232+
mock_engine.process_decision = Mock(return_value=mock_decision_result)
233233

234234
with patch(
235235
"cadence.worker._decision_task_handler.WorkflowEngine",
@@ -266,7 +266,7 @@ async def run(self):
266266
mock_engine._is_workflow_complete = False # Add missing attribute
267267
mock_decision_result = Mock(spec=DecisionResult)
268268
mock_decision_result.decisions = []
269-
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
269+
mock_engine.process_decision = Mock(return_value=mock_decision_result)
270270

271271
with patch(
272272
"cadence.worker._decision_task_handler.WorkflowEngine",
@@ -298,7 +298,7 @@ async def run(self):
298298
# Mock workflow engine to raise an error
299299
mock_engine = Mock(spec=WorkflowEngine)
300300
mock_engine._is_workflow_complete = False # Add missing attribute
301-
mock_engine.process_decision = AsyncMock(
301+
mock_engine.process_decision = Mock(
302302
side_effect=RuntimeError("Workflow processing failed")
303303
)
304304

@@ -366,7 +366,7 @@ async def run(self):
366366
mock_engine._is_workflow_complete = False # Add missing attribute
367367
mock_decision_result = Mock(spec=DecisionResult)
368368
mock_decision_result.decisions = []
369-
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
369+
mock_engine.process_decision = Mock(return_value=mock_decision_result)
370370

371371
with patch(
372372
"cadence.worker._decision_task_handler.WorkflowEngine",

0 commit comments

Comments
 (0)