Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .claude/schema_development.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,21 @@ git rm -f supabase/migrations/*_pgflow_{TEMP,temp}_*.sql
**Temp Migrations**: Use TEMP_ prefix for stacked PRs, remove before final merge, CI enforces this
**Avoid**: Manual migration edits, forgetting to remove old migration, skipping hash reset, failing tests, mixing changes, merging temp migrations to main

### Performance-First SQL Design

**Use Section Comments Instead of Helper Functions**: Keep complex functions monolithic for performance. Use clear section comments:

```sql
-- ==========================================
-- MAIN SECTION: Description
-- ==========================================
WITH
-- ---------- Subsection ----------
cte_name AS (...)
```

Avoids function call overhead, preserves CTE optimization, simpler atomicity.

## Troubleshooting

### Migration name exists
Expand Down
47 changes: 37 additions & 10 deletions PLAN.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@
- Array validation and count propagation working
- Cascade handles taskless dependent maps

- [x] **PR #213: NULL for Unknown initial_tasks** - `09-16-make-initial-tasks-nullable`
- Changed initial_tasks from "1 as placeholder" to NULL for dependent map steps
- Benefits: Semantic correctness (NULL = unknown, not "1 task")
- Implemented: Schema change to allow NULL, updated all SQL functions
- Added validation for non-array and NULL outputs to map steps
- Comprehensive tests for NULL behavior and error cases

#### ❌ Remaining Work

- [ ] **Array Element Distribution** (CRITICAL - BLOCKS REAL MAP USAGE)
Expand All @@ -102,6 +109,8 @@
- Store aggregated output for dependent steps to consume
- Maintain task_index ordering in aggregated arrays
- Tests for aggregation with actual map task outputs
- **IMPORTANT**: Must add test for map->map NULL propagation when this is implemented
- **IMPORTANT**: Must handle non-array outputs to map steps (should fail the run)

- [ ] **DSL Support for .map() Step Type**

Expand All @@ -124,23 +133,41 @@
- Basic happy path coverage
- This should be minimal and added to the Edge Worker integration test suite for now

- [ ] **Semantic Improvement: NULL for Unknown initial_tasks** (OPTIONAL - Can be deferred)

- Change initial_tasks from "1 as placeholder" to NULL for dependent map steps
- Benefits: Semantic correctness (NULL = unknown, not "1 task")
- Scope: Schema change to allow NULL, update 5+ SQL functions
- See detailed plan in `pkgs/core/PLAN_use_null_for_map_initial_tasks.md`
- **Note**: This is a semantic improvement only - current approach works functionally
- **Warning**: If deferred, new tests for Array Distribution and Output Aggregation will
assume initial_tasks = 1 for dependent maps, making this change harder later

- [ ] **Migration Consolidation**

- Remove all temporary/incremental migrations from feature branches
- Generate a single consolidated migration for the entire map infrastructure
- Ensure clean migration path from current production schema
- If NULL improvement is done, include it in the consolidated migration

- [ ] **Update README's** and **Docs**

- `pkgs/core/README.md`

- Add new section describing the step types
- Describe single step briefly, focus on describing map step type and how it differs
- Make sure to mention that maps are constrained to have exactly one dependency
- Show multiple cases of inputs -> task creation
- Explain edge cases (empty array propagation, invalid array input)
- Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input
- Explain cascade completion of taskless steps and its limitations

- `pkgs/dsl/README.md`

- Briefly describe the new `.array()` and `.map()` methods
- Mention `.array` is mostly sugar, and `.map` is the new step type
- Link to `pkgs/core/README.md` sections for more explanation about map steps
- Make sure to mention that maps are constrained to have exactly one dependency

- **Add basic docs page**

- put it into `pkgs/website/src/content/docs/concepts/array-and-map-steps.mdx`
- describe the DSL and how the map works and why we need it
- show example usage of root map
- show example usage of dependent map
- focus mostly on how to use it, instead of how it works under the hood
- link to the README's for more details

- [ ] **Graphite Stack Merge**

- Configure Graphite merge queue for the complete PR stack
Expand Down
195 changes: 0 additions & 195 deletions pkgs/core/PLAN_use_null_for_map_initial_tasks.md

This file was deleted.

5 changes: 4 additions & 1 deletion pkgs/core/schemas/0060_tables_runtime.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ create table pgflow.step_states (
step_slug text not null,
status text not null default 'created',
remaining_tasks int null, -- NULL = not started, >0 = active countdown
initial_tasks int not null default 1 check (initial_tasks >= 0), -- Planned task count: 1 for singles, N for maps
initial_tasks int null check (initial_tasks is null or initial_tasks >= 0),
remaining_deps int not null default 0 check (remaining_deps >= 0),
error_message text,
created_at timestamptz not null default now(),
Expand All @@ -43,6 +43,9 @@ create table pgflow.step_states (
constraint remaining_tasks_state_consistency check (
remaining_tasks is null or status != 'created'
),
constraint initial_tasks_known_when_started check (
status != 'started' or initial_tasks is not null
),
constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)),
constraint started_at_is_after_created_at check (started_at is null or started_at >= created_at),
constraint completed_at_is_after_started_at check (completed_at is null or completed_at >= started_at),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,23 @@ DECLARE
v_iterations int := 0;
v_max_iterations int := 50;
BEGIN
-- ==========================================
-- ITERATIVE CASCADE COMPLETION
-- ==========================================
-- Completes taskless steps in waves until none remain
LOOP
-- Safety counter to prevent infinite loops
-- ---------- Safety check ----------
v_iterations := v_iterations + 1;
IF v_iterations > v_max_iterations THEN
RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations;
END IF;

-- ==========================================
-- COMPLETE READY TASKLESS STEPS
-- ==========================================
WITH completed AS (
-- Complete all ready taskless steps in topological order
-- ---------- Complete taskless steps ----------
-- Steps with initial_tasks=0 and no remaining deps
UPDATE pgflow.step_states ss
SET status = 'completed',
started_at = now(),
Expand All @@ -32,19 +40,20 @@ BEGIN
-- Process in topological order to ensure proper cascade
RETURNING ss.*
),
-- ---------- Update dependent steps ----------
-- Propagate completion and empty arrays to dependents
dep_updates AS (
-- Update remaining_deps and initial_tasks for dependents of completed steps
UPDATE pgflow.step_states ss
SET remaining_deps = ss.remaining_deps - dep_count.count,
-- If the dependent is a map step and its dependency completed with 0 tasks,
-- set its initial_tasks to 0 as well
initial_tasks = CASE
WHEN s.step_type = 'map' AND dep_count.has_zero_tasks
THEN 0
ELSE ss.initial_tasks
THEN 0 -- Empty array propagation
ELSE ss.initial_tasks -- Keep existing value (including NULL)
END
FROM (
-- Count how many completed steps are dependencies of each dependent
-- Aggregate dependency updates per dependent step
SELECT
d.flow_slug,
d.step_slug as dependent_slug,
Expand All @@ -62,8 +71,8 @@ BEGIN
AND s.flow_slug = ss.flow_slug
AND s.step_slug = ss.step_slug
),
-- ---------- Update run counters ----------
run_updates AS (
-- Update run's remaining_steps count
UPDATE pgflow.runs r
SET remaining_steps = r.remaining_steps - c.completed_count,
status = CASE
Expand All @@ -80,9 +89,10 @@ BEGIN
WHERE r.run_id = cascade_complete_taskless_steps.run_id
AND c.completed_count > 0
)
-- ---------- Check iteration results ----------
SELECT COUNT(*) INTO v_iteration_completed FROM completed;

EXIT WHEN v_iteration_completed = 0;
EXIT WHEN v_iteration_completed = 0; -- No more steps to complete
v_total_completed := v_total_completed + v_iteration_completed;
END LOOP;

Expand Down
Loading