Merged
Changes from 2 commits
91 changes: 89 additions & 2 deletions README.md
@@ -126,10 +126,97 @@ Orchestrations can be continued as new using the `continue_as_new` API. This API

Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events.

### Retry policies (TODO)
### Retry policies

Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.

#### Creating a retry policy

```python
from datetime import timedelta
from durabletask import task

retry_policy = task.RetryPolicy(
    first_retry_interval=timedelta(seconds=1),  # Initial delay before first retry
    max_number_of_attempts=5,  # Maximum total attempts (includes first attempt)
    backoff_coefficient=2.0,  # Exponential backoff multiplier (must be >= 1)
    max_retry_interval=timedelta(seconds=30),  # Cap on retry delay
    retry_timeout=timedelta(minutes=5),  # Total time limit for all retries (optional)
)
```

**Notes:**
- `max_number_of_attempts` **includes the initial attempt**. For example, `max_number_of_attempts=5` means 1 initial attempt + up to 4 retries.
- `retry_timeout` is optional. If omitted or set to `None`, retries continue until `max_number_of_attempts` is reached.
- `backoff_coefficient` controls exponential backoff: delay = `first_retry_interval * (backoff_coefficient ^ retry_number)`, capped by `max_retry_interval`.
- `non_retryable_error_types` (optional) can specify additional exception types to treat as non-retryable (e.g., `[ValueError, TypeError]`). `NonRetryableError` is always non-retryable regardless of this setting.
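
The backoff formula in the notes above can be sketched in plain Python. This is a minimal illustration, not the library's implementation: `retry_delays` is a hypothetical helper, and it assumes `retry_number` is zero-based (so the first retry waits exactly `first_retry_interval`). It ignores `retry_timeout`.

```python
from datetime import timedelta
from typing import Optional


def retry_delays(
    first_retry_interval: timedelta,
    max_number_of_attempts: int,
    backoff_coefficient: float = 1.0,
    max_retry_interval: Optional[timedelta] = None,
) -> list[timedelta]:
    """Return the delay before each retry (hypothetical helper, not part of durabletask)."""
    delays = []
    # max_number_of_attempts includes the first attempt, so there are at most N - 1 retries.
    for retry_number in range(max_number_of_attempts - 1):
        # Assumption: retry_number starts at 0 for the first retry.
        seconds = first_retry_interval.total_seconds() * backoff_coefficient ** retry_number
        if max_retry_interval is not None:
            # Cap the delay at max_retry_interval.
            seconds = min(seconds, max_retry_interval.total_seconds())
        delays.append(timedelta(seconds=seconds))
    return delays


delays = retry_delays(timedelta(seconds=1), 5, 2.0, timedelta(seconds=30))
print([d.total_seconds() for d in delays])  # [1.0, 2.0, 4.0, 8.0]
```

With the policy values from the example above, the 30-second cap is never reached because only four retries are possible.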

#### Using retry policies

Apply retry policies to activities or sub-orchestrations:

```python
def my_orchestrator(ctx: task.OrchestrationContext, input):
    # Retry an activity
    result = yield ctx.call_activity(my_activity, input=data, retry_policy=retry_policy)

    # Retry a sub-orchestration
    result = yield ctx.call_sub_orchestrator(child_orchestrator, input=data, retry_policy=retry_policy)
```

#### Non-retryable errors

For errors that should not be retried (e.g., validation failures, permanent errors), raise a `NonRetryableError`:

```python
from durabletask import task
from durabletask.task import NonRetryableError

def my_activity(ctx: task.ActivityContext, input):
    if input is None:
        # This error will bypass retry logic and fail immediately
        raise NonRetryableError("Input cannot be None")

    # Transient errors (network, timeouts, etc.) will be retried
    return call_external_service(input)
```

Even with a retry policy configured, `NonRetryableError` will fail immediately without retrying.

#### Error type matching behavior

**Important:** Error type matching uses **exact class name comparison**, not `isinstance()` checks. This is because exception objects are serialized to gRPC protobuf messages, where only the class name (as a string) survives serialization.

**Key implications:**

- **Not inheritance-aware**: If you specify `ValueError` in `non_retryable_error_types`, it will only match exceptions with the exact class name `"ValueError"`. A custom subclass like `CustomValueError(ValueError)` will NOT match.
- **Workaround**: List all exception types explicitly, including subclasses you want to handle.
- **Built-in exception**: `NonRetryableError` is always treated as non-retryable, matched by the name `"NonRetryableError"`.

**Example:**

```python
from datetime import timedelta
from durabletask import task

# Custom exception hierarchy
class ValidationError(ValueError):
    pass

# This policy ONLY matches exact "ValueError" by name
retry_policy = task.RetryPolicy(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=3,
    non_retryable_error_types=[ValueError],  # Won't match ValidationError subclass!
)

# To handle both, list them explicitly:
retry_policy = task.RetryPolicy(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=3,
    non_retryable_error_types=[ValueError, ValidationError],  # Both converted to name strings
)
```
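
The name-based matching described above can be reproduced without the library. The sketch below is a standalone approximation: `normalize_names` and `is_non_retryable` are hypothetical stand-ins modeled on the behavior this section describes, not the library's own functions.

```python
from typing import Iterable, Optional, Union


def normalize_names(types: Optional[Iterable[Union[str, type]]]) -> set[str]:
    """Reduce classes and strings to bare class names (hypothetical stand-in)."""
    names: set[str] = set()
    for t in types or []:
        if isinstance(t, str) and t:
            names.add(t)
        elif isinstance(t, type):
            names.add(t.__name__)
    return names


def is_non_retryable(error_type: str, names: set[str]) -> bool:
    """Compare by exact name only; inheritance is not considered."""
    return error_type == "NonRetryableError" or error_type in names


class ValidationError(ValueError):
    pass


names = normalize_names([ValueError])
print(is_non_retryable(type(ValueError("x")).__name__, names))       # True
print(is_non_retryable(type(ValidationError("x")).__name__, names))  # False
print(is_non_retryable("NonRetryableError", names))                  # True
```

Because only the string `"ValidationError"` is compared against the set `{"ValueError"}`, the subclass relationship never comes into play.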

## Getting Started

### Prerequisites
@@ -194,7 +281,7 @@ Certain aspects like multi-app activities require the full dapr runtime to be ru
```shell
dapr init || true

dapr run --app-id test-app --dapr-grpc-port 4001 --components-path ./examples/components/
dapr run --app-id test-app --dapr-grpc-port 4001 --resources-path ./examples/components/
```

To run the E2E tests on a specific Python version (e.g., 3.11), run the following command from the project root:
19 changes: 19 additions & 0 deletions durabletask/client.py
@@ -127,9 +127,28 @@ def __init__(
            interceptors=interceptors,
            options=channel_options,
        )
        self._channel = channel
        self._stub = stubs.TaskHubSidecarServiceStub(channel)
        self._logger = shared.get_logger("client", log_handler, log_formatter)

    def __enter__(self):
Author comment: Adds a context manager option so the client can be closed cleanly.

        return self

    def __exit__(self, exc_type, exc, tb):
        try:
            self.close()
        finally:
            return False

    def close(self) -> None:
        """Close the underlying gRPC channel."""
        try:
            # grpc.Channel.close() is idempotent
            self._channel.close()
        except Exception:
            # Best-effort cleanup
            pass

    def schedule_new_orchestration(
        self,
        orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
50 changes: 49 additions & 1 deletion durabletask/task.py
@@ -233,6 +233,29 @@ class OrchestrationStateError(Exception):
    pass


class NonRetryableError(Exception):
Author comment: This is a new helper, similar to one that Temporal offers but that we lacked. It lets us define errors as non-retryable, so activities are not retried when they raise one.

    """Exception indicating the operation should not be retried.

    If an activity or sub-orchestration raises this exception, retry logic will be
    bypassed and the failure will be returned immediately to the orchestrator.
    """

    pass


def is_error_non_retryable(error_type: str, policy: RetryPolicy) -> bool:
    """Checks whether an error type is non-retryable."""
    is_non_retryable = False
    if error_type == "NonRetryableError":
        is_non_retryable = True
    elif (
        policy.non_retryable_error_types is not None
        and error_type in policy.non_retryable_error_types
    ):
        is_non_retryable = True
    return is_non_retryable


class Task(ABC, Generic[T]):
    """Abstract base class for asynchronous tasks in a durable orchestration."""

@@ -397,7 +420,7 @@ def compute_next_delay(self) -> Optional[timedelta]:
                next_delay_f = min(
                    next_delay_f, self._retry_policy.max_retry_interval.total_seconds()
                )
                return timedelta(seconds=next_delay_f)
            return timedelta(seconds=next_delay_f)
Author comment: This fixes a bug in retry. The check `if datetime.utcnow() < retry_expiration:` at line 400 above means we should retry, but this `return` was badly indented, so the delay was returned only when `max_retry_interval` was set. With `max_retry_interval` left as `None`, no retry happened.

Author comment (@filintod, Nov 9, 2025): This is also more or less mentioned in one of the gotchas in dapr/python-sdk#836. I had found this bug beforehand; the other items there are gotchas or unexplained behavior.

Author comment: Added some information to the README to cover the gotchas, but we might also need to add it to python-sdk.


        return None

@@ -490,6 +513,7 @@ def __init__(
        backoff_coefficient: Optional[float] = 1.0,
        max_retry_interval: Optional[timedelta] = None,
        retry_timeout: Optional[timedelta] = None,
        non_retryable_error_types: Optional[list[Union[str, type]]] = None,
    ):
        """Creates a new RetryPolicy instance.

@@ -505,6 +529,11 @@ def __init__(
            The maximum retry interval to use for any retry attempt.
        retry_timeout : Optional[timedelta]
            The maximum amount of time to spend retrying the operation.
        non_retryable_error_types : Optional[list[Union[str, type]]]
            A list of exception type names or classes that should not be retried.
            If a failure's error type matches any of these, the task fails immediately.
            The built-in NonRetryableError is always treated as non-retryable regardless
            of this setting.
        """
        # validate inputs
        if first_retry_interval < timedelta(seconds=0):
@@ -523,6 +552,16 @@ def __init__(
        self._backoff_coefficient = backoff_coefficient
        self._max_retry_interval = max_retry_interval
        self._retry_timeout = retry_timeout
        # Normalize non-retryable error type names to a set of strings
        names: Optional[set[str]] = None
        if non_retryable_error_types:
            names = set[str]()
            for t in non_retryable_error_types:
                if isinstance(t, str) and t:
                    names.add(t)
                elif isinstance(t, type):
                    names.add(t.__name__)
        self._non_retryable_error_types = names

    @property
    def first_retry_interval(self) -> timedelta:
@@ -549,6 +588,15 @@ def retry_timeout(self) -> Optional[timedelta]:
        """The maximum amount of time to spend retrying the operation."""
        return self._retry_timeout

    @property
    def non_retryable_error_types(self) -> Optional[set[str]]:
        """Set of error type names that should not be retried.

        Comparison is performed against the errorType string provided by the
        backend (typically the exception class name).
        """
        return self._non_retryable_error_types


def get_name(fn: Callable) -> str:
    """Returns the name of the provided function"""