From 6faa8672f3e72b8e380d276bc3eb56d053bb20c5 Mon Sep 17 00:00:00 2001 From: Valentin Cocaud Date: Mon, 10 Feb 2025 11:49:53 +0100 Subject: [PATCH] [otel] rely on context for parenting spans correctly --- ...h_plugin-opentelemetry-532-dependencies.md | 7 + e2e/opentelemetry/opentelemetry.e2e.ts | 119 ++-- package.json | 1 + packages/plugins/opentelemetry/package.json | 2 + .../opentelemetry/src/contextManager.ts | 42 ++ .../plugins/opentelemetry/src/plugin-utils.ts | 104 +++ packages/plugins/opentelemetry/src/plugin.ts | 608 +++++++++++------- packages/plugins/opentelemetry/src/spans.ts | 326 ++++++---- packages/plugins/opentelemetry/src/utils.ts | 48 ++ 9 files changed, 872 insertions(+), 385 deletions(-) create mode 100644 .changeset/@graphql-mesh_plugin-opentelemetry-532-dependencies.md create mode 100644 packages/plugins/opentelemetry/src/contextManager.ts create mode 100644 packages/plugins/opentelemetry/src/plugin-utils.ts create mode 100644 packages/plugins/opentelemetry/src/utils.ts diff --git a/.changeset/@graphql-mesh_plugin-opentelemetry-532-dependencies.md b/.changeset/@graphql-mesh_plugin-opentelemetry-532-dependencies.md new file mode 100644 index 000000000..ff6e3308c --- /dev/null +++ b/.changeset/@graphql-mesh_plugin-opentelemetry-532-dependencies.md @@ -0,0 +1,7 @@ +--- +'@graphql-mesh/plugin-opentelemetry': patch +--- + +dependencies updates: + +- Added dependency [`@opentelemetry/context-async-hooks@^1.30.1` ↗︎](https://www.npmjs.com/package/@opentelemetry/context-async-hooks/v/1.30.1) (to `dependencies`) diff --git a/e2e/opentelemetry/opentelemetry.e2e.ts b/e2e/opentelemetry/opentelemetry.e2e.ts index b16a1b2ee..498056c79 100644 --- a/e2e/opentelemetry/opentelemetry.e2e.ts +++ b/e2e/opentelemetry/opentelemetry.e2e.ts @@ -24,15 +24,18 @@ beforeAll(async () => { type JaegerTracesApiResponse = { data: Array<{ traceID: string; - spans: Array<{ - traceID: string; - spanID: string; - operationName: string; - tags: Array<{ key: string; value: string; type: string }>; - }>; + spans: JaegerTraceSpan[]; }>; }; +type JaegerTraceSpan = { + traceID: string; + spanID: string; + operationName: string; + tags: Array<{ key: string; value: string; type: string }>; + references: Array<{ refType: string; spanID: string; traceID: string }>; +}; + describe('OpenTelemetry', () => { (['grpc', 'http'] as const).forEach((OTLP_EXPORTER_TYPE) => { describe(`exporter > ${OTLP_EXPORTER_TYPE}`, () => { @@ -77,6 +80,9 @@ describe('OpenTelemetry', () => { await checkFn(res); return; } catch (e) { + if (signal.aborted) { + throw err; + } err = e; } } @@ -559,40 +565,52 @@ describe('OpenTelemetry', () => { expect(relevantTraces.length).toBe(1); const relevantTrace = relevantTraces[0]; expect(relevantTrace).toBeDefined(); - expect(relevantTrace?.spans.length).toBe(11); + expect(relevantTrace!.spans.length).toBe(18); - expect(relevantTrace?.spans).toContainEqual( - expect.objectContaining({ operationName: 'POST /graphql' }), - ); - expect(relevantTrace?.spans).toContainEqual( - expect.objectContaining({ operationName: 'graphql.parse' }), - ); - expect(relevantTrace?.spans).toContainEqual( - expect.objectContaining({ operationName: 'graphql.validate' }), - ); - expect(relevantTrace?.spans).toContainEqual( - expect.objectContaining({ operationName: 'graphql.execute' }), + const spanTree = buildSpanTree(relevantTrace!.spans, 'POST /graphql'); + expect(spanTree).toBeDefined(); + + const expectedHttpChildren = [ + 'graphql.parse', + 'graphql.validate', + 'graphql.execute', + ]; + expect(spanTree!.children).toHaveLength(3); + for (const operationName of expectedHttpChildren) { + expect(spanTree?.children).toContainEqual( + expect.objectContaining({ + span: expect.objectContaining({ operationName }), + }), + ); + } + + const executeSpan = spanTree?.children.find( + ({ span }) => span.operationName === 'graphql.execute', ); - expect( - relevantTrace?.spans.filter( - (r) => r.operationName === 'subgraph.execute (accounts)', - ).length, - ).toBe(2); - expect( - relevantTrace?.spans.filter( - (r) => r.operationName === 'subgraph.execute (products)', - ).length, - ).toBe(2); - expect( - relevantTrace?.spans.filter( - (r) => r.operationName === 'subgraph.execute (inventory)', - ).length, - ).toBe(1); - expect( - relevantTrace?.spans.filter( - (r) => r.operationName === 'subgraph.execute (reviews)', - ).length, - ).toBe(2); + + const expectedExecuteChildren = [ + ['subgraph.execute (accounts)', 2], + ['subgraph.execute (products)', 2], + ['subgraph.execute (inventory)', 1], + ['subgraph.execute (reviews)', 2], + ] as const; + + for (const [operationName, count] of expectedExecuteChildren) { + const matchingChildren = executeSpan!.children.filter( + ({ span }) => span.operationName === operationName, + ); + expect(matchingChildren).toHaveLength(count); + for (const child of matchingChildren) { + expect(child.children).toHaveLength(1); + expect(child.children).toContainEqual( + expect.objectContaining({ + span: expect.objectContaining({ + operationName: 'http.fetch', + }), + }), + ); + } + } }); }); @@ -1279,3 +1297,28 @@ describe('OpenTelemetry', () => { }); }); }); + +type TraceTreeNode = { + span: JaegerTraceSpan; + children: TraceTreeNode[]; +}; +function buildSpanTree( + spans: JaegerTraceSpan[], + rootName: string, +): TraceTreeNode | undefined { + function buildNode(root: JaegerTraceSpan): TraceTreeNode { + return { + span: root, + children: spans + .filter((span) => + span.references.find( + (ref) => ref.refType === 'CHILD_OF' && ref.spanID === root.spanID, + ), + ) + .map(buildNode), + }; + } + + const root = spans.find((span) => span.operationName === rootName); + return root && buildNode(root); +} diff --git a/package.json b/package.json index 11e7b2f1b..6b2ba5f22 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "vitest": "^3.0.1" }, "resolutions": { + "@envelop/core": "5.1.0-alpha-20250206123608-cd323a08c43c066eb34cba698d61b4f059ab5ee5", "@graphql-tools/delegate": "workspace:^", "@opentelemetry/exporter-trace-otlp-http": "patch:@opentelemetry/exporter-trace-otlp-http@npm%3A0.56.0#~/.yarn/patches/@opentelemetry-exporter-trace-otlp-http-npm-0.56.0-dddd282e41.patch", "@opentelemetry/otlp-exporter-base": "patch:@opentelemetry/otlp-exporter-base@npm%3A0.56.0#~/.yarn/patches/@opentelemetry-otlp-exporter-base-npm-0.56.0-ba3dc5f5c5.patch", diff --git a/packages/plugins/opentelemetry/package.json b/packages/plugins/opentelemetry/package.json index 55f79f9db..890c65d74 100644 --- a/packages/plugins/opentelemetry/package.json +++ b/packages/plugins/opentelemetry/package.json @@ -50,6 +50,7 @@ "@graphql-mesh/utils": "^0.103.6", "@graphql-tools/utils": "^10.7.0", "@opentelemetry/api": "^1.9.0", + "@opentelemetry/context-async-hooks": "^1.30.1", "@opentelemetry/exporter-trace-otlp-grpc": "^0.57.0", "@opentelemetry/exporter-trace-otlp-http": "^0.57.0", "@opentelemetry/exporter-zipkin": "^1.29.0", @@ -62,6 +63,7 @@ "tslib": "^2.8.1" }, "devDependencies": { + "@whatwg-node/server": "^0.9.65", "graphql": "^16.9.0", "graphql-yoga": "^5.10.11", "pkgroll": "2.8.2" diff --git a/packages/plugins/opentelemetry/src/contextManager.ts b/packages/plugins/opentelemetry/src/contextManager.ts new file mode 100644 index 000000000..f4bae5c2b --- /dev/null +++ b/packages/plugins/opentelemetry/src/contextManager.ts @@ -0,0 +1,42 @@ +import { trace, type Context } from '@opentelemetry/api'; + +type Node = { + ctx: Context; + previous?: Node; +}; + +export class OtelContextStack { + #root: Node; + #current: Node; + + constructor(root: Context) { + this.#root = { ctx: root }; + this.#current = this.#root; + } + + get current(): Context { + return this.#current.ctx; + } + + get root(): Context { + return this.#root.ctx; + } + + push = (ctx: Context) => { + this.#current = { ctx, previous: this.#current }; + }; + + pop = () => { + this.#current = this.#current.previous ?? this.#root; + }; + + toString() { + let node: Node | undefined = this.#current; + const names = []; + while (node != undefined) { + names.push((trace.getSpan(node.ctx) as unknown as { name: string }).name); + node = node.previous; + } + return names.join(' -> '); + } +} diff --git a/packages/plugins/opentelemetry/src/plugin-utils.ts b/packages/plugins/opentelemetry/src/plugin-utils.ts new file mode 100644 index 000000000..8a7a0bd12 --- /dev/null +++ b/packages/plugins/opentelemetry/src/plugin-utils.ts @@ -0,0 +1,104 @@ +import type { ExecutionRequest } from '@graphql-tools/utils'; + +export function withState< + P, + GraphqlState = object, + HttpState = object, + SubExecState = object, +>(plugin: WithState): P { + const states: { + forRequest?: WeakMap>; + forOperation?: WeakMap>; + forSubgraphExecution?: WeakMap>; + } = {}; + + function getProp(scope: keyof typeof states, key: any): PropertyDescriptor { + return { + get() { + if (!states[scope]) states[scope] = new WeakMap(); + let value = states[scope].get(key as any); + if (!value) states[scope].set(key, (value = {})); + return value; + }, + enumerable: true, + }; + } + + const pluginWithState: Record unknown> = {}; + for (const [hookName, hook] of Object.entries(plugin) as any) { + pluginWithState[hookName] = (payload) => + hook({ + ...payload, + get state() { + let { executionRequest, context, request } = payload; + + const state = {}; + const defineState = (scope: keyof typeof states, key: any) => + Object.defineProperty(state, scope, getProp(scope, key)); + + if (executionRequest) { + defineState('forSubgraphExecution', executionRequest); + if (executionRequest.context) context = executionRequest.context; + } + if (context) { + defineState('forOperation', context); + if (context.request) request = context.request; + } + if (request) { + defineState('forRequest', request); + } + return state; + }, + }); + } + + return pluginWithState as P; +} + +export type HttpState = { + forRequest: Partial; +}; + +export type GraphQLState = { + forOperation: Partial; +}; + +export type GatewayState = { + forSubgraphExecution: Partial; +}; + +export function getMostSpecificState( + state: Partial & GraphQLState & GatewayState> = {}, +): Partial | undefined { + const { forOperation, forRequest, forSubgraphExecution } = state; + return forSubgraphExecution ?? forOperation ?? forRequest; +} + +// Brace yourself! TS Wizardry is coming! + +type PayloadWithState = T extends { + executionRequest: any; +} + ? T & { + state: Partial & GraphQLState> & + GatewayState; + } + : T extends { + executionRequest?: any; + } + ? T & { + state: Partial< + HttpState & GraphQLState & GatewayState + >; + } + : T extends { context: any } + ? T & { state: HttpState & GraphQLState } + : T extends { request: any } + ? T & { state: HttpState } + : T; + +type WithState = { + [K in keyof P]: P[K] extends ((payload: infer T) => infer R) | undefined + ? (payload: PayloadWithState) => R | undefined + : P[K]; +}; diff --git a/packages/plugins/opentelemetry/src/plugin.ts b/packages/plugins/opentelemetry/src/plugin.ts index cce3fe667..099dcc536 100644 --- a/packages/plugins/opentelemetry/src/plugin.ts +++ b/packages/plugins/opentelemetry/src/plugin.ts @@ -1,52 +1,75 @@ import { + type OnContextBuildingEventPayload, type OnExecuteEventPayload, type OnParseEventPayload, type OnValidateEventPayload, } from '@envelop/types'; import { type GatewayPlugin } from '@graphql-hive/gateway-runtime'; import type { OnSubgraphExecutePayload } from '@graphql-mesh/fusion-runtime'; -import type { Logger, OnFetchHookPayload } from '@graphql-mesh/types'; +import type { + Logger, + OnFetchHookDone, + OnFetchHookPayload, +} from '@graphql-mesh/types'; import { getHeadersObj } from '@graphql-mesh/utils'; -import { - fakePromise, - isAsyncIterable, - MaybePromise, -} from '@graphql-tools/utils'; +import { createDeferred, isPromise, MaybePromise } from '@graphql-tools/utils'; import { context, diag, DiagLogLevel, propagation, + ROOT_CONTEXT, trace, type Context, + type ContextManager, type TextMapGetter, type Tracer, } from '@opentelemetry/api'; +import '@opentelemetry/api'; import { Resource } from '@opentelemetry/resources'; import { type SpanProcessor } from '@opentelemetry/sdk-trace-base'; import { WebTracerProvider } from '@opentelemetry/sdk-trace-web'; import { DisposableSymbols } from '@whatwg-node/disposablestack'; import { type OnRequestEventPayload } from '@whatwg-node/server'; +import type { OnParamsEventPayload, YogaInitialContext } from 'graphql-yoga'; import { ATTR_SERVICE_VERSION, SEMRESATTRS_SERVICE_NAME } from './attributes'; +import { OtelContextStack } from './contextManager'; +import { + getMostSpecificState, + withState, + type GatewayState, + type GraphQLState, + type HttpState, +} from './plugin-utils'; import { - completeHttpSpan, - createGraphQLExecuteSpan, - createGraphQLParseSpan, - createGraphQLValidateSpan, + addExecutionArgsToGraphqlSpan, + createGraphqlContextBuildingSpan, createHttpSpan, - createSubgraphExecuteFetchSpan, - createUpstreamHttpFetchSpan, + endHttpSpan, + startGraphQLExecuteSpan, + startGraphQLParseSpan, + startGraphQLSpan, + startGraphQLValidateSpan, + startSubgraphExecuteFetchSpan, + startUpstreamHttpFetchSpan, } from './spans'; +import { mapMaybePromise } from './utils'; -type PrimitiveOrEvaluated = - | TExpectedResult - | ((input: TInput) => TExpectedResult); +type BooleanOrPredicate = + | boolean + | ((input: TInput) => boolean); interface OpenTelemetryGatewayPluginOptionsWithoutInit { /** * Whether to initialize the OpenTelemetry SDK (default: true). */ initializeNodeSDK: false; + /** + * Whether to rely on OTEL context api for span correlation (default: true). + * If false, the plugin will rely on request context for span parenting, + * which implies that any user defined context and spans will be ignored. + */ + contextManager?: false; } interface OpenTelemetryGatewayPluginOptionsWithInit { @@ -67,6 +90,13 @@ interface OpenTelemetryGatewayPluginOptionsWithInit { * Does not apply when `initializeNodeSDK` is `false`. */ serviceName?: string; + /** + * The context manager used to keep track of the OTEL context. + * By default, it uses AsyncLocalStorage based manager, which is compatible only in Node. + * + * Does not apply when `initializeNodeSDK` is `false`. + */ + contextManager?: ContextManager | false; } type OpenTelemetryGatewayPluginOptionsInit = @@ -107,36 +137,37 @@ export type OpenTelemetryGatewayPluginOptions = * * Disabling the HTTP span will also disable all other child spans. */ - http?: PrimitiveOrEvaluated>; + http?: BooleanOrPredicate>; + /** + * Enable/disable GraphQL operation spans (default: true). + */ + graphql?: BooleanOrPredicate>; + /** + * Enable/disable GraphQL context building phase (default: true). + */ + graphqlContextBuilding?: BooleanOrPredicate< + OnContextBuildingEventPayload + >; /** * Enable/disable GraphQL parse spans (default: true). */ - graphqlParse?: PrimitiveOrEvaluated>; + graphqlParse?: BooleanOrPredicate>; /** * Enable/disable GraphQL validate spans (default: true). */ - graphqlValidate?: PrimitiveOrEvaluated< - boolean, - OnValidateEventPayload - >; + graphqlValidate?: BooleanOrPredicate>; /** * Enable/disable GraphQL execute spans (default: true). */ - graphqlExecute?: PrimitiveOrEvaluated< - boolean, - OnExecuteEventPayload - >; + graphqlExecute?: BooleanOrPredicate>; /** * Enable/disable subgraph execute spans (default: true). */ - subgraphExecute?: PrimitiveOrEvaluated< - boolean, - OnSubgraphExecutePayload - >; + subgraphExecute?: BooleanOrPredicate>; /** * Enable/disable upstream HTTP fetch calls spans (default: true). */ - upstreamFetch?: PrimitiveOrEvaluated>; + upstreamFetch?: BooleanOrPredicate>; }; }; @@ -149,262 +180,374 @@ const HeadersTextMapGetter: TextMapGetter = { }, }; -export function useOpenTelemetry( - options: OpenTelemetryGatewayPluginOptions & { logger: Logger }, -): GatewayPlugin<{ +export type OpenTelemetryContextExtension = { opentelemetry: { tracer: Tracer; activeContext: () => Context; }; -}> { +}; + +type OtelState = { + otel: OtelContextStack; +}; + +type State = Partial< + HttpState & GraphQLState & GatewayState +>; + +export function useOpenTelemetry( + options: OpenTelemetryGatewayPluginOptions & { logger: Logger }, +): GatewayPlugin { const inheritContext = options.inheritContext ?? true; const propagateContext = options.propagateContext ?? true; + const useContextManager = options.contextManager !== false; - const requestContextMapping = new WeakMap(); let tracer: Tracer; let spanProcessors: SpanProcessor[]; - let serviceName: string = 'Gateway'; let provider: WebTracerProvider; - let preparation$: Promise | undefined; - - return { - onYogaInit({ yoga }) { - preparation$ = fakePromise(undefined).then(async () => { - if ( - !( - 'initializeNodeSDK' in options && - options.initializeNodeSDK === false - ) - ) { - if (options.serviceName) { - serviceName = options.serviceName; - } - if (options.exporters) { - spanProcessors = await Promise.all(options.exporters); - } - const webProvider = new WebTracerProvider({ - resource: new Resource({ - [SEMRESATTRS_SERVICE_NAME]: serviceName, - [ATTR_SERVICE_VERSION]: yoga.version, - }), - spanProcessors, - }); - webProvider.register(); - provider = webProvider; - } - const pluginLogger = options.logger.child('OpenTelemetry'); - diag.setLogger( - { - error: (message, ...args) => pluginLogger.error(message, ...args), - warn: (message, ...args) => pluginLogger.warn(message, ...args), - info: (message, ...args) => pluginLogger.info(message, ...args), - debug: (message, ...args) => pluginLogger.debug(message, ...args), - verbose: (message, ...args) => pluginLogger.debug(message, ...args), - }, - DiagLogLevel.VERBOSE, - ); + let preparation$: Promise | undefined | void; + const { promise: asyncAttributes, resolve: resolveAsyncAttributes } = + createDeferred<{ [ATTR_SERVICE_VERSION]: string }>(); + + function isParentEnabled(state: State): boolean { + const parentState = getMostSpecificState(state); + return !parentState || !!parentState.otel; + } + + function getContext(state?: State): Context { + return useContextManager + ? context.active() + : (getMostSpecificState(state)?.otel?.current ?? ROOT_CONTEXT); + } + + const pluginLogger = options.logger.child('OpenTelemetry'); + diag.setLogger( + { + error: (message, ...args) => pluginLogger.error(message, ...args), + warn: (message, ...args) => pluginLogger.warn(message, ...args), + info: (message, ...args) => pluginLogger.info(message, ...args), + debug: (message, ...args) => pluginLogger.debug(message, ...args), + verbose: (message, ...args) => pluginLogger.debug(message, ...args), + }, + DiagLogLevel.VERBOSE, + ); + + if ( + !('initializeNodeSDK' in options && options.initializeNodeSDK === false) + ) { + const exporters$ = containsOnlyValues(options.exporters) + ? options.exporters + : Promise.all(options.exporters); + + const resource = new Resource( + { [SEMRESATTRS_SERVICE_NAME]: options.serviceName || 'Gateway' }, + asyncAttributes, + ); + + const contextManager$ = + options.contextManager != undefined + ? options.contextManager + : import('@opentelemetry/context-async-hooks').then( + (module) => new module.AsyncLocalStorageContextManager(), + ); + + preparation$ = mapMaybePromise(exporters$, (exporters) => { + spanProcessors = exporters; + provider = new WebTracerProvider({ resource, spanProcessors }); + return mapMaybePromise(contextManager$, (contextManager) => { + provider.register({ + contextManager: contextManager === false ? undefined : contextManager, + }); tracer = options.tracer || trace.getTracer('gateway'); preparation$ = undefined; }); + }); + } else { + tracer = options.tracer || trace.getTracer('gateway'); + } + + return withState< + GatewayPlugin, + OtelState, + OtelState, + OtelState + >({ + onYogaInit({ yoga }) { + resolveAsyncAttributes({ [ATTR_SERVICE_VERSION]: yoga.version }); }, - onContextBuilding({ extendContext, context }) { - extendContext({ - opentelemetry: { - tracer, - activeContext: () => - requestContextMapping.get(context.request) ?? context['active'](), + onRequest(onRequestPayload) { + if (!shouldTrace(options.spans?.http, onRequestPayload)) { + return; + } + + return mapMaybePromise( + preparation$, + () => { + const { requestHandler, request, setRequestHandler, state } = + onRequestPayload; + + const { ctx } = createHttpSpan({ + ctx: inheritContext + ? propagation.extract( + context.active(), + request.headers, + HeadersTextMapGetter, + ) + : context.active(), + request, + tracer, + url: onRequestPayload.url, + }); + + if (useContextManager) { + setRequestHandler(context.bind(ctx, requestHandler)); + } + + state.forRequest.otel = new OtelContextStack(ctx); }, - }); + (error) => { + pluginLogger.error('Failed to start http span', { error }); + }, + ); }, - onRequest(onRequestPayload) { - const shouldTraceHttp = - typeof options.spans?.http === 'function' - ? options.spans.http(onRequestPayload) - : (options.spans?.http ?? true); + onResponse({ response, state }) { + try { + state.forRequest.otel && + endHttpSpan(state.forRequest.otel.root, response); + } catch (error) { + pluginLogger.error('Failed to end http span', { error }); + } + }, + onParams(onParamsPayload) { + const { setParamsHandler, paramsHandler, params, state } = + onParamsPayload; + const { forOperation, ...parentState } = state; - if (!shouldTraceHttp) { - return preparation$; + if (!isParentEnabled(parentState)) { + return; } - const { request, url } = onRequestPayload; - const otelContext = inheritContext - ? propagation.extract( - context.active(), - request.headers, - HeadersTextMapGetter, - ) - : context.active(); - - const httpSpan = createHttpSpan({ - request, - url, - tracer, - otelContext, - }); + if (shouldTrace(options.spans?.graphql, onParamsPayload)) { + const { ctx, done } = startGraphQLSpan({ + tracer, + params, + ctx: getContext(parentState), + }); - requestContextMapping.set(request, trace.setSpan(otelContext, httpSpan)); + forOperation.otel! = new OtelContextStack(ctx); - return preparation$; - }, - onValidate(onValidatePayload) { - const shouldTraceValidate = - typeof options.spans?.graphqlValidate === 'function' - ? options.spans.graphqlValidate(onValidatePayload) - : (options.spans?.graphqlValidate ?? true); + const handler = useContextManager + ? context.bind(ctx, paramsHandler) + : paramsHandler; + + setParamsHandler((...args) => { + const result$ = handler(...args); + done(result$); + return result$; + }); + } - const { context } = onValidatePayload; - const otelContext = requestContextMapping.get(context.request); + const gqlCtx = onParamsPayload.context as YogaInitialContext & + OpenTelemetryContextExtension; + gqlCtx.opentelemetry = { + tracer, + activeContext: (): Context => getContext(state), + }; + }, + onContextBuilding(payload) { + const { state } = payload; + if (!isParentEnabled(state)) { + return; + } - if (shouldTraceValidate && otelContext) { - const { done } = createGraphQLValidateSpan({ - otelContext, + if (shouldTrace(options.spans?.graphqlContextBuilding, payload)) { + const { ctx, done } = createGraphqlContextBuildingSpan({ + ctx: getContext(state), tracer, - query: context.params.query, - operationName: context.params.operationName, }); - return ({ result }) => done(result); + state.forOperation.otel!.push(ctx); + + return () => { + done(); + state.forOperation.otel!.pop(); + }; } - return void 0; + + return; }, onParse(onParsePayload) { - const shouldTracePrase = - typeof options.spans?.graphqlParse === 'function' - ? options.spans.graphqlParse(onParsePayload) - : (options.spans?.graphqlParse ?? true); - - const { context } = onParsePayload; - const otelContext = requestContextMapping.get(context.request); + const { state } = onParsePayload; + if (!isParentEnabled(state)) { + return; + } - if (shouldTracePrase && otelContext) { - const { done } = createGraphQLParseSpan({ - otelContext, + if (shouldTrace(options.spans?.graphqlParse, onParsePayload)) { + const { context: gqlCtx, setParseFn, parseFn } = onParsePayload; + const { ctx, done } = startGraphQLParseSpan({ + ctx: getContext(state), tracer, - query: context.params.query, - operationName: context.params.operationName, + operationName: gqlCtx.params.operationName, + query: gqlCtx.params.query?.trim(), }); + state.forOperation.otel!.push(ctx); + if (useContextManager) { + setParseFn(context.bind(ctx, parseFn)); + } + return ({ result }) => { + done(result); + state.forOperation.otel!.pop(); + }; + } - return ({ result }) => done(result); + return; + }, + onValidate(onValidatePayload) { + const { context: gqlCtx, state } = onValidatePayload; + if (!isParentEnabled(state)) { + return; } - return void 0; + + if (shouldTrace(options.spans?.graphqlValidate, onValidatePayload)) { + const { setValidationFn, validateFn } = onValidatePayload; + const { ctx, done } = startGraphQLValidateSpan({ + ctx: getContext(state), + tracer, + query: gqlCtx.params.query?.trim(), + operationName: gqlCtx.params.operationName, + }); + state.forOperation.otel!.push(ctx); + if (useContextManager) { + setValidationFn(context.bind(ctx, validateFn)); + } + return ({ result }) => { + done(result); + state.forOperation.otel!.pop(); + }; + } + + return; }, onExecute(onExecuteArgs) { - const shouldTraceExecute = - typeof options.spans?.graphqlExecute === 'function' - ? options.spans.graphqlExecute(onExecuteArgs) - : (options.spans?.graphqlExecute ?? true); - - const { args } = onExecuteArgs; - const otelContext = requestContextMapping.get(args.contextValue.request); - - if (shouldTraceExecute && otelContext) { - const { done } = createGraphQLExecuteSpan({ - args, - otelContext, + const { state } = onExecuteArgs; + + if (!isParentEnabled(state)) { + return; + } + + addExecutionArgsToGraphqlSpan( + state.forOperation.otel!.root, + onExecuteArgs.args, + ); + + if (shouldTrace(options.spans?.graphqlExecute, onExecuteArgs)) { + const { setExecuteFn, executeFn } = onExecuteArgs; + const { ctx, done } = startGraphQLExecuteSpan({ + ctx: getContext(state), + args: onExecuteArgs.args, tracer, }); - + state.forOperation.otel!.push(ctx); + if (useContextManager) { + setExecuteFn(context.bind(ctx, executeFn)); + } return { - onExecuteDone: ({ result }) => { - if (!isAsyncIterable(result)) { - done(result); - } + onExecuteDone(payload) { + done(payload.result); + state.forOperation.otel!.pop(); }, }; } - return void 0; + + return; }, onSubgraphExecute(onSubgraphPayload) { - const shouldTraceSubgraphExecute = - typeof options.spans?.subgraphExecute === 'function' - ? options.spans.subgraphExecute(onSubgraphPayload) - : (options.spans?.subgraphExecute ?? true); - - const otelContext = onSubgraphPayload.executionRequest.context?.request - ? requestContextMapping.get( - onSubgraphPayload.executionRequest.context.request, - ) - : undefined; - - if (shouldTraceSubgraphExecute && otelContext) { - const { subgraphName, executionRequest } = onSubgraphPayload; - const { done } = createSubgraphExecuteFetchSpan({ - otelContext, + const { state } = onSubgraphPayload; + const { forSubgraphExecution, ...parentState } = state; + + if (!isParentEnabled(parentState)) { + return; + } + + // Here it is possible that otelCtx is not present, because this hook can be triggered by + // internal introspection queries, which are not linked to any client request, but should + // still be traced and monitored. + + if (shouldTrace(options.spans?.subgraphExecute, onSubgraphPayload)) { + const { subgraphName, executionRequest, executor, setExecutor } = + onSubgraphPayload; + const { ctx, done } = startSubgraphExecuteFetchSpan({ + ctx: getContext(parentState), tracer, executionRequest, subgraphName, }); - - return done; + forSubgraphExecution.otel = new OtelContextStack(ctx); + if (useContextManager) { + setExecutor(context.bind(ctx, executor)); + } + return ({ result }) => { + done(result); + forSubgraphExecution.otel!.pop(); + }; } - return void 0; + + return; }, onFetch(onFetchPayload) { - const shouldTraceFetch = - typeof options.spans?.upstreamFetch === 'function' - ? options.spans.upstreamFetch(onFetchPayload) - : (options.spans?.upstreamFetch ?? true); - - const { - context, - options: fetchOptions, - url, - setOptions, - executionRequest, - } = onFetchPayload; - - const otelContext = requestContextMapping.get(context.request); - if (shouldTraceFetch && otelContext) { - if (propagateContext) { - const reqHeaders = getHeadersObj(fetchOptions.headers || {}); - propagation.inject(otelContext, reqHeaders); - - setOptions({ - ...fetchOptions, - headers: reqHeaders, - }); - } + const { state, setFetchFn } = onFetchPayload; + if (!isParentEnabled(state)) { + return; + } + + // Here it is possible that otelCtx is not present, because this hook can be triggered by + // internal introspection queries, which are not linked to any client request, but should + // still be traced and monitored. + + let { fetchFn } = onFetchPayload; + const originalFetch = fetchFn; + let onDone: OnFetchHookDone | undefined = void 0; + + if (propagateContext) { + fetchFn = (url, options, ...args) => { + const reqHeaders = getHeadersObj(options?.headers || {}); + propagation.inject(getContext(state), reqHeaders); + return originalFetch( + url, + { ...options, headers: reqHeaders }, + ...args, + ); + }; + } + + if (shouldTrace(options.spans?.upstreamFetch, onFetchPayload)) { + const { url, options, executionRequest } = onFetchPayload; - const { done } = createUpstreamHttpFetchSpan({ - otelContext, + const { ctx, done } = startUpstreamHttpFetchSpan({ + ctx: getContext(state), tracer, url, - fetchOptions, + fetchOptions: options, executionRequest, }); - - return (fetchDonePayload) => done(fetchDonePayload.response); - } - return void 0; - }, - onResponse({ request, response }) { - const otelContext = requestContextMapping.get(request); - if (!otelContext) { - return; + state.forSubgraphExecution?.otel?.push(ctx); + if (useContextManager) { + fetchFn = context.bind(ctx, fetchFn); + } + onDone = ({ response }) => { + done(response); + state.forSubgraphExecution?.otel?.pop(); + }; } - const rootSpan = trace.getSpan(otelContext); + setFetchFn(fetchFn); - if (rootSpan) { - completeHttpSpan(rootSpan, response); - } - - requestContextMapping.delete(request); + return onDone; }, async [DisposableSymbols.asyncDispose]() { - if (spanProcessors) { - await Promise.all( - spanProcessors.map((processor) => processor.forceFlush()), - ); - } await provider?.forceFlush?.(); - - if (spanProcessors) { - spanProcessors.forEach((processor) => processor.shutdown()); - } - await provider?.shutdown?.(); diag.disable(); @@ -412,5 +555,24 @@ export function useOpenTelemetry( context.disable(); propagation.disable(); }, - }; + }); +} + +function containsOnlyValues( + maybePromises: MaybePromise[], +): maybePromises is T[] { + return !maybePromises.some(isPromise); +} + +function shouldTrace( + value: BooleanOrPredicate | null | undefined, + args: Args, +): boolean { + if (value == null) { + return true; + } + if (typeof value === 'function') { + return value(args); + } + return value; } diff --git a/packages/plugins/opentelemetry/src/spans.ts b/packages/plugins/opentelemetry/src/spans.ts index 11fc2afad..975d0ed77 100644 --- a/packages/plugins/opentelemetry/src/spans.ts +++ b/packages/plugins/opentelemetry/src/spans.ts @@ -1,17 +1,21 @@ import { defaultPrintFn } from '@graphql-mesh/transport-common'; import { getOperationASTFromDocument, + isAsyncIterable, + mapMaybePromise, type ExecutionRequest, type ExecutionResult, + type MaybePromise, } from '@graphql-tools/utils'; import { SpanKind, SpanStatusCode, + trace, type Context, - type Span, type Tracer, } from '@opentelemetry/api'; import type { ExecutionArgs } from 'graphql'; +import type { GraphQLParams } from 'graphql-yoga'; import { SEMATTRS_GATEWAY_UPSTREAM_SUBGRAPH_NAME, SEMATTRS_GRAPHQL_DOCUMENT, @@ -30,55 +34,144 @@ import { } from './attributes'; export function createHttpSpan(input: { + ctx: Context; tracer: Tracer; request: Request; url: URL; - otelContext: Context; -}): Span { - const { url, request, tracer, otelContext } = input; - const path = url.pathname; - const userAgent = request.headers.get('user-agent'); - const ips = request.headers.get('x-forwarded-for'); - const method = request.method || 'GET'; - const host = url.host || request.headers.get('host'); - const hostname = url.hostname || host || 'localhost'; - const rootSpanName = `${method} ${path}`; +}): { ctx: Context } { + const { url, request, tracer } = input; - return tracer.startSpan( - rootSpanName, + const span = tracer.startSpan( + `${request.method || 'GET'} ${url.pathname}`, { attributes: { - [SEMATTRS_HTTP_METHOD]: method, + [SEMATTRS_HTTP_METHOD]: request.method || 'GET', [SEMATTRS_HTTP_URL]: request.url, - [SEMATTRS_HTTP_ROUTE]: path, + [SEMATTRS_HTTP_ROUTE]: url.pathname, [SEMATTRS_HTTP_SCHEME]: url.protocol, - [SEMATTRS_NET_HOST_NAME]: hostname, - [SEMATTRS_HTTP_HOST]: host || undefined, - [SEMATTRS_HTTP_CLIENT_IP]: ips?.split(',')[0], - [SEMATTRS_HTTP_USER_AGENT]: userAgent || undefined, + [SEMATTRS_NET_HOST_NAME]: + url.hostname || + url.host || + request.headers.get('host') || + 'localhost', + [SEMATTRS_HTTP_HOST]: + url.host || request.headers.get('host') || undefined, + [SEMATTRS_HTTP_CLIENT_IP]: request.headers + .get('x-forwarded-for') + ?.split(',')[0], + [SEMATTRS_HTTP_USER_AGENT]: + request.headers.get('user-agent') || undefined, }, kind: SpanKind.SERVER, }, - otelContext, + input.ctx, ); + + return { + ctx: trace.setSpan(input.ctx, span), + }; +} + +export function endHttpSpan(ctx: Context, response: Response) { + const span = trace.getSpan(ctx); + if (span) { + span.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status); + span.setStatus({ + code: response.ok ? SpanStatusCode.OK : SpanStatusCode.ERROR, + message: response.ok ? undefined : response.statusText, + }); + span.end(); + } +} + +export function startGraphQLSpan(input: { + ctx: Context; + tracer: Tracer; + params: GraphQLParams; +}): { + ctx: Context; + done: ( + result: MaybePromise>, + ) => void; +} { + const operationName = input.params.operationName ?? 'unknown'; + const span = input.tracer.startSpan( + `graphql.operation ${operationName}`, + { + attributes: { + [SEMATTRS_GRAPHQL_DOCUMENT]: input.params.query, + [SEMATTRS_GRAPHQL_OPERATION_NAME]: operationName, + }, + kind: SpanKind.INTERNAL, + }, + input.ctx, + ); + + return { + ctx: trace.setSpan(input.ctx, span), + done: (result$) => { + return mapMaybePromise( + result$, + () => { + //FIXME: handle async iterable results. + span.end(); + }, + (err) => { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err.message, + }); + span.recordException(err); + span.end(); + throw err; + }, + ); + }, + }; +} + +export function addExecutionArgsToGraphqlSpan( + ctx: Context, + args: ExecutionArgs, +) { + const span = trace.getSpan(ctx); + if (span) { + const operation = getOperationASTFromDocument( + args.document, + args.operationName || undefined, + ); + const operationName = operation.name?.value ?? 'Anonymous'; + const document = defaultPrintFn(args.document); + span.setAttribute(SEMATTRS_GRAPHQL_OPERATION_TYPE, operation.operation); + span.setAttribute(SEMATTRS_GRAPHQL_OPERATION_NAME, operationName); + span.setAttribute(SEMATTRS_GRAPHQL_DOCUMENT, document); + span.updateName(`graphql.operation ${operationName}`); + } } -export function completeHttpSpan(span: Span, response: Response) { - span.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status); - span.setStatus({ - code: response.ok ? SpanStatusCode.OK : SpanStatusCode.ERROR, - message: response.ok ? undefined : response.statusText, - }); - span.end(); +export function createGraphqlContextBuildingSpan(input: { + ctx: Context; + tracer: Tracer; +}): { ctx: Context; done: () => void } { + const span = input.tracer.startSpan( + 'graphql.context', + { kind: SpanKind.INTERNAL }, + input.ctx, + ); + + return { + ctx: trace.setSpan(input.ctx, span), + done: () => span.end(), + }; } -export function createGraphQLParseSpan(input: { - otelContext: Context; +export function startGraphQLParseSpan(input: { + ctx: Context; tracer: Tracer; query?: string; operationName?: string; -}) { - const parseSpan = input.tracer.startSpan( +}): { ctx: Context; done: (result: unknown) => void } { + const span = input.tracer.startSpan( 'graphql.parse', { attributes: { @@ -87,33 +180,32 @@ export function createGraphQLParseSpan(input: { }, kind: SpanKind.INTERNAL, }, - input.otelContext, + input.ctx, ); return { - parseSpan, - done: (result: any | Error | null) => { + ctx: trace.setSpan(input.ctx, span), + done: (result) => { if (result instanceof Error) { - parseSpan.setAttribute(SEMATTRS_GRAPHQL_ERROR_COUNT, 1); - parseSpan.recordException(result); - parseSpan.setStatus({ + span.setAttribute(SEMATTRS_GRAPHQL_ERROR_COUNT, 1); + span.recordException(result); + span.setStatus({ code: SpanStatusCode.ERROR, message: result.message, }); } - - parseSpan.end(); + span.end(); }, }; } -export function createGraphQLValidateSpan(input: { - otelContext: Context; +export function startGraphQLValidateSpan(input: { + ctx: Context; tracer: Tracer; query?: string; operationName?: string; -}) { - const validateSpan = input.tracer.startSpan( +}): { ctx: Context; done: (result: any[] | readonly Error[]) => void } { + const span = input.tracer.startSpan( 'graphql.validate', { attributes: { @@ -122,89 +214,94 @@ export function createGraphQLValidateSpan(input: { }, kind: SpanKind.INTERNAL, }, - input.otelContext, + input.ctx, ); - return { - validateSpan, - done: (result: any[] | readonly Error[]) => { + ctx: trace.setSpan(input.ctx, span), + done: (result) => { if (result instanceof Error) { - validateSpan.setStatus({ + span.setStatus({ code: SpanStatusCode.ERROR, message: result.message, }); } else if (Array.isArray(result) && result.length > 0) { - validateSpan.setAttribute(SEMATTRS_GRAPHQL_ERROR_COUNT, result.length); - validateSpan.setStatus({ + span.setAttribute(SEMATTRS_GRAPHQL_ERROR_COUNT, result.length); + span.setStatus({ code: SpanStatusCode.ERROR, message: result.map((e) => e.message).join(', '), }); for (const error in result) { - validateSpan.recordException(error); + span.recordException(error); } } - - validateSpan.end(); + span.end(); }, }; } -export function createGraphQLExecuteSpan(input: { +export function startGraphQLExecuteSpan(input: { + ctx: Context; args: ExecutionArgs; - otelContext: Context; tracer: Tracer; -}) { +}): { + ctx: Context; + done: ( + result: ExecutionResult | AsyncIterableIterator, + ) => void; +} { const operation = getOperationASTFromDocument( input.args.document, input.args.operationName || undefined, ); - const executeSpan = input.tracer.startSpan( + const operationName = operation.name?.value ?? 'Anonymous'; + const document = defaultPrintFn(input.args.document); + const span = input.tracer.startSpan( 'graphql.execute', { attributes: { [SEMATTRS_GRAPHQL_OPERATION_TYPE]: operation.operation, - [SEMATTRS_GRAPHQL_OPERATION_NAME]: - input.args.operationName || undefined, - [SEMATTRS_GRAPHQL_DOCUMENT]: defaultPrintFn(input.args.document), + [SEMATTRS_GRAPHQL_OPERATION_NAME]: operationName, + [SEMATTRS_GRAPHQL_DOCUMENT]: document, }, kind: SpanKind.INTERNAL, }, - input.otelContext, + input.ctx, ); return { - executeSpan, - done: (result: ExecutionResult) => { - if (result.errors && result.errors.length > 0) { - executeSpan.setAttribute( - SEMATTRS_GRAPHQL_ERROR_COUNT, - result.errors.length, - ); - executeSpan.setStatus({ + ctx: trace.setSpan(input.ctx, span), + done: (result) => { + if ( + !isAsyncIterable(result) && // FIXME: Handle async iterable too + result.errors && + result.errors.length > 0 + ) { + span.setAttribute(SEMATTRS_GRAPHQL_ERROR_COUNT, result.errors.length); + span.setStatus({ code: SpanStatusCode.ERROR, message: result.errors.map((e) => e.message).join(', '), }); - for (const error in result.errors) { - executeSpan.recordException(error); + for (const error of result.errors) { + span.recordException(error); } } - - executeSpan.end(); + span.end(); }, }; } -export const subgraphExecReqSpanMap = new WeakMap(); - -export function createSubgraphExecuteFetchSpan(input: { - otelContext: Context; +export function startSubgraphExecuteFetchSpan(input: { + ctx: Context; tracer: Tracer; executionRequest: ExecutionRequest; subgraphName: string; -}) { - const subgraphExecuteSpan = input.tracer.startSpan( +}): { + ctx: Context; + done: (result: unknown) => void; +} { + const span = input.tracer.startSpan( `subgraph.execute (${input.subgraphName})`, { attributes: { @@ -220,69 +317,50 @@ export function createSubgraphExecuteFetchSpan(input: { }, kind: SpanKind.CLIENT, }, - input.otelContext, + input.ctx, ); - subgraphExecReqSpanMap.set(input.executionRequest, subgraphExecuteSpan); - return { - done() { - subgraphExecuteSpan.end(); + ctx: trace.setSpan(input.ctx, span), + done: () => { + return span.end(); }, }; } -export function createUpstreamHttpFetchSpan(input: { - otelContext: Context; +export function startUpstreamHttpFetchSpan(input: { + ctx: Context; tracer: Tracer; url: string; fetchOptions: RequestInit; executionRequest?: ExecutionRequest; -}) { +}): { ctx: Context; done: (response: Response) => void } { const urlObj = new URL(input.url); - - const attributes = { - [SEMATTRS_HTTP_METHOD]: input.fetchOptions.method, - [SEMATTRS_HTTP_URL]: input.url, - [SEMATTRS_NET_HOST_NAME]: urlObj.hostname, - [SEMATTRS_HTTP_HOST]: urlObj.host, - [SEMATTRS_HTTP_ROUTE]: urlObj.pathname, - [SEMATTRS_HTTP_SCHEME]: urlObj.protocol, - }; - - let fetchSpan: Span | undefined; - let isOrigSpan: boolean; - - if (input.executionRequest) { - fetchSpan = subgraphExecReqSpanMap.get(input.executionRequest); - if (fetchSpan) { - isOrigSpan = false; - fetchSpan.setAttributes(attributes); - } - } - - if (!fetchSpan) { - fetchSpan = input.tracer.startSpan( - 'http.fetch', - { - attributes, - kind: SpanKind.CLIENT, + const span = input.tracer.startSpan( + 'http.fetch', + { + attributes: { + [SEMATTRS_HTTP_METHOD]: input.fetchOptions.method, + [SEMATTRS_HTTP_URL]: input.url, + [SEMATTRS_NET_HOST_NAME]: urlObj.hostname, + [SEMATTRS_HTTP_HOST]: urlObj.host, + [SEMATTRS_HTTP_ROUTE]: urlObj.pathname, + [SEMATTRS_HTTP_SCHEME]: urlObj.protocol, }, - input.otelContext, - ); - isOrigSpan = true; - } + kind: SpanKind.CLIENT, + }, + input.ctx, + ); return { - done: (response: Response) => { - fetchSpan.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status); - fetchSpan.setStatus({ + ctx: trace.setSpan(input.ctx, span), + done: (response) => { + span.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status); + span.setStatus({ code: response.ok ? SpanStatusCode.OK : SpanStatusCode.ERROR, message: response.ok ? undefined : response.statusText, }); - if (isOrigSpan) { - fetchSpan.end(); - } + span.end(); }, }; } diff --git a/packages/plugins/opentelemetry/src/utils.ts b/packages/plugins/opentelemetry/src/utils.ts new file mode 100644 index 000000000..eff7a3581 --- /dev/null +++ b/packages/plugins/opentelemetry/src/utils.ts @@ -0,0 +1,48 @@ +import { isPromise } from 'util/types'; +import { + isPromise as isPromiseLike, + mapMaybePromise as mapMaybePromiseLike, + type MaybePromise, +} from '@graphql-tools/utils'; + +function mapMaybePromise( + value: Promise | T, + mapper: (value: T) => Promise | R, + errorMapper?: (err: unknown) => Promise | R, +): Promise | R { + const res$ = mapMaybePromiseLike(value, mapper, errorMapper); + if (isPromiseLike(res$)) { + return toPromise(res$); + } + return res$; +} +export { mapMaybePromise, mapMaybePromiseLike }; + +export function toPromise(mp: MaybePromise): Promise { + if (isPromise(mp)) { + return mp as Promise; + } + if (isPromiseLike(mp)) { + return { + then: (onfullfilled, onrejected) => + toPromise(mp.then(onfullfilled, onrejected)), + catch: (onrejected) => toPromise(mp.then(null, onrejected)), + finally: (onfinally) => { + return toPromise( + mp.then( + (res) => { + onfinally?.(); + return res; + }, + (err) => { + onfinally?.(); + throw err; + }, + ), + ); + }, + [Symbol.toStringTag]: 'Promise', + }; + } + return Promise.resolve(mp); +}