From 8bad5bb9039de306bea5519ad4b40b92b961c11a Mon Sep 17 00:00:00 2001 From: Rob Carlan Date: Wed, 10 Dec 2025 13:31:25 -0500 Subject: [PATCH 01/11] simple migration of DSM test behavior to datastreams folder --- CODEOWNERS | 5 + .../datadog-plugin-amqplib/test/index.spec.js | 227 ------------ .../test/kinesis.spec.js | 188 ---------- .../datadog-plugin-aws-sdk/test/sns.spec.js | 196 ----------- .../datadog-plugin-aws-sdk/test/sqs.spec.js | 252 -------------- .../test/index.spec.js | 235 ------------- .../test/index.spec.js | 107 ------ .../datadog-plugin-kafkajs/test/index.spec.js | 163 --------- .../test/datastreams/plugins/amqplib.spec.js | 284 +++++++++++++++ .../plugins/aws-sdk/kinesis.spec.js | 243 +++++++++++++ .../datastreams/plugins/aws-sdk/sns.spec.js | 269 +++++++++++++++ .../datastreams/plugins/aws-sdk/sqs.spec.js | 308 +++++++++++++++++ .../confluentinc-kafka-javascript.spec.js | 325 ++++++++++++++++++ .../plugins/google-cloud-pubsub.spec.js | 166 +++++++++ .../test/datastreams/plugins/kafkajs.spec.js | 247 +++++++++++++ 15 files changed, 1847 insertions(+), 1368 deletions(-) create mode 100644 packages/dd-trace/test/datastreams/plugins/amqplib.spec.js create mode 100644 packages/dd-trace/test/datastreams/plugins/aws-sdk/kinesis.spec.js create mode 100644 packages/dd-trace/test/datastreams/plugins/aws-sdk/sns.spec.js create mode 100644 packages/dd-trace/test/datastreams/plugins/aws-sdk/sqs.spec.js create mode 100644 packages/dd-trace/test/datastreams/plugins/confluentinc-kafka-javascript.spec.js create mode 100644 packages/dd-trace/test/datastreams/plugins/google-cloud-pubsub.spec.js create mode 100644 packages/dd-trace/test/datastreams/plugins/kafkajs.spec.js diff --git a/CODEOWNERS b/CODEOWNERS index c8129b66780..4145d83846e 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -17,6 +17,11 @@ /packages/dd-trace/src/lambda/ @DataDog/serverless-aws /packages/dd-trace/test/lambda/ @DataDog/serverless-aws +/packages/dd-trace/src/datastreams/ @DataDog/data-stream-monitoring +/packages/dd-trace/test/datastreams/ @DataDog/data-stream-monitoring +/packages/datadog-plugin-avsc/ @DataDog/data-stream-monitoring +/packages/datadog-plugin-protobufjs/ @DataDog/data-stream-monitoring + /packages/datadog-plugin-*/ @Datadog/dd-trace-js @Datadog/apm-idm-js /packages/datadog-instrumentations/ @Datadog/dd-trace-js @Datadog/apm-idm-js /packages/ddtrace/src/plugins/ @DataDog/dd-trace-js @Datadog/apm-idm-js diff --git a/packages/datadog-plugin-amqplib/test/index.spec.js b/packages/datadog-plugin-amqplib/test/index.spec.js index 408efc892d8..287417ddb5a 100644 --- a/packages/datadog-plugin-amqplib/test/index.spec.js +++ b/packages/datadog-plugin-amqplib/test/index.spec.js @@ -309,233 +309,6 @@ describe('Plugin', () => { .catch(done) }) }) - - describe('when data streams monitoring is enabled', function () { - this.timeout(10000) - - let expectedProducerHashWithTopic - let expectedProducerHashWithExchange - let expectedConsumerHash - - beforeEach(() => { - const producerHashWithTopic = computePathwayHash('test', 'tester', [ - 'direction:out', - 'has_routing_key:true', - `topic:${queue}`, - 'type:rabbitmq' - ], ENTRY_PARENT_HASH) - - expectedProducerHashWithTopic = producerHashWithTopic.readBigUInt64LE(0).toString() - - expectedProducerHashWithExchange = computePathwayHash('test', 'tester', [ - 'direction:out', - 'exchange:namedExchange', - 'has_routing_key:true', - 'type:rabbitmq' - ], ENTRY_PARENT_HASH).readBigUInt64LE(0).toString() - - expectedConsumerHash = computePathwayHash('test', 'tester', [ - 
'direction:in', - `topic:${queue}`, - 'type:rabbitmq' - ], producerHashWithTopic).readBigUInt64LE(0).toString() - }) - - it('Should emit DSM stats to the agent when sending a message on an unnamed exchange', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = [] - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) - }) - } - }) - assert.ok(statsPointsReceived.length >= 1) - assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [ - 'direction:out', - 'has_routing_key:true', - `topic:${queue}`, - 'type:rabbitmq' - ]) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithTopic), true) - }, { timeoutMs: 10000 }).then(done, done) - - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) - }) - }) - - it('Should emit DSM stats to the agent when sending a message on an named exchange', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = [] - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) - }) - } - }) - assert.ok(statsPointsReceived.length >= 1) - assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [ - 'direction:out', - 'exchange:namedExchange', - 'has_routing_key:true', - 'type:rabbitmq' - ]) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithExchange), true) - }, { timeoutMs: 10000 }).then(done, done) - - channel.assertExchange('namedExchange', 'direct', {}, (err, ok) => { - if (err) return done(err) - - channel.publish('namedExchange', 'anyOldRoutingKey', Buffer.from('DSM pathway test')) - }) - }) - - it('Should emit DSM stats to the agent when receiving a message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = [] - // we should have 2 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) - }) - } - }) - assert.strictEqual(statsPointsReceived.length, 2) - assert.deepStrictEqual(statsPointsReceived[1].EdgeTags, - ['direction:in', `topic:${queue}`, 'type:rabbitmq']) - assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) - }, { timeoutMs: 10000 }).then(done, done) - - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) - channel.consume(ok.queue, () => {}, {}, (err, ok) => { - if (err) done(err) - }) - }) - }) - - it('Should emit DSM stats to the agent when sending another message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = [] - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) - }) - } - }) - assert.strictEqual(statsPointsReceived.length, 1) - assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [ - 'direction:out', - 'has_routing_key:true', - 
`topic:${queue}`, - 'type:rabbitmq' - ]) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithTopic), true) - }, { timeoutMs: 10000 }).then(done, done) - - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) - }) - }) - - it('Should emit DSM stats to the agent when receiving a message with get', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = [] - // we should have 2 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) - }) - } - }) - assert.strictEqual(statsPointsReceived.length, 2) - assert.deepStrictEqual(statsPointsReceived[1].EdgeTags, - ['direction:in', `topic:${queue}`, 'type:rabbitmq']) - assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) - }, { timeoutMs: 10000 }).then(done, done) - - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) - channel.get(ok.queue, {}, (err, ok) => { - if (err) done(err) - }) - }) - }) - - it('regression test: should handle basic.get when queue is empty', done => { - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.get(ok.queue, {}, (err, msg) => { - if (err) return done(err) - assert.strictEqual(msg, false) - done() - }) - }) - }) - - it('Should set pathway hash tag on a span when producing', (done) => { - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('dsm test')) - - let produceSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.resource.startsWith('basic.publish')) { - produceSpanMeta = span.meta - } - - assertObjectContains(produceSpanMeta, { - 'pathway.hash': expectedProducerHashWithTopic - }) - }, { timeoutMs: 10000 }).then(done, done) - }) - }) - - it('Should set pathway hash tag on a span when consuming', (done) => { - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('dsm test')) - channel.consume(ok.queue, () => {}, {}, (err, ok) => { - if (err) return done(err) - - let consumeSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.resource.startsWith('basic.deliver')) { - consumeSpanMeta = span.meta - } - - assertObjectContains(consumeSpanMeta, { - 'pathway.hash': expectedConsumerHash - }) - }, { timeoutMs: 10000 }).then(done, done) - }) - }) - }) - }) }) describe('with configuration', () => { diff --git a/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js b/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js index 7dc7f3ae014..3e31ff32364 100644 --- a/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js +++ b/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js @@ -194,193 +194,5 @@ describe('Kinesis', function () { }) }) }) - - describe('DSM Context Propagation', () => { - let expectedProducerHash - let expectedConsumerHash - let nowStub - let streamNameDSM - - beforeEach(() => { - return agent.load('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) - }) - - beforeEach(done => { - tracer = require('../../dd-trace') - tracer.use('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) - - streamNameDSM = `MyStreamDSM-${id()}` - - const 
producerHash = computePathwayHash( - 'test', - 'tester', - ['direction:out', 'topic:' + streamNameDSM, 'type:kinesis'], - ENTRY_PARENT_HASH - ) - - expectedProducerHash = producerHash.readBigUInt64LE(0).toString() - expectedConsumerHash = computePathwayHash( - 'test', - 'tester', - ['direction:in', 'topic:' + streamNameDSM, 'type:kinesis'], - producerHash - ).readBigUInt64LE(0).toString() - - createResources(streamNameDSM, done) - }) - - afterEach(done => { - kinesis.deleteStream({ - StreamName: streamNameDSM - }, (err, res) => { - if (err) return done(err) - - helpers.waitForDeletedStream(kinesis, streamNameDSM, done) - }) - }) - - afterEach(() => { - try { - nowStub.restore() - } catch { - // pass - } - agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) - }) - - it('injects DSM pathway hash during Kinesis getRecord to the span', done => { - let getRecordSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.name === 'aws.response') { - getRecordSpanMeta = span.meta - } - - assertObjectContains(getRecordSpanMeta, { - 'pathway.hash': expectedConsumerHash - }) - }, { timeoutMs: 10000 }).then(done, done) - - helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => { - if (err) return done(err) - - helpers.getTestData(kinesis, streamNameDSM, data, () => {}) - }) - }) - - it('injects DSM pathway hash during Kinesis putRecord to the span', done => { - let putRecordSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.resource.startsWith('putRecord')) { - putRecordSpanMeta = span.meta - } - - assertObjectContains(putRecordSpanMeta, { - 'pathway.hash': expectedProducerHash - }) - }).then(done, done) - - helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, () => {}) - }) - - it('emits DSM stats to the agent during Kinesis putRecord', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have only have 1 stats point since we only had 1 put operation - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 1) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }, { timeoutMs: 10000 }).then(done, done) - - helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, () => {}) - }) - - it('emits DSM stats to the agent during Kinesis getRecord', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have only have 1 stats point since we only had 1 put operation - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }, { timeoutMs: 10000 }) - assert.ok(statsPointsReceived >= 2) - assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) - }, { timeoutMs: 10000 }).then(done, done) - - helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => { - if (err) return done(err) - - helpers.getTestData(kinesis, streamNameDSM, data, () => {}) - }) - }) - - // eslint-disable-next-line @stylistic/max-len - it('emits DSM stats to the agent during Kinesis getRecord when the putRecord was done without DSM enabled', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 
0 - // we should have only have 1 stats point since we only had 1 put operation - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }, { timeoutMs: 10000 }) - assert.strictEqual(statsPointsReceived, 1) - assert.strictEqual(agent.dsmStatsExistWithParentHash(agent, '0'), true) - }, { timeoutMs: 10000 }).then(done, done) - - // TODO: Fix this. The third argument is not used. Check all usages of agent.reload. - agent.reload('aws-sdk', { kinesis: { dsmEnabled: false } }, { dsmEnabled: false }) - helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => { - if (err) return done(err) - - // TODO: Fix this. The third argument is not used. Check all usages of agent.reload. - agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) - helpers.getTestData(kinesis, streamNameDSM, data, () => {}) - }) - }) - - it('emits DSM stats to the agent during Kinesis putRecords', done => { - // we need to stub Date.now() to ensure a new stats bucket is created for each call - // otherwise, all stats checkpoints will be combined into a single stats points - let now = Date.now() - nowStub = sinon.stub(Date, 'now') - nowStub.callsFake(() => { - now += 1000000 - return now - }) - - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have only have 3 stats points since we only had 3 records published - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 3) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }, { timeoutMs: 10000 }).then(done, done) - - helpers.putTestRecords(kinesis, streamNameDSM, (err, data) => { - // Swallow the error as it doesn't matter for this test. - }) - }) - }) }) }) diff --git a/packages/datadog-plugin-aws-sdk/test/sns.spec.js b/packages/datadog-plugin-aws-sdk/test/sns.spec.js index 9e5d2c5d450..687b7ec771c 100644 --- a/packages/datadog-plugin-aws-sdk/test/sns.spec.js +++ b/packages/datadog-plugin-aws-sdk/test/sns.spec.js @@ -502,201 +502,5 @@ describe('Sns', function () { sns.publish({ TopicArn, Message: 'message 1' }, e => e && done(e)) }) }) - - describe('Data Streams Monitoring', () => { - const expectedProducerHash = '15386798273908484982' - const expectedConsumerHash = '15162998336469814920' - let nowStub - - before(() => { - return agent.load('aws-sdk', { sns: { dsmEnabled: true }, sqs: { dsmEnabled: true } }, { dsmEnabled: true }) - }) - - before(done => { - process.env.DD_DATA_STREAMS_ENABLED = 'true' - tracer = require('../../dd-trace') - tracer.use('aws-sdk', { sns: { dsmEnabled: true }, sqs: { dsmEnabled: true } }) - - createResources('TestQueueDSM', 'TestTopicDSM', done) - }) - - after(done => { - sns.deleteTopic({ TopicArn }, done) - }) - - after(done => { - sqs.deleteQueue({ QueueUrl }, done) - }) - - after(() => { - return agent.close({ ritmReset: false, wipe: true }) - }) - - afterEach(() => { - try { - nowStub.restore() - } catch { - // pass - } - // TODO: Fix this. The third argument is not used. 
- agent.reload('aws-sdk', { sns: { dsmEnabled: true, batchPropagationEnabled: true } }, { dsmEnabled: true }) - }) - - it('injects DSM pathway hash to SNS publish span', done => { - sns.subscribe(subParams, (err, data) => { - if (err) return done(err) - - sns.publish( - { TopicArn, Message: 'message DSM' }, - (err) => { - if (err) return done(err) - - let publishSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.resource.startsWith('publish')) { - publishSpanMeta = span.meta - } - - assertObjectContains(publishSpanMeta, { - 'pathway.hash': expectedProducerHash - }) - }).then(done, done) - }) - }) - }) - - it('injects DSM pathway hash to SQS receive span from SNS topic', done => { - sns.subscribe(subParams, (err, data) => { - if (err) return done(err) - - sns.publish( - { TopicArn, Message: 'message DSM' }, - (err) => { - if (err) return done(err) - }) - - sqs.receiveMessage( - receiveParams, - (err, res) => { - if (err) return done(err) - - let consumeSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.name === 'aws.response') { - consumeSpanMeta = span.meta - } - - assertObjectContains(consumeSpanMeta, { - 'pathway.hash': expectedConsumerHash - }) - }).then(done, done) - }) - }) - }) - - it('outputs DSM stats to the agent when publishing a message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 1) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }).then(done, done) - - sns.subscribe(subParams, () => { - sns.publish({ TopicArn, Message: 'message DSM' }, () => {}) - }) - }) - - it('outputs DSM stats to the agent when consuming a message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 2 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 2) - assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) - }).then(done, done) - - sns.subscribe(subParams, () => { - sns.publish({ TopicArn, Message: 'message DSM' }, () => { - sqs.receiveMessage(receiveParams, () => {}) - }) - }) - }) - - it('outputs DSM stats to the agent when publishing batch messages', function (done) { - // publishBatch was released with version 2.1031.0 for the aws-sdk - // publishBatch does not work with smithy-client 3.0.0, unable to find compatible version it - // was released for, but works on 3.374.0 - if ( - (moduleName === '@aws-sdk/smithy-client' && semver.intersects(version, '>=3.374.0')) || - (moduleName === 'aws-sdk' && semver.intersects(version, '>=2.1031.0')) - ) { - // we need to stub Date.now() to ensure a new stats bucket is created for each call - // otherwise, all stats checkpoints will be combined into a single stats points - let now = Date.now() - nowStub = sinon.stub(Date, 'now') - nowStub.callsFake(() => { - now += 1000000 - return now - }) - - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 3 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if 
(timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 3) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }, { timeoutMs: 2000 }).then(done, done) - - sns.subscribe(subParams, () => { - sns.publishBatch( - { - TopicArn, - PublishBatchRequestEntries: [ - { - Id: '1', - Message: 'message DSM 1' - }, - { - Id: '2', - Message: 'message DSM 2' - }, - { - Id: '3', - Message: 'message DSM 3' - } - ] - }, () => { - nowStub.restore() - }) - }) - } else { - this.skip() - } - }) - }) }) }) diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.spec.js index 31e20f623bd..2d70aa65d0c 100644 --- a/packages/datadog-plugin-aws-sdk/test/sqs.spec.js +++ b/packages/datadog-plugin-aws-sdk/test/sqs.spec.js @@ -416,258 +416,6 @@ describe('Plugin', () => { }, 250) }) }) - - describe('data stream monitoring', () => { - let expectedProducerHash - let expectedConsumerHash - let nowStub - - before(() => { - process.env.DD_DATA_STREAMS_ENABLED = 'true' - tracer = require('../../dd-trace') - tracer.use('aws-sdk', { sqs: { dsmEnabled: true } }) - }) - - before(async () => { - return agent.load('aws-sdk', { - sqs: { - dsmEnabled: true - } - }, - { dsmEnabled: true }) - }) - - before(() => { - AWS = require(`../../../versions/${sqsClientName}@${version}`).get() - sqs = new AWS.SQS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' }) - }) - - beforeEach(() => { - const producerHash = computePathwayHash( - 'test', - 'tester', - ['direction:out', 'topic:' + queueNameDSM, 'type:sqs'], - ENTRY_PARENT_HASH - ) - - expectedProducerHash = producerHash.readBigUInt64LE(0).toString() - expectedConsumerHash = computePathwayHash( - 'test', - 'tester', - ['direction:in', 'topic:' + queueNameDSM, 'type:sqs'], - producerHash - ).readBigUInt64LE(0).toString() - }) - - beforeEach(done => { - sqs.createQueue(queueOptionsDsm, (err, res) => err ? done(err) : done()) - }) - - beforeEach(done => { - sqs.createQueue(queueOptionsDsmConsumerOnly, (err, res) => err ? 
done(err) : done()) - }) - - afterEach(done => { - sqs.deleteQueue({ QueueUrl: QueueUrlDsm }, done) - }) - - afterEach(done => { - sqs.deleteQueue({ QueueUrl: QueueUrlDsmConsumerOnly }, done) - }) - - after(() => { - return agent.close({ ritmReset: false }) - }) - - afterEach(() => { - try { - nowStub.restore() - } catch { - // pass - } - agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) - }) - - it('Should set pathway hash tag on a span when producing', (done) => { - sqs.sendMessage({ - MessageBody: 'test DSM', - QueueUrl: QueueUrlDsm - }, (err) => { - if (err) return done(err) - - let produceSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.resource.startsWith('sendMessage')) { - produceSpanMeta = span.meta - } - - assertObjectContains(produceSpanMeta, { - 'pathway.hash': expectedProducerHash - }) - }).then(done, done) - }) - }) - - it('Should set pathway hash tag on a span when consuming', (done) => { - sqs.sendMessage({ - MessageBody: 'test DSM', - QueueUrl: QueueUrlDsm - }, (err) => { - if (err) return done(err) - - sqs.receiveMessage({ - QueueUrl: QueueUrlDsm, - MessageAttributeNames: ['.*'] - }, (err) => { - if (err) return done(err) - - let consumeSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.name === 'aws.response') { - consumeSpanMeta = span.meta - } - - assertObjectContains(consumeSpanMeta, { - 'pathway.hash': expectedConsumerHash - }) - }).then(done, done) - }) - }) - }) - - if (sqsClientName === 'aws-sdk' && semver.intersects(version, '>=2.3')) { - it('Should set pathway hash tag on a span when consuming and promise() was used over a callback', - async () => { - let consumeSpanMeta = {} - const tracePromise = agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.name === 'aws.request' && span.meta['aws.operation'] === 'receiveMessage') { - consumeSpanMeta = span.meta - } - - assertObjectContains(consumeSpanMeta, { - 'pathway.hash': expectedConsumerHash - }) - }) - - await sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }).promise() - await sqs.receiveMessage({ QueueUrl: QueueUrlDsm }).promise() - - return tracePromise - }) - } - - it('Should emit DSM stats to the agent when sending a message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 1) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }).then(done, done) - - sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }, () => {}) - }) - - it('Should emit DSM stats to the agent when receiving a message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 2 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 2) - assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) - }, { timeoutMs: 5000 }).then(done, done) - - sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }, () => { - sqs.receiveMessage({ QueueUrl: QueueUrlDsm, 
MessageAttributeNames: ['.*'] }, () => {}) - }) - }) - - it('Should emit DSM stats when receiving a message when the producer was not instrumented', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 2 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.strictEqual(statsPointsReceived, 1) - assert.strictEqual(agent.dsmStatsExistWithParentHash(agent, '0'), true) - }).then(done, done) - - agent.reload('aws-sdk', { sqs: { dsmEnabled: false } }, { dsmEnabled: false }) - sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsmConsumerOnly }, () => { - agent.reload('aws-sdk', { sqs: { dsmEnabled: true } }, { dsmEnabled: true }) - sqs.receiveMessage({ QueueUrl: QueueUrlDsmConsumerOnly, MessageAttributeNames: ['.*'] }, () => {}) - }) - }) - - it('Should emit DSM stats to the agent when sending batch messages', done => { - // we need to stub Date.now() to ensure a new stats bucket is created for each call - // otherwise, all stats checkpoints will be combined into a single stats points - let now = Date.now() - nowStub = sinon.stub(Date, 'now') - nowStub.callsFake(() => { - now += 1000000 - return now - }) - - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 3 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 3) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }).then(done, done) - - sqs.sendMessageBatch( - { - Entries: [ - { - Id: '1', - MessageBody: 'test DSM 1' - }, - { - Id: '2', - MessageBody: 'test DSM 2' - }, - { - Id: '3', - MessageBody: 'test DSM 3' - } - ], - QueueUrl: QueueUrlDsm - }, () => { - nowStub.restore() - }) - }) - }) }) }) }) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index 27cafbcdb2f..c138e6142d3 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -503,241 +503,6 @@ describe('Plugin', () => { // }) }) }) - - describe('data stream monitoring', () => { - let consumer - let expectedProducerHash - let expectedConsumerHash - - beforeEach(async () => { - tracer.init() - tracer.use('confluentinc-kafka-javascript', { dsmEnabled: true }) - messages = [{ key: 'key1', value: 'test2' }] - consumer = kafka.consumer({ - kafkaJS: { groupId, fromBeginning: true } - }) - await consumer.connect() - await consumer.subscribe({ topic: testTopic }) - }) - - beforeEach(() => { - expectedProducerHash = getDsmPathwayHash(testTopic, true, ENTRY_PARENT_HASH) - expectedConsumerHash = getDsmPathwayHash(testTopic, false, expectedProducerHash) - }) - - afterEach(async () => { - await consumer.disconnect() - }) - - describe('checkpoints', () => { - let setDataStreamsContextSpy - - beforeEach(() => { - setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext') - }) - - afterEach(async () => { - setDataStreamsContextSpy.restore() - }) - - it('Should set a checkpoint on produce', async () => { - const messages = [{ key: 'consumerDSM1', value: 
'test2' }] - await sendMessages(kafka, testTopic, messages) - expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash) - }) - - it('Should set a checkpoint on consume (eachMessage)', async () => { - const runArgs = [] - let consumerReceiveMessagePromise - await consumer.run({ - eachMessage: async () => { - runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) - consumerReceiveMessagePromise = Promise.resolve() - } - }) - await sendMessages(kafka, testTopic, messages).then( - async () => await consumerReceiveMessagePromise - ) - - for (const runArg of runArgs) { - expect(runArg.hash).to.equal(expectedConsumerHash) - } - }) - - it('Should set a checkpoint on consume (eachBatch)', async () => { - const runArgs = [] - let consumerReceiveMessagePromise - await consumer.run({ - eachBatch: async () => { - runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) - consumerReceiveMessagePromise = Promise.resolve() - } - }) - await sendMessages(kafka, testTopic, messages).then( - async () => await consumerReceiveMessagePromise - ) - for (const runArg of runArgs) { - expect(runArg.hash).to.equal(expectedConsumerHash) - } - }) - - it('Should set a message payload size when producing a message', async () => { - const messages = [{ key: 'key1', value: 'test2' }] - if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { - DataStreamsProcessor.prototype.recordCheckpoint.restore() - } - const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') - await sendMessages(kafka, testTopic, messages) - expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - recordCheckpointSpy.restore() - }) - - it('Should set a message payload size when consuming a message', async () => { - const messages = [{ key: 'key1', value: 'test2' }] - if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { - DataStreamsProcessor.prototype.recordCheckpoint.restore() - } - const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') - let consumerReceiveMessagePromise - await consumer.run({ - eachMessage: async () => { - expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - recordCheckpointSpy.restore() - consumerReceiveMessagePromise = Promise.resolve() - } - }) - await sendMessages(kafka, testTopic, messages).then( - async () => await consumerReceiveMessagePromise - ) - }) - }) - - describe('backlogs', () => { - let setOffsetSpy - - beforeEach(() => { - setOffsetSpy = sinon.spy(tracer._tracer._dataStreamsProcessor, 'setOffset') - }) - - afterEach(() => { - setOffsetSpy.restore() - }) - - it('Should add backlog on consumer explicit commit', async () => { - // Send a message, consume it, and record the last consumed offset - let commitMeta - - let messageProcessedResolve - const messageProcessedPromise = new Promise(resolve => { - messageProcessedResolve = resolve - }) - - const consumerRunPromise = consumer.run({ - eachMessage: async payload => { - const { topic, partition, message } = payload - commitMeta = { - topic, - partition, - offset: Number(message.offset) - } - messageProcessedResolve() - } - }) - - await consumerRunPromise - - // wait for the message to be processed before continuing - await sendMessages(kafka, testTopic, messages) - await messageProcessedPromise - await consumer.disconnect() - - for (const call of setOffsetSpy.getCalls()) { - expect(call.args[0]).to.not.have.property('type', 'kafka_commit') - } - - const newConsumer = kafka.consumer({ - kafkaJS: { groupId, fromBeginning: 
true, autoCommit: false } - }) - await newConsumer.connect() - await sendMessages(kafka, testTopic, [{ key: 'key1', value: 'test2' }]) - await newConsumer.run({ - eachMessage: async () => { - await newConsumer.disconnect() - } - }) - setOffsetSpy.resetHistory() - await newConsumer.commitOffsets() - - // Check our work - const runArg = setOffsetSpy.lastCall.args[0] - expect(runArg).to.have.property('offset', commitMeta.offset) - expect(runArg).to.have.property('partition', commitMeta.partition) - expect(runArg).to.have.property('topic', commitMeta.topic) - expect(runArg).to.have.property('type', 'kafka_commit') - expect(runArg).to.have.property('consumer_group', groupId) - }) - - it('Should add backlog on producer response', async () => { - await sendMessages(kafka, testTopic, messages) - expect(setOffsetSpy).to.be.calledOnce - const { topic } = setOffsetSpy.lastCall.args[0] - expect(topic).to.equal(testTopic) - }) - }) - - describe('when using a kafka broker version that does not support message headers', () => { - class KafkaJSError extends Error { - constructor (message) { - super(message) - this.name = 'KafkaJSError' - this.type = 'ERR_UNKNOWN' - } - } - let error - let producer - let produceStub - - beforeEach(async () => { - // simulate a kafka error for the broker version not supporting message headers - error = new KafkaJSError() - error.message = 'Simulated KafkaJSError ERR_UNKNOWN from Producer.produce stub' - producer = kafka.producer() - await producer.connect() - - // Spy on the produce method from the native library before it gets wrapped - produceStub = sinon.stub(nativeApi.Producer.prototype, 'produce') - .callsFake((topic, partition, message, key) => { - throw error - }) - }) - - afterEach(async () => { - produceStub.restore() - await producer.disconnect() - }) - - it('should hit an error for the first send and not inject headers in later sends', async () => { - const testMessages = [{ key: 'key1', value: 'test1' }] - const testMessages2 = [{ key: 'key2', value: 'test2' }] - - try { - await producer.send({ topic: testTopic, messages: testMessages }) - expect.fail('First producer.send() should have thrown an error') - } catch (e) { - expect(e).to.equal(error) - } - // Verify headers were injected in the first attempt - expect(testMessages[0].headers[0]).to.have.property('x-datadog-trace-id') - - // restore the stub to allow the next send to succeed - produceStub.restore() - - const result = await producer.send({ topic: testTopic, messages: testMessages2 }) - expect(testMessages2[0].headers).to.be.null - expect(result).to.not.be.undefined - }) - }) - }) }) }) }) diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js index acc7cebbece..34c4caaddc4 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js +++ b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js @@ -333,113 +333,6 @@ describe('Plugin', () => { }) }) - describe('data stream monitoring', () => { - let dsmTopic - let sub - let consume - - beforeEach(() => { - return agent.load('google-cloud-pubsub', { - dsmEnabled: true - }) - }) - - before(async () => { - const { PubSub } = require(`../../../versions/@google-cloud/pubsub@${version}`).get() - project = getProjectId() - resource = `projects/${project}/topics/${dsmTopicName}` - pubsub = new PubSub({ projectId: project }) - tracer.use('google-cloud-pubsub', { dsmEnabled: true }) - - dsmTopic = await pubsub.createTopic(dsmTopicName) - dsmTopic = 
dsmTopic[0] - sub = await dsmTopic.createSubscription('DSM') - sub = sub[0] - consume = function (cb) { - sub.on('message', cb) - } - - const dsmFullTopic = `projects/${project}/topics/${dsmTopicName}` - - expectedProducerHash = computePathwayHash( - 'test', - 'tester', - ['direction:out', 'topic:' + dsmFullTopic, 'type:google-pubsub'], - ENTRY_PARENT_HASH - ) - expectedConsumerHash = computePathwayHash( - 'test', - 'tester', - ['direction:in', 'topic:' + dsmFullTopic, 'type:google-pubsub'], - expectedProducerHash - ) - }) - - describe('should set a DSM checkpoint', () => { - it('on produce', async () => { - await publish(dsmTopic, { data: Buffer.from('DSM produce checkpoint') }) - - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 1) - expect(agent.dsmStatsExist(agent, expectedProducerHash.readBigUInt64BE(0).toString())).to.equal(true) - }, { timeoutMs: TIMEOUT }) - }) - - it('on consume', async () => { - await publish(dsmTopic, { data: Buffer.from('DSM consume checkpoint') }) - await consume(async () => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 2 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 2) - expect(agent.dsmStatsExist(agent, expectedConsumerHash.readBigUInt64BE(0).toString())).to.equal(true) - }, { timeoutMs: TIMEOUT }) - }) - }) - }) - - describe('it should set a message payload size', () => { - let recordCheckpointSpy - - beforeEach(() => { - recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') - }) - - afterEach(() => { - DataStreamsProcessor.prototype.recordCheckpoint.restore() - }) - - it('when producing a message', async () => { - await publish(dsmTopic, { data: Buffer.from('DSM produce payload size') }) - assert.ok(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - }) - - it('when consuming a message', async () => { - await publish(dsmTopic, { data: Buffer.from('DSM consume payload size') }) - - await consume(async () => { - assert.ok(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - }) - }) - }) - }) - function expectSpanWithDefaults (expected) { const prefixedResource = [expected.meta['pubsub.method'], resource].filter(x => x).join(' ') const service = expected.meta['pubsub.method'] ? 
'test-pubsub' : 'test' diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index 6f3e7dd33cc..2f1cde2f81f 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -433,169 +433,6 @@ describe('Plugin', () => { rawExpectedSchema.receive ) }) - - describe('data stream monitoring', () => { - let consumer - - beforeEach(async () => { - tracer.init() - tracer.use('kafkajs', { dsmEnabled: true }) - consumer = kafka.consumer({ groupId: 'test-group' }) - await consumer.connect() - await consumer.subscribe({ topic: testTopic }) - }) - - before(() => { - clusterIdAvailable = semver.intersects(version, '>=1.13') - expectedProducerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, true, ENTRY_PARENT_HASH) - expectedConsumerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, false, expectedProducerHash) - }) - - afterEach(async () => { - await consumer.disconnect() - }) - - describe('checkpoints', () => { - let setDataStreamsContextSpy - - beforeEach(() => { - setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext') - }) - - afterEach(() => { - setDataStreamsContextSpy.restore() - }) - - it('Should set a checkpoint on produce', async () => { - const messages = [{ key: 'consumerDSM1', value: 'test2' }] - await sendMessages(kafka, testTopic, messages) - expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash) - }) - - it('Should set a checkpoint on consume (eachMessage)', async () => { - const runArgs = [] - await consumer.run({ - eachMessage: async () => { - runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) - } - }) - await sendMessages(kafka, testTopic, messages) - await consumer.disconnect() - for (const runArg of runArgs) { - expect(runArg.hash).to.equal(expectedConsumerHash) - } - }) - - it('Should set a checkpoint on consume (eachBatch)', async () => { - const runArgs = [] - await consumer.run({ - eachBatch: async () => { - runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) - } - }) - await sendMessages(kafka, testTopic, messages) - await consumer.disconnect() - for (const runArg of runArgs) { - expect(runArg.hash).to.equal(expectedConsumerHash) - } - }) - - it('Should set a message payload size when producing a message', async () => { - const messages = [{ key: 'key1', value: 'test2' }] - if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { - DataStreamsProcessor.prototype.recordCheckpoint.restore() - } - const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') - await sendMessages(kafka, testTopic, messages) - expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - recordCheckpointSpy.restore() - }) - - it('Should set a message payload size when consuming a message', async () => { - const messages = [{ key: 'key1', value: 'test2' }] - if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { - DataStreamsProcessor.prototype.recordCheckpoint.restore() - } - const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') - await sendMessages(kafka, testTopic, messages) - await consumer.run({ - eachMessage: async () => { - expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - recordCheckpointSpy.restore() - } - }) - }) - }) - - describe('backlogs', () => { - let setOffsetSpy - - beforeEach(() => { - setOffsetSpy = sinon.spy(tracer._tracer._dataStreamsProcessor, 'setOffset') - }) - - 
afterEach(() => { - setOffsetSpy.restore() - }) - - if (semver.intersects(version, '>=1.10')) { - it('Should add backlog on consumer explicit commit', async () => { - // Send a message, consume it, and record the last consumed offset - let commitMeta - const deferred = {} - deferred.promise = new Promise((resolve, reject) => { - deferred.resolve = resolve - deferred.reject = reject - }) - await consumer.run({ - eachMessage: async payload => { - const { topic, partition, message } = payload - commitMeta = { - topic, - partition, - offset: Number(message.offset) - } - deferred.resolve() - }, - autoCommit: false - }) - await sendMessages(kafka, testTopic, messages) - await deferred.promise - await consumer.disconnect() // Flush ongoing `eachMessage` calls - for (const call of setOffsetSpy.getCalls()) { - expect(call.args[0]).to.not.have.property('type', 'kafka_commit') - } - - /** - * No choice but to reinitialize everything, because the only way to flush eachMessage - * calls is to disconnect. - */ - consumer.connect() - await sendMessages(kafka, testTopic, messages) - await consumer.run({ eachMessage: async () => {}, autoCommit: false }) - setOffsetSpy.resetHistory() - await consumer.commitOffsets([commitMeta]) - await consumer.disconnect() - - // Check our work - const runArg = setOffsetSpy.lastCall.args[0] - expect(setOffsetSpy).to.be.calledOnce - expect(runArg).to.have.property('offset', commitMeta.offset) - expect(runArg).to.have.property('partition', commitMeta.partition) - expect(runArg).to.have.property('topic', commitMeta.topic) - expect(runArg).to.have.property('type', 'kafka_commit') - expect(runArg).to.have.property('consumer_group', 'test-group') - }) - } - - it('Should add backlog on producer response', async () => { - await sendMessages(kafka, testTopic, messages) - expect(setOffsetSpy).to.be.calledOnce - const { topic } = setOffsetSpy.lastCall.args[0] - expect(topic).to.equal(testTopic) - }) - }) - }) }) }) }) diff --git a/packages/dd-trace/test/datastreams/plugins/amqplib.spec.js b/packages/dd-trace/test/datastreams/plugins/amqplib.spec.js new file mode 100644 index 00000000000..91b46f39b78 --- /dev/null +++ b/packages/dd-trace/test/datastreams/plugins/amqplib.spec.js @@ -0,0 +1,284 @@ +'use strict' + +const assert = require('node:assert/strict') +const { Buffer } = require('node:buffer') + +const { afterEach, beforeEach, describe, it } = require('mocha') + +const { computePathwayHash } = require('../../../src/datastreams/pathway') +const { ENTRY_PARENT_HASH } = require('../../../src/datastreams/processor') +const id = require('../../../src/id') +const agent = require('../../plugins/agent') +const { withVersions } = require('../../setup/mocha') +const { assertObjectContains } = require('../../../../integration-tests/helpers') + +describe('Plugin', () => { + let tracer + let connection + let channel + let queue + + describe('amqplib', () => { + withVersions('amqplib', 'amqplib', version => { + beforeEach(() => { + process.env.DD_DATA_STREAMS_ENABLED = 'true' + tracer = require('../../../') + queue = `test-${id()}` + }) + + afterEach(() => { + connection.close() + }) + + describe('data stream monitoring', function () { + this.timeout(10000) + + let expectedProducerHashWithTopic + let expectedProducerHashWithExchange + let expectedConsumerHash + + beforeEach(done => { + agent.load('amqplib').then(() => { + require(`../../../../../versions/amqplib@${version}`).get('amqplib/callback_api') + .connect((err, conn) => { + connection = conn + + if (err != null) { + return done(err) + 
}
+
+                conn.createChannel((err, ch) => {
+                  channel = ch
+                  return done(err)
+                })
+              })
+          })
+        })
+
+        afterEach(() => {
+          return agent.close({ ritmReset: false })
+        })
+
+        beforeEach(() => {
+          const producerHashWithTopic = computePathwayHash('test', 'tester', [
+            'direction:out',
+            'has_routing_key:true',
+            `topic:${queue}`,
+            'type:rabbitmq'
+          ], ENTRY_PARENT_HASH)
+
+          expectedProducerHashWithTopic = producerHashWithTopic.readBigUInt64LE(0).toString()
+
+          expectedProducerHashWithExchange = computePathwayHash('test', 'tester', [
+            'direction:out',
+            'exchange:namedExchange',
+            'has_routing_key:true',
+            'type:rabbitmq'
+          ], ENTRY_PARENT_HASH).readBigUInt64LE(0).toString()
+
+          expectedConsumerHash = computePathwayHash('test', 'tester', [
+            'direction:in',
+            `topic:${queue}`,
+            'type:rabbitmq'
+          ], producerHashWithTopic).readBigUInt64LE(0).toString()
+        })
+
+        it('Should emit DSM stats to the agent when sending a message on an unnamed exchange', done => {
+          agent.expectPipelineStats(dsmStats => {
+            let statsPointsReceived = []
+            // we should have 1 dsm stats point
+            dsmStats.forEach((timeStatsBucket) => {
+              if (timeStatsBucket && timeStatsBucket.Stats) {
+                timeStatsBucket.Stats.forEach((statsBuckets) => {
+                  statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
+                })
+              }
+            })
+            assert.ok(statsPointsReceived.length >= 1)
+            assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [
+              'direction:out',
+              'has_routing_key:true',
+              `topic:${queue}`,
+              'type:rabbitmq'
+            ])
+            assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithTopic), true)
+          }, { timeoutMs: 10000 }).then(done, done)
+
+          channel.assertQueue(queue, {}, (err, ok) => {
+            if (err) return done(err)
+
+            channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
+          })
+        })
+
+        it('Should emit DSM stats to the agent when sending a message on a named exchange', done => {
+          agent.expectPipelineStats(dsmStats => {
+            let statsPointsReceived = []
+            // we should have 1 dsm stats point
+            dsmStats.forEach((timeStatsBucket) => {
+              if (timeStatsBucket && timeStatsBucket.Stats) {
+                timeStatsBucket.Stats.forEach((statsBuckets) => {
+                  statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
+                })
+              }
+            })
+            assert.ok(statsPointsReceived.length >= 1)
+            assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [
+              'direction:out',
+              'exchange:namedExchange',
+              'has_routing_key:true',
+              'type:rabbitmq'
+            ])
+            assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithExchange), true)
+          }, { timeoutMs: 10000 }).then(done, done)
+
+          channel.assertExchange('namedExchange', 'direct', {}, (err, ok) => {
+            if (err) return done(err)
+
+            channel.publish('namedExchange', 'anyOldRoutingKey', Buffer.from('DSM pathway test'))
+          })
+        })
+
+        it('Should emit DSM stats to the agent when receiving a message', done => {
+          agent.expectPipelineStats(dsmStats => {
+            let statsPointsReceived = []
+            // we should have 2 dsm stats points
+            dsmStats.forEach((timeStatsBucket) => {
+              if (timeStatsBucket && timeStatsBucket.Stats) {
+                timeStatsBucket.Stats.forEach((statsBuckets) => {
+                  statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
+                })
+              }
+            })
+            assert.strictEqual(statsPointsReceived.length, 2)
+            assert.deepStrictEqual(statsPointsReceived[1].EdgeTags,
+              ['direction:in', `topic:${queue}`, 'type:rabbitmq'])
+            assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true)
+          }, { timeoutMs: 10000 }).then(done, done)
+
+          channel.assertQueue(queue, {}, (err, ok) => {
+            if (err) return done(err)
+
+            channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
+            channel.consume(ok.queue, () => {}, {}, (err, ok) => {
+              if (err) done(err)
+            })
+          })
+        })
+
+        it('Should emit DSM stats to the agent when sending another message', done => {
+          agent.expectPipelineStats(dsmStats => {
+            let statsPointsReceived = []
+            // we should have 1 dsm stats point
+            dsmStats.forEach((timeStatsBucket) => {
+              if (timeStatsBucket && timeStatsBucket.Stats) {
+                timeStatsBucket.Stats.forEach((statsBuckets) => {
+                  statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
+                })
+              }
+            })
+            assert.strictEqual(statsPointsReceived.length, 1)
+            assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [
+              'direction:out',
+              'has_routing_key:true',
+              `topic:${queue}`,
+              'type:rabbitmq'
+            ])
+            assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithTopic), true)
+          }, { timeoutMs: 10000 }).then(done, done)
+
+          channel.assertQueue(queue, {}, (err, ok) => {
+            if (err) return done(err)
+
+            channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
+          })
+        })
+
+        it('Should emit DSM stats to the agent when receiving a message with get', done => {
+          agent.expectPipelineStats(dsmStats => {
+            let statsPointsReceived = []
+            // we should have 2 dsm stats points
+            dsmStats.forEach((timeStatsBucket) => {
+              if (timeStatsBucket && timeStatsBucket.Stats) {
+                timeStatsBucket.Stats.forEach((statsBuckets) => {
+                  statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
+                })
+              }
+            })
+            assert.strictEqual(statsPointsReceived.length, 2)
+            assert.deepStrictEqual(statsPointsReceived[1].EdgeTags,
+              ['direction:in', `topic:${queue}`, 'type:rabbitmq'])
+            assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true)
+          }, { timeoutMs: 10000 }).then(done, done)
+
+          channel.assertQueue(queue, {}, (err, ok) => {
+            if (err) return done(err)
+
+            channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
+            channel.get(ok.queue, {}, (err, ok) => {
+              if (err) done(err)
+            })
+          })
+        })
+
+        it('regression test: should handle basic.get when queue is empty', done => {
+          channel.assertQueue(queue, {}, (err, ok) => {
+            if (err) return done(err)
+
+            channel.get(ok.queue, {}, (err, msg) => {
+              if (err) return done(err)
+              assert.strictEqual(msg, false)
+              done()
+            })
+          })
+        })
+
+        it('Should set pathway hash tag on a span when producing', (done) => {
+          channel.assertQueue(queue, {}, (err, ok) => {
+            if (err) return done(err)
+
+            channel.sendToQueue(ok.queue, Buffer.from('dsm test'))
+
+            let produceSpanMeta = {}
+            agent.assertSomeTraces(traces => {
+              const span = traces[0][0]
+
+              if (span.resource.startsWith('basic.publish')) {
+                produceSpanMeta = span.meta
+              }
+
+              assertObjectContains(produceSpanMeta, {
+                'pathway.hash': expectedProducerHashWithTopic
+              })
+            }, { timeoutMs: 10000 }).then(done, done)
+          })
+        })
+
+        it('Should set pathway hash tag on a span when consuming', (done) => {
+          channel.assertQueue(queue, {}, (err, ok) => {
+            if (err) return done(err)
+
+            channel.sendToQueue(ok.queue, Buffer.from('dsm test'))
+            channel.consume(ok.queue, () => {}, {}, (err, ok) => {
+              if (err) return done(err)
+
+              let consumeSpanMeta = {}
+              agent.assertSomeTraces(traces => {
+                const span = traces[0][0]
+
+                if (span.resource.startsWith('basic.deliver')) {
+                  consumeSpanMeta = span.meta
+                }
+
+                assertObjectContains(consumeSpanMeta, {
+                  'pathway.hash': expectedConsumerHash
+                })
+              }, { timeoutMs: 10000 }).then(done, done)
+            })
+          })
+        })
+      })
+    })
+  })
+})
+
diff --git a/packages/dd-trace/test/datastreams/plugins/aws-sdk/kinesis.spec.js b/packages/dd-trace/test/datastreams/plugins/aws-sdk/kinesis.spec.js
new file mode 100644
index 00000000000..ca9e4d6064c
--- /dev/null
+++ b/packages/dd-trace/test/datastreams/plugins/aws-sdk/kinesis.spec.js
@@ -0,0 +1,243 @@
+'use strict'
+
+const assert = require('node:assert/strict')
+
+const { afterEach, beforeEach, describe, it } = require('mocha')
+const sinon = require('sinon')
+
+const { assertObjectContains } = require('../../../../../integration-tests/helpers')
+const { withVersions } = require('../../../setup/mocha')
+const agent = require('../../../plugins/agent')
+const { setup } = require('../../../../../datadog-plugin-aws-sdk/test/spec_helpers')
+const helpers = require('../../../../../datadog-plugin-aws-sdk/test/kinesis_helpers')
+const id = require('../../../../src/id')
+const { computePathwayHash } = require('../../../../src/datastreams/pathway')
+const { ENTRY_PARENT_HASH } = require('../../../../src/datastreams/processor')
+
+describe('Kinesis', function () {
+  this.timeout(10000)
+  setup()
+
+  withVersions('aws-sdk', ['aws-sdk', '@aws-sdk/smithy-client'], (version, moduleName) => {
+    let AWS
+    let kinesis
+    let tracer
+
+    const kinesisClientName = moduleName === '@aws-sdk/smithy-client' ? '@aws-sdk/client-kinesis' : 'aws-sdk'
+
+    function createResources (streamName, cb) {
+      AWS = require(`../../../../../../versions/${kinesisClientName}@${version}`).get()
+
+      const params = {
+        endpoint: 'http://127.0.0.1:4566',
+        region: 'us-east-1'
+      }
+
+      if (moduleName === '@aws-sdk/smithy-client') {
+        const { NodeHttpHandler } = require(`../../../../../../versions/@aws-sdk/node-http-handler@${version}`).get()
+
+        params.requestHandler = new NodeHttpHandler()
+      }
+
+      kinesis = new AWS.Kinesis(params)
+
+      kinesis.createStream({
+        StreamName: streamName,
+        ShardCount: 1
+      }, (err, res) => {
+        if (err) return cb(err)
+
+        helpers.waitForActiveStream(kinesis, streamName, cb)
+      })
+    }
+
+    describe('DSM Context Propagation', () => {
+      let expectedProducerHash
+      let expectedConsumerHash
+      let nowStub
+      let streamNameDSM
+
+      beforeEach(() => {
+        return agent.load('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true })
+      })
+
+      beforeEach(done => {
+        tracer = require('../../../../')
+        tracer.use('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true })
+
+        streamNameDSM = `MyStreamDSM-${id()}`
+
+        const producerHash = computePathwayHash(
+          'test',
+          'tester',
+          ['direction:out', 'topic:' + streamNameDSM, 'type:kinesis'],
+          ENTRY_PARENT_HASH
+        )
+
+        expectedProducerHash = producerHash.readBigUInt64LE(0).toString()
+        expectedConsumerHash = computePathwayHash(
+          'test',
+          'tester',
+          ['direction:in', 'topic:' + streamNameDSM, 'type:kinesis'],
+          producerHash
+        ).readBigUInt64LE(0).toString()
+
+        createResources(streamNameDSM, done)
+      })
+
+      afterEach(done => {
+        kinesis.deleteStream({
+          StreamName: streamNameDSM
+        }, (err, res) => {
+          if (err) return done(err)
+
+          helpers.waitForDeletedStream(kinesis, streamNameDSM, done)
+        })
+      })
+
+      afterEach(() => {
+        try {
+          nowStub.restore()
+        } catch {
+          // pass
+        }
+        agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true })
+      })
+
+      it('injects DSM pathway hash during Kinesis getRecord to the span', done => {
+        let getRecordSpanMeta = {}
+        agent.assertSomeTraces(traces => {
+          const span = traces[0][0]
+
+          if (span.name === 'aws.response') {
+            getRecordSpanMeta = span.meta
+          }
+
+          assertObjectContains(getRecordSpanMeta, {
+            'pathway.hash': expectedConsumerHash
expectedConsumerHash
+          })
+        }, { timeoutMs: 10000 }).then(done, done)
+
+        helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => {
+          if (err) return done(err)
+
+          helpers.getTestData(kinesis, streamNameDSM, data, () => {})
+        })
+      })
+
+      it('injects DSM pathway hash during Kinesis putRecord to the span', done => {
+        let putRecordSpanMeta = {}
+        agent.assertSomeTraces(traces => {
+          const span = traces[0][0]
+
+          if (span.resource.startsWith('putRecord')) {
+            putRecordSpanMeta = span.meta
+          }
+
+          assertObjectContains(putRecordSpanMeta, {
+            'pathway.hash': expectedProducerHash
+          })
+        }).then(done, done)
+
+        helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, () => {})
+      })
+
+      it('emits DSM stats to the agent during Kinesis putRecord', done => {
+        agent.expectPipelineStats(dsmStats => {
+          let statsPointsReceived = 0
+          // we should only have 1 stats point since we only did 1 put operation
+          dsmStats.forEach((timeStatsBucket) => {
+            if (timeStatsBucket && timeStatsBucket.Stats) {
+              timeStatsBucket.Stats.forEach((statsBuckets) => {
+                statsPointsReceived += statsBuckets.Stats.length
+              })
+            }
+          })
+          assert.ok(statsPointsReceived >= 1)
+          assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true)
+        }, { timeoutMs: 10000 }).then(done, done)
+
+        helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, () => {})
+      })
+
+      it('emits DSM stats to the agent during Kinesis getRecord', done => {
+        agent.expectPipelineStats(dsmStats => {
+          let statsPointsReceived = 0
+          // we should have 2 stats points since we did 1 put and 1 get operation
+          dsmStats.forEach((timeStatsBucket) => {
+            if (timeStatsBucket && timeStatsBucket.Stats) {
+              timeStatsBucket.Stats.forEach((statsBuckets) => {
+                statsPointsReceived += statsBuckets.Stats.length
+              })
+            }
+          })
+          assert.ok(statsPointsReceived >= 2)
+          assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true)
+        }, { timeoutMs: 10000 }).then(done, done)
+
+        helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => {
+          if (err) return done(err)
+
+          helpers.getTestData(kinesis, streamNameDSM, data, () => {})
+        })
+      })
+
+      // eslint-disable-next-line @stylistic/max-len
+      it('emits DSM stats to the agent during Kinesis getRecord when the putRecord was done without DSM enabled', done => {
+        agent.expectPipelineStats(dsmStats => {
+          let statsPointsReceived = 0
+          // we should only have 1 stats point since the produce side was not instrumented
+          dsmStats.forEach((timeStatsBucket) => {
+            if (timeStatsBucket && timeStatsBucket.Stats) {
+              timeStatsBucket.Stats.forEach((statsBuckets) => {
+                statsPointsReceived += statsBuckets.Stats.length
+              })
+            }
+          })
+          assert.strictEqual(statsPointsReceived, 1)
+          assert.strictEqual(agent.dsmStatsExistWithParentHash(agent, '0'), true)
+        }, { timeoutMs: 10000 }).then(done, done)
+
+        // TODO: Fix this. The third argument is not used. Check all usages of agent.reload.
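+        // Reloading with dsmEnabled: false before the put simulates an uninstrumented producer,
+        // so the single consume-side checkpoint asserted above should carry parent hash '0'.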
+ agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) + helpers.getTestData(kinesis, streamNameDSM, data, () => {}) + }) + }) + + it('emits DSM stats to the agent during Kinesis putRecords', done => { + // we need to stub Date.now() to ensure a new stats bucket is created for each call + // otherwise, all stats checkpoints will be combined into a single stats points + let now = Date.now() + nowStub = sinon.stub(Date, 'now') + nowStub.callsFake(() => { + now += 1000000 + return now + }) + + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have only have 3 stats points since we only had 3 records published + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 3) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) + }, { timeoutMs: 10000 }).then(done, done) + + helpers.putTestRecords(kinesis, streamNameDSM, (err, data) => { + // Swallow the error as it doesn't matter for this test. + }) + }) + }) + }) +}) + diff --git a/packages/dd-trace/test/datastreams/plugins/aws-sdk/sns.spec.js b/packages/dd-trace/test/datastreams/plugins/aws-sdk/sns.spec.js new file mode 100644 index 00000000000..6c8bb601807 --- /dev/null +++ b/packages/dd-trace/test/datastreams/plugins/aws-sdk/sns.spec.js @@ -0,0 +1,269 @@ +'use strict' + +const assert = require('node:assert/strict') + +const { after, afterEach, before, describe, it } = require('mocha') +const sinon = require('sinon') +const semver = require('semver') + +const { assertObjectContains } = require('../../../../../integration-tests/helpers') +const { withVersions } = require('../../../setup/mocha') +const agent = require('../../../plugins/agent') +const { setup } = require('../../../../../datadog-plugin-aws-sdk/test/spec_helpers') + +describe('Sns', function () { + setup() + this.timeout(20000) + + withVersions('aws-sdk', ['aws-sdk', '@aws-sdk/smithy-client'], (version, moduleName) => { + let sns + let sqs + let subParams + let receiveParams + let TopicArn + let QueueUrl + let tracer + + const snsClientName = moduleName === '@aws-sdk/smithy-client' ? '@aws-sdk/client-sns' : 'aws-sdk' + const sqsClientName = moduleName === '@aws-sdk/smithy-client' ? 
'@aws-sdk/client-sqs' : 'aws-sdk' + + function createResources (queueName, topicName, cb) { + const { SNS } = require(`../../../../../../versions/${snsClientName}@${version}`).get() + const { SQS } = require(`../../../../../../versions/${sqsClientName}@${version}`).get() + + sns = new SNS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' }) + sqs = new SQS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' }) + + sns.createTopic({ Name: topicName }, (err, data) => { + if (err) return cb(err) + + TopicArn = data.TopicArn + + sqs.createQueue({ QueueName: queueName }, (err, data) => { + if (err) return cb(err) + + QueueUrl = data.QueueUrl + + sqs.getQueueAttributes({ + QueueUrl, + AttributeNames: ['QueueArn'] + }, (err, data) => { + if (err) return cb(err) + + const QueueArn = data.Attributes.QueueArn + + subParams = { + Protocol: 'sqs', + TopicArn, + Endpoint: QueueArn + } + + receiveParams = { + QueueUrl, + MessageAttributeNames: ['All'] + } + + cb() + }) + }) + }) + } + + describe('Data Streams Monitoring', () => { + const expectedProducerHash = '15386798273908484982' + const expectedConsumerHash = '15162998336469814920' + let nowStub + + before(() => { + return agent.load('aws-sdk', { sns: { dsmEnabled: true }, sqs: { dsmEnabled: true } }, { dsmEnabled: true }) + }) + + before(done => { + process.env.DD_DATA_STREAMS_ENABLED = 'true' + tracer = require('../../../../') + tracer.use('aws-sdk', { sns: { dsmEnabled: true }, sqs: { dsmEnabled: true } }) + + createResources('TestQueueDSM', 'TestTopicDSM', done) + }) + + after(done => { + sns.deleteTopic({ TopicArn }, done) + }) + + after(done => { + sqs.deleteQueue({ QueueUrl }, done) + }) + + after(() => { + return agent.close({ ritmReset: false, wipe: true }) + }) + + afterEach(() => { + try { + nowStub.restore() + } catch { + // pass + } + // TODO: Fix this. The third argument is not used. 
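+        // Reset the plugin to the suite's default DSM configuration in case a test changed it.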
+        agent.reload('aws-sdk', { sns: { dsmEnabled: true, batchPropagationEnabled: true } }, { dsmEnabled: true })
+      })
+
+      it('injects DSM pathway hash to SNS publish span', done => {
+        sns.subscribe(subParams, (err, data) => {
+          if (err) return done(err)
+
+          sns.publish(
+            { TopicArn, Message: 'message DSM' },
+            (err) => {
+              if (err) return done(err)
+
+              let publishSpanMeta = {}
+              agent.assertSomeTraces(traces => {
+                const span = traces[0][0]
+
+                if (span.resource.startsWith('publish')) {
+                  publishSpanMeta = span.meta
+                }
+
+                assertObjectContains(publishSpanMeta, {
+                  'pathway.hash': expectedProducerHash
+                })
+              }).then(done, done)
+            })
+        })
+      })
+
+      it('injects DSM pathway hash to SQS receive span from SNS topic', done => {
+        sns.subscribe(subParams, (err, data) => {
+          if (err) return done(err)
+
+          sns.publish(
+            { TopicArn, Message: 'message DSM' },
+            (err) => {
+              if (err) return done(err)
+            })
+
+          sqs.receiveMessage(
+            receiveParams,
+            (err, res) => {
+              if (err) return done(err)
+
+              let consumeSpanMeta = {}
+              agent.assertSomeTraces(traces => {
+                const span = traces[0][0]
+
+                if (span.name === 'aws.response') {
+                  consumeSpanMeta = span.meta
+                }
+
+                assertObjectContains(consumeSpanMeta, {
+                  'pathway.hash': expectedConsumerHash
+                })
+              }).then(done, done)
+            })
+        })
+      })
+
+      it('outputs DSM stats to the agent when publishing a message', done => {
+        agent.expectPipelineStats(dsmStats => {
+          let statsPointsReceived = 0
+          // we should have 1 DSM stats point
+          dsmStats.forEach((timeStatsBucket) => {
+            if (timeStatsBucket && timeStatsBucket.Stats) {
+              timeStatsBucket.Stats.forEach((statsBuckets) => {
+                statsPointsReceived += statsBuckets.Stats.length
+              })
+            }
+          })
+          assert.ok(statsPointsReceived >= 1)
+          assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true)
+        }).then(done, done)
+
+        sns.subscribe(subParams, () => {
+          sns.publish({ TopicArn, Message: 'message DSM' }, () => {})
+        })
+      })
+
+      it('outputs DSM stats to the agent when consuming a message', done => {
+        agent.expectPipelineStats(dsmStats => {
+          let statsPointsReceived = 0
+          // we should have 2 DSM stats points
+          dsmStats.forEach((timeStatsBucket) => {
+            if (timeStatsBucket && timeStatsBucket.Stats) {
+              timeStatsBucket.Stats.forEach((statsBuckets) => {
+                statsPointsReceived += statsBuckets.Stats.length
+              })
+            }
+          })
+          assert.ok(statsPointsReceived >= 2)
+          assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true)
+        }).then(done, done)
+
+        sns.subscribe(subParams, () => {
+          sns.publish({ TopicArn, Message: 'message DSM' }, () => {
+            sqs.receiveMessage(receiveParams, () => {})
+          })
+        })
+      })
+
+      it('outputs DSM stats to the agent when publishing batch messages', function (done) {
+        // publishBatch was released with version 2.1031.0 for the aws-sdk
+        // publishBatch does not work with smithy-client 3.0.0; we could not find the exact
+        // version that added support, but it works on 3.374.0
+        if (
+          (moduleName === '@aws-sdk/smithy-client' && semver.intersects(version, '>=3.374.0')) ||
+          (moduleName === 'aws-sdk' && semver.intersects(version, '>=2.1031.0'))
+        ) {
+          // we need to stub Date.now() to ensure a new stats bucket is created for each call
+          // otherwise, all stats checkpoints will be combined into a single stats point
+          let now = Date.now()
+          nowStub = sinon.stub(Date, 'now')
+          nowStub.callsFake(() => {
+            now += 1000000
+            return now
+          })
+
+          agent.expectPipelineStats(dsmStats => {
+            let statsPointsReceived = 0
+            // we should have 3 DSM stats points
+            dsmStats.forEach((timeStatsBucket) => {
+              if 
(timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 3) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) + }, { timeoutMs: 2000 }).then(done, done) + + sns.subscribe(subParams, () => { + sns.publishBatch( + { + TopicArn, + PublishBatchRequestEntries: [ + { + Id: '1', + Message: 'message DSM 1' + }, + { + Id: '2', + Message: 'message DSM 2' + }, + { + Id: '3', + Message: 'message DSM 3' + } + ] + }, () => { + nowStub.restore() + }) + }) + } else { + this.skip() + } + }) + }) + }) +}) + diff --git a/packages/dd-trace/test/datastreams/plugins/aws-sdk/sqs.spec.js b/packages/dd-trace/test/datastreams/plugins/aws-sdk/sqs.spec.js new file mode 100644 index 00000000000..c3baef4c76f --- /dev/null +++ b/packages/dd-trace/test/datastreams/plugins/aws-sdk/sqs.spec.js @@ -0,0 +1,308 @@ +'use strict' + +const assert = require('node:assert/strict') +const { randomUUID } = require('node:crypto') + +const { after, afterEach, before, beforeEach, describe, it } = require('mocha') +const semver = require('semver') +const sinon = require('sinon') + +const { computePathwayHash } = require('../../../../src/datastreams/pathway') +const { ENTRY_PARENT_HASH } = require('../../../../src/datastreams/processor') +const agent = require('../../../plugins/agent') +const { withVersions } = require('../../../setup/mocha') +const { setup } = require('../../../../../datadog-plugin-aws-sdk/test/spec_helpers') +const { assertObjectContains } = require('../../../../../integration-tests/helpers') + +const getQueueParams = (queueName) => { + return { + QueueName: queueName, + Attributes: { + MessageRetentionPeriod: '86400' + } + } +} + +describe('Plugin', () => { + describe('aws-sdk (sqs)', function () { + this.timeout(10000) + setup() + + withVersions('aws-sdk', ['aws-sdk', '@aws-sdk/smithy-client'], (version, moduleName) => { + let AWS + let sqs + let queueNameDSM + let queueNameDSMConsumerOnly + let queueOptionsDsm + let queueOptionsDsmConsumerOnly + let QueueUrlDsm + let QueueUrlDsmConsumerOnly + let tracer + + const sqsClientName = moduleName === '@aws-sdk/smithy-client' ? 
'@aws-sdk/client-sqs' : 'aws-sdk'
+
+      beforeEach(() => {
+        const id = randomUUID()
+        queueNameDSM = `TestQueueDSM-${id}`
+        queueNameDSMConsumerOnly = `TestQueueDSMConsumerOnly-${id}`
+        queueOptionsDsm = getQueueParams(queueNameDSM)
+        queueOptionsDsmConsumerOnly = getQueueParams(queueNameDSMConsumerOnly)
+        QueueUrlDsm = `http://127.0.0.1:4566/000000000000/${queueNameDSM}`
+        QueueUrlDsmConsumerOnly = `http://127.0.0.1:4566/000000000000/${queueNameDSMConsumerOnly}`
+      })
+
+      describe('data stream monitoring', () => {
+        let expectedProducerHash
+        let expectedConsumerHash
+        let nowStub
+
+        before(() => {
+          process.env.DD_DATA_STREAMS_ENABLED = 'true'
+          tracer = require('../../../../')
+          tracer.use('aws-sdk', { sqs: { dsmEnabled: true } })
+        })
+
+        before(async () => {
+          return agent.load('aws-sdk', {
+            sqs: {
+              dsmEnabled: true
+            }
+          },
+          { dsmEnabled: true })
+        })
+
+        before(() => {
+          AWS = require(`../../../../../../versions/${sqsClientName}@${version}`).get()
+          sqs = new AWS.SQS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' })
+        })
+
+        beforeEach(() => {
+          const producerHash = computePathwayHash(
+            'test',
+            'tester',
+            ['direction:out', 'topic:' + queueNameDSM, 'type:sqs'],
+            ENTRY_PARENT_HASH
+          )
+
+          expectedProducerHash = producerHash.readBigUInt64LE(0).toString()
+          expectedConsumerHash = computePathwayHash(
+            'test',
+            'tester',
+            ['direction:in', 'topic:' + queueNameDSM, 'type:sqs'],
+            producerHash
+          ).readBigUInt64LE(0).toString()
+        })
+
+        beforeEach(done => {
+          sqs.createQueue(queueOptionsDsm, (err, res) => err ? done(err) : done())
+        })
+
+        beforeEach(done => {
+          sqs.createQueue(queueOptionsDsmConsumerOnly, (err, res) => err ? done(err) : done())
+        })
+
+        afterEach(done => {
+          sqs.deleteQueue({ QueueUrl: QueueUrlDsm }, done)
+        })
+
+        afterEach(done => {
+          sqs.deleteQueue({ QueueUrl: QueueUrlDsmConsumerOnly }, done)
+        })
+
+        after(() => {
+          return agent.close({ ritmReset: false })
+        })
+
+        afterEach(() => {
+          try {
+            nowStub.restore()
+          } catch {
+            // pass
+          }
+          agent.reload('aws-sdk', { sqs: { dsmEnabled: true } }, { dsmEnabled: true })
+        })
+
+        it('Should set pathway hash tag on a span when producing', (done) => {
+          sqs.sendMessage({
+            MessageBody: 'test DSM',
+            QueueUrl: QueueUrlDsm
+          }, (err) => {
+            if (err) return done(err)
+
+            let produceSpanMeta = {}
+            agent.assertSomeTraces(traces => {
+              const span = traces[0][0]
+
+              if (span.resource.startsWith('sendMessage')) {
+                produceSpanMeta = span.meta
+              }
+
+              assertObjectContains(produceSpanMeta, {
+                'pathway.hash': expectedProducerHash
+              })
+            }).then(done, done)
+          })
+        })
+
+        it('Should set pathway hash tag on a span when consuming', (done) => {
+          sqs.sendMessage({
+            MessageBody: 'test DSM',
+            QueueUrl: QueueUrlDsm
+          }, (err) => {
+            if (err) return done(err)
+
+            sqs.receiveMessage({
+              QueueUrl: QueueUrlDsm,
+              MessageAttributeNames: ['.*']
+            }, (err) => {
+              if (err) return done(err)
+
+              let consumeSpanMeta = {}
+              agent.assertSomeTraces(traces => {
+                const span = traces[0][0]
+
+                if (span.name === 'aws.response') {
+                  consumeSpanMeta = span.meta
+                }
+
+                assertObjectContains(consumeSpanMeta, {
+                  'pathway.hash': expectedConsumerHash
+                })
+              }).then(done, done)
+            })
+          })
+        })
+
+        if (sqsClientName === 'aws-sdk' && semver.intersects(version, '>=2.3')) {
+          it('Should set pathway hash tag on a span when consuming and promise() was used over a callback',
+            async () => {
+              let consumeSpanMeta = {}
+              const tracePromise = agent.assertSomeTraces(traces => {
+                const span = traces[0][0]
+
+                if (span.name === 'aws.request' && 
span.meta['aws.operation'] === 'receiveMessage') { + consumeSpanMeta = span.meta + } + + assertObjectContains(consumeSpanMeta, { + 'pathway.hash': expectedConsumerHash + }) + }) + + await sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }).promise() + await sqs.receiveMessage({ QueueUrl: QueueUrlDsm }).promise() + + return tracePromise + }) + } + + it('Should emit DSM stats to the agent when sending a message', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 1 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 1) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) + }).then(done, done) + + sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }, () => {}) + }) + + it('Should emit DSM stats to the agent when receiving a message', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 2 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 2) + assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) + }, { timeoutMs: 5000 }).then(done, done) + + sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }, () => { + sqs.receiveMessage({ QueueUrl: QueueUrlDsm, MessageAttributeNames: ['.*'] }, () => {}) + }) + }) + + it('Should emit DSM stats when receiving a message when the producer was not instrumented', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 2 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.strictEqual(statsPointsReceived, 1) + assert.strictEqual(agent.dsmStatsExistWithParentHash(agent, '0'), true) + }).then(done, done) + + agent.reload('aws-sdk', { sqs: { dsmEnabled: false } }, { dsmEnabled: false }) + sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsmConsumerOnly }, () => { + agent.reload('aws-sdk', { sqs: { dsmEnabled: true } }, { dsmEnabled: true }) + sqs.receiveMessage({ QueueUrl: QueueUrlDsmConsumerOnly, MessageAttributeNames: ['.*'] }, () => {}) + }) + }) + + it('Should emit DSM stats to the agent when sending batch messages', done => { + // we need to stub Date.now() to ensure a new stats bucket is created for each call + // otherwise, all stats checkpoints will be combined into a single stats points + let now = Date.now() + nowStub = sinon.stub(Date, 'now') + nowStub.callsFake(() => { + now += 1000000 + return now + }) + + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 3 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 3) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) + }).then(done, done) + + sqs.sendMessageBatch( + { + Entries: [ + { + Id: '1', + 
MessageBody: 'test DSM 1' + }, + { + Id: '2', + MessageBody: 'test DSM 2' + }, + { + Id: '3', + MessageBody: 'test DSM 3' + } + ], + QueueUrl: QueueUrlDsm + }, () => { + nowStub.restore() + }) + }) + }) + }) + }) +}) + diff --git a/packages/dd-trace/test/datastreams/plugins/confluentinc-kafka-javascript.spec.js b/packages/dd-trace/test/datastreams/plugins/confluentinc-kafka-javascript.spec.js new file mode 100644 index 00000000000..b3cf3dbbf22 --- /dev/null +++ b/packages/dd-trace/test/datastreams/plugins/confluentinc-kafka-javascript.spec.js @@ -0,0 +1,325 @@ +'use strict' + +const { expect } = require('chai') +const { describe, it, beforeEach, afterEach } = require('mocha') +const sinon = require('sinon') + +const { randomUUID } = require('node:crypto') + +const agent = require('../../plugins/agent') +const { withVersions } = require('../../setup/mocha') + +const DataStreamsContext = require('../../../src/datastreams/context') +const { computePathwayHash } = require('../../../src/datastreams/pathway') +const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../../src/datastreams/processor') + +const getDsmPathwayHash = (testTopic, isProducer, parentHash) => { + let edgeTags + if (isProducer) { + edgeTags = ['direction:out', 'topic:' + testTopic, 'type:kafka'] + } else { + edgeTags = ['direction:in', 'group:test-group-confluent', 'topic:' + testTopic, 'type:kafka'] + } + + edgeTags.sort() + return computePathwayHash('test', 'tester', edgeTags, parentHash) +} + +describe('Plugin', () => { + const module = '@confluentinc/kafka-javascript' + const groupId = 'test-group-confluent' + + describe('confluentinc-kafka-javascript', function () { + this.timeout(30000) + + afterEach(() => { + return agent.close({ ritmReset: false }) + }) + + withVersions('confluentinc-kafka-javascript', module, (version) => { + let kafka + let admin + let tracer + let Kafka + let ConfluentKafka + let messages + let nativeApi + let testTopic + + describe('data stream monitoring', () => { + let consumer + let expectedProducerHash + let expectedConsumerHash + + beforeEach(async () => { + messages = [{ key: 'key1', value: 'test2' }] + + process.env.DD_DATA_STREAMS_ENABLED = 'true' + tracer = require('../../../') + await agent.load('confluentinc-kafka-javascript') + const lib = require(`../../../../../versions/${module}@${version}`).get() + + // Store the module for later use + nativeApi = lib + + // Setup for the KafkaJS wrapper tests + Kafka = lib.Kafka + ConfluentKafka = lib.ConfluentKafka + kafka = new Kafka(ConfluentKafka, { + clientId: `confluent-kafka-test-${version}`, + 'bootstrap.servers': '127.0.0.1:9092' + }) + testTopic = `test-topic-${randomUUID()}` + admin = kafka.admin() + await admin.connect() + await admin.createTopics({ + topics: [{ + topic: testTopic, + numPartitions: 1, + replicationFactor: 1 + }] + }) + await admin.disconnect() + + tracer.init() + tracer.use('confluentinc-kafka-javascript', { dsmEnabled: true }) + messages = [{ key: 'key1', value: 'test2' }] + consumer = kafka.consumer({ + kafkaJS: { groupId, fromBeginning: true } + }) + await consumer.connect() + await consumer.subscribe({ topic: testTopic }) + }) + + beforeEach(() => { + expectedProducerHash = getDsmPathwayHash(testTopic, true, ENTRY_PARENT_HASH) + expectedConsumerHash = getDsmPathwayHash(testTopic, false, expectedProducerHash) + }) + + afterEach(async () => { + await consumer.disconnect() + }) + + describe('checkpoints', () => { + let setDataStreamsContextSpy + + beforeEach(() => { + setDataStreamsContextSpy = 
sinon.spy(DataStreamsContext, 'setDataStreamsContext')
+          })
+
+          afterEach(async () => {
+            setDataStreamsContextSpy.restore()
+          })
+
+          it('Should set a checkpoint on produce', async () => {
+            const messages = [{ key: 'consumerDSM1', value: 'test2' }]
+            await sendMessages(kafka, testTopic, messages)
+            expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash)
+          })
+
+          it('Should set a checkpoint on consume (eachMessage)', async () => {
+            const runArgs = []
+            let consumerReceiveMessagePromise
+            await consumer.run({
+              eachMessage: async () => {
+                runArgs.push(setDataStreamsContextSpy.lastCall.args[0])
+                consumerReceiveMessagePromise = Promise.resolve()
+              }
+            })
+            await sendMessages(kafka, testTopic, messages).then(
+              async () => await consumerReceiveMessagePromise
+            )
+
+            for (const runArg of runArgs) {
+              expect(runArg.hash).to.equal(expectedConsumerHash)
+            }
+          })
+
+          it('Should set a checkpoint on consume (eachBatch)', async () => {
+            const runArgs = []
+            let consumerReceiveMessagePromise
+            await consumer.run({
+              eachBatch: async () => {
+                runArgs.push(setDataStreamsContextSpy.lastCall.args[0])
+                consumerReceiveMessagePromise = Promise.resolve()
+              }
+            })
+            await sendMessages(kafka, testTopic, messages).then(
+              async () => await consumerReceiveMessagePromise
+            )
+            for (const runArg of runArgs) {
+              expect(runArg.hash).to.equal(expectedConsumerHash)
+            }
+          })
+
+          it('Should set a message payload size when producing a message', async () => {
+            const messages = [{ key: 'key1', value: 'test2' }]
+            if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) {
+              DataStreamsProcessor.prototype.recordCheckpoint.restore()
+            }
+            const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint')
+            await sendMessages(kafka, testTopic, messages)
+            expect(recordCheckpointSpy.args[0][0]).to.have.property('payloadSize')
+            recordCheckpointSpy.restore()
+          })
+
+          it('Should set a message payload size when consuming a message', async () => {
+            const messages = [{ key: 'key1', value: 'test2' }]
+            if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) {
+              DataStreamsProcessor.prototype.recordCheckpoint.restore()
+            }
+            const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint')
+            let consumerReceiveMessagePromise
+            await consumer.run({
+              eachMessage: async () => {
+                expect(recordCheckpointSpy.args[0][0]).to.have.property('payloadSize')
+                recordCheckpointSpy.restore()
+                consumerReceiveMessagePromise = Promise.resolve()
+              }
+            })
+            await sendMessages(kafka, testTopic, messages).then(
+              async () => await consumerReceiveMessagePromise
+            )
+          })
+        })
+
+        describe('backlogs', () => {
+          let setOffsetSpy
+
+          beforeEach(() => {
+            setOffsetSpy = sinon.spy(tracer._tracer._dataStreamsProcessor, 'setOffset')
+          })
+
+          afterEach(() => {
+            setOffsetSpy.restore()
+          })
+
+          it('Should add backlog on consumer explicit commit', async () => {
+            // Send a message, consume it, and record the last consumed offset
+            let commitMeta
+
+            let messageProcessedResolve
+            const messageProcessedPromise = new Promise(resolve => {
+              messageProcessedResolve = resolve
+            })
+
+            const consumerRunPromise = consumer.run({
+              eachMessage: async payload => {
+                const { topic, partition, message } = payload
+                commitMeta = {
+                  topic,
+                  partition,
+                  offset: Number(message.offset)
+                }
+                messageProcessedResolve()
+              }
+            })
+
+            await consumerRunPromise
+
+            // wait for the message to be processed before continuing
+            await sendMessages(kafka, testTopic, messages)
+            await 
messageProcessedPromise + await consumer.disconnect() + + for (const call of setOffsetSpy.getCalls()) { + expect(call.args[0]).to.not.have.property('type', 'kafka_commit') + } + + const newConsumer = kafka.consumer({ + kafkaJS: { groupId, fromBeginning: true, autoCommit: false } + }) + await newConsumer.connect() + await sendMessages(kafka, testTopic, [{ key: 'key1', value: 'test2' }]) + await newConsumer.run({ + eachMessage: async () => { + await newConsumer.disconnect() + } + }) + setOffsetSpy.resetHistory() + await newConsumer.commitOffsets() + + // Check our work + const runArg = setOffsetSpy.lastCall.args[0] + expect(runArg).to.have.property('offset', commitMeta.offset) + expect(runArg).to.have.property('partition', commitMeta.partition) + expect(runArg).to.have.property('topic', commitMeta.topic) + expect(runArg).to.have.property('type', 'kafka_commit') + expect(runArg).to.have.property('consumer_group', groupId) + }) + + it('Should add backlog on producer response', async () => { + await sendMessages(kafka, testTopic, messages) + expect(setOffsetSpy).to.be.calledOnce + const { topic } = setOffsetSpy.lastCall.args[0] + expect(topic).to.equal(testTopic) + }) + }) + + describe('when using a kafka broker version that does not support message headers', () => { + class KafkaJSError extends Error { + constructor (message) { + super(message) + this.name = 'KafkaJSError' + this.type = 'ERR_UNKNOWN' + } + } + let error + let producer + let produceStub + + beforeEach(async () => { + // simulate a kafka error for the broker version not supporting message headers + error = new KafkaJSError() + error.message = 'Simulated KafkaJSError ERR_UNKNOWN from Producer.produce stub' + producer = kafka.producer() + await producer.connect() + + // Spy on the produce method from the native library before it gets wrapped + produceStub = sinon.stub(nativeApi.Producer.prototype, 'produce') + .callsFake((topic, partition, message, key) => { + throw error + }) + }) + + afterEach(async () => { + produceStub.restore() + await producer.disconnect() + }) + + it('should hit an error for the first send and not inject headers in later sends', async () => { + const testMessages = [{ key: 'key1', value: 'test1' }] + const testMessages2 = [{ key: 'key2', value: 'test2' }] + + try { + await producer.send({ topic: testTopic, messages: testMessages }) + expect.fail('First producer.send() should have thrown an error') + } catch (e) { + expect(e).to.equal(error) + } + // Verify headers were injected in the first attempt + expect(testMessages[0].headers[0]).to.have.property('x-datadog-trace-id') + + // restore the stub to allow the next send to succeed + produceStub.restore() + + const result = await producer.send({ topic: testTopic, messages: testMessages2 }) + expect(testMessages2[0].headers).to.be.null + expect(result).to.not.be.undefined + }) + }) + }) + }) + }) +}) + +async function sendMessages (kafka, topic, messages) { + const producer = kafka.producer() + await producer.connect() + await producer.send({ + topic, + messages + }) + await producer.disconnect() +} + diff --git a/packages/dd-trace/test/datastreams/plugins/google-cloud-pubsub.spec.js b/packages/dd-trace/test/datastreams/plugins/google-cloud-pubsub.spec.js new file mode 100644 index 00000000000..83fadfb02bd --- /dev/null +++ b/packages/dd-trace/test/datastreams/plugins/google-cloud-pubsub.spec.js @@ -0,0 +1,166 @@ +'use strict' + +const assert = require('node:assert/strict') + +const { expect } = require('chai') +const { after, before, beforeEach, describe, it 
} = require('mocha') +const sinon = require('sinon') + +const { computePathwayHash } = require('../../../src/datastreams/pathway') +const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../../src/datastreams/processor') +const id = require('../../../src/id') +const agent = require('../../plugins/agent') +const { withVersions } = require('../../setup/mocha') + +const TIMEOUT = 30000 +const dsmTopicName = 'dsm-topic' + +describe('Plugin', () => { + let tracer + + describe('google-cloud-pubsub', function () { + this.timeout(TIMEOUT) + + before(() => { + process.env.PUBSUB_EMULATOR_HOST = 'localhost:8081' + process.env.DD_DATA_STREAMS_ENABLED = 'true' + }) + + after(() => { + delete process.env.PUBSUB_EMULATOR_HOST + }) + + after(() => { + return agent.close({ ritmReset: false }) + }) + + withVersions('google-cloud-pubsub', '@google-cloud/pubsub', version => { + let pubsub + let project + let resource + let expectedProducerHash + let expectedConsumerHash + + describe('data stream monitoring', () => { + let dsmTopic + let sub + let consume + + beforeEach(() => { + return agent.load('google-cloud-pubsub', { + dsmEnabled: true + }) + }) + + before(async () => { + const { PubSub } = require(`../../../../../versions/@google-cloud/pubsub@${version}`).get() + tracer = require('../../../') + project = getProjectId() + resource = `projects/${project}/topics/${dsmTopicName}` + pubsub = new PubSub({ projectId: project }) + tracer.use('google-cloud-pubsub', { dsmEnabled: true }) + + dsmTopic = await pubsub.createTopic(dsmTopicName) + dsmTopic = dsmTopic[0] + sub = await dsmTopic.createSubscription('DSM') + sub = sub[0] + consume = function (cb) { + sub.on('message', cb) + } + + const dsmFullTopic = `projects/${project}/topics/${dsmTopicName}` + + expectedProducerHash = computePathwayHash( + 'test', + 'tester', + ['direction:out', 'topic:' + dsmFullTopic, 'type:google-pubsub'], + ENTRY_PARENT_HASH + ) + expectedConsumerHash = computePathwayHash( + 'test', + 'tester', + ['direction:in', 'topic:' + dsmFullTopic, 'type:google-pubsub'], + expectedProducerHash + ) + }) + + describe('should set a DSM checkpoint', () => { + it('on produce', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM produce checkpoint') }) + + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 1 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 1) + expect(agent.dsmStatsExist(agent, expectedProducerHash.readBigUInt64BE(0).toString())).to.equal(true) + }, { timeoutMs: TIMEOUT }) + }) + + it('on consume', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM consume checkpoint') }) + await consume(async () => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 2 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 2) + expect(agent.dsmStatsExist(agent, expectedConsumerHash.readBigUInt64BE(0).toString())).to.equal(true) + }, { timeoutMs: TIMEOUT }) + }) + }) + }) + + describe('it should set a message payload size', () => { + let recordCheckpointSpy + + beforeEach(() => { + recordCheckpointSpy = 
sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') + }) + + afterEach(() => { + DataStreamsProcessor.prototype.recordCheckpoint.restore() + }) + + it('when producing a message', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM produce payload size') }) + assert.ok(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) + }) + + it('when consuming a message', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM consume payload size') }) + + await consume(async () => { + assert.ok(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) + }) + }) + }) + }) + }) + }) +}) + +function getProjectId () { + return `test-project-${id()}` +} + +function publish (topic, options) { + if (topic.publishMessage) { + return topic.publishMessage(options) + } else { + return topic.publish(options.data) + } +} + diff --git a/packages/dd-trace/test/datastreams/plugins/kafkajs.spec.js b/packages/dd-trace/test/datastreams/plugins/kafkajs.spec.js new file mode 100644 index 00000000000..e54f632555b --- /dev/null +++ b/packages/dd-trace/test/datastreams/plugins/kafkajs.spec.js @@ -0,0 +1,247 @@ +'use strict' + +const { randomUUID } = require('crypto') +const { expect } = require('chai') +const { describe, it, beforeEach, afterEach, before } = require('mocha') +const semver = require('semver') +const sinon = require('sinon') + +const { withVersions } = require('../../setup/mocha') +const agent = require('../../plugins/agent') + +const DataStreamsContext = require('../../../src/datastreams/context') +const { computePathwayHash } = require('../../../src/datastreams/pathway') +const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../../src/datastreams/processor') + +const testKafkaClusterId = '5L6g3nShT-eMCtK--X86sw' + +const getDsmPathwayHash = (testTopic, clusterIdAvailable, isProducer, parentHash) => { + let edgeTags + if (isProducer) { + edgeTags = ['direction:out', 'topic:' + testTopic, 'type:kafka'] + } else { + edgeTags = ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka'] + } + + if (clusterIdAvailable) { + edgeTags.push(`kafka_cluster_id:${testKafkaClusterId}`) + } + edgeTags.sort() + return computePathwayHash('test', 'tester', edgeTags, parentHash) +} + +describe('Plugin', () => { + describe('kafkajs', function () { + this.timeout(10000) + + afterEach(() => { + return agent.close({ ritmReset: false }) + }) + + withVersions('kafkajs', 'kafkajs', (version) => { + let kafka + let admin + let tracer + let Kafka + let clusterIdAvailable + let expectedProducerHash + let expectedConsumerHash + let testTopic + + describe('data stream monitoring', () => { + const messages = [{ key: 'key1', value: 'test2' }] + + beforeEach(async () => { + process.env.DD_DATA_STREAMS_ENABLED = 'true' + tracer = require('../../../') + await agent.load('kafkajs') + const lib = require(`../../../../../versions/kafkajs@${version}`).get() + Kafka = lib.Kafka + kafka = new Kafka({ + clientId: `kafkajs-test-${version}`, + brokers: ['127.0.0.1:9092'], + logLevel: lib.logLevel.WARN + }) + testTopic = `test-topic-${randomUUID()}` + admin = kafka.admin() + await admin.createTopics({ + topics: [{ + topic: testTopic, + numPartitions: 1, + replicationFactor: 1 + }] + }) + clusterIdAvailable = semver.intersects(version, '>=1.13') + expectedProducerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, true, ENTRY_PARENT_HASH) + expectedConsumerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, false, expectedProducerHash) + }) + + describe('checkpoints', () => { + 
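+        // These tests spy on setDataStreamsContext to read back the pathway hash recorded on produce/consume.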
let consumer
+        let setDataStreamsContextSpy
+
+        beforeEach(async () => {
+          tracer.init()
+          tracer.use('kafkajs', { dsmEnabled: true })
+          consumer = kafka.consumer({ groupId: 'test-group' })
+          await consumer.connect()
+          await consumer.subscribe({ topic: testTopic })
+          setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext')
+        })
+
+        afterEach(async () => {
+          setDataStreamsContextSpy.restore()
+          await consumer.disconnect()
+        })
+
+        it('Should set a checkpoint on produce', async () => {
+          const messages = [{ key: 'consumerDSM1', value: 'test2' }]
+          await sendMessages(kafka, testTopic, messages)
+          expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash)
+        })
+
+        it('Should set a checkpoint on consume (eachMessage)', async () => {
+          const runArgs = []
+          await consumer.run({
+            eachMessage: async () => {
+              runArgs.push(setDataStreamsContextSpy.lastCall.args[0])
+            }
+          })
+          await sendMessages(kafka, testTopic, messages)
+          await consumer.disconnect()
+          for (const runArg of runArgs) {
+            expect(runArg.hash).to.equal(expectedConsumerHash)
+          }
+        })
+
+        it('Should set a checkpoint on consume (eachBatch)', async () => {
+          const runArgs = []
+          await consumer.run({
+            eachBatch: async () => {
+              runArgs.push(setDataStreamsContextSpy.lastCall.args[0])
+            }
+          })
+          await sendMessages(kafka, testTopic, messages)
+          await consumer.disconnect()
+          for (const runArg of runArgs) {
+            expect(runArg.hash).to.equal(expectedConsumerHash)
+          }
+        })
+
+        it('Should set a message payload size when producing a message', async () => {
+          const messages = [{ key: 'key1', value: 'test2' }]
+          if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) {
+            DataStreamsProcessor.prototype.recordCheckpoint.restore()
+          }
+          const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint')
+          await sendMessages(kafka, testTopic, messages)
+          expect(recordCheckpointSpy.args[0][0]).to.have.property('payloadSize')
+          recordCheckpointSpy.restore()
+        })
+
+        it('Should set a message payload size when consuming a message', async () => {
+          const messages = [{ key: 'key1', value: 'test2' }]
+          if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) {
+            DataStreamsProcessor.prototype.recordCheckpoint.restore()
+          }
+          const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint')
+          await sendMessages(kafka, testTopic, messages)
+          await consumer.run({
+            eachMessage: async () => {
+              expect(recordCheckpointSpy.args[0][0]).to.have.property('payloadSize')
+              recordCheckpointSpy.restore()
+            }
+          })
+        })
+      })
+
+      describe('backlogs', () => {
+        let consumer
+        let setOffsetSpy
+
+        beforeEach(async () => {
+          tracer.init()
+          tracer.use('kafkajs', { dsmEnabled: true })
+          consumer = kafka.consumer({ groupId: 'test-group' })
+          await consumer.connect()
+          await consumer.subscribe({ topic: testTopic })
+          setOffsetSpy = sinon.spy(tracer._tracer._dataStreamsProcessor, 'setOffset')
+        })
+
+        afterEach(async () => {
+          setOffsetSpy.restore()
+          await consumer.disconnect()
+        })
+
+        if (semver.intersects(version, '>=1.10')) {
+          it('Should add backlog on consumer explicit commit', async () => {
+            // Send a message, consume it, and record the last consumed offset
+            let commitMeta
+            const deferred = {}
+            deferred.promise = new Promise((resolve, reject) => {
+              deferred.resolve = resolve
+              deferred.reject = reject
+            })
+            await consumer.run({
+              eachMessage: async payload => {
+                const { topic, partition, message } = payload
+                commitMeta = {
+                  topic,
+                  partition,
+                  offset: Number(message.offset)
+                }
+                deferred.resolve()
+              },
+              autoCommit: false
+            })
+            await sendMessages(kafka, testTopic, messages)
+            await deferred.promise
+            await consumer.disconnect() // Flush ongoing `eachMessage` calls
+            for (const call of setOffsetSpy.getCalls()) {
+              expect(call.args[0]).to.not.have.property('type', 'kafka_commit')
+            }
+
+            /**
+             * No choice but to reinitialize everything, because the only way to flush eachMessage
+             * calls is to disconnect.
+             */
+            await consumer.connect()
+            await sendMessages(kafka, testTopic, messages)
+            await consumer.run({ eachMessage: async () => {}, autoCommit: false })
+            setOffsetSpy.resetHistory()
+            await consumer.commitOffsets([commitMeta])
+            await consumer.disconnect()
+
+            // Check our work
+            const runArg = setOffsetSpy.lastCall.args[0]
+            expect(setOffsetSpy).to.be.calledOnce
+            expect(runArg).to.have.property('offset', commitMeta.offset)
+            expect(runArg).to.have.property('partition', commitMeta.partition)
+            expect(runArg).to.have.property('topic', commitMeta.topic)
+            expect(runArg).to.have.property('type', 'kafka_commit')
+            expect(runArg).to.have.property('consumer_group', 'test-group')
+          })
+        }
+
+        it('Should add backlog on producer response', async () => {
+          await sendMessages(kafka, testTopic, messages)
+          expect(setOffsetSpy).to.be.calledOnce
+          const { topic } = setOffsetSpy.lastCall.args[0]
+          expect(topic).to.equal(testTopic)
+        })
+      })
+    })
+  })
+})
+
+async function sendMessages (kafka, topic, messages) {
+  const producer = kafka.producer()
+  await producer.connect()
+  await producer.send({
+    topic,
+    messages
+  })
+  await producer.disconnect()
+}

From 18620c066221de8cf5b8a8bf427762d1dd347fd9 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Wed, 10 Dec 2025 13:43:01 -0500
Subject: [PATCH 02/11] remove avro & protobufjs from codeowners (temp)

---
 CODEOWNERS | 2 --
 1 file changed, 2 deletions(-)

diff --git a/CODEOWNERS b/CODEOWNERS
index 4145d83846e..8e74d7d85df 100644
--- a/CODEOWNERS
+++ b/CODEOWNERS
@@ -19,8 +19,6 @@
 /packages/dd-trace/src/datastreams/ @DataDog/data-stream-monitoring
 /packages/dd-trace/test/datastreams/ @DataDog/data-stream-monitoring
-/packages/datadog-plugin-avsc/ @DataDog/data-stream-monitoring
-/packages/datadog-plugin-protobufjs/ @DataDog/data-stream-monitoring

 /packages/datadog-plugin-*/ @Datadog/dd-trace-js @Datadog/apm-idm-js
 /packages/datadog-instrumentations/ @Datadog/dd-trace-js @Datadog/apm-idm-js

From 780737757f7d752f295ae27caa50ad7af03e5667 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Wed, 10 Dec 2025 14:17:00 -0500
Subject: [PATCH 03/11] move tests to dsm.spec.js

---
 CODEOWNERS | 9 +++++---
 .../test/dsm.spec.js} | 16 +++++-------
 .../test/kinesis.dsm.spec.js} | 22 +++++++++----------
 .../test/sns.dsm.spec.js} | 14 ++++++------
 .../test/sqs.dsm.spec.js} | 16 +++++-------
 .../test/dsm.spec.js} | 14 ++++++------
 .../test/dsm.spec.js} | 14 ++++++------
 .../test/dsm.spec.js} | 14 ++++++------
 8 files changed, 61 insertions(+), 58 deletions(-)
 rename packages/{dd-trace/test/datastreams/plugins/amqplib.spec.js => datadog-plugin-amqplib/test/dsm.spec.js} (94%)
 rename packages/{dd-trace/test/datastreams/plugins/aws-sdk/kinesis.spec.js => datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js} (90%)
 rename packages/{dd-trace/test/datastreams/plugins/aws-sdk/sns.spec.js => datadog-plugin-aws-sdk/test/sns.dsm.spec.js} (94%)
 rename packages/{dd-trace/test/datastreams/plugins/aws-sdk/sqs.spec.js => datadog-plugin-aws-sdk/test/sqs.dsm.spec.js} (94%)
 rename 
packages/{dd-trace/test/datastreams/plugins/confluentinc-kafka-javascript.spec.js => datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js} (96%) rename packages/{dd-trace/test/datastreams/plugins/google-cloud-pubsub.spec.js => datadog-plugin-google-cloud-pubsub/test/dsm.spec.js} (92%) rename packages/{dd-trace/test/datastreams/plugins/kafkajs.spec.js => datadog-plugin-kafkajs/test/dsm.spec.js} (95%) diff --git a/CODEOWNERS b/CODEOWNERS index 8e74d7d85df..d7c3d9d8286 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -17,9 +17,6 @@ /packages/dd-trace/src/lambda/ @DataDog/serverless-aws /packages/dd-trace/test/lambda/ @DataDog/serverless-aws -/packages/dd-trace/src/datastreams/ @DataDog/data-stream-monitoring -/packages/dd-trace/test/datastreams/ @DataDog/data-stream-monitoring - /packages/datadog-plugin-*/ @Datadog/dd-trace-js @Datadog/apm-idm-js /packages/datadog-instrumentations/ @Datadog/dd-trace-js @Datadog/apm-idm-js /packages/ddtrace/src/plugins/ @DataDog/dd-trace-js @Datadog/apm-idm-js @@ -80,6 +77,12 @@ /packages/dd-trace/src/llmobs/ @DataDog/ml-observability /packages/dd-trace/test/llmobs/ @DataDog/ml-observability +# Data Streams Monitoring +/packages/dd-trace/src/datastreams/ @DataDog/data-stream-monitoring +/packages/dd-trace/test/datastreams/ @DataDog/data-stream-monitoring +/packages/**/dsm.spec.js @DataDog/data-stream-monitoring +/packages/**/*.dsm.spec.js @DataDog/data-stream-monitoring + # API SDK /packages/dd-trace/src/telemetry/ @DataDog/apm-sdk-capabilities-js /packages/dd-trace/test/telemetry/ @DataDog/apm-sdk-capabilities-js diff --git a/packages/dd-trace/test/datastreams/plugins/amqplib.spec.js b/packages/datadog-plugin-amqplib/test/dsm.spec.js similarity index 94% rename from packages/dd-trace/test/datastreams/plugins/amqplib.spec.js rename to packages/datadog-plugin-amqplib/test/dsm.spec.js index 91b46f39b78..6b7f5dca54a 100644 --- a/packages/dd-trace/test/datastreams/plugins/amqplib.spec.js +++ b/packages/datadog-plugin-amqplib/test/dsm.spec.js @@ -5,12 +5,12 @@ const { Buffer } = require('node:buffer') const { afterEach, beforeEach, describe, it } = require('mocha') -const { computePathwayHash } = require('../../../src/datastreams/pathway') -const { ENTRY_PARENT_HASH } = require('../../../src/datastreams/processor') -const id = require('../../../src/id') -const agent = require('../../plugins/agent') -const { withVersions } = require('../../setup/mocha') -const { assertObjectContains } = require('../../../../integration-tests/helpers') +const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') +const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor') +const id = require('../../dd-trace/src/id') +const agent = require('../../dd-trace/test/plugins/agent') +const { withVersions } = require('../../dd-trace/test/setup/mocha') +const { assertObjectContains } = require('../../../integration-tests/helpers') describe('Plugin', () => { let tracer @@ -22,7 +22,7 @@ describe('Plugin', () => { withVersions('amqplib', 'amqplib', version => { beforeEach(() => { process.env.DD_DATA_STREAMS_ENABLED = 'true' - tracer = require('../../../') + tracer = require('../../dd-trace') queue = `test-${id()}` }) @@ -39,7 +39,7 @@ describe('Plugin', () => { beforeEach(done => { agent.load('amqplib').then(() => { - require(`../../../../../versions/amqplib@${version}`).get('amqplib/callback_api') + require(`../../../versions/amqplib@${version}`).get('amqplib/callback_api') .connect((err, conn) => { connection = conn diff --git 
a/packages/dd-trace/test/datastreams/plugins/aws-sdk/kinesis.spec.js b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js similarity index 90% rename from packages/dd-trace/test/datastreams/plugins/aws-sdk/kinesis.spec.js rename to packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js index ca9e4d6064c..6e234f58171 100644 --- a/packages/dd-trace/test/datastreams/plugins/aws-sdk/kinesis.spec.js +++ b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js @@ -5,14 +5,14 @@ const assert = require('node:assert/strict') const { afterEach, beforeEach, describe, it } = require('mocha') const sinon = require('sinon') -const { assertObjectContains } = require('../../../../../integration-tests/helpers') -const { withVersions } = require('../../../setup/mocha') -const agent = require('../../../plugins/agent') -const { setup } = require('../../../../../datadog-plugin-aws-sdk/test/spec_helpers') -const helpers = require('../../../../../datadog-plugin-aws-sdk/test/kinesis_helpers') -const id = require('../../../../src/id') -const { computePathwayHash } = require('../../../../src/datastreams/pathway') -const { ENTRY_PARENT_HASH } = require('../../../../src/datastreams/processor') +const { assertObjectContains } = require('../../../integration-tests/helpers') +const { withVersions } = require('../../dd-trace/test/setup/mocha') +const agent = require('../../dd-trace/test/plugins/agent') +const { setup } = require('./spec_helpers') +const helpers = require('./kinesis_helpers') +const id = require('../../dd-trace/src/id') +const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') +const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor') describe('Kinesis', function () { this.timeout(10000) @@ -26,7 +26,7 @@ describe('Kinesis', function () { const kinesisClientName = moduleName === '@aws-sdk/smithy-client' ? 
'@aws-sdk/client-kinesis' : 'aws-sdk' function createResources (streamName, cb) { - AWS = require(`../../../../../../versions/${kinesisClientName}@${version}`).get() + AWS = require(`../../../versions/${kinesisClientName}@${version}`).get() const params = { endpoint: 'http://127.0.0.1:4566', @@ -34,7 +34,7 @@ describe('Kinesis', function () { } if (moduleName === '@aws-sdk/smithy-client') { - const { NodeHttpHandler } = require(`../../../../../../versions/@aws-sdk/node-http-handler@${version}`).get() + const { NodeHttpHandler } = require(`../../../versions/@aws-sdk/node-http-handler@${version}`).get() params.requestHandler = new NodeHttpHandler() } @@ -62,7 +62,7 @@ describe('Kinesis', function () { }) beforeEach(done => { - tracer = require('../../../../') + tracer = require('../../dd-trace') tracer.use('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) streamNameDSM = `MyStreamDSM-${id()}` diff --git a/packages/dd-trace/test/datastreams/plugins/aws-sdk/sns.spec.js b/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js similarity index 94% rename from packages/dd-trace/test/datastreams/plugins/aws-sdk/sns.spec.js rename to packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js index 6c8bb601807..dd4d643b96e 100644 --- a/packages/dd-trace/test/datastreams/plugins/aws-sdk/sns.spec.js +++ b/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js @@ -6,10 +6,10 @@ const { after, afterEach, before, describe, it } = require('mocha') const sinon = require('sinon') const semver = require('semver') -const { assertObjectContains } = require('../../../../../integration-tests/helpers') -const { withVersions } = require('../../../setup/mocha') -const agent = require('../../../plugins/agent') -const { setup } = require('../../../../../datadog-plugin-aws-sdk/test/spec_helpers') +const { assertObjectContains } = require('../../../integration-tests/helpers') +const { withVersions } = require('../../dd-trace/test/setup/mocha') +const agent = require('../../dd-trace/test/plugins/agent') +const { setup } = require('./spec_helpers') describe('Sns', function () { setup() @@ -28,8 +28,8 @@ describe('Sns', function () { const sqsClientName = moduleName === '@aws-sdk/smithy-client' ? 
'@aws-sdk/client-sqs' : 'aws-sdk'
 
     function createResources (queueName, topicName, cb) {
-      const { SNS } = require(`../../../../../../versions/${snsClientName}@${version}`).get()
-      const { SQS } = require(`../../../../../../versions/${sqsClientName}@${version}`).get()
+      const { SNS } = require(`../../../versions/${snsClientName}@${version}`).get()
+      const { SQS } = require(`../../../versions/${sqsClientName}@${version}`).get()
 
       sns = new SNS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' })
       sqs = new SQS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' })
@@ -80,7 +80,7 @@ describe('Sns', function () {
     before(done => {
       process.env.DD_DATA_STREAMS_ENABLED = 'true'
 
-      tracer = require('../../../../')
+      tracer = require('../../dd-trace')
       tracer.use('aws-sdk', { sns: { dsmEnabled: true }, sqs: { dsmEnabled: true } })
 
       createResources('TestQueueDSM', 'TestTopicDSM', done)
diff --git a/packages/dd-trace/test/datastreams/plugins/aws-sdk/sqs.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
similarity index 94%
rename from packages/dd-trace/test/datastreams/plugins/aws-sdk/sqs.spec.js
rename to packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
index c3baef4c76f..453ef375cad 100644
--- a/packages/dd-trace/test/datastreams/plugins/aws-sdk/sqs.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
@@ -7,12 +7,12 @@ const { after, afterEach, before, beforeEach, describe, it } = require('mocha')
 const semver = require('semver')
 const sinon = require('sinon')
 
-const { computePathwayHash } = require('../../../../src/datastreams/pathway')
-const { ENTRY_PARENT_HASH } = require('../../../../src/datastreams/processor')
-const agent = require('../../../plugins/agent')
-const { withVersions } = require('../../../setup/mocha')
-const { setup } = require('../../../../../datadog-plugin-aws-sdk/test/spec_helpers')
-const { assertObjectContains } = require('../../../../../integration-tests/helpers')
+const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
+const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
+const agent = require('../../dd-trace/test/plugins/agent')
+const { withVersions } = require('../../dd-trace/test/setup/mocha')
+const { setup } = require('./spec_helpers')
+const { assertObjectContains } = require('../../../integration-tests/helpers')
 
 const getQueueParams = (queueName) => {
   return {
@@ -58,7 +58,7 @@ describe('Plugin', () => {
       before(() => {
         process.env.DD_DATA_STREAMS_ENABLED = 'true'
 
-        tracer = require('../../../../')
+        tracer = require('../../dd-trace')
 
         tracer.use('aws-sdk', { sqs: { dsmEnabled: true } })
       })
@@ -72,7 +72,7 @@ describe('Plugin', () => {
       })
 
      before(() => {
-        AWS = require(`../../../../../../versions/${sqsClientName}@${version}`).get()
+        AWS = require(`../../../versions/${sqsClientName}@${version}`).get()
         sqs = new AWS.SQS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' })
       })
 
diff --git a/packages/dd-trace/test/datastreams/plugins/confluentinc-kafka-javascript.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js
similarity index 96%
rename from packages/dd-trace/test/datastreams/plugins/confluentinc-kafka-javascript.spec.js
rename to packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js
index b3cf3dbbf22..64325c02cec 100644
--- a/packages/dd-trace/test/datastreams/plugins/confluentinc-kafka-javascript.spec.js
+++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js
@@ -6,12 +6,12 @@
 const sinon = require('sinon')
 const { randomUUID } = require('node:crypto')
 
-const agent = require('../../plugins/agent')
-const { withVersions } = require('../../setup/mocha')
+const agent = require('../../dd-trace/test/plugins/agent')
+const { withVersions } = require('../../dd-trace/test/setup/mocha')
 
-const DataStreamsContext = require('../../../src/datastreams/context')
-const { computePathwayHash } = require('../../../src/datastreams/pathway')
-const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../../src/datastreams/processor')
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
+const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
+const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')
 
 const getDsmPathwayHash = (testTopic, isProducer, parentHash) => {
   let edgeTags
@@ -55,9 +55,9 @@ describe('Plugin', () => {
       messages = [{ key: 'key1', value: 'test2' }]
 
       process.env.DD_DATA_STREAMS_ENABLED = 'true'
-      tracer = require('../../../')
+      tracer = require('../../dd-trace')
       await agent.load('confluentinc-kafka-javascript')
-      const lib = require(`../../../../../versions/${module}@${version}`).get()
+      const lib = require(`../../../versions/${module}@${version}`).get()
 
       // Store the module for later use
       nativeApi = lib
diff --git a/packages/dd-trace/test/datastreams/plugins/google-cloud-pubsub.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
similarity index 92%
rename from packages/dd-trace/test/datastreams/plugins/google-cloud-pubsub.spec.js
rename to packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
index 83fadfb02bd..abd8e8248f6 100644
--- a/packages/dd-trace/test/datastreams/plugins/google-cloud-pubsub.spec.js
+++ b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
@@ -6,11 +6,11 @@ const { expect } = require('chai')
 const { after, before, beforeEach, describe, it } = require('mocha')
 const sinon = require('sinon')
 
-const { computePathwayHash } = require('../../../src/datastreams/pathway')
-const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../../src/datastreams/processor')
-const id = require('../../../src/id')
-const agent = require('../../plugins/agent')
-const { withVersions } = require('../../setup/mocha')
+const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
+const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')
+const id = require('../../dd-trace/src/id')
+const agent = require('../../dd-trace/test/plugins/agent')
+const { withVersions } = require('../../dd-trace/test/setup/mocha')
 
 const TIMEOUT = 30000
 const dsmTopicName = 'dsm-topic'
@@ -53,8 +53,8 @@ describe('Plugin', () => {
       })
 
       before(async () => {
-        const { PubSub } = require(`../../../../../versions/@google-cloud/pubsub@${version}`).get()
-        tracer = require('../../../')
+        const { PubSub } = require(`../../../versions/@google-cloud/pubsub@${version}`).get()
+        tracer = require('../../dd-trace')
         project = getProjectId()
         resource = `projects/${project}/topics/${dsmTopicName}`
         pubsub = new PubSub({ projectId: project })
diff --git a/packages/dd-trace/test/datastreams/plugins/kafkajs.spec.js b/packages/datadog-plugin-kafkajs/test/dsm.spec.js
similarity index 95%
rename from packages/dd-trace/test/datastreams/plugins/kafkajs.spec.js
rename to packages/datadog-plugin-kafkajs/test/dsm.spec.js
index e54f632555b..fd7ccac3756 100644
--- a/packages/dd-trace/test/datastreams/plugins/kafkajs.spec.js
+++ b/packages/datadog-plugin-kafkajs/test/dsm.spec.js
@@ -6,12 +6,12 @@
 const { describe, it, beforeEach, afterEach, before } = require('mocha')
 const semver = require('semver')
 const sinon = require('sinon')
 
-const { withVersions } = require('../../setup/mocha')
-const agent = require('../../plugins/agent')
+const { withVersions } = require('../../dd-trace/test/setup/mocha')
+const agent = require('../../dd-trace/test/plugins/agent')
 
-const DataStreamsContext = require('../../../src/datastreams/context')
-const { computePathwayHash } = require('../../../src/datastreams/pathway')
-const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../../src/datastreams/processor')
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
+const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
+const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')
 
 const testKafkaClusterId = '5L6g3nShT-eMCtK--X86sw'
@@ -53,9 +53,9 @@ describe('Plugin', () => {
       beforeEach(async () => {
         process.env.DD_DATA_STREAMS_ENABLED = 'true'
-        tracer = require('../../../')
+        tracer = require('../../dd-trace')
         await agent.load('kafkajs')
-        const lib = require(`../../../../../versions/kafkajs@${version}`).get()
+        const lib = require(`../../../versions/kafkajs@${version}`).get()
 
         Kafka = lib.Kafka
         kafka = new Kafka({
           clientId: `kafkajs-test-${version}`,

From b2404b8b449819a27b25ee8a86b157ecb76ee62c Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Wed, 10 Dec 2025 15:04:53 -0500
Subject: [PATCH 04/11] fix linting

---
 .../datadog-plugin-amqplib/test/dsm.spec.js   |  3 ---
 .../datadog-plugin-amqplib/test/index.spec.js |  3 ---
 .../test/kinesis.dsm.spec.js                  |  1 -
 .../test/kinesis.spec.js                      |  4 ---
 .../test/sns.dsm.spec.js                      |  1 -
 .../datadog-plugin-aws-sdk/test/sns.spec.js   |  4 +--
 .../test/sqs.dsm.spec.js                      |  1 -
 .../datadog-plugin-aws-sdk/test/sqs.spec.js   | 18 ------------
 .../test/dsm.spec.js                          |  1 -
 .../test/index.spec.js                        | 17 ------------
 .../test/dsm.spec.js                          |  3 ---
 .../test/index.spec.js                        |  8 ------
 .../datadog-plugin-kafkajs/test/dsm.spec.js   |  3 +--
 .../datadog-plugin-kafkajs/test/index.spec.js | 27 +------------------
 14 files changed, 3 insertions(+), 91 deletions(-)

diff --git a/packages/datadog-plugin-amqplib/test/dsm.spec.js b/packages/datadog-plugin-amqplib/test/dsm.spec.js
index 6b7f5dca54a..6d6a75f02d7 100644
--- a/packages/datadog-plugin-amqplib/test/dsm.spec.js
+++ b/packages/datadog-plugin-amqplib/test/dsm.spec.js
@@ -13,7 +13,6 @@ const { withVersions } = require('../../dd-trace/test/setup/mocha')
 const { assertObjectContains } = require('../../../integration-tests/helpers')
 
 describe('Plugin', () => {
-  let tracer
   let connection
   let channel
   let queue
@@ -22,7 +21,6 @@
   withVersions('amqplib', 'amqplib', version => {
     beforeEach(() => {
       process.env.DD_DATA_STREAMS_ENABLED = 'true'
-      tracer = require('../../dd-trace')
       queue = `test-${id()}`
     })
 
@@ -281,4 +279,3 @@
     })
   })
 })
-
diff --git a/packages/datadog-plugin-amqplib/test/index.spec.js b/packages/datadog-plugin-amqplib/test/index.spec.js
index 287417ddb5a..0af299cc836 100644
--- a/packages/datadog-plugin-amqplib/test/index.spec.js
+++ b/packages/datadog-plugin-amqplib/test/index.spec.js
@@ -6,13 +6,10 @@ const { Buffer } = require('node:buffer')
 const { afterEach, beforeEach, describe, it } = require('mocha')
 
 const { ERROR_MESSAGE, ERROR_STACK, ERROR_TYPE } = require('../../dd-trace/src/constants')
-const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
-const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
 const id = require('../../dd-trace/src/id')
 const agent = require('../../dd-trace/test/plugins/agent')
 const { withNamingSchema, withPeerService, withVersions } = require('../../dd-trace/test/setup/mocha')
 const { expectedSchema, rawExpectedSchema } = require('./naming')
-const { assertObjectContains } = require('../../../integration-tests/helpers')
 
 describe('Plugin', () => {
   let tracer
diff --git a/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
index 6e234f58171..164473cd484 100644
--- a/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
@@ -240,4 +240,3 @@
     })
   })
 })
-
diff --git a/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js b/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js
index 3e31ff32364..8e3a70bcf1a 100644
--- a/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js
@@ -3,7 +3,6 @@
 const assert = require('node:assert/strict')
 
 const { after, afterEach, before, beforeEach, describe, it } = require('mocha')
-const sinon = require('sinon')
 
 const { assertObjectContains } = require('../../../integration-tests/helpers')
 const { withNamingSchema, withVersions } = require('../../dd-trace/test/setup/mocha')
@@ -12,8 +11,6 @@ const { setup } = require('./spec_helpers')
 const helpers = require('./kinesis_helpers')
 const { rawExpectedSchema } = require('./kinesis-naming')
 const id = require('../../dd-trace/src/id')
-const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
-const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
 
 describe('Kinesis', function () {
   this.timeout(10000)
@@ -22,7 +19,6 @@ describe('Kinesis', function () {
     let AWS
     let kinesis
-    let tracer
 
     const kinesisClientName = moduleName === '@aws-sdk/smithy-client' ? '@aws-sdk/client-kinesis' : 'aws-sdk'
 
diff --git a/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js
index dd4d643b96e..d422788c815 100644
--- a/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js
@@ -266,4 +266,3 @@
     })
   })
 })
-
diff --git a/packages/datadog-plugin-aws-sdk/test/sns.spec.js b/packages/datadog-plugin-aws-sdk/test/sns.spec.js
index 687b7ec771c..ac78a4ff63c 100644
--- a/packages/datadog-plugin-aws-sdk/test/sns.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/sns.spec.js
@@ -2,8 +2,7 @@
 
 const assert = require('node:assert/strict')
 
-const { after, afterEach, before, describe, it } = require('mocha')
-const sinon = require('sinon')
+const { after, before, describe, it } = require('mocha')
 const semver = require('semver')
 
 const { assertObjectContains } = require('../../../integration-tests/helpers')
@@ -338,7 +337,6 @@ describe('Sns', function () {
       })
 
       before(done => {
-        process.env.DD_DATA_STREAMS_ENABLED = 'true'
 
         tracer = require('../../dd-trace')
         tracer.use('aws-sdk', { sns: { dsmEnabled: false, batchPropagationEnabled: true } })
diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
index 453ef375cad..9ffd215f3ef 100644
--- a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
@@ -305,4 +305,3 @@
     })
   })
 })
-
diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.spec.js
index 2d70aa65d0c..0a7a1087c08 100644
--- a/packages/datadog-plugin-aws-sdk/test/sqs.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/sqs.spec.js
@@ -5,16 +5,11 @@
 const { randomUUID } = require('node:crypto')
 
 const { expect } = require('chai')
 const { after, afterEach, before, beforeEach, describe, it } = require('mocha')
-const semver = require('semver')
-const sinon = require('sinon')
 
-const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
-const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
 const agent = require('../../dd-trace/test/plugins/agent')
 const { withNamingSchema, withPeerService, withVersions } = require('../../dd-trace/test/setup/mocha')
 const { setup } = require('./spec_helpers')
 const { rawExpectedSchema } = require('./sqs-naming')
-const { assertObjectContains } = require('../../../integration-tests/helpers')
 
 const getQueueParams = (queueName) => {
   return {
@@ -34,14 +29,8 @@ describe('Plugin', () => {
     let AWS
     let sqs
     let queueName
-    let queueNameDSM
-    let queueNameDSMConsumerOnly
     let queueOptions
-    let queueOptionsDsm
-    let queueOptionsDsmConsumerOnly
     let QueueUrl
-    let QueueUrlDsm
-    let QueueUrlDsmConsumerOnly
     let tracer
 
     const sqsClientName = moduleName === '@aws-sdk/smithy-client' ? '@aws-sdk/client-sqs' : 'aws-sdk'
@@ -50,21 +39,14 @@ describe('Plugin', () => {
     beforeEach(() => {
       const id = randomUUID()
 
       queueName = `SQS_QUEUE_NAME-${id}`
-      queueNameDSM = `SQS_QUEUE_NAME_DSM-${id}`
-      queueNameDSMConsumerOnly = `SQS_QUEUE_NAME_DSM_CONSUMER_ONLY-${id}`
 
       queueOptions = getQueueParams(queueName)
-      queueOptionsDsm = getQueueParams(queueNameDSM)
-      queueOptionsDsmConsumerOnly = getQueueParams(queueNameDSMConsumerOnly)
 
       QueueUrl = `http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME-${id}`
-      QueueUrlDsm = `http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM-${id}`
-      QueueUrlDsmConsumerOnly = `http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM_CONSUMER_ONLY-${id}`
     })
 
     describe('without configuration', () => {
       before(() => {
-        process.env.DD_DATA_STREAMS_ENABLED = 'true'
 
         tracer = require('../../dd-trace')
 
         tracer.use('aws-sdk', { sqs: { batchPropagationEnabled: true } })
diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js
index 64325c02cec..b1a2d74cb26 100644
--- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js
+++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js
@@ -322,4 +322,3 @@ async function sendMessages (kafka, topic, messages) {
   })
   await producer.disconnect()
 }
-
diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js
index c138e6142d3..a8bd628333c 100644
--- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js
+++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js
@@ -2,7 +2,6 @@
 
 const { expect } = require('chai')
 const { describe, it, beforeEach, afterEach } = require('mocha')
-const sinon = require('sinon')
 
 const { randomUUID } = require('node:crypto')
 
@@ -12,22 +11,6 @@ const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants')
 const { expectedSchema } = require('./naming')
 const { withVersions } = require('../../dd-trace/test/setup/mocha')
 
-const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
-const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
-const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')
-
-const getDsmPathwayHash = (testTopic, isProducer, parentHash) => {
-  let edgeTags
-  if (isProducer) {
-    edgeTags = ['direction:out', 'topic:' + testTopic, 'type:kafka']
-  } else {
-    edgeTags = ['direction:in', 'group:test-group-confluent', 'topic:' + testTopic, 'type:kafka']
-  }
-
-  edgeTags.sort()
-  return computePathwayHash('test', 'tester', edgeTags, parentHash)
-}
-
 describe('Plugin', () => {
   const module = '@confluentinc/kafka-javascript'
   const groupId = 'test-group-confluent'
diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
index abd8e8248f6..806bd98cbb8 100644
--- a/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
+++ b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
@@ -37,7 +37,6 @@ describe('Plugin', () => {
   withVersions('google-cloud-pubsub', '@google-cloud/pubsub', version => {
     let pubsub
     let project
-    let resource
 
     let expectedProducerHash
     let expectedConsumerHash
@@ -56,7 +55,6 @@ describe('Plugin', () => {
       const { PubSub } = require(`../../../versions/@google-cloud/pubsub@${version}`).get()
       tracer = require('../../dd-trace')
       project = getProjectId()
-      resource = `projects/${project}/topics/${dsmTopicName}`
       pubsub = new PubSub({ projectId: project })
 
       tracer.use('google-cloud-pubsub', { dsmEnabled: true })
@@ -163,4 +161,3 @@ function publish (topic, options) {
     return topic.publish(options.data)
   }
 }
-
diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js
index 34c4caaddc4..f6375385803 100644
--- a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js
+++ b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js
@@ -2,20 +2,15 @@
 
 const assert = require('node:assert/strict')
 
-const { expect } = require('chai')
 const { after, afterEach, before, beforeEach, describe, it } = require('mocha')
-const sinon = require('sinon')
 
 const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants')
-const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
-const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')
 const id = require('../../dd-trace/src/id')
 const agent = require('../../dd-trace/test/plugins/agent')
 const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/helpers')
 const { withNamingSchema, withVersions } = require('../../dd-trace/test/setup/mocha')
 const { expectedSchema, rawExpectedSchema } = require('./naming')
 
 const TIMEOUT = 30000
-const dsmTopicName = 'dsm-topic'
 
 describe('Plugin', () => {
   let tracer
@@ -25,7 +20,6 @@ describe('Plugin', () => {
     before(() => {
       process.env.PUBSUB_EMULATOR_HOST = 'localhost:8081'
-      process.env.DD_DATA_STREAMS_ENABLED = 'true'
     })
 
     after(() => {
@@ -42,8 +36,6 @@ describe('Plugin', () => {
     let resource
     let v1
     let gax
-    let expectedProducerHash
-    let expectedConsumerHash
 
     describe('without configuration', () => {
       beforeEach(() => {
diff --git a/packages/datadog-plugin-kafkajs/test/dsm.spec.js b/packages/datadog-plugin-kafkajs/test/dsm.spec.js
index fd7ccac3756..8ca39a0f93f 100644
--- a/packages/datadog-plugin-kafkajs/test/dsm.spec.js
+++ b/packages/datadog-plugin-kafkajs/test/dsm.spec.js
@@ -2,7 +2,7 @@
 const { randomUUID } = require('crypto')
 
 const { expect } = require('chai')
-const { describe, it, beforeEach, afterEach, before } = require('mocha')
+const { describe, it, beforeEach, afterEach } = require('mocha')
 const semver = require('semver')
 const sinon = require('sinon')
@@ -244,4 +244,3 @@ async function sendMessages (kafka, topic, messages) {
   })
   await producer.disconnect()
 }
-
diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js
index 2f1cde2f81f..f512d74b9b8 100644
--- a/packages/datadog-plugin-kafkajs/test/index.spec.js
+++ b/packages/datadog-plugin-kafkajs/test/index.spec.js
@@ -3,7 +3,7 @@
 const { randomUUID } = require('crypto')
 
 const { expect } = require('chai')
 const dc = require('dc-polyfill')
-const { describe, it, beforeEach, afterEach, before } = require('mocha')
+const { describe, it, beforeEach, afterEach } = require('mocha')
 const semver = require('semver')
 const sinon = require('sinon')
 
@@ -13,27 +13,9 @@ const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/helpers')
 const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants')
 const { expectedSchema, rawExpectedSchema } = require('./naming')
 
-const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
-const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
-const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')
 
 const testKafkaClusterId = '5L6g3nShT-eMCtK--X86sw'
 
-const getDsmPathwayHash = (testTopic, clusterIdAvailable, isProducer, parentHash) => {
-  let edgeTags
-  if (isProducer) {
-    edgeTags = ['direction:out', 'topic:' + testTopic, 'type:kafka']
-  } else {
-    edgeTags = ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka']
-  }
-
-  if (clusterIdAvailable) {
-    edgeTags.push(`kafka_cluster_id:${testKafkaClusterId}`)
-  }
-  edgeTags.sort()
-  return computePathwayHash('test', 'tester', edgeTags, parentHash)
-}
-
 describe('Plugin', () => {
   describe('kafkajs', function () {
     // TODO: remove when new internal trace has landed
@@ -49,8 +31,6 @@ describe('Plugin', () => {
     let Kafka
     let Broker
     let clusterIdAvailable
-    let expectedProducerHash
-    let expectedConsumerHash
     let testTopic
 
     describe('without configuration', () => {
@@ -58,7 +38,6 @@ describe('Plugin', () => {
       const messages2 = [{ key: 'key2', value: 'test3' }]
 
      beforeEach(async () => {
-        process.env.DD_DATA_STREAMS_ENABLED = 'true'
         tracer = require('../../dd-trace')
         await agent.load('kafkajs')
         const lib = require(`../../../versions/kafkajs@${version}`).get()
@@ -79,8 +58,6 @@ describe('Plugin', () => {
         }]
       })
       clusterIdAvailable = semver.intersects(version, '>=1.13')
-      expectedProducerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, true, ENTRY_PARENT_HASH)
-      expectedConsumerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, false, expectedProducerHash)
     })
 
     describe('producer', () => {
@@ -88,7 +65,6 @@ describe('Plugin', () => {
       const meta = {
         'span.kind': 'producer',
         component: 'kafkajs',
-        'pathway.hash': expectedProducerHash.readBigUInt64LE(0).toString(),
         'messaging.destination.name': testTopic,
         'messaging.kafka.bootstrap.servers': '127.0.0.1:9092'
       }
@@ -256,7 +232,6 @@ describe('Plugin', () => {
         meta: {
           'span.kind': 'consumer',
           component: 'kafkajs',
-          'pathway.hash': expectedConsumerHash.readBigUInt64LE(0).toString(),
           'messaging.destination.name': testTopic
         },
         resource: testTopic,

From 709d15645fa6b9778566e9508601ad9932e58ac4 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Wed, 10 Dec 2025 17:19:48 -0500
Subject: [PATCH 05/11] fix confluentinc-kafka-javascript and google-cloud-pubsub DSM test setup

---
 .../test/dsm.spec.js | 16 ++++++++--------
 .../test/dsm.spec.js |  2 +-
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js
index b1a2d74cb26..e3b00bb35d7 100644
--- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js
+++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js
@@ -63,11 +63,14 @@ describe('Plugin', () => {
       nativeApi = lib
 
       // Setup for the KafkaJS wrapper tests
-      Kafka = lib.Kafka
-      ConfluentKafka = lib.ConfluentKafka
-      kafka = new Kafka(ConfluentKafka, {
-        clientId: `confluent-kafka-test-${version}`,
-        'bootstrap.servers': '127.0.0.1:9092'
+      ConfluentKafka = lib.KafkaJS
+      Kafka = ConfluentKafka.Kafka
+      kafka = new Kafka({
+        kafkaJS: {
+          clientId: `kafkajs-test-${version}`,
+          brokers: ['127.0.0.1:9092'],
+          logLevel: ConfluentKafka.logLevel.WARN
+        }
       })
       testTopic = `test-topic-${randomUUID()}`
       admin = kafka.admin()
@@ -81,9 +84,6 @@ describe('Plugin', () => {
       })
       await admin.disconnect()
 
-      tracer.init()
-      tracer.use('confluentinc-kafka-javascript', { dsmEnabled: true })
-
       messages = [{ key: 'key1', value: 'test2' }]
 
      consumer = kafka.consumer({ kafkaJS: { groupId, fromBeginning: true } })
diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
index abd8e8248f6..8eb1e7afd75 100644
--- a/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
+++ b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
@@ -151,7 +151,7 @@ describe('Plugin', () => {
 })
 
 function getProjectId () {
-  return `test-project-${id()}`
+  return `test-project-dsm-${id()}`
 }
 
 function publish (topic, options) {

From 0210882da23020cc695c20d5de0f5c5459628244 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Wed, 10 Dec 2025 18:37:44 -0500
Subject: [PATCH 06/11] fix google-cloud-pubsub DSM test setup ordering

---
 .../test/dsm.spec.js | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
index 8eb1e7afd75..59eb4a0c25c 100644
--- a/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
+++ b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
@@ -45,18 +45,16 @@ describe('Plugin', () => {
     let sub
     let consume
 
-    beforeEach(() => {
-      return agent.load('google-cloud-pubsub', {
+    before(async () => {
+      tracer = require('../../dd-trace')
+      await agent.load('google-cloud-pubsub', {
         dsmEnabled: true
       })
-    })
+      tracer.use('google-cloud-pubsub', { dsmEnabled: true })
 
-    before(async () => {
       const { PubSub } = require(`../../../versions/@google-cloud/pubsub@${version}`).get()
-      tracer = require('../../dd-trace')
       project = getProjectId()
       pubsub = new PubSub({ projectId: project })
-      tracer.use('google-cloud-pubsub', { dsmEnabled: true })
 
       dsmTopic = await pubsub.createTopic(dsmTopicName)
       dsmTopic = dsmTopic[0]
@@ -82,6 +80,12 @@ describe('Plugin', () => {
       )
     })
 
+    beforeEach(() => {
+      return agent.load('google-cloud-pubsub', {
+        dsmEnabled: true
+      })
+    })
+
     describe('should set a DSM checkpoint', () => {
       it('on produce', async () => {
         await publish(dsmTopic, { data: Buffer.from('DSM produce checkpoint') })

From c648fe8a4aac91ef73f13cc4d3e8c81b4563593d Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Thu, 11 Dec 2025 11:00:57 -0500
Subject: [PATCH 07/11] update team name

---
 CODEOWNERS | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/CODEOWNERS b/CODEOWNERS
index d7c3d9d8286..17cc9e79a61 100644
--- a/CODEOWNERS
+++ b/CODEOWNERS
@@ -78,10 +78,10 @@
 /packages/dd-trace/test/llmobs/ @DataDog/ml-observability
 
 # Data Streams Monitoring
-/packages/dd-trace/src/datastreams/ @DataDog/data-stream-monitoring
-/packages/dd-trace/test/datastreams/ @DataDog/data-stream-monitoring
-/packages/**/dsm.spec.js @DataDog/data-stream-monitoring
-/packages/**/*.dsm.spec.js @DataDog/data-stream-monitoring
+/packages/dd-trace/src/datastreams/ @DataDog/data-streams-monitoring
+/packages/dd-trace/test/datastreams/ @DataDog/data-streams-monitoring
+/packages/**/dsm.spec.js @DataDog/data-streams-monitoring
+/packages/**/*.dsm.spec.js @DataDog/data-streams-monitoring
 
 # API SDK
 /packages/dd-trace/src/telemetry/ @DataDog/apm-sdk-capabilities-js

From b0b792c71ad5fbd279fc235ccbba7b21ea49a79d Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Thu, 11 Dec 2025 14:39:25 -0500
Subject: [PATCH 08/11] update test:plugins to include additional *.spec.js files

With the extra `*` in the glob, SPEC=sqs (for example) now matches both
sqs.spec.js and sqs.dsm.spec.js.

---
 package.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/package.json b/package.json
index 4a6f9a5e119..0b739b18f33 100644
--- a/package.json
+++ b/package.json
@@ -48,7 +48,7 @@
     "test:llmobs:plugins:ci": "yarn services && nyc --no-clean --include \"packages/dd-trace/src/llmobs/**/*.js\" -- npm run test:llmobs:plugins",
     "test:openfeature": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/dd-trace/test/openfeature/*.spec.js\"",
     "test:openfeature:ci": "nyc --no-clean --include \"packages/dd-trace/src/openfeature/**/*.js\" -- npm run test:openfeature",
-    "test:plugins": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/datadog-plugin-@($(echo $PLUGINS))/test/**/@($(echo ${SPEC:-'*'})).spec.js\"",
+    "test:plugins": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/datadog-plugin-@($(echo $PLUGINS))/test/**/@($(echo ${SPEC:-'*'}))*.spec.js\"",
     "test:plugins:ci": "yarn services && nyc --no-clean --include \"packages/datadog-plugin-@($(echo $PLUGINS))/src/**/*.js\" -- npm run test:plugins",
     "test:plugins:ci:flaky": "yarn services && nyc --no-clean --include \"packages/datadog-plugin-@($(echo $PLUGINS))/src/**/*.js\" -- npm run test:plugins -- --bail --retries 2",
     "test:plugins:upstream": "node ./packages/dd-trace/test/plugins/suite.js",

From b1f4d269108985f9c87b98854b9b631f3bec0b7a Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Thu, 11 Dec 2025 14:55:46 -0500
Subject: [PATCH 09/11] revert queue name change for DSM

---
 packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
index 9ffd215f3ef..bc4f7fa81bd 100644
--- a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
@@ -43,8 +43,8 @@ describe('Plugin', () => {
     beforeEach(() => {
       const id = randomUUID()
-      queueNameDSM = `TestQueueDSM-${id}`
-      queueNameDSMConsumerOnly = `TestQueueDSMConsumerOnly-${id}`
+      queueNameDSM = `SQS_QUEUE_NAME_DSM-${id}`
+      queueNameDSMConsumerOnly = `SQS_QUEUE_NAME_DSM_CONSUMER_ONLY-${id}`
       queueOptionsDsm = getQueueParams(queueNameDSM)
       queueOptionsDsmConsumerOnly = getQueueParams(queueNameDSMConsumerOnly)
       QueueUrlDsm = `http://127.0.0.1:4566/000000000000/${queueNameDSM}`
       QueueUrlDsmConsumerOnly = `http://127.0.0.1:4566/000000000000/${queueNameDSMConsumerOnly}`
     })

From fefa398c53e6b557e0af99d51a4fe61444a4a86f Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Thu, 11 Dec 2025 16:40:45 -0500
Subject: [PATCH 10/11] fix localstack account ID in DSM queue URLs

Use the same 20-digit dummy account ID as the other SQS test queue URLs.

---
 packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
index bc4f7fa81bd..4609fdaf26e 100644
--- a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
@@ -47,8 +47,8 @@ describe('Plugin', () => {
       queueNameDSMConsumerOnly = `SQS_QUEUE_NAME_DSM_CONSUMER_ONLY-${id}`
       queueOptionsDsm = getQueueParams(queueNameDSM)
       queueOptionsDsmConsumerOnly = getQueueParams(queueNameDSMConsumerOnly)
-      QueueUrlDsm = `http://127.0.0.1:4566/000000000000/${queueNameDSM}`
-      QueueUrlDsmConsumerOnly = `http://127.0.0.1:4566/000000000000/${queueNameDSMConsumerOnly}`
+      QueueUrlDsm = `http://127.0.0.1:4566/00000000000000000000/${queueNameDSM}`
+      QueueUrlDsmConsumerOnly = `http://127.0.0.1:4566/00000000000000000000/${queueNameDSMConsumerOnly}`
     })
 
   describe('data stream monitoring', () => {

From 339f97a3337a76a2609dc0fd7b994a7ef50d7426 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Thu, 11 Dec 2025 17:11:02 -0500
Subject: [PATCH 11/11] fix SNS DSM test receive params

---
 packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js
index d422788c815..508f033283b 100644
--- a/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js
@@ -60,7 +60,8 @@
       receiveParams = {
         QueueUrl,
-        MessageAttributeNames: ['All']
+        MessageAttributeNames: ['.*'],
+        WaitTimeSeconds: 1
       }
 
       cb()