Skip to content

Commit 66e3c3b

Browse files
committed
feat: Add scripts and tests for task spawning and step execution in flow management
- Introduced collect_perf_data.sh for performance testing of large array handling - Updated start_ready_steps function to handle empty map steps and initialize task states - Added migration script to modify start_ready_steps for correct task spawning - Created tests for map step message queueing, delayed message scheduling, and task spawning - Ensured proper handling of initial_tasks, task indices, and step status transitions - Included tests for both map and single steps to verify correct task creation and message dispatching
1 parent c0fc2a1 commit 66e3c3b

File tree

7 files changed

+593
-80
lines changed

7 files changed

+593
-80
lines changed

PLAN.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
- Enhanced start_flow() for root map validation and count setting
1919
- Tests for root map scenarios
2020

21-
- [ ] **Task Spawning**
21+
- [x] **PR #210: Task Spawning** - `09-12-task-spawning` (COMPLETED)
2222

2323
- Enhanced start_ready_steps() for N task generation
2424
- Empty array auto-completion

pkgs/core/schemas/0100_function_start_ready_steps.sql

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,66 @@ language sql
44
set search_path to ''
55
as $$
66

7-
WITH ready_steps AS (
7+
-- First handle empty array map steps (initial_tasks = 0) - direct transition to completed
8+
WITH empty_map_steps AS (
9+
SELECT step_state.*
10+
FROM pgflow.step_states AS step_state
11+
JOIN pgflow.steps AS step
12+
ON step.flow_slug = step_state.flow_slug
13+
AND step.step_slug = step_state.step_slug
14+
WHERE step_state.run_id = start_ready_steps.run_id
15+
AND step_state.status = 'created'
16+
AND step_state.remaining_deps = 0
17+
AND step.step_type = 'map'
18+
AND step_state.initial_tasks = 0
19+
ORDER BY step_state.step_slug
20+
FOR UPDATE OF step_state
21+
),
22+
completed_empty_steps AS (
23+
UPDATE pgflow.step_states
24+
SET status = 'completed',
25+
started_at = now(),
26+
completed_at = now(),
27+
remaining_tasks = 0
28+
FROM empty_map_steps
29+
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
30+
AND pgflow.step_states.step_slug = empty_map_steps.step_slug
31+
RETURNING pgflow.step_states.*
32+
),
33+
broadcast_empty_completed AS (
34+
SELECT
35+
realtime.send(
36+
jsonb_build_object(
37+
'event_type', 'step:completed',
38+
'run_id', completed_step.run_id,
39+
'step_slug', completed_step.step_slug,
40+
'status', 'completed',
41+
'started_at', completed_step.started_at,
42+
'completed_at', completed_step.completed_at,
43+
'remaining_tasks', 0,
44+
'remaining_deps', 0,
45+
'output', '[]'::jsonb
46+
),
47+
concat('step:', completed_step.step_slug, ':completed'),
48+
concat('pgflow:run:', completed_step.run_id),
49+
false
50+
)
51+
FROM completed_empty_steps AS completed_step
52+
),
53+
54+
-- Now handle non-empty steps (both single and map with initial_tasks > 0)
55+
ready_steps AS (
856
SELECT *
957
FROM pgflow.step_states AS step_state
1058
WHERE step_state.run_id = start_ready_steps.run_id
1159
AND step_state.status = 'created'
1260
AND step_state.remaining_deps = 0
61+
-- Exclude empty map steps already handled
62+
AND NOT EXISTS (
63+
SELECT 1 FROM empty_map_steps
64+
WHERE empty_map_steps.run_id = step_state.run_id
65+
AND empty_map_steps.step_slug = step_state.step_slug
66+
)
1367
ORDER BY step_state.step_slug
1468
FOR UPDATE
1569
),
@@ -23,26 +77,48 @@ started_step_states AS (
2377
AND pgflow.step_states.step_slug = ready_steps.step_slug
2478
RETURNING pgflow.step_states.*
2579
),
26-
sent_messages AS (
80+
81+
-- Generate tasks based on initial_tasks count
82+
-- For single steps: initial_tasks = 1, so generate_series(0, 0) = single task with index 0
83+
-- For map steps: initial_tasks = N, so generate_series(0, N-1) = N tasks with indices 0..N-1
84+
-- Group messages by step for batch sending
85+
message_batches AS (
2786
SELECT
2887
started_step.flow_slug,
2988
started_step.run_id,
3089
started_step.step_slug,
31-
pgmq.send(
32-
started_step.flow_slug,
90+
COALESCE(step.opt_start_delay, 0) as delay,
91+
array_agg(
3392
jsonb_build_object(
3493
'flow_slug', started_step.flow_slug,
3594
'run_id', started_step.run_id,
3695
'step_slug', started_step.step_slug,
37-
'task_index', 0
38-
),
39-
COALESCE(step.opt_start_delay, 0)
40-
) AS msg_id
96+
'task_index', task_idx.task_index
97+
) ORDER BY task_idx.task_index
98+
) AS messages,
99+
array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices
41100
FROM started_step_states AS started_step
42101
JOIN pgflow.steps AS step
43102
ON step.flow_slug = started_step.flow_slug
44103
AND step.step_slug = started_step.step_slug
104+
-- Generate task indices from 0 to initial_tasks-1
105+
CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index)
106+
GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay
45107
),
108+
-- Send messages in batch for better performance with large arrays
109+
sent_messages AS (
110+
SELECT
111+
mb.flow_slug,
112+
mb.run_id,
113+
mb.step_slug,
114+
task_indices.task_index,
115+
msg_ids.msg_id
116+
FROM message_batches mb
117+
CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord)
118+
CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord)
119+
WHERE task_indices.idx_ord = msg_ids.msg_ord
120+
),
121+
46122
broadcast_events AS (
47123
SELECT
48124
realtime.send(
@@ -61,11 +137,14 @@ broadcast_events AS (
61137
)
62138
FROM started_step_states AS started_step
63139
)
64-
INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, message_id)
140+
141+
-- Insert all generated tasks with their respective task_index values
142+
INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id)
65143
SELECT
66144
sent_messages.flow_slug,
67145
sent_messages.run_id,
68146
sent_messages.step_slug,
147+
sent_messages.task_index,
69148
sent_messages.msg_id
70149
FROM sent_messages;
71150

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
-- Modify "start_ready_steps" function
2+
CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE sql SET "search_path" = '' AS $$
3+
-- First handle empty array map steps (initial_tasks = 0) - direct transition to completed
4+
WITH empty_map_steps AS (
5+
SELECT step_state.*
6+
FROM pgflow.step_states AS step_state
7+
JOIN pgflow.steps AS step
8+
ON step.flow_slug = step_state.flow_slug
9+
AND step.step_slug = step_state.step_slug
10+
WHERE step_state.run_id = start_ready_steps.run_id
11+
AND step_state.status = 'created'
12+
AND step_state.remaining_deps = 0
13+
AND step.step_type = 'map'
14+
AND step_state.initial_tasks = 0
15+
ORDER BY step_state.step_slug
16+
FOR UPDATE OF step_state
17+
),
18+
completed_empty_steps AS (
19+
UPDATE pgflow.step_states
20+
SET status = 'completed',
21+
started_at = now(),
22+
completed_at = now(),
23+
remaining_tasks = 0
24+
FROM empty_map_steps
25+
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
26+
AND pgflow.step_states.step_slug = empty_map_steps.step_slug
27+
RETURNING pgflow.step_states.*
28+
),
29+
broadcast_empty_completed AS (
30+
SELECT
31+
realtime.send(
32+
jsonb_build_object(
33+
'event_type', 'step:completed',
34+
'run_id', completed_step.run_id,
35+
'step_slug', completed_step.step_slug,
36+
'status', 'completed',
37+
'started_at', completed_step.started_at,
38+
'completed_at', completed_step.completed_at,
39+
'remaining_tasks', 0,
40+
'remaining_deps', 0,
41+
'output', '[]'::jsonb
42+
),
43+
concat('step:', completed_step.step_slug, ':completed'),
44+
concat('pgflow:run:', completed_step.run_id),
45+
false
46+
)
47+
FROM completed_empty_steps AS completed_step
48+
),
49+
50+
-- Now handle non-empty steps (both single and map with initial_tasks > 0)
51+
ready_steps AS (
52+
SELECT *
53+
FROM pgflow.step_states AS step_state
54+
WHERE step_state.run_id = start_ready_steps.run_id
55+
AND step_state.status = 'created'
56+
AND step_state.remaining_deps = 0
57+
-- Exclude empty map steps already handled
58+
AND NOT EXISTS (
59+
SELECT 1 FROM empty_map_steps
60+
WHERE empty_map_steps.run_id = step_state.run_id
61+
AND empty_map_steps.step_slug = step_state.step_slug
62+
)
63+
ORDER BY step_state.step_slug
64+
FOR UPDATE
65+
),
66+
started_step_states AS (
67+
UPDATE pgflow.step_states
68+
SET status = 'started',
69+
started_at = now(),
70+
remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting
71+
FROM ready_steps
72+
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
73+
AND pgflow.step_states.step_slug = ready_steps.step_slug
74+
RETURNING pgflow.step_states.*
75+
),
76+
77+
-- Generate tasks based on initial_tasks count
78+
-- For single steps: initial_tasks = 1, so generate_series(0, 0) = single task with index 0
79+
-- For map steps: initial_tasks = N, so generate_series(0, N-1) = N tasks with indices 0..N-1
80+
-- Group messages by step for batch sending
81+
message_batches AS (
82+
SELECT
83+
started_step.flow_slug,
84+
started_step.run_id,
85+
started_step.step_slug,
86+
COALESCE(step.opt_start_delay, 0) as delay,
87+
array_agg(
88+
jsonb_build_object(
89+
'flow_slug', started_step.flow_slug,
90+
'run_id', started_step.run_id,
91+
'step_slug', started_step.step_slug,
92+
'task_index', task_idx.task_index
93+
) ORDER BY task_idx.task_index
94+
) AS messages,
95+
array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices
96+
FROM started_step_states AS started_step
97+
JOIN pgflow.steps AS step
98+
ON step.flow_slug = started_step.flow_slug
99+
AND step.step_slug = started_step.step_slug
100+
-- Generate task indices from 0 to initial_tasks-1
101+
CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index)
102+
GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay
103+
),
104+
-- Send messages in batch for better performance with large arrays
105+
sent_messages AS (
106+
SELECT
107+
mb.flow_slug,
108+
mb.run_id,
109+
mb.step_slug,
110+
task_indices.task_index,
111+
msg_ids.msg_id
112+
FROM message_batches mb
113+
CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord)
114+
CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord)
115+
WHERE task_indices.idx_ord = msg_ids.msg_ord
116+
),
117+
118+
broadcast_events AS (
119+
SELECT
120+
realtime.send(
121+
jsonb_build_object(
122+
'event_type', 'step:started',
123+
'run_id', started_step.run_id,
124+
'step_slug', started_step.step_slug,
125+
'status', 'started',
126+
'started_at', started_step.started_at,
127+
'remaining_tasks', started_step.remaining_tasks,
128+
'remaining_deps', started_step.remaining_deps
129+
),
130+
concat('step:', started_step.step_slug, ':started'),
131+
concat('pgflow:run:', started_step.run_id),
132+
false
133+
)
134+
FROM started_step_states AS started_step
135+
)
136+
137+
-- Insert all generated tasks with their respective task_index values
138+
INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id)
139+
SELECT
140+
sent_messages.flow_slug,
141+
sent_messages.run_id,
142+
sent_messages.step_slug,
143+
sent_messages.task_index,
144+
sent_messages.msg_id
145+
FROM sent_messages;
146+
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:4npyXHLdBlxXDvZBwovtYW/ZqGaaZprPdyxEsmO4Iz0=
1+
h1:B1TUYLUWFgJLPH1+HbGlvevYntnO8YIG4ysa9K1dKkE=
22
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
@@ -10,3 +10,4 @@ h1:4npyXHLdBlxXDvZBwovtYW/ZqGaaZprPdyxEsmO4Iz0=
1010
20250719205006_pgflow_worker_deprecation.sql h1:L3LDsVrUeABlRBXhHsu60bilfgDKEJHci5xWknH9XIg=
1111
20250912075001_pgflow_temp_pr1_schema.sql h1:zVvGuRX/m8uPFCuJ7iAqOQ71onkCtze6P9d9ZsOgs98=
1212
20250912080800_pgflow_temp_pr2_root_maps.sql h1:v2KdChKBPBOIq3nCVVtKWy1OVcIROV+tPtaTUPQujSo=
13+
20250912125339_pgflow_TEMP_task_spawning_optimization.sql h1:HTSShQweuTS1Sz5q/KLy5XW3J/6D/mA6jjVpCfvjBto=

0 commit comments

Comments
 (0)