diff --git a/.changeset/workflow-iteration.md b/.changeset/workflow-iteration.md new file mode 100644 index 0000000..386813b --- /dev/null +++ b/.changeset/workflow-iteration.md @@ -0,0 +1,5 @@ +--- +"@generata/core": minor +--- + +Add `each:` to `.step()` so a workflow step can run a sub-workflow once per item from a directory glob, JSON file, or function. Each iteration runs as a normal sub-workflow with its own runId. Per-iteration outputs are recorded in a manifest at `/.generata/loops/--.json`, surfaced downstream as `_manifest`. Supports `concurrency` (sequential by default), `onFailure: "halt" | "continue"`, an `onItemFail` agent for per-failure side effects, and `maxRetries` per iteration. New exports: `LoopWorkflowStep` and `EachSource` from `@generata/core`. diff --git a/packages/core/src/define.test.ts b/packages/core/src/define.test.ts index 21c9caf..fcfab92 100644 --- a/packages/core/src/define.test.ts +++ b/packages/core/src/define.test.ts @@ -1,5 +1,5 @@ import { describe, it } from "node:test"; -import { equal, ok } from "node:assert/strict"; +import { equal, ok, throws } from "node:assert/strict"; import { defineAgent, defineWorkflow, worktree } from "./define.js"; describe("defineWorkflow chain builder", () => { @@ -223,3 +223,144 @@ describe("defineWorkflow chain builder", () => { ok(threw, "expected empty workflow to throw"); }); }); + +describe("defineWorkflow .step() with sub-workflow", () => { + const stub = defineAgent({ + type: "worker", + description: "x", + modelTier: "light", + tools: [], + permissions: "full", + timeoutSeconds: 60, + promptContext: [], + prompt: () => "p", + }); + (stub as any).name = "stub"; + + const subWorkflow = defineWorkflow({ + description: "review one note", + required: ["file"], + }) + .step("read", stub) + .build(); + (subWorkflow as any).name = "review-note"; + + it("accepts a WorkflowDef as a step value with each.glob + as", () => { + const wf = defineWorkflow({ description: "outer" }) + .step("reviews", subWorkflow, { + each: { glob: "notes/*.md" }, + as: "file", + }) + .build(); + ok(wf.steps[0]); + equal(wf.steps[0].id, "reviews"); + ok("subWorkflow" in wf.steps[0]); + }); + + it("accepts each.json without as", () => { + const wf = defineWorkflow({ description: "outer" }) + .step("reviews", subWorkflow, { + each: { json: "tasks.json" }, + }) + .build(); + ok("subWorkflow" in wf.steps[0]); + }); + + it("accepts each.items without as", () => { + const wf = defineWorkflow({ description: "outer" }) + .step("reviews", subWorkflow, { + each: { items: () => [{ file: "a.md" }] }, + }) + .build(); + ok("subWorkflow" in wf.steps[0]); + }); + + it("rejects glob source without as", () => { + throws(() => + defineWorkflow({ description: "outer" }) + .step("reviews", subWorkflow, { + each: { glob: "notes/*.md" }, + } as never) + .build(), + ); + }); + + it("rejects concurrency: 0", () => { + throws(() => + defineWorkflow({ description: "outer" }) + .step("reviews", subWorkflow, { + each: { glob: "notes/*.md" }, + as: "file", + concurrency: 0, + }) + .build(), + ); + }); + + it("rejects missing each: when value is a workflow", () => { + throws(() => + defineWorkflow({ description: "outer" }) + .step("reviews", subWorkflow, {} as never) + .build(), + ); + }); + + it("threads dependsOn, maxRetries, onFailure, onItemFail through to the built step", () => { + const failHandler = defineAgent({ + type: "worker", + description: "h", + modelTier: "light", + tools: [], + permissions: "full", + timeoutSeconds: 60, + promptContext: [], + prompt: () => "p", + }); + (failHandler as any).name = "failHandler"; + + const wf = defineWorkflow({ description: "outer" }) + .step("setup", stub) + .step("reviews", subWorkflow, { + each: { glob: "notes/*.md" }, + as: "file", + concurrency: 4, + onFailure: "continue", + onItemFail: failHandler, + maxRetries: 2, + dependsOn: ["setup"], + }) + .build(); + const loopStep = wf.steps[1] as Record; + equal(loopStep.concurrency, 4); + equal(loopStep.onFailure, "continue"); + equal(loopStep.maxRetries, 2); + ok(Array.isArray(loopStep.dependsOn)); + equal((loopStep.dependsOn as string[])[0], "setup"); + ok(loopStep.onItemFail); + }); + + it("rejects factory-form agent passed bare as onItemFail", () => { + // Factory-form: defineAgent called with a callable, returns AgentCallable with kind: "agent" + const factory = defineAgent<{ x: string }>(({ x }) => ({ + type: "worker", + description: "f", + modelTier: "light", + tools: [], + permissions: "full", + timeoutSeconds: 60, + promptContext: [], + prompt: () => `f ${x}`, + })); + (factory as any).name = "factory"; + + throws(() => + defineWorkflow({ description: "outer" }) + .step("reviews", subWorkflow, { + each: { glob: "*.md" }, + as: "file", + onItemFail: factory as never, + }) + .build(), + ); + }); +}); diff --git a/packages/core/src/define.ts b/packages/core/src/define.ts index 43aaaa4..b97b888 100644 --- a/packages/core/src/define.ts +++ b/packages/core/src/define.ts @@ -161,6 +161,27 @@ type StepValue = | (AgentDef & { readonly [_factoryBrand]?: never }) | ((params: TParams) => StepInvocation>); +// Loop-form step options. Glob source requires `as:` (it streams strings); json +// and items sources can omit `as:` (each item is already an object record). +type LoopStepOptions = { + each: + | { glob: string } + | { json: string } + | { items: (b: BuiltinPromptArgs) => unknown[] | Promise }; + as?: string; + concurrency?: number; + onFailure?: "halt" | "continue"; + onItemFail?: + | (LLMAgentDef & { readonly [_factoryBrand]?: never }) + | ((params: Record & { error: string }) => StepInvocation); + maxRetries?: number; + dependsOn?: string[]; +}; + +// Loop-form step value: a workflow definition. Discriminated by `kind: "workflow"` +// (stamped by defineWorkflow().build() before the def reaches a consumer). +type LoopStepValue = WorkflowDef; + // Pulls the declared output keys back out of a step value's type. // - Bare AgentDef: the value's `outputs` field (if declared) carries the keys. // - stepFn: the StepInvocation it returns is generic on TOutputs (set by @@ -203,6 +224,13 @@ type InternalStep = { // Stored loose because either an object agent or a callable factory may be // passed; the engine narrows by `typeof === "function"` at rejection time. onReject?: LLMAgentDef | ((inputs: Record) => StepInvocation); + // Loop-form fields. Set when `value` was a WorkflowDef and `each:` was provided. + subWorkflow?: WorkflowDef; + each?: LoopStepOptions["each"]; + as?: string; + concurrency?: number; + onFailure?: "halt" | "continue"; + onItemFail?: LLMAgentDef | ((inputs: Record) => StepInvocation); }; type WorkflowConfigInput< @@ -220,12 +248,27 @@ type WorkflowConfigInput< // Each .step() returns a Builder with TPrior expanded by the new step's id and // TBaseParams extended with any `outputs` declared on the step's agent. The // agent's emit values reach downstream stepFns as named string params. +// +// Two overloads: +// - Agent-form (existing): bare agent or stepFn, threads declared `outputs` +// keys into downstream params. +// - Loop-form (new): WorkflowDef value with `each:`, emits `_manifest` +// into downstream params (a path to the per-item run manifest). +// +// Order matters: the agent-form overload must come first so TS picks it for +// non-WorkflowDef values; the loop-form overload requires `LoopStepOptions`, +// so a missing/empty options bag won't accidentally match it. export type WorkflowBuilder = { step>>( id: Id, value: V, options?: StepOptions>, ): WorkflowBuilder, TPrior | Id>; + step( + id: Id, + value: LoopStepValue, + options: LoopStepOptions, + ): WorkflowBuilder, TPrior | Id>; build(): WorkflowDef; }; @@ -247,13 +290,64 @@ export function defineWorkflow< const builder = { step( id: string, - value: AgentDef | ((p: Record) => StepInvocation), - options?: StepOptions, + value: AgentDef | WorkflowDef | ((p: Record) => StepInvocation), + options?: StepOptions | LoopStepOptions, ) { if (steps.some((s) => s.id === id)) { throw new Error(`defineWorkflow: duplicate step id '${id}'`); } - const internal: InternalStep = { id, ...options }; + // Loop-form: value is a built workflow (stamped kind: "workflow"). This + // branch must run before the agent-form branch since a WorkflowDef is + // also a plain object. AgentCallables are callable AND carry kind: + // "agent" (not "workflow"), so the `kind === "workflow"` check cleanly + // separates the two. + if ( + typeof value === "object" && + value !== null && + (value as { kind?: unknown }).kind === "workflow" + ) { + const opts = options as LoopStepOptions | undefined; + if (!opts || !opts.each) { + throw new Error( + `defineWorkflow: step '${id}' has a sub-workflow value but no 'each:' option`, + ); + } + if ("glob" in opts.each && !opts.as) { + throw new Error( + `defineWorkflow: step '${id}' has each.glob but no 'as:' (required for string items)`, + ); + } + if (opts.concurrency !== undefined && opts.concurrency < 1) { + throw new Error( + `defineWorkflow: step '${id}' concurrency must be a positive integer (got ${opts.concurrency})`, + ); + } + if ( + typeof opts.onItemFail === "function" && + (opts.onItemFail as { kind?: unknown }).kind === "agent" + ) { + const fnName = (opts.onItemFail as { name?: string }).name || ""; + throw new Error( + `Step '${id}': factory-form agent '${fnName}' cannot be passed bare as onItemFail. ` + + `Wrap it in a step fn: onItemFail: ({...}) => ${fnName}({...inputs})`, + ); + } + const internal: InternalStep = { + id, + subWorkflow: value as WorkflowDef, + each: opts.each, + as: opts.as, + concurrency: opts.concurrency ?? 1, + onFailure: opts.onFailure ?? "halt", + onItemFail: opts.onItemFail as InternalStep["onItemFail"], + maxRetries: opts.maxRetries, + dependsOn: opts.dependsOn, + }; + steps.push(internal); + return builder; + } + // Agent-form / stepFn-form (existing). + const internal: InternalStep = { id, ...(options as StepOptions) }; if (typeof value === "function") { // Factory-form agents are callable AND carry kind: "agent". Passing one // bare would skip the input mapping and run with a sentinel template. diff --git a/packages/core/src/engine.ts b/packages/core/src/engine.ts index 8471a17..04cb7f8 100644 --- a/packages/core/src/engine.ts +++ b/packages/core/src/engine.ts @@ -36,7 +36,8 @@ import { } from "./logger.js"; import { formatPrecheckReport, precheckWorkflow } from "./precheck.js"; import { resolveEnvProfile, type ResolvedEnv } from "./env-profile.js"; -import { resolveStepShape } from "./step-shape.js"; +import { isLoopStep, resolveStepShape } from "./step-shape.js"; +import { runLoopStep } from "./loop/runner.js"; // Workers signal a structural halt by leading their output with `STATUS: halt`. // The critic retry loop checks this to short-circuit retries that would re-hit @@ -99,6 +100,11 @@ function resolveStepForRun( params: Record, stepOutputs: Record, ): { agent: LLMAgentDef; args: Record } { + if (isLoopStep(step)) { + throw new Error( + `resolveStepForRun called on loop step '${step.id}' - loop step execution is not yet wired in the engine`, + ); + } if ("stepFn" in step) { const stringParams: StepParams = Object.fromEntries( Object.entries({ ...params, ...stepOutputs }).map(([k, v]) => [k, String(v)]), @@ -168,7 +174,11 @@ export async function runWorkflow( // Precheck confirmed env keys are resolvable; now materialise them. const requiredEnvKeys = [ - ...new Set(workflow.steps.flatMap((s) => resolveStepShape(s).agent.envKeys ?? [])), + ...new Set( + workflow.steps + .filter((s) => !isLoopStep(s)) + .flatMap((s) => resolveStepShape(s).agent.envKeys ?? []), + ), ]; const resolvedEnv: ResolvedEnv = resolveEnvProfile(requiredEnvKeys, profile); @@ -284,6 +294,87 @@ export async function runWorkflow( runnable.map(async (step) => { pending.delete(step.id); + if (isLoopStep(step)) { + const stepIndex = workflow.steps.findIndex((s) => s.id === step.id) + 1; + logStepStart(stepIndex, totalSteps, step.id); + const { today: _t, time: _ti } = getTodayAndTime(); + const startedAt = new Date().toISOString(); + const startTs = Date.now(); + // builtins.work_dir uses executionRoot (worktree-aware path) so the loop's + // source materialisation reads from inside the worktree. The runWorkflow + // call below receives the OUTER workDir so the sub-workflow's own isolation + // declaration anchors at the user-supplied root, not nested-inside-a-worktree + // (git doesn't support nested worktrees anyway). + const result = await runLoopStep( + { + outerWorkflowName: workflow.name, + outerRunId: workflowId, + step: { + id: step.id, + // schema validates subWorkflow shape via runtime refine, but the + // type is `unknown` to avoid a TS circular reference. Cast here. + subWorkflow: step.subWorkflow as WorkflowDef, + each: step.each, + as: step.as, + concurrency: step.concurrency, + onFailure: step.onFailure, + onItemFail: step.onItemFail, + maxRetries: step.maxRetries, + }, + outerParams: params, + builtins: { work_dir: executionRoot, today: _t, time: _ti }, + config, + workDir, + }, + { + // Forward engine deps (e.g. test-mocked runAgent) so the + // sub-workflow honours the same wiring. Without this, the + // outer call's mock runAgent is dropped on entry to the + // sub-run and the real CLI is invoked. + runWorkflow: (subWf, subParams, subConfig, subWorkDir) => + runWorkflow(subWf, subParams, subConfig, subWorkDir, promptLogFile, deps), + }, + ); + const completedAt = new Date().toISOString(); + const durationMs = Date.now() - startTs; + params = { ...params, [`${step.id}_manifest`]: result.manifest_path }; + stepOutputs[step.id] = result.manifest_path; + logStepDone( + step.id, + durationMs, + 0, // estimated cost + "", // model label + undefined, // verdict + false, // costWasReported + 0, // tokens + config.showPricing, + ); + stepResults.push({ + stepId: step.id, + output: result.manifest_path, + metrics: { + agent: "", + model: "", + model_tier: "", + workflow_id: workflow.name, + step_id: step.id, + started_at: startedAt, + completed_at: completedAt, + duration_ms: durationMs, + input_tokens: 0, + output_tokens: 0, + cache_read_tokens: 0, + cache_write_tokens: 0, + estimated_cost_usd: 0, + cost_was_reported: false, + status: "success", + exit_code: 0, + }, + }); + completed.add(step.id); + return; + } + const stepIndex = workflow.steps.findIndex((s) => s.id === step.id) + 1; const resolved = resolveStepForRun(step, params, stepOutputs); diff --git a/packages/core/src/loop-integration.test.ts b/packages/core/src/loop-integration.test.ts new file mode 100644 index 0000000..b44c246 --- /dev/null +++ b/packages/core/src/loop-integration.test.ts @@ -0,0 +1,124 @@ +import { equal, ok } from "node:assert/strict"; +import { mkdtempSync, readdirSync, readFileSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { describe, it } from "node:test"; +import type { RunOptions, RunResult } from "./agent-runner.js"; +import { defineAgent, defineWorkflow } from "./define.js"; +import { runWorkflow } from "./engine.js"; +import type { GlobalConfig } from "./schema.js"; + +const cfg: GlobalConfig = { + modelTiers: { heavy: "h", standard: "s", light: "l" }, + workDir: "", + agentsDir: "agents", + metricsDir: "metrics", + logsDir: "logs", + notifications: false, + logPrompts: false, + showPricing: false, + showWeeklyMetrics: false, + verboseOutput: false, + maxCriticRetries: 3, +}; + +describe("loop integration", () => { + it("runs a sub-workflow per item and surfaces the manifest path", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-int-")); + try { + const reviewer = defineAgent({ + type: "worker", + description: "reviewer", + modelTier: "light", + tools: [], + permissions: "full", + timeoutSeconds: 60, + promptContext: [], + prompt: ({ file }) => `review ${file}`, + }); + (reviewer as { name: string }).name = "reviewer"; + + const reviewNote = defineWorkflow({ + description: "review one note", + required: ["file"], + }) + .step("read", reviewer) + .build(); + (reviewNote as { name: string }).name = "review-note"; + + const summariser = defineAgent({ + type: "worker", + description: "summariser", + modelTier: "light", + tools: [], + permissions: "full", + timeoutSeconds: 60, + promptContext: [], + prompt: ({ manifest_path }) => `summarise ${manifest_path}`, + }); + (summariser as { name: string }).name = "summariser"; + + const outer = defineWorkflow({ description: "outer" }) + .step("reviews", reviewNote, { + each: { items: () => ["a.md", "b.md"] }, + as: "file", + }) + .step("summary", ({ reviews_manifest }) => ({ + kind: "step-invocation" as const, + agent: summariser, + args: { manifest_path: reviews_manifest }, + })) + .build(); + (outer as { name: string }).name = "outer"; + + const calls: string[] = []; + const fakeRunAgent = async (opts: RunOptions): Promise => { + calls.push(`${opts.agent.name}:${JSON.stringify(opts.args)}`); + const now = new Date().toISOString(); + return { + output: "done", + metrics: { + agent: opts.agent.name, + model: "fake", + model_tier: "light", + workflow_id: null, + step_id: null, + started_at: now, + completed_at: now, + duration_ms: 1, + input_tokens: 1, + output_tokens: 1, + cache_read_tokens: 0, + cache_write_tokens: 0, + estimated_cost_usd: 0, + cost_was_reported: false, + status: "success", + exit_code: 0, + }, + }; + }; + + const result = await runWorkflow(outer, {}, cfg, dir, undefined, { + runAgent: fakeRunAgent, + }); + + equal(result.success, true); + // Two reviewer calls (one per item) + one summariser call. + equal(calls.filter((c) => c.startsWith("reviewer:")).length, 2); + equal(calls.filter((c) => c.startsWith("summariser:")).length, 1); + + const manifestDir = join(dir, ".generata", "loops"); + const files = readdirSync(manifestDir); + equal(files.length, 1); + const manifest = JSON.parse(readFileSync(join(manifestDir, files[0]), "utf8")); + equal(manifest.items.length, 2); + equal(manifest.items[0].vars.file, "a.md"); + equal(manifest.items[1].vars.file, "b.md"); + + // Summariser saw the manifest path. + ok(calls[2].includes(".generata/loops/")); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/core/src/loop/binding.test.ts b/packages/core/src/loop/binding.test.ts new file mode 100644 index 0000000..9266548 --- /dev/null +++ b/packages/core/src/loop/binding.test.ts @@ -0,0 +1,95 @@ +import { describe, it } from "node:test"; +import { deepEqual } from "node:assert/strict"; +import { bindItems } from "./binding.js"; + +describe("bindItems", () => { + it("binds string items under the as: name", () => { + const result = bindItems(["a.md", "b.md"], { as: "file", required: ["file"] }); + deepEqual(result.errors, []); + deepEqual(result.vars, [{ file: "a.md" }, { file: "b.md" }]); + }); + + it("rejects string items when as: is missing", () => { + const result = bindItems(["a.md"], { as: undefined, required: ["file"] }); + deepEqual(result.vars, []); + deepEqual(result.errors.length, 1); + deepEqual(result.errors[0].includes("as:"), true); + }); + + it("spreads object item keys into the params bag", () => { + const result = bindItems( + [ + { id: "1", title: "Foo" }, + { id: "2", title: "Bar" }, + ], + { as: undefined, required: ["id", "title"] }, + ); + deepEqual(result.errors, []); + deepEqual(result.vars, [ + { id: "1", title: "Foo" }, + { id: "2", title: "Bar" }, + ]); + }); + + it("auto-stringifies number, boolean, and null values", () => { + const result = bindItems([{ id: 1, ok: true, note: null }], { + as: undefined, + required: ["id", "ok", "note"], + }); + deepEqual(result.errors, []); + deepEqual(result.vars, [{ id: "1", ok: "true", note: "null" }]); + }); + + it("JSON-stringifies nested object/array values", () => { + const result = bindItems([{ id: "1", tags: ["a", "b"], meta: { k: "v" } }], { + as: undefined, + required: ["id", "tags", "meta"], + }); + deepEqual(result.errors, []); + deepEqual(result.vars, [{ id: "1", tags: '["a","b"]', meta: '{"k":"v"}' }]); + }); + + it("rejects object items when as: is set", () => { + const result = bindItems([{ id: "1" }], { as: "task", required: ["id"] }); + deepEqual(result.vars, []); + deepEqual(result.errors.length, 1); + deepEqual(result.errors[0].includes("as:"), true); + }); + + it("rejects mixed-shape arrays (string + object)", () => { + const result = bindItems(["a.md", { id: "1" }] as unknown[], { + as: "file", + required: ["file"], + }); + deepEqual(result.vars, []); + deepEqual(result.errors.length, 1); + deepEqual(result.errors[0].includes("mixed"), true); + }); + + it("reports missing required keys per offending item index", () => { + const result = bindItems([{ id: "1", title: "Foo" }, { id: "2" }], { + as: undefined, + required: ["id", "title"], + }); + deepEqual(result.vars, []); + deepEqual(result.errors.length, 1); + deepEqual(result.errors[0].includes("index 1"), true); + deepEqual(result.errors[0].includes("title"), true); + }); + + it("treats undefined values as missing required keys", () => { + const result = bindItems([{ id: "1", title: undefined }], { + as: undefined, + required: ["id", "title"], + }); + deepEqual(result.vars, []); + deepEqual(result.errors.length, 1); + deepEqual(result.errors[0].includes("title"), true); + }); + + it("returns empty vars and no errors for empty input", () => { + const result = bindItems([], { as: "file", required: ["file"] }); + deepEqual(result.vars, []); + deepEqual(result.errors, []); + }); +}); diff --git a/packages/core/src/loop/binding.ts b/packages/core/src/loop/binding.ts new file mode 100644 index 0000000..597c066 --- /dev/null +++ b/packages/core/src/loop/binding.ts @@ -0,0 +1,81 @@ +export interface BindOptions { + as: string | undefined; + required: readonly string[]; +} + +export interface BindResult { + vars: Record[]; + errors: string[]; +} + +function isPlainObject(v: unknown): v is Record { + return typeof v === "object" && v !== null && !Array.isArray(v); +} + +function stringifyValue(v: unknown): string { + if (v === null) return "null"; + if (v === undefined) return ""; + if (typeof v === "string") return v; + if (typeof v === "number" || typeof v === "boolean") return String(v); + // Date, RegExp, class instances etc. fall through to JSON.stringify - users + // who need ISO dates or custom shapes should pre-convert before passing items in. + return JSON.stringify(v); +} + +export function bindItems(items: unknown[], options: BindOptions): BindResult { + if (items.length === 0) return { vars: [], errors: [] }; + + const allStrings = items.every((i) => typeof i === "string"); + const allObjects = items.every(isPlainObject); + + if (!allStrings && !allObjects) { + return { + vars: [], + errors: ["each: items must be all strings or all objects (mixed shapes are not allowed)"], + }; + } + + if (allStrings) { + if (!options.as) { + return { + vars: [], + errors: ["each: string items require an `as:` option to name the binding"], + }; + } + const name = options.as; + const vars = (items as string[]).map((s) => ({ [name]: s })); + const missing = options.required.filter((r) => r !== name); + if (missing.length > 0) { + return { + vars: [], + errors: [ + `each: sub-workflow requires [${options.required.join(", ")}] but only '${name}' is bound`, + ], + }; + } + return { vars, errors: [] }; + } + + // allObjects + if (options.as) { + return { + vars: [], + errors: ["each: object items must not use `as:` (keys spread into the params bag)"], + }; + } + const vars: Record[] = []; + const errors: string[] = []; + for (let i = 0; i < items.length; i++) { + const obj = items[i] as Record; + const missing = options.required.filter((r) => !Object.hasOwn(obj, r) || obj[r] === undefined); + if (missing.length > 0) { + errors.push(`each: item at index ${i} missing required keys: [${missing.join(", ")}]`); + continue; + } + const bag: Record = {}; + for (const [k, v] of Object.entries(obj)) bag[k] = stringifyValue(v); + vars.push(bag); + } + if (errors.length > 0) return { vars: [], errors }; + return { vars, errors: [] }; +} diff --git a/packages/core/src/loop/manifest.test.ts b/packages/core/src/loop/manifest.test.ts new file mode 100644 index 0000000..fd9d1cf --- /dev/null +++ b/packages/core/src/loop/manifest.test.ts @@ -0,0 +1,85 @@ +import { describe, it } from "node:test"; +import { deepEqual, equal } from "node:assert/strict"; +import { mkdtempSync, readFileSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { loopManifestPath, writeManifest, type LoopManifest } from "./manifest.js"; + +describe("loopManifestPath", () => { + it("composes /.generata/loops/--.json", () => { + const path = loopManifestPath("/proj", "review-and-ship", "reviews", "20260504-103201"); + equal(path, "/proj/.generata/loops/review-and-ship-reviews-20260504-103201.json"); + }); + + it("flattens slashes in workflow names (mirrors prompt-log behaviour)", () => { + const path = loopManifestPath("/proj", "core/review-and-ship", "reviews", "20260504"); + equal(path, "/proj/.generata/loops/core-review-and-ship-reviews-20260504.json"); + }); +}); + +describe("writeManifest", () => { + it("writes the manifest as pretty-printed JSON and creates the dir", () => { + const dir = mkdtempSync(join(tmpdir(), "loop-manifest-")); + try { + const path = join(dir, ".generata", "loops", "wf-step-rid.json"); + const manifest: LoopManifest = { + workflow: "wf", + step: "step", + subWorkflow: "sub", + runId: "rid", + startedAt: "2026-05-04T10:00:00Z", + finishedAt: "2026-05-04T10:01:00Z", + source: { kind: "glob", spec: "*.md", count: 1 }, + concurrency: 1, + onFailure: "halt", + items: [ + { + index: 0, + vars: { file: "a.md" }, + status: "ok", + runId: "rid-0", + outputs: { read: "a.review.md" }, + }, + ], + }; + writeManifest(path, manifest); + const round = JSON.parse(readFileSync(path, "utf8")); + deepEqual(round, manifest); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("round-trips a failed item with error + attempts", () => { + const dir = mkdtempSync(join(tmpdir(), "loop-manifest-")); + try { + const path = join(dir, ".generata", "loops", "wf-step-rid.json"); + const manifest: LoopManifest = { + workflow: "wf", + step: "step", + subWorkflow: "sub", + runId: "rid", + startedAt: "2026-05-04T10:00:00Z", + finishedAt: "2026-05-04T10:01:00Z", + source: { kind: "items", spec: "", count: 1 }, + concurrency: 1, + onFailure: "continue", + items: [ + { + index: 0, + vars: { id: "1" }, + status: "failed", + runId: "rid-0", + error: "rate limit exceeded", + attempts: 3, + }, + ], + }; + writeManifest(path, manifest); + const round = JSON.parse(readFileSync(path, "utf8")); + deepEqual(round, manifest); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/core/src/loop/manifest.ts b/packages/core/src/loop/manifest.ts new file mode 100644 index 0000000..b19dd38 --- /dev/null +++ b/packages/core/src/loop/manifest.ts @@ -0,0 +1,48 @@ +import { mkdirSync, writeFileSync } from "node:fs"; +import { dirname, resolve } from "node:path"; + +export interface LoopManifestItem { + index: number; + vars: Record; + status: "ok" | "failed"; + runId: string; + outputs?: Record; + error?: string; + attempts?: number; +} + +export interface LoopManifest { + workflow: string; + step: string; + subWorkflow: string; + runId: string; + startedAt: string; + finishedAt: string; + source: { kind: "glob" | "json" | "items"; spec: string; count: number }; + concurrency: number; + onFailure: "halt" | "continue"; + items: LoopManifestItem[]; +} + +function flatten(s: string): string { + return s.replace(/\//g, "-"); +} + +export function loopManifestPath( + workDir: string, + workflowName: string, + stepId: string, + runId: string, +): string { + return resolve( + workDir, + ".generata", + "loops", + `${flatten(workflowName)}-${flatten(stepId)}-${flatten(runId)}.json`, + ); +} + +export function writeManifest(path: string, manifest: LoopManifest): void { + mkdirSync(dirname(path), { recursive: true }); + writeFileSync(path, JSON.stringify(manifest, null, 2) + "\n", "utf8"); +} diff --git a/packages/core/src/loop/runner.test.ts b/packages/core/src/loop/runner.test.ts new file mode 100644 index 0000000..d16d2e6 --- /dev/null +++ b/packages/core/src/loop/runner.test.ts @@ -0,0 +1,302 @@ +import { describe, it } from "node:test"; +import { deepEqual, equal, ok, rejects } from "node:assert/strict"; +import { mkdtempSync, readFileSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { runLoopStep, type LoopStepInput } from "./runner.js"; +import type { WorkflowDef, GlobalConfig } from "../schema.js"; + +const minimalConfig: GlobalConfig = { + modelTiers: { heavy: "h", standard: "s", light: "l" }, + workDir: "", + agentsDir: "agents", + metricsDir: "metrics", + logsDir: "logs", + notifications: false, + logPrompts: false, + showPricing: false, + showWeeklyMetrics: false, + verboseOutput: false, + maxCriticRetries: 3, +}; + +function fakeSubWorkflow(name: string, required: string[] = []): WorkflowDef { + return { + kind: "workflow", + name, + description: "x", + required, + variables: {}, + isolation: "none", + steps: [ + { + id: "noop", + agent: { type: "worker", name: "n", kind: "agent" } as never, + }, + ], + } as never; +} + +describe("runLoopStep", () => { + it("runs the sub-workflow once per item with bound vars and writes the manifest", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-runner-")); + try { + const calls: Record[] = []; + const fakeRun = async (_wf: WorkflowDef, params: Record) => { + calls.push(params); + return { + workflowName: "sub", + steps: [], + success: true, + totalCost: 0, + totalTokens: 0, + costWasReported: false, + durationMs: 1, + }; + }; + const input: LoopStepInput = { + outerWorkflowName: "outer", + outerRunId: "run-1", + step: { + id: "reviews", + subWorkflow: fakeSubWorkflow("sub", ["file"]), + each: { items: () => ["a.md", "b.md"] }, + as: "file", + concurrency: 1, + onFailure: "halt", + }, + outerParams: {}, + builtins: { work_dir: dir, today: "2026-05-04", time: "10:00:00" }, + config: minimalConfig, + workDir: dir, + }; + const result = await runLoopStep(input, { runWorkflow: fakeRun }); + ok(result.manifest_path.endsWith("outer-reviews-run-1.json")); + const manifest = JSON.parse(readFileSync(result.manifest_path, "utf8")); + equal(manifest.items.length, 2); + equal(manifest.items[0].vars.file, "a.md"); + equal(manifest.items[1].vars.file, "b.md"); + equal(manifest.items[0].status, "ok"); + deepEqual( + calls.map((c) => c.file), + ["a.md", "b.md"], + ); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("under onFailure='halt', stops at the first failed iteration and propagates the error", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-runner-")); + try { + let invocations = 0; + const fakeRun = async () => { + invocations++; + throw new Error(`boom on call ${invocations}`); + }; + const input: LoopStepInput = { + outerWorkflowName: "outer", + outerRunId: "run-2", + step: { + id: "reviews", + subWorkflow: fakeSubWorkflow("sub", ["file"]), + each: { items: () => ["a.md", "b.md", "c.md"] }, + as: "file", + concurrency: 1, + onFailure: "halt", + }, + outerParams: {}, + builtins: { work_dir: dir, today: "2026-05-04", time: "10:00:00" }, + config: minimalConfig, + workDir: dir, + }; + await rejects(() => runLoopStep(input, { runWorkflow: fakeRun }), /boom/); + equal(invocations, 1); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("under onFailure='continue', records failures and runs all iterations", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-runner-")); + try { + let n = 0; + const fakeRun = async () => { + n++; + if (n === 2) throw new Error("boom on 2"); + return { + workflowName: "sub", + steps: [], + success: true, + totalCost: 0, + totalTokens: 0, + costWasReported: false, + durationMs: 1, + }; + }; + const input: LoopStepInput = { + outerWorkflowName: "outer", + outerRunId: "run-3", + step: { + id: "reviews", + subWorkflow: fakeSubWorkflow("sub", ["file"]), + each: { items: () => ["a.md", "b.md", "c.md"] }, + as: "file", + concurrency: 1, + onFailure: "continue", + }, + outerParams: {}, + builtins: { work_dir: dir, today: "2026-05-04", time: "10:00:00" }, + config: minimalConfig, + workDir: dir, + }; + const result = await runLoopStep(input, { runWorkflow: fakeRun }); + const manifest = JSON.parse(readFileSync(result.manifest_path, "utf8")); + equal(manifest.items.length, 3); + equal(manifest.items[0].status, "ok"); + equal(manifest.items[1].status, "failed"); + equal(manifest.items[2].status, "ok"); + ok(manifest.items[1].error.includes("boom on 2")); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("preserves source order in manifest with concurrency > 1", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-runner-")); + try { + const delays: Record = { "a.md": 30, "b.md": 5, "c.md": 15 }; + const fakeRun = async (_wf: WorkflowDef, params: Record) => { + await new Promise((r) => setTimeout(r, delays[params.file as string])); + return { + workflowName: "sub", + steps: [], + success: true, + totalCost: 0, + totalTokens: 0, + costWasReported: false, + durationMs: 1, + }; + }; + const input: LoopStepInput = { + outerWorkflowName: "outer", + outerRunId: "run-4", + step: { + id: "reviews", + subWorkflow: fakeSubWorkflow("sub", ["file"]), + each: { items: () => ["a.md", "b.md", "c.md"] }, + as: "file", + concurrency: 3, + onFailure: "halt", + }, + outerParams: {}, + builtins: { work_dir: dir, today: "2026-05-04", time: "10:00:00" }, + config: minimalConfig, + workDir: dir, + }; + const result = await runLoopStep(input, { runWorkflow: fakeRun }); + const manifest = JSON.parse(readFileSync(result.manifest_path, "utf8")); + deepEqual( + manifest.items.map((i: { vars: { file: string } }) => i.vars.file), + ["a.md", "b.md", "c.md"], + ); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("aborts at loop start if items violate binding rules", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-runner-")); + try { + const fakeRun = async () => { + throw new Error("should not be called"); + }; + const input: LoopStepInput = { + outerWorkflowName: "outer", + outerRunId: "run-5", + step: { + id: "reviews", + subWorkflow: fakeSubWorkflow("sub", ["file"]), + each: { items: () => ["a.md", { id: "1" }] as unknown[] }, + as: "file", + concurrency: 1, + onFailure: "halt", + }, + outerParams: {}, + builtins: { work_dir: dir, today: "2026-05-04", time: "10:00:00" }, + config: minimalConfig, + workDir: dir, + }; + await rejects(() => runLoopStep(input, { runWorkflow: fakeRun }), /mixed/); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("propagates the FIRST failure under concurrency > 1 with halt", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-runner-")); + try { + const delays: Record = { "a.md": 5, "b.md": 15, "c.md": 25 }; + const errors: Record = { + "a.md": "first-fail", + "b.md": "second-fail", + "c.md": "third-fail", + }; + const fakeRun = async (_wf: WorkflowDef, params: Record) => { + await new Promise((r) => setTimeout(r, delays[params.file as string])); + throw new Error(errors[params.file as string]); + }; + const input: LoopStepInput = { + outerWorkflowName: "outer", + outerRunId: "run-6", + step: { + id: "reviews", + subWorkflow: fakeSubWorkflow("sub", ["file"]), + each: { items: () => ["a.md", "b.md", "c.md"] }, + as: "file", + concurrency: 3, + onFailure: "halt", + }, + outerParams: {}, + builtins: { work_dir: dir, today: "2026-05-04", time: "10:00:00" }, + config: minimalConfig, + workDir: dir, + }; + await rejects(() => runLoopStep(input, { runWorkflow: fakeRun }), /first-fail/); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("rejects bare-agent onItemFail with a clear error", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-runner-")); + try { + const fakeRun = async () => { + throw new Error("iteration failed"); + }; + const input: LoopStepInput = { + outerWorkflowName: "outer", + outerRunId: "run-7", + step: { + id: "reviews", + subWorkflow: fakeSubWorkflow("sub", ["file"]), + each: { items: () => ["a.md"] }, + as: "file", + concurrency: 1, + onFailure: "continue", + onItemFail: { type: "worker", name: "h", kind: "agent" } as never, + }, + outerParams: {}, + builtins: { work_dir: dir, today: "2026-05-04", time: "10:00:00" }, + config: minimalConfig, + workDir: dir, + }; + await rejects( + () => runLoopStep(input, { runWorkflow: fakeRun }), + /onItemFail bare-agent form is not yet supported/, + ); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/core/src/loop/runner.ts b/packages/core/src/loop/runner.ts new file mode 100644 index 0000000..00543e2 --- /dev/null +++ b/packages/core/src/loop/runner.ts @@ -0,0 +1,174 @@ +import type { + BuiltinPromptArgs, + GlobalConfig, + WorkflowDef, + LLMAgentDef, + EachSource, +} from "../schema.js"; +import type { WorkflowResult } from "../engine.js"; +import { materialiseSource } from "./sources.js"; +import { bindItems } from "./binding.js"; +import { + loopManifestPath, + writeManifest, + type LoopManifest, + type LoopManifestItem, +} from "./manifest.js"; + +export interface LoopStep { + id: string; + subWorkflow: WorkflowDef; + each: EachSource; + as: string | undefined; + concurrency: number; + onFailure: "halt" | "continue"; + onItemFail?: LLMAgentDef | ((inputs: Record & { error: string }) => unknown); + maxRetries?: number; +} + +export interface LoopStepInput { + outerWorkflowName: string; + outerRunId: string; + step: LoopStep; + outerParams: Record; + builtins: BuiltinPromptArgs; + config: GlobalConfig; + workDir: string; +} + +export interface LoopRunnerDeps { + runWorkflow: ( + workflow: WorkflowDef, + params: Record, + config: GlobalConfig, + workDir: string, + ) => Promise; +} + +function sourceKind(source: EachSource): "glob" | "json" | "items" { + if ("glob" in source) return "glob"; + if ("json" in source) return "json"; + return "items"; +} + +function sourceSpec(source: EachSource): string { + if ("glob" in source) return source.glob; + if ("json" in source) return source.json; + return ""; +} + +export async function runLoopStep( + input: LoopStepInput, + deps: LoopRunnerDeps, +): Promise<{ manifest_path: string }> { + const { step, outerParams, builtins, config, workDir, outerWorkflowName, outerRunId } = input; + const startedAt = new Date().toISOString(); + + const items = await materialiseSource(step.each, builtins); + + const required = step.subWorkflow.required ?? []; + const binding = bindItems(items, { as: step.as, required }); + if (binding.errors.length > 0) { + throw new Error(`Loop step '${step.id}': ${binding.errors.join("; ")}`); + } + + const total = binding.vars.length; + const manifestItems: LoopManifestItem[] = Array.from({ length: total }); + let halted = false; + let haltError: Error | undefined; + + // Worker pool: pull from a shared index counter; each worker awaits one + // sub-run at a time. Source order is preserved in the manifest via the + // index field, regardless of completion order. + let next = 0; + const launchOne = async (): Promise => { + while (true) { + if (halted) return; + const i = next++; + if (i >= total) return; + const vars = binding.vars[i]; + const subRunId = `${outerRunId}-${step.id}-${i}`; + let attempts = 0; + const maxAttempts = (step.maxRetries ?? 0) + 1; + // eslint-disable-next-line no-constant-condition + while (true) { + attempts++; + try { + const result = await deps.runWorkflow( + step.subWorkflow, + { ...outerParams, ...vars }, + config, + workDir, + ); + const outputs: Record = {}; + for (const s of result.steps) outputs[s.stepId] = s.output; + manifestItems[i] = { + index: i, + vars, + status: "ok", + runId: subRunId, + outputs, + }; + break; + } catch (err) { + if (attempts < maxAttempts) continue; + const msg = err instanceof Error ? err.message : String(err); + manifestItems[i] = { + index: i, + vars, + status: "failed", + runId: subRunId, + error: msg, + attempts, + }; + if (step.onItemFail) { + if (typeof step.onItemFail === "function") { + const fn = step.onItemFail; + try { + await fn({ ...vars, error: msg }); + } catch (handlerErr) { + console.warn( + `Loop step '${step.id}': onItemFail handler threw: ${handlerErr instanceof Error ? handlerErr.message : String(handlerErr)}`, + ); + } + } else { + throw new Error( + `Loop step '${step.id}': onItemFail bare-agent form is not yet supported by the runner - wrap in a step fn`, + ); + } + } + if (step.onFailure === "halt") { + halted = true; + if (!haltError) haltError = err as Error; + } + break; + } + } + } + }; + + const concurrency = Math.min(step.concurrency, Math.max(total, 1)); + const workers = Array.from({ length: concurrency }, () => launchOne()); + await Promise.all(workers); + + const finishedAt = new Date().toISOString(); + // Compact the array: slots remain `undefined` if halt fired before they ran. + const items_out = manifestItems.filter((m) => m !== undefined); + const manifest: LoopManifest = { + workflow: outerWorkflowName, + step: step.id, + subWorkflow: step.subWorkflow.name, + runId: outerRunId, + startedAt, + finishedAt, + source: { kind: sourceKind(step.each), spec: sourceSpec(step.each), count: total }, + concurrency, + onFailure: step.onFailure, + items: items_out, + }; + const path = loopManifestPath(workDir, outerWorkflowName, step.id, outerRunId); + writeManifest(path, manifest); + + if (haltError) throw haltError; + return { manifest_path: path }; +} diff --git a/packages/core/src/loop/schema.test.ts b/packages/core/src/loop/schema.test.ts new file mode 100644 index 0000000..86eb2bb --- /dev/null +++ b/packages/core/src/loop/schema.test.ts @@ -0,0 +1,112 @@ +import { describe, it } from "node:test"; +import { equal, ok, throws } from "node:assert/strict"; +import { LoopWorkflowStep, WorkflowStep } from "../schema.js"; + +const minimalSubWorkflow = { + kind: "workflow", + name: "sub", + description: "x", + required: ["file"], + variables: {}, + isolation: "none", + steps: [{ id: "noop", agent: { type: "worker", name: "n", kind: "agent" } }], +} as never; + +describe("LoopWorkflowStep", () => { + it("accepts a minimal config with glob source and as", () => { + const parsed = LoopWorkflowStep.parse({ + id: "reviews", + subWorkflow: minimalSubWorkflow, + each: { glob: "notes/*.md" }, + as: "file", + }); + equal(parsed.id, "reviews"); + equal(parsed.concurrency, 1); + equal(parsed.onFailure, "halt"); + }); + + it("accepts json source without as", () => { + const parsed = LoopWorkflowStep.parse({ + id: "drafts", + subWorkflow: minimalSubWorkflow, + each: { json: "tasks.json" }, + }); + ok(!("as" in parsed) || parsed.as === undefined); + }); + + it("rejects missing each", () => { + throws(() => + LoopWorkflowStep.parse({ + id: "x", + subWorkflow: minimalSubWorkflow, + }), + ); + }); + + it("rejects concurrency: 0", () => { + throws(() => + LoopWorkflowStep.parse({ + id: "x", + subWorkflow: minimalSubWorkflow, + each: { glob: "*.md" }, + as: "f", + concurrency: 0, + }), + ); + }); + + it("rejects glob source without as: at parse time", () => { + throws(() => + LoopWorkflowStep.parse({ + id: "x", + subWorkflow: minimalSubWorkflow, + each: { glob: "*.md" }, + }), + ); + }); + + it("rejects empty as: (min(1) constraint)", () => { + throws(() => + LoopWorkflowStep.parse({ + id: "x", + subWorkflow: minimalSubWorkflow, + each: { glob: "*.md" }, + as: "", + }), + ); + }); + + it("accepts onFailure: continue", () => { + const parsed = LoopWorkflowStep.parse({ + id: "x", + subWorkflow: minimalSubWorkflow, + each: { glob: "*.md" }, + as: "f", + onFailure: "continue", + }); + equal(parsed.onFailure, "continue"); + }); + + it("rejects subWorkflow with kind: agent (must be a workflow)", () => { + throws(() => + LoopWorkflowStep.parse({ + id: "x", + subWorkflow: { kind: "agent", name: "n", type: "worker" } as never, + each: { glob: "*.md" }, + as: "f", + }), + ); + }); +}); + +describe("WorkflowStep union", () => { + it("accepts a LoopWorkflowStep value", () => { + const parsed = WorkflowStep.parse({ + id: "reviews", + subWorkflow: minimalSubWorkflow, + each: { glob: "*.md" }, + as: "file", + }); + equal(parsed.id, "reviews"); + }); +}); diff --git a/packages/core/src/loop/sources.test.ts b/packages/core/src/loop/sources.test.ts new file mode 100644 index 0000000..a5156e5 --- /dev/null +++ b/packages/core/src/loop/sources.test.ts @@ -0,0 +1,126 @@ +import { describe, it } from "node:test"; +import { deepEqual, rejects } from "node:assert/strict"; +import { mkdtempSync, mkdirSync, writeFileSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { materialiseSource } from "./sources.js"; + +const builtins = { work_dir: "/tmp", today: "2026-05-04", time: "10:00:00" }; + +describe("materialiseSource", () => { + describe("glob", () => { + it("returns matched files lex-sorted by full path", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-sources-")); + try { + writeFileSync(join(dir, "zebra.md"), ""); + writeFileSync(join(dir, "Apple.md"), ""); + writeFileSync(join(dir, "banana.md"), ""); + const result = await materialiseSource( + { glob: `${dir}/*.md` }, + { ...builtins, work_dir: dir }, + ); + deepEqual(result, [`${dir}/Apple.md`, `${dir}/banana.md`, `${dir}/zebra.md`]); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("returns empty array when nothing matches", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-sources-")); + try { + const result = await materialiseSource( + { glob: `${dir}/*.md` }, + { ...builtins, work_dir: dir }, + ); + deepEqual(result, []); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("sorts subdirectory paths after sibling files at the same prefix", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-sources-")); + try { + writeFileSync(join(dir, "banana.md"), ""); + mkdirSync(join(dir, "sub")); + writeFileSync(join(dir, "sub", "alpha.md"), ""); + const result = await materialiseSource( + { glob: `${dir}/**/*.md` }, + { ...builtins, work_dir: dir }, + ); + deepEqual(result, [`${dir}/banana.md`, `${dir}/sub/alpha.md`]); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + }); + + describe("json", () => { + it("returns the parsed array as-is, preserving order", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-sources-")); + try { + const path = join(dir, "tasks.json"); + writeFileSync(path, JSON.stringify([{ id: "1" }, { id: "2" }])); + const result = await materialiseSource({ json: path }, { ...builtins, work_dir: dir }); + deepEqual(result, [{ id: "1" }, { id: "2" }]); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("rejects when file is missing", async () => { + await rejects( + () => materialiseSource({ json: "/nonexistent/tasks.json" }, builtins), + /tasks\.json/, + ); + }); + + it("rejects when JSON does not parse to an array", async () => { + const dir = mkdtempSync(join(tmpdir(), "loop-sources-")); + try { + const path = join(dir, "obj.json"); + writeFileSync(path, JSON.stringify({ not: "array" })); + await rejects(() => materialiseSource({ json: path }, builtins), /must parse to an array/); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + }); + + describe("items", () => { + it("invokes the function with builtins and returns its array", async () => { + const result = await materialiseSource( + { items: ({ work_dir }) => [`${work_dir}/a`, `${work_dir}/b`] }, + builtins, + ); + deepEqual(result, ["/tmp/a", "/tmp/b"]); + }); + + it("awaits async functions", async () => { + const result = await materialiseSource({ items: async () => [1, 2, 3] }, builtins); + deepEqual(result, [1, 2, 3]); + }); + + it("rejects when the function returns a non-array", async () => { + await rejects( + () => materialiseSource({ items: () => "not an array" as never }, builtins), + /must return an array/, + ); + }); + + it("propagates errors thrown by the function", async () => { + await rejects( + () => + materialiseSource( + { + items: () => { + throw new Error("boom"); + }, + }, + builtins, + ), + /boom/, + ); + }); + }); +}); diff --git a/packages/core/src/loop/sources.ts b/packages/core/src/loop/sources.ts new file mode 100644 index 0000000..2eaed9c --- /dev/null +++ b/packages/core/src/loop/sources.ts @@ -0,0 +1,41 @@ +import { readFile, glob as fsGlob } from "node:fs/promises"; +import type { BuiltinPromptArgs, EachSource } from "../schema.js"; + +export type { EachSource }; + +export async function materialiseSource( + source: EachSource, + builtins: BuiltinPromptArgs, +): Promise { + if ("glob" in source) { + const matches: string[] = []; + // node:fs/promises glob is experimental on Node 22 (stable from Node 24). + // Emits an ExperimentalWarning at first use - acceptable until v24 is the floor. + for await (const path of fsGlob(source.glob)) matches.push(path); + return matches.sort(); + } + if ("json" in source) { + let raw: string; + try { + raw = await readFile(source.json, "utf8"); + } catch (err) { + throw new Error(`each.json: cannot read '${source.json}': ${(err as Error).message}`); + } + let parsed: unknown; + try { + parsed = JSON.parse(raw); + } catch (err) { + throw new Error(`each.json: '${source.json}' is not valid JSON: ${(err as Error).message}`); + } + if (!Array.isArray(parsed)) { + throw new Error(`each.json: '${source.json}' must parse to an array`); + } + return parsed; + } + // items + const result = await source.items(builtins); + if (!Array.isArray(result)) { + throw new Error(`each.items: function must return an array, got ${typeof result}`); + } + return result; +} diff --git a/packages/core/src/precheck.test.ts b/packages/core/src/precheck.test.ts index 4557c2a..51795f2 100644 --- a/packages/core/src/precheck.test.ts +++ b/packages/core/src/precheck.test.ts @@ -1,4 +1,4 @@ -import { deepStrictEqual, ok, strictEqual } from "node:assert/strict"; +import { deepEqual, deepStrictEqual, ok, strictEqual } from "node:assert/strict"; import { describe, it } from "node:test"; import { defineAgent, defineWorkflow } from "./define.js"; import { formatPrecheckReport, precheckWorkflow } from "./precheck.js"; @@ -335,6 +335,63 @@ describe("precheckWorkflow", () => { }); }); +describe("precheckWorkflow with loop steps", () => { + it("threads _manifest into the available set for downstream stepFns", () => { + const stub = { + type: "worker" as const, + name: "stub", + kind: "agent" as const, + description: "x", + modelTier: "light" as const, + tools: [], + permissions: "full" as const, + timeoutSeconds: 60, + promptContext: [], + prompt: () => "p", + maxRetries: 1, + envKeys: [], + }; + const subWorkflow = { + kind: "workflow" as const, + name: "sub", + description: "x", + required: ["file"], + variables: {}, + isolation: "none" as const, + steps: [{ id: "noop", agent: stub }], + }; + const workflow = { + kind: "workflow" as const, + name: "outer", + description: "x", + required: [], + variables: {}, + isolation: "none" as const, + steps: [ + { + id: "reviews", + subWorkflow, + each: { glob: "*.md" }, + as: "file", + concurrency: 1, + onFailure: "halt" as const, + }, + { + id: "summary", + stepFn: ({ reviews_manifest }: Record) => ({ + kind: "step-invocation", + agent: stub, + args: { manifest_path: reviews_manifest }, + }), + }, + ], + } as never; + + const issues = precheckWorkflow(workflow, {}); + deepEqual(issues, []); + }); +}); + describe("formatPrecheckReport", () => { it("includes the workflow name, one line per issue, and a tail", () => { const report = formatPrecheckReport("wf", [ diff --git a/packages/core/src/precheck.ts b/packages/core/src/precheck.ts index e3496d7..c114a72 100644 --- a/packages/core/src/precheck.ts +++ b/packages/core/src/precheck.ts @@ -3,7 +3,7 @@ import { resolve } from "path"; import { BUILTIN_ARGS, LLMAgentDef, WorkflowDef } from "./schema.js"; import { extractPromptParams } from "./context-builder.js"; import { EnvProfileError, resolveEnvProfile } from "./env-profile.js"; -import { resolveStepShape } from "./step-shape.js"; +import { isLoopStep, resolveStepShape } from "./step-shape.js"; export interface PrecheckIssue { stepId?: string; @@ -111,6 +111,17 @@ export function precheckWorkflow( for (let i = 0; i < workflow.steps.length; i++) { const step = workflow.steps[i]; + if (isLoopStep(step)) { + // Loop-specific structural checks (each shape, as: requirements, + // concurrency) ran at workflow build time. Validate dependsOn + // references here for symmetry with the regular branch. + for (const dep of step.dependsOn ?? []) { + if (!stepIds.has(dep)) { + issues.push({ stepId: step.id, message: `dependsOn references unknown step '${dep}'` }); + } + } + continue; + } const { agent } = resolveStepShape(step); if (agent.type === "planner" && agent.interactive && i !== 0) { @@ -131,7 +142,10 @@ export function precheckWorkflow( } const dep = step.dependsOn === undefined ? workflow.steps[i - 1]?.id : step.dependsOn[0]; const depStep = dep ? workflow.steps.find((s) => s.id === dep) : undefined; - const depAgent = depStep ? resolveStepShape(depStep).agent : undefined; + // A critic depending on a loop step is non-retryable - loop iterations + // can't be safely re-run as a single upstream invocation. + const depAgent = + depStep && !isLoopStep(depStep) ? resolveStepShape(depStep).agent : undefined; const retryable = depAgent?.type === "worker" || (depAgent?.type === "planner" && !depAgent.interactive); if (!retryable) { @@ -178,7 +192,10 @@ export function precheckWorkflow( // params shell bin (see generata/bin/params + RunResult.params in agent-runner.ts). // `derive` runs per-step, so it sees these from step 1 onward - add them to the base // so derive reads of plan_name/instructions don't trip the precheck. - if (resolveStepShape(workflow.steps[0]).agent.type === "planner") { + if ( + !isLoopStep(workflow.steps[0]) && + resolveStepShape(workflow.steps[0]).agent.type === "planner" + ) { base.add("plan_name"); base.add("instructions"); } @@ -197,17 +214,30 @@ export function precheckWorkflow( for (let i = 0; i < workflow.steps.length; i++) { const step = workflow.steps[i]; - const { agent, args } = resolveStepShape(step); // Available set = base + every prior step id + every prior step's declared // outputs keys (the engine merges those into the params bag at runtime). + // For prior loop steps, expose `_manifest` instead - that's the only + // output a loop step contributes to the params bag. const available = new Set(base); for (let j = 0; j < i; j++) { - available.add(workflow.steps[j].id); - const priorAgent = resolveStepShape(workflow.steps[j]).agent; + const prior = workflow.steps[j]; + available.add(prior.id); + if (isLoopStep(prior)) { + available.add(`${prior.id}_manifest`); + continue; + } + const priorAgent = resolveStepShape(prior).agent; if (priorAgent.outputs) for (const k of Object.keys(priorAgent.outputs)) available.add(k); } + // Loop steps have no readable prompt/args at this layer - the sub-workflow + // is prechecked when each iteration fires (each iteration is its own + // runWorkflow call which runs its own precheck). + if (isLoopStep(step)) continue; + + const { agent, args } = resolveStepShape(step); + // For stepFn-form steps, also introspect the stepFn body for unavailable reads // (matches the old behaviour of flagging args fns that read missing keys). if ("stepFn" in step) { @@ -319,6 +349,7 @@ export function precheckWorkflow( const envByAgent = new Map>(); for (const step of workflow.steps) { + if (isLoopStep(step)) continue; const { agent } = resolveStepShape(step); const keys = agent.envKeys ?? []; if (keys.length === 0) continue; diff --git a/packages/core/src/schema.ts b/packages/core/src/schema.ts index 16c28eb..2917178 100644 --- a/packages/core/src/schema.ts +++ b/packages/core/src/schema.ts @@ -177,7 +177,67 @@ const NonCriticWorkflowStep = z.object({ export type CriticWorkflowStep = z.infer; export type FnWorkflowStep = z.infer; -export const WorkflowStep = z.union([CriticWorkflowStep, NonCriticWorkflowStep, FnWorkflowStep]); +const EachGlob = z.object({ glob: z.string().min(1) }).strict(); +const EachJson = z.object({ json: z.string().min(1) }).strict(); +const EachItems = z + .object({ + items: z.custom<(b: BuiltinPromptArgs) => unknown[] | Promise>( + (val) => typeof val === "function", + "each.items must be a function", + ), + }) + .strict(); + +export const EachSource = z.union([EachGlob, EachJson, EachItems]); +export type EachSource = z.infer; + +const LoopStepOptionsBase = z.object({ + concurrency: z.number().int().positive().default(1), + onFailure: z.enum(["halt", "continue"]).default("halt"), + onItemFail: z + .custom) => StepInvocation)>((val) => { + if (val === null || val === undefined) return false; + if (typeof val === "function") return true; + return ( + typeof val === "object" && + "type" in val && + ["worker", "planner", "critic"].includes((val as { type: unknown }).type as string) + ); + }, "onItemFail must be an LLM agent definition or a function returning a StepInvocation") + .optional(), + maxRetries: z.number().int().nonnegative().optional(), + dependsOn: z.array(z.string()).optional(), +}); + +// Runtime predicate for subWorkflow shape - we validate `kind === "workflow"` +// only, since fully recursing into WorkflowDef here would create a TS circular +// reference (WorkflowDef -> WorkflowStep -> LoopWorkflowStep -> WorkflowDef). +const isWorkflowDef = (val: unknown): boolean => + typeof val === "object" && val !== null && (val as { kind?: unknown }).kind === "workflow"; + +// Discriminated by source so glob requires `as:` and json/items reject it at parse time. +export const LoopWorkflowStep = z.union([ + LoopStepOptionsBase.extend({ + id: z.string(), + subWorkflow: z.unknown().refine(isWorkflowDef, "subWorkflow must be a WorkflowDef"), + each: EachGlob, + as: z.string().min(1), + }).strict(), + LoopStepOptionsBase.extend({ + id: z.string(), + subWorkflow: z.unknown().refine(isWorkflowDef, "subWorkflow must be a WorkflowDef"), + each: z.union([EachJson, EachItems]), + as: z.string().optional(), + }).strict(), +]); +export type LoopWorkflowStep = z.infer; + +export const WorkflowStep = z.union([ + CriticWorkflowStep, + NonCriticWorkflowStep, + FnWorkflowStep, + LoopWorkflowStep, +]); export type WorkflowStep = z.infer; export type DeriveFn = (args: Record) => Record; diff --git a/packages/core/src/step-shape.ts b/packages/core/src/step-shape.ts index 191c3cf..2d0db13 100644 --- a/packages/core/src/step-shape.ts +++ b/packages/core/src/step-shape.ts @@ -1,14 +1,27 @@ import type { AgentDef, StepParams, WorkflowStep } from "./schema.js"; +export function isLoopStep(step: WorkflowStep): step is Extract { + return "each" in step && "subWorkflow" in step; +} + // Resolve a workflow step (agent-form or stepFn-form) to its underlying // agent + args. For function-form steps, run the stepFn under a Proxy that -// returns string placeholders so it can execute without throwing — the +// returns string placeholders so it can execute without throwing - the // returned StepInvocation gives both the agent and the args mapping for // any caller that needs them (precheck, env-key collection, output rendering). +// +// Loop steps have no agent of their own (the sub-workflow's agents are +// validated separately when each iteration fires). Callers that hit this +// path should branch on isLoopStep() before calling resolveStepShape. export function resolveStepShape(step: WorkflowStep): { agent: AgentDef; args: Record | ((p: StepParams) => Record) | undefined; } { + if (isLoopStep(step)) { + throw new Error( + `resolveStepShape called on loop step '${step.id}' - callers must check isLoopStep() first`, + ); + } if ("stepFn" in step) { const sentinel = new Proxy({} as StepParams, { get: (_, key) => `__placeholder_${String(key)}__`,