Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ async function handleRun({ argv, registry }) {
stderr: process.stderr,
env: process.env,
mode: normalizedMode,
registry,
},
});

Expand Down Expand Up @@ -314,6 +315,7 @@ async function handleResume({ argv, registry }) {
stderr: process.stderr,
env: process.env,
mode: 'tool',
registry,
},
resume: payload,
approved: true,
Expand Down
8 changes: 7 additions & 1 deletion src/commands/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ import { gogGmailSearchCommand } from "./stdlib/gog_gmail_search.js";
import { gogGmailSendCommand } from "./stdlib/gog_gmail_send.js";
import { emailTriageCommand } from "./stdlib/email_triage.js";

export function createDefaultRegistry() {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export interface Registry {
get(name: string): any;
list(): string[];
}

export function createDefaultRegistry(): Registry {
const commands = new Map();

for (const cmd of [
Expand Down
3 changes: 0 additions & 3 deletions src/commands/stdlib/llm_task_invoke.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ type CacheEntry = {
storedAt: string;
};

type Transport = 'clawd';

export const llmTaskInvokeCommand = {
name: 'llm_task.invoke',
meta: {
Expand Down Expand Up @@ -198,7 +196,6 @@ export const llmTaskInvokeCommand = {
const env = ctx.env ?? process.env;

const clawdUrl = String(env.CLAWD_URL ?? '').trim();
const transport: Transport = 'clawd';
if (!clawdUrl) {
throw new Error('llm_task.invoke requires CLAWD_URL (run via Clawdbot gateway)');
}
Expand Down
97 changes: 89 additions & 8 deletions src/workflows/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import { randomUUID } from 'node:crypto';

import { encodeToken } from '../token.js';
import { readStateJson, writeStateJson } from '../state/store.js';
import { parsePipeline } from '../parser.js';
import { runPipeline } from '../runtime.js';
import type { Registry } from '../commands/registry.js';

export type WorkflowFile = {
name?: string;
Expand All @@ -18,7 +21,8 @@ export type WorkflowFile = {

export type WorkflowStep = {
id: string;
command: string;
command?: string;
pipeline?: string;
env?: Record<string, string>;
cwd?: string;
stdin?: unknown;
Expand Down Expand Up @@ -53,6 +57,7 @@ type RunContext = {
stderr: NodeJS.WritableStream;
env: Record<string, string | undefined>;
mode: 'human' | 'tool' | 'sdk';
registry?: Registry;
};

export type WorkflowResumePayload = {
Expand Down Expand Up @@ -98,8 +103,13 @@ export async function loadWorkflowFile(filePath: string): Promise<WorkflowFile>
if (!step.id || typeof step.id !== 'string') {
throw new Error('Workflow step requires an id');
}
if (!step.command || typeof step.command !== 'string') {
throw new Error(`Workflow step ${step.id} requires a command string`);
const hasCommand = step.command && typeof step.command === 'string';
const hasPipeline = step.pipeline && typeof step.pipeline === 'string';
if (!hasCommand && !hasPipeline) {
throw new Error(`Workflow step "${step.id}" requires either a "command" or "pipeline" field`);
}
if (hasCommand && hasPipeline) {
throw new Error(`Workflow step "${step.id}" has both "command" and "pipeline" -- use exactly one`);
}
if (seen.has(step.id)) {
throw new Error(`Duplicate workflow step id: ${step.id}`);
Expand Down Expand Up @@ -174,15 +184,29 @@ export async function runWorkflowFile({
continue;
}

const command = resolveTemplate(step.command, resolvedArgs, results);
const stdinValue = resolveStdin(step.stdin, resolvedArgs, results);
const env = mergeEnv(ctx.env, workflow.env, step.env, resolvedArgs, results);
const cwd = resolveCwd(step.cwd ?? workflow.cwd, resolvedArgs);

const { stdout } = await runShellCommand({ command, stdin: stdinValue, env, cwd });
const json = parseJson(stdout);
let stepStdout: string;
let stepJson: unknown;

results[step.id] = { id: step.id, stdout, json };
if (step.pipeline) {
if (step.cwd !== undefined || workflow.cwd !== undefined) {
throw new Error(`Workflow step "${step.id}": "cwd" is not supported for pipeline steps`);
}
const expr = resolveTemplate(step.pipeline, resolvedArgs, results);
const result = await runPipelineStep({ pipelineExpr: expr, stdin: stdinValue, env, ctx });
stepStdout = result.stdout;
stepJson = result.json;
} else {
const command = resolveTemplate(step.command!, resolvedArgs, results);
const cwd = resolveCwd(step.cwd ?? workflow.cwd, resolvedArgs);
const { stdout } = await runShellCommand({ command, stdin: stdinValue, env, cwd });
stepStdout = stdout;
stepJson = parseJson(stdout);
}

results[step.id] = { id: step.id, stdout: stepStdout, json: stepJson };
lastStepId = step.id;

if (isApprovalStep(step.approval)) {
Expand Down Expand Up @@ -463,6 +487,63 @@ function readLine(stdin: NodeJS.ReadableStream) {
});
}

async function runPipelineStep({
pipelineExpr,
stdin,
env,
ctx,
}: {
pipelineExpr: string;
stdin: string | null;
env: Record<string, string | undefined>;
ctx: RunContext;
}): Promise<{ stdout: string; json: unknown }> {
if (!ctx.registry) {
throw new Error('pipeline step requires a registry in the run context');
}

const pipeline = parsePipeline(pipelineExpr);
const input = stdinToStream(stdin);

const result = await runPipeline({
pipeline,
registry: ctx.registry,
input,
stdin: ctx.stdin,
stdout: ctx.stdout,
stderr: ctx.stderr,
env,
mode: ctx.mode,
});

const items = result.items;
if (items.length === 0) {
return { stdout: '', json: undefined };
}
const json = items.length === 1 ? items[0] : items;
return { stdout: JSON.stringify(json), json };
}

async function* stdinToStream(stdin: string | null): AsyncGenerator<unknown> {
if (stdin === null || stdin === '') return;
const trimmed = stdin.trim();
if (!trimmed) return;

try {
const parsed = JSON.parse(trimmed);
if (Array.isArray(parsed)) {
for (const item of parsed) yield item;
} else {
yield parsed;
}
return;
} catch {
// Not valid JSON -- yield as a single string item.
}

yield stdin;
}

async function runShellCommand({
command,
stdin,
Expand Down
Loading