-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(core): Add Supabase Queues support #15921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 13 commits
3f55cf5
ecf67cc
13d4a6e
8d38e7c
a5a271e
6e0bc24
5327a3c
a5744f0
2e6012a
b5ebc9a
1d32d86
d650de1
9accd4b
e68a576
09ef20a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| import * as Sentry from '@sentry/browser'; | ||
| import { createClient } from '@supabase/supabase-js'; | ||
|
|
||
| window.Sentry = Sentry; | ||
|
|
||
| const supabaseClient = createClient('https://test.supabase.co', 'test-key', { | ||
| db: { | ||
| schema: 'pgmq_public', | ||
| }, | ||
| }); | ||
|
|
||
| Sentry.init({ | ||
| dsn: 'https://[email protected]/1337', | ||
| integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })], | ||
| tracesSampleRate: 1.0, | ||
| }); | ||
|
|
||
| // Simulate queue operations | ||
| async function performQueueOperations() { | ||
| try { | ||
| await supabaseClient.rpc('send', { | ||
| queue_name: 'todos', | ||
| msg: { title: 'Test Todo' }, | ||
| }); | ||
|
|
||
| await supabaseClient.rpc('pop', { | ||
| queue_name: 'todos', | ||
| }); | ||
| } catch (error) { | ||
| Sentry.captureException(error); | ||
| } | ||
| } | ||
|
|
||
| performQueueOperations(); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| import type { Page } from '@playwright/test'; | ||
| import { expect } from '@playwright/test'; | ||
| import type { Event } from '@sentry/core'; | ||
| import { sentryTest } from '../../../../utils/fixtures'; | ||
| import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers'; | ||
|
|
||
| async function mockSupabaseRoute(page: Page) { | ||
| await page.route('**/rpc/**/send', route => { | ||
| return route.fulfill({ | ||
| status: 200, | ||
| body: JSON.stringify([0]), | ||
| headers: { | ||
| 'Content-Type': 'application/json', | ||
| }, | ||
| }); | ||
| }); | ||
|
|
||
| await page.route('**/rpc/**/pop', route => { | ||
| return route.fulfill({ | ||
| status: 200, | ||
| body: JSON.stringify([ | ||
| { | ||
| msg_id: 0, | ||
| }, | ||
| ]), | ||
| headers: { | ||
| 'Content-Type': 'application/json', | ||
| }, | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| const bundle = process.env.PW_BUNDLE || ''; | ||
| // We only want to run this in non-CDN bundle mode | ||
| if (bundle.startsWith('bundle')) { | ||
| sentryTest.skip(); | ||
| } | ||
|
|
||
| sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => { | ||
| if (shouldSkipTracingTest()) { | ||
| return; | ||
| } | ||
|
|
||
| await mockSupabaseRoute(page); | ||
|
|
||
| const url = await getLocalTestUrl({ testDir: __dirname }); | ||
|
|
||
| const event = await getFirstSentryEnvelopeRequest<Event>(page, url); | ||
| const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.')); | ||
|
|
||
| expect(queueSpans).toHaveLength(2); | ||
|
|
||
| expect(queueSpans![0]).toMatchObject({ | ||
| description: 'supabase.db.rpc', | ||
| parent_span_id: event.contexts?.trace?.span_id, | ||
| span_id: expect.any(String), | ||
| start_timestamp: expect.any(Number), | ||
| timestamp: expect.any(Number), | ||
| trace_id: event.contexts?.trace?.trace_id, | ||
| data: expect.objectContaining({ | ||
| 'sentry.op': 'queue.publish', | ||
| 'sentry.origin': 'auto.db.supabase', | ||
| 'messaging.destination.name': 'todos', | ||
| 'messaging.message.id': '0', | ||
| }), | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Origin value mismatch in browser testThe browser integration test expects |
||
| }); | ||
|
|
||
| expect(queueSpans![1]).toMatchObject({ | ||
| description: 'supabase.db.rpc', | ||
| parent_span_id: event.contexts?.trace?.span_id, | ||
| span_id: expect.any(String), | ||
| start_timestamp: expect.any(Number), | ||
| timestamp: expect.any(Number), | ||
| trace_id: event.contexts?.trace?.trace_id, | ||
| data: expect.objectContaining({ | ||
| 'sentry.op': 'queue.process', | ||
| 'sentry.origin': 'auto.db.supabase', | ||
| 'messaging.destination.name': 'todos', | ||
| 'messaging.message.id': '0', | ||
| }), | ||
| }); | ||
| }); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| import * as Sentry from '@sentry/browser'; | ||
| import { createClient } from '@supabase/supabase-js'; | ||
|
|
||
| window.Sentry = Sentry; | ||
|
|
||
| const supabaseClient = createClient('https://test.supabase.co', 'test-key', { | ||
| db: { | ||
| schema: 'pgmq_public', | ||
| }, | ||
| }); | ||
|
|
||
| Sentry.init({ | ||
| dsn: 'https://[email protected]/1337', | ||
| integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })], | ||
| tracesSampleRate: 1.0, | ||
| }); | ||
|
|
||
| // Simulate queue operations | ||
| async function performQueueOperations() { | ||
| try { | ||
| await supabaseClient.schema('pgmq_public').rpc('send', { | ||
| queue_name: 'todos', | ||
| msg: { title: 'Test Todo' }, | ||
| }); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Incorrect RPC Parameter Name in Browser TestThe browser integration test uses incorrect parameter name |
||
|
|
||
| await supabaseClient.schema('pgmq_public').rpc('pop', { | ||
| queue_name: 'todos', | ||
| }); | ||
| } catch (error) { | ||
| Sentry.captureException(error); | ||
| } | ||
| } | ||
|
|
||
| performQueueOperations(); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| import { type Page, expect } from '@playwright/test'; | ||
| import type { Event } from '@sentry/core'; | ||
| import { sentryTest } from '../../../../utils/fixtures'; | ||
| import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers'; | ||
|
|
||
| async function mockSupabaseRoute(page: Page) { | ||
| await page.route('**/rpc/**/send', route => { | ||
| return route.fulfill({ | ||
| status: 200, | ||
| body: JSON.stringify([0]), | ||
| headers: { | ||
| 'Content-Type': 'application/json', | ||
| }, | ||
| }); | ||
| }); | ||
|
|
||
| await page.route('**/rpc/**/pop', route => { | ||
| return route.fulfill({ | ||
| status: 200, | ||
| body: JSON.stringify([ | ||
| { | ||
| msg_id: 0, | ||
| }, | ||
| ]), | ||
| headers: { | ||
| 'Content-Type': 'application/json', | ||
| }, | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| const bundle = process.env.PW_BUNDLE || ''; | ||
| // We only want to run this in non-CDN bundle mode | ||
| if (bundle.startsWith('bundle')) { | ||
| sentryTest.skip(); | ||
| } | ||
|
|
||
| sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => { | ||
| if (shouldSkipTracingTest()) { | ||
| return; | ||
| } | ||
|
|
||
| await mockSupabaseRoute(page); | ||
|
|
||
| const url = await getLocalTestUrl({ testDir: __dirname }); | ||
|
|
||
| const event = await getFirstSentryEnvelopeRequest<Event>(page, url); | ||
|
|
||
| const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.')); | ||
|
|
||
| expect(queueSpans).toHaveLength(2); | ||
|
|
||
| expect(queueSpans![0]).toMatchObject({ | ||
| description: 'supabase.db.rpc', | ||
| parent_span_id: event.contexts?.trace?.span_id, | ||
| span_id: expect.any(String), | ||
| start_timestamp: expect.any(Number), | ||
| timestamp: expect.any(Number), | ||
| trace_id: event.contexts?.trace?.trace_id, | ||
| data: expect.objectContaining({ | ||
| 'sentry.op': 'queue.publish', | ||
| 'sentry.origin': 'auto.db.supabase', | ||
| 'messaging.destination.name': 'todos', | ||
| 'messaging.message.id': '0', | ||
| }), | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Origin string mismatch in browser testsThe browser integration test expects |
||
| }); | ||
|
|
||
| expect(queueSpans![1]).toMatchObject({ | ||
| description: 'supabase.db.rpc', | ||
| parent_span_id: event.contexts?.trace?.span_id, | ||
| span_id: expect.any(String), | ||
| start_timestamp: expect.any(Number), | ||
| timestamp: expect.any(Number), | ||
| trace_id: event.contexts?.trace?.trace_id, | ||
| data: expect.objectContaining({ | ||
| 'sentry.op': 'queue.process', | ||
| 'sentry.origin': 'auto.db.supabase', | ||
| 'messaging.destination.name': 'todos', | ||
| 'messaging.message.id': '0', | ||
| }), | ||
| }); | ||
| }); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| import { NextApiRequest, NextApiResponse } from 'next'; | ||
| import { createClient } from '@supabase/supabase-js'; | ||
| import * as Sentry from '@sentry/nextjs'; | ||
|
|
||
| // These are the default development keys for a local Supabase instance | ||
| const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321'; | ||
| const SUPABASE_SERVICE_ROLE_KEY = | ||
| 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU'; | ||
|
|
||
| const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, { | ||
| db: { | ||
| schema: 'pgmq_public', | ||
| }, | ||
| }); | ||
|
|
||
| Sentry.instrumentSupabaseClient(supabaseClient); | ||
|
|
||
| export default async function handler(req: NextApiRequest, res: NextApiResponse) { | ||
| // Step 1: Batch produce multiple messages | ||
| const { data: sendData, error: sendError } = await supabaseClient.rpc('send_batch', { | ||
| queue_name: 'batch-flow-queue', | ||
| messages: [ | ||
| { | ||
| taskType: 'email', | ||
| recipient: '[email protected]', | ||
| subject: 'Welcome!', | ||
| }, | ||
| { | ||
| taskType: 'email', | ||
| recipient: '[email protected]', | ||
| subject: 'Verification', | ||
| }, | ||
| { | ||
| taskType: 'sms', | ||
| recipient: '+1234567890', | ||
| message: 'Your code is 123456', | ||
| }, | ||
| ], | ||
| }); | ||
|
|
||
| if (sendError) { | ||
| return res.status(500).json({ error: `Send batch failed: ${sendError.message}` }); | ||
| } | ||
|
|
||
| // Step 2: Consume multiple messages from the queue | ||
| const { data: receiveData, error: receiveError } = await supabaseClient.rpc('receive', { | ||
| queue_name: 'batch-flow-queue', | ||
| vt: 30, | ||
| qty: 3, | ||
| }); | ||
|
|
||
| if (receiveError) { | ||
| return res.status(500).json({ error: `Receive failed: ${receiveError.message}` }); | ||
| } | ||
|
|
||
| // Step 3: Process all messages | ||
| const processedMessages = receiveData?.map((msg: any) => ({ | ||
| messageId: msg.msg_id, | ||
| taskType: msg.message?.taskType, | ||
| processed: true, | ||
| })); | ||
|
|
||
| // Step 4: Archive all processed messages | ||
| const messageIds = receiveData?.map((msg: any) => msg.msg_id).filter(Boolean); | ||
| if (messageIds && messageIds.length > 0) { | ||
| const { error: archiveError } = await supabaseClient.rpc('archive', { | ||
| queue_name: 'batch-flow-queue', | ||
| msg_ids: messageIds, | ||
| }); | ||
|
|
||
| if (archiveError) { | ||
| return res.status(500).json({ error: `Archive failed: ${archiveError.message}` }); | ||
| } | ||
| } | ||
|
|
||
| return res.status(200).json({ | ||
| success: true, | ||
| batchSize: 3, | ||
| produced: { messageIds: sendData }, | ||
| consumed: { | ||
| count: receiveData?.length || 0, | ||
| messages: processedMessages, | ||
| }, | ||
| }); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Inconsistent RPC Payloads Break Sentry Tracing
The Supabase RPC calls for queue operations use
msgfor the message payload. Sentry's queue instrumentation expectsmessage, which prevents proper trace context injection.Additional Locations (1)
dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js#L22-L23