
Conversation


@filintod filintod commented Nov 8, 2025

No description provided.

@filintod filintod force-pushed the filinto/asyncio-p2 branch 2 times, most recently from b20de61 to 27068dc Compare November 10, 2025 13:56
Signed-off-by: Filinto Duran <[email protected]>
# tox -e py310-e2e
# to use custom grpc endpoint:
# DAPR_GRPC_ENDPOINT=localhost:12345 tox -e py310-e2e
# to use custom grpc endpoint and not capture print statements (-s arg in pytest):
Author:

this was a bad merge, let's keep the previous line


Any TODO left here?

{
"[python]": {
"editor.defaultFormatter": "ms-python.autopep8",
"editor.defaultFormatter": "charliermarsh.ruff",

is there a benefit to this?

Author:

Using ruff instead of autopep8, so it's way faster.

- `DAPR_GRPC_ENDPOINT` - Full endpoint (e.g., `localhost:4001`, `grpcs://host:443`)
- `DAPR_GRPC_HOST` (or `DAPR_RUNTIME_HOST`) and `DAPR_GRPC_PORT` - Host and port separately

Example (common ports: 4001 for DurableTask-Go emulator, 50001 for Dapr sidecar):

Is the 4001 supposed to be for this repo, durabletask-python? Or is there some emulator thing somewhere?

Author:

I guess this is historical, but 4001 was the default before. Also look at durabletask-go https://github.com/dapr/durabletask-go (same 4001).


Configure async workflow behavior and debugging:

- `DAPR_WF_DISABLE_DETECTION` - Disable non-determinism detection (set to `true`)

Suggested change
- `DAPR_WF_DISABLE_DETECTION` - Disable non-determinism detection (set to `true`)
- `DAPR_WF_DISABLE_DETERMINISTIC_DETECTION` - Disable non-determinism detection (set to `true`)

Author:

I'll change it in a later commit, as this will require multiple changes (renames).


### Async workflow authoring

For a deeper tour of the async authoring surface (determinism helpers, sandbox modes, timeouts, concurrency patterns), see the Async Enhancements guide: [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md). The developer-facing migration notes are in [DEVELOPER_TRANSITION_GUIDE.md](./DEVELOPER_TRANSITION_GUIDE.md).

Can you update this? I don't see a transition guide anywhere.


For a deeper tour of the async authoring surface (determinism helpers, sandbox modes, timeouts, concurrency patterns), see the Async Enhancements guide: [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md). The developer-facing migration notes are in [DEVELOPER_TRANSITION_GUIDE.md](./DEVELOPER_TRANSITION_GUIDE.md).

You can author orchestrators with `async def` using the new `durabletask.aio` package, which provides a comprehensive async workflow API:

Suggested change
You can author orchestrators with `async def` using the new `durabletask.aio` package, which provides a comprehensive async workflow API:
You can author orchestrators with `async def` using the new `durabletask.aio` package, which provides a comprehensive async workflow API, enabling the use of await for child workflows, activities, etc (anything else??):

Author:

I'll update the links to current ones

worker.add_orchestrator(my_orch)
```

Optional sandbox mode (`best_effort` or `strict`) patches `asyncio.sleep`, `random`, `uuid.uuid4`, and `time.time` within the workflow step to deterministic equivalents. This is best-effort and not a correctness guarantee.
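To make the mechanism concrete, here is a minimal, illustrative sketch of what such scoped patching can look like in plain Python. This is not the actual `durabletask.aio` implementation; the `seed` argument stands in for orchestration state such as the instance ID and history position.

```python
import contextlib
import random
import time
import uuid

# Conceptual sketch of a "sandbox": scoped monkey-patching. Within the
# workflow step, non-deterministic stdlib calls are swapped for replay-safe
# equivalents derived from orchestration state (here, just a seed).
@contextlib.contextmanager
def deterministic_sandbox(seed: int):
    rng = random.Random(seed)
    orig_random, orig_uuid4, orig_time = random.random, uuid.uuid4, time.time
    random.random = rng.random                       # seeded, replayable
    uuid.uuid4 = lambda: uuid.UUID(int=rng.getrandbits(128), version=4)
    time.time = lambda: 1_700_000_000.0              # frozen "current" time
    try:
        yield
    finally:
        random.random, uuid.uuid4, time.time = orig_random, orig_uuid4, orig_time

# Two runs with the same seed observe identical values, which is the
# property that deterministic replay needs.
with deterministic_sandbox(42):
    first = (random.random(), uuid.uuid4(), time.time())
with deterministic_sandbox(42):
    second = (random.random(), uuid.uuid4(), time.time())
assert first == second
```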

Can you explain what the sandbox is here, i.e. what it is doing? Translating XXX -> deterministic synchronous equivalents. I think it's translating asyncio and other Python pkg stuff into deterministic synchronous Python code, but others probably don't know this.


Optional sandbox mode (`best_effort` or `strict`) patches `asyncio.sleep`, `random`, `uuid.uuid4`, and `time.time` within the workflow step to deterministic equivalents. This is best-effort and not a correctness guarantee.

In `strict` mode, `asyncio.create_task` is blocked inside workflows to preserve determinism and will raise a `SandboxViolationError` if used.

I see you say it's to preserve determinism, but I still don't understand the why here. Can you expand the explanation, please?

Comment on lines +494 to +504
- Cross-app activity/sub-orchestrator routing (async only for now):
```python
# Route activity to a different app via app_id
result = await ctx.call_activity("process", input=data, app_id="worker-app-2")

# Route sub-orchestrator to a different app
child_result = await ctx.call_sub_orchestrator("child_workflow", input=data, app_id="orchestrator-app-2")
```
Notes:
- The `app_id` parameter enables multi-app orchestrations where activities or child workflows run in different application instances.
- Requires sidecar support for cross-app invocation.

Can you update this, please? I think normal synchronous workflows in the Python SDK support cross-app functionality.

#### Suspension & termination

- `ctx.is_suspended` reflects suspension state during replay/processing.
- Suspend pauses progress without raising inside async orchestrators.

Raising an error, or what, here?

@@ -0,0 +1,374 @@
# Copyright 2025 The Dapr Authors

should we rename this to be clearer?

# tox -e py310-e2e
# to use custom grpc endpoint:
# DAPR_GRPC_ENDPOINT=localhost:12345 tox -e py310-e2e
# to use custom grpc endpoint and not capture print statements (-s arg in pytest):

Any TODO left here?

import durabletask.internal.shared as shared
from durabletask import deterministic, task

# TODO: this is part of asyncio

Is this TODO still relevant?

Author:

No, just a tag while I was doing the PR split.

Comment on lines +102 to +107
# Internal helper: register async orchestrators directly on the registry.
# Primarily for unit tests and direct executor usage. For production, prefer
# using TaskHubGrpcWorker.add_async_orchestrator(), which wraps and registers
# on this registry under the hood.
# TODO: this is part of asyncio
def add_async_orchestrator(

If this is an internal helper, then we should prefix the function signature with an underscore, please.

return self._registry.add_orchestrator(fn)

# Auto-detect coroutine functions and delegate to async registration
# TODO: this is part of asyncio

Can you clean up these TODO comments please, because I see this same one copy/pasted around in various places here.


# Async orchestrator support (opt-in)
# TODO: this is part of asyncio
def add_async_orchestrator(

Is this needed if we can inspect and auto-detect whether it's async or not?

Author:

I could make it protected (prefix underscore), but it is needed to keep the add_orchestrator methods tidy and concise.

time.sleep(poll_interval)
poll_interval = min(poll_interval * 1.5, 1.0) # Exponential backoff, max 1s
except Exception:
# Ignore pre-check/poll issues (e.g., mocked stubs in unit tests) and fall back
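For reference, the capped exponential backoff in the snippet above can be factored into a small helper. This is an illustrative standalone sketch, not the SDK's code; `backoff_intervals` and `wait_until` are names introduced here.

```python
import time

# Generate the sleep intervals instead of mutating poll_interval inline:
# initial, initial*factor, ... capped at `cap` (mirrors min(x * 1.5, 1.0)).
def backoff_intervals(initial=0.05, factor=1.5, cap=1.0):
    interval = initial
    while True:
        yield interval
        interval = min(interval * factor, cap)

def wait_until(predicate, timeout=5.0):
    """Poll `predicate` with capped exponential backoff until true or timeout."""
    deadline = time.monotonic() + timeout
    for interval in backoff_intervals():
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)

# Example: the condition becomes true after a few polls.
state = {"n": 0}
def ready():
    state["n"] += 1
    return state["n"] >= 3
assert wait_until(ready, timeout=2.0)
```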

Is this logic still required, or is it just for tests? Can we clean up some of this, or use a built-in backoff/retry pkg by chance?

Comment on lines +60 to +63
class NonDeterminismWarning(UserWarning):
"""Warning raised when non-deterministic functions are detected in workflows."""

pass
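An empty class body is normal for warning categories: the subclass exists so callers can emit and filter it selectively. An illustrative sketch (the `detect_nondeterminism` helper is hypothetical, not part of the SDK):

```python
import warnings

class NonDeterminismWarning(UserWarning):
    """Warning raised when non-deterministic functions are detected in workflows."""
    pass

# Hypothetical emitter: the detector warns with this category so users can
# silence or escalate it independently of other warnings.
def detect_nondeterminism():
    warnings.warn("time.time() called inside a workflow", NonDeterminismWarning)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    detect_nondeterminism()
assert any(issubclass(w.category, NonDeterminismWarning) for w in caught)
```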

Is this used if it has `pass` in it? Or is it used through inheritance?

super().__init__(message, **kwargs)


class SandboxViolationError(AsyncWorkflowError):

Can you please explain what sandbox is here? Also, are these classes just higher-level error helper classes? If so, can you comment this at the top of the file, please?

@@ -0,0 +1,279 @@
# Enhanced Async Workflow Features

This document describes the enhanced async workflow capabilities added to this fork of durabletask-python. For a deep dive into architecture and internals, see [ASYNCIO_INTERNALS.md](ASYNCIO_INTERNALS.md).

Suggested change
This document describes the enhanced async workflow capabilities added to this fork of durabletask-python. For a deep dive into architecture and internals, see [ASYNCIO_INTERNALS.md](ASYNCIO_INTERNALS.md).
This document describes the enhanced async workflow capabilities. For a deep dive into architecture and internals, see [ASYNCIO_INTERNALS.md](ASYNCIO_INTERNALS.md).


## Overview

This fork extends the original durabletask-python SDK with comprehensive async workflow enhancements, providing a production-ready async authoring experience with advanced debugging, error handling, and determinism enforcement.

Suggested change
This fork extends the original durabletask-python SDK with comprehensive async workflow enhancements, providing a production-ready async authoring experience with advanced debugging, error handling, and determinism enforcement.
The durabletask-python SDK includes comprehensive async workflow enhancements, providing a production-ready async authoring experience with advanced debugging, error handling, and determinism enforcement. This works seamlessly with the existing python workflow authoring experience.


Can you add how it can work with the existing workflow decorator, given there is also the async decorator? Should we just remove the async decorator if it detects this under the hood anyway?

async def enhanced_workflow(ctx: AsyncWorkflowContext, input_data) -> str:
# Enhanced error handling with rich context
try:
result = await ctx.with_timeout(

Is this a new addition? Do the other SDKs have this ctx.with_timeout? How is this different from using an activity and saying: when any of the XXX and a timer are complete, then proceed? Is this only useful for async workflows, and that's why it came about?


Might be nice in this README to document the new additions/features before adding code, or remove this code from the README because it can get outdated quickly, and link to quickstarts/examples/tests that showcase these features, please.

- Helpful suggestions for deterministic alternatives

### 3. **Enhanced Concurrency Primitives**
- `when_any_with_result()` - Returns (index, result) tuple

Help me understand why we want this, please. Doesn't when_any already give the result?


### 4. **Async Context Management**
- Full async context manager support (`async with ctx:`)
- Cleanup task registry with `ctx.add_cleanup()`

why do we want cleanup on the ctx?


### 5. **Debugging and Monitoring**
- Operation history tracking when debug mode is enabled
- `ctx.get_debug_info()` for workflow introspection

What workflow details does this provide? Is it too low-level to bubble up to users, is really my question here 🤔

# With explicit sandbox mode
worker.add_orchestrator(
my_async_workflow,
sandbox_mode=SandboxMode.BEST_EFFORT # or "best_effort" string

What is the default here? Note to self to look, I guess, when I get to this class.

## Workflow Metadata and Headers (Async Only)

Purpose:
- Carry lightweight key/value context (e.g., tracing IDs, tenant, app info) across workflow steps.

So if workflow metadata/headers are async-only, does that mean that if I want to propagate tracing information through the workflow metadata/headers, it would only work for async workflows?

Comment on lines +183 to +187
python your_workflow.py

# Production mode (no warnings, optimal performance)
unset DAPR_WF_DEBUG
python your_workflow.py

Don't these need to be wrapped in a `dapr run` command then, if they require a sidecar?

async def workflow_with_cleanup(ctx, input_data):
async with ctx: # Automatic cleanup
# Register cleanup tasks
ctx.add_cleanup(lambda: print("Workflow completed"))

Does Temporal have this, and that's why you thought it would be good/valuable to add? Can you give some examples of when/why I would use this, please?

@@ -0,0 +1,301 @@
# Durable Task AsyncIO Internals

This document explains how the AsyncIO implementation in this repository integrates with the existing generator‑based Durable Task runtime. It covers the coroutine→generator bridge, awaitable design, sandboxing and non‑determinism detection, error/cancellation semantics, debugging, and guidance for extending the system.

Is the coroutine->generator bridge the sandbox thing, or something different? What does non-determinism detection mean? And if triggered, do we error?


## Scope and Goals

- Async authoring model for orchestrators while preserving Durable Task's generator runtime contract

So to confirm (and can you please add to the docs here): does Durable Task only work with generators, and so your code translates asyncio constructs into sync-based Python things that will work with generators?

Key modules:
- `durabletask/aio/context.py` — Async workflow context and deterministic utilities
- `durabletask/aio/driver.py` — Coroutine→generator bridge
- `durabletask/aio/sandbox.py` — Scoped patching and non‑determinism detection

Is it worth separating the non-determinism detection stuff into a different file then? Or put the non-determinism stuff with the deterministic utilities from context.py?

Async orchestrators are authored as `async def` but executed by Durable Task as generators that yield `durabletask.task.Task` (or composite) instances. The bridge implements a driver that manually steps a coroutine and converts each `await` into a yielded Durable Task operation.

High‑level flow:
1. `TaskHubGrpcWorker.add_async_orchestrator(async_fn, sandbox_mode=...)` wraps `async_fn` with a `CoroutineOrchestratorRunner` and registers a generator orchestrator with the worker.

Is this still needed, or can we just call add_orchestrator and have it check under the hood whether it's async or not?


### Coroutine→Generator Bridge

Async orchestrators are authored as `async def` but executed by Durable Task as generators that yield `durabletask.task.Task` (or composite) instances. The bridge implements a driver that manually steps a coroutine and converts each `await` into a yielded Durable Task operation.
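A minimal, self-contained sketch of this bridge (names like `Op` and `drive` are illustrative, not the real `durabletask.aio` internals): a custom awaitable yields an operation descriptor, and a driver generator steps the coroutine with `.send()`, re-yielding each descriptor to the runtime and feeding the runtime's result back in.

```python
class Op:
    """Operation descriptor, standing in for a durabletask.task.Task."""
    def __init__(self, name, payload):
        self.name, self.payload = name, payload

    def __await__(self):
        # Suspend the coroutine, handing the descriptor to the driver;
        # the driver sends the operation's result back in.
        result = yield self
        return result

async def orchestrator():
    a = await Op("call_activity", "step1")
    b = await Op("call_activity", a + "+step2")
    return b

def drive(coro):
    """Generator bridge: converts an async orchestrator into a generator."""
    result = None
    while True:
        try:
            op = coro.send(result)        # run until the next await
        except StopIteration as done:
            return done.value             # orchestrator finished
        result = yield op                 # hand op to runtime, await its result

# Simulated runtime loop: complete each yielded op synchronously, as the
# Durable Task history would.
gen = drive(orchestrator())
op = gen.send(None)
outcome = None
try:
    while True:
        op = gen.send(f"done({op.payload})")   # "history" result for the op
except StopIteration as fin:
    outcome = fin.value
assert outcome == "done(done(step1)+step2)"
```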

Can you please add in how this is the translation of the async world to the safe sync workflow generator world?


And is it the case that each activity yields its own coroutine, and the workflow is in a main coroutine? Just to make sure I'm tracking 🙏

Comment on lines +36 to +44
### Awaitables and Operation Descriptors

Awaitables in `durabletask.aio` implement `__await__` to expose a small operation descriptor that the driver understands. Each descriptor maps deterministically to a Durable Task operation:

- Activity: `ctx.activity(name, *, input)` → `task.call_activity(name, input)`
- Sub‑orchestrator: `ctx.sub_orchestrator(fn_or_name, *, input)` → `task.call_sub_orchestrator(...)`
- Timer: `ctx.sleep(duration)` → `task.create_timer(fire_at)`
- External event: `ctx.wait_for_external_event(name)` → `task.wait_for_external_event(name)`
- Concurrency: `ctx.when_all([...])` / `ctx.when_any([...])` → `task.when_all([...])` / `task.when_any([...])`

So these are wrappers on top just so it will work with asyncio? Will this also work if I don't use asyncio? Did you just find these helpful/useful to have, or was there a hard requirement to add them because of the nature of moving from the asyncio world to the sync world of workflows under the hood? Can you elaborate a bit more so I have this for reference, please?


I guess if the awaitables are not required, then can we just remove these, please?

Concurrency:
- `when_all([...])` returns an awaitable that completes with a list of results
- `when_any([...])` returns an awaitable that completes with the first completed child
- `when_any_with_result([...])` returns `(index, result)`
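For illustration, here is the `(index, result)` semantics expressed with plain asyncio (the real helper operates on durable tasks via the workflow context, not raw coroutines; this local `when_any_with_result` is a stand-in). Plain `when_any` hands back the completed child, but you still have to work out which branch it was; the `_with_result` variant returns the winning index alongside the result.

```python
import asyncio

async def when_any_with_result(coros):
    """Return (index, result) of the first coroutine to complete."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for p in pending:
        p.cancel()
    # Drain cancelled tasks so nothing is left dangling on the loop.
    await asyncio.gather(*pending, return_exceptions=True)
    winner = done.pop()
    return tasks.index(winner), winner.result()

async def main():
    async def fast():
        return "fast"
    async def slow():
        await asyncio.sleep(1)
        return "slow"
    # The index tells you *which* branch won, without comparing task objects.
    index, result = await when_any_with_result([slow(), fast()])
    return index, result

idx, res = asyncio.run(main())
assert (idx, res) == (1, "fast")
```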

Are these just docs on existing methods, or did you add things for when_all? If not, can we please remove them?

Comment on lines +68 to +69
- Operation history when debug is enabled (`DAPR_WF_DEBUG=true` or `DT_DEBUG=true`)
- `get_debug_info()` to inspect state for diagnostics

Probably can remove these please, because I think setting the log level is sufficient already, right?
