Skip to content

Commit 2aef286

Browse files
committed
Match with OTEL semantics improve tests
1 parent 4de58d8 commit 2aef286

File tree

9 files changed

+2778
-483
lines changed

9 files changed

+2778
-483
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import { NextApiRequest, NextApiResponse } from 'next';
2+
import { createClient } from '@supabase/supabase-js';
3+
import * as Sentry from '@sentry/nextjs';
4+
5+
// These are the default development keys for a local Supabase instance
6+
const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
7+
const SUPABASE_SERVICE_ROLE_KEY =
8+
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
9+
10+
const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, {
11+
db: {
12+
schema: 'pgmq_public',
13+
},
14+
});
15+
16+
Sentry.instrumentSupabaseClient(supabaseClient);
17+
18+
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
19+
// Step 1: Batch produce multiple messages
20+
const { data: sendData, error: sendError } = await supabaseClient.rpc('send_batch', {
21+
queue_name: 'batch-flow-queue',
22+
messages: [
23+
{
24+
taskType: 'email',
25+
recipient: '[email protected]',
26+
subject: 'Welcome!',
27+
},
28+
{
29+
taskType: 'email',
30+
recipient: '[email protected]',
31+
subject: 'Verification',
32+
},
33+
{
34+
taskType: 'sms',
35+
recipient: '+1234567890',
36+
message: 'Your code is 123456',
37+
},
38+
],
39+
});
40+
41+
if (sendError) {
42+
return res.status(500).json({ error: `Send batch failed: ${sendError.message}` });
43+
}
44+
45+
// Step 2: Consume multiple messages from the queue
46+
const { data: receiveData, error: receiveError } = await supabaseClient.rpc('receive', {
47+
queue_name: 'batch-flow-queue',
48+
vt: 30,
49+
qty: 3,
50+
});
51+
52+
if (receiveError) {
53+
return res.status(500).json({ error: `Receive failed: ${receiveError.message}` });
54+
}
55+
56+
// Step 3: Process all messages
57+
const processedMessages = receiveData?.map((msg: any) => ({
58+
messageId: msg.msg_id,
59+
taskType: msg.message?.taskType,
60+
processed: true,
61+
}));
62+
63+
// Step 4: Archive all processed messages
64+
const messageIds = receiveData?.map((msg: any) => msg.msg_id).filter(Boolean);
65+
if (messageIds && messageIds.length > 0) {
66+
const { error: archiveError } = await supabaseClient.rpc('archive', {
67+
queue_name: 'batch-flow-queue',
68+
msg_ids: messageIds,
69+
});
70+
71+
if (archiveError) {
72+
return res.status(500).json({ error: `Archive failed: ${archiveError.message}` });
73+
}
74+
}
75+
76+
return res.status(200).json({
77+
success: true,
78+
batchSize: 3,
79+
produced: { messageIds: sendData },
80+
consumed: {
81+
count: receiveData?.length || 0,
82+
messages: processedMessages,
83+
},
84+
});
85+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import { NextApiRequest, NextApiResponse } from 'next';
2+
import { createClient } from '@supabase/supabase-js';
3+
import * as Sentry from '@sentry/nextjs';
4+
5+
// These are the default development keys for a local Supabase instance
6+
const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
7+
const SUPABASE_SERVICE_ROLE_KEY =
8+
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
9+
10+
const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, {
11+
db: {
12+
schema: 'pgmq_public',
13+
},
14+
});
15+
16+
Sentry.instrumentSupabaseClient(supabaseClient);
17+
18+
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
19+
// Test concurrent queue operations to multiple queues
20+
// This validates that instrumentation handles parallel operations correctly
21+
22+
try {
23+
// Produce messages to 3 different queues concurrently
24+
const produceOperations = await Promise.all([
25+
supabaseClient.rpc('send', {
26+
queue_name: 'concurrent-queue-1',
27+
message: { queueId: 1, task: 'process-images' },
28+
}),
29+
supabaseClient.rpc('send', {
30+
queue_name: 'concurrent-queue-2',
31+
message: { queueId: 2, task: 'send-emails' },
32+
}),
33+
supabaseClient.rpc('send', {
34+
queue_name: 'concurrent-queue-3',
35+
message: { queueId: 3, task: 'generate-reports' },
36+
}),
37+
]);
38+
39+
// Check for errors
40+
const produceErrors = produceOperations
41+
.map((op, idx) => (op.error ? { queue: idx + 1, error: op.error.message } : null))
42+
.filter(Boolean);
43+
44+
if (produceErrors.length > 0) {
45+
return res.status(500).json({ error: 'Some produce operations failed', details: produceErrors });
46+
}
47+
48+
// Consume from all queues concurrently
49+
const consumeOperations = await Promise.all([
50+
supabaseClient.rpc('receive', {
51+
queue_name: 'concurrent-queue-1',
52+
vt: 30,
53+
qty: 1,
54+
}),
55+
supabaseClient.rpc('receive', {
56+
queue_name: 'concurrent-queue-2',
57+
vt: 30,
58+
qty: 1,
59+
}),
60+
supabaseClient.rpc('receive', {
61+
queue_name: 'concurrent-queue-3',
62+
vt: 30,
63+
qty: 1,
64+
}),
65+
]);
66+
67+
// Process results
68+
const consumeErrors = consumeOperations
69+
.map((op, idx) => (op.error ? { queue: idx + 1, error: op.error.message } : null))
70+
.filter(Boolean);
71+
72+
if (consumeErrors.length > 0) {
73+
return res.status(500).json({ error: 'Some consume operations failed', details: consumeErrors });
74+
}
75+
76+
// Archive all messages concurrently
77+
const messageIds = consumeOperations.map((op, idx) => ({
78+
queue: `concurrent-queue-${idx + 1}`,
79+
msgId: op.data?.[0]?.msg_id,
80+
}));
81+
82+
await Promise.all(
83+
messageIds
84+
.filter(m => m.msgId)
85+
.map(m =>
86+
supabaseClient.rpc('archive', {
87+
queue_name: m.queue,
88+
msg_ids: [m.msgId],
89+
}),
90+
),
91+
);
92+
93+
return res.status(200).json({
94+
success: true,
95+
concurrentOperations: {
96+
queuesProcessed: 3,
97+
produced: produceOperations.map(op => op.data),
98+
consumed: consumeOperations.map((op, idx) => ({
99+
queue: idx + 1,
100+
messageId: op.data?.[0]?.msg_id,
101+
task: op.data?.[0]?.message?.task,
102+
})),
103+
},
104+
});
105+
} catch (error) {
106+
Sentry.captureException(error);
107+
return res.status(500).json({
108+
error: error instanceof Error ? error.message : 'Unknown error',
109+
});
110+
}
111+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { NextApiRequest, NextApiResponse } from 'next';
2+
import { createClient } from '@supabase/supabase-js';
3+
import * as Sentry from '@sentry/nextjs';
4+
5+
// These are the default development keys for a local Supabase instance
6+
const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
7+
const SUPABASE_SERVICE_ROLE_KEY =
8+
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
9+
10+
const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, {
11+
db: {
12+
schema: 'pgmq_public',
13+
},
14+
});
15+
16+
Sentry.instrumentSupabaseClient(supabaseClient);
17+
18+
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
19+
// Step 1: Produce a message that will cause processing error
20+
const { data: sendData, error: sendError } = await supabaseClient.rpc('send', {
21+
queue_name: 'error-flow-queue',
22+
message: {
23+
action: 'divide',
24+
numerator: 100,
25+
denominator: 0, // This will cause an error
26+
},
27+
});
28+
29+
if (sendError) {
30+
return res.status(500).json({ error: `Send failed: ${sendError.message}` });
31+
}
32+
33+
// Step 2: Consume the message
34+
const { data: receiveData, error: receiveError } = await supabaseClient.rpc('receive', {
35+
queue_name: 'error-flow-queue',
36+
vt: 30,
37+
qty: 1,
38+
});
39+
40+
if (receiveError) {
41+
return res.status(500).json({ error: `Receive failed: ${receiveError.message}` });
42+
}
43+
44+
// Step 3: Process the message - this will throw an error
45+
const message = receiveData?.[0];
46+
47+
try {
48+
if (message?.message?.denominator === 0) {
49+
throw new Error('Division by zero error in queue processor');
50+
}
51+
52+
// Simulate successful processing (won't be reached in this flow)
53+
const result = message.message.numerator / message.message.denominator;
54+
55+
return res.status(200).json({
56+
success: true,
57+
result,
58+
messageId: message?.msg_id,
59+
});
60+
} catch (error) {
61+
// Capture the error with Sentry
62+
Sentry.captureException(error, scope => {
63+
scope.setContext('queue', {
64+
queueName: 'error-flow-queue',
65+
messageId: message?.msg_id,
66+
message: message?.message,
67+
});
68+
return scope;
69+
});
70+
71+
// Return error response
72+
return res.status(500).json({
73+
success: false,
74+
error: error instanceof Error ? error.message : 'Unknown error',
75+
messageId: message?.msg_id,
76+
});
77+
}
78+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { NextApiRequest, NextApiResponse } from 'next';
2+
import { createClient } from '@supabase/supabase-js';
3+
import * as Sentry from '@sentry/nextjs';
4+
5+
// These are the default development keys for a local Supabase instance
6+
const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
7+
const SUPABASE_SERVICE_ROLE_KEY =
8+
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
9+
10+
const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, {
11+
db: {
12+
schema: 'pgmq_public',
13+
},
14+
});
15+
16+
Sentry.instrumentSupabaseClient(supabaseClient);
17+
18+
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
19+
// Step 1: Produce a message to the queue
20+
const { data: sendData, error: sendError } = await supabaseClient.rpc('send', {
21+
queue_name: 'e2e-flow-queue',
22+
message: {
23+
action: 'process_order',
24+
orderId: 'ORDER-123',
25+
timestamp: new Date().toISOString(),
26+
},
27+
});
28+
29+
if (sendError) {
30+
return res.status(500).json({ error: `Send failed: ${sendError.message}` });
31+
}
32+
33+
// Step 2: Consume the message from the queue (with VT=30 seconds)
34+
const { data: receiveData, error: receiveError } = await supabaseClient.rpc('receive', {
35+
queue_name: 'e2e-flow-queue',
36+
vt: 30,
37+
qty: 1,
38+
});
39+
40+
if (receiveError) {
41+
return res.status(500).json({ error: `Receive failed: ${receiveError.message}` });
42+
}
43+
44+
// Step 3: Process the message (simulate business logic)
45+
const processedMessage = receiveData?.[0];
46+
47+
// Step 4: Archive the message after successful processing
48+
if (processedMessage?.msg_id) {
49+
const { error: archiveError } = await supabaseClient.rpc('archive', {
50+
queue_name: 'e2e-flow-queue',
51+
msg_ids: [processedMessage.msg_id],
52+
});
53+
54+
if (archiveError) {
55+
return res.status(500).json({ error: `Archive failed: ${archiveError.message}` });
56+
}
57+
}
58+
59+
return res.status(200).json({
60+
success: true,
61+
produced: { messageId: sendData },
62+
consumed: {
63+
messageId: processedMessage?.msg_id,
64+
message: processedMessage?.message,
65+
},
66+
});
67+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import type { NextApiRequest, NextApiResponse } from 'next';
2+
import { getSupabaseClient } from '@/lib/initSupabaseAdmin';
3+
4+
const supabaseClient = getSupabaseClient();
5+
6+
type Data = {
7+
data: unknown;
8+
error: unknown;
9+
};
10+
11+
export default async function handler(req: NextApiRequest, res: NextApiResponse<Data>) {
12+
const { data, error } = await supabaseClient.rpc('get_supabase_status');
13+
14+
if (error) {
15+
console.warn('Supabase RPC status check failed', error);
16+
res.status(500).json({ data, error });
17+
return;
18+
}
19+
20+
res.status(200).json({ data, error });
21+
}

0 commit comments

Comments
 (0)