From 15d02f407c58e1ad338342f3aea41334c993bfce Mon Sep 17 00:00:00 2001 From: purva Date: Sat, 2 May 2026 19:31:05 +0530 Subject: [PATCH 1/2] feat(otel): add useLinksInsteadOfParent option to HatchetInstrumentor Fire-and-forget child runs (runNoWait fan-outs) should not extend their parent trace's duration until the slowest child finishes. OTel span links are the correct primitive: the child starts a new root trace with a navigable link back to the spawning span. Add a useLinksInsteadOfParent predicate to HatchetInstrumentationConfig. When it returns true for a given actionId, handleStartStepRun starts the step span without a parent context and instead attaches the extracted SpanContext as a link. Default is undefined, preserving all existing parent-child behaviour exactly. Closes #3641 Co-Authored-By: Claude Sonnet 4.6 --- .../src/opentelemetry/instrumentor.test.ts | 270 ++++++++++++++++++ .../src/opentelemetry/instrumentor.ts | 71 +++-- 2 files changed, 320 insertions(+), 21 deletions(-) create mode 100644 sdks/typescript/src/opentelemetry/instrumentor.test.ts diff --git a/sdks/typescript/src/opentelemetry/instrumentor.test.ts b/sdks/typescript/src/opentelemetry/instrumentor.test.ts new file mode 100644 index 0000000000..f073d5e317 --- /dev/null +++ b/sdks/typescript/src/opentelemetry/instrumentor.test.ts @@ -0,0 +1,270 @@ +/** + * Tests for HatchetInstrumentor.useLinksInsteadOfParent option. + * + * Verifies that fire-and-forget child runs use OTel span links instead of + * parent-child relationships when the predicate returns true, while + * preserving the default parent-child behaviour when it returns false. + */ + +// Minimal OTel span mock +const makeMockSpan = () => ({ + end: jest.fn(), + recordException: jest.fn(), + setStatus: jest.fn(), + spanContext: jest.fn(() => ({ + traceId: 'a'.repeat(32), + spanId: 'b'.repeat(16), + traceFlags: 1, + })), +}); + +// Build a mock tracer whose startActiveSpan captures its arguments. +const makeTracerMock = (span = makeMockSpan()) => { + const calls: IArguments[] = []; + const tracer = { + _calls: calls, + _span: span, + startActiveSpan: jest.fn(function (...args: unknown[]) { + // The last argument is always the callback (fn). + const fn = args[args.length - 1] as (span: unknown) => unknown; + return fn(span); + }), + }; + return tracer; +}; + +// Dummy action used in all tests. +const makeAction = (actionId = 'my-worker:my-task') => ({ + actionId, + tenantId: 'tenant-1', + workflowRunId: 'run-1', + taskId: 'task-1', + taskRunExternalId: 'ext-1', + retryCount: 0, + parentWorkflowRunId: undefined, + childWorkflowIndex: undefined, + childWorkflowKey: undefined, + actionPayload: '{}', + jobName: 'my-task', + taskName: 'my-task', + workflowId: 'wf-1', + workflowVersionId: 'wfv-1', + // Encode a fake traceparent in the metadata so extractContext finds a valid context. + additionalMetadata: JSON.stringify({ + traceparent: '00-' + 'a'.repeat(32) + '-' + 'b'.repeat(16) + '-01', + }), +}); + +// --------------------------------------------------------------------------- +// Shared setup: mock @opentelemetry/api and @opentelemetry/instrumentation so +// HatchetInstrumentor can be imported without a real OTel SDK being present. +// --------------------------------------------------------------------------- + +jest.mock('@opentelemetry/api', () => { + const validSpanCtx = { + traceId: 'a'.repeat(32), + spanId: 'b'.repeat(16), + traceFlags: 1, + }; + + // SpanKind.CONSUMER = 4, SpanStatusCode.OK = 1, SpanStatusCode.ERROR = 2 + return { + SpanKind: { INTERNAL: 0, SERVER: 1, CLIENT: 2, PRODUCER: 3, CONSUMER: 4 }, + SpanStatusCode: { UNSET: 0, OK: 1, ERROR: 2 }, + context: { + active: jest.fn(() => ({})), + with: jest.fn((_ctx: unknown, fn: () => unknown) => fn()), + }, + propagation: { + extract: jest.fn(() => ({ _extracted: true })), + inject: jest.fn(), + }, + trace: { + getSpanContext: jest.fn(() => validSpanCtx), + isSpanContextValid: jest.fn(() => true), + }, + diag: { + debug: jest.fn(), + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + }, + }; +}); + +jest.mock('@opentelemetry/instrumentation', () => { + class InstrumentationBase { + protected tracer: ReturnType; + protected config: Record; + constructor(_name: string, _version: string, config: Record) { + this.config = config; + this.tracer = makeTracerMock(); + } + getConfig() { + return this.config; + } + setConfig(cfg: Record) { + this.config = cfg; + } + protected _wrap( + proto: Record, + method: string, + wrapper: (orig: unknown) => unknown + ) { + proto[method] = wrapper(proto[method]); + } + protected _unwrap(proto: Record, method: string) { + // no-op in tests + } + } + + return { + InstrumentationBase, + InstrumentationNodeModuleDefinition: jest.fn(() => ({})), + InstrumentationNodeModuleFile: jest.fn(() => ({})), + isWrapped: jest.fn(() => false), + }; +}); + +// --------------------------------------------------------------------------- +// Import the instrumentor AFTER mocks are set up. +// --------------------------------------------------------------------------- +import { HatchetInstrumentor } from './instrumentor'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function buildWorkerProto(action = makeAction()) { + const proto = { + workerId: 'worker-1', + handleStartStepRun: jest.fn().mockResolvedValue(undefined), + }; + return proto; +} + +/** + * Returns the `startActiveSpan` call arguments for the `hatchet.start_step_run` span. + * startActiveSpan is overloaded: + * (name, opts, context, fn) → parent-child mode (fn is 4th arg, index 3) + * (name, opts, fn) → link mode (fn is 3rd arg, index 2) + */ +function getStartStepRunArgs(tracer: ReturnType) { + const call = (tracer.startActiveSpan as jest.Mock).mock.calls.find( + ([name]: [string]) => typeof name === 'string' && name.startsWith('hatchet.start_step_run') + ); + expect(call).toBeDefined(); + return call as unknown[]; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('HatchetInstrumentor.useLinksInsteadOfParent', () => { + it('uses parent-child semantics by default (no option provided)', async () => { + const instrumentor = new HatchetInstrumentor({}); + const tracer = (instrumentor as unknown as { tracer: ReturnType }).tracer; + tracer.startActiveSpan = makeTracerMock()._span + ? jest.fn((...args: unknown[]) => { + const fn = args[args.length - 1] as (span: unknown) => unknown; + return fn(makeMockSpan()); + }) + : tracer.startActiveSpan; + + const proto = buildWorkerProto(); + (instrumentor as unknown as { patchWorker: (e: unknown) => void }).patchWorker({ + InternalWorker: { prototype: proto }, + }); + + await proto.handleStartStepRun(makeAction()); + + const args = getStartStepRunArgs(tracer); + // 4 args → (name, opts, parentContext, fn) + expect(args).toHaveLength(4); + // opts should NOT include links + const opts = args[1] as Record; + expect(opts.links).toBeUndefined(); + }); + + it('uses parent-child semantics when predicate returns false', async () => { + const instrumentor = new HatchetInstrumentor({ + useLinksInsteadOfParent: () => false, + }); + const tracer = (instrumentor as unknown as { tracer: ReturnType }).tracer; + + const proto = buildWorkerProto(); + (instrumentor as unknown as { patchWorker: (e: unknown) => void }).patchWorker({ + InternalWorker: { prototype: proto }, + }); + + await proto.handleStartStepRun(makeAction()); + + const args = getStartStepRunArgs(tracer); + expect(args).toHaveLength(4); + const opts = args[1] as Record; + expect(opts.links).toBeUndefined(); + }); + + it('uses span links and no parent context when predicate returns true', async () => { + const instrumentor = new HatchetInstrumentor({ + useLinksInsteadOfParent: () => true, + }); + const tracer = (instrumentor as unknown as { tracer: ReturnType }).tracer; + + const proto = buildWorkerProto(); + (instrumentor as unknown as { patchWorker: (e: unknown) => void }).patchWorker({ + InternalWorker: { prototype: proto }, + }); + + await proto.handleStartStepRun(makeAction()); + + const args = getStartStepRunArgs(tracer); + // 3 args → (name, opts, fn) — no parent context + expect(args).toHaveLength(3); + const opts = args[1] as Record; + expect(Array.isArray(opts.links)).toBe(true); + expect((opts.links as unknown[]).length).toBe(1); + }); + + it('passes the actionId to the predicate', async () => { + const predicate = jest.fn(() => false); + const action = makeAction('custom-worker:custom-task'); + + const instrumentor = new HatchetInstrumentor({ + useLinksInsteadOfParent: predicate, + }); + + const proto = buildWorkerProto(action); + (instrumentor as unknown as { patchWorker: (e: unknown) => void }).patchWorker({ + InternalWorker: { prototype: proto }, + }); + + await proto.handleStartStepRun(action); + + expect(predicate).toHaveBeenCalledWith('custom-worker:custom-task'); + }); + + it('falls back to no links when parent span context is invalid', async () => { + const otelApi = require('@opentelemetry/api'); + jest.spyOn(otelApi.trace, 'isSpanContextValid').mockReturnValueOnce(false); + + const instrumentor = new HatchetInstrumentor({ + useLinksInsteadOfParent: () => true, + }); + const tracer = (instrumentor as unknown as { tracer: ReturnType }).tracer; + + const proto = buildWorkerProto(); + (instrumentor as unknown as { patchWorker: (e: unknown) => void }).patchWorker({ + InternalWorker: { prototype: proto }, + }); + + await proto.handleStartStepRun(makeAction()); + + const args = getStartStepRunArgs(tracer); + // Still 3 args (link mode path), but links array is empty + expect(args).toHaveLength(3); + const opts = args[1] as Record; + expect((opts.links as unknown[]).length).toBe(0); + }); +}); diff --git a/sdks/typescript/src/opentelemetry/instrumentor.ts b/sdks/typescript/src/opentelemetry/instrumentor.ts index 36137a02bd..7c9a4945c7 100644 --- a/sdks/typescript/src/opentelemetry/instrumentor.ts +++ b/sdks/typescript/src/opentelemetry/instrumentor.ts @@ -8,7 +8,7 @@ * patching module prototypes to automatically instrument all instances. */ -import type { Context as OtelContext, Span, Attributes } from '@opentelemetry/api'; +import type { Context as OtelContext, Span, Attributes, SpanContext } from '@opentelemetry/api'; import type { InstrumentationConfig } from '@opentelemetry/instrumentation'; @@ -77,6 +77,22 @@ type HatchetInstrumentationConfig = OpenTelemetryConfig & * Configuration for the BatchSpanProcessor that sends spans to the Hatchet collector. */ bspConfig?: HatchetBspConfig; + + /** + * When provided, this predicate is called with the actionId of each incoming step run. + * Returning true causes the step's span to use an OTel span link back to the triggering + * span rather than making that span its parent. This is the correct choice for + * fire-and-forget patterns (e.g. runNoWait fan-outs) where the parent span should close + * as soon as the spawn loop returns, not when the slowest child finishes. + * + * Default: undefined (all spans use parent-child semantics, preserving prior behaviour). + * + * @example + * new HatchetInstrumentor({ + * useLinksInsteadOfParent: (actionId) => actionId.startsWith('fan-out-worker:'), + * }); + */ + useLinksInsteadOfParent?: (actionId: string) => boolean; }; type Carrier = Record; @@ -658,29 +674,42 @@ export class HatchetInstrumentor extends InstrumentationBase + original + .call(this, action) + .then((taskError: Error | undefined) => { + if (taskError instanceof Error) { + span.recordException(taskError); + span.setStatus({ code: SpanStatusCode.ERROR, message: taskError.message }); + } else { + span.setStatus({ code: SpanStatusCode.OK }); + } + return taskError; + }) + .finally(() => { + span.end(); + }); + + const useLinks = getConfig().useLinksInsteadOfParent?.(action.actionId) ?? false; + if (useLinks) { + const parentSpanCtx: SpanContext | undefined = + otelApi.trace.getSpanContext(parentContext); + const links = + parentSpanCtx && otelApi.trace.isSpanContextValid(parentSpanCtx) + ? [{ context: parentSpanCtx }] + : []; + return tracer.startActiveSpan( + spanName, + { kind: SpanKind.CONSUMER, attributes, links }, + runSpan + ); + } + return tracer.startActiveSpan( spanName, - { - kind: SpanKind.CONSUMER, - attributes, - }, + { kind: SpanKind.CONSUMER, attributes }, parentContext, - (span: Span) => { - return original - .call(this, action) - .then((taskError: Error | undefined) => { - if (taskError instanceof Error) { - span.recordException(taskError); - span.setStatus({ code: SpanStatusCode.ERROR, message: taskError.message }); - } else { - span.setStatus({ code: SpanStatusCode.OK }); - } - return taskError; - }) - .finally(() => { - span.end(); - }); - } + runSpan ); }; } From b9721f38c6684132243e22c5840d0149d28e4911 Mon Sep 17 00:00:00 2001 From: Purva Kandalgaonkar <136103488+purva-8@users.noreply.github.com> Date: Sat, 2 May 2026 19:38:04 +0530 Subject: [PATCH 2/2] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- sdks/typescript/src/opentelemetry/instrumentor.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/typescript/src/opentelemetry/instrumentor.ts b/sdks/typescript/src/opentelemetry/instrumentor.ts index 7c9a4945c7..467a98db83 100644 --- a/sdks/typescript/src/opentelemetry/instrumentor.ts +++ b/sdks/typescript/src/opentelemetry/instrumentor.ts @@ -698,9 +698,12 @@ export class HatchetInstrumentor extends InstrumentationBase