Skip to content

Commit bae1de7

Browse files
nicohrubecclaude
andauthored
ref(node): Vendor amqplib instrumentation (#21003)
Vendors `@opentelemetry/instrumentation-amqplib` into the SDK with no logic changes. Uses latest upstream v0.65.0 which extends supported versions to include amqplib v1.x. Types from `@types/amqplib` are inlined as simplified interfaces to avoid requiring the package as a dependency. Adds integration test for amqplib v1.x. Closes #20144 --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 03f60fb commit bae1de7

15 files changed

Lines changed: 1405 additions & 12 deletions

File tree

.oxlintrc.base.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@
153153
"**/integration/aws/vendored/**/*.ts",
154154
"**/nestjs/src/integrations/vendored/**/*.ts",
155155
"**/integrations/tracing/kafka/vendored/**/*.ts",
156-
"**/integrations/tracing/tedious/vendored/**/*.ts"
156+
"**/integrations/tracing/tedious/vendored/**/*.ts",
157+
"**/integrations/tracing/amqplib/vendored/**/*.ts"
157158
],
158159
"rules": {
159160
"typescript/no-explicit-any": "off"
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
version: '3'
2+
3+
services:
4+
rabbitmq:
5+
image: rabbitmq:management
6+
container_name: rabbitmq-v1
7+
environment:
8+
- RABBITMQ_DEFAULT_USER=sentry
9+
- RABBITMQ_DEFAULT_PASS=sentry
10+
ports:
11+
- '5673:5672'
12+
- '15673:15672'
13+
healthcheck:
14+
test: ['CMD-SHELL', 'rabbitmq-diagnostics -q ping']
15+
interval: 2s
16+
timeout: 10s
17+
retries: 30
18+
start_period: 15s
19+
20+
networks:
21+
default:
22+
driver: bridge
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import * as Sentry from '@sentry/node';
2+
import { loggingTransport } from '@sentry-internal/node-integration-tests';
3+
4+
Sentry.init({
5+
dsn: 'https://public@dsn.ingest.sentry.io/1337',
6+
release: '1.0',
7+
tracesSampleRate: 1.0,
8+
transport: loggingTransport,
9+
});
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"name": "sentry-amqplib-v1-test",
3+
"version": "1.0.0",
4+
"dependencies": {
5+
"amqplib": "^1.0.0"
6+
}
7+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import * as Sentry from '@sentry/node';
2+
import amqp from 'amqplib';
3+
4+
const queueName = 'queue1';
5+
const amqpUsername = 'sentry';
6+
const amqpPassword = 'sentry';
7+
8+
const AMQP_URL = `amqp://${amqpUsername}:${amqpPassword}@localhost:5673/`;
9+
const ACKNOWLEDGEMENT = { noAck: false };
10+
11+
const QUEUE_OPTIONS = {
12+
durable: true, // Make the queue durable
13+
exclusive: false, // Not exclusive
14+
autoDelete: false, // Don't auto-delete the queue
15+
arguments: {
16+
'x-message-ttl': 30000, // Message TTL of 30 seconds
17+
'x-max-length': 1000, // Maximum queue length of 1000 messages
18+
},
19+
};
20+
21+
(async () => {
22+
const { connection, channel } = await connectToRabbitMQ();
23+
await createQueue(queueName, channel);
24+
25+
const consumeMessagePromise = consumeMessageFromQueue(queueName, channel);
26+
27+
await Sentry.startSpan({ name: 'root span' }, async () => {
28+
sendMessageToQueue(queueName, channel, JSON.stringify({ foo: 'bar01' }));
29+
});
30+
31+
await consumeMessagePromise;
32+
33+
await channel.close();
34+
await connection.close();
35+
})();
36+
37+
async function connectToRabbitMQ() {
38+
const connection = await amqp.connect(AMQP_URL);
39+
const channel = await connection.createChannel();
40+
return { connection, channel };
41+
}
42+
43+
async function createQueue(queueName, channel) {
44+
await channel.assertQueue(queueName, QUEUE_OPTIONS);
45+
}
46+
47+
function sendMessageToQueue(queueName, channel, message) {
48+
channel.sendToQueue(queueName, Buffer.from(message));
49+
}
50+
51+
async function consumer(queueName, channel) {
52+
return new Promise((resolve, reject) => {
53+
channel
54+
.consume(
55+
queueName,
56+
message => {
57+
if (message) {
58+
channel.ack(message);
59+
resolve();
60+
} else {
61+
reject(new Error('No message received'));
62+
}
63+
},
64+
ACKNOWLEDGEMENT,
65+
)
66+
.catch(reject);
67+
});
68+
}
69+
70+
async function consumeMessageFromQueue(queueName, channel) {
71+
await consumer(queueName, channel);
72+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import type { TransactionEvent } from '@sentry/core';
2+
import { afterAll, describe, expect } from 'vitest';
3+
import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../utils/runner';
4+
5+
const EXPECTED_MESSAGE_SPAN_PRODUCER = expect.objectContaining({
6+
op: 'message',
7+
data: expect.objectContaining({
8+
'messaging.system': 'rabbitmq',
9+
'otel.kind': 'PRODUCER',
10+
'sentry.op': 'message',
11+
'sentry.origin': 'auto.amqplib.otel.publisher',
12+
}),
13+
status: 'ok',
14+
});
15+
16+
const EXPECTED_MESSAGE_SPAN_CONSUMER = expect.objectContaining({
17+
op: 'message',
18+
data: expect.objectContaining({
19+
'messaging.system': 'rabbitmq',
20+
'otel.kind': 'CONSUMER',
21+
'sentry.op': 'message',
22+
'sentry.origin': 'auto.amqplib.otel.consumer',
23+
}),
24+
status: 'ok',
25+
});
26+
27+
describe('amqplib v1 auto-instrumentation', () => {
28+
afterAll(async () => {
29+
cleanupChildProcesses();
30+
});
31+
32+
createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument.mjs', (createTestRunner, test) => {
33+
test('should be able to send and receive messages with amqplib v1', { timeout: 60_000 }, async () => {
34+
await createTestRunner()
35+
.withDockerCompose({
36+
workingDirectory: [__dirname],
37+
})
38+
.expect({
39+
transaction: (transaction: TransactionEvent) => {
40+
expect(transaction.transaction).toEqual('root span');
41+
expect(transaction.spans?.length).toEqual(1);
42+
expect(transaction.spans![0]).toMatchObject(EXPECTED_MESSAGE_SPAN_PRODUCER);
43+
},
44+
})
45+
.expect({
46+
transaction: (transaction: TransactionEvent) => {
47+
expect(transaction.transaction).toEqual('queue1 process');
48+
expect(transaction.contexts?.trace).toMatchObject(EXPECTED_MESSAGE_SPAN_CONSUMER);
49+
},
50+
})
51+
.start()
52+
.completed();
53+
});
54+
});
55+
});

packages/node/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
"@opentelemetry/api": "^1.9.1",
6969
"@opentelemetry/core": "^2.6.1",
7070
"@opentelemetry/instrumentation": "^0.214.0",
71-
"@opentelemetry/instrumentation-amqplib": "0.61.0",
7271
"@opentelemetry/instrumentation-graphql": "0.62.0",
7372
"@opentelemetry/instrumentation-hapi": "0.60.0",
7473
"@opentelemetry/instrumentation-http": "0.214.0",

packages/node/src/integrations/tracing/amqplib.ts renamed to packages/node/src/integrations/tracing/amqplib/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { Span } from '@opentelemetry/api';
2-
import { AmqplibInstrumentation, type AmqplibInstrumentationConfig } from '@opentelemetry/instrumentation-amqplib';
2+
import { AmqplibInstrumentation } from './vendored/amqplib';
3+
import type { AmqplibInstrumentationConfig } from './vendored/types';
34
import type { IntegrationFn } from '@sentry/core';
45
import { defineIntegration } from '@sentry/core';
56
import { addOriginToSpan, generateInstrumentOnce } from '@sentry/node-core';
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Simplified types inlined from @types/amqplib (DefinitelyTyped).
3+
* Only includes members accessed by this instrumentation.
4+
* Other amqplib types (Message, ConsumeMessage, Options.Publish, etc.) are already
5+
* vendored in types.ts by the upstream OTel instrumentation.
6+
*/
7+
8+
export interface Connection {
9+
connection: { serverProperties: { product?: string; [key: string]: any } };
10+
[key: string]: any;
11+
}
12+
13+
export interface Channel {
14+
connection: Connection;
15+
[key: string]: any;
16+
}
17+
18+
export interface ConfirmChannel extends Channel {}
19+
20+
export namespace Options {
21+
export interface Connect {
22+
protocol?: string;
23+
hostname?: string;
24+
port?: number;
25+
username?: string;
26+
vhost?: string;
27+
}
28+
export interface Consume {
29+
consumerTag?: string;
30+
noLocal?: boolean;
31+
noAck?: boolean;
32+
exclusive?: boolean;
33+
priority?: number;
34+
arguments?: any;
35+
}
36+
export interface Publish {
37+
headers?: any;
38+
[key: string]: any;
39+
}
40+
}
41+
42+
export namespace Replies {
43+
export interface Empty {}
44+
export interface Consume {
45+
consumerTag: string;
46+
}
47+
}

0 commit comments

Comments
 (0)