diff --git a/CODEOWNERS b/CODEOWNERS index c8129b66780..17cc9e79a61 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -77,6 +77,12 @@ /packages/dd-trace/src/llmobs/ @DataDog/ml-observability /packages/dd-trace/test/llmobs/ @DataDog/ml-observability +# Data Streams Monitoring +/packages/dd-trace/src/datastreams/ @DataDog/data-streams-monitoring +/packages/dd-trace/test/datastreams/ @DataDog/data-streams-monitoring +/packages/**/dsm.spec.js @DataDog/data-streams-monitoring +/packages/**/*.dsm.spec.js @DataDog/data-streams-monitoring + # API SDK /packages/dd-trace/src/telemetry/ @DataDog/apm-sdk-capabilities-js /packages/dd-trace/test/telemetry/ @DataDog/apm-sdk-capabilities-js diff --git a/package.json b/package.json index f2399233e9f..d5f3950e06d 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,7 @@ "test:llmobs:plugins:ci": "yarn services && nyc --no-clean --include \"packages/dd-trace/src/llmobs/**/*.js\" -- npm run test:llmobs:plugins", "test:openfeature": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/dd-trace/test/openfeature/*.spec.js\"", "test:openfeature:ci": "nyc --no-clean --include \"packages/dd-trace/src/openfeature/**/*.js\" -- npm run test:openfeature", - "test:plugins": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/datadog-plugin-@($(echo $PLUGINS))/test/**/@($(echo ${SPEC:-'*'})).spec.js\"", + "test:plugins": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/datadog-plugin-@($(echo $PLUGINS))/test/**/@($(echo ${SPEC:-'*'}))*.spec.js\"", "test:plugins:ci": "yarn services && nyc --no-clean --include \"packages/datadog-plugin-@($(echo $PLUGINS))/src/**/*.js\" -- npm run test:plugins", "test:plugins:ci:flaky": "yarn services && nyc --no-clean --include \"packages/datadog-plugin-@($(echo $PLUGINS))/src/**/*.js\" -- npm run test:plugins -- --bail --retries 2", "test:plugins:upstream": "node ./packages/dd-trace/test/plugins/suite.js", diff --git a/packages/datadog-plugin-amqplib/test/dsm.spec.js 
b/packages/datadog-plugin-amqplib/test/dsm.spec.js new file mode 100644 index 00000000000..6d6a75f02d7 --- /dev/null +++ b/packages/datadog-plugin-amqplib/test/dsm.spec.js @@ -0,0 +1,281 @@ +'use strict' + +const assert = require('node:assert/strict') +const { Buffer } = require('node:buffer') + +const { afterEach, beforeEach, describe, it } = require('mocha') + +const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') +const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor') +const id = require('../../dd-trace/src/id') +const agent = require('../../dd-trace/test/plugins/agent') +const { withVersions } = require('../../dd-trace/test/setup/mocha') +const { assertObjectContains } = require('../../../integration-tests/helpers') + +describe('Plugin', () => { + let connection + let channel + let queue + + describe('amqplib', () => { + withVersions('amqplib', 'amqplib', version => { + beforeEach(() => { + process.env.DD_DATA_STREAMS_ENABLED = 'true' + queue = `test-${id()}` + }) + + afterEach(() => { + connection.close() + }) + + describe('data stream monitoring', function () { + this.timeout(10000) + + let expectedProducerHashWithTopic + let expectedProducerHashWithExchange + let expectedConsumerHash + + beforeEach(done => { + agent.load('amqplib').then(() => { + require(`../../../versions/amqplib@${version}`).get('amqplib/callback_api') + .connect((err, conn) => { + connection = conn + + if (err != null) { + return done(err) + } + + conn.createChannel((err, ch) => { + channel = ch + return done(err) + }) + }) + }) + }) + + afterEach(() => { + return agent.close({ ritmReset: false }) + }) + + beforeEach(() => { + const producerHashWithTopic = computePathwayHash('test', 'tester', [ + 'direction:out', + 'has_routing_key:true', + `topic:${queue}`, + 'type:rabbitmq' + ], ENTRY_PARENT_HASH) + + expectedProducerHashWithTopic = producerHashWithTopic.readBigUInt64LE(0).toString() + + expectedProducerHashWithExchange = 
computePathwayHash('test', 'tester', [ + 'direction:out', + 'exchange:namedExchange', + 'has_routing_key:true', + 'type:rabbitmq' + ], ENTRY_PARENT_HASH).readBigUInt64LE(0).toString() + + expectedConsumerHash = computePathwayHash('test', 'tester', [ + 'direction:in', + `topic:${queue}`, + 'type:rabbitmq' + ], producerHashWithTopic).readBigUInt64LE(0).toString() + }) + + it('Should emit DSM stats to the agent when sending a message on an unnamed exchange', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = [] + // we should have 1 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) + }) + } + }) + assert.ok(statsPointsReceived.length >= 1) + assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [ + 'direction:out', + 'has_routing_key:true', + `topic:${queue}`, + 'type:rabbitmq' + ]) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithTopic), true) + }, { timeoutMs: 10000 }).then(done, done) + + channel.assertQueue(queue, {}, (err, ok) => { + if (err) return done(err) + + channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) + }) + }) + + it('Should emit DSM stats to the agent when sending a message on a named exchange', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = [] + // we should have 1 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) + }) + } + }) + assert.ok(statsPointsReceived.length >= 1) + assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [ + 'direction:out', + 'exchange:namedExchange', + 'has_routing_key:true', + 'type:rabbitmq' + ]) + assert.strictEqual(agent.dsmStatsExist(agent,
expectedProducerHashWithExchange), true) + }, { timeoutMs: 10000 }).then(done, done) + + channel.assertExchange('namedExchange', 'direct', {}, (err, ok) => { + if (err) return done(err) + + channel.publish('namedExchange', 'anyOldRoutingKey', Buffer.from('DSM pathway test')) + }) + }) + + it('Should emit DSM stats to the agent when receiving a message', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = [] + // we should have 2 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) + }) + } + }) + assert.strictEqual(statsPointsReceived.length, 2) + assert.deepStrictEqual(statsPointsReceived[1].EdgeTags, + ['direction:in', `topic:${queue}`, 'type:rabbitmq']) + assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) + }, { timeoutMs: 10000 }).then(done, done) + + channel.assertQueue(queue, {}, (err, ok) => { + if (err) return done(err) + + channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) + channel.consume(ok.queue, () => {}, {}, (err, ok) => { + if (err) done(err) + }) + }) + }) + + it('Should emit DSM stats to the agent when sending another message', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = [] + // we should have 1 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) + }) + } + }) + assert.strictEqual(statsPointsReceived.length, 1) + assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [ + 'direction:out', + 'has_routing_key:true', + `topic:${queue}`, + 'type:rabbitmq' + ]) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithTopic), true) + }, { timeoutMs: 10000 }).then(done, done) + + 
channel.assertQueue(queue, {}, (err, ok) => { + if (err) return done(err) + + channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) + }) + }) + + it('Should emit DSM stats to the agent when receiving a message with get', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = [] + // we should have 2 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) + }) + } + }) + assert.strictEqual(statsPointsReceived.length, 2) + assert.deepStrictEqual(statsPointsReceived[1].EdgeTags, + ['direction:in', `topic:${queue}`, 'type:rabbitmq']) + assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) + }, { timeoutMs: 10000 }).then(done, done) + + channel.assertQueue(queue, {}, (err, ok) => { + if (err) return done(err) + + channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) + channel.get(ok.queue, {}, (err, ok) => { + if (err) done(err) + }) + }) + }) + + it('regression test: should handle basic.get when queue is empty', done => { + channel.assertQueue(queue, {}, (err, ok) => { + if (err) return done(err) + + channel.get(ok.queue, {}, (err, msg) => { + if (err) return done(err) + assert.strictEqual(msg, false) + done() + }) + }) + }) + + it('Should set pathway hash tag on a span when producing', (done) => { + channel.assertQueue(queue, {}, (err, ok) => { + if (err) return done(err) + + channel.sendToQueue(ok.queue, Buffer.from('dsm test')) + + let produceSpanMeta = {} + agent.assertSomeTraces(traces => { + const span = traces[0][0] + + if (span.resource.startsWith('basic.publish')) { + produceSpanMeta = span.meta + } + + assertObjectContains(produceSpanMeta, { + 'pathway.hash': expectedProducerHashWithTopic + }) + }, { timeoutMs: 10000 }).then(done, done) + }) + }) + + it('Should set pathway hash tag on a span when consuming', (done) => { + 
channel.assertQueue(queue, {}, (err, ok) => { + if (err) return done(err) + + channel.sendToQueue(ok.queue, Buffer.from('dsm test')) + channel.consume(ok.queue, () => {}, {}, (err, ok) => { + if (err) return done(err) + + let consumeSpanMeta = {} + agent.assertSomeTraces(traces => { + const span = traces[0][0] + + if (span.resource.startsWith('basic.deliver')) { + consumeSpanMeta = span.meta + } + + assertObjectContains(consumeSpanMeta, { + 'pathway.hash': expectedConsumerHash + }) + }, { timeoutMs: 10000 }).then(done, done) + }) + }) + }) + }) + }) + }) +}) diff --git a/packages/datadog-plugin-amqplib/test/index.spec.js b/packages/datadog-plugin-amqplib/test/index.spec.js index 408efc892d8..0af299cc836 100644 --- a/packages/datadog-plugin-amqplib/test/index.spec.js +++ b/packages/datadog-plugin-amqplib/test/index.spec.js @@ -6,13 +6,10 @@ const { Buffer } = require('node:buffer') const { afterEach, beforeEach, describe, it } = require('mocha') const { ERROR_MESSAGE, ERROR_STACK, ERROR_TYPE } = require('../../dd-trace/src/constants') -const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') -const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor') const id = require('../../dd-trace/src/id') const agent = require('../../dd-trace/test/plugins/agent') const { withNamingSchema, withPeerService, withVersions } = require('../../dd-trace/test/setup/mocha') const { expectedSchema, rawExpectedSchema } = require('./naming') -const { assertObjectContains } = require('../../../integration-tests/helpers') describe('Plugin', () => { let tracer @@ -309,233 +306,6 @@ describe('Plugin', () => { .catch(done) }) }) - - describe('when data streams monitoring is enabled', function () { - this.timeout(10000) - - let expectedProducerHashWithTopic - let expectedProducerHashWithExchange - let expectedConsumerHash - - beforeEach(() => { - const producerHashWithTopic = computePathwayHash('test', 'tester', [ - 'direction:out', - 
'has_routing_key:true', - `topic:${queue}`, - 'type:rabbitmq' - ], ENTRY_PARENT_HASH) - - expectedProducerHashWithTopic = producerHashWithTopic.readBigUInt64LE(0).toString() - - expectedProducerHashWithExchange = computePathwayHash('test', 'tester', [ - 'direction:out', - 'exchange:namedExchange', - 'has_routing_key:true', - 'type:rabbitmq' - ], ENTRY_PARENT_HASH).readBigUInt64LE(0).toString() - - expectedConsumerHash = computePathwayHash('test', 'tester', [ - 'direction:in', - `topic:${queue}`, - 'type:rabbitmq' - ], producerHashWithTopic).readBigUInt64LE(0).toString() - }) - - it('Should emit DSM stats to the agent when sending a message on an unnamed exchange', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = [] - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) - }) - } - }) - assert.ok(statsPointsReceived.length >= 1) - assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [ - 'direction:out', - 'has_routing_key:true', - `topic:${queue}`, - 'type:rabbitmq' - ]) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithTopic), true) - }, { timeoutMs: 10000 }).then(done, done) - - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) - }) - }) - - it('Should emit DSM stats to the agent when sending a message on an named exchange', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = [] - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) - }) - } - }) - assert.ok(statsPointsReceived.length >= 1) - 
assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [ - 'direction:out', - 'exchange:namedExchange', - 'has_routing_key:true', - 'type:rabbitmq' - ]) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithExchange), true) - }, { timeoutMs: 10000 }).then(done, done) - - channel.assertExchange('namedExchange', 'direct', {}, (err, ok) => { - if (err) return done(err) - - channel.publish('namedExchange', 'anyOldRoutingKey', Buffer.from('DSM pathway test')) - }) - }) - - it('Should emit DSM stats to the agent when receiving a message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = [] - // we should have 2 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) - }) - } - }) - assert.strictEqual(statsPointsReceived.length, 2) - assert.deepStrictEqual(statsPointsReceived[1].EdgeTags, - ['direction:in', `topic:${queue}`, 'type:rabbitmq']) - assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) - }, { timeoutMs: 10000 }).then(done, done) - - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) - channel.consume(ok.queue, () => {}, {}, (err, ok) => { - if (err) done(err) - }) - }) - }) - - it('Should emit DSM stats to the agent when sending another message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = [] - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) - }) - } - }) - assert.strictEqual(statsPointsReceived.length, 1) - assert.deepStrictEqual(statsPointsReceived[0].EdgeTags, [ - 'direction:out', - 
'has_routing_key:true', - `topic:${queue}`, - 'type:rabbitmq' - ]) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHashWithTopic), true) - }, { timeoutMs: 10000 }).then(done, done) - - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) - }) - }) - - it('Should emit DSM stats to the agent when receiving a message with get', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = [] - // we should have 2 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats) - }) - } - }) - assert.strictEqual(statsPointsReceived.length, 2) - assert.deepStrictEqual(statsPointsReceived[1].EdgeTags, - ['direction:in', `topic:${queue}`, 'type:rabbitmq']) - assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) - }, { timeoutMs: 10000 }).then(done, done) - - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test')) - channel.get(ok.queue, {}, (err, ok) => { - if (err) done(err) - }) - }) - }) - - it('regression test: should handle basic.get when queue is empty', done => { - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.get(ok.queue, {}, (err, msg) => { - if (err) return done(err) - assert.strictEqual(msg, false) - done() - }) - }) - }) - - it('Should set pathway hash tag on a span when producing', (done) => { - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('dsm test')) - - let produceSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.resource.startsWith('basic.publish')) { - produceSpanMeta = span.meta - } - - 
assertObjectContains(produceSpanMeta, { - 'pathway.hash': expectedProducerHashWithTopic - }) - }, { timeoutMs: 10000 }).then(done, done) - }) - }) - - it('Should set pathway hash tag on a span when consuming', (done) => { - channel.assertQueue(queue, {}, (err, ok) => { - if (err) return done(err) - - channel.sendToQueue(ok.queue, Buffer.from('dsm test')) - channel.consume(ok.queue, () => {}, {}, (err, ok) => { - if (err) return done(err) - - let consumeSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.resource.startsWith('basic.deliver')) { - consumeSpanMeta = span.meta - } - - assertObjectContains(consumeSpanMeta, { - 'pathway.hash': expectedConsumerHash - }) - }, { timeoutMs: 10000 }).then(done, done) - }) - }) - }) - }) }) describe('with configuration', () => { diff --git a/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js new file mode 100644 index 00000000000..164473cd484 --- /dev/null +++ b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js @@ -0,0 +1,242 @@ +'use strict' + +const assert = require('node:assert/strict') + +const { afterEach, beforeEach, describe, it } = require('mocha') +const sinon = require('sinon') + +const { assertObjectContains } = require('../../../integration-tests/helpers') +const { withVersions } = require('../../dd-trace/test/setup/mocha') +const agent = require('../../dd-trace/test/plugins/agent') +const { setup } = require('./spec_helpers') +const helpers = require('./kinesis_helpers') +const id = require('../../dd-trace/src/id') +const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') +const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor') + +describe('Kinesis', function () { + this.timeout(10000) + setup() + + withVersions('aws-sdk', ['aws-sdk', '@aws-sdk/smithy-client'], (version, moduleName) => { + let AWS + let kinesis + let tracer + + const kinesisClientName = 
moduleName === '@aws-sdk/smithy-client' ? '@aws-sdk/client-kinesis' : 'aws-sdk' + + function createResources (streamName, cb) { + AWS = require(`../../../versions/${kinesisClientName}@${version}`).get() + + const params = { + endpoint: 'http://127.0.0.1:4566', + region: 'us-east-1' + } + + if (moduleName === '@aws-sdk/smithy-client') { + const { NodeHttpHandler } = require(`../../../versions/@aws-sdk/node-http-handler@${version}`).get() + + params.requestHandler = new NodeHttpHandler() + } + + kinesis = new AWS.Kinesis(params) + + kinesis.createStream({ + StreamName: streamName, + ShardCount: 1 + }, (err, res) => { + if (err) return cb(err) + + helpers.waitForActiveStream(kinesis, streamName, cb) + }) + } + + describe('DSM Context Propagation', () => { + let expectedProducerHash + let expectedConsumerHash + let nowStub + let streamNameDSM + + beforeEach(() => { + return agent.load('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) + }) + + beforeEach(done => { + tracer = require('../../dd-trace') + tracer.use('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) + + streamNameDSM = `MyStreamDSM-${id()}` + + const producerHash = computePathwayHash( + 'test', + 'tester', + ['direction:out', 'topic:' + streamNameDSM, 'type:kinesis'], + ENTRY_PARENT_HASH + ) + + expectedProducerHash = producerHash.readBigUInt64LE(0).toString() + expectedConsumerHash = computePathwayHash( + 'test', + 'tester', + ['direction:in', 'topic:' + streamNameDSM, 'type:kinesis'], + producerHash + ).readBigUInt64LE(0).toString() + + createResources(streamNameDSM, done) + }) + + afterEach(done => { + kinesis.deleteStream({ + StreamName: streamNameDSM + }, (err, res) => { + if (err) return done(err) + + helpers.waitForDeletedStream(kinesis, streamNameDSM, done) + }) + }) + + afterEach(() => { + try { + nowStub.restore() + } catch { + // pass + } + agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) + }) + + it('injects DSM pathway hash 
during Kinesis getRecord to the span', done => { + let getRecordSpanMeta = {} + agent.assertSomeTraces(traces => { + const span = traces[0][0] + + if (span.name === 'aws.response') { + getRecordSpanMeta = span.meta + } + + assertObjectContains(getRecordSpanMeta, { + 'pathway.hash': expectedConsumerHash + }) + }, { timeoutMs: 10000 }).then(done, done) + + helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => { + if (err) return done(err) + + helpers.getTestData(kinesis, streamNameDSM, data, () => {}) + }) + }) + + it('injects DSM pathway hash during Kinesis putRecord to the span', done => { + let putRecordSpanMeta = {} + agent.assertSomeTraces(traces => { + const span = traces[0][0] + + if (span.resource.startsWith('putRecord')) { + putRecordSpanMeta = span.meta + } + + assertObjectContains(putRecordSpanMeta, { + 'pathway.hash': expectedProducerHash + }) + }).then(done, done) + + helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, () => {}) + }) + + it('emits DSM stats to the agent during Kinesis putRecord', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have only have 1 stats point since we only had 1 put operation + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 1) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) + }, { timeoutMs: 10000 }).then(done, done) + + helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, () => {}) + }) + + it('emits DSM stats to the agent during Kinesis getRecord', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have only have 1 stats point since we only had 1 put operation + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + 
timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 2) + assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) + }, { timeoutMs: 10000 }).then(done, done) + + helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => { + if (err) return done(err) + + helpers.getTestData(kinesis, streamNameDSM, data, () => {}) + }) + }) + + // eslint-disable-next-line @stylistic/max-len + it('emits DSM stats to the agent during Kinesis getRecord when the putRecord was done without DSM enabled', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should only have 1 stats point since we only had 1 put operation + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.strictEqual(statsPointsReceived, 1) + assert.strictEqual(agent.dsmStatsExistWithParentHash(agent, '0'), true) + }, { timeoutMs: 10000 }).then(done, done) + + // TODO: Fix this. The third argument is not used. Check all usages of agent.reload. + agent.reload('aws-sdk', { kinesis: { dsmEnabled: false } }, { dsmEnabled: false }) + helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => { + if (err) return done(err) + + // TODO: Fix this. The third argument is not used. Check all usages of agent.reload.
+ agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) + helpers.getTestData(kinesis, streamNameDSM, data, () => {}) + }) + }) + + it('emits DSM stats to the agent during Kinesis putRecords', done => { + // we need to stub Date.now() to ensure a new stats bucket is created for each call + // otherwise, all stats checkpoints will be combined into a single stats points + let now = Date.now() + nowStub = sinon.stub(Date, 'now') + nowStub.callsFake(() => { + now += 1000000 + return now + }) + + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have only have 3 stats points since we only had 3 records published + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 3) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) + }, { timeoutMs: 10000 }).then(done, done) + + helpers.putTestRecords(kinesis, streamNameDSM, (err, data) => { + // Swallow the error as it doesn't matter for this test. 
+ }) + }) + }) + }) +}) diff --git a/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js b/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js index 7dc7f3ae014..8e3a70bcf1a 100644 --- a/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js +++ b/packages/datadog-plugin-aws-sdk/test/kinesis.spec.js @@ -3,7 +3,6 @@ const assert = require('node:assert/strict') const { after, afterEach, before, beforeEach, describe, it } = require('mocha') -const sinon = require('sinon') const { assertObjectContains } = require('../../../integration-tests/helpers') const { withNamingSchema, withVersions } = require('../../dd-trace/test/setup/mocha') @@ -12,8 +11,6 @@ const { setup } = require('./spec_helpers') const helpers = require('./kinesis_helpers') const { rawExpectedSchema } = require('./kinesis-naming') const id = require('../../dd-trace/src/id') -const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') -const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor') describe('Kinesis', function () { this.timeout(10000) @@ -22,7 +19,6 @@ describe('Kinesis', function () { withVersions('aws-sdk', ['aws-sdk', '@aws-sdk/smithy-client'], (version, moduleName) => { let AWS let kinesis - let tracer const kinesisClientName = moduleName === '@aws-sdk/smithy-client' ? 
'@aws-sdk/client-kinesis' : 'aws-sdk' @@ -194,193 +190,5 @@ describe('Kinesis', function () { }) }) }) - - describe('DSM Context Propagation', () => { - let expectedProducerHash - let expectedConsumerHash - let nowStub - let streamNameDSM - - beforeEach(() => { - return agent.load('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) - }) - - beforeEach(done => { - tracer = require('../../dd-trace') - tracer.use('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) - - streamNameDSM = `MyStreamDSM-${id()}` - - const producerHash = computePathwayHash( - 'test', - 'tester', - ['direction:out', 'topic:' + streamNameDSM, 'type:kinesis'], - ENTRY_PARENT_HASH - ) - - expectedProducerHash = producerHash.readBigUInt64LE(0).toString() - expectedConsumerHash = computePathwayHash( - 'test', - 'tester', - ['direction:in', 'topic:' + streamNameDSM, 'type:kinesis'], - producerHash - ).readBigUInt64LE(0).toString() - - createResources(streamNameDSM, done) - }) - - afterEach(done => { - kinesis.deleteStream({ - StreamName: streamNameDSM - }, (err, res) => { - if (err) return done(err) - - helpers.waitForDeletedStream(kinesis, streamNameDSM, done) - }) - }) - - afterEach(() => { - try { - nowStub.restore() - } catch { - // pass - } - agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) - }) - - it('injects DSM pathway hash during Kinesis getRecord to the span', done => { - let getRecordSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.name === 'aws.response') { - getRecordSpanMeta = span.meta - } - - assertObjectContains(getRecordSpanMeta, { - 'pathway.hash': expectedConsumerHash - }) - }, { timeoutMs: 10000 }).then(done, done) - - helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => { - if (err) return done(err) - - helpers.getTestData(kinesis, streamNameDSM, data, () => {}) - }) - }) - - it('injects DSM pathway hash during Kinesis putRecord to the span', 
done => { - let putRecordSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.resource.startsWith('putRecord')) { - putRecordSpanMeta = span.meta - } - - assertObjectContains(putRecordSpanMeta, { - 'pathway.hash': expectedProducerHash - }) - }).then(done, done) - - helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, () => {}) - }) - - it('emits DSM stats to the agent during Kinesis putRecord', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have only have 1 stats point since we only had 1 put operation - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 1) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }, { timeoutMs: 10000 }).then(done, done) - - helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, () => {}) - }) - - it('emits DSM stats to the agent during Kinesis getRecord', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have only have 1 stats point since we only had 1 put operation - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }, { timeoutMs: 10000 }) - assert.ok(statsPointsReceived >= 2) - assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) - }, { timeoutMs: 10000 }).then(done, done) - - helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => { - if (err) return done(err) - - helpers.getTestData(kinesis, streamNameDSM, data, () => {}) - }) - }) - - // eslint-disable-next-line @stylistic/max-len - it('emits DSM stats to the agent during Kinesis getRecord when the putRecord was 
done without DSM enabled', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have only have 1 stats point since we only had 1 put operation - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }, { timeoutMs: 10000 }) - assert.strictEqual(statsPointsReceived, 1) - assert.strictEqual(agent.dsmStatsExistWithParentHash(agent, '0'), true) - }, { timeoutMs: 10000 }).then(done, done) - - // TODO: Fix this. The third argument is not used. Check all usages of agent.reload. - agent.reload('aws-sdk', { kinesis: { dsmEnabled: false } }, { dsmEnabled: false }) - helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => { - if (err) return done(err) - - // TODO: Fix this. The third argument is not used. Check all usages of agent.reload. - agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) - helpers.getTestData(kinesis, streamNameDSM, data, () => {}) - }) - }) - - it('emits DSM stats to the agent during Kinesis putRecords', done => { - // we need to stub Date.now() to ensure a new stats bucket is created for each call - // otherwise, all stats checkpoints will be combined into a single stats points - let now = Date.now() - nowStub = sinon.stub(Date, 'now') - nowStub.callsFake(() => { - now += 1000000 - return now - }) - - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have only have 3 stats points since we only had 3 records published - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 3) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }, { timeoutMs: 10000 }).then(done, 
done) - - helpers.putTestRecords(kinesis, streamNameDSM, (err, data) => { - // Swallow the error as it doesn't matter for this test. - }) - }) - }) }) }) diff --git a/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js new file mode 100644 index 00000000000..508f033283b --- /dev/null +++ b/packages/datadog-plugin-aws-sdk/test/sns.dsm.spec.js @@ -0,0 +1,269 @@ +'use strict' + +const assert = require('node:assert/strict') + +const { after, afterEach, before, describe, it } = require('mocha') +const sinon = require('sinon') +const semver = require('semver') + +const { assertObjectContains } = require('../../../integration-tests/helpers') +const { withVersions } = require('../../dd-trace/test/setup/mocha') +const agent = require('../../dd-trace/test/plugins/agent') +const { setup } = require('./spec_helpers') + +describe('Sns', function () { + setup() + this.timeout(20000) + + withVersions('aws-sdk', ['aws-sdk', '@aws-sdk/smithy-client'], (version, moduleName) => { + let sns + let sqs + let subParams + let receiveParams + let TopicArn + let QueueUrl + let tracer + + const snsClientName = moduleName === '@aws-sdk/smithy-client' ? '@aws-sdk/client-sns' : 'aws-sdk' + const sqsClientName = moduleName === '@aws-sdk/smithy-client' ? 
'@aws-sdk/client-sqs' : 'aws-sdk' + + function createResources (queueName, topicName, cb) { + const { SNS } = require(`../../../versions/${snsClientName}@${version}`).get() + const { SQS } = require(`../../../versions/${sqsClientName}@${version}`).get() + + sns = new SNS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' }) + sqs = new SQS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' }) + + sns.createTopic({ Name: topicName }, (err, data) => { + if (err) return cb(err) + + TopicArn = data.TopicArn + + sqs.createQueue({ QueueName: queueName }, (err, data) => { + if (err) return cb(err) + + QueueUrl = data.QueueUrl + + sqs.getQueueAttributes({ + QueueUrl, + AttributeNames: ['QueueArn'] + }, (err, data) => { + if (err) return cb(err) + + const QueueArn = data.Attributes.QueueArn + + subParams = { + Protocol: 'sqs', + TopicArn, + Endpoint: QueueArn + } + + receiveParams = { + QueueUrl, + MessageAttributeNames: ['.*'], + WaitTimeSeconds: 1 + } + + cb() + }) + }) + }) + } + + describe('Data Streams Monitoring', () => { + const expectedProducerHash = '15386798273908484982' + const expectedConsumerHash = '15162998336469814920' + let nowStub + + before(() => { + return agent.load('aws-sdk', { sns: { dsmEnabled: true }, sqs: { dsmEnabled: true } }, { dsmEnabled: true }) + }) + + before(done => { + process.env.DD_DATA_STREAMS_ENABLED = 'true' + tracer = require('../../dd-trace') + tracer.use('aws-sdk', { sns: { dsmEnabled: true }, sqs: { dsmEnabled: true } }) + + createResources('TestQueueDSM', 'TestTopicDSM', done) + }) + + after(done => { + sns.deleteTopic({ TopicArn }, done) + }) + + after(done => { + sqs.deleteQueue({ QueueUrl }, done) + }) + + after(() => { + return agent.close({ ritmReset: false, wipe: true }) + }) + + afterEach(() => { + try { + nowStub.restore() + } catch { + // pass + } + // TODO: Fix this. The third argument is not used. 
+ agent.reload('aws-sdk', { sns: { dsmEnabled: true, batchPropagationEnabled: true } }, { dsmEnabled: true }) + }) + + it('injects DSM pathway hash to SNS publish span', done => { + sns.subscribe(subParams, (err, data) => { + if (err) return done(err) + + sns.publish( + { TopicArn, Message: 'message DSM' }, + (err) => { + if (err) return done(err) + + let publishSpanMeta = {} + agent.assertSomeTraces(traces => { + const span = traces[0][0] + + if (span.resource.startsWith('publish')) { + publishSpanMeta = span.meta + } + + assertObjectContains(publishSpanMeta, { + 'pathway.hash': expectedProducerHash + }) + }).then(done, done) + }) + }) + }) + + it('injects DSM pathway hash to SQS receive span from SNS topic', done => { + sns.subscribe(subParams, (err, data) => { + if (err) return done(err) + + sns.publish( + { TopicArn, Message: 'message DSM' }, + (err) => { + if (err) return done(err) + }) + + sqs.receiveMessage( + receiveParams, + (err, res) => { + if (err) return done(err) + + let consumeSpanMeta = {} + agent.assertSomeTraces(traces => { + const span = traces[0][0] + + if (span.name === 'aws.response') { + consumeSpanMeta = span.meta + } + + assertObjectContains(consumeSpanMeta, { + 'pathway.hash': expectedConsumerHash + }) + }).then(done, done) + }) + }) + }) + + it('outputs DSM stats to the agent when publishing a message', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 1 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 1) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) + }).then(done, done) + + sns.subscribe(subParams, () => { + sns.publish({ TopicArn, Message: 'message DSM' }, () => {}) + }) + }) + + it('outputs DSM stats to the agent when consuming a message', done 
=> { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 2 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 2) + assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) + }).then(done, done) + + sns.subscribe(subParams, () => { + sns.publish({ TopicArn, Message: 'message DSM' }, () => { + sqs.receiveMessage(receiveParams, () => {}) + }) + }) + }) + + it('outputs DSM stats to the agent when publishing batch messages', function (done) { + // publishBatch was released with version 2.1031.0 for the aws-sdk + // publishBatch does not work with smithy-client 3.0.0, unable to find compatible version it + // was released for, but works on 3.374.0 + if ( + (moduleName === '@aws-sdk/smithy-client' && semver.intersects(version, '>=3.374.0')) || + (moduleName === 'aws-sdk' && semver.intersects(version, '>=2.1031.0')) + ) { + // we need to stub Date.now() to ensure a new stats bucket is created for each call + // otherwise, all stats checkpoints will be combined into a single stats points + let now = Date.now() + nowStub = sinon.stub(Date, 'now') + nowStub.callsFake(() => { + now += 1000000 + return now + }) + + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 3 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 3) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) + }, { timeoutMs: 2000 }).then(done, done) + + sns.subscribe(subParams, () => { + sns.publishBatch( + { + TopicArn, + PublishBatchRequestEntries: [ + { + Id: '1', + Message: 
'message DSM 1' + }, + { + Id: '2', + Message: 'message DSM 2' + }, + { + Id: '3', + Message: 'message DSM 3' + } + ] + }, () => { + nowStub.restore() + }) + }) + } else { + this.skip() + } + }) + }) + }) +}) diff --git a/packages/datadog-plugin-aws-sdk/test/sns.spec.js b/packages/datadog-plugin-aws-sdk/test/sns.spec.js index 9e5d2c5d450..ac78a4ff63c 100644 --- a/packages/datadog-plugin-aws-sdk/test/sns.spec.js +++ b/packages/datadog-plugin-aws-sdk/test/sns.spec.js @@ -2,8 +2,7 @@ const assert = require('node:assert/strict') -const { after, afterEach, before, describe, it } = require('mocha') -const sinon = require('sinon') +const { after, before, describe, it } = require('mocha') const semver = require('semver') const { assertObjectContains } = require('../../../integration-tests/helpers') @@ -338,7 +337,6 @@ describe('Sns', function () { }) before(done => { - process.env.DD_DATA_STREAMS_ENABLED = 'true' tracer = require('../../dd-trace') tracer.use('aws-sdk', { sns: { dsmEnabled: false, batchPropagationEnabled: true } }) @@ -502,201 +500,5 @@ describe('Sns', function () { sns.publish({ TopicArn, Message: 'message 1' }, e => e && done(e)) }) }) - - describe('Data Streams Monitoring', () => { - const expectedProducerHash = '15386798273908484982' - const expectedConsumerHash = '15162998336469814920' - let nowStub - - before(() => { - return agent.load('aws-sdk', { sns: { dsmEnabled: true }, sqs: { dsmEnabled: true } }, { dsmEnabled: true }) - }) - - before(done => { - process.env.DD_DATA_STREAMS_ENABLED = 'true' - tracer = require('../../dd-trace') - tracer.use('aws-sdk', { sns: { dsmEnabled: true }, sqs: { dsmEnabled: true } }) - - createResources('TestQueueDSM', 'TestTopicDSM', done) - }) - - after(done => { - sns.deleteTopic({ TopicArn }, done) - }) - - after(done => { - sqs.deleteQueue({ QueueUrl }, done) - }) - - after(() => { - return agent.close({ ritmReset: false, wipe: true }) - }) - - afterEach(() => { - try { - nowStub.restore() - } catch { - // pass - } 
- // TODO: Fix this. The third argument is not used. - agent.reload('aws-sdk', { sns: { dsmEnabled: true, batchPropagationEnabled: true } }, { dsmEnabled: true }) - }) - - it('injects DSM pathway hash to SNS publish span', done => { - sns.subscribe(subParams, (err, data) => { - if (err) return done(err) - - sns.publish( - { TopicArn, Message: 'message DSM' }, - (err) => { - if (err) return done(err) - - let publishSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.resource.startsWith('publish')) { - publishSpanMeta = span.meta - } - - assertObjectContains(publishSpanMeta, { - 'pathway.hash': expectedProducerHash - }) - }).then(done, done) - }) - }) - }) - - it('injects DSM pathway hash to SQS receive span from SNS topic', done => { - sns.subscribe(subParams, (err, data) => { - if (err) return done(err) - - sns.publish( - { TopicArn, Message: 'message DSM' }, - (err) => { - if (err) return done(err) - }) - - sqs.receiveMessage( - receiveParams, - (err, res) => { - if (err) return done(err) - - let consumeSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.name === 'aws.response') { - consumeSpanMeta = span.meta - } - - assertObjectContains(consumeSpanMeta, { - 'pathway.hash': expectedConsumerHash - }) - }).then(done, done) - }) - }) - }) - - it('outputs DSM stats to the agent when publishing a message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 1) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }).then(done, done) - - sns.subscribe(subParams, () => { - sns.publish({ TopicArn, Message: 'message DSM' }, () => {}) - }) - }) - - it('outputs 
DSM stats to the agent when consuming a message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 2 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 2) - assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) - }).then(done, done) - - sns.subscribe(subParams, () => { - sns.publish({ TopicArn, Message: 'message DSM' }, () => { - sqs.receiveMessage(receiveParams, () => {}) - }) - }) - }) - - it('outputs DSM stats to the agent when publishing batch messages', function (done) { - // publishBatch was released with version 2.1031.0 for the aws-sdk - // publishBatch does not work with smithy-client 3.0.0, unable to find compatible version it - // was released for, but works on 3.374.0 - if ( - (moduleName === '@aws-sdk/smithy-client' && semver.intersects(version, '>=3.374.0')) || - (moduleName === 'aws-sdk' && semver.intersects(version, '>=2.1031.0')) - ) { - // we need to stub Date.now() to ensure a new stats bucket is created for each call - // otherwise, all stats checkpoints will be combined into a single stats points - let now = Date.now() - nowStub = sinon.stub(Date, 'now') - nowStub.callsFake(() => { - now += 1000000 - return now - }) - - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 3 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 3) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }, { timeoutMs: 2000 }).then(done, done) - - sns.subscribe(subParams, () => { - sns.publishBatch( - { - TopicArn, - 
PublishBatchRequestEntries: [ - { - Id: '1', - Message: 'message DSM 1' - }, - { - Id: '2', - Message: 'message DSM 2' - }, - { - Id: '3', - Message: 'message DSM 3' - } - ] - }, () => { - nowStub.restore() - }) - }) - } else { - this.skip() - } - }) - }) }) }) diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js new file mode 100644 index 00000000000..4609fdaf26e --- /dev/null +++ b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js @@ -0,0 +1,307 @@ +'use strict' + +const assert = require('node:assert/strict') +const { randomUUID } = require('node:crypto') + +const { after, afterEach, before, beforeEach, describe, it } = require('mocha') +const semver = require('semver') +const sinon = require('sinon') + +const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') +const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor') +const agent = require('../../dd-trace/test/plugins/agent') +const { withVersions } = require('../../dd-trace/test/setup/mocha') +const { setup } = require('./spec_helpers') +const { assertObjectContains } = require('../../../integration-tests/helpers') + +const getQueueParams = (queueName) => { + return { + QueueName: queueName, + Attributes: { + MessageRetentionPeriod: '86400' + } + } +} + +describe('Plugin', () => { + describe('aws-sdk (sqs)', function () { + this.timeout(10000) + setup() + + withVersions('aws-sdk', ['aws-sdk', '@aws-sdk/smithy-client'], (version, moduleName) => { + let AWS + let sqs + let queueNameDSM + let queueNameDSMConsumerOnly + let queueOptionsDsm + let queueOptionsDsmConsumerOnly + let QueueUrlDsm + let QueueUrlDsmConsumerOnly + let tracer + + const sqsClientName = moduleName === '@aws-sdk/smithy-client' ? 
'@aws-sdk/client-sqs' : 'aws-sdk' + + beforeEach(() => { + const id = randomUUID() + queueNameDSM = `SQS_QUEUE_NAME_DSM-${id}` + queueNameDSMConsumerOnly = `SQS_QUEUE_NAME_DSM_CONSUMER_ONLY-${id}` + queueOptionsDsm = getQueueParams(queueNameDSM) + queueOptionsDsmConsumerOnly = getQueueParams(queueNameDSMConsumerOnly) + QueueUrlDsm = `http://127.0.0.1:4566/00000000000000000000/${queueNameDSM}` + QueueUrlDsmConsumerOnly = `http://127.0.0.1:4566/00000000000000000000/${queueNameDSMConsumerOnly}` + }) + + describe('data stream monitoring', () => { + let expectedProducerHash + let expectedConsumerHash + let nowStub + + before(() => { + process.env.DD_DATA_STREAMS_ENABLED = 'true' + tracer = require('../../dd-trace') + tracer.use('aws-sdk', { sqs: { dsmEnabled: true } }) + }) + + before(async () => { + return agent.load('aws-sdk', { + sqs: { + dsmEnabled: true + } + }, + { dsmEnabled: true }) + }) + + before(() => { + AWS = require(`../../../versions/${sqsClientName}@${version}`).get() + sqs = new AWS.SQS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' }) + }) + + beforeEach(() => { + const producerHash = computePathwayHash( + 'test', + 'tester', + ['direction:out', 'topic:' + queueNameDSM, 'type:sqs'], + ENTRY_PARENT_HASH + ) + + expectedProducerHash = producerHash.readBigUInt64LE(0).toString() + expectedConsumerHash = computePathwayHash( + 'test', + 'tester', + ['direction:in', 'topic:' + queueNameDSM, 'type:sqs'], + producerHash + ).readBigUInt64LE(0).toString() + }) + + beforeEach(done => { + sqs.createQueue(queueOptionsDsm, (err, res) => err ? done(err) : done()) + }) + + beforeEach(done => { + sqs.createQueue(queueOptionsDsmConsumerOnly, (err, res) => err ? 
done(err) : done()) + }) + + afterEach(done => { + sqs.deleteQueue({ QueueUrl: QueueUrlDsm }, done) + }) + + afterEach(done => { + sqs.deleteQueue({ QueueUrl: QueueUrlDsmConsumerOnly }, done) + }) + + after(() => { + return agent.close({ ritmReset: false }) + }) + + afterEach(() => { + try { + nowStub.restore() + } catch { + // pass + } + agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) + }) + + it('Should set pathway hash tag on a span when producing', (done) => { + sqs.sendMessage({ + MessageBody: 'test DSM', + QueueUrl: QueueUrlDsm + }, (err) => { + if (err) return done(err) + + let produceSpanMeta = {} + agent.assertSomeTraces(traces => { + const span = traces[0][0] + + if (span.resource.startsWith('sendMessage')) { + produceSpanMeta = span.meta + } + + assertObjectContains(produceSpanMeta, { + 'pathway.hash': expectedProducerHash + }) + }).then(done, done) + }) + }) + + it('Should set pathway hash tag on a span when consuming', (done) => { + sqs.sendMessage({ + MessageBody: 'test DSM', + QueueUrl: QueueUrlDsm + }, (err) => { + if (err) return done(err) + + sqs.receiveMessage({ + QueueUrl: QueueUrlDsm, + MessageAttributeNames: ['.*'] + }, (err) => { + if (err) return done(err) + + let consumeSpanMeta = {} + agent.assertSomeTraces(traces => { + const span = traces[0][0] + + if (span.name === 'aws.response') { + consumeSpanMeta = span.meta + } + + assertObjectContains(consumeSpanMeta, { + 'pathway.hash': expectedConsumerHash + }) + }).then(done, done) + }) + }) + }) + + if (sqsClientName === 'aws-sdk' && semver.intersects(version, '>=2.3')) { + it('Should set pathway hash tag on a span when consuming and promise() was used over a callback', + async () => { + let consumeSpanMeta = {} + const tracePromise = agent.assertSomeTraces(traces => { + const span = traces[0][0] + + if (span.name === 'aws.request' && span.meta['aws.operation'] === 'receiveMessage') { + consumeSpanMeta = span.meta + } + + assertObjectContains(consumeSpanMeta, { 
+ 'pathway.hash': expectedConsumerHash + }) + }) + + await sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }).promise() + await sqs.receiveMessage({ QueueUrl: QueueUrlDsm }).promise() + + return tracePromise + }) + } + + it('Should emit DSM stats to the agent when sending a message', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 1 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 1) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) + }).then(done, done) + + sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }, () => {}) + }) + + it('Should emit DSM stats to the agent when receiving a message', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 2 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 2) + assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) + }, { timeoutMs: 5000 }).then(done, done) + + sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }, () => { + sqs.receiveMessage({ QueueUrl: QueueUrlDsm, MessageAttributeNames: ['.*'] }, () => {}) + }) + }) + + it('Should emit DSM stats when receiving a message when the producer was not instrumented', done => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 2 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += 
statsBuckets.Stats.length + }) + } + }) + assert.strictEqual(statsPointsReceived, 1) + assert.strictEqual(agent.dsmStatsExistWithParentHash(agent, '0'), true) + }).then(done, done) + + agent.reload('aws-sdk', { sqs: { dsmEnabled: false } }, { dsmEnabled: false }) + sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsmConsumerOnly }, () => { + agent.reload('aws-sdk', { sqs: { dsmEnabled: true } }, { dsmEnabled: true }) + sqs.receiveMessage({ QueueUrl: QueueUrlDsmConsumerOnly, MessageAttributeNames: ['.*'] }, () => {}) + }) + }) + + it('Should emit DSM stats to the agent when sending batch messages', done => { + // we need to stub Date.now() to ensure a new stats bucket is created for each call + // otherwise, all stats checkpoints will be combined into a single stats points + let now = Date.now() + nowStub = sinon.stub(Date, 'now') + nowStub.callsFake(() => { + now += 1000000 + return now + }) + + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 3 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 3) + assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) + }).then(done, done) + + sqs.sendMessageBatch( + { + Entries: [ + { + Id: '1', + MessageBody: 'test DSM 1' + }, + { + Id: '2', + MessageBody: 'test DSM 2' + }, + { + Id: '3', + MessageBody: 'test DSM 3' + } + ], + QueueUrl: QueueUrlDsm + }, () => { + nowStub.restore() + }) + }) + }) + }) + }) +}) diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.spec.js index 31e20f623bd..0a7a1087c08 100644 --- a/packages/datadog-plugin-aws-sdk/test/sqs.spec.js +++ b/packages/datadog-plugin-aws-sdk/test/sqs.spec.js @@ -5,16 +5,11 @@ const { randomUUID } = require('node:crypto') const { expect } = 
require('chai') const { after, afterEach, before, beforeEach, describe, it } = require('mocha') -const semver = require('semver') -const sinon = require('sinon') -const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') -const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor') const agent = require('../../dd-trace/test/plugins/agent') const { withNamingSchema, withPeerService, withVersions } = require('../../dd-trace/test/setup/mocha') const { setup } = require('./spec_helpers') const { rawExpectedSchema } = require('./sqs-naming') -const { assertObjectContains } = require('../../../integration-tests/helpers') const getQueueParams = (queueName) => { return { @@ -34,14 +29,8 @@ describe('Plugin', () => { let AWS let sqs let queueName - let queueNameDSM - let queueNameDSMConsumerOnly let queueOptions - let queueOptionsDsm - let queueOptionsDsmConsumerOnly let QueueUrl - let QueueUrlDsm - let QueueUrlDsmConsumerOnly let tracer const sqsClientName = moduleName === '@aws-sdk/smithy-client' ? 
'@aws-sdk/client-sqs' : 'aws-sdk' @@ -50,21 +39,14 @@ describe('Plugin', () => { const id = randomUUID() queueName = `SQS_QUEUE_NAME-${id}` - queueNameDSM = `SQS_QUEUE_NAME_DSM-${id}` - queueNameDSMConsumerOnly = `SQS_QUEUE_NAME_DSM_CONSUMER_ONLY-${id}` queueOptions = getQueueParams(queueName) - queueOptionsDsm = getQueueParams(queueNameDSM) - queueOptionsDsmConsumerOnly = getQueueParams(queueNameDSMConsumerOnly) QueueUrl = `http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME-${id}` - QueueUrlDsm = `http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM-${id}` - QueueUrlDsmConsumerOnly = `http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM_CONSUMER_ONLY-${id}` }) describe('without configuration', () => { before(() => { - process.env.DD_DATA_STREAMS_ENABLED = 'true' tracer = require('../../dd-trace') tracer.use('aws-sdk', { sqs: { batchPropagationEnabled: true } }) @@ -416,258 +398,6 @@ describe('Plugin', () => { }, 250) }) }) - - describe('data stream monitoring', () => { - let expectedProducerHash - let expectedConsumerHash - let nowStub - - before(() => { - process.env.DD_DATA_STREAMS_ENABLED = 'true' - tracer = require('../../dd-trace') - tracer.use('aws-sdk', { sqs: { dsmEnabled: true } }) - }) - - before(async () => { - return agent.load('aws-sdk', { - sqs: { - dsmEnabled: true - } - }, - { dsmEnabled: true }) - }) - - before(() => { - AWS = require(`../../../versions/${sqsClientName}@${version}`).get() - sqs = new AWS.SQS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' }) - }) - - beforeEach(() => { - const producerHash = computePathwayHash( - 'test', - 'tester', - ['direction:out', 'topic:' + queueNameDSM, 'type:sqs'], - ENTRY_PARENT_HASH - ) - - expectedProducerHash = producerHash.readBigUInt64LE(0).toString() - expectedConsumerHash = computePathwayHash( - 'test', - 'tester', - ['direction:in', 'topic:' + queueNameDSM, 'type:sqs'], - producerHash - ).readBigUInt64LE(0).toString() - }) - - beforeEach(done => { - 
sqs.createQueue(queueOptionsDsm, (err, res) => err ? done(err) : done()) - }) - - beforeEach(done => { - sqs.createQueue(queueOptionsDsmConsumerOnly, (err, res) => err ? done(err) : done()) - }) - - afterEach(done => { - sqs.deleteQueue({ QueueUrl: QueueUrlDsm }, done) - }) - - afterEach(done => { - sqs.deleteQueue({ QueueUrl: QueueUrlDsmConsumerOnly }, done) - }) - - after(() => { - return agent.close({ ritmReset: false }) - }) - - afterEach(() => { - try { - nowStub.restore() - } catch { - // pass - } - agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true }) - }) - - it('Should set pathway hash tag on a span when producing', (done) => { - sqs.sendMessage({ - MessageBody: 'test DSM', - QueueUrl: QueueUrlDsm - }, (err) => { - if (err) return done(err) - - let produceSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.resource.startsWith('sendMessage')) { - produceSpanMeta = span.meta - } - - assertObjectContains(produceSpanMeta, { - 'pathway.hash': expectedProducerHash - }) - }).then(done, done) - }) - }) - - it('Should set pathway hash tag on a span when consuming', (done) => { - sqs.sendMessage({ - MessageBody: 'test DSM', - QueueUrl: QueueUrlDsm - }, (err) => { - if (err) return done(err) - - sqs.receiveMessage({ - QueueUrl: QueueUrlDsm, - MessageAttributeNames: ['.*'] - }, (err) => { - if (err) return done(err) - - let consumeSpanMeta = {} - agent.assertSomeTraces(traces => { - const span = traces[0][0] - - if (span.name === 'aws.response') { - consumeSpanMeta = span.meta - } - - assertObjectContains(consumeSpanMeta, { - 'pathway.hash': expectedConsumerHash - }) - }).then(done, done) - }) - }) - }) - - if (sqsClientName === 'aws-sdk' && semver.intersects(version, '>=2.3')) { - it('Should set pathway hash tag on a span when consuming and promise() was used over a callback', - async () => { - let consumeSpanMeta = {} - const tracePromise = agent.assertSomeTraces(traces => { - const span = 
traces[0][0] - - if (span.name === 'aws.request' && span.meta['aws.operation'] === 'receiveMessage') { - consumeSpanMeta = span.meta - } - - assertObjectContains(consumeSpanMeta, { - 'pathway.hash': expectedConsumerHash - }) - }) - - await sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }).promise() - await sqs.receiveMessage({ QueueUrl: QueueUrlDsm }).promise() - - return tracePromise - }) - } - - it('Should emit DSM stats to the agent when sending a message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 1) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }).then(done, done) - - sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }, () => {}) - }) - - it('Should emit DSM stats to the agent when receiving a message', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 2 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 2) - assert.strictEqual(agent.dsmStatsExist(agent, expectedConsumerHash), true) - }, { timeoutMs: 5000 }).then(done, done) - - sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }, () => { - sqs.receiveMessage({ QueueUrl: QueueUrlDsm, MessageAttributeNames: ['.*'] }, () => {}) - }) - }) - - it('Should emit DSM stats when receiving a message when the producer was not instrumented', done => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 2 dsm stats points - 
dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.strictEqual(statsPointsReceived, 1) - assert.strictEqual(agent.dsmStatsExistWithParentHash(agent, '0'), true) - }).then(done, done) - - agent.reload('aws-sdk', { sqs: { dsmEnabled: false } }, { dsmEnabled: false }) - sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsmConsumerOnly }, () => { - agent.reload('aws-sdk', { sqs: { dsmEnabled: true } }, { dsmEnabled: true }) - sqs.receiveMessage({ QueueUrl: QueueUrlDsmConsumerOnly, MessageAttributeNames: ['.*'] }, () => {}) - }) - }) - - it('Should emit DSM stats to the agent when sending batch messages', done => { - // we need to stub Date.now() to ensure a new stats bucket is created for each call - // otherwise, all stats checkpoints will be combined into a single stats points - let now = Date.now() - nowStub = sinon.stub(Date, 'now') - nowStub.callsFake(() => { - now += 1000000 - return now - }) - - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 3 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 3) - assert.strictEqual(agent.dsmStatsExist(agent, expectedProducerHash), true) - }).then(done, done) - - sqs.sendMessageBatch( - { - Entries: [ - { - Id: '1', - MessageBody: 'test DSM 1' - }, - { - Id: '2', - MessageBody: 'test DSM 2' - }, - { - Id: '3', - MessageBody: 'test DSM 3' - } - ], - QueueUrl: QueueUrlDsm - }, () => { - nowStub.restore() - }) - }) - }) }) }) }) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js new file mode 100644 index 
00000000000..e3b00bb35d7 --- /dev/null +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/dsm.spec.js @@ -0,0 +1,324 @@ +'use strict' + +const { expect } = require('chai') +const { describe, it, beforeEach, afterEach } = require('mocha') +const sinon = require('sinon') + +const { randomUUID } = require('node:crypto') + +const agent = require('../../dd-trace/test/plugins/agent') +const { withVersions } = require('../../dd-trace/test/setup/mocha') + +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') +const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') +const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor') + +const getDsmPathwayHash = (testTopic, isProducer, parentHash) => { + let edgeTags + if (isProducer) { + edgeTags = ['direction:out', 'topic:' + testTopic, 'type:kafka'] + } else { + edgeTags = ['direction:in', 'group:test-group-confluent', 'topic:' + testTopic, 'type:kafka'] + } + + edgeTags.sort() + return computePathwayHash('test', 'tester', edgeTags, parentHash) +} + +describe('Plugin', () => { + const module = '@confluentinc/kafka-javascript' + const groupId = 'test-group-confluent' + + describe('confluentinc-kafka-javascript', function () { + this.timeout(30000) + + afterEach(() => { + return agent.close({ ritmReset: false }) + }) + + withVersions('confluentinc-kafka-javascript', module, (version) => { + let kafka + let admin + let tracer + let Kafka + let ConfluentKafka + let messages + let nativeApi + let testTopic + + describe('data stream monitoring', () => { + let consumer + let expectedProducerHash + let expectedConsumerHash + + beforeEach(async () => { + messages = [{ key: 'key1', value: 'test2' }] + + process.env.DD_DATA_STREAMS_ENABLED = 'true' + tracer = require('../../dd-trace') + await agent.load('confluentinc-kafka-javascript') + const lib = require(`../../../versions/${module}@${version}`).get() + + // Store the module for 
later use + nativeApi = lib + + // Setup for the KafkaJS wrapper tests + ConfluentKafka = lib.KafkaJS + Kafka = ConfluentKafka.Kafka + kafka = new Kafka({ + kafkaJS: { + clientId: `kafkajs-test-${version}`, + brokers: ['127.0.0.1:9092'], + logLevel: ConfluentKafka.logLevel.WARN + } + }) + testTopic = `test-topic-${randomUUID()}` + admin = kafka.admin() + await admin.connect() + await admin.createTopics({ + topics: [{ + topic: testTopic, + numPartitions: 1, + replicationFactor: 1 + }] + }) + await admin.disconnect() + + consumer = kafka.consumer({ + kafkaJS: { groupId, fromBeginning: true } + }) + await consumer.connect() + await consumer.subscribe({ topic: testTopic }) + }) + + beforeEach(() => { + expectedProducerHash = getDsmPathwayHash(testTopic, true, ENTRY_PARENT_HASH) + expectedConsumerHash = getDsmPathwayHash(testTopic, false, expectedProducerHash) + }) + + afterEach(async () => { + await consumer.disconnect() + }) + + describe('checkpoints', () => { + let setDataStreamsContextSpy + + beforeEach(() => { + setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext') + }) + + afterEach(async () => { + setDataStreamsContextSpy.restore() + }) + + it('Should set a checkpoint on produce', async () => { + const messages = [{ key: 'consumerDSM1', value: 'test2' }] + await sendMessages(kafka, testTopic, messages) + expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash) + }) + + it('Should set a checkpoint on consume (eachMessage)', async () => { + const runArgs = [] + let consumerReceiveMessagePromise + await consumer.run({ + eachMessage: async () => { + runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) + consumerReceiveMessagePromise = Promise.resolve() + } + }) + await sendMessages(kafka, testTopic, messages).then( + async () => await consumerReceiveMessagePromise + ) + + for (const runArg of runArgs) { + expect(runArg.hash).to.equal(expectedConsumerHash) + } + }) + + it('Should set a checkpoint on consume 
(eachBatch)', async () => { + const runArgs = [] + let consumerReceiveMessagePromise + await consumer.run({ + eachBatch: async () => { + runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) + consumerReceiveMessagePromise = Promise.resolve() + } + }) + await sendMessages(kafka, testTopic, messages).then( + async () => await consumerReceiveMessagePromise + ) + for (const runArg of runArgs) { + expect(runArg.hash).to.equal(expectedConsumerHash) + } + }) + + it('Should set a message payload size when producing a message', async () => { + const messages = [{ key: 'key1', value: 'test2' }] + if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { + DataStreamsProcessor.prototype.recordCheckpoint.restore() + } + const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') + await sendMessages(kafka, testTopic, messages) + expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) + recordCheckpointSpy.restore() + }) + + it('Should set a message payload size when consuming a message', async () => { + const messages = [{ key: 'key1', value: 'test2' }] + if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { + DataStreamsProcessor.prototype.recordCheckpoint.restore() + } + const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') + let consumerReceiveMessagePromise + await consumer.run({ + eachMessage: async () => { + expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) + recordCheckpointSpy.restore() + consumerReceiveMessagePromise = Promise.resolve() + } + }) + await sendMessages(kafka, testTopic, messages).then( + async () => await consumerReceiveMessagePromise + ) + }) + }) + + describe('backlogs', () => { + let setOffsetSpy + + beforeEach(() => { + setOffsetSpy = sinon.spy(tracer._tracer._dataStreamsProcessor, 'setOffset') + }) + + afterEach(() => { + setOffsetSpy.restore() + }) + + it('Should add backlog on consumer explicit commit', async () => { + // 
Send a message, consume it, and record the last consumed offset + let commitMeta + + let messageProcessedResolve + const messageProcessedPromise = new Promise(resolve => { + messageProcessedResolve = resolve + }) + + const consumerRunPromise = consumer.run({ + eachMessage: async payload => { + const { topic, partition, message } = payload + commitMeta = { + topic, + partition, + offset: Number(message.offset) + } + messageProcessedResolve() + } + }) + + await consumerRunPromise + + // wait for the message to be processed before continuing + await sendMessages(kafka, testTopic, messages) + await messageProcessedPromise + await consumer.disconnect() + + for (const call of setOffsetSpy.getCalls()) { + expect(call.args[0]).to.not.have.property('type', 'kafka_commit') + } + + const newConsumer = kafka.consumer({ + kafkaJS: { groupId, fromBeginning: true, autoCommit: false } + }) + await newConsumer.connect() + await sendMessages(kafka, testTopic, [{ key: 'key1', value: 'test2' }]) + await newConsumer.run({ + eachMessage: async () => { + await newConsumer.disconnect() + } + }) + setOffsetSpy.resetHistory() + await newConsumer.commitOffsets() + + // Check our work + const runArg = setOffsetSpy.lastCall.args[0] + expect(runArg).to.have.property('offset', commitMeta.offset) + expect(runArg).to.have.property('partition', commitMeta.partition) + expect(runArg).to.have.property('topic', commitMeta.topic) + expect(runArg).to.have.property('type', 'kafka_commit') + expect(runArg).to.have.property('consumer_group', groupId) + }) + + it('Should add backlog on producer response', async () => { + await sendMessages(kafka, testTopic, messages) + expect(setOffsetSpy).to.be.calledOnce + const { topic } = setOffsetSpy.lastCall.args[0] + expect(topic).to.equal(testTopic) + }) + }) + + describe('when using a kafka broker version that does not support message headers', () => { + class KafkaJSError extends Error { + constructor (message) { + super(message) + this.name = 'KafkaJSError' + 
this.type = 'ERR_UNKNOWN' + } + } + let error + let producer + let produceStub + + beforeEach(async () => { + // simulate a kafka error for the broker version not supporting message headers + error = new KafkaJSError() + error.message = 'Simulated KafkaJSError ERR_UNKNOWN from Producer.produce stub' + producer = kafka.producer() + await producer.connect() + + // Spy on the produce method from the native library before it gets wrapped + produceStub = sinon.stub(nativeApi.Producer.prototype, 'produce') + .callsFake((topic, partition, message, key) => { + throw error + }) + }) + + afterEach(async () => { + produceStub.restore() + await producer.disconnect() + }) + + it('should hit an error for the first send and not inject headers in later sends', async () => { + const testMessages = [{ key: 'key1', value: 'test1' }] + const testMessages2 = [{ key: 'key2', value: 'test2' }] + + try { + await producer.send({ topic: testTopic, messages: testMessages }) + expect.fail('First producer.send() should have thrown an error') + } catch (e) { + expect(e).to.equal(error) + } + // Verify headers were injected in the first attempt + expect(testMessages[0].headers[0]).to.have.property('x-datadog-trace-id') + + // restore the stub to allow the next send to succeed + produceStub.restore() + + const result = await producer.send({ topic: testTopic, messages: testMessages2 }) + expect(testMessages2[0].headers).to.be.null + expect(result).to.not.be.undefined + }) + }) + }) + }) + }) +}) + +async function sendMessages (kafka, topic, messages) { + const producer = kafka.producer() + await producer.connect() + await producer.send({ + topic, + messages + }) + await producer.disconnect() +} diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index 27cafbcdb2f..a8bd628333c 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ 
b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -2,7 +2,6 @@ const { expect } = require('chai') const { describe, it, beforeEach, afterEach } = require('mocha') -const sinon = require('sinon') const { randomUUID } = require('node:crypto') @@ -12,22 +11,6 @@ const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/c const { expectedSchema } = require('./naming') const { withVersions } = require('../../dd-trace/test/setup/mocha') -const DataStreamsContext = require('../../dd-trace/src/datastreams/context') -const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') -const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor') - -const getDsmPathwayHash = (testTopic, isProducer, parentHash) => { - let edgeTags - if (isProducer) { - edgeTags = ['direction:out', 'topic:' + testTopic, 'type:kafka'] - } else { - edgeTags = ['direction:in', 'group:test-group-confluent', 'topic:' + testTopic, 'type:kafka'] - } - - edgeTags.sort() - return computePathwayHash('test', 'tester', edgeTags, parentHash) -} - describe('Plugin', () => { const module = '@confluentinc/kafka-javascript' const groupId = 'test-group-confluent' @@ -503,241 +486,6 @@ describe('Plugin', () => { // }) }) }) - - describe('data stream monitoring', () => { - let consumer - let expectedProducerHash - let expectedConsumerHash - - beforeEach(async () => { - tracer.init() - tracer.use('confluentinc-kafka-javascript', { dsmEnabled: true }) - messages = [{ key: 'key1', value: 'test2' }] - consumer = kafka.consumer({ - kafkaJS: { groupId, fromBeginning: true } - }) - await consumer.connect() - await consumer.subscribe({ topic: testTopic }) - }) - - beforeEach(() => { - expectedProducerHash = getDsmPathwayHash(testTopic, true, ENTRY_PARENT_HASH) - expectedConsumerHash = getDsmPathwayHash(testTopic, false, expectedProducerHash) - }) - - afterEach(async () => { - await consumer.disconnect() - }) - - 
describe('checkpoints', () => { - let setDataStreamsContextSpy - - beforeEach(() => { - setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext') - }) - - afterEach(async () => { - setDataStreamsContextSpy.restore() - }) - - it('Should set a checkpoint on produce', async () => { - const messages = [{ key: 'consumerDSM1', value: 'test2' }] - await sendMessages(kafka, testTopic, messages) - expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash) - }) - - it('Should set a checkpoint on consume (eachMessage)', async () => { - const runArgs = [] - let consumerReceiveMessagePromise - await consumer.run({ - eachMessage: async () => { - runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) - consumerReceiveMessagePromise = Promise.resolve() - } - }) - await sendMessages(kafka, testTopic, messages).then( - async () => await consumerReceiveMessagePromise - ) - - for (const runArg of runArgs) { - expect(runArg.hash).to.equal(expectedConsumerHash) - } - }) - - it('Should set a checkpoint on consume (eachBatch)', async () => { - const runArgs = [] - let consumerReceiveMessagePromise - await consumer.run({ - eachBatch: async () => { - runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) - consumerReceiveMessagePromise = Promise.resolve() - } - }) - await sendMessages(kafka, testTopic, messages).then( - async () => await consumerReceiveMessagePromise - ) - for (const runArg of runArgs) { - expect(runArg.hash).to.equal(expectedConsumerHash) - } - }) - - it('Should set a message payload size when producing a message', async () => { - const messages = [{ key: 'key1', value: 'test2' }] - if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { - DataStreamsProcessor.prototype.recordCheckpoint.restore() - } - const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') - await sendMessages(kafka, testTopic, messages) - expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - 
recordCheckpointSpy.restore() - }) - - it('Should set a message payload size when consuming a message', async () => { - const messages = [{ key: 'key1', value: 'test2' }] - if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { - DataStreamsProcessor.prototype.recordCheckpoint.restore() - } - const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') - let consumerReceiveMessagePromise - await consumer.run({ - eachMessage: async () => { - expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - recordCheckpointSpy.restore() - consumerReceiveMessagePromise = Promise.resolve() - } - }) - await sendMessages(kafka, testTopic, messages).then( - async () => await consumerReceiveMessagePromise - ) - }) - }) - - describe('backlogs', () => { - let setOffsetSpy - - beforeEach(() => { - setOffsetSpy = sinon.spy(tracer._tracer._dataStreamsProcessor, 'setOffset') - }) - - afterEach(() => { - setOffsetSpy.restore() - }) - - it('Should add backlog on consumer explicit commit', async () => { - // Send a message, consume it, and record the last consumed offset - let commitMeta - - let messageProcessedResolve - const messageProcessedPromise = new Promise(resolve => { - messageProcessedResolve = resolve - }) - - const consumerRunPromise = consumer.run({ - eachMessage: async payload => { - const { topic, partition, message } = payload - commitMeta = { - topic, - partition, - offset: Number(message.offset) - } - messageProcessedResolve() - } - }) - - await consumerRunPromise - - // wait for the message to be processed before continuing - await sendMessages(kafka, testTopic, messages) - await messageProcessedPromise - await consumer.disconnect() - - for (const call of setOffsetSpy.getCalls()) { - expect(call.args[0]).to.not.have.property('type', 'kafka_commit') - } - - const newConsumer = kafka.consumer({ - kafkaJS: { groupId, fromBeginning: true, autoCommit: false } - }) - await newConsumer.connect() - await sendMessages(kafka, 
testTopic, [{ key: 'key1', value: 'test2' }]) - await newConsumer.run({ - eachMessage: async () => { - await newConsumer.disconnect() - } - }) - setOffsetSpy.resetHistory() - await newConsumer.commitOffsets() - - // Check our work - const runArg = setOffsetSpy.lastCall.args[0] - expect(runArg).to.have.property('offset', commitMeta.offset) - expect(runArg).to.have.property('partition', commitMeta.partition) - expect(runArg).to.have.property('topic', commitMeta.topic) - expect(runArg).to.have.property('type', 'kafka_commit') - expect(runArg).to.have.property('consumer_group', groupId) - }) - - it('Should add backlog on producer response', async () => { - await sendMessages(kafka, testTopic, messages) - expect(setOffsetSpy).to.be.calledOnce - const { topic } = setOffsetSpy.lastCall.args[0] - expect(topic).to.equal(testTopic) - }) - }) - - describe('when using a kafka broker version that does not support message headers', () => { - class KafkaJSError extends Error { - constructor (message) { - super(message) - this.name = 'KafkaJSError' - this.type = 'ERR_UNKNOWN' - } - } - let error - let producer - let produceStub - - beforeEach(async () => { - // simulate a kafka error for the broker version not supporting message headers - error = new KafkaJSError() - error.message = 'Simulated KafkaJSError ERR_UNKNOWN from Producer.produce stub' - producer = kafka.producer() - await producer.connect() - - // Spy on the produce method from the native library before it gets wrapped - produceStub = sinon.stub(nativeApi.Producer.prototype, 'produce') - .callsFake((topic, partition, message, key) => { - throw error - }) - }) - - afterEach(async () => { - produceStub.restore() - await producer.disconnect() - }) - - it('should hit an error for the first send and not inject headers in later sends', async () => { - const testMessages = [{ key: 'key1', value: 'test1' }] - const testMessages2 = [{ key: 'key2', value: 'test2' }] - - try { - await producer.send({ topic: testTopic, messages: 
testMessages }) - expect.fail('First producer.send() should have thrown an error') - } catch (e) { - expect(e).to.equal(error) - } - // Verify headers were injected in the first attempt - expect(testMessages[0].headers[0]).to.have.property('x-datadog-trace-id') - - // restore the stub to allow the next send to succeed - produceStub.restore() - - const result = await producer.send({ topic: testTopic, messages: testMessages2 }) - expect(testMessages2[0].headers).to.be.null - expect(result).to.not.be.undefined - }) - }) - }) }) }) }) diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js new file mode 100644 index 00000000000..59eb4a0c25c --- /dev/null +++ b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js @@ -0,0 +1,167 @@ +'use strict' + +const assert = require('node:assert/strict') + +const { expect } = require('chai') +const { after, before, beforeEach, describe, it } = require('mocha') +const sinon = require('sinon') + +const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') +const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor') +const id = require('../../dd-trace/src/id') +const agent = require('../../dd-trace/test/plugins/agent') +const { withVersions } = require('../../dd-trace/test/setup/mocha') + +const TIMEOUT = 30000 +const dsmTopicName = 'dsm-topic' + +describe('Plugin', () => { + let tracer + + describe('google-cloud-pubsub', function () { + this.timeout(TIMEOUT) + + before(() => { + process.env.PUBSUB_EMULATOR_HOST = 'localhost:8081' + process.env.DD_DATA_STREAMS_ENABLED = 'true' + }) + + after(() => { + delete process.env.PUBSUB_EMULATOR_HOST + }) + + after(() => { + return agent.close({ ritmReset: false }) + }) + + withVersions('google-cloud-pubsub', '@google-cloud/pubsub', version => { + let pubsub + let project + let expectedProducerHash + let expectedConsumerHash + + describe('data 
stream monitoring', () => { + let dsmTopic + let sub + let consume + + before(async () => { + tracer = require('../../dd-trace') + await agent.load('google-cloud-pubsub', { + dsmEnabled: true + }) + tracer.use('google-cloud-pubsub', { dsmEnabled: true }) + + const { PubSub } = require(`../../../versions/@google-cloud/pubsub@${version}`).get() + project = getProjectId() + pubsub = new PubSub({ projectId: project }) + + dsmTopic = await pubsub.createTopic(dsmTopicName) + dsmTopic = dsmTopic[0] + sub = await dsmTopic.createSubscription('DSM') + sub = sub[0] + consume = function (cb) { + sub.on('message', cb) + } + + const dsmFullTopic = `projects/${project}/topics/${dsmTopicName}` + + expectedProducerHash = computePathwayHash( + 'test', + 'tester', + ['direction:out', 'topic:' + dsmFullTopic, 'type:google-pubsub'], + ENTRY_PARENT_HASH + ) + expectedConsumerHash = computePathwayHash( + 'test', + 'tester', + ['direction:in', 'topic:' + dsmFullTopic, 'type:google-pubsub'], + expectedProducerHash + ) + }) + + beforeEach(() => { + return agent.load('google-cloud-pubsub', { + dsmEnabled: true + }) + }) + + describe('should set a DSM checkpoint', () => { + it('on produce', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM produce checkpoint') }) + + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 1 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 1) + expect(agent.dsmStatsExist(agent, expectedProducerHash.readBigUInt64BE(0).toString())).to.equal(true) + }, { timeoutMs: TIMEOUT }) + }) + + it('on consume', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM consume checkpoint') }) + await consume(async () => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should 
have 2 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + assert.ok(statsPointsReceived >= 2) + expect(agent.dsmStatsExist(agent, expectedConsumerHash.readBigUInt64BE(0).toString())).to.equal(true) + }, { timeoutMs: TIMEOUT }) + }) + }) + }) + + describe('it should set a message payload size', () => { + let recordCheckpointSpy + + beforeEach(() => { + recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') + }) + + afterEach(() => { + DataStreamsProcessor.prototype.recordCheckpoint.restore() + }) + + it('when producing a message', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM produce payload size') }) + assert.ok(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) + }) + + it('when consuming a message', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM consume payload size') }) + + await consume(async () => { + assert.ok(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) + }) + }) + }) + }) + }) + }) +}) + +function getProjectId () { + return `test-project-dsm-${id()}` +} + +function publish (topic, options) { + if (topic.publishMessage) { + return topic.publishMessage(options) + } else { + return topic.publish(options.data) + } +} diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js index acc7cebbece..f6375385803 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js +++ b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js @@ -2,20 +2,15 @@ const assert = require('node:assert/strict') -const { expect } = require('chai') const { after, afterEach, before, beforeEach, describe, it } = require('mocha') -const sinon = require('sinon') const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = 
require('../../dd-trace/src/constants') -const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') -const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor') const id = require('../../dd-trace/src/id') const agent = require('../../dd-trace/test/plugins/agent') const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/helpers') const { withNamingSchema, withVersions } = require('../../dd-trace/test/setup/mocha') const { expectedSchema, rawExpectedSchema } = require('./naming') const TIMEOUT = 30000 -const dsmTopicName = 'dsm-topic' describe('Plugin', () => { let tracer @@ -25,7 +20,6 @@ describe('Plugin', () => { before(() => { process.env.PUBSUB_EMULATOR_HOST = 'localhost:8081' - process.env.DD_DATA_STREAMS_ENABLED = 'true' }) after(() => { @@ -42,8 +36,6 @@ describe('Plugin', () => { let resource let v1 let gax - let expectedProducerHash - let expectedConsumerHash describe('without configuration', () => { beforeEach(() => { @@ -333,113 +325,6 @@ describe('Plugin', () => { }) }) - describe('data stream monitoring', () => { - let dsmTopic - let sub - let consume - - beforeEach(() => { - return agent.load('google-cloud-pubsub', { - dsmEnabled: true - }) - }) - - before(async () => { - const { PubSub } = require(`../../../versions/@google-cloud/pubsub@${version}`).get() - project = getProjectId() - resource = `projects/${project}/topics/${dsmTopicName}` - pubsub = new PubSub({ projectId: project }) - tracer.use('google-cloud-pubsub', { dsmEnabled: true }) - - dsmTopic = await pubsub.createTopic(dsmTopicName) - dsmTopic = dsmTopic[0] - sub = await dsmTopic.createSubscription('DSM') - sub = sub[0] - consume = function (cb) { - sub.on('message', cb) - } - - const dsmFullTopic = `projects/${project}/topics/${dsmTopicName}` - - expectedProducerHash = computePathwayHash( - 'test', - 'tester', - ['direction:out', 'topic:' + dsmFullTopic, 'type:google-pubsub'], - ENTRY_PARENT_HASH - ) - 
expectedConsumerHash = computePathwayHash( - 'test', - 'tester', - ['direction:in', 'topic:' + dsmFullTopic, 'type:google-pubsub'], - expectedProducerHash - ) - }) - - describe('should set a DSM checkpoint', () => { - it('on produce', async () => { - await publish(dsmTopic, { data: Buffer.from('DSM produce checkpoint') }) - - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 1 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 1) - expect(agent.dsmStatsExist(agent, expectedProducerHash.readBigUInt64BE(0).toString())).to.equal(true) - }, { timeoutMs: TIMEOUT }) - }) - - it('on consume', async () => { - await publish(dsmTopic, { data: Buffer.from('DSM consume checkpoint') }) - await consume(async () => { - agent.expectPipelineStats(dsmStats => { - let statsPointsReceived = 0 - // we should have 2 dsm stats points - dsmStats.forEach((timeStatsBucket) => { - if (timeStatsBucket && timeStatsBucket.Stats) { - timeStatsBucket.Stats.forEach((statsBuckets) => { - statsPointsReceived += statsBuckets.Stats.length - }) - } - }) - assert.ok(statsPointsReceived >= 2) - expect(agent.dsmStatsExist(agent, expectedConsumerHash.readBigUInt64BE(0).toString())).to.equal(true) - }, { timeoutMs: TIMEOUT }) - }) - }) - }) - - describe('it should set a message payload size', () => { - let recordCheckpointSpy - - beforeEach(() => { - recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') - }) - - afterEach(() => { - DataStreamsProcessor.prototype.recordCheckpoint.restore() - }) - - it('when producing a message', async () => { - await publish(dsmTopic, { data: Buffer.from('DSM produce payload size') }) - assert.ok(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - }) - - it('when consuming a message', 
async () => { - await publish(dsmTopic, { data: Buffer.from('DSM consume payload size') }) - - await consume(async () => { - assert.ok(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - }) - }) - }) - }) - function expectSpanWithDefaults (expected) { const prefixedResource = [expected.meta['pubsub.method'], resource].filter(x => x).join(' ') const service = expected.meta['pubsub.method'] ? 'test-pubsub' : 'test' diff --git a/packages/datadog-plugin-kafkajs/test/dsm.spec.js b/packages/datadog-plugin-kafkajs/test/dsm.spec.js new file mode 100644 index 00000000000..8ca39a0f93f --- /dev/null +++ b/packages/datadog-plugin-kafkajs/test/dsm.spec.js @@ -0,0 +1,246 @@ +'use strict' + +const { randomUUID } = require('crypto') +const { expect } = require('chai') +const { describe, it, beforeEach, afterEach } = require('mocha') +const semver = require('semver') +const sinon = require('sinon') + +const { withVersions } = require('../../dd-trace/test/setup/mocha') +const agent = require('../../dd-trace/test/plugins/agent') + +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') +const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') +const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor') + +const testKafkaClusterId = '5L6g3nShT-eMCtK--X86sw' + +const getDsmPathwayHash = (testTopic, clusterIdAvailable, isProducer, parentHash) => { + let edgeTags + if (isProducer) { + edgeTags = ['direction:out', 'topic:' + testTopic, 'type:kafka'] + } else { + edgeTags = ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka'] + } + + if (clusterIdAvailable) { + edgeTags.push(`kafka_cluster_id:${testKafkaClusterId}`) + } + edgeTags.sort() + return computePathwayHash('test', 'tester', edgeTags, parentHash) +} + +describe('Plugin', () => { + describe('kafkajs', function () { + this.timeout(10000) + + afterEach(() => { + return agent.close({ ritmReset: false }) + }) + + 
withVersions('kafkajs', 'kafkajs', (version) => { + let kafka + let admin + let tracer + let Kafka + let clusterIdAvailable + let expectedProducerHash + let expectedConsumerHash + let testTopic + + describe('data stream monitoring', () => { + const messages = [{ key: 'key1', value: 'test2' }] + + beforeEach(async () => { + process.env.DD_DATA_STREAMS_ENABLED = 'true' + tracer = require('../../dd-trace') + await agent.load('kafkajs') + const lib = require(`../../../versions/kafkajs@${version}`).get() + Kafka = lib.Kafka + kafka = new Kafka({ + clientId: `kafkajs-test-${version}`, + brokers: ['127.0.0.1:9092'], + logLevel: lib.logLevel.WARN + }) + testTopic = `test-topic-${randomUUID()}` + admin = kafka.admin() + await admin.createTopics({ + topics: [{ + topic: testTopic, + numPartitions: 1, + replicationFactor: 1 + }] + }) + clusterIdAvailable = semver.intersects(version, '>=1.13') + expectedProducerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, true, ENTRY_PARENT_HASH) + expectedConsumerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, false, expectedProducerHash) + }) + + describe('checkpoints', () => { + let consumer + let setDataStreamsContextSpy + + beforeEach(async () => { + tracer.init() + tracer.use('kafkajs', { dsmEnabled: true }) + consumer = kafka.consumer({ groupId: 'test-group' }) + await consumer.connect() + await consumer.subscribe({ topic: testTopic }) + setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext') + }) + + afterEach(async () => { + setDataStreamsContextSpy.restore() + await consumer.disconnect() + }) + + it('Should set a checkpoint on produce', async () => { + const messages = [{ key: 'consumerDSM1', value: 'test2' }] + await sendMessages(kafka, testTopic, messages) + expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash) + }) + + it('Should set a checkpoint on consume (eachMessage)', async () => { + const runArgs = [] + await consumer.run({ + eachMessage: async () => { + 
runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) + } + }) + await sendMessages(kafka, testTopic, messages) + await consumer.disconnect() + for (const runArg of runArgs) { + expect(runArg.hash).to.equal(expectedConsumerHash) + } + }) + + it('Should set a checkpoint on consume (eachBatch)', async () => { + const runArgs = [] + await consumer.run({ + eachBatch: async () => { + runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) + } + }) + await sendMessages(kafka, testTopic, messages) + await consumer.disconnect() + for (const runArg of runArgs) { + expect(runArg.hash).to.equal(expectedConsumerHash) + } + }) + + it('Should set a message payload size when producing a message', async () => { + const messages = [{ key: 'key1', value: 'test2' }] + if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { + DataStreamsProcessor.prototype.recordCheckpoint.restore() + } + const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') + await sendMessages(kafka, testTopic, messages) + expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) + recordCheckpointSpy.restore() + }) + + it('Should set a message payload size when consuming a message', async () => { + const messages = [{ key: 'key1', value: 'test2' }] + if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { + DataStreamsProcessor.prototype.recordCheckpoint.restore() + } + const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') + await sendMessages(kafka, testTopic, messages) + await consumer.run({ + eachMessage: async () => { + expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) + recordCheckpointSpy.restore() + } + }) + }) + }) + + describe('backlogs', () => { + let consumer + let setOffsetSpy + + beforeEach(async () => { + tracer.init() + tracer.use('kafkajs', { dsmEnabled: true }) + consumer = kafka.consumer({ groupId: 'test-group' }) + await consumer.connect() + await consumer.subscribe({ 
topic: testTopic }) + setOffsetSpy = sinon.spy(tracer._tracer._dataStreamsProcessor, 'setOffset') + }) + + afterEach(async () => { + setOffsetSpy.restore() + await consumer.disconnect() + }) + + if (semver.intersects(version, '>=1.10')) { + it('Should add backlog on consumer explicit commit', async () => { + // Send a message, consume it, and record the last consumed offset + let commitMeta + const deferred = {} + deferred.promise = new Promise((resolve, reject) => { + deferred.resolve = resolve + deferred.reject = reject + }) + await consumer.run({ + eachMessage: async payload => { + const { topic, partition, message } = payload + commitMeta = { + topic, + partition, + offset: Number(message.offset) + } + deferred.resolve() + }, + autoCommit: false + }) + await sendMessages(kafka, testTopic, messages) + await deferred.promise + await consumer.disconnect() // Flush ongoing `eachMessage` calls + for (const call of setOffsetSpy.getCalls()) { + expect(call.args[0]).to.not.have.property('type', 'kafka_commit') + } + + /** + * No choice but to reinitialize everything, because the only way to flush eachMessage + * calls is to disconnect. 
+ */ + consumer.connect() + await sendMessages(kafka, testTopic, messages) + await consumer.run({ eachMessage: async () => {}, autoCommit: false }) + setOffsetSpy.resetHistory() + await consumer.commitOffsets([commitMeta]) + await consumer.disconnect() + + // Check our work + const runArg = setOffsetSpy.lastCall.args[0] + expect(setOffsetSpy).to.be.calledOnce + expect(runArg).to.have.property('offset', commitMeta.offset) + expect(runArg).to.have.property('partition', commitMeta.partition) + expect(runArg).to.have.property('topic', commitMeta.topic) + expect(runArg).to.have.property('type', 'kafka_commit') + expect(runArg).to.have.property('consumer_group', 'test-group') + }) + } + + it('Should add backlog on producer response', async () => { + await sendMessages(kafka, testTopic, messages) + expect(setOffsetSpy).to.be.calledOnce + const { topic } = setOffsetSpy.lastCall.args[0] + expect(topic).to.equal(testTopic) + }) + }) + }) + }) + }) +}) + +async function sendMessages (kafka, topic, messages) { + const producer = kafka.producer() + await producer.connect() + await producer.send({ + topic, + messages + }) + await producer.disconnect() +} diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index 6f3e7dd33cc..f512d74b9b8 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -3,7 +3,7 @@ const { randomUUID } = require('crypto') const { expect } = require('chai') const dc = require('dc-polyfill') -const { describe, it, beforeEach, afterEach, before } = require('mocha') +const { describe, it, beforeEach, afterEach } = require('mocha') const semver = require('semver') const sinon = require('sinon') @@ -13,27 +13,9 @@ const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/he const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants') const { expectedSchema, rawExpectedSchema } = 
require('./naming') -const DataStreamsContext = require('../../dd-trace/src/datastreams/context') -const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') -const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor') const testKafkaClusterId = '5L6g3nShT-eMCtK--X86sw' -const getDsmPathwayHash = (testTopic, clusterIdAvailable, isProducer, parentHash) => { - let edgeTags - if (isProducer) { - edgeTags = ['direction:out', 'topic:' + testTopic, 'type:kafka'] - } else { - edgeTags = ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka'] - } - - if (clusterIdAvailable) { - edgeTags.push(`kafka_cluster_id:${testKafkaClusterId}`) - } - edgeTags.sort() - return computePathwayHash('test', 'tester', edgeTags, parentHash) -} - describe('Plugin', () => { describe('kafkajs', function () { // TODO: remove when new internal trace has landed @@ -49,8 +31,6 @@ describe('Plugin', () => { let Kafka let Broker let clusterIdAvailable - let expectedProducerHash - let expectedConsumerHash let testTopic describe('without configuration', () => { @@ -58,7 +38,6 @@ describe('Plugin', () => { const messages2 = [{ key: 'key2', value: 'test3' }] beforeEach(async () => { - process.env.DD_DATA_STREAMS_ENABLED = 'true' tracer = require('../../dd-trace') await agent.load('kafkajs') const lib = require(`../../../versions/kafkajs@${version}`).get() @@ -79,8 +58,6 @@ describe('Plugin', () => { }] }) clusterIdAvailable = semver.intersects(version, '>=1.13') - expectedProducerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, true, ENTRY_PARENT_HASH) - expectedConsumerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, false, expectedProducerHash) }) describe('producer', () => { @@ -88,7 +65,6 @@ describe('Plugin', () => { const meta = { 'span.kind': 'producer', component: 'kafkajs', - 'pathway.hash': expectedProducerHash.readBigUInt64LE(0).toString(), 'messaging.destination.name': testTopic, 
'messaging.kafka.bootstrap.servers': '127.0.0.1:9092' } @@ -256,7 +232,6 @@ describe('Plugin', () => { meta: { 'span.kind': 'consumer', component: 'kafkajs', - 'pathway.hash': expectedConsumerHash.readBigUInt64LE(0).toString(), 'messaging.destination.name': testTopic }, resource: testTopic, @@ -433,169 +408,6 @@ describe('Plugin', () => { rawExpectedSchema.receive ) }) - - describe('data stream monitoring', () => { - let consumer - - beforeEach(async () => { - tracer.init() - tracer.use('kafkajs', { dsmEnabled: true }) - consumer = kafka.consumer({ groupId: 'test-group' }) - await consumer.connect() - await consumer.subscribe({ topic: testTopic }) - }) - - before(() => { - clusterIdAvailable = semver.intersects(version, '>=1.13') - expectedProducerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, true, ENTRY_PARENT_HASH) - expectedConsumerHash = getDsmPathwayHash(testTopic, clusterIdAvailable, false, expectedProducerHash) - }) - - afterEach(async () => { - await consumer.disconnect() - }) - - describe('checkpoints', () => { - let setDataStreamsContextSpy - - beforeEach(() => { - setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext') - }) - - afterEach(() => { - setDataStreamsContextSpy.restore() - }) - - it('Should set a checkpoint on produce', async () => { - const messages = [{ key: 'consumerDSM1', value: 'test2' }] - await sendMessages(kafka, testTopic, messages) - expect(setDataStreamsContextSpy.args[0][0].hash).to.equal(expectedProducerHash) - }) - - it('Should set a checkpoint on consume (eachMessage)', async () => { - const runArgs = [] - await consumer.run({ - eachMessage: async () => { - runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) - } - }) - await sendMessages(kafka, testTopic, messages) - await consumer.disconnect() - for (const runArg of runArgs) { - expect(runArg.hash).to.equal(expectedConsumerHash) - } - }) - - it('Should set a checkpoint on consume (eachBatch)', async () => { - const runArgs = [] - await 
consumer.run({ - eachBatch: async () => { - runArgs.push(setDataStreamsContextSpy.lastCall.args[0]) - } - }) - await sendMessages(kafka, testTopic, messages) - await consumer.disconnect() - for (const runArg of runArgs) { - expect(runArg.hash).to.equal(expectedConsumerHash) - } - }) - - it('Should set a message payload size when producing a message', async () => { - const messages = [{ key: 'key1', value: 'test2' }] - if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { - DataStreamsProcessor.prototype.recordCheckpoint.restore() - } - const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') - await sendMessages(kafka, testTopic, messages) - expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - recordCheckpointSpy.restore() - }) - - it('Should set a message payload size when consuming a message', async () => { - const messages = [{ key: 'key1', value: 'test2' }] - if (DataStreamsProcessor.prototype.recordCheckpoint.isSinonProxy) { - DataStreamsProcessor.prototype.recordCheckpoint.restore() - } - const recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') - await sendMessages(kafka, testTopic, messages) - await consumer.run({ - eachMessage: async () => { - expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) - recordCheckpointSpy.restore() - } - }) - }) - }) - - describe('backlogs', () => { - let setOffsetSpy - - beforeEach(() => { - setOffsetSpy = sinon.spy(tracer._tracer._dataStreamsProcessor, 'setOffset') - }) - - afterEach(() => { - setOffsetSpy.restore() - }) - - if (semver.intersects(version, '>=1.10')) { - it('Should add backlog on consumer explicit commit', async () => { - // Send a message, consume it, and record the last consumed offset - let commitMeta - const deferred = {} - deferred.promise = new Promise((resolve, reject) => { - deferred.resolve = resolve - deferred.reject = reject - }) - await consumer.run({ - eachMessage: async payload => { - 
const { topic, partition, message } = payload - commitMeta = { - topic, - partition, - offset: Number(message.offset) - } - deferred.resolve() - }, - autoCommit: false - }) - await sendMessages(kafka, testTopic, messages) - await deferred.promise - await consumer.disconnect() // Flush ongoing `eachMessage` calls - for (const call of setOffsetSpy.getCalls()) { - expect(call.args[0]).to.not.have.property('type', 'kafka_commit') - } - - /** - * No choice but to reinitialize everything, because the only way to flush eachMessage - * calls is to disconnect. - */ - consumer.connect() - await sendMessages(kafka, testTopic, messages) - await consumer.run({ eachMessage: async () => {}, autoCommit: false }) - setOffsetSpy.resetHistory() - await consumer.commitOffsets([commitMeta]) - await consumer.disconnect() - - // Check our work - const runArg = setOffsetSpy.lastCall.args[0] - expect(setOffsetSpy).to.be.calledOnce - expect(runArg).to.have.property('offset', commitMeta.offset) - expect(runArg).to.have.property('partition', commitMeta.partition) - expect(runArg).to.have.property('topic', commitMeta.topic) - expect(runArg).to.have.property('type', 'kafka_commit') - expect(runArg).to.have.property('consumer_group', 'test-group') - }) - } - - it('Should add backlog on producer response', async () => { - await sendMessages(kafka, testTopic, messages) - expect(setOffsetSpy).to.be.calledOnce - const { topic } = setOffsetSpy.lastCall.args[0] - expect(topic).to.equal(testTopic) - }) - }) - }) }) }) })