Skip to content

Commit b29f6a7

Browse files
committed
feat: add AbortSignal support to Swarm, Graph, and Node
Thread AbortSignal through the multiagent orchestration layer: - Add MultiAgentOptions with signal to MultiAgent interface - Swarm checks signal.aborted before each handoff step - Graph checks signal.aborted before launching new nodes - Signal forwarded through Node.stream() → handle() → agent.stream() so running agents are cancelled - Return CANCELLED status when signal is aborted - Guard Swarm._resolveContent for empty results --- Prompt: work out how AbortSignal should work with multiagent
1 parent 0d6e760 commit b29f6a7

9 files changed

Lines changed: 102 additions & 27 deletions

File tree

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,14 @@
105105
"@aws-sdk/client-secrets-manager": "^3.943.0",
106106
"@aws-sdk/client-sts": "^3.996.0",
107107
"@aws-sdk/credential-providers": "^3.943.0",
108+
"@google/genai": "^1.40.0",
108109
"@opentelemetry/api": "^1.9.0",
109110
"@opentelemetry/exporter-metrics-otlp-http": "^0.57.2",
110111
"@opentelemetry/exporter-trace-otlp-http": "^0.57.2",
111112
"@opentelemetry/resources": "^1.30.1",
112113
"@opentelemetry/sdk-metrics": "^1.30.1",
113114
"@opentelemetry/sdk-trace-base": "^1.30.1",
114115
"@opentelemetry/sdk-trace-node": "^1.30.1",
115-
"@google/genai": "^1.40.0",
116116
"@types/express": "^5.0.6",
117117
"@types/node": "^24.6.0",
118118
"@types/uuid": "^10.0.0",
@@ -121,9 +121,9 @@
121121
"@vitest/browser": "^4.0.15",
122122
"@vitest/browser-playwright": "^4.0.15",
123123
"@vitest/coverage-v8": "^4.0.15",
124-
"express": "^5.2.1",
125124
"eslint": "^9.0.0",
126125
"eslint-plugin-tsdoc": "^0.5.0",
126+
"express": "^5.2.1",
127127
"husky": "^9.1.7",
128128
"openai": "^6.7.0",
129129
"playwright": "^1.56.1",

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,3 +240,4 @@ export { AgentMetrics } from './telemetry/meter.js'
240240
// Multi-agent orchestration
241241
export { Graph } from './multiagent/index.js'
242242
export { Swarm } from './multiagent/index.js'
243+
export type { MultiAgentOptions } from './multiagent/index.js'

src/multiagent/__tests__/graph.test.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,23 @@ describe('Graph', () => {
488488
})
489489
})
490490

491+
describe('abort signal', () => {
492+
it('stops execution when signal is already aborted', async () => {
493+
const graph = new Graph({
494+
nodes: [makeAgent('a'), makeAgent('b')],
495+
edges: [['a', 'b']],
496+
})
497+
498+
const controller = new AbortController()
499+
controller.abort()
500+
501+
const result = await graph.invoke('start', { signal: controller.signal })
502+
503+
expect(result.status).toBe(Status.CANCELLED)
504+
expect(result.results).toHaveLength(0)
505+
})
506+
})
507+
491508
describe('stream', () => {
492509
it('yields lifecycle events in correct order for single node', async () => {
493510
const graph = new Graph({

src/multiagent/__tests__/swarm.test.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,23 @@ describe('Swarm', () => {
294294
})
295295
})
296296

297+
describe('abort signal', () => {
298+
it('stops execution when signal is already aborted', async () => {
299+
const swarm = new Swarm({
300+
nodes: [createHandoffAgent('a', { agentId: 'b', message: 'go' }), createFinalAgent('b', 'done')],
301+
start: 'a',
302+
})
303+
304+
const controller = new AbortController()
305+
controller.abort()
306+
307+
const result = await swarm.invoke('start', { signal: controller.signal })
308+
309+
expect(result.status).toBe(Status.CANCELLED)
310+
expect(result.results).toHaveLength(0)
311+
})
312+
})
313+
297314
describe('stream', () => {
298315
it('yields lifecycle events in correct order for single agent', async () => {
299316
const swarm = new Swarm({

src/multiagent/graph.ts

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { MultiAgentPluginRegistry } from './plugins.js'
1111
import type { NodeDefinition } from './nodes.js'
1212
import { AgentNode, MultiAgentNode, Node } from './nodes.js'
1313
import { MultiAgentState, MultiAgentResult, NodeResult, Status } from './state.js'
14-
import type { MultiAgent } from './multiagent.js'
14+
import type { MultiAgent, MultiAgentOptions } from './multiagent.js'
1515
import { Swarm } from './swarm.js'
1616
import type { MultiAgentStreamEvent } from './events.js'
1717
import {
@@ -143,8 +143,8 @@ export class Graph implements MultiAgent {
143143
* @param input - The input to pass to entry point nodes
144144
* @returns Promise resolving to the final MultiAgentResult
145145
*/
146-
async invoke(input: MultiAgentInput): Promise<MultiAgentResult> {
147-
const gen = this.stream(input)
146+
async invoke(input: MultiAgentInput, options?: MultiAgentOptions): Promise<MultiAgentResult> {
147+
const gen = this.stream(input, options)
148148
let next = await gen.next()
149149
while (!next.done) {
150150
next = await gen.next()
@@ -170,10 +170,13 @@ export class Graph implements MultiAgent {
170170
* @param input - The input to pass to entry nodes
171171
* @returns Async generator yielding streaming events and returning a MultiAgentResult
172172
*/
173-
async *stream(input: MultiAgentInput): AsyncGenerator<MultiAgentStreamEvent, MultiAgentResult, undefined> {
173+
async *stream(
174+
input: MultiAgentInput,
175+
options?: MultiAgentOptions
176+
): AsyncGenerator<MultiAgentStreamEvent, MultiAgentResult, undefined> {
174177
await this.initialize()
175178

176-
const gen = this._stream(input)
179+
const gen = this._stream(input, options?.signal)
177180
try {
178181
let next = await gen.next()
179182
while (!next.done) {
@@ -189,7 +192,10 @@ export class Graph implements MultiAgent {
189192
}
190193
}
191194

192-
private async *_stream(input: MultiAgentInput): AsyncGenerator<MultiAgentStreamEvent, MultiAgentResult, undefined> {
195+
private async *_stream(
196+
input: MultiAgentInput,
197+
signal?: AbortSignal
198+
): AsyncGenerator<MultiAgentStreamEvent, MultiAgentResult, undefined> {
193199
const state = new MultiAgentState({ nodeIds: [...this.nodes.keys()] })
194200

195201
const queue = new Queue()
@@ -208,13 +214,14 @@ export class Graph implements MultiAgent {
208214
let result: MultiAgentResult | undefined
209215
try {
210216
while (targets.length > 0 || streams.size > 0) {
217+
if (signal?.aborted) break
211218
while (targets.length > 0 && streams.size < this.config.maxConcurrency) {
212219
const node = targets.shift()!
213220

214221
this._checkSteps(state)
215222
state.steps++
216223

217-
streams.set(node.id, this._streamNode(node, input, state, queue, multiAgentSpan))
224+
streams.set(node.id, this._streamNode(node, input, state, queue, multiAgentSpan, signal))
218225
}
219226

220227
await queue.wait()
@@ -252,6 +259,7 @@ export class Graph implements MultiAgent {
252259
}
253260

254261
result = new MultiAgentResult({
262+
...(signal?.aborted && { status: Status.CANCELLED }),
255263
results: state.results,
256264
content: this._resolveContent(state),
257265
duration: Date.now() - state.startTime,
@@ -284,7 +292,8 @@ export class Graph implements MultiAgent {
284292
input: MultiAgentInput,
285293
state: MultiAgentState,
286294
queue: Queue,
287-
multiAgentSpan: Span | null
295+
multiAgentSpan: Span | null,
296+
signal?: AbortSignal
288297
): Promise<void> {
289298
const nodeState = state.node(node.id)!
290299

@@ -319,7 +328,9 @@ export class Graph implements MultiAgent {
319328
try {
320329
const nodeInput = this._resolveNodeInput(node, input, state)
321330

322-
const gen = this._tracer.withSpanContext(nodeSpan, () => node.stream(nodeInput, state))
331+
const gen = this._tracer.withSpanContext(nodeSpan, () =>
332+
node.stream(nodeInput, state, signal ? { signal } : undefined)
333+
)
323334
let next = await this._tracer.withSpanContext(nodeSpan, () => gen.next())
324335
while (!next.done) {
325336
await queue.send({ type: 'event', node, event: next.value })

src/multiagent/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,4 @@ export type { SwarmConfig, SwarmNodeDefinition, SwarmOptions } from './swarm.js'
4040

4141
export type { MultiAgentPlugin } from './plugins.js'
4242

43-
export type { MultiAgent, MultiAgentInput } from './multiagent.js'
43+
export type { MultiAgent, MultiAgentInput, MultiAgentOptions } from './multiagent.js'

src/multiagent/multiagent.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@ import type { MultiAgentResult } from './state.js'
1111
*/
1212
export type MultiAgentInput = Exclude<InvokeArgs, Message[] | MessageData[]>
1313

14+
/**
15+
* Options for multi-agent orchestrator invocations.
16+
*/
17+
export interface MultiAgentOptions {
18+
/**
19+
* AbortSignal to cancel the orchestration.
20+
* When aborted, the orchestrator stops launching new nodes and returns
21+
* a result with status CANCELLED containing any partial results.
22+
*/
23+
signal?: AbortSignal
24+
}
25+
1426
/**
1527
* Interface for any multi-agent orchestrator that can stream execution.
1628
* Implement this interface to create custom orchestration patterns that can be
@@ -23,16 +35,18 @@ export interface MultiAgent {
2335
/**
2436
* Execute the orchestrator and return the final result.
2537
* @param input - Input to pass to the orchestrator
38+
* @param options - Optional invocation options (e.g. abort signal)
2639
* @returns The aggregate result from all executed nodes
2740
*/
28-
invoke(input: MultiAgentInput): Promise<MultiAgentResult>
41+
invoke(input: MultiAgentInput, options?: MultiAgentOptions): Promise<MultiAgentResult>
2942

3043
/**
3144
* Execute the orchestrator and stream events as they occur.
3245
* @param input - Input to pass to the orchestrator
46+
* @param options - Optional invocation options (e.g. abort signal)
3347
* @returns Async generator yielding events and returning the final result
3448
*/
35-
stream(input: MultiAgentInput): AsyncGenerator<MultiAgentStreamEvent, MultiAgentResult, undefined>
49+
stream(input: MultiAgentInput, options?: MultiAgentOptions): AsyncGenerator<MultiAgentStreamEvent, MultiAgentResult, undefined>
3650

3751
/**
3852
* Register a hook callback for a specific orchestrator event type.

src/multiagent/nodes.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ export interface NodeInputOptions {
3434
* Structured output schema for this node invocation.
3535
*/
3636
structuredOutputSchema?: z.ZodSchema
37+
/**
38+
* Optional abort signal to cancel node execution.
39+
*/
40+
signal?: AbortSignal
3741
}
3842

3943
/**
@@ -172,6 +176,7 @@ export class AgentNode extends Node {
172176
try {
173177
const invokeOptions: InvokeOptions = {
174178
...(options?.structuredOutputSchema && { structuredOutputSchema: options.structuredOutputSchema }),
179+
...(options?.signal && { signal: options.signal }),
175180
}
176181

177182
const gen = this._agent.stream(input, invokeOptions)
@@ -238,15 +243,15 @@ export class MultiAgentNode extends Node {
238243
*
239244
* @param input - Input to pass to the orchestrator
240245
* @param state - The current multi-agent state
241-
* @param _options - Per-invocation options (unused by orchestrator nodes)
246+
* @param options - Per-invocation options from the orchestrator
242247
* @returns Async generator yielding streaming events and returning the orchestrator's content
243248
*/
244249
async *handle(
245250
input: MultiAgentInput,
246251
state: MultiAgentState,
247-
_options?: NodeInputOptions
252+
options?: NodeInputOptions
248253
): AsyncGenerator<MultiAgentStreamEvent, NodeResultUpdate, undefined> {
249-
const gen = this._orchestrator.stream(input)
254+
const gen = this._orchestrator.stream(input, options?.signal ? { signal: options.signal } : undefined)
250255
let next = await gen.next()
251256
while (!next.done) {
252257
const event = next.value

src/multiagent/swarm.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { TextBlock } from '../types/messages.js'
1313
import type { AgentNodeOptions } from './nodes.js'
1414
import { AgentNode } from './nodes.js'
1515
import { MultiAgentState, MultiAgentResult, NodeResult, Status } from './state.js'
16-
import type { MultiAgent } from './multiagent.js'
16+
import type { MultiAgent, MultiAgentOptions } from './multiagent.js'
1717
import type { MultiAgentStreamEvent } from './events.js'
1818
import {
1919
AfterMultiAgentInvocationEvent,
@@ -158,8 +158,8 @@ export class Swarm implements MultiAgent {
158158
* @param input - The input to pass to the start agent
159159
* @returns Promise resolving to the final MultiAgentResult
160160
*/
161-
async invoke(input: MultiAgentInput): Promise<MultiAgentResult> {
162-
const gen = this.stream(input)
161+
async invoke(input: MultiAgentInput, options?: MultiAgentOptions): Promise<MultiAgentResult> {
162+
const gen = this.stream(input, options)
163163
let next = await gen.next()
164164
while (!next.done) {
165165
next = await gen.next()
@@ -174,10 +174,13 @@ export class Swarm implements MultiAgent {
174174
* @param input - The input to pass to the start agent
175175
* @returns Async generator yielding streaming events and returning a MultiAgentResult
176176
*/
177-
async *stream(input: MultiAgentInput): AsyncGenerator<MultiAgentStreamEvent, MultiAgentResult, undefined> {
177+
async *stream(
178+
input: MultiAgentInput,
179+
options?: MultiAgentOptions
180+
): AsyncGenerator<MultiAgentStreamEvent, MultiAgentResult, undefined> {
178181
await this.initialize()
179182

180-
const gen = this._stream(input)
183+
const gen = this._stream(input, options?.signal)
181184
let next = await gen.next()
182185
while (!next.done) {
183186
if (next.value instanceof HookableEvent) {
@@ -189,7 +192,10 @@ export class Swarm implements MultiAgent {
189192
return next.value
190193
}
191194

192-
private async *_stream(input: MultiAgentInput): AsyncGenerator<MultiAgentStreamEvent, MultiAgentResult, undefined> {
195+
private async *_stream(
196+
input: MultiAgentInput,
197+
signal?: AbortSignal
198+
): AsyncGenerator<MultiAgentStreamEvent, MultiAgentResult, undefined> {
193199
const state = new MultiAgentState({
194200
nodeIds: [...this.nodes.keys()],
195201
})
@@ -209,10 +215,11 @@ export class Swarm implements MultiAgent {
209215

210216
try {
211217
while (state.steps < this.config.maxSteps) {
218+
if (signal?.aborted) break
212219
state.steps++
213220

214221
// Execute current node
215-
const nodeResult = yield* this._streamNode(node, input, state, handoff, multiAgentSpan)
222+
const nodeResult = yield* this._streamNode(node, input, state, handoff, multiAgentSpan, signal)
216223
handoff = nodeResult.structuredOutput as HandoffResult | undefined
217224
state.results.push(nodeResult)
218225

@@ -231,6 +238,7 @@ export class Swarm implements MultiAgent {
231238
this._checkSteps(state, handoff)
232239

233240
result = new MultiAgentResult({
241+
...(signal?.aborted && { status: Status.CANCELLED }),
234242
results: state.results,
235243
content: this._resolveContent(state),
236244
duration: Date.now() - state.startTime,
@@ -257,7 +265,8 @@ export class Swarm implements MultiAgent {
257265
input: MultiAgentInput,
258266
state: MultiAgentState,
259267
handoff: HandoffResult | undefined,
260-
multiAgentSpan: Span | null
268+
multiAgentSpan: Span | null,
269+
signal?: AbortSignal
261270
): AsyncGenerator<MultiAgentStreamEvent, NodeResult, undefined> {
262271
const nodeState = state.node(node.id)!
263272
const handoffSchema = this._buildHandoffSchema(node.id)
@@ -283,7 +292,7 @@ export class Swarm implements MultiAgent {
283292

284293
try {
285294
const gen = this._tracer.withSpanContext(nodeSpan, () =>
286-
node.stream(nodeInput, state, { structuredOutputSchema: handoffSchema })
295+
node.stream(nodeInput, state, { structuredOutputSchema: handoffSchema, ...(signal && { signal }) })
287296
)
288297
let next = await this._tracer.withSpanContext(nodeSpan, () => gen.next())
289298
while (!next.done) {
@@ -345,7 +354,8 @@ export class Swarm implements MultiAgent {
345354
}
346355

347356
private _resolveContent(state: MultiAgentState): ContentBlock[] {
348-
const last = state.results[state.results.length - 1]!
357+
const last = state.results[state.results.length - 1]
358+
if (!last) return []
349359
state.node(last.nodeId)!.terminus = true
350360

351361
const handoff = last.structuredOutput as HandoffResult | undefined

0 commit comments

Comments
 (0)