Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,38 @@ steps:
stdin: $categorize.stdout
condition: $approve.approved
```

### Step fields

| Field | Required | Description |
|---|---|---|
| `id` | yes | Unique step identifier; used to reference the step's output in subsequent steps via `$id.stdout` or `$id.json`. |
| `command` | yes (for regular steps) | Shell command to run. Mutually exclusive with `lobster`. |
| `lobster` | yes (for sub-workflow steps) | Path to a `.lobster` file to run as a sub-workflow (resolved relative to the parent workflow). Mutually exclusive with `command`. |
| `args` | no | Key/value map of input arguments passed to the sub-workflow. Values support `${arg}` and `$stepId.stdout`/`$stepId.json` template syntax. Only valid when `lobster` is set. |
| `loop` | no | Repeat the sub-workflow step in a loop. Only valid when `lobster` is set. |
| `loop.maxIterations` | yes (when `loop` is set) | Maximum number of iterations. |
| `loop.condition` | no | Shell command evaluated after each iteration. Exit code 0 continues the loop; non-zero stops it early. Receives `LOBSTER_LOOP_STDOUT`, `LOBSTER_LOOP_JSON`, and `LOBSTER_LOOP_ITERATION` as environment variables. |
| `stdin` | no | Pass a previous step's output as stdin. |
| `approval` | no | Set to `required` to insert an approval gate before the step runs. |
| `condition` | no | Expression that must be truthy for the step to run. |

### Sub-workflow step example

Use the `lobster` field to embed another `.lobster` file as a step in your workflow, optionally passing arguments and looping until a condition is met:

```yaml
steps:
- id: prepare
command: echo "hello"

- id: process
lobster: ./sub_workflow.lobster
args:
input: $prepare.stdout
loop:
maxIterations: 10
condition: '! echo "$LOBSTER_LOOP_STDOUT" | grep -q "^done"'
```

The sub-workflow's last step result (stdout/json) is stored as the step result and is accessible via `$process.stdout` / `$process.json` in subsequent steps.
134 changes: 129 additions & 5 deletions src/workflows/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,19 @@ export type WorkflowFile = {
steps: WorkflowStep[];
};

export type LoopConfig = {
maxIterations: number;
condition?: string;
};

export type WorkflowStep = {
id: string;
command: string;
// Sub-lobster step: lobster holds the path to the sub-workflow file
lobster?: string;
args?: Record<string, unknown>;
loop?: LoopConfig;
// Regular shell step:
command?: string;
env?: Record<string, string>;
cwd?: string;
stdin?: unknown;
Expand Down Expand Up @@ -98,12 +108,18 @@ export async function loadWorkflowFile(filePath: string): Promise<WorkflowFile>
if (!step.id || typeof step.id !== 'string') {
throw new Error('Workflow step requires an id');
}
if (!step.command || typeof step.command !== 'string') {
throw new Error(`Workflow step ${step.id} requires a command string`);
}
if (seen.has(step.id)) {
throw new Error(`Duplicate workflow step id: ${step.id}`);
}
if (step.lobster !== undefined) {
if (typeof step.lobster !== 'string' || !step.lobster) {
throw new Error(`Workflow step ${step.id} 'lobster' must be a non-empty file path string`);
}
} else {
if (!step.command || typeof step.command !== 'string') {
throw new Error(`Workflow step ${step.id} requires a command string`);
}
}
seen.add(step.id);
}

Expand Down Expand Up @@ -174,7 +190,14 @@ export async function runWorkflowFile({
continue;
}

const command = resolveTemplate(step.command, resolvedArgs, results);
if (step.lobster !== undefined) {
const stepResult = await runLobsterSubStep(step, resolvedArgs, results, resolvedFilePath, ctx);
results[step.id] = stepResult;
lastStepId = step.id;
continue;
}

const command = resolveTemplate(step.command!, resolvedArgs, results);
const stdinValue = resolveStdin(step.stdin, resolvedArgs, results);
const env = mergeEnv(ctx.env, workflow.env, step.env, resolvedArgs, results);
const cwd = resolveCwd(step.cwd ?? workflow.cwd, resolvedArgs);
Expand Down Expand Up @@ -505,3 +528,104 @@ async function runShellCommand({
});
});
}

async function runLobsterSubStep(
step: WorkflowStep,
parentArgs: Record<string, unknown>,
parentResults: Record<string, WorkflowStepResult>,
parentFilePath: string,
ctx: RunContext,
): Promise<WorkflowStepResult> {
const resolvedFile = resolveSubWorkflowPath(step.lobster!, parentFilePath, parentArgs, parentResults);
const subArgs = resolveSubWorkflowArgs(step.args, parentArgs, parentResults);

if (step.loop) {
return runLobsterSubStepLoop(step.id, resolvedFile, subArgs, step.loop, ctx);
}

const result = await runWorkflowFile({ filePath: resolvedFile, args: subArgs, ctx });
if (result.status !== 'ok') {
throw new Error(`Sub-workflow step '${step.id}' did not complete: ${result.status}`);
}
return subWorkflowResultToStepResult(step.id, result);
}

async function runLobsterSubStepLoop(
stepId: string,
filePath: string,
args: Record<string, unknown>,
loop: LoopConfig,
ctx: RunContext,
): Promise<WorkflowStepResult> {
let lastResult: WorkflowStepResult = { id: stepId };

for (let i = 0; i < loop.maxIterations; i++) {
const result = await runWorkflowFile({ filePath, args, ctx });
if (result.status !== 'ok') {
throw new Error(`Sub-workflow loop iteration ${i + 1} did not complete: ${result.status}`);
}
lastResult = subWorkflowResultToStepResult(stepId, result);

if (loop.condition) {
const continueLoop = await evaluateLoopCondition(loop.condition, lastResult, i + 1, ctx.env);
if (!continueLoop) break;
}
}

return lastResult;
}

async function evaluateLoopCondition(
condition: string,
lastResult: WorkflowStepResult,
iteration: number,
env: Record<string, string | undefined>,
): Promise<boolean> {
const loopEnv: Record<string, string | undefined> = {
...env,
LOBSTER_LOOP_STDOUT: lastResult.stdout ?? '',
LOBSTER_LOOP_JSON: lastResult.json !== undefined ? JSON.stringify(lastResult.json) : '',
LOBSTER_LOOP_ITERATION: String(iteration),
};
try {
await runShellCommand({ command: condition, stdin: null, env: loopEnv });
return true;
} catch {
return false;
}
}

function resolveSubWorkflowPath(
file: string,
parentFilePath: string,
parentArgs: Record<string, unknown>,
parentResults: Record<string, WorkflowStepResult>,
): string {
const resolved = resolveTemplate(file, parentArgs, parentResults);
if (path.isAbsolute(resolved)) return resolved;
return path.resolve(path.dirname(parentFilePath), resolved);
}

function resolveSubWorkflowArgs(
args: Record<string, unknown> | undefined,
parentArgs: Record<string, unknown>,
parentResults: Record<string, WorkflowStepResult>,
): Record<string, unknown> {
if (!args) return {};
const resolved: Record<string, unknown> = {};
for (const [key, value] of Object.entries(args)) {
resolved[key] = typeof value === 'string' ? resolveTemplate(value, parentArgs, parentResults) : value;
}
return resolved;
}

function subWorkflowResultToStepResult(id: string, result: WorkflowRunResult): WorkflowStepResult {
const output = result.output;
if (output.length === 0) return { id };
if (output.length === 1) {
const item = output[0];
if (typeof item === 'string') return { id, stdout: item };
return { id, json: item, stdout: JSON.stringify(item) };
}
return { id, json: output, stdout: JSON.stringify(output) };
}
186 changes: 186 additions & 0 deletions test/lobster_sub_step.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { promises as fsp } from 'node:fs';
import path from 'node:path';
import os from 'node:os';

import { runWorkflowFile } from '../src/workflows/file.js';

const ctx = {
stdin: process.stdin,
stdout: process.stdout,
stderr: process.stderr,
env: process.env as Record<string, string | undefined>,
mode: 'tool' as const,
};

async function writeTmp(dir: string, name: string, content: unknown) {
const filePath = path.join(dir, name);
await fsp.writeFile(filePath, JSON.stringify(content, null, 2), 'utf8');
return filePath;
}

test('lobster sub-step runs a sub-workflow and captures its output', async () => {
const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-sub-'));

const subWorkflow = {
args: { greeting: { default: 'hello' } },
steps: [
{ id: 'say', command: 'echo "${greeting} world"' },
],
};
await writeTmp(tmpDir, 'sub.lobster', subWorkflow);

const mainWorkflow = {
steps: [
{ id: 'greet', lobster: './sub.lobster', args: { greeting: 'hi' } },
],
};
const mainPath = await writeTmp(tmpDir, 'main.lobster', mainWorkflow);

const result = await runWorkflowFile({ filePath: mainPath, ctx });
assert.equal(result.status, 'ok');
assert.equal((result.output[0] as string).trim(), 'hi world');
});

test('lobster sub-step passes parent step output as args', async () => {
const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-sub-'));

const subWorkflow = {
args: { value: { default: '' } },
steps: [
{ id: 'out', command: 'echo "received:${value}"' },
],
};
await writeTmp(tmpDir, 'sub.lobster', subWorkflow);

const mainWorkflow = {
steps: [
{ id: 'produce', command: 'echo "myvalue"' },
{ id: 'consume', lobster: './sub.lobster', args: { value: '$produce.stdout' } },
],
};
const mainPath = await writeTmp(tmpDir, 'main.lobster', mainWorkflow);

const result = await runWorkflowFile({ filePath: mainPath, ctx });
assert.equal(result.status, 'ok');
assert.match(result.output[0] as string, /received:myvalue/);
});

test('lobster sub-step loop runs exactly maxIterations times when no condition', async () => {
const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-loop-'));

const counterFile = path.join(tmpDir, 'count.txt');
await fsp.writeFile(counterFile, '0', 'utf8');

const subWorkflow = {
steps: [
{ id: 'inc', command: `c=$(cat ${counterFile}); echo $((c+1)) | tee ${counterFile}` },
],
};
await writeTmp(tmpDir, 'sub.lobster', subWorkflow);

const mainWorkflow = {
steps: [
{ id: 'loop', lobster: './sub.lobster', loop: { maxIterations: 3 } },
],
};
const mainPath = await writeTmp(tmpDir, 'main.lobster', mainWorkflow);

const result = await runWorkflowFile({ filePath: mainPath, ctx });
assert.equal(result.status, 'ok');

const finalCount = parseInt((await fsp.readFile(counterFile, 'utf8')).trim(), 10);
assert.equal(finalCount, 3);
});

test('lobster sub-step loop stops when condition exits non-zero', async () => {
const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-cond-'));

const counterFile = path.join(tmpDir, 'count.txt');
await fsp.writeFile(counterFile, '0', 'utf8');

const subWorkflow = {
steps: [
{ id: 'inc', command: `c=$(cat ${counterFile}); echo $((c+1)) | tee ${counterFile}` },
],
};
await writeTmp(tmpDir, 'sub.lobster', subWorkflow);

// Condition: continue while LOBSTER_LOOP_ITERATION < 2 (stop after 2nd iteration)
const mainWorkflow = {
steps: [
{
id: 'loop',
lobster: './sub.lobster',
loop: {
maxIterations: 10,
condition: '[ "$LOBSTER_LOOP_ITERATION" -lt 2 ]',
},
},
],
};
const mainPath = await writeTmp(tmpDir, 'main.lobster', mainWorkflow);

const result = await runWorkflowFile({ filePath: mainPath, ctx });
assert.equal(result.status, 'ok');

const finalCount = parseInt((await fsp.readFile(counterFile, 'utf8')).trim(), 10);
assert.equal(finalCount, 2);
});

test('lobster sub-step loop condition uses LOBSTER_LOOP_STDOUT', async () => {
const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-stdout-'));

const counterFile = path.join(tmpDir, 'count.txt');
await fsp.writeFile(counterFile, '0', 'utf8');

// Sub-workflow: increment counter and echo "done" when count reaches 3
const subWorkflow = {
steps: [
{
id: 'step',
command: `c=$(cat ${counterFile}); n=$((c+1)); echo $n > ${counterFile}; if [ $n -ge 3 ]; then echo "done"; else echo "continue"; fi`,
},
],
};
await writeTmp(tmpDir, 'sub.lobster', subWorkflow);

// Condition: continue while stdout is NOT "done"
const mainWorkflow = {
steps: [
{
id: 'loop',
lobster: './sub.lobster',
loop: {
maxIterations: 10,
condition: '! echo "$LOBSTER_LOOP_STDOUT" | grep -q "^done"',
},
},
],
};
const mainPath = await writeTmp(tmpDir, 'main.lobster', mainWorkflow);

const result = await runWorkflowFile({ filePath: mainPath, ctx });
assert.equal(result.status, 'ok');

const finalCount = parseInt((await fsp.readFile(counterFile, 'utf8')).trim(), 10);
assert.equal(finalCount, 3);
assert.match(result.output[0] as string, /done/);
});

test('loadWorkflowFile rejects lobster step without file', async () => {
const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-val-'));

const workflow = {
steps: [
{ id: 'bad', lobster: '' },
],
};
const filePath = await writeTmp(tmpDir, 'workflow.lobster', workflow);

await assert.rejects(
() => runWorkflowFile({ filePath, ctx }),
/'lobster' must be a non-empty file path/,
);
});