Skip to content

Commit a1cca64

Browse files
mdnanocomclaudeandreiborza
authored
fix(node): Preserve CallbackManager handlers in LangChain instrumentation (#20849)
## Summary `augmentCallbackHandlers` in `packages/node/src/integrations/tracing/langchain/instrumentation.ts` previously fell into its `typeof handlers === 'object'` branch whenever `options.callbacks` was a single object and wrapped it into `[handlers, sentryHandler]`. When that object was a LangChain `CallbackManager`, downstream code treats the whole manager as one opaque handler — its inheritable children (notably LangGraph's `StreamMessagesHandler` installed by `streamMode: ['messages']`, and the LangSmith tracer) are never unpacked, so per-token streaming events and nested tracing silently disappear. This PR detects `CallbackManager` via duck-typing (avoids coupling to a specific `@langchain/core` resolution) and registers Sentry's handler as an inheritable child on a copy via `.addHandler(handler, true)`. The manager's existing children continue to receive `handleLLMNewToken` and friends. ## Repro A LangGraph compiled graph + `ChatOpenAI` (or any LangChain provider with `bindTools(...)`), driven through `@ai-sdk/langchain`'s `toUIMessageStream`: ```ts const stream = await graph.stream( { messages }, { streamMode: ['values', 'messages'] }, ); return createUIMessageStreamResponse({ stream: toUIMessageStream(stream) }); ``` - **Without this fix**: the SSE output collapses to a single aggregated `text-delta` at end of run. Probing inside `_streamResponseChunks` shows the per-token chunks fire `runManager.handleLLMNewToken` correctly, but the LLM-level run-manager's handlers list contains only `[CallbackManager, SentryCallbackHandler, langchain_tracer]` — `StreamMessagesHandler` is missing. - **With this fix**: every token is delivered as the model produces it (verified end-to-end against `gpt-5.5` + `useResponsesApi: true` + `bindTools` and against `gpt-4o-mini` with no tools). ## Tests Added `packages/node/test/integrations/tracing/langchain.test.ts` with 7 unit tests covering: - `undefined` callbacks → single-handler array - pre-existing array → append (idempotent) - `CallbackManager` → preserves children, copies rather than mutates, deduplicates - opaque non-manager object → returned unchanged ## Verify - [x] Added unit tests for the fixed behaviour - [x] `yarn build` succeeds - [x] `yarn test:unit` passes (381/381 in `packages/node`) - [ ] No related issue currently open — happy to open one if preferred. --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com> Co-authored-by: Andrei Borza <andrei.borza@sentry.io>
1 parent 88f871c commit a1cca64

7 files changed

Lines changed: 149 additions & 102 deletions

File tree

packages/core/src/shared-exports.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ export { instrumentGoogleGenAIClient } from './tracing/google-genai';
173173
export { GOOGLE_GENAI_INTEGRATION_NAME } from './tracing/google-genai/constants';
174174
export type { GoogleGenAIResponse } from './tracing/google-genai/types';
175175
export { createLangChainCallbackHandler, instrumentLangChainEmbeddings } from './tracing/langchain';
176+
export { _INTERNAL_mergeLangChainCallbackHandler } from './tracing/langchain/utils';
176177
export { LANGCHAIN_INTEGRATION_NAME } from './tracing/langchain/constants';
177178
export type { LangChainOptions, LangChainIntegration } from './tracing/langchain/types';
178179
export { instrumentStateGraphCompile, instrumentCreateReactAgent, instrumentLangGraph } from './tracing/langgraph';

packages/core/src/tracing/langchain/utils.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,3 +537,51 @@ export function extractToolDefinitions(extraParams?: Record<string, unknown>): s
537537
});
538538
return JSON.stringify(toolDefs);
539539
}
540+
541+
/** Duck-types a LangChain `CallbackManager` (avoids coupling to a specific `@langchain/core` resolution). */
542+
function isCallbackManager(value: unknown): value is {
543+
addHandler: (handler: unknown, inherit?: boolean) => void;
544+
copy: () => unknown;
545+
handlers?: unknown[];
546+
} {
547+
if (!value || typeof value !== 'object') {
548+
return false;
549+
}
550+
const candidate = value as { addHandler?: unknown; copy?: unknown };
551+
return typeof candidate.addHandler === 'function' && typeof candidate.copy === 'function';
552+
}
553+
554+
function isSentryHandler(handler: unknown): boolean {
555+
return typeof handler === 'object' && (handler as Record<string, unknown>)?.name === 'SentryCallbackHandler';
556+
}
557+
558+
function containsSentryHandler(handlers: unknown[]): boolean {
559+
return handlers.some(isSentryHandler);
560+
}
561+
562+
/**
563+
* Merge `sentryHandler` into a given set of LangChain callbacks or callback manager.
564+
* @internal Exported for cross-package instrumentation.
565+
*/
566+
export function _INTERNAL_mergeLangChainCallbackHandler(existing: unknown, sentryHandler: unknown): unknown {
567+
if (!existing) {
568+
return [sentryHandler];
569+
}
570+
571+
if (isCallbackManager(existing)) {
572+
if (containsSentryHandler(existing.handlers ?? [])) {
573+
return existing;
574+
}
575+
576+
const copied = existing.copy() as { addHandler: (handler: unknown, inherit?: boolean) => void };
577+
copied.addHandler(sentryHandler, true);
578+
return copied;
579+
}
580+
581+
const handlers = Array.isArray(existing) ? existing : [existing];
582+
if (containsSentryHandler(handlers)) {
583+
return existing;
584+
}
585+
586+
return [...handlers, sentryHandler];
587+
}

packages/core/src/tracing/langgraph/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ import {
3030
extractAgentNameFromParams,
3131
extractLLMFromParams,
3232
extractToolsFromCompiledGraph,
33-
mergeSentryCallback,
3433
setResponseAttributes,
3534
wrapToolsWithSpans,
3635
} from './utils';
36+
import { _INTERNAL_mergeLangChainCallbackHandler } from '../langchain/utils';
3737

3838
let _insideCreateReactAgent = false;
3939

@@ -179,7 +179,10 @@ function instrumentCompiledGraphInvoke(
179179
...(typeof graphName === 'string' ? { lc_agent_name: graphName } : {}),
180180
};
181181

182-
invokeConfig.callbacks = mergeSentryCallback(invokeConfig.callbacks, sentryCallbackHandler);
182+
invokeConfig.callbacks = _INTERNAL_mergeLangChainCallbackHandler(
183+
invokeConfig.callbacks,
184+
sentryCallbackHandler,
185+
);
183186
}
184187

185188
// Extract available tools from the graph instance

packages/core/src/tracing/langgraph/utils.ts

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -334,27 +334,3 @@ export function setResponseAttributes(span: Span, inputMessages: LangChainMessag
334334
span.setAttribute(GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, totalTokens);
335335
}
336336
}
337-
338-
/** Merge `sentryHandler` into a langchain `callbacks` value (`BaseCallbackHandler[]` or `BaseCallbackManager`). */
339-
export function mergeSentryCallback(existing: unknown, sentryHandler: unknown): unknown {
340-
if (!existing) {
341-
return [sentryHandler];
342-
}
343-
344-
if (Array.isArray(existing)) {
345-
if (existing.includes(sentryHandler)) {
346-
return existing;
347-
}
348-
return [...existing, sentryHandler];
349-
}
350-
351-
const manager = existing as { addHandler?: (h: unknown) => void; handlers?: unknown[] };
352-
if (typeof manager.addHandler === 'function') {
353-
const alreadyAdded = Array.isArray(manager.handlers) && manager.handlers.includes(sentryHandler);
354-
if (!alreadyAdded) {
355-
manager.addHandler(sentryHandler);
356-
}
357-
}
358-
359-
return existing;
360-
}

packages/core/test/lib/tracing/langchain-utils.test.ts

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
import { describe, expect, it } from 'vitest';
1+
import { describe, expect, it, vi } from 'vitest';
22
import { GEN_AI_INPUT_MESSAGES_ATTRIBUTE } from '../../../src/tracing/ai/gen-ai-attributes';
33
import type { LangChainMessage } from '../../../src/tracing/langchain/types';
4-
import { extractChatModelRequestAttributes, normalizeLangChainMessages } from '../../../src/tracing/langchain/utils';
4+
import {
5+
_INTERNAL_mergeLangChainCallbackHandler,
6+
extractChatModelRequestAttributes,
7+
normalizeLangChainMessages,
8+
} from '../../../src/tracing/langchain/utils';
59

610
describe('normalizeLangChainMessages', () => {
711
it('normalizes messages with _getType()', () => {
@@ -246,3 +250,88 @@ describe('extractChatModelRequestAttributes with multimodal content', () => {
246250
expect(inputMessages).toContain('What is in this image?');
247251
});
248252
});
253+
254+
describe('_INTERNAL_mergeLangChainCallbackHandler', () => {
255+
const sentryHandler = { name: 'SentryCallbackHandler' };
256+
257+
function makeFakeCallbackManager(existingHandlers: unknown[] = []) {
258+
const manager = {
259+
handlers: [...existingHandlers],
260+
inheritableHandlers: [...existingHandlers],
261+
addHandler: vi.fn(function (this: any, handler: unknown, inherit?: boolean) {
262+
this.handlers.push(handler);
263+
if (inherit !== false) {
264+
this.inheritableHandlers.push(handler);
265+
}
266+
}),
267+
copy: vi.fn(function (this: any) {
268+
return makeFakeCallbackManager(this.handlers);
269+
}),
270+
};
271+
return manager;
272+
}
273+
274+
it('returns a fresh array when no existing callbacks are present', () => {
275+
expect(_INTERNAL_mergeLangChainCallbackHandler(undefined, sentryHandler)).toStrictEqual([sentryHandler]);
276+
expect(_INTERNAL_mergeLangChainCallbackHandler(null, sentryHandler)).toStrictEqual([sentryHandler]);
277+
});
278+
279+
it('appends to an existing callbacks array', () => {
280+
const userA = { _user: 'A' };
281+
const userB = { _user: 'B' };
282+
expect(_INTERNAL_mergeLangChainCallbackHandler([userA, userB], sentryHandler)).toStrictEqual([
283+
userA,
284+
userB,
285+
sentryHandler,
286+
]);
287+
});
288+
289+
it('does not duplicate when the sentry handler is already in the array', () => {
290+
const userA = { _user: 'A' };
291+
const existing = [userA, sentryHandler];
292+
expect(_INTERNAL_mergeLangChainCallbackHandler(existing, sentryHandler)).toBe(existing);
293+
});
294+
295+
it('preserves inheritable handlers when callbacks is a CallbackManager', () => {
296+
// Reproduces the LangGraph `streamMode: ['messages']` setup: a
297+
// CallbackManager carrying a StreamMessagesHandler is passed via
298+
// options.callbacks. Wrapping it as `[manager, sentryHandler]` would
299+
// drop the manager's inheritable children — instead we register
300+
// Sentry on a copy and keep the existing handler chain intact.
301+
const streamMessagesHandler = { name: 'StreamMessagesHandler', lc_prefer_streaming: true };
302+
const manager = makeFakeCallbackManager([streamMessagesHandler]);
303+
const result = _INTERNAL_mergeLangChainCallbackHandler(manager, sentryHandler) as { handlers: unknown[] };
304+
expect(Array.isArray(result)).toBe(false);
305+
expect(result.handlers).toEqual([streamMessagesHandler, sentryHandler]);
306+
});
307+
308+
it('copies the manager and registers Sentry as an inheritable handler', () => {
309+
const manager = makeFakeCallbackManager([]);
310+
const result = _INTERNAL_mergeLangChainCallbackHandler(manager, sentryHandler) as {
311+
addHandler: ReturnType<typeof vi.fn>;
312+
inheritableHandlers: unknown[];
313+
};
314+
expect(manager.copy).toHaveBeenCalledTimes(1);
315+
expect(manager.handlers).toEqual([]);
316+
expect(result.addHandler).toHaveBeenCalledWith(sentryHandler, true);
317+
expect(result.inheritableHandlers).toEqual([sentryHandler]);
318+
});
319+
320+
it('returns the manager unchanged without copying when it already contains the handler', () => {
321+
const manager = makeFakeCallbackManager([sentryHandler]);
322+
const result = _INTERNAL_mergeLangChainCallbackHandler(manager, sentryHandler);
323+
expect(result).toBe(manager);
324+
expect(manager.copy).not.toHaveBeenCalled();
325+
expect(manager.addHandler).not.toHaveBeenCalled();
326+
});
327+
328+
it('wraps a lone callback object into an array with the sentry handler', () => {
329+
const opaque = { name: 'NotAManager' };
330+
expect(_INTERNAL_mergeLangChainCallbackHandler(opaque, sentryHandler)).toStrictEqual([opaque, sentryHandler]);
331+
});
332+
333+
it('returns unchanged when the lone callback object is already a sentry handler', () => {
334+
const existing = { name: 'SentryCallbackHandler' };
335+
expect(_INTERNAL_mergeLangChainCallbackHandler(existing, sentryHandler)).toBe(existing);
336+
});
337+
});
Lines changed: 2 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
1-
import { describe, expect, it, vi } from 'vitest';
2-
import {
3-
extractAgentNameFromParams,
4-
extractLLMFromParams,
5-
mergeSentryCallback,
6-
} from '../../../src/tracing/langgraph/utils';
1+
import { describe, expect, it } from 'vitest';
2+
import { extractAgentNameFromParams, extractLLMFromParams } from '../../../src/tracing/langgraph/utils';
73

84
describe('extractLLMFromParams', () => {
95
it('returns null for empty or invalid args', () => {
@@ -44,40 +40,3 @@ describe('extractAgentNameFromParams', () => {
4440
expect(extractAgentNameFromParams([{ name: 'my_agent' }])).toBe('my_agent');
4541
});
4642
});
47-
48-
describe('mergeSentryCallback', () => {
49-
const sentryHandler = { _sentry: true };
50-
51-
it('returns a fresh array when no existing callbacks are present', () => {
52-
expect(mergeSentryCallback(undefined, sentryHandler)).toStrictEqual([sentryHandler]);
53-
expect(mergeSentryCallback(null, sentryHandler)).toStrictEqual([sentryHandler]);
54-
});
55-
56-
it('appends to an existing callbacks array', () => {
57-
const userA = { _user: 'A' };
58-
const userB = { _user: 'B' };
59-
expect(mergeSentryCallback([userA, userB], sentryHandler)).toStrictEqual([userA, userB, sentryHandler]);
60-
});
61-
62-
it('does not duplicate when the sentry handler is already in the array', () => {
63-
const userA = { _user: 'A' };
64-
const existing = [userA, sentryHandler];
65-
expect(mergeSentryCallback(existing, sentryHandler)).toBe(existing);
66-
});
67-
68-
it('calls addHandler on a CallbackManager-like object', () => {
69-
const addHandler = vi.fn();
70-
const manager = { addHandler, handlers: [] as unknown[] };
71-
const result = mergeSentryCallback(manager, sentryHandler);
72-
expect(result).toBe(manager);
73-
expect(addHandler).toHaveBeenCalledWith(sentryHandler);
74-
expect(addHandler).toHaveBeenCalledTimes(1);
75-
});
76-
77-
it('does not re-add when the manager already has the sentry handler', () => {
78-
const addHandler = vi.fn();
79-
const manager = { addHandler, handlers: [sentryHandler] };
80-
mergeSentryCallback(manager, sentryHandler);
81-
expect(addHandler).not.toHaveBeenCalled();
82-
});
83-
});

packages/node/src/integrations/tracing/langchain/instrumentation.ts

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
} from '@opentelemetry/instrumentation';
88
import type { LangChainOptions } from '@sentry/core';
99
import {
10+
_INTERNAL_mergeLangChainCallbackHandler,
1011
_INTERNAL_skipAiProviderWrapping,
1112
ANTHROPIC_AI_INTEGRATION_NAME,
1213
createLangChainCallbackHandler,
@@ -27,34 +28,6 @@ interface PatchedLangChainExports {
2728
[key: string]: unknown;
2829
}
2930

30-
/**
31-
* Augments a callback handler list with Sentry's handler if not already present
32-
*/
33-
function augmentCallbackHandlers(handlers: unknown, sentryHandler: unknown): unknown {
34-
// Handle null/undefined - return array with just our handler
35-
if (!handlers) {
36-
return [sentryHandler];
37-
}
38-
39-
// If handlers is already an array
40-
if (Array.isArray(handlers)) {
41-
// Check if our handler is already in the list
42-
if (handlers.includes(sentryHandler)) {
43-
return handlers;
44-
}
45-
// Add our handler to the list
46-
return [...handlers, sentryHandler];
47-
}
48-
49-
// If it's a single handler object, convert to array
50-
if (typeof handlers === 'object') {
51-
return [handlers, sentryHandler];
52-
}
53-
54-
// Unknown type - return original
55-
return handlers;
56-
}
57-
5831
/**
5932
* Wraps Runnable methods (invoke, stream, batch) to inject Sentry callbacks at request time
6033
* Uses a Proxy to intercept method calls and augment the options.callbacks
@@ -82,9 +55,7 @@ function wrapRunnableMethod(
8255
}
8356

8457
// Inject our callback handler into options.callbacks (request time callbacks)
85-
const existingCallbacks = options.callbacks;
86-
const augmentedCallbacks = augmentCallbackHandlers(existingCallbacks, sentryHandler);
87-
options.callbacks = augmentedCallbacks;
58+
options.callbacks = _INTERNAL_mergeLangChainCallbackHandler(options.callbacks, sentryHandler);
8859

8960
// Call original method with augmented options
9061
return Reflect.apply(target, thisArg, args);

0 commit comments

Comments
 (0)