diff --git a/.changeset/fix-multiagent-partial-failure.md b/.changeset/fix-multiagent-partial-failure.md new file mode 100644 index 00000000..5b30ff13 --- /dev/null +++ b/.changeset/fix-multiagent-partial-failure.md @@ -0,0 +1,5 @@ +--- +"@adcp/client": patch +--- + +Fix multi-agent partial failure handling using Promise.allSettled diff --git a/src/lib/core/ADCPMultiAgentClient.ts b/src/lib/core/ADCPMultiAgentClient.ts index 4327400c..86c0fa32 100644 --- a/src/lib/core/ADCPMultiAgentClient.ts +++ b/src/lib/core/ADCPMultiAgentClient.ts @@ -66,6 +66,34 @@ export class AgentCollection { } } + private async executeAllSettled( + operation: (client: AgentClient) => Promise> + ): Promise[]> { + const clients = Array.from(this.clients.values()); + const results = await Promise.allSettled(clients.map(client => operation(client))); + + return results.map((result, index) => { + if (result.status === 'fulfilled') { + return result.value; + } + const client = clients[index]; + return { + success: false, + status: 'completed' as const, + error: result.reason instanceof Error ? result.reason.message : String(result.reason), + metadata: { + taskId: '', + taskName: '', + agent: client.getAgent(), + responseTimeMs: 0, + timestamp: new Date().toISOString(), + clarificationRounds: 0, + status: 'failed' as const, + }, + }; + }); + } + // ====== PARALLEL TASK EXECUTION ====== /** @@ -76,8 +104,7 @@ export class AgentCollection { inputHandler?: InputHandler, options?: TaskOptions ): Promise[]> { - const promises = Array.from(this.clients.values()).map(client => client.getProducts(params, inputHandler, options)); - return Promise.all(promises); + return this.executeAllSettled(client => client.getProducts(params, inputHandler, options)); } /** @@ -88,10 +115,7 @@ export class AgentCollection { inputHandler?: InputHandler, options?: TaskOptions ): Promise[]> { - const promises = Array.from(this.clients.values()).map(client => - client.listCreativeFormats(params, inputHandler, options) - ); - return Promise.all(promises); + return this.executeAllSettled(client => client.listCreativeFormats(params, inputHandler, options)); } /** @@ -103,10 +127,7 @@ export class AgentCollection { inputHandler?: InputHandler, options?: TaskOptions ): Promise[]> { - const promises = Array.from(this.clients.values()).map(client => - client.createMediaBuy(params, inputHandler, options) - ); - return Promise.all(promises); + return this.executeAllSettled(client => client.createMediaBuy(params, inputHandler, options)); } /** @@ -117,10 +138,7 @@ export class AgentCollection { inputHandler?: InputHandler, options?: TaskOptions ): Promise[]> { - const promises = Array.from(this.clients.values()).map(client => - client.updateMediaBuy(params, inputHandler, options) - ); - return Promise.all(promises); + return this.executeAllSettled(client => client.updateMediaBuy(params, inputHandler, options)); } /** @@ -131,10 +149,7 @@ export class AgentCollection { inputHandler?: InputHandler, options?: TaskOptions ): Promise[]> { - const promises = Array.from(this.clients.values()).map(client => - client.syncCreatives(params, inputHandler, options) - ); - return Promise.all(promises); + return this.executeAllSettled(client => client.syncCreatives(params, inputHandler, options)); } /** @@ -145,10 +160,7 @@ export class AgentCollection { inputHandler?: InputHandler, options?: TaskOptions ): Promise[]> { - const promises = Array.from(this.clients.values()).map(client => - client.listCreatives(params, inputHandler, options) - ); - return Promise.all(promises); + return this.executeAllSettled(client => client.listCreatives(params, inputHandler, options)); } /** @@ -159,10 +171,7 @@ export class AgentCollection { inputHandler?: InputHandler, options?: TaskOptions ): Promise[]> { - const promises = Array.from(this.clients.values()).map(client => - client.getMediaBuyDelivery(params, inputHandler, options) - ); - return Promise.all(promises); + return this.executeAllSettled(client => client.getMediaBuyDelivery(params, inputHandler, options)); } /** @@ -173,10 +182,7 @@ export class AgentCollection { inputHandler?: InputHandler, options?: TaskOptions ): Promise[]> { - const promises = Array.from(this.clients.values()).map(client => - client.listAuthorizedProperties(params, inputHandler, options) - ); - return Promise.all(promises); + return this.executeAllSettled(client => client.listAuthorizedProperties(params, inputHandler, options)); } /** @@ -187,10 +193,7 @@ export class AgentCollection { inputHandler?: InputHandler, options?: TaskOptions ): Promise[]> { - const promises = Array.from(this.clients.values()).map(client => - client.providePerformanceFeedback(params, inputHandler, options) - ); - return Promise.all(promises); + return this.executeAllSettled(client => client.providePerformanceFeedback(params, inputHandler, options)); } /** @@ -201,8 +204,7 @@ export class AgentCollection { inputHandler?: InputHandler, options?: TaskOptions ): Promise[]> { - const promises = Array.from(this.clients.values()).map(client => client.getSignals(params, inputHandler, options)); - return Promise.all(promises); + return this.executeAllSettled(client => client.getSignals(params, inputHandler, options)); } /** @@ -213,10 +215,7 @@ export class AgentCollection { inputHandler?: InputHandler, options?: TaskOptions ): Promise[]> { - const promises = Array.from(this.clients.values()).map(client => - client.activateSignal(params, inputHandler, options) - ); - return Promise.all(promises); + return this.executeAllSettled(client => client.activateSignal(params, inputHandler, options)); } // ====== COLLECTION UTILITIES ====== @@ -270,11 +269,12 @@ export class AgentCollection { } /** - * Execute a custom function on all agents in parallel + * Execute a custom function on all agents in parallel. + * Returns PromiseSettledResult array so callers can handle partial failures. */ - async execute(executor: (agent: AgentClient) => Promise): Promise { + async execute(executor: (agent: AgentClient) => Promise): Promise[]> { const promises = Array.from(this.clients.values()).map(executor); - return Promise.all(promises); + return Promise.allSettled(promises); } } @@ -757,8 +757,10 @@ export class ADCPMultiAgentClient { */ async listAllTasks(): Promise { const taskPromises = Array.from(this.agentClients.values()).map(agent => agent.listTasks()); - const taskArrays = await Promise.all(taskPromises); - return taskArrays.flat(); + const results = await Promise.allSettled(taskPromises); + return results + .filter((r): r is PromiseFulfilledResult => r.status === 'fulfilled') + .flatMap(r => r.value); } /** @@ -772,8 +774,10 @@ export class ADCPMultiAgentClient { const agent = this.agentClients.get(agentId); return agent ? agent.listTasks() : Promise.resolve([]); }); - const taskArrays = await Promise.all(taskPromises); - return taskArrays.flat(); + const results = await Promise.allSettled(taskPromises); + return results + .filter((r): r is PromiseFulfilledResult => r.status === 'fulfilled') + .flatMap(r => r.value); } /** @@ -857,7 +861,7 @@ export class ADCPMultiAgentClient { const promises = Array.from(this.agentClients.values()).map(agent => agent.registerWebhook(`${webhookUrl}?agentId=${agent.getAgentId()}`, taskTypes) ); - await Promise.all(promises); + await Promise.allSettled(promises); } /** @@ -865,7 +869,7 @@ export class ADCPMultiAgentClient { */ async unregisterAllWebhooks(): Promise { const promises = Array.from(this.agentClients.values()).map(agent => agent.unregisterWebhook()); - await Promise.all(promises); + await Promise.allSettled(promises); } /** diff --git a/test/lib/multi-agent-partial-failure.test.js b/test/lib/multi-agent-partial-failure.test.js new file mode 100644 index 00000000..06e72041 --- /dev/null +++ b/test/lib/multi-agent-partial-failure.test.js @@ -0,0 +1,237 @@ +const { test, describe, beforeEach, mock } = require('node:test'); +const assert = require('node:assert'); + +const { AgentCollection } = require('../../dist/lib/core/ADCPMultiAgentClient.js'); + +describe('AgentCollection partial failure handling', () => { + const agent1Config = { + id: 'agent1', + name: 'Agent One', + agent_uri: 'https://agent1.example.com', + protocol: 'mcp', + }; + + const agent2Config = { + id: 'agent2', + name: 'Agent Two', + agent_uri: 'https://agent2.example.com', + protocol: 'a2a', + }; + + describe('getProducts with partial failure', () => { + test('returns results from successful agents when one fails', async () => { + const collection = new AgentCollection([agent1Config, agent2Config]); + const clients = collection.getAllAgents(); + + const successResult = { + success: true, + status: 'completed', + data: { products: [{ name: 'Product A' }] }, + metadata: { + taskId: 'task-1', + taskName: 'get_products', + agent: { id: 'agent1', name: 'Agent One', protocol: 'mcp' }, + responseTimeMs: 100, + timestamp: new Date().toISOString(), + clarificationRounds: 0, + status: 'completed', + }, + }; + + clients[0].getProducts = mock.fn(async () => successResult); + clients[1].getProducts = mock.fn(async () => { + throw new Error('Network timeout'); + }); + + const results = await collection.getProducts({ brief: 'test brief' }); + + assert.strictEqual(results.length, 2); + assert.strictEqual(results[0].success, true); + assert.strictEqual(results[1].success, false); + assert.deepStrictEqual(results[0].data, { products: [{ name: 'Product A' }] }); + }); + + test('failed agent result contains error message', async () => { + const collection = new AgentCollection([agent1Config, agent2Config]); + const clients = collection.getAllAgents(); + + clients[0].getProducts = mock.fn(async () => ({ + success: true, + status: 'completed', + data: { products: [] }, + metadata: { + taskId: 'task-1', + taskName: 'get_products', + agent: { id: 'agent1', name: 'Agent One', protocol: 'mcp' }, + responseTimeMs: 100, + timestamp: new Date().toISOString(), + clarificationRounds: 0, + status: 'completed', + }, + })); + clients[1].getProducts = mock.fn(async () => { + throw new Error('Connection refused'); + }); + + const results = await collection.getProducts({ brief: 'test' }); + + assert.strictEqual(results[1].success, false); + assert.strictEqual(results[1].error, 'Connection refused'); + }); + + test('failed agent result contains agent metadata', async () => { + const collection = new AgentCollection([agent1Config, agent2Config]); + const clients = collection.getAllAgents(); + + clients[0].getProducts = mock.fn(async () => ({ + success: true, + status: 'completed', + data: { products: [] }, + metadata: { + taskId: 'task-1', + taskName: 'get_products', + agent: { id: 'agent1', name: 'Agent One', protocol: 'mcp' }, + responseTimeMs: 100, + timestamp: new Date().toISOString(), + clarificationRounds: 0, + status: 'completed', + }, + })); + clients[1].getProducts = mock.fn(async () => { + throw new Error('Agent unavailable'); + }); + + const results = await collection.getProducts({ brief: 'test' }); + + assert.ok(results[1].metadata); + assert.ok(results[1].metadata.agent); + assert.strictEqual(results[1].metadata.agent.id, 'agent2'); + assert.strictEqual(results[1].metadata.agent.name, 'Agent Two'); + assert.strictEqual(results[1].metadata.agent.protocol, 'a2a'); + assert.strictEqual(results[1].metadata.status, 'failed'); + }); + + test('handles non-Error thrown values', async () => { + const collection = new AgentCollection([agent1Config]); + const clients = collection.getAllAgents(); + + clients[0].getProducts = mock.fn(async () => { + throw 'String error message'; + }); + + const results = await collection.getProducts({ brief: 'test' }); + + assert.strictEqual(results[0].success, false); + assert.strictEqual(results[0].error, 'String error message'); + }); + + test('all agents succeed returns all successful results', async () => { + const collection = new AgentCollection([agent1Config, agent2Config]); + const clients = collection.getAllAgents(); + + const makeSuccessResult = (agentId, agentName, protocol) => ({ + success: true, + status: 'completed', + data: { products: [{ name: `Product from ${agentId}` }] }, + metadata: { + taskId: `task-${agentId}`, + taskName: 'get_products', + agent: { id: agentId, name: agentName, protocol }, + responseTimeMs: 100, + timestamp: new Date().toISOString(), + clarificationRounds: 0, + status: 'completed', + }, + }); + + clients[0].getProducts = mock.fn(async () => makeSuccessResult('agent1', 'Agent One', 'mcp')); + clients[1].getProducts = mock.fn(async () => makeSuccessResult('agent2', 'Agent Two', 'a2a')); + + const results = await collection.getProducts({ brief: 'test' }); + + assert.strictEqual(results.length, 2); + assert.strictEqual(results[0].success, true); + assert.strictEqual(results[1].success, true); + }); + + test('all agents fail returns all failure results', async () => { + const collection = new AgentCollection([agent1Config, agent2Config]); + const clients = collection.getAllAgents(); + + clients[0].getProducts = mock.fn(async () => { + throw new Error('Agent 1 error'); + }); + clients[1].getProducts = mock.fn(async () => { + throw new Error('Agent 2 error'); + }); + + const results = await collection.getProducts({ brief: 'test' }); + + assert.strictEqual(results.length, 2); + assert.strictEqual(results[0].success, false); + assert.strictEqual(results[1].success, false); + assert.strictEqual(results[0].error, 'Agent 1 error'); + assert.strictEqual(results[1].error, 'Agent 2 error'); + }); + }); + + describe('execute() returns PromiseSettledResult array', () => { + test('execute returns fulfilled result for successful operation', async () => { + const collection = new AgentCollection([agent1Config]); + const clients = collection.getAllAgents(); + + clients[0].getProducts = mock.fn(async () => ({ + success: true, + data: { products: [] }, + })); + + const results = await collection.execute(async client => { + return client.getProducts({ brief: 'test' }); + }); + + assert.strictEqual(results.length, 1); + assert.strictEqual(results[0].status, 'fulfilled'); + assert.ok('value' in results[0]); + assert.strictEqual(results[0].value.success, true); + }); + + test('execute returns rejected result for throwing operation', async () => { + const collection = new AgentCollection([agent1Config]); + const clients = collection.getAllAgents(); + + const results = await collection.execute(async () => { + throw new Error('Custom executor error'); + }); + + assert.strictEqual(results.length, 1); + assert.strictEqual(results[0].status, 'rejected'); + assert.ok('reason' in results[0]); + assert.strictEqual(results[0].reason.message, 'Custom executor error'); + }); + + test('execute returns mixed results for partial failures', async () => { + const collection = new AgentCollection([agent1Config, agent2Config]); + const clients = collection.getAllAgents(); + + let callCount = 0; + const results = await collection.execute(async client => { + callCount++; + if (client.getAgentId() === 'agent2') { + throw new Error('Agent 2 failed'); + } + return { result: 'success', agentId: client.getAgentId() }; + }); + + assert.strictEqual(results.length, 2); + + const fulfilled = results.filter(r => r.status === 'fulfilled'); + const rejected = results.filter(r => r.status === 'rejected'); + + assert.strictEqual(fulfilled.length, 1); + assert.strictEqual(rejected.length, 1); + + assert.strictEqual(fulfilled[0].value.result, 'success'); + assert.strictEqual(rejected[0].reason.message, 'Agent 2 failed'); + }); + }); +});