Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 270 additions & 0 deletions sdks/typescript/src/opentelemetry/instrumentor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
/**
* Tests for HatchetInstrumentor.useLinksInsteadOfParent option.
*
* Verifies that fire-and-forget child runs use OTel span links instead of
* parent-child relationships when the predicate returns true, while
* preserving the default parent-child behaviour when it returns false.
*/

/**
 * Creates a stand-in for an OTel span. Only `end`, `recordException`,
 * `setStatus` and `spanContext` are provided, each as a jest mock function.
 * Every `spanContext()` call returns a new object with the same fixed
 * traceId / spanId / traceFlags values.
 */
const makeMockSpan = () => {
  const fixedSpanContext = () => ({
    traceId: 'a'.repeat(32),
    spanId: 'b'.repeat(16),
    traceFlags: 1,
  });

  return {
    end: jest.fn(),
    recordException: jest.fn(),
    setStatus: jest.fn(),
    spanContext: jest.fn(fixedSpanContext),
  };
};

/**
 * Builds a mock tracer whose `startActiveSpan` records its arguments and then
 * immediately invokes the callback with the supplied mock span.
 *
 * @param span - Span object handed to every `startActiveSpan` callback;
 *   defaults to a fresh `makeMockSpan()`.
 * @returns An object exposing:
 *   - `_calls`: the argument list of every `startActiveSpan` invocation made
 *     through this mock, in call order (the same data is also available via
 *     `startActiveSpan.mock.calls`);
 *   - `_span`: the span passed to callbacks;
 *   - `startActiveSpan`: the jest mock function itself.
 */
const makeTracerMock = (span = makeMockSpan()) => {
  // Previously this array was declared but never written to, so `_calls`
  // was always empty. Record every invocation so it reflects reality.
  const calls: unknown[][] = [];

  return {
    _calls: calls,
    _span: span,
    startActiveSpan: jest.fn((...args: unknown[]) => {
      calls.push(args);
      // Whatever overload is used, the callback is the final argument.
      const fn = args[args.length - 1] as (span: unknown) => unknown;
      return fn(span);
    }),
  };
};

/**
 * Produces the placeholder action object used by the tests.
 *
 * @param actionId - Value stored in the `actionId` field
 *   (defaults to `'my-worker:my-task'`).
 * @returns A plain object with fixed tenant / run / task identifiers and an
 *   `additionalMetadata` JSON string carrying a fake `traceparent` entry.
 */
const makeAction = (actionId = 'my-worker:my-task') => {
  // Fake traceparent value: "00-" + 32 x 'a' + "-" + 16 x 'b' + "-01".
  // It is placed in the metadata so the instrumentor's context extraction has
  // something to pick up (that extraction code is outside this file).
  const traceparent = `00-${'a'.repeat(32)}-${'b'.repeat(16)}-01`;

  return {
    actionId,
    tenantId: 'tenant-1',
    workflowRunId: 'run-1',
    taskId: 'task-1',
    taskRunExternalId: 'ext-1',
    retryCount: 0,
    parentWorkflowRunId: undefined,
    childWorkflowIndex: undefined,
    childWorkflowKey: undefined,
    actionPayload: '{}',
    jobName: 'my-task',
    taskName: 'my-task',
    workflowId: 'wf-1',
    workflowVersionId: 'wfv-1',
    additionalMetadata: JSON.stringify({ traceparent }),
  };
};

// ---------------------------------------------------------------------------
// Shared setup: mock @opentelemetry/api and @opentelemetry/instrumentation so
// HatchetInstrumentor can be imported without a real OTel SDK being present.
// ---------------------------------------------------------------------------
Comment thread
purva-8 marked this conversation as resolved.

/**
 * Replaces `@opentelemetry/api` with a minimal hand-written mock so the
 * instrumentor can be loaded without a real OTel SDK.
 *
 * - `context.with` simply invokes the callback.
 * - `propagation.extract` returns a marker object (`{ _extracted: true }`).
 * - `trace.getSpanContext` always returns the same span context and
 *   `trace.isSpanContextValid` reports `true` unless a test overrides it.
 * - `ROOT_CONTEXT` is a sentinel object; the instrumentor's link-mode branch
 *   passes `otelApi.ROOT_CONTEXT` to `startActiveSpan`, so the mock must
 *   export it for that argument to be a defined, identifiable value.
 */
jest.mock('@opentelemetry/api', () => {
  // Span context returned for every `trace.getSpanContext` lookup.
  const validSpanCtx = {
    traceId: 'a'.repeat(32),
    spanId: 'b'.repeat(16),
    traceFlags: 1,
  };

  // Stand-in for the real `ROOT_CONTEXT` export; tests can compare against
  // it by identity.
  const ROOT_CONTEXT = { _root: true };

  // SpanKind.CONSUMER = 4, SpanStatusCode.OK = 1, SpanStatusCode.ERROR = 2
  return {
    ROOT_CONTEXT,
    SpanKind: { INTERNAL: 0, SERVER: 1, CLIENT: 2, PRODUCER: 3, CONSUMER: 4 },
    SpanStatusCode: { UNSET: 0, OK: 1, ERROR: 2 },
    context: {
      active: jest.fn(() => ({})),
      with: jest.fn((_ctx: unknown, fn: () => unknown) => fn()),
    },
    propagation: {
      extract: jest.fn(() => ({ _extracted: true })),
      inject: jest.fn(),
    },
    trace: {
      getSpanContext: jest.fn(() => validSpanCtx),
      isSpanContextValid: jest.fn(() => true),
    },
    diag: {
      debug: jest.fn(),
      info: jest.fn(),
      warn: jest.fn(),
      error: jest.fn(),
    },
  };
});

/**
 * Replaces `@opentelemetry/instrumentation` with a minimal mock.
 *
 * The mocked `InstrumentationBase` stores the config it is constructed with,
 * installs a `makeTracerMock()` tracer, and implements `_wrap` by directly
 * overwriting the target method with the wrapper's result. `_unwrap` does
 * nothing. The remaining exports are inert jest mock functions.
 */
jest.mock('@opentelemetry/instrumentation', () => {
  type MockConfig = Record<string, unknown>;
  type PatchTarget = Record<string, unknown>;

  class InstrumentationBase {
    protected tracer: ReturnType<typeof makeTracerMock>;
    protected config: MockConfig;

    constructor(_name: string, _version: string, config: MockConfig) {
      this.config = config;
      this.tracer = makeTracerMock();
    }

    getConfig() {
      return this.config;
    }

    setConfig(cfg: MockConfig) {
      this.config = cfg;
    }

    /** Swaps `proto[method]` for whatever `wrapper` returns for the current value. */
    protected _wrap(proto: PatchTarget, method: string, wrapper: (orig: unknown) => unknown) {
      const current = proto[method];
      proto[method] = wrapper(current);
    }

    protected _unwrap(proto: PatchTarget, method: string) {
      // Intentionally empty: tests never restore the patched method.
    }
  }

  // Factory shared by the two module-definition mocks; each call yields a new empty object.
  const emptyObject = () => ({});

  return {
    InstrumentationBase,
    InstrumentationNodeModuleDefinition: jest.fn(emptyObject),
    InstrumentationNodeModuleFile: jest.fn(emptyObject),
    isWrapped: jest.fn(() => false),
  };
});

// ---------------------------------------------------------------------------
// Import the instrumentor AFTER mocks are set up.
// ---------------------------------------------------------------------------
import { HatchetInstrumentor } from './instrumentor';

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/**
 * Creates a fake worker prototype for the instrumentor to patch.
 *
 * @param action - Accepted for call-site convenience; the body does not read it.
 * @returns An object with a fixed `workerId` and a `handleStartStepRun` jest
 *   mock that resolves to `undefined`.
 */
function buildWorkerProto(action = makeAction()) {
  return {
    workerId: 'worker-1',
    handleStartStepRun: jest.fn().mockResolvedValue(undefined),
  };
}

/**
 * Looks up the recorded `startActiveSpan` invocation whose span name starts
 * with `hatchet.start_step_run` and returns its argument list.
 *
 * The span name is the first argument and the options object the second; the
 * callback is always the last argument. Fails the current test (via `expect`)
 * when no such invocation was recorded.
 *
 * @param tracer - Mock tracer whose `startActiveSpan` is a jest mock function.
 * @returns The arguments of the first matching call.
 */
function getStartStepRunArgs(tracer: ReturnType<typeof makeTracerMock>) {
  const recordedCalls = (tracer.startActiveSpan as jest.Mock).mock.calls;
  const stepRunCall = recordedCalls.find(
    ([name]: [string]) => typeof name === 'string' && name.startsWith('hatchet.start_step_run')
  );
  expect(stepRunCall).toBeDefined();
  return stepRunCall as unknown[];
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

describe('HatchetInstrumentor.useLinksInsteadOfParent', () => {
it('uses parent-child semantics by default (no option provided)', async () => {
  const instrumentor = new HatchetInstrumentor({});
  // The mocked InstrumentationBase constructor already installs a tracer whose
  // startActiveSpan invokes the callback with a mock span, so no per-test
  // override is needed (the previous always-true ternary only re-created an
  // equivalent mock).
  const tracer = (instrumentor as unknown as { tracer: ReturnType<typeof makeTracerMock> }).tracer;

  const proto = buildWorkerProto();
  (instrumentor as unknown as { patchWorker: (e: unknown) => void }).patchWorker({
    InternalWorker: { prototype: proto },
  });

  await proto.handleStartStepRun(makeAction());

  const args = getStartStepRunArgs(tracer);
  // 4 args → (name, opts, parentContext, fn)
  expect(args).toHaveLength(4);
  // opts should NOT include links
  const opts = args[1] as Record<string, unknown>;
  expect(opts.links).toBeUndefined();
});

it('uses parent-child semantics when predicate returns false', async () => {
  // Predicate that opts every action out of link mode.
  const neverUseLinks = () => false;
  const instrumentor = new HatchetInstrumentor({ useLinksInsteadOfParent: neverUseLinks });
  const { tracer } = instrumentor as unknown as { tracer: ReturnType<typeof makeTracerMock> };

  // Patch a fake worker prototype, then drive the patched handler once.
  const workerProto = buildWorkerProto();
  (instrumentor as unknown as { patchWorker: (e: unknown) => void }).patchWorker({
    InternalWorker: { prototype: workerProto },
  });
  await workerProto.handleStartStepRun(makeAction());

  // Expect the (name, opts, parentContext, fn) form with no links in opts.
  const spanArgs = getStartStepRunArgs(tracer);
  expect(spanArgs).toHaveLength(4);
  const spanOptions = spanArgs[1] as Record<string, unknown>;
  expect(spanOptions.links).toBeUndefined();
});

it('uses span links and no parent context when predicate returns true', async () => {
  const otelApi = require('@opentelemetry/api');

  const instrumentor = new HatchetInstrumentor({
    useLinksInsteadOfParent: () => true,
  });
  const tracer = (instrumentor as unknown as { tracer: ReturnType<typeof makeTracerMock> }).tracer;

  const proto = buildWorkerProto();
  (instrumentor as unknown as { patchWorker: (e: unknown) => void }).patchWorker({
    InternalWorker: { prototype: proto },
  });

  await proto.handleStartStepRun(makeAction());

  const args = getStartStepRunArgs(tracer);
  // Link mode passes an explicit context too: (name, opts, ROOT_CONTEXT, fn).
  // The triggering span is referenced through opts.links instead of being
  // supplied as the parent context.
  expect(args).toHaveLength(4);
  expect(args[2]).toBe(otelApi.ROOT_CONTEXT);
  expect(typeof args[3]).toBe('function');

  const opts = args[1] as Record<string, unknown>;
  expect(Array.isArray(opts.links)).toBe(true);
  const links = opts.links as unknown[];
  expect(links).toHaveLength(1);
  // The single link carries the span context returned by the mocked
  // trace.getSpanContext.
  expect(links[0]).toEqual({
    context: { traceId: 'a'.repeat(32), spanId: 'b'.repeat(16), traceFlags: 1 },
  });
});

it('passes the actionId to the predicate', async () => {
  const customAction = makeAction('custom-worker:custom-task');
  // Spy predicate so the argument it receives can be inspected afterwards.
  const linkPredicate = jest.fn(() => false);
  const instrumentor = new HatchetInstrumentor({ useLinksInsteadOfParent: linkPredicate });

  const workerProto = buildWorkerProto(customAction);
  (instrumentor as unknown as { patchWorker: (e: unknown) => void }).patchWorker({
    InternalWorker: { prototype: workerProto },
  });
  await workerProto.handleStartStepRun(customAction);

  expect(linkPredicate).toHaveBeenCalledWith('custom-worker:custom-task');
});

it('falls back to no links when parent span context is invalid', async () => {
  const otelApi = require('@opentelemetry/api');
  // Report the next span context checked as invalid.
  jest.spyOn(otelApi.trace, 'isSpanContextValid').mockReturnValueOnce(false);

  const instrumentor = new HatchetInstrumentor({
    useLinksInsteadOfParent: () => true,
  });
  const tracer = (instrumentor as unknown as { tracer: ReturnType<typeof makeTracerMock> }).tracer;

  const proto = buildWorkerProto();
  (instrumentor as unknown as { patchWorker: (e: unknown) => void }).patchWorker({
    InternalWorker: { prototype: proto },
  });

  await proto.handleStartStepRun(makeAction());

  const args = getStartStepRunArgs(tracer);
  // Still the link-mode call shape (name, opts, ROOT_CONTEXT, fn), but with an
  // empty links array because the parent span context was rejected.
  expect(args).toHaveLength(4);
  expect(args[2]).toBe(otelApi.ROOT_CONTEXT);
  const opts = args[1] as Record<string, unknown>;
  expect(opts.links).toEqual([]);
});
});
74 changes: 53 additions & 21 deletions sdks/typescript/src/opentelemetry/instrumentor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* patching module prototypes to automatically instrument all instances.
*/

import type { Context as OtelContext, Span, Attributes } from '@opentelemetry/api';
import type { Context as OtelContext, Span, Attributes, SpanContext } from '@opentelemetry/api';

import type { InstrumentationConfig } from '@opentelemetry/instrumentation';

Expand Down Expand Up @@ -77,6 +77,22 @@ type HatchetInstrumentationConfig = OpenTelemetryConfig &
* Configuration for the BatchSpanProcessor that sends spans to the Hatchet collector.
*/
bspConfig?: HatchetBspConfig;

/**
* When provided, this predicate is called with the actionId of each incoming step run.
* Returning true causes the step's span to use an OTel span link back to the triggering
* span rather than making that span its parent. This is the correct choice for
* fire-and-forget patterns (e.g. runNoWait fan-outs) where the parent span should close
* as soon as the spawn loop returns, not when the slowest child finishes.
*
* Default: undefined (all spans use parent-child semantics, preserving prior behaviour).
*
* @example
* new HatchetInstrumentor({
* useLinksInsteadOfParent: (actionId) => actionId.startsWith('fan-out-worker:'),
* });
*/
useLinksInsteadOfParent?: (actionId: string) => boolean;
};
type Carrier = Record<string, string>;

Expand Down Expand Up @@ -658,29 +674,45 @@ export class HatchetInstrumentor extends InstrumentationBase<HatchetInstrumentat
}
setHatchetSpanAttributes(hatchetAttrs);

// Span callback shared by both startActiveSpan call sites below.
// Invokes the original handler with the action, sets the span status from the
// resolved value, and ends the span once the promise settles.
const runSpan = (span: Span) =>
original
.call(this, action)
.then((taskError: Error | undefined) => {
// The handler reports a task failure by resolving with an Error instance.
if (taskError instanceof Error) {
span.recordException(taskError);
span.setStatus({ code: SpanStatusCode.ERROR, message: taskError.message });
} else {
span.setStatus({ code: SpanStatusCode.OK });
}
// Pass the resolved value through unchanged to the caller.
return taskError;
})
// Runs on fulfilment and on rejection; a rejection skips the status handling
// above, so in that case the span is ended without a status being set here.
.finally(() => {
span.end();
});

const useLinks = getConfig().useLinksInsteadOfParent?.(action.actionId) ?? false;
if (useLinks) {
const parentSpanCtx: SpanContext | undefined =
otelApi.trace.getSpanContext(parentContext);
const links =
parentSpanCtx && otelApi.trace.isSpanContextValid(parentSpanCtx)
? [{ context: parentSpanCtx }]
: [];
// Using ROOT_CONTEXT prevents unrelated ambient spans on the worker
// from turning this link-only span into a child in the wrong trace.
return tracer.startActiveSpan(
spanName,
{ kind: SpanKind.CONSUMER, attributes, links },
Comment thread
purva-8 marked this conversation as resolved.
otelApi.ROOT_CONTEXT,
runSpan
);
}

return tracer.startActiveSpan(
spanName,
{
kind: SpanKind.CONSUMER,
attributes,
},
{ kind: SpanKind.CONSUMER, attributes },
parentContext,
(span: Span) => {
return original
.call(this, action)
.then((taskError: Error | undefined) => {
if (taskError instanceof Error) {
span.recordException(taskError);
span.setStatus({ code: SpanStatusCode.ERROR, message: taskError.message });
} else {
span.setStatus({ code: SpanStatusCode.OK });
}
return taskError;
})
.finally(() => {
span.end();
});
}
runSpan
);
};
}
Expand Down