Skip to content

Commit 3de38b1

Browse files
committed
chore: make initial_tasks NULL by default to indicate 'unknown yet'
1 parent 474b8fd commit 3de38b1

15 files changed

+1126
-77
lines changed

PLAN.md

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,16 @@
8989

9090
#### ❌ Remaining Work
9191

92+
- [ ] **Semantic Improvement: NULL for Unknown initial_tasks**
93+
94+
- Change initial_tasks from "1 as placeholder" to NULL for dependent map steps
95+
- Benefits: Semantic correctness (NULL = unknown, not "1 task")
96+
- Scope: Schema change to allow NULL, update 5+ SQL functions
97+
- See detailed plan in `pkgs/core/PLAN_use_null_for_map_initial_tasks.md`
98+
- **Note**: This is a semantic improvement only - current approach works functionally
99+
But it is important to have only valid states and being explicit, that's why
100+
we are doing this change first.
101+
92102
- [ ] **Array Element Distribution** (CRITICAL - BLOCKS REAL MAP USAGE)
93103

94104
- Enhanced start_tasks() to distribute array elements to map tasks
@@ -102,6 +112,8 @@
102112
- Store aggregated output for dependent steps to consume
103113
- Maintain task_index ordering in aggregated arrays
104114
- Tests for aggregation with actual map task outputs
115+
- **IMPORTANT**: Must add test for map->map NULL propagation when this is implemented
116+
- **IMPORTANT**: Must handle non-array outputs to map steps (should fail the run)
105117

106118
- [ ] **DSL Support for .map() Step Type**
107119

@@ -124,23 +136,41 @@
124136
- Basic happy path coverage
125137
- This should be minimal and added to the Edge Worker integration test suite for now
126138

127-
- [ ] **Semantic Improvement: NULL for Unknown initial_tasks** (OPTIONAL - Can be deferred)
128-
129-
- Change initial_tasks from "1 as placeholder" to NULL for dependent map steps
130-
- Benefits: Semantic correctness (NULL = unknown, not "1 task")
131-
- Scope: Schema change to allow NULL, update 5+ SQL functions
132-
- See detailed plan in `pkgs/core/PLAN_use_null_for_map_initial_tasks.md`
133-
- **Note**: This is a semantic improvement only - current approach works functionally
134-
- **Warning**: If deferred, new tests for Array Distribution and Output Aggregation will
135-
assume initial_tasks = 1 for dependent maps, making this change harder later
136-
137139
- [ ] **Migration Consolidation**
138140

139141
- Remove all temporary/incremental migrations from feature branches
140142
- Generate a single consolidated migration for the entire map infrastructure
141143
- Ensure clean migration path from current production schema
142144
- If NULL improvement is done, include it in the consolidated migration
143145

146+
- [ ] **Update README's** and **Docs**
147+
148+
- `pkgs/core/README.md`
149+
150+
- Add new section describing the step types
151+
- Describe single step briefly, focus on describing map step type and how it differs
152+
- Make sure to mention that maps are constrained to have exactly one dependency
153+
- Show multiple cases of inputs -> task creation
154+
- Explain edge cases (empty array propagation, invalid array input)
155+
- Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input
156+
- Explain cascade completion of taskless steps and its limitations
157+
158+
- `pkgs/dsl/README.md`
159+
160+
- Briefly describe the new `.array()` and `.map()` methods
161+
- Mention `.array` is mostly sugar, and `.map` is the new step type
162+
- Link to `pkgs/core/README.md` sections for more explanation about map steps
163+
- Make sure to mention that maps are constrained to have exactly one dependency
164+
165+
- **Add basic docs page**
166+
167+
- put it into `pkgs/website/src/content/docs/concepts/array-and-map-steps.mdx`
168+
- describe the DSL and how the map works and why we need it
169+
- show example usage of root map
170+
- show example usage of dependent map
171+
- focus mostly on how to use it, instead of how it works under the hood
172+
- link to the README's for more details
173+
144174
- [ ] **Graphite Stack Merge**
145175

146176
- Configure Graphite merge queue for the complete PR stack

pkgs/core/PLAN_use_null_for_map_initial_tasks.md renamed to PLAN_use_null_for_map_initial_tasks.md

Lines changed: 25 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,31 @@
11
# Plan: Use NULL for Unknown initial_tasks in Dependent Map Steps
22

33
## Motivation
4+
45
Currently, dependent map steps have `initial_tasks = 1` as a placeholder until their dependencies complete. This is semantically incorrect and confusing:
6+
57
- `1` implies "will spawn exactly 1 task" but that's false
68
- `NULL` correctly means "unknown until dependencies complete"
79
- Reduces cognitive load - see NULL, know it's unknown
810

911
## Critical Considerations
12+
1013
Before implementing, these issues must be addressed:
1114

12-
### 1. ~~The "Last Dependency" Problem~~ RESOLVED
13-
**Map steps can have at most 1 dependency** (enforced in add_step.sql:24):
14-
```sql
15-
-- This constraint simplifies everything:
16-
IF step_type = 'map' AND array_length(deps_slugs, 1) > 1 THEN
17-
RAISE EXCEPTION 'Map step can have at most one dependency'
18-
```
19-
This means we always know exactly when to resolve `initial_tasks` - when the single dependency completes!
15+
### 1. Non-Array Output Handling
2016

21-
### 2. Non-Array Output Handling
2217
What if a dependency doesn't produce an array?
23-
```sql
24-
-- Must handle both cases:
25-
CASE
26-
WHEN jsonb_typeof(output) = 'array' THEN jsonb_array_length(output)
27-
ELSE 1 -- Treat non-array as single item to map
28-
END
29-
```
3018

31-
### 3. ~~Race Condition Prevention~~ RESOLVED
32-
Since map steps have at most 1 dependency, no race conditions possible!
33-
The single dependency completes once, updates initial_tasks atomically.
19+
We should chat about how to handle this case, as it is an invalid state
20+
and we need to handle it somehow.
21+
22+
We should probably raise an explicit, well versed and detailed error,
23+
so user's flow will fail early.
24+
25+
### 2. The Start Ready Steps Problem
3426

35-
### 4. The Start Ready Steps Problem
3627
**CRITICAL**: We cannot use COALESCE - steps must NOT start with NULL initial_tasks
28+
3729
```sql
3830
-- WRONG: COALESCE(initial_tasks, 1)
3931
-- RIGHT: Add assertion before starting
@@ -45,6 +37,7 @@ END IF;
4537
## Implementation Plan
4638

4739
### 1. Schema Change
40+
4841
```sql
4942
-- In 0060_tables_runtime.sql
5043
ALTER TABLE pgflow.step_states
@@ -59,6 +52,7 @@ ADD CONSTRAINT step_states_initial_tasks_check
5952
```
6053

6154
### 2. Update start_flow Function
55+
6256
```sql
6357
-- In 0100_function_start_flow.sql
6458
-- Change initial_tasks assignment logic:
@@ -81,6 +75,7 @@ END
8175
```
8276

8377
### 3. Update start_ready_steps Function
78+
8479
```sql
8580
-- In 0100_function_start_ready_steps.sql
8681

@@ -101,6 +96,7 @@ CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1)
10196
```
10297

10398
### 4. Update complete_task Function
99+
104100
```sql
105101
-- In 0100_function_complete_task.sql
106102

@@ -121,6 +117,7 @@ END
121117
```
122118

123119
### 5. Update cascade_complete_taskless_steps Function
120+
124121
```sql
125122
-- In 0100_function_cascade_complete_taskless_steps.sql
126123

@@ -136,6 +133,7 @@ END
136133
```
137134

138135
### 6. Add Safety Assertions
136+
139137
```sql
140138
-- Add check constraint or trigger to ensure:
141139
-- When status changes from 'created' to 'started',
@@ -150,46 +148,31 @@ ADD CONSTRAINT initial_tasks_known_when_started
150148
```
151149

152150
### 7. Update Tests
151+
153152
- Update test expectations to check for NULL instead of 1
154153
- Add specific tests for NULL -> actual value transitions
155154
- Test that steps can't start with NULL initial_tasks
156155

157-
### 8. Migration Strategy
158-
```sql
159-
-- Create migration to update existing data:
160-
UPDATE pgflow.step_states
161-
SET initial_tasks = NULL
162-
WHERE step_slug IN (
163-
SELECT s.step_slug
164-
FROM pgflow.steps s
165-
WHERE s.step_type = 'map'
166-
AND EXISTS (
167-
SELECT 1 FROM pgflow.deps d
168-
WHERE d.flow_slug = s.flow_slug
169-
AND d.step_slug = s.step_slug
170-
)
171-
)
172-
AND status = 'created';
173-
```
174-
175156
## Benefits
157+
176158
1. **Semantic correctness**: NULL = unknown, not "1 task" placeholder
177159
2. **Clearer mental model**: No translation needed when reading state
178160
3. **Easier debugging**: Can immediately see which values are unresolved
179-
4. **Type safety**: TypeScript `number | null` enforces proper handling
180-
5. **Simpler than expected**: Map steps having max 1 dependency eliminates complexity
181161

182162
## Simplified Implementation Path
163+
183164
Since map steps can only have 0 or 1 dependency:
165+
184166
1. Root maps (0 deps): Get initial_tasks from flow input immediately
185167
2. Dependent maps (1 dep): Start with NULL, resolve when dependency completes
186168
3. No multi-dependency complexity or race conditions!
187169

188170
## Testing Checklist
171+
189172
- [ ] Root map steps get correct initial_tasks from input array
190173
- [ ] Dependent map steps start with NULL initial_tasks
191174
- [ ] Single -> Map updates NULL to array length
192175
- [ ] Map -> Map updates NULL to aggregated array length (future)
193176
- [ ] Empty array propagation sets 0, not NULL
194-
- [ ] Steps cannot start with NULL initial_tasks
195-
- [ ] All arithmetic operations handle NULL safely
177+
- [ ] 'single' steps must start with `initial_tasks = 1`
178+
- [ ] If any arithmetic operations on initial_tasks, verify if they handle NULL safely

pkgs/core/schemas/0060_tables_runtime.sql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ create table pgflow.step_states (
2727
step_slug text not null,
2828
status text not null default 'created',
2929
remaining_tasks int null, -- NULL = not started, >0 = active countdown
30-
initial_tasks int not null default 1 check (initial_tasks >= 0), -- Planned task count: 1 for singles, N for maps
30+
initial_tasks int null check (initial_tasks is null or initial_tasks >= 0),
3131
remaining_deps int not null default 0 check (remaining_deps >= 0),
3232
error_message text,
3333
created_at timestamptz not null default now(),
@@ -43,6 +43,9 @@ create table pgflow.step_states (
4343
constraint remaining_tasks_state_consistency check (
4444
remaining_tasks is null or status != 'created'
4545
),
46+
constraint initial_tasks_known_when_started check (
47+
status != 'started' or initial_tasks is not null
48+
),
4649
constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)),
4750
constraint started_at_is_after_created_at check (started_at is null or started_at >= created_at),
4851
constraint completed_at_is_after_started_at check (completed_at is null or completed_at >= started_at),

pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ BEGIN
4040
-- set its initial_tasks to 0 as well
4141
initial_tasks = CASE
4242
WHEN s.step_type = 'map' AND dep_count.has_zero_tasks
43-
THEN 0
44-
ELSE ss.initial_tasks
43+
THEN 0 -- Empty array propagation
44+
ELSE ss.initial_tasks -- Keep existing value (including NULL)
4545
END
4646
FROM (
4747
-- Count how many completed steps are dependencies of each dependent

pkgs/core/schemas/0100_function_complete_task.sql

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,32 @@ set search_path to ''
1111
as $$
1212
declare
1313
v_step_state pgflow.step_states%ROWTYPE;
14+
v_dependent_map_slug text;
1415
begin
1516

17+
-- Check if output is non-array for dependent map steps
18+
-- This validation must happen BEFORE acquiring locks and making updates
19+
-- to fail fast without holding resources or starting changes
20+
SELECT ds.step_slug INTO v_dependent_map_slug
21+
FROM pgflow.deps d
22+
JOIN pgflow.steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.step_slug
23+
JOIN pgflow.step_states ss ON ss.flow_slug = ds.flow_slug AND ss.step_slug = ds.step_slug
24+
WHERE d.dep_slug = complete_task.step_slug
25+
AND d.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id)
26+
AND ds.step_type = 'map'
27+
AND ss.run_id = complete_task.run_id
28+
AND ss.initial_tasks IS NULL
29+
AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array')
30+
LIMIT 1;
31+
32+
IF v_dependent_map_slug IS NOT NULL THEN
33+
RAISE EXCEPTION 'Map step % expects array input but dependency % produced % (output: %)',
34+
v_dependent_map_slug,
35+
complete_task.step_slug,
36+
CASE WHEN complete_task.output IS NULL THEN 'null' ELSE jsonb_typeof(complete_task.output) END,
37+
complete_task.output;
38+
END IF;
39+
1640
WITH run_lock AS (
1741
SELECT * FROM pgflow.runs
1842
WHERE pgflow.runs.run_id = complete_task.run_id
@@ -72,11 +96,15 @@ dependent_steps_lock AS (
7296
dependent_steps_update AS (
7397
UPDATE pgflow.step_states ss
7498
SET remaining_deps = ss.remaining_deps - 1,
75-
-- For map dependents of single steps producing arrays, set initial_tasks
99+
-- NEW: Resolve NULL initial_tasks for dependent map steps
76100
initial_tasks = CASE
77-
WHEN s.step_type = 'map' AND jsonb_typeof(complete_task.output) = 'array'
78-
THEN jsonb_array_length(complete_task.output)
79-
ELSE ss.initial_tasks
101+
WHEN s.step_type = 'map' AND ss.initial_tasks IS NULL
102+
AND complete_task.output IS NOT NULL
103+
AND jsonb_typeof(complete_task.output) = 'array' THEN
104+
-- Resolve NULL to actual value based on output
105+
-- Non-array case is already handled by validation above
106+
jsonb_array_length(complete_task.output)
107+
ELSE ss.initial_tasks -- Keep existing value
80108
END
81109
FROM dependent_steps ds, pgflow.steps s
82110
WHERE ss.run_id = complete_task.run_id

pkgs/core/schemas/0100_function_start_flow.sql

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,21 @@ WITH
6060
(SELECT created_run.run_id FROM created_run),
6161
fs.step_slug,
6262
fs.deps_count,
63-
-- For root map steps (map with no deps), set initial_tasks to array length
64-
-- For all other steps, set initial_tasks to 1
65-
CASE
66-
WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN
67-
CASE
68-
WHEN jsonb_typeof(start_flow.input) = 'array' THEN
63+
-- Updated logic for initial_tasks:
64+
CASE
65+
WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN
66+
-- Root map: get array length from input
67+
CASE
68+
WHEN jsonb_typeof(start_flow.input) = 'array' THEN
6969
jsonb_array_length(start_flow.input)
70-
ELSE
70+
ELSE
7171
1
7272
END
73-
ELSE
73+
WHEN fs.step_type = 'map' AND fs.deps_count > 0 THEN
74+
-- Dependent map: unknown until dependencies complete
75+
NULL
76+
ELSE
77+
-- Single steps: always 1 task
7478
1
7579
END
7680
FROM flow_steps fs

pkgs/core/schemas/0100_function_start_ready_steps.sql

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,12 @@ ready_steps AS (
5858
WHERE step_state.run_id = start_ready_steps.run_id
5959
AND step_state.status = 'created'
6060
AND step_state.remaining_deps = 0
61+
AND step_state.initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count
62+
AND step_state.initial_tasks > 0 -- Don't start taskless steps
6163
-- Exclude empty map steps already handled
6264
AND NOT EXISTS (
63-
SELECT 1 FROM empty_map_steps
64-
WHERE empty_map_steps.run_id = step_state.run_id
65+
SELECT 1 FROM empty_map_steps
66+
WHERE empty_map_steps.run_id = step_state.run_id
6567
AND empty_map_steps.step_slug = step_state.step_slug
6668
)
6769
ORDER BY step_state.step_slug

pkgs/core/src/database-types.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ export type Database = {
127127
error_message: string | null
128128
failed_at: string | null
129129
flow_slug: string
130-
initial_tasks: number
130+
initial_tasks: number | null
131131
remaining_deps: number
132132
remaining_tasks: number | null
133133
run_id: string
@@ -141,7 +141,7 @@ export type Database = {
141141
error_message?: string | null
142142
failed_at?: string | null
143143
flow_slug: string
144-
initial_tasks?: number
144+
initial_tasks?: number | null
145145
remaining_deps?: number
146146
remaining_tasks?: number | null
147147
run_id: string
@@ -155,7 +155,7 @@ export type Database = {
155155
error_message?: string | null
156156
failed_at?: string | null
157157
flow_slug?: string
158-
initial_tasks?: number
158+
initial_tasks?: number | null
159159
remaining_deps?: number
160160
remaining_tasks?: number | null
161161
run_id?: string

0 commit comments

Comments
 (0)