Advance to next agent without waiting for user input #3211
-
|
This is a follow up to #3204 . I was able to get the context to persist. My issue now is that when an agent finishes, I would like to have the next agent start automatically. Currently, it requires the user to say something, like "ok" to nudge it forward. My orchestrator code is as follows: class ControlWorkflowAgent(BaseAgent):
"""
An orchestrator agent that manages the workflow between conversational sub-agents.
It inspects the final output of each sub-agent for a completion signal
and updates the session state to advance the pipeline.
"""
output_key: str = "workflow_status"
def __init__(self, agents: List[BaseAgent]):
super().__init__(
name="control_workflow_agent",
description="An orchestrator for a series of conversational agents.",
sub_agents=agents
)
async def _run_async_impl(self, ctx: InvocationContext):
# Get the current step from session state, defaulting to 0
current_agent_index = ctx.session.state.get("current_agent_index", 0)
logger.info("[WORKFLOW] Current agent index: %s", current_agent_index)
# Check if the pipeline is complete
if current_agent_index >= len(self.sub_agents):
logger.info("[WORKFLOW] Pipeline is complete.")
yield Event(author=self.name, content=Content(parts=[Part(text="The workflow is complete.")]))
else:
active_agent = self.sub_agents[current_agent_index]
logger.info("[WORKFLOW] Running agent: %s", active_agent.name)
async for event in active_agent.run_async(ctx):
if event.is_final_response():
logger.info(f"[WORKFLOW] Event.is_final_response() set to {event.is_final_response()}, checking the termination signal in the content.")
if not event.content or not event.content.parts:
continue
text = "".join(p.text for p in event.content.parts if p.text)
if active_agent.name + "_complete" in text:
logger.info(f"[WORKFLOW] Termination signal found in the content. Advancing to next agent.")
event.actions = EventActions(state_delta={"current_agent_index": current_agent_index + 1})
else:
logger.info(f"[WORKFLOW] Termination signal not found in the content. Continuing to next event.")
yield eventline current_agent_index = ctx.session.state.get("current_agent_index", 0)
...
async for event in active_agent.run_async(ctx): |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 1 reply
-
|
@adk-bot Any thoughts on this? How does the ADK event loop decide that it needs to keep working instead of asking the user for an input? |
Beta Was this translation helpful? Give feedback.
-
|
Response from ADK Answering Agent (experimental, answer may be inaccurate) TLDR: To automatically advance to the next agent, your orchestrator should use a Hello! I can see you've put a lot of thought into your When your To achieve automatic advancement, the orchestrator needs to manage the entire sequence of agents within a single invocation. You can do this by implementing a Here is a revised version of your async def _run_async_impl(self, ctx: InvocationContext):
# Get the current step from session state, defaulting to 0
current_agent_index = ctx.session.state.get("current_agent_index", 0)
# Loop through the agent pipeline as long as there are agents to run
while current_agent_index < len(self.sub_agents):
active_agent = self.sub_agents[current_agent_index]
logger.info(f"[WORKFLOW] Running agent: {active_agent.name} at index {current_agent_index}")
final_text = ""
# Stream events from the active sub-agent
async for event in active_agent.run_async(ctx):
# Pass sub-agent events through to the user
yield event
# Capture the text from the final response to check for the completion signal
if event.is_final_response() and event.content:
final_text = "".join(p.text for p in event.content.parts if p.text)
# Check for the completion signal after the sub-agent has finished
if active_agent.name + "_complete" in final_text:
logger.info(f"Termination signal found for {active_agent.name}. Advancing to the next agent.")
current_agent_index += 1
# Yield a status update event with the new index before the next loop iteration.
# This updates the session state and can also provide a transitional message.
yield Event(
author=self.name,
actions=EventActions(state_delta={"current_agent_index": current_agent_index})
)
else:
logger.info("No termination signal found. The workflow will pause and wait for the next user input.")
# If the signal is not found, break the loop and end the orchestration for this turn.
break
# After the loop finishes, check if the entire pipeline is complete
if current_agent_index >= len(self.sub_agents):
logger.info("[WORKFLOW] Pipeline is complete.")
yield Event(author=self.name, content=Content(parts=[Part(text="The workflow is complete.")]))By using this |
Beta Was this translation helpful? Give feedback.
Response from ADK Answering Agent (experimental, answer may be inaccurate)
TLDR: To automatically advance to the next agent, your orchestrator should use a
whileloop within its own_run_async_implmethod. This keeps the orchestrator in control of the workflow and allows it to call sub-agents sequentially in a single run, rather than ending its execution and waiting for a new user prompt.Hello! I can see you've put a lot of thought into your
ControlWorkflowAgent. Your current approach of updating the state when an agent completes is correct, but the issue you're facing is a matter of control flow.When your
async for event in active_agent.run_async(ctx):loop finishes, the_run_async_impl…