diff --git a/package.json b/package.json index 454540a9..a57a14d1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@aws/agentcore", - "version": "0.3.0-preview.5.1", + "version": "0.3.0-preview.6.0", "description": "CLI for Amazon Bedrock AgentCore", "license": "Apache-2.0", "repository": { diff --git a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap index 3a18af18..24d7caab 100644 --- a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap +++ b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap @@ -1645,7 +1645,8 @@ readme = "README.md" requires-python = ">=3.10" dependencies = [ {{#if (eq modelProvider "Anthropic")}}"anthropic >= 0.30.0", - {{/if}}"aws-opentelemetry-distro", + {{/if}}"a2a-sdk[all] >= 0.2.0", + "aws-opentelemetry-distro", "bedrock-agentcore[a2a] >= 1.0.3", "botocore[crt] >= 1.35.0", {{#if (eq modelProvider "Gemini")}}"google-genai >= 1.0.0", @@ -4076,29 +4077,30 @@ Thumbs.db exports[`Assets Directory Snapshots > Python framework assets > python/python/mcp/standalone/base/main.py should match snapshot 1`] = ` "from mcp.server.fastmcp import FastMCP -import uvicorn -mcp = FastMCP("{{ name }}") +mcp = FastMCP("{{ name }}", host="0.0.0.0", stateless_http=True) @mcp.tool() def add_numbers(a: int, b: int) -> int: - """Return the sum of two numbers.""" + """Add two numbers together""" return a + b @mcp.tool() -def greet(name: str) -> str: - """Return a greeting for the given name.""" - return f"Hello, {name}!" +def multiply_numbers(a: int, b: int) -> int: + """Multiply two numbers together""" + return a * b + + +@mcp.tool() +def greet_user(name: str) -> str: + """Greet a user by name""" + return f"Hello, {name}! Nice to meet you." if __name__ == "__main__": - uvicorn.run( - mcp.streamable_http_app(), - host="0.0.0.0", - port=8000, - ) + mcp.run(transport="streamable-http") " `; @@ -4114,10 +4116,7 @@ description = "AgentCore MCP Server" readme = "README.md" requires-python = ">=3.10" dependencies = [ - "aws-opentelemetry-distro", - "bedrock-agentcore >= 1.0.3", "mcp >= 1.19.0", - "uvicorn >= 0.30.0", ] [tool.hatch.build.targets.wheel] diff --git a/src/assets/python/a2a/strands/base/pyproject.toml b/src/assets/python/a2a/strands/base/pyproject.toml index b01475be..e062c556 100644 --- a/src/assets/python/a2a/strands/base/pyproject.toml +++ b/src/assets/python/a2a/strands/base/pyproject.toml @@ -10,7 +10,8 @@ readme = "README.md" requires-python = ">=3.10" dependencies = [ {{#if (eq modelProvider "Anthropic")}}"anthropic >= 0.30.0", - {{/if}}"aws-opentelemetry-distro", + {{/if}}"a2a-sdk[all] >= 0.2.0", + "aws-opentelemetry-distro", "bedrock-agentcore[a2a] >= 1.0.3", "botocore[crt] >= 1.35.0", {{#if (eq modelProvider "Gemini")}}"google-genai >= 1.0.0", diff --git a/src/assets/python/mcp/standalone/base/main.py b/src/assets/python/mcp/standalone/base/main.py index f91415ec..4ba0f990 100644 --- a/src/assets/python/mcp/standalone/base/main.py +++ b/src/assets/python/mcp/standalone/base/main.py @@ -1,24 +1,25 @@ from mcp.server.fastmcp import FastMCP -import uvicorn -mcp = FastMCP("{{ name }}") +mcp = FastMCP("{{ name }}", host="0.0.0.0", stateless_http=True) @mcp.tool() def add_numbers(a: int, b: int) -> int: - """Return the sum of two numbers.""" + """Add two numbers together""" return a + b @mcp.tool() -def greet(name: str) -> str: - """Return a greeting for the given name.""" - return f"Hello, {name}!" +def multiply_numbers(a: int, b: int) -> int: + """Multiply two numbers together""" + return a * b + + +@mcp.tool() +def greet_user(name: str) -> str: + """Greet a user by name""" + return f"Hello, {name}! Nice to meet you." if __name__ == "__main__": - uvicorn.run( - mcp.streamable_http_app(), - host="0.0.0.0", - port=8000, - ) + mcp.run(transport="streamable-http") diff --git a/src/assets/python/mcp/standalone/base/pyproject.toml b/src/assets/python/mcp/standalone/base/pyproject.toml index 599a5cef..824f612c 100644 --- a/src/assets/python/mcp/standalone/base/pyproject.toml +++ b/src/assets/python/mcp/standalone/base/pyproject.toml @@ -9,10 +9,7 @@ description = "AgentCore MCP Server" readme = "README.md" requires-python = ">=3.10" dependencies = [ - "aws-opentelemetry-distro", - "bedrock-agentcore >= 1.0.3", "mcp >= 1.19.0", - "uvicorn >= 0.30.0", ] [tool.hatch.build.targets.wheel] diff --git a/src/cli/aws/__tests__/agentcore.test.ts b/src/cli/aws/__tests__/agentcore.test.ts index 494c64d2..e26e4324 100644 --- a/src/cli/aws/__tests__/agentcore.test.ts +++ b/src/cli/aws/__tests__/agentcore.test.ts @@ -1,4 +1,4 @@ -import { extractResult, parseSSE, parseSSELine } from '../agentcore.js'; +import { extractResult, parseA2AResponse, parseSSE, parseSSELine } from '../agentcore.js'; import { describe, expect, it } from 'vitest'; describe('parseSSELine', () => { @@ -97,3 +97,82 @@ describe('extractResult', () => { expect(extractResult('')).toBe(''); }); }); + +describe('parseA2AResponse', () => { + it('extracts text from artifacts with kind:text parts', () => { + const response = JSON.stringify({ + jsonrpc: '2.0', + id: 1, + result: { + artifacts: [{ parts: [{ kind: 'text', text: 'Hello from A2A' }] }], + }, + }); + expect(parseA2AResponse(response)).toBe('Hello from A2A'); + }); + + it('extracts text from artifacts with type:text parts (backward compat)', () => { + const response = JSON.stringify({ + jsonrpc: '2.0', + id: 1, + result: { + artifacts: [{ parts: [{ type: 'text', text: 'Hello' }] }], + }, + }); + expect(parseA2AResponse(response)).toBe('Hello'); + }); + + it('concatenates text from multiple parts', () => { + const response = JSON.stringify({ + jsonrpc: '2.0', + id: 1, + result: { + artifacts: [ + { + parts: [ + { kind: 'text', text: 'part1' }, + { kind: 'text', text: 'part2' }, + ], + }, + ], + }, + }); + expect(parseA2AResponse(response)).toBe('part1part2'); + }); + + it('returns error message for JSON-RPC error', () => { + const response = JSON.stringify({ + jsonrpc: '2.0', + id: 1, + error: { code: -32600, message: 'Bad request' }, + }); + expect(parseA2AResponse(response)).toBe('Error: Bad request'); + }); + + it('falls back to history for agent messages', () => { + const response = JSON.stringify({ + jsonrpc: '2.0', + id: 1, + result: { + history: [ + { role: 'user', parts: [{ kind: 'text', text: 'hi' }] }, + { role: 'agent', parts: [{ kind: 'text', text: 'Hello!' }] }, + ], + }, + }); + expect(parseA2AResponse(response)).toBe('Hello!'); + }); + + it('returns stringified result when no text parts found', () => { + const response = JSON.stringify({ + jsonrpc: '2.0', + id: 1, + result: { id: 'task-1', status: { state: 'completed' } }, + }); + const parsed = parseA2AResponse(response); + expect(parsed).toContain('task-1'); + }); + + it('returns raw text for non-JSON input', () => { + expect(parseA2AResponse('not json')).toBe('not json'); + }); +}); diff --git a/src/cli/aws/agentcore.ts b/src/cli/aws/agentcore.ts index 8baf9f72..4a03b3c8 100644 --- a/src/cli/aws/agentcore.ts +++ b/src/cli/aws/agentcore.ts @@ -1,3 +1,4 @@ +import { parseJsonRpcResponse } from '../../lib/utils/json-rpc'; import { getCredentialProvider } from './account'; import { BedrockAgentCoreClient, @@ -234,6 +235,375 @@ export async function invokeAgentRuntime(options: InvokeAgentRuntimeOptions): Pr }; } +// --------------------------------------------------------------------------- +// MCP: JSON-RPC over InvokeAgentRuntime +// --------------------------------------------------------------------------- + +export interface McpInvokeOptions { + region: string; + runtimeArn: string; + userId?: string; + mcpSessionId?: string; + logger?: SSELogger; +} + +export interface McpToolDef { + name: string; + description?: string; + inputSchema?: Record; +} + +export interface McpListToolsResult { + tools: McpToolDef[]; + mcpSessionId?: string; +} + +let mcpRequestId = 1; + +interface McpRpcResult { + result: Record; + mcpSessionId?: string; + error?: { message?: string; code?: number }; +} + +/** Send a JSON-RPC payload through InvokeAgentRuntime and return the parsed response. */ +async function mcpRpcCall(options: McpInvokeOptions, body: Record): Promise { + const client = new BedrockAgentCoreClient({ + region: options.region, + credentials: getCredentialProvider(), + }); + + options.logger?.logSSEEvent(`MCP request: ${JSON.stringify(body)}`); + + const command = new InvokeAgentRuntimeCommand({ + agentRuntimeArn: options.runtimeArn, + payload: new TextEncoder().encode(JSON.stringify(body)), + contentType: 'application/json', + accept: 'application/json, text/event-stream', + mcpSessionId: options.mcpSessionId, + mcpProtocolVersion: '2025-03-26', + runtimeUserId: options.userId ?? DEFAULT_RUNTIME_USER_ID, + }); + + const response = await client.send(command); + + if (!response.response) { + throw new Error('No response from AgentCore Runtime'); + } + + const bytes = await response.response.transformToByteArray(); + const text = new TextDecoder().decode(bytes); + + options.logger?.logSSEEvent(`MCP response: ${text}`); + + const parsed = parseJsonRpcResponse(text); + + return { + result: (parsed.result as Record) ?? {}, + mcpSessionId: response.mcpSessionId, + error: parsed.error as McpRpcResult['error'], + }; +} + +/** Call mcpRpcCall and throw on JSON-RPC errors. Use mcpRpcCall directly when errors should be tolerated. */ +async function mcpRpcCallStrict(options: McpInvokeOptions, body: Record): Promise { + const result = await mcpRpcCall(options, body); + if (result.error) { + throw new Error(result.error.message ?? `MCP error (code ${result.error.code})`); + } + return result; +} + +/** Send a JSON-RPC notification (no id, no response expected). */ +async function mcpRpcNotify(options: McpInvokeOptions, body: Record): Promise { + const client = new BedrockAgentCoreClient({ + region: options.region, + credentials: getCredentialProvider(), + }); + + const command = new InvokeAgentRuntimeCommand({ + agentRuntimeArn: options.runtimeArn, + payload: new TextEncoder().encode(JSON.stringify(body)), + contentType: 'application/json', + accept: 'application/json, text/event-stream', + mcpSessionId: options.mcpSessionId, + mcpProtocolVersion: '2025-03-26', + runtimeUserId: options.userId ?? DEFAULT_RUNTIME_USER_ID, + }); + + await client.send(command); +} + +/** + * Initialize MCP session and list available tools via InvokeAgentRuntime. + * Retries on cold-start initialization timeouts. + */ +export async function mcpListTools(options: McpInvokeOptions): Promise { + const maxRetries = 3; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + return await mcpListToolsOnce(options); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + const isColdStart = msg.includes('initialization time exceeded') || msg.includes('initialization'); + + if (isColdStart && attempt < maxRetries - 1) { + options.logger?.logSSEEvent(`MCP cold start (attempt ${attempt + 1}/${maxRetries}), retrying...`); + await new Promise(resolve => setTimeout(resolve, 2000)); + continue; + } + throw err; + } + } + + throw new Error('Failed to list MCP tools after retries'); +} + +async function mcpListToolsOnce(options: McpInvokeOptions): Promise { + // 1. Initialize — tolerate JSON-RPC errors (stateless servers may reject initialize but still return a session ID) + const initResult = await mcpRpcCall(options, { + jsonrpc: '2.0', + id: mcpRequestId++, + method: 'initialize', + params: { + protocolVersion: '2025-03-26', + capabilities: {}, + clientInfo: { name: 'agentcore-cli', version: '1.0.0' }, + }, + }); + + if (initResult.error) { + options.logger?.logSSEEvent( + `MCP initialize returned error (expected for stateless servers): ${initResult.error.message}` + ); + } + + const sessionId = initResult.mcpSessionId; + const optionsWithSession = { ...options, mcpSessionId: sessionId }; + + // 2. Send initialized notification + await mcpRpcNotify(optionsWithSession, { + jsonrpc: '2.0', + method: 'notifications/initialized', + }); + + // 3. List tools + const listResult = await mcpRpcCallStrict(optionsWithSession, { + jsonrpc: '2.0', + id: mcpRequestId++, + method: 'tools/list', + params: {}, + }); + + const tools = (listResult.result as { tools?: McpToolDef[] }).tools ?? []; + + return { + tools: tools.map(t => ({ name: t.name, description: t.description, inputSchema: t.inputSchema })), + mcpSessionId: sessionId, + }; +} + +/** + * Initialize an MCP session (without listing tools). + * Returns just the session ID needed for subsequent tool calls. + */ +export async function mcpInitSession(options: McpInvokeOptions): Promise { + const initResult = await mcpRpcCall(options, { + jsonrpc: '2.0', + id: mcpRequestId++, + method: 'initialize', + params: { + protocolVersion: '2025-03-26', + capabilities: {}, + clientInfo: { name: 'agentcore-cli', version: '1.0.0' }, + }, + }); + + const sessionId = initResult.mcpSessionId; + const optionsWithSession = { ...options, mcpSessionId: sessionId }; + + await mcpRpcNotify(optionsWithSession, { + jsonrpc: '2.0', + method: 'notifications/initialized', + }); + + return sessionId; +} + +/** + * Call an MCP tool via InvokeAgentRuntime. + * Retries on cold-start initialization timeouts. + */ +export async function mcpCallTool( + options: McpInvokeOptions, + toolName: string, + args: Record +): Promise { + const maxRetries = 3; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + const { result } = await mcpRpcCallStrict(options, { + jsonrpc: '2.0', + id: mcpRequestId++, + method: 'tools/call', + params: { name: toolName, arguments: args }, + }); + + const content = (result as { content?: { type?: string; text?: string }[] }).content; + if (content) { + const texts: string[] = []; + for (const item of content) { + if (item.text !== undefined) { + texts.push(item.text); + } + } + if (texts.length > 0) return texts.join(''); + } + + return JSON.stringify(result, null, 2); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + const isColdStart = msg.includes('initialization time exceeded') || msg.includes('initialization'); + + if (isColdStart && attempt < maxRetries - 1) { + options.logger?.logSSEEvent(`MCP cold start (attempt ${attempt + 1}/${maxRetries}), retrying...`); + await new Promise(resolve => setTimeout(resolve, 2000)); + continue; + } + throw err; + } + } + + throw new Error('Failed to call MCP tool after retries'); +} + +// --------------------------------------------------------------------------- +// A2A: JSON-RPC message/send over InvokeAgentRuntime +// --------------------------------------------------------------------------- + +export interface A2AInvokeOptions { + region: string; + runtimeArn: string; + userId?: string; + logger?: SSELogger; +} + +let a2aRequestId = 1; + +/** + * Invoke a deployed A2A agent via InvokeAgentRuntime with JSON-RPC message/send. + * Streams text parts from the response artifacts. + */ +export async function invokeA2ARuntime(options: A2AInvokeOptions, message: string): Promise { + const client = new BedrockAgentCoreClient({ + region: options.region, + credentials: getCredentialProvider(), + }); + + const body = { + jsonrpc: '2.0', + id: a2aRequestId++, + method: 'message/send', + params: { + message: { + role: 'user', + parts: [{ kind: 'text', text: message }], + messageId: `msg-${Date.now()}`, + }, + }, + }; + + options.logger?.logSSEEvent(`A2A request: ${JSON.stringify(body)}`); + + const command = new InvokeAgentRuntimeCommand({ + agentRuntimeArn: options.runtimeArn, + payload: new TextEncoder().encode(JSON.stringify(body)), + contentType: 'application/json', + accept: 'application/json, text/event-stream', + runtimeUserId: options.userId ?? DEFAULT_RUNTIME_USER_ID, + }); + + const response = await client.send(command); + + if (!response.response) { + throw new Error('No response from AgentCore Runtime'); + } + + const bytes = await response.response.transformToByteArray(); + const text = new TextDecoder().decode(bytes); + + options.logger?.logSSEEvent(`A2A response: ${text}`); + + const parsed = parseA2AResponse(text); + + return { + stream: singleValueStream(parsed), + sessionId: undefined, + }; +} + +/** Wrap a single string value as an AsyncGenerator for StreamingInvokeResult compatibility. */ +async function* singleValueStream(value: string): AsyncGenerator { + yield await Promise.resolve(value); +} + +/** Extract text content from A2A JSON-RPC response. Supports both kind:'text' and type:'text' part formats. */ +export function parseA2AResponse(text: string): string { + try { + const parsed: unknown = JSON.parse(text); + if (!parsed || typeof parsed !== 'object') return text; + + const obj = parsed as Record; + + // Check for JSON-RPC error + if (obj.error && typeof obj.error === 'object') { + const err = obj.error as { message?: string }; + return `Error: ${err.message ?? JSON.stringify(obj.error)}`; + } + + // Extract text from result.artifacts[].parts[].text + const result = obj.result as Record | undefined; + if (!result) return text; + + const artifacts = result.artifacts as { parts?: { kind?: string; type?: string; text?: string }[] }[] | undefined; + if (artifacts) { + const texts: string[] = []; + for (const artifact of artifacts) { + if (artifact.parts) { + for (const part of artifact.parts) { + if ((part.kind === 'text' || part.type === 'text') && part.text !== undefined) { + texts.push(part.text); + } + } + } + } + if (texts.length > 0) return texts.join(''); + } + + // Fallback: check history for the last assistant message + const history = result.history as + | { role?: string; parts?: { kind?: string; type?: string; text?: string }[] }[] + | undefined; + if (history) { + for (let i = history.length - 1; i >= 0; i--) { + const msg = history[i]; + if (msg?.role === 'agent' && msg.parts) { + const agentTexts = msg.parts + .filter(p => (p.kind === 'text' || p.type === 'text') && p.text !== undefined) + .map(p => p.text!); + if (agentTexts.length > 0) return agentTexts.join(''); + } + } + } + + return JSON.stringify(result, null, 2); + } catch { + return text; + } +} + /** * Stop a runtime session. */ diff --git a/src/cli/aws/index.ts b/src/cli/aws/index.ts index 1f943c40..7662fb53 100644 --- a/src/cli/aws/index.ts +++ b/src/cli/aws/index.ts @@ -17,11 +17,18 @@ export { streamLogs, searchLogs, type LogEvent, type StreamLogsOptions, type Sea export { enableTransactionSearch, type TransactionSearchEnableResult } from './transaction-search'; export { DEFAULT_RUNTIME_USER_ID, + invokeA2ARuntime, invokeAgentRuntime, invokeAgentRuntimeStreaming, + mcpInitSession, + mcpListTools, + mcpCallTool, stopRuntimeSession, type InvokeAgentRuntimeOptions, type InvokeAgentRuntimeResult, + type McpInvokeOptions, + type McpToolDef, + type McpListToolsResult, type StreamingInvokeResult, type StopRuntimeSessionOptions, type StopRuntimeSessionResult, diff --git a/src/cli/commands/dev/command.tsx b/src/cli/commands/dev/command.tsx index 94aa42a3..201ea419 100644 --- a/src/cli/commands/dev/command.tsx +++ b/src/cli/commands/dev/command.tsx @@ -2,13 +2,17 @@ import { findConfigRoot, getWorkingDirectory, readEnvFile } from '../../../lib'; import { getErrorMessage } from '../../errors'; import { ExecLogger } from '../../logging'; import { + callMcpTool, createDevServer, findAvailablePort, getAgentPort, getDevConfig, getDevSupportedAgents, + getEndpointUrl, invokeAgent, invokeAgentStreaming, + invokeForProtocol, + listMcpTools, loadProjectConfig, } from '../../operations/dev'; import { getGatewayEnvVars } from '../../operations/dev/gateway-env.js'; @@ -48,6 +52,74 @@ async function invokeDevServer(port: number, prompt: string, stream: boolean): P } } +async function invokeA2ADevServer(port: number, prompt: string): Promise { + try { + for await (const chunk of invokeForProtocol('A2A', { port, message: prompt })) { + process.stdout.write(chunk); + } + process.stdout.write('\n'); + } catch (err) { + if (err instanceof Error && err.message.includes('ECONNREFUSED')) { + console.error(`Error: Dev server not running on port ${port}`); + console.error('Start it with: agentcore dev'); + } else { + console.error(`Error: ${err instanceof Error ? err.message : String(err)}`); + } + process.exit(1); + } +} + +async function handleMcpInvoke(port: number, invokeValue: string, toolName?: string, input?: string): Promise { + try { + if (invokeValue === 'list-tools') { + const { tools } = await listMcpTools(port); + if (tools.length === 0) { + console.log('No tools available.'); + return; + } + console.log('Available tools:'); + for (const tool of tools) { + const desc = tool.description ? ` - ${tool.description}` : ''; + console.log(` ${tool.name}${desc}`); + } + } else if (invokeValue === 'call-tool') { + if (!toolName) { + console.error('Error: --tool is required with --invoke call-tool'); + console.error('Usage: agentcore dev --invoke call-tool --tool --input \'{"arg": "value"}\''); + process.exit(1); + } + // Initialize session first, then call tool with the session ID + const { sessionId } = await listMcpTools(port); + let args: Record = {}; + if (input) { + try { + args = JSON.parse(input) as Record; + } catch { + console.error(`Error: Invalid JSON for --input: ${input}`); + console.error('Expected format: --input \'{"key": "value"}\''); + process.exit(1); + } + } + const result = await callMcpTool(port, toolName, args, sessionId); + console.log(result); + } else { + console.error(`Error: Unknown MCP invoke command "${invokeValue}"`); + console.error('Usage:'); + console.error(' agentcore dev --invoke list-tools'); + console.error(' agentcore dev --invoke call-tool --tool --input \'{"arg": "value"}\''); + process.exit(1); + } + } catch (err) { + if (err instanceof Error && err.message.includes('ECONNREFUSED')) { + console.error(`Error: Dev server not running on port ${port}`); + console.error('Start it with: agentcore dev'); + } else { + console.error(`Error: ${err instanceof Error ? err.message : String(err)}`); + } + process.exit(1); + } +} + export const registerDev = (program: Command) => { program .command('dev') @@ -58,6 +130,8 @@ export const registerDev = (program: Command) => { .option('-i, --invoke ', 'Invoke running dev server (use --agent if multiple) [non-interactive]') .option('-s, --stream', 'Stream response when using --invoke [non-interactive]') .option('-l, --logs', 'Run dev server with logs to stdout [non-interactive]') + .option('--tool ', 'MCP tool name (used with --invoke call-tool)') + .option('--input ', 'MCP tool arguments as JSON (used with --invoke call-tool)') .action(async opts => { try { const port = parseInt(opts.port, 10); @@ -79,12 +153,25 @@ export const registerDev = (program: Command) => { process.exit(1); } - // Show model info if available - if (targetAgent?.modelProvider) { + const protocol = targetAgent?.protocol ?? 'HTTP'; + + // Override port for protocols with fixed framework ports + if (protocol === 'A2A') invokePort = 9000; + else if (protocol === 'MCP') invokePort = 8000; + + // Show model info if available (not applicable to MCP) + if (protocol !== 'MCP' && targetAgent?.modelProvider) { console.log(`Provider: ${targetAgent.modelProvider}`); } - await invokeDevServer(invokePort, opts.invoke, opts.stream ?? false); + // Protocol-aware dispatch + if (protocol === 'MCP') { + await handleMcpInvoke(invokePort, opts.invoke, opts.tool, opts.input); + } else if (protocol === 'A2A') { + await invokeA2ADevServer(invokePort, opts.invoke); + } else { + await invokeDevServer(invokePort, opts.invoke, opts.stream ?? false); + } return; } @@ -145,11 +232,17 @@ export const registerDev = (program: Command) => { // Create logger for log file path const logger = new ExecLogger({ command: 'dev' }); - // Calculate port based on agent index - const basePort = getAgentPort(project, config.agentName, port); - const actualPort = await findAvailablePort(basePort); - if (actualPort !== basePort) { - console.log(`Port ${basePort} in use, using ${actualPort}`); + // Calculate port: A2A/MCP use fixed framework ports, HTTP uses configurable port + const isA2A = config.protocol === 'A2A'; + const isMcp = config.protocol === 'MCP'; + const fixedPort = isA2A ? 9000 : isMcp ? 8000 : getAgentPort(project, config.agentName, port); + const actualPort = await findAvailablePort(fixedPort); + if ((isA2A || isMcp) && actualPort !== fixedPort) { + console.error(`Error: Port ${fixedPort} is in use. ${config.protocol} agents require port ${fixedPort}.`); + process.exit(1); + } + if (actualPort !== fixedPort) { + console.log(`Port ${fixedPort} in use, using ${actualPort}`); } // Get provider info from agent config @@ -158,8 +251,13 @@ export const registerDev = (program: Command) => { console.log(`Starting dev server...`); console.log(`Agent: ${config.agentName}`); - console.log(`Provider: ${providerInfo}`); - console.log(`Server: http://localhost:${actualPort}/invocations`); + if (config.protocol !== 'MCP') { + console.log(`Provider: ${providerInfo}`); + } + if (config.protocol !== 'HTTP') { + console.log(`Protocol: ${config.protocol}`); + } + console.log(`Server: ${getEndpointUrl(actualPort, config.protocol)}`); console.log(`Log: ${logger.getRelativeLogPath()}`); console.log(`Press Ctrl+C to stop\n`); diff --git a/src/cli/commands/invoke/action.ts b/src/cli/commands/invoke/action.ts index 588c1f29..c43b5c1e 100644 --- a/src/cli/commands/invoke/action.ts +++ b/src/cli/commands/invoke/action.ts @@ -1,7 +1,15 @@ import { ConfigIO } from '../../../lib'; import type { AgentCoreProjectSpec, AwsDeploymentTargets, DeployedState } from '../../../schema'; -import { invokeAgentRuntime, invokeAgentRuntimeStreaming } from '../../aws'; +import { + invokeA2ARuntime, + invokeAgentRuntime, + invokeAgentRuntimeStreaming, + mcpCallTool, + mcpInitSession, + mcpListTools, +} from '../../aws'; import { InvokeLogger } from '../../logging'; +import { formatMcpToolList } from '../../operations/dev/utils'; import type { InvokeOptions, InvokeResult } from './types'; export interface InvokeContext { @@ -81,10 +89,108 @@ export async function handleInvoke(context: InvokeContext, options: InvokeOption return { success: false, error: `Agent '${agentSpec.name}' is not deployed to target '${selectedTargetName}'` }; } + // MCP protocol handling + if (agentSpec.protocol === 'MCP') { + const mcpOpts = { + region: targetConfig.region, + runtimeArn: agentState.runtimeArn, + userId: options.userId, + }; + + // list-tools: list available MCP tools + if (options.prompt === 'list-tools') { + try { + const result = await mcpListTools(mcpOpts); + const response = formatMcpToolList(result.tools); + return { + success: true, + agentName: agentSpec.name, + targetName: selectedTargetName, + response, + }; + } catch (err) { + return { + success: false, + error: `Failed to list MCP tools: ${err instanceof Error ? err.message : String(err)}`, + }; + } + } + + // call-tool: call an MCP tool by name + if (options.prompt === 'call-tool') { + if (!options.tool) { + return { + success: false, + error: 'MCP call-tool requires --tool . Use "list-tools" to see available tools.', + }; + } + let args: Record = {}; + if (options.input) { + try { + args = JSON.parse(options.input) as Record; + } catch { + return { success: false, error: `Invalid JSON for --input: ${options.input}` }; + } + } + try { + // Lightweight init to get session ID (no tools/list round-trip) + const mcpSessionId = await mcpInitSession(mcpOpts); + const response = await mcpCallTool({ ...mcpOpts, mcpSessionId }, options.tool, args); + return { + success: true, + agentName: agentSpec.name, + targetName: selectedTargetName, + response, + }; + } catch (err) { + return { + success: false, + error: `Failed to call MCP tool: ${err instanceof Error ? err.message : String(err)}`, + }; + } + } + + if (!options.prompt) { + return { + success: false, + error: + 'MCP agents require a command. Usage:\n agentcore invoke list-tools\n agentcore invoke call-tool --tool --input \'{"arg": "value"}\'', + }; + } + } + if (!options.prompt) { return { success: false, error: 'No prompt provided. Usage: agentcore invoke "your prompt"' }; } + // A2A protocol handling — send JSON-RPC message/send via InvokeAgentRuntime + if (agentSpec.protocol === 'A2A') { + try { + const a2aResult = await invokeA2ARuntime( + { region: targetConfig.region, runtimeArn: agentState.runtimeArn, userId: options.userId }, + options.prompt + ); + let response = ''; + for await (const chunk of a2aResult.stream) { + response += chunk; + if (options.stream) { + process.stdout.write(chunk); + } + } + if (options.stream) { + process.stdout.write('\n'); + } + return { + success: true, + agentName: agentSpec.name, + targetName: selectedTargetName, + response, + }; + } catch (err) { + return { success: false, error: `A2A invoke failed: ${err instanceof Error ? err.message : String(err)}` }; + } + } + // Get provider info if available const providerInfo = agentSpec.modelProvider; diff --git a/src/cli/commands/invoke/command.tsx b/src/cli/commands/invoke/command.tsx index 8e65d332..f2b2367d 100644 --- a/src/cli/commands/invoke/command.tsx +++ b/src/cli/commands/invoke/command.tsx @@ -101,6 +101,8 @@ export const registerInvoke = (program: Command) => { .option('--user-id ', 'User ID for runtime invocation (default: "default-user")') .option('--json', 'Output as JSON [non-interactive]') .option('--stream', 'Stream response in real-time (TUI streams by default) [non-interactive]') + .option('--tool ', 'MCP tool name (use with "call-tool" prompt) [non-interactive]') + .option('--input ', 'MCP tool arguments as JSON (use with --tool) [non-interactive]') .action( async ( positionalPrompt: string | undefined, @@ -112,6 +114,8 @@ export const registerInvoke = (program: Command) => { userId?: string; json?: boolean; stream?: boolean; + tool?: string; + input?: string; } ) => { try { @@ -120,7 +124,14 @@ export const registerInvoke = (program: Command) => { const prompt = cliOptions.prompt ?? positionalPrompt; // CLI mode if any CLI-specific options provided (follows deploy command pattern) - if (prompt || cliOptions.json || cliOptions.target || cliOptions.stream || cliOptions.agent) { + if ( + prompt || + cliOptions.json || + cliOptions.target || + cliOptions.stream || + cliOptions.agent || + cliOptions.tool + ) { await handleInvokeCLI({ prompt, agentName: cliOptions.agent, @@ -129,6 +140,8 @@ export const registerInvoke = (program: Command) => { userId: cliOptions.userId, json: cliOptions.json, stream: cliOptions.stream, + tool: cliOptions.tool, + input: cliOptions.input, }); } else { // No CLI options - interactive TUI mode diff --git a/src/cli/commands/invoke/types.ts b/src/cli/commands/invoke/types.ts index dc2a62c6..dfd95c2a 100644 --- a/src/cli/commands/invoke/types.ts +++ b/src/cli/commands/invoke/types.ts @@ -6,6 +6,10 @@ export interface InvokeOptions { userId?: string; json?: boolean; stream?: boolean; + /** MCP tool name (used with prompt "call-tool") */ + tool?: string; + /** MCP tool arguments as JSON string (used with --tool) */ + input?: string; } export interface InvokeResult { diff --git a/src/cli/operations/agent/generate/schema-mapper.ts b/src/cli/operations/agent/generate/schema-mapper.ts index 30c4cb39..fee8dd33 100644 --- a/src/cli/operations/agent/generate/schema-mapper.ts +++ b/src/cli/operations/agent/generate/schema-mapper.ts @@ -129,6 +129,8 @@ export function mapGenerateConfigToAgent(config: GenerateConfig): AgentEnvSpec { }, }), ...(protocol !== 'MCP' && { modelProvider: config.modelProvider }), + // MCP uses mcp.run() which is incompatible with the opentelemetry-instrument wrapper + ...(protocol === 'MCP' && { instrumentation: { enableOtel: false } }), }; } diff --git a/src/cli/operations/dev/__tests__/codezip-dev-server.test.ts b/src/cli/operations/dev/__tests__/codezip-dev-server.test.ts new file mode 100644 index 00000000..f0a91bd3 --- /dev/null +++ b/src/cli/operations/dev/__tests__/codezip-dev-server.test.ts @@ -0,0 +1,142 @@ +import { CodeZipDevServer } from '../codezip-dev-server'; +import type { DevConfig } from '../config'; +import type { DevServerCallbacks, DevServerOptions } from '../dev-server'; +import { EventEmitter } from 'events'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const mockSpawn = vi.fn(); +vi.mock('child_process', () => ({ + spawn: (...args: unknown[]) => mockSpawn(...args), + spawnSync: vi.fn(() => ({ status: 0 })), +})); + +vi.mock('fs', () => ({ + existsSync: vi.fn(() => true), +})); + +vi.mock('../../../../lib/utils/platform', () => ({ + getVenvExecutable: (venvPath: string, executable: string) => `${venvPath}/bin/${executable}`, +})); + +function createMockChildProcess() { + const proc = new EventEmitter() as any; + proc.stdout = new EventEmitter(); + proc.stderr = new EventEmitter(); + proc.killed = false; + proc.kill = vi.fn(); + return proc; +} + +const mockCallbacks: DevServerCallbacks = { onLog: vi.fn(), onExit: vi.fn() }; +const defaultOptions: DevServerOptions = { port: 8080, envVars: { MY_KEY: 'secret' }, callbacks: mockCallbacks }; + +describe('CodeZipDevServer spawn config', () => { + beforeEach(() => { + mockSpawn.mockReturnValue(createMockChildProcess()); + }); + + it('HTTP: uses uvicorn with --reload', async () => { + const config: DevConfig = { + agentName: 'HttpAgent', + module: 'main.py', + directory: '/project/app', + hasConfig: true, + isPython: true, + buildType: 'CodeZip', + protocol: 'HTTP', + }; + + const server = new CodeZipDevServer(config, defaultOptions); + await server.start(); + + expect(mockSpawn).toHaveBeenCalledWith( + '/project/app/.venv/bin/uvicorn', + expect.arrayContaining(['--reload', '--host', '127.0.0.1', '--port', '8080']), + expect.objectContaining({ cwd: '/project/app' }) + ); + }); + + it('MCP: uses python directly with main.py', async () => { + const config: DevConfig = { + agentName: 'McpAgent', + module: 'main.py', + directory: '/project/app', + hasConfig: true, + isPython: true, + buildType: 'CodeZip', + protocol: 'MCP', + }; + + const server = new CodeZipDevServer(config, defaultOptions); + await server.start(); + + expect(mockSpawn).toHaveBeenCalledWith( + '/project/app/.venv/bin/python', + ['main.py'], + expect.objectContaining({ cwd: '/project/app' }) + ); + }); + + it('A2A: uses python directly with main.py', async () => { + const config: DevConfig = { + agentName: 'A2aAgent', + module: 'main.py', + directory: '/project/app', + hasConfig: true, + isPython: true, + buildType: 'CodeZip', + protocol: 'A2A', + }; + + const server = new CodeZipDevServer(config, defaultOptions); + await server.start(); + + expect(mockSpawn).toHaveBeenCalledWith( + '/project/app/.venv/bin/python', + ['main.py'], + expect.objectContaining({ cwd: '/project/app' }) + ); + }); + + it('non-HTTP: passes env vars including PORT and LOCAL_DEV', async () => { + const config: DevConfig = { + agentName: 'A2aAgent', + module: 'main.py', + directory: '/project/app', + hasConfig: true, + isPython: true, + buildType: 'CodeZip', + protocol: 'A2A', + }; + + const server = new CodeZipDevServer(config, defaultOptions); + await server.start(); + + const spawnCall = mockSpawn.mock.calls[0]!; + const env = spawnCall[2].env; + expect(env.PORT).toBe('8080'); + expect(env.LOCAL_DEV).toBe('1'); + expect(env.MY_KEY).toBe('secret'); + }); + + it('MCP: extracts file from module:function entrypoint', async () => { + const config: DevConfig = { + agentName: 'McpAgent', + module: 'app.py:handler', + directory: '/project/app', + hasConfig: true, + isPython: true, + buildType: 'CodeZip', + protocol: 'MCP', + }; + + const server = new CodeZipDevServer(config, defaultOptions); + await server.start(); + + expect(mockSpawn).toHaveBeenCalledWith( + '/project/app/.venv/bin/python', + ['app.py'], + expect.objectContaining({ cwd: '/project/app' }) + ); + }); +}); diff --git a/src/cli/operations/dev/__tests__/config.test.ts b/src/cli/operations/dev/__tests__/config.test.ts index 54e17109..282e842a 100644 --- a/src/cli/operations/dev/__tests__/config.test.ts +++ b/src/cli/operations/dev/__tests__/config.test.ts @@ -220,6 +220,77 @@ describe('getDevConfig', () => { expect(config?.buildType).toBe('Container'); }); + it('returns protocol HTTP by default when agent has no protocol', () => { + const project: AgentCoreProjectSpec = { + name: 'TestProject', + version: 1, + agents: [ + { + type: 'AgentCoreRuntime', + name: 'PythonAgent', + build: 'CodeZip', + runtimeVersion: 'PYTHON_3_12', + entrypoint: filePath('main.py'), + codeLocation: dirPath('./agents/python'), + }, + ], + memories: [], + credentials: [], + }; + + const config = getDevConfig(workingDir, project, '/test/project/agentcore'); + expect(config).not.toBeNull(); + expect(config!.protocol).toBe('HTTP'); + }); + + it('returns protocol MCP for MCP agents', () => { + const project: AgentCoreProjectSpec = { + name: 'TestProject', + version: 1, + agents: [ + { + type: 'AgentCoreRuntime', + name: 'McpAgent', + build: 'CodeZip', + runtimeVersion: 'PYTHON_3_12', + entrypoint: filePath('main.py'), + codeLocation: dirPath('./agents/mcp'), + protocol: 'MCP', + }, + ], + memories: [], + credentials: [], + }; + + const config = getDevConfig(workingDir, project, '/test/project/agentcore'); + expect(config).not.toBeNull(); + expect(config!.protocol).toBe('MCP'); + }); + + it('returns protocol A2A for A2A agents', () => { + const project: AgentCoreProjectSpec = { + name: 'TestProject', + version: 1, + agents: [ + { + type: 'AgentCoreRuntime', + name: 'A2aAgent', + build: 'CodeZip', + runtimeVersion: 'PYTHON_3_12', + entrypoint: filePath('main.py'), + codeLocation: dirPath('./agents/a2a'), + protocol: 'A2A', + }, + ], + memories: [], + credentials: [], + }; + + const config = getDevConfig(workingDir, project, '/test/project/agentcore'); + expect(config).not.toBeNull(); + expect(config!.protocol).toBe('A2A'); + }); + it('handles .py: entrypoint format (module:function)', () => { const project: AgentCoreProjectSpec = { name: 'TestProject', diff --git a/src/cli/operations/dev/__tests__/container-dev-server.test.ts b/src/cli/operations/dev/__tests__/container-dev-server.test.ts index 3b96bd0a..30dbe43c 100644 --- a/src/cli/operations/dev/__tests__/container-dev-server.test.ts +++ b/src/cli/operations/dev/__tests__/container-dev-server.test.ts @@ -10,6 +10,7 @@ const mockSpawn = vi.fn(); const mockExistsSync = vi.fn(); const mockDetectContainerRuntime = vi.fn(); const mockGetStartHint = vi.fn(); +const mockWaitForServerReady = vi.fn(); vi.mock('child_process', () => ({ spawnSync: (...args: unknown[]) => mockSpawnSync(...args), @@ -31,6 +32,14 @@ vi.mock('../../../external-requirements/detect', () => ({ getStartHint: (...args: unknown[]) => mockGetStartHint(...args), })); +vi.mock('../utils', async importOriginal => { + const actual: Record = await importOriginal(); + return { + ...actual, + waitForServerReady: (...args: unknown[]) => mockWaitForServerReady(...args), + }; +}); + function createMockChildProcess() { const proc = new EventEmitter() as any; proc.stdout = new EventEmitter(); @@ -82,6 +91,7 @@ const defaultConfig: DevConfig = { hasConfig: true, isPython: true, buildType: 'Container' as any, + protocol: 'HTTP', }; const mockCallbacks: DevServerCallbacks = { onLog: vi.fn(), onExit: vi.fn() }; @@ -93,6 +103,8 @@ describe('ContainerDevServer', () => { beforeEach(() => { vi.clearAllMocks(); savedEnv = { ...process.env }; + // Default: container server becomes ready immediately + mockWaitForServerReady.mockResolvedValue(true); }); afterEach(() => { @@ -188,7 +200,7 @@ describe('ContainerDevServer', () => { expect(mockCallbacks.onLog).toHaveBeenCalledWith('system', 'Container image built successfully.'); }); - it('logs system-level start message and triggers TUI readiness after container is spawned', async () => { + it('waits for server to be ready before triggering TUI readiness', async () => { mockSuccessfulPrepare(); const server = new ContainerDevServer(defaultConfig, defaultOptions); @@ -196,12 +208,28 @@ describe('ContainerDevServer', () => { expect(mockCallbacks.onLog).toHaveBeenCalledWith( 'system', - 'Container agentcore-dev-testagent started on port 9000.' + 'Container agentcore-dev-testagent started, waiting for server to be ready...' ); - // Emits readiness trigger for TUI detection + expect(mockWaitForServerReady).toHaveBeenCalledWith(9000); + // Emits readiness trigger for TUI detection only after port is ready expect(mockCallbacks.onLog).toHaveBeenCalledWith('info', 'Application startup complete'); }); + it('logs error when container server does not become ready in time', async () => { + mockSuccessfulPrepare(); + mockWaitForServerReady.mockResolvedValue(false); + + const server = new ContainerDevServer(defaultConfig, defaultOptions); + await server.start(); + + expect(mockCallbacks.onLog).toHaveBeenCalledWith( + 'error', + 'Container server did not become ready within 60 seconds.' + ); + // Should NOT emit readiness trigger + expect(mockCallbacks.onLog).not.toHaveBeenCalledWith('info', 'Application startup complete'); + }); + it('builds image directly without a dev layer', async () => { mockSuccessfulPrepare(); diff --git a/src/cli/operations/dev/__tests__/dev-server.test.ts b/src/cli/operations/dev/__tests__/dev-server.test.ts index 8f873849..06827e9c 100644 --- a/src/cli/operations/dev/__tests__/dev-server.test.ts +++ b/src/cli/operations/dev/__tests__/dev-server.test.ts @@ -42,6 +42,7 @@ const config: DevConfig = { hasConfig: true, isPython: true, buildType: 'CodeZip', + protocol: 'HTTP', }; describe('DevServer', () => { diff --git a/src/cli/operations/dev/__tests__/invoke-a2a.test.ts b/src/cli/operations/dev/__tests__/invoke-a2a.test.ts new file mode 100644 index 00000000..9428177f --- /dev/null +++ b/src/cli/operations/dev/__tests__/invoke-a2a.test.ts @@ -0,0 +1,212 @@ +import { ServerError } from '../invoke'; +import { invokeA2AStreaming } from '../invoke-a2a'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const mockFetch = vi.fn(); +global.fetch = mockFetch; + +describe('invokeA2AStreaming', () => { + beforeEach(() => { + mockFetch.mockReset(); + }); + + it('sends message/stream JSON-RPC and yields artifact text', async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Map([['content-type', 'application/json']]), + body: null, + text: () => + JSON.stringify({ + jsonrpc: '2.0', + id: 1, + result: { + id: 'task-1', + status: { state: 'completed' }, + artifacts: [ + { + parts: [{ type: 'text', text: 'The answer is 4.' }], + }, + ], + }, + }), + }); + + const chunks: string[] = []; + for await (const chunk of invokeA2AStreaming({ port: 8080, message: 'what is 2+2' })) { + chunks.push(chunk); + } + + expect(chunks.join('')).toBe('The answer is 4.'); + + // Verify the request format + const call = mockFetch.mock.calls[0]!; + expect(call[0]).toBe('http://localhost:8080/'); + const body = JSON.parse(call[1]!.body); + expect(body.method).toBe('message/stream'); + expect(body.params.message.messageId).toBeDefined(); + expect(body.params.message.parts[0].kind).toBe('text'); + expect(body.params.message.parts[0].text).toBe('what is 2+2'); + }); + + it('retries on connection errors', async () => { + // First attempt fails + mockFetch.mockRejectedValueOnce(new TypeError('fetch failed')); + + // Second attempt succeeds + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Map([['content-type', 'application/json']]), + body: null, + text: () => + JSON.stringify({ + jsonrpc: '2.0', + id: 2, + result: { + artifacts: [{ parts: [{ type: 'text', text: 'ok' }] }], + }, + }), + }); + + const chunks: string[] = []; + for await (const chunk of invokeA2AStreaming({ port: 8080, message: 'hello' })) { + chunks.push(chunk); + } + + expect(chunks.join('')).toBe('ok'); + expect(mockFetch).toHaveBeenCalledTimes(2); + }); + + it('throws ServerError on HTTP error without retrying', async () => { + mockFetch.mockResolvedValueOnce({ + ok: false, + status: 500, + text: () => 'Internal Server Error', + }); + + const gen = invokeA2AStreaming({ port: 8080, message: 'test' }); + await expect(gen.next()).rejects.toThrow(ServerError); + }); + + it('handles JSON-RPC error in response', async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Map([['content-type', 'application/json']]), + body: null, + text: () => + JSON.stringify({ + jsonrpc: '2.0', + id: 1, + error: { code: -32600, message: 'Bad request' }, + }), + }); + + const gen = invokeA2AStreaming({ port: 8080, message: 'test' }); + await expect(gen.next()).rejects.toThrow(ServerError); + }); + + it('streams text from status-update events and skips duplicate artifact-update', async () => { + // Simulate SSE stream with status-update chunks followed by artifact-update + const sseLines = [ + 'data: {"kind":"status-update","status":{"state":"working","message":{"parts":[{"kind":"text","text":"Hello"}]}}}\n\n', + 'data: {"kind":"status-update","status":{"state":"working","message":{"parts":[{"kind":"text","text":" world"}]}}}\n\n', + 'data: {"kind":"artifact-update","artifact":{"parts":[{"kind":"text","text":"Hello world"}]}}\n\n', + 'data: {"kind":"status-update","status":{"state":"completed"},"final":true}\n\n', + ]; + + const encoder = new TextEncoder(); + let chunkIndex = 0; + const mockBody = { + getReader: () => ({ + read: () => { + if (chunkIndex < sseLines.length) { + return Promise.resolve({ done: false as const, value: encoder.encode(sseLines[chunkIndex++]) }); + } + return Promise.resolve({ done: true as const, value: undefined }); + }, + releaseLock: vi.fn(), + }), + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Map([['content-type', 'text/event-stream']]), + body: mockBody, + }); + + const chunks: string[] = []; + const statuses: string[] = []; + for await (const chunk of invokeA2AStreaming({ + port: 8080, + message: 'hello', + onStatus: s => statuses.push(s), + })) { + chunks.push(chunk); + } + + // Should yield incremental status-update text, not the duplicate artifact-update + expect(chunks).toEqual(['Hello', ' world']); + expect(chunks.join('')).toBe('Hello world'); + // Should have received status callbacks + expect(statuses).toContain('working'); + expect(statuses).toContain('completed'); + }); + + it('yields artifact-update text when no status-update text was streamed', async () => { + // SSE stream with only artifact-update (no streaming status-update text) + const sseLines = [ + 'data: {"kind":"status-update","status":{"state":"working"}}\n\n', + 'data: {"kind":"artifact-update","artifact":{"parts":[{"kind":"text","text":"Result here"}]}}\n\n', + 'data: {"kind":"status-update","status":{"state":"completed"},"final":true}\n\n', + ]; + + const encoder = new TextEncoder(); + let chunkIndex = 0; + const mockBody = { + getReader: () => ({ + read: () => { + if (chunkIndex < sseLines.length) { + return Promise.resolve({ done: false as const, value: encoder.encode(sseLines[chunkIndex++]) }); + } + return Promise.resolve({ done: true as const, value: undefined }); + }, + releaseLock: vi.fn(), + }), + }; + + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Map([['content-type', 'text/event-stream']]), + body: mockBody, + }); + + const chunks: string[] = []; + for await (const chunk of invokeA2AStreaming({ port: 8080, message: 'hello' })) { + chunks.push(chunk); + } + + // Should yield artifact-update text since no status-update text was streamed + expect(chunks).toEqual(['Result here']); + }); + + it('yields fallback JSON when no artifacts found', async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Map([['content-type', 'application/json']]), + body: null, + text: () => + JSON.stringify({ + jsonrpc: '2.0', + id: 1, + result: { id: 'task-1', status: { state: 'completed' } }, + }), + }); + + const chunks: string[] = []; + for await (const chunk of invokeA2AStreaming({ port: 8080, message: 'test' })) { + chunks.push(chunk); + } + + // Should yield the stringified result as fallback + expect(chunks.length).toBeGreaterThan(0); + }); +}); diff --git a/src/cli/operations/dev/__tests__/invoke-mcp.test.ts b/src/cli/operations/dev/__tests__/invoke-mcp.test.ts new file mode 100644 index 00000000..1ca0758e --- /dev/null +++ b/src/cli/operations/dev/__tests__/invoke-mcp.test.ts @@ -0,0 +1,146 @@ +import { ServerError } from '../invoke'; +import { callMcpTool, listMcpTools } from '../invoke-mcp'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const mockFetch = vi.fn(); +global.fetch = mockFetch; + +describe('listMcpTools', () => { + beforeEach(() => { + mockFetch.mockReset(); + }); + + it('sends initialize + tools/list and returns parsed tools', async () => { + // Mock initialize response + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Map([['mcp-session-id', 'test-session']]), + text: () => JSON.stringify({ jsonrpc: '2.0', id: 1, result: { protocolVersion: '2025-03-26' } }), + }); + + // Mock initialized notification response + mockFetch.mockResolvedValueOnce({ ok: true, text: () => '' }); + + // Mock tools/list response + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => + JSON.stringify({ + jsonrpc: '2.0', + id: 2, + result: { + tools: [ + { + name: 'add_numbers', + description: 'Add two numbers', + inputSchema: { properties: { a: { type: 'integer' }, b: { type: 'integer' } } }, + }, + { name: 'greet', description: 'Say hello' }, + ], + }, + }), + }); + + const result = await listMcpTools(8080); + + expect(result.tools).toHaveLength(2); + expect(result.tools[0]!.name).toBe('add_numbers'); + expect(result.tools[0]!.description).toBe('Add two numbers'); + expect(result.tools[1]!.name).toBe('greet'); + expect(result.sessionId).toBe('test-session'); + + // Verify initialize was called first + expect(mockFetch).toHaveBeenCalledTimes(3); + const initCall = mockFetch.mock.calls[0]!; + expect(initCall[0]).toBe('http://localhost:8080/mcp'); + const initBody = JSON.parse(initCall[1]!.body); + expect(initBody.method).toBe('initialize'); + }); + + it('retries on connection errors', async () => { + // First attempt fails with connection error + mockFetch.mockRejectedValueOnce(new TypeError('fetch failed')); + + // Second attempt succeeds + mockFetch.mockResolvedValueOnce({ + ok: true, + headers: new Map(), + text: () => JSON.stringify({ jsonrpc: '2.0', id: 1, result: {} }), + }); + mockFetch.mockResolvedValueOnce({ ok: true, text: () => '' }); + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => JSON.stringify({ jsonrpc: '2.0', id: 2, result: { tools: [] } }), + }); + + const result = await listMcpTools(8080); + expect(result.tools).toEqual([]); + // 1 failed + 3 successful = 4 total calls + expect(mockFetch).toHaveBeenCalledTimes(4); + }); + + it('throws ServerError on HTTP error', async () => { + mockFetch.mockResolvedValueOnce({ + ok: false, + status: 500, + text: () => 'Internal Server Error', + }); + + await expect(listMcpTools(8080)).rejects.toThrow(ServerError); + }); +}); + +describe('callMcpTool', () => { + beforeEach(() => { + mockFetch.mockReset(); + }); + + it('sends tools/call and returns result text', async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => + JSON.stringify({ + jsonrpc: '2.0', + id: 1, + result: { + content: [{ type: 'text', text: '42' }], + }, + }), + }); + + const result = await callMcpTool(8080, 'add_numbers', { a: 1, b: 2 }); + expect(result).toBe('42'); + + const call = mockFetch.mock.calls[0]!; + const body = JSON.parse(call[1]!.body); + expect(body.method).toBe('tools/call'); + expect(body.params.name).toBe('add_numbers'); + expect(body.params.arguments).toEqual({ a: 1, b: 2 }); + }); + + it('includes session ID in header when provided', async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => JSON.stringify({ jsonrpc: '2.0', id: 1, result: { content: [{ text: 'ok' }] } }), + }); + + await callMcpTool(8080, 'test', {}, 'my-session'); + + const call = mockFetch.mock.calls[0]!; + expect(call[1]!.headers['mcp-session-id']).toBe('my-session'); + }); + + it('throws on JSON-RPC error', async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => + JSON.stringify({ + jsonrpc: '2.0', + id: 1, + error: { code: -32600, message: 'Invalid tool' }, + }), + }); + + await expect(callMcpTool(8080, 'bad_tool', {})).rejects.toThrow('Invalid tool'); + }); +}); diff --git a/src/cli/operations/dev/__tests__/utils.test.ts b/src/cli/operations/dev/__tests__/utils.test.ts index fcb44c96..0e24c789 100644 --- a/src/cli/operations/dev/__tests__/utils.test.ts +++ b/src/cli/operations/dev/__tests__/utils.test.ts @@ -1,4 +1,12 @@ -import { convertEntrypointToModule, findAvailablePort, waitForPort } from '../utils.js'; +import { + convertEntrypointToModule, + findAvailablePort, + formatMcpToolList, + getEndpointUrl, + isConnectionError, + sleep, + waitForPort, +} from '../utils.js'; import { afterEach, describe, expect, it, vi } from 'vitest'; /** @@ -97,3 +105,77 @@ describe('waitForPort', () => { expect(result).toBe(false); }); }); + +describe('getEndpointUrl', () => { + it('returns /mcp for MCP protocol', () => { + expect(getEndpointUrl(8000, 'MCP')).toBe('http://localhost:8000/mcp'); + }); + + it('returns / for A2A protocol', () => { + expect(getEndpointUrl(9000, 'A2A')).toBe('http://localhost:9000/'); + }); + + it('returns /invocations for HTTP protocol', () => { + expect(getEndpointUrl(8080, 'HTTP')).toBe('http://localhost:8080/invocations'); + }); + + it('returns /invocations for unknown protocol', () => { + expect(getEndpointUrl(8080, 'UNKNOWN')).toBe('http://localhost:8080/invocations'); + }); +}); + +describe('formatMcpToolList', () => { + it('formats tools with descriptions and params', () => { + const tools = [ + { + name: 'add', + description: 'Add numbers', + inputSchema: { properties: { a: { type: 'integer' }, b: { type: 'integer' } } }, + }, + { name: 'greet', description: 'Say hello' }, + ]; + const result = formatMcpToolList(tools); + expect(result).toContain('Available tools (2)'); + expect(result).toContain('add(a: integer, b: integer) - Add numbers'); + expect(result).toContain('greet() - Say hello'); + expect(result).toContain('Type: tool_name'); + }); + + it('handles tools with no description', () => { + const tools = [{ name: 'test' }]; + const result = formatMcpToolList(tools); + expect(result).toContain('test()'); + expect(result).not.toContain(' - '); + }); + + it('handles empty tool list', () => { + const result = formatMcpToolList([]); + expect(result).toContain('Available tools (0)'); + }); +}); + +describe('isConnectionError', () => { + it('detects ECONNREFUSED', () => { + expect(isConnectionError(new Error('connect ECONNREFUSED 127.0.0.1:8080'))).toBe(true); + }); + + it('detects fetch failed', () => { + expect(isConnectionError(new Error('fetch failed'))).toBe(true); + }); + + it('does not match arbitrary errors', () => { + expect(isConnectionError(new Error('timeout'))).toBe(false); + }); + + it('does not match partial "fetch" in other messages', () => { + expect(isConnectionError(new Error('failed to fetch data from API'))).toBe(false); + }); +}); + +describe('sleep', () => { + it('resolves after specified delay', async () => { + const start = Date.now(); + await sleep(50); + expect(Date.now() - start).toBeGreaterThanOrEqual(40); + }); +}); diff --git a/src/cli/operations/dev/codezip-dev-server.ts b/src/cli/operations/dev/codezip-dev-server.ts index ae5b46f6..4428493d 100644 --- a/src/cli/operations/dev/codezip-dev-server.ts +++ b/src/cli/operations/dev/codezip-dev-server.ts @@ -1,4 +1,5 @@ import { getVenvExecutable } from '../../../lib/utils/platform'; +import type { ProtocolMode } from '../../../schema'; import { DevServer, type LogLevel, type SpawnConfig } from './dev-server'; import { convertEntrypointToModule } from './utils'; import { spawnSync } from 'child_process'; @@ -8,15 +9,28 @@ import { join } from 'path'; /** * Ensures a Python virtual environment exists and has dependencies installed. * Creates the venv and runs uv sync if .venv doesn't exist. + * For non-HTTP protocols, checks for python instead of uvicorn. * Returns true if successful, false otherwise. */ -function ensurePythonVenv(cwd: string, onLog: (level: LogLevel, message: string) => void): boolean { +function ensurePythonVenv( + cwd: string, + onLog: (level: LogLevel, message: string) => void, + protocol: ProtocolMode = 'HTTP' +): boolean { const venvPath = join(cwd, '.venv'); - const uvicornPath = getVenvExecutable(venvPath, 'uvicorn'); - // Check if venv and uvicorn already exist - if (existsSync(uvicornPath)) { - return true; + if (protocol === 'HTTP') { + // For HTTP, uvicorn binary is a reliable proxy for "deps installed" + const uvicornPath = getVenvExecutable(venvPath, 'uvicorn'); + if (existsSync(uvicornPath)) { + return true; + } + } else { + // For MCP/A2A, check python binary as a proxy for "venv + deps installed" + const pythonPath = getVenvExecutable(venvPath, 'python'); + if (existsSync(pythonPath)) { + return true; + } } onLog('system', 'Setting up Python environment...'); @@ -35,11 +49,16 @@ function ensurePythonVenv(cwd: string, onLog: (level: LogLevel, message: string) onLog('info', 'Installing dependencies...'); const syncResult = spawnSync('uv', ['sync'], { cwd, stdio: 'pipe' }); if (syncResult.status !== 0) { - // Fallback: try installing uvicorn directly if uv sync fails - onLog('warn', 'uv sync failed, trying direct uvicorn install...'); - const pipResult = spawnSync('uv', ['pip', 'install', 'uvicorn'], { cwd, stdio: 'pipe' }); - if (pipResult.status !== 0) { - onLog('error', `Failed to install dependencies: ${pipResult.stderr?.toString() || 'unknown error'}`); + if (protocol === 'HTTP') { + // Fallback: try installing uvicorn directly if uv sync fails + onLog('warn', 'uv sync failed, trying direct uvicorn install...'); + const pipResult = spawnSync('uv', ['pip', 'install', 'uvicorn'], { cwd, stdio: 'pipe' }); + if (pipResult.status !== 0) { + onLog('error', `Failed to install dependencies: ${pipResult.stderr?.toString() || 'unknown error'}`); + return false; + } + } else { + onLog('error', `Failed to install dependencies: ${syncResult.stderr?.toString() || 'unknown error'}`); return false; } } @@ -52,24 +71,43 @@ function ensurePythonVenv(cwd: string, onLog: (level: LogLevel, message: string) export class CodeZipDevServer extends DevServer { protected prepare(): Promise { return Promise.resolve( - this.config.isPython ? ensurePythonVenv(this.config.directory, this.options.callbacks.onLog) : true + this.config.isPython + ? ensurePythonVenv(this.config.directory, this.options.callbacks.onLog, this.config.protocol) + : true ); } protected getSpawnConfig(): SpawnConfig { - const { module, directory, isPython } = this.config; + const { module, directory, isPython, protocol } = this.config; const { port, envVars = {} } = this.options; + const env = { ...process.env, ...envVars, PORT: String(port), LOCAL_DEV: '1' }; + + if (!isPython) { + // Node.js path (unchanged) + return { + cmd: 'npx', + args: ['tsx', 'watch', (module.split(':')[0] ?? module).replace(/\./g, '/') + '.ts'], + cwd: directory, + env, + }; + } + + const venvDir = join(directory, '.venv'); - const cmd = isPython ? getVenvExecutable(join(directory, '.venv'), 'uvicorn') : 'npx'; - const args = isPython - ? [convertEntrypointToModule(module), '--reload', '--host', '127.0.0.1', '--port', String(port)] - : ['tsx', 'watch', (module.split(':')[0] ?? module).replace(/\./g, '/') + '.ts']; + if (protocol !== 'HTTP') { + // MCP/A2A: run python main.py directly (no module-level ASGI app) + const python = getVenvExecutable(venvDir, 'python'); + const entryFile = module.split(':')[0] ?? module; + return { cmd: python, args: [entryFile], cwd: directory, env }; + } + // HTTP: uvicorn with hot-reload (existing behavior) + const uvicorn = getVenvExecutable(venvDir, 'uvicorn'); return { - cmd, - args, + cmd: uvicorn, + args: [convertEntrypointToModule(module), '--reload', '--host', '127.0.0.1', '--port', String(port)], cwd: directory, - env: { ...process.env, ...envVars, PORT: String(port), LOCAL_DEV: '1' }, + env, }; } } diff --git a/src/cli/operations/dev/config.ts b/src/cli/operations/dev/config.ts index 647926c9..fdbeca65 100644 --- a/src/cli/operations/dev/config.ts +++ b/src/cli/operations/dev/config.ts @@ -1,5 +1,5 @@ import { ConfigIO, findConfigRoot } from '../../../lib'; -import type { AgentCoreProjectSpec, AgentEnvSpec, BuildType } from '../../../schema'; +import type { AgentCoreProjectSpec, AgentEnvSpec, BuildType, ProtocolMode } from '../../../schema'; import { dirname, isAbsolute, join } from 'node:path'; export interface DevConfig { @@ -9,6 +9,7 @@ export interface DevConfig { hasConfig: boolean; isPython: boolean; buildType: BuildType; + protocol: ProtocolMode; } interface DevSupportResult { @@ -138,6 +139,7 @@ export function getDevConfig( hasConfig: true, isPython: isPythonAgent(targetAgent), buildType: targetAgent.build, + protocol: targetAgent.protocol ?? 'HTTP', }; } diff --git a/src/cli/operations/dev/container-dev-server.ts b/src/cli/operations/dev/container-dev-server.ts index 2240496b..3690e8ee 100644 --- a/src/cli/operations/dev/container-dev-server.ts +++ b/src/cli/operations/dev/container-dev-server.ts @@ -2,6 +2,7 @@ import { CONTAINER_INTERNAL_PORT, DOCKERFILE_NAME } from '../../../lib'; import { getUvBuildArgs } from '../../../lib/packaging/build-args'; import { detectContainerRuntime, getStartHint } from '../../external-requirements/detect'; import { DevServer, type LogLevel, type SpawnConfig } from './dev-server'; +import { waitForServerReady } from './utils'; import { type ChildProcess, spawn, spawnSync } from 'child_process'; import { existsSync } from 'fs'; import { homedir } from 'os'; @@ -21,16 +22,24 @@ export class ContainerDevServer extends DevServer { return this.imageName; } - /** Override start to log when the container is launched and trigger TUI readiness detection. - * DevLogger filters 'info', so without a 'system'-level message the log would be empty after build. - * Include "Application startup complete" so the TUI detects container readiness. */ + /** Override start to wait for the container's server to accept connections before + * signaling readiness. The base class spawns `docker run`, but the internal server + * needs time to boot. We poll the mapped port so the TUI only enables input once + * the container is actually ready to handle requests. */ override async start(): Promise { const child = await super.start(); if (child) { const { onLog } = this.options.callbacks; - onLog('system', `Container ${this.containerName} started on port ${this.options.port}.`); - // Trigger TUI readiness detection (useDevServer looks for this exact string) - onLog('info', 'Application startup complete'); + onLog('system', `Container ${this.containerName} started, waiting for server to be ready...`); + + // Poll until the container's server is accepting connections (up to 60s) + const ready = await waitForServerReady(this.options.port); + if (ready) { + // Trigger TUI readiness detection (useDevServer looks for this exact string) + onLog('info', 'Application startup complete'); + } else { + onLog('error', 'Container server did not become ready within 60 seconds.'); + } } return child; } diff --git a/src/cli/operations/dev/index.ts b/src/cli/operations/dev/index.ts index 4d9ee675..ae3c8edd 100644 --- a/src/cli/operations/dev/index.ts +++ b/src/cli/operations/dev/index.ts @@ -10,4 +10,10 @@ export { export { getDevConfig, getDevSupportedAgents, getAgentPort, loadProjectConfig, type DevConfig } from './config'; -export { ConnectionError, ServerError, invokeAgent, invokeAgentStreaming } from './invoke'; +export { ConnectionError, ServerError, invokeAgent, invokeAgentStreaming, invokeForProtocol } from './invoke'; + +export { invokeA2AStreaming, fetchA2AAgentCard, type A2AAgentCard } from './invoke-a2a'; + +export { listMcpTools, callMcpTool, type McpTool, type McpToolsResult } from './invoke-mcp'; + +export { getEndpointUrl, formatMcpToolList } from './utils'; diff --git a/src/cli/operations/dev/invoke-a2a.ts b/src/cli/operations/dev/invoke-a2a.ts new file mode 100644 index 00000000..d67fd74d --- /dev/null +++ b/src/cli/operations/dev/invoke-a2a.ts @@ -0,0 +1,294 @@ +import { ConnectionError, type InvokeStreamingOptions, type SSELogger, ServerError } from './invoke-types'; +import { isConnectionError, sleep } from './utils'; +import { randomUUID } from 'crypto'; + +let requestId = 1; + +export interface A2AAgentCard { + name?: string; + description?: string; + version?: string; + url?: string; + skills?: { id?: string; name?: string; description?: string; tags?: string[] }[]; + capabilities?: { streaming?: boolean }; + defaultInputModes?: string[]; + defaultOutputModes?: string[]; +} + +/** + * Fetch the A2A agent card from /.well-known/agent.json. + * Returns null if not available (retries on connection errors). + */ +export async function fetchA2AAgentCard(port: number, logger?: SSELogger): Promise { + const maxRetries = 5; + const baseDelay = 500; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + const res = await fetch(`http://localhost:${port}/.well-known/agent.json`, { + method: 'GET', + headers: { Accept: 'application/json' }, + }); + + if (!res.ok) { + logger?.log?.('warn', `Agent card not available (${res.status})`); + return null; + } + + const card = (await res.json()) as A2AAgentCard; + logger?.log?.('system', `A2A agent card: ${card.name ?? 'unnamed'}`); + return card; + } catch (err) { + const error = err instanceof Error ? err : new Error(String(err)); + if (isConnectionError(error) && attempt < maxRetries - 1) { + const delay = baseDelay * Math.pow(2, attempt); + await sleep(delay); + continue; + } + + logger?.log?.('warn', `Failed to fetch agent card: ${error.message}`); + return null; + } + } + + return null; +} + +/** + * Invokes an A2A agent using JSON-RPC 2.0 message/stream (SSE) with + * fallback to message/send (non-streaming). + * Yields text chunks as they arrive from artifact-update and status-update events. + */ +export async function* invokeA2AStreaming(options: InvokeStreamingOptions): AsyncGenerator { + const { port, message: msg, logger, onStatus } = options; + const maxRetries = 5; + const baseDelay = 500; + let lastError: Error | null = null; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + const body = { + jsonrpc: '2.0', + id: requestId++, + method: 'message/stream', + params: { + message: { + messageId: randomUUID(), + role: 'user', + parts: [{ kind: 'text', text: msg }], + }, + }, + }; + + logger?.log?.('system', `A2A message/stream: ${msg}`); + + const res = await fetch(`http://localhost:${port}/`, { + method: 'POST', + headers: { 'Content-Type': 'application/json', Accept: 'text/event-stream' }, + body: JSON.stringify(body), + }); + + if (!res.ok) { + const responseBody = await res.text(); + throw new ServerError(res.status, responseBody); + } + + const contentType = res.headers.get('content-type') ?? ''; + + // Handle SSE streaming response + if (contentType.includes('text/event-stream') && res.body) { + yield* parseA2ASSEStream(res.body, logger, onStatus); + return; + } + + // Handle non-streaming JSON-RPC response (fallback) + const responseText = await res.text(); + logger?.logSSEEvent(responseText); + + try { + const json = JSON.parse(responseText) as Record; + + if (json.error) { + const rpcError = json.error as { message?: string; code?: number }; + throw new ServerError(rpcError.code ?? 500, rpcError.message ?? 'A2A RPC error'); + } + + const result = json.result as Record | undefined; + if (result) { + const text = extractTaskText(result); + if (text) { + yield text; + } else { + yield JSON.stringify(result, null, 2); + } + } else { + yield responseText; + } + } catch (e) { + if (e instanceof ServerError) throw e; + yield responseText; + } + + return; + } catch (err) { + if (err instanceof ServerError) { + logger?.log?.('error', `Server error (${err.statusCode}): ${err.message}`); + throw err; + } + + lastError = err instanceof Error ? err : new Error(String(err)); + + if (isConnectionError(lastError)) { + const delay = baseDelay * Math.pow(2, attempt); + logger?.log?.( + 'warn', + `Connection failed (attempt ${attempt + 1}/${maxRetries}): ${lastError.message}. Retrying in ${delay}ms...` + ); + await sleep(delay); + continue; + } + + logger?.log?.('error', `Request failed: ${lastError.stack ?? lastError.message}`); + throw lastError; + } + } + + const finalError = new ConnectionError(lastError ?? new Error('Failed to connect to A2A server after retries')); + logger?.log?.('error', `Failed to connect after ${maxRetries} attempts: ${finalError.message}`); + throw finalError; +} + +/** Parse SSE stream from A2A message/stream response */ +async function* parseA2ASSEStream( + body: ReadableStream, + logger?: SSELogger, + onStatus?: (status: string) => void +): AsyncGenerator { + const reader = body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + let streamedFromStatus = false; + + try { + while (true) { + const result = await reader.read(); + if (result.done) break; + + buffer += decoder.decode(result.value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() ?? ''; + + for (const line of lines) { + if (!line.startsWith('data: ')) continue; + const data = line.slice(6).trim(); + if (!data) continue; + + logger?.logSSEEvent(line); + + try { + const event = JSON.parse(data) as Record; + handleSSEEvent(event, onStatus); + const text = extractSSEEventText(event, streamedFromStatus); + if (text) { + if (isStatusUpdateEvent(event)) streamedFromStatus = true; + yield text; + } + } catch { + yield data; + } + } + } + } finally { + reader.releaseLock(); + } +} + +/** Dispatch status-update events to the onStatus callback */ +function handleSSEEvent(event: Record, onStatus?: (status: string) => void): void { + if (!onStatus) return; + const target = (event.result as Record) ?? event; + if (target.kind !== 'status-update') return; + const status = target.status as { state?: string } | undefined; + if (status?.state) { + onStatus(status.state); + } +} + +/** Check if an event (possibly wrapped in JSON-RPC envelope) is a status-update */ +function isStatusUpdateEvent(event: Record): boolean { + const target = (event.result as Record) ?? event; + return target.kind === 'status-update'; +} + +/** + * Extract displayable text from an A2A SSE event. + * + * Events come in two forms: + * - artifact-update: { kind: 'artifact-update', artifact: { parts: [{ kind: 'text', text: '...' }] } } + * - status-update: { kind: 'status-update', status: { state: '...', message?: { parts: [...] } }, final: bool } + * + * Events can also be wrapped in a JSON-RPC result envelope. + * + * When `streamedFromStatus` is true, artifact-update text is skipped because + * the same content was already streamed incrementally via status-update events. + */ +function extractSSEEventText(event: Record, streamedFromStatus = false): string | null { + // Unwrap JSON-RPC result envelope if present + const target = (event.result as Record) ?? event; + const kind = target.kind as string | undefined; + + if (kind === 'artifact-update') { + // Skip if we already streamed this content via status-update events + if (streamedFromStatus) return null; + const artifact = target.artifact as { parts?: { kind?: string; text?: string }[] } | undefined; + return extractPartsText(artifact?.parts); + } + + if (kind === 'status-update') { + // Extract streaming text from status-update message parts (working state) + const status = target.status as + | { state?: string; message?: { parts?: { kind?: string; type?: string; text?: string }[] } } + | undefined; + if (status?.message?.parts) { + return extractPartsText(status.message.parts); + } + return null; + } + + // Fallback: try extracting from a full Task result (non-streaming envelope) + return extractTaskText(target); +} + +/** Extract text from a full Task result (has artifacts array and/or status) */ +function extractTaskText(result: Record): string | null { + // Try artifacts first + const artifacts = result.artifacts as { parts?: { kind?: string; type?: string; text?: string }[] }[] | undefined; + if (artifacts) { + const texts: string[] = []; + for (const artifact of artifacts) { + const text = extractPartsText(artifact.parts); + if (text) texts.push(text); + } + if (texts.length > 0) return texts.join('\n'); + } + + // Try status message + const status = result.status as { message?: { parts?: { kind?: string; text?: string }[] } } | undefined; + if (status?.message?.parts) { + return extractPartsText(status.message.parts); + } + + return null; +} + +/** Extract text from a parts array (supports both kind:'text' and type:'text' formats) */ +function extractPartsText(parts: { kind?: string; type?: string; text?: string }[] | undefined): string | null { + if (!parts) return null; + const texts: string[] = []; + for (const part of parts) { + if ((part.kind === 'text' || part.type === 'text') && part.text) { + texts.push(part.text); + } + } + return texts.length > 0 ? texts.join('') : null; +} diff --git a/src/cli/operations/dev/invoke-mcp.ts b/src/cli/operations/dev/invoke-mcp.ts new file mode 100644 index 00000000..df677c85 --- /dev/null +++ b/src/cli/operations/dev/invoke-mcp.ts @@ -0,0 +1,198 @@ +import { parseJsonRpcResponse } from '../../../lib/utils/json-rpc'; +import { ConnectionError, type SSELogger, ServerError } from './invoke-types'; +import { isConnectionError, sleep } from './utils'; + +let requestId = 1; + +export interface McpTool { + name: string; + description?: string; + inputSchema?: Record; +} + +export interface McpToolsResult { + tools: McpTool[]; + sessionId?: string; +} + +/** + * Initialize MCP session and list available tools. + * Sends initialize + tools/list JSON-RPC requests to the MCP endpoint. + * Returns tools and the session ID needed for subsequent calls. + */ +export async function listMcpTools(port: number, logger?: SSELogger): Promise { + const maxRetries = 5; + const baseDelay = 500; + let lastError: Error | null = null; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + // 1. Initialize session + const initBody = { + jsonrpc: '2.0', + id: requestId++, + method: 'initialize', + params: { + protocolVersion: '2025-03-26', + capabilities: {}, + clientInfo: { name: 'agentcore-cli', version: '1.0.0' }, + }, + }; + + logger?.log?.('system', 'MCP initialize'); + + const initRes = await fetch(`http://localhost:${port}/mcp`, { + method: 'POST', + headers: { 'Content-Type': 'application/json', Accept: 'application/json, text/event-stream' }, + body: JSON.stringify(initBody), + }); + + if (!initRes.ok) { + const body = await initRes.text(); + throw new ServerError(initRes.status, body); + } + + // Extract session ID from response header + const sessionId = initRes.headers.get('mcp-session-id'); + const initResponseText = await initRes.text(); + logger?.logSSEEvent(initResponseText); + + // 2. Send initialized notification + const initializedBody = { + jsonrpc: '2.0', + method: 'notifications/initialized', + }; + + const headers: Record = { 'Content-Type': 'application/json' }; + if (sessionId) headers['mcp-session-id'] = sessionId; + + await fetch(`http://localhost:${port}/mcp`, { + method: 'POST', + headers, + body: JSON.stringify(initializedBody), + }); + + // 3. List tools + const listBody = { + jsonrpc: '2.0', + id: requestId++, + method: 'tools/list', + params: {}, + }; + + logger?.log?.('system', 'MCP tools/list'); + + const listRes = await fetch(`http://localhost:${port}/mcp`, { + method: 'POST', + headers: { 'Content-Type': 'application/json', Accept: 'application/json, text/event-stream', ...headers }, + body: JSON.stringify(listBody), + }); + + if (!listRes.ok) { + const body = await listRes.text(); + throw new ServerError(listRes.status, body); + } + + const listResponseText = await listRes.text(); + logger?.logSSEEvent(listResponseText); + + const parsed = parseJsonRpcResponse(listResponseText); + const result = parsed.result as { tools?: McpTool[] } | undefined; + const tools = result?.tools ?? []; + + return { + tools: tools.map(t => ({ + name: t.name, + description: t.description, + inputSchema: t.inputSchema, + })), + sessionId: sessionId ?? undefined, + }; + } catch (err) { + if (err instanceof ServerError) { + logger?.log?.('error', `Server error (${err.statusCode}): ${err.message}`); + throw err; + } + + lastError = err instanceof Error ? err : new Error(String(err)); + + if (isConnectionError(lastError)) { + const delay = baseDelay * Math.pow(2, attempt); + logger?.log?.( + 'warn', + `Connection failed (attempt ${attempt + 1}/${maxRetries}): ${lastError.message}. Retrying in ${delay}ms...` + ); + await sleep(delay); + continue; + } + + logger?.log?.('error', `Request failed: ${lastError.stack ?? lastError.message}`); + throw lastError; + } + } + + const finalError = new ConnectionError(lastError ?? new Error('Failed to connect to MCP server after retries')); + logger?.log?.('error', `Failed to connect after ${maxRetries} attempts: ${finalError.message}`); + throw finalError; +} + +/** + * Call an MCP tool by name with JSON arguments. + * Requires a session ID from a previous initialize call. + */ +export async function callMcpTool( + port: number, + toolName: string, + args: Record, + sessionId?: string, + logger?: SSELogger +): Promise { + const body = { + jsonrpc: '2.0', + id: requestId++, + method: 'tools/call', + params: { name: toolName, arguments: args }, + }; + + logger?.log?.('system', `MCP tools/call: ${toolName}(${JSON.stringify(args)})`); + + const headers: Record = { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + }; + if (sessionId) headers['mcp-session-id'] = sessionId; + + const res = await fetch(`http://localhost:${port}/mcp`, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); + + if (!res.ok) { + const responseBody = await res.text(); + throw new ServerError(res.status, responseBody); + } + + const responseText = await res.text(); + logger?.logSSEEvent(responseText); + + const parsed = parseJsonRpcResponse(responseText); + + if (parsed.error) { + const rpcError = parsed.error as { message?: string; code?: number }; + throw new Error(rpcError.message ?? `MCP error (code ${rpcError.code})`); + } + + const result = parsed.result as { content?: { type?: string; text?: string }[] } | undefined; + if (result?.content) { + const texts: string[] = []; + for (const item of result.content) { + if (item.text !== undefined) { + texts.push(item.text); + } + } + if (texts.length > 0) return texts.join(''); + } + + return JSON.stringify(parsed.result, null, 2); +} diff --git a/src/cli/operations/dev/invoke-types.ts b/src/cli/operations/dev/invoke-types.ts new file mode 100644 index 00000000..1b481718 --- /dev/null +++ b/src/cli/operations/dev/invoke-types.ts @@ -0,0 +1,34 @@ +/** Error thrown when the dev server returns a non-OK HTTP response. */ +export class ServerError extends Error { + constructor( + public readonly statusCode: number, + body: string + ) { + super(body || `Server returned ${statusCode}`); + this.name = 'ServerError'; + } +} + +/** Error thrown when the connection to the dev server fails. */ +export class ConnectionError extends Error { + constructor(cause: Error) { + super(cause.message); + this.name = 'ConnectionError'; + } +} + +/** Logger interface for SSE events and error logging */ +export interface SSELogger { + logSSEEvent(rawLine: string): void; + /** Optional method to log errors and debug info */ + log?(level: 'error' | 'warn' | 'system', message: string): void; +} + +export interface InvokeStreamingOptions { + port: number; + message: string; + /** Optional logger for SSE event debugging */ + logger?: SSELogger; + /** Callback for A2A task status updates (e.g. 'working', 'input-required') */ + onStatus?: (status: string) => void; +} diff --git a/src/cli/operations/dev/invoke.ts b/src/cli/operations/dev/invoke.ts index 0c31cf8f..29e10419 100644 --- a/src/cli/operations/dev/invoke.ts +++ b/src/cli/operations/dev/invoke.ts @@ -1,28 +1,9 @@ -/** Error thrown when the dev server returns a non-OK HTTP response. */ -export class ServerError extends Error { - constructor( - public readonly statusCode: number, - body: string - ) { - super(body || `Server returned ${statusCode}`); - this.name = 'ServerError'; - } -} - -/** Error thrown when the connection to the dev server fails. */ -export class ConnectionError extends Error { - constructor(cause: Error) { - super(cause.message); - this.name = 'ConnectionError'; - } -} +import { invokeA2AStreaming } from './invoke-a2a'; +import { ConnectionError, type InvokeStreamingOptions, type SSELogger, ServerError } from './invoke-types'; +import { isConnectionError, sleep } from './utils'; -/** Logger interface for SSE events and error logging */ -export interface SSELogger { - logSSEEvent(rawLine: string): void; - /** Optional method to log errors and debug info */ - log?(level: 'error' | 'warn' | 'system', message: string): void; -} +// Re-export shared types so existing consumers don't break +export { ConnectionError, ServerError, type InvokeStreamingOptions, type SSELogger } from './invoke-types'; /** * Parse a single SSE data line and extract the content. @@ -69,13 +50,6 @@ function parseSSE(text: string): string { return parts.length > 0 ? parts.join('') : text; } -/** - * Sleep helper for retry delays. - */ -function sleep(ms: number): Promise { - return new Promise(resolve => setTimeout(resolve, ms)); -} - /** * Extract result from a JSON response object. * Handles both {"result": "..."} and plain text responses. @@ -93,13 +67,6 @@ function extractResult(text: string): string { } } -export interface InvokeStreamingOptions { - port: number; - message: string; - /** Optional logger for SSE event debugging */ - logger?: SSELogger; -} - /** * Invokes an agent on the local dev server and streams the response. * Yields text chunks as they arrive from the SSE stream. @@ -209,9 +176,8 @@ export async function* invokeAgentStreaming( } lastError = err instanceof Error ? err : new Error(String(err)); - const isConnectionError = lastError.message.includes('fetch') || lastError.message.includes('ECONNREFUSED'); - if (isConnectionError) { + if (isConnectionError(lastError)) { const delay = baseDelay * Math.pow(2, attempt); logger?.log?.( 'warn', @@ -240,6 +206,26 @@ export interface InvokeOptions { logger?: SSELogger; } +/** + * Protocol-aware invoke dispatcher. + * Routes to the appropriate invoke implementation based on protocol. + * MCP uses a separate tool-based interface (listMcpTools/callMcpTool). + */ +export async function* invokeForProtocol( + protocol: string, + options: InvokeStreamingOptions +): AsyncGenerator { + switch (protocol) { + case 'MCP': + throw new Error('Use listMcpTools/callMcpTool for MCP agents'); + case 'A2A': + yield* invokeA2AStreaming(options); + break; + default: + yield* invokeAgentStreaming(options); + } +} + /** * Invokes an agent running on the local dev server. * Handles both JSON and streaming text responses. @@ -292,9 +278,8 @@ export async function invokeAgent(portOrOptions: number | InvokeOptions, message } lastError = err instanceof Error ? err : new Error(String(err)); - const isConnectionError = lastError.message.includes('fetch') || lastError.message.includes('ECONNREFUSED'); - if (isConnectionError) { + if (isConnectionError(lastError)) { const delay = baseDelay * Math.pow(2, attempt); logger?.log?.( 'warn', diff --git a/src/cli/operations/dev/utils.ts b/src/cli/operations/dev/utils.ts index 4d0247db..112b31b2 100644 --- a/src/cli/operations/dev/utils.ts +++ b/src/cli/operations/dev/utils.ts @@ -1,4 +1,4 @@ -import { createServer } from 'net'; +import { createConnection, createServer } from 'net'; /** Check if a port is available on a specific host */ function checkPort(port: number, host: string): Promise { @@ -39,6 +39,68 @@ export async function waitForPort(port: number, timeoutMs = 3000): Promise { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + const listening = await new Promise(resolve => { + const socket = createConnection({ port, host: '127.0.0.1' }, () => { + socket.destroy(); + resolve(true); + }); + socket.on('error', () => { + socket.destroy(); + resolve(false); + }); + }); + if (listening) return true; + await new Promise(resolve => setTimeout(resolve, 500)); + } + return false; +} + +/** Sleep helper for retry delays. */ +export function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +/** Protocol-specific endpoint URL for display. */ +export function getEndpointUrl(port: number, protocol: string): string { + switch (protocol) { + case 'MCP': + return `http://localhost:${port}/mcp`; + case 'A2A': + return `http://localhost:${port}/`; + default: + return `http://localhost:${port}/invocations`; + } +} + +/** + * Format MCP tools into a displayable list string. + */ +export function formatMcpToolList( + tools: { name: string; description?: string; inputSchema?: Record }[] +): string { + const toolLines = tools.map(t => { + const params = t.inputSchema?.properties + ? Object.entries(t.inputSchema.properties as Record) + .map(([name, schema]) => `${name}: ${schema.type ?? 'any'}`) + .join(', ') + : ''; + return ` ${t.name}(${params})${t.description ? ` - ${t.description}` : ''}`; + }); + return `Available tools (${tools.length}):\n${toolLines.join('\n')}\n\nType: tool_name {"arg": "value"} to call a tool. Type "list" to refresh.`; +} + +/** + * Check if an error is a connection error (ECONNREFUSED or fetch failure). + * Only matches actual network-level failures, not application errors. + */ +export function isConnectionError(error: Error): boolean { + return error.message.includes('ECONNREFUSED') || error.message === 'fetch failed'; +} + export function convertEntrypointToModule(entrypoint: string): string { if (entrypoint.includes(':')) return entrypoint; const path = entrypoint.replace(/\.py$/, '').replace(/\//g, '.'); diff --git a/src/cli/primitives/AgentPrimitive.tsx b/src/cli/primitives/AgentPrimitive.tsx index 33e5d190..3d402d1e 100644 --- a/src/cli/primitives/AgentPrimitive.tsx +++ b/src/cli/primitives/AgentPrimitive.tsx @@ -375,6 +375,8 @@ export class AgentPrimitive extends BasePrimitive(undefined); + const [mcpTools, setMcpTools] = useState([]); + + // A2A state + const [a2aAgentCard, setA2aAgentCard] = useState(null); + const [a2aStatus, setA2aStatus] = useState(null); + const serverRef = useRef(null); const loggerRef = useRef(null); const logsRef = useRef([]); @@ -97,6 +113,8 @@ export function useDevServer(options: { workingDir: string; port: number; agentN return getDevConfig(options.workingDir, project, configRoot, options.agentName); }, [options.workingDir, project, configRoot, options.agentName]); + const protocol: ProtocolMode = config?.protocol ?? 'HTTP'; + // Start server when config is loaded useEffect(() => { if (!configLoaded || !config) return; @@ -112,6 +130,11 @@ export function useDevServer(options: { workingDir: string; port: number; agentN agentName: config.agentName, }); + // A2A servers always use port 9000, MCP servers use port 8000 (framework defaults, not configurable via env) + const isA2A = config.protocol === 'A2A'; + const isMcp = config.protocol === 'MCP'; + const fixedPort = isA2A ? 9000 : isMcp ? 8000 : targetPort; + // On restart, reuse the same port. On initial start, find an available port. // If restart times out waiting for port, fall back to finding a new one. const isRestart = restartTrigger > 0; @@ -122,9 +145,22 @@ export function useDevServer(options: { workingDir: string; port: number; agentN addLog('warn', `Port ${actualPortRef.current} not released, finding new port`); } } - const port = isRestart && portFree ? actualPortRef.current : await findAvailablePort(targetPort); - if (!isRestart && port !== targetPort) { - addLog('warn', `Port ${targetPort} in use, using ${port}`); + + let port: number; + if (isA2A || isMcp) { + // A2A/MCP must use their fixed ports; check availability but don't auto-assign another + const available = await findAvailablePort(fixedPort); + if (available !== fixedPort) { + addLog('error', `Port ${fixedPort} is in use. ${config.protocol} agents require port ${fixedPort}.`); + setStatus('error'); + return; + } + port = fixedPort; + } else { + port = isRestart && portFree ? actualPortRef.current : await findAvailablePort(fixedPort); + if (!isRestart && port !== fixedPort) { + addLog('warn', `Port ${fixedPort} in use, using ${port}`); + } } actualPortRef.current = port; setActualPort(port); @@ -143,7 +179,9 @@ export function useDevServer(options: { workingDir: string; port: number; agentN serverReady = true; setStatus('running'); onReadyRef.current?.(); - addLog('system', `Server ready at http://localhost:${port}/invocations`); + + const endpointUrl = getEndpointUrl(port, config.protocol); + addLog('system', `Server ready at ${endpointUrl}`); } else { addLog(level, message); } @@ -191,8 +229,82 @@ export function useDevServer(options: { workingDir: string; port: number; agentN envVars, ]); + // MCP: auto-list tools when server becomes ready + const mcpToolsRef = useRef([]); + + // A2A: fetch agent card when server becomes ready + const fetchAgentCard = useCallback(async () => { + try { + const card = await fetchA2AAgentCard(actualPortRef.current, loggerRef.current ?? undefined); + setA2aAgentCard(card); + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + addLog('warn', `Failed to fetch agent card: ${errMsg}`); + } + }, []); + + const fetchMcpTools = useCallback(async () => { + try { + const result = await listMcpTools(actualPortRef.current, loggerRef.current ?? undefined); + setMcpTools(result.tools); + mcpToolsRef.current = result.tools; + mcpSessionIdRef.current = result.sessionId; + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + addLog('error', `Failed to list MCP tools: ${errMsg}`); + setMcpTools([]); + mcpToolsRef.current = []; + } + }, []); + const invoke = async (message: string) => { - // Add user message to conversation + // MCP: parse tool calls from chat input + if (protocol === 'MCP') { + if (message.trim().toLowerCase() === 'list') { + setConversation(prev => [...prev, { role: 'user', content: message }]); + await fetchMcpTools(); + // Use ref for fresh value after async fetch + const tools = mcpToolsRef.current; + setConversation(prev => [...prev, { role: 'assistant', content: formatMcpToolList(tools), isHint: true }]); + return; + } + + // Parse "tool_name {json_args}" or just "tool_name" + const match = /^(\S+)\s*(.*)/.exec(message); + if (!match) return; + const toolName = match[1]!; + const argsStr = match[2]?.trim() ?? ''; + + setConversation(prev => [...prev, { role: 'user', content: message }]); + setIsStreaming(true); + + try { + let args: Record = {}; + if (argsStr) { + args = JSON.parse(argsStr) as Record; + } + const result = await callMcpTool( + actualPort, + toolName, + args, + mcpSessionIdRef.current, + loggerRef.current ?? undefined + ); + setConversation(prev => [...prev, { role: 'assistant', content: `Result: ${result}` }]); + + loggerRef.current?.log('system', `MCP call: ${toolName}(${argsStr})`); + loggerRef.current?.log('response', result); + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + addLog('error', `MCP call failed: ${errMsg}`); + setConversation(prev => [...prev, { role: 'assistant', content: errMsg, isError: true }]); + } finally { + setIsStreaming(false); + } + return; + } + + // HTTP and A2A: chat-style invoke setConversation(prev => [...prev, { role: 'user', content: message }]); setStreamingResponse(null); setIsStreaming(true); @@ -200,14 +312,21 @@ export function useDevServer(options: { workingDir: string; port: number; agentN let responseContent = ''; try { - // Pass logger to capture raw SSE events for debugging - const stream = invokeAgentStreaming({ - port: actualPort, - message, - logger: loggerRef.current ?? undefined, - }); - - for await (const chunk of stream) { + // Select streaming function based on protocol + if (protocol === 'A2A') { + setA2aStatus(null); + } + const streamFn = + protocol === 'A2A' + ? invokeA2AStreaming({ + port: actualPort, + message, + logger: loggerRef.current ?? undefined, + onStatus: setA2aStatus, + }) + : invokeAgentStreaming({ port: actualPort, message, logger: loggerRef.current ?? undefined }); + + for await (const chunk of streamFn) { responseContent += chunk; setStreamingResponse(responseContent); } @@ -217,7 +336,7 @@ export function useDevServer(options: { workingDir: string; port: number; agentN setStreamingResponse(null); // Log final response to file - loggerRef.current?.log('system', `→ ${message}`); + loggerRef.current?.log('system', `\u2192 ${message}`); loggerRef.current?.log('response', responseContent); } catch (err) { const rawMsg = err instanceof Error ? err.message : 'Unknown error'; @@ -249,6 +368,7 @@ export function useDevServer(options: { workingDir: string; port: number; agentN setStreamingResponse(null); } finally { setIsStreaming(false); + setA2aStatus(null); } }; @@ -276,6 +396,13 @@ export function useDevServer(options: { workingDir: string; port: number; agentN setStreamingResponse(null); }; + const showMcpHint = () => { + const tools = mcpToolsRef.current; + if (tools.length > 0) { + setConversation(prev => [...prev, { role: 'assistant', content: formatMcpToolList(tools), isHint: true }]); + } + }; + return { logs, status, @@ -294,5 +421,12 @@ export function useDevServer(options: { workingDir: string; port: number; agentN hasMemory: (project?.memories?.length ?? 0) > 0, hasVpc: project?.agents.find(a => a.name === config?.agentName)?.networkMode === 'VPC', modelProvider: project?.agents.find(a => a.name === config?.agentName)?.modelProvider, + protocol, + mcpTools, + fetchMcpTools, + showMcpHint, + a2aAgentCard, + a2aStatus, + fetchAgentCard, }; } diff --git a/src/cli/tui/screens/dev/DevScreen.tsx b/src/cli/tui/screens/dev/DevScreen.tsx index 307886f8..68086cbe 100644 --- a/src/cli/tui/screens/dev/DevScreen.tsx +++ b/src/cli/tui/screens/dev/DevScreen.tsx @@ -1,5 +1,5 @@ import type { AgentEnvSpec } from '../../../../schema'; -import { getDevSupportedAgents, loadProjectConfig } from '../../../operations/dev'; +import { getDevSupportedAgents, getEndpointUrl, loadProjectConfig } from '../../../operations/dev'; import { GradientText, LogLink, Panel, Screen, SelectList, TextInput } from '../../components'; import { type ConversationMessage, useDevServer } from '../../hooks/useDevServer'; import { Box, Text, useInput, useStdout } from 'ink'; @@ -110,6 +110,9 @@ function wrapColoredLines(lines: ColoredLine[], maxWidth: number): ColoredLine[] return wrapped; } +/** Max tools to show in header before truncating */ +const MAX_VISIBLE_TOOLS = 5; + export function DevScreen(props: DevScreenProps) { const [mode, setMode] = useState('select-agent'); const [isExiting, setIsExiting] = useState(false); @@ -179,6 +182,13 @@ export function DevScreen(props: DevScreenProps) { hasMemory, hasVpc, modelProvider, + protocol, + mcpTools, + fetchMcpTools, + showMcpHint, + a2aAgentCard, + a2aStatus, + fetchAgentCard, } = useDevServer({ workingDir, port: props.port ?? 8080, @@ -186,6 +196,34 @@ export function DevScreen(props: DevScreenProps) { onReady: onServerReady, }); + // MCP: auto-list tools when server becomes ready, show hint in conversation + const mcpFetchTriggeredRef = useRef(false); + const [mcpToolsFetched, setMcpToolsFetched] = useState(false); + useEffect(() => { + if (protocol === 'MCP' && status === 'running' && !mcpFetchTriggeredRef.current) { + mcpFetchTriggeredRef.current = true; + void fetchMcpTools().then(() => { + setMcpToolsFetched(true); + showMcpHint(); + }); + } + if (status === 'starting') { + mcpFetchTriggeredRef.current = false; + } + }, [protocol, status, fetchMcpTools, showMcpHint]); + + // A2A: auto-fetch agent card when server becomes ready + const a2aFetchTriggeredRef = useRef(false); + useEffect(() => { + if (protocol === 'A2A' && status === 'running' && !a2aFetchTriggeredRef.current) { + a2aFetchTriggeredRef.current = true; + void fetchAgentCard(); + } + if (status === 'starting') { + a2aFetchTriggeredRef.current = false; + } + }, [protocol, status, fetchAgentCard]); + // Handle exit with brief "stopping" message const handleExit = useCallback(() => { if (isExiting) return; // Prevent double-exit @@ -196,12 +234,21 @@ export function DevScreen(props: DevScreenProps) { }, 1000); }, [props, stop, isExiting]); + const isMcp = protocol === 'MCP'; + // Calculate available height for conversation display const terminalHeight = stdout?.rows ?? 24; const terminalWidth = stdout?.columns ?? 80; // Reserve lines for: header (4-5), help text (1), input area when active (2), margins + // MCP needs extra header space for the tool list + // +1 for "Tools (N):" header, +1 for "... and X more" if truncated + const visibleToolCount = Math.min(mcpTools.length, MAX_VISIBLE_TOOLS); + const mcpToolHeaderLines = + isMcp && mcpTools.length > 0 ? visibleToolCount + 1 + (mcpTools.length > MAX_VISIBLE_TOOLS ? 1 : 0) + 1 : 0; + // A2A agent card takes ~3 lines (name, description, skills) + const a2aCardHeaderLines = protocol === 'A2A' && a2aAgentCard ? 3 : 0; // Reduce height when in input mode to make room for input field - const baseHeight = Math.max(5, terminalHeight - 12); + const baseHeight = Math.max(5, terminalHeight - 12 - mcpToolHeaderLines - a2aCardHeaderLines); const displayHeight = mode === 'input' ? Math.max(3, baseHeight - 2) : baseHeight; const contentWidth = Math.max(40, terminalWidth - 4); @@ -369,18 +416,23 @@ export function DevScreen(props: DevScreenProps) { const visibleLines = lines.slice(effectiveOffset, effectiveOffset + displayHeight); // Dynamic help text + const backOrQuit = supportedAgents.length > 1 ? 'Esc back' : 'Esc quit'; const helpText = mode === 'select-agent' ? '↑↓ select · Enter confirm · q quit' : mode === 'input' - ? 'Enter send · Esc cancel' + ? isMcp + ? 'Enter send · Esc cancel · "list" to refresh tools' + : 'Enter send · Esc cancel' : status === 'starting' - ? `${supportedAgents.length > 1 ? 'Esc back' : 'Esc quit'}` + ? backOrQuit : isStreaming ? '↑↓ scroll' : conversation.length > 0 - ? `↑↓ scroll · Enter invoke · C clear · Ctrl+R restart · ${supportedAgents.length > 1 ? 'Esc back' : 'Esc quit'}` - : `Enter to send a message · Ctrl+R restart · ${supportedAgents.length > 1 ? 'Esc back' : 'Esc quit'}`; + ? `↑↓ scroll · Enter invoke · C clear · Ctrl+R restart · ${backOrQuit}` + : isMcp + ? `Enter to call a tool · Ctrl+R restart · ${backOrQuit}` + : `Enter to send a message · Ctrl+R restart · ${backOrQuit}`; // Agent selection screen if (mode === 'select-agent') { @@ -399,13 +451,21 @@ export function DevScreen(props: DevScreenProps) { ); } + const endpointUrl = getEndpointUrl(actualPort, protocol); + const headerContent = ( Agent: {config?.agentName} - {modelProvider && ( + {protocol !== 'HTTP' && ( + + Protocol: + {protocol} + + )} + {protocol !== 'MCP' && modelProvider && ( Provider: {modelProvider} @@ -413,7 +473,7 @@ export function DevScreen(props: DevScreenProps) { )} Server: - http://localhost:{actualPort}/invocations + {endpointUrl} {!isExiting && ( @@ -442,7 +502,7 @@ export function DevScreen(props: DevScreenProps) { )} {logFilePath && } - {hasMemory && ( + {protocol !== 'MCP' && hasMemory && ( AgentCore memory is not available when running locally. To test memory, deploy and use invoke. @@ -453,6 +513,32 @@ export function DevScreen(props: DevScreenProps) { deployed environment. )} + {protocol === 'MCP' && status === 'running' && mcpTools.length > 0 && ( + + Tools ({mcpTools.length}): + {mcpTools.slice(0, MAX_VISIBLE_TOOLS).map(t => ( + + {t.name} + {t.description && — {t.description}} + + ))} + {mcpTools.length > MAX_VISIBLE_TOOLS && ( + {` ... and ${mcpTools.length - MAX_VISIBLE_TOOLS} more (type "list" to see all)`} + )} + + )} + {protocol === 'MCP' && status === 'running' && mcpTools.length === 0 && mcpToolsFetched && ( + No tools available. + )} + {protocol === 'A2A' && status === 'running' && a2aAgentCard && ( + + {a2aAgentCard.name ?? 'A2A Agent'} + {a2aAgentCard.description && {a2aAgentCard.description}} + {a2aAgentCard.skills && a2aAgentCard.skills.length > 0 && ( + {` Skills: ${a2aAgentCard.skills.map(s => s.name ?? s.id).join(', ')}`} + )} + + )} ); @@ -467,8 +553,12 @@ export function DevScreen(props: DevScreenProps) { {line.text || ' '} ))} - {/* Thinking indicator - shows while waiting for response to start */} - {isStreaming && !streamingResponse && } + {/* Thinking/status indicator - shows while waiting for response to start */} + {isStreaming && !streamingResponse && ( + + )} )} @@ -493,6 +583,7 @@ export function DevScreen(props: DevScreenProps) { { if (text.trim()) { void handleInvoke(text); diff --git a/src/cli/tui/screens/invoke/InvokeScreen.tsx b/src/cli/tui/screens/invoke/InvokeScreen.tsx index e6ddab27..71410054 100644 --- a/src/cli/tui/screens/invoke/InvokeScreen.tsx +++ b/src/cli/tui/screens/invoke/InvokeScreen.tsx @@ -18,7 +18,7 @@ type Mode = 'select-agent' | 'chat' | 'input'; /** * Render conversation messages as a single string for scrolling. */ -function formatConversation(messages: { role: 'user' | 'assistant'; content: string }[]): string { +function formatConversation(messages: { role: 'user' | 'assistant'; content: string; isHint?: boolean }[]): string { const lines: string[] = []; for (const msg of messages) { @@ -106,15 +106,18 @@ export function InvokeScreen({ logFilePath, sessionId, userId, + mcpToolsFetched, selectAgent, invoke, newSession, + fetchMcpTools, } = useInvokeFlow({ initialSessionId, initialUserId }); const [mode, setMode] = useState('select-agent'); const [scrollOffset, setScrollOffset] = useState(0); const [userScrolled, setUserScrolled] = useState(false); const { stdout } = useStdout(); const justCancelledRef = useRef(false); + const mcpFetchTriggeredRef = useRef(false); // Handle initial prompt - skip agent selection if only one agent useEffect(() => { @@ -138,6 +141,15 @@ export function InvokeScreen({ } }, [initialPrompt, phase, messages.length, onExit]); + // MCP: auto-list tools when agent is selected and ready, show hint after fetch + useEffect(() => { + const agent = config?.agents[selectedAgent]; + if (agent?.protocol === 'MCP' && phase === 'ready' && mode !== 'select-agent' && !mcpFetchTriggeredRef.current) { + mcpFetchTriggeredRef.current = true; + void fetchMcpTools(); + } + }, [config, selectedAgent, phase, mode, fetchMcpTools]); + // Return to input mode after invoke completes const prevPhaseRef = useRef(phase); useEffect(() => { @@ -278,23 +290,32 @@ export function InvokeScreen({ agentName: agent.name, }) : undefined; + const agentProtocol = agent?.protocol ?? 'HTTP'; + const agentItems = config.agents.map((a, i) => ({ id: String(i), title: a.name, - description: `Runtime: ${a.state.runtimeId}`, + description: `${a.protocol && a.protocol !== 'HTTP' ? `${a.protocol} · ` : ''}Runtime: ${a.state.runtimeId}`, })); + const isMcp = agentProtocol === 'MCP'; + // Dynamic help text + const backOrQuit = config.agents.length > 1 ? 'Esc back' : 'Esc quit'; const helpText = mode === 'select-agent' ? '↑↓ select · Enter confirm · Esc quit' : mode === 'input' - ? 'Enter send · Esc cancel' + ? isMcp + ? 'Enter send · Esc cancel · "list" to refresh tools' + : 'Enter send · Esc cancel' : phase === 'invoking' ? '↑↓ scroll' : messages.length > 0 - ? `↑↓ scroll · Enter invoke · N new session · ${config.agents.length > 1 ? 'Esc back' : 'Esc quit'}` - : `Enter to send a message · ${config.agents.length > 1 ? 'Esc back' : 'Esc quit'}`; + ? `↑↓ scroll · Enter invoke · N new session · ${backOrQuit}` + : isMcp + ? `Enter to call a tool · N new session · ${backOrQuit}` + : `Enter to send a message · ${backOrQuit}`; const headerContent = ( @@ -308,6 +329,12 @@ export function InvokeScreen({ {agent?.name} )} + {mode !== 'select-agent' && agentProtocol !== 'HTTP' && ( + + Protocol: + {agentProtocol} + + )} {mode !== 'select-agent' && agent?.modelProvider && ( Provider: @@ -391,13 +418,18 @@ export function InvokeScreen({ )} {/* Input area */} + {/* MCP: show loading indicator while fetching tools */} + {isMcp && !mcpToolsFetched && phase === 'ready' && messages.length === 0 && ( + + )} + {mode === 'chat' && phase === 'ready' && messages.length > 0 && ( > )} - {mode === 'chat' && phase === 'ready' && messages.length === 0 && ( - Press Enter to send a message + {mode === 'chat' && phase === 'ready' && messages.length === 0 && (!isMcp || mcpToolsFetched) && ( + {isMcp ? 'Press Enter to call a tool' : 'Press Enter to send a message'} )} {mode === 'input' && phase === 'ready' && ( @@ -405,6 +437,9 @@ export function InvokeScreen({ { if (text.trim()) { setMode('chat'); diff --git a/src/cli/tui/screens/invoke/useInvokeFlow.ts b/src/cli/tui/screens/invoke/useInvokeFlow.ts index 97950fc8..0b2531c1 100644 --- a/src/cli/tui/screens/invoke/useInvokeFlow.ts +++ b/src/cli/tui/screens/invoke/useInvokeFlow.ts @@ -4,16 +4,31 @@ import type { AwsDeploymentTarget, ModelProvider, NetworkMode, + ProtocolMode, AgentCoreProjectSpec as _AgentCoreProjectSpec, } from '../../../../schema'; -import { DEFAULT_RUNTIME_USER_ID, invokeAgentRuntimeStreaming } from '../../../aws'; +import { + DEFAULT_RUNTIME_USER_ID, + type McpToolDef, + invokeA2ARuntime, + invokeAgentRuntimeStreaming, + mcpCallTool, + mcpListTools, +} from '../../../aws'; import { getErrorMessage } from '../../../errors'; import { InvokeLogger } from '../../../logging'; +import { formatMcpToolList } from '../../../operations/dev/utils'; import { generateSessionId } from '../../../operations/session'; import { useCallback, useEffect, useRef, useState } from 'react'; export interface InvokeConfig { - agents: { name: string; state: AgentCoreDeployedState; modelProvider?: ModelProvider; networkMode?: NetworkMode }[]; + agents: { + name: string; + state: AgentCoreDeployedState; + modelProvider?: ModelProvider; + networkMode?: NetworkMode; + protocol?: ProtocolMode; + }[]; target: AwsDeploymentTarget; targetName: string; projectName: string; @@ -28,15 +43,18 @@ export interface InvokeFlowState { phase: 'loading' | 'ready' | 'invoking' | 'error'; config: InvokeConfig | null; selectedAgent: number; - messages: { role: 'user' | 'assistant'; content: string }[]; + messages: { role: 'user' | 'assistant'; content: string; isHint?: boolean }[]; error: string | null; logFilePath: string | null; sessionId: string | null; userId: string; + mcpTools: McpToolDef[]; + mcpToolsFetched: boolean; selectAgent: (index: number) => void; setUserId: (id: string) => void; invoke: (prompt: string) => Promise; newSession: () => void; + fetchMcpTools: () => Promise; } export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState { @@ -44,12 +62,18 @@ export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState const [phase, setPhase] = useState<'loading' | 'ready' | 'invoking' | 'error'>('loading'); const [config, setConfig] = useState(null); const [selectedAgent, setSelectedAgent] = useState(0); - const [messages, setMessages] = useState<{ role: 'user' | 'assistant'; content: string }[]>([]); + const [messages, setMessages] = useState<{ role: 'user' | 'assistant'; content: string; isHint?: boolean }[]>([]); const [error, setError] = useState(null); const [logFilePath, setLogFilePath] = useState(null); const [sessionId, setSessionId] = useState(null); const [userId, setUserId] = useState(initialUserId ?? DEFAULT_RUNTIME_USER_ID); + // MCP state + const [mcpTools, setMcpTools] = useState([]); + const [mcpToolsFetched, setMcpToolsFetched] = useState(false); + const mcpToolsRef = useRef([]); + const mcpSessionIdRef = useRef(undefined); + // Persistent logger for the session const loggerRef = useRef(null); @@ -82,14 +106,14 @@ export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState const agents: InvokeConfig['agents'] = []; for (const agent of project.agents) { const state = targetState?.resources?.agents?.[agent.name]; - if (state) { - agents.push({ - name: agent.name, - state, - modelProvider: agent.modelProvider, - networkMode: agent.networkMode, - }); - } + if (!state) continue; + agents.push({ + name: agent.name, + state, + modelProvider: agent.modelProvider, + networkMode: agent.networkMode, + protocol: agent.protocol, + }); } if (agents.length === 0) { @@ -102,10 +126,8 @@ export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState // Initialize session ID - always generate fresh unless explicitly provided if (initialSessionId) { - // Use provided session ID from --session-id flag setSessionId(initialSessionId); } else { - // Always generate a new session for fresh invocations const newId = generateSessionId(); setSessionId(newId); } @@ -119,6 +141,40 @@ export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState void load(); }, [initialSessionId]); + const getMcpInvokeOptions = useCallback(() => { + if (!config) return null; + const agent = config.agents[selectedAgent]; + if (!agent) return null; + return { + region: config.target.region, + runtimeArn: agent.state.runtimeArn, + userId, + mcpSessionId: mcpSessionIdRef.current, + }; + }, [config, selectedAgent, userId]); + + const fetchMcpTools = useCallback(async () => { + const opts = getMcpInvokeOptions(); + if (!opts) return; + + try { + const result = await mcpListTools(opts); + setMcpTools(result.tools); + mcpToolsRef.current = result.tools; + mcpSessionIdRef.current = result.mcpSessionId; + setMcpToolsFetched(true); + if (result.tools.length > 0) { + setMessages(prev => [...prev, { role: 'assistant', content: formatMcpToolList(result.tools), isHint: true }]); + } + } catch (err) { + const errMsg = getErrorMessage(err); + setMessages(prev => [...prev, { role: 'assistant', content: `Failed to list tools: ${errMsg}` }]); + setMcpTools([]); + mcpToolsRef.current = []; + setMcpToolsFetched(true); + } + }, [getMcpInvokeOptions]); + // Track current streaming content to avoid stale closure issues const streamingContentRef = useRef(''); @@ -129,6 +185,8 @@ export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState const agent = config.agents[selectedAgent]; if (!agent) return; + const isMcp = agent.protocol === 'MCP'; + // Create logger on first invoke or if agent changed if (!loggerRef.current) { loggerRef.current = new InvokeLogger({ @@ -137,13 +195,73 @@ export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState region: config.target.region, sessionId: sessionId ?? undefined, }); - // Store the absolute path for the LogLink component setLogFilePath(loggerRef.current.getAbsoluteLogPath()); } const logger = loggerRef.current; - // Append new user message and empty assistant message (conversation history is preserved) + // MCP: handle tool calls + if (isMcp) { + // "list" refreshes the tool list + if (prompt.trim().toLowerCase() === 'list') { + setMessages(prev => [...prev, { role: 'user', content: prompt }]); + setPhase('invoking'); + await fetchMcpTools(); + setPhase('ready'); + return; + } + + // Parse "tool_name {json_args}" or just "tool_name" + const match = /^(\S+)\s*(.*)/.exec(prompt); + if (!match) return; + const toolName = match[1]!; + const argsStr = match[2]?.trim() ?? ''; + + setMessages(prev => [...prev, { role: 'user', content: prompt }, { role: 'assistant', content: '' }]); + setPhase('invoking'); + + logger.logPrompt(`MCP tools/call: ${toolName}(${argsStr})`, sessionId ?? undefined, userId); + + try { + let args: Record = {}; + if (argsStr) { + args = JSON.parse(argsStr) as Record; + } + const opts = getMcpInvokeOptions(); + if (!opts) throw new Error('No agent config available'); + + const result = await mcpCallTool(opts, toolName, args); + + setMessages(prev => { + const updated = [...prev]; + const lastIdx = updated.length - 1; + if (lastIdx >= 0 && updated[lastIdx]?.role === 'assistant') { + updated[lastIdx] = { role: 'assistant', content: `Result: ${result}` }; + } + return updated; + }); + + logger.logResponse(result); + } catch (err) { + const errMsg = getErrorMessage(err); + logger.logError(err, 'MCP call failed'); + + setMessages(prev => { + const updated = [...prev]; + const lastIdx = updated.length - 1; + if (lastIdx >= 0 && updated[lastIdx]?.role === 'assistant') { + updated[lastIdx] = { role: 'assistant', content: `Error: ${errMsg}` }; + } + return updated; + }); + } + + setPhase('ready'); + return; + } + + // HTTP / A2A: streaming invoke + const isA2A = agent.protocol === 'A2A'; setMessages(prev => [...prev, { role: 'user', content: prompt }, { role: 'assistant', content: '' }]); setPhase('invoking'); streamingContentRef.current = ''; @@ -151,16 +269,20 @@ export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState logger.logPrompt(prompt, sessionId ?? undefined, userId); try { - const result = await invokeAgentRuntimeStreaming({ - region: config.target.region, - runtimeArn: agent.state.runtimeArn, - payload: prompt, - sessionId: sessionId ?? undefined, - userId, - logger, // Pass logger for SSE event debugging - }); + const result = isA2A + ? await invokeA2ARuntime( + { region: config.target.region, runtimeArn: agent.state.runtimeArn, userId, logger }, + prompt + ) + : await invokeAgentRuntimeStreaming({ + region: config.target.region, + runtimeArn: agent.state.runtimeArn, + payload: prompt, + sessionId: sessionId ?? undefined, + userId, + logger, + }); - // Update session ID from response if available (for logging purposes) if (result.sessionId) { setSessionId(result.sessionId); logger.updateSessionId(result.sessionId); @@ -169,7 +291,6 @@ export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState for await (const chunk of result.stream) { streamingContentRef.current += chunk; const currentContent = streamingContentRef.current; - // Update the last message (assistant) with accumulated content setMessages(prev => { const updated = [...prev]; const lastIdx = updated.length - 1; @@ -187,7 +308,6 @@ export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState const errMsg = getErrorMessage(err); logger.logError(err, 'invoke streaming failed'); - // Update the last message with error setMessages(prev => { const updated = [...prev]; const lastIdx = updated.length - 1; @@ -199,13 +319,18 @@ export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState setPhase('ready'); } }, - [config, selectedAgent, phase, sessionId, userId] + [config, selectedAgent, phase, sessionId, userId, fetchMcpTools, getMcpInvokeOptions] ); const newSession = useCallback(() => { const newId = generateSessionId(); setSessionId(newId); setMessages([]); + // Reset MCP session + mcpSessionIdRef.current = undefined; + setMcpTools([]); + mcpToolsRef.current = []; + setMcpToolsFetched(false); }, []); return { @@ -217,9 +342,12 @@ export function useInvokeFlow(options: InvokeFlowOptions = {}): InvokeFlowState logFilePath, sessionId, userId, + mcpTools, + mcpToolsFetched, selectAgent: setSelectedAgent, setUserId, invoke, newSession, + fetchMcpTools, }; } diff --git a/src/lib/utils/__tests__/json-rpc.test.ts b/src/lib/utils/__tests__/json-rpc.test.ts new file mode 100644 index 00000000..0aa69748 --- /dev/null +++ b/src/lib/utils/__tests__/json-rpc.test.ts @@ -0,0 +1,49 @@ +import { parseJsonRpcResponse } from '../json-rpc'; +import { describe, expect, it } from 'vitest'; + +describe('parseJsonRpcResponse', () => { + it('parses plain JSON response', () => { + const result = parseJsonRpcResponse('{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}'); + expect(result).toEqual({ jsonrpc: '2.0', id: 1, result: { tools: [] } }); + }); + + it('parses SSE-wrapped JSON-RPC response', () => { + const result = parseJsonRpcResponse('data: {"jsonrpc":"2.0","id":1,"result":{"ok":true}}'); + expect(result).toEqual({ jsonrpc: '2.0', id: 1, result: { ok: true } }); + }); + + it('uses last valid SSE data line', () => { + const text = 'data: {"partial":true}\ndata: {"jsonrpc":"2.0","id":1,"result":{}}'; + const result = parseJsonRpcResponse(text); + expect(result).toEqual({ jsonrpc: '2.0', id: 1, result: {} }); + }); + + it('handles whitespace around response', () => { + const result = parseJsonRpcResponse(' {"result": "ok"} \n'); + expect(result).toEqual({ result: 'ok' }); + }); + + it('parses JSON-RPC error response', () => { + const result = parseJsonRpcResponse('{"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"Bad request"}}'); + expect(result.error).toEqual({ code: -32600, message: 'Bad request' }); + }); + + it('throws on HTML error page', () => { + expect(() => parseJsonRpcResponse('500 Error')).toThrow( + 'Failed to parse JSON-RPC response' + ); + }); + + it('throws on empty string', () => { + expect(() => parseJsonRpcResponse('')).toThrow('Failed to parse JSON-RPC response'); + }); + + it('throws on non-JSON non-SSE text', () => { + expect(() => parseJsonRpcResponse('this is not json or sse')).toThrow('Failed to parse JSON-RPC response'); + }); + + it('truncates long text in error message', () => { + const longText = 'x'.repeat(300); + expect(() => parseJsonRpcResponse(longText)).toThrow(/x{200}/); + }); +}); diff --git a/src/lib/utils/index.ts b/src/lib/utils/index.ts index 692ded70..d595d9bf 100644 --- a/src/lib/utils/index.ts +++ b/src/lib/utils/index.ts @@ -10,4 +10,5 @@ export { type SubprocessResult, } from './subprocess'; export { parseTimeString } from './time-parser'; +export { parseJsonRpcResponse } from './json-rpc'; export { validateAgentSchema, validateProjectSchema } from './zod'; diff --git a/src/lib/utils/json-rpc.ts b/src/lib/utils/json-rpc.ts new file mode 100644 index 00000000..315c013f --- /dev/null +++ b/src/lib/utils/json-rpc.ts @@ -0,0 +1,24 @@ +/** Parse a JSON-RPC response, handling both plain JSON and SSE-wrapped formats. Throws if no valid response is found. */ +export function parseJsonRpcResponse(text: string): Record { + const trimmed = text.trim(); + + try { + return JSON.parse(trimmed) as Record; + } catch { + // Might be SSE format + } + + const lines = trimmed.split('\n'); + for (let i = lines.length - 1; i >= 0; i--) { + const line = lines[i]!; + if (line.startsWith('data: ')) { + try { + return JSON.parse(line.slice(6)) as Record; + } catch { + continue; + } + } + } + + throw new Error(`Failed to parse JSON-RPC response: ${trimmed.slice(0, 200)}`); +} diff --git a/src/schema/schemas/__tests__/agent-env.test.ts b/src/schema/schemas/__tests__/agent-env.test.ts index 621a1743..3240dcf0 100644 --- a/src/schema/schemas/__tests__/agent-env.test.ts +++ b/src/schema/schemas/__tests__/agent-env.test.ts @@ -296,9 +296,9 @@ describe('AgentEnvSpecSchema', () => { expect(result.success, `Should accept protocol ${mode}`).toBe(true); }); - it('rejects agent without protocol', () => { + it('accepts agent without protocol (backwards compat)', () => { const { protocol: _protocol, ...agentWithoutProtocol } = { ...validPythonAgent, protocol: undefined }; - expect(AgentEnvSpecSchema.safeParse(agentWithoutProtocol).success).toBe(false); + expect(AgentEnvSpecSchema.safeParse(agentWithoutProtocol).success).toBe(true); }); it('rejects invalid protocol', () => { diff --git a/src/schema/schemas/agent-env.ts b/src/schema/schemas/agent-env.ts index 2fb6a09a..0d79d474 100644 --- a/src/schema/schemas/agent-env.ts +++ b/src/schema/schemas/agent-env.ts @@ -143,7 +143,7 @@ export const AgentEnvSpecSchema = z /** Model provider used by this agent. Optional for backwards compatibility. */ modelProvider: ModelProviderSchema.optional(), /** Protocol for the runtime (HTTP, MCP, A2A). */ - protocol: ProtocolModeSchema, + protocol: ProtocolModeSchema.optional(), }) .superRefine((data, ctx) => { if (data.networkMode === 'VPC' && !data.networkConfig) {