fix(ai): ensure that tool calls batched in a single turn always run in parallel#17623
fix(ai): ensure that tool calls batched in a single turn always run in parallel#17623cdamus wants to merge 11 commits into
Conversation
9c5d0c7 to
21d3093
Compare
|
I myself have tested this PR with:
|
…allExecutor
Agent delegations are ordinary tool calls, so whether parallel delegations
issued in a single model turn actually run concurrently was decided by each
provider's tool-call loop, and those loops were inconsistent: Anthropic and
Google ran them in parallel while the OpenAI Response API and Ollama ran them
sequentially. As a result, parallel delegations ran one-after-another on the
sequential providers.
Introduce an injectable `ToolCallExecutor` service in @theia/ai-core that
executes the tool calls of a single model turn concurrently (Promise.all) with
uniform error handling: tool-not-found and thrown handlers become error results
instead of rejections, so one failing call never short-circuits its siblings,
and results are returned in input order for deterministic message threading.
The concurrency contract ("tool calls in one turn run concurrently; the model
serializes dependent calls across turns") is documented on the service and on
ToolRequest.handler. The service is bound in DI so that adopters can rebind it.
Refactor Anthropic and Google onto the executor (no behavior change) and fix
the OpenAI Response API and Ollama providers to execute tool calls concurrently
while preserving their per-call side effects and ordering.
Part of #17533
Signed-off-by: Christian W. Damus <cdamus@eclipsesource.com>
ChatResponseImpl::doAddContent attached a forwarding onDidChange listener to each newly added tool-call content but never disposed it. The stream parser clears and re-adds prior content (including a preceding tool call) on every streamed text token, so the same content accumulated one listener per token, producing "Possible Emitter memory leak detected" warnings once an agent streamed a long text response after a tool call. This surfaced via parallel sub-agent delegation, which drives the Explore agents through exactly that pattern. Register the forwarding listeners against a DisposableCollection that clearContent disposes, so that re-adding content no longer leaks listeners. Signed-off-by: Christian W. Damus <cdamus@eclipsesource.com>
The OpenAI Chat Completions and Copilot providers used the OpenAI SDK's `chat.completions.runTools()` runner, whose built-in tool loop executes tool calls strictly sequentially with no parallel option. As a result, parallel agent delegations (which are tool calls) ran one after another on these providers. Replace the SDK runner with ChatCompletionStreamingAsyncIterator, which drives the raw chunk stream and a multi-turn tool loop. The tool calls of each turn are executed concurrently via the shared ToolCallExecutor; the iterator emits the same stream-response part shapes as before (open / arguments-delta / finished), threads the assistant tool_calls plus matching tool messages into the next turn, honors maxChatCompletions, and aborts the in-flight stream on cancellation. createTools now returns plain function tool definitions since the SDK runner no longer invokes the handlers. Fixes #17533 Signed-off-by: Christian W. Damus <cdamus@eclipsesource.com>
21d3093 to
2084318
Compare
sdirix
left a comment
There was a problem hiding this comment.
Thank you for the great PR 🎉
- I tested all suggested models and in all of them parallel execution worked ✔️
- I like the introduction of the model factories ✔️
- Personally I would like to see them go one step further even, i.e. making the models themselves
@injectable, but this could be part of a follow up
- Personally I would like to see them go one step further even, i.e. making the models themselves
- I like that we finally get rid of the
runToolsin OpenAI ✔️
Minor improvements:
- I would remove some of the comments which are specific to the PR but not that important for the code
- I would like to see different names for the new interfaces. See my comment
I noticed an issue (in general):
Cancelling parallel tool calls breaks the chat session when using Gemini models. This can be reproduced by instructing parallel code exploration or shell executions, cancelling the session and the trying to continue using the session.
The errors look like this:
Error 400:
Function call is missing a thought_signature in functionCall parts. This is required for tools to work correctly, and missing thought_signature may lead to degraded model performance. Additional data, function call `default_api:delegateToAgent` , position 14. Please refer to https://ai.google.dev/gemini-api/docs/thought-signatures for more details.
Note that I reproduced this is in current Theia as well so it's not a regression of this PR. If you see it in scope, please fix the issue here. If not, then I can create a dedicated ticket for this issue.
|
|
||
| // Injected when resolved via DI; the initializer keeps direct instantiation (e.g. in tests) working. | ||
| @inject(ToolCallExecutor) | ||
| readonly toolCallExecutor: ToolCallExecutor = new ToolCallExecutor(); |
There was a problem hiding this comment.
Personally, I don't like code which is there just for testing and could mask real issues. Can't we set/mock/inject this field in the tests?
| const anthropicConnectionModule = ConnectionContainerModule.create(({ bind, bindBackendService, bindFrontendService }) => { | ||
| bind(AnthropicLanguageModelsManagerImpl).toSelf().inSingletonScope(); | ||
| bind(AnthropicLanguageModelsManager).toService(AnthropicLanguageModelsManagerImpl); | ||
| bind(AnthropicLanguageModelFactory).toFactory<AnthropicModel, [AnthropicModelParams]>( |
There was a problem hiding this comment.
Great change, I wanted to use a factory approach for a longer time now. Which is one of the reasons I brought up #17520.
Personally I would like if it goes a step further and makes the model itself @injectable too. The params can then be injected as an object and the tool call executor would be injected separately.
| * A single tool call collected from one model response/turn, in a provider-neutral shape. | ||
| * Each language model normalizes its own representation into this shape before execution. | ||
| */ | ||
| export interface PreparedToolCall { |
There was a problem hiding this comment.
I don't like the name that much (there is no intuitive meaning to it) and I worry about introducing yet more interfaces for the same concepts.
We then have ToolCall, PreparedToolCall, ToolCallExecutionResult, ToolCallResult, ToolInvocationContext, ToolCallExecutionOptions, ToolRequest, ToolRequestHandler etc. etc.
Just from their names its hard to guess what is used when.
Maybe the issue is that ToolCall has too many roles which is why we need another interface?
Now as I am writing this, I would like to see streamlined names and concepts for tool calling in general but this is definitely out of scope of this PR.
Maybe instead rename these new interfaces to avoid introducing yet more similar names. What do you think of renaming PreparedToolCall -> ToolInvocation and ToolCallExecutionResult -> ToolCallOutcome. To me this reads more "low-level" which is what they are used for. Also I would highlight in the JSDoc that these are the "low level" interfaces.
| return result; | ||
| } | ||
|
|
||
| /** Parameters for constructing an {@link AnthropicModel}. */ |
There was a problem hiding this comment.
| /** Parameters for constructing an {@link AnthropicModel}. */ |
| return { name: tc.name, result: handlerResult, id: tc.id, arguments: argsObject }; | ||
|
|
||
| })); | ||
| // Tool calls of a single turn are executed concurrently; see ToolCallExecutor. |
There was a problem hiding this comment.
Not really a relevant comment
| // Tool calls of a single turn are executed concurrently; see ToolCallExecutor. |
| const openAiConnectionModule = ConnectionContainerModule.create(({ bind, bindBackendService, bindFrontendService }) => { | ||
| bind(OpenAiLanguageModelsManagerImpl).toSelf().inSingletonScope(); | ||
| bind(OpenAiLanguageModelsManager).toService(OpenAiLanguageModelsManagerImpl); | ||
| // Connection-scoped because it injects the connection-scoped ToolCallExecutor. |
There was a problem hiding this comment.
Is there a specific reason the ToolCallExecutor is connection-scoped?
There was a problem hiding this comment.
All of the services that use the ToolCallExecutor are connection-scoped, perhaps because they are stateful. At present, this ToolCallExecutor implementation is not stateful, but it is replaceable by downstream applications that could substitute one that is stateful, and then its global scoping might be awkward. I suppose in that case a downstream application could unbind the global service and rebind it as connection-scoped, but then what about other global services that in the mean-time have picked up a dependency on it? It gets complicated fast.
I have no particular aversion to a global scope for this stateless utility service, so if that's preferred I can make that changed. It mostly is just following the most common pattern of the AI services, which is connection scope.
|
|
||
| protected async startIteration(): Promise<void> { | ||
| try { | ||
| while (this.iteration < this.options.maxChatCompletions && !this.cancellationRequested) { |
There was a problem hiding this comment.
I don't think we really need the maxChatCompletions anymore. We only had that because the runTools API required it. We don't use something like this for the other models, so I would get rid of it.
|
|
||
| export type DeveloperMessageSettings = 'user' | 'system' | 'developer' | 'mergeWithFollowingUserMessage' | 'skip'; | ||
|
|
||
| /** Parameters for constructing an {@link OpenAiModel}. */ |
There was a problem hiding this comment.
| /** Parameters for constructing an {@link OpenAiModel}. */ |
| stream: true, | ||
| ...settings | ||
| }); | ||
| // Drive the tool loop ourselves so that the tool calls of a single turn run concurrently. |
There was a problem hiding this comment.
| // Drive the tool loop ourselves so that the tool calls of a single turn run concurrently. |
| try { | ||
| const result = await tool.handler(toolCall.arguments, ToolInvocationContext.create(itemId)); | ||
| toolCall.result = result; | ||
| // Tool calls of a single turn are executed concurrently; see ToolCallExecutor. |
There was a problem hiding this comment.
| // Tool calls of a single turn are executed concurrently; see ToolCallExecutor. |
Remove comments in the code that don't explain anything that needed explanation. Signed-off-by: Christian W. Damus <cdamus@eclipsesource.com>
Rename PreparedToolCall to ToolInvocation and ToolCallExecutionResult to ToolCallOutcome, as suggested in review. Signed-off-by: Christian W. Damus <cdamus@eclipsesource.com>
Split ToolCallExecutor into a symbol & interface for substitutability. Signed-off-by: Christian W. Damus <cdamus@eclipsesource.com>
The executor holds no per-connection state, so bind it once in the root container rather than per connection. Signed-off-by: Christian W. Damus <cdamus@eclipsesource.com>
The cap existed only to bound the OpenAI SDK runTools runner, which the hand-rolled streaming tool loop replaced; the other providers run uncapped. Signed-off-by: Christian W. Damus <cdamus@eclipsesource.com>
Make the provider language models @Injectable and bind them as transient self-services. Their runtime configuration is injected as a <provider>ModelParams object and their service dependencies are injected separately; the <provider>LanguageModelFactory now resolves the model from a child container that carries the params. Public mutable fields are seeded from the params in @PostConstruct so that the managers' patchLanguageModel/status mutations keep working. Extract OpenAiModelUtils into its own file (openai-model-utils.ts) so the model can inject it without a same-file forward-reference, per the one-class-per-file guideline. Use named loggers for new logging. This lets adopters rebind any of these services to substitute custom implementations. Signed-off-by: Christian W. Damus <cdamus@eclipsesource.com>
|
Thanks @sdirix this was really great feedback, so far 😀 I've pushed a merge and a bunch of commits to address your comments. I've also elected to take that further step that you mentioned in refactoring of the creation of the models, to bind the default implementation classes as transient self-services with their dependencies injected. Now the default factories just resolve the model implementations in child containers, widget-style. I think though that the Gemini cancellation problem had best be raised as a separate issue; it really would be too much scope widening for this PR. Let me know what you think of these changes! |
…l-subagents Signed-off-by: Christian W. Damus <cdamus@eclipsesource.com> # Conflicts: # CHANGELOG.md
What it does
Fixes #17533.
AI sub-agent delegations are ordinary tool calls, so whether several delegations issued by a coordinating agent in a single model turn run concurrently was decided entirely by each LLM provider's tool-call execution loop. Those loops were inconsistent: Anthropic and Google already ran a turn's tool calls in parallel, while the OpenAI Chat Completions, OpenAI Response API, Copilot, and Ollama providers ran them sequentially. As a result, parallel delegations (for example the PR reviewer or Architect fanning out explorations) executed one after another on those providers.
This PR makes the contract explicit and uniform: tool calls emitted within a single model response/turn are executed concurrently, since a model serializes genuinely dependent calls across turns instead of batching them.
ToolCallExecutorservice in@theia/ai-coreas the single common strategy for executing a turn's tool calls. It runs all matched handlers concurrently, applies uniform error handling (tool-not-found and thrown handlers become error results rather than rejections, so one failure never short-circuits its siblings), forwards cancellation, and returns results in input order so providers build their tool-result messages deterministically even though execution interleaves. It is bound in the connection-scoped backend container so that downstream applications may rebind it.forloops to the executorrunTools()runner, whose tool loop is strictly sequential, with a custom streaming/tool loop (ChatCompletionStreamingAsyncIterator) for the OpenAI Chat Completions and Copilot providers, so their turns also execute tool calls concurrently.<provider>LanguageModelFactorybindings for instantiating the provider language models, so that downstream applications can substitute model implementations.ChatResponseImpl: the per-contentonDidChangeforwarding listener registered bydoAddContentwas never disposed, and the stream parser re-adds preceding content (e.g. a tool call) on every streamed text token, so a tool call followed by a long text response accumulated one listener per token. This surfaced because functional parallel delegation drives the Explore agents through exactly that pattern.Caution
Note to reviewers: the first two of these commits I consider to be quite straightforward and safe. The third, which reimplements in parallel the sequential mechanics of a third-party API that the OpenAI and Copilot providers were using, is more risky as it is quite a more complex change. So you might consider carving it off into a separate PR to merge the easier bits first. However, the original bug is not fixed without this change, because anybody using OpenAI ChatGPT model for subagent delegation will not see parallel execution without it: the AI preferences default to the Chat Completions API model implementation, which needs this more complex fix, instead of the Response API model, which relies on the simpler fix.
How to test
Select an OpenAI ChatGPT model (make sure that Response API is not enabled in the preferences) for the PR Reviewer agent and review this PR 😬 . Observe that the subagents looking at different aspects of the PR proceed not in sequence like this:
but in parallel, like this:
and in fact they may even finish out of order, like this:
Then switch the OpenAI preferences to use the Response API and repeat the experiment.
Then repeat the experiment again with as many other models as you have access to.
Also check that you never see "Possible Emitter memory leak detected" warnings in the console during heavy sub-agent streaming.
Alternatively, a more focused experiment that additionally proves the ordering out outputs
The above experiment doesn't conclusively show (necessarily) that the outputs of parallel subagents are collated in the original request/call order. The following alternative scenario does.
Again, for all models that you have access to, and Open AI with and without Response API preference:
exploreagent, and its prompt asks the model to batch delegations). Keep a fast, consistent model on theexploreagent.packages/terminal,packages/debug,packages/search-in-workspace,packages/notebook. Summarize the main services and entry points of each."Also check that you never see "Possible Emitter memory leak detected" warnings in the console during heavy sub-agent streaming.
Follow-ups
OpenAiModelUtilsremains bound in the root container; onlyOpenAiResponseApiUtilswas moved to the connection scope because it injects the now connection-scopedToolCallExecutor). Aligning the remaining stateless utils with the connection scope could be a later cleanup.HuggingFace,Llamafile, andVercel AIproviders were intentionally left out of the executor refactor because Vercel already executes a step's tools concurrently. They could adopt the same pattern later for consistency.Breaking changes
This PR introduces breaking changes and requires careful review. If yes, the breaking changes section in the changelog has been updated.
OpenAiResponseApiUtilsis now bound in the connection-scoped backend container instead of the root container, so it can no longer be injected into root servicesOpenAiLanguageModelsManagerImplno longer injectsOpenAiModelUtilsorOpenAiResponseApiUtilsand theopenAiModelUtilsandresponseApiUtilsprotected fields were removedOpenAiModel.createTools()andCopilotLanguageModel.createTools()now returnChatCompletionTool[]instead ofRunnableToolFunctionWithoutParse[]because the OpenAI SDKrunToolsrunner is no longer usedAttribution
None.
Review checklist
nlsservice (for details, please see the Internationalization/Localization section in the Coding Guidelines)Reminder for reviewers