
Commit b4b83d3

- Fix critical bug in `continue_as_new` where the router was missing on the complete action.
- Linting:
  - Normalize strings to use double quotes consistently across tests and source files, matching usage in msft durabletask.
  - Run ruff format.
- Add `test_continue_as_new_with_activity_e2e` for better coverage of `continue_as_new` functionality.
- Modify `.flake8` with extended exclusions and per-file ignores.
- Introduce `tox.ini` for test environment configuration and to streamline linting, typing, and example validation.

Signed-off-by: Filinto Duran <[email protected]>
1 parent 06357df commit b4b83d3

23 files changed, +955 −549 lines
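
The new `test_continue_as_new_with_activity_e2e` is among the 23 changed files but is not reproduced in this excerpt. As a rough illustration only, an end-to-end test of that shape could look like the sketch below. It assumes the SDK's public worker/client API (`TaskHubGrpcWorker`, `TaskHubGrpcClient`, `ctx.call_activity`, `ctx.continue_as_new`) and a running sidecar; the orchestrator, activity, and threshold are invented for illustration and are not taken from the actual test.

```python
# Hypothetical sketch, not the test added in this commit. Assumes a Durable Task
# sidecar is running at the SDK's default address.
import json

from durabletask import client, task, worker


def double(ctx: task.ActivityContext, value: int) -> int:
    # Activity invoked on each iteration of the orchestration.
    return value * 2


def doubling_orchestrator(ctx: task.OrchestrationContext, value: int):
    # Call an activity, then restart the orchestration with the new input
    # until an arbitrary threshold is reached.
    result = yield ctx.call_activity(double, input=value)
    if result < 16:
        ctx.continue_as_new(result)
        return
    return result


def test_continue_as_new_with_activity_e2e():
    with worker.TaskHubGrpcWorker() as w:
        w.add_activity(double)
        w.add_orchestrator(doubling_orchestrator)
        w.start()

        c = client.TaskHubGrpcClient()
        instance_id = c.schedule_new_orchestration(doubling_orchestrator, input=1)
        state = c.wait_for_orchestration_completion(instance_id, timeout=60)

        assert state is not None
        assert state.runtime_status == client.OrchestrationStatus.COMPLETED
        assert json.loads(state.serialized_output) == 16  # 1 -> 2 -> 4 -> 8 -> 16
```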

.flake8

Lines changed: 6 additions & 5 deletions

```diff
@@ -1,6 +1,7 @@
 [flake8]
-ignore = E501,C901
-exclude =
-    .git
-    *_pb2*
-    __pycache__
+ignore = E203,E501,W503,E701,E704,F821,C901
+extend-exclude = .tox,venv,.venv,build,**/.venv,**/venv,*pb2_grpc.py,*pb2.py
+per-file-ignores=
+    examples/**:F541 setup.py:E121
+    tests/**:F541,E712
+max-line-length = 100
```

.github/workflows/pr-validation.yml

Lines changed: 1 addition & 2 deletions

```diff
@@ -28,8 +28,7 @@ jobs:
       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip
-          pip install flake8 pytest
-          pip install -r requirements.txt
+          pip install -r dev-requirements.txt
       - name: Lint with flake8
         run: |
           flake8 . --count --show-source --statistics --exit-zero
```

dev-requirements.txt

Lines changed: 8 additions & 1 deletion

```diff
@@ -1 +1,8 @@
-grpcio-tools==1.62.3 # 1.62.X is the latest version before protobuf 1.26.X is used which has breaking changes for Python
+# TODO: move to pyproject optional-dependencies
+pytest-asyncio>=0.23
+flake8
+tox>=4.0.0
+pytest
+pytest-cov
+grpcio-tools==1.75.1
+protobuf>=6.31.1
```

durabletask/__init__.py

Lines changed: 0 additions & 1 deletion

```diff
@@ -3,5 +3,4 @@
 
 """Durable Task SDK for Python"""
 
-
 PACKAGE_NAME = "durabletask"
```

durabletask/client.py

Lines changed: 71 additions & 45 deletions

```diff
@@ -18,12 +18,13 @@
 from durabletask import task
 from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
 
-TInput = TypeVar('TInput')
-TOutput = TypeVar('TOutput')
+TInput = TypeVar("TInput")
+TOutput = TypeVar("TOutput")
 
 
 class OrchestrationStatus(Enum):
     """The status of an orchestration instance."""
+
     RUNNING = pb.ORCHESTRATION_STATUS_RUNNING
     COMPLETED = pb.ORCHESTRATION_STATUS_COMPLETED
     FAILED = pb.ORCHESTRATION_STATUS_FAILED
@@ -52,7 +53,8 @@ def raise_if_failed(self):
         if self.failure_details is not None:
             raise OrchestrationFailedError(
                 f"Orchestration '{self.instance_id}' failed: {self.failure_details.message}",
-                self.failure_details)
+                self.failure_details,
+            )
 
 
 class OrchestrationFailedError(Exception):
@@ -65,18 +67,23 @@ def failure_details(self):
         return self._failure_details
 
 
-def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> Optional[OrchestrationState]:
+def new_orchestration_state(
+    instance_id: str, res: pb.GetInstanceResponse
+) -> Optional[OrchestrationState]:
     if not res.exists:
         return None
 
     state = res.orchestrationState
 
     failure_details = None
-    if state.failureDetails.errorMessage != '' or state.failureDetails.errorType != '':
+    if state.failureDetails.errorMessage != "" or state.failureDetails.errorType != "":
         failure_details = task.FailureDetails(
             state.failureDetails.errorMessage,
             state.failureDetails.errorType,
-            state.failureDetails.stackTrace.value if not helpers.is_empty(state.failureDetails.stackTrace) else None)
+            state.failureDetails.stackTrace.value
+            if not helpers.is_empty(state.failureDetails.stackTrace)
+            else None,
+        )
 
     return OrchestrationState(
         instance_id,
@@ -87,19 +94,21 @@ def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> Op
         state.input.value if not helpers.is_empty(state.input) else None,
         state.output.value if not helpers.is_empty(state.output) else None,
         state.customStatus.value if not helpers.is_empty(state.customStatus) else None,
-        failure_details)
+        failure_details,
+    )
 
 
 class TaskHubGrpcClient:
-
-    def __init__(self, *,
-                 host_address: Optional[str] = None,
-                 metadata: Optional[list[tuple[str, str]]] = None,
-                 log_handler: Optional[logging.Handler] = None,
-                 log_formatter: Optional[logging.Formatter] = None,
-                 secure_channel: bool = False,
-                 interceptors: Optional[Sequence[shared.ClientInterceptor]] = None):
-
+    def __init__(
+        self,
+        *,
+        host_address: Optional[str] = None,
+        metadata: Optional[list[tuple[str, str]]] = None,
+        log_handler: Optional[logging.Handler] = None,
+        log_formatter: Optional[logging.Formatter] = None,
+        secure_channel: bool = False,
+        interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
+    ):
         # If the caller provided metadata, we need to create a new interceptor for it and
         # add it to the list of interceptors.
         if interceptors is not None:
@@ -112,25 +121,28 @@ def __init__(self, *,
             interceptors = None
 
         channel = shared.get_grpc_channel(
-            host_address=host_address,
-            secure_channel=secure_channel,
-            interceptors=interceptors
+            host_address=host_address, secure_channel=secure_channel, interceptors=interceptors
         )
         self._stub = stubs.TaskHubSidecarServiceStub(channel)
         self._logger = shared.get_logger("client", log_handler, log_formatter)
 
-    def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
-                                   input: Optional[TInput] = None,
-                                   instance_id: Optional[str] = None,
-                                   start_at: Optional[datetime] = None,
-                                   reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None) -> str:
-
+    def schedule_new_orchestration(
+        self,
+        orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
+        *,
+        input: Optional[TInput] = None,
+        instance_id: Optional[str] = None,
+        start_at: Optional[datetime] = None,
+        reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
+    ) -> str:
         name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
 
         req = pb.CreateInstanceRequest(
             name=name,
             instanceId=instance_id if instance_id else uuid.uuid4().hex,
-            input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input is not None else None,
+            input=wrappers_pb2.StringValue(value=shared.to_json(input))
+            if input is not None
+            else None,
             scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
             version=wrappers_pb2.StringValue(value=""),
             orchestrationIdReusePolicy=reuse_id_policy,
@@ -140,19 +152,22 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
         res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
         return res.instanceId
 
-    def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Optional[OrchestrationState]:
+    def get_orchestration_state(
+        self, instance_id: str, *, fetch_payloads: bool = True
+    ) -> Optional[OrchestrationState]:
         req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
         res: pb.GetInstanceResponse = self._stub.GetInstance(req)
         return new_orchestration_state(req.instanceId, res)
 
-    def wait_for_orchestration_start(self, instance_id: str, *,
-                                     fetch_payloads: bool = False,
-                                     timeout: int = 0) -> Optional[OrchestrationState]:
+    def wait_for_orchestration_start(
+        self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
+    ) -> Optional[OrchestrationState]:
         req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
         try:
             grpc_timeout = None if timeout == 0 else timeout
             self._logger.info(
-                f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start.")
+                f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
+            )
             res: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(req, timeout=grpc_timeout)
             return new_orchestration_state(req.instanceId, res)
         except grpc.RpcError as rpc_error:
@@ -162,22 +177,30 @@ def wait_for_orchestration_start(self, instance_id: str, *,
             else:
                 raise
 
-    def wait_for_orchestration_completion(self, instance_id: str, *,
-                                          fetch_payloads: bool = True,
-                                          timeout: int = 0) -> Optional[OrchestrationState]:
+    def wait_for_orchestration_completion(
+        self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
+    ) -> Optional[OrchestrationState]:
         req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
         try:
             grpc_timeout = None if timeout == 0 else timeout
             self._logger.info(
-                f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete.")
-            res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(req, timeout=grpc_timeout)
+                f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
+            )
+            res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
+                req, timeout=grpc_timeout
+            )
             state = new_orchestration_state(req.instanceId, res)
             if not state:
                 return None
 
-            if state.runtime_status == OrchestrationStatus.FAILED and state.failure_details is not None:
+            if (
+                state.runtime_status == OrchestrationStatus.FAILED
+                and state.failure_details is not None
+            ):
                 details = state.failure_details
-                self._logger.info(f"Instance '{instance_id}' failed: [{details.error_type}] {details.message}")
+                self._logger.info(
+                    f"Instance '{instance_id}' failed: [{details.error_type}] {details.message}"
+                )
             elif state.runtime_status == OrchestrationStatus.TERMINATED:
                 self._logger.info(f"Instance '{instance_id}' was terminated.")
             elif state.runtime_status == OrchestrationStatus.COMPLETED:
@@ -191,23 +214,26 @@ def wait_for_orchestration_completion(self, instance_id: str, *,
             else:
                 raise
 
-    def raise_orchestration_event(self, instance_id: str, event_name: str, *,
-                                  data: Optional[Any] = None):
+    def raise_orchestration_event(
+        self, instance_id: str, event_name: str, *, data: Optional[Any] = None
+    ):
         req = pb.RaiseEventRequest(
             instanceId=instance_id,
             name=event_name,
-            input=wrappers_pb2.StringValue(value=shared.to_json(data)) if data else None)
+            input=wrappers_pb2.StringValue(value=shared.to_json(data)) if data else None,
+        )
 
         self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
         self._stub.RaiseEvent(req)
 
-    def terminate_orchestration(self, instance_id: str, *,
-                                output: Optional[Any] = None,
-                                recursive: bool = True):
+    def terminate_orchestration(
+        self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
+    ):
         req = pb.TerminateRequest(
             instanceId=instance_id,
             output=wrappers_pb2.StringValue(value=shared.to_json(output)) if output else None,
-            recursive=recursive)
+            recursive=recursive,
+        )
 
         self._logger.info(f"Terminating instance '{instance_id}'.")
         self._stub.TerminateInstance(req)
```
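
The `client.py` changes above are formatting-only (double-quote normalization and ruff-style line wrapping); no method signatures change. For orientation, below is a hedged sketch of how this client API is typically driven, assuming a reachable sidecar; the orchestrator name, event name, and payloads are placeholders rather than anything from this commit.

```python
# Illustrative use of the TaskHubGrpcClient API reformatted above. The
# orchestrator name "my_orchestrator" and the "approval" event are placeholders.
from durabletask import client

c = client.TaskHubGrpcClient()  # connects to the SDK's default sidecar address

# Schedule an orchestration by name and wait for it to start.
instance_id = c.schedule_new_orchestration("my_orchestrator", input={"count": 1})
c.wait_for_orchestration_start(instance_id, timeout=30)

# Deliver an external event, then wait for completion (or terminate_orchestration instead).
c.raise_orchestration_event(instance_id, "approval", data={"approved": True})
state = c.wait_for_orchestration_completion(instance_id, timeout=60)
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
    print(state.serialized_output)
```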

durabletask/internal/grpc_interceptor.py

Lines changed: 22 additions & 12 deletions

```diff
@@ -7,20 +7,26 @@
 
 
 class _ClientCallDetails(
-        namedtuple(
-            '_ClientCallDetails',
-            ['method', 'timeout', 'metadata', 'credentials', 'wait_for_ready', 'compression']),
-        grpc.ClientCallDetails):
+    namedtuple(
+        "_ClientCallDetails",
+        ["method", "timeout", "metadata", "credentials", "wait_for_ready", "compression"],
+    ),
+    grpc.ClientCallDetails,
+):
     """This is an implementation of the ClientCallDetails interface needed for interceptors.
     This class takes six named values and inherits the ClientCallDetails from grpc package.
     This class encloses the values that describe a RPC to be invoked.
     """
+
     pass
 
 
-class DefaultClientInterceptorImpl (
-        grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
-        grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
+class DefaultClientInterceptorImpl(
+    grpc.UnaryUnaryClientInterceptor,
+    grpc.UnaryStreamClientInterceptor,
+    grpc.StreamUnaryClientInterceptor,
+    grpc.StreamStreamClientInterceptor,
+):
     """The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
     StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
     interceptor to add additional headers to all calls as needed."""
@@ -29,10 +35,9 @@ def __init__(self, metadata: list[tuple[str, str]]):
         super().__init__()
         self._metadata = metadata
 
-    def _intercept_call(
-            self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
+    def _intercept_call(self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
         """Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
-            call details."""
+        call details."""
         if self._metadata is None:
             return client_call_details
 
@@ -43,8 +48,13 @@ def _intercept_call(
 
         metadata.extend(self._metadata)
         client_call_details = _ClientCallDetails(
-            client_call_details.method, client_call_details.timeout, metadata,
-            client_call_details.credentials, client_call_details.wait_for_ready, client_call_details.compression)
+            client_call_details.method,
+            client_call_details.timeout,
+            metadata,
+            client_call_details.credentials,
+            client_call_details.wait_for_ready,
+            client_call_details.compression,
+        )
 
         return client_call_details
 
```
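
`DefaultClientInterceptorImpl` is what `TaskHubGrpcClient` installs when the caller passes `metadata=` (see the `__init__` hunk in `client.py` above): the key/value pairs are appended to the gRPC metadata of every outgoing call. A minimal sketch of that wiring, with placeholder header names, values, and host address:

```python
# Hedged example: attach custom headers to every RPC made by the client.
# Header names/values and the host address are placeholders.
from durabletask import client

c = client.TaskHubGrpcClient(
    host_address="localhost:4001",  # assumed sidecar endpoint
    metadata=[
        ("authorization", "Bearer <token>"),
        ("x-example-tenant", "contoso"),
    ],
)
# Every call made through `c` (schedule, query, raise event, terminate) now
# carries these headers via DefaultClientInterceptorImpl._intercept_call.
```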
