diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js
new file mode 100644
index 000000000000..15309015bbd9
--- /dev/null
+++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js
@@ -0,0 +1,34 @@
+import * as Sentry from '@sentry/browser';
+import { createClient } from '@supabase/supabase-js';
+
+window.Sentry = Sentry;
+
+const supabaseClient = createClient('https://test.supabase.co', 'test-key', {
+  db: {
+    schema: 'pgmq_public',
+  },
+});
+
+Sentry.init({
+  dsn: 'https://public@dsn.ingest.sentry.io/1337',
+  integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })],
+  tracesSampleRate: 1.0,
+});
+
+// Simulate queue operations
+async function performQueueOperations() {
+  try {
+    await supabaseClient.rpc('send', {
+      queue_name: 'todos',
+      msg: { title: 'Test Todo' },
+    });
+
+    await supabaseClient.rpc('pop', {
+      queue_name: 'todos',
+    });
+  } catch (error) {
+    Sentry.captureException(error);
+  }
+}
+
+performQueueOperations();
diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts
new file mode 100644
index 000000000000..d0f1534a404a
--- /dev/null
+++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts
@@ -0,0 +1,82 @@
+import type { Page } from '@playwright/test';
+import { expect } from '@playwright/test';
+import type { Event } from '@sentry/core';
+import { sentryTest } from '../../../../utils/fixtures';
+import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';
+
+async function mockSupabaseRoute(page: Page) {
+  await page.route('**/rpc/**/send', route => {
+    return route.fulfill({
+      status: 200,
+      body: JSON.stringify([0]),
+      headers: {
+        'Content-Type': 'application/json',
+      },
+    });
+  });
+
+  await page.route('**/rpc/**/pop', route => {
+    return route.fulfill({
+      status: 200,
+      body: JSON.stringify([
+        {
+          msg_id: 0,
+        },
+      ]),
+      headers: {
+        'Content-Type': 'application/json',
+      },
+    });
+  });
+}
+
+const bundle = process.env.PW_BUNDLE || '';
+// We only want to run this in non-CDN bundle mode
+if (bundle.startsWith('bundle')) {
+  sentryTest.skip();
+}
+
+sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => {
+  if (shouldSkipTracingTest()) {
+    return;
+  }
+
+  await mockSupabaseRoute(page);
+
+  const url = await getLocalTestUrl({ testDir: __dirname });
+
+  const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
+  const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.'));
+
+  expect(queueSpans).toHaveLength(2);
+
+  expect(queueSpans![0]).toMatchObject({
+    description: 'supabase.db.rpc',
+    parent_span_id: event.contexts?.trace?.span_id,
+    span_id: expect.any(String),
+    start_timestamp: expect.any(Number),
+    timestamp: expect.any(Number),
+    trace_id: event.contexts?.trace?.trace_id,
+    data: expect.objectContaining({
+      'sentry.op': 'queue.publish',
+      'sentry.origin': 'auto.db.supabase',
+      'messaging.destination.name': 'todos',
+      'messaging.message.id': '0',
+    }),
+  });
+
+  expect(queueSpans![1]).toMatchObject({
+    description: 'supabase.db.rpc',
+    parent_span_id: event.contexts?.trace?.span_id,
+    span_id: expect.any(String),
+    start_timestamp: expect.any(Number),
+    timestamp: expect.any(Number),
+    trace_id: event.contexts?.trace?.trace_id,
+    data: expect.objectContaining({
+      'sentry.op': 'queue.process',
+      'sentry.origin': 'auto.db.supabase',
+      'messaging.destination.name': 'todos',
+      'messaging.message.id': '0',
+    }),
+  });
+});
diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js
new file mode 100644
index 000000000000..0cbc629a2b3e
--- /dev/null
+++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js
@@ -0,0 +1,34 @@
+import * as Sentry from '@sentry/browser';
+import { createClient } from '@supabase/supabase-js';
+
+window.Sentry = Sentry;
+
+const supabaseClient = createClient('https://test.supabase.co', 'test-key', {
+  db: {
+    schema: 'pgmq_public',
+  },
+});
+
+Sentry.init({
+  dsn: 'https://public@dsn.ingest.sentry.io/1337',
+  integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })],
+  tracesSampleRate: 1.0,
+});
+
+// Simulate queue operations
+async function performQueueOperations() {
+  try {
+    await supabaseClient.schema('pgmq_public').rpc('send', {
+      queue_name: 'todos',
+      msg: { title: 'Test Todo' },
+    });
+
+    await supabaseClient.schema('pgmq_public').rpc('pop', {
+      queue_name: 'todos',
+    });
+  } catch (error) {
+    Sentry.captureException(error);
+  }
+}
+
+performQueueOperations();
diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts
new file mode 100644
index 000000000000..6417f7796964
--- /dev/null
+++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts
@@ -0,0 +1,82 @@
+import { type Page, expect } from '@playwright/test';
+import type { Event } from '@sentry/core';
+import { sentryTest } from '../../../../utils/fixtures';
+import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';
+
+async function mockSupabaseRoute(page: Page) {
+  await page.route('**/rpc/**/send', route => {
+    return route.fulfill({
+      status: 200,
+      body: JSON.stringify([0]),
+      headers: {
+        'Content-Type': 'application/json',
+      },
+    });
+  });
+
+  await page.route('**/rpc/**/pop', route => {
+    return route.fulfill({
+      status: 200,
+      body: JSON.stringify([
+        {
+          msg_id: 0,
+        },
+      ]),
+      headers: {
+        'Content-Type': 'application/json',
+      },
+    });
+  });
+}
+
+const bundle = process.env.PW_BUNDLE || '';
+// We only want to run this in non-CDN bundle mode
+if (bundle.startsWith('bundle')) {
+  sentryTest.skip();
+}
+
+sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => {
+  if (shouldSkipTracingTest()) {
+    return;
+  }
+
+  await mockSupabaseRoute(page);
+
+  const url = await getLocalTestUrl({ testDir: __dirname });
+
+  const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
+
+  const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.'));
+
+  expect(queueSpans).toHaveLength(2);
+
+  expect(queueSpans![0]).toMatchObject({
+    description: 'supabase.db.rpc',
+    parent_span_id: event.contexts?.trace?.span_id,
+    span_id: expect.any(String),
+    start_timestamp: expect.any(Number),
+    timestamp: expect.any(Number),
+    trace_id: event.contexts?.trace?.trace_id,
+    data: expect.objectContaining({
+      'sentry.op': 'queue.publish',
+      'sentry.origin': 'auto.db.supabase',
+      'messaging.destination.name': 'todos',
+      'messaging.message.id': '0',
+    }),
+  });
+
+  expect(queueSpans![1]).toMatchObject({
+    description: 'supabase.db.rpc',
+    parent_span_id: event.contexts?.trace?.span_id,
+    span_id: expect.any(String),
+    start_timestamp: expect.any(Number),
+    timestamp: expect.any(Number),
+    trace_id: event.contexts?.trace?.trace_id,
+    data: expect.objectContaining({
+      'sentry.op': 'queue.process',
+      'sentry.origin': 'auto.db.supabase',
+      'messaging.destination.name': 'todos',
+      'messaging.message.id': '0',
+    }),
+  });
+});
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json b/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json
index a46519e9c75d..6429b0e8b9c2 100644
--- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json
@@ -7,7 +7,7 @@
     "build": "next build",
     "start": "next start",
     "clean": "npx rimraf node_modules pnpm-lock.yaml .next",
-    "start-local-supabase": "supabase init --force --workdir . && supabase start -o env && supabase db reset",
+    "start-local-supabase": "supabase start -o env && supabase db reset",
     "test:prod": "TEST_ENV=production playwright test",
     "test:build": "pnpm install && pnpm start-local-supabase && pnpm build",
     "test:assert": "pnpm test:prod"
@@ -25,7 +25,7 @@
     "next": "14.2.25",
     "react": "18.2.0",
     "react-dom": "18.2.0",
-    "supabase": "2.19.7",
+    "supabase": "2.23.4",
     "typescript": "4.9.5"
   },
   "devDependencies": {
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-error.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-error.ts
new file mode 100644
index 000000000000..d6543c0d2ede
--- /dev/null
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-error.ts
@@ -0,0 +1,27 @@
+// Enqueue a job to the queue
+
+import { NextApiRequest, NextApiResponse } from 'next';
+import { createClient } from '@supabase/supabase-js';
+import * as Sentry from '@sentry/nextjs';
+
+// These are the default development keys for a local Supabase instance
+const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
+const SUPABASE_SERVICE_ROLE_KEY =
+  'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
+
+const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
+
+Sentry.instrumentSupabaseClient(supabaseClient);
+
+export default async function handler(req: NextApiRequest, res: NextApiResponse) {
+  // Enqueue a job to the queue
+  const { data, error } = await supabaseClient.schema('pgmq_public').rpc('pop', {
+    queue_name: 'non-existing-queue',
+  });
+
+  if (error) {
+    return res.status(500).json({ error: error.message });
+  }
+
+  return res.status(200).json({ data });
+}
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-rpc.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-rpc.ts
new file mode 100644
index 000000000000..e1c7caa0c6d0
--- /dev/null
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-rpc.ts
@@ -0,0 +1,31 @@
+// Enqueue a job to the queue
+
+import { NextApiRequest, NextApiResponse } from 'next';
+import { createClient } from '@supabase/supabase-js';
+import * as Sentry from '@sentry/nextjs';
+
+// These are the default development keys for a local Supabase instance
+const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
+const SUPABASE_SERVICE_ROLE_KEY =
+  'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
+
+const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, {
+  db: {
+    schema: 'pgmq_public',
+  },
+});
+
+Sentry.instrumentSupabaseClient(supabaseClient);
+
+export default async function handler(req: NextApiRequest, res: NextApiResponse) {
+  // Enqueue a job to the queue
+  const { data, error } = await supabaseClient.rpc('pop', {
+    queue_name: 'todos',
+  });
+
+  if (error) {
+    return res.status(500).json({ error: error.message });
+  }
+
+  return res.status(200).json({ data });
+}
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-schema.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-schema.ts
new file mode 100644
index 000000000000..ec77e7258e1e
--- /dev/null
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-schema.ts
@@ -0,0 +1,25 @@
+import { NextApiRequest, NextApiResponse } from 'next';
+import { createClient } from '@supabase/supabase-js';
+import * as Sentry from '@sentry/nextjs';
+
+// These are the default development keys for a local Supabase instance
+const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
+const SUPABASE_SERVICE_ROLE_KEY =
+  'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
+
+const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
+
+Sentry.instrumentSupabaseClient(supabaseClient);
+
+export default async function handler(req: NextApiRequest, res: NextApiResponse) {
+  // Process a job from the queue
+  const { data, error } = await supabaseClient.schema('pgmq_public').rpc('pop', {
+    queue_name: 'todos',
+  });
+
+  if (error) {
+    return res.status(500).json({ error: error.message });
+  }
+
+  return res.status(200).json({ data });
+}
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-batch.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-batch.ts
new file mode 100644
index 000000000000..14208a00f450
--- /dev/null
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-batch.ts
@@ -0,0 +1,37 @@
+import { NextApiRequest, NextApiResponse } from 'next';
+import { createClient } from '@supabase/supabase-js';
+import * as Sentry from '@sentry/nextjs';
+
+// These are the default development keys for a local Supabase instance
+const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
+const SUPABASE_SERVICE_ROLE_KEY =
+  'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
+
+const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, {
+  db: {
+    schema: 'pgmq_public',
+  },
+});
+
+Sentry.instrumentSupabaseClient(supabaseClient);
+
+export default async function handler(req: NextApiRequest, res: NextApiResponse) {
+  // Enqueue a job to the queue
+  const { data, error } = await supabaseClient.rpc('send_batch', {
+    queue_name: 'todos',
+    messages: [
+      {
+        title: 'Test Todo 1',
+      },
+      {
+        title: 'Test Todo 2',
+      },
+    ],
+  });
+
+  if (error) {
+    return res.status(500).json({ error: error.message });
+  }
+
+  return res.status(200).json({ data });
+}
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-rpc.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-rpc.ts
new file mode 100644
index 000000000000..a4d161fc224e
--- /dev/null
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-rpc.ts
@@ -0,0 +1,32 @@
+import { NextApiRequest, NextApiResponse } from 'next';
+import { createClient } from '@supabase/supabase-js';
+import * as Sentry from '@sentry/nextjs';
+
+// These are the default development keys for a local Supabase instance
+const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
+const SUPABASE_SERVICE_ROLE_KEY =
+  'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
+
+const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, {
+  db: {
+    schema: 'pgmq_public',
+  },
+});
+
+Sentry.instrumentSupabaseClient(supabaseClient);
+
+export default async function handler(req: NextApiRequest, res: NextApiResponse) {
+  // Enqueue a job to the queue
+  const { data, error } = await supabaseClient.rpc('send', {
+    queue_name: 'todos',
+    message: {
+      title: 'Test Todo',
+    },
+  });
+
+  if (error) {
+    return res.status(500).json({ error: error.message });
+  }
+
+  return res.status(200).json({ data });
+}
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-schema.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-schema.ts
new file mode 100644
index 000000000000..92f81f27d49e
--- /dev/null
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-schema.ts
@@ -0,0 +1,28 @@
+import { NextApiRequest, NextApiResponse } from 'next';
+import { createClient } from '@supabase/supabase-js';
+import * as Sentry from '@sentry/nextjs';
+
+// These are the default development keys for a local Supabase instance
+const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
+const SUPABASE_SERVICE_ROLE_KEY =
+  'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
+
+const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
+
+Sentry.instrumentSupabaseClient(supabaseClient);
+
+export default async function handler(req: NextApiRequest, res: NextApiResponse) {
+  // Enqueue a job to the queue
+  const { data, error } = await supabaseClient.schema('pgmq_public').rpc('send', {
+    queue_name: 'todos',
+    message: {
+      title: 'Test Todo',
+    },
+  });
+
+  if (error) {
+    return res.status(500).json({ error: error.message });
+  }
+
+  return res.status(200).json({ data });
+}
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/config.toml b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/config.toml
index 35dcff35bec4..6d003c8a64fd 100644
--- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/config.toml
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/config.toml
@@ -10,9 +10,9 @@ enabled = true
 port = 54321
 # Schemas to expose in your API. Tables, views and stored procedures in this schema will get API
 # endpoints. `public` and `graphql_public` schemas are included by default.
-schemas = ["public", "graphql_public"]
+schemas = ["public", "graphql_public", "storage", "pgmq_public"]
 # Extra schemas to add to the search_path of every request.
-extra_search_path = ["public", "extensions"]
+extra_search_path = ["public", "extensions", "pgmq_public"]
 # The maximum number of rows returns from a view, table, or stored procedure. Limits payload size
 # for accidental or malicious requests.
 max_rows = 1000
@@ -28,7 +28,7 @@ port = 54322
 shadow_port = 54320
 # The database major version to use. This has to be the same as your remote database's. Run `SHOW
 # server_version;` on the remote database to check.
-major_version = 15
+major_version = 17
 
 [db.pooler]
 enabled = false
@@ -141,7 +141,6 @@ sign_in_sign_ups = 30
 # Number of OTP / Magic link verifications that can be made in a 5 minute interval per IP address.
 token_verifications = 30
 
-
 # Configure one of the supported captcha providers: `hcaptcha`, `turnstile`.
 # [auth.captcha]
 # enabled = true
@@ -283,6 +282,8 @@ enabled = true
 policy = "oneshot"
 # Port to attach the Chrome inspector for debugging edge functions.
 inspector_port = 8083
+# The Deno major version to use.
+deno_version = 1
 
 # [edge_runtime.secrets]
 # secret_key = "env(SECRET_VALUE)"
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20230712094349_init.sql b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20230712094349_init.sql
index 1b1a98ace2e4..2af0497506c6 100644
--- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20230712094349_init.sql
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20230712094349_init.sql
@@ -13,4 +13,4 @@ create policy "Individuals can view their own todos. " on todos for
 create policy "Individuals can update their own todos." on todos for
     update using (auth.uid() = user_id);
 create policy "Individuals can delete their own todos." on todos for
-    delete using (auth.uid() = user_id);
\ No newline at end of file
+    delete using (auth.uid() = user_id);
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20250515080602_enable-queues.sql b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20250515080602_enable-queues.sql
new file mode 100644
index 000000000000..8eba5c8de3a4
--- /dev/null
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20250515080602_enable-queues.sql
@@ -0,0 +1,182 @@
+
+-- Enable queues
+create extension if not exists "pgmq";
+select pgmq.create('todos');
+alter table "pgmq"."q_todos" enable row level security;
+
+--- The following code is vendored in from the supabase implementation for now
+--- By default, the pgmq schema is not exposed to the public
+--- And there is no other way to enable access locally without using the UI
+--- Vendored from: https://github.com/supabase/supabase/blob/aa9070c9087ce8c37a27e7c74ea0353858aed6c2/apps/studio/data/database-queues/database-queues-toggle-postgrest-mutation.ts#L18-L191
+create schema if not exists pgmq_public;
+grant usage on schema pgmq_public to postgres, anon, authenticated, service_role;
+
+create or replace function pgmq_public.pop(
+    queue_name text
+)
+  returns setof pgmq.message_record
+  language plpgsql
+  set search_path = ''
+as $$
+begin
+    return query
+    select *
+    from pgmq.pop(
+        queue_name := queue_name
+    );
+end;
+$$;
+
+comment on function pgmq_public.pop(queue_name text) is 'Retrieves and locks the next message from the specified queue.';
+
+
+create or replace function pgmq_public.send(
+    queue_name text,
+    message jsonb,
+    sleep_seconds integer default 0  -- renamed from 'delay'
+)
+  returns setof bigint
+  language plpgsql
+  set search_path = ''
+as $$
+begin
+    return query
+    select *
+    from pgmq.send(
+        queue_name := queue_name,
+        msg := message,
+        delay := sleep_seconds
+    );
+end;
+$$;
+
+comment on function pgmq_public.send(queue_name text, message jsonb, sleep_seconds integer) is 'Sends a message to the specified queue, optionally delaying its availability by a number of seconds.';
+
+
+create or replace function pgmq_public.send_batch(
+    queue_name text,
+    messages jsonb[],
+    sleep_seconds integer default 0  -- renamed from 'delay'
+)
+  returns setof bigint
+  language plpgsql
+  set search_path = ''
+as $$
+begin
+    return query
+    select *
+    from pgmq.send_batch(
+        queue_name := queue_name,
+        msgs := messages,
+        delay := sleep_seconds
+    );
+end;
+$$;
+
+comment on function pgmq_public.send_batch(queue_name text, messages jsonb[], sleep_seconds integer) is 'Sends a batch of messages to the specified queue, optionally delaying their availability by a number of seconds.';
+
+
+create or replace function pgmq_public.archive(
+    queue_name text,
+    message_id bigint
+)
+  returns boolean
+  language plpgsql
+  set search_path = ''
+as $$
+begin
+    return
+    pgmq.archive(
+        queue_name := queue_name,
+        msg_id := message_id
+    );
+end;
+$$;
+
+comment on function pgmq_public.archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.';
+
+
+create or replace function pgmq_public.delete(
+    queue_name text,
+    message_id bigint
+)
+  returns boolean
+  language plpgsql
+  set search_path = ''
+as $$
+begin
+    return
+    pgmq.delete(
+        queue_name := queue_name,
+        msg_id := message_id
+    );
+end;
+$$;
+
+comment on function pgmq_public.delete(queue_name text, message_id bigint) is 'Permanently deletes a message from the specified queue.';
+
+create or replace function pgmq_public.read(
+    queue_name text,
+    sleep_seconds integer,
+    n integer
+)
+  returns setof pgmq.message_record
+  language plpgsql
+  set search_path = ''
+as $$
+begin
+    return query
+    select *
+    from pgmq.read(
+        queue_name := queue_name,
+        vt := sleep_seconds,
+        qty := n
+    );
+end;
+$$;
+
+comment on function pgmq_public.read(queue_name text, sleep_seconds integer, n integer) is 'Reads up to "n" messages from the specified queue with an optional "sleep_seconds" (visibility timeout).';
+
+-- Grant execute permissions on wrapper functions to roles
+grant execute on function pgmq_public.pop(text) to postgres, service_role, anon, authenticated;
+grant execute on function pgmq.pop(text) to postgres, service_role, anon, authenticated;
+
+grant execute on function pgmq_public.send(text, jsonb, integer) to postgres, service_role, anon, authenticated;
+grant execute on function pgmq.send(text, jsonb, integer) to postgres, service_role, anon, authenticated;
+
+grant execute on function pgmq_public.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated;
+grant execute on function pgmq.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated;
+
+grant execute on function pgmq_public.archive(text, bigint) to postgres, service_role, anon, authenticated;
+grant execute on function pgmq.archive(text, bigint) to postgres, service_role, anon, authenticated;
+
+grant execute on function pgmq_public.delete(text, bigint) to postgres, service_role, anon, authenticated;
+grant execute on function pgmq.delete(text, bigint) to postgres, service_role, anon, authenticated;
+
+grant execute on function pgmq_public.read(text, integer, integer) to postgres, service_role, anon, authenticated;
+grant execute on function pgmq.read(text, integer, integer) to postgres, service_role, anon, authenticated;
+
+-- For the service role, we want full access
+-- Grant permissions on existing tables
+grant all privileges on all tables in schema pgmq to postgres, service_role;
+
+-- Ensure service_role has permissions on future tables
+alter default privileges in schema pgmq grant all privileges on tables to postgres, service_role;
+
+grant usage on schema pgmq to postgres, anon, authenticated, service_role;
+
+
+/*
+  Grant access to sequences to API roles by default. Existing table permissions
+  continue to enforce insert restrictions. This is necessary to accommodate the
+  on-backup hook that rebuild queue table primary keys to avoid a pg_dump segfault.
+  This can be removed once logical backups are completely retired.
+*/
+grant usage, select, update
+on all sequences in schema pgmq
+to anon, authenticated, service_role;
+
+alter default privileges in schema pgmq
+grant usage, select, update
+on sequences
+to anon, authenticated, service_role;
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/seed.sql b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/seed.sql
index 57b5c4d07e05..e69de29bb2d1 100644
--- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/seed.sql
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/seed.sql
@@ -1,2 +0,0 @@
-TRUNCATE auth.users CASCADE;
-TRUNCATE auth.identities CASCADE;
diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts
index cfb66b372420..c91e9f50bb98 100644
--- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts
+++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts
@@ -1,5 +1,5 @@
 import { expect, test } from '@playwright/test';
-import { waitForTransaction } from '@sentry-internal/test-utils';
+import { waitForError, waitForTransaction } from '@sentry-internal/test-utils';
 
 // This test should be run in serial mode to ensure that the test user is created before the other tests
 test.describe.configure({ mode: 'serial' });
@@ -210,3 +210,309 @@ test('Sends server-side Supabase auth admin `listUsers` span', async ({ page, ba
     origin: 'auto.db.supabase',
   });
 });
+
+test('Sends queue publish spans with `schema(...).rpc(...)`', async ({ page, baseURL }) => {
+  const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
+    return (
+      transactionEvent?.contexts?.trace?.op === 'http.server' &&
+      transactionEvent?.transaction === 'GET /api/queue/producer-schema'
+    );
+  });
+
+  const result = await fetch(`${baseURL}/api/queue/producer-schema`);
+
+  expect(result.status).toBe(200);
+  expect(await result.json()).toEqual({ data: [1] });
+
+  const transactionEvent = await httpTransactionPromise;
+
+  expect(transactionEvent.spans).toHaveLength(2);
+  expect(transactionEvent.spans).toContainEqual({
+    data: {
+      'messaging.destination.name': 'todos',
+      'messaging.system': 'supabase',
+      'messaging.message.id': '1',
+      'sentry.op': 'queue.publish',
+      'sentry.origin': 'auto.db.supabase',
+    },
+    description: 'supabase.db.rpc',
+    op: 'queue.publish',
+    origin: 'auto.db.supabase',
+    parent_span_id: expect.stringMatching(/[a-f0-9]{16}/),
+    span_id: expect.stringMatching(/[a-f0-9]{16}/),
+    start_timestamp: expect.any(Number),
+    status: 'ok',
+    timestamp: expect.any(Number),
+    trace_id: expect.stringMatching(/[a-f0-9]{32}/),
+  });
+
+  expect(transactionEvent.breadcrumbs).toContainEqual({
+    timestamp: expect.any(Number),
+    type: 'supabase',
+    category: 'db.rpc.send',
+    message: 'rpc(send)',
+    data: {
+      'messaging.destination.name': 'todos',
+      'messaging.message.id': '1',
+    },
+  });
+});
+
+test('Sends queue publish spans with `rpc(...)`', async ({ page, baseURL }) => {
+  const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
+    return (
+      transactionEvent?.contexts?.trace?.op === 'http.server' &&
+      transactionEvent?.transaction === 'GET /api/queue/producer-rpc'
+    );
+  });
+
+  const result = await fetch(`${baseURL}/api/queue/producer-rpc`);
+  const transactionEvent = await httpTransactionPromise;
+
+  expect(result.status).toBe(200);
+  expect(await result.json()).toEqual({ data: [2] });
+
+  expect(transactionEvent.spans).toHaveLength(2);
+  expect(transactionEvent.spans).toContainEqual({
+    data: {
+      'messaging.destination.name': 'todos',
+      'messaging.system': 'supabase',
+      'messaging.message.id': '2',
+      'sentry.op': 'queue.publish',
+      'sentry.origin': 'auto.db.supabase',
+    },
+    description: 'supabase.db.rpc',
+    op: 'queue.publish',
+    origin: 'auto.db.supabase',
+    parent_span_id: expect.stringMatching(/[a-f0-9]{16}/),
+    span_id: expect.stringMatching(/[a-f0-9]{16}/),
+    start_timestamp: expect.any(Number),
+    status: 'ok',
+    timestamp: expect.any(Number),
+    trace_id: expect.stringMatching(/[a-f0-9]{32}/),
+  });
+
+  expect(transactionEvent.breadcrumbs).toContainEqual({
+    timestamp: expect.any(Number),
+    type: 'supabase',
+    category: 'db.rpc.send',
+    message: 'rpc(send)',
+    data: {
+      'messaging.destination.name': 'todos',
+      'messaging.message.id': '2',
+    },
+  });
+});
+
+test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, baseURL }) => {
+  const consumerSpanPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
+    return (
+      transactionEvent?.contexts?.trace?.op === 'queue.process' && transactionEvent?.transaction === 'supabase.db.rpc'
+    );
+  });
+
+  const result = await fetch(`${baseURL}/api/queue/consumer-schema`);
+  const consumerEvent = await consumerSpanPromise;
+
+  expect(result.status).toBe(200);
+  expect(await result.json()).toEqual(
+    expect.objectContaining({
+      data: [
+        expect.objectContaining({
+          message: {
+            title: 'Test Todo',
+          },
+          msg_id: expect.any(Number),
+        }),
+      ],
+    }),
+  );
+
+  expect(consumerEvent.contexts.trace).toEqual({
+    data: {
+      'messaging.destination.name': 'todos',
+      'messaging.system': 'supabase',
+      'messaging.message.id': '1',
+      'messaging.message.receive.latency': expect.any(Number),
+      'sentry.op': 'queue.process',
+      'sentry.origin': 'auto.db.supabase',
+      'sentry.source': 'route',
+    },
+    op: 'queue.process',
+    origin: 'auto.db.supabase',
+    parent_span_id: expect.any(String),
+    span_id: expect.any(String),
+    status: 'ok',
+    trace_id: expect.any(String),
+  });
+
+  expect(consumerEvent.breadcrumbs).toContainEqual({
+    timestamp: expect.any(Number),
+    type: 'supabase',
+    category: 'db.rpc.pop',
+    message: 'rpc(pop)',
+    data: {
+      'messaging.destination.name': 'todos',
+      'messaging.message.id': '1',
+    },
+  });
+});
+
+test('Sends queue process spans with `rpc(...)`', async ({ page, baseURL }) => {
+  const consumerSpanPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
+    return (
+      transactionEvent?.contexts?.trace?.op === 'queue.process' && transactionEvent?.transaction === 'supabase.db.rpc'
+    );
+  });
+
+  const result = await fetch(`${baseURL}/api/queue/consumer-rpc`);
+  const consumerEvent = await consumerSpanPromise;
+
+  expect(result.status).toBe(200);
+  expect(await result.json()).toEqual(
+    expect.objectContaining({
+      data: [
+        expect.objectContaining({
+          message: {
+            title: 'Test Todo',
+          },
+          msg_id: expect.any(Number),
+        }),
+      ],
+    }),
+  );
+  expect(consumerEvent.contexts.trace).toEqual({
+    data: {
+      'messaging.destination.name': 'todos',
+      'messaging.system': 'supabase',
+      'messaging.message.id': '2',
+      'messaging.message.receive.latency': expect.any(Number),
+      'sentry.op': 'queue.process',
+      'sentry.origin': 'auto.db.supabase',
+      'sentry.source': 'route',
+    },
+    op: 'queue.process',
+    origin: 'auto.db.supabase',
+    parent_span_id: expect.any(String),
+    span_id: expect.any(String),
+    status: 'ok',
+    trace_id: expect.any(String),
+  });
+
+  expect(consumerEvent.breadcrumbs).toContainEqual({
+    timestamp: expect.any(Number),
+    type: 'supabase',
+    category: 'db.rpc.pop',
+    message: 'rpc(pop)',
+    data: {
+      'messaging.destination.name': 'todos',
+      'messaging.message.id': '2',
+    },
+  });
+});
+
+test('Sends queue process error spans with `rpc(...)`', async ({ page, baseURL }) => {
+  const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
+    return (
+      transactionEvent?.contexts?.trace?.op === 'http.server' &&
+      transactionEvent?.transaction === 'GET /api/queue/consumer-error'
+    );
+  });
+
+  const errorEventPromise = waitForError('supabase-nextjs', errorEvent => {
+    return errorEvent?.exception?.values?.[0]?.value?.includes('pgmq.q_non-existing-queue');
+  });
+
+  const result = await fetch(`${baseURL}/api/queue/consumer-error`);
+  const transactionEvent = await httpTransactionPromise;
+
+  expect(result.status).toBe(500);
+  expect(await result.json()).toEqual(
+    expect.objectContaining({
+      error: expect.stringContaining('relation "pgmq.q_non-existing-queue" does not exist'),
+    }),
+  );
+
+  const errorEvent = await errorEventPromise;
+  expect(errorEvent).toBeDefined();
+
+  expect(errorEvent.exception?.values?.[0].value).toBe('relation "pgmq.q_non-existing-queue" does not exist');
+  expect(errorEvent.contexts?.supabase).toEqual({
+    queueName: 'non-existing-queue',
+  });
+
+  expect(errorEvent.breadcrumbs).toContainEqual(
+    expect.objectContaining({
+      type: 'supabase',
+      category: 'db.rpc.pop',
+      message: 'rpc(pop)',
+      data: {
+        'messaging.destination.name': 'non-existing-queue',
+      },
+    }),
+  );
+
+  expect(transactionEvent.spans).toContainEqual({
+    data: {
+      'messaging.destination.name': 'non-existing-queue',
+      'messaging.system': 'supabase',
+      'sentry.op': 'queue.process',
+      'sentry.origin': 'auto.db.supabase',
+    },
+    description: 'supabase.db.rpc',
+    op: 'queue.process',
+    origin: 'auto.db.supabase',
+    parent_span_id: expect.stringMatching(/[a-f0-9]{16}/),
+    span_id: expect.stringMatching(/[a-f0-9]{16}/),
+    start_timestamp: expect.any(Number),
+    status: 'unknown_error',
+    timestamp: expect.any(Number),
+    trace_id: expect.stringMatching(/[a-f0-9]{32}/),
+  });
+});
+
+test('Sends queue batch publish spans with `rpc(...)`', async ({ page, baseURL }) => {
+  const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
+    return (
+      transactionEvent?.contexts?.trace?.op === 'http.server' &&
+      transactionEvent?.transaction === 'GET /api/queue/producer-batch'
+    );
+  });
+
+  const result = await fetch(`${baseURL}/api/queue/producer-batch`);
+  const transactionEvent = await httpTransactionPromise;
+
+  expect(result.status).toBe(200);
+  expect(await result.json()).toEqual({ data: [3, 4] });
+
+  expect(transactionEvent.spans).toHaveLength(2);
+  expect(transactionEvent.spans).toContainEqual({
+    data: {
+      'messaging.destination.name': 'todos',
+      'messaging.system': 'supabase',
+      'messaging.message.id': '3,4',
+      'sentry.op': 'queue.publish',
+      'sentry.origin': 'auto.db.supabase',
+    },
+    description: 'supabase.db.rpc',
+    op: 'queue.publish',
+    origin: 'auto.db.supabase',
+    parent_span_id: expect.stringMatching(/[a-f0-9]{16}/),
+    span_id: expect.stringMatching(/[a-f0-9]{16}/),
+    start_timestamp: expect.any(Number),
+    status: 'ok',
+    timestamp: expect.any(Number),
+    trace_id: expect.stringMatching(/[a-f0-9]{32}/),
+  });
+
+  expect(transactionEvent.breadcrumbs).toContainEqual({
+    timestamp: expect.any(Number),
+    type: 'supabase',
+    category: 'db.rpc.send_batch',
+    message: 'rpc(send_batch)',
+    data: {
+      'messaging.destination.name': 'todos',
+      'messaging.message.id': '3,4',
+    },
+  });
+});
diff --git a/packages/core/src/integrations/supabase.ts b/packages/core/src/integrations/supabase.ts
index ac781e95ece6..e0c61b1a9270 100644
--- a/packages/core/src/integrations/supabase.ts
+++ b/packages/core/src/integrations/supabase.ts
@@ -7,10 +7,19 @@ import { DEBUG_BUILD } from '../debug-build';
 import { captureException } from '../exports';
 import { defineIntegration } from '../integration';
 import { SEMANTIC_ATTRIBUTE_SENTRY_OP, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../semanticAttributes';
-import { setHttpStatus, SPAN_STATUS_ERROR, SPAN_STATUS_OK, startSpan } from '../tracing';
+import { continueTrace, setHttpStatus, SPAN_STATUS_ERROR, SPAN_STATUS_OK, startSpan } from '../tracing';
 import type { IntegrationFn } from '../types-hoist/integration';
 import { isPlainObject } from '../utils/is';
 import { logger } from '../utils/logger';
+import { getTraceData } from '../utils/traceData';
+
+export interface SupabaseClientConstructor {
+  prototype: {
+    from: (table: string) => PostgRESTQueryBuilder;
+    schema: (schema: string) => { rpc: (...args: unknown[]) => Promise<unknown> };
+  };
+  rpc: (fn: string, params: Record<string, unknown>) => Promise<unknown>;
+}
 
 const AUTH_OPERATIONS_TO_INSTRUMENT = [
   'reauthenticate',
@@ -72,6 +81,7 @@ type AuthAdminOperationName = (typeof AUTH_ADMIN_OPERATIONS_TO_INSTRUMENT)[numbe
 type PostgRESTQueryOperationFn = (...args: unknown[]) => PostgRESTFilterBuilder;
 
 export interface SupabaseClientInstance {
+  rpc: (fn: string, params: Record<string, unknown>) => Promise<unknown>;
   auth: {
     admin: Record<AuthAdminOperationName, AuthOperationFn>;
   } & Record<AuthOperationName, AuthOperationFn>;
@@ -91,6 +101,16 @@ export interface PostgRESTFilterBuilder {
 
 export interface SupabaseResponse {
   status?: number;
+  data?: Array<{
+    msg_id?: number;
+    enqueued_at?: string;
+    message?: {
+      _sentry?: {
+        sentry_trace?: string;
+        baggage?: string;
+      };
+    };
+  }>;
   error?: {
     message: string;
     code?: string;
@@ -113,12 +133,6 @@ export interface SupabaseBreadcrumb {
   };
 }
 
-export interface SupabaseClientConstructor {
-  prototype: {
-    from: (table: string) => PostgRESTQueryBuilder;
-  };
-}
-
 export interface PostgRESTProtoThenable {
   then: <T>(
     onfulfilled?: ((value: T) => T | PromiseLike<T>) | null,
@@ -214,6 +228,312 @@ export function translateFiltersIntoMethods(key: string, query: string): string
   return `${method}(${key}, ${value.join('.')})`;
 }
 
+function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void {
+  if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) {
+    return;
+  }
+
+  (SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy(
+    (SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema,
+    {
+      apply(target, thisArg, argumentsList) {
+        const supabaseInstance = Reflect.apply(target, thisArg, argumentsList);
+
+        (supabaseInstance as unknown as SupabaseClientConstructor).rpc = new Proxy(
+          (supabaseInstance as unknown as SupabaseClientInstance).rpc,
+          {
+            apply(target, thisArg, argumentsList) {
+              const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch';
+              const isConsumerSpan = argumentsList[0] === 'pop';
+
+              if (!isProducerSpan && !isConsumerSpan) {
+                return Reflect.apply(target, thisArg, argumentsList);
+              }
+
+              if (isProducerSpan) {
+                return instrumentRpcProducer(target, thisArg, argumentsList);
+              } else if (isConsumerSpan) {
+                return instrumentRpcConsumer(target, thisArg, argumentsList);
+              }
+
+              // If the operation is not a queue operation, return the original function
+              return Reflect.apply(target, thisArg, argumentsList);
+            },
+          },
+        );
+
+        return supabaseInstance;
+      },
+    },
+  );
+
+  markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema);
+}
+
+function extractTraceAndBaggageFromMessage(message: { _sentry?: { sentry_trace?: string; baggage?: string } }): {
+  sentryTrace?: string;
+  baggage?: string;
+} {
+  if (message?._sentry) {
+    return {
+      sentryTrace: message._sentry.sentry_trace,
+      baggage: message._sentry.baggage,
+    };
+  }
+  return {};
+}
+
+const instrumentRpcConsumer = (target: any, thisArg: any, argumentsList: any[]): Promise<unknown> => {
+  const [operationName, queueParams] = argumentsList as [
+    'pop',
+    {
+      queue_name?: string;
+    },
+  ];
+
+  const isConsumerSpan = operationName === 'pop';
+  const queueName = queueParams?.queue_name;
+
+  if (!isConsumerSpan) {
+    return Reflect.apply(target, thisArg, argumentsList); // Not a consumer operation
+  }
+
+  return (Reflect.apply(target, thisArg, argumentsList) as Promise<SupabaseResponse>).then((res: SupabaseResponse) => {
+    const latency = res.data?.[0]?.enqueued_at ? Date.now() - Date.parse(res.data?.[0]?.enqueued_at) : undefined;
+
+    const { sentryTrace, baggage } = extractTraceAndBaggageFromMessage(res.data?.[0]?.message || {});
+
+    // Remove Sentry metadata from the returned message
+    delete res.data?.[0]?.message?._sentry;
+
+    return continueTrace(
+      {
+        sentryTrace,
+        baggage,
+      },
+      () => {
+        return startSpan(
+          {
+            name: 'supabase.db.rpc',
+            attributes: {
+              [SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.process',
+              [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase',
+              'messaging.system': 'supabase',
+            },
+          },
+          span => {
+            const messageId =
+              res?.data?.map(item => (typeof item === 'number' ? item : item.msg_id)).join(',') || undefined;
+
+            if (messageId) {
+              span.setAttribute('messaging.message.id', messageId);
+            }
+
+            if (queueName) {
+              span.setAttribute('messaging.destination.name', queueName);
+            }
+
+            if (latency) {
+              span.setAttribute('messaging.message.receive.latency', latency);
+            }
+
+            const breadcrumb: SupabaseBreadcrumb = {
+              type: 'supabase',
+              category: `db.rpc.${argumentsList[0]}`,
+              message: `rpc(${argumentsList[0]})`,
+            };
+
+            const data: Record<string, unknown> = {};
+
+            if (messageId) {
+              data['messaging.message.id'] = messageId;
+            }
+
+            if (queueName) {
+              data['messaging.destination.name'] = queueName;
+            }
+
+            if (Object.keys(data).length) {
+              breadcrumb.data = data;
+            }
+
+            addBreadcrumb(breadcrumb);
+
+            if (res.error) {
+              const err = new Error(res.error.message) as SupabaseError;
+
+              if (res.error.code) {
+                err.code = res.error.code;
+              }
+
+              if (res.error.details) {
+                err.details = res.error.details;
+              }
+
+              captureException(err, {
+                contexts: {
+                  supabase: {
+                    queueName,
+                    messageId,
+                  },
+                },
+              });
+
+              span.setStatus({ code: SPAN_STATUS_ERROR });
+            } else {
+              span.setStatus({ code: SPAN_STATUS_OK });
+            }
+
+            span.end();
+
+            return res;
+          },
+        );
+      },
+    );
+  });
+};
+
+function instrumentRpcProducer(target: any, thisArg: any, argumentsList: any[]): Promise<unknown> {
+  const maybeQueueParams = argumentsList[1];
+
+  // If the second argument is not an object, it's not a queue operation
+  if (!isPlainObject(maybeQueueParams)) {
+    return Reflect.apply(target, thisArg, argumentsList);
+  }
+
+  const queueName = maybeQueueParams?.queue_name as string;
+
+  // If the queue name is not provided, return the original function
+  if (!queueName) {
+    return Reflect.apply(target, thisArg, argumentsList);
+  }
+
+  return startSpan(
+    {
+      name: 'supabase.db.rpc',
+      attributes: {
+        [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase',
+        [SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.publish',
+        'messaging.system': 'supabase',
+      },
+    },
+    span => {
+      const { 'sentry-trace': sentryTrace, baggage: sentryBaggage } = getTraceData();
+      const [, sentryArgumentsQueueParams] = argumentsList as [
+        'send' | 'send_batch',
+        {
+          queue_name: string;
+          messages?: Array<{ _sentry?: { sentry_trace?: string; baggage?: string } }>;
+          message?: { _sentry?: { sentry_trace?: string; baggage?: string } };
+        },
+      ];
+
+      if (sentryArgumentsQueueParams?.message) {
+        sentryArgumentsQueueParams.message._sentry = {
+          sentry_trace: sentryTrace,
+          baggage: sentryBaggage,
+        };
+      } else if (sentryArgumentsQueueParams?.messages) {
+        sentryArgumentsQueueParams.messages = sentryArgumentsQueueParams.messages.map(message => {
+          message._sentry = {
+            sentry_trace: sentryTrace,
+            baggage: sentryBaggage,
+          };
+          return message;
+        });
+      }
+
+      argumentsList[1] = sentryArgumentsQueueParams;
+
+      return (Reflect.apply(target, thisArg, argumentsList) as Promise<SupabaseResponse>)
+        .then((res: SupabaseResponse) => {
+          const messageId =
+            res?.data?.map(item => (typeof item === 'number' ? item : item.msg_id)).join(',') || undefined;
+
+          if (messageId) {
+            span.setAttribute('messaging.message.id', messageId || '<unknown>');
+          }
+
+          if (queueName) {
+            span.setAttribute('messaging.destination.name', queueName || '<unknown>');
+          }
+
+          const breadcrumb: SupabaseBreadcrumb = {
+            type: 'supabase',
+            category: `db.rpc.${argumentsList[0]}`,
+            message: `rpc(${argumentsList[0]})`,
+          };
+          const data: Record<string, unknown> = {};
+          if (messageId) {
+            data['messaging.message.id'] = messageId;
+          }
+          if (queueName) {
+            data['messaging.destination.name'] = queueName;
+          }
+          if (Object.keys(data).length) {
+            breadcrumb.data = data;
+          }
+          addBreadcrumb(breadcrumb);
+          if (res.error) {
+            const err = new Error(res.error.message) as SupabaseError;
+            if (res.error.code) {
+              err.code = res.error.code;
+            }
+            if (res.error.details) {
+              err.details = res.error.details;
+            }
+            captureException(err, {
+              contexts: {
+                supabase: {
+                  queueName,
+                  messageId,
+                },
+              },
+            });
+            span.setStatus({ code: SPAN_STATUS_ERROR });
+          } else {
+            span.setStatus({ code: SPAN_STATUS_OK });
+          }
+          span.end();
+
+          return res;
+        })
+        .catch((err: unknown) => {
+          span.setStatus({ code: SPAN_STATUS_ERROR });
+          span.end();
+          captureException(err, {
+            mechanism: {
+              handled: false,
+            },
+          });
+          throw err;
+        });
+    },
+  );
+}
+
+function instrumentRpc(SupabaseClient: unknown): void {
+  (SupabaseClient as unknown as SupabaseClientInstance).rpc = new Proxy(
+    (SupabaseClient as unknown as SupabaseClientInstance).rpc,
+    {
+      apply(target, thisArg, argumentsList) {
+        let result: Promise<unknown>;
+
+        if (argumentsList[0] === 'send' || argumentsList[0] === 'send_batch') {
+          result = instrumentRpcProducer(target, thisArg, argumentsList);
+        } else if (argumentsList[0] === 'pop') {
+          result = instrumentRpcConsumer(target, thisArg, argumentsList);
+        } else {
+          result = Reflect.apply(target, thisArg, argumentsList) as Promise<unknown>;
+        }
+
+        return result;
+      },
+    },
+  );
+}
+
 function instrumentAuthOperation(operation: AuthOperationFn, isAdmin = false): AuthOperationFn {
   return new Proxy(operation, {
     apply(target, thisArg, argumentsList) {
@@ -342,6 +662,13 @@ function instrumentPostgRESTFilterBuilder(PostgRESTFilterBuilder: PostgRESTFilte
         }
 
         const pathParts = typedThis.url.pathname.split('/');
+
+        if (pathParts.includes('rpc')) {
+          // RPC calls are instrumented in the `instrumentRpc` function
+          // and should not be instrumented here.
+          return Reflect.apply(target, thisArg, argumentsList);
+        }
+
         const table = pathParts.length > 0 ? pathParts[pathParts.length - 1] : '';
 
         const queryItems: string[] = [];
@@ -399,6 +726,28 @@ function instrumentPostgRESTFilterBuilder(PostgRESTFilterBuilder: PostgRESTFilte
                     span.end();
                   }
 
+                  const breadcrumb: SupabaseBreadcrumb = {
+                    type: 'supabase',
+                    category: `db.${operation}`,
+                    message: description,
+                  };
+
+                  const data: Record<string, unknown> = {};
+
+                  if (queryItems.length) {
+                    data.query = queryItems;
+                  }
+
+                  if (Object.keys(body).length) {
+                    data.body = body;
+                  }
+
+                  if (Object.keys(data).length) {
+                    breadcrumb.data = data;
+                  }
+
+                  addBreadcrumb(breadcrumb);
+
                   if (res.error) {
                     const err = new Error(res.error.message) as SupabaseError;
                     if (res.error.code) {
@@ -423,28 +772,6 @@ function instrumentPostgRESTFilterBuilder(PostgRESTFilterBuilder: PostgRESTFilte
                     });
                   }
 
-                  const breadcrumb: SupabaseBreadcrumb = {
-                    type: 'supabase',
-                    category: `db.${operation}`,
-                    message: description,
-                  };
-
-                  const data: Record<string, unknown> = {};
-
-                  if (queryItems.length) {
-                    data.query = queryItems;
-                  }
-
-                  if (Object.keys(body).length) {
-                    data.body = body;
-                  }
-
-                  if (Object.keys(data).length) {
-                    breadcrumb.data = data;
-                  }
-
-                  addBreadcrumb(breadcrumb);
-
                   return res;
                 },
                 (err: Error) => {
@@ -503,6 +830,8 @@ export const instrumentSupabaseClient = (supabaseClient: unknown): void => {
     supabaseClient.constructor === Function ? supabaseClient : supabaseClient.constructor;
 
   instrumentSupabaseClientConstructor(SupabaseClientConstructor);
+  instrumentRpcReturnedFromSchemaCall(SupabaseClientConstructor);
+  instrumentRpc(supabaseClient as SupabaseClientInstance);
   instrumentSupabaseAuthClient(supabaseClient as SupabaseClientInstance);
 };