Skip to content

Commit 6d7f91f

Browse files
committed
feat: add map step type in sql
1 parent 08a197e commit 6d7f91f

16 files changed

+1091
-416
lines changed

PLAN.md

Lines changed: 575 additions & 362 deletions
Large diffs are not rendered by default.

pkgs/core/schemas/0050_tables_definitions.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ create table pgflow.steps (
2828
primary key (flow_slug, step_slug),
2929
unique (flow_slug, step_index), -- Ensure step_index is unique within a flow
3030
check (pgflow.is_valid_slug(step_slug)),
31-
check (step_type in ('single')),
31+
check (step_type in ('single', 'map')),
3232
constraint opt_max_attempts_is_nonnegative check (opt_max_attempts is null or opt_max_attempts >= 0),
3333
constraint opt_base_delay_is_nonnegative check (opt_base_delay is null or opt_base_delay >= 0),
3434
constraint opt_timeout_is_positive check (opt_timeout is null or opt_timeout > 0),

pkgs/core/schemas/0060_tables_runtime.sql

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ create table pgflow.step_states (
2626
run_id uuid not null references pgflow.runs (run_id),
2727
step_slug text not null,
2828
status text not null default 'created',
29-
remaining_tasks int not null default 1 check (remaining_tasks >= 0),
29+
remaining_tasks int NULL, -- NULL = not started, >0 = active countdown
30+
initial_tasks int DEFAULT 1 CHECK (initial_tasks >= 0), -- Planned task count: 1 for singles, N for maps
3031
remaining_deps int not null default 0 check (remaining_deps >= 0),
3132
error_message text,
3233
created_at timestamptz not null default now(),
@@ -38,6 +39,10 @@ create table pgflow.step_states (
3839
references pgflow.steps (flow_slug, step_slug),
3940
constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed')),
4041
constraint status_and_remaining_tasks_match check (status != 'completed' or remaining_tasks = 0),
42+
-- Add constraint to ensure remaining_tasks is only set when step has started
43+
CONSTRAINT remaining_tasks_state_consistency CHECK (
44+
remaining_tasks IS NULL OR status != 'created'
45+
),
4146
constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)),
4247
constraint started_at_is_after_created_at check (started_at is null or started_at >= created_at),
4348
constraint completed_at_is_after_started_at check (completed_at is null or completed_at >= started_at),
@@ -76,7 +81,6 @@ create table pgflow.step_tasks (
7681
constraint output_valid_only_for_completed check (
7782
output is null or status = 'completed'
7883
),
79-
constraint only_single_task_per_step check (task_index = 0),
8084
constraint attempts_count_nonnegative check (attempts_count >= 0),
8185
constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)),
8286
constraint completed_at_is_after_queued_at check (completed_at is null or completed_at >= queued_at),
Lines changed: 52 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,65 @@
11
create or replace function pgflow.add_step(
22
flow_slug text,
33
step_slug text,
4-
deps_slugs text [],
4+
deps_slugs text [] default '{}',
55
max_attempts int default null,
66
base_delay int default null,
77
timeout int default null,
8-
start_delay int default null
8+
start_delay int default null,
9+
step_type text default 'single'
910
)
1011
returns pgflow.steps
11-
language sql
12+
language plpgsql
1213
set search_path to ''
1314
volatile
1415
as $$
15-
WITH
16-
next_index AS (
17-
SELECT COALESCE(MAX(step_index) + 1, 0) as idx
18-
FROM pgflow.steps
19-
WHERE flow_slug = add_step.flow_slug
20-
),
21-
create_step AS (
22-
INSERT INTO pgflow.steps (flow_slug, step_slug, step_index, deps_count, opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay)
23-
SELECT add_step.flow_slug, add_step.step_slug, idx, COALESCE(array_length(deps_slugs, 1), 0), max_attempts, base_delay, timeout, start_delay
24-
FROM next_index
25-
ON CONFLICT (flow_slug, step_slug)
26-
DO UPDATE SET step_slug = pgflow.steps.step_slug
27-
RETURNING *
28-
),
29-
insert_deps AS (
30-
INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
31-
SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug
32-
FROM unnest(deps_slugs) AS d(dep_slug)
33-
ON CONFLICT (flow_slug, dep_slug, step_slug) DO NOTHING
34-
RETURNING 1
16+
DECLARE
17+
result_step pgflow.steps;
18+
next_idx int;
19+
BEGIN
20+
-- Validate map step constraints
21+
-- Map steps can have either:
22+
-- 0 dependencies (root map - maps over flow input array)
23+
-- 1 dependency (dependent map - maps over dependency output array)
24+
IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN
25+
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
26+
add_step.step_slug,
27+
COALESCE(array_length(add_step.deps_slugs, 1), 0),
28+
array_to_string(add_step.deps_slugs, ', ');
29+
END IF;
30+
31+
-- Get next step index
32+
SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx
33+
FROM pgflow.steps s
34+
WHERE s.flow_slug = add_step.flow_slug;
35+
36+
-- Create the step
37+
INSERT INTO pgflow.steps (
38+
flow_slug, step_slug, step_type, step_index, deps_count,
39+
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay
3540
)
36-
-- Return the created step
37-
SELECT * FROM create_step;
38-
$$;
41+
VALUES (
42+
add_step.flow_slug,
43+
add_step.step_slug,
44+
COALESCE(add_step.step_type, 'single'),
45+
next_idx,
46+
COALESCE(array_length(add_step.deps_slugs, 1), 0),
47+
add_step.max_attempts,
48+
add_step.base_delay,
49+
add_step.timeout,
50+
add_step.start_delay
51+
)
52+
ON CONFLICT ON CONSTRAINT steps_pkey
53+
DO UPDATE SET step_slug = EXCLUDED.step_slug
54+
RETURNING * INTO result_step;
3955

40-
-- New overloaded function without deps_slugs parameter
41-
create or replace function pgflow.add_step(
42-
flow_slug text,
43-
step_slug text,
44-
max_attempts int default null,
45-
base_delay int default null,
46-
timeout int default null,
47-
start_delay int default null
48-
)
49-
returns pgflow.steps
50-
language sql
51-
set search_path to ''
52-
volatile
53-
as $$
54-
-- Call the original function with an empty array
55-
SELECT * FROM pgflow.add_step(flow_slug, step_slug, ARRAY[]::text[], max_attempts, base_delay, timeout, start_delay);
56-
$$;
56+
-- Insert dependencies
57+
INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
58+
SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug
59+
FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug)
60+
WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0
61+
ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;
62+
63+
RETURN result_step;
64+
END;
65+
$$;

pkgs/core/schemas/0100_function_start_flow.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ WITH
2929
RETURNING *
3030
),
3131
created_step_states AS (
32-
INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps)
32+
INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks)
3333
SELECT
3434
fs.flow_slug,
3535
(SELECT created_run.run_id FROM created_run),
3636
fs.step_slug,
37-
fs.deps_count
37+
fs.deps_count,
38+
1 -- For now, all steps get initial_tasks = 1 (single steps)
3839
FROM flow_steps fs
3940
)
4041
SELECT * FROM created_run INTO v_created_run;

pkgs/core/schemas/0100_function_start_ready_steps.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ WITH ready_steps AS (
1616
started_step_states AS (
1717
UPDATE pgflow.step_states
1818
SET status = 'started',
19-
started_at = now()
19+
started_at = now(),
20+
remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting
2021
FROM ready_steps
2122
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
2223
AND pgflow.step_states.step_slug = ready_steps.step_slug
@@ -51,7 +52,7 @@ broadcast_events AS (
5152
'step_slug', started_step.step_slug,
5253
'status', 'started',
5354
'started_at', started_step.started_at,
54-
'remaining_tasks', 1,
55+
'remaining_tasks', started_step.remaining_tasks,
5556
'remaining_deps', started_step.remaining_deps
5657
),
5758
concat('step:', started_step.step_slug, ':started'),
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
-- Modify "step_states" table
2+
ALTER TABLE "pgflow"."step_states" DROP CONSTRAINT "step_states_remaining_tasks_check", ADD CONSTRAINT "remaining_tasks_state_consistency" CHECK ((remaining_tasks IS NULL) OR (status <> 'created'::text)), ADD CONSTRAINT "step_states_initial_tasks_check" CHECK (initial_tasks >= 0), ALTER COLUMN "remaining_tasks" DROP NOT NULL, ALTER COLUMN "remaining_tasks" DROP DEFAULT, ADD COLUMN "initial_tasks" integer NULL DEFAULT 1;
3+
-- Modify "step_tasks" table
4+
ALTER TABLE "pgflow"."step_tasks" DROP CONSTRAINT "only_single_task_per_step";
5+
-- Modify "steps" table
6+
ALTER TABLE "pgflow"."steps" DROP CONSTRAINT "steps_step_type_check", ADD CONSTRAINT "steps_step_type_check" CHECK (step_type = ANY (ARRAY['single'::text, 'map'::text]));
7+
-- Modify "start_ready_steps" function
8+
CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE sql SET "search_path" = '' AS $$
9+
WITH ready_steps AS (
10+
SELECT *
11+
FROM pgflow.step_states AS step_state
12+
WHERE step_state.run_id = start_ready_steps.run_id
13+
AND step_state.status = 'created'
14+
AND step_state.remaining_deps = 0
15+
ORDER BY step_state.step_slug
16+
FOR UPDATE
17+
),
18+
started_step_states AS (
19+
UPDATE pgflow.step_states
20+
SET status = 'started',
21+
started_at = now(),
22+
remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting
23+
FROM ready_steps
24+
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
25+
AND pgflow.step_states.step_slug = ready_steps.step_slug
26+
RETURNING pgflow.step_states.*
27+
),
28+
sent_messages AS (
29+
SELECT
30+
started_step.flow_slug,
31+
started_step.run_id,
32+
started_step.step_slug,
33+
pgmq.send(
34+
started_step.flow_slug,
35+
jsonb_build_object(
36+
'flow_slug', started_step.flow_slug,
37+
'run_id', started_step.run_id,
38+
'step_slug', started_step.step_slug,
39+
'task_index', 0
40+
),
41+
COALESCE(step.opt_start_delay, 0)
42+
) AS msg_id
43+
FROM started_step_states AS started_step
44+
JOIN pgflow.steps AS step
45+
ON step.flow_slug = started_step.flow_slug
46+
AND step.step_slug = started_step.step_slug
47+
),
48+
broadcast_events AS (
49+
SELECT
50+
realtime.send(
51+
jsonb_build_object(
52+
'event_type', 'step:started',
53+
'run_id', started_step.run_id,
54+
'step_slug', started_step.step_slug,
55+
'status', 'started',
56+
'started_at', started_step.started_at,
57+
'remaining_tasks', started_step.remaining_tasks,
58+
'remaining_deps', started_step.remaining_deps
59+
),
60+
concat('step:', started_step.step_slug, ':started'),
61+
concat('pgflow:run:', started_step.run_id),
62+
false
63+
)
64+
FROM started_step_states AS started_step
65+
)
66+
INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, message_id)
67+
SELECT
68+
sent_messages.flow_slug,
69+
sent_messages.run_id,
70+
sent_messages.step_slug,
71+
sent_messages.msg_id
72+
FROM sent_messages;
73+
$$;
74+
-- Modify "start_flow" function
75+
CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$
76+
declare
77+
v_created_run pgflow.runs%ROWTYPE;
78+
begin
79+
80+
WITH
81+
flow_steps AS (
82+
SELECT steps.flow_slug, steps.step_slug, steps.deps_count
83+
FROM pgflow.steps
84+
WHERE steps.flow_slug = start_flow.flow_slug
85+
),
86+
created_run AS (
87+
INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps)
88+
VALUES (
89+
COALESCE(start_flow.run_id, gen_random_uuid()),
90+
start_flow.flow_slug,
91+
start_flow.input,
92+
(SELECT count(*) FROM flow_steps)
93+
)
94+
RETURNING *
95+
),
96+
created_step_states AS (
97+
INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks)
98+
SELECT
99+
fs.flow_slug,
100+
(SELECT created_run.run_id FROM created_run),
101+
fs.step_slug,
102+
fs.deps_count,
103+
1 -- For now, all steps get initial_tasks = 1 (single steps)
104+
FROM flow_steps fs
105+
)
106+
SELECT * FROM created_run INTO v_created_run;
107+
108+
-- Send broadcast event for run started
109+
PERFORM realtime.send(
110+
jsonb_build_object(
111+
'event_type', 'run:started',
112+
'run_id', v_created_run.run_id,
113+
'flow_slug', v_created_run.flow_slug,
114+
'input', v_created_run.input,
115+
'status', 'started',
116+
'remaining_steps', v_created_run.remaining_steps,
117+
'started_at', v_created_run.started_at
118+
),
119+
'run:started',
120+
concat('pgflow:run:', v_created_run.run_id),
121+
false
122+
);
123+
124+
PERFORM pgflow.start_ready_steps(v_created_run.run_id);
125+
126+
RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id;
127+
128+
end;
129+
$$;
130+
-- Create "add_step" function
131+
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
132+
DECLARE
133+
result_step pgflow.steps;
134+
next_idx int;
135+
BEGIN
136+
-- Validate map step constraints
137+
-- Map steps can have either:
138+
-- 0 dependencies (root map - maps over flow input array)
139+
-- 1 dependency (dependent map - maps over dependency output array)
140+
IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN
141+
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
142+
add_step.step_slug,
143+
COALESCE(array_length(add_step.deps_slugs, 1), 0),
144+
array_to_string(add_step.deps_slugs, ', ');
145+
END IF;
146+
147+
-- Get next step index
148+
SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx
149+
FROM pgflow.steps s
150+
WHERE s.flow_slug = add_step.flow_slug;
151+
152+
-- Create the step
153+
INSERT INTO pgflow.steps (
154+
flow_slug, step_slug, step_type, step_index, deps_count,
155+
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay
156+
)
157+
VALUES (
158+
add_step.flow_slug,
159+
add_step.step_slug,
160+
COALESCE(add_step.step_type, 'single'),
161+
next_idx,
162+
COALESCE(array_length(add_step.deps_slugs, 1), 0),
163+
add_step.max_attempts,
164+
add_step.base_delay,
165+
add_step.timeout,
166+
add_step.start_delay
167+
)
168+
ON CONFLICT ON CONSTRAINT steps_pkey
169+
DO UPDATE SET step_slug = EXCLUDED.step_slug
170+
RETURNING * INTO result_step;
171+
172+
-- Insert dependencies
173+
INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
174+
SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug
175+
FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug)
176+
WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0
177+
ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;
178+
179+
RETURN result_step;
180+
END;
181+
$$;
182+
-- Drop "add_step" function
183+
DROP FUNCTION "pgflow"."add_step" (text, text, integer, integer, integer, integer);
184+
-- Drop "add_step" function
185+
DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer);

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:Ihi9DpliOFojbU4NqWbrFghYonnGhi61w1oX+y7Bd50=
1+
h1:kpo0hM5pNqe24N7+tWQ62QVsVmgdd1sIdSmb2f1yaqA=
22
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
@@ -8,3 +8,4 @@ h1:Ihi9DpliOFojbU4NqWbrFghYonnGhi61w1oX+y7Bd50=
88
20250627090700_pgflow_fix_function_search_paths.sql h1:NRMbmDKkOww7pOx1TVERMP5UdjmgfH0wE9QhzfBU3co=
99
20250707210212_pgflow_add_opt_start_delay.sql h1:11J7SDgS6EVFUwxSi0bRZnNQgVGTV0EJGj9yuC0vczY=
1010
20250719205006_pgflow_worker_deprecation.sql h1:L3LDsVrUeABlRBXhHsu60bilfgDKEJHci5xWknH9XIg=
11+
20250911062632_pgflow_add_map_step_type.sql h1:3kygk7tr92R3ExyHUTZl5Sg9D2CzVCLyTIK6+ogeJy4=

0 commit comments

Comments
 (0)