diff --git a/examples/puppeteer-worker-example/app/ai/agents/puppeteer/workers/pdf.worker.ts b/examples/puppeteer-worker-example/app/ai/agents/puppeteer/workers/pdf.worker.ts index c8a7ca0..09e83a1 100644 --- a/examples/puppeteer-worker-example/app/ai/agents/puppeteer/workers/pdf.worker.ts +++ b/examples/puppeteer-worker-example/app/ai/agents/puppeteer/workers/pdf.worker.ts @@ -43,6 +43,20 @@ type Output = z.infer; export const workerConfig: WorkerConfig = { timeout: 300, // 5 minutes memorySize: 1024, // 1GB +// schedule: [{ +// method: 'scheduler', +// rate: ['cron(0 0/4 ? * MON-FRI *)'], +// timezone: 'America/New_York', +// input: { key1: 'value1' } +// }, +// { +// rate: 'rate(10 minutes)', +// enabled: true, +// input: { key1: 'value1', key2: 'value2' } +// }, +// 'rate(2 hours)', +// { rate: 'cron(0 12 * * ? *)', enabled: false } +// ] }; export const pdfWorker = createWorker({ diff --git a/examples/root/app/ai/agents/system/index.ts b/examples/root/app/ai/agents/system/index.ts index ddf7d49..7e2b2eb 100644 --- a/examples/root/app/ai/agents/system/index.ts +++ b/examples/root/app/ai/agents/system/index.ts @@ -16,15 +16,18 @@ export const systemAgent = aiRouter inputSchema: z.object({}), execute: async () => ({ result: new Date().toLocaleDateString() }), metadata: { - hideUI: true, + hideUI: false, // Set to false so return value is written to stream for workflow steps }, }) .agent('/current_time', async (ctx) => { const { format } = ctx.request.params; + ctx.response.write({ type: 'data-start', data: 'Getting current time...' }); + const currentTime = new Date().toLocaleTimeString('en-US', { + hour12: format === '12h', + }) + ctx.response.write({ type: 'data-end', data: `${currentTime}` }); return { - result: new Date().toLocaleTimeString('en-US', { - hour12: format === '12h', - }), + result: currentTime, }; }) .actAsTool('/current_time', { diff --git a/examples/root/app/ai/agents/workflows/onboarding/index.ts b/examples/root/app/ai/agents/workflows/onboarding/index.ts new file mode 100644 index 0000000..31ca994 --- /dev/null +++ b/examples/root/app/ai/agents/workflows/onboarding/index.ts @@ -0,0 +1,19 @@ +// // This file registers the workflow with ai-router. +// // The actual workflow function is defined in workflow.ts to avoid +// // importing from @microfox/ai-router in the workflow file (which would +// // cause the Workflow DevKit scanner to detect Node.js dependencies). + +// import { createWorkflow } from '@microfox/ai-router/workflow'; +// import { onboardingWorkflowFn, onboardingInputSchema, onboardingOutputSchema } from './workflow'; + +// export const onboardingWorkflow = createWorkflow({ +// id: 'onboarding-workflow-v1', +// version: '1.0', +// input: onboardingInputSchema, +// output: onboardingOutputSchema, +// // External runtime entrypoint +// workflowFn: onboardingWorkflowFn, +// }); + +// // Re-export the workflow function for direct use if needed +// export { onboardingWorkflowFn }; diff --git a/examples/root/app/ai/agents/workflows/onboarding/workflow.ts b/examples/root/app/ai/agents/workflows/onboarding/workflow.ts new file mode 100644 index 0000000..107e920 --- /dev/null +++ b/examples/root/app/ai/agents/workflows/onboarding/workflow.ts @@ -0,0 +1,106 @@ +// import { z } from 'zod'; +// import { defineHook } from 'workflow'; + +// // Define input/output schemas +// const onboardingInputSchema = z.object({ +// email: z.string().email(), +// name: z.string(), +// }); +// const onboardingOutputSchema = z.object({ status: z.string(), userId: z.string().optional() }); + +// // Define verification payload schema for type safety +// const verificationPayloadSchema = z.object({ +// type: z.enum(['email_click', 'admin_override']), +// verifiedAt: z.string().optional(), +// }); + +// // Define the hook using defineHook for type safety +// const verificationHookSchema = defineHook({ +// schema: verificationPayloadSchema, +// }); + +// // Define step functions using the official workflow package +// // Steps must have the "use step" directive and are called via step() +// async function createUser(email: string, name: string) { +// "use step"; +// // Simulate user creation +// await new Promise(resolve => setTimeout(resolve, 500)); +// const userId = `user_${Date.now()}`; +// console.log(`[CREATE USER] Created user ${userId} for ${email}`); +// return { userId, email }; +// } + +// async function sendVerificationEmail(userId: string, email: string, verificationUrl: string) { +// "use step"; +// // Simulate email sending +// console.log(`[EMAIL] Sending verification to ${email}: ${verificationUrl}`); +// await new Promise(resolve => setTimeout(resolve, 300)); +// return { sent: true }; +// } + +// async function markVerified(userId: string, method: 'email' | 'admin') { +// "use step"; +// console.log(`[VERIFY] User ${userId} verified via ${method}`); +// await new Promise(resolve => setTimeout(resolve, 200)); +// return { verified: true }; +// } + +// async function sendWelcomeEmail(userId: string, email: string) { +// "use step"; +// console.log(`[EMAIL] Sending welcome email to ${email}`); +// await new Promise(resolve => setTimeout(resolve, 300)); +// return { sent: true }; +// } + + +// // External runtime entrypoint: `"use workflow"` function for onboarding. +// // IMPORTANT: This function must be exported directly for the Workflow DevKit +// // to process it at build time. The "use workflow" directive must be the first +// // statement in the function body. +// export async function onboardingWorkflowFn(input: z.infer) { +// "use workflow"; + +// const { email, name } = input; + +// // Step 1: Create user - call step function directly (Workflow DevKit intercepts via "use step") +// const user = await createUser(email, name); + +// // Step 2: Create hook for email verification HITL +// // Using defineHook pattern - create hook instance with custom token +// // Token pattern: onboarding-verification:${email} +// // NOTE: This will cause conflicts if multiple workflows start with same input! +// // For production, use a unique identifier (runId, timestamp, or UUID) in the token +// const hook = verificationHookSchema.create({ +// token: `onboarding-verification:${email}`, +// }); + +// // Log the token so it can be retrieved +// console.log(`[HITL] Hook token: ${hook.token}`); + +// // Step 3: Send verification email +// // Note: The actual signal URL will be constructed by the frontend using the runId +// // from the workflow status response. The hook token is deterministic for lookup. +// const verificationMessage = `Please verify your email. Use the workflow status endpoint to get the runId and call the signal endpoint.`; +// await sendVerificationEmail(user.userId, user.email, verificationMessage); + +// // Step 4: Wait for verification (HITL pause) +// // Workflow pauses here until someone calls the signal endpoint with the payload +// // No compute resources are consumed while waiting - the workflow status should be "paused" +// const verification = await hook; +// console.log(`[HITL] Received verification for user ${user.userId}:`, verification); + +// // Step 5: Mark as verified based on verification type +// await markVerified(user.userId, verification.type === 'email_click' ? 'email' : 'admin'); + +// // Step 6: Send welcome email - call step function directly +// await sendWelcomeEmail(user.userId, user.email); + +// return { +// status: 'completed', +// userId: user.userId, +// }; +// } + +// // Export schemas for use in registration file +// export { onboardingInputSchema, onboardingOutputSchema }; + diff --git a/examples/root/app/ai/agents/workflows/research/index.ts b/examples/root/app/ai/agents/workflows/research/index.ts new file mode 100644 index 0000000..b755328 --- /dev/null +++ b/examples/root/app/ai/agents/workflows/research/index.ts @@ -0,0 +1,19 @@ +// // This file registers the workflow with ai-router. +// // The actual workflow function is defined in workflow.ts to avoid +// // importing from @microfox/ai-router in the workflow file (which would +// // cause the Workflow DevKit scanner to detect Node.js dependencies). + +// import { createWorkflow } from '@microfox/ai-router'; +// import { researchWorkflowFn, researchInputSchema, researchOutputSchema } from './workflow'; + +// export const researchWorkflow = createWorkflow({ +// id: 'research-workflow-v1', +// version: '1.0', +// input: researchInputSchema, +// output: researchOutputSchema, +// // External runtime entrypoint +// workflowFn: researchWorkflowFn, +// }); + +// // Re-export the workflow function for direct use if needed +// export { researchWorkflowFn }; diff --git a/examples/root/app/ai/agents/workflows/research/workflow.ts b/examples/root/app/ai/agents/workflows/research/workflow.ts new file mode 100644 index 0000000..a48c548 --- /dev/null +++ b/examples/root/app/ai/agents/workflows/research/workflow.ts @@ -0,0 +1,118 @@ +// import { z } from 'zod'; +// import { defineHook, sleep } from 'workflow'; + +// // Define input/output schemas +// const researchInputSchema = z.object({ +// topic: z.string(), +// email: z.string().email(), +// }); +// const researchOutputSchema = z.object({ status: z.string(), summaryUrl: z.string().optional() }); + +// // Define approval payload schema for type safety +// const approvalPayloadSchema = z.object({ +// decision: z.enum(['approve', 'reject']), +// comments: z.string().optional(), +// }); + +// // Define the hook using defineHook for type safety +// // This creates a typed hook that can be awaited in the workflow +// const approvalHookSchema = defineHook({ +// schema: approvalPayloadSchema, +// }); + +// // Define step functions using the official workflow package +// // Steps must have the "use step" directive and are called via step() +// async function searchWeb(query: string) { +// "use step"; +// // Simulate web search - in real app, this would call Brave Search API +// await new Promise(resolve => setTimeout(resolve, 1000)); // Simulate delay +// return [ +// `Result 1: Comprehensive analysis of ${query}`, +// `Result 2: Latest trends in ${query}`, +// `Result 3: Expert opinions on ${query}`, +// ]; +// } + +// async function summarizeResults(results: string[]) { +// "use step"; +// // Simulate summarization +// await new Promise(resolve => setTimeout(resolve, 500)); +// return { +// summary: `Summary of ${results.length} research results`, +// keyPoints: results.slice(0, 3), +// }; +// } + +// async function sendEmail(body: string, recipient: string) { +// "use step"; +// // Simulate email sending +// console.log(`[EMAIL] Sending to ${recipient}: ${body.substring(0, 50)}...`); +// await new Promise(resolve => setTimeout(resolve, 300)); +// return { +// success: true, +// messageId: `msg_${Date.now()}`, +// }; +// } + +// // External runtime entrypoint: `"use workflow"` function that orchestrates steps. +// // This will be used by the official `workflow` runtime via the adapter. +// // IMPORTANT: This function must be exported directly for the Workflow DevKit +// // to process it at build time. The "use workflow" directive must be the first +// // statement in the function body. +// export async function researchWorkflowFn(input: z.infer) { +// "use workflow"; + +// const { topic, email } = input; + +// // Step 1: Search - call step function directly (Workflow DevKit intercepts via "use step") +// const results = await searchWeb(topic); + +// if (results.length === 0) { +// return { status: 'failed', summaryUrl: undefined }; +// } + +// // Step 2: Summarize - call step function directly +// const summary = await summarizeResults(results); + +// await sleep("1 min"); + +// // Step 3: Create hook for human approval (HITL) +// // Using defineHook pattern - create hook instance with custom token +// // The workflow will pause at await hook until the hook is resumed +// // Token pattern: research-approval:${topic}:${email} +// // NOTE: This is deterministic - the frontend can construct this token +// // If multiple workflows start with the same input, there will be token conflicts +// // For production, consider including runId or using unique identifiers +// const hook = approvalHookSchema.create({ +// token: `research-approval:${topic}:${email}`, +// }); + +// console.log(`[HITL] Waiting for approval of research summary for topic: ${topic}`); +// console.log(`[HITL] Hook token: ${hook.token}`); + +// // Step 4: Wait for human approval (HITL pause) +// // Workflow pauses here until someone calls the signal endpoint with approval/rejection +// // No compute resources are consumed while waiting - the workflow status should be "paused" +// const approval = await hook; +// console.log(`[HITL] Received approval decision:`, approval); + +// if (approval.decision === 'reject') { +// return { +// status: 'rejected', +// summaryUrl: undefined, +// }; +// } + +// // Step 5: Send email with approved summary +// const emailBody = `${summary.summary}\n\nKey Points:\n${summary.keyPoints.map((p: string) => `- ${p}`).join('\n')}`; +// const emailResult = await sendEmail(emailBody, email); + +// return { +// status: 'completed', +// summaryUrl: `https://example.com/summary/${emailResult.messageId}`, +// }; +// } + +// // Export schemas for use in registration file +// export { researchInputSchema, researchOutputSchema }; + diff --git a/examples/root/app/ai/agents/workflows/shared.ts b/examples/root/app/ai/agents/workflows/shared.ts new file mode 100644 index 0000000..ef9fffc --- /dev/null +++ b/examples/root/app/ai/agents/workflows/shared.ts @@ -0,0 +1,22 @@ +// import { researchWorkflow } from './research'; +// import { onboardingWorkflow } from './onboarding'; +// import { AiRouter } from '@microfox/ai-router'; + +// // Shared router instance for workflows +// // This router will be mounted at /workflows in the main router +// export const aiRouter = new AiRouter(); + +// export const aiWorkflowRouter = aiRouter.useWorkflow( +// '/research', +// researchWorkflow, +// { +// exposeAsTool: true, +// } +// ) +// .useWorkflow( +// '/onboarding', +// onboardingWorkflow, +// { +// exposeAsTool: true, +// } +// ) diff --git a/examples/root/app/ai/index.ts b/examples/root/app/ai/index.ts index 07fd642..204607c 100644 --- a/examples/root/app/ai/index.ts +++ b/examples/root/app/ai/index.ts @@ -14,17 +14,21 @@ import { thinkerAgent } from './agents/thinker'; import { contextLimiter } from './middlewares/contextLimiter'; import { onlyTextParts } from './middlewares/onlyTextParts'; -const aiRouter = new AiRouter(); +const aiRouter = new AiRouter(undefined, undefined); // aiRouter.setLogger(console); +// import { aiWorkflowRouter as workflowRouter } from './agents/workflows/shared'; + const aiMainRouter = aiRouter .agent('/system', systemAgent) .agent('/summarize', summarizeAgent) .agent('/research', braveResearchAgent) .agent('/thinker', thinkerAgent) - .use('/', contextLimiter(5)) - .use('/', onlyTextParts(100)) - .agent('/', async (props) => { + // Mount workflow router as sub-router + // .agent('/workflows', workflowRouter) + .before('/', contextLimiter(5)) + .before('/', onlyTextParts(100)) + .agent('/', async (props: any) => { // show a loading indicator props.response.writeMessageMetadata({ loader: 'Thinking...', @@ -52,7 +56,7 @@ const aiMainRouter = aiRouter stepCountIs(10), ({ steps }) => steps.some((step) => - step.toolResults.some((tool) => tool.output?._isFinal), + step.toolResults.some((tool: any) => tool.output?._isFinal), ), ], onError: (error) => { @@ -74,6 +78,7 @@ const aiMainRouter = aiRouter // console.log('--------REGISTRY--------'); const aiRouterRegistry = aiMainRouter.registry(); +// console.log('Workflow paths:', Object.keys(aiRouterRegistry.map).filter(p => p.includes('workflow'))); const aiRouterTools = aiRouterRegistry.tools; type AiRouterTools = InferUITools; // console.log('--------REGISTRY--------'); diff --git a/examples/root/app/api/studio/chat/route.ts b/examples/root/app/api/studio/chat/route.ts index 84f88fa..730a1cb 100644 --- a/examples/root/app/api/studio/chat/route.ts +++ b/examples/root/app/api/studio/chat/route.ts @@ -16,7 +16,7 @@ export async function POST(req: NextRequest) { const revalidatePath = lastMessage?.metadata?.revalidatePath; return aiMainRouter - .use( + .before( '/', StudioConfig.studioSettings.database.type === 'upstash-redis' ? chatRestoreUpstash diff --git a/examples/root/app/api/studio/chat/sessions/chatSessionLocal.ts b/examples/root/app/api/studio/chat/sessions/chatSessionLocal.ts index 82e510b..be35fc5 100644 --- a/examples/root/app/api/studio/chat/sessions/chatSessionLocal.ts +++ b/examples/root/app/api/studio/chat/sessions/chatSessionLocal.ts @@ -139,10 +139,7 @@ export const sessionLocalListOut = async () => { * @param next - The next middleware or router * @returns */ -export const chatRestoreLocal: AiMiddleware<{ - sessionId: string; - loader?: string; -}> = async (props, next) => { +export const chatRestoreLocal: AiMiddleware = async (props, next) => { try { const { sessionId, messages } = props.request; diff --git a/examples/root/app/api/studio/chat/sessions/chatSessionUpstash.ts b/examples/root/app/api/studio/chat/sessions/chatSessionUpstash.ts index 5bb41cd..bbb1c67 100644 --- a/examples/root/app/api/studio/chat/sessions/chatSessionUpstash.ts +++ b/examples/root/app/api/studio/chat/sessions/chatSessionUpstash.ts @@ -40,10 +40,7 @@ export const messageStore = new CrudHash( * @param next - The next middleware or router * @returns */ -export const chatRestoreUpstash: AiMiddleware<{ - sessionId: string; - loader?: string; -}> = async (props, next) => { +export const chatRestoreUpstash: AiMiddleware = async (props, next) => { try { const { sessionId, messages } = props.request; if (!sessionId || sessionId === 'undefined') { diff --git a/examples/root/app/api/studio/workflow/agent/[...slug]/route.ts b/examples/root/app/api/studio/workflow/agent/[...slug]/route.ts new file mode 100644 index 0000000..570a2df --- /dev/null +++ b/examples/root/app/api/studio/workflow/agent/[...slug]/route.ts @@ -0,0 +1,101 @@ +import { NextRequest, NextResponse } from 'next/server'; +import { agentWorkflowFn } from '../agentWorkflow'; + +// GET example: http://localhost:3000/api/studio/workflow/agent/thinker/questions?userIntent= + +export async function GET(req: NextRequest) { + const agentFullPath = req.nextUrl.href.split('/api/studio/workflow/agent')[1]; + const agentPath = agentFullPath.includes('?') + ? agentFullPath.split('?')[0] + : agentFullPath; + + const searchParams = req.nextUrl.searchParams; + const params: any = {}; + searchParams.entries().forEach(([key, value]) => { + params[key] = value; + }); + + // Construct base URL for the agent endpoint + const baseUrl = req.nextUrl.origin; + + try { + // Import workflow API + const workflowApi = await import('workflow/api'); + const { start } = workflowApi; + + // Start the workflow + const run = await start(agentWorkflowFn, [{ + agentPath, + input: params, + baseUrl: `${baseUrl}/api/studio/chat/agent`, + messages: [], + }]); + + // Get the current status + const status: string = await run.status; + + return NextResponse.json({ + runId: run.runId, + status, + }, { status: 200 }); + } catch (error: any) { + return NextResponse.json( + { error: error?.message || String(error) }, + { status: 500 } + ); + } +} + +// POST example: +// curl -X POST http://localhost:3000/api/studio/workflow/agent/thinker/questions +// -H "Content-Type: application/json" +// -d '{"messages": [{"role": "user", "content": "What is the capital of France?"}], "input": {...}}' + +export async function POST(req: NextRequest) { + const body = await req.json(); + + const agentFullPath = req.nextUrl.href.split('/api/studio/workflow/agent')[1]; + const agentPath = agentFullPath.includes('?') + ? agentFullPath.split('?')[0] + : agentFullPath; + + const searchParams = req.nextUrl.searchParams; + const params: any = {}; + searchParams.entries().forEach(([key, value]) => { + params[key] = value; + }); + + // Extract input and messages from body + const { messages, input, ...restOfBody } = body; + const agentInput = input || restOfBody; + + // Construct base URL for the agent endpoint + const baseUrl = req.nextUrl.origin; + + try { + // Import workflow API + const workflowApi = await import('workflow/api'); + const { start } = workflowApi; + + // Start the workflow + const run = await start(agentWorkflowFn, [{ + agentPath, + input: agentInput, + baseUrl: `${baseUrl}/api/studio/chat/agent`, + messages: messages || [], + }]); + + // Get the current status + const status: string = await run.status; + + return NextResponse.json({ + runId: run.runId, + status, + }, { status: 200 }); + } catch (error: any) { + return NextResponse.json( + { error: error?.message || String(error) }, + { status: 500 } + ); + } +} diff --git a/examples/root/app/api/studio/workflow/agent/agentStep.ts b/examples/root/app/api/studio/workflow/agent/agentStep.ts new file mode 100644 index 0000000..337999f --- /dev/null +++ b/examples/root/app/api/studio/workflow/agent/agentStep.ts @@ -0,0 +1,41 @@ +// Step function that calls the agent via HTTP +// This file must be separate to avoid Next.js dependencies in workflow runtime +export async function callAgentStep(input: { + agentPath: string; + input: any; + baseUrl: string; + messages: any[]; +}) { + "use step"; + + const { agentPath, input: agentInput, baseUrl, messages } = input; + + // Construct the full URL + const url = baseUrl + ? `${baseUrl}${agentPath.startsWith('/') ? agentPath : '/' + agentPath}` + : agentPath; + + // Make HTTP POST request to the agent endpoint + const response = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + messages, + input: agentInput, + params: agentInput, + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `Agent call failed: ${response.status} ${response.statusText}. ${errorText}` + ); + } + + // Read the response as JSON (UIMessage stream) + const responseData = await response.json(); + return responseData; +} diff --git a/examples/root/app/api/studio/workflow/agent/agentWorkflow.ts b/examples/root/app/api/studio/workflow/agent/agentWorkflow.ts new file mode 100644 index 0000000..44860a9 --- /dev/null +++ b/examples/root/app/api/studio/workflow/agent/agentWorkflow.ts @@ -0,0 +1,16 @@ +import { callAgentStep } from './agentStep'; + +// Workflow function that calls the agent step +// This file must be separate to avoid Next.js dependencies in workflow runtime +export async function agentWorkflowFn(input: { + agentPath: string; + input: any; + baseUrl: string; + messages: any[]; +}) { + "use workflow"; + + // Call the step with the workflow input + const result = await callAgentStep(input); + return result; +} diff --git a/examples/root/app/api/studio/workflow/orchestrate/orchestrateWorkflow.ts b/examples/root/app/api/studio/workflow/orchestrate/orchestrateWorkflow.ts new file mode 100644 index 0000000..64fde84 --- /dev/null +++ b/examples/root/app/api/studio/workflow/orchestrate/orchestrateWorkflow.ts @@ -0,0 +1,184 @@ +// Main orchestration workflow function +// This file must be separate to avoid Next.js dependencies in workflow runtime + +import type { OrchestrationConfig, OrchestrationContext, OrchestrationStep } from '@microfox/ai-router/workflow/orchestrate'; +import { + callAgentStep, + callWorkflowStep, + resolveInput, + resolveToken +} from './steps'; + +// Execute a single step +async function executeStep( + step: OrchestrationStep, + context: OrchestrationContext, + baseUrl: string, + messages: any[] +): Promise { + switch (step.type) { + case 'agent': { + const agentInput = step.input !== undefined + ? resolveInput(step.input, context) + : context.previous || context.input; + + if (step.await === false) { + // Fire-and-forget: start workflow and return immediately + const result = await callWorkflowStep({ + workflowPath: step.agent, + workflowInput: agentInput, + baseUrl, + messages, + }); + + const output = { runId: result.runId, status: result.status }; + + // Update context + if (step.id) { + context.steps[step.id] = output; + } + context.previous = output; + context.all.push(output); + + return output; + } else { + // Blocking: await agent result + const result = await callAgentStep({ + agentPath: step.agent, + agentInput, + baseUrl, + messages, + await: true, + }); + + // Update context + if (step.id) { + context.steps[step.id] = result; + } + context.previous = result; + context.all.push(result); + + return result; + } + } + + case 'hook': { + // defineHook().create() must be called directly in the workflow function, not in a step + const token = resolveToken(step.token, context); + + // Import defineHook from workflow + const { defineHook } = await import('workflow'); + const { z } = await import('zod'); + + // Create a hook schema (using z.any() for generic hooks without schema) + const hookSchema = defineHook({ + schema: step.schema || z.any(), // Use provided schema or default to any + }); + + // Create hook instance with token and await it + const hook = hookSchema.create({ token }); + const payload = await hook; + + const output = { token, payload }; + + // Update context + if (step.id) { + context.steps[step.id] = output; + } + context.previous = output; + context.all.push(output); + + return output; + } + + case 'sleep': { + // sleep() must be called directly in the workflow function, not in a step + // Import sleep from workflow + const { sleep } = await import('workflow'); + + // @ts-expect-error - StringValue type is stricter than string, but runtime accepts both + await sleep(step.duration); + + const output = { slept: step.duration }; + + // Update context + context.previous = output; + context.all.push(output); + + return output; + } + + case 'condition': { + const conditionResult = step.if(context); + const stepsToExecute = conditionResult ? step.then : (step.else || []); + + // Execute steps in the selected branch + for (const branchStep of stepsToExecute) { + await executeStep(branchStep, context, baseUrl, messages); + } + + return { condition: conditionResult }; + } + + case 'parallel': { + // Execute all steps in parallel + const promises = step.steps.map(branchStep => + executeStep(branchStep, context, baseUrl, messages) + ); + + const results = await Promise.all(promises); + + const output = { parallel: results }; + + // Update context + context.previous = output; + context.all.push(output); + + return output; + } + + default: { + const _exhaustive: never = step; + throw new Error(`Unknown step type: ${(_exhaustive as any).type}`); + } + } +} + +// Main workflow function +export async function orchestrateWorkflowFn(input: { + config: OrchestrationConfig; + baseUrl: string; +}) { + "use workflow"; + + const { config, baseUrl } = input; + + // Try to get runId from workflow runtime if available + // Note: This may not be available in all workflow runtime versions + let runId: string | undefined; + try { + const workflowApi = await import('workflow/api'); + // Some workflow runtimes might expose current run context + // For now, we'll leave it undefined if not available + } catch { + // Ignore if not available + } + + const context: OrchestrationContext = { + input: config.input || {}, + steps: {}, + previous: null, + all: [], + runId, + }; + + // Execute steps sequentially + for (const step of config.steps) { + await executeStep(step, context, baseUrl, config.messages || []); + } + + return { + context, + result: context.previous, + }; +} diff --git a/examples/root/app/api/studio/workflow/orchestrate/route.ts b/examples/root/app/api/studio/workflow/orchestrate/route.ts new file mode 100644 index 0000000..79a2e92 --- /dev/null +++ b/examples/root/app/api/studio/workflow/orchestrate/route.ts @@ -0,0 +1,65 @@ +import { NextRequest, NextResponse } from 'next/server'; +import { orchestrateWorkflowFn } from './orchestrateWorkflow'; +import type { OrchestrationConfig } from '@microfox/ai-router/workflow/orchestrate'; + +// POST endpoint to start an orchestration workflow +// Example: curl -X POST http://localhost:3000/api/studio/workflow/orchestrate \ +// -H "Content-Type: application/json" \ +// -d '{"config": {"steps": [...], "input": {...}}, "messages": []}' + +export async function POST(req: NextRequest) { + try { + const body = await req.json(); + const { config, messages, input } = body; + + if (!config || !config.steps || !Array.isArray(config.steps)) { + return NextResponse.json( + { error: 'config with steps array is required' }, + { status: 400 } + ); + } + + // Merge input into config if provided + const orchestrationConfig: OrchestrationConfig = { + ...config, + input: input || config.input, + messages: messages || config.messages || [], + }; + + // Construct base URL for agent calls + const baseUrl = req.nextUrl.origin; + + // Import workflow API + const workflowApi = await import('workflow/api'); + const { start } = workflowApi; + + if (!start) { + throw new Error( + '[orchestrate] `workflow/api` does not export `start`. ' + + 'Check that you are using a compatible version of the workflow runtime.', + ); + } + + // Start the orchestration workflow + const run = await start(orchestrateWorkflowFn, [{ + config: orchestrationConfig, + baseUrl: `${baseUrl}/api/studio/chat/agent`, + }]); + + // Get runId after starting (needed for token generation in context) + const runId = run.runId; + + // Get the current status + const status: string = await run.status; + + return NextResponse.json({ + runId: run.runId, + status, + }, { status: 200 }); + } catch (error: any) { + return NextResponse.json( + { error: error?.message || String(error) }, + { status: 500 } + ); + } +} diff --git a/examples/root/app/api/studio/workflow/orchestrate/steps.ts b/examples/root/app/api/studio/workflow/orchestrate/steps.ts new file mode 100644 index 0000000..e62009a --- /dev/null +++ b/examples/root/app/api/studio/workflow/orchestrate/steps.ts @@ -0,0 +1,186 @@ +// Step execution functions for orchestration workflow +// These must be separate files to avoid Next.js dependencies in workflow runtime + +import type { OrchestrationContext, OrchestrationStep } from '@microfox/ai-router/workflow/orchestrate'; + +// Helper to extract agent return value from UIMessage array +function extractAgentResult(uiMessages: any[]): any { + // UIMessage format: [{ id: "...", parts: [{ type: "...", ... }] }] + // Agent return values can be in various formats + if (!uiMessages || uiMessages.length === 0) { + return null; + } + + // Look for data parts or tool-call-result parts in all messages + for (const message of uiMessages) { + if (message.parts && Array.isArray(message.parts)) { + for (const part of message.parts) { + // Check for tool-call-result parts (most common for agent returns) + // writeCustomTool writes with 'output' field + if (part.type === 'tool-call-result') { + if (part.output !== undefined) { + return part.output; + } + if (part.result !== undefined) { + return part.result; + } + } + // Check for tool-{toolName} parts (format: tool-systemCurrentDate, etc.) + if (part.type?.startsWith('tool-') && part.output !== undefined) { + return part.output; + } + // Check for data parts + if (part.type === 'data' && part.data !== undefined) { + return part.data; + } + // Check for data-end parts + if (part.type === 'data-end' && part.data !== undefined) { + return part.data; + } + } + } + } + + // If no parts found, the agent might have returned a value directly + // Check if the message itself contains the return value + // Some agents return values that get wrapped differently + if (uiMessages.length === 1) { + const message = uiMessages[0]; + // If message has no parts, check if it has a result property + if (!message.parts || message.parts.length === 0) { + // Check for direct result property + if (message.result !== undefined) { + return message.result; + } + // Return the message itself if it looks like a result object + if (typeof message === 'object' && !message.id && !message.parts) { + return message; + } + } + } + + // If we still haven't found anything, return null + // This indicates the agent didn't return any data or the format is unexpected + return null; +} + +// Call agent step (await or fire-and-forget) +export async function callAgentStep(input: { + agentPath: string; + agentInput: any; + baseUrl: string; + messages: any[]; + await: boolean; +}) { + "use step"; + + const { agentPath, agentInput, baseUrl, messages, await: shouldAwait } = input; + + // Construct the full URL - use chat agent endpoint + const url = baseUrl + ? `${baseUrl}${agentPath.startsWith('/') ? agentPath : '/' + agentPath}` + : agentPath; + + // Make HTTP POST request to the agent endpoint + const response = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + messages, + input: agentInput, + params: agentInput, + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `Agent call failed: ${response.status} ${response.statusText}. ${errorText}` + ); + } + + // Read the response as JSON (UIMessage array) + const uiMessages = await response.json(); + + // Debug: Log the raw response to understand the format + console.log(`[callAgentStep] Agent: ${agentPath}, Raw Response:`, JSON.stringify(uiMessages, null, 2)); + + // Extract the actual agent return value from the UIMessage array + const agentResult = extractAgentResult(uiMessages); + + // Debug: Log the extracted result + console.log(`[callAgentStep] Agent: ${agentPath}, Extracted Result:`, JSON.stringify(agentResult, null, 2)); + + return agentResult; +} + +// Call workflow step (for fire-and-forget agents) +export async function callWorkflowStep(input: { + workflowPath: string; + workflowInput: any; + baseUrl: string; + messages: any[]; +}) { + "use step"; + + const { workflowPath, workflowInput, baseUrl, messages } = input; + + // Construct the workflow API URL + const workflowApiPath = baseUrl + ? `${baseUrl}/api/studio/workflow/agent${workflowPath.startsWith('/') ? workflowPath : '/' + workflowPath}` + : `/api/studio/workflow/agent${workflowPath.startsWith('/') ? workflowPath : '/' + workflowPath}`; + + // Make HTTP POST request to start workflow (fire-and-forget) + const response = await fetch(workflowApiPath, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + input: workflowInput, + messages, + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `Workflow call failed: ${response.status} ${response.statusText}. ${errorText}` + ); + } + + const result = await response.json(); + return { runId: result.runId, status: result.status }; +} + +// Note: defineHook().create() cannot be called from a step function - it must be called directly +// in the workflow function. The hook step is handled in orchestrateWorkflow.ts +// This function is kept for reference but not used. + +// Note: sleep() cannot be called from a step function - it must be called directly +// in the workflow function. The sleep step is handled in orchestrateWorkflow.ts +// This function is kept for reference but not used. + +// Helper to resolve input value (static or function) +export function resolveInput( + input: any | ((ctx: OrchestrationContext) => any), + context: OrchestrationContext +): any { + if (typeof input === 'function') { + return input(context); + } + return input; +} + +// Helper to resolve token (static or function) +export function resolveToken( + token: string | ((ctx: OrchestrationContext) => string), + context: OrchestrationContext +): string { + if (typeof token === 'function') { + return token(context); + } + return token; +} diff --git a/examples/root/app/api/studio/workflow/signal/route.ts b/examples/root/app/api/studio/workflow/signal/route.ts new file mode 100644 index 0000000..6a30fc4 --- /dev/null +++ b/examples/root/app/api/studio/workflow/signal/route.ts @@ -0,0 +1,90 @@ +import { NextRequest, NextResponse } from 'next/server'; + +// POST endpoint to signal/resume a workflow hook +// Example: curl -X POST http://localhost:3000/api/studio/workflow/signal \ +// -H "Content-Type: application/json" \ +// -d '{"token": "research-approval:topic:email", "payload": {"decision": "approve"}}' + +export async function POST(req: NextRequest) { + try { + const body = await req.json(); + const { token, payload } = body; + + if (!token) { + return NextResponse.json( + { error: 'token is required in request body' }, + { status: 400 } + ); + } + + if (payload === undefined || payload === null) { + return NextResponse.json( + { error: 'payload is required in request body' }, + { status: 400 } + ); + } + + // Import workflow API + const workflowApi = await import('workflow/api'); + const { resumeHook } = workflowApi; + + if (!resumeHook) { + throw new Error( + '[workflow] `workflow/api` does not export `resumeHook`. ' + + 'Check that you are using a compatible version of the workflow runtime.', + ); + } + + // Resume the hook with token and payload + try { + await resumeHook(token, payload); + + // Return success response + return NextResponse.json( + { + status: 'resumed', + message: 'Hook resumed successfully', + }, + { status: 200 } + ); + } catch (error: any) { + // If hook resume fails, try webhook resume as fallback + try { + const { resumeWebhook } = workflowApi; + if (resumeWebhook) { + await resumeWebhook(token, payload); + return NextResponse.json( + { + status: 'resumed', + message: 'Webhook resumed successfully', + }, + { status: 200 } + ); + } + } catch (webhookError: any) { + // If both fail, return the original hook error + return NextResponse.json( + { + error: `Failed to resume workflow hook/webhook: ${error?.message || String(error)}. ` + + `Make sure the token is correct and the workflow is waiting for a signal.`, + }, + { status: 400 } + ); + } + + // If webhook resume also failed or doesn't exist, return hook error + return NextResponse.json( + { + error: `Failed to resume workflow hook: ${error?.message || String(error)}. ` + + `Make sure the token is correct and the workflow is waiting for a signal.`, + }, + { status: 400 } + ); + } + } catch (error: any) { + return NextResponse.json( + { error: error?.message || String(error) }, + { status: 500 } + ); + } +} diff --git a/examples/root/app/api/studio/workflow/status/route.ts b/examples/root/app/api/studio/workflow/status/route.ts new file mode 100644 index 0000000..115740e --- /dev/null +++ b/examples/root/app/api/studio/workflow/status/route.ts @@ -0,0 +1,102 @@ +import { NextRequest, NextResponse } from 'next/server'; + +// GET endpoint to get workflow status by runId +// Example: http://localhost:3000/api/studio/workflow/status?runId=wrun_xxx + +export async function GET(req: NextRequest) { + const searchParams = req.nextUrl.searchParams; + const runId = searchParams.get('runId'); + + if (!runId) { + return NextResponse.json( + { error: 'runId query parameter is required' }, + { status: 400 } + ); + } + + try { + // Import workflow API + const workflowApi = await import('workflow/api'); + const { getRun } = workflowApi; + + if (!getRun) { + throw new Error( + '[workflow] `workflow/api` does not export `getRun`. ' + + 'Check that you are using a compatible version of the workflow runtime.', + ); + } + + // Get the run object + const run = getRun(runId); + if (!run) { + return NextResponse.json( + { error: `Workflow run ${runId} not found` }, + { status: 404 } + ); + } + + // Get the current status + let status: string; + let workflowError: any; + try { + status = await run.status; + try { + const errorValue = await (run as any).error; + if (errorValue) { + workflowError = errorValue; + if (status === 'running' || status === 'pending') { + status = 'failed'; + } + } + } catch { + // run.error might not be available or might throw - that's okay + } + } catch (err: any) { + status = 'error'; + workflowError = err; + } + + // Get result if completed + let result: any; + let error: any; + + if (status === 'completed') { + try { + result = await run.returnValue; + } catch (err: any) { + error = err; + } + } else if (status === 'failed' || status === 'error') { + error = workflowError; + } + + // Build response + const response: any = { + runId, + status, + }; + + if (result !== undefined) { + response.result = result; + } + + if (error) { + response.error = error?.message || String(error); + } + + // If paused, include hook information + if (status === 'paused') { + response.hook = { + token: '', // Token must be provided by caller - construct it using workflow input and runId + type: 'hook', + }; + } + + return NextResponse.json(response, { status: 200 }); + } catch (error: any) { + return NextResponse.json( + { error: error?.message || String(error) }, + { status: 500 } + ); + } +} diff --git a/examples/root/app/page.tsx b/examples/root/app/page.tsx index c149c93..5f77f2a 100644 --- a/examples/root/app/page.tsx +++ b/examples/root/app/page.tsx @@ -45,13 +45,18 @@ export default function Homepage() { {/* Action Buttons */}
- +
diff --git a/examples/root/app/workflows/onboarding/page.tsx b/examples/root/app/workflows/onboarding/page.tsx new file mode 100644 index 0000000..c196dc5 --- /dev/null +++ b/examples/root/app/workflows/onboarding/page.tsx @@ -0,0 +1,245 @@ +'use client'; + +import { useState } from 'react'; +import { Button } from '@/components/ui/button'; +import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card'; +import { Input } from '@/components/ui/input'; +import { Label } from '@/components/ui/label'; +import { Badge } from '@/components/ui/badge'; +import { Alert, AlertDescription } from '@/components/ui/alert'; +import { Loader2, CheckCircle2, XCircle, Clock, Mail } from 'lucide-react'; + +export default function OnboardingWorkflowPage() { + const [name, setName] = useState(''); + const [email, setEmail] = useState(''); + const [loading, setLoading] = useState(false); + const [instanceId, setInstanceId] = useState(null); + const [status, setStatus] = useState(null); + const [error, setError] = useState(null); + + const startWorkflow = async () => { + if (!name || !email) { + setError('Please fill in all fields'); + return; + } + + setLoading(true); + setError(null); + setStatus(null); + + try { + const response = await fetch('/api/studio/chat/agent/workflows/onboarding', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ input: { name, email } }), + }); + + if (!response.ok) { + throw new Error('Failed to start workflow'); + } + + const data = await response.json(); + // Handle the response format: array of messages with parts + // Format: [{"id":"...","parts":[{"type":"tool-...","output":{...}}]}] + const result = data[0]?.parts?.[0]?.output || data[0]?.output || data; + + // Check for error response + if (result?.status === 'error' || result?.error) { + setError(result.error || result.message || 'Workflow execution failed'); + setStatus('error'); + return; + } + + if (result?.runId) { + setInstanceId(result.runId); + setStatus(result.status); + + // Poll for status if not completed + if (result.status !== 'completed' && result.status !== 'failed' && result.status !== 'error') { + pollStatus(result.runId); + } + } else { + setError('Invalid response format: missing runId'); + } + } catch (err: any) { + setError(err.message || 'Failed to start workflow'); + } finally { + setLoading(false); + } + }; + + const pollStatus = async (id: string) => { + const interval = setInterval(async () => { + try { + const response = await fetch(`/api/studio/chat/agent/workflows/onboarding/${id}`); + if (response.ok) { + const data = await response.json(); + // Handle the response format: array of messages with parts + const result = data[0]?.parts?.[0]?.output || data[0]?.output || data; + + // Check for error response + if (result?.status === 'error' || result?.error) { + setError(result.error || result.message || 'Workflow status check failed'); + setStatus('error'); + clearInterval(interval); + return; + } + + if (result && result.status) { + setStatus(result.status); + + if (result.status === 'completed' || result.status === 'rejected' || result.status === 'failed' || result.status === 'error') { + clearInterval(interval); + } + } + } + } catch (err) { + console.error('Failed to poll status', err); + clearInterval(interval); + } + }, 10000); // Poll every 10 seconds as requested + + // Clean up after 5 minutes + setTimeout(() => clearInterval(interval), 5 * 60 * 1000); + }; + + const getStatusBadge = () => { + if (!status) return null; + + switch (status) { + case 'completed': + return Completed; + case 'paused': + return Waiting for Verification; + case 'error': + case 'failed': + return Error; + default: + return {status}; + } + }; + + return ( +
+ + + Onboarding Workflow + + Start a user onboarding workflow with email verification + + + +
+
+ + setName(e.target.value)} + disabled={loading} + /> +
+ +
+ + setEmail(e.target.value)} + disabled={loading} + /> +
+
+ + {error && ( + + {error} + + )} + + {instanceId && ( + + +
+
+ Instance ID: {instanceId} +
+ {getStatusBadge()} +
+ {(status === 'paused' || status === 'running') && ( +
+

+ Workflow is waiting for email verification. You can simulate: +

+
+ + +
+
+ )} +
+
+ )} + + +
+
+
+ ); +} + diff --git a/examples/root/app/workflows/orchestrate/page.tsx b/examples/root/app/workflows/orchestrate/page.tsx new file mode 100644 index 0000000..2c044e8 --- /dev/null +++ b/examples/root/app/workflows/orchestrate/page.tsx @@ -0,0 +1,350 @@ +'use client'; + +import { useState } from 'react'; +import { Button } from '@/components/ui/button'; +import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card'; +import { Input } from '@/components/ui/input'; +import { Label } from '@/components/ui/label'; +import { Badge } from '@/components/ui/badge'; +import { Alert, AlertDescription } from '@/components/ui/alert'; +import { Textarea } from '@/components/ui/textarea'; +import { Loader2, CheckCircle2, XCircle, Clock, Play, Pause } from 'lucide-react'; + +export default function OrchestrateWorkflowPage() { + const [topic, setTopic] = useState(''); + const [userId, setUserId] = useState(''); + const [loading, setLoading] = useState(false); + const [runId, setRunId] = useState(null); + const [status, setStatus] = useState(null); + const [error, setError] = useState(null); + const [hookToken, setHookToken] = useState(null); + const [result, setResult] = useState(null); + + const startWorkflow = async () => { + if (!topic || !userId) { + setError('Please fill in all fields'); + return; + } + + setLoading(true); + setError(null); + setStatus(null); + setResult(null); + setRunId(null); + + try { + // Create orchestration config + const config = { + steps: [ + { + type: 'agent' as const, + agent: '/system/current_date', + id: 'date', + input: { format: 'iso', timezone: 'UTC' }, + }, + { + type: 'sleep' as const, + duration: '2s', // 2 second delay + }, + { + type: 'hook' as const, + token: `orchestrate-approval:${userId}:${topic}`, + id: 'approval', + }, + { + type: 'agent' as const, + agent: '/system/current_date', + id: 'dateAfterApproval', + input: { format: 'iso', timezone: 'UTC' }, + }, + ], + input: { + topic, + userId, + }, + }; + + const response = await fetch('/api/studio/workflow/orchestrate', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ config }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`Failed to start workflow: ${errorText}`); + } + + const data = await response.json(); + + if (data.error) { + setError(data.error); + setStatus('error'); + return; + } + + if (data.runId) { + setRunId(data.runId); + setStatus(data.status); + + // Poll for status if not completed + if (data.status !== 'completed' && data.status !== 'failed' && data.status !== 'error') { + console.log('Polling status for runId:', data.runId); + setHookToken(`orchestrate-approval:${userId}:${topic}`) + pollStatus(data.runId); + } + } else { + setError('Invalid response format: missing runId'); + } + } catch (err: any) { + setError(err.message || 'Failed to start workflow'); + setStatus('error'); + } finally { + setLoading(false); + } + }; + + const pollStatus = async (id: string) => { + const interval = setInterval(async () => { + try { + const response = await fetch(`/api/studio/workflow/status?runId=${id}`); + if (response.ok) { + const data = await response.json(); + + if (data.error) { + setError(data.error); + setStatus('error'); + clearInterval(interval); + return; + } + + if (data.status) { + setStatus(data.status); + + // Store hook token if available (when workflow is paused) + if (data.hook?.token) { + setHookToken(data.hook.token); + } + + // Store result if completed + if (data.status === 'completed' && data.result) { + setResult(data.result); + } + + if ( + data.status === 'completed' || + data.status === 'failed' || + data.status === 'error' + ) { + clearInterval(interval); + } + } + } + } catch (err) { + console.error('Failed to poll status', err); + clearInterval(interval); + } + }, 2000); // Poll every 2 seconds + + // Clean up after 5 minutes + setTimeout(() => clearInterval(interval), 5 * 60 * 1000); + }; + + const signalApproval = async (decision: 'approve' | 'reject') => { + if (!hookToken) { + setError('Hook token not available'); + return; + } + + try { + const response = await fetch('/api/studio/workflow/signal', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + token: hookToken, + payload: { decision, timestamp: new Date().toISOString() }, + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`Failed to signal: ${errorText}`); + } + + // Continue polling + if (runId) { + pollStatus(runId); + } + } catch (err: any) { + setError(err.message || 'Failed to send signal'); + } + }; + + const getStatusBadge = () => { + if (!status) return null; + + switch (status) { + case 'completed': + return ( + + Completed + + ); + case 'paused': + return ( + + Waiting for Approval + + ); + case 'running': + return ( + + Running + + ); + case 'error': + case 'failed': + return ( + + Error + + ); + default: + return {status}; + } + }; + + return ( +
+ + + Orchestration Workflow Test + + Test the orchestration system with multiple agents, sleep, and HITL hooks + + + +
+
+ + setTopic(e.target.value)} + disabled={loading} + /> +
+ +
+ + setUserId(e.target.value)} + disabled={loading} + /> +
+
+ +
+

Workflow Steps:

+
    +
  1. Call /system/current_date agent (get current date)
  2. +
  3. Sleep for 2 seconds
  4. +
  5. Wait for approval (HITL hook)
  6. +
  7. Call /system/current_date agent again (get date after approval)
  8. +
+
+ + {error && ( + + {error} + + )} + + {runId && ( + + +
+
+
+ Run ID: {runId} +
+
+ Status: {status} +
+
+ {getStatusBadge()} +
+ {(status === 'paused' || status === 'running') && ( +
+ <> +

+ Workflow is waiting for approval. Use the buttons below to continue: +

+
+ + +
+ +
+ )} +
+
+ )} + + {result && ( + + + Workflow Result + + +