Skip to content

Commit f462cf0

Browse files
committed
feat: Add scripts and tests for task spawning and step execution in flow management
- Introduced collect_perf_data.sh for performance testing of large array handling - Updated start_ready_steps function to handle empty map steps and initialize task states - Added migration script to modify start_ready_steps for correct task spawning - Created tests for map step message queueing, delayed message scheduling, and task spawning - Ensured proper handling of initial_tasks, task indices, and step status transitions - Included tests for both map and single steps to verify correct task creation and message dispatching
1 parent 2e5052c commit f462cf0

File tree

8 files changed

+883
-80
lines changed

8 files changed

+883
-80
lines changed

PLAN.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,19 @@
1818
- Enhanced start_flow() for root map validation and count setting
1919
- Tests for root map scenarios
2020

21-
- [ ] **Task Spawning**
21+
- [x] **PR #210: Task Spawning** - `09-12-task-spawning` (COMPLETED)
2222

2323
- Enhanced start_ready_steps() for N task generation
2424
- Empty array auto-completion
2525
- Tests for batch task creation
2626

27+
- [ ] **Cascade Complete Taskless Steps**
28+
29+
- Extract taskless completion from start_ready_steps()
30+
- Add cascade capability for chains of taskless steps
31+
- Generic solution for all initial_tasks=0 steps
32+
- See PLAN_cascade_complete_taskless_steps.md for details
33+
2734
- [ ] **Array Element Extraction**
2835

2936
- Enhanced start_tasks() for map input extraction
@@ -47,6 +54,14 @@
4754
- Edge case coverage
4855
- Performance validation
4956

57+
- [ ] **Performance Benchmarking Suite**
58+
- Dedicated benchmark functions separate from tests
59+
- Measure task spawning at various scales (100, 1K, 10K, 100K elements)
60+
- Track performance metrics: spawn time, memory usage, queue throughput
61+
- Non-blocking CI workflow that posts results as PR comment
62+
- Runs independently from test suite to avoid timeouts
63+
- Provides visibility without blocking merges
64+
5065
## Overview
5166

5267
This implementation establishes the SQL-level foundation for map step functionality, building on PR #207's completed `.array()` method. It focuses exclusively on the database schema and SQL Core layer, providing the infrastructure needed for parallel task spawning, execution, and result aggregation.
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
# PLAN: Cascade Complete Taskless Steps
2+
3+
## Problem Statement
4+
5+
Steps with `initial_tasks = 0` need immediate completion without task execution. When such a step completes, its dependents may become ready - and if those dependents are also taskless, they should complete immediately as well, creating a cascade effect.
6+
7+
Currently, this cascade doesn't happen, leaving taskless steps in a "ready but not completed" state.
8+
9+
## Current State
10+
11+
`start_ready_steps` currently contains logic to complete empty map steps (taskless), but:
12+
- It only handles the immediate step, not cascading to dependents
13+
- This logic is mixed with task spawning concerns
14+
- It can't handle chains of taskless steps
15+
16+
This plan extracts that logic into a dedicated function and adds cascade capability.
17+
18+
## Taskless Step Types
19+
20+
### Current
21+
- **Empty array maps**: Map steps receiving `[]` input
22+
23+
### Future (generic design)
24+
- **Condition gates**: Evaluate JSONP conditions, route without execution
25+
- **Validators**: Check constraints, pass/fail instantly
26+
- **Aggregators**: Might receive 0 inputs to aggregate
27+
- **Routers**: Direct flow based on input, no processing needed
28+
29+
The solution must be **generic** - not checking `step_type` but relying on `initial_tasks = 0`.
30+
31+
## Edge Cases & Patterns
32+
33+
### Chain cascade
34+
```
35+
A (taskless) → B (taskless) → C (taskless) → D (normal)
36+
```
37+
All taskless steps complete instantly, then D starts.
38+
39+
### Fan-in pattern
40+
```
41+
A (normal) ⟋
42+
→ C (taskless) → D (normal)
43+
B (normal) ⟌
44+
```
45+
C completes only when BOTH A and B complete.
46+
47+
### Mixed cascade
48+
```
49+
A (normal) → B (taskless) → C (taskless) → D (normal) → E (taskless)
50+
```
51+
- B,C cascade when A completes
52+
- E completes instantly when D completes
53+
- Two separate cascade events
54+
55+
### Entire flow taskless
56+
```
57+
Validate → Route → Log
58+
```
59+
Entire flow completes synchronously in `start_flow` call.
60+
61+
## Proposed Solution
62+
63+
### Performance Analysis - Corrected
64+
65+
Initial analysis was **incorrect**. After code review, the actual situation:
66+
67+
1. **complete_task** calls **start_ready_steps** on EVERY task completion
68+
- For 10k tasks = 10,000 calls to start_ready_steps
69+
70+
2. **BUT** dependent steps' `remaining_deps` only decrements when STEP completes
71+
- Happens ONCE when all tasks done, not 10,000 times
72+
73+
3. **Cascade would check 10k times but find nothing 9,999 times**
74+
- Tasks 1-9,999: cascade checks, finds no ready taskless steps
75+
- Task 10,000: cascade finds chain ready, runs 50 iterations ONCE
76+
77+
**Real impact**: 10,000 wasted checks + 50 iterations = **10,050 operations** (not 500,000!)
78+
79+
### Call Site Heat Analysis
80+
81+
| Call Site | Heat Level | When Cascade Needed | Actual Frequency |
82+
|-----------|------------|---------------------|------------------|
83+
| **start_flow()** | 🧊 COLD | Always check | Once per workflow |
84+
| **complete_task()** | 🔥🔥🔥 HOT | Only when step completes | Once per step (not task!) |
85+
| **start_ready_steps()** | 🔥 HOT | Never - wrong place | N/A |
86+
87+
### PRIMARY SOLUTION: Simple Conditional Cascade
88+
89+
Only call cascade when a step actually completes:
90+
91+
```sql
92+
-- In complete_task, after line 91
93+
IF v_step_state.status = 'completed' THEN
94+
-- Step just completed, cascade any ready taskless steps
95+
PERFORM cascade_complete_taskless_steps(run_id);
96+
97+
-- Send broadcast event (existing code)
98+
PERFORM realtime.send(...);
99+
END IF;
100+
101+
-- Remove cascade from start_ready_steps entirely
102+
```
103+
104+
This reduces cascade calls from 10,000 (every task) to 1 (when step completes)!
105+
106+
### The Cascade Function
107+
108+
Use a simple loop that completes all ready taskless steps:
109+
110+
```sql
111+
CREATE OR REPLACE FUNCTION pgflow.cascade_complete_taskless_steps(run_id uuid)
112+
RETURNS int
113+
LANGUAGE plpgsql
114+
AS $$
115+
DECLARE
116+
v_total_completed int := 0;
117+
v_iteration_completed int;
118+
BEGIN
119+
LOOP
120+
WITH completed AS (
121+
UPDATE pgflow.step_states
122+
SET status = 'completed',
123+
started_at = now(),
124+
completed_at = now(),
125+
remaining_tasks = 0
126+
WHERE step_states.run_id = cascade_complete_taskless_steps.run_id
127+
AND status = 'created'
128+
AND remaining_deps = 0
129+
AND initial_tasks = 0
130+
RETURNING *
131+
),
132+
dep_updates AS (
133+
UPDATE pgflow.step_states ss
134+
SET remaining_deps = ss.remaining_deps - 1
135+
FROM completed c
136+
JOIN pgflow.deps d ON d.flow_slug = c.flow_slug
137+
AND d.dep_slug = c.step_slug
138+
WHERE ss.run_id = c.run_id
139+
AND ss.step_slug = d.step_slug
140+
),
141+
-- Send realtime events and update run count...
142+
SELECT COUNT(*) INTO v_iteration_completed FROM completed;
143+
144+
EXIT WHEN v_iteration_completed = 0;
145+
v_total_completed := v_total_completed + v_iteration_completed;
146+
END LOOP;
147+
148+
RETURN v_total_completed;
149+
END;
150+
$$;
151+
```
152+
153+
**Performance**: 50 iterations once per step completion is acceptable
154+
155+
### Integration Points
156+
157+
```sql
158+
-- In start_flow (COLD PATH)
159+
PERFORM cascade_complete_taskless_steps(run_id);
160+
PERFORM start_ready_steps(run_id);
161+
162+
-- In complete_task (HOT PATH - but only when step completes)
163+
IF step_completed THEN
164+
PERFORM cascade_complete_taskless_steps(run_id);
165+
END IF;
166+
167+
-- NOT in start_ready_steps - that was the wrong place
168+
```
169+
170+
### Why Other Approaches Fail
171+
172+
#### Recursive CTE: PostgreSQL Limitations
173+
- ❌ Cannot use subqueries referencing recursive CTE
174+
- ❌ Cannot use NOT EXISTS with recursive reference
175+
- ❌ Cannot use aggregates on recursive reference
176+
- ❌ Cannot check "all dependencies satisfied" condition
177+
178+
#### One-Wave Approach: Weird Coupling
179+
- ❌ Creates strange dependencies between unrelated steps
180+
- ❌ Filter2 would complete when some UNRELATED step's task completes
181+
- ❌ Confusing semantics and hard to debug
182+
183+
#### Calling from start_ready_steps: Wrong Layer
184+
- ❌ Would check for cascade on EVERY task (10,000 times)
185+
- ❌ 9,999 wasted checks finding nothing
186+
- ❌ Wrong separation of concerns
187+
188+
#### No Cascade: Steps Never Complete
189+
- ❌ Taskless steps have no tasks to complete them
190+
- ❌ Would remain stuck in 'created' state forever
191+
192+
### Performance Summary
193+
194+
| Approach | Calls | Operations | Result |
195+
|----------|-------|------------|--------|
196+
| **Initial (wrong) analysis** | 10,000 | 500,000 | 🔴 Catastrophic |
197+
| **Cascade in start_ready_steps** | 10,000 | 10,050 | 🟡 Wasteful |
198+
| **Conditional cascade (solution)** | 1 | 50 | 🟢 Optimal |
199+
200+
The simple conditional approach is **200x better** than calling from start_ready_steps and **10,000x better** than the initially feared scenario.
201+
202+
### Realtime Events
203+
204+
Each completed step needs to send a realtime event. Add to the loop:
205+
206+
```sql
207+
-- Send realtime events for completed steps
208+
broadcast AS (
209+
SELECT realtime.send(
210+
jsonb_build_object(
211+
'event_type', 'step:completed',
212+
'run_id', c.run_id,
213+
'step_slug', c.step_slug,
214+
'status', 'completed',
215+
'started_at', c.started_at,
216+
'completed_at', c.completed_at,
217+
'output', '[]'::jsonb -- Empty output for taskless
218+
),
219+
concat('step:', c.step_slug, ':completed'),
220+
concat('pgflow:run:', c.run_id),
221+
false
222+
)
223+
FROM completed c
224+
)
225+
```
226+
227+
### Integration Points
228+
229+
```sql
230+
-- In start_flow
231+
PERFORM cascade_complete_taskless_steps(run_id) -- First
232+
PERFORM start_ready_steps(run_id) -- Second
233+
234+
-- In complete_task
235+
-- After completing task and updating dependents:
236+
PERFORM cascade_complete_taskless_steps(run_id) -- First
237+
PERFORM start_ready_steps(run_id) -- Second
238+
```
239+
240+
## Testing Strategy
241+
242+
Create dedicated test folder: `pkgs/core/supabase/tests/cascade_taskless/`
243+
244+
### Test cases needed
245+
246+
1. **Basic cascade**: Chain of 3 taskless steps
247+
2. **Fan-in**: Multiple deps converging on taskless step
248+
3. **Mixed flow**: Alternating taskless and normal steps
249+
4. **Empty array maps**: Current use case
250+
5. **Entire taskless flow**: Should complete synchronously
251+
6. **No cascade**: Single taskless step with normal dependent
252+
7. **Realtime events**: Verify each completed step sends event
253+
254+
### Test-First Development
255+
256+
1. Write failing test for simplest case
257+
2. Implement minimal cascade logic
258+
3. Add complex pattern test
259+
4. Extend implementation
260+
5. Repeat until all patterns covered
261+
262+
## Benefits
263+
264+
- **Generic**: Handles all taskless step types, current and future
265+
- **Decoupled**: Clear separation of concerns
266+
- **Efficient**: Batch operations, minimal queries
267+
- **Future-proof**: Ready for worker process separation
268+
- **Testable**: Each function has single responsibility
269+
270+
## Migration Notes
271+
272+
- No schema changes needed
273+
- Pure function additions
274+
- Backward compatible
275+
- Can be deployed independently

0 commit comments

Comments
 (0)