Skip to content

Commit 0126d47

Browse files
committed
merge from main
Signed-off-by: Filinto Duran <[email protected]>
1 parent 16e1de1 commit 0126d47

38 files changed

+12818
-15
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,5 +130,6 @@ dmypy.json
130130

131131
# IDEs
132132
.idea
133+
.vscode
133134

134135
coverage.lcov

.vscode/settings.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"[python]": {
3-
"editor.defaultFormatter": "ms-python.autopep8",
3+
"editor.defaultFormatter": "charliermarsh.ruff",
44
"editor.formatOnSave": true,
55
"editor.codeActionsOnSave": {
66
"source.organizeImports": "explicit"

README.md

Lines changed: 203 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ Orchestrations can start child orchestrations using the `call_sub_orchestrator`
150150

151151
Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.
152152

153-
### Continue-as-new (TODO)
153+
### Continue-as-new
154154

155155
Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input.
156156

@@ -281,6 +281,9 @@ The following is more information about how to develop this project. Note that d
281281
### Generating protobufs
282282

283283
```sh
284+
# install dev dependencies for generating protobufs and running tests
285+
pip3 install '.[dev]'
286+
284287
make gen-proto
285288
```
286289

@@ -319,9 +322,207 @@ dapr run --app-id test-app --dapr-grpc-port 4001 --resources-path ./examples/co
319322
To run the E2E tests on a specific python version (eg: 3.11), run the following command from the project root:
320323

321324
```sh
322-
tox -e py311 -- e2e
325+
tox -e py311-e2e
326+
```
327+
328+
### Configuration
329+
330+
#### Connection Configuration
331+
332+
The SDK connects to a Durable Task sidecar. By default it uses `localhost:4001`. You can override via environment variables (checked in order):
333+
334+
- `DAPR_GRPC_ENDPOINT` - Full endpoint (e.g., `localhost:4001`, `grpcs://host:443`)
335+
- `DAPR_GRPC_HOST` (or `DAPR_RUNTIME_HOST`) and `DAPR_GRPC_PORT` - Host and port separately
336+
337+
Example (common ports: 4001 for DurableTask-Go emulator, 50001 for Dapr sidecar):
338+
339+
```sh
340+
export DAPR_GRPC_ENDPOINT=localhost:4001
341+
# or
342+
export DAPR_GRPC_HOST=localhost
343+
export DAPR_GRPC_PORT=50001
344+
```
345+
346+
347+
#### Async Workflow Configuration
348+
349+
Configure async workflow behavior and debugging:
350+
351+
- `DAPR_WF_DISABLE_DETECTION` - Disable non-determinism detection (set to `true`)
352+
353+
Example:
354+
355+
```sh
356+
export DAPR_WF_DISABLE_DETECTION=false
357+
```
358+
359+
### Async workflow authoring
360+
361+
For a deeper tour of the async authoring surface (determinism helpers, sandbox modes, timeouts, concurrency patterns), see the Async Enhancements guide: [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md). The developer-facing migration notes are in [DEVELOPER_TRANSITION_GUIDE.md](./DEVELOPER_TRANSITION_GUIDE.md).
362+
363+
You can author orchestrators with `async def` using the new `durabletask.aio` package, which provides a comprehensive async workflow API:
364+
365+
```python
366+
from durabletask.worker import TaskHubGrpcWorker
367+
from durabletask.aio import AsyncWorkflowContext
368+
369+
async def my_orch(ctx: AsyncWorkflowContext, input) -> str:
370+
r1 = await ctx.call_activity(act1, input=input)
371+
await ctx.sleep(1.0)
372+
r2 = await ctx.call_activity(act2, input=r1)
373+
return r2
374+
375+
with TaskHubGrpcWorker() as worker:
376+
worker.add_orchestrator(my_orch)
377+
```
378+
379+
Optional sandbox mode (`best_effort` or `strict`) patches `asyncio.sleep`, `random`, `uuid.uuid4`, and `time.time` within the workflow step to deterministic equivalents. This is best-effort and not a correctness guarantee.
380+
381+
In `strict` mode, `asyncio.create_task` is blocked inside workflows to preserve determinism and will raise a `SandboxViolationError` if used.
382+
383+
> **Enhanced Sandbox Features**: The enhanced version includes comprehensive non-determinism detection, timeout support, enhanced concurrency primitives, and debugging tools. See [ASYNC_ENHANCEMENTS.md](./durabletask/aio/ASYNCIO_ENHANCEMENTS.md) for complete documentation.
384+
385+
#### Async patterns
386+
387+
- Activities and sub-orchestrations can be referenced by function object or by their registered string name. Both forms are supported:
388+
- Function reference (preferred for IDE/type support) or string name (useful across modules/languages).
389+
390+
- Activities:
391+
```python
392+
result = await ctx.call_activity("process", input={"x": 1})
393+
# or: result = await ctx.call_activity(process, input={"x": 1})
394+
```
395+
396+
- Timers:
397+
```python
398+
await ctx.sleep(1.5) # seconds or timedelta
323399
```
324400

401+
- External events:
402+
```python
403+
val = await ctx.wait_for_external_event("approval")
404+
```
405+
406+
- Concurrency:
407+
```python
408+
t1 = ctx.call_activity("a"); t2 = ctx.call_activity("b")
409+
await ctx.when_all([t1, t2])
410+
winner = await ctx.when_any([ctx.wait_for_external_event("x"), ctx.sleep(5)])
411+
412+
# gather combines awaitables and preserves order
413+
results = await ctx.gather(t1, t2)
414+
# gather with exception capture
415+
results_or_errors = await ctx.gather(t1, t2, return_exceptions=True)
416+
```
417+
418+
#### Async vs. generator API differences
419+
420+
- Async authoring (`durabletask.aio`): awaiting returns the operation's value. Exceptions are raised on `await` (no `is_failed`).
421+
- Generator authoring (`durabletask.task`): yielding returns `Task` objects. Use `get_result()` to read values; failures surface via `is_failed()` or by raising on `get_result()`.
422+
423+
Examples:
424+
425+
```python
426+
# Async authoring (await returns value)
427+
# when_any returns a proxy that compares equal to the original awaitable
428+
# and exposes get_result() for the completed item.
429+
approval = ctx.wait_for_external_event("approval")
430+
winner = await ctx.when_any([approval, ctx.sleep(60)])
431+
if winner == approval:
432+
details = winner.get_result()
433+
```
434+
435+
```python
436+
# Async authoring (index + result)
437+
idx, result = await ctx.when_any_with_result([approval, ctx.sleep(60)])
438+
if idx == 0: # approval won
439+
details = result
440+
```
441+
442+
```python
443+
# Generator authoring (yield returns Task)
444+
approval = ctx.wait_for_external_event("approval")
445+
winner = yield task.when_any([approval, ctx.create_timer(timedelta(seconds=60))])
446+
if winner == approval:
447+
details = approval.get_result()
448+
```
449+
450+
Failure handling in async:
451+
452+
```python
453+
try:
454+
val = await ctx.call_activity("might_fail")
455+
except Exception as e:
456+
# handle failure branch
457+
...
458+
```
459+
460+
Or capture with gather:
461+
462+
```python
463+
res = await ctx.gather(ctx.call_activity("a"), return_exceptions=True)
464+
if isinstance(res[0], Exception):
465+
...
466+
```
467+
468+
469+
- Sub-orchestrations (function reference or registered name):
470+
```python
471+
out = await ctx.call_sub_orchestrator(child_fn, input=payload)
472+
# or: out = await ctx.call_sub_orchestrator("child", input=payload)
473+
```
474+
475+
- Deterministic utilities:
476+
```python
477+
now = ctx.now(); rid = ctx.random().random(); uid = ctx.uuid4()
478+
```
479+
480+
- Workflow metadata/headers (async only for now):
481+
```python
482+
# Attach contextual metadata (e.g., tracing, tenant, app info)
483+
ctx.set_metadata({"x-trace": trace_id, "tenant": "acme"})
484+
md = ctx.get_metadata()
485+
486+
# Header aliases (same data)
487+
ctx.set_headers({"region": "us-east"})
488+
headers = ctx.get_headers()
489+
```
490+
Notes:
491+
- Useful for routing, observability, and cross-cutting concerns passed along activity/sub-orchestrator calls via the sidecar.
492+
- In python-sdk, available for both async and generator orchestrators. In this repo, currently implemented on `durabletask.aio`; generator parity is planned.
493+
494+
- Cross-app activity/sub-orchestrator routing (async only for now):
495+
```python
496+
# Route activity to a different app via app_id
497+
result = await ctx.call_activity("process", input=data, app_id="worker-app-2")
498+
499+
# Route sub-orchestrator to a different app
500+
child_result = await ctx.call_sub_orchestrator("child_workflow", input=data, app_id="orchestrator-app-2")
501+
```
502+
Notes:
503+
- The `app_id` parameter enables multi-app orchestrations where activities or child workflows run in different application instances.
504+
- Requires sidecar support for cross-app invocation.
505+
506+
#### Worker readiness
507+
508+
When starting a worker and scheduling immediately, wait for the connection to the sidecar to be established:
509+
510+
```python
511+
with TaskHubGrpcWorker() as worker:
512+
worker.add_orchestrator(my_orch)
513+
worker.start()
514+
worker.wait_for_ready(timeout=5)
515+
# Now safe to schedule
516+
```
517+
518+
#### Suspension & termination
519+
520+
- `ctx.is_suspended` reflects suspension state during replay/processing.
521+
- Suspend pauses progress without raising inside async orchestrators.
522+
- Terminate completes with `TERMINATED` status; use client APIs to terminate/resume.
523+
- Only new events are buffered while suspended; replay events continue to apply to rebuild local state deterministically.
524+
525+
325526
## Contributing
326527

327528
This project welcomes contributions and suggestions. Most contributions require you to agree to a

dev-requirements.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +0,0 @@
1-
grpcio-tools==1.62.3 # 1.62.X is the latest version before protobuf 1.26.X is used which has breaking changes for Python # supports protobuf 6.x and aligns with generated code

durabletask/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
# Public async exports (import directly from durabletask.aio)
5+
from durabletask.aio import AsyncWorkflowContext, CoroutineOrchestratorRunner # noqa: F401
6+
47
"""Durable Task SDK for Python"""
58

69
PACKAGE_NAME = "durabletask"

0 commit comments

Comments
 (0)