Skip to content

Commit c0fc2a1

Browse files
committed
feat: add root map step support
1 parent 2496a29 commit c0fc2a1

17 files changed

+872
-70
lines changed

PLAN.md

Lines changed: 31 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,48 @@
11
# Map Infrastructure (SQL Core)
22

3+
**NOTE: This PLAN.md file should be removed in the final PR once all map infrastructure is complete.**
4+
35
## Implementation Status
46

57
### Sequential Child PR Plan
68

79
- [x] **PR #207: Add .array() to DSL** - `feature-map-and-array`
810
- TypeScript DSL enhancement for array creation
911
- Foundation for map step functionality
10-
- [x] **PR #208: Foundation - Schema & add_step()** - `09-10-feat_add_map_step_type_in_sql` (CURRENT PR)
11-
12+
- [x] **PR #208: Foundation - Schema & add_step()** - `09-10-feat_add_map_step_type_in_sql`
1213
- Schema changes (initial_tasks, remaining_tasks, constraints)
1314
- add_step() function with map step validation
1415
- Basic tests for map step creation
15-
16-
- [ ] **PR #209: Root Map Support** - `09-11-root-map-support`
16+
- [x] **PR #209: Root Map Support** - `09-11-root-map-support` (COMPLETED)
1717

1818
- Enhanced start_flow() for root map validation and count setting
1919
- Tests for root map scenarios
2020

21-
- [ ] **PR #210: Task Spawning** - `09-12-task-spawning`
21+
- [ ] **Task Spawning**
2222

2323
- Enhanced start_ready_steps() for N task generation
2424
- Empty array auto-completion
2525
- Tests for batch task creation
2626

27-
- [ ] **PR #211: Array Element Extraction** - `09-13-array-extraction`
27+
- [ ] **Array Element Extraction**
2828

2929
- Enhanced start_tasks() for map input extraction
3030
- Support for root and dependent maps
3131
- Tests for element extraction
3232

33-
- [ ] **PR #212: Dependent Map Support** - `09-14-dependent-map`
33+
- [ ] **Dependent Map Support**
3434

3535
- Enhanced complete_task() for map dependency handling
3636
- Array validation and count propagation
3737
- Tests for dependency scenarios
3838

39-
- [ ] **PR #213: Output Aggregation** - `09-15-output-aggregation`
39+
- [ ] **Output Aggregation**
4040

4141
- Enhanced maybe_complete_run() for array aggregation
4242
- Ordered output collection
4343
- Tests for aggregation
4444

45-
- [ ] **PR #214: Integration Tests** - `09-16-integration-tests`
45+
- [ ] **Integration Tests**
4646
- End-to-end test suite
4747
- Edge case coverage
4848
- Performance validation
@@ -97,54 +97,9 @@ The implementation is split across multiple PRs as shown in the Sequential Child
9797

9898
## Database Schema Changes
9999

100-
### Migration Strategy
101-
102-
Using Atlas migrations with the established pkgs/core/scripts workflow:
103-
104-
#### Initial Migration Generation
105-
106-
```bash
107-
# Navigate to pkgs/core directory
108-
cd pkgs/core
109-
110-
# First update schema files in pkgs/core/schemas/, then generate migration
111-
./scripts/atlas-migrate-diff add_map_step_type
112-
113-
# Review generated migration file, then apply and verify
114-
pnpm nx verify-migrations core
115-
```
116-
117-
#### Regenerating Migration After Schema Updates
118-
119-
```bash
120-
# 1. Decide on migration name
121-
migration_name=add_map_step_type
100+
For detailed schema development workflow, migration generation, and regeneration instructions, see:
122101

123-
# 1. Remove the previous version of the migration file
124-
git rm -f supabase/migrations/*_pgflow_${migration_name}.sql
125-
126-
# 2. Reset the Atlas hash to allow regeneration
127-
./scripts/atlas-migrate-hash --yes
128-
129-
# 3. reset database state to pre-migration
130-
pnpm nx supabase:reset core
131-
132-
# 4. Update schema files in pkgs/core/schemas/ as needed (or if already updated, skip this step)
133-
134-
# 5. generate the migration with the same name
135-
./scripts/atlas-migrate-diff ${migration_name}
136-
137-
# 6. verify the migration
138-
pnpm nx verify-migrations core
139-
```
140-
141-
**Key Points:**
142-
143-
- START WITH REMOVING PREVIOUS MIGRATION FILE BEFORE REGENERATING TO AVOID CONFLICTS !!!!
144-
- Always use the same migration name (`add_map_step_type`) for the entire PR
145-
- do not include `pgflow_` prefix when genrating migration - it is included by atlas-migrate-diff automatically
146-
- Reset Atlas hash before regeneration to allow the same name to be used
147-
- This maintains a single, comprehensive migration per PR
102+
- `.claude/schema_development.md` - Concise workflow guide
148103

149104
### Schema Updates (DONE)
150105

@@ -180,7 +135,7 @@ pnpm nx verify-migrations core
180135
- Added validation for map steps (max 1 dependency)
181136
- Function now stores step_type in database
182137

183-
### 2. `start_flow()` - Root Map Count Setting (TODO: PR #209)
138+
### 2. `start_flow()` - Root Map Count Setting (CURRENT PR)
184139

185140
**File**: `pkgs/core/schemas/0100_function_start_flow.sql`
186141

@@ -191,7 +146,7 @@ pnpm nx verify-migrations core
191146
- Set `initial_tasks = jsonb_array_length(input)` for root maps
192147
- Fail with clear error if input is not array for root map
193148

194-
### 3. `complete_task()` - Dependent Map Count Setting (TODO: PR #212)
149+
### 3. `complete_task()` - Dependent Map Count Setting (TODO)
195150

196151
**File**: `pkgs/core/schemas/0100_function_complete_task.sql`
197152

@@ -202,7 +157,7 @@ pnpm nx verify-migrations core
202157
- For map→map: count completed tasks, set `initial_tasks = task_count`
203158
- Fail with clear error if dependency output is not array when needed
204159

205-
### 4. `start_ready_steps()` - Task Spawning (TODO: PR #210)
160+
### 4. `start_ready_steps()` - Task Spawning (TODO)
206161

207162
**File**: `pkgs/core/schemas/0100_function_start_ready_steps.sql`
208163

@@ -218,7 +173,7 @@ pnpm nx verify-migrations core
218173
- Transition directly `created``completed` for initial_tasks=0
219174
- Send single `step:completed` event with `output: []`
220175

221-
### 5. `start_tasks()` - Array Element Extraction (TODO: PR #211)
176+
### 5. `start_tasks()` - Array Element Extraction (TODO)
222177

223178
**File**: `pkgs/core/schemas/0120_function_start_tasks.sql`
224179

@@ -229,7 +184,7 @@ pnpm nx verify-migrations core
229184
- Dependent maps: extract from aggregated dependency output
230185
- Single steps: unchanged behavior (keep existing logic)
231186

232-
### 6. `maybe_complete_run()` - Output Aggregation (TODO: PR #213)
187+
### 6. `maybe_complete_run()` - Output Aggregation (TODO)
233188

234189
**File**: `pkgs/core/schemas/0100_function_maybe_complete_run.sql`
235190

@@ -250,13 +205,26 @@ pnpm nx verify-migrations core
250205
- map_dependency_limit.test.sql
251206
- map_step_with_no_deps.test.sql
252207

253-
**PR #209-214:** Each PR will include its own comprehensive test suite covering:
208+
**PR #209 (CURRENT):**
209+
210+
- root_map_array_validation.test.sql
211+
- root_map_initial_tasks.test.sql
212+
- mixed_step_types.test.sql
213+
- multiple_root_maps.test.sql
214+
- null_input_validation.test.sql
215+
- large_array_handling.test.sql
216+
- nested_array_handling.test.sql
217+
- mixed_type_arrays.test.sql
218+
- invalid_json_types.test.sql
219+
- flow_only_maps.test.sql
220+
221+
**Subsequent PRs:** Each will include its own comprehensive test suite covering:
254222

255223
- Function-specific tests
256224
- Edge cases and error handling
257225
- Integration with existing functionality
258226

259-
**PR #214:** Final integration test suite covering end-to-end workflows
227+
**Final PR:** Integration test suite covering end-to-end workflows
260228

261229
## Edge Cases Handled
262230

pkgs/core/schemas/0060_tables_runtime.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ create table pgflow.step_states (
2727
step_slug text not null,
2828
status text not null default 'created',
2929
remaining_tasks int null, -- NULL = not started, >0 = active countdown
30-
initial_tasks int default 1 check (initial_tasks >= 0), -- Planned task count: 1 for singles, N for maps
30+
initial_tasks int not null default 1 check (initial_tasks >= 0), -- Planned task count: 1 for singles, N for maps
3131
remaining_deps int not null default 0 check (remaining_deps >= 0),
3232
error_message text,
3333
created_at timestamptz not null default now(),

pkgs/core/schemas/0100_function_start_flow.sql

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,36 @@ volatile
1010
as $$
1111
declare
1212
v_created_run pgflow.runs%ROWTYPE;
13+
v_root_map_count int;
1314
begin
1415

16+
-- Check for root map steps and validate input
17+
WITH root_maps AS (
18+
SELECT step_slug
19+
FROM pgflow.steps
20+
WHERE steps.flow_slug = start_flow.flow_slug
21+
AND steps.step_type = 'map'
22+
AND steps.deps_count = 0
23+
)
24+
SELECT COUNT(*) INTO v_root_map_count FROM root_maps;
25+
26+
-- If we have root map steps, validate that input is an array
27+
IF v_root_map_count > 0 THEN
28+
-- First check for NULL (should be caught by NOT NULL constraint, but be defensive)
29+
IF start_flow.input IS NULL THEN
30+
RAISE EXCEPTION 'Flow % has root map steps but input is NULL', start_flow.flow_slug;
31+
END IF;
32+
33+
-- Then check if it's not an array
34+
IF jsonb_typeof(start_flow.input) != 'array' THEN
35+
RAISE EXCEPTION 'Flow % has root map steps but input is not an array (got %)',
36+
start_flow.flow_slug, jsonb_typeof(start_flow.input);
37+
END IF;
38+
END IF;
39+
1540
WITH
1641
flow_steps AS (
17-
SELECT steps.flow_slug, steps.step_slug, steps.deps_count
42+
SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count
1843
FROM pgflow.steps
1944
WHERE steps.flow_slug = start_flow.flow_slug
2045
),
@@ -35,7 +60,19 @@ WITH
3560
(SELECT created_run.run_id FROM created_run),
3661
fs.step_slug,
3762
fs.deps_count,
38-
1 -- For now, all steps get initial_tasks = 1 (single steps)
63+
-- For root map steps (map with no deps), set initial_tasks to array length
64+
-- For all other steps, set initial_tasks to 1
65+
CASE
66+
WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN
67+
CASE
68+
WHEN jsonb_typeof(start_flow.input) = 'array' THEN
69+
jsonb_array_length(start_flow.input)
70+
ELSE
71+
1
72+
END
73+
ELSE
74+
1
75+
END
3976
FROM flow_steps fs
4077
)
4178
SELECT * FROM created_run INTO v_created_run;

pkgs/core/src/database-types.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ export type Database = {
127127
error_message: string | null
128128
failed_at: string | null
129129
flow_slug: string
130-
initial_tasks: number | null
130+
initial_tasks: number
131131
remaining_deps: number
132132
remaining_tasks: number | null
133133
run_id: string
@@ -141,7 +141,7 @@ export type Database = {
141141
error_message?: string | null
142142
failed_at?: string | null
143143
flow_slug: string
144-
initial_tasks?: number | null
144+
initial_tasks?: number
145145
remaining_deps?: number
146146
remaining_tasks?: number | null
147147
run_id: string
@@ -155,7 +155,7 @@ export type Database = {
155155
error_message?: string | null
156156
failed_at?: string | null
157157
flow_slug?: string
158-
initial_tasks?: number | null
158+
initial_tasks?: number
159159
remaining_deps?: number
160160
remaining_tasks?: number | null
161161
run_id?: string
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
-- Modify "step_states" table
2+
ALTER TABLE "pgflow"."step_states" ALTER COLUMN "initial_tasks" SET NOT NULL;
3+
-- Modify "start_flow" function
4+
CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$
5+
declare
6+
v_created_run pgflow.runs%ROWTYPE;
7+
v_root_map_count int;
8+
begin
9+
10+
-- Check for root map steps and validate input
11+
WITH root_maps AS (
12+
SELECT step_slug
13+
FROM pgflow.steps
14+
WHERE steps.flow_slug = start_flow.flow_slug
15+
AND steps.step_type = 'map'
16+
AND steps.deps_count = 0
17+
)
18+
SELECT COUNT(*) INTO v_root_map_count FROM root_maps;
19+
20+
-- If we have root map steps, validate that input is an array
21+
IF v_root_map_count > 0 THEN
22+
-- First check for NULL (should be caught by NOT NULL constraint, but be defensive)
23+
IF start_flow.input IS NULL THEN
24+
RAISE EXCEPTION 'Flow % has root map steps but input is NULL', start_flow.flow_slug;
25+
END IF;
26+
27+
-- Then check if it's not an array
28+
IF jsonb_typeof(start_flow.input) != 'array' THEN
29+
RAISE EXCEPTION 'Flow % has root map steps but input is not an array (got %)',
30+
start_flow.flow_slug, jsonb_typeof(start_flow.input);
31+
END IF;
32+
END IF;
33+
34+
WITH
35+
flow_steps AS (
36+
SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count
37+
FROM pgflow.steps
38+
WHERE steps.flow_slug = start_flow.flow_slug
39+
),
40+
created_run AS (
41+
INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps)
42+
VALUES (
43+
COALESCE(start_flow.run_id, gen_random_uuid()),
44+
start_flow.flow_slug,
45+
start_flow.input,
46+
(SELECT count(*) FROM flow_steps)
47+
)
48+
RETURNING *
49+
),
50+
created_step_states AS (
51+
INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks)
52+
SELECT
53+
fs.flow_slug,
54+
(SELECT created_run.run_id FROM created_run),
55+
fs.step_slug,
56+
fs.deps_count,
57+
-- For root map steps (map with no deps), set initial_tasks to array length
58+
-- For all other steps, set initial_tasks to 1
59+
CASE
60+
WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN
61+
CASE
62+
WHEN jsonb_typeof(start_flow.input) = 'array' THEN
63+
jsonb_array_length(start_flow.input)
64+
ELSE
65+
1
66+
END
67+
ELSE
68+
1
69+
END
70+
FROM flow_steps fs
71+
)
72+
SELECT * FROM created_run INTO v_created_run;
73+
74+
-- Send broadcast event for run started
75+
PERFORM realtime.send(
76+
jsonb_build_object(
77+
'event_type', 'run:started',
78+
'run_id', v_created_run.run_id,
79+
'flow_slug', v_created_run.flow_slug,
80+
'input', v_created_run.input,
81+
'status', 'started',
82+
'remaining_steps', v_created_run.remaining_steps,
83+
'started_at', v_created_run.started_at
84+
),
85+
'run:started',
86+
concat('pgflow:run:', v_created_run.run_id),
87+
false
88+
);
89+
90+
PERFORM pgflow.start_ready_steps(v_created_run.run_id);
91+
92+
RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id;
93+
94+
end;
95+
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:p+C5rFGJ1qtyZhk/6hw+wKNa8qxySt48uRW7nURXhb4=
1+
h1:4npyXHLdBlxXDvZBwovtYW/ZqGaaZprPdyxEsmO4Iz0=
22
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
@@ -9,3 +9,4 @@ h1:p+C5rFGJ1qtyZhk/6hw+wKNa8qxySt48uRW7nURXhb4=
99
20250707210212_pgflow_add_opt_start_delay.sql h1:11J7SDgS6EVFUwxSi0bRZnNQgVGTV0EJGj9yuC0vczY=
1010
20250719205006_pgflow_worker_deprecation.sql h1:L3LDsVrUeABlRBXhHsu60bilfgDKEJHci5xWknH9XIg=
1111
20250912075001_pgflow_temp_pr1_schema.sql h1:zVvGuRX/m8uPFCuJ7iAqOQ71onkCtze6P9d9ZsOgs98=
12+
20250912080800_pgflow_temp_pr2_root_maps.sql h1:v2KdChKBPBOIq3nCVVtKWy1OVcIROV+tPtaTUPQujSo=

0 commit comments

Comments
 (0)