Skip to content

Commit 5262b05

Browse files
committed
Fix continue-as-new losing track of the router, which made placement fail
refactor: clean up debug print statements and improve code formatting Removed unnecessary debug print statements from the TaskHubGrpcClient class to streamline the code and enhance readability. Additionally, adjusted formatting in various files for consistency, including property definitions and string formatting. This update aims to improve code maintainability and clarity. Signed-off-by: Filinto Duran <[email protected]>
1 parent 336d37d commit 5262b05

24 files changed: +335 additions, −143 deletions

durabletask/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
"""Durable Task SDK for Python"""
55

6-
76
PACKAGE_NAME = 'durabletask'
87

98
# Public async exports (import directly from durabletask.aio)

durabletask/aio/compatibility.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ def ensure_compatibility(context_class: type) -> type:
169169

170170
if missing_items:
171171
raise TypeError(
172-
f"{context_class.__name__} does not implement OrchestrationContextProtocol. "
173-
f"Missing: {', '.join(missing_items)}"
172+
f'{context_class.__name__} does not implement OrchestrationContextProtocol. '
173+
f'Missing: {", ".join(missing_items)}'
174174
)
175175

176176
return context_class

durabletask/aio/context.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ class AsyncWorkflowContext(DeterministicContextMixin):
7878
'_detection_disabled',
7979
'_workflow_name',
8080
'_current_step',
81+
'_sandbox_originals',
82+
'_sandbox_mode',
8183
)
8284

8385
# Generic type variable for awaitable result
@@ -95,6 +97,9 @@ def __init__(self, base_ctx: dt_task.OrchestrationContext):
9597
self._cleanup_tasks: list[Callable[[], Any]] = []
9698
self._workflow_name: Optional[str] = None
9799
self._current_step: Optional[str] = None
100+
# Set by sandbox when active
101+
self._sandbox_originals: Optional[dict[str, Any]] = None
102+
self._sandbox_mode: Optional[str] = None
98103

99104
# Performance optimization: Check if detection should be globally disabled
100105
self._detection_disabled = os.getenv('DAPR_WF_DISABLE_DETECTION') == 'true'
@@ -192,6 +197,37 @@ def info(self) -> WorkflowInfo:
192197
workflow_attempt=self.workflow_attempt,
193198
)
194199

200+
# Unsafe escape hatch: real wall-clock UTC now (best_effort only, not during replay)
201+
def unsafe_wall_clock_now(self) -> datetime:
202+
"""
203+
Return the real UTC wall-clock time.
204+
205+
Constraints:
206+
- Raises RuntimeError if called during replay.
207+
- Raises RuntimeError if sandbox mode is 'strict'.
208+
- Intended for telemetry/logging only; do not use for workflow decisions.
209+
"""
210+
if self.is_replaying:
211+
raise RuntimeError('unsafe_wall_clock_now() cannot be used during replay')
212+
mode = getattr(self, '_sandbox_mode', None)
213+
if mode == 'strict':
214+
raise RuntimeError('unsafe_wall_clock_now() is disabled in strict sandbox mode')
215+
originals = getattr(self, '_sandbox_originals', None)
216+
if not originals or 'time.time' not in originals:
217+
# Fallback to system if sandbox not active
218+
import time as _time
219+
from datetime import timezone
220+
221+
return datetime.fromtimestamp(_time.time(), tz=timezone.utc)
222+
real_time = originals['time.time']
223+
try:
224+
from datetime import timezone
225+
226+
ts = float(real_time()) # type: ignore[call-arg]
227+
return datetime.fromtimestamp(ts, tz=timezone.utc)
228+
except Exception as e:
229+
raise RuntimeError(f'unsafe_wall_clock_now() failed: {e}')
230+
195231
# Activity operations
196232
def activity(
197233
self,

durabletask/aio/driver.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@
3535
class WorkflowFunction(Protocol):
3636
"""Protocol for workflow functions."""
3737

38-
async def __call__(self, ctx: Any, input_data: Optional[Any] = None) -> Any:
39-
...
38+
async def __call__(self, ctx: Any, input_data: Optional[Any] = None) -> Any: ...
4039

4140

4241
class CoroutineOrchestratorRunner:

durabletask/aio/errors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def __init__(
4545
if step:
4646
context_parts.append(f'step: {step}')
4747

48-
context_str = f" ({', '.join(context_parts)})" if context_parts else ''
48+
context_str = f' ({", ".join(context_parts)})' if context_parts else ''
4949
super().__init__(f'{message}{context_str}')
5050

5151

durabletask/aio/sandbox.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,14 @@ def __enter__(self) -> '_Sandbox':
293293
# Apply patches for best_effort and strict modes
294294
self._apply_patches()
295295

296+
# Expose originals/mode to the async workflow context for controlled unsafe access
297+
try:
298+
setattr(self.async_ctx, '_sandbox_originals', dict(self.originals))
299+
setattr(self.async_ctx, '_sandbox_mode', self.mode)
300+
except Exception:
301+
# Context may not support attribute assignment; ignore
302+
pass
303+
296304
return self
297305

298306
def __exit__(
@@ -307,6 +315,15 @@ def __exit__(
307315
if self.mode != 'off' and self.originals:
308316
self._restore_originals()
309317

318+
# Remove exposed references from the async context
319+
try:
320+
if hasattr(self.async_ctx, '_sandbox_originals'):
321+
delattr(self.async_ctx, '_sandbox_originals')
322+
if hasattr(self.async_ctx, '_sandbox_mode'):
323+
delattr(self.async_ctx, '_sandbox_mode')
324+
except Exception:
325+
pass
326+
310327
def _apply_patches(self) -> None:
311328
"""Apply patches to non-deterministic functions."""
312329
import asyncio as _asyncio
@@ -525,9 +542,7 @@ async def _empty() -> list[Any]:
525542
if all(_is_workflow_awaitable(a) for a in aws):
526543

527544
async def _await_when_all() -> Any:
528-
from .awaitables import (
529-
WhenAllAwaitable, # local import to avoid cycles
530-
)
545+
from .awaitables import WhenAllAwaitable # local import to avoid cycles
531546

532547
combined: Any = WhenAllAwaitable(list(aws))
533548
return await combined

durabletask/client.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -232,13 +232,9 @@ def wait_for_orchestration_completion(
232232
# For positive timeout, best-effort pre-check and short polling to avoid long server waits
233233
try:
234234
# First check if the orchestration is already completed
235-
print(f'[CLIENT DEBUG] Checking current state before waiting for {instance_id}')
236235
current_state = self.get_orchestration_state(
237236
instance_id, fetch_payloads=fetch_payloads
238237
)
239-
print(
240-
f"[CLIENT DEBUG] Current state: {current_state.runtime_status if current_state else 'None'}"
241-
)
242238
if current_state and current_state.runtime_status in [
243239
OrchestrationStatus.COMPLETED,
244240
OrchestrationStatus.FAILED,
@@ -256,42 +252,28 @@ def wait_for_orchestration_completion(
256252
poll_start = time.time()
257253
poll_interval = 0.1
258254

259-
print(f'[CLIENT DEBUG] Starting polling for up to {poll_timeout}s')
260255
while time.time() - poll_start < poll_timeout:
261256
current_state = self.get_orchestration_state(
262257
instance_id, fetch_payloads=fetch_payloads
263258
)
264-
print(
265-
f"[CLIENT DEBUG] Poll state: {current_state.runtime_status if current_state else 'None'}"
266-
)
267259

268260
if current_state and current_state.runtime_status in [
269261
OrchestrationStatus.COMPLETED,
270262
OrchestrationStatus.FAILED,
271263
OrchestrationStatus.TERMINATED,
272264
]:
273-
print(
274-
f'[CLIENT DEBUG] Found terminal state during polling: {current_state.runtime_status}'
275-
)
276265
return current_state
277266

278267
time.sleep(poll_interval)
279268
poll_interval = min(poll_interval * 1.5, 1.0) # Exponential backoff, max 1s
280-
print(f'[CLIENT DEBUG] Polling completed, no terminal state found')
281269
except Exception:
282270
# Ignore pre-check/poll issues (e.g., mocked stubs in unit tests) and fall back
283271
pass
284272

285273
self._logger.info(f"Waiting up to {timeout}s for instance '{instance_id}' to complete.")
286-
print(
287-
f'[CLIENT DEBUG] About to call WaitForInstanceCompletion for {instance_id} with timeout {grpc_timeout}'
288-
)
289274
res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
290275
req, timeout=grpc_timeout
291276
)
292-
print(
293-
f'[CLIENT DEBUG] WaitForInstanceCompletion returned successfully for {instance_id}'
294-
)
295277
state = new_orchestration_state(req.instanceId, res)
296278
if not state:
297279
return None
@@ -311,17 +293,10 @@ def wait_for_orchestration_completion(
311293

312294
return state
313295
except grpc.RpcError as rpc_error:
314-
print(
315-
f'[CLIENT DEBUG] gRPC error in WaitForInstanceCompletion for {instance_id}: {rpc_error.code()} - {rpc_error.details()}'
316-
)
317296
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
318-
print(
319-
f'[CLIENT DEBUG] Deadline exceeded for {instance_id}, converting to TimeoutError'
320-
)
321297
# Replace gRPC error with the built-in TimeoutError
322298
raise TimeoutError('Timed-out waiting for the orchestration to complete')
323299
else:
324-
print(f'[CLIENT DEBUG] Re-raising non-timeout gRPC error for {instance_id}')
325300
raise
326301

327302
def raise_orchestration_event(

durabletask/deterministic.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,10 @@ class DeterministicContextProtocol(Protocol):
6464
"""Protocol for contexts that provide deterministic operations."""
6565

6666
@property
67-
def instance_id(self) -> str:
68-
...
67+
def instance_id(self) -> str: ...
6968

7069
@property
71-
def current_utc_datetime(self) -> datetime:
72-
...
70+
def current_utc_datetime(self) -> datetime: ...
7371

7472

7573
class DeterministicContextMixin:

durabletask/internal/helpers.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ def new_complete_orchestration_action(
208208
result: Optional[str] = None,
209209
failure_details: Optional[pb.TaskFailureDetails] = None,
210210
carryover_events: Optional[list[pb.HistoryEvent]] = None,
211+
app_id: Optional[str] = None,
211212
) -> pb.OrchestratorAction:
212213
completeOrchestrationAction = pb.CompleteOrchestrationAction(
213214
orchestrationStatus=status,
@@ -216,7 +217,10 @@ def new_complete_orchestration_action(
216217
carryoverEvents=carryover_events,
217218
)
218219

219-
return pb.OrchestratorAction(id=id, completeOrchestration=completeOrchestrationAction)
220+
return pb.OrchestratorAction(
221+
id=id, completeOrchestration=completeOrchestrationAction,
222+
router=pb.TaskRouter(sourceAppID=app_id) if app_id else None
223+
)
220224

221225

222226
def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction:

durabletask/internal/shared.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,8 @@ def get_grpc_retry_service_config_option() -> Optional[tuple[str, str]]:
181181
'name': [{'service': ''}],
182182
'retryPolicy': {
183183
'maxAttempts': max_attempts,
184-
'initialBackoff': f'{initial_backoff_ms/1000.0}s',
185-
'maxBackoff': f'{max_backoff_ms/1000.0}s',
184+
'initialBackoff': f'{initial_backoff_ms / 1000.0}s',
185+
'maxBackoff': f'{max_backoff_ms / 1000.0}s',
186186
'backoffMultiplier': backoff_multiplier,
187187
'retryableStatusCodes': codes,
188188
},
@@ -222,7 +222,7 @@ def get_logger(
222222
# Add a default log handler if none is provided
223223
if log_handler is None:
224224
log_handler = logging.StreamHandler()
225-
log_handler.setLevel(logging.INFO)
225+
log_handler.setLevel(logging.DEBUG)
226226
logger.handlers.append(log_handler)
227227

228228
# Set a default log formatter to our handler if none is provided

0 commit comments

Comments (0)