Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@
/packages/dd-trace/src/llmobs/ @DataDog/ml-observability
/packages/dd-trace/test/llmobs/ @DataDog/ml-observability

# Data Streams Monitoring
/packages/dd-trace/src/datastreams/ @DataDog/data-streams-monitoring
/packages/dd-trace/test/datastreams/ @DataDog/data-streams-monitoring
/packages/**/dsm.spec.js @DataDog/data-streams-monitoring
/packages/**/*.dsm.spec.js @DataDog/data-streams-monitoring

# API SDK
/packages/dd-trace/src/telemetry/ @DataDog/apm-sdk-capabilities-js
/packages/dd-trace/test/telemetry/ @DataDog/apm-sdk-capabilities-js
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
"test:llmobs:plugins:ci": "yarn services && nyc --no-clean --include \"packages/dd-trace/src/llmobs/**/*.js\" -- npm run test:llmobs:plugins",
"test:openfeature": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/dd-trace/test/openfeature/*.spec.js\"",
"test:openfeature:ci": "nyc --no-clean --include \"packages/dd-trace/src/openfeature/**/*.js\" -- npm run test:openfeature",
"test:plugins": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/datadog-plugin-@($(echo $PLUGINS))/test/**/@($(echo ${SPEC:-'*'})).spec.js\"",
"test:plugins": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/datadog-plugin-@($(echo $PLUGINS))/test/**/@($(echo ${SPEC:-'*'}))*.spec.js\"",
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is so i.e. SPEC=sqs will still test sqs.dsm.spec.js, which is consistent with how this would work if SPEC wasn't set.

"test:plugins:ci": "yarn services && nyc --no-clean --include \"packages/datadog-plugin-@($(echo $PLUGINS))/src/**/*.js\" -- npm run test:plugins",
"test:plugins:ci:flaky": "yarn services && nyc --no-clean --include \"packages/datadog-plugin-@($(echo $PLUGINS))/src/**/*.js\" -- npm run test:plugins -- --bail --retries 2",
"test:plugins:upstream": "node ./packages/dd-trace/test/plugins/suite.js",
Expand Down
281 changes: 281 additions & 0 deletions packages/datadog-plugin-amqplib/test/dsm.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
'use strict'

const assert = require('node:assert/strict')
const { Buffer } = require('node:buffer')

const { afterEach, beforeEach, describe, it } = require('mocha')

const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
const id = require('../../dd-trace/src/id')
const agent = require('../../dd-trace/test/plugins/agent')
const { withVersions } = require('../../dd-trace/test/setup/mocha')
const { assertObjectContains } = require('../../../integration-tests/helpers')

describe('Plugin', () => {
let connection
let channel
let queue

describe('amqplib', () => {
withVersions('amqplib', 'amqplib', version => {
beforeEach(() => {
process.env.DD_DATA_STREAMS_ENABLED = 'true'
queue = `test-${id()}`
})

afterEach(() => {
connection.close()
})

describe('data stream monitoring', function () {
this.timeout(10000)

let expectedProducerHashWithTopic
let expectedProducerHashWithExchange
let expectedConsumerHash

beforeEach(done => {
agent.load('amqplib').then(() => {
require(`../../../versions/amqplib@${version}`).get('amqplib/callback_api')
.connect((err, conn) => {
connection = conn

if (err != null) {
return done(err)
}

conn.createChannel((err, ch) => {
channel = ch
return done(err)
})
})
})
})

afterEach(() => {
return agent.close({ ritmReset: false })
})

beforeEach(() => {
const producerHashWithTopic = computePathwayHash('test', 'tester', [
'direction:out',
'has_routing_key:true',
`topic:${queue}`,
'type:rabbitmq'
], ENTRY_PARENT_HASH)

expectedProducerHashWithTopic = producerHashWithTopic.readBigUInt64LE(0).toString()

expectedProducerHashWithExchange = computePathwayHash('test', 'tester', [
'direction:out',
'exchange:namedExchange',
'has_routing_key:true',
'type:rabbitmq'
], ENTRY_PARENT_HASH).readBigUInt64LE(0).toString()

expectedConsumerHash = computePathwayHash('test', 'tester', [
'direction:in',
`topic:${queue}`,
'type:rabbitmq'
], producerHashWithTopic).readBigUInt64LE(0).toString()
})

it('Should emit DSM stats to the agent when sending a message on an unnamed exchange', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = []
// we should have 1 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
})
}
})
assert.ok(statsPointsReceived.length >= 1)
assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [
'direction:out',
'has_routing_key:true',
`topic:${queue}`,
'type:rabbitmq'
])
assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithTopic), true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
})
})

it('Should emit DSM stats to the agent when sending a message on an named exchange', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = []
// we should have 1 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
})
}
})
assert.ok(statsPointsReceived.length >= 1)
assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [
'direction:out',
'exchange:namedExchange',
'has_routing_key:true',
'type:rabbitmq'
])
assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithExchange), true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertExchange('namedExchange', 'direct', {}, (err, ok) => {
if (err) return done(err)

channel.publish('namedExchange', 'anyOldRoutingKey', Buffer.from('DSM pathway test'))
})
})

it('Should emit DSM stats to the agent when receiving a message', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = []
// we should have 2 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
})
}
})
assert.strictEqual(statsPointsReceived.length, 2)
assert.deepStrictEqual(statsPointsReceived[1].EdgeTags,
['direction:in', `topic:${queue}`, 'type:rabbitmq'])
assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
channel.consume(ok.queue, () => {}, {}, (err, ok) => {
if (err) done(err)
})
})
})

it('Should emit DSM stats to the agent when sending another message', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = []
// we should have 1 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
})
}
})
assert.strictEqual(statsPointsReceived.length, 1)
assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [
'direction:out',
'has_routing_key:true',
`topic:${queue}`,
'type:rabbitmq'
])
assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithTopic), true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
})
})

it('Should emit DSM stats to the agent when receiving a message with get', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = []
// we should have 2 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
})
}
})
assert.strictEqual(statsPointsReceived.length, 2)
assert.deepStrictEqual(statsPointsReceived[1].EdgeTags,
['direction:in', `topic:${queue}`, 'type:rabbitmq'])
assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
channel.get(ok.queue, {}, (err, ok) => {
if (err) done(err)
})
})
})

it('regression test: should handle basic.get when queue is empty', done => {
channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.get(ok.queue, {}, (err, msg) => {
if (err) return done(err)
assert.strictEqual(msg, false)
done()
})
})
})

it('Should set pathway hash tag on a span when producing', (done) => {
channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('dsm test'))

let produceSpanMeta = {}
agent.assertSomeTraces(traces => {
const span = traces[0][0]

if (span.resource.startsWith('basic.publish')) {
produceSpanMeta = span.meta
}

assertObjectContains(produceSpanMeta, {
'pathway.hash': expectedProducerHashWithTopic
})
}, { timeoutMs: 10000 }).then(done, done)
})
})

it('Should set pathway hash tag on a span when consuming', (done) => {
channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('dsm test'))
channel.consume(ok.queue, () => {}, {}, (err, ok) => {
if (err) return done(err)

let consumeSpanMeta = {}
agent.assertSomeTraces(traces => {
const span = traces[0][0]

if (span.resource.startsWith('basic.deliver')) {
consumeSpanMeta = span.meta
}

assertObjectContains(consumeSpanMeta, {
'pathway.hash': expectedConsumerHash
})
}, { timeoutMs: 10000 }).then(done, done)
})
})
})
})
})
})
})
Loading
Loading