diff --git a/src/cli.ts b/src/cli.ts index 3f78b7a..a9bf705 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -87,6 +87,7 @@ async function handleRun({ argv, registry }) { stderr: process.stderr, env: process.env, mode: normalizedMode, + registry, }, }); @@ -314,6 +315,7 @@ async function handleResume({ argv, registry }) { stderr: process.stderr, env: process.env, mode: 'tool', + registry, }, resume: payload, approved: true, diff --git a/src/commands/registry.ts b/src/commands/registry.ts index 74ef5f5..fc21b51 100644 --- a/src/commands/registry.ts +++ b/src/commands/registry.ts @@ -21,7 +21,13 @@ import { gogGmailSearchCommand } from "./stdlib/gog_gmail_search.js"; import { gogGmailSendCommand } from "./stdlib/gog_gmail_send.js"; import { emailTriageCommand } from "./stdlib/email_triage.js"; -export function createDefaultRegistry() { +export interface Registry { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + get(name: string): any; + list(): string[]; +} + +export function createDefaultRegistry(): Registry { const commands = new Map(); for (const cmd of [ diff --git a/src/commands/stdlib/llm_task_invoke.ts b/src/commands/stdlib/llm_task_invoke.ts index 86ece36..700b48a 100644 --- a/src/commands/stdlib/llm_task_invoke.ts +++ b/src/commands/stdlib/llm_task_invoke.ts @@ -146,8 +146,6 @@ type CacheEntry = { storedAt: string; }; -type Transport = 'clawd'; export const llmTaskInvokeCommand = { name: 'llm_task.invoke', meta: { @@ -198,7 +196,6 @@ export const llmTaskInvokeCommand = { const env = ctx.env ?? process.env; const clawdUrl = String(env.CLAWD_URL ??
'').trim(); - const transport: Transport = 'clawd'; if (!clawdUrl) { throw new Error('llm_task.invoke requires CLAWD_URL (run via Clawdbot gateway)'); } diff --git a/src/workflows/file.ts b/src/workflows/file.ts index be17fee..42819d3 100644 --- a/src/workflows/file.ts +++ b/src/workflows/file.ts @@ -6,6 +9,9 @@ import { randomUUID } from 'node:crypto'; import { encodeToken } from '../token.js'; import { readStateJson, writeStateJson } from '../state/store.js'; +import { parsePipeline } from '../parser.js'; +import { runPipeline } from '../runtime.js'; +import type { Registry } from '../commands/registry.js'; export type WorkflowFile = { name?: string; @@ -18,7 +21,8 @@ export type WorkflowStep = { id: string; - command: string; + command?: string; + pipeline?: string; env?: Record<string, string>; cwd?: string; stdin?: unknown; @@ -53,6 +57,7 @@ type RunContext = { stderr: NodeJS.WritableStream; env: Record<string, string | undefined>; mode: 'human' | 'tool' | 'sdk'; + registry?: Registry; }; export type WorkflowResumePayload = { @@ -98,8 +103,13 @@ export async function loadWorkflowFile(filePath: string): Promise<WorkflowFile> if (!step.id || typeof step.id !== 'string') { throw new Error('Workflow step requires an id'); } - if (!step.command || typeof step.command !== 'string') { - throw new Error(`Workflow step ${step.id} requires a command string`); + const hasCommand = step.command && typeof step.command === 'string'; + const hasPipeline = step.pipeline && typeof step.pipeline === 'string'; + if (!hasCommand && !hasPipeline) { + throw new Error(`Workflow step "${step.id}" requires either a "command" or "pipeline" field`); + } + if (hasCommand && hasPipeline) { + throw new Error(`Workflow step "${step.id}" has both "command" and "pipeline" -- use exactly one`); } if (seen.has(step.id)) { throw new Error(`Duplicate workflow step id: ${step.id}`); @@ -174,15 +184,29 @@ export async function runWorkflowFile({ continue; } - const command = resolveTemplate(step.command, resolvedArgs, results);
const stdinValue = resolveStdin(step.stdin, resolvedArgs, results); const env = mergeEnv(ctx.env, workflow.env, step.env, resolvedArgs, results); - const cwd = resolveCwd(step.cwd ?? workflow.cwd, resolvedArgs); - const { stdout } = await runShellCommand({ command, stdin: stdinValue, env, cwd }); - const json = parseJson(stdout); + let stepStdout: string; + let stepJson: unknown; - results[step.id] = { id: step.id, stdout, json }; + if (step.pipeline) { + if (step.cwd !== undefined || workflow.cwd !== undefined) { + throw new Error(`Workflow step "${step.id}": "cwd" is not supported for pipeline steps`); + } + const expr = resolveTemplate(step.pipeline, resolvedArgs, results); + const result = await runPipelineStep({ pipelineExpr: expr, stdin: stdinValue, env, ctx }); + stepStdout = result.stdout; + stepJson = result.json; + } else { + const command = resolveTemplate(step.command!, resolvedArgs, results); + const cwd = resolveCwd(step.cwd ?? workflow.cwd, resolvedArgs); + const { stdout } = await runShellCommand({ command, stdin: stdinValue, env, cwd }); + stepStdout = stdout; + stepJson = parseJson(stdout); + } + + results[step.id] = { id: step.id, stdout: stepStdout, json: stepJson }; lastStepId = step.id; if (isApprovalStep(step.approval)) { @@ -463,6 +487,63 @@ function readLine(stdin: NodeJS.ReadableStream) { }); } +async function runPipelineStep({ + pipelineExpr, + stdin, + env, + ctx, +}: { + pipelineExpr: string; + stdin: string | null; + env: Record<string, string | undefined>; + ctx: RunContext; +}): Promise<{ stdout: string; json: unknown }> { + if (!ctx.registry) { + throw new Error('pipeline step requires a registry in the run context'); + } + + const pipeline = parsePipeline(pipelineExpr); + const input = stdinToStream(stdin); + + const result = await runPipeline({ + pipeline, + registry: ctx.registry, + input, + stdin: ctx.stdin, + stdout: ctx.stdout, + stderr: ctx.stderr, + env, + mode: ctx.mode, + }); + + const items = result.items; + if (items.length === 0) { + return {
stdout: '', json: undefined }; + } + const json = items.length === 1 ? items[0] : items; + return { stdout: JSON.stringify(json), json }; +} + +async function* stdinToStream(stdin: string | null): AsyncGenerator<unknown> { + if (stdin === null || stdin === '') return; + const trimmed = stdin.trim(); + if (!trimmed) return; + + try { + const parsed = JSON.parse(trimmed); + if (Array.isArray(parsed)) { + for (const item of parsed) yield item; + } else { + yield parsed; + } + return; + } catch { + // Not valid JSON -- yield as a single string item. + } + + yield stdin; +} + async function runShellCommand({ command, stdin, diff --git a/test/workflow_pipeline_step.test.ts b/test/workflow_pipeline_step.test.ts new file mode 100644 index 0000000..760e988 --- /dev/null +++ b/test/workflow_pipeline_step.test.ts @@ -0,0 +1,268 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; +import { promises as fsp } from 'node:fs'; +import path from 'node:path'; +import os from 'node:os'; + +import { loadWorkflowFile, runWorkflowFile } from '../src/workflows/file.js'; +import { createDefaultRegistry } from '../src/commands/registry.js'; +import { decodeResumeToken } from '../src/resume.js'; + +function makeCtx({ registry = undefined, mode = 'tool' as const } = {}) { + return { + stdin: process.stdin, + stdout: process.stdout, + stderr: process.stderr, + env: { ...process.env }, + mode, + registry, + }; +} + +async function writeTmpWorkflow(workflow: unknown) { + const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-wf-pipeline-')); + const stateDir = path.join(tmpDir, 'state'); + const filePath = path.join(tmpDir, 'workflow.lobster'); + await fsp.writeFile(filePath, JSON.stringify(workflow, null, 2), 'utf8'); + return { filePath, stateDir }; +} + +// Helper: produce JSON via node -e inside exec --shell, avoiding shell quoting issues.
+// The outer double quotes are stripped by the pipeline tokenizer; the inner single quotes +// protect the JS expression from the shell. +function execJson(jsExpr: string) { + return `exec --json --shell "node -e '${jsExpr}'"`; +} + +test('pipeline step: basic single command via registry', async () => { + const registry = createDefaultRegistry(); + const workflow = { + steps: [ + { + id: 'fetch', + pipeline: execJson('process.stdout.write(JSON.stringify([{val:42}]))'), + }, + ], + }; + + const { filePath, stateDir } = await writeTmpWorkflow(workflow); + const ctx = makeCtx({ registry }); + ctx.env.LOBSTER_STATE_DIR = stateDir; + + const result = await runWorkflowFile({ filePath, ctx }); + + assert.equal(result.status, 'ok'); + assert.deepEqual(result.output, [{ val: 42 }]); +}); + +test('pipeline step: pipe chaining with where filter', async () => { + const registry = createDefaultRegistry(); + const workflow = { + steps: [ + { + id: 'filtered', + pipeline: execJson('process.stdout.write(JSON.stringify([{a:1},{a:2},{a:3}]))') + ' | where a>=2', + }, + ], + }; + + const { filePath, stateDir } = await writeTmpWorkflow(workflow); + const ctx = makeCtx({ registry }); + ctx.env.LOBSTER_STATE_DIR = stateDir; + + const result = await runWorkflowFile({ filePath, ctx }); + + assert.equal(result.status, 'ok'); + assert.deepEqual(result.output, [{ a: 2 }, { a: 3 }]); +}); + +test('pipeline step: cross-step reference from command to pipeline output', async () => { + const registry = createDefaultRegistry(); + const workflow = { + steps: [ + { + id: 'source', + pipeline: execJson('process.stdout.write(JSON.stringify({key:42}))'), + }, + { + id: 'echo_it', + command: "node -e \"process.stdout.write(JSON.stringify({got: $source.json}))\"", + }, + ], + }; + + const { filePath, stateDir } = await writeTmpWorkflow(workflow); + const ctx = makeCtx({ registry }); + ctx.env.LOBSTER_STATE_DIR = stateDir; + + const result = await runWorkflowFile({ filePath, ctx }); + + 
assert.equal(result.status, 'ok'); + assert.deepEqual(result.output, [{ got: { key: 42 } }]); +}); + +test('pipeline step: stdin piping from command to pipeline', async () => { + const registry = createDefaultRegistry(); + const workflow = { + steps: [ + { + id: 'produce', + command: "node -e \"process.stdout.write(JSON.stringify([{x:1},{x:2},{x:3}]))\"", + }, + { + id: 'filter', + pipeline: 'where x>1', + stdin: '$produce.stdout', + }, + ], + }; + + const { filePath, stateDir } = await writeTmpWorkflow(workflow); + const ctx = makeCtx({ registry }); + ctx.env.LOBSTER_STATE_DIR = stateDir; + + const result = await runWorkflowFile({ filePath, ctx }); + + assert.equal(result.status, 'ok'); + assert.deepEqual(result.output, [{ x: 2 }, { x: 3 }]); +}); + +test('pipeline step: validation rejects both command and pipeline', async () => { + const workflow = { + steps: [ + { + id: 'bad', + command: 'echo hello', + pipeline: 'exec --json "echo 1"', + }, + ], + }; + + const { filePath } = await writeTmpWorkflow(workflow); + await assert.rejects( + () => loadWorkflowFile(filePath), + { message: 'Workflow step "bad" has both "command" and "pipeline" -- use exactly one' }, + ); +}); + +test('pipeline step: validation rejects neither command nor pipeline', async () => { + const workflow = { + steps: [ + { + id: 'empty', + }, + ], + }; + + const { filePath } = await writeTmpWorkflow(workflow); + await assert.rejects( + () => loadWorkflowFile(filePath), + { message: 'Workflow step "empty" requires either a "command" or "pipeline" field' }, + ); +}); + +test('pipeline step: step-level cwd on pipeline step throws', async () => { + const registry = createDefaultRegistry(); + const workflow = { + steps: [ + { + id: 'bad_cwd', + pipeline: execJson('process.stdout.write(JSON.stringify([1]))'), + cwd: '/tmp', + }, + ], + }; + + const { filePath, stateDir } = await writeTmpWorkflow(workflow); + const ctx = makeCtx({ registry }); + ctx.env.LOBSTER_STATE_DIR = stateDir; + + await 
assert.rejects( + () => runWorkflowFile({ filePath, ctx }), + { message: 'Workflow step "bad_cwd": "cwd" is not supported for pipeline steps' }, + ); +}); + +test('pipeline step: workflow-level cwd on pipeline step throws', async () => { + const registry = createDefaultRegistry(); + const workflow = { + cwd: '/tmp', + steps: [ + { + id: 'inherits_cwd', + pipeline: execJson('process.stdout.write(JSON.stringify([1]))'), + }, + ], + }; + + const { filePath, stateDir } = await writeTmpWorkflow(workflow); + const ctx = makeCtx({ registry }); + ctx.env.LOBSTER_STATE_DIR = stateDir; + + await assert.rejects( + () => runWorkflowFile({ filePath, ctx }), + { message: 'Workflow step "inherits_cwd": "cwd" is not supported for pipeline steps' }, + ); +}); + +test('pipeline step: missing registry throws clear error', async () => { + const workflow = { + steps: [ + { + id: 'needs_registry', + pipeline: execJson('process.stdout.write(JSON.stringify([1]))'), + }, + ], + }; + + const { filePath, stateDir } = await writeTmpWorkflow(workflow); + const ctx = makeCtx(); // no registry + ctx.env.LOBSTER_STATE_DIR = stateDir; + + await assert.rejects( + () => runWorkflowFile({ filePath, ctx }), + { message: 'pipeline step requires a registry in the run context' }, + ); +}); + +test('pipeline step: approval triggers halt and resume', async () => { + const registry = createDefaultRegistry(); + const workflow = { + steps: [ + { + id: 'data', + pipeline: execJson('process.stdout.write(JSON.stringify([{item:1}]))'), + approval: 'required', + }, + { + id: 'after', + command: "node -e \"process.stdout.write(JSON.stringify({done:true}))\"", + condition: '$data.approved', + }, + ], + }; + + const { filePath, stateDir } = await writeTmpWorkflow(workflow); + const ctx = makeCtx({ registry }); + ctx.env.LOBSTER_STATE_DIR = stateDir; + + // First run: should halt for approval. 
+ const first = await runWorkflowFile({ filePath, ctx }); + + assert.equal(first.status, 'needs_approval'); + assert.ok(first.requiresApproval?.resumeToken); + + // Resume with approval. + const payload = decodeResumeToken(first.requiresApproval!.resumeToken!); + assert.equal(payload.kind, 'workflow-file'); + + const resumed = await runWorkflowFile({ + filePath, + ctx: { ...ctx, env: { ...ctx.env, LOBSTER_STATE_DIR: stateDir } }, + resume: payload, + approved: true, + }); + + assert.equal(resumed.status, 'ok'); + assert.deepEqual(resumed.output, [{ done: true }]); +});