Skip to content

Commit 275f3a6

Browse files
committed
fix(cascade): add safety limit to prevent infinite loops during taskless step completion
- Implemented a maximum iteration count of 50 in the cascade_complete_taskless_steps function - Added a safety counter to avoid potential infinite loops in cascade processing - Updated loop logic to raise an exception if safety limit is exceeded - Minor formatting adjustments for clarity and consistency
1 parent 05e2388 commit 275f3a6

19 files changed

+2253
-86
lines changed

.claude/settings.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"Bash(./scripts/atlas-migrate-diff:*)",
55
"Bash(./scripts/atlas-migrate-hash:*)",
66
"Bash(./scripts/run-test-with-colors:*)",
7+
"Bash(PGPASSWORD=postgres psql -h 127.0.0.1 -p 50422 -U postgres -d postgres -c:*)",
78
"Bash(bin/run-test-with-colors:*)",
89
"Bash(cat:*)",
910
"Bash(cd:*)",
@@ -51,7 +52,8 @@
5152
"mcp__nx-mcp__nx_docs",
5253
"mcp__nx-mcp__nx_project_details",
5354
"mcp__nx-mcp__nx_workspace",
54-
"mcp__nx-mcp__nx_workspace_path"
55+
"mcp__nx-mcp__nx_workspace_path",
56+
"mcp__sequentialthinking__sequentialthinking"
5557
],
5658
"deny": []
5759
},

PLAN_cascade_complete_taskless_steps.md

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ This reduces cascade calls from 10,000 (every task) to 1 (when step completes)!
105105

106106
### The Cascade Function
107107

108-
Use a simple loop that completes all ready taskless steps:
108+
Use a simple loop that completes all ready taskless steps with safety measures:
109109

110110
```sql
111111
CREATE OR REPLACE FUNCTION pgflow.cascade_complete_taskless_steps(run_id uuid)
@@ -115,8 +115,16 @@ AS $$
115115
DECLARE
116116
v_total_completed int := 0;
117117
v_iteration_completed int;
118+
v_iterations int := 0;
119+
v_max_iterations int := 50; -- Safety limit matching worst-case analysis
118120
BEGIN
119121
LOOP
122+
-- Safety counter to prevent infinite loops
123+
v_iterations := v_iterations + 1;
124+
IF v_iterations > v_max_iterations THEN
125+
RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations;
126+
END IF;
127+
120128
WITH completed AS (
121129
UPDATE pgflow.step_states
122130
SET status = 'completed',
@@ -133,24 +141,25 @@ BEGIN
133141
UPDATE pgflow.step_states ss
134142
SET remaining_deps = ss.remaining_deps - 1
135143
FROM completed c
136-
JOIN pgflow.deps d ON d.flow_slug = c.flow_slug
144+
JOIN pgflow.deps d ON d.flow_slug = c.flow_slug
137145
AND d.dep_slug = c.step_slug
138-
WHERE ss.run_id = c.run_id
146+
WHERE ss.run_id = c.run_id
139147
AND ss.step_slug = d.step_slug
140148
),
141149
-- Send realtime events and update run count...
142150
SELECT COUNT(*) INTO v_iteration_completed FROM completed;
143-
151+
144152
EXIT WHEN v_iteration_completed = 0;
145153
v_total_completed := v_total_completed + v_iteration_completed;
146154
END LOOP;
147-
155+
148156
RETURN v_total_completed;
149157
END;
150158
$$;
151159
```
152160

153161
**Performance**: 50 iterations once per step completion is acceptable
162+
**Safety**: Hard iteration limit prevents infinite loops from logic errors
154163

155164
### Integration Points
156165

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
create or replace function pgflow.cascade_complete_taskless_steps(run_id uuid)
2+
returns int
3+
language plpgsql
4+
set search_path to ''
5+
as $$
6+
declare
7+
v_total_completed int := 0;
8+
v_iteration_completed int;
9+
v_iterations int := 0;
10+
v_max_iterations int := 50; -- Safety limit matching worst-case analysis
11+
begin
12+
loop
13+
-- Safety counter to prevent infinite loops
14+
v_iterations := v_iterations + 1;
15+
if v_iterations > v_max_iterations then
16+
raise exception 'Cascade loop exceeded safety limit of % iterations', v_max_iterations;
17+
end if;
18+
19+
-- Complete all ready taskless steps and update dependencies in one statement
20+
with completed as (
21+
update pgflow.step_states
22+
set status = 'completed',
23+
started_at = now(),
24+
completed_at = now(),
25+
remaining_tasks = 0
26+
where step_states.run_id = cascade_complete_taskless_steps.run_id
27+
and status = 'created'
28+
and remaining_deps = 0
29+
and initial_tasks = 0
30+
returning *
31+
),
32+
dep_updates as (
33+
update pgflow.step_states ss
34+
set remaining_deps = ss.remaining_deps - dep_counts.completed_deps_count,
35+
-- For map dependents of taskless steps, set initial_tasks to 0
36+
initial_tasks = case
37+
when st.step_type = 'map' then 0
38+
else ss.initial_tasks
39+
end
40+
from (
41+
-- Count how many dependencies completed for each dependent step
42+
select
43+
d.step_slug as dependent_step_slug,
44+
count(*) as completed_deps_count
45+
from completed c
46+
join pgflow.deps d on d.flow_slug = c.flow_slug
47+
and d.dep_slug = c.step_slug
48+
where c.run_id = cascade_complete_taskless_steps.run_id
49+
group by d.step_slug
50+
) dep_counts,
51+
pgflow.steps st
52+
where ss.run_id = cascade_complete_taskless_steps.run_id
53+
and ss.step_slug = dep_counts.dependent_step_slug
54+
and st.flow_slug = ss.flow_slug
55+
and st.step_slug = ss.step_slug
56+
),
57+
-- Update runs.remaining_steps
58+
run_updates as (
59+
update pgflow.runs r
60+
set remaining_steps = r.remaining_steps - (
61+
select count(*) from completed c where c.run_id = r.run_id
62+
)
63+
where r.run_id = cascade_complete_taskless_steps.run_id
64+
and exists (select 1 from completed c where c.run_id = r.run_id)
65+
),
66+
-- Send realtime events for all completed steps
67+
events_sent as (
68+
select c.*, realtime.send(
69+
jsonb_build_object(
70+
'event_type', 'step:completed',
71+
'run_id', c.run_id,
72+
'step_slug', c.step_slug,
73+
'status', 'completed',
74+
'started_at', c.started_at,
75+
'completed_at', c.completed_at,
76+
'output', '[]'::jsonb
77+
),
78+
concat('step:', c.step_slug, ':completed'),
79+
concat('pgflow:run:', c.run_id),
80+
false
81+
) as event_id
82+
from completed c
83+
)
84+
select count(*) into v_iteration_completed from events_sent;
85+
86+
exit when v_iteration_completed = 0;
87+
v_total_completed := v_total_completed + v_iteration_completed;
88+
end loop;
89+
90+
return v_total_completed;
91+
end;
92+
$$;

pkgs/core/schemas/0100_function_complete_task.sql

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ set search_path to ''
1111
as $$
1212
declare
1313
v_step_state pgflow.step_states%ROWTYPE;
14+
v_updated_deps int;
1415
begin
1516

1617
WITH run_lock AS (
@@ -28,12 +29,13 @@ task AS (
2829
UPDATE pgflow.step_tasks
2930
SET
3031
status = 'completed',
32+
started_at = COALESCE(started_at, now()),
3133
completed_at = now(),
3234
output = complete_task.output
3335
WHERE pgflow.step_tasks.run_id = complete_task.run_id
3436
AND pgflow.step_tasks.step_slug = complete_task.step_slug
3537
AND pgflow.step_tasks.task_index = complete_task.task_index
36-
AND pgflow.step_tasks.status = 'started'
38+
AND pgflow.step_tasks.status IN ('started', 'queued')
3739
RETURNING *
3840
),
3941
step_state AS (
@@ -52,29 +54,6 @@ step_state AS (
5254
WHERE pgflow.step_states.run_id = complete_task.run_id
5355
AND pgflow.step_states.step_slug = complete_task.step_slug
5456
RETURNING pgflow.step_states.*
55-
),
56-
-- Find all dependent steps if the current step was completed
57-
dependent_steps AS (
58-
SELECT d.step_slug AS dependent_step_slug
59-
FROM pgflow.deps d
60-
JOIN step_state s ON s.status = 'completed' AND d.flow_slug = s.flow_slug
61-
WHERE d.dep_slug = complete_task.step_slug
62-
ORDER BY d.step_slug -- Ensure consistent ordering
63-
),
64-
-- Lock dependent steps before updating
65-
dependent_steps_lock AS (
66-
SELECT * FROM pgflow.step_states
67-
WHERE pgflow.step_states.run_id = complete_task.run_id
68-
AND pgflow.step_states.step_slug IN (SELECT dependent_step_slug FROM dependent_steps)
69-
FOR UPDATE
70-
),
71-
-- Update all dependent steps
72-
dependent_steps_update AS (
73-
UPDATE pgflow.step_states
74-
SET remaining_deps = pgflow.step_states.remaining_deps - 1
75-
FROM dependent_steps
76-
WHERE pgflow.step_states.run_id = complete_task.run_id
77-
AND pgflow.step_states.step_slug = dependent_steps.dependent_step_slug
7857
)
7958
-- Only decrement remaining_steps, don't update status
8059
UPDATE pgflow.runs
@@ -87,6 +66,38 @@ WHERE pgflow.runs.run_id = complete_task.run_id
8766
SELECT * INTO v_step_state FROM pgflow.step_states
8867
WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug;
8968

69+
-- If the step completed, update dependent steps
70+
IF v_step_state.status = 'completed' THEN
71+
-- Update remaining_deps and initial_tasks for dependent steps
72+
UPDATE pgflow.step_states ss
73+
SET remaining_deps = ss.remaining_deps - 1,
74+
initial_tasks = CASE
75+
-- Only update initial_tasks for map dependents
76+
WHEN dep_step.step_type = 'map' THEN
77+
CASE
78+
-- If the completed step is a single step outputting an array
79+
WHEN src_step.step_type = 'single' AND jsonb_typeof(complete_task.output) = 'array' THEN
80+
jsonb_array_length(complete_task.output)
81+
-- If the completed step is a map step
82+
WHEN src_step.step_type = 'map' THEN
83+
v_step_state.initial_tasks
84+
ELSE
85+
ss.initial_tasks
86+
END
87+
ELSE
88+
ss.initial_tasks -- Non-map dependents keep their initial_tasks
89+
END
90+
FROM pgflow.deps d
91+
JOIN pgflow.steps dep_step ON dep_step.flow_slug = v_step_state.flow_slug
92+
AND dep_step.step_slug = d.step_slug
93+
JOIN pgflow.steps src_step ON src_step.flow_slug = v_step_state.flow_slug
94+
AND src_step.step_slug = complete_task.step_slug
95+
WHERE d.flow_slug = v_step_state.flow_slug
96+
AND d.dep_slug = complete_task.step_slug
97+
AND ss.run_id = complete_task.run_id
98+
AND ss.step_slug = d.step_slug;
99+
END IF;
100+
90101
-- Send broadcast event for step completed if the step is completed
91102
IF v_step_state.status = 'completed' THEN
92103
PERFORM realtime.send(
@@ -104,6 +115,9 @@ IF v_step_state.status = 'completed' THEN
104115
);
105116
END IF;
106117

118+
-- Always cascade after completing a task, in case dependent maps became taskless
119+
PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id);
120+
107121
-- For completed tasks: archive the message
108122
PERFORM (
109123
WITH completed_tasks AS (
@@ -132,3 +146,20 @@ WHERE step_task.run_id = complete_task.run_id
132146

133147
end;
134148
$$;
149+
150+
-- Convenience overload that accepts a JSONB task object
151+
create or replace function pgflow.complete_task(
152+
task jsonb,
153+
output jsonb
154+
)
155+
returns setof pgflow.step_tasks
156+
language sql
157+
set search_path to ''
158+
as $$
159+
select * from pgflow.complete_task(
160+
(task->>'run_id')::uuid,
161+
task->>'step_slug',
162+
(task->>'task_index')::int,
163+
output
164+
);
165+
$$;

pkgs/core/schemas/0100_function_start_flow.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,13 @@ PERFORM realtime.send(
9393
false
9494
);
9595

96+
-- First cascade complete any taskless steps, then start ready steps
97+
PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id);
9698
PERFORM pgflow.start_ready_steps(v_created_run.run_id);
9799

100+
-- Check if the run is already complete (all taskless flow)
101+
PERFORM pgflow.maybe_complete_run(v_created_run.run_id);
102+
98103
RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id;
99104

100105
end;

pkgs/core/schemas/0100_function_start_ready_steps.sql

Lines changed: 4 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -4,66 +4,15 @@ language sql
44
set search_path to ''
55
as $$
66

7-
-- First handle empty array map steps (initial_tasks = 0) - direct transition to completed
8-
WITH empty_map_steps AS (
9-
SELECT step_state.*
10-
FROM pgflow.step_states AS step_state
11-
JOIN pgflow.steps AS step
12-
ON step.flow_slug = step_state.flow_slug
13-
AND step.step_slug = step_state.step_slug
14-
WHERE step_state.run_id = start_ready_steps.run_id
15-
AND step_state.status = 'created'
16-
AND step_state.remaining_deps = 0
17-
AND step.step_type = 'map'
18-
AND step_state.initial_tasks = 0
19-
ORDER BY step_state.step_slug
20-
FOR UPDATE OF step_state
21-
),
22-
completed_empty_steps AS (
23-
UPDATE pgflow.step_states
24-
SET status = 'completed',
25-
started_at = now(),
26-
completed_at = now(),
27-
remaining_tasks = 0
28-
FROM empty_map_steps
29-
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
30-
AND pgflow.step_states.step_slug = empty_map_steps.step_slug
31-
RETURNING pgflow.step_states.*
32-
),
33-
broadcast_empty_completed AS (
34-
SELECT
35-
realtime.send(
36-
jsonb_build_object(
37-
'event_type', 'step:completed',
38-
'run_id', completed_step.run_id,
39-
'step_slug', completed_step.step_slug,
40-
'status', 'completed',
41-
'started_at', completed_step.started_at,
42-
'completed_at', completed_step.completed_at,
43-
'remaining_tasks', 0,
44-
'remaining_deps', 0,
45-
'output', '[]'::jsonb
46-
),
47-
concat('step:', completed_step.step_slug, ':completed'),
48-
concat('pgflow:run:', completed_step.run_id),
49-
false
50-
)
51-
FROM completed_empty_steps AS completed_step
52-
),
53-
54-
-- Now handle non-empty steps (both single and map with initial_tasks > 0)
55-
ready_steps AS (
7+
-- Handle all ready steps that have tasks to spawn (initial_tasks > 0)
8+
-- Empty map steps (initial_tasks = 0) are now handled by cascade_complete_taskless_steps
9+
WITH ready_steps AS (
5610
SELECT *
5711
FROM pgflow.step_states AS step_state
5812
WHERE step_state.run_id = start_ready_steps.run_id
5913
AND step_state.status = 'created'
6014
AND step_state.remaining_deps = 0
61-
-- Exclude empty map steps already handled
62-
AND NOT EXISTS (
63-
SELECT 1 FROM empty_map_steps
64-
WHERE empty_map_steps.run_id = step_state.run_id
65-
AND empty_map_steps.step_slug = step_state.step_slug
66-
)
15+
AND step_state.initial_tasks > 0 -- Only handle steps with tasks to spawn
6716
ORDER BY step_state.step_slug
6817
FOR UPDATE
6918
),

0 commit comments

Comments
 (0)