Skip to content

Commit ece6b73

Browse files
committed
test: add integration tests for flow mapping and array handling
Introduce new tests covering root map, dependent map, and empty array map scenarios. Verify correct execution, task creation, and output aggregation for various flow configurations.
1 parent 5543ded commit ece6b73

File tree

17 files changed

+1505
-202
lines changed

17 files changed

+1505
-202
lines changed

pkgs/core/src/PgflowSqlClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export class PgflowSqlClient<TFlow extends AnyFlow>
5656
SELECT pgflow.complete_task(
5757
run_id => ${stepTask.run_id}::uuid,
5858
step_slug => ${stepTask.step_slug}::text,
59-
task_index => ${0}::int,
59+
task_index => ${stepTask.task_index}::int,
6060
output => ${this.sql.json(output || null)}::jsonb
6161
);
6262
`;
@@ -74,7 +74,7 @@ export class PgflowSqlClient<TFlow extends AnyFlow>
7474
SELECT pgflow.fail_task(
7575
run_id => ${stepTask.run_id}::uuid,
7676
step_slug => ${stepTask.step_slug}::text,
77-
task_index => ${0}::int,
77+
task_index => ${stepTask.task_index}::int,
7878
error_message => ${errorString}::text
7979
);
8080
`;

pkgs/core/src/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export type StepTaskRecord<TFlow extends AnyFlow> = {
2727
flow_slug: string;
2828
run_id: string;
2929
step_slug: StepSlug;
30+
task_index: number;
3031
input: Simplify<StepInput<TFlow, StepSlug>>;
3132
msg_id: number;
3233
};
@@ -36,7 +37,7 @@ export type StepTaskRecord<TFlow extends AnyFlow> = {
3637
* Composite key that is enough to find a particular step task
3738
* Contains only the minimum fields needed to identify a task
3839
*/
39-
export type StepTaskKey = Pick<StepTaskRecord<any>, 'run_id' | 'step_slug'>;
40+
export type StepTaskKey = Pick<StepTaskRecord<any>, 'run_id' | 'step_slug' | 'task_index'>;
4041

4142

4243

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
begin;
2+
select plan(5);
3+
select pgflow_tests.reset_db();
4+
5+
-- Test 1: Single step returns task_index 0
6+
select pgflow.create_flow('single_task');
7+
select pgflow.add_step('single_task', 'step1');
8+
select pgflow.start_flow('single_task', '{"data": "test"}'::jsonb);
9+
10+
-- Ensure worker and read message
11+
select pgflow_tests.ensure_worker('single_task');
12+
13+
with msgs as (
14+
select * from pgflow.read_with_poll('single_task', 10, 5, 1, 50) limit 1
15+
),
16+
msg_ids as (
17+
select array_agg(msg_id) as ids from msgs
18+
),
19+
started_tasks as (
20+
select * from pgflow.start_tasks(
21+
'single_task',
22+
(select ids from msg_ids),
23+
'11111111-1111-1111-1111-111111111111'::uuid
24+
)
25+
)
26+
select is(
27+
(select task_index from started_tasks),
28+
0,
29+
'Single step task should have task_index 0'
30+
);
31+
32+
-- Test 2: Map step with array of 3 elements returns correct task_index for each
33+
select pgflow_tests.reset_db();
34+
select pgflow.create_flow('map_flow');
35+
select pgflow.add_step('map_flow', 'map_step', '{}', null, null, null, null, 'map');
36+
select pgflow.start_flow('map_flow', '[1, 2, 3]'::jsonb);
37+
38+
-- Ensure worker
39+
select pgflow_tests.ensure_worker('map_flow');
40+
41+
-- Read all 3 messages
42+
with msgs as (
43+
select * from pgflow.read_with_poll('map_flow', 10, 5, 3, 50) order by msg_id
44+
),
45+
msg_ids as (
46+
select array_agg(msg_id order by msg_id) as ids from msgs
47+
),
48+
started_tasks as (
49+
select * from pgflow.start_tasks(
50+
'map_flow',
51+
(select ids from msg_ids),
52+
'11111111-1111-1111-1111-111111111111'::uuid
53+
) order by task_index
54+
)
55+
select is(
56+
array_agg(task_index order by task_index),
57+
ARRAY[0, 1, 2],
58+
'Map step tasks should have sequential task_index values'
59+
) from started_tasks;
60+
61+
-- Test 3: Map step with 5 elements returns correct task_index values
62+
select pgflow_tests.reset_db();
63+
select pgflow.create_flow('map_five');
64+
select pgflow.add_step('map_five', 'mapper', '{}', null, null, null, null, 'map');
65+
select pgflow.start_flow('map_five', '["a", "b", "c", "d", "e"]'::jsonb);
66+
67+
-- Ensure worker
68+
select pgflow_tests.ensure_worker('map_five');
69+
70+
-- Read all 5 messages
71+
with msgs as (
72+
select * from pgflow.read_with_poll('map_five', 10, 5, 5, 50) order by msg_id
73+
),
74+
msg_ids as (
75+
select array_agg(msg_id order by msg_id) as ids from msgs
76+
),
77+
started_tasks as (
78+
select * from pgflow.start_tasks(
79+
'map_five',
80+
(select ids from msg_ids),
81+
'11111111-1111-1111-1111-111111111111'::uuid
82+
) order by task_index
83+
)
84+
select is(
85+
array_agg(task_index order by task_index),
86+
ARRAY[0, 1, 2, 3, 4],
87+
'Map step with 5 elements should have task_index 0-4'
88+
) from started_tasks;
89+
90+
-- Test 4: Dependent map step preserves task_index
91+
select pgflow_tests.reset_db();
92+
select pgflow.create_flow('map_chain');
93+
select pgflow.add_step('map_chain', 'first', '{}', null, null, null, null, 'map');
94+
select pgflow.add_step('map_chain', 'second', ARRAY['first'], null, null, null, null, 'map');
95+
select pgflow.start_flow('map_chain', '[10, 20]'::jsonb);
96+
97+
-- Complete first map tasks
98+
select pgflow_tests.ensure_worker('map_chain');
99+
-- Complete task index 0
100+
with poll_result as (
101+
select * from pgflow_tests.read_and_start('map_chain', 1, 1) limit 1
102+
)
103+
select pgflow.complete_task(
104+
run_id,
105+
step_slug,
106+
task_index,
107+
jsonb_build_object('value', (input::int) * 2)
108+
) from poll_result;
109+
-- Complete task index 1
110+
with poll_result as (
111+
select * from pgflow_tests.read_and_start('map_chain', 1, 1) limit 1
112+
)
113+
select pgflow.complete_task(
114+
run_id,
115+
step_slug,
116+
task_index,
117+
jsonb_build_object('value', (input::int) * 2)
118+
) from poll_result;
119+
120+
-- Now read and start second map tasks
121+
select pgflow_tests.ensure_worker('map_chain', '22222222-2222-2222-2222-222222222222'::uuid);
122+
with msgs as (
123+
select * from pgflow.read_with_poll('map_chain', 10, 5, 2, 50) order by msg_id
124+
),
125+
msg_ids as (
126+
select array_agg(msg_id order by msg_id) as ids from msgs
127+
),
128+
started_tasks as (
129+
select * from pgflow.start_tasks(
130+
'map_chain',
131+
(select ids from msg_ids),
132+
'22222222-2222-2222-2222-222222222222'::uuid
133+
) order by task_index
134+
)
135+
select is(
136+
array_agg(task_index order by task_index),
137+
ARRAY[0, 1],
138+
'Dependent map step should preserve task_index from parent'
139+
) from started_tasks;
140+
141+
-- Test 5: Multiple single steps in sequence all have task_index 0
142+
select pgflow_tests.reset_db();
143+
select pgflow.create_flow('sequential');
144+
select pgflow.add_step('sequential', 'step_a');
145+
select pgflow.add_step('sequential', 'step_b', ARRAY['step_a']);
146+
select pgflow.add_step('sequential', 'step_c', ARRAY['step_b']);
147+
select pgflow.start_flow('sequential', '{"test": true}'::jsonb);
148+
149+
-- Process step_a
150+
select pgflow_tests.ensure_worker('sequential');
151+
with poll_result as (
152+
select * from pgflow_tests.read_and_start('sequential', 1, 1)
153+
)
154+
select pgflow.complete_task(
155+
run_id,
156+
step_slug,
157+
task_index,
158+
'{"result": "a"}'::jsonb
159+
) from poll_result;
160+
161+
-- Process step_b
162+
select pgflow_tests.ensure_worker('sequential', '33333333-3333-3333-3333-333333333333'::uuid);
163+
with msgs as (
164+
select * from pgflow.read_with_poll('sequential', 10, 5, 1, 50) limit 1
165+
),
166+
msg_ids as (
167+
select array_agg(msg_id) as ids from msgs
168+
),
169+
started_tasks as (
170+
select * from pgflow.start_tasks(
171+
'sequential',
172+
(select ids from msg_ids),
173+
'33333333-3333-3333-3333-333333333333'::uuid
174+
)
175+
)
176+
select is(
177+
(select task_index from started_tasks),
178+
0,
179+
'Sequential single steps should all have task_index 0'
180+
);
181+
182+
select finish();
183+
rollback;

pkgs/edge-worker/src/__tests__/types/flow-compatibility.test-d.ts

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,14 @@
11
import { describe, it } from 'vitest';
2-
import { Flow, type Context } from '@pgflow/dsl';
2+
import { Flow } from '@pgflow/dsl';
33
import { EdgeWorker } from '../../EdgeWorker.js';
4-
import type { Sql } from 'postgres';
5-
import type { SupabaseClient } from '@supabase/supabase-js';
6-
7-
// Mock types for testing
8-
interface TestRedis {
9-
get: (key: string) => Promise<string | null>;
10-
set: (key: string, value: string) => Promise<void>;
11-
}
124

135
describe('Flow Compatibility Type Tests', () => {
146
it('should accept flows that only use platform resources', () => {
157
const flow = new Flow({ slug: 'platform_only' })
16-
.step({ slug: 'query' }, (_input, _ctx: Context<{ sql: Sql }>) => {
8+
.step({ slug: 'query' }, () => {
179
return { result: 'data' };
1810
})
19-
.step({ slug: 'auth' }, (_input, _ctx: Context<{ supabase: SupabaseClient }>) => {
11+
.step({ slug: 'auth' }, () => {
2012
return { authenticated: true };
2113
});
2214

@@ -38,7 +30,7 @@ describe('Flow Compatibility Type Tests', () => {
3830

3931
it('should reject flows that require non-platform resources', () => {
4032
const flow = new Flow({ slug: 'custom_resource' })
41-
.step({ slug: 'cache' }, (_input, _ctx: Context<{ redis: TestRedis }>) => {
33+
.step({ slug: 'cache' }, () => {
4234
return { cached: true };
4335
});
4436

@@ -48,13 +40,10 @@ describe('Flow Compatibility Type Tests', () => {
4840

4941
it('should work with mixed platform resources', () => {
5042
const flow = new Flow({ slug: 'mixed_platform' })
51-
.step({ slug: 'query' }, (_input, _ctx: Context<{ sql: Sql }>) => {
43+
.step({ slug: 'query' }, () => {
5244
return { data: [] };
5345
})
54-
.step({ slug: 'store' }, (_input, _ctx: Context<{
55-
sql: Sql,
56-
supabase: SupabaseClient
57-
}>) => {
46+
.step({ slug: 'store' }, () => {
5847
return { stored: true };
5948
});
6049

pkgs/edge-worker/src/core/context.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ import { deepClone, deepFreeze } from './deepUtils.js';
6363
export function createContextSafeConfig<T extends Record<string, unknown>>(
6464
config: T
6565
): Readonly<T extends { sql: unknown } ? Omit<T, 'sql'> : T> {
66-
const { sql: _sql, ...safeConfig } = config as T & { sql?: unknown };
66+
const { sql, ...safeConfig } = config as T & { sql?: unknown };
67+
void sql;
6768
const clonedConfig = deepClone(safeConfig);
6869
return deepFreeze(clonedConfig) as Readonly<T extends { sql: unknown } ? Omit<T, 'sql'> : T>;
6970
}

pkgs/edge-worker/src/examples/supabase-flow-example.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ const myFlow = new Flow({ slug: 'supabase_example' })
1313
})
1414
.step({ slug: 'notify_admin' }, async (input, ctx) => {
1515
// Supabase client available with service role access
16-
const { data: _data, error } = await ctx.supabase
16+
const { error } = await ctx.supabase
1717
.from('admin_notifications')
1818
.insert({
1919
message: `Found ${input.query_users.users.length} active users`,
@@ -25,7 +25,7 @@ const myFlow = new Flow({ slug: 'supabase_example' })
2525
})
2626
.step({ slug: 'public_update' }, async (input, ctx) => {
2727
// Use the same client for all operations
28-
const { data: _data } = await ctx.supabase
28+
await ctx.supabase
2929
.from('public_stats')
3030
.update({ last_user_count: input.query_users.users.length })
3131
.eq('id', 1);

pkgs/edge-worker/src/examples/type-check-example.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
import { Flow } from '@pgflow/dsl';
22
import { EdgeWorker } from '../EdgeWorker.js';
33
import type { Sql } from 'postgres';
4-
import type { SupabaseClient } from '@supabase/supabase-js';
54

65
// Example 1: Flow using only platform resources - should work
76
const validFlow = new Flow({ slug: 'valid_flow' })
87
.step({ slug: 'query' }, async (_input, ctx: { sql: Sql }) => {
98
const result = await ctx.sql`SELECT * FROM users`;
109
return { users: result };
1110
})
12-
.step({ slug: 'notify' }, (_input, _ctx: { supabase: SupabaseClient }) => {
11+
.step({ slug: 'notify' }, () => {
1312
// Use Supabase client for operations
1413
return { notified: true };
1514
});
@@ -19,7 +18,7 @@ EdgeWorker.start(validFlow);
1918

2019
// Example 2: Flow using non-existent resources - should fail
2120
const invalidFlow = new Flow({ slug: 'invalid_flow' })
22-
.step({ slug: 'cache' }, (_input, _ctx: { redis: unknown }) => {
21+
.step({ slug: 'cache' }, () => {
2322
// Platform doesn't provide redis
2423
return { cached: true };
2524
});

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
6969

7070
const sql =
7171
config.sql ||
72-
postgres(config.connectionString!, {
72+
postgres(config.connectionString as string, {
7373
max: config.maxPgConnections ?? DEFAULT_FLOW_CONFIG.maxPgConnections,
7474
prepare: false,
7575
});

pkgs/edge-worker/src/queue/createQueueWorker.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ function normalizeQueueConfig(config: QueueWorkerConfig, sql: postgres.Sql): Res
6969
validateRetryConfig(retryConfig);
7070

7171
// Strip deprecated fields before merging
72-
const { retryDelay: _rd, retryLimit: _rl, ...rest } = config;
72+
const { retryDelay, retryLimit, ...rest } = config;
73+
void retryDelay;
74+
void retryLimit;
7375
return {
7476
connectionString: '',
7577
...DEFAULT_QUEUE_CONFIG,

0 commit comments

Comments
 (0)