Skip to content

Commit 84c4291

Browse files
authored
Add distributed tracing using OpenTelemetry (#119)
* Add distributed tracing using OpenTelemetry
1 parent ce7c524 commit 84c4291

File tree

15 files changed

+3503
-78
lines changed

15 files changed

+3503
-78
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ ADDED
1313
- Added `AsyncTaskHubGrpcClient` for asyncio-based applications using `grpc.aio`
1414
- Added `DefaultAsyncClientInterceptorImpl` for async gRPC metadata interceptors
1515
- Added `get_async_grpc_channel` helper for creating async gRPC channels
16+
- Improved distributed tracing support with full span coverage for orchestrations, activities, sub-orchestrations, timers, and events
1617

1718
CHANGED
1819

durabletask/client.py

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Licensed under the MIT License.
33

44
import logging
5+
import uuid
56
from dataclasses import dataclass
67
from datetime import datetime
78
from enum import Enum
@@ -16,6 +17,7 @@
1617
import durabletask.internal.orchestrator_service_pb2 as pb
1718
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
1819
import durabletask.internal.shared as shared
20+
import durabletask.internal.tracing as tracing
1921
from durabletask import task
2022
from durabletask.internal.client_helpers import (
2123
build_query_entities_req,
@@ -176,14 +178,28 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
176178
tags: Optional[dict[str, str]] = None,
177179
version: Optional[str] = None) -> str:
178180

179-
req = build_schedule_new_orchestration_req(
180-
orchestrator, input=input, instance_id=instance_id, start_at=start_at,
181-
reuse_id_policy=reuse_id_policy, tags=tags,
182-
version=version if version else self.default_version)
183-
184-
self._logger.info(f"Starting new '{req.name}' instance with ID = '{req.instanceId}'.")
185-
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
186-
return res.instanceId
181+
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
182+
resolved_instance_id = instance_id if instance_id else uuid.uuid4().hex
183+
resolved_version = version if version else self.default_version
184+
185+
with tracing.start_create_orchestration_span(
186+
name, resolved_instance_id, version=resolved_version,
187+
):
188+
req = build_schedule_new_orchestration_req(
189+
orchestrator, input=input, instance_id=instance_id, start_at=start_at,
190+
reuse_id_policy=reuse_id_policy, tags=tags,
191+
version=version if version else self.default_version)
192+
193+
# Inject the active PRODUCER span context into the request so the sidecar
194+
# stores it in the executionStarted event and the worker can parent all
195+
# orchestration/activity/timer spans under this trace.
196+
parent_trace_ctx = tracing.get_current_trace_context()
197+
if parent_trace_ctx is not None:
198+
req.parentTraceContext.CopyFrom(parent_trace_ctx)
199+
200+
self._logger.info(f"Starting new '{req.name}' instance with ID = '{req.instanceId}'.")
201+
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
202+
return res.instanceId
187203

188204
def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Optional[OrchestrationState]:
189205
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
@@ -245,10 +261,10 @@ def wait_for_orchestration_completion(self, instance_id: str, *,
245261

246262
def raise_orchestration_event(self, instance_id: str, event_name: str, *,
247263
data: Optional[Any] = None) -> None:
248-
req = build_raise_event_req(instance_id, event_name, data)
249-
250-
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
251-
self._stub.RaiseEvent(req)
264+
with tracing.start_raise_event_span(event_name, instance_id):
265+
req = build_raise_event_req(instance_id, event_name, data)
266+
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
267+
self._stub.RaiseEvent(req)
252268

253269
def terminate_orchestration(self, instance_id: str, *,
254270
output: Optional[Any] = None,
@@ -418,14 +434,25 @@ async def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator
418434
tags: Optional[dict[str, str]] = None,
419435
version: Optional[str] = None) -> str:
420436

421-
req = build_schedule_new_orchestration_req(
422-
orchestrator, input=input, instance_id=instance_id, start_at=start_at,
423-
reuse_id_policy=reuse_id_policy, tags=tags,
424-
version=version if version else self.default_version)
437+
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
438+
resolved_instance_id = instance_id if instance_id else uuid.uuid4().hex
439+
resolved_version = version if version else self.default_version
425440

426-
self._logger.info(f"Starting new '{req.name}' instance with ID = '{req.instanceId}'.")
427-
res: pb.CreateInstanceResponse = await self._stub.StartInstance(req)
428-
return res.instanceId
441+
with tracing.start_create_orchestration_span(
442+
name, resolved_instance_id, version=resolved_version,
443+
):
444+
req = build_schedule_new_orchestration_req(
445+
orchestrator, input=input, instance_id=instance_id, start_at=start_at,
446+
reuse_id_policy=reuse_id_policy, tags=tags,
447+
version=version if version else self.default_version)
448+
449+
parent_trace_ctx = tracing.get_current_trace_context()
450+
if parent_trace_ctx is not None:
451+
req.parentTraceContext.CopyFrom(parent_trace_ctx)
452+
453+
self._logger.info(f"Starting new '{req.name}' instance with ID = '{req.instanceId}'.")
454+
res: pb.CreateInstanceResponse = await self._stub.StartInstance(req)
455+
return res.instanceId
429456

430457
async def get_orchestration_state(self, instance_id: str, *,
431458
fetch_payloads: bool = True) -> Optional[OrchestrationState]:
@@ -487,10 +514,10 @@ async def wait_for_orchestration_completion(self, instance_id: str, *,
487514

488515
async def raise_orchestration_event(self, instance_id: str, event_name: str, *,
489516
data: Optional[Any] = None) -> None:
490-
req = build_raise_event_req(instance_id, event_name, data)
491-
492-
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
493-
await self._stub.RaiseEvent(req)
517+
with tracing.start_raise_event_span(event_name, instance_id):
518+
req = build_raise_event_req(instance_id, event_name, data)
519+
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
520+
await self._stub.RaiseEvent(req)
494521

495522
async def terminate_orchestration(self, instance_id: str, *,
496523
output: Optional[Any] = None,

durabletask/internal/helpers.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ def new_orchestrator_completed_event() -> pb.HistoryEvent:
2727

2828
def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None,
2929
tags: Optional[dict[str, str]] = None,
30-
version: Optional[str] = None) -> pb.HistoryEvent:
30+
version: Optional[str] = None,
31+
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.HistoryEvent:
3132
return pb.HistoryEvent(
3233
eventId=-1,
3334
timestamp=timestamp_pb2.Timestamp(),
@@ -36,7 +37,8 @@ def new_execution_started_event(name: str, instance_id: str, encoded_input: Opti
3637
version=get_string_value(version),
3738
input=get_string_value(encoded_input),
3839
orchestrationInstance=pb.OrchestrationInstance(instanceId=instance_id),
39-
tags=tags))
40+
tags=tags,
41+
parentTraceContext=parent_trace_context))
4042

4143

4244
def new_timer_created_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent:
@@ -223,11 +225,13 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction
223225

224226

225227
def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
226-
tags: Optional[dict[str, str]]) -> pb.OrchestratorAction:
228+
tags: Optional[dict[str, str]],
229+
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.OrchestratorAction:
227230
return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction(
228231
name=name,
229232
input=get_string_value(encoded_input),
230-
tags=tags
233+
tags=tags,
234+
parentTraceContext=parent_trace_context,
231235
))
232236

233237

@@ -302,12 +306,14 @@ def new_create_sub_orchestration_action(
302306
name: str,
303307
instance_id: Optional[str],
304308
encoded_input: Optional[str],
305-
version: Optional[str]) -> pb.OrchestratorAction:
309+
version: Optional[str],
310+
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.OrchestratorAction:
306311
return pb.OrchestratorAction(id=id, createSubOrchestration=pb.CreateSubOrchestrationAction(
307312
name=name,
308313
instanceId=instance_id,
309314
input=get_string_value(encoded_input),
310-
version=get_string_value(version)
315+
version=get_string_value(version),
316+
parentTraceContext=parent_trace_context,
311317
))
312318

313319

0 commit comments

Comments
 (0)