Skip to content

Commit b009fb7

Browse files
committed
test: add integration tests for flows with map steps (#222)
Introduce new tests covering root map, dependent map, and empty array map scenarios. Verify correct execution, task creation, and output aggregation for various flow configurations.
1 parent 1f84152 commit b009fb7

File tree

15 files changed

+1497
-182
lines changed

15 files changed

+1497
-182
lines changed

pkgs/core/src/PgflowSqlClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export class PgflowSqlClient<TFlow extends AnyFlow>
5656
SELECT pgflow.complete_task(
5757
run_id => ${stepTask.run_id}::uuid,
5858
step_slug => ${stepTask.step_slug}::text,
59-
task_index => ${0}::int,
59+
task_index => ${stepTask.task_index}::int,
6060
output => ${this.sql.json(output || null)}::jsonb
6161
);
6262
`;
@@ -74,7 +74,7 @@ export class PgflowSqlClient<TFlow extends AnyFlow>
7474
SELECT pgflow.fail_task(
7575
run_id => ${stepTask.run_id}::uuid,
7676
step_slug => ${stepTask.step_slug}::text,
77-
task_index => ${0}::int,
77+
task_index => ${stepTask.task_index}::int,
7878
error_message => ${errorString}::text
7979
);
8080
`;

pkgs/core/src/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export type StepTaskRecord<TFlow extends AnyFlow> = {
2727
flow_slug: string;
2828
run_id: string;
2929
step_slug: StepSlug;
30+
task_index: number;
3031
input: Simplify<StepInput<TFlow, StepSlug>>;
3132
msg_id: number;
3233
};
@@ -36,7 +37,7 @@ export type StepTaskRecord<TFlow extends AnyFlow> = {
3637
* Composite key that is enough to find a particular step task
3738
* Contains only the minimum fields needed to identify a task
3839
*/
39-
export type StepTaskKey = Pick<StepTaskRecord<AnyFlow>, 'run_id' | 'step_slug'>;
40+
export type StepTaskKey = Pick<StepTaskRecord<AnyFlow>, 'run_id' | 'step_slug' | 'task_index'>;
4041

4142

4243

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
begin;
2+
select plan(5);
3+
select pgflow_tests.reset_db();
4+
5+
-- Test 1: Single step returns task_index 0
6+
select pgflow.create_flow('single_task');
7+
select pgflow.add_step('single_task', 'step1');
8+
select pgflow.start_flow('single_task', '{"data": "test"}'::jsonb);
9+
10+
-- Ensure worker and read message
11+
select pgflow_tests.ensure_worker('single_task');
12+
13+
with msgs as (
14+
select * from pgflow.read_with_poll('single_task', 10, 5, 1, 50) limit 1
15+
),
16+
msg_ids as (
17+
select array_agg(msg_id) as ids from msgs
18+
),
19+
started_tasks as (
20+
select * from pgflow.start_tasks(
21+
'single_task',
22+
(select ids from msg_ids),
23+
'11111111-1111-1111-1111-111111111111'::uuid
24+
)
25+
)
26+
select is(
27+
(select task_index from started_tasks),
28+
0,
29+
'Single step task should have task_index 0'
30+
);
31+
32+
-- Test 2: Map step with array of 3 elements returns correct task_index for each
33+
select pgflow_tests.reset_db();
34+
select pgflow.create_flow('map_flow');
35+
select pgflow.add_step('map_flow', 'map_step', '{}', null, null, null, null, 'map');
36+
select pgflow.start_flow('map_flow', '[1, 2, 3]'::jsonb);
37+
38+
-- Ensure worker
39+
select pgflow_tests.ensure_worker('map_flow');
40+
41+
-- Read all 3 messages
42+
with msgs as (
43+
select * from pgflow.read_with_poll('map_flow', 10, 5, 3, 50) order by msg_id
44+
),
45+
msg_ids as (
46+
select array_agg(msg_id order by msg_id) as ids from msgs
47+
),
48+
started_tasks as (
49+
select * from pgflow.start_tasks(
50+
'map_flow',
51+
(select ids from msg_ids),
52+
'11111111-1111-1111-1111-111111111111'::uuid
53+
) order by task_index
54+
)
55+
select is(
56+
array_agg(task_index order by task_index),
57+
ARRAY[0, 1, 2],
58+
'Map step tasks should have sequential task_index values'
59+
) from started_tasks;
60+
61+
-- Test 3: Map step with 5 elements returns correct task_index values
62+
select pgflow_tests.reset_db();
63+
select pgflow.create_flow('map_five');
64+
select pgflow.add_step('map_five', 'mapper', '{}', null, null, null, null, 'map');
65+
select pgflow.start_flow('map_five', '["a", "b", "c", "d", "e"]'::jsonb);
66+
67+
-- Ensure worker
68+
select pgflow_tests.ensure_worker('map_five');
69+
70+
-- Read all 5 messages
71+
with msgs as (
72+
select * from pgflow.read_with_poll('map_five', 10, 5, 5, 50) order by msg_id
73+
),
74+
msg_ids as (
75+
select array_agg(msg_id order by msg_id) as ids from msgs
76+
),
77+
started_tasks as (
78+
select * from pgflow.start_tasks(
79+
'map_five',
80+
(select ids from msg_ids),
81+
'11111111-1111-1111-1111-111111111111'::uuid
82+
) order by task_index
83+
)
84+
select is(
85+
array_agg(task_index order by task_index),
86+
ARRAY[0, 1, 2, 3, 4],
87+
'Map step with 5 elements should have task_index 0-4'
88+
) from started_tasks;
89+
90+
-- Test 4: Dependent map step preserves task_index
91+
select pgflow_tests.reset_db();
92+
select pgflow.create_flow('map_chain');
93+
select pgflow.add_step('map_chain', 'first', '{}', null, null, null, null, 'map');
94+
select pgflow.add_step('map_chain', 'second', ARRAY['first'], null, null, null, null, 'map');
95+
select pgflow.start_flow('map_chain', '[10, 20]'::jsonb);
96+
97+
-- Complete first map tasks
98+
select pgflow_tests.ensure_worker('map_chain');
99+
-- Complete task index 0
100+
with poll_result as (
101+
select * from pgflow_tests.read_and_start('map_chain', 1, 1) limit 1
102+
)
103+
select pgflow.complete_task(
104+
run_id,
105+
step_slug,
106+
task_index,
107+
jsonb_build_object('value', (input::int) * 2)
108+
) from poll_result;
109+
-- Complete task index 1
110+
with poll_result as (
111+
select * from pgflow_tests.read_and_start('map_chain', 1, 1) limit 1
112+
)
113+
select pgflow.complete_task(
114+
run_id,
115+
step_slug,
116+
task_index,
117+
jsonb_build_object('value', (input::int) * 2)
118+
) from poll_result;
119+
120+
-- Now read and start second map tasks
121+
select pgflow_tests.ensure_worker('map_chain', '22222222-2222-2222-2222-222222222222'::uuid);
122+
with msgs as (
123+
select * from pgflow.read_with_poll('map_chain', 10, 5, 2, 50) order by msg_id
124+
),
125+
msg_ids as (
126+
select array_agg(msg_id order by msg_id) as ids from msgs
127+
),
128+
started_tasks as (
129+
select * from pgflow.start_tasks(
130+
'map_chain',
131+
(select ids from msg_ids),
132+
'22222222-2222-2222-2222-222222222222'::uuid
133+
) order by task_index
134+
)
135+
select is(
136+
array_agg(task_index order by task_index),
137+
ARRAY[0, 1],
138+
'Dependent map step should preserve task_index from parent'
139+
) from started_tasks;
140+
141+
-- Test 5: Multiple single steps in sequence all have task_index 0
142+
select pgflow_tests.reset_db();
143+
select pgflow.create_flow('sequential');
144+
select pgflow.add_step('sequential', 'step_a');
145+
select pgflow.add_step('sequential', 'step_b', ARRAY['step_a']);
146+
select pgflow.add_step('sequential', 'step_c', ARRAY['step_b']);
147+
select pgflow.start_flow('sequential', '{"test": true}'::jsonb);
148+
149+
-- Process step_a
150+
select pgflow_tests.ensure_worker('sequential');
151+
with poll_result as (
152+
select * from pgflow_tests.read_and_start('sequential', 1, 1)
153+
)
154+
select pgflow.complete_task(
155+
run_id,
156+
step_slug,
157+
task_index,
158+
'{"result": "a"}'::jsonb
159+
) from poll_result;
160+
161+
-- Process step_b
162+
select pgflow_tests.ensure_worker('sequential', '33333333-3333-3333-3333-333333333333'::uuid);
163+
with msgs as (
164+
select * from pgflow.read_with_poll('sequential', 10, 5, 1, 50) limit 1
165+
),
166+
msg_ids as (
167+
select array_agg(msg_id) as ids from msgs
168+
),
169+
started_tasks as (
170+
select * from pgflow.start_tasks(
171+
'sequential',
172+
(select ids from msg_ids),
173+
'33333333-3333-3333-3333-333333333333'::uuid
174+
)
175+
)
176+
select is(
177+
(select task_index from started_tasks),
178+
0,
179+
'Sequential single steps should all have task_index 0'
180+
);
181+
182+
select finish();
183+
rollback;

pkgs/edge-worker/src/core/context.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ import { deepClone, deepFreeze } from './deepUtils.js';
6363
export function createContextSafeConfig<T extends Record<string, unknown>>(
6464
config: T
6565
): Readonly<T extends { sql: unknown } ? Omit<T, 'sql'> : T> {
66-
const { sql: _sql, ...safeConfig } = config as T & { sql?: unknown };
66+
const { sql, ...safeConfig } = config as T & { sql?: unknown };
67+
void sql;
6768
const clonedConfig = deepClone(safeConfig);
6869
return deepFreeze(clonedConfig) as Readonly<T extends { sql: unknown } ? Omit<T, 'sql'> : T>;
6970
}

pkgs/edge-worker/src/examples/supabase-flow-example.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ const myFlow = new Flow({ slug: 'supabase_example' })
1313
})
1414
.step({ slug: 'notify_admin', dependsOn: ['query_users'] }, async (input, ctx) => {
1515
// Supabase client available with service role access
16-
const { data: _data, error } = await ctx.supabase
16+
const { error } = await ctx.supabase
1717
.from('admin_notifications')
1818
.insert({
1919
message: `Found ${input.query_users.users.length} active users`,
@@ -25,7 +25,7 @@ const myFlow = new Flow({ slug: 'supabase_example' })
2525
})
2626
.step({ slug: 'public_update', dependsOn: ['query_users'] }, async (input, ctx) => {
2727
// Use the same client for all operations
28-
const { data: _data } = await ctx.supabase
28+
await ctx.supabase
2929
.from('public_stats')
3030
.update({ last_user_count: input.query_users.users.length })
3131
.eq('id', 1);

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
6969

7070
const sql =
7171
config.sql ||
72-
postgres(config.connectionString!, {
72+
postgres(config.connectionString as string, {
7373
max: config.maxPgConnections ?? DEFAULT_FLOW_CONFIG.maxPgConnections,
7474
prepare: false,
7575
});

pkgs/edge-worker/src/queue/createQueueWorker.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ function normalizeQueueConfig(config: QueueWorkerConfig, sql: postgres.Sql): Res
6969
validateRetryConfig(retryConfig);
7070

7171
// Strip deprecated fields before merging
72-
const { retryDelay: _rd, retryLimit: _rl, ...rest } = config;
72+
const { retryDelay, retryLimit, ...rest } = config;
73+
void retryDelay;
74+
void retryLimit;
7375
return {
7476
connectionString: '',
7577
...DEFAULT_QUEUE_CONFIG,

0 commit comments

Comments
 (0)