diff --git a/README.md b/README.md index 7469fe0..be895d8 100644 --- a/README.md +++ b/README.md @@ -193,3 +193,38 @@ steps: stdin: $categorize.stdout condition: $approve.approved ``` + +### Step fields + +| Field | Required | Description | +|---|---|---| +| `id` | yes | Unique step identifier; used to reference the step's output in subsequent steps via `$id.stdout` or `$id.json`. | +| `command` | yes (for regular steps) | Shell command to run. Mutually exclusive with `lobster`. | +| `lobster` | yes (for sub-workflow steps) | Path to a `.lobster` file to run as a sub-workflow (resolved relative to the parent workflow). Mutually exclusive with `command`. | +| `args` | no | Key/value map of input arguments passed to the sub-workflow. Values support `${arg}` and `$stepId.stdout`/`$stepId.json` template syntax. Only valid when `lobster` is set. | +| `loop` | no | Repeat the sub-workflow step in a loop. Only valid when `lobster` is set. | +| `loop.maxIterations` | yes (when `loop` is set) | Maximum number of iterations. | +| `loop.condition` | no | Shell command evaluated after each iteration. Exit code 0 continues the loop; non-zero stops it early. Receives `LOBSTER_LOOP_STDOUT`, `LOBSTER_LOOP_JSON`, and `LOBSTER_LOOP_ITERATION` as environment variables. | +| `stdin` | no | Pass a previous step's output as stdin. | +| `approval` | no | Set to `required` to insert an approval gate before the step runs. | +| `condition` | no | Expression that must be truthy for the step to run. | + +### Sub-workflow step example + +Use the `lobster` field to embed another `.lobster` file as a step in your workflow, optionally passing arguments and looping until a condition is met: + +```yaml +steps: + - id: prepare + command: echo "hello" + + - id: process + lobster: ./sub_workflow.lobster + args: + input: $prepare.stdout + loop: + maxIterations: 10 + condition: '! echo "$LOBSTER_LOOP_STDOUT" | grep -q "^done"' +``` + +The sub-workflow's last step result (stdout/json) is stored as the step result and is accessible via `$process.stdout` / `$process.json` in subsequent steps. diff --git a/src/workflows/file.ts b/src/workflows/file.ts index be17fee..c5e1e7d 100644 --- a/src/workflows/file.ts +++ b/src/workflows/file.ts @@ -16,9 +16,19 @@ export type WorkflowFile = { steps: WorkflowStep[]; }; +export type LoopConfig = { + maxIterations: number; + condition?: string; +}; + export type WorkflowStep = { id: string; - command: string; + // Sub-lobster step: lobster holds the path to the sub-workflow file + lobster?: string; + args?: Record; + loop?: LoopConfig; + // Regular shell step: + command?: string; env?: Record; cwd?: string; stdin?: unknown; @@ -98,12 +108,18 @@ export async function loadWorkflowFile(filePath: string): Promise if (!step.id || typeof step.id !== 'string') { throw new Error('Workflow step requires an id'); } - if (!step.command || typeof step.command !== 'string') { - throw new Error(`Workflow step ${step.id} requires a command string`); - } if (seen.has(step.id)) { throw new Error(`Duplicate workflow step id: ${step.id}`); } + if (step.lobster !== undefined) { + if (typeof step.lobster !== 'string' || !step.lobster) { + throw new Error(`Workflow step ${step.id} 'lobster' must be a non-empty file path string`); + } + } else { + if (!step.command || typeof step.command !== 'string') { + throw new Error(`Workflow step ${step.id} requires a command string`); + } + } seen.add(step.id); } @@ -174,7 +190,14 @@ export async function runWorkflowFile({ continue; } - const command = resolveTemplate(step.command, resolvedArgs, results); + if (step.lobster !== undefined) { + const stepResult = await runLobsterSubStep(step, resolvedArgs, results, resolvedFilePath, ctx); + results[step.id] = stepResult; + lastStepId = step.id; + continue; + } + + const command = resolveTemplate(step.command!, resolvedArgs, results); const stdinValue = resolveStdin(step.stdin, resolvedArgs, results); const env = mergeEnv(ctx.env, workflow.env, step.env, resolvedArgs, results); const cwd = resolveCwd(step.cwd ?? workflow.cwd, resolvedArgs); @@ -505,3 +528,104 @@ async function runShellCommand({ }); }); } + +async function runLobsterSubStep( + step: WorkflowStep, + parentArgs: Record, + parentResults: Record, + parentFilePath: string, + ctx: RunContext, +): Promise { + const resolvedFile = resolveSubWorkflowPath(step.lobster!, parentFilePath, parentArgs, parentResults); + const subArgs = resolveSubWorkflowArgs(step.args, parentArgs, parentResults); + + if (step.loop) { + return runLobsterSubStepLoop(step.id, resolvedFile, subArgs, step.loop, ctx); + } + + const result = await runWorkflowFile({ filePath: resolvedFile, args: subArgs, ctx }); + if (result.status !== 'ok') { + throw new Error(`Sub-workflow step '${step.id}' did not complete: ${result.status}`); + } + return subWorkflowResultToStepResult(step.id, result); +} + +async function runLobsterSubStepLoop( + stepId: string, + filePath: string, + args: Record, + loop: LoopConfig, + ctx: RunContext, +): Promise { + let lastResult: WorkflowStepResult = { id: stepId }; + + for (let i = 0; i < loop.maxIterations; i++) { + const result = await runWorkflowFile({ filePath, args, ctx }); + if (result.status !== 'ok') { + throw new Error(`Sub-workflow loop iteration ${i + 1} did not complete: ${result.status}`); + } + lastResult = subWorkflowResultToStepResult(stepId, result); + + if (loop.condition) { + const continueLoop = await evaluateLoopCondition(loop.condition, lastResult, i + 1, ctx.env); + if (!continueLoop) break; + } + } + + return lastResult; +} + +async function evaluateLoopCondition( + condition: string, + lastResult: WorkflowStepResult, + iteration: number, + env: Record, +): Promise { + const loopEnv: Record = { + ...env, + LOBSTER_LOOP_STDOUT: lastResult.stdout ?? '', + LOBSTER_LOOP_JSON: lastResult.json !== undefined ? JSON.stringify(lastResult.json) : '', + LOBSTER_LOOP_ITERATION: String(iteration), + }; + try { + await runShellCommand({ command: condition, stdin: null, env: loopEnv }); + return true; + } catch { + return false; + } +} + +function resolveSubWorkflowPath( + file: string, + parentFilePath: string, + parentArgs: Record, + parentResults: Record, +): string { + const resolved = resolveTemplate(file, parentArgs, parentResults); + if (path.isAbsolute(resolved)) return resolved; + return path.resolve(path.dirname(parentFilePath), resolved); +} + +function resolveSubWorkflowArgs( + args: Record | undefined, + parentArgs: Record, + parentResults: Record, +): Record { + if (!args) return {}; + const resolved: Record = {}; + for (const [key, value] of Object.entries(args)) { + resolved[key] = typeof value === 'string' ? resolveTemplate(value, parentArgs, parentResults) : value; + } + return resolved; +} + +function subWorkflowResultToStepResult(id: string, result: WorkflowRunResult): WorkflowStepResult { + const output = result.output; + if (output.length === 0) return { id }; + if (output.length === 1) { + const item = output[0]; + if (typeof item === 'string') return { id, stdout: item }; + return { id, json: item, stdout: JSON.stringify(item) }; + } + return { id, json: output, stdout: JSON.stringify(output) }; +} diff --git a/test/lobster_sub_step.test.ts b/test/lobster_sub_step.test.ts new file mode 100644 index 0000000..e78df44 --- /dev/null +++ b/test/lobster_sub_step.test.ts @@ -0,0 +1,186 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; +import { promises as fsp } from 'node:fs'; +import path from 'node:path'; +import os from 'node:os'; + +import { runWorkflowFile } from '../src/workflows/file.js'; + +const ctx = { + stdin: process.stdin, + stdout: process.stdout, + stderr: process.stderr, + env: process.env as Record, + mode: 'tool' as const, +}; + +async function writeTmp(dir: string, name: string, content: unknown) { + const filePath = path.join(dir, name); + await fsp.writeFile(filePath, JSON.stringify(content, null, 2), 'utf8'); + return filePath; +} + +test('lobster sub-step runs a sub-workflow and captures its output', async () => { + const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-sub-')); + + const subWorkflow = { + args: { greeting: { default: 'hello' } }, + steps: [ + { id: 'say', command: 'echo "${greeting} world"' }, + ], + }; + await writeTmp(tmpDir, 'sub.lobster', subWorkflow); + + const mainWorkflow = { + steps: [ + { id: 'greet', lobster: './sub.lobster', args: { greeting: 'hi' } }, + ], + }; + const mainPath = await writeTmp(tmpDir, 'main.lobster', mainWorkflow); + + const result = await runWorkflowFile({ filePath: mainPath, ctx }); + assert.equal(result.status, 'ok'); + assert.equal((result.output[0] as string).trim(), 'hi world'); +}); + +test('lobster sub-step passes parent step output as args', async () => { + const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-sub-')); + + const subWorkflow = { + args: { value: { default: '' } }, + steps: [ + { id: 'out', command: 'echo "received:${value}"' }, + ], + }; + await writeTmp(tmpDir, 'sub.lobster', subWorkflow); + + const mainWorkflow = { + steps: [ + { id: 'produce', command: 'echo "myvalue"' }, + { id: 'consume', lobster: './sub.lobster', args: { value: '$produce.stdout' } }, + ], + }; + const mainPath = await writeTmp(tmpDir, 'main.lobster', mainWorkflow); + + const result = await runWorkflowFile({ filePath: mainPath, ctx }); + assert.equal(result.status, 'ok'); + assert.match(result.output[0] as string, /received:myvalue/); +}); + +test('lobster sub-step loop runs exactly maxIterations times when no condition', async () => { + const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-loop-')); + + const counterFile = path.join(tmpDir, 'count.txt'); + await fsp.writeFile(counterFile, '0', 'utf8'); + + const subWorkflow = { + steps: [ + { id: 'inc', command: `c=$(cat ${counterFile}); echo $((c+1)) | tee ${counterFile}` }, + ], + }; + await writeTmp(tmpDir, 'sub.lobster', subWorkflow); + + const mainWorkflow = { + steps: [ + { id: 'loop', lobster: './sub.lobster', loop: { maxIterations: 3 } }, + ], + }; + const mainPath = await writeTmp(tmpDir, 'main.lobster', mainWorkflow); + + const result = await runWorkflowFile({ filePath: mainPath, ctx }); + assert.equal(result.status, 'ok'); + + const finalCount = parseInt((await fsp.readFile(counterFile, 'utf8')).trim(), 10); + assert.equal(finalCount, 3); +}); + +test('lobster sub-step loop stops when condition exits non-zero', async () => { + const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-cond-')); + + const counterFile = path.join(tmpDir, 'count.txt'); + await fsp.writeFile(counterFile, '0', 'utf8'); + + const subWorkflow = { + steps: [ + { id: 'inc', command: `c=$(cat ${counterFile}); echo $((c+1)) | tee ${counterFile}` }, + ], + }; + await writeTmp(tmpDir, 'sub.lobster', subWorkflow); + + // Condition: continue while LOBSTER_LOOP_ITERATION < 2 (stop after 2nd iteration) + const mainWorkflow = { + steps: [ + { + id: 'loop', + lobster: './sub.lobster', + loop: { + maxIterations: 10, + condition: '[ "$LOBSTER_LOOP_ITERATION" -lt 2 ]', + }, + }, + ], + }; + const mainPath = await writeTmp(tmpDir, 'main.lobster', mainWorkflow); + + const result = await runWorkflowFile({ filePath: mainPath, ctx }); + assert.equal(result.status, 'ok'); + + const finalCount = parseInt((await fsp.readFile(counterFile, 'utf8')).trim(), 10); + assert.equal(finalCount, 2); +}); + +test('lobster sub-step loop condition uses LOBSTER_LOOP_STDOUT', async () => { + const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-stdout-')); + + const counterFile = path.join(tmpDir, 'count.txt'); + await fsp.writeFile(counterFile, '0', 'utf8'); + + // Sub-workflow: increment counter and echo "done" when count reaches 3 + const subWorkflow = { + steps: [ + { + id: 'step', + command: `c=$(cat ${counterFile}); n=$((c+1)); echo $n > ${counterFile}; if [ $n -ge 3 ]; then echo "done"; else echo "continue"; fi`, + }, + ], + }; + await writeTmp(tmpDir, 'sub.lobster', subWorkflow); + + // Condition: continue while stdout is NOT "done" + const mainWorkflow = { + steps: [ + { + id: 'loop', + lobster: './sub.lobster', + loop: { + maxIterations: 10, + condition: '! echo "$LOBSTER_LOOP_STDOUT" | grep -q "^done"', + }, + }, + ], + }; + const mainPath = await writeTmp(tmpDir, 'main.lobster', mainWorkflow); + + const result = await runWorkflowFile({ filePath: mainPath, ctx }); + assert.equal(result.status, 'ok'); + + const finalCount = parseInt((await fsp.readFile(counterFile, 'utf8')).trim(), 10); + assert.equal(finalCount, 3); + assert.match(result.output[0] as string, /done/); +}); + +test('loadWorkflowFile rejects lobster step without file', async () => { + const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-val-')); + + const workflow = { + steps: [ + { id: 'bad', lobster: '' }, + ], + }; + const filePath = await writeTmp(tmpDir, 'workflow.lobster', workflow); + + await assert.rejects( + () => runWorkflowFile({ filePath, ctx }), + /'lobster' must be a non-empty file path/, + ); +});