Skip to content

Commit 6602788

Browse files
committed
docs: add instructions for fixing SQL tests and updating functions
Provides guidance on fixing invalid tests, updating SQL functions, and rerunning tests without creating migrations or using nx, to streamline test maintenance and debugging.
1 parent acdca8d commit 6602788

11 files changed

+859
-16
lines changed

.changeset/hungry-cloths-hunt.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
'@pgflow/core': patch
3+
---
4+
5+
Fix orphaned messages accumulating in queue when runs fail
6+
7+
- Archive all queued messages when a run fails to prevent resource waste
8+
- Handle type constraint violations gracefully without exceptions
9+
- Add performance index for efficient message archiving
10+
- Prevent retries on already-failed runs

.claude/commands/fix-sql-tests.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
Your job is to fix SQL tests, either by fixing the tests if those are invalid,
2+
or updating the SQL functions in pkgs/core/schemas/ and trying again.
3+
4+
If updating functions, load them with psql.
5+
6+
!`pnpm nx supabase:status core --output env | grep DB_URL`
7+
PWD: !`pwd`
8+
9+
To rerun the test(s), run this command from `pkgs/core` directory:
10+
11+
`scripts/run-test-with-colors supabase/tests/<testfile>`
12+
13+
Do not create any migratons or try to run tests with nx.
14+
15+
<test_failures>
16+
!`pnpm nx test:pgtap core`
17+
</test_failures>

pkgs/core/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,11 @@ The system handles failures by:
234234
- Marking the step as 'failed'
235235
- Marking the run as 'failed'
236236
- Archiving the message in PGMQ
237-
- Notifying workers to abort pending tasks (future feature)
237+
- **Archiving all queued messages for the failed run** (preventing orphaned messages)
238+
4. Additional failure handling:
239+
- **No retries on already-failed runs** - tasks are immediately marked as failed
240+
- **Graceful type constraint violations** - handled without exceptions when single steps feed map steps
241+
- **Performance-optimized message archiving** using indexed queries
238242

239243
#### Retries and Timeouts
240244

pkgs/core/schemas/0100_function_complete_task.sql

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,55 @@ WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing
3737
LIMIT 1;
3838

3939
IF v_dependent_map_slug IS NOT NULL THEN
40-
RAISE EXCEPTION 'Map step % expects array input but dependency % produced % (output: %)',
41-
v_dependent_map_slug,
42-
complete_task.step_slug,
43-
CASE WHEN complete_task.output IS NULL THEN 'null' ELSE jsonb_typeof(complete_task.output) END,
44-
complete_task.output;
40+
-- Mark run as failed immediately
41+
UPDATE pgflow.runs
42+
SET status = 'failed',
43+
failed_at = now()
44+
WHERE pgflow.runs.run_id = complete_task.run_id;
45+
46+
-- Archive all queued messages
47+
PERFORM pgmq.archive(r.flow_slug, st.message_id)
48+
FROM pgflow.step_tasks st
49+
JOIN pgflow.runs r ON st.run_id = r.run_id
50+
WHERE st.run_id = complete_task.run_id
51+
AND st.status = 'queued'
52+
AND st.message_id IS NOT NULL;
53+
54+
-- Mark current task as failed
55+
UPDATE pgflow.step_tasks
56+
SET status = 'failed',
57+
failed_at = now(),
58+
error_message = '[TYPE_VIOLATION] Produced ' ||
59+
CASE WHEN complete_task.output IS NULL THEN 'null'
60+
ELSE jsonb_typeof(complete_task.output) END ||
61+
' instead of array'
62+
WHERE pgflow.step_tasks.run_id = complete_task.run_id
63+
AND pgflow.step_tasks.step_slug = complete_task.step_slug
64+
AND pgflow.step_tasks.task_index = complete_task.task_index;
65+
66+
-- Mark step state as failed
67+
UPDATE pgflow.step_states
68+
SET status = 'failed',
69+
failed_at = now(),
70+
error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug ||
71+
' expects array input but dependency ' || complete_task.step_slug ||
72+
' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null'
73+
ELSE jsonb_typeof(complete_task.output) END
74+
WHERE pgflow.step_states.run_id = complete_task.run_id
75+
AND pgflow.step_states.step_slug = complete_task.step_slug;
76+
77+
-- Archive the current task's message (it was started, now failed)
78+
PERFORM pgmq.archive(r.flow_slug, st.message_id)
79+
FROM pgflow.step_tasks st
80+
JOIN pgflow.runs r ON st.run_id = r.run_id
81+
WHERE st.run_id = complete_task.run_id
82+
AND st.step_slug = complete_task.step_slug
83+
AND st.task_index = complete_task.task_index
84+
AND st.message_id IS NOT NULL;
85+
86+
-- Return empty result
87+
RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false;
88+
RETURN;
4589
END IF;
4690

4791
-- ==========================================

pkgs/core/schemas/0100_function_fail_task.sql

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,33 @@ DECLARE
1414
v_step_failed boolean;
1515
begin
1616

17+
-- If run is already failed, no retries allowed
18+
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id AND pgflow.runs.status = 'failed') THEN
19+
UPDATE pgflow.step_tasks
20+
SET status = 'failed',
21+
failed_at = now(),
22+
error_message = '[RUN_ALREADY_FAILED] ' || fail_task.error_message
23+
WHERE pgflow.step_tasks.run_id = fail_task.run_id
24+
AND pgflow.step_tasks.step_slug = fail_task.step_slug
25+
AND pgflow.step_tasks.task_index = fail_task.task_index
26+
AND pgflow.step_tasks.status = 'started';
27+
28+
-- Archive the task's message
29+
PERFORM pgmq.archive(r.flow_slug, st.message_id)
30+
FROM pgflow.step_tasks st
31+
JOIN pgflow.runs r ON st.run_id = r.run_id
32+
WHERE st.run_id = fail_task.run_id
33+
AND st.step_slug = fail_task.step_slug
34+
AND st.task_index = fail_task.task_index
35+
AND st.message_id IS NOT NULL;
36+
37+
RETURN QUERY SELECT * FROM pgflow.step_tasks
38+
WHERE pgflow.step_tasks.run_id = fail_task.run_id
39+
AND pgflow.step_tasks.step_slug = fail_task.step_slug
40+
AND pgflow.step_tasks.task_index = fail_task.task_index;
41+
RETURN;
42+
END IF;
43+
1744
WITH run_lock AS (
1845
SELECT * FROM pgflow.runs
1946
WHERE pgflow.runs.run_id = fail_task.run_id
@@ -140,6 +167,16 @@ IF v_run_failed THEN
140167
END;
141168
END IF;
142169

170+
-- Archive all queued messages when run fails
171+
IF v_run_failed THEN
172+
PERFORM pgmq.archive(r.flow_slug, st.message_id)
173+
FROM pgflow.step_tasks st
174+
JOIN pgflow.runs r ON st.run_id = r.run_id
175+
WHERE st.run_id = fail_task.run_id
176+
AND st.status = 'queued'
177+
AND st.message_id IS NOT NULL;
178+
END IF;
179+
143180
-- For queued tasks: delay the message for retry with exponential backoff
144181
PERFORM (
145182
WITH retry_config AS (

0 commit comments

Comments
 (0)