Skip to content

Commit f303c04

Browse files
committed
docs: add instructions for fixing SQL tests and updating functions
Provides guidance on fixing invalid tests, updating SQL functions, and rerunning tests without creating migrations or using nx, to streamline test maintenance and debugging.
1 parent acdca8d commit f303c04

16 files changed

+1347
-17
lines changed

.changeset/hungry-cloths-hunt.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
'@pgflow/core': patch
3+
---
4+
5+
Fix orphaned messages accumulating in queue when runs fail
6+
7+
- Archive all queued messages when a run fails to prevent resource waste
8+
- Handle type constraint violations gracefully without exceptions
9+
- Add performance index for efficient message archiving
10+
- Prevent retries on already-failed runs

.claude/commands/fix-sql-tests.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
Your job is to fix SQL tests, either by fixing the tests if those are invalid,
2+
or updating the SQL functions in pkgs/core/schemas/ and trying again.
3+
4+
If updating functions, load them with psql.
5+
6+
!`pnpm nx supabase:status core --output env | grep DB_URL`
7+
PWD: !`pwd`
8+
9+
To rerun the test(s), run this command from `pkgs/core` directory:
10+
11+
`scripts/run-test-with-colors supabase/tests/<testfile>`
12+
13+
Do not create any migratons or try to run tests with nx.
14+
15+
<test_failures>
16+
!`pnpm nx test:pgtap core`
17+
</test_failures>

.claude/commands/test-first-sql.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
Your job is to implement the feature below in a test-first manner.
2+
First, you must idenfity what things you want to test for.
3+
Then you must write one test at a time, from the simplest, more generic,
4+
to more precise (if applicable, sometimes you only need to write one test per
5+
thing, without multiple per thing).
6+
7+
To run the test(s), run this command from `pkgs/core` directory:
8+
9+
`scripts/run-test-with-colors supabase/tests/<testfile>`
10+
11+
The newly written test must fail for the correct reasons.
12+
13+
In order to make the test pass, you need to update function
14+
code in pkgs/core/schemas/.
15+
16+
After updating you should use `psql` to execute function file
17+
and update function in database.
18+
19+
!`pnpm nx supabase:status core --output env | grep DB_URL`
20+
PWD: !`pwd`
21+
22+
Repeat until all the added tests are passing.
23+
24+
When they do, run all the tests like this:
25+
26+
`scripts/run-test-with-colors supabase/tests/`
27+
28+
Do not create any migratons or try to run tests with nx.
29+
30+
Never use any INSERTs or UPDATEs to prepare or mutate state for the test.
31+
Instead, use regular pgflow.\* SQL functions or functions that are
32+
available in pkgs/core/supabase/tests/seed.sql:
33+
34+
!`grep 'function.*pgflow_tests' pkgs/core/supabase/seed.sql -A7`
35+
36+
Check how they are used in other tests.
37+
38+
<feature_to_implement>
39+
$ARGUMENTS
40+
</feature_to_implement>

pkgs/core/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,11 @@ The system handles failures by:
234234
- Marking the step as 'failed'
235235
- Marking the run as 'failed'
236236
- Archiving the message in PGMQ
237-
- Notifying workers to abort pending tasks (future feature)
237+
- **Archiving all queued messages for the failed run** (preventing orphaned messages)
238+
4. Additional failure handling:
239+
- **No retries on already-failed runs** - tasks are immediately marked as failed
240+
- **Graceful type constraint violations** - handled without exceptions when single steps feed map steps
241+
- **Performance-optimized message archiving** using indexed queries
238242

239243
#### Retries and Timeouts
240244

pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ DECLARE
88
v_iterations int := 0;
99
v_max_iterations int := 50;
1010
BEGIN
11+
-- ==========================================
12+
-- GUARD: No mutations on failed runs
13+
-- ==========================================
14+
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = cascade_complete_taskless_steps.run_id AND pgflow.runs.status = 'failed') THEN
15+
RETURN 0;
16+
END IF;
17+
1118
-- ==========================================
1219
-- ITERATIVE CASCADE COMPLETION
1320
-- ==========================================

pkgs/core/schemas/0100_function_complete_task.sql

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,17 @@ declare
1414
v_dependent_map_slug text;
1515
begin
1616

17+
-- ==========================================
18+
-- GUARD: No mutations on failed runs
19+
-- ==========================================
20+
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id AND pgflow.runs.status = 'failed') THEN
21+
RETURN QUERY SELECT * FROM pgflow.step_tasks
22+
WHERE pgflow.step_tasks.run_id = complete_task.run_id
23+
AND pgflow.step_tasks.step_slug = complete_task.step_slug
24+
AND pgflow.step_tasks.task_index = complete_task.task_index;
25+
RETURN;
26+
END IF;
27+
1728
-- ==========================================
1829
-- VALIDATION: Array output for dependent maps
1930
-- ==========================================
@@ -37,11 +48,55 @@ WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing
3748
LIMIT 1;
3849

3950
IF v_dependent_map_slug IS NOT NULL THEN
40-
RAISE EXCEPTION 'Map step % expects array input but dependency % produced % (output: %)',
41-
v_dependent_map_slug,
42-
complete_task.step_slug,
43-
CASE WHEN complete_task.output IS NULL THEN 'null' ELSE jsonb_typeof(complete_task.output) END,
44-
complete_task.output;
51+
-- Mark run as failed immediately
52+
UPDATE pgflow.runs
53+
SET status = 'failed',
54+
failed_at = now()
55+
WHERE pgflow.runs.run_id = complete_task.run_id;
56+
57+
-- Archive all queued messages
58+
PERFORM pgmq.archive(r.flow_slug, st.message_id)
59+
FROM pgflow.step_tasks st
60+
JOIN pgflow.runs r ON st.run_id = r.run_id
61+
WHERE st.run_id = complete_task.run_id
62+
AND st.status = 'queued'
63+
AND st.message_id IS NOT NULL;
64+
65+
-- Mark current task as failed
66+
UPDATE pgflow.step_tasks
67+
SET status = 'failed',
68+
failed_at = now(),
69+
error_message = '[TYPE_VIOLATION] Produced ' ||
70+
CASE WHEN complete_task.output IS NULL THEN 'null'
71+
ELSE jsonb_typeof(complete_task.output) END ||
72+
' instead of array'
73+
WHERE pgflow.step_tasks.run_id = complete_task.run_id
74+
AND pgflow.step_tasks.step_slug = complete_task.step_slug
75+
AND pgflow.step_tasks.task_index = complete_task.task_index;
76+
77+
-- Mark step state as failed
78+
UPDATE pgflow.step_states
79+
SET status = 'failed',
80+
failed_at = now(),
81+
error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug ||
82+
' expects array input but dependency ' || complete_task.step_slug ||
83+
' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null'
84+
ELSE jsonb_typeof(complete_task.output) END
85+
WHERE pgflow.step_states.run_id = complete_task.run_id
86+
AND pgflow.step_states.step_slug = complete_task.step_slug;
87+
88+
-- Archive the current task's message (it was started, now failed)
89+
PERFORM pgmq.archive(r.flow_slug, st.message_id)
90+
FROM pgflow.step_tasks st
91+
JOIN pgflow.runs r ON st.run_id = r.run_id
92+
WHERE st.run_id = complete_task.run_id
93+
AND st.step_slug = complete_task.step_slug
94+
AND st.task_index = complete_task.task_index
95+
AND st.message_id IS NOT NULL;
96+
97+
-- Return empty result
98+
RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false;
99+
RETURN;
45100
END IF;
46101

47102
-- ==========================================

pkgs/core/schemas/0100_function_fail_task.sql

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,33 @@ DECLARE
1414
v_step_failed boolean;
1515
begin
1616

17+
-- If run is already failed, no retries allowed
18+
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id AND pgflow.runs.status = 'failed') THEN
19+
UPDATE pgflow.step_tasks
20+
SET status = 'failed',
21+
failed_at = now(),
22+
error_message = '[RUN_ALREADY_FAILED] ' || fail_task.error_message
23+
WHERE pgflow.step_tasks.run_id = fail_task.run_id
24+
AND pgflow.step_tasks.step_slug = fail_task.step_slug
25+
AND pgflow.step_tasks.task_index = fail_task.task_index
26+
AND pgflow.step_tasks.status = 'started';
27+
28+
-- Archive the task's message
29+
PERFORM pgmq.archive(r.flow_slug, st.message_id)
30+
FROM pgflow.step_tasks st
31+
JOIN pgflow.runs r ON st.run_id = r.run_id
32+
WHERE st.run_id = fail_task.run_id
33+
AND st.step_slug = fail_task.step_slug
34+
AND st.task_index = fail_task.task_index
35+
AND st.message_id IS NOT NULL;
36+
37+
RETURN QUERY SELECT * FROM pgflow.step_tasks
38+
WHERE pgflow.step_tasks.run_id = fail_task.run_id
39+
AND pgflow.step_tasks.step_slug = fail_task.step_slug
40+
AND pgflow.step_tasks.task_index = fail_task.task_index;
41+
RETURN;
42+
END IF;
43+
1744
WITH run_lock AS (
1845
SELECT * FROM pgflow.runs
1946
WHERE pgflow.runs.run_id = fail_task.run_id
@@ -140,6 +167,16 @@ IF v_run_failed THEN
140167
END;
141168
END IF;
142169

170+
-- Archive all queued messages when run fails
171+
IF v_run_failed THEN
172+
PERFORM pgmq.archive(r.flow_slug, st.message_id)
173+
FROM pgflow.step_tasks st
174+
JOIN pgflow.runs r ON st.run_id = r.run_id
175+
WHERE st.run_id = fail_task.run_id
176+
AND st.status = 'queued'
177+
AND st.message_id IS NOT NULL;
178+
END IF;
179+
143180
-- For queued tasks: delay the message for retry with exponential backoff
144181
PERFORM (
145182
WITH retry_config AS (

pkgs/core/schemas/0100_function_start_ready_steps.sql

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
create or replace function pgflow.start_ready_steps(run_id uuid)
22
returns void
3-
language sql
3+
language plpgsql
44
set search_path to ''
55
as $$
6+
begin
7+
-- ==========================================
8+
-- GUARD: No mutations on failed runs
9+
-- ==========================================
10+
IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = start_ready_steps.run_id AND pgflow.runs.status = 'failed') THEN
11+
RETURN;
12+
END IF;
13+
614
-- ==========================================
715
-- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0)
816
-- ==========================================
@@ -165,4 +173,5 @@ SELECT
165173
sent_messages.msg_id
166174
FROM sent_messages;
167175

176+
end;
168177
$$;

0 commit comments

Comments
 (0)