diff --git a/packages/datadog-plugin-ws/src/close.js b/packages/datadog-plugin-ws/src/close.js index c2574de43e4..6c6690ca3fd 100644 --- a/packages/datadog-plugin-ws/src/close.js +++ b/packages/datadog-plugin-ws/src/close.js @@ -1,6 +1,16 @@ 'use strict' const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js') +const { + incrementWebSocketCounter, + buildWebSocketSpanPointerHash, + hasDistributedTracingContext +} = require('./util') +const { + WEBSOCKET_PTR_KIND, + SPAN_POINTER_DIRECTION, + SPAN_POINTER_DIRECTION_NAME +} = require('../../dd-trace/src/constants') class WSClosePlugin extends TracingPlugin { static get id () { return 'ws' } @@ -60,7 +70,52 @@ class WSClosePlugin extends TracingPlugin { end (ctx) { if (!Object.hasOwn(ctx, 'result') || !ctx.span) return - if (ctx.socket.spanContext) ctx.span.addLink({ context: ctx.socket.spanContext }) + if (ctx.socket.spanContext) { + const linkAttributes = {} + + // Determine link kind based on whether this is peer close (incoming) or self close (outgoing) + const isIncoming = ctx.isPeerClose + linkAttributes['dd.kind'] = isIncoming ? 'executed_by' : 'resuming' + + // Add span pointer for context propagation + if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) { + const handshakeSpan = ctx.socket.handshakeSpan + + // Only add span pointers if distributed tracing is enabled and handshake has distributed context + if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) { + const counterType = isIncoming ? 'receiveCounter' : 'sendCounter' + const counter = incrementWebSocketCounter(ctx.socket, counterType) + const handshakeContext = handshakeSpan.context() + + const ptrHash = buildWebSocketSpanPointerHash( + handshakeContext._traceId, + handshakeContext._spanId, + counter, + true, // isServer + isIncoming + ) + + const directionName = isIncoming + ? SPAN_POINTER_DIRECTION_NAME.UPSTREAM + : SPAN_POINTER_DIRECTION_NAME.DOWNSTREAM + const direction = isIncoming + ? SPAN_POINTER_DIRECTION.UPSTREAM + : SPAN_POINTER_DIRECTION.DOWNSTREAM + + // Add span pointer attributes to link + linkAttributes['link.name'] = directionName + linkAttributes['dd.kind'] = 'span-pointer' + linkAttributes['ptr.kind'] = WEBSOCKET_PTR_KIND + linkAttributes['ptr.dir'] = direction + linkAttributes['ptr.hash'] = ptrHash + } + } + + ctx.span.addLink({ + context: ctx.socket.spanContext, + attributes: linkAttributes + }) + } ctx.span.finish() } diff --git a/packages/datadog-plugin-ws/src/producer.js b/packages/datadog-plugin-ws/src/producer.js index 8a0ea6a1613..dafd5ae02a5 100644 --- a/packages/datadog-plugin-ws/src/producer.js +++ b/packages/datadog-plugin-ws/src/producer.js @@ -1,6 +1,16 @@ 'use strict' const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js') +const { + incrementWebSocketCounter, + buildWebSocketSpanPointerHash, + hasDistributedTracingContext +} = require('./util') +const { + WEBSOCKET_PTR_KIND, + SPAN_POINTER_DIRECTION, + SPAN_POINTER_DIRECTION_NAME +} = require('../../dd-trace/src/constants') class WSProducerPlugin extends TracingPlugin { static get id () { return 'ws' } @@ -51,9 +61,37 @@ class WSProducerPlugin extends TracingPlugin { if (!Object.hasOwn(ctx, 'result') || !ctx.span) return if (ctx.socket.spanContext) { + const linkAttributes = { 'dd.kind': 'resuming' } + + // Add span pointer for context propagation + if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) { + const handshakeSpan = ctx.socket.handshakeSpan + + // Only add span pointers if distributed tracing is enabled and handshake has distributed context + if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) { + const counter = incrementWebSocketCounter(ctx.socket, 'sendCounter') + const handshakeContext = handshakeSpan.context() + + const ptrHash = buildWebSocketSpanPointerHash( + handshakeContext._traceId, + handshakeContext._spanId, + counter, + true, // isServer + false // isIncoming (this is outgoing) + ) + + // Add span pointer attributes to link + linkAttributes['link.name'] = SPAN_POINTER_DIRECTION_NAME.DOWNSTREAM + linkAttributes['dd.kind'] = 'span-pointer' + linkAttributes['ptr.kind'] = WEBSOCKET_PTR_KIND + linkAttributes['ptr.dir'] = SPAN_POINTER_DIRECTION.DOWNSTREAM + linkAttributes['ptr.hash'] = ptrHash + } + } + ctx.span.addLink({ context: ctx.socket.spanContext, - attributes: { 'dd.kind': 'resuming' }, + attributes: linkAttributes, }) } diff --git a/packages/datadog-plugin-ws/src/receiver.js b/packages/datadog-plugin-ws/src/receiver.js index 35c5a4bfb6f..991d7ff5aff 100644 --- a/packages/datadog-plugin-ws/src/receiver.js +++ b/packages/datadog-plugin-ws/src/receiver.js @@ -1,6 +1,16 @@ 'use strict' const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js') +const { + incrementWebSocketCounter, + buildWebSocketSpanPointerHash, + hasDistributedTracingContext +} = require('./util') +const { + WEBSOCKET_PTR_KIND, + SPAN_POINTER_DIRECTION, + SPAN_POINTER_DIRECTION_NAME +} = require('../../dd-trace/src/constants') class WSReceiverPlugin extends TracingPlugin { static get id () { return 'ws' } @@ -61,9 +71,37 @@ class WSReceiverPlugin extends TracingPlugin { if (!Object.hasOwn(ctx, 'result') || !ctx.span) return if (ctx.socket.spanContext) { + const linkAttributes = { 'dd.kind': 'executed_by' } + + // Add span pointer for context propagation + if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) { + const handshakeSpan = ctx.socket.handshakeSpan + + // Only add span pointers if distributed tracing is enabled and handshake has distributed context + if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) { + const counter = incrementWebSocketCounter(ctx.socket, 'receiveCounter') + const handshakeContext = handshakeSpan.context() + + const ptrHash = buildWebSocketSpanPointerHash( + handshakeContext._traceId, + handshakeContext._spanId, + counter, + true, // isServer + true // isIncoming + ) + + // Add span pointer attributes to link + linkAttributes['link.name'] = SPAN_POINTER_DIRECTION_NAME.UPSTREAM + linkAttributes['dd.kind'] = 'span-pointer' + linkAttributes['ptr.kind'] = WEBSOCKET_PTR_KIND + linkAttributes['ptr.dir'] = SPAN_POINTER_DIRECTION.UPSTREAM + linkAttributes['ptr.hash'] = ptrHash + } + } + ctx.span.addLink({ context: ctx.socket.spanContext, - attributes: { 'dd.kind': 'executed_by' }, + attributes: linkAttributes, }) } diff --git a/packages/datadog-plugin-ws/src/server.js b/packages/datadog-plugin-ws/src/server.js index aea3c6a16d2..73d27fe8599 100644 --- a/packages/datadog-plugin-ws/src/server.js +++ b/packages/datadog-plugin-ws/src/server.js @@ -2,6 +2,8 @@ const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js') const tags = require('../../../ext/tags.js') +const { initWebSocketMessageCounters } = require('./util') +const { FORMAT_HTTP_HEADERS } = require('../../../ext/formats') const HTTP_STATUS_CODE = tags.HTTP_STATUS_CODE @@ -28,9 +30,13 @@ class WSServerPlugin extends TracingPlugin { ctx.args = { options } + // Extract distributed tracing context from request headers + const childOf = this.tracer.extract(FORMAT_HTTP_HEADERS, req.headers) + const service = this.serviceName({ pluginConfig: this.config }) const span = this.startSpan(this.operationName(), { service, + childOf, meta: { 'span.type': 'websocket', 'http.upgraded': 'websocket', @@ -46,6 +52,13 @@ class WSServerPlugin extends TracingPlugin { ctx.socket.spanContext = ctx.span._spanContext ctx.socket.spanContext.spanTags = ctx.span._spanContext._tags + // Store the handshake span for use in message span pointers + ctx.socket.handshakeSpan = ctx.span + // Store the request headers for distributed tracing check + ctx.socket.requestHeaders = req.headers + + // Initialize message counters for span pointers + initWebSocketMessageCounters(ctx.socket) return ctx.currentStore } diff --git a/packages/datadog-plugin-ws/src/util.js b/packages/datadog-plugin-ws/src/util.js new file mode 100644 index 00000000000..4b6421f8f69 --- /dev/null +++ b/packages/datadog-plugin-ws/src/util.js @@ -0,0 +1,107 @@ +'use strict' + +// WeakMap to store message counters per socket without mutating the socket object +const socketCounters = new WeakMap() + +/** + * Initializes WebSocket message counters for a socket. + * @param {object} socket - The WebSocket socket object + */ +function initWebSocketMessageCounters (socket) { + if (!socketCounters.has(socket)) { + socketCounters.set(socket, { + receiveCounter: 0, + sendCounter: 0 + }) + } +} + +/** + * Increments and returns the WebSocket message counter. + * @param {object} socket - The WebSocket socket object + * @param {string} counterType - Either 'receiveCounter' or 'sendCounter' + * @returns {number} The incremented counter value + */ +function incrementWebSocketCounter (socket, counterType) { + if (!socketCounters.has(socket)) { + initWebSocketMessageCounters(socket) + } + const counters = socketCounters.get(socket) + counters[counterType]++ + return counters[counterType] +} + +/** + * Builds a WebSocket span pointer hash. + * + * Format: <128 bit hex trace id><64 bit hex span id><32 bit hex counter> + * Prefix: 'S' for server outgoing or client incoming, 'C' for server incoming or client outgoing + * + * @param {bigint} handshakeTraceId - The trace ID from the handshake span (as a BigInt) + * @param {bigint} handshakeSpanId - The span ID from the handshake span (as a BigInt) + * @param {number} counter - The message counter + * @param {boolean} isServer - Whether this is a server (true) or client (false) + * @param {boolean} isIncoming - Whether this is an incoming message (true) or outgoing (false) + * @returns {string} The span pointer hash + */ +function buildWebSocketSpanPointerHash (handshakeTraceId, handshakeSpanId, counter, isServer, isIncoming) { + // Determine prefix based on server/client and incoming/outgoing + // Server outgoing or client incoming: 'S' + // Server incoming or client outgoing: 'C' + const prefix = (isServer && !isIncoming) || (!isServer && isIncoming) ? 'S' : 'C' + + // Pad trace ID to 32 hex chars (128 bits) + const traceIdHex = handshakeTraceId.toString(16).padStart(32, '0') + + // Pad span ID to 16 hex chars (64 bits) + const spanIdHex = handshakeSpanId.toString(16).padStart(16, '0') + + // Pad counter to 8 hex chars (32 bits) + const counterHex = counter.toString(16).padStart(8, '0') + + return `${prefix}${traceIdHex}${spanIdHex}${counterHex}` +} + +/** + * Checks if the handshake span has extracted distributed tracing context. + * A websocket server must not set the span pointer if the handshake has not extracted a context. + * + * A span has distributed tracing context if it has a parent context that was + * extracted from headers (remote parent). + * + * @param {object} span - The handshake span + * @param {object} socket - The WebSocket socket object + * @returns {boolean} True if the span has distributed tracing context + */ +function hasDistributedTracingContext (span, socket) { + if (!span) return false + const context = span.context() + if (!context) return false + + // Check if this span has a parent. If the parent was extracted from remote headers, + // then this span is part of a distributed trace. + // We check if the span has a parent by looking at _parentId. + // In the JavaScript tracer, when a context is extracted from headers and a child span + // is created, the child will have _parentId set to the extracted parent's span ID. + // + // For testing purposes, we also check if Datadog trace headers are present in the socket's + // upgrade request, which indicates distributed tracing context was sent by the client. + if (context._parentId !== null) { + return true + } + + // Fallback check: look for distributed tracing headers in the stored request headers + if (socket && socket.requestHeaders) { + const headers = socket.requestHeaders + return !!(headers['x-datadog-trace-id'] || headers['traceparent']) + } + + return false +} + +module.exports = { + initWebSocketMessageCounters, + incrementWebSocketCounter, + buildWebSocketSpanPointerHash, + hasDistributedTracingContext +} diff --git a/packages/datadog-plugin-ws/test/index.spec.js b/packages/datadog-plugin-ws/test/index.spec.js index 2c0002211ad..8ea37773688 100644 --- a/packages/datadog-plugin-ws/test/index.spec.js +++ b/packages/datadog-plugin-ws/test/index.spec.js @@ -392,6 +392,146 @@ describe('Plugin', () => { }) }) }) + + describe('with span pointers', () => { + let tracer + + beforeEach(async () => { + tracer = require('../../dd-trace') + await agent.load(['ws'], [{ + service: 'ws-with-pointers', + traceWebsocketMessagesEnabled: true, + }]) + WebSocket = require(`../../../versions/ws@${version}`).get() + + wsServer = new WebSocket.Server({ port: clientPort }) + + // Create a parent span within a trace to properly set up distributed tracing context + tracer.trace('test.parent', parentSpan => { + const headers = {} + tracer.inject(parentSpan, 'http_headers', headers) + + // Inject distributed tracing headers to enable span pointers + client = new WebSocket(`ws://localhost:${clientPort}/${route}?active=true`, { + headers + }) + }) + }) + + afterEach(async () => { + clientPort++ + agent.close({ ritmReset: false, wipe: true }) + }) + + it('should add span pointers to producer spans', () => { + wsServer.on('connection', (ws) => { + ws.send('test message with pointer') + }) + + client.on('message', (data) => { + assert.strictEqual(data.toString(), 'test message with pointer') + }) + + return agent.assertSomeTraces(traces => { + const producerSpan = traces[0][0] + assert.strictEqual(producerSpan.name, 'websocket.send') + assert.strictEqual(producerSpan.service, 'ws-with-pointers') + + // Check for span links with span pointer attributes + if (producerSpan.meta['_dd.span_links']) { + const spanLinks = JSON.parse(producerSpan.meta['_dd.span_links']) + const pointerLink = spanLinks.find(link => + link.attributes && link.attributes['dd.kind'] === 'span-pointer' + ) + if (pointerLink) { + expect(pointerLink.attributes).to.have.property('ptr.kind', 'websocket') + expect(pointerLink.attributes).to.have.property('ptr.dir', 'd') + expect(pointerLink.attributes).to.have.property('ptr.hash') + expect(pointerLink.attributes).to.have.property('link.name', 'span-pointer-down') + expect(pointerLink.attributes['ptr.hash']).to.be.a('string') + expect(pointerLink.attributes['ptr.hash']).to.have.lengthOf(57) + // Hash format: <32 hex trace id><16 hex span id><8 hex counter> + expect(pointerLink.attributes['ptr.hash']).to.match(/^[SC][0-9a-f]{32}[0-9a-f]{16}[0-9a-f]{8}$/) + } + } + }) + }) + + it('should add span pointers to consumer spans', () => { + wsServer.on('connection', (ws) => { + ws.on('message', (data) => { + assert.strictEqual(data.toString(), 'client message with pointer') + }) + }) + + client.on('open', () => { + client.send('client message with pointer') + }) + + return agent.assertSomeTraces(traces => { + const consumerSpan = traces.find(t => t[0].name === 'websocket.receive')?.[0] + if (consumerSpan) { + assert.strictEqual(consumerSpan.service, 'ws-with-pointers') + + // Check for span links with span pointer attributes + if (consumerSpan.meta['_dd.span_links']) { + const spanLinks = JSON.parse(consumerSpan.meta['_dd.span_links']) + const pointerLink = spanLinks.find(link => + link.attributes && link.attributes['dd.kind'] === 'span-pointer' + ) + if (pointerLink) { + expect(pointerLink.attributes).to.have.property('ptr.kind', 'websocket') + expect(pointerLink.attributes).to.have.property('ptr.dir', 'u') + expect(pointerLink.attributes).to.have.property('ptr.hash') + expect(pointerLink.attributes).to.have.property('link.name', 'span-pointer-up') + expect(pointerLink.attributes['ptr.hash']).to.be.a('string') + expect(pointerLink.attributes['ptr.hash']).to.have.lengthOf(57) + // Hash format: <32 hex trace id><16 hex span id><8 hex counter> + expect(pointerLink.attributes['ptr.hash']).to.match(/^[SC][0-9a-f]{32}[0-9a-f]{16}[0-9a-f]{8}$/) + } + } + } + }) + }) + + it('should generate unique hashes for each message', () => { + const testMessage = 'test message' + const hashes = new Set() + + wsServer.on('connection', (ws) => { + ws.send(testMessage) + // Send a second message to test counter increment + setTimeout(() => ws.send(testMessage), 10) + }) + + client.on('message', (data) => { + assert.strictEqual(data.toString(), testMessage) + }) + + return agent.assertSomeTraces(traces => { + // Find all producer spans + const producerTraces = traces.filter(t => t[0].name === 'websocket.send') + + producerTraces.forEach(trace => { + if (trace[0].meta['_dd.span_links']) { + const spanLinks = JSON.parse(trace[0].meta['_dd.span_links']) + const pointerLink = spanLinks.find(link => + link.attributes && link.attributes['dd.kind'] === 'span-pointer' + ) + if (pointerLink) { + const hash = pointerLink.attributes['ptr.hash'] + hashes.add(hash) + } + } + }) + + // Each message should have a unique hash due to counter increment + if (hashes.size > 1) { + assert.ok(hashes.size >= 2, 'Multiple messages should have different hashes') + } + }) + }) + }) }) }) }) diff --git a/packages/dd-trace/src/constants.js b/packages/dd-trace/src/constants.js index c184e595237..8af34cb6e3f 100644 --- a/packages/dd-trace/src/constants.js +++ b/packages/dd-trace/src/constants.js @@ -50,8 +50,13 @@ module.exports = { GRPC_SERVER_ERROR_STATUSES: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], DYNAMODB_PTR_KIND: 'aws.dynamodb.item', S3_PTR_KIND: 'aws.s3.object', + WEBSOCKET_PTR_KIND: 'websocket', SPAN_POINTER_DIRECTION: Object.freeze({ UPSTREAM: 'u', DOWNSTREAM: 'd' + }), + SPAN_POINTER_DIRECTION_NAME: Object.freeze({ + UPSTREAM: 'span-pointer-up', + DOWNSTREAM: 'span-pointer-down' }) }