Skip to content

Commit 74869e4

Browse files
committed
feat(core): Add Supabase Queues support
1 parent 1d10238 commit 74869e4

File tree

5 files changed

+279
-6
lines changed

5 files changed

+279
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import * as Sentry from '@sentry/browser';
2+
3+
import { createClient } from '@supabase/supabase-js';
4+
window.Sentry = Sentry;
5+
6+
const queues = createClient('https://test.supabase.co', 'test-key', {
7+
db: {
8+
schema: 'pgmq_public',
9+
},
10+
});
11+
12+
Sentry.init({
13+
dsn: 'https://[email protected]/1337',
14+
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)],
15+
tracesSampleRate: 1.0,
16+
});
17+
18+
// Simulate queue operations
19+
async function performQueueOperations() {
20+
try {
21+
await queues.rpc('enqueue', {
22+
queue_name: 'todos',
23+
msg: { title: 'Test Todo' },
24+
});
25+
26+
await queues.rpc('dequeue', {
27+
queue_name: 'todos',
28+
});
29+
} catch (error) {
30+
Sentry.captureException(error);
31+
}
32+
}
33+
34+
performQueueOperations();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import type { Page} from '@playwright/test';
2+
import { expect } from '@playwright/test';
3+
import type { Event } from '@sentry/core';
4+
5+
import { sentryTest } from '../../../../utils/fixtures';
6+
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';
7+
8+
async function mockSupabaseRoute(page: Page) {
9+
await page.route('**/rest/v1/rpc**', route => {
10+
return route.fulfill({
11+
status: 200,
12+
body: JSON.stringify({
13+
foo: ['bar', 'baz'],
14+
}),
15+
headers: {
16+
'Content-Type': 'application/json',
17+
},
18+
});
19+
});
20+
}
21+
22+
sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => {
23+
await mockSupabaseRoute(page);
24+
25+
if (shouldSkipTracingTest()) {
26+
return;
27+
}
28+
29+
const url = await getLocalTestUrl({ testDir: __dirname });
30+
31+
const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
32+
const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue'));
33+
34+
expect(queueSpans).toHaveLength(2);
35+
36+
expect(queueSpans![0]).toMatchObject({
37+
description: 'supabase.db.rpc',
38+
parent_span_id: event.contexts?.trace?.span_id,
39+
span_id: expect.any(String),
40+
start_timestamp: expect.any(Number),
41+
timestamp: expect.any(Number),
42+
trace_id: event.contexts?.trace?.trace_id,
43+
data: expect.objectContaining({
44+
'sentry.op': 'queue.publish',
45+
'sentry.origin': 'auto.db.supabase',
46+
'messaging.destination.name': 'todos',
47+
'messaging.message.id': 'Test Todo',
48+
}),
49+
});
50+
51+
expect(queueSpans![1]).toMatchObject({
52+
description: 'supabase.db.rpc',
53+
parent_span_id: event.contexts?.trace?.span_id,
54+
span_id: expect.any(String),
55+
start_timestamp: expect.any(Number),
56+
timestamp: expect.any(Number),
57+
trace_id: event.contexts?.trace?.trace_id,
58+
data: expect.objectContaining({
59+
'sentry.op': 'queue.process',
60+
'sentry.origin': 'auto.db.supabase',
61+
'messaging.destination.name': 'todos',
62+
}),
63+
});
64+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import * as Sentry from '@sentry/browser';
2+
3+
import { createClient } from '@supabase/supabase-js';
4+
window.Sentry = Sentry;
5+
6+
const queues = createClient('https://test.supabase.co', 'test-key', {
7+
db: {
8+
schema: 'pgmq_public',
9+
},
10+
});
11+
12+
Sentry.init({
13+
dsn: 'https://[email protected]/1337',
14+
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)],
15+
tracesSampleRate: 1.0,
16+
});
17+
18+
// Simulate queue operations
19+
async function performQueueOperations() {
20+
try {
21+
await queues
22+
.schema('pgmq_public')
23+
.rpc('enqueue', {
24+
queue_name: 'todos',
25+
msg: { title: 'Test Todo' },
26+
});
27+
28+
await queues
29+
.schema('pgmq_public')
30+
.rpc('dequeue', {
31+
queue_name: 'todos',
32+
});
33+
} catch (error) {
34+
Sentry.captureException(error);
35+
}
36+
}
37+
38+
performQueueOperations();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { type Page, expect } from '@playwright/test';
2+
import type { Event } from '@sentry/core';
3+
4+
import { sentryTest } from '../../../../utils/fixtures';
5+
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';
6+
7+
async function mockSupabaseRoute(page: Page) {
8+
await page.route('**/rest/v1/rpc**', route => {
9+
return route.fulfill({
10+
status: 200,
11+
body: JSON.stringify({
12+
foo: ['bar', 'baz'],
13+
}),
14+
headers: {
15+
'Content-Type': 'application/json',
16+
},
17+
});
18+
});
19+
}
20+
21+
sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => {
22+
await mockSupabaseRoute(page);
23+
24+
if (shouldSkipTracingTest()) {
25+
return;
26+
}
27+
28+
const url = await getLocalTestUrl({ testDir: __dirname });
29+
30+
const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
31+
const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue'));
32+
33+
expect(queueSpans).toHaveLength(2);
34+
35+
expect(queueSpans![0]).toMatchObject({
36+
description: 'supabase.db.rpc',
37+
parent_span_id: event.contexts?.trace?.span_id,
38+
span_id: expect.any(String),
39+
start_timestamp: expect.any(Number),
40+
timestamp: expect.any(Number),
41+
trace_id: event.contexts?.trace?.trace_id,
42+
data: expect.objectContaining({
43+
'sentry.op': 'queue.publish',
44+
'sentry.origin': 'auto.db.supabase',
45+
'messaging.destination.name': 'todos',
46+
'messaging.message.id': 'Test Todo',
47+
}),
48+
});
49+
50+
expect(queueSpans![1]).toMatchObject({
51+
description: 'supabase.db.rpc',
52+
parent_span_id: event.contexts?.trace?.span_id,
53+
span_id: expect.any(String),
54+
start_timestamp: expect.any(Number),
55+
timestamp: expect.any(Number),
56+
trace_id: event.contexts?.trace?.trace_id,
57+
data: expect.objectContaining({
58+
'sentry.op': 'queue.process',
59+
'sentry.origin': 'auto.db.supabase',
60+
'messaging.destination.name': 'todos',
61+
}),
62+
});
63+
});

packages/core/src/integrations/supabase.ts

+80-6
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@ import { captureException } from '../exports';
1313
import { SPAN_STATUS_ERROR, SPAN_STATUS_OK } from '../tracing';
1414
import { DEBUG_BUILD } from '../debug-build';
1515

16+
export interface SupabaseClientConstructor {
17+
prototype: {
18+
from: (table: string) => PostgRESTQueryBuilder;
19+
schema: (schema: string) => { rpc: (...args: unknown[]) => Promise<unknown> };
20+
};
21+
rpc: (fn: string, params: Record<string, unknown>) => Promise<unknown>;
22+
}
23+
1624
const AUTH_OPERATIONS_TO_INSTRUMENT = [
1725
'reauthenticate',
1826
'signInAnonymously',
@@ -114,12 +122,6 @@ export interface SupabaseBreadcrumb {
114122
};
115123
}
116124

117-
export interface SupabaseClientConstructor {
118-
prototype: {
119-
from: (table: string) => PostgRESTQueryBuilder;
120-
};
121-
}
122-
123125
export interface PostgRESTProtoThenable {
124126
then: <T>(
125127
onfulfilled?: ((value: T) => T | PromiseLike<T>) | null,
@@ -215,6 +217,76 @@ export function translateFiltersIntoMethods(key: string, query: string): string
215217
return `${method}(${key}, ${value.join('.')})`;
216218
}
217219

220+
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void {
221+
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy(
222+
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema,
223+
{
224+
apply(target, thisArg, argumentsList) {
225+
const rv = Reflect.apply(target, thisArg, argumentsList);
226+
227+
return instrumentRpc(rv);
228+
},
229+
},
230+
);
231+
}
232+
233+
function instrumentRpc(SupabaseClient: unknown): unknown {
234+
(SupabaseClient as unknown as SupabaseClientConstructor).rpc = new Proxy(
235+
(SupabaseClient as unknown as SupabaseClientConstructor).rpc,
236+
{
237+
apply(target, thisArg, argumentsList) {
238+
const isProducerSpan = argumentsList[0] === 'enqueue';
239+
const isConsumerSpan = argumentsList[0] === 'dequeue';
240+
241+
const maybeQueueParams = argumentsList[1];
242+
243+
// If the second argument is not an object, it's not a queue operation
244+
if (!isPlainObject(maybeQueueParams)) {
245+
return Reflect.apply(target, thisArg, argumentsList);
246+
}
247+
248+
const msg = maybeQueueParams?.msg as { title: string };
249+
250+
const messageId = msg?.title;
251+
const queueName = maybeQueueParams?.queue_name as string;
252+
253+
const op = isProducerSpan ? 'queue.publish' : isConsumerSpan ? 'queue.process' : '';
254+
255+
// If the operation is not a queue operation, return the original function
256+
if (!op) {
257+
return Reflect.apply(target, thisArg, argumentsList);
258+
}
259+
260+
return startSpan(
261+
{
262+
name: 'supabase.db.rpc',
263+
attributes: {
264+
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase',
265+
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: op,
266+
},
267+
},
268+
async span => {
269+
return (Reflect.apply(target, thisArg, argumentsList) as Promise<unknown>).then((res: unknown) => {
270+
if (messageId) {
271+
span.setAttribute('messaging.message.id', messageId);
272+
}
273+
274+
if (queueName) {
275+
span.setAttribute('messaging.destination.name', queueName);
276+
}
277+
278+
span.end();
279+
return res;
280+
});
281+
},
282+
);
283+
},
284+
},
285+
);
286+
287+
return SupabaseClient;
288+
}
289+
218290
function instrumentAuthOperation(operation: AuthOperationFn, isAdmin = false): AuthOperationFn {
219291
return new Proxy(operation, {
220292
apply(target, thisArg, argumentsList) {
@@ -496,6 +568,8 @@ export const instrumentSupabaseClient = (supabaseClient: unknown): void => {
496568
supabaseClient.constructor === Function ? supabaseClient : supabaseClient.constructor;
497569

498570
instrumentSupabaseClientConstructor(SupabaseClientConstructor);
571+
instrumentRpcReturnedFromSchemaCall(SupabaseClientConstructor);
572+
instrumentRpc(supabaseClient as SupabaseClientInstance);
499573
instrumentSupabaseAuthClient(supabaseClient as SupabaseClientInstance);
500574
};
501575

0 commit comments

Comments
 (0)