Skip to content

Commit 54bee7b

Browse files
Jacksunweicopybara-github
authored andcommitted
feat: Supports stateDelta in ADK Java AgentRunRequest for ADK web server
PiperOrigin-RevId: 816053257
1 parent 9bf9c9f commit 54bee7b

File tree

7 files changed

+425
-64
lines changed

7 files changed

+425
-64
lines changed

core/src/main/java/com/google/adk/runner/Runner.java

Lines changed: 112 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.adk.agents.RunConfig;
2626
import com.google.adk.artifacts.BaseArtifactService;
2727
import com.google.adk.events.Event;
28+
import com.google.adk.events.EventActions;
2829
import com.google.adk.memory.BaseMemoryService;
2930
import com.google.adk.plugins.BasePlugin;
3031
import com.google.adk.plugins.PluginManager;
@@ -50,7 +51,9 @@
5051
import java.util.ArrayList;
5152
import java.util.Collections;
5253
import java.util.List;
54+
import java.util.Map;
5355
import java.util.Optional;
56+
import java.util.concurrent.ConcurrentHashMap;
5457
import javax.annotation.Nullable;
5558

5659
/** The main class for the GenAI Agents runner. */
@@ -176,56 +179,66 @@ private Single<Event> appendNewMessageToSession(
176179
return this.sessionService.appendEvent(session, event);
177180
}
178181

182+
/** See {@link #runAsync(String, String, Content, RunConfig, Map)}. */
183+
public Flowable<Event> runAsync(
184+
String userId, String sessionId, Content newMessage, RunConfig runConfig) {
185+
return runAsync(userId, sessionId, newMessage, runConfig, /* stateDelta= */ null);
186+
}
187+
179188
/**
180-
* Runs the agent in the standard mode.
189+
* Runs the agent with an invocation-based mode.
190+
*
191+
* <p>TODO: make this the main implementation.
181192
*
182193
* @param userId The ID of the user for the session.
183194
* @param sessionId The ID of the session to run the agent in.
184195
* @param newMessage The new message from the user to process.
185196
* @param runConfig Configuration for the agent run.
197+
* @param stateDelta Optional map of state updates to merge into the session for this run.
186198
* @return A Flowable stream of {@link Event} objects generated by the agent during execution.
187199
*/
188200
public Flowable<Event> runAsync(
189-
String userId, String sessionId, Content newMessage, RunConfig runConfig) {
201+
String userId,
202+
String sessionId,
203+
Content newMessage,
204+
RunConfig runConfig,
205+
@Nullable Map<String, Object> stateDelta) {
190206
Maybe<Session> maybeSession =
191207
this.sessionService.getSession(appName, userId, sessionId, Optional.empty());
192208
return maybeSession
193209
.switchIfEmpty(
194210
Single.error(
195211
new IllegalArgumentException(
196212
String.format("Session not found: %s for user %s", sessionId, userId))))
197-
.flatMapPublisher(session -> this.runAsync(session, newMessage, runConfig));
213+
.flatMapPublisher(session -> this.runAsync(session, newMessage, runConfig, stateDelta));
198214
}
199215

200-
/**
201-
* Asynchronously runs the agent for a given user and session, processing a new message and using
202-
* a default {@link RunConfig}.
203-
*
204-
* <p>This method initiates an agent execution within the specified session, appending the
205-
* provided new message to the session's history. It utilizes a default {@code RunConfig} to
206-
* control execution parameters. The method returns a stream of {@link Event} objects representing
207-
* the agent's activity during the run.
208-
*
209-
* @param userId The ID of the user initiating the session.
210-
* @param sessionId The ID of the session in which the agent will run.
211-
* @param newMessage The new {@link Content} message to be processed by the agent.
212-
* @return A {@link Flowable} emitting {@link Event} objects generated by the agent.
213-
*/
216+
/** See {@link #runAsync(String, String, Content, RunConfig, Map)}. */
214217
public Flowable<Event> runAsync(String userId, String sessionId, Content newMessage) {
215218
return runAsync(userId, sessionId, newMessage, RunConfig.builder().build());
216219
}
217220

221+
/** See {@link #runAsync(Session, Content, RunConfig, Map)}. */
222+
public Flowable<Event> runAsync(Session session, Content newMessage, RunConfig runConfig) {
223+
return runAsync(session, newMessage, runConfig, /* stateDelta= */ null);
224+
}
225+
218226
/**
219-
* Runs the agent in the standard mode using a provided Session object.
227+
* Runs the agent asynchronously using a provided Session object.
220228
*
221229
* @param session The session to run the agent in.
222230
* @param newMessage The new message from the user to process.
223231
* @param runConfig Configuration for the agent run.
232+
* @param stateDelta Optional map of state updates to merge into the session for this run.
224233
* @return A Flowable stream of {@link Event} objects generated by the agent during execution.
225234
*/
226-
public Flowable<Event> runAsync(Session session, Content newMessage, RunConfig runConfig) {
235+
public Flowable<Event> runAsync(
236+
Session session,
237+
Content newMessage,
238+
RunConfig runConfig,
239+
@Nullable Map<String, Object> stateDelta) {
227240
Span span = Telemetry.getTracer().spanBuilder("invocation").startSpan();
228-
try (Scope scope = span.makeCurrent()) {
241+
try (Scope unusedScope = span.makeCurrent()) {
229242
BaseAgent rootAgent = this.agent;
230243
InvocationContext context =
231244
newInvocationContext(
@@ -234,6 +247,12 @@ public Flowable<Event> runAsync(Session session, Content newMessage, RunConfig r
234247
/* liveRequestQueue= */ Optional.empty(),
235248
runConfig);
236249

250+
// Emit state delta event if provided, using the same invocation ID
251+
Single<Session> sessionSingle =
252+
(stateDelta != null && !stateDelta.isEmpty())
253+
? emitStateDeltaEvent(session, stateDelta, context.invocationId())
254+
: Single.just(session);
255+
237256
Maybe<Event> beforeRunEvent =
238257
this.pluginManager
239258
.runBeforeRunCallback(context)
@@ -247,42 +266,49 @@ public Flowable<Event> runAsync(Session session, Content newMessage, RunConfig r
247266
.build());
248267

249268
Flowable<Event> agentEvents =
250-
Flowable.defer(
251-
() ->
252-
this.pluginManager
253-
.runOnUserMessageCallback(context, newMessage)
254-
.switchIfEmpty(Single.just(newMessage))
255-
.flatMap(
256-
content ->
257-
(content != null)
258-
? appendNewMessageToSession(
259-
session,
260-
content,
261-
context,
262-
runConfig.saveInputBlobsAsArtifacts())
263-
: Single.just(null))
264-
.flatMapPublisher(
265-
event -> {
266-
InvocationContext contextWithNewMessage =
267-
newInvocationContext(
268-
session, event.content(), Optional.empty(), runConfig);
269-
contextWithNewMessage.agent(this.findAgentToRun(session, rootAgent));
270-
return contextWithNewMessage
271-
.agent()
272-
.runAsync(contextWithNewMessage)
273-
.flatMap(
274-
agentEvent ->
275-
this.sessionService
276-
.appendEvent(session, agentEvent)
277-
.flatMap(
278-
registeredEvent ->
279-
contextWithNewMessage
280-
.pluginManager()
281-
.runOnEventCallback(
282-
contextWithNewMessage, registeredEvent)
283-
.defaultIfEmpty(registeredEvent))
284-
.toFlowable());
285-
}));
269+
sessionSingle.flatMapPublisher(
270+
updatedSession ->
271+
Flowable.defer(
272+
() ->
273+
this.pluginManager
274+
.runOnUserMessageCallback(context, newMessage)
275+
.switchIfEmpty(Single.just(newMessage))
276+
.flatMap(
277+
content ->
278+
(content != null)
279+
? appendNewMessageToSession(
280+
updatedSession,
281+
content,
282+
context,
283+
runConfig.saveInputBlobsAsArtifacts())
284+
: Single.just(null))
285+
.flatMapPublisher(
286+
event -> {
287+
InvocationContext contextWithNewMessage =
288+
newInvocationContext(
289+
updatedSession,
290+
event.content(),
291+
Optional.empty(),
292+
runConfig);
293+
contextWithNewMessage.agent(
294+
this.findAgentToRun(updatedSession, rootAgent));
295+
return contextWithNewMessage
296+
.agent()
297+
.runAsync(contextWithNewMessage)
298+
.flatMap(
299+
agentEvent ->
300+
this.sessionService
301+
.appendEvent(updatedSession, agentEvent)
302+
.flatMap(
303+
registeredEvent ->
304+
contextWithNewMessage
305+
.pluginManager()
306+
.runOnEventCallback(
307+
contextWithNewMessage,
308+
registeredEvent)
309+
.defaultIfEmpty(registeredEvent))
310+
.toFlowable());
311+
})));
286312

287313
return beforeRunEvent
288314
.toFlowable()
@@ -302,6 +328,36 @@ public Flowable<Event> runAsync(Session session, Content newMessage, RunConfig r
302328
}
303329
}
304330

331+
/**
332+
* Emits a state update event and returns the updated session.
333+
*
334+
* @param session The session to update.
335+
* @param stateDelta The state delta to apply.
336+
* @param invocationId The invocation ID to use for the state delta event.
337+
* @return Single emitting the updated session after applying the state delta.
338+
*/
339+
private Single<Session> emitStateDeltaEvent(
340+
Session session, Map<String, Object> stateDelta, String invocationId) {
341+
ConcurrentHashMap<String, Object> deltaMap = new ConcurrentHashMap<>(stateDelta);
342+
343+
Event stateEvent =
344+
Event.builder()
345+
.id(Event.generateEventId())
346+
.invocationId(invocationId)
347+
.author("user")
348+
.actions(EventActions.builder().stateDelta(deltaMap).build())
349+
.timestamp(System.currentTimeMillis())
350+
.build();
351+
352+
return this.sessionService
353+
.appendEvent(session, stateEvent)
354+
.flatMap(
355+
event ->
356+
this.sessionService
357+
.getSession(session.appName(), session.userId(), session.id(), Optional.empty())
358+
.switchIfEmpty(Single.error(new IllegalStateException("Session not found"))));
359+
}
360+
305361
/**
306362
* Creates an {@link InvocationContext} for a live (streaming) run.
307363
*

0 commit comments

Comments
 (0)