29 changes: 29 additions & 0 deletions .changeset/easy-bats-nail.md
@@ -0,0 +1,29 @@
---
'@pgflow/dsl': patch
---

Add `.map()` method to Flow DSL for defining map-type steps

The new `.map()` method enables defining steps that process arrays element-by-element, complementing the existing SQL Core map infrastructure. Key features:

- **Root maps**: Process flow input arrays directly by omitting the `array` property
- **Dependent maps**: Process another step's array output using `array: 'stepSlug'`
- **Type-safe**: Enforces Json-compatible types with full TypeScript inference
- **Compile-time duplicate slug detection**: TypeScript rejects reused step slugs at compile time (see the sketch after the example below)
- **Different handler signature**: Receives individual items `(item, context)` instead of the full input object
- **Always returns arrays**: Return type is `HandlerReturnType[]`
- **SQL generation**: Correctly adds `step_type => 'map'` parameter to `pgflow.add_step()`

Example usage:

```typescript
// Root map - processes array input
new Flow<string[]>({ slug: 'process' }).map({ slug: 'uppercase' }, (item) =>
item.toUpperCase()
);

// Dependent map - processes another step's output
new Flow<{}>({ slug: 'workflow' })
.array({ slug: 'items' }, () => [1, 2, 3])
.map({ slug: 'double', array: 'items' }, (n) => n * 2);
```
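
The compile-time duplicate slug detection can be sketched roughly as follows (the slugs and the `@ts-expect-error` marker are illustrative, assuming the duplicate surfaces as an ordinary type error):

```typescript
// Hypothetical example: reusing the slug 'items' should be rejected by the compiler
new Flow<{}>({ slug: 'dupe_demo' })
  .array({ slug: 'items' }, () => [1, 2, 3])
  // @ts-expect-error - 'items' is already used by the array step above
  .step({ slug: 'items' }, () => 'duplicate slug');
```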
2 changes: 2 additions & 0 deletions .claude/settings.json
@@ -23,6 +23,8 @@
"Bash(grep:*)",
"Bash(ls:*)",
"Bash(mkdir:*)",
"Bash(npx tsc:*)",
"Bash(pnpm tsc:*)",
"Bash(pnpm eslint:*)",
"Bash(pnpm nx build:*)",
"Bash(pnpm nx lint:*)",
61 changes: 32 additions & 29 deletions PLAN.md
@@ -9,14 +9,17 @@
- ✅ **DONE**: Dependency count propagation for map steps
- ✅ **DONE**: Array element extraction - tasks receive individual array elements
- ✅ **DONE**: Output aggregation - inline implementation aggregates map task outputs for dependents
- ⏳ **NEXT**: DSL support for `.map()` for defining map steps
- ✅ **DONE**: DSL support for `.map()` for defining map steps with compile-time duplicate detection
- ⏳ **TODO**: Fix orphaned messages on run failure
- ⏳ **TODO**: Performance optimization with step_states.output column

### Chores

- ⏳ **WAITING**: Integration tests for map steps
- ⏳ **WAITING**: Consolidated migration for map steps
- ⏳ **WAITING**: Documentation for map steps
- ⏳ **WAITING**: Graphite stack merge for map steps
- ⏳ **TODO**: Integration tests for map steps
- ⏳ **TODO**: Update core README
- ⏳ **TODO**: Add docs page for array and map steps
- ⏳ **TODO**: Migration consolidation
- ⏳ **TODO**: Graphite stack merge

## Implementation Status

@@ -83,33 +86,38 @@
- Validates non-array outputs to map steps fail correctly
- Fixed broadcast aggregation to send the full array rather than individual task outputs

#### ❌ Remaining Work
- [x] **PR #218: DSL Support for .map() Step Type** - `09-18-add-map-support-to-dsl` ✅ COMPLETED

- [ ] **DSL Support for .map() Step Type**

- Add `.map()` method to Flow DSL for defining map steps
- Added `.map()` method to Flow DSL for defining map steps
- Constraints:
- Locked to exactly one dependency (enforced at compile time)
- Dependency must return an array (type-checked)
- Syntax design:
- Dependent maps: `flow.map({ slug: 'stepName', array: 'arrayReturningStep' }, handler)`
- Root maps: Decide between `{ array: 'run' }` or omitting array property
- Root maps: Omit array property
- Return type always inferred as array
- Comprehensive tests:
- Runtime validation of array dependencies
- Type safety for input/output types
- Compile-time enforcement of single dependency rule
- Fixed complex TypeScript type inference issue with overloads
- Added compile-time duplicate slug detection across all DSL methods
- Fixed all linting errors (replaced `{}` with `Record<string, never>`)
- Updated DSL README with .map() documentation
- Created detailed changeset

#### ❌ Remaining Work (Priority Order)

- [ ] **Fix Orphaned Messages on Run Failure**
- [ ] **Priority 1: Fix Orphaned Messages on Run Failure** 🚨 CRITICAL

- Archive all pending messages when run fails
- Handle map sibling tasks specially
- Fix type constraint violations to fail immediately without retries
- See detailed plan: [PLAN_orphaned_messages.md](./PLAN_orphaned_messages.md)
- Critical for production: prevents queue performance degradation
- **Critical for production: prevents queue performance degradation**
- Tests already written (stashed) that document the problem

- [ ] **Performance Optimization: step_states.output Column**
- [ ] **Priority 2: Performance Optimization - step_states.output Column**

- Migrate from inline aggregation to storing outputs in step_states
- See detailed plan: [PLAN_step_output.md](./PLAN_step_output.md)
@@ -124,20 +132,13 @@
- Update all aggregation tests (~17 files)
- **Note**: This is an optimization that should be done after core functionality is stable

- [ ] **Integration Tests**
- [ ] **Priority 3: Integration Tests**

- End-to-end workflows with real array data
- Basic happy path coverage
- This should be minimal and added to the Edge Worker integration test suite for now

- [ ] **Migration Consolidation**

- Remove all temporary/incremental migrations from feature branches
- Generate a single consolidated migration for the entire map infrastructure
- Ensure clean migration path from current production schema
- If NULL improvement is done, include it in the consolidated migration

- [ ] **Update READMEs** and **Docs**
- [ ] **Priority 4: Update core README**

- `pkgs/core/README.md`

@@ -149,12 +150,7 @@
- Explain root maps vs dependent maps, how each is handled, and what restrictions they impose on the Flow input
- Explain cascade completion of taskless steps and its limitations

- `pkgs/dsl/README.md`

- Briefly describe the new `.array()` and `.map()` methods
- Mention `.array` is mostly sugar, and `.map` is the new step type
- Link to `pkgs/core/README.md` sections for more explanation about map steps
- Make sure to mention that maps are constrained to have exactly one dependency
- [ ] **Priority 5: Add docs page**

- **Add basic docs page**

@@ -165,7 +161,14 @@
- focus mostly on how to use it, instead of how it works under the hood
- link to the READMEs for more details

- [ ] **Graphite Stack Merge**
- [ ] **Priority 6: Migration Consolidation** (Do this last before merge!)

- Remove all temporary/incremental migrations from feature branches
- Generate a single consolidated migration for the entire map infrastructure
- Ensure clean migration path from current production schema
- If NULL improvement is done, include it in the consolidated migration

- [ ] **Priority 7: Graphite Stack Merge**

- Configure Graphite merge queue for the complete PR stack
- Ensure all PRs in sequence can be merged together
77 changes: 77 additions & 0 deletions pkgs/dsl/README.md
@@ -73,6 +73,78 @@ This design ensures:
- Data doesn't need to be manually forwarded through intermediate steps
- Steps can combine original input with processed data from previous steps

### Step Methods

The Flow DSL provides three methods for defining steps in your workflow:

#### `.step()` - Regular Steps

The standard method for adding steps to a flow. Each step processes input and returns output.

```typescript
.step(
{ slug: 'process', dependsOn: ['previous'] },
async (input) => {
// Access input.run and input.previous
return { result: 'processed' };
}
)
```

#### `.array()` - Array-Returning Steps

A semantic wrapper around `.step()` that provides type enforcement for steps that return arrays. Useful for data fetching or collection steps.

```typescript
.array(
{ slug: 'fetch_items' },
async () => [1, 2, 3, 4, 5]
)
```

#### `.map()` - Array Processing Steps

Processes arrays element-by-element, similar to JavaScript's `Array.map()`. The handler receives individual items instead of the full input object.

```typescript
// Root map - processes flow input array
new Flow<string[]>({ slug: 'process_strings' })
.map({ slug: 'uppercase' }, (item) => item.toUpperCase());

// Dependent map - processes another step's output
new Flow<{}>({ slug: 'data_pipeline' })
.array({ slug: 'numbers' }, () => [1, 2, 3])
.map({ slug: 'double', array: 'numbers' }, (n) => n * 2)
.map({ slug: 'square', array: 'double' }, (n) => n * n);
```

**Key differences from regular steps:**
- Uses `array:` instead of `dependsOn:` for specifying the single array dependency
- Handler signature is `(item, context) => result` instead of `(input, context) => result`
- Return type is always an array
- Generates SQL with `step_type => 'map'` parameter for pgflow's map processing
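
The first two differences can be seen side by side in a minimal sketch (step slugs here are illustrative):

```typescript
new Flow<{}>({ slug: 'contrast' })
  .array({ slug: 'words' }, () => ['alpha', 'beta'])
  // .step() declares dependsOn and receives the full input object
  .step({ slug: 'count', dependsOn: ['words'] }, (input) => input.words.length)
  // .map() declares array and receives one element at a time
  .map({ slug: 'upper', array: 'words' }, (word) => word.toUpperCase());
```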

**Type Safety:**
The `.map()` method provides full TypeScript type inference for array elements:

```typescript
type User = { id: number; name: string };

new Flow<{}>({ slug: 'user_flow' })
.array({ slug: 'users' }, (): User[] => [
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' }
])
.map({ slug: 'greet', array: 'users' }, (user) => {
// TypeScript knows user is of type User
return `Hello, ${user.name} (ID: ${user.id})`;
});
```

**Limitations:**
- Can only depend on a single array-returning step
- TypeScript may not track type transformations between chained maps (use type assertions if needed)
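
As a rough illustration of the single-dependency constraint (slugs are made up for this sketch), a map names exactly one array source via `array:`; combining several arrays calls for a regular `.step()` with `dependsOn`:

```typescript
new Flow<{}>({ slug: 'single_dep_demo' })
  .array({ slug: 'ids' }, () => [1, 2, 3])
  .array({ slug: 'names' }, () => ['ann', 'bob'])
  // A map can read from only one of the arrays
  .map({ slug: 'doubled', array: 'ids' }, (id) => id * 2)
  // Combining both arrays requires a regular step with dependsOn
  .step({ slug: 'combine', dependsOn: ['doubled', 'names'] }, (input) => ({
    doubled: input.doubled,
    names: input.names,
  }));
```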

### Context Object

Step handlers can optionally receive a second parameter - the **context object** - which provides access to platform resources and runtime information.
@@ -114,6 +186,11 @@ All platforms provide these core resources:
msg_id: number;
}
```
- **`context.workerConfig`** - Resolved worker configuration with all defaults applied
```typescript
// Provides access to worker settings like retry limits
const isLastAttempt = context.rawMessage.read_ct >= context.workerConfig.retry.limit;
```

#### Supabase Platform Resources
