1 change: 0 additions & 1 deletion .github/workflows/pr-validation.yml
@@ -43,7 +43,6 @@ jobs:
# Install and run the durabletask-go sidecar for running e2e tests
- name: Pytest e2e tests
run: |
# TODO: use dapr run instead of durabletask-go as it provides more reliable sidecar behavior for e2e tests
go install github.com/dapr/durabletask-go@main
durabletask-go --port 4001 &
tox -e py${{ matrix.python-version }}-e2e
1 change: 1 addition & 0 deletions .gitignore
@@ -130,5 +130,6 @@ dmypy.json

# IDEs
.idea
.vscode

coverage.lcov
2 changes: 1 addition & 1 deletion .vscode/settings.json
@@ -1,6 +1,6 @@
{
"[python]": {
"editor.defaultFormatter": "ms-python.autopep8",
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
181 changes: 179 additions & 2 deletions README.md
@@ -150,7 +150,7 @@ Orchestrations can start child orchestrations using the `call_sub_orchestrator`

Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.

### Continue-as-new (TODO)
### Continue-as-new

Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input.

@@ -281,6 +281,9 @@ The following is more information about how to develop this project. Note that d
### Generating protobufs

```sh
# install dev dependencies for generating protobufs and running tests
pip3 install '.[dev]'

make gen-proto
```

@@ -319,9 +322,183 @@ dapr run --app-id test-app --dapr-grpc-port 4001 --resources-path ./examples/co
To run the E2E tests on a specific python version (eg: 3.11), run the following command from the project root:

```sh
tox -e py311 -- e2e
tox -e py311-e2e
```

### Configuration

#### Connection Configuration

The SDK connects to a Durable Task sidecar. By default it uses `localhost:4001`. You can override via environment variables (checked in order):

- `DAPR_GRPC_ENDPOINT` - Full endpoint (e.g., `localhost:4001`, `grpcs://host:443`)
- `DAPR_GRPC_HOST` (or `DAPR_RUNTIME_HOST`) and `DAPR_GRPC_PORT` - Host and port separately

Example (common ports: 4001 for DurableTask-Go emulator, 50001 for Dapr sidecar):
> **Reviewer:** is the 4001 supposed to be for this repo, durabletask python? or is there some emulator thing somewhere?
>
> **Author:** I guess this is historical, but 4001 was the default before. also look at durabletask-go https://github.com/dapr/durabletask-go (same 4001)

```sh
export DAPR_GRPC_ENDPOINT=localhost:4001
# or
export DAPR_GRPC_HOST=localhost
export DAPR_GRPC_PORT=50001
```
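The precedence described above can be sketched as a small resolver. This is a hypothetical helper for illustration, not the SDK's actual internals:

```python
import os

def resolve_endpoint() -> str:
    # DAPR_GRPC_ENDPOINT wins outright; otherwise host/port pieces
    # are combined; otherwise the documented default applies.
    endpoint = os.environ.get("DAPR_GRPC_ENDPOINT")
    if endpoint:
        return endpoint
    host = os.environ.get("DAPR_GRPC_HOST") or os.environ.get("DAPR_RUNTIME_HOST", "localhost")
    port = os.environ.get("DAPR_GRPC_PORT", "4001")
    return f"{host}:{port}"
```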


#### Async Workflow Configuration

Configure async workflow behavior and debugging:

- `DAPR_WF_DISABLE_DETERMINISTIC_DETECTION` - Disable non-determinism detection (set to `true`)

Example (disable the detection):

```sh
export DAPR_WF_DISABLE_DETERMINISTIC_DETECTION=true
```
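A boolean flag like this is typically parsed permissively. The helper below is a sketch of that convention (the function name is hypothetical, not part of the SDK):

```python
import os

def deterministic_detection_enabled() -> bool:
    # Detection stays on unless the variable is explicitly truthy,
    # mirroring the opt-out behavior described above.
    raw = os.environ.get("DAPR_WF_DISABLE_DETERMINISTIC_DETECTION", "false")
    return raw.strip().lower() not in ("true", "1")
```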

### Async workflow authoring

For a deeper tour of the async authoring surface (determinism helpers, sandbox modes, timeouts, concurrency patterns), see the Async Enhancements guide: [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md).

You can author orchestrators with `async def` using the new `durabletask.aio` package, which provides a comprehensive async workflow API:

```python
from durabletask.worker import TaskHubGrpcWorker
from durabletask.aio import AsyncWorkflowContext

# act1 and act2 are activity functions registered with the worker
async def my_orch(ctx: AsyncWorkflowContext, input) -> str:
    r1 = await ctx.call_activity(act1, input=input)
    await ctx.sleep(1.0)
    r2 = await ctx.call_activity(act2, input=r1)
    return r2

with TaskHubGrpcWorker() as worker:
    worker.add_orchestrator(my_orch)
```

The sandbox (enabled by default) patches standard Python functions to deterministic equivalents during workflow execution. This allows natural async code like `asyncio.sleep()`, `random.random()`, and `asyncio.gather()` to work correctly with workflow replay. Three modes are available:

- `"best_effort"` (default): Patches functions, minimal overhead
- `"strict"`: Patches + blocks dangerous operations (file I/O, `asyncio.create_task`)
- `"off"`: No patching (requires manual use of `ctx.*` methods everywhere)

> **Enhanced Sandbox Features**: The enhanced version includes comprehensive non-determinism detection, timeout support, enhanced concurrency primitives, and debugging tools. See [ASYNCIO_ENHANCEMENTS.md](./durabletask/aio/ASYNCIO_ENHANCEMENTS.md) for complete documentation.

#### Async patterns

- Activities and sub-orchestrations can be referenced by function object or by their registered string name. Both forms are supported:
- Function reference (preferred for IDE/type support) or string name (useful across modules/languages).

- Activities:
```python
result = await ctx.call_activity("process", input={"x": 1})
# or: result = await ctx.call_activity(process, input={"x": 1})
```

- Timers:
```python
await ctx.sleep(1.5) # seconds or timedelta
```

- External events:
```python
val = await ctx.wait_for_external_event("approval")
```

- Concurrency:
```python
t1 = ctx.call_activity("a")
t2 = ctx.call_activity("b")
# when_all waits for all tasks and returns results in input order
results = await ctx.when_all([t1, t2])
# when_any returns a proxy for the first completed awaitable;
# when_any_with_result returns an (index, result) tuple
idx, result = await ctx.when_any_with_result([ctx.wait_for_external_event("x"), ctx.sleep(5)])
```
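The in-order results of `when_all` mirror how plain `asyncio.gather` behaves, which the sandbox leans on. As a stand-alone asyncio analogue of that ordering guarantee:

```python
import asyncio

async def slow() -> str:
    await asyncio.sleep(0.02)
    return "slow"

async def fast() -> str:
    return "fast"

async def main() -> list:
    # gather preserves argument order even though fast() finishes first,
    # matching the in-order results described for when_all above
    return await asyncio.gather(slow(), fast())

print(asyncio.run(main()))  # ['slow', 'fast']
```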

#### Async vs. generator API differences

- Async authoring (`durabletask.aio`): awaiting returns the operation's value. Exceptions are raised on `await` (no `is_failed`).
- Generator authoring (`durabletask.task`): yielding returns `Task` objects. Use `get_result()` to read values; failures surface via `is_failed()` or by raising on `get_result()`.

Examples:

```python
# Async authoring (await returns value)
# when_any returns a proxy that compares equal to the original awaitable
# and exposes get_result() for the completed item.
approval = ctx.wait_for_external_event("approval")
winner = await ctx.when_any([approval, ctx.sleep(60)])
if winner == approval:
    details = winner.get_result()
```

```python
# Async authoring (index + result)
idx, result = await ctx.when_any_with_result([approval, ctx.sleep(60)])
if idx == 0: # approval won
    details = result
```

```python
# Generator authoring (yield returns Task)
from datetime import timedelta
from durabletask import task

approval = ctx.wait_for_external_event("approval")
winner = yield task.when_any([approval, ctx.create_timer(timedelta(seconds=60))])
if winner == approval:
    details = approval.get_result()
```

Failure handling in async:

```python
try:
    val = await ctx.call_activity("might_fail")
except Exception as e:
    # handle failure branch
    ...
```

- Sub-orchestrations (function reference or registered name):
```python
out = await ctx.call_sub_orchestrator(child_fn, input=payload)
# or: out = await ctx.call_sub_orchestrator("child", input=payload)
```

- Deterministic utilities:
```python
now = ctx.now()
rid = ctx.random().random()
uid = ctx.uuid4()
```

- Cross-app activity/sub-orchestrator routing (async only for now):
```python
# Route activity to a different app via app_id
result = await ctx.call_activity("process", input=data, app_id="worker-app-2")

# Route sub-orchestrator to a different app
child_result = await ctx.call_sub_orchestrator("child_workflow", input=data, app_id="orchestrator-app-2")
```
Notes:
- The `app_id` parameter enables multi-app orchestrations where activities or child workflows run in different application instances.
- Requires sidecar support for cross-app invocation.
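The deterministic utilities above exist because plain `uuid.uuid4()` would return a different value on every replay. As an illustrative sketch (not the SDK's implementation), a replay-stable substitute can derive a name-based UUID from the orchestration instance id and a per-call counter:

```python
import uuid

def deterministic_uuid(instance_id: str, counter: int) -> uuid.UUID:
    # uuid5 is a pure function of its inputs, so the same instance id
    # and counter yield the same UUID on every replay.
    return uuid.uuid5(uuid.NAMESPACE_OID, f"{instance_id}:{counter}")

u1 = deterministic_uuid("abc123", 0)
u2 = deterministic_uuid("abc123", 0)
# u1 == u2 on every replay; bumping the counter yields a fresh value
```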

#### Worker readiness

When starting a worker and scheduling immediately, wait for the connection to the sidecar to be established:

```python
with TaskHubGrpcWorker() as worker:
    worker.add_orchestrator(my_orch)
    worker.start()
    worker.wait_for_ready(timeout=5)
    # Now safe to schedule
```

#### Suspension & termination

- `ctx.is_suspended` reflects suspension state during replay/processing.
- Suspend pauses progress without raising inside async orchestrators.
- Terminate completes with `TERMINATED` status; use client APIs to terminate/resume.
- Only new events are buffered while suspended; replay events continue to apply to rebuild local state deterministically.


## Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a
Expand Down
1 change: 0 additions & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
grpcio-tools==1.62.3 # 1.62.X is the latest version before protobuf 1.26.X is used which has breaking changes for Python
3 changes: 3 additions & 0 deletions durabletask/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# Public async exports (import directly from durabletask.aio)
from durabletask.aio import AsyncWorkflowContext, CoroutineOrchestratorRunner # noqa: F401

"""Durable Task SDK for Python"""

PACKAGE_NAME = "durabletask"