Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion packages/datadog-plugin-ws/src/close.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
'use strict'

const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js')
const {
incrementWebSocketCounter,
buildWebSocketSpanPointerHash,
hasDistributedTracingContext
} = require('./util')
const {
WEBSOCKET_PTR_KIND,
SPAN_POINTER_DIRECTION,
SPAN_POINTER_DIRECTION_NAME
} = require('../../dd-trace/src/constants')

class WSClosePlugin extends TracingPlugin {
static get id () { return 'ws' }
Expand Down Expand Up @@ -60,7 +70,52 @@ class WSClosePlugin extends TracingPlugin {
end (ctx) {
if (!Object.hasOwn(ctx, 'result') || !ctx.span) return

if (ctx.socket.spanContext) ctx.span.addLink({ context: ctx.socket.spanContext })
if (ctx.socket.spanContext) {
const linkAttributes = {}

// Determine link kind based on whether this is peer close (incoming) or self close (outgoing)
const isIncoming = ctx.isPeerClose
linkAttributes['dd.kind'] = isIncoming ? 'executed_by' : 'resuming'

// Add span pointer for context propagation
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) {
const handshakeSpan = ctx.socket.handshakeSpan

// Only add span pointers if distributed tracing is enabled and handshake has distributed context
if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) {
const counterType = isIncoming ? 'receiveCounter' : 'sendCounter'
const counter = incrementWebSocketCounter(ctx.socket, counterType)
const handshakeContext = handshakeSpan.context()

const ptrHash = buildWebSocketSpanPointerHash(
handshakeContext._traceId,
handshakeContext._spanId,
counter,
true, // isServer
isIncoming
)

const directionName = isIncoming
? SPAN_POINTER_DIRECTION_NAME.UPSTREAM
: SPAN_POINTER_DIRECTION_NAME.DOWNSTREAM
const direction = isIncoming
? SPAN_POINTER_DIRECTION.UPSTREAM
: SPAN_POINTER_DIRECTION.DOWNSTREAM

// Add span pointer attributes to link
linkAttributes['link.name'] = directionName
linkAttributes['dd.kind'] = 'span-pointer'
linkAttributes['ptr.kind'] = WEBSOCKET_PTR_KIND
linkAttributes['ptr.dir'] = direction
linkAttributes['ptr.hash'] = ptrHash
}
}

ctx.span.addLink({
context: ctx.socket.spanContext,
attributes: linkAttributes
})
}

ctx.span.finish()
}
Expand Down
40 changes: 39 additions & 1 deletion packages/datadog-plugin-ws/src/producer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
'use strict'

const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js')
const {
incrementWebSocketCounter,
buildWebSocketSpanPointerHash,
hasDistributedTracingContext
} = require('./util')
const {
WEBSOCKET_PTR_KIND,
SPAN_POINTER_DIRECTION,
SPAN_POINTER_DIRECTION_NAME
} = require('../../dd-trace/src/constants')

class WSProducerPlugin extends TracingPlugin {
static get id () { return 'ws' }
Expand Down Expand Up @@ -51,9 +61,37 @@
if (!Object.hasOwn(ctx, 'result') || !ctx.span) return

if (ctx.socket.spanContext) {
const linkAttributes = { 'dd.kind': 'resuming' }

// Add span pointer for context propagation
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) {
const handshakeSpan = ctx.socket.handshakeSpan

Check failure on line 69 in packages/datadog-plugin-ws/src/producer.js

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
// Only add span pointers if distributed tracing is enabled and handshake has distributed context
if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) {
const counter = incrementWebSocketCounter(ctx.socket, 'sendCounter')
const handshakeContext = handshakeSpan.context()

const ptrHash = buildWebSocketSpanPointerHash(
handshakeContext._traceId,
handshakeContext._spanId,
counter,
true, // isServer
false // isIncoming (this is outgoing)
)

// Add span pointer attributes to link
linkAttributes['link.name'] = SPAN_POINTER_DIRECTION_NAME.DOWNSTREAM
linkAttributes['dd.kind'] = 'span-pointer'
linkAttributes['ptr.kind'] = WEBSOCKET_PTR_KIND
linkAttributes['ptr.dir'] = SPAN_POINTER_DIRECTION.DOWNSTREAM
linkAttributes['ptr.hash'] = ptrHash
}
}

ctx.span.addLink({
context: ctx.socket.spanContext,
attributes: { 'dd.kind': 'resuming' },
attributes: linkAttributes,
})
}

Expand Down
40 changes: 39 additions & 1 deletion packages/datadog-plugin-ws/src/receiver.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
'use strict'

const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js')
const {
incrementWebSocketCounter,
buildWebSocketSpanPointerHash,
hasDistributedTracingContext
} = require('./util')
const {
WEBSOCKET_PTR_KIND,
SPAN_POINTER_DIRECTION,
SPAN_POINTER_DIRECTION_NAME
} = require('../../dd-trace/src/constants')

class WSReceiverPlugin extends TracingPlugin {
static get id () { return 'ws' }
Expand Down Expand Up @@ -61,9 +71,37 @@ class WSReceiverPlugin extends TracingPlugin {
if (!Object.hasOwn(ctx, 'result') || !ctx.span) return

if (ctx.socket.spanContext) {
const linkAttributes = { 'dd.kind': 'executed_by' }

// Add span pointer for context propagation
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) {
const handshakeSpan = ctx.socket.handshakeSpan

// Only add span pointers if distributed tracing is enabled and handshake has distributed context
if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) {
const counter = incrementWebSocketCounter(ctx.socket, 'receiveCounter')
const handshakeContext = handshakeSpan.context()

const ptrHash = buildWebSocketSpanPointerHash(
handshakeContext._traceId,
handshakeContext._spanId,
counter,
true, // isServer
true // isIncoming
)

// Add span pointer attributes to link
linkAttributes['link.name'] = SPAN_POINTER_DIRECTION_NAME.UPSTREAM
linkAttributes['dd.kind'] = 'span-pointer'
linkAttributes['ptr.kind'] = WEBSOCKET_PTR_KIND
linkAttributes['ptr.dir'] = SPAN_POINTER_DIRECTION.UPSTREAM
linkAttributes['ptr.hash'] = ptrHash
}
}

ctx.span.addLink({
context: ctx.socket.spanContext,
attributes: { 'dd.kind': 'executed_by' },
attributes: linkAttributes,
})
}

Expand Down
13 changes: 13 additions & 0 deletions packages/datadog-plugin-ws/src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js')
const tags = require('../../../ext/tags.js')
const { initWebSocketMessageCounters } = require('./util')
const { FORMAT_HTTP_HEADERS } = require('../../../ext/formats')

const HTTP_STATUS_CODE = tags.HTTP_STATUS_CODE

Expand All @@ -28,9 +30,13 @@ class WSServerPlugin extends TracingPlugin {

ctx.args = { options }

// Extract distributed tracing context from request headers
const childOf = this.tracer.extract(FORMAT_HTTP_HEADERS, req.headers)

const service = this.serviceName({ pluginConfig: this.config })
const span = this.startSpan(this.operationName(), {
service,
childOf,
meta: {
'span.type': 'websocket',
'http.upgraded': 'websocket',
Expand All @@ -46,6 +52,13 @@ class WSServerPlugin extends TracingPlugin {

ctx.socket.spanContext = ctx.span._spanContext
ctx.socket.spanContext.spanTags = ctx.span._spanContext._tags
// Store the handshake span for use in message span pointers
ctx.socket.handshakeSpan = ctx.span
// Store the request headers for distributed tracing check
ctx.socket.requestHeaders = req.headers

// Initialize message counters for span pointers
initWebSocketMessageCounters(ctx.socket)

return ctx.currentStore
}
Expand Down
107 changes: 107 additions & 0 deletions packages/datadog-plugin-ws/src/util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
'use strict'

// WeakMap to store message counters per socket without mutating the socket object
const socketCounters = new WeakMap()

/**
* Initializes WebSocket message counters for a socket.
* @param {object} socket - The WebSocket socket object
*/
function initWebSocketMessageCounters (socket) {
if (!socketCounters.has(socket)) {
socketCounters.set(socket, {
receiveCounter: 0,
sendCounter: 0
})
}
}

/**
* Increments and returns the WebSocket message counter.
* @param {object} socket - The WebSocket socket object
* @param {string} counterType - Either 'receiveCounter' or 'sendCounter'
* @returns {number} The incremented counter value
*/
function incrementWebSocketCounter (socket, counterType) {
if (!socketCounters.has(socket)) {
initWebSocketMessageCounters(socket)
}
const counters = socketCounters.get(socket)
counters[counterType]++
return counters[counterType]
}

/**
* Builds a WebSocket span pointer hash.
*
* Format: <prefix><128 bit hex trace id><64 bit hex span id><32 bit hex counter>
* Prefix: 'S' for server outgoing or client incoming, 'C' for server incoming or client outgoing
*
* @param {bigint} handshakeTraceId - The trace ID from the handshake span (as a BigInt)
* @param {bigint} handshakeSpanId - The span ID from the handshake span (as a BigInt)
* @param {number} counter - The message counter
* @param {boolean} isServer - Whether this is a server (true) or client (false)
* @param {boolean} isIncoming - Whether this is an incoming message (true) or outgoing (false)
* @returns {string} The span pointer hash
*/
function buildWebSocketSpanPointerHash (handshakeTraceId, handshakeSpanId, counter, isServer, isIncoming) {
// Determine prefix based on server/client and incoming/outgoing
// Server outgoing or client incoming: 'S'
// Server incoming or client outgoing: 'C'
const prefix = (isServer && !isIncoming) || (!isServer && isIncoming) ? 'S' : 'C'

// Pad trace ID to 32 hex chars (128 bits)
const traceIdHex = handshakeTraceId.toString(16).padStart(32, '0')

// Pad span ID to 16 hex chars (64 bits)
const spanIdHex = handshakeSpanId.toString(16).padStart(16, '0')

// Pad counter to 8 hex chars (32 bits)
const counterHex = counter.toString(16).padStart(8, '0')

return `${prefix}${traceIdHex}${spanIdHex}${counterHex}`
}

/**
* Checks if the handshake span has extracted distributed tracing context.
* A websocket server must not set the span pointer if the handshake has not extracted a context.
*

Check failure on line 68 in packages/datadog-plugin-ws/src/util.js

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
* A span has distributed tracing context if it has a parent context that was
* extracted from headers (remote parent).
*

Check failure on line 71 in packages/datadog-plugin-ws/src/util.js

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
* @param {object} span - The handshake span
* @param {object} socket - The WebSocket socket object

Check failure on line 73 in packages/datadog-plugin-ws/src/util.js

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
* @returns {boolean} True if the span has distributed tracing context
*/
function hasDistributedTracingContext (span, socket) {
if (!span) return false
const context = span.context()
if (!context) return false

Check failure on line 80 in packages/datadog-plugin-ws/src/util.js

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
// Check if this span has a parent. If the parent was extracted from remote headers,
// then this span is part of a distributed trace.
// We check if the span has a parent by looking at _parentId.
// In the JavaScript tracer, when a context is extracted from headers and a child span
// is created, the child will have _parentId set to the extracted parent's span ID.
//
// For testing purposes, we also check if Datadog trace headers are present in the socket's
// upgrade request, which indicates distributed tracing context was sent by the client.
if (context._parentId !== null) {
return true
}

Check failure on line 92 in packages/datadog-plugin-ws/src/util.js

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
// Fallback check: look for distributed tracing headers in the stored request headers
if (socket && socket.requestHeaders) {
const headers = socket.requestHeaders
return !!(headers['x-datadog-trace-id'] || headers['traceparent'])

Check failure on line 96 in packages/datadog-plugin-ws/src/util.js

View workflow job for this annotation

GitHub Actions / lint

["traceparent"] is better written in dot notation
}

Check failure on line 98 in packages/datadog-plugin-ws/src/util.js

View workflow job for this annotation

GitHub Actions / lint

Trailing spaces not allowed
return false
}

module.exports = {
initWebSocketMessageCounters,
incrementWebSocketCounter,
buildWebSocketSpanPointerHash,
hasDistributedTracingContext
}
Loading
Loading