Commit c1c4c34

chore: update PLAN.md with current feature and task statuses
Reflects ongoing work on map step output aggregation, related migration, testing, and documentation efforts, along with current implementation status and pending tasks for map step support in the project.
1 parent 53ca34b commit c1c4c34

32 files changed, 3515 additions and 72 deletions

.claude/commands/help-review.md

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+Your job is to help me understand changes made to $ARGUMENTS by line/section changed.
+**Current branch:** !`git branch --show-current`
+If there is `PLAN.md` or `PLAN_<something_related_to_current_branch>.md`, read it before starting.
+
+Here is the diff of the changes:
+
+<changes>
+!`git show -p --no-ext-diff -- $ARGUMENTS`
+</changes>

.claude/settings.json

Lines changed: 3 additions & 0 deletions
@@ -5,6 +5,7 @@
 "Bash(./scripts/atlas-migrate-hash:*)",
 "Bash(./scripts/run-test-with-colors:*)",
 "Bash(PGPASSWORD=postgres psql -h 127.0.0.1 -p 50422 -U postgres -d postgres -c:*)",
+"Bash(PGPASSWORD=postgres psql -h 127.0.0.1 -p 50422 -U postgres -d postgres -f:*)",
 "Bash(bin/run-test-with-colors:*)",
 "Bash(cat:*)",
 "Bash(cd:*)",
@@ -17,6 +18,8 @@
 "Bash(gh run list:*)",
 "Bash(gh run view:*)",
 "Bash(git rm:*)",
+"Bash(git show -p --no-ext-diff --:*)",
+"Bash(git whatchanged:*)",
 "Bash(grep:*)",
 "Bash(ls:*)",
 "Bash(mkdir:*)",

.claude/sql_style.md

Lines changed: 2 additions & 1 deletion
@@ -18,4 +18,5 @@ Always qualify columns and arguments:
 - `start_flow.run_id` not just `run_id` in functions
 
 ## Keyword Arguments
-Use `param => "value"` NOT `param := "value"`
+Use `param => "value"` NOT `param := "value"`
+- Note on aliasing tables: when writing SQL functions that work with dependencies/dependents, steps, and states, build aliases with parent/child prefixes and `_step` (for `pgflow.steps`) or `_state` (for `pgflow.step_states`) suffixes. `dep` should mean a row in `pgflow.deps`, not a parent dependency. Do not use `dep` for a row from `pgflow.steps` or `pgflow.step_states`.

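The aliasing note added to `.claude/sql_style.md` can be illustrated with a small query. This is only a sketch: the temp tables are hypothetical stand-ins for `pgflow.steps`, `pgflow.deps`, and `pgflow.step_states`, and their columns (`flow_slug`, `step_slug`, `dep_slug`, `run_id`, `status`) are assumed for illustration rather than taken from the real schema.

```sql
-- Hypothetical stand-ins for pgflow.steps, pgflow.deps and pgflow.step_states.
-- Table and column names are assumptions made for this sketch.
CREATE TEMP TABLE demo_steps (flow_slug text, step_slug text);
CREATE TEMP TABLE demo_deps (flow_slug text, dep_slug text, step_slug text);
CREATE TEMP TABLE demo_step_states (run_id int, flow_slug text, step_slug text, status text);

INSERT INTO demo_steps VALUES ('f', 'fetch'), ('f', 'process');
INSERT INTO demo_deps VALUES ('f', 'fetch', 'process');  -- 'process' depends on 'fetch'
INSERT INTO demo_step_states VALUES
  (1, 'f', 'fetch', 'completed'),
  (1, 'f', 'process', 'created');

-- dep                        = a row in deps (the edge itself)
-- parent_step / child_step   = rows in steps
-- parent_state / child_state = rows in step_states
SELECT
  parent_step.step_slug AS parent_slug,
  parent_state.status   AS parent_status,
  child_step.step_slug  AS child_slug,
  child_state.status    AS child_status
FROM demo_deps AS dep
JOIN demo_steps AS parent_step
  ON parent_step.flow_slug = dep.flow_slug
 AND parent_step.step_slug = dep.dep_slug
JOIN demo_steps AS child_step
  ON child_step.flow_slug = dep.flow_slug
 AND child_step.step_slug = dep.step_slug
JOIN demo_step_states AS parent_state
  ON parent_state.flow_slug = parent_step.flow_slug
 AND parent_state.step_slug = parent_step.step_slug
JOIN demo_step_states AS child_state
  ON child_state.flow_slug = child_step.flow_slug
 AND child_state.step_slug = child_step.step_slug
 AND child_state.run_id = parent_state.run_id
WHERE parent_state.run_id = 1;
```

Reserving `dep` for the edge row keeps the join conditions readable: each side of the edge is named by its role (parent or child) and by the table it comes from (step or state).
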
PLAN.md

Lines changed: 45 additions & 14 deletions
@@ -2,13 +2,21 @@
 
 **NOTE: This PLAN.md file should be removed in the final PR once all map infrastructure is complete.**
 
-### Current State
+### Features
 
-- **WORKING**: Empty array maps (taskless) cascade and complete correctly
-- **WORKING**: Task spawning creates N tasks with correct indices
-- **WORKING**: Dependency count propagation for map steps
-- **WORKING**: Array element extraction - tasks get full array instead of individual items
-- **MISSING**: Output aggregation - no way to combine map task outputs for dependents
+- **DONE**: Empty array maps (taskless) cascade and complete correctly
+- **DONE**: Task spawning creates N tasks with correct indices
+- **DONE**: Dependency count propagation for map steps
+- **DONE**: Array element extraction - tasks receive individual array elements
+- **DONE**: Output aggregation - inline implementation aggregates map task outputs for dependents
+- **NEXT**: DSL support for `.map()` for defining map steps
+
+### Chores
+
+- **WAITING**: Integration tests for map steps
+- **WAITING**: Consolidated migration for map steps
+- **WAITING**: Documentation for map steps
+- **WAITING**: Graphite stack merge for map steps
 
 ## Implementation Status
 
@@ -67,16 +75,15 @@
   - Handles both root maps (from run input) and dependent maps (from step outputs)
   - Tests with actual array data processing
 
-#### ❌ Remaining Work
+- [x] **PR #217: Output Aggregation** - `09-17-add-map-step-output-aggregation` (THIS PR)
 
-- [ ] **Output Aggregation** (CRITICAL - BLOCKS MAP OUTPUT CONSUMPTION)
+  - Inline aggregation implementation in complete_task, start_tasks, maybe_complete_run
+  - Full test coverage (17 tests) for all aggregation scenarios
+  - Handles NULL preservation, empty arrays, order preservation
+  - Validates non-array outputs to map steps fail correctly
+  - Fixed broadcast aggregation to send full array not individual task output
 
-  - Aggregate map task outputs when step completes
-  - Store aggregated output for dependent steps to consume
-  - Maintain task_index ordering in aggregated arrays
-  - Tests for aggregation with actual map task outputs
-  - **IMPORTANT**: Must add test for map->map NULL propagation when this is implemented
-  - **IMPORTANT**: Must handle non-array outputs to map steps (should fail the run)
+#### ❌ Remaining Work
 
 - [ ] **DSL Support for .map() Step Type**
 
@@ -93,6 +100,30 @@
   - Type safety for input/output types
   - Compile-time enforcement of single dependency rule
 
+- [ ] **Fix Orphaned Messages on Run Failure**
+
+  - Archive all pending messages when run fails
+  - Handle map sibling tasks specially
+  - Fix type constraint violations to fail immediately without retries
+  - See detailed plan: [PLAN_orphaned_messages.md](./PLAN_orphaned_messages.md)
+  - Critical for production: prevents queue performance degradation
+  - Tests already written (stashed) that document the problem
+
+- [ ] **Performance Optimization: step_states.output Column**
+
+  - Migrate from inline aggregation to storing outputs in step_states
+  - See detailed plan: [PLAN_step_output.md](./PLAN_step_output.md)
+  - Benefits:
+    - Eliminate redundant aggregation queries
+    - 30-70% performance improvement for map chains
+    - Cleaner architecture with single source of truth
+  - Implementation:
+    - Add output column to step_states table
+    - Update complete_task to populate output on completion
+    - Simplify consumers (start_tasks, maybe_complete_run, broadcasts)
+    - Update all aggregation tests (~17 files)
+  - **Note**: This is an optimization that should be done after core functionality is stable
+
 - [ ] **Integration Tests**
 
   - End-to-end workflows with real array data

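The aggregation properties listed for PR #217 (order preservation, NULL preservation, empty arrays) can be sketched with a single aggregate. This is a minimal illustration, not the inline implementation in `complete_task`: `demo_map_tasks` is a hypothetical stand-in for `pgflow.step_tasks`, and only the columns needed here are assumed.

```sql
-- Hypothetical stand-in for pgflow.step_tasks; only the columns this sketch needs.
CREATE TEMP TABLE demo_map_tasks (
  run_id     int  NOT NULL,
  step_slug  text NOT NULL,
  task_index int  NOT NULL,
  output     jsonb
);

-- Rows are inserted out of order on purpose; task 1 produced no output.
INSERT INTO demo_map_tasks (run_id, step_slug, task_index, output) VALUES
  (1, 'double', 2, '6'),
  (1, 'double', 0, '2'),
  (1, 'double', 1, NULL);

-- ORDER BY task_index restores the input order, jsonb_agg keeps NULLs as JSON null,
-- and COALESCE turns "no rows" into an empty array instead of SQL NULL.
SELECT COALESCE(jsonb_agg(t.output ORDER BY t.task_index), '[]'::jsonb) AS aggregated_output
FROM demo_map_tasks AS t
WHERE t.run_id = 1
  AND t.step_slug = 'double';
-- [2, null, 6]

-- A map step with no tasks aggregates to an empty array.
SELECT COALESCE(jsonb_agg(t.output ORDER BY t.task_index), '[]'::jsonb) AS aggregated_output
FROM demo_map_tasks AS t
WHERE t.run_id = 1
  AND t.step_slug = 'no_tasks';
-- []
```

Running this query in every consumer is the redundancy that the planned `step_states.output` column is meant to remove.
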
PLAN_orphaned_messages.md

Lines changed: 184 additions & 0 deletions
@@ -0,0 +1,184 @@
+# Plan: Fix Orphaned Messages on Run Failure
+
+## Problem Statement
+
+When a run fails, messages for pending tasks remain in the queue indefinitely, causing:
+1. **Resource waste**: Workers continuously poll orphaned messages
+2. **Performance degradation**: Queue operations slow down over time
+3. **Map step issues**: Failing one map task leaves N-1 sibling messages orphaned
+4. **Type violations**: Deterministic errors retry unnecessarily
+
+## Current Behavior
+
+### When fail_task is called
+```sql
+-- Only archives the single failing task's message
+SELECT pgmq.archive('pgflow_tasks_queue', fail_task.msg_id);
+-- Leaves all other queued messages orphaned
+```
+
+### When type constraint violation occurs
+```sql
+-- Raises exception, causes retries
+RAISE EXCEPTION 'Map step % expects array input...';
+-- Transaction rolls back, but retries will hit same error
+```
+
+## Implementation Plan
+
+### 1. Update fail_task Function
+**File**: `pkgs/core/schemas/0100_function_fail_task.sql`
+
+Add after marking run as failed (around line 47):
+```sql
+-- Archive all pending messages for this run
+-- (PERFORM, assuming fail_task is PL/pgSQL: a bare SELECT has no result destination)
+PERFORM pgmq.archive('pgflow_tasks_queue', tasks_to_archive.msg_id)
+FROM (
+  SELECT t.msg_id
+  FROM pgflow.step_tasks t
+  WHERE t.run_id = fail_task.run_id
+    AND t.status = 'pending'
+    AND t.msg_id IS NOT NULL
+) AS tasks_to_archive;
+```
+
+### 2. Update complete_task for Type Violations
+**File**: `pkgs/core/schemas/0100_function_complete_task.sql`
+
+Replace the current RAISE EXCEPTION block (lines 115-120) with:
+```sql
+IF v_dependent_map_slug IS NOT NULL THEN
+  -- Mark run as failed immediately (no retries for type violations)
+  UPDATE pgflow.runs
+  SET status = 'failed',
+      failed_at = now(),
+      error = format('Type contract violation: Map step %s expects array input but dependency %s produced %s (output: %s)',
+                     v_dependent_map_slug,
+                     complete_task.step_slug,
+                     CASE WHEN complete_task.output IS NULL THEN 'null'
+                          ELSE jsonb_typeof(complete_task.output) END,
+                     complete_task.output)
+  WHERE pgflow.runs.run_id = complete_task.run_id;
+
+  -- Archive ALL pending messages for this run
+  PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id)
+  FROM pgflow.step_tasks t
+  WHERE t.run_id = complete_task.run_id
+    AND t.status = 'pending'
+    AND t.msg_id IS NOT NULL;
+
+  -- Mark the current task as failed (not completed)
+  UPDATE pgflow.step_tasks
+  SET status = 'failed',
+      failed_at = now(),
+      error_message = format('Type contract violation: produced %s instead of array',
+                             CASE WHEN complete_task.output IS NULL THEN 'null'
+                                  ELSE jsonb_typeof(complete_task.output) END)
+  WHERE pgflow.step_tasks.run_id = complete_task.run_id
+    AND pgflow.step_tasks.step_slug = complete_task.step_slug
+    AND pgflow.step_tasks.task_index = complete_task.task_index;
+
+  -- Return empty result set (task not completed)
+  RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false;
+  RETURN;
+END IF;
+```
+
+### 3. Add Supporting Index
+**File**: New migration or add to existing
+
+```sql
+-- Speed up the archiving query
+CREATE INDEX IF NOT EXISTS idx_step_tasks_pending_with_msg
+ON pgflow.step_tasks(run_id, status)
+WHERE status = 'pending' AND msg_id IS NOT NULL;
+```
+
+## Testing
+
+### Tests Already Written (Stashed)
+
+1. **`supabase/tests/fail_task/archive_sibling_map_tasks.test.sql`**
+   - Verifies all map task messages are archived when one fails
+   - Tests: 8 assertions about message archiving and status
+
+2. **`supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql`**
+   - Verifies type violations archive all pending messages
+   - Tests: 8 assertions about queue cleanup and run status
+
+### How to Run Tests
+```bash
+# After unstashing and implementing the fixes:
+pnpm nx test:pgtap core -- supabase/tests/fail_task/archive_sibling_map_tasks.test.sql
+pnpm nx test:pgtap core -- supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql
+```
+
+## Migration Considerations
+
+### Backward Compatibility
+- New behavior only affects failed runs (safe)
+- Archiving preserves messages (can be recovered if needed)
+- No schema changes to existing tables
+
+### Performance Impact
+- One-time cost during failure (acceptable)
+- Prevents ongoing performance degradation (improvement)
+- Index ensures archiving query is efficient
+
+### Rollback Plan
+If issues arise:
+1. Remove the archiving logic
+2. Messages remain in queue (old behavior)
+3. No data loss since we archive, not delete
+
+## Edge Cases to Consider
+
+### 1. Concurrent Task Completion
+If multiple tasks complete/fail simultaneously:
+- PostgreSQL row locks ensure consistency
+- Each failure archives all pending messages
+- Idempotent: archiving already-archived messages is safe
+
+### 2. Very Large Map Steps
+For maps with 1000+ tasks:
+- Archiving might take several seconds
+- Consider batching if performance issues arise
+- Current approach should handle up to ~10k tasks reasonably
+
+### 3. Mixed Step Types
+When run has both map and single steps:
+- Archive logic handles all pending tasks regardless of type
+- Correctly archives both map siblings and unrelated pending tasks
+
+## Future Enhancements (Not for this PR)
+
+1. **Selective Archiving**: Only archive tasks that can't proceed
+2. **Batch Operations**: Archive in chunks for very large runs
+3. **Recovery Mechanism**: Function to unarchive and retry
+4. **Monitoring**: Track archived message counts for alerting
+
+## Success Criteria
+
+- [ ] All tests pass (both new test files)
+- [ ] No orphaned messages after run failure
+- [ ] Type violations don't retry
+- [ ] Performance acceptable for maps with 100+ tasks
+- [ ] No impact on successful run performance
+
+## Implementation Checklist
+
+- [ ] Update `fail_task` function
+- [ ] Update `complete_task` function
+- [ ] Add database index
+- [ ] Unstash and run tests
+- [ ] Test with large map steps (100+ tasks)
+- [ ] Update migration file
+- [ ] Document behavior change in function comments
+
+## Notes
+
+- This fix is **critical for production** - without it, queue performance will degrade over time
+- Type violations are **deterministic** - retrying them is always wasteful
+- Archiving (vs deleting) preserves debugging capability
+- The fix is relatively simple (~30 lines of SQL) but high impact

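The archive-on-failure behaviour described in `PLAN_orphaned_messages.md` can be tried out without pgmq installed. The sketch below is an assumption-laden model, not the planned implementation: `demo_queue`, `demo_archive`, and `demo_tasks` are hypothetical plain tables standing in for the pgmq queue, its archive, and `pgflow.step_tasks`, and the move is written as a data-modifying CTE instead of a call to `pgmq.archive`.

```sql
-- Hypothetical stand-ins: a queue table, its archive, and the task table.
CREATE TEMP TABLE demo_queue   (msg_id bigint PRIMARY KEY, payload jsonb);
CREATE TEMP TABLE demo_archive (msg_id bigint PRIMARY KEY, payload jsonb);
CREATE TEMP TABLE demo_tasks   (run_id int, task_index int, status text, msg_id bigint);

-- Run 1: task 0 already failed and its message was archived; tasks 1 and 2 are still pending.
-- Run 2: unrelated run whose pending message must stay in the queue.
INSERT INTO demo_archive VALUES (101, '{}');
INSERT INTO demo_queue VALUES (102, '{}'), (103, '{}'), (201, '{}');
INSERT INTO demo_tasks VALUES
  (1, 0, 'failed',  101),
  (1, 1, 'pending', 102),
  (1, 2, 'pending', 103),
  (2, 0, 'pending', 201);

-- Move every pending message of the failed run from the queue to the archive.
WITH moved AS (
  DELETE FROM demo_queue AS q
  USING demo_tasks AS t
  WHERE t.msg_id = q.msg_id
    AND t.run_id = 1
    AND t.status = 'pending'
    AND t.msg_id IS NOT NULL
  RETURNING q.msg_id, q.payload
)
INSERT INTO demo_archive (msg_id, payload)
SELECT moved.msg_id, moved.payload
FROM moved;

-- Remaining queue contents: only the message that belongs to run 2.
SELECT q.msg_id FROM demo_queue AS q ORDER BY q.msg_id;
```

Re-running the `WITH moved ...` statement finds nothing left to delete and inserts nothing, which is the idempotence the plan relies on when several sibling tasks fail at about the same time.
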