Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
a0ff74d
feat(core): add EachSource materialiser for loop iteration
ben-rogerson May 4, 2026
dec8124
chore(core): polish source materialiser (rename + import + cast)
ben-rogerson May 4, 2026
1f6ad4d
feat(core): bind loop items to sub-workflow params
ben-rogerson May 4, 2026
8fb5621
chore(core): tighten loop item binding (undefined + hasOwn)
ben-rogerson May 4, 2026
e546d7a
feat(core): add loop manifest schema and writer
ben-rogerson May 4, 2026
a5baaeb
chore(core): harden loop manifest path + add failed-item test
ben-rogerson May 4, 2026
f9dc307
feat(core): add LoopWorkflowStep schema and isLoopStep helper
ben-rogerson May 4, 2026
335444a
chore(core): dedupe EachSource + tighten loop schema tests
ben-rogerson May 4, 2026
ff1ba3f
feat(core): .step() accepts WorkflowDef for loop iteration
ben-rogerson May 4, 2026
bb8a01d
chore(core): tighten loop step builder (onItemFail guard + pass-throu…
ben-rogerson May 4, 2026
e996ceb
feat(core): loop runner orchestrates iterated sub-workflows
ben-rogerson May 4, 2026
37e93b6
chore(core): tighten loop runner (halt-first + onItemFail diagnostics)
ben-rogerson May 4, 2026
cb50a83
feat(core): engine dispatches loop steps to runLoopStep
ben-rogerson May 4, 2026
594c76d
chore(core): real timestamps + logStepDone for loop steps
ben-rogerson May 5, 2026
6cf0609
feat(core): precheck threads loop manifest into available set
ben-rogerson May 5, 2026
d803b21
test(core): end-to-end loop integration test
ben-rogerson May 5, 2026
276e250
chore: changeset for workflow iteration
ben-rogerson May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/workflow-iteration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@generata/core": minor
---

Add `each:` to `.step()` so a workflow step can run a sub-workflow once per item from a directory glob, JSON file, or function. Each iteration runs as a normal sub-workflow with its own runId. Per-iteration outputs are recorded in a manifest at `<work_dir>/.generata/loops/<workflow>-<step>-<run>.json`, surfaced downstream as `<step-id>_manifest`. Supports `concurrency` (sequential by default), `onFailure: "halt" | "continue"`, an `onItemFail` agent for per-failure side effects, and `maxRetries` per iteration. New exports: `LoopWorkflowStep` and `EachSource` from `@generata/core`.
143 changes: 142 additions & 1 deletion packages/core/src/define.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, it } from "node:test";
import { equal, ok } from "node:assert/strict";
import { equal, ok, throws } from "node:assert/strict";
import { defineAgent, defineWorkflow, worktree } from "./define.js";

describe("defineWorkflow chain builder", () => {
Expand Down Expand Up @@ -223,3 +223,144 @@ describe("defineWorkflow chain builder", () => {
ok(threw, "expected empty workflow to throw");
});
});

describe("defineWorkflow .step() with sub-workflow", () => {
const stub = defineAgent({
type: "worker",
description: "x",
modelTier: "light",
tools: [],
permissions: "full",
timeoutSeconds: 60,
promptContext: [],
prompt: () => "p",
});
(stub as any).name = "stub";

const subWorkflow = defineWorkflow({
description: "review one note",
required: ["file"],
})
.step("read", stub)
.build();
(subWorkflow as any).name = "review-note";

it("accepts a WorkflowDef as a step value with each.glob + as", () => {
const wf = defineWorkflow({ description: "outer" })
.step("reviews", subWorkflow, {
each: { glob: "notes/*.md" },
as: "file",
})
.build();
ok(wf.steps[0]);
equal(wf.steps[0].id, "reviews");
ok("subWorkflow" in wf.steps[0]);
});

it("accepts each.json without as", () => {
const wf = defineWorkflow({ description: "outer" })
.step("reviews", subWorkflow, {
each: { json: "tasks.json" },
})
.build();
ok("subWorkflow" in wf.steps[0]);
});

it("accepts each.items without as", () => {
const wf = defineWorkflow({ description: "outer" })
.step("reviews", subWorkflow, {
each: { items: () => [{ file: "a.md" }] },
})
.build();
ok("subWorkflow" in wf.steps[0]);
});

it("rejects glob source without as", () => {
throws(() =>
defineWorkflow({ description: "outer" })
.step("reviews", subWorkflow, {
each: { glob: "notes/*.md" },
} as never)
.build(),
);
});

it("rejects concurrency: 0", () => {
throws(() =>
defineWorkflow({ description: "outer" })
.step("reviews", subWorkflow, {
each: { glob: "notes/*.md" },
as: "file",
concurrency: 0,
})
.build(),
);
});

it("rejects missing each: when value is a workflow", () => {
throws(() =>
defineWorkflow({ description: "outer" })
.step("reviews", subWorkflow, {} as never)
.build(),
);
});

it("threads dependsOn, maxRetries, onFailure, onItemFail through to the built step", () => {
const failHandler = defineAgent({
type: "worker",
description: "h",
modelTier: "light",
tools: [],
permissions: "full",
timeoutSeconds: 60,
promptContext: [],
prompt: () => "p",
});
(failHandler as any).name = "failHandler";

const wf = defineWorkflow({ description: "outer" })
.step("setup", stub)
.step("reviews", subWorkflow, {
each: { glob: "notes/*.md" },
as: "file",
concurrency: 4,
onFailure: "continue",
onItemFail: failHandler,
maxRetries: 2,
dependsOn: ["setup"],
})
.build();
const loopStep = wf.steps[1] as Record<string, unknown>;
equal(loopStep.concurrency, 4);
equal(loopStep.onFailure, "continue");
equal(loopStep.maxRetries, 2);
ok(Array.isArray(loopStep.dependsOn));
equal((loopStep.dependsOn as string[])[0], "setup");
ok(loopStep.onItemFail);
});

it("rejects factory-form agent passed bare as onItemFail", () => {
// Factory-form: defineAgent called with a callable, returns AgentCallable with kind: "agent"
const factory = defineAgent<{ x: string }>(({ x }) => ({
type: "worker",
description: "f",
modelTier: "light",
tools: [],
permissions: "full",
timeoutSeconds: 60,
promptContext: [],
prompt: () => `f ${x}`,
}));
(factory as any).name = "factory";

throws(() =>
defineWorkflow({ description: "outer" })
.step("reviews", subWorkflow, {
each: { glob: "*.md" },
as: "file",
onItemFail: factory as never,
})
.build(),
);
});
});
100 changes: 97 additions & 3 deletions packages/core/src/define.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,27 @@ type StepValue<TParams> =
| (AgentDef & { readonly [_factoryBrand]?: never })
| ((params: TParams) => StepInvocation<Record<string, string>>);

// Loop-form step options. Glob source requires `as:` (it streams strings); json
// and items sources can omit `as:` (each item is already an object record).
type LoopStepOptions = {
each:
| { glob: string }
| { json: string }
| { items: (b: BuiltinPromptArgs) => unknown[] | Promise<unknown[]> };
as?: string;
concurrency?: number;
onFailure?: "halt" | "continue";
onItemFail?:
| (LLMAgentDef & { readonly [_factoryBrand]?: never })
| ((params: Record<string, string> & { error: string }) => StepInvocation);
maxRetries?: number;
dependsOn?: string[];
};

// Loop-form step value: a workflow definition. Discriminated by `kind: "workflow"`
// (stamped by defineWorkflow().build() before the def reaches a consumer).
type LoopStepValue = WorkflowDef;

// Pulls the declared output keys back out of a step value's type.
// - Bare AgentDef: the value's `outputs` field (if declared) carries the keys.
// - stepFn: the StepInvocation it returns is generic on TOutputs (set by
Expand Down Expand Up @@ -203,6 +224,13 @@ type InternalStep = {
// Stored loose because either an object agent or a callable factory may be
// passed; the engine narrows by `typeof === "function"` at rejection time.
onReject?: LLMAgentDef | ((inputs: Record<string, string>) => StepInvocation);
// Loop-form fields. Set when `value` was a WorkflowDef and `each:` was provided.
subWorkflow?: WorkflowDef;
each?: LoopStepOptions["each"];
as?: string;
concurrency?: number;
onFailure?: "halt" | "continue";
onItemFail?: LLMAgentDef | ((inputs: Record<string, string>) => StepInvocation);
};

type WorkflowConfigInput<
Expand All @@ -220,12 +248,27 @@ type WorkflowConfigInput<
// Each .step() returns a Builder with TPrior expanded by the new step's id and
// TBaseParams extended with any `outputs` declared on the step's agent. The
// agent's emit values reach downstream stepFns as named string params.
//
// Two overloads:
// - Agent-form (existing): bare agent or stepFn, threads declared `outputs`
// keys into downstream params.
// - Loop-form (new): WorkflowDef value with `each:`, emits `<id>_manifest`
// into downstream params (a path to the per-item run manifest).
//
// Order matters: the agent-form overload must come first so TS picks it for
// non-WorkflowDef values; the loop-form overload requires `LoopStepOptions`,
// so a missing/empty options bag won't accidentally match it.
export type WorkflowBuilder<TBaseParams, TPrior extends string> = {
step<const Id extends string, V extends StepValue<TBaseParams & Record<TPrior, string>>>(
id: Id,
value: V,
options?: StepOptions<TBaseParams & Record<TPrior, string>>,
): WorkflowBuilder<TBaseParams & StepValueOutputs<V>, TPrior | Id>;
step<const Id extends string>(
id: Id,
value: LoopStepValue,
options: LoopStepOptions,
): WorkflowBuilder<TBaseParams & Record<`${Id}_manifest`, string>, TPrior | Id>;
build(): WorkflowDef;
};

Expand All @@ -247,13 +290,64 @@ export function defineWorkflow<
const builder = {
step(
id: string,
value: AgentDef | ((p: Record<string, string>) => StepInvocation),
options?: StepOptions,
value: AgentDef | WorkflowDef | ((p: Record<string, string>) => StepInvocation),
options?: StepOptions | LoopStepOptions,
) {
if (steps.some((s) => s.id === id)) {
throw new Error(`defineWorkflow: duplicate step id '${id}'`);
}
const internal: InternalStep = { id, ...options };
// Loop-form: value is a built workflow (stamped kind: "workflow"). This
// branch must run before the agent-form branch since a WorkflowDef is
// also a plain object. AgentCallables are callable AND carry kind:
// "agent" (not "workflow"), so the `kind === "workflow"` check cleanly
// separates the two.
if (
typeof value === "object" &&
value !== null &&
(value as { kind?: unknown }).kind === "workflow"
) {
const opts = options as LoopStepOptions | undefined;
if (!opts || !opts.each) {
throw new Error(
`defineWorkflow: step '${id}' has a sub-workflow value but no 'each:' option`,
);
}
if ("glob" in opts.each && !opts.as) {
throw new Error(
`defineWorkflow: step '${id}' has each.glob but no 'as:' (required for string items)`,
);
}
if (opts.concurrency !== undefined && opts.concurrency < 1) {
throw new Error(
`defineWorkflow: step '${id}' concurrency must be a positive integer (got ${opts.concurrency})`,
);
}
if (
typeof opts.onItemFail === "function" &&
(opts.onItemFail as { kind?: unknown }).kind === "agent"
) {
const fnName = (opts.onItemFail as { name?: string }).name || "<factory>";
throw new Error(
`Step '${id}': factory-form agent '${fnName}' cannot be passed bare as onItemFail. ` +
`Wrap it in a step fn: onItemFail: ({...}) => ${fnName}({...inputs})`,
);
}
const internal: InternalStep = {
id,
subWorkflow: value as WorkflowDef,
each: opts.each,
as: opts.as,
concurrency: opts.concurrency ?? 1,
onFailure: opts.onFailure ?? "halt",
onItemFail: opts.onItemFail as InternalStep["onItemFail"],
maxRetries: opts.maxRetries,
dependsOn: opts.dependsOn,
};
steps.push(internal);
return builder;
}
// Agent-form / stepFn-form (existing).
const internal: InternalStep = { id, ...(options as StepOptions) };
if (typeof value === "function") {
// Factory-form agents are callable AND carry kind: "agent". Passing one
// bare would skip the input mapping and run with a sentinel template.
Expand Down
Loading
Loading