Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions sdks/typescript/src/opentelemetry/instrumentor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { propagation } from '@opentelemetry/api';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import {
BasicTracerProvider,
InMemorySpanExporter,
SimpleSpanProcessor,
} from '@opentelemetry/sdk-trace-base';
import { HatchetInstrumentor } from './instrumentor';

describe('HatchetInstrumentor worker spans', () => {
it('creates a workflow-level parent span for dashboard-triggered step runs', async () => {
const exporter = new InMemorySpanExporter();
const provider = new BasicTracerProvider({
spanProcessors: [new SimpleSpanProcessor(exporter)],
});

propagation.setGlobalPropagator(new W3CTraceContextPropagator());

const dashboardTracer = provider.getTracer('dashboard-test');
const dashboardSpan = dashboardTracer.startSpan('dashboard-trigger');
const dashboardSpanContext = dashboardSpan.spanContext();
const carrier: Record<string, string> = {
traceparent: `00-${dashboardSpanContext.traceId}-${dashboardSpanContext.spanId}-01`,
};
dashboardSpan.end();

class FakeWorker {
workerId = 'worker-1';

async handleStartStepRun(_action: unknown) {
return undefined;
}

async handleCancelStepRun() {
return undefined;
}
}

const instrumentor = new HatchetInstrumentor();
instrumentor.setTracerProvider(provider);
(instrumentor as any)._patchHandleStartStepRun(FakeWorker.prototype);

const worker = new FakeWorker();
await worker.handleStartStepRun({
tenantId: 'tenant-1',
workflowRunId: 'workflow-run-1',
taskId: 'task-1',
taskRunExternalId: 'step-run-1',
retryCount: 0,
parentWorkflowRunId: '',
childWorkflowIndex: 0,
childWorkflowKey: '',
actionPayload: '{"input":true}',
jobName: 'find-subprocessors',
actionId: 'find-subprocessors:resolve-parent-company',
taskName: 'resolve-parent-company',
workflowId: 'workflow-1',
workflowVersionId: 'workflow-version-1',
additionalMetadata: JSON.stringify(carrier),
} as any);

const spans = exporter.getFinishedSpans();
const workflowSpan = spans.find((span) => span.name === 'hatchet.workflow_run');
const stepSpan = spans.find((span) => span.name === 'hatchet.start_step_run');

expect(workflowSpan).toBeDefined();
expect(stepSpan).toBeDefined();
expect(stepSpan?.parentSpanContext?.spanId).toBe(workflowSpan?.spanContext().spanId);
expect(workflowSpan?.parentSpanContext?.spanId).toBe(dashboardSpanContext.spanId);
});
});
74 changes: 65 additions & 9 deletions sdks/typescript/src/opentelemetry/instrumentor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const otelInstrumentation =
require('@opentelemetry/instrumentation') as typeof import('@opentelemetry/instrumentation');
/* eslint-enable @typescript-eslint/no-require-imports */

const { context, propagation, SpanKind, SpanStatusCode, diag } = otelApi;
const { context, propagation, SpanKind, SpanStatusCode, diag, trace, isSpanContextValid } = otelApi;

const {
InstrumentationBase,
Expand Down Expand Up @@ -80,6 +80,7 @@ type HatchetInstrumentationConfig = OpenTelemetryConfig &
};
type Carrier = Record<string, string>;

const TRIGGER_SPAN_EXPORTED_KEY = 'hatchet__trigger_span_exported';
const INSTRUMENTOR_NAME = '@hatchet-dev/typescript-sdk';
// FIXME: refactor version check to use the new pattern introduced in #2954
const SUPPORTED_VERSIONS = ['>=1.16.0'];
Expand All @@ -105,6 +106,22 @@ function injectSourceInfo(carrier: Carrier): void {
}
}

function markTriggerSpanAsExported(carrier: Carrier): void {
carrier[TRIGGER_SPAN_EXPORTED_KEY] = 'true';
}

function shouldCreateWorkflowRunSpan(
carrier: Carrier | undefined,
parentContext: OtelContext
): boolean {
if (!carrier || carrier[TRIGGER_SPAN_EXPORTED_KEY] === 'true') {
return false;
}

const parentSpanContext = trace.getSpanContext(parentContext);
return parentSpanContext ? isSpanContextValid(parentSpanContext) : false;
}

function getActionOtelAttributes(
action: Action,
excludedAttributes: string[] = [],
Expand Down Expand Up @@ -351,6 +368,7 @@ export class HatchetInstrumentor extends InstrumentationBase<HatchetInstrumentat
(span: Span) => {
const enhancedMetadata: Carrier = { ...(options.additionalMetadata ?? {}) };
injectContext(enhancedMetadata);
markTriggerSpanAsExported(enhancedMetadata);
injectSourceInfo(enhancedMetadata);

const enhancedOptions: PushEventOptions = {
Expand Down Expand Up @@ -406,6 +424,7 @@ export class HatchetInstrumentor extends InstrumentationBase<HatchetInstrumentat
...((input.additionalMetadata as Carrier) ?? {}),
};
injectContext(enhancedMetadata);
markTriggerSpanAsExported(enhancedMetadata);
injectSourceInfo(enhancedMetadata);
return {
...input,
Expand Down Expand Up @@ -500,6 +519,7 @@ export class HatchetInstrumentor extends InstrumentationBase<HatchetInstrumentat
(span: Span) => {
const enhancedMetadata: Carrier = { ...(options?.additionalMetadata ?? {}) };
injectContext(enhancedMetadata);
markTriggerSpanAsExported(enhancedMetadata);

const enhancedOptions = {
...options,
Expand Down Expand Up @@ -564,6 +584,7 @@ export class HatchetInstrumentor extends InstrumentationBase<HatchetInstrumentat
const enhancedWorkflowRuns = workflowRuns.map((run) => {
const enhancedMetadata: Carrier = { ...(run.options?.additionalMetadata ?? {}) };
injectContext(enhancedMetadata);
markTriggerSpanAsExported(enhancedMetadata);
return {
...run,
options: {
Expand Down Expand Up @@ -658,27 +679,61 @@ export class HatchetInstrumentor extends InstrumentationBase<HatchetInstrumentat
}
setHatchetSpanAttributes(hatchetAttrs);

const runStepSpan = (stepParentContext: OtelContext) =>
tracer.startActiveSpan(
spanName,
{
kind: SpanKind.CONSUMER,
attributes,
},
stepParentContext,
(span: Span) => {
return original
.call(this, action)
.then((taskError: Error | undefined) => {
if (taskError instanceof Error) {
span.recordException(taskError);
span.setStatus({ code: SpanStatusCode.ERROR, message: taskError.message });
} else {
span.setStatus({ code: SpanStatusCode.OK });
}
return taskError;
})
.finally(() => {
span.end();
});
}
);

if (!shouldCreateWorkflowRunSpan(additionalMetadata, parentContext)) {
return runStepSpan(parentContext);
}

return tracer.startActiveSpan(
spanName,
'hatchet.workflow_run',
{
kind: SpanKind.CONSUMER,
attributes,
},
parentContext,
(span: Span) => {
return original
.call(this, action)
(workflowSpan: Span) => {
const workflowContext = trace.setSpan(context.active(), workflowSpan);

return runStepSpan(workflowContext)
.then((taskError: Error | undefined) => {
if (taskError instanceof Error) {
span.recordException(taskError);
span.setStatus({ code: SpanStatusCode.ERROR, message: taskError.message });
workflowSpan.recordException(taskError);
workflowSpan.setStatus({
code: SpanStatusCode.ERROR,
message: taskError.message,
});
} else {
span.setStatus({ code: SpanStatusCode.OK });
workflowSpan.setStatus({ code: SpanStatusCode.OK });
}
return taskError;
})
.finally(() => {
span.end();
workflowSpan.end();
});
}
);
Expand Down Expand Up @@ -791,6 +846,7 @@ export class HatchetInstrumentor extends InstrumentationBase<HatchetInstrumentat
// Inject traceparent into additionalMetadata for context propagation
const enhancedMetadata: Carrier = { ...(input.additionalMetadata ?? {}) };
injectContext(enhancedMetadata);
markTriggerSpanAsExported(enhancedMetadata);

const enhancedInput = {
...input,
Expand Down