diff --git a/frontend/bun.lock b/frontend/bun.lock index 44ddea6656..23a9b389c7 100644 --- a/frontend/bun.lock +++ b/frontend/bun.lock @@ -4,7 +4,7 @@ "workspaces": { "": { "dependencies": { - "@a2a-js/sdk": "^0.3.5", + "@a2a-js/sdk": "^0.3.10", "@autoform/react": "^4.0.0", "@autoform/zod": "^5.0.0", "@buf/redpandadata_cloud.connectrpc_query-es": "^2.2.0-20251128173054-b9f9fc6e5a70.1", @@ -150,7 +150,7 @@ "elliptic": "^6.6.1", }, "packages": { - "@a2a-js/sdk": ["@a2a-js/sdk@0.3.5", "", { "dependencies": { "uuid": "^11.1.0" }, "peerDependencies": { "express": "^4.21.2 || ^5.1.0" }, "optionalPeers": ["express"] }, "sha512-6xAApkiss2aCbJXmXLC845tifcbYJ/R4Dj22kQsOaanMbf9bvkYhebDEuYPAIu3aaR5MWaBqG7OCK3IF8dqZZQ=="], + "@a2a-js/sdk": ["@a2a-js/sdk@0.3.10", "", { "dependencies": { "uuid": "^11.1.0" }, "peerDependencies": { "@bufbuild/protobuf": "^2.10.2", "@grpc/grpc-js": "^1.11.0", "express": "^4.21.2 || ^5.1.0" }, "optionalPeers": ["@bufbuild/protobuf", "@grpc/grpc-js", "express"] }, "sha512-t6w5ctnwJkSOMRl6M9rn95C1FTHCPqixxMR0yWXtzhZXEnF6mF1NAK0CfKlG3cz+tcwTxkmn287QZC3t9XPgrA=="], "@adobe/css-tools": ["@adobe/css-tools@4.4.4", "", {}, "sha512-Elp+iwUx5rN5+Y8xLt5/GRoG20WGoDCQ/1Fb+1LiGtvwbDavuSk0jhD/eZdckHAuzcDzccnkv+rEjyWfRx18gg=="], diff --git a/frontend/package.json b/frontend/package.json index 4dba7c890c..555b23b1b8 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -44,7 +44,7 @@ "elliptic": "^6.6.1" }, "dependencies": { - "@a2a-js/sdk": "^0.3.5", + "@a2a-js/sdk": "^0.3.10", "@autoform/react": "^4.0.0", "@autoform/zod": "^5.0.0", "@buf/redpandadata_cloud.connectrpc_query-es": "^2.2.0-20251128173054-b9f9fc6e5a70.1", diff --git a/frontend/src/components/pages/agents/details/a2a/chat/ai-agent-chat.tsx b/frontend/src/components/pages/agents/details/a2a/chat/ai-agent-chat.tsx index 6f5c36cc15..ca27bdaf92 100644 --- a/frontend/src/components/pages/agents/details/a2a/chat/ai-agent-chat.tsx +++ 
b/frontend/src/components/pages/agents/details/a2a/chat/ai-agent-chat.tsx @@ -30,7 +30,7 @@ export const AIAgentChat = ({ agent, headerActions }: AIAgentChatProps) => { const containerRef = useRef(null); // Manage chat messages and context - const { messages, setMessages, contextId, setContextSeed, isLoadingHistory } = useChatMessages(agent.id); + const { messages, setMessages, contextId, setContextSeed, isLoadingHistory } = useChatMessages(agent.id, agent.url); // Manage chat actions (submit, clear, cancel) // Pass agent.url directly so the A2A client can try multiple agent card URLs @@ -88,9 +88,9 @@ export const AIAgentChat = ({ agent, headerActions }: AIAgentChatProps) => { return (
{/* Context ID header */} - {Boolean(contextId) && ( -
-
+
+
+ {messages.length > 0 ? (
@@ -109,10 +109,12 @@ export const AIAgentChat = ({ agent, headerActions }: AIAgentChatProps) => { />
- {headerActions} -
+ ) : ( +
+ )} + {headerActions}
- )} +
diff --git a/frontend/src/components/pages/agents/details/a2a/chat/components/chat-message.tsx b/frontend/src/components/pages/agents/details/a2a/chat/components/chat-message.tsx index eb591bb45b..0a9755d790 100644 --- a/frontend/src/components/pages/agents/details/a2a/chat/components/chat-message.tsx +++ b/frontend/src/components/pages/agents/details/a2a/chat/components/chat-message.tsx @@ -14,6 +14,7 @@ import { Message, MessageBody, MessageContent, MessageMetadata } from 'component import { ChatMessageActions } from './chat-message-actions'; import { A2AErrorBlock } from './message-blocks/a2a-error-block'; import { ArtifactBlock } from './message-blocks/artifact-block'; +import { ConnectionStatusBlock } from './message-blocks/connection-status-block'; import { TaskStatusUpdateBlock } from './message-blocks/task-status-update-block'; import { ToolBlock } from './message-blocks/tool-block'; import { UserMessageContent } from './message-content/user-message-content'; @@ -105,6 +106,16 @@ export const ChatMessage = ({ message, isLoading: _isLoading }: ChatMessageProps ); case 'a2a-error': return ; + case 'connection-status': + return ( + + ); default: return null; } diff --git a/frontend/src/components/pages/agents/details/a2a/chat/components/message-blocks/connection-status-block.tsx b/frontend/src/components/pages/agents/details/a2a/chat/components/message-blocks/connection-status-block.tsx new file mode 100644 index 0000000000..f96f4f2b2f --- /dev/null +++ b/frontend/src/components/pages/agents/details/a2a/chat/components/message-blocks/connection-status-block.tsx @@ -0,0 +1,76 @@ +/** + * Copyright 2025 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +import { Alert, AlertDescription, AlertTitle } from 'components/redpanda-ui/components/alert'; +import { Button } from 'components/redpanda-ui/components/button'; +import { Text } from 'components/redpanda-ui/components/typography'; +import { AlertCircleIcon, LoaderCircleIcon, RefreshCwIcon, WifiIcon, WifiOffIcon } from 'lucide-react'; + +type ConnectionStatusBlockProps = { + status: 'disconnected' | 'reconnecting' | 'reconnected' | 'gave-up'; + attempt?: number; + maxAttempts?: number; + timestamp: Date; +}; + +export const ConnectionStatusBlock = ({ status, attempt, maxAttempts, timestamp }: ConnectionStatusBlockProps) => { + const time = timestamp.toLocaleTimeString(undefined, { + hour: '2-digit', + minute: '2-digit', + second: '2-digit', + }); + + if (status === 'reconnected') { + return ( +
+ + Reconnected at {time} +
+ ); + } + + if (status === 'disconnected' || status === 'reconnecting') { + const label = + status === 'reconnecting' && attempt + ? `Reconnecting... (attempt ${attempt} of ${maxAttempts ?? '?'})` + : 'Connection lost, attempting to reconnect...'; + + return ( + } variant="warning"> + + {label} + + + + + The agent task is still running. Trying to re-establish the event stream. + + + + ); + } + + // gave-up + return ( + } variant="destructive"> + Connection lost + + + Unable to reconnect after {maxAttempts ?? '?'} attempts. The agent task may still be running server-side. + + + + + ); +}; diff --git a/frontend/src/components/pages/agents/details/a2a/chat/hooks/use-chat-messages.ts b/frontend/src/components/pages/agents/details/a2a/chat/hooks/use-chat-messages.ts index a76945bbab..25d5d16c0e 100644 --- a/frontend/src/components/pages/agents/details/a2a/chat/hooks/use-chat-messages.ts +++ b/frontend/src/components/pages/agents/details/a2a/chat/hooks/use-chat-messages.ts @@ -27,7 +27,7 @@ type UseChatMessagesResult = { /** * Hook to manage chat messages and context */ -export const useChatMessages = (agentId: string): UseChatMessagesResult => { +export const useChatMessages = (agentId: string, agentCardUrl: string): UseChatMessagesResult => { // Use a stable contextSeed - only generate once per agent, persists across reloads // Only changes when user explicitly clears chat via setContextSeed const [contextSeed, setContextSeed] = useState(() => { @@ -58,7 +58,7 @@ export const useChatMessages = (agentId: string): UseChatMessagesResult => { async function loadChatMessages() { setIsLoadingHistory(true); try { - const loadedMessages = await loadMessages(agentId, contextId); + const loadedMessages = await loadMessages(agentId, contextId, agentCardUrl); setMessages(loadedMessages); } catch { // Error loading messages - silently fail and show empty state @@ -68,7 +68,7 @@ export const useChatMessages = (agentId: string): UseChatMessagesResult => { } loadChatMessages(); - }, 
[agentId, contextId]); + }, [agentId, contextId, agentCardUrl]); return { messages, diff --git a/frontend/src/components/pages/agents/details/a2a/chat/hooks/use-message-streaming.test.ts b/frontend/src/components/pages/agents/details/a2a/chat/hooks/use-message-streaming.test.ts new file mode 100644 index 0000000000..c2f207a6ad --- /dev/null +++ b/frontend/src/components/pages/agents/details/a2a/chat/hooks/use-message-streaming.test.ts @@ -0,0 +1,1013 @@ +/** + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +import type { TaskState, TaskStatusUpdateEvent } from '@a2a-js/sdk'; +import { afterEach, beforeEach, describe, expect, vi } from 'vitest'; + +import { parseA2AError, streamMessage } from './use-message-streaming'; +import type { ContentBlock } from '../types'; +import { updateMessage } from '../utils/database-operations'; + +// --------------------------------------------------------------------------- +// Module mocks +// --------------------------------------------------------------------------- + +vi.mock('config', () => ({ + config: { jwt: 'test-jwt' }, +})); + +vi.mock('../../a2a-provider', () => ({ + a2a: vi.fn(() => ({ modelId: 'mock-model', provider: 'a2a' })), +})); + +vi.mock('../utils/database-operations', () => ({ + saveMessage: vi.fn(async () => undefined), + updateMessage: vi.fn(async () => undefined), +})); + +// We need fine-grained control over streamText per test, so we use a +// module-level factory that each test can override. 
+let streamTextImpl: (...args: unknown[]) => unknown; + +vi.mock('ai', () => ({ + streamText: (...args: unknown[]) => streamTextImpl(...args), +})); + +let createA2AClientImpl: (...args: unknown[]) => unknown; + +vi.mock('../utils/a2a-client', () => ({ + createA2AClient: (...args: unknown[]) => createA2AClientImpl(...args), +})); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** + * Build a realistic fullStream async iterable that emits A2A SDK events + * wrapped in the AI SDK's streaming protocol, then optionally throws. + */ +function buildFullStream(events: Array<{ type: string; [k: string]: unknown }>, crashAfter?: { error: Error }) { + // biome-ignore lint/suspicious/useAwait: async required for AsyncGenerator return type + return (async function* () { + for (const event of events) { + yield event; + } + if (crashAfter) { + throw crashAfter.error; + } + })(); +} + +/** + * Build a mock streamText result object that looks like the AI SDK return. + */ +function buildStreamTextResult(fullStream: AsyncGenerator, opts?: { text?: string; responseId?: string }) { + return { + fullStream, + text: Promise.resolve(opts?.text ?? ''), + response: Promise.resolve({ id: opts?.responseId }), + }; +} + +/** + * Create a status-update event in the A2A wire format. + */ +function statusUpdateEvent( + taskId: string, + state: TaskState, + opts?: { + text?: string; + messageId?: string; + final?: boolean; + timestamp?: string; + } +): TaskStatusUpdateEvent { + const timestamp = opts?.timestamp ?? new Date().toISOString(); + return { + kind: 'status-update', + contextId: 'ctx-1', + taskId, + final: opts?.final ?? false, + status: { + state, + timestamp, + ...(opts?.text && { + message: { + kind: 'message' as const, + messageId: opts?.messageId ?? 
'msg-1', + role: 'agent' as const, + parts: [{ kind: 'text' as const, text: opts.text }], + }, + }), + }, + }; +} + +/** + * Build a mock A2A client whose resubscribeTask returns an async generator. + * + * Yields all events, then throws crashError if provided. + * This means: + * - No events + crashError = throws immediately (simulates connection refused) + * - Events + crashError = yields events then throws (simulates mid-stream drop) + * - Events + no crash = yields events and returns normally (success) + */ +function buildMockClient(events: unknown[], crashError?: Error) { + return { + resubscribeTask: vi.fn(() => + // biome-ignore lint/suspicious/useAwait: async required for AsyncGenerator return type + (async function* () { + for (const event of events) { + yield event; + } + if (crashError) { + throw crashError; + } + })() + ), + }; +} + +/** + * Standard initial stream events that establish a working task. + * + * Real A2A streams follow this sequence: + * 1. response-metadata (AI SDK wrapper, captures taskId early) + * 2. task event (A2A protocol, carries initial task state) + * 3. status-update events (A2A protocol, carry state transitions + messages) + * + * When response-metadata fires first, handleTaskEvent skips because taskId is + * already captured. The status-update handler is what actually sets + * capturedTaskState. This helper produces a realistic initial sequence. + */ +function initialWorkingTaskEvents(taskId: string, text?: string) { + return [ + { type: 'response-metadata' as const, id: taskId }, + { + type: 'raw' as const, + rawValue: { + kind: 'task', + id: taskId, + status: { state: 'working' as TaskState }, + }, + }, + { + type: 'raw' as const, + rawValue: statusUpdateEvent(taskId, 'working', { + text: text ?? 'Working on your request...', + messageId: 'msg-initial', + }), + }, + ]; +} + +/** + * Extract connection-status blocks from a message's contentBlocks. 
+ */ +function connectionStatuses(blocks: ContentBlock[]) { + return blocks.filter((b) => b.type === 'connection-status'); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('streamMessage - SSE reconnection via tasks/resubscribe', () => { + beforeEach(() => { + vi.useFakeTimers({ shouldAdvanceTime: true }); + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + const baseParams = { + prompt: 'hello agent', + agentId: 'agent-1', + agentCardUrl: 'http://localhost:8080/.well-known/agent-card.json', + model: 'test-model', + contextId: 'ctx-1', + }; + + // ------------------------------------------------------------------- + // Scenario 1: SSE drops mid-stream, single resubscribe recovers + // ------------------------------------------------------------------- + test('reconnects after SSE drop and recovers to completed state', async () => { + const TASK_ID = 'task-abc-123'; + const onMessageUpdate = vi.fn(); + + // Phase 1: Initial stream establishes task, emits some work, then crashes. + streamTextImpl = () => + buildStreamTextResult( + buildFullStream(initialWorkingTaskEvents(TASK_ID, 'Analyzing your request...'), { + error: new Error('Error during streaming for task-abc-123: network error'), + }), + { responseId: TASK_ID } + ); + + // Phase 2: Resubscribe stream picks up where we left off and completes. 
+ const mockClient = buildMockClient([ + statusUpdateEvent(TASK_ID, 'working', { + text: 'Still working on it...', + messageId: 'msg-2', + }), + statusUpdateEvent(TASK_ID, 'completed', { + text: 'Here is your answer.', + messageId: 'msg-3', + final: true, + }), + ]); + createA2AClientImpl = vi.fn(async () => mockClient); + + // Execute + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + // Assertions + expect(result.success).toBe(true); + expect(result.assistantMessage.taskId).toBe(TASK_ID); + expect(result.assistantMessage.taskState).toBe('completed'); + + // Verify resubscribeTask was called with correct task ID + expect(mockClient.resubscribeTask).toHaveBeenCalledWith({ id: TASK_ID }); + + // Verify connection status blocks were emitted + const finalBlocks = result.assistantMessage.contentBlocks; + const connBlocks = connectionStatuses(finalBlocks); + + // The reconnecting block replaces disconnected, then reconnected replaces reconnecting, + // so the final state is expected to hold a single 'reconnected' block; only its presence is asserted here. + expect(connBlocks.length).toBeGreaterThanOrEqual(1); + expect(connBlocks.some((b) => b.type === 'connection-status' && b.status === 'reconnected')).toBe(true); + + // Verify the task-status-update blocks from both initial and resubscribe streams + const statusBlocks = finalBlocks.filter((b) => b.type === 'task-status-update'); + const statusTexts = statusBlocks.map((b) => (b.type === 'task-status-update' ? 
b.text : '')); + expect(statusTexts).toContain('Analyzing your request...'); + expect(statusTexts).toContain('Still working on it...'); + expect(statusTexts).toContain('Here is your answer.'); + }); + + // ------------------------------------------------------------------- + // Scenario 2: Task already completed when SSE drops - no resubscribe + // ------------------------------------------------------------------- + test('does not resubscribe when task reached terminal state before disconnect', async () => { + const TASK_ID = 'task-terminal'; + const onMessageUpdate = vi.fn(); + + const initialEvents = [ + ...initialWorkingTaskEvents(TASK_ID), + { + type: 'raw' as const, + rawValue: statusUpdateEvent(TASK_ID, 'completed', { + text: 'Done!', + messageId: 'msg-final', + final: true, + }), + }, + ]; + + streamTextImpl = () => + buildStreamTextResult( + buildFullStream(initialEvents, { + error: new Error('SSE event contained an error: Connection reset (Code: -1) Data: {}'), + }), + { responseId: TASK_ID } + ); + + createA2AClientImpl = vi.fn(async () => buildMockClient([])); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + // Task completed before crash, so no resubscribe attempt + expect(result.success).toBe(false); // The error is still thrown + expect(vi.mocked(createA2AClientImpl)).not.toHaveBeenCalled(); + + // Error should be an a2a-error block, but content before crash is preserved + const allBlocks = result.assistantMessage.contentBlocks; + expect(allBlocks.some((b) => b.type === 'a2a-error')).toBe(true); + // The "Done!" 
status block from before the crash should be preserved + expect(allBlocks.some((b) => b.type === 'task-status-update' && b.text === 'Done!')).toBe(true); + }); + + // ------------------------------------------------------------------- + // Scenario 3: No task ID captured - can't resubscribe + // ------------------------------------------------------------------- + test('does not resubscribe when no task ID was captured', async () => { + const onMessageUpdate = vi.fn(); + + // Stream that crashes before any task event + streamTextImpl = () => + buildStreamTextResult( + buildFullStream([], { + error: new Error('SSE event contained an error: Server error (Code: -32000) Data: {}'), + }) + ); + + createA2AClientImpl = vi.fn(async () => buildMockClient([])); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + expect(result.success).toBe(false); + expect(vi.mocked(createA2AClientImpl)).not.toHaveBeenCalled(); + }); + + // ------------------------------------------------------------------- + // Scenario 4: Resubscribe stream also drops, retry succeeds on 2nd attempt + // ------------------------------------------------------------------- + test('retries resubscribe with exponential backoff after first resubscribe fails', async () => { + const TASK_ID = 'task-retry'; + const onMessageUpdate = vi.fn(); + + streamTextImpl = () => + buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID), { error: new Error('network error') }), { + responseId: TASK_ID, + }); + + // First resubscribe: crashes immediately (empty events + crashError) + // Second resubscribe: succeeds with completed event + let callCount = 0; + createA2AClientImpl = vi.fn(() => { + callCount += 1; + if (callCount === 1) { + return Promise.resolve(buildMockClient([], new Error('resubscribe also failed'))); + } + return Promise.resolve( + buildMockClient([ + statusUpdateEvent(TASK_ID, 'completed', { + text: 'Finally done.', + messageId: 'msg-final', + final: true, + }), + 
]) + ); + }); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + expect(result.success).toBe(true); + expect(result.assistantMessage.taskState).toBe('completed'); + + // Should have called createA2AClient twice (first attempt failed, second succeeded) + expect(vi.mocked(createA2AClientImpl)).toHaveBeenCalledTimes(2); + }); + + // ------------------------------------------------------------------- + // Scenario 5: All resubscribe attempts fail - gave-up + // ------------------------------------------------------------------- + test('gives up after max resubscribe attempts and reports error', async () => { + const TASK_ID = 'task-hopeless'; + const onMessageUpdate = vi.fn(); + + streamTextImpl = () => + buildStreamTextResult( + buildFullStream(initialWorkingTaskEvents(TASK_ID, 'Starting...'), { error: new Error('network severed') }), + { responseId: TASK_ID } + ); + + // Every resubscribe attempt crashes immediately + createA2AClientImpl = vi.fn(async () => buildMockClient([], new Error('still down'))); + + // Total backoff: 1s + 2s + 4s + 8s + 16s = 31s. Use advanceTimersByTimeAsync + // to skip the delays without hitting the 30s test timeout. 
+ vi.useRealTimers(); + vi.useFakeTimers({ shouldAdvanceTime: false }); + + const resultPromise = streamMessage({ ...baseParams, onMessageUpdate }); + + // Advance past all 5 backoff delays (31s total with some margin) + for (let i = 0; i < 5; i++) { + await vi.advanceTimersByTimeAsync(2 ** i * 1000 + 100); + } + + const result = await resultPromise; + + // Gave up - should be a failure + expect(result.success).toBe(false); + + // Should have attempted 5 times + expect(vi.mocked(createA2AClientImpl)).toHaveBeenCalledTimes(5); + + // Final blocks should contain a gave-up status + const allBlocks = result.assistantMessage.contentBlocks; + const gaveUp = allBlocks.find((b) => b.type === 'connection-status' && b.status === 'gave-up'); + expect(gaveUp).toBeDefined(); + if (gaveUp?.type === 'connection-status') { + expect(gaveUp.maxAttempts).toBe(5); + } + + // Should also have the original a2a-error + expect(allBlocks.some((b) => b.type === 'a2a-error')).toBe(true); + + // Content from before the crash should be preserved + expect(allBlocks.some((b) => b.type === 'task-status-update' && b.text === 'Starting...')).toBe(true); + }); + + // ------------------------------------------------------------------- + // Scenario 6: Task reaches terminal state during a failed resubscribe + // ------------------------------------------------------------------- + test('stops retrying if task reaches terminal state during a resubscribe attempt', async () => { + const TASK_ID = 'task-terminates-mid-resub'; + const onMessageUpdate = vi.fn(); + + streamTextImpl = () => + buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID), { error: new Error('network error') }), { + responseId: TASK_ID, + }); + + // Resubscribe: yields completion event, then connection dies. + // The event handler updates state to 'completed' before the crash. 
+ const partialClient = { + resubscribeTask: vi.fn(() => + // biome-ignore lint/suspicious/useAwait: async required for AsyncGenerator return type + (async function* () { + yield statusUpdateEvent(TASK_ID, 'completed', { + text: 'Task completed just in time.', + messageId: 'msg-done', + final: true, + }); + // Connection drops after delivering the completion event + throw new Error('connection reset after completion'); + })() + ), + }; + + createA2AClientImpl = vi.fn(async () => partialClient); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + // Should recover because the task reached terminal state during the attempt + expect(result.success).toBe(true); + expect(result.assistantMessage.taskState).toBe('completed'); + + // Only one createA2AClient call needed + expect(vi.mocked(createA2AClientImpl)).toHaveBeenCalledTimes(1); + + const statusBlocks = result.assistantMessage.contentBlocks.filter((b) => b.type === 'task-status-update'); + expect(statusBlocks.some((b) => b.type === 'task-status-update' && b.text === 'Task completed just in time.')).toBe( + true + ); + }); + + // ------------------------------------------------------------------- + // Scenario 7: Happy path - no disconnect at all + // ------------------------------------------------------------------- + test('completes normally without resubscribe when stream does not drop', async () => { + const TASK_ID = 'task-happy'; + const onMessageUpdate = vi.fn(); + + const events = [ + ...initialWorkingTaskEvents(TASK_ID, 'Processing...'), + { + type: 'raw' as const, + rawValue: statusUpdateEvent(TASK_ID, 'completed', { + text: 'All done!', + messageId: 'msg-2', + final: true, + }), + }, + ]; + + streamTextImpl = () => + buildStreamTextResult(buildFullStream(events), { + text: 'All done!', + responseId: TASK_ID, + }); + + createA2AClientImpl = vi.fn(async () => buildMockClient([])); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + 
expect(result.success).toBe(true); + expect(result.assistantMessage.taskState).toBe('completed'); + expect(vi.mocked(createA2AClientImpl)).not.toHaveBeenCalled(); + + // No connection-status blocks in the happy path + const connBlocks = connectionStatuses(result.assistantMessage.contentBlocks); + expect(connBlocks).toHaveLength(0); + }); + + // ------------------------------------------------------------------- + // Scenario 8: Connection status block replacement behavior + // ------------------------------------------------------------------- + test('replaces connection-status blocks so UI shows only latest status', async () => { + const TASK_ID = 'task-ui-blocks'; + const onMessageUpdate = vi.fn(); + + streamTextImpl = () => + buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID), { error: new Error('disconnect') }), { + responseId: TASK_ID, + }); + + // Resubscribe succeeds on first try + const mockClient = buildMockClient([ + statusUpdateEvent(TASK_ID, 'completed', { + text: 'Done.', + messageId: 'msg-done', + final: true, + }), + ]); + createA2AClientImpl = vi.fn(async () => mockClient); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + expect(result.success).toBe(true); + + // Track the connection-status progression through onMessageUpdate calls + const allUpdates = onMessageUpdate.mock.calls.map((args: unknown[]) => { + const msg = args[0] as { contentBlocks: ContentBlock[] }; + return msg.contentBlocks + .filter((b) => b.type === 'connection-status') + .map((b) => (b.type === 'connection-status' ? b.status : null)); + }); + + // At some point we should have seen: + // 1. ['disconnected'] + // 2. ['reconnecting'] (replaced disconnected) + // 3. 
['reconnected'] (replaced reconnecting) + const flatStatuses = allUpdates.flat(); + expect(flatStatuses).toContain('disconnected'); + expect(flatStatuses).toContain('reconnecting'); + expect(flatStatuses).toContain('reconnected'); + }); + + // ------------------------------------------------------------------- + // Scenario 9: Does not resubscribe when taskId captured but no taskState + // ------------------------------------------------------------------- + test('does not resubscribe when taskId is captured but no task state was received', async () => { + const TASK_ID = 'task-no-state'; + const onMessageUpdate = vi.fn(); + + // Only emit response-metadata (sets capturedTaskId) but crash before any + // status-update (so capturedTaskState remains undefined). + streamTextImpl = () => + buildStreamTextResult( + buildFullStream([{ type: 'response-metadata' as const, id: TASK_ID }], { + error: new Error('network error'), + }), + { responseId: TASK_ID } + ); + + createA2AClientImpl = vi.fn(async () => buildMockClient([])); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + expect(result.success).toBe(false); + // isResubscribable requires both capturedTaskId AND capturedTaskState + expect(vi.mocked(createA2AClientImpl)).not.toHaveBeenCalled(); + }); + + // ------------------------------------------------------------------- + // Scenario 10: Captures taskId from response.id fallback + // ------------------------------------------------------------------- + test('captures taskId from response metadata when no task events were emitted', async () => { + const TASK_ID = 'task-fallback-123'; + const onMessageUpdate = vi.fn(); + + // Stream with no raw task/status-update events, just text + streamTextImpl = () => + buildStreamTextResult(buildFullStream([{ type: 'text-delta', text: 'Hello world' }]), { + text: 'Hello world', + responseId: TASK_ID, + }); + + createA2AClientImpl = vi.fn(async () => buildMockClient([])); + + const result = await 
streamMessage({ ...baseParams, onMessageUpdate }); + + expect(result.success).toBe(true); + expect(result.assistantMessage.taskId).toBe(TASK_ID); + expect(vi.mocked(createA2AClientImpl)).not.toHaveBeenCalled(); + }); + + // ------------------------------------------------------------------- + // Scenario 11: Does NOT capture taskId from response.id when it starts with "msg-" + // ------------------------------------------------------------------- + test('does not capture taskId from response metadata when id starts with msg-', async () => { + const onMessageUpdate = vi.fn(); + + streamTextImpl = () => + buildStreamTextResult(buildFullStream([{ type: 'text-delta', text: 'Hello' }]), { + text: 'Hello', + responseId: 'msg-12345', + }); + + createA2AClientImpl = vi.fn(async () => buildMockClient([])); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + expect(result.success).toBe(true); + expect(result.assistantMessage.taskId).toBeUndefined(); + }); + + // ------------------------------------------------------------------- + // Scenario 12: Does not resubscribe when task state was "failed" before disconnect + // ------------------------------------------------------------------- + test('does not resubscribe when task state was "failed" before disconnect', async () => { + const TASK_ID = 'task-failed-before-crash'; + const onMessageUpdate = vi.fn(); + + const initialEvents = [ + ...initialWorkingTaskEvents(TASK_ID), + { + type: 'raw' as const, + rawValue: statusUpdateEvent(TASK_ID, 'failed', { + text: 'Agent error', + messageId: 'msg-fail', + final: true, + }), + }, + ]; + + streamTextImpl = () => + buildStreamTextResult(buildFullStream(initialEvents, { error: new Error('connection lost') }), { + responseId: TASK_ID, + }); + + createA2AClientImpl = vi.fn(async () => buildMockClient([])); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + // "failed" is a terminal state -- no resubscribe + 
expect(result.success).toBe(false); + expect(vi.mocked(createA2AClientImpl)).not.toHaveBeenCalled(); + }); + + // ------------------------------------------------------------------- + // Scenario 13: Resubscribe stream ends cleanly but task not terminal -- retries + // ------------------------------------------------------------------- + test('retries when resubscribe stream ends cleanly but task is not terminal', async () => { + const TASK_ID = 'task-not-done-yet'; + const onMessageUpdate = vi.fn(); + + streamTextImpl = () => + buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID), { error: new Error('dropped') }), { + responseId: TASK_ID, + }); + + // First resubscribe: stream ends cleanly but only has a "working" update (not terminal) + // Second resubscribe: delivers "completed" + let callCount = 0; + createA2AClientImpl = vi.fn(() => { + callCount += 1; + if (callCount === 1) { + return Promise.resolve( + buildMockClient([ + statusUpdateEvent(TASK_ID, 'working', { + text: 'Still going...', + messageId: 'msg-still', + }), + ]) + ); + } + return Promise.resolve( + buildMockClient([ + statusUpdateEvent(TASK_ID, 'completed', { + text: 'Now done.', + messageId: 'msg-done', + final: true, + }), + ]) + ); + }); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + expect(result.success).toBe(true); + expect(result.assistantMessage.taskState).toBe('completed'); + // First resubscribe ended cleanly but task wasn't terminal, so it retried + expect(vi.mocked(createA2AClientImpl)).toHaveBeenCalledTimes(2); + }); + + // ------------------------------------------------------------------- + // Scenario 13b: Attempt counter resets when resubscribe makes progress + // ------------------------------------------------------------------- + test('resets attempt counter when resubscribe delivers events, allowing unlimited retries with progress', async () => { + const TASK_ID = 'task-reset-attempts'; + const onMessageUpdate = vi.fn(); + + 
streamTextImpl = () => + buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID), { error: new Error('dropped') }), { + responseId: TASK_ID, + }); + + // Each resubscribe delivers a "working" event (progress) but task never completes, + // then the stream drops. After 3 such cycles, the task finally completes. + let callCount = 0; + createA2AClientImpl = vi.fn(() => { + callCount += 1; + if (callCount <= 3) { + return Promise.resolve( + buildMockClient([ + statusUpdateEvent(TASK_ID, 'working', { + text: `Progress update ${callCount}`, + messageId: `msg-progress-${callCount}`, + }), + ]) + ); + } + return Promise.resolve( + buildMockClient([ + statusUpdateEvent(TASK_ID, 'completed', { + text: 'Finally done after many reconnects.', + messageId: 'msg-final', + final: true, + }), + ]) + ); + }); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + expect(result.success).toBe(true); + expect(result.assistantMessage.taskState).toBe('completed'); + + // Should have called createA2AClient 4 times total (3 progress + 1 completion). + // Without attempt reset, it would have given up after 5 consecutive failures. + expect(vi.mocked(createA2AClientImpl)).toHaveBeenCalledTimes(4); + + // All progress updates should be preserved + const statusBlocks = result.assistantMessage.contentBlocks.filter((b) => b.type === 'task-status-update'); + const statusTexts = statusBlocks.map((b) => (b.type === 'task-status-update' ? 
b.text : '')); + expect(statusTexts).toContain('Progress update 1'); + expect(statusTexts).toContain('Progress update 2'); + expect(statusTexts).toContain('Progress update 3'); + expect(statusTexts).toContain('Finally done after many reconnects.'); + }); + + // ------------------------------------------------------------------- + // Scenario 13c: processResubscribeStream pushes "reconnected" on first event + // ------------------------------------------------------------------- + test('shows reconnected status as soon as resubscribe delivers first event', async () => { + const TASK_ID = 'task-reconnected-early'; + const onMessageUpdate = vi.fn(); + + streamTextImpl = () => + buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID), { error: new Error('disconnect') }), { + responseId: TASK_ID, + }); + + const mockClient = buildMockClient([ + statusUpdateEvent(TASK_ID, 'working', { + text: 'Resumed work...', + messageId: 'msg-resumed', + }), + statusUpdateEvent(TASK_ID, 'completed', { + text: 'Done.', + messageId: 'msg-done', + final: true, + }), + ]); + createA2AClientImpl = vi.fn(async () => mockClient); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + expect(result.success).toBe(true); + + // Track the connection-status progression through onMessageUpdate calls + const statusProgression: string[] = []; + for (const call of onMessageUpdate.mock.calls) { + const msg = call[0] as { contentBlocks: ContentBlock[] }; + const connBlocks = msg.contentBlocks.filter((b) => b.type === 'connection-status'); + for (const b of connBlocks) { + if (b.type === 'connection-status') { + const last = statusProgression.at(-1); + // Only track transitions + if (b.status !== last) { + statusProgression.push(b.status); + } + } + } + } + + // Should see: disconnected → reconnecting → reconnected (before content events) + expect(statusProgression).toContain('disconnected'); + expect(statusProgression).toContain('reconnecting'); + 
expect(statusProgression).toContain('reconnected'); + + // "reconnected" should appear before the "Resumed work..." content block + const idx = statusProgression.indexOf('reconnected'); + expect(idx).toBeLessThan(statusProgression.length); + }); + + // ------------------------------------------------------------------- + // Scenario 14: updateMessage called with correct args after recovery + // ------------------------------------------------------------------- + test('persists correct state to database after successful recovery', async () => { + const TASK_ID = 'task-db-check'; + const onMessageUpdate = vi.fn(); + + streamTextImpl = () => + buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID), { error: new Error('dropped') }), { + responseId: TASK_ID, + }); + + const mockClient = buildMockClient([ + statusUpdateEvent(TASK_ID, 'completed', { + text: 'Final answer.', + messageId: 'msg-final', + final: true, + }), + ]); + createA2AClientImpl = vi.fn(async () => mockClient); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + expect(result.success).toBe(true); + + // updateMessage is called during finalizeMessage after recovery + const mockedUpdate = vi.mocked(updateMessage); + const lastCall = mockedUpdate.mock.calls.at(-1); + expect(lastCall).toBeDefined(); + if (!lastCall) { + return; + } + + const [messageId, updates] = lastCall; + expect(messageId).toBe(result.assistantMessage.id); + expect(updates.isStreaming).toBe(false); + expect(updates.taskId).toBe(TASK_ID); + expect(updates.taskState).toBe('completed'); + + // Success path no longer stores contentBlocks in DB (fetched via tasks/get on reload) + expect(updates.contentBlocks).toBeUndefined(); + }); + + // ------------------------------------------------------------------- + // Scenario 15: TypeError in resubscribe rethrown immediately, no retry + // ------------------------------------------------------------------- + test('rethrows TypeError from resubscribe 
instead of retrying', async () => { + const TASK_ID = 'task-typeerror'; + const onMessageUpdate = vi.fn(); + + streamTextImpl = () => + buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID), { error: new Error('dropped') }), { + responseId: TASK_ID, + }); + + // Resubscribe throws a TypeError (programming bug, not network) + createA2AClientImpl = vi.fn(async () => ({ + resubscribeTask: () => { + throw new TypeError('Cannot read properties of null'); + }, + })); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + // Should fail and NOT retry + expect(result.success).toBe(false); + expect(vi.mocked(createA2AClientImpl)).toHaveBeenCalledTimes(1); + + // The error block should contain the TypeError message + const errorBlock = result.assistantMessage.contentBlocks.find((b) => b.type === 'a2a-error'); + expect(errorBlock).toBeDefined(); + }); + + // ------------------------------------------------------------------- + // Scenario 16: finalizeMessage failure after recovery falls through to error path + // ------------------------------------------------------------------- + test('falls through to error path when finalizeMessage fails after recovery', async () => { + const TASK_ID = 'task-finalize-fail'; + const onMessageUpdate = vi.fn(); + + streamTextImpl = () => + buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID), { error: new Error('dropped') }), { + responseId: TASK_ID, + }); + + const mockClient = buildMockClient([ + statusUpdateEvent(TASK_ID, 'completed', { + text: 'Done.', + messageId: 'msg-done', + final: true, + }), + ]); + createA2AClientImpl = vi.fn(async () => mockClient); + + // Make updateMessage reject on the finalizeMessage call (after recovery) + const mockedUpdate = vi.mocked(updateMessage); + mockedUpdate.mockRejectedValueOnce(new Error('DB write failed')); + + const result = await streamMessage({ ...baseParams, onMessageUpdate }); + + // Recovery succeeded but finalizeMessage failed 
-- falls through to error path + expect(result.success).toBe(false); + + // Task state should be preserved from recovery, not hardcoded to 'failed' + expect(result.assistantMessage.taskState).toBe('completed'); + + // Should have an a2a-error block from the fallthrough + const errorBlock = result.assistantMessage.contentBlocks.find((b) => b.type === 'a2a-error'); + expect(errorBlock).toBeDefined(); + }); + + // ------------------------------------------------------------------- + // Scenario 17: gave-up replaces stale reconnecting block (not appended) + // ------------------------------------------------------------------- + test('gave-up replaces the last reconnecting block instead of stacking', async () => { + const TASK_ID = 'task-gaveup-replace'; + const onMessageUpdate = vi.fn(); + + streamTextImpl = () => + buildStreamTextResult( + buildFullStream(initialWorkingTaskEvents(TASK_ID, 'Starting...'), { error: new Error('dropped') }), + { responseId: TASK_ID } + ); + + createA2AClientImpl = vi.fn(async () => buildMockClient([], new Error('still down'))); + + vi.useRealTimers(); + vi.useFakeTimers({ shouldAdvanceTime: false }); + + const resultPromise = streamMessage({ ...baseParams, onMessageUpdate }); + + for (let i = 0; i < 5; i++) { + await vi.advanceTimersByTimeAsync(2 ** i * 1000 + 100); + } + + const result = await resultPromise; + expect(result.success).toBe(false); + + // There should be exactly ONE connection-status block in the final message + // (gave-up replaced the last reconnecting), not two stacked. 
+ const connBlocks = result.assistantMessage.contentBlocks.filter((b) => b.type === 'connection-status'); + expect(connBlocks).toHaveLength(1); + expect(connBlocks[0].type === 'connection-status' && connBlocks[0].status).toBe('gave-up'); + }); +}); + +// --------------------------------------------------------------------------- +// parseA2AError unit tests +// --------------------------------------------------------------------------- + +describe('parseA2AError', () => { + test('extracts code, message, and data from SSE error format', () => { + const error = new Error('SSE event contained an error: Connection reset (Code: -1) Data: {}'); + const result = parseA2AError(error); + + expect(result.code).toBe(-1); + expect(result.message).toBe('Connection reset'); + expect(result.data).toEqual({}); + }); + + test('extracts code and data from streaming error format', () => { + const error = new Error( + 'Error during streaming for task-abc: network timeout (Code: 500) Data: {"detail":"timeout"}' + ); + const result = parseA2AError(error); + + expect(result.code).toBe(500); + expect(result.data).toEqual({ detail: 'timeout' }); + }); + + test('returns code -1 and raw message for unstructured errors', () => { + const result = parseA2AError('something completely unexpected'); + + expect(result.code).toBe(-1); + expect(result.message).toBe('something completely unexpected'); + expect(result.data).toBeUndefined(); + }); + + test('handles invalid JSON in Data field gracefully', () => { + const error = new Error('SSE event contained an error: Bad (Code: -1) Data: {not-json}'); + const result = parseA2AError(error); + + expect(result.code).toBe(-1); + expect(result.data).toBeUndefined(); + }); + + test('returns "Unknown error" for empty string input', () => { + const result = parseA2AError(''); + + expect(result.code).toBe(-1); + expect(result.message).toBe('Unknown error'); + }); + + test('handles non-Error objects', () => { + const result = parseA2AError(42); + + 
expect(result.code).toBe(-1); + expect(result.message).toBe('42'); + }); + + test('strips streaming prefix from error without code', () => { + const result = parseA2AError(new Error('Error during streaming for task-xyz: connection refused')); + + expect(result.code).toBe(-1); + expect(result.message).toBe('connection refused'); + }); + + test('strips SSE prefix from error without code', () => { + const result = parseA2AError(new Error('SSE event contained an error: Connection refused')); + + expect(result.code).toBe(-1); + expect(result.message).toBe('Connection refused'); + }); +}); diff --git a/frontend/src/components/pages/agents/details/a2a/chat/hooks/use-message-streaming.ts b/frontend/src/components/pages/agents/details/a2a/chat/hooks/use-message-streaming.ts index 99aa14661b..6272a37710 100644 --- a/frontend/src/components/pages/agents/details/a2a/chat/hooks/use-message-streaming.ts +++ b/frontend/src/components/pages/agents/details/a2a/chat/hooks/use-message-streaming.ts @@ -24,8 +24,10 @@ import { buildMessageWithContentBlocks, closeActiveTextBlock } from './message-b import type { ResponseMetadataEvent, StreamChunk, StreamingState, TextDeltaEvent } from './streaming-types'; import { a2a } from '../../a2a-provider'; import type { ChatMessage, ContentBlock } from '../types'; +import { createA2AClient } from '../utils/a2a-client'; import { saveMessage, updateMessage } from '../utils/database-operations'; import { createAssistantMessage } from '../utils/message-converter'; +import { resolveStaleToolBlocks } from '../utils/task-to-content-blocks'; /** * Regex patterns for parsing JSON-RPC error details from error messages. @@ -54,7 +56,7 @@ const ERROR_SUFFIX_CODE_REGEX = /\s*\(Code:\s*-?\d+\).*$/i; /** * Parse A2A/JSON-RPC error details from an error message string. */ -const parseA2AError = (error: unknown): JSONRPCError => { +export const parseA2AError = (error: unknown): JSONRPCError => { const errorMessage = error instanceof Error ? 
error.message : String(error); // Try to parse JSON-RPC error from the error message @@ -108,6 +110,215 @@ type StreamMessageResult = { success: boolean; }; +const TERMINAL_TASK_STATES = new Set(['completed', 'failed', 'canceled', 'rejected']); + +/** + * Maximum consecutive reconnection attempts before giving up. + * With exponential backoff (1s, 2s, 4s, 8s, 16s), total wait is ~31s. + * The counter resets on progress, so tasks making incremental progress + * can reconnect indefinitely. + */ +const MAX_RESUBSCRIBE_ATTEMPTS = 5; + +/** + * Check whether the current streaming state is eligible for resubscription. + * We can only resubscribe if we have a task ID and the task was still in-flight. + */ +const isResubscribable = (state: StreamingState): boolean => + !!state.capturedTaskId && !!state.capturedTaskState && !TERMINAL_TASK_STATES.has(state.capturedTaskState); + +/** + * Finalize a streaming message: close active blocks, persist to DB, and return the result. + */ +const finalizeMessage = async (state: StreamingState, assistantMessage: ChatMessage): Promise => { + closeActiveTextBlock(state.contentBlocks, state.activeTextBlock); + state.activeTextBlock = null; + + // Resolve tool blocks that never received a tool_response + resolveStaleToolBlocks(state.contentBlocks, state.capturedTaskState); + + const finalMessage = buildMessageWithContentBlocks({ + baseMessage: assistantMessage, + contentBlocks: state.contentBlocks, + taskId: state.capturedTaskId, + taskState: state.capturedTaskState, + taskStartIndex: state.taskIdCapturedAtBlockIndex, + usage: state.latestUsage, + }); + + // Only store minimal stub in DB -- full task content fetched via tasks/get on reload + await updateMessage(assistantMessage.id, { + isStreaming: false, + taskId: state.capturedTaskId, + taskState: state.capturedTaskState, + taskStartIndex: state.taskIdCapturedAtBlockIndex, + usage: state.latestUsage, + }); + + return { assistantMessage: finalMessage, success: true }; +}; + +/** + * 
Process events from an A2A resubscribe stream using the same handlers as the initial stream. + * Returns true if at least one event was successfully processed. + */ +const processResubscribeStream = async ( + stream: AsyncIterable<{ kind?: string }>, + state: StreamingState, + assistantMessage: ChatMessage, + onMessageUpdate: (message: ChatMessage) => void +): Promise => { + let receivedEvents = false; + for await (const event of stream) { + if (!event?.kind) { + continue; + } + + // On first real event, mark as reconnected so the spinner is replaced + // before event handlers push new content blocks after it. + if (!receivedEvents) { + receivedEvents = true; + pushConnectionStatus({ status: 'reconnected', state, assistantMessage, onMessageUpdate }); + } + + if (event.kind === 'task') { + handleTaskEvent(event as Task, state, assistantMessage, onMessageUpdate); + } else if (event.kind === 'status-update') { + handleStatusUpdateEvent(event as TaskStatusUpdateEvent, state, assistantMessage, onMessageUpdate); + } else if (event.kind === 'artifact-update') { + handleArtifactUpdateEvent(event as TaskArtifactUpdateEvent, state, assistantMessage, onMessageUpdate); + } + } + return receivedEvents; +}; + +type ConnectionStatusParams = { + status: 'disconnected' | 'reconnecting' | 'reconnected' | 'gave-up'; + state: StreamingState; + assistantMessage: ChatMessage; + onMessageUpdate: (message: ChatMessage) => void; + attempt?: number; +}; + +/** + * Insert a connection-status content block and push a UI update. + */ +const pushConnectionStatus = ({ + status, + state, + assistantMessage, + onMessageUpdate, + attempt, +}: ConnectionStatusParams): void => { + const block: ContentBlock = { + type: 'connection-status', + status, + attempt, + maxAttempts: MAX_RESUBSCRIBE_ATTEMPTS, + timestamp: new Date(), + }; + + // For transient/final statuses, replace the last connection-status block + // so the UI doesn't accumulate stale status lines. 
+ if (status === 'reconnecting' || status === 'reconnected' || status === 'gave-up') { + const lastIdx = state.contentBlocks.length - 1; + if (lastIdx >= 0 && state.contentBlocks[lastIdx].type === 'connection-status') { + state.contentBlocks[lastIdx] = block; + onMessageUpdate( + buildMessageWithContentBlocks({ + baseMessage: assistantMessage, + contentBlocks: state.contentBlocks, + taskId: state.capturedTaskId, + taskState: state.capturedTaskState, + taskStartIndex: state.taskIdCapturedAtBlockIndex, + usage: state.latestUsage, + }) + ); + return; + } + } + + state.contentBlocks.push(block); + onMessageUpdate( + buildMessageWithContentBlocks({ + baseMessage: assistantMessage, + contentBlocks: state.contentBlocks, + taskId: state.capturedTaskId, + taskState: state.capturedTaskState, + taskStartIndex: state.taskIdCapturedAtBlockIndex, + usage: state.latestUsage, + }) + ); +}; + +/** + * Attempt to resubscribe to a running task after an SSE connection drop. + * Returns true if resubscription succeeded and the task reached a terminal state. + * + * The attempt counter resets whenever a resubscribe makes progress (delivers + * events), so the loop effectively retries indefinitely as long as the server + * is responsive. It only gives up after MAX_RESUBSCRIBE_ATTEMPTS consecutive + * failures with no progress. 
+ */ +const resubscribeLoop = async ( + state: StreamingState, + agentCardUrl: string, + assistantMessage: ChatMessage, + onMessageUpdate: (message: ChatMessage) => void + // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: retry loop with backoff, progress detection, and error classification +): Promise => { + const taskId = state.capturedTaskId; + if (!taskId) { + return false; + } + + const ctx = { state, assistantMessage, onMessageUpdate }; + pushConnectionStatus({ ...ctx, status: 'disconnected' }); + + let attempt = 0; + + while (attempt < MAX_RESUBSCRIBE_ATTEMPTS) { + attempt += 1; + // Exponential backoff: 1s, 2s, 4s, 8s, 16s + const delay = 2 ** (attempt - 1) * 1000; + await new Promise((resolve) => setTimeout(resolve, delay)); + + pushConnectionStatus({ ...ctx, status: 'reconnecting', attempt }); + + try { + const client = await createA2AClient(agentCardUrl); + const stream = client.resubscribeTask({ id: taskId }); + const receivedEvents = await processResubscribeStream(stream, state, assistantMessage, onMessageUpdate); + + // Task reached a terminal state — done. + if (state.capturedTaskState && TERMINAL_TASK_STATES.has(state.capturedTaskState)) { + return true; + } + + // Got data but task isn't terminal — stream dropped again. + // Reset attempts since we made progress, and signal a new disconnect. + if (receivedEvents) { + attempt = 0; + pushConnectionStatus({ ...ctx, status: 'disconnected' }); + } + } catch (resubError) { + // If task reached a terminal state during this attempt, we're done + if (state.capturedTaskState && TERMINAL_TASK_STATES.has(state.capturedTaskState)) { + return true; + } + // Programming errors (TypeError, ReferenceError) -- stop retrying immediately + if (resubError instanceof TypeError || resubError instanceof ReferenceError) { + break; + } + // Otherwise retry (network errors, SSE errors, etc.) 
+ } + } + + // All retries exhausted with no progress + pushConnectionStatus({ ...ctx, status: 'gave-up' }); + return false; +}; + /** * Stream a message using the a2a provider */ @@ -129,6 +340,18 @@ export const streamMessage = async ({ // Notify caller about the new message onMessageUpdate(assistantMessage); + // Initialize streaming state before try so it's accessible in catch for resubscription + const state: StreamingState = { + contentBlocks: [], + activeTextBlock: null, + lastEventTimestamp: new Date(), + capturedTaskId: undefined, + capturedTaskState: undefined, + previousTaskState: undefined, + taskIdCapturedAtBlockIndex: undefined, + latestUsage: undefined, + }; + try { // Stream the response using a2a provider const streamResult = streamText({ @@ -147,18 +370,6 @@ export const streamMessage = async ({ includeRawChunks: true, // Enable raw events to capture taskId from task/status-update/artifact-update events }); - // Initialize streaming state with new contentBlocks approach - const state: StreamingState = { - contentBlocks: [], - activeTextBlock: null, - lastEventTimestamp: new Date(), - capturedTaskId: undefined, - capturedTaskState: undefined, - previousTaskState: undefined, - taskIdCapturedAtBlockIndex: undefined, - latestUsage: undefined, - }; - // Consume the full stream and process events for await (const chunk of streamResult.fullStream) { const streamChunk = chunk as StreamChunk; @@ -196,9 +407,6 @@ export const streamMessage = async ({ closeActiveTextBlock(state.contentBlocks, state.activeTextBlock); state.activeTextBlock = null; - // Get final text content (for backward compatibility with DB) - const finalContent = await streamResult.text; - // If we didn't capture taskId during streaming, try to get it from response metadata if (!state.capturedTaskId) { const responseMetadata = await streamResult.response; @@ -209,57 +417,27 @@ export const streamMessage = async ({ } } - // Build final message with all content blocks - const finalMessage = 
buildMessageWithContentBlocks({ - baseMessage: assistantMessage, - contentBlocks: state.contentBlocks, - taskId: state.capturedTaskId, - taskState: state.capturedTaskState, - taskStartIndex: state.taskIdCapturedAtBlockIndex, - usage: state.latestUsage, - }); - - // Extract artifacts and toolCalls from content blocks for DB compatibility - const artifacts = state.contentBlocks - .filter((block) => block.type === 'artifact') - .map((block) => ({ - artifactId: block.artifactId, - name: block.name, - description: block.description, - parts: block.parts, - })); - - const toolCalls = state.contentBlocks - .filter((block) => block.type === 'tool') - .map((block) => ({ - id: block.toolCallId, - name: block.toolName, - state: block.state, - input: block.input, - output: block.output, - errorText: block.errorText, - messageId: block.messageId || '', - timestamp: block.timestamp, - })); - - // Update database with final content blocks - await updateMessage(assistantMessage.id, { - content: finalContent, - isStreaming: false, - taskId: state.capturedTaskId, - taskState: state.capturedTaskState, - taskStartIndex: state.taskIdCapturedAtBlockIndex, - artifacts, - toolCalls, - contentBlocks: state.contentBlocks, // Store new format - usage: state.latestUsage, // Store usage metadata - }); - - return { - assistantMessage: finalMessage, - success: true, - }; + return await finalizeMessage(state, assistantMessage); } catch (error) { + // If the task is still in-flight, try to resubscribe before giving up + if (isResubscribable(state)) { + closeActiveTextBlock(state.contentBlocks, state.activeTextBlock); + state.activeTextBlock = null; + + const recovered = await resubscribeLoop(state, agentCardUrl, assistantMessage, onMessageUpdate); + if (recovered) { + try { + const result = await finalizeMessage(state, assistantMessage); + onMessageUpdate(result.assistantMessage); + return result; + } catch (finalizeError) { + // biome-ignore lint/suspicious/noConsole: intentional error logging 
for production observability + console.error('finalizeMessage failed after recovery:', finalizeError); + // fall through to error path + } + } + } + // Parse JSON-RPC error details const a2aError = parseA2AError(error); @@ -270,21 +448,25 @@ export const streamMessage = async ({ timestamp: new Date(), }; + // Append error to existing blocks (preserve any content received before disconnect) + const errorBlocks = [...state.contentBlocks, errorBlock]; + // Build message with error block const errorMessage = buildMessageWithContentBlocks({ baseMessage: assistantMessage, - contentBlocks: [errorBlock], - taskId: undefined, - taskState: 'failed', - taskStartIndex: undefined, + contentBlocks: errorBlocks, + taskId: state.capturedTaskId, + taskState: state.capturedTaskState ?? 'failed', + taskStartIndex: state.taskIdCapturedAtBlockIndex, }); // Update database with error await updateMessage(assistantMessage.id, { - content: '', isStreaming: false, - taskState: 'failed', - contentBlocks: [errorBlock], + taskId: state.capturedTaskId, + taskState: state.capturedTaskState ?? 
'failed', + taskStartIndex: state.taskIdCapturedAtBlockIndex, + contentBlocks: errorBlocks, }); // Notify caller about error message diff --git a/frontend/src/components/pages/agents/details/a2a/chat/types.ts b/frontend/src/components/pages/agents/details/a2a/chat/types.ts index d8f410c405..d728e9ee13 100644 --- a/frontend/src/components/pages/agents/details/a2a/chat/types.ts +++ b/frontend/src/components/pages/agents/details/a2a/chat/types.ts @@ -60,6 +60,13 @@ export type ContentBlock = type: 'a2a-error'; error: JSONRPCError; timestamp: Date; + } + | { + type: 'connection-status'; + status: 'disconnected' | 'reconnecting' | 'reconnected' | 'gave-up'; + attempt?: number; + maxAttempts?: number; + timestamp: Date; }; // Message-level usage metadata (stored in database) diff --git a/frontend/src/components/pages/agents/details/a2a/chat/utils/database-operations.ts b/frontend/src/components/pages/agents/details/a2a/chat/utils/database-operations.ts index aa4684a708..1405c4535f 100644 --- a/frontend/src/components/pages/agents/details/a2a/chat/utils/database-operations.ts +++ b/frontend/src/components/pages/agents/details/a2a/chat/utils/database-operations.ts @@ -15,13 +15,21 @@ import { chatDb } from 'database/chat-db'; import { toast } from 'sonner'; import { formatToastErrorMessageGRPC } from 'utils/toast.utils'; -import { convertDbToComponent } from './message-converter'; +import { createA2AClient } from './a2a-client'; +import { buildMessageFromStoredBlocks, reconstructContentBlocks } from './message-converter'; +import { taskToContentBlocks } from './task-to-content-blocks'; import type { ChatMessage, ContentBlock } from '../types'; /** - * Load messages from database for a specific agent and context + * Load messages from database for a specific agent and context. + * Assistant messages with a taskId are hydrated via tasks/get from the A2A server. + * User messages and error-only assistant messages are reconstructed from DB. 
*/ -export const loadMessages = async (agentId: string, contextId: string): Promise => { +export const loadMessages = async ( + agentId: string, + contextId: string, + agentCardUrl: string +): Promise => { try { const dbMessages = await chatDb.messages .where('agentId') @@ -29,7 +37,7 @@ export const loadMessages = async (agentId: string, contextId: string): Promise< .and((msg) => msg.contextId === contextId) .sortBy('timestamp'); - return convertDbToComponent(dbMessages); + return await hydrateMessages(dbMessages, agentCardUrl); } catch (loadChatError) { const connectError = ConnectError.from(loadChatError); toast.error(formatToastErrorMessageGRPC({ error: connectError, action: 'load', entity: 'chat' })); @@ -37,6 +45,122 @@ export const loadMessages = async (agentId: string, contextId: string): Promise< } }; +/** + * Hydrate DB message stubs into full ChatMessages. + * - User messages: reconstruct from DB (they store prompt text) + * - Assistant messages with taskId: fetch via tasks/get, convert to ContentBlocks + * - Assistant messages without taskId (errors): use stored contentBlocks from DB + */ +const hydrateMessages = async (dbMessages: ChatDbMessage[], agentCardUrl: string): Promise => { + // Collect taskIds that need fetching + const taskIds = dbMessages.filter((msg) => msg.sender === 'system' && msg.taskId).map((msg) => msg.taskId as string); + + // Fetch all tasks in parallel + const taskMap = await fetchTasks(taskIds, agentCardUrl); + + return dbMessages.map((dbMsg) => { + // User messages: reconstruct from DB + if (dbMsg.sender === 'user') { + return buildMessageFromStoredBlocks(dbMsg); + } + + // Assistant messages with taskId: use fetched task data + const task = dbMsg.taskId ? 
taskMap.get(dbMsg.taskId) : undefined; + if (dbMsg.taskId && task) { + return { + id: dbMsg.id, + role: 'assistant' as const, + contentBlocks: taskToContentBlocks(task), + timestamp: dbMsg.timestamp, + contextId: dbMsg.contextId, + taskId: dbMsg.taskId, + taskState: task.status.state as ChatMessage['taskState'], + taskStartIndex: 0, + usage: dbMsg.usage, + }; + } + + // Assistant messages with taskId but tasks/get failed: show placeholder + if (dbMsg.taskId && !taskMap.has(dbMsg.taskId)) { + return { + id: dbMsg.id, + role: 'assistant' as const, + contentBlocks: [ + { + type: 'task-status-update' as const, + taskState: dbMsg.taskState as ChatMessage['taskState'], + text: 'Task history unavailable (agent may be offline)', + final: true, + timestamp: dbMsg.timestamp, + }, + ], + timestamp: dbMsg.timestamp, + contextId: dbMsg.contextId, + taskId: dbMsg.taskId, + taskState: dbMsg.taskState as ChatMessage['taskState'], + taskStartIndex: 0, + usage: dbMsg.usage, + }; + } + + // Assistant messages without taskId: old error messages with stored contentBlocks + if (dbMsg.contentBlocks?.length) { + return buildMessageFromStoredBlocks(dbMsg); + } + + // Ancient messages: reconstruct from flat fields + return { + id: dbMsg.id, + role: 'assistant' as const, + contentBlocks: reconstructContentBlocks(dbMsg), + timestamp: dbMsg.timestamp, + contextId: dbMsg.contextId, + taskId: dbMsg.taskId, + taskState: dbMsg.taskState as ChatMessage['taskState'], + taskStartIndex: dbMsg.taskStartIndex, + usage: dbMsg.usage, + }; + }); +}; + +/** + * Fetch multiple tasks from the A2A server in parallel. + * Returns a Map of taskId -> Task for successful fetches. + * Failed fetches are silently omitted (caller handles missing entries). 
+ */ +const fetchTasks = async ( + taskIds: string[], + agentCardUrl: string +): Promise> => { + const taskMap = new Map(); + if (taskIds.length === 0) { + return taskMap; + } + + try { + const client = await createA2AClient(agentCardUrl); + const results = await Promise.allSettled(taskIds.map((id) => client.getTask({ id }))); + + for (let i = 0; i < results.length; i++) { + const result = results[i]; + if (result.status === 'fulfilled') { + const response = result.value; + // GetTaskResponse = JSONRPCErrorResponse | GetTaskSuccessResponse + if ('error' in response) { + continue; + } + if ('result' in response) { + taskMap.set(taskIds[i], response.result as import('@a2a-js/sdk').Task); + } + } + } + } catch { + // Client creation failed -- all tasks unavailable + } + + return taskMap; +}; + /** * Serialize contentBlocks for database storage (Date → ISO string) */ @@ -51,6 +175,12 @@ const serializeContentBlocks = (blocks: ContentBlock[]): import('database/chat-d if (block.type === 'task-status-update') { return { ...block, timestamp: block.timestamp.toISOString() }; } + if (block.type === 'connection-status') { + return { ...block, timestamp: block.timestamp.toISOString() }; + } + if (block.type === 'a2a-error') { + return { ...block, timestamp: block.timestamp.toISOString() }; + } return block; }) as import('database/chat-db').ContentBlock[]; @@ -96,28 +226,11 @@ export const saveMessage = async ( export const updateMessage = async ( messageId: string, updates: { - content?: string; isStreaming?: boolean; taskId?: string; taskState?: string; taskStartIndex?: number; - artifacts?: Array<{ - artifactId: string; - name?: string; - description?: string; - parts: import('../types').ArtifactPart[]; - }>; - toolCalls?: Array<{ - id: string; - name: string; - state: string; - input?: unknown; - output?: unknown; - errorText?: string; - messageId: string; - timestamp: Date; - }>; - contentBlocks?: ContentBlock[]; // NEW: store content blocks + contentBlocks?: 
ContentBlock[]; usage?: { input_tokens: number; output_tokens: number; @@ -129,9 +242,7 @@ export const updateMessage = async ( } ): Promise => { try { - // Convert artifacts to database format if provided const dbUpdates: Parameters[1] = { - content: updates.content, isStreaming: updates.isStreaming, taskId: updates.taskId, taskState: updates.taskState, @@ -139,29 +250,7 @@ export const updateMessage = async ( usage: updates.usage, }; - if (updates.artifacts) { - dbUpdates.artifacts = updates.artifacts.map((art) => ({ - id: art.artifactId, - name: art.name, - description: art.description, - parts: art.parts, - })); - } - - if (updates.toolCalls) { - dbUpdates.toolCalls = updates.toolCalls.map((tool) => ({ - id: tool.id, - name: tool.name, - state: tool.state as 'input-available' | 'output-available' | 'output-error', - input: tool.input, - output: tool.output, - errorText: tool.errorText, - messageId: tool.messageId, - timestamp: tool.timestamp, - })); - } - - // NEW: Store contentBlocks if provided (serialize timestamps) + // Only store contentBlocks for error paths (a2a-error blocks aren't in Task) if (updates.contentBlocks) { dbUpdates.contentBlocks = serializeContentBlocks(updates.contentBlocks); } diff --git a/frontend/src/components/pages/agents/details/a2a/chat/utils/message-converter.ts b/frontend/src/components/pages/agents/details/a2a/chat/utils/message-converter.ts index 26c4170203..496cc6179a 100644 --- a/frontend/src/components/pages/agents/details/a2a/chat/utils/message-converter.ts +++ b/frontend/src/components/pages/agents/details/a2a/chat/utils/message-converter.ts @@ -17,13 +17,13 @@ import type { ChatMessage, ContentBlock } from '../types'; /** * Deserialize contentBlocks from database (ISO string → Date) */ -const deserializeContentBlocks = (dbBlocks: import('database/chat-db').ContentBlock[]): ContentBlock[] => +export const deserializeContentBlocks = (dbBlocks: import('database/chat-db').ContentBlock[]): ContentBlock[] => 
dbBlocks.map((block): ContentBlock => ({ ...block, timestamp: new Date(block.timestamp) }) as ContentBlock); /** * Build chat message from stored contentBlocks */ -const buildMessageFromStoredBlocks = (dbMsg: ChatDbMessage): ChatMessage => ({ +export const buildMessageFromStoredBlocks = (dbMsg: ChatDbMessage): ChatMessage => ({ id: dbMsg.id, role: dbMsg.sender === 'user' ? 'user' : 'assistant', contentBlocks: deserializeContentBlocks(dbMsg.contentBlocks ?? []), @@ -39,7 +39,7 @@ const buildMessageFromStoredBlocks = (dbMsg: ChatDbMessage): ChatMessage => ({ * Reconstruct contentBlocks from flat fields (backward compatibility) */ // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: complex business logic -const reconstructContentBlocks = (dbMsg: ChatDbMessage): ContentBlock[] => { +export const reconstructContentBlocks = (dbMsg: ChatDbMessage): ContentBlock[] => { const contentBlocks: ContentBlock[] = []; const now = new Date(); @@ -105,22 +105,19 @@ const reconstructContentBlocks = (dbMsg: ChatDbMessage): ContentBlock[] => { }; /** - * Convert database messages to component message format + * Convert database messages to component message format. + * Legacy entrypoint -- new code uses loadMessages() which hydrates via tasks/get. */ export const convertDbToComponent = (dbMessages: ChatDbMessage[]): ChatMessage[] => dbMessages.map((dbMsg) => { - // NEW: Prefer stored contentBlocks if available (preserves exact temporal structure) if (dbMsg.contentBlocks && dbMsg.contentBlocks.length > 0) { return buildMessageFromStoredBlocks(dbMsg); } - // FALLBACK: Reconstruct contentBlocks from flat fields (backward compatibility) - const contentBlocks = reconstructContentBlocks(dbMsg); - return { id: dbMsg.id, role: dbMsg.sender === 'user' ? 
'user' : 'assistant', - contentBlocks, + contentBlocks: reconstructContentBlocks(dbMsg), timestamp: dbMsg.timestamp, contextId: dbMsg.contextId, taskId: dbMsg.taskId, diff --git a/frontend/src/components/pages/agents/details/a2a/chat/utils/task-to-content-blocks.test.ts b/frontend/src/components/pages/agents/details/a2a/chat/utils/task-to-content-blocks.test.ts new file mode 100644 index 0000000000..1eaa302265 --- /dev/null +++ b/frontend/src/components/pages/agents/details/a2a/chat/utils/task-to-content-blocks.test.ts @@ -0,0 +1,244 @@ +/** + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +import { describe, expect, test } from 'vitest'; + +import { consolidateTextParts, resolveStaleToolBlocks, taskToContentBlocks } from './task-to-content-blocks'; +import type { ArtifactPart, ContentBlock } from '../types'; + +// --------------------------------------------------------------------------- +// consolidateTextParts +// --------------------------------------------------------------------------- + +describe('consolidateTextParts', () => { + test('merges consecutive text parts into one', () => { + const parts: ArtifactPart[] = [ + { kind: 'text', text: 'Hello' }, + { kind: 'text', text: ' ' }, + { kind: 'text', text: 'world' }, + ]; + + const result = consolidateTextParts(parts); + + expect(result).toEqual([{ kind: 'text', text: 'Hello world' }]); + }); + + test('preserves non-text parts in order', () => { + const parts: ArtifactPart[] = [ + { kind: 'text', text: 'before' }, + { kind: 'file', file: { name: 'img.png', mimeType: 'image/png', bytes: 'abc' } }, + { kind: 'text', text: 'after' }, + ]; + + const result = 
consolidateTextParts(parts); + + expect(result).toEqual([ + { kind: 'text', text: 'before' }, + { kind: 'file', file: { name: 'img.png', mimeType: 'image/png', bytes: 'abc' } }, + { kind: 'text', text: 'after' }, + ]); + }); + + test('merges text parts around non-text parts', () => { + const parts: ArtifactPart[] = [ + { kind: 'text', text: 'a' }, + { kind: 'text', text: 'b' }, + { kind: 'data', data: { key: 'value' } }, + { kind: 'text', text: 'c' }, + { kind: 'text', text: 'd' }, + ]; + + const result = consolidateTextParts(parts); + + expect(result).toEqual([ + { kind: 'text', text: 'ab' }, + { kind: 'data', data: { key: 'value' } }, + { kind: 'text', text: 'cd' }, + ]); + }); + + test('returns empty array for empty input', () => { + expect(consolidateTextParts([])).toEqual([]); + }); + + test('returns single text part unchanged', () => { + const parts: ArtifactPart[] = [{ kind: 'text', text: 'only' }]; + const result = consolidateTextParts(parts); + expect(result).toEqual([{ kind: 'text', text: 'only' }]); + }); + + test('does not mutate the input array', () => { + const parts: ArtifactPart[] = [ + { kind: 'text', text: 'a' }, + { kind: 'text', text: 'b' }, + ]; + const original = JSON.parse(JSON.stringify(parts)); + consolidateTextParts(parts); + expect(parts).toEqual(original); + }); +}); + +// --------------------------------------------------------------------------- +// resolveStaleToolBlocks +// --------------------------------------------------------------------------- + +describe('resolveStaleToolBlocks', () => { + const makeToolBlock = (state: 'input-available' | 'output-available' | 'output-error'): ContentBlock => ({ + type: 'tool', + toolCallId: `tool-${Math.random()}`, + toolName: 'search', + state, + input: {}, + timestamp: new Date(), + }); + + test('resolves input-available tools to output-available when task is completed', () => { + const blocks: ContentBlock[] = [makeToolBlock('input-available'), makeToolBlock('input-available')]; + 
resolveStaleToolBlocks(blocks, 'completed'); + + for (const block of blocks) { + if (block.type === 'tool') { + expect(block.state).toBe('output-available'); + } + } + }); + + test('resolves tools to output-error when task failed', () => { + for (const state of ['failed', 'canceled', 'rejected']) { + const blocks: ContentBlock[] = [makeToolBlock('input-available')]; + resolveStaleToolBlocks(blocks, state); + expect(blocks[0].type === 'tool' && blocks[0].state).toBe('output-error'); + } + }); + + test('does not resolve tools when task is still working', () => { + const blocks: ContentBlock[] = [makeToolBlock('input-available')]; + resolveStaleToolBlocks(blocks, 'working'); + expect(blocks[0].type === 'tool' && blocks[0].state).toBe('input-available'); + }); + + test('does not change tools already in output-available or output-error', () => { + const blocks: ContentBlock[] = [makeToolBlock('output-available'), makeToolBlock('output-error')]; + resolveStaleToolBlocks(blocks, 'completed'); + + expect(blocks[0].type === 'tool' && blocks[0].state).toBe('output-available'); + expect(blocks[1].type === 'tool' && blocks[1].state).toBe('output-error'); + }); + + test('does nothing when taskState is undefined', () => { + const blocks: ContentBlock[] = [makeToolBlock('input-available')]; + resolveStaleToolBlocks(blocks, undefined); + expect(blocks[0].type === 'tool' && blocks[0].state).toBe('input-available'); + }); +}); + +// --------------------------------------------------------------------------- +// taskToContentBlocks – artifact text consolidation +// --------------------------------------------------------------------------- + +// --------------------------------------------------------------------------- +// taskToContentBlocks – stale tool resolution +// --------------------------------------------------------------------------- + +describe('taskToContentBlocks – stale tool blocks', () => { + test('resolves tool blocks without tool_response to output-available on 
completed task', () => { + const task = { + id: 'task-1', + contextId: 'ctx-1', + status: { + state: 'completed' as const, + timestamp: new Date().toISOString(), + message: { + kind: 'message' as const, + messageId: 'msg-final', + role: 'agent' as const, + parts: [{ kind: 'text' as const, text: 'Done' }], + }, + }, + history: [ + { + kind: 'message' as const, + messageId: 'msg-1', + role: 'agent' as const, + parts: [ + { + kind: 'data' as const, + data: { id: 'call-1', name: 'search', arguments: { q: 'test' } }, + metadata: { data_type: 'tool_request' }, + }, + ], + }, + { + kind: 'message' as const, + messageId: 'msg-2', + role: 'agent' as const, + parts: [{ kind: 'text' as const, text: 'Here are the results.' }], + }, + ], + }; + + const blocks = taskToContentBlocks(task); + const toolBlock = blocks.find((b) => b.type === 'tool'); + + expect(toolBlock).toBeDefined(); + if (toolBlock?.type === 'tool') { + expect(toolBlock.state).toBe('output-available'); + expect(toolBlock.toolName).toBe('search'); + } + }); +}); + +// --------------------------------------------------------------------------- +// taskToContentBlocks – artifact text consolidation +// --------------------------------------------------------------------------- + +describe('taskToContentBlocks – artifact text consolidation', () => { + test('consolidates many text parts from tasks/get into a single text part', () => { + const task = { + id: 'task-1', + contextId: 'ctx-1', + status: { + state: 'completed' as const, + timestamp: new Date().toISOString(), + message: { + kind: 'message' as const, + messageId: 'msg-1', + role: 'agent' as const, + parts: [{ kind: 'text' as const, text: 'Done' }], + }, + }, + artifacts: [ + { + artifactId: 'art-1', + parts: [ + { kind: 'text' as const, text: '# Title\n\n' }, + { kind: 'text' as const, text: 'Some ' }, + { kind: 'text' as const, text: 'content ' }, + { kind: 'text' as const, text: 'here.' 
}, + ], + }, + ], + }; + + const blocks = taskToContentBlocks(task); + const artifactBlock = blocks.find((b) => b.type === 'artifact'); + + expect(artifactBlock).toBeDefined(); + if (artifactBlock?.type === 'artifact') { + // All four text parts should be consolidated into one + expect(artifactBlock.parts).toHaveLength(1); + expect(artifactBlock.parts[0]).toEqual({ + kind: 'text', + text: '# Title\n\nSome content here.', + }); + } + }); +}); diff --git a/frontend/src/components/pages/agents/details/a2a/chat/utils/task-to-content-blocks.ts b/frontend/src/components/pages/agents/details/a2a/chat/utils/task-to-content-blocks.ts new file mode 100644 index 0000000000..72deec0aa4 --- /dev/null +++ b/frontend/src/components/pages/agents/details/a2a/chat/utils/task-to-content-blocks.ts @@ -0,0 +1,273 @@ +/** + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +import type { Artifact, Message1, Part, Task, TaskState } from '@a2a-js/sdk'; + +import type { ArtifactPart, ContentBlock, MessageUsageMetadata } from '../types'; + +/** + * Extract text content from a message's parts (agent messages only). + * Also builds a tool call summary string for display. 
+ */ +const extractAgentMessageText = (parts: Part[]): string => { + const textParts = parts + .filter((part): part is Extract => part.kind === 'text') + .map((part) => part.text); + + const toolRequests = parts.filter((part): part is Extract => { + if (part.kind !== 'data') { + return false; + } + const metadata = part.metadata as Record | undefined; + const data = part.data as Record | undefined; + return metadata?.data_type === 'tool_request' && !!data?.name; + }); + + const toolNames = toolRequests.map((part) => (part.data as Record).name as string); + + let toolSummary = ''; + if (toolNames.length === 1) { + toolSummary = `Calling tool: ${toolNames[0]}`; + } else if (toolNames.length > 1) { + const toolCounts = toolNames.reduce( + (acc, tool) => { + acc[tool] = (acc[tool] || 0) + 1; + return acc; + }, + {} as Record + ); + const toolList = Object.entries(toolCounts) + .map(([tool, count]) => (count > 1 ? `${tool} (${count}×)` : tool)) + .join(', '); + toolSummary = `Calling ${toolNames.length} tools: ${toolList}`; + } + + const text = textParts.join(''); + if (text && toolSummary) { + return `${text}\n\n${toolSummary}`; + } + return toolSummary || text; +}; + +/** + * Extract tool request and tool response blocks from a message's data parts. + */ +const extractToolBlocks = (parts: Part[], messageId: string, timestamp: Date): ContentBlock[] => { + const blocks: ContentBlock[] = []; + + for (const part of parts) { + if (part.kind !== 'data' || !part.metadata?.data_type) { + continue; + } + + const dataType = part.metadata.data_type as string; + const data = part.data as Record; + + if (dataType === 'tool_request' && data?.id && data?.name) { + blocks.push({ + type: 'tool', + toolCallId: data.id as string, + toolName: data.name as string, + state: 'input-available', + input: 'arguments' in data ? 
data.arguments : undefined, + timestamp, + messageId, + }); + } + } + + return blocks; +}; + +/** + * Apply tool responses to existing tool blocks (mutates blocks in place). + */ +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: tool response matching logic +const applyToolResponses = (blocks: ContentBlock[], parts: Part[], timestamp: Date): void => { + for (const part of parts) { + if (part.kind !== 'data' || !part.metadata?.data_type) { + continue; + } + + const dataType = part.metadata.data_type as string; + const data = part.data as Record; + + if (dataType === 'tool_response' && data?.id) { + const existing = blocks.find((b) => b.type === 'tool' && b.toolCallId === (data.id as string)); + if (existing && existing.type === 'tool') { + const hasError = 'error' in data && data.error; + existing.state = hasError ? 'output-error' : 'output-available'; + if (hasError) { + existing.errorText = data.error as string; + } else { + existing.output = 'result' in data ? data.result : undefined; + } + existing.endTimestamp = timestamp; + } + } + } +}; + +/** + * Convert a Task's history messages into ContentBlock array for rendering. + * Only processes agent messages -- user messages are stored locally in IndexedDB. 
+ */ +const processHistory = (history: Message1[]): ContentBlock[] => { + const blocks: ContentBlock[] = []; + + for (const message of history) { + if (message.role !== 'agent' || !message.parts?.length) { + continue; + } + + const timestamp = new Date(); + const text = extractAgentMessageText(message.parts); + + // Create status-update block for text content + if (text.trim()) { + blocks.push({ + type: 'task-status-update', + text, + messageId: message.messageId, + final: false, + timestamp, + usage: message.metadata?.usage as MessageUsageMetadata | undefined, + }); + } + + // Extract tool request blocks + const toolBlocks = extractToolBlocks(message.parts, message.messageId, timestamp); + blocks.push(...toolBlocks); + + // Apply tool responses to existing tool blocks + applyToolResponses(blocks, message.parts, timestamp); + } + + return blocks; +}; + +const TERMINAL_TASK_STATES = new Set(['completed', 'failed', 'canceled', 'rejected']); + +/** + * Resolve tool blocks still in 'input-available' state when the task is terminal. + * Many A2A agents never send tool_response data parts, so tool blocks would show + * a "Working" spinner forever. When the task has finished, we infer tool outcome + * from the task state (mutates blocks in place): + * - completed → output-available (tools succeeded) + * - failed/canceled/rejected → output-error (task didn't succeed) + */ +export const resolveStaleToolBlocks = (blocks: ContentBlock[], taskState: string | undefined): void => { + if (!(taskState && TERMINAL_TASK_STATES.has(taskState))) { + return; + } + const resolvedState = taskState === 'completed' ? 'output-available' : 'output-error'; + for (const block of blocks) { + if (block.type === 'tool' && block.state === 'input-available') { + block.state = resolvedState; + } + } +}; + +/** + * Merge consecutive text parts into a single text part. + * Non-text parts are preserved in order. 
This prevents the reload problem + * where tasks/get returns many individual text parts (one per streamed chunk) + * that would each render as a separate Response component. + */ +export const consolidateTextParts = (parts: ArtifactPart[]): ArtifactPart[] => { + const result: ArtifactPart[] = []; + for (const part of parts) { + if (part.kind === 'text') { + const prev = result.at(-1); + if (prev?.kind === 'text') { + prev.text += part.text; + continue; + } + } + // Clone the part so the caller's array isn't mutated + result.push(part.kind === 'text' ? { kind: 'text', text: part.text } : part); + } + return result; +}; + +/** + * Convert Task artifacts to ContentBlock array. + */ +const processArtifacts = (artifacts: Artifact[]): ContentBlock[] => + artifacts + .filter((artifact) => artifact.parts?.length > 0) + .map((artifact) => ({ + type: 'artifact' as const, + artifactId: artifact.artifactId, + name: artifact.name, + description: artifact.description, + parts: consolidateTextParts( + artifact.parts.map((part): ArtifactPart => { + if (part.kind === 'text') { + return { kind: 'text', text: part.text }; + } + if (part.kind === 'file') { + return { + kind: 'file', + file: { + name: part.file.name, + mimeType: part.file.mimeType ?? 'application/octet-stream', + ...('bytes' in part.file ? { bytes: part.file.bytes } : {}), + ...('uri' in part.file ? { uri: part.file.uri } : {}), + }, + }; + } + // DataPart + return { kind: 'data', data: part.data }; + }) + ), + timestamp: new Date(), + })); + +/** + * Convert a Task object (from tasks/get) to the ContentBlock[] format + * that the chat UI renders. 
+ * + * The mapping: + * - task.history (agent messages) -> task-status-update + tool blocks + * - task.artifacts -> artifact blocks + * - task.status -> final task-status-update block + */ +export const taskToContentBlocks = (task: Task): ContentBlock[] => { + const blocks: ContentBlock[] = []; + + // Process history messages + if (task.history?.length) { + blocks.push(...processHistory(task.history)); + } + + // Process artifacts + if (task.artifacts?.length) { + blocks.push(...processArtifacts(task.artifacts)); + } + + // Add final status block + blocks.push({ + type: 'task-status-update', + taskState: task.status.state as TaskState, + text: task.status.message?.parts + ?.filter((p): p is Extract => p.kind === 'text') + .map((p) => p.text) + .join(''), + final: true, + timestamp: task.status.timestamp ? new Date(task.status.timestamp) : new Date(), + }); + + // Resolve tool blocks that never received a tool_response + resolveStaleToolBlocks(blocks, task.status.state); + + return blocks; +}; diff --git a/frontend/src/database/chat-db.ts b/frontend/src/database/chat-db.ts index 569d728822..5ddbd4e894 100644 --- a/frontend/src/database/chat-db.ts +++ b/frontend/src/database/chat-db.ts @@ -72,6 +72,18 @@ export type ContentBlock = cached_tokens?: number; reasoning_tokens?: number; }; + } + | { + type: 'connection-status'; + status: 'disconnected' | 'reconnecting' | 'reconnected' | 'gave-up'; + attempt?: number; + maxAttempts?: number; + timestamp: string; + } + | { + type: 'a2a-error'; + error: { code: number; message: string; data?: Record }; + timestamp: string; }; export type ChatMessage = {