frontend: SSE reconnection via `tasks/resubscribe` #2224
When an SSE connection drops mid-stream, the UI currently treats it as a fatal error even though the task is likely still running server-side. Wire up the A2A SDK's resubscribeTask() method to transparently recover from connection drops. On SSE failure, if the task was still in-flight (working/submitted), we retry up to 5 times with exponential backoff (1s, 2s, 4s, 8s, 16s). The reconnection state is shown inline via connection-status content blocks so the user sees what's happening.

Key changes:

- Bump @a2a-js/sdk to ^0.3.10 (adds resubscribeTask)
- Add connection-status ContentBlock variant (types + DB + serialization)
- Extract finalizeMessage() from streamMessage() so both happy path and recovery path share the same finalization logic
- Move streaming state init before the try block so catch can inspect it
- Add resubscribeLoop() with exponential backoff (delay schedule sketched below)
- New ConnectionStatusBlock component for inline reconnection UI
- Error path now preserves content received before disconnect
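The delays above follow a simple doubling schedule; a tiny illustrative sketch (the helper name is not from the codebase):

```typescript
// attempt is 0-based: 0 -> 1s, 1 -> 2s, 2 -> 4s, 3 -> 8s, 4 -> 16s (~31s total)
const resubscribeDelayMs = (attempt: number): number => 1000 * 2 ** attempt;
```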
Eight scenarios covering the full reconnection state machine:

1. SSE drops mid-stream, single resubscribe recovers to completion
2. Task already completed before disconnect - no resubscribe needed
3. No task ID captured - resubscribe impossible
4. First resubscribe fails, second succeeds (exponential backoff)
5. All 5 resubscribe attempts fail - gave-up with error preserved
6. Task reaches terminal state during a failing resubscribe attempt
7. Happy path baseline - no disconnect at all
8. Connection-status block replacement (UI shows only latest status)

Tests mock the AI SDK streamText return and the A2A client's resubscribeTask() to simulate realistic SSE connection drops with async generators that yield events then throw.
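For illustration, a mock in that spirit might look like the sketch below; the event shape and helper names are assumptions, not the actual test utilities:

```typescript
import { vi } from 'vitest';

// Yields a few events as if the stream were healthy, then throws to simulate
// the SSE connection dropping mid-task.
async function* droppedStream(events: unknown[], error: Error): AsyncGenerator<unknown> {
  for (const event of events) {
    yield event;
  }
  throw error;
}

// Hypothetical mock client: the resubscribed stream delivers one status update, then drops again.
const mockClient = {
  resubscribeTask: vi.fn(() =>
    droppedStream([{ kind: 'status-update', status: { state: 'working' } }], new Error('SSE error'))
  ),
};
```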
- Move "reconnected" status after stream processing completes and task reaches terminal state, not before (premature success signal) - Continue retrying when resubscribe stream ends cleanly but task hasn't reached terminal state (server-side graceful close) - Rethrow TypeError/ReferenceError in resubscribe catch instead of silently retrying programming errors with exponential backoff - Wrap recovery finalizeMessage in try/catch so a DB write failure falls through to the error path instead of becoming an unhandled rejection - Add a2a-error variant to DB ContentBlock type and serialization (was missing, causing Date objects stored where strings expected) - Guard maxAttempts with ?? '?' in gave-up state (parity with reconnecting branch) - Export parseA2AError for direct unit testing - Add 13 new tests: parseA2AError (7), taskId fallback from response.id (2), terminal state variants (1), clean-close retry (1), no-taskState no-resubscribe (1), DB persistence after recovery (1) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- gave-up block now replaces the stale reconnecting block instead of stacking two alerts on top of each other
- Error path uses state.capturedTaskState ?? 'failed' instead of hardcoding 'failed', so a successful recovery that fails on the DB write preserves the actual task outcome
- Programming errors (TypeError, ReferenceError) in resubscribe break out of the retry loop immediately instead of rethrowing through the catch block (which would escape as an unhandled rejection)
- Add tests for: TypeError breaks retry loop (scenario 15), finalizeMessage failure fallthrough (scenario 16), gave-up replaces reconnecting (scenario 17), SSE prefix stripping without code (parseA2AError)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
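The exact error-message format parseA2AError handles isn't shown in this PR, so the following is only an assumption-heavy illustration of the idea (strip a transport-level SSE prefix, then pull a JSON-RPC-style code and message out of the remaining text):

```typescript
// Illustrative only -- the real prefix handling and regexes live in use-message-streaming.ts.
type ParsedA2AError = { code?: number; message: string };

const parseA2AError = (raw: string): ParsedA2AError => {
  const withoutPrefix = raw.replace(/^SSE error:\s*/i, ''); // assumed prefix shape
  const match = withoutPrefix.match(/(-?\d+)\s*:\s*(.+)/);  // e.g. "-32000: Internal error"
  return match ? { code: Number(match[1]), message: match[2].trim() } : { message: withoutPrefix };
};
```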
Stop storing full task content (contentBlocks, artifacts, toolCalls) in IndexedDB. The authoritative task state lives on the A2A server. On reload, assistant messages are hydrated via tasks/get and converted back to ContentBlock[] through a new taskToContentBlocks() utility. Only minimal stubs (taskId, taskState, usage) are persisted in DB. Error-path a2a-error blocks are still stored since they're client-side artifacts not available through tasks/get. Also hide the context ID bar when there are no messages (after clear), while keeping headerActions visible.
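A rough sketch of that conversion, using simplified shapes; the real ContentBlock union and the @a2a-js/sdk Task type carry more fields, and the import path is an assumption:

```typescript
import type { Task } from '@a2a-js/sdk';

// Simplified app-side block, for illustration only.
type ContentBlock = { type: 'text'; text: string };

// Rebuild assistant content from a task fetched via tasks/get on reload.
export const taskToContentBlocks = (task: Task): ContentBlock[] => {
  const blocks: ContentBlock[] = [];
  for (const artifact of task.artifacts ?? []) {
    for (const part of artifact.parts) {
      if (part.kind === 'text') {
        blocks.push({ type: 'text', text: part.text });
      }
      // file/data parts would map to their own block variants in the real utility
    }
  }
  return blocks;
};
```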
On reload, tasks/get returns artifacts with many individual text parts (one per streamed chunk). Each was rendered as a separate Response component, breaking markdown formatting. Add consolidateTextParts() to merge consecutive text parts into one before rendering. Also add resolveStaleToolBlocks() to transition tool blocks from 'input-available' (Working spinner) to the correct terminal state when the task has finished. Many A2A agents never send tool_response events, leaving tools stuck with an animated Working badge forever.
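A minimal sketch of the merge, assuming the A2A Part union with a kind: 'text' discriminator; the real utility may also handle metadata and other part kinds:

```typescript
import type { Part } from '@a2a-js/sdk';

// Merge runs of consecutive text parts so markdown split across streamed chunks
// renders as a single Response block instead of many fragments.
export const consolidateTextParts = (parts: Part[]): Part[] => {
  const merged: Part[] = [];
  for (const part of parts) {
    const prev = merged[merged.length - 1];
    if (part.kind === 'text' && prev?.kind === 'text') {
      merged[merged.length - 1] = { ...prev, text: prev.text + part.text };
    } else {
      merged.push(part);
    }
  }
  return merged;
};
```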
- Reset the attempt counter when resubscribe makes progress, allowing unlimited retries as long as the server keeps responding
- Push the 'reconnected' status on the first resubscribe event so the UI spinner is replaced before new content arrives
- Call resolveStaleToolBlocks in finalizeMessage so tool badges reflect the correct state when streaming ends
When all resubscribe attempts are exhausted, the gave-up alert now shows a 'Reload to check status' button. Reloading triggers tasks/get which fetches the current task state from the server.
@claude Review the PR
Claude finished @weeco's task in 2m 11s — View job

**PR Review: SSE Reconnection via tasks/resubscribe**

Comprehensive code review completed. Overall, this is an excellent implementation of SSE reconnection with strong test coverage and thoughtful error handling. The code is well-structured, thoroughly tested, and includes helpful documentation.
### ✅ Strengths
### 🔍 Issues & Suggestions

#### Critical Issues

**1. Race Condition in Connection Status Block Replacement (use-message-streaming.ts:216-232)**

The current logic replaces the last block if it's a `connection-status` block:

```typescript
// Current code at line 216-232
if (status === 'reconnecting' || status === 'reconnected' || status === 'gave-up') {
  const lastIdx = state.contentBlocks.length - 1;
  if (lastIdx >= 0 && state.contentBlocks[lastIdx].type === 'connection-status') {
    state.contentBlocks[lastIdx] = block;
    // ... update UI
    return;
  }
}
```

**Problem:** If any other content block arrives between status updates, the last block is no longer a `connection-status` block, so a new status block gets pushed and stale status lines accumulate.

**Suggestion:** Track the connection status block index explicitly in the streaming state:

```typescript
type StreamingState = {
  // ... existing fields
  connectionStatusBlockIndex?: number; // NEW
};

const pushConnectionStatus = ({ status, state, ... }: ConnectionStatusParams): void => {
  const block: ContentBlock = { /* ... */ };
  if (status === 'reconnecting' || status === 'reconnected' || status === 'gave-up') {
    if (
      state.connectionStatusBlockIndex !== undefined &&
      state.contentBlocks[state.connectionStatusBlockIndex]?.type === 'connection-status'
    ) {
      state.contentBlocks[state.connectionStatusBlockIndex] = block;
      onMessageUpdate(/* ... */);
      return;
    }
  }
  // First connection-status or new disconnection
  state.connectionStatusBlockIndex = state.contentBlocks.length;
  state.contentBlocks.push(block);
  onMessageUpdate(/* ... */);
};
```

This ensures we always replace the correct block even if other content arrives between status updates.

#### Medium Priority Issues

**2. Unclear Finalize Behavior After Recovery Failure (use-message-streaming.ts:420-429)**

```typescript
const recovered = await resubscribeLoop(state, agentCardUrl, assistantMessage, onMessageUpdate);
if (recovered) {
  try {
    const result = await finalizeMessage(state, assistantMessage);
    onMessageUpdate(result.assistantMessage);
    return result;
  } catch {
    // finalizeMessage failed (e.g. DB write) -- fall through to error path
  }
}
```

**Issue:** The empty catch block swallows the error details. While test scenario 16 covers this case, production debugging would benefit from logging.

**Suggestion:** Log the error for observability:

```typescript
} catch (finalizeError) {
  console.error('finalizeMessage failed after recovery:', finalizeError);
  // fall through to error path
}
```

**3. Potential Memory Leak in Long-Running Reconnection (use-message-streaming.ts:256-313)**

The resubscribe loop creates a fresh `A2AClient` on every attempt without explicitly releasing the previous one, so long reconnection sessions can accumulate unused clients.

**Suggestion:** Consider explicit cleanup if the SDK provides it:

```typescript
let currentClient: A2AClient | null = null;
while (attempt < MAX_RESUBSCRIBE_ATTEMPTS) {
  // ...
  try {
    currentClient?.close?.(); // If SDK provides cleanup method
    currentClient = await createA2AClient(agentCardUrl);
    const stream = currentClient.resubscribeTask({ id: taskId });
    // ...
  }
}
```

**4. Hardcoded Usage Metadata Structure (types.ts:73-80, database-operations.ts:234-241)**

The usage metadata structure is duplicated and hardcoded across multiple files. If the SDK changes the usage structure, this requires updates in several places.

**Current:** Explicit type definition repeated:

```typescript
// types.ts
export type MessageUsageMetadata = {
  input_tokens: number;
  output_tokens: number;
  total_tokens: number;
  max_input_tokens?: number;
  cached_tokens?: number;
  reasoning_tokens?: number;
};

// database-operations.ts
usage?: {
  input_tokens: number;
  output_tokens: number;
  // ... repeated
};
```

**Suggestion:** Import the type from the SDK if available, or extract to a shared type definition:

```typescript
import type { UsageMetadata } from '@a2a-js/sdk'; // If available
// OR create a shared type module
```

**5. Type Mismatch in Database Schema (chat-db.ts:67-74)**

The DB schema and the runtime types disagree on the `timestamp` field: the DB type stores a string while the runtime type uses a `Date`, and the conversion between them is implicit.

**Current:**

```typescript
// chat-db.ts (DB types) - timestamp: string
type: 'connection-status';
timestamp: string;

// types.ts (runtime types) - timestamp: Date
type: 'connection-status';
timestamp: Date;
```

**Suggestion:** Document this explicitly or use branded types:

```typescript
// In database-operations.ts
type SerializedContentBlock = ContentBlock & {
  timestamp: string; // ISO 8601
};
```

#### Minor Issues

**6. Biome Disable Comment Justification (use-message-streaming.ts:261, 326)**

Two biome-ignore comments suppress the cognitive-complexity rule:

```typescript
// Line 261
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: retry loop with backoff, progress detection, and error classification
```

**Suggestion:** While the complexity is justified, consider extracting helper functions (e.g. for backoff delay, progress detection, and error classification). This would reduce cognitive complexity and improve testability.

**7. Magic Number for Max Attempts (use-message-streaming.ts:114)**

```typescript
const MAX_RESUBSCRIBE_ATTEMPTS = 5;
```

**Suggestion:** Consider making this configurable or at least document the rationale for 5 attempts. Total backoff time is 31 seconds (1+2+4+8+16), which seems reasonable but might warrant explanation.

```typescript
/**
 * Maximum consecutive reconnection attempts before giving up.
 * With exponential backoff (1s, 2s, 4s, 8s, 16s), total retry time is ~31s.
 * The counter resets on any progress, so tasks making incremental progress
 * can reconnect indefinitely.
 */
const MAX_RESUBSCRIBE_ATTEMPTS = 5;
```

**8. Connection Status Component Button Behavior (connection-status-block.tsx:69-72)**

The "Reload to check status" button triggers a full page reload, which might be jarring:

```tsx
<Button onClick={() => window.location.reload()} size="sm" variant="outline">
  <RefreshCwIcon />
  Reload to check status
</Button>
```

**Suggestion:** Consider a gentler retry mechanism first (retry resubscription once more) before forcing a reload, or use soft navigation if using a router.

**9. Test Timer Handling Inconsistency (use-message-streaming.test.ts:382-391)**

Most tests use auto-advancing fake timers, but this test switches to manual advancement:

```typescript
vi.useRealTimers();
vi.useFakeTimers({ shouldAdvanceTime: false });
```

**Suggestion:** Document why this test requires manual advancement while others use auto-advance. This prevents confusion during future test maintenance.

**10. Missing Error Code Constants (use-message-streaming.ts:49-54)**

The regex patterns parse JSON-RPC error codes, but the code doesn't define constants for common error codes (like -1 for unknown, -32000 for server error, etc.).

**Suggestion:** Define error code constants if you need to handle specific codes differently:

```typescript
const JSON_RPC_ERROR_CODES = {
  UNKNOWN: -1,
  SERVER_ERROR: -32000,
  INVALID_REQUEST: -32600,
  // ... etc
} as const;
```

### 📋 Additional Observations

**Good Practices Observed:**
**Minor Suggestions:**
### 🎯 Verdict

This PR demonstrates high-quality engineering.
The critical issue (#1) should be addressed before merging to prevent potential UI glitches in production. The other issues are enhancements that could be addressed in follow-up PRs.

**Recommendation:** ✅ Approve after addressing the race condition in issue #1.

Great work on this resilient reconnection implementation! The exponential backoff with progress-based retry reset is particularly clever for handling long-running tasks.
Document MAX_RESUBSCRIBE_ATTEMPTS with backoff timing rationale. Log finalizeMessage errors after recovery for production observability.
### What

When an SSE connection drops mid-stream, transparently reconnect using the A2A protocol's `tasks/resubscribe` endpoint instead of showing a fatal error. Bump `@a2a-js/sdk` from 0.3.5 to 0.3.10.

### Why
Long-running agent tasks are particularly vulnerable to transient network failures. A brief WiFi hiccup or load balancer timeout shouldn't kill the entire conversation. The task keeps running server-side -- we just lost the event stream and need to pick it back up.
### Implementation details
The core logic lives in `use-message-streaming.ts`. When `streamMessage()` catches an error and the task is still in-flight (has a task ID, state is not terminal), it enters a resubscribe loop:

1. Push a `disconnected` connection-status block
2. Create a fresh `A2AClient` and call `resubscribeTask({ id: taskId })`
3. Feed resubscribed events through the same streaming logic, pushing a `reconnected` status on the first event and resetting the attempt counter on progress
4. On failure, wait with exponential backoff (1s, 2s, 4s, 8s, 16s) and retry, up to 5 consecutive attempts
5. If all attempts are exhausted, push a `gave-up` block and fall through to the existing error path
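A rough, simplified sketch of that loop; the parameter shapes, helper names, and terminal-state list are assumptions, and the real implementation in `use-message-streaming.ts` handles more cases:

```typescript
// Sketch only -- not the actual hook internals.
type ConnectionStatus = 'disconnected' | 'reconnecting' | 'reconnected' | 'gave-up';
type ResubClient = { resubscribeTask(params: { id: string }): AsyncIterable<unknown> };

const MAX_RESUBSCRIBE_ATTEMPTS = 5;
const TERMINAL_STATES = new Set(['completed', 'failed', 'canceled', 'rejected']); // assumed set
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function resubscribeLoop(
  taskId: string,
  createClient: () => Promise<ResubClient>,            // e.g. wraps createA2AClient(agentCardUrl)
  handleEvent: (event: unknown) => string | undefined,  // feeds event into streaming state, returns task state
  pushStatus: (status: ConnectionStatus, attempt?: number) => void
): Promise<boolean> {
  let attempt = 0;
  while (attempt < MAX_RESUBSCRIBE_ATTEMPTS) {
    await sleep(1000 * 2 ** attempt); // 1s, 2s, 4s, 8s, 16s
    pushStatus('reconnecting', attempt + 1);
    try {
      const client = await createClient(); // fresh client per attempt
      let madeProgress = false;
      for await (const event of client.resubscribeTask({ id: taskId })) {
        if (!madeProgress) {
          madeProgress = true;
          attempt = 0; // progress resets the retry budget
          pushStatus('reconnected');
        }
        const state = handleEvent(event);
        if (state !== undefined && TERMINAL_STATES.has(state)) return true;
      }
      // Stream closed cleanly but the task isn't terminal: treat it as another drop.
    } catch (error) {
      if (error instanceof TypeError || error instanceof ReferenceError) break; // real bug, don't retry
    }
    attempt += 1;
  }
  pushStatus('gave-up');
  return false;
}
```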
A new `connection-status` ContentBlock variant flows through the full type system: runtime types, DB schema, serialization, deserialization, and a new `ConnectionStatusBlock` UI component.
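For illustration, the variant might look roughly like this; fields beyond `type`, the status values, and the attempt counters mentioned elsewhere in this PR are assumptions:

```typescript
// Runtime shape (types.ts); the serialized DB form stores `timestamp` as an ISO string.
type ConnectionStatusContentBlock = {
  type: 'connection-status';
  status: 'disconnected' | 'reconnecting' | 'reconnected' | 'gave-up';
  attempt?: number;     // current retry attempt, shown while reconnecting
  maxAttempts?: number; // MAX_RESUBSCRIBE_ATTEMPTS, shown in the gave-up state
  timestamp: Date;
};
```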
The `reconnecting`/`reconnected` status blocks replace each other in-place so the UI doesn't accumulate stale status lines during retries.

The test suite covers 8 scenarios: successful stream, error handling, reconnection success, reconnection after partial content, all retries exhausted, non-resubscribable errors, error parsing, and task completion during reconnection.
### References
tasks/resubscribe: https://google.github.io/A2A/#/topics/streaming?id=disconnection-and-recovery