From b89cc1df5eb64c53d91de09bc97516f91b246263 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Fri, 19 Sep 2025 12:46:19 +0200 Subject: [PATCH] test: add integration tests for flow mapping and array handling Introduce new tests covering root map, dependent map, and empty array map scenarios. Verify correct execution, task creation, and output aggregation for various flow configurations. --- pkgs/core/src/PgflowSqlClient.ts | 4 +- pkgs/core/src/types.ts | 3 +- .../start_tasks/returns_task_index.test.sql | 183 ++++ pkgs/edge-worker/src/core/context.ts | 3 +- .../src/examples/supabase-flow-example.ts | 4 +- pkgs/edge-worker/src/flow/createFlowWorker.ts | 2 +- .../src/queue/createQueueWorker.ts | 4 +- .../tests/integration/flow/_testHelpers.ts | 216 +++++ .../flow/contextResourcesFlow.test.ts | 34 +- .../tests/integration/flow/mapFlow.test.ts | 214 +++++ .../integration/flow/minimalFlow.test.ts | 101 +-- .../flow/performanceMapFlow.test.ts | 804 ++++++++++++++++++ .../integration/flow/startDelayFlow.test.ts | 101 +-- .../stepTaskExecutorContext.test.ts | 5 + pkgs/example-flows/src/example-flow.ts | 1 + 15 files changed, 1497 insertions(+), 182 deletions(-) create mode 100644 pkgs/core/supabase/tests/start_tasks/returns_task_index.test.sql create mode 100644 pkgs/edge-worker/tests/integration/flow/_testHelpers.ts create mode 100644 pkgs/edge-worker/tests/integration/flow/mapFlow.test.ts create mode 100644 pkgs/edge-worker/tests/integration/flow/performanceMapFlow.test.ts diff --git a/pkgs/core/src/PgflowSqlClient.ts b/pkgs/core/src/PgflowSqlClient.ts index ab4507bb6..47a80bb06 100644 --- a/pkgs/core/src/PgflowSqlClient.ts +++ b/pkgs/core/src/PgflowSqlClient.ts @@ -56,7 +56,7 @@ export class PgflowSqlClient SELECT pgflow.complete_task( run_id => ${stepTask.run_id}::uuid, step_slug => ${stepTask.step_slug}::text, - task_index => ${0}::int, + task_index => ${stepTask.task_index}::int, output => ${this.sql.json(output || null)}::jsonb ); `; @@ -74,7 +74,7 @@ export class PgflowSqlClient SELECT pgflow.fail_task( run_id => ${stepTask.run_id}::uuid, step_slug => ${stepTask.step_slug}::text, - task_index => ${0}::int, + task_index => ${stepTask.task_index}::int, error_message => ${errorString}::text ); `; diff --git a/pkgs/core/src/types.ts b/pkgs/core/src/types.ts index da64a82ba..ebc7e5337 100644 --- a/pkgs/core/src/types.ts +++ b/pkgs/core/src/types.ts @@ -27,6 +27,7 @@ export type StepTaskRecord = { flow_slug: string; run_id: string; step_slug: StepSlug; + task_index: number; input: Simplify>; msg_id: number; }; @@ -36,7 +37,7 @@ export type StepTaskRecord = { * Composite key that is enough to find a particular step task * Contains only the minimum fields needed to identify a task */ -export type StepTaskKey = Pick, 'run_id' | 'step_slug'>; +export type StepTaskKey = Pick, 'run_id' | 'step_slug' | 'task_index'>; diff --git a/pkgs/core/supabase/tests/start_tasks/returns_task_index.test.sql b/pkgs/core/supabase/tests/start_tasks/returns_task_index.test.sql new file mode 100644 index 000000000..b06d0afe3 --- /dev/null +++ b/pkgs/core/supabase/tests/start_tasks/returns_task_index.test.sql @@ -0,0 +1,183 @@ +begin; +select plan(5); +select pgflow_tests.reset_db(); + +-- Test 1: Single step returns task_index 0 +select pgflow.create_flow('single_task'); +select pgflow.add_step('single_task', 'step1'); +select pgflow.start_flow('single_task', '{"data": "test"}'::jsonb); + +-- Ensure worker and read message +select pgflow_tests.ensure_worker('single_task'); + +with msgs as ( + select * from pgflow.read_with_poll('single_task', 10, 5, 1, 50) limit 1 +), +msg_ids as ( + select array_agg(msg_id) as ids from msgs +), +started_tasks as ( + select * from pgflow.start_tasks( + 'single_task', + (select ids from msg_ids), + '11111111-1111-1111-1111-111111111111'::uuid + ) +) +select is( + (select task_index from started_tasks), + 0, + 'Single step task should have task_index 0' +); + +-- Test 2: Map step with array of 3 elements returns correct task_index for each +select pgflow_tests.reset_db(); +select pgflow.create_flow('map_flow'); +select pgflow.add_step('map_flow', 'map_step', '{}', null, null, null, null, 'map'); +select pgflow.start_flow('map_flow', '[1, 2, 3]'::jsonb); + +-- Ensure worker +select pgflow_tests.ensure_worker('map_flow'); + +-- Read all 3 messages +with msgs as ( + select * from pgflow.read_with_poll('map_flow', 10, 5, 3, 50) order by msg_id +), +msg_ids as ( + select array_agg(msg_id order by msg_id) as ids from msgs +), +started_tasks as ( + select * from pgflow.start_tasks( + 'map_flow', + (select ids from msg_ids), + '11111111-1111-1111-1111-111111111111'::uuid + ) order by task_index +) +select is( + array_agg(task_index order by task_index), + ARRAY[0, 1, 2], + 'Map step tasks should have sequential task_index values' +) from started_tasks; + +-- Test 3: Map step with 5 elements returns correct task_index values +select pgflow_tests.reset_db(); +select pgflow.create_flow('map_five'); +select pgflow.add_step('map_five', 'mapper', '{}', null, null, null, null, 'map'); +select pgflow.start_flow('map_five', '["a", "b", "c", "d", "e"]'::jsonb); + +-- Ensure worker +select pgflow_tests.ensure_worker('map_five'); + +-- Read all 5 messages +with msgs as ( + select * from pgflow.read_with_poll('map_five', 10, 5, 5, 50) order by msg_id +), +msg_ids as ( + select array_agg(msg_id order by msg_id) as ids from msgs +), +started_tasks as ( + select * from pgflow.start_tasks( + 'map_five', + (select ids from msg_ids), + '11111111-1111-1111-1111-111111111111'::uuid + ) order by task_index +) +select is( + array_agg(task_index order by task_index), + ARRAY[0, 1, 2, 3, 4], + 'Map step with 5 elements should have task_index 0-4' +) from started_tasks; + +-- Test 4: Dependent map step preserves task_index +select pgflow_tests.reset_db(); +select pgflow.create_flow('map_chain'); +select pgflow.add_step('map_chain', 'first', '{}', null, null, null, null, 'map'); +select pgflow.add_step('map_chain', 'second', ARRAY['first'], null, null, null, null, 'map'); +select pgflow.start_flow('map_chain', '[10, 20]'::jsonb); + +-- Complete first map tasks +select pgflow_tests.ensure_worker('map_chain'); +-- Complete task index 0 +with poll_result as ( + select * from pgflow_tests.read_and_start('map_chain', 1, 1) limit 1 +) +select pgflow.complete_task( + run_id, + step_slug, + task_index, + jsonb_build_object('value', (input::int) * 2) +) from poll_result; +-- Complete task index 1 +with poll_result as ( + select * from pgflow_tests.read_and_start('map_chain', 1, 1) limit 1 +) +select pgflow.complete_task( + run_id, + step_slug, + task_index, + jsonb_build_object('value', (input::int) * 2) +) from poll_result; + +-- Now read and start second map tasks +select pgflow_tests.ensure_worker('map_chain', '22222222-2222-2222-2222-222222222222'::uuid); +with msgs as ( + select * from pgflow.read_with_poll('map_chain', 10, 5, 2, 50) order by msg_id +), +msg_ids as ( + select array_agg(msg_id order by msg_id) as ids from msgs +), +started_tasks as ( + select * from pgflow.start_tasks( + 'map_chain', + (select ids from msg_ids), + '22222222-2222-2222-2222-222222222222'::uuid + ) order by task_index +) +select is( + array_agg(task_index order by task_index), + ARRAY[0, 1], + 'Dependent map step should preserve task_index from parent' +) from started_tasks; + +-- Test 5: Multiple single steps in sequence all have task_index 0 +select pgflow_tests.reset_db(); +select pgflow.create_flow('sequential'); +select pgflow.add_step('sequential', 'step_a'); +select pgflow.add_step('sequential', 'step_b', ARRAY['step_a']); +select pgflow.add_step('sequential', 'step_c', ARRAY['step_b']); +select pgflow.start_flow('sequential', '{"test": true}'::jsonb); + +-- Process step_a +select pgflow_tests.ensure_worker('sequential'); +with poll_result as ( + select * from pgflow_tests.read_and_start('sequential', 1, 1) +) +select pgflow.complete_task( + run_id, + step_slug, + task_index, + '{"result": "a"}'::jsonb +) from poll_result; + +-- Process step_b +select pgflow_tests.ensure_worker('sequential', '33333333-3333-3333-3333-333333333333'::uuid); +with msgs as ( + select * from pgflow.read_with_poll('sequential', 10, 5, 1, 50) limit 1 +), +msg_ids as ( + select array_agg(msg_id) as ids from msgs +), +started_tasks as ( + select * from pgflow.start_tasks( + 'sequential', + (select ids from msg_ids), + '33333333-3333-3333-3333-333333333333'::uuid + ) +) +select is( + (select task_index from started_tasks), + 0, + 'Sequential single steps should all have task_index 0' +); + +select finish(); +rollback; \ No newline at end of file diff --git a/pkgs/edge-worker/src/core/context.ts b/pkgs/edge-worker/src/core/context.ts index 45add00ec..c87dcc294 100644 --- a/pkgs/edge-worker/src/core/context.ts +++ b/pkgs/edge-worker/src/core/context.ts @@ -63,7 +63,8 @@ import { deepClone, deepFreeze } from './deepUtils.js'; export function createContextSafeConfig>( config: T ): Readonly : T> { - const { sql: _sql, ...safeConfig } = config as T & { sql?: unknown }; + const { sql, ...safeConfig } = config as T & { sql?: unknown }; + void sql; const clonedConfig = deepClone(safeConfig); return deepFreeze(clonedConfig) as Readonly : T>; } diff --git a/pkgs/edge-worker/src/examples/supabase-flow-example.ts b/pkgs/edge-worker/src/examples/supabase-flow-example.ts index 64bf2ac40..93ff10203 100644 --- a/pkgs/edge-worker/src/examples/supabase-flow-example.ts +++ b/pkgs/edge-worker/src/examples/supabase-flow-example.ts @@ -13,7 +13,7 @@ const myFlow = new Flow({ slug: 'supabase_example' }) }) .step({ slug: 'notify_admin', dependsOn: ['query_users'] }, async (input, ctx) => { // Supabase client available with service role access - const { data: _data, error } = await ctx.supabase + const { error } = await ctx.supabase .from('admin_notifications') .insert({ message: `Found ${input.query_users.users.length} active users`, @@ -25,7 +25,7 @@ const myFlow = new Flow({ slug: 'supabase_example' }) }) .step({ slug: 'public_update', dependsOn: ['query_users'] }, async (input, ctx) => { // Use the same client for all operations - const { data: _data } = await ctx.supabase + await ctx.supabase .from('public_stats') .update({ last_user_count: input.query_users.users.length }) .eq('id', 1); diff --git a/pkgs/edge-worker/src/flow/createFlowWorker.ts b/pkgs/edge-worker/src/flow/createFlowWorker.ts index 23b7ecaf0..729b250ac 100644 --- a/pkgs/edge-worker/src/flow/createFlowWorker.ts +++ b/pkgs/edge-worker/src/flow/createFlowWorker.ts @@ -69,7 +69,7 @@ export function createFlowWorker { + return await waitFor( + async () => { + const [run] = await sql` + SELECT * FROM pgflow.runs + WHERE run_id = ${runId} + AND status IN ('completed', 'failed') + LIMIT 1 + `; + return run || false; + }, + { + pollIntervalMs: options?.pollIntervalMs ?? 500, + timeoutMs: options?.timeoutMs ?? 15000, + description: `flow run ${runId} to complete`, + } + ); +}; + +/** + * Create a flow with a single root map step + */ +export const createRootMapFlow = async ( + sql: postgres.Sql, + flowSlug: string, + stepSlug: string +) => { + await sql`select pgflow.create_flow(${flowSlug});`; + await sql`select pgflow.add_step(${flowSlug}, ${stepSlug}, ARRAY[]::text[], null, null, null, null, 'map');`; +}; + +/** + * Create a flow with single steps (non-map) with dependencies + */ +export const createSimpleFlow = async ( + sql: postgres.Sql, + flowSlug: string, + steps: Array<{ slug: string; deps: string[] }> +) => { + await sql`select pgflow.create_flow(${flowSlug});`; + for (const step of steps) { + if (step.deps.length > 0) { + await sql`select pgflow.add_step(${flowSlug}, ${step.slug}, deps_slugs => ARRAY[${step.deps}]::text[]);`; + } else { + await sql`select pgflow.add_step(${flowSlug}, ${step.slug});`; + } + } +}; + +/** + * Create a flow with mixed step types (single and map) + */ +export const createMixedFlow = async ( + sql: postgres.Sql, + flowSlug: string, + steps: Array<{ slug: string; deps: string[]; type: 'single' | 'map' }> +) => { + await sql`select pgflow.create_flow(${flowSlug});`; + for (const step of steps) { + const deps = step.deps.length > 0 ? sql`ARRAY[${step.deps}]::text[]` : sql`ARRAY[]::text[]`; + await sql`select pgflow.add_step(${flowSlug}, ${step.slug}, ${deps}, null, null, null, null, ${step.type});`; + } +}; + +/** + * Get step states for a run + */ +export const getStepStates = async (sql: postgres.Sql, runId: string) => { + return await sql<{ step_slug: string; status: string }[]>` + SELECT step_slug, status FROM pgflow.step_states + WHERE run_id = ${runId} + ORDER BY step_slug; + `; +}; + +/** + * Get step tasks for a run, optionally filtered by step + */ +export const getStepTasks = async ( + sql: postgres.Sql, + runId: string, + stepSlug?: string +) => { + const whereClause = stepSlug + ? sql`WHERE run_id = ${runId} AND step_slug = ${stepSlug}` + : sql`WHERE run_id = ${runId}`; + + return await sql< + { step_slug: string; status: string; output: Json; task_index: number }[] + >` + SELECT step_slug, status, output, task_index FROM pgflow.step_tasks + ${whereClause} + ORDER BY step_slug, task_index; + `; +}; + +/** + * Get the final output of a run + */ +export const getRunOutput = async (sql: postgres.Sql, runId: string) => { + const [run] = await sql<{ status: string; output: Json }[]>` + SELECT status, output FROM pgflow.runs WHERE run_id = ${runId}; + `; + return run; +}; + +/** + * Assert all steps have completed successfully + */ +export const assertAllStepsCompleted = ( + stepStates: Array<{ step_slug: string; status: string }> +) => { + for (const state of stepStates) { + assertEquals(state.status, 'completed', `${state.step_slug} should be completed`); + } +}; + +/** + * Assert all tasks have completed successfully + */ +export const assertAllTasksCompleted = ( + stepTasks: Array<{ step_slug: string; status: string }> +) => { + for (const task of stepTasks) { + assertEquals( + task.status, + 'completed', + `Task for ${task.step_slug} should be completed` + ); + } +}; + +/** + * Monitor progress of task execution and display updates + */ +export const monitorProgress = async ( + sql: postgres.Sql, + runId: string, + totalExpected: number, + options?: { + intervalMs?: number; + showProgress?: boolean; + } +): Promise => { + const intervalMs = options?.intervalMs ?? 1000; + const showProgress = options?.showProgress ?? true; + + if (!showProgress) return; + + const startTime = Date.now(); + + const checkProgress = async () => { + const [stats] = await sql` + SELECT + COUNT(*) FILTER (WHERE status = 'completed') as completed, + COUNT(*) FILTER (WHERE status = 'failed') as failed, + COUNT(*) as total + FROM pgflow.step_tasks + WHERE run_id = ${runId} + `; + + const completed = Number(stats.completed); + const failed = Number(stats.failed); + const total = Number(stats.total); + + if (total === 0) return { completed: 0, failed: 0, total: 0, done: false }; + + // Calculate rate and ETA + const elapsed = (Date.now() - startTime) / 1000; + const rate = completed / elapsed; + const remaining = totalExpected - completed; + const eta = rate > 0 ? remaining / rate : 0; + + // Format progress bar + const percentage = Math.floor((completed / totalExpected) * 100); + const barLength = 30; + const filledLength = Math.floor((completed / totalExpected) * barLength); + const bar = '█'.repeat(filledLength) + '░'.repeat(barLength - filledLength); + + // Update single line with carriage return + const encoder = new TextEncoder(); + const progressText = `\r⏳ Progress: [${bar}] ${percentage}% | ${completed}/${totalExpected} tasks | ${rate.toFixed(1)}/s | ETA: ${eta.toFixed(0)}s`; + await Deno.stdout.write(encoder.encode(progressText)); + + const done = completed + failed >= totalExpected; + if (done) { + // Clear the line and print final status + await Deno.stdout.write(encoder.encode('\r' + ' '.repeat(80) + '\r')); + console.log(`✅ Completed: ${completed}/${totalExpected} tasks in ${elapsed.toFixed(1)}s (${rate.toFixed(1)} tasks/s)`); + } + + return { completed, failed, total, done }; + }; + + // Poll until done + while (true) { + const { done } = await checkProgress(); + if (done) break; + await delay(intervalMs); + } +}; \ No newline at end of file diff --git a/pkgs/edge-worker/tests/integration/flow/contextResourcesFlow.test.ts b/pkgs/edge-worker/tests/integration/flow/contextResourcesFlow.test.ts index 661bff513..334517f8c 100644 --- a/pkgs/edge-worker/tests/integration/flow/contextResourcesFlow.test.ts +++ b/pkgs/edge-worker/tests/integration/flow/contextResourcesFlow.test.ts @@ -1,9 +1,9 @@ import { assert, assertEquals, assertExists } from '@std/assert'; import { withPgNoTransaction } from '../../db.ts'; import { Flow } from '@pgflow/dsl/supabase'; -import { waitFor } from '../../e2e/_helpers.ts'; import { delay } from '@std/async'; import { startFlow, startWorker } from '../_helpers.ts'; +import { waitForRunCompletion, createSimpleFlow } from './_testHelpers.ts'; // Define a flow that tests ONLY the essential context resources provided by EdgeWorker const ContextResourcesFlow = new Flow<{ testId: number }>({ @@ -63,35 +63,17 @@ Deno.test( }); try { - // Create flow and steps in database - await sql`select pgflow.create_flow('test_context_resources_flow');`; - await sql`select pgflow.add_step('test_context_resources_flow', 'verifyContextResources');`; + // Setup: Create flow with single step that verifies context + await createSimpleFlow(sql, 'test_context_resources_flow', [ + { slug: 'verifyContextResources', deps: [] }, + ]); - // Start a flow run with test input + // Execute: Start flow and wait for completion const testId = Math.floor(Math.random() * 1000); const flowRun = await startFlow(sql, ContextResourcesFlow, { testId }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); - // Wait for the run to complete - const polledRun = await waitFor( - async () => { - const [run] = await sql` - SELECT * FROM pgflow.runs WHERE run_id = ${flowRun.run_id}; - `; - - if (run.status != 'completed' && run.status != 'failed') { - return false; - } - - return run; - }, - { - pollIntervalMs: 500, - timeoutMs: 5000, - description: `context resources flow run ${flowRun.run_id} to complete`, - } - ); - - // Verify the run completed successfully - this confirms that all context assertions passed + // Verify: Run completed (context assertions passed in handler) assertEquals( polledRun.status, 'completed', diff --git a/pkgs/edge-worker/tests/integration/flow/mapFlow.test.ts b/pkgs/edge-worker/tests/integration/flow/mapFlow.test.ts new file mode 100644 index 000000000..a5884efdd --- /dev/null +++ b/pkgs/edge-worker/tests/integration/flow/mapFlow.test.ts @@ -0,0 +1,214 @@ +import { assert, assertEquals } from '@std/assert'; +import { withPgNoTransaction } from '../../db.ts'; +import { Flow } from '@pgflow/dsl'; +import { delay } from '@std/async'; +import { startFlow, startWorker } from '../_helpers.ts'; +import { + waitForRunCompletion, + createRootMapFlow, + createMixedFlow, + getStepStates, + getStepTasks, + getRunOutput, + assertAllStepsCompleted, +} from './_testHelpers.ts'; + +// Test 1: Root map - flow input is array, map processes each element +const RootMapFlow = new Flow({ slug: 'test_root_map_flow' }) + .map({ slug: 'double' }, async (num) => { + await delay(1); + return num * 2; + }); + +// Test 2: Dependent map - regular step returns array, map processes it +const DependentMapFlow = new Flow<{ prefix: string }>({ slug: 'test_dependent_map_flow' }) + .step({ slug: 'generateArray' }, async () => { + await delay(1); + return ['a', 'b', 'c']; + }) + .map({ slug: 'uppercase', array: 'generateArray' }, async (str) => { + await delay(1); + return str.toUpperCase(); + }) + .step({ slug: 'aggregate', dependsOn: ['uppercase'] }, async (input) => { + await delay(1); + return { + prefix: input.run.prefix, + processed: input.uppercase, + count: input.uppercase.length + }; + }); + +// Test 3: Empty array map - tests taskless cascade completion +const EmptyArrayMapFlow = new Flow({ slug: 'test_empty_array_map_flow' }) + .map({ slug: 'process' }, async (num) => { + await delay(1); + return num * 10; + }) + .step({ slug: 'summarize', dependsOn: ['process'] }, async (input) => { + await delay(1); + return { + isEmpty: input.process.length === 0, + results: input.process + }; + }); + +Deno.test( + 'root map executes successfully', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const worker = startWorker(sql, RootMapFlow, { + maxConcurrent: 3, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 200, + }); + + try { + // Setup: Create flow with root map step + await createRootMapFlow(sql, 'test_root_map_flow', 'double'); + + // Execute: Start flow with array input + const flowRun = await startFlow(sql, RootMapFlow, [1, 2, 3]); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + // Verify: Run completed successfully + assert(polledRun.status === 'completed', 'Run should be completed'); + + // Verify: Step states + const stepStates = await getStepStates(sql, flowRun.run_id); + assertEquals(stepStates.length, 1, 'Should have 1 step state'); + assertAllStepsCompleted(stepStates); + + // Verify: Tasks were created and completed with correct outputs + const stepTasks = await getStepTasks(sql, flowRun.run_id); + assertEquals(stepTasks.length, 3, 'Should have 3 tasks for 3 array elements'); + + // Verify each task output (inputs [1,2,3] doubled to [2,4,6]) + assertEquals(stepTasks[0].output, 2, 'First task (input 1) should output 2'); + assertEquals(stepTasks[1].output, 4, 'Second task (input 2) should output 4'); + assertEquals(stepTasks[2].output, 6, 'Third task (input 3) should output 6'); + + // Verify: Final aggregated output + const finalRun = await getRunOutput(sql, flowRun.run_id); + assertEquals(finalRun.status, 'completed', 'Run should be completed'); + assertEquals(finalRun.output, { double: [2, 4, 6] }, 'Run output should have aggregated results'); + + } finally { + await worker.stop(); + } + }) +); + +Deno.test( + 'dependent map executes successfully', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const worker = startWorker(sql, DependentMapFlow, { + maxConcurrent: 3, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 200, + }); + + try { + // Setup: Create flow with dependent steps (single -> map -> single) + await createMixedFlow(sql, 'test_dependent_map_flow', [ + { slug: 'generateArray', deps: [], type: 'single' }, + { slug: 'uppercase', deps: ['generateArray'], type: 'map' }, + { slug: 'aggregate', deps: ['uppercase'], type: 'single' } + ]); + + // Execute: Start flow and wait for completion + const flowRun = await startFlow(sql, DependentMapFlow, { prefix: 'test' }); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + // Verify: Run completed successfully + assert(polledRun.status === 'completed', 'Run should be completed'); + + // Verify: All steps completed + const stepStates = await getStepStates(sql, flowRun.run_id); + assertEquals(stepStates.length, 3, 'Should have 3 step states'); + assertAllStepsCompleted(stepStates); + + // Verify: Map step created correct tasks + const mapTasks = await getStepTasks(sql, flowRun.run_id, 'uppercase'); + assertEquals(mapTasks.length, 3, 'Should have 3 map tasks'); + + // Verify map outputs (inputs ["a","b","c"] uppercased to ["A","B","C"]) + assertEquals(mapTasks[0].output, 'A', 'First task (input "a") should output "A"'); + assertEquals(mapTasks[1].output, 'B', 'Second task (input "b") should output "B"'); + assertEquals(mapTasks[2].output, 'C', 'Third task (input "c") should output "C"'); + + // Verify: Final aggregated output + const finalRun = await getRunOutput(sql, flowRun.run_id); + assertEquals(finalRun.status, 'completed', 'Run should be completed'); + assertEquals( + finalRun.output, + { + aggregate: { + prefix: 'test', + processed: ['A', 'B', 'C'], + count: 3 + } + }, + 'Run output should have final aggregated results' + ); + + } finally { + await worker.stop(); + } + }) +); + +Deno.test( + 'empty array map completes without tasks', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const worker = startWorker(sql, EmptyArrayMapFlow, { + maxConcurrent: 3, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 200, + }); + + try { + // Setup: Create flow with map step followed by dependent step + await createMixedFlow(sql, 'test_empty_array_map_flow', [ + { slug: 'process', deps: [], type: 'map' }, + { slug: 'summarize', deps: ['process'], type: 'single' } + ]); + + // Execute: Start flow with empty array + const flowRun = await startFlow(sql, EmptyArrayMapFlow, []); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); + + // Verify: Run completed despite empty array + assert(polledRun.status === 'completed', 'Run should be completed'); + + // Verify: Both steps completed + const stepStates = await getStepStates(sql, flowRun.run_id); + assertEquals(stepStates.length, 2, 'Should have 2 step states'); + assertAllStepsCompleted(stepStates); + + // Verify: No tasks created for empty array map + const mapTasks = await getStepTasks(sql, flowRun.run_id, 'process'); + assertEquals(mapTasks.length, 0, 'Should have NO tasks for empty array map'); + + // Verify: Summarize step handled empty array correctly + const finalRun = await getRunOutput(sql, flowRun.run_id); + assertEquals(finalRun.status, 'completed', 'Run should be completed'); + assertEquals( + finalRun.output, + { summarize: { isEmpty: true, results: [] } }, + 'Run output should show empty results' + ); + + } finally { + await worker.stop(); + } + }) +); \ No newline at end of file diff --git a/pkgs/edge-worker/tests/integration/flow/minimalFlow.test.ts b/pkgs/edge-worker/tests/integration/flow/minimalFlow.test.ts index e1b44d83c..306f37345 100644 --- a/pkgs/edge-worker/tests/integration/flow/minimalFlow.test.ts +++ b/pkgs/edge-worker/tests/integration/flow/minimalFlow.test.ts @@ -1,10 +1,17 @@ import { assert, assertEquals } from '@std/assert'; import { withPgNoTransaction } from '../../db.ts'; import { Flow } from '@pgflow/dsl'; -import { waitFor } from '../../e2e/_helpers.ts'; import { delay } from '@std/async'; -import type { Json } from '@pgflow/core'; import { startFlow, startWorker } from '../_helpers.ts'; +import { + waitForRunCompletion, + createSimpleFlow, + getStepStates, + getStepTasks, + getRunOutput, + assertAllStepsCompleted, + assertAllTasksCompleted, +} from './_testHelpers.ts'; // Define a minimal flow with two steps: // 1. Convert a number to a string @@ -35,91 +42,39 @@ Deno.test( }); try { - await sql`select pgflow.create_flow('test_minimal_flow');`; - await sql`select pgflow.add_step('test_minimal_flow', 'toStringStep');`; - await sql`select pgflow.add_step('test_minimal_flow', 'wrapInArrayStep', deps_slugs => ARRAY['toStringStep']::text[]);`; + // Setup: Create flow with two dependent steps + await createSimpleFlow(sql, 'test_minimal_flow', [ + { slug: 'toStringStep', deps: [] }, + { slug: 'wrapInArrayStep', deps: ['toStringStep'] }, + ]); - // Start a flow run with input value 42 + // Execute: Start flow with input value 42 const flowRun = await startFlow(sql, MinimalFlow, 42); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); - let i = 0; - // Wait for the run to complete with a timeout - const polledRun = await waitFor( - async () => { - // Check run status - const [run] = await sql` - SELECT * FROM pgflow.runs WHERE run_id = ${flowRun.run_id}; - `; - - i += 1; - console.log(`Run ${i}`, run); - - if (run.status != 'completed' && run.status != 'failed') { - return false; - } - - return run; - }, - { - pollIntervalMs: 500, - timeoutMs: 5000, - description: `flow run ${flowRun.run_id} to be 'completed'`, - } - ); - - console.log('Polled run', polledRun); - + // Verify: Run completed successfully assert(polledRun.status === 'completed', 'Run should be completed'); - // Verify step_states are all completed - const stepStates = await sql<{ step_slug: string; status: string }[]>` - SELECT step_slug, status FROM pgflow.step_states - WHERE run_id = ${flowRun.run_id} - ORDER BY step_slug; - `; - - console.log('Step states:', stepStates); - assertEquals( - stepStates.map((s) => s.status), - ['completed', 'completed'], - 'All step states should be completed' - ); - - // Verify step_tasks are all succeeded - const stepTasks = await sql< - { step_slug: string; status: string; output: Json }[] - >` - SELECT step_slug, status, output FROM pgflow.step_tasks - WHERE run_id = ${flowRun.run_id} - ORDER BY step_slug; - `; - - console.log('Step tasks:', stepTasks); - assertEquals( - stepTasks.map((s) => s.status), - ['completed', 'completed'], - 'All step tasks should be succeeded' - ); - - // Verify run is succeeded - const [finalRun] = await sql<{ status: string; output: unknown }[]>` - SELECT status, output FROM pgflow.runs WHERE run_id = ${flowRun.run_id}; - `; + // Verify: All steps completed + const stepStates = await getStepStates(sql, flowRun.run_id); + assertEquals(stepStates.length, 2, 'Should have 2 step states'); + assertAllStepsCompleted(stepStates); - console.log('Final run:', finalRun); - assertEquals(finalRun.status, 'completed', 'Run should be succeeded'); + // Verify: All tasks completed + const stepTasks = await getStepTasks(sql, flowRun.run_id); + assertEquals(stepTasks.length, 2, 'Should have 2 step tasks'); + assertAllTasksCompleted(stepTasks); - // Verify run output matches expected ["42"] + // Verify: Final output matches expected ["42"] + const finalRun = await getRunOutput(sql, flowRun.run_id); + assertEquals(finalRun.status, 'completed', 'Run should be completed'); assertEquals( finalRun.output, { wrapInArrayStep: ['42'] }, 'Run output should match expected value' ); } finally { - console.log('Stopping worker'); - // Stop the worker await worker.stop(); - console.log('Worker stopped'); } }) ); diff --git a/pkgs/edge-worker/tests/integration/flow/performanceMapFlow.test.ts b/pkgs/edge-worker/tests/integration/flow/performanceMapFlow.test.ts new file mode 100644 index 000000000..d46f3c9e1 --- /dev/null +++ b/pkgs/edge-worker/tests/integration/flow/performanceMapFlow.test.ts @@ -0,0 +1,804 @@ +import { assert, assertEquals } from '@std/assert'; +import { withPgNoTransaction } from '../../db.ts'; +import { Flow } from '@pgflow/dsl'; +import { delay } from '@std/async'; +import { startFlow, startWorker } from '../_helpers.ts'; +import { + waitForRunCompletion, + createRootMapFlow, + getStepStates, + getStepTasks, + getRunOutput, + assertAllStepsCompleted, + monitorProgress, +} from './_testHelpers.ts'; +import type { postgres } from '../../sql.ts'; +import type { RunRow } from '@pgflow/core'; + +// Performance metrics collector +interface PerformanceMetrics { + arraySize: number; + startTime: number; + flowStartTime: number; + flowStartedTime: number; // When start_flow returns + firstTaskPickupTime: number; // When first task is picked up + flowEndTime: number; + totalDuration: number; + flowExecutionTime: number; + flowStartupTime: number; // Time to start flow + timeToFirstTask: number; // Time from start to first pickup + tasksPerSecond: number; + successfulTasks: number; + failedTasks: number; +} + +// Multi-flow performance metrics +interface MultiFlowMetrics { + numFlows: number; + numWorkers: number; + totalElements: number; + overallStartTime: number; + overallEndTime: number; + overallDuration: number; + individualFlowMetrics: PerformanceMetrics[]; + averageFlowStartupTime: number; + averageTimeToFirstTask: number; + totalTasksPerSecond: number; + workerUtilization: number; +} + +// Helper to format duration in human-readable format +const formatDuration = (ms: number): string => { + if (ms < 1000) return `${ms.toFixed(0)}ms`; + if (ms < 60000) return `${(ms / 1000).toFixed(2)}s`; + return `${(ms / 60000).toFixed(2)}m`; +}; + +// Helper to print performance report with tasteful emojis +const printPerformanceReport = (metrics: PerformanceMetrics) => { + const separator = '─'.repeat(60); + + console.log(`\n${separator}`); + console.log('📊 Performance Test Results'); + console.log(separator); + + console.log(`\n📈 Test Configuration:`); + console.log(` Array Size: ${metrics.arraySize.toLocaleString()} elements`); + + console.log(`\n⏱️ Timing Breakdown:`); + console.log(` Total Duration: ${formatDuration(metrics.totalDuration)}`); + console.log( + ` Flow Execution: ${formatDuration(metrics.flowExecutionTime)}` + ); + console.log( + ` Setup Overhead: ${formatDuration( + metrics.flowStartTime - metrics.startTime + )}` + ); + + console.log(`\n🎯 Critical Metrics:`); + console.log( + ` Flow Startup Time: ${formatDuration(metrics.flowStartupTime)}` + ); + console.log( + ` Time to First Task: ${formatDuration(metrics.timeToFirstTask)}` + ); + console.log(` Tasks/Second: ${metrics.tasksPerSecond.toFixed(2)}`); + console.log( + ` Avg Task Time: ${( + metrics.flowExecutionTime / metrics.arraySize + ).toFixed(2)}ms` + ); + + console.log(`\n✅ Results:`); + console.log(` Successful: ${metrics.successfulTasks.toLocaleString()}`); + console.log(` Failed: ${metrics.failedTasks}`); + + // Performance rating based on tasks per second + const rating = + metrics.tasksPerSecond > 100 + ? '🏆 Excellent!' + : metrics.tasksPerSecond > 50 + ? '👍 Good' + : metrics.tasksPerSecond > 20 + ? '📌 Acceptable' + : '⚠️ Needs Optimization'; + + // Startup performance rating + const startupRating = + metrics.timeToFirstTask < 100 + ? '⚡ Very Fast' + : metrics.timeToFirstTask < 500 + ? '✨ Fast' + : metrics.timeToFirstTask < 1000 + ? '👌 Normal' + : '🐌 Slow'; + + console.log(`\n${separator}`); + console.log(`Performance Rating: ${rating}`); + console.log(`Startup Speed: ${startupRating}`); + console.log(`${separator}\n`); +}; + +// Define a flow that processes a large array of strings +const LargeArrayMapFlow = new Flow({ + slug: 'test_large_array_map_flow', +}).map({ slug: 'processString' }, async (str) => { + // Simulate some string processing work + await delay(Math.random() * 10); // Random delay 0-10ms to simulate variable processing time + + // Transform the string (simple uppercase + length calculation) + return { + original: str, + uppercase: str.toUpperCase(), + length: str.length, + reversed: str.split('').reverse().join(''), + }; +}); + +// Helper to generate test data +const generateTestStrings = ( + count: number, + flowIndex: number = 0 +): string[] => { + const words = [ + 'alpha', + 'beta', + 'gamma', + 'delta', + 'epsilon', + 'zeta', + 'eta', + 'theta', + 'iota', + 'kappa', + ]; + const strings: string[] = []; + + for (let i = 0; i < count; i++) { + // Generate varied strings to test different processing times + const word = words[i % words.length]; + const suffix = Math.floor(i / words.length) + .toString() + .padStart(4, '0'); + strings.push(`flow${flowIndex}_${word}_${suffix}`); + } + + return strings; +}; + +// Helper to start a single flow and track its metrics +const startFlowWithMetrics = async ( + sql: postgres.Sql, + flow: typeof LargeArrayMapFlow, + testData: string[], + _flowIndex: number +): Promise<{ flowRun: RunRow; metrics: PerformanceMetrics }> => { + const metrics: PerformanceMetrics = { + arraySize: testData.length, + startTime: Date.now(), + flowStartTime: 0, + flowStartedTime: 0, + firstTaskPickupTime: 0, + flowEndTime: 0, + totalDuration: 0, + flowExecutionTime: 0, + flowStartupTime: 0, + timeToFirstTask: 0, + tasksPerSecond: 0, + successfulTasks: 0, + failedTasks: 0, + }; + + // Measure flow startup time + metrics.flowStartTime = Date.now(); + const flowRun = await startFlow(sql, flow, testData); + metrics.flowStartedTime = Date.now(); + metrics.flowStartupTime = metrics.flowStartedTime - metrics.flowStartTime; + + return { flowRun, metrics }; +}; + +// Helper to monitor first task pickup +const monitorFirstTaskPickup = async ( + sql: postgres.Sql, + runId: string, + metrics: PerformanceMetrics +): Promise => { + // Poll for first task pickup + let pollAttempts = 0; + while (!metrics.firstTaskPickupTime && pollAttempts < 50) { + const result = await sql` + SELECT MIN(started_at) as first_task_time + FROM pgflow.step_tasks + WHERE run_id = ${runId} + AND started_at IS NOT NULL + `; + if (result[0].first_task_time) { + metrics.firstTaskPickupTime = new Date( + result[0].first_task_time + ).getTime(); + metrics.timeToFirstTask = + metrics.firstTaskPickupTime - metrics.flowStartedTime; + break; + } + await delay(100); + pollAttempts++; + } +}; + +// Helper to collect final metrics for a flow +const collectFlowMetrics = async ( + sql: postgres.Sql, + flowRun: RunRow, + metrics: PerformanceMetrics +): Promise => { + metrics.flowEndTime = Date.now(); + metrics.totalDuration = metrics.flowEndTime - metrics.startTime; + metrics.flowExecutionTime = metrics.flowEndTime - metrics.flowStartTime; + metrics.tasksPerSecond = + metrics.arraySize / (metrics.flowExecutionTime / 1000); + + // Get task completion stats + const stepTasks = await getStepTasks(sql, flowRun.run_id); + metrics.successfulTasks = stepTasks.filter( + (t) => t.status === 'completed' + ).length; + metrics.failedTasks = stepTasks.filter((t) => t.status === 'failed').length; +}; + +// Helper to start multiple workers +interface WorkerConfig { + maxConcurrent: number; + [key: string]: unknown; +} + +const startWorkers = ( + sql: postgres.Sql, + flow: typeof LargeArrayMapFlow, + numWorkers: number, + config: WorkerConfig +) => { + const workers = []; + + for (let i = 0; i < numWorkers; i++) { + const worker = startWorker(sql, flow, { + ...config, + // Add slight variation to worker configs to simulate real-world + maxConcurrent: config.maxConcurrent + (i % 2) * 5, + }); + workers.push(worker); + } + + return workers; +}; + +// Helper to print multi-flow performance report +const printMultiFlowReport = (metrics: MultiFlowMetrics) => { + const separator = '═'.repeat(60); + + console.log(`\n${separator}`); + console.log('📊 Multi-Flow Performance Test Results'); + console.log(separator); + + console.log(`\n🔄 Test Configuration:`); + console.log(` Concurrent Flows: ${metrics.numFlows}`); + console.log(` Worker Instances: ${metrics.numWorkers}`); + console.log(` Total Elements: ${metrics.totalElements.toLocaleString()}`); + + console.log(`\n⏱️ Overall Timing:`); + console.log(` Total Duration: ${formatDuration(metrics.overallDuration)}`); + console.log( + ` Avg Flow Startup: ${formatDuration(metrics.averageFlowStartupTime)}` + ); + console.log( + ` Avg Time to First Task: ${formatDuration( + metrics.averageTimeToFirstTask + )}` + ); + + console.log(`\n🚀 Throughput:`); + console.log( + ` Combined Tasks/Second: ${metrics.totalTasksPerSecond.toFixed(2)}` + ); + console.log( + ` Tasks per Worker/Second: ${( + metrics.totalTasksPerSecond / metrics.numWorkers + ).toFixed(2)}` + ); + + console.log(`\n📈 Individual Flow Stats:`); + metrics.individualFlowMetrics.forEach((flow, i) => { + console.log( + ` Flow ${i + 1}: ${flow.successfulTasks}/${ + flow.arraySize + } tasks, ${formatDuration(flow.flowExecutionTime)}` + ); + }); + + console.log(`\n${separator}\n`); +}; + +// Multi-flow/multi-worker test +Deno.test( + 'multi-flow multi-worker performance test - 10 flows, 4 workers', + { + sanitizeOps: false, + sanitizeResources: false, + }, + withPgNoTransaction(async (sql) => { + // Save console methods for later restoration + const originalLog = console.log; + const originalDebug = console.debug; + + await sql`select pgflow_tests.reset_db();`; + + // Suppress verbose worker output during test + console.log = () => {}; + console.debug = () => {}; + + // Configuration + const NUM_FLOWS = 100; + const NUM_WORKERS = 4; + const ELEMENTS_PER_FLOW = 100; + + const workers = startWorkers(sql, LargeArrayMapFlow, NUM_WORKERS, { + maxConcurrent: 25, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 50, + }); + + try { + // Restore console for our status messages + console.log = originalLog; + + console.log('\n🔧 Setting up multi-flow performance test...'); + await createRootMapFlow( + sql, + 'test_large_array_map_flow', + 'processString' + ); + + console.log( + `📝 Starting ${NUM_FLOWS} flows with ${ELEMENTS_PER_FLOW} elements each...` + ); + console.log(`👷 Running with ${NUM_WORKERS} workers...`); + + const multiMetrics: MultiFlowMetrics = { + numFlows: NUM_FLOWS, + numWorkers: NUM_WORKERS, + totalElements: NUM_FLOWS * ELEMENTS_PER_FLOW, + overallStartTime: Date.now(), + overallEndTime: 0, + overallDuration: 0, + individualFlowMetrics: [], + averageFlowStartupTime: 0, + averageTimeToFirstTask: 0, + totalTasksPerSecond: 0, + workerUtilization: 0, + }; + + // Suppress output during execution + console.log = () => {}; + + // Start all flows concurrently + const flowPromises = []; + for (let i = 0; i < NUM_FLOWS; i++) { + const testData = generateTestStrings(ELEMENTS_PER_FLOW, i); + flowPromises.push( + startFlowWithMetrics(sql, LargeArrayMapFlow, testData, i) + ); + } + + const flowResults = await Promise.all(flowPromises); + + // Monitor first task pickup for all flows + const monitorPromises = flowResults.map(({ flowRun, metrics }) => + monitorFirstTaskPickup(sql, flowRun.run_id, metrics) + ); + await Promise.all(monitorPromises); + + // Wait for all flows to complete + const completionPromises = flowResults.map( + async ({ flowRun, metrics }) => { + await waitForRunCompletion(sql, flowRun.run_id); + await collectFlowMetrics(sql, flowRun, metrics); + return metrics; + } + ); + + multiMetrics.individualFlowMetrics = await Promise.all( + completionPromises + ); + multiMetrics.overallEndTime = Date.now(); + multiMetrics.overallDuration = + multiMetrics.overallEndTime - multiMetrics.overallStartTime; + + // Calculate aggregated metrics + multiMetrics.averageFlowStartupTime = + multiMetrics.individualFlowMetrics.reduce( + (sum, m) => sum + m.flowStartupTime, + 0 + ) / NUM_FLOWS; + multiMetrics.averageTimeToFirstTask = + multiMetrics.individualFlowMetrics.reduce( + (sum, m) => sum + m.timeToFirstTask, + 0 + ) / NUM_FLOWS; + multiMetrics.totalTasksPerSecond = + multiMetrics.totalElements / (multiMetrics.overallDuration / 1000); + + // Restore console for results + console.log = originalLog; + + // Print report + printMultiFlowReport(multiMetrics); + + // Assertions + const totalSuccessful = multiMetrics.individualFlowMetrics.reduce( + (sum, m) => sum + m.successfulTasks, + 0 + ); + assertEquals( + totalSuccessful, + multiMetrics.totalElements, + 'All tasks should complete' + ); + + assert( + multiMetrics.totalTasksPerSecond > 50, + `Multi-flow throughput too low: ${multiMetrics.totalTasksPerSecond.toFixed( + 2 + )} tasks/sec` + ); + } finally { + // Restore console + console.log = originalLog; + console.debug = originalDebug; + + // Stop all workers + await Promise.all(workers.map((w) => w.stop())); + } + }) +); + +// Original single-flow test (renamed for clarity) +Deno.test( + 'single-flow performance test - 1000 elements', + { + // Increase timeout for performance test (3 minutes should be plenty) + sanitizeOps: false, + sanitizeResources: false, + }, + withPgNoTransaction(async (sql) => { + // Save console methods for later restoration + const originalLog = console.log; + const originalDebug = console.debug; + + const metrics: PerformanceMetrics = { + arraySize: 1000, + startTime: Date.now(), + flowStartTime: 0, + flowStartedTime: 0, + firstTaskPickupTime: 0, + flowEndTime: 0, + totalDuration: 0, + flowExecutionTime: 0, + flowStartupTime: 0, + timeToFirstTask: 0, + tasksPerSecond: 0, + successfulTasks: 0, + failedTasks: 0, + }; + + await sql`select pgflow_tests.reset_db();`; + + // Suppress verbose worker output during test + console.log = () => {}; + console.debug = () => {}; + + const worker = startWorker(sql, LargeArrayMapFlow, { + maxConcurrent: 50, // Higher concurrency for performance test + batchSize: 25, // Larger batch size for better throughput + maxPollSeconds: 1, + pollIntervalMs: 100, // Faster polling for performance + }); + + try { + // Restore console for our status messages + console.log = originalLog; + + // Setup: Create flow with root map step + console.log('\n🔧 Setting up performance test...'); + await createRootMapFlow( + sql, + 'test_large_array_map_flow', + 'processString' + ); + + // Generate test data + console.log(`📝 Generating ${metrics.arraySize} test strings...`); + const testData = generateTestStrings(metrics.arraySize); + + // Execute: Start flow with large array + console.log(`🚀 Starting flow with ${metrics.arraySize} elements...`); + + // Suppress verbose output during execution + console.log = () => {}; + + // Measure flow startup time + metrics.flowStartTime = Date.now(); + const flowRun = await startFlow(sql, LargeArrayMapFlow, testData); + metrics.flowStartedTime = Date.now(); + metrics.flowStartupTime = metrics.flowStartedTime - metrics.flowStartTime; + + // Restore console for progress display + console.log = originalLog; + + // Poll for first task pickup + const firstTaskResult = await sql` + SELECT MIN(started_at) as first_task_time + FROM pgflow.step_tasks + WHERE run_id = ${flowRun.run_id} + AND started_at IS NOT NULL + `; + + // Keep polling until we get the first task or timeout + let pollAttempts = 0; + while (!firstTaskResult[0].first_task_time && pollAttempts < 50) { + await delay(100); + const result = await sql` + SELECT MIN(started_at) as first_task_time + FROM pgflow.step_tasks + WHERE run_id = ${flowRun.run_id} + AND started_at IS NOT NULL + `; + if (result[0].first_task_time) { + metrics.firstTaskPickupTime = new Date( + result[0].first_task_time + ).getTime(); + metrics.timeToFirstTask = + metrics.firstTaskPickupTime - metrics.flowStartedTime; + break; + } + pollAttempts++; + } + + // Suppress verbose output again during polling + console.log = () => {}; + + // Start progress monitoring and wait for completion in parallel + const [polledRun] = await Promise.all([ + waitForRunCompletion(sql, flowRun.run_id, { timeoutMs: 180000 }), + monitorProgress(sql, flowRun.run_id, metrics.arraySize), + ]); + metrics.flowEndTime = Date.now(); + + // Restore console for results + console.log = originalLog; + + // Calculate timing metrics + metrics.totalDuration = metrics.flowEndTime - metrics.startTime; + metrics.flowExecutionTime = metrics.flowEndTime - metrics.flowStartTime; + metrics.tasksPerSecond = + metrics.arraySize / (metrics.flowExecutionTime / 1000); + + // If we didn't get first task time in the loop, try one more time + if (!metrics.firstTaskPickupTime) { + const finalCheck = await sql` + SELECT MIN(started_at) as first_task_time + FROM pgflow.step_tasks + WHERE run_id = ${flowRun.run_id} + AND started_at IS NOT NULL + `; + if (finalCheck[0].first_task_time) { + metrics.firstTaskPickupTime = new Date( + finalCheck[0].first_task_time + ).getTime(); + metrics.timeToFirstTask = + metrics.firstTaskPickupTime - metrics.flowStartedTime; + } + } + + // Verify: Run completed successfully + assert(polledRun.status === 'completed', 'Run should be completed'); + + // Verify: Step states + const stepStates = await getStepStates(sql, flowRun.run_id); + assertEquals(stepStates.length, 1, 'Should have 1 step state'); + assertAllStepsCompleted(stepStates); + + // Get detailed task information + const stepTasks = await getStepTasks(sql, flowRun.run_id); + assertEquals( + stepTasks.length, + metrics.arraySize, + `Should have ${metrics.arraySize} tasks` + ); + + // Count successful vs failed tasks + metrics.successfulTasks = stepTasks.filter( + (t) => t.status === 'completed' + ).length; + metrics.failedTasks = stepTasks.filter( + (t) => t.status === 'failed' + ).length; + + // Verify all tasks completed successfully + assertEquals( + metrics.successfulTasks, + metrics.arraySize, + 'All tasks should complete successfully' + ); + assertEquals(metrics.failedTasks, 0, 'No tasks should fail'); + + // Sample verification - check a few outputs to ensure correctness + const sampleTask = stepTasks[0]; + assert(sampleTask.output !== null, 'Task should have output'); + + // Type assertion for the output structure + const output = sampleTask.output as Record; + assert('original' in output, 'Output should have original field'); + assert('uppercase' in output, 'Output should have uppercase field'); + assert('length' in output, 'Output should have length field'); + assert('reversed' in output, 'Output should have reversed field'); + + // Verify: Final aggregated output exists and has correct structure + const finalRun = await getRunOutput(sql, flowRun.run_id); + assertEquals(finalRun.status, 'completed', 'Run should be completed'); + assert(finalRun.output !== null, 'Run should have output'); + + // The output should have processString key with array of results + const runOutput = finalRun.output as Record; + assert( + 'processString' in runOutput, + 'Output should have processString key' + ); + assert( + Array.isArray(runOutput.processString), + 'processString should be an array' + ); + assertEquals( + runOutput.processString.length, + metrics.arraySize, + 'Should have all results' + ); + + // Print performance report + printPerformanceReport(metrics); + + // Performance assertions (adjust thresholds based on your requirements) + assert( + metrics.tasksPerSecond > 10, + `Performance too low: ${metrics.tasksPerSecond.toFixed( + 2 + )} tasks/sec (minimum: 10)` + ); + + assert( + metrics.flowExecutionTime < 120000, + `Execution took too long: ${formatDuration( + metrics.flowExecutionTime + )} (maximum: 2 minutes)` + ); + } finally { + // Restore console + console.log = originalLog; + console.debug = originalDebug; + await worker.stop(); + } + }) +); + +// Additional test with even larger array (optional - can be enabled for stress testing) +Deno.test( + 'large array map stress test - 5000 elements', + { + sanitizeOps: false, + sanitizeResources: false, + }, + withPgNoTransaction(async (sql) => { + // Save console methods for later restoration + const originalLog = console.log; + const originalDebug = console.debug; + + const metrics: PerformanceMetrics = { + arraySize: 5000, + startTime: Date.now(), + flowStartTime: 0, + flowStartedTime: 0, + firstTaskPickupTime: 0, + flowEndTime: 0, + totalDuration: 0, + flowExecutionTime: 0, + flowStartupTime: 0, + timeToFirstTask: 0, + tasksPerSecond: 0, + successfulTasks: 0, + failedTasks: 0, + }; + + await sql`select pgflow_tests.reset_db();`; + + // Suppress verbose output + console.log = () => {}; + console.debug = () => {}; + + const worker = startWorker(sql, LargeArrayMapFlow, { + maxConcurrent: 100, // Even higher concurrency for stress test + batchSize: 50, + maxPollSeconds: 2, + pollIntervalMs: 50, + }); + + try { + await createRootMapFlow( + sql, + 'test_large_array_map_flow', + 'processString' + ); + + const testData = generateTestStrings(metrics.arraySize); + + // Restore console for status message + console.log = originalLog; + console.log( + `\n🔥 Stress test: Starting flow with ${metrics.arraySize} elements...` + ); + + // Suppress verbose output during execution + console.log = () => {}; + + // Measure flow startup time + metrics.flowStartTime = Date.now(); + const flowRun = await startFlow(sql, LargeArrayMapFlow, testData); + metrics.flowStartedTime = Date.now(); + metrics.flowStartupTime = metrics.flowStartedTime - metrics.flowStartTime; + + // Restore console for progress display + console.log = originalLog; + + // Suppress verbose output again during polling + console.log = () => {}; + + // Start progress monitoring and wait for completion in parallel + const [polledRun] = await Promise.all([ + waitForRunCompletion(sql, flowRun.run_id, { + timeoutMs: 300000, // 5 minutes + pollIntervalMs: 1000, // Poll every second + }), + monitorProgress(sql, flowRun.run_id, metrics.arraySize), + ]); + metrics.flowEndTime = Date.now(); + + // Restore console for results + console.log = originalLog; + + metrics.totalDuration = metrics.flowEndTime - metrics.startTime; + metrics.flowExecutionTime = metrics.flowEndTime - metrics.flowStartTime; + metrics.tasksPerSecond = + metrics.arraySize / (metrics.flowExecutionTime / 1000); + + assert( + polledRun.status === 'completed', + 'Stress test run should complete' + ); + + const stepTasks = await getStepTasks(sql, flowRun.run_id); + metrics.successfulTasks = stepTasks.filter( + (t) => t.status === 'completed' + ).length; + metrics.failedTasks = stepTasks.filter( + (t) => t.status === 'failed' + ).length; + + printPerformanceReport(metrics); + } finally { + // Restore console + console.log = originalLog; + console.debug = originalDebug; + await worker.stop(); + } + }) +); diff --git a/pkgs/edge-worker/tests/integration/flow/startDelayFlow.test.ts b/pkgs/edge-worker/tests/integration/flow/startDelayFlow.test.ts index cc4e8d567..7b9c420de 100644 --- a/pkgs/edge-worker/tests/integration/flow/startDelayFlow.test.ts +++ b/pkgs/edge-worker/tests/integration/flow/startDelayFlow.test.ts @@ -4,6 +4,7 @@ import { Flow } from '@pgflow/dsl'; import { waitFor } from '../../e2e/_helpers.ts'; import { delay } from '@std/async'; import { startFlow, startWorker } from '../_helpers.ts'; +import { waitForRunCompletion, getStepStates, getStepTasks, getRunOutput } from './_testHelpers.ts'; // Test flow with root step delay const RootStepDelayFlow = new Flow<{ message: string }>({ @@ -14,8 +15,8 @@ const RootStepDelayFlow = new Flow<{ message: string }>({ .step({ slug: 'delayedRoot', startDelay: 2 // 2 second delay - }, (input) => { - console.log('Executing delayedRoot step'); + }, async (input) => { + await delay(1); return `Delayed: ${input.run.message}`; }); @@ -25,16 +26,16 @@ const NormalStepDelayFlow = new Flow<{ value: number }>({ }) .step({ slug: 'immediate' - }, (input) => { - console.log('Executing immediate step'); + }, async (input) => { + await delay(1); return input.run.value * 2; }) .step({ slug: 'delayed', dependsOn: ['immediate'], startDelay: 3 // 3 second delay after immediate completes - }, (input) => { - console.log('Executing delayed step'); + }, async (input) => { + await delay(1); return (input.immediate as number) + 10; }); @@ -45,24 +46,24 @@ const CascadedDelayFlow = new Flow<{ start: string }>({ .step({ slug: 'first', startDelay: 1 // 1 second delay - }, (input) => { - console.log('Executing first step'); + }, async (input) => { + await delay(1); return `${input.run.start}->first`; }) .step({ slug: 'second', dependsOn: ['first'], startDelay: 2 // 2 second delay after first completes - }, (input) => { - console.log('Executing second step'); + }, async (input) => { + await delay(1); return `${input.first}->second`; }) .step({ slug: 'third', dependsOn: ['second'], startDelay: 1 // 1 second delay after second completes - }, (input) => { - console.log('Executing third step'); + }, async (input) => { + await delay(1); return `${input.second}->third`; }); @@ -102,24 +103,11 @@ Deno.test( assertEquals(initialTask.status, 'queued', 'Task should be queued with delay'); // Wait for completion - const polledRun = await waitFor( - async () => { - const [run] = await sql` - SELECT * FROM pgflow.runs WHERE run_id = ${flowRun.run_id}; - `; - return run.status === 'completed' ? run : false; - }, - { - pollIntervalMs: 200, - timeoutMs: 10000, - description: `root step delay flow to complete`, - } - ); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); const endTime = Date.now(); const duration = (endTime - startTime) / 1000; - console.log(`Flow completed in ${duration}s`); assert(duration >= 2, 'Flow should take at least 2 seconds due to startDelay'); // Verify output @@ -159,11 +147,7 @@ Deno.test( await delay(1000); // Check that immediate step is completed but delayed step is still queued - const stepStates = await sql` - SELECT step_slug, status FROM pgflow.step_states - WHERE run_id = ${flowRun.run_id} - ORDER BY step_slug; - `; + const stepStates = await getStepStates(sql, flowRun.run_id); const immediateState = stepStates.find(s => s.step_slug === 'immediate'); const delayedState = stepStates.find(s => s.step_slug === 'delayed'); @@ -172,31 +156,16 @@ Deno.test( assertEquals(delayedState?.status, 'started', 'Delayed step should be started (waiting)'); // Check task status - const [delayedTask] = await sql` - SELECT status FROM pgflow.step_tasks - WHERE run_id = ${flowRun.run_id} AND step_slug = 'delayed'; - `; + const delayedTasks = await getStepTasks(sql, flowRun.run_id, 'delayed'); + const delayedTask = delayedTasks[0]; assertEquals(delayedTask.status, 'queued', 'Delayed task should still be queued'); // Wait for completion - const polledRun = await waitFor( - async () => { - const [run] = await sql` - SELECT * FROM pgflow.runs WHERE run_id = ${flowRun.run_id}; - `; - return run.status === 'completed' ? run : false; - }, - { - pollIntervalMs: 200, - timeoutMs: 10000, - description: `normal step delay flow to complete`, - } - ); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); const endTime = Date.now(); const duration = (endTime - startTime) / 1000; - console.log(`Flow completed in ${duration}s`); assert(duration >= 3, 'Flow should take at least 3 seconds due to delayed step'); // Verify output - only the last step's output is returned @@ -249,7 +218,6 @@ Deno.test( for (const state of stepStates) { if (state.status === 'completed' && !stepCompletionTimes[state.step_slug]) { stepCompletionTimes[state.step_slug] = Date.now(); - console.log(`Step ${state.step_slug} completed at ${(Date.now() - startTime) / 1000}s`); } } @@ -268,8 +236,6 @@ Deno.test( const endTime = Date.now(); const totalDuration = (endTime - startTime) / 1000; - console.log(`Flow completed in ${totalDuration}s`); - // Verify timing // - First step: 1s delay // - Second step: starts after first completes + 2s delay @@ -289,10 +255,8 @@ Deno.test( } // Verify final output - only the last step's output is returned - const [finalRun] = await sql` - SELECT output FROM pgflow.runs WHERE run_id = ${flowRun.run_id}; - `; - + const finalRun = await getRunOutput(sql, flowRun.run_id); + assertEquals( finalRun.output, { @@ -320,9 +284,9 @@ Deno.test( .step({ slug: 'retryStep', startDelay: 3 - }, (_input) => { + }, async (_input) => { attemptCount++; - console.log(`Attempt ${attemptCount}`); + await delay(1); if (attemptCount === 1) { throw new Error('First attempt fails'); } @@ -348,35 +312,22 @@ Deno.test( await delay(4000); // Check that first attempt has failed - const [firstAttemptTask] = await sql` - SELECT status, attempts_count, error_message FROM pgflow.step_tasks + const firstAttemptTasks = await sql` + SELECT status, attempts_count, error_message FROM pgflow.step_tasks WHERE run_id = ${flowRun.run_id} AND step_slug = 'retryStep'; `; + const firstAttemptTask = firstAttemptTasks[0]; assertEquals(firstAttemptTask.status, 'queued', 'Task should be queued for retry'); assert(firstAttemptTask.attempts_count >= 1, 'Should have at least one attempt'); assert(firstAttemptTask.error_message?.includes('First attempt fails'), 'Should have error message'); // Wait for completion - const polledRun = await waitFor( - async () => { - const [run] = await sql` - SELECT * FROM pgflow.runs WHERE run_id = ${flowRun.run_id}; - `; - return run.status === 'completed' ? run : false; - }, - { - pollIntervalMs: 200, - timeoutMs: 10000, - description: `retry flow to complete`, - } - ); + const polledRun = await waitForRunCompletion(sql, flowRun.run_id); const endTime = Date.now(); const duration = (endTime - startTime) / 1000; - console.log(`Flow completed in ${duration}s`); - // Verify timing: initial delay (3s) + retry delay (1s baseDelay) assert(duration >= 4, 'Flow should take at least 4 seconds (3s initial + 1s retry)'); assert(duration < 7, 'Flow should not re-apply startDelay on retry'); diff --git a/pkgs/edge-worker/tests/integration/stepTaskExecutorContext.test.ts b/pkgs/edge-worker/tests/integration/stepTaskExecutorContext.test.ts index 339107e9b..8d9227978 100644 --- a/pkgs/edge-worker/tests/integration/stepTaskExecutorContext.test.ts +++ b/pkgs/edge-worker/tests/integration/stepTaskExecutorContext.test.ts @@ -56,6 +56,7 @@ Deno.test( msg_id: 123, run_id: 'test-run-id', step_slug: 'test_step', + task_index: 0, input: { run: { data: 'test data' } }, }; @@ -117,6 +118,7 @@ Deno.test( msg_id: 456, run_id: 'legacy_run_id', step_slug: 'legacy_step', + task_index: 0, input: { run: { value: 42 } }, }; @@ -183,6 +185,7 @@ Deno.test( msg_id: 789, run_id: 'raw_run_id', step_slug: 'check_raw', + task_index: 0, input: { run: {} }, }; @@ -242,6 +245,7 @@ Deno.test( msg_id: 999, run_id: 'supabase_run_id', step_slug: 'check_clients', + task_index: 0, input: { run: {} }, }; @@ -322,6 +326,7 @@ Deno.test( msg_id: 456, run_id: 'complex_run', step_slug: 'fetch_data', + task_index: 0, input: { run: { id: 123 } }, }; diff --git a/pkgs/example-flows/src/example-flow.ts b/pkgs/example-flows/src/example-flow.ts index 73f818f81..3f1fe2121 100644 --- a/pkgs/example-flows/src/example-flow.ts +++ b/pkgs/example-flows/src/example-flow.ts @@ -33,6 +33,7 @@ export const stepTaskRecord: StepTaskRecord = { flow_slug: 'example_flow', run_id: '123', step_slug: 'normalStep', + task_index: 0, input: { run: { value: 23 }, rootStep: { doubledValue: 23 },