
Commit fb05dff

Add async workflow capabilities and enhance determinism in durabletask

- Introduced a new `durabletask.aio` package for async workflows, including `AsyncWorkflowContext` and related awaitables.
- Added deterministic utilities to ensure consistent behavior across executions.
- Enhanced error handling and debugging features for async workflows.
- Updated the README and added documentation for the new async features.
- Introduced comprehensive tests for async functionality, including compatibility checks and integration tests.

Signed-off-by: Filinto Duran <[email protected]>

1 parent 1b7d7df commit fb05dff


43 files changed (+14219 −170 lines)

.github/workflows/pr-validation.yml

Lines changed: 2 additions & 2 deletions

@@ -17,7 +17,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
+        python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]

     steps:
       - uses: actions/checkout@v4
@@ -69,4 +69,4 @@ jobs:
         TWINE_PASSWORD: ${{ secrets.PYPI_UPLOAD_PASS }}
       run: |
         python -m build
-        twine upload dist/*
+        twine upload dist/*

README.md

Lines changed: 312 additions & 2 deletions

@@ -6,6 +6,32 @@

This repo contains a Python client SDK for use with the [Durable Task Framework for Go](https://github.com/microsoft/durabletask-go) and [Dapr Workflow](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code.

> **🚀 Enhanced Async Features**: This fork includes comprehensive async workflow enhancements with advanced error handling, non-determinism detection, timeout support, and debugging tools. See [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md) for details.

## Quick Start - Async Workflows

For async workflow development, use the new `durabletask.aio` package:

```python
from durabletask.aio import AsyncWorkflowContext
from durabletask.worker import TaskHubGrpcWorker


async def my_workflow(ctx: AsyncWorkflowContext, name: str) -> str:
    result = await ctx.call_activity(say_hello, input=name)
    await ctx.sleep(1.0)
    return f"Workflow completed: {result}"


def say_hello(ctx, name: str) -> str:
    return f"Hello, {name}!"


# Register and run
with TaskHubGrpcWorker() as worker:
    worker.add_activity(say_hello)
    worker.add_orchestrator(my_workflow)
    worker.start()
    # ... schedule workflows with client
```

⚠️ **This SDK is currently under active development and is not yet ready for production use.** ⚠️

> Note that this project is **not** currently affiliated with the [Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) project for Azure Functions. If you are looking for a Python SDK for Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).
@@ -118,15 +144,15 @@ Orchestrations can start child orchestrations using the `call_sub_orchestrator`

Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.

-### Continue-as-new (TODO)
+### Continue-as-new

Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input.

### Suspend, resume, and terminate

Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events.

-### Retry policies (TODO)
+### Retry policies

Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.

@@ -155,6 +181,13 @@ python3 -m pip install .

See the [examples](./examples) directory for a list of sample orchestrations and instructions on how to run them.

**Enhanced Async Examples:**

- `async_activity_sequence.py` - Updated to use the new `durabletask.aio` package
- `async_fanout_fanin.py` - Updated to use the new `durabletask.aio` package
- `async_enhanced_features.py` - Comprehensive demo of all enhanced features
- `async_non_determinism_demo.py` - Non-determinism detection demonstration
- See [ASYNC_ENHANCEMENTS.md](./durabletask/aio/ASYNCIO_ENHANCEMENTS.md) for detailed examples and usage patterns

## Development

The following sections provide more information about how to develop this project. Note that the development commands require `make` to be installed on your local machine. If you're using Windows, you can install `make` using [Chocolatey](https://chocolatey.org/) or use WSL.
@@ -206,6 +239,283 @@ To run the E2E tests on a specific python version (eg: 3.11), run the following

tox -e py311-e2e
```

### Configuration

#### Connection Configuration

The SDK connects to a Durable Task sidecar. By default it uses `localhost:4001`. You can override this via environment variables (checked in order):

- `DAPR_GRPC_ENDPOINT` - Full endpoint (e.g., `localhost:4001`, `grpcs://host:443`)
- `DAPR_GRPC_HOST` (or `DAPR_RUNTIME_HOST`) and `DAPR_GRPC_PORT` - Host and port separately

Example (common ports: 4001 for the DurableTask-Go emulator, 50001 for the Dapr sidecar):

```sh
export DAPR_GRPC_ENDPOINT=localhost:4001
# or
export DAPR_GRPC_HOST=localhost
export DAPR_GRPC_PORT=50001
```
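The precedence described above can be sketched in plain Python. `resolve_endpoint` is a hypothetical helper for illustration, not the SDK's actual implementation; the fallback port of 4001 is assumed from the documented default:

```python
def resolve_endpoint(env: dict) -> str:
    """Resolve the sidecar endpoint using the documented precedence:
    DAPR_GRPC_ENDPOINT first, then DAPR_GRPC_HOST/DAPR_RUNTIME_HOST plus
    DAPR_GRPC_PORT, finally the localhost:4001 default."""
    endpoint = env.get("DAPR_GRPC_ENDPOINT")
    if endpoint:
        return endpoint
    host = env.get("DAPR_GRPC_HOST") or env.get("DAPR_RUNTIME_HOST") or "localhost"
    port = env.get("DAPR_GRPC_PORT") or "4001"
    return f"{host}:{port}"

# In real code you would pass os.environ instead of a dict literal.
default = resolve_endpoint({})                                    # "localhost:4001"
full = resolve_endpoint({"DAPR_GRPC_ENDPOINT": "grpcs://host:443"})
split = resolve_endpoint({"DAPR_GRPC_HOST": "myhost", "DAPR_GRPC_PORT": "50001"})
```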
#### GRPC Keepalive Configuration

Configure GRPC keepalive settings to maintain long-lived connections:

- `DAPR_GRPC_KEEPALIVE_ENABLED` - Enable keepalive (default: `false`)
- `DAPR_GRPC_KEEPALIVE_TIME_MS` - Keepalive time in milliseconds (default: `120000`)
- `DAPR_GRPC_KEEPALIVE_TIMEOUT_MS` - Keepalive timeout in milliseconds (default: `20000`)
- `DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS` - Permit keepalive pings without active calls (default: `false`)

Example:

```sh
export DAPR_GRPC_KEEPALIVE_ENABLED=true
export DAPR_GRPC_KEEPALIVE_TIME_MS=60000
export DAPR_GRPC_KEEPALIVE_TIMEOUT_MS=10000
```
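These variables would typically end up as gRPC channel options. A minimal sketch of that translation, assuming a hypothetical `keepalive_options` helper (the option names themselves are standard gRPC core channel arguments):

```python
def keepalive_options(env: dict) -> list:
    """Translate the keepalive env vars above into (name, value) pairs
    suitable for passing as gRPC channel options. Returns an empty list
    when keepalive is disabled (the documented default)."""
    if env.get("DAPR_GRPC_KEEPALIVE_ENABLED", "false").lower() != "true":
        return []
    permit = env.get("DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS", "false").lower() == "true"
    return [
        ("grpc.keepalive_time_ms", int(env.get("DAPR_GRPC_KEEPALIVE_TIME_MS", "120000"))),
        ("grpc.keepalive_timeout_ms", int(env.get("DAPR_GRPC_KEEPALIVE_TIMEOUT_MS", "20000"))),
        ("grpc.keepalive_permit_without_calls", 1 if permit else 0),
    ]

# Such a list is the shape expected by grpc.insecure_channel(target, options=...).
opts = keepalive_options({"DAPR_GRPC_KEEPALIVE_ENABLED": "true"})
```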
#### GRPC Retry Configuration

Configure automatic retry behavior for transient failures:

- `DAPR_GRPC_RETRY_ENABLED` - Enable automatic retries (default: `false`)
- `DAPR_GRPC_RETRY_MAX_ATTEMPTS` - Maximum retry attempts (default: `4`)
- `DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS` - Initial backoff in milliseconds (default: `100`)
- `DAPR_GRPC_RETRY_MAX_BACKOFF_MS` - Maximum backoff in milliseconds (default: `1000`)
- `DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER` - Backoff multiplier (default: `2.0`)
- `DAPR_GRPC_RETRY_CODES` - Comma-separated status codes to retry (default: `UNAVAILABLE,DEADLINE_EXCEEDED`)

Example:

```sh
export DAPR_GRPC_RETRY_ENABLED=true
export DAPR_GRPC_RETRY_MAX_ATTEMPTS=5
export DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS=200
```
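To make the defaults concrete, the backoff schedule they imply can be computed directly. This is only an illustration of the growth-and-cap arithmetic; exactly when the delay is applied relative to each attempt is an implementation detail of the retry layer:

```python
def backoff_schedule(initial_ms=100, multiplier=2.0, max_ms=1000, max_attempts=4):
    """Delays (in ms) before each retry: exponential growth from initial_ms
    by multiplier, capped at max_ms. The first attempt has no delay, so
    max_attempts attempts yield max_attempts - 1 delays."""
    delays = []
    delay = float(initial_ms)
    for _ in range(max_attempts - 1):
        delays.append(min(delay, max_ms))
        delay *= multiplier
    return delays

# With the documented defaults, retries wait 100 ms, 200 ms, then 400 ms.
defaults = backoff_schedule()
```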
#### Async Workflow Configuration

Configure async workflow behavior and debugging:

- `DAPR_WF_DEBUG` or `DT_DEBUG` - Enable debug mode for workflows (set to `true`)
- `DAPR_WF_DISABLE_DETECTION` - Disable non-determinism detection (set to `true`)

Example:

```sh
export DAPR_WF_DEBUG=true
export DAPR_WF_DISABLE_DETECTION=false
```
### Async workflow authoring

For a deeper tour of the async authoring surface (determinism helpers, sandbox modes, timeouts, concurrency patterns), see the Async Enhancements guide: [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md). The developer-facing migration notes are in [DEVELOPER_TRANSITION_GUIDE.md](./DEVELOPER_TRANSITION_GUIDE.md).

You can author orchestrators with `async def` using the new `durabletask.aio` package, which provides a comprehensive async workflow API:

```python
from durabletask.worker import TaskHubGrpcWorker
from durabletask.aio import AsyncWorkflowContext


async def my_orch(ctx: AsyncWorkflowContext, input) -> str:
    r1 = await ctx.call_activity(act1, input=input)
    await ctx.sleep(1.0)
    r2 = await ctx.call_activity(act2, input=r1)
    return r2


with TaskHubGrpcWorker() as worker:
    worker.add_orchestrator(my_orch)
```

The optional sandbox mode (`best_effort` or `strict`) patches `asyncio.sleep`, `random`, `uuid.uuid4`, and `time.time` within the workflow step to deterministic equivalents. This is best-effort and not a correctness guarantee.

In `strict` mode, `asyncio.create_task` is blocked inside workflows to preserve determinism and will raise a `SandboxViolationError` if used.

> **Enhanced Sandbox Features**: The enhanced version includes comprehensive non-determinism detection, timeout support, enhanced concurrency primitives, and debugging tools. See [ASYNC_ENHANCEMENTS.md](./durabletask/aio/ASYNCIO_ENHANCEMENTS.md) for complete documentation.
#### Async patterns

- Activities and sub-orchestrations can be referenced by function object or by their registered string name. Both forms are supported: a function reference (preferred for IDE/type support) or a string name (useful across modules/languages).

- Activities:
  ```python
  result = await ctx.call_activity("process", input={"x": 1})
  # or: result = await ctx.call_activity(process, input={"x": 1})
  ```

- Timers:
  ```python
  await ctx.sleep(1.5)  # seconds or timedelta
  ```

- External events:
  ```python
  val = await ctx.wait_for_external_event("approval")
  ```

- Concurrency:
  ```python
  t1 = ctx.call_activity("a")
  t2 = ctx.call_activity("b")
  await ctx.when_all([t1, t2])
  winner = await ctx.when_any([ctx.wait_for_external_event("x"), ctx.sleep(5)])

  # gather combines awaitables and preserves order
  results = await ctx.gather(t1, t2)
  # gather with exception capture
  results_or_errors = await ctx.gather(t1, t2, return_exceptions=True)
  ```
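The `gather` behavior described above mirrors the standard library's `asyncio.gather`: results come back in argument order, and `return_exceptions=True` captures failures in place. A plain-asyncio sketch (no durabletask involved) of those two semantics:

```python
import asyncio


async def work(name: str, fail: bool = False) -> str:
    if fail:
        raise RuntimeError(f"{name} failed")
    return name


async def main():
    # Results are returned in argument order, not completion order.
    ok = await asyncio.gather(work("a"), work("b"))
    # With return_exceptions=True, a failure is returned in its slot
    # instead of propagating, so it doesn't mask the other results.
    mixed = await asyncio.gather(work("a"), work("b", fail=True),
                                 return_exceptions=True)
    return ok, mixed


ok, mixed = asyncio.run(main())
```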
#### Async vs. generator API differences

- Async authoring (`durabletask.aio`): awaiting returns the operation's value. Exceptions are raised on `await` (there is no `is_failed`).
- Generator authoring (`durabletask.task`): yielding returns `Task` objects. Use `get_result()` to read values; failures surface via `is_failed()` or by raising on `get_result()`.

Examples:

```python
# Async authoring (await returns value)
# when_any returns a proxy that compares equal to the original awaitable
# and exposes get_result() for the completed item.
approval = ctx.wait_for_external_event("approval")
winner = await ctx.when_any([approval, ctx.sleep(60)])
if winner == approval:
    details = winner.get_result()
```

```python
# Async authoring (index + result)
idx, result = await ctx.when_any_with_result([approval, ctx.sleep(60)])
if idx == 0:  # approval won
    details = result
```

```python
# Generator authoring (yield returns Task)
approval = ctx.wait_for_external_event("approval")
winner = yield task.when_any([approval, ctx.create_timer(timedelta(seconds=60))])
if winner == approval:
    details = approval.get_result()
```
Failure handling in async:

```python
try:
    val = await ctx.call_activity("might_fail")
except Exception as e:
    # handle failure branch
    ...
```

Or capture with gather:

```python
res = await ctx.gather(ctx.call_activity("a"), return_exceptions=True)
if isinstance(res[0], Exception):
    ...
```
- Sub-orchestrations (function reference or registered name):
  ```python
  out = await ctx.call_sub_orchestrator(child_fn, input=payload)
  # or: out = await ctx.call_sub_orchestrator("child", input=payload)
  ```

- Deterministic utilities:
  ```python
  now = ctx.now()
  rid = ctx.random().random()
  uid = ctx.uuid4()
  ```

- Workflow metadata and info:
  ```python
  # Read-only info snapshot (Temporal-style convenience)
  info = ctx.info
  print(f"Workflow: {info.workflow_name}, Instance: {info.instance_id}")
  print(f"Replaying: {info.is_replaying}, Suspended: {info.is_suspended}")

  # Or access properties directly
  instance_id = ctx.instance_id
  is_replaying = ctx.is_replaying
  is_suspended = ctx.is_suspended
  workflow_name = ctx.workflow_name
  parent_instance_id = ctx.parent_instance_id  # for sub-orchestrators

  # Execution info (internal metadata if provided by the sidecar)
  exec_info = ctx.execution_info

  # Tracing span IDs
  span_id = ctx.orchestration_span_id  # or ctx.workflow_span_id (alias)
  ```
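What the deterministic utilities must guarantee is that replays of the same instance observe the same values in the same order. One way to picture that contract, as a self-contained illustration seeded from stable workflow identity (this `DeterministicSource` class is hypothetical, not the SDK's actual implementation):

```python
import random
import uuid


class DeterministicSource:
    """Replay-safe randomness: seeded from a stable workflow identity so
    re-executing the workflow code yields the same sequence of values."""

    def __init__(self, instance_id: str):
        # random.Random seeds deterministically from a string.
        self._rng = random.Random(instance_id)

    def random(self) -> float:
        return self._rng.random()

    def uuid4(self) -> uuid.UUID:
        # Build a v4-shaped UUID from the deterministic stream.
        return uuid.UUID(int=self._rng.getrandbits(128), version=4)


# Two "replays" with the same instance id produce identical sequences.
a = DeterministicSource("instance-1")
b = DeterministicSource("instance-1")
```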
- Workflow metadata/headers (async only for now):
  ```python
  # Attach contextual metadata (e.g., tracing, tenant, app info)
  ctx.set_metadata({"x-trace": trace_id, "tenant": "acme"})
  md = ctx.get_metadata()

  # Header aliases (same data)
  ctx.set_headers({"region": "us-east"})
  headers = ctx.get_headers()
  ```

  Notes:
  - Useful for routing, observability, and cross-cutting concerns passed along activity/sub-orchestrator calls via the sidecar.
  - In python-sdk, this is available for both async and generator orchestrators. In this repo, it is currently implemented only on `durabletask.aio`; generator parity is planned.
- Cross-app activity/sub-orchestrator routing (async only for now):
  ```python
  # Route an activity to a different app via app_id
  result = await ctx.call_activity("process", input=data, app_id="worker-app-2")

  # Route a sub-orchestrator to a different app
  child_result = await ctx.call_sub_orchestrator("child_workflow", input=data, app_id="orchestrator-app-2")
  ```

  Notes:
  - The `app_id` parameter enables multi-app orchestrations where activities or child workflows run in different application instances.
  - Requires sidecar support for cross-app invocation.
#### Worker readiness

When starting a worker and scheduling work immediately afterward, wait for the connection to the sidecar to be established:

```python
with TaskHubGrpcWorker() as worker:
    worker.add_orchestrator(my_orch)
    worker.start()
    worker.wait_for_ready(timeout=5)
    # Now safe to schedule
```
#### Suspension & termination

- `ctx.is_suspended` reflects the suspension state during replay/processing.
- Suspend pauses progress without raising inside async orchestrators.
- Terminate completes the orchestration with `TERMINATED` status; use the client APIs to terminate or resume.
- Only new events are buffered while suspended; replay events continue to apply so local state is rebuilt deterministically.
### Tracing and context propagation

The SDK surfaces the W3C tracing context provided by the sidecar:

- Orchestrations: `ctx.trace_parent`, `ctx.trace_state`, and `ctx.orchestration_span_id` are available on `OrchestrationContext` (and on `AsyncWorkflowContext`).
- Activities: `ctx.trace_parent` and `ctx.trace_state` are available on `ActivityContext`.

Propagate tracing to external systems (e.g., HTTP):

```python
def activity(ctx, payload):
    headers = {
        "traceparent": ctx.trace_parent or "",
        "tracestate": ctx.trace_state or "",
    }
    # requests.post(url, headers=headers, json=payload)
    return "ok"
```

Notes:
- The sidecar controls the inbound `traceparent`/`tracestate`. App code can append vendor entries to `tracestate` for outbound calls but cannot currently alter the sidecar's propagation for downstream Durable operations.
- Configure the sidecar endpoint with `DURABLETASK_GRPC_ENDPOINT` (e.g., `127.0.0.1:56178`).
## Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a

durabletask/__init__.py

Lines changed: 3 additions & 0 deletions

@@ -1,6 +1,9 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT License.

+# Public async exports (import directly from durabletask.aio)
+from durabletask.aio import AsyncWorkflowContext, CoroutineOrchestratorRunner  # noqa: F401
+
 """Durable Task SDK for Python"""

 PACKAGE_NAME = "durabletask"
