
Commit 74c50fd

mjschock and claude committed
feat: add streaming HITL support and complete human-in-the-loop implementation
This commit completes the human-in-the-loop (HITL) implementation by adding full streaming support, matching the TypeScript SDK functionality.

**Streaming HITL Support:**

1. **ToolApprovalItem Handling** (_run_impl.py:67, 1282-1284)
   - Added ToolApprovalItem to imports
   - Handle ToolApprovalItem in stream_step_items_to_queue
   - Prevents "Unexpected item type" errors during streaming

2. **NextStepInterruption in Streaming** (run.py:1222-1226)
   - Added NextStepInterruption case in streaming turn loop
   - Sets interruptions and completes stream when approval needed
   - Matches non-streaming interruption handling

3. **RunState Support in run_streamed** (run.py:890-905)
   - Added full RunState input handling
   - Restores context wrapper from RunState
   - Enables streaming resumption after approval

4. **Streaming Tool Execution** (run.py:1044-1101)
   - Added run_state parameter to _start_streaming
   - Execute approved tools when resuming from interruption
   - Created _execute_approved_tools instance method
   - Created _execute_approved_tools_static classmethod for streaming

5. **RunResultStreaming.to_state()** (result.py:401-451)
   - Added to_state() method to RunResultStreaming
   - Enables state serialization from streaming results
   - Includes current_turn for proper state restoration
   - Complete parity with non-streaming RunResult.to_state()

**RunState Enhancements:**

6. **Runtime Imports** (run_state.py:108, 238, 369, 461)
   - Added runtime imports for NextStepInterruption
   - Fixes NameError when serializing/deserializing interruptions
   - Keeps TYPE_CHECKING imports for type hints

7. **from_json() Method** (run_state.py:385-475)
   - Added from_json() static method for dict deserialization
   - Complements existing from_string() method
   - Matches TypeScript API: to_json() / from_json()

**Examples:**

8. **human_in_the_loop.py** (examples/agent_patterns/)
   - Complete non-streaming HITL example
   - Demonstrates state serialization to JSON file
   - Shows approve/reject workflow with while loop
   - Matches TypeScript non-streaming example behavior

9. **human_in_the_loop_stream.py** (examples/agent_patterns/)
   - Complete streaming HITL example
   - Uses Runner.run_streamed() for streaming output
   - Shows streaming with interruption handling
   - Updated docstring to reflect streaming support
   - Includes while loop for rejection handling
   - Matches TypeScript streaming example behavior

**Key Design Decisions:**

- Kept _start_streaming as @classmethod (existing pattern)
- Separate instance/classmethod for tool execution (additive only)
- No breaking changes to existing functionality
- Complete API parity with TypeScript SDK
- Rejection returns error message to LLM for retry
- While loops in examples handle rejection/retry flow

**Testing:**

- ✅ Streaming HITL: interruption, approval, resumption
- ✅ Non-streaming HITL: interruption, approval, resumption
- ✅ State serialization: to_json() / from_json()
- ✅ Tool rejection: message returned, retry possible
- ✅ Examples: both streaming and non-streaming work
- ✅ Code quality: ruff format, ruff check, mypy pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
1 parent 3eb0228 commit 74c50fd

7 files changed: +605 -31 lines changed

examples/agent_patterns/human_in_the_loop.py

Lines changed: 137 additions & 0 deletions
@@ -0,0 +1,137 @@
+"""Human-in-the-loop example with tool approval.
+
+This example demonstrates how to:
+1. Define tools that require approval before execution
+2. Handle interruptions when tool approval is needed
+3. Serialize/deserialize run state to continue execution later
+4. Approve or reject tool calls based on user input
+"""
+
+import asyncio
+import json
+
+from agents import Agent, Runner, RunState, function_tool
+
+
+@function_tool
+async def get_weather(city: str) -> str:
+    """Get the weather for a given city.
+
+    Args:
+        city: The city to get weather for.
+
+    Returns:
+        Weather information for the city.
+    """
+    return f"The weather in {city} is sunny"
+
+
+async def _needs_temperature_approval(_ctx, params, _call_id) -> bool:
+    """Check if temperature tool needs approval."""
+    return "Oakland" in params.get("city", "")
+
+
+@function_tool(
+    # Dynamic approval: only require approval for Oakland
+    needs_approval=_needs_temperature_approval
+)
+async def get_temperature(city: str) -> str:
+    """Get the temperature for a given city.
+
+    Args:
+        city: The city to get temperature for.
+
+    Returns:
+        Temperature information for the city.
+    """
+    return f"The temperature in {city} is 20° Celsius"
+
+
+# Main agent with tool that requires approval
+agent = Agent(
+    name="Weather Assistant",
+    instructions=(
+        "You are a helpful weather assistant. "
+        "Answer questions about weather and temperature using the available tools."
+    ),
+    tools=[get_weather, get_temperature],
+)
+
+
+async def confirm(question: str) -> bool:
+    """Prompt user for yes/no confirmation.
+
+    Args:
+        question: The question to ask.
+
+    Returns:
+        True if user confirms, False otherwise.
+    """
+    # Note: In a real application, you would use proper async input
+    # For now, using synchronous input with run_in_executor
+    loop = asyncio.get_event_loop()
+    answer = await loop.run_in_executor(None, input, f"{question} (y/n): ")
+    normalized = answer.strip().lower()
+    return normalized in ("y", "yes")
+
+
+async def main():
+    """Run the human-in-the-loop example."""
+    result = await Runner.run(
+        agent,
+        "What is the weather and temperature in Oakland?",
+    )
+
+    has_interruptions = len(result.interruptions) > 0
+
+    while has_interruptions:
+        print("\n" + "=" * 80)
+        print("Run interrupted - tool approval required")
+        print("=" * 80)
+
+        # Storing state to file (demonstrating serialization)
+        state = result.to_state()
+        state_json = state.to_json()
+        with open("result.json", "w") as f:
+            json.dump(state_json, f, indent=2)
+
+        print("State saved to result.json")
+
+        # From here on you could run things on a different thread/process
+
+        # Reading state from file (demonstrating deserialization)
+        print("Loading state from result.json")
+        with open("result.json", "r") as f:
+            stored_state_json = json.load(f)
+
+        state = RunState.from_json(agent, stored_state_json)
+
+        # Process each interruption
+        for interruption in result.interruptions:
+            print(f"\nTool call details:")
+            print(f"  Agent: {interruption.agent.name}")
+            print(f"  Tool: {interruption.raw_item.name}")  # type: ignore
+            print(f"  Arguments: {interruption.raw_item.arguments}")  # type: ignore
+
+            confirmed = await confirm("\nDo you approve this tool call?")
+
+            if confirmed:
+                print(f"✓ Approved: {interruption.raw_item.name}")
+                state.approve(interruption)
+            else:
+                print(f"✗ Rejected: {interruption.raw_item.name}")
+                state.reject(interruption)
+
+        # Resume execution with the updated state
+        print("\nResuming agent execution...")
+        result = await Runner.run(agent, state)
+        has_interruptions = len(result.interruptions) > 0
+
+    print("\n" + "=" * 80)
+    print("Final Output:")
+    print("=" * 80)
+    print(result.final_output)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
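
The save-and-reload step in the example above can be pictured with a small self-contained sketch. It does not use the SDK: `MiniState`, its fields, `save_state`, `load_state`, and `round_trip` are hypothetical names, and the only assumption carried over from the commit is that a run state can be turned into a JSON-compatible dict and rebuilt from one, as the `to_json()` / `from_json()` pair suggests.

```python
from __future__ import annotations

import json
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class MiniState:
    """Hypothetical stand-in for a run state; not the SDK's RunState."""

    original_input: str
    current_turn: int = 0
    # Maps a tool call id to True (approved) or False (rejected).
    decisions: dict[str, bool] = field(default_factory=dict)

    def to_json(self) -> dict[str, Any]:
        """Return a plain dict that json.dump can write."""
        return {
            "original_input": self.original_input,
            "current_turn": self.current_turn,
            "decisions": dict(self.decisions),
        }

    @staticmethod
    def from_json(data: dict[str, Any]) -> MiniState:
        """Rebuild a state from the dict produced by to_json()."""
        return MiniState(
            original_input=data["original_input"],
            current_turn=data["current_turn"],
            decisions=dict(data["decisions"]),
        )


def save_state(state: MiniState, path: Path) -> None:
    path.write_text(json.dumps(state.to_json(), indent=2), encoding="utf-8")


def load_state(path: Path) -> MiniState:
    return MiniState.from_json(json.loads(path.read_text(encoding="utf-8")))


def round_trip(state: MiniState) -> MiniState:
    """Write the state to a temporary file and read it back."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "result.json"
        save_state(state, path)
        return load_state(path)
```

Because the file holds only plain JSON, the process that loads it does not have to be the one that saved it, which is the point of the "different thread/process" comment in the example.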
examples/agent_patterns/human_in_the_loop_stream.py

Lines changed: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
+"""Human-in-the-loop example with streaming.
+
+This example demonstrates the human-in-the-loop (HITL) pattern with streaming.
+The agent will pause execution when a tool requiring approval is called,
+allowing you to approve or reject the tool call before continuing.
+
+The streaming version provides real-time feedback as the agent processes
+the request, then pauses for approval when needed.
+"""
+
+import asyncio
+
+from agents import Agent, Runner, function_tool
+
+
+async def _needs_temperature_approval(_ctx, params, _call_id) -> bool:
+    """Check if temperature tool needs approval."""
+    return "Oakland" in params.get("city", "")
+
+
+@function_tool(
+    # Dynamic approval: only require approval for Oakland
+    needs_approval=_needs_temperature_approval
+)
+async def get_temperature(city: str) -> str:
+    """Get the temperature for a given city.
+
+    Args:
+        city: The city to get temperature for.
+
+    Returns:
+        Temperature information for the city.
+    """
+    return f"The temperature in {city} is 20° Celsius"
+
+
+@function_tool
+async def get_weather(city: str) -> str:
+    """Get the weather for a given city.
+
+    Args:
+        city: The city to get weather for.
+
+    Returns:
+        Weather information for the city.
+    """
+    return f"The weather in {city} is sunny."
+
+
+async def confirm(question: str) -> bool:
+    """Prompt user for yes/no confirmation.
+
+    Args:
+        question: The question to ask.
+
+    Returns:
+        True if user confirms, False otherwise.
+    """
+    loop = asyncio.get_event_loop()
+    answer = await loop.run_in_executor(None, input, f"{question} (y/n): ")
+    return answer.strip().lower() in ["y", "yes"]
+
+
+async def main():
+    """Run the human-in-the-loop example."""
+    main_agent = Agent(
+        name="Weather Assistant",
+        instructions=(
+            "You are a helpful weather assistant. "
+            "Answer questions about weather and temperature using the available tools."
+        ),
+        tools=[get_temperature, get_weather],
+    )
+
+    # Run the agent with streaming
+    result = Runner.run_streamed(
+        main_agent,
+        "What is the weather and temperature in Oakland?",
+    )
+    async for _ in result.stream_events():
+        pass  # Process streaming events silently or could print them
+
+    # Handle interruptions
+    while len(result.interruptions) > 0:
+        print("\n" + "=" * 80)
+        print("Human-in-the-loop: approval required for the following tool calls:")
+        print("=" * 80)
+
+        state = result.to_state()
+
+        for interruption in result.interruptions:
+            print(f"\nTool call details:")
+            print(f"  Agent: {interruption.agent.name}")
+            print(f"  Tool: {interruption.raw_item.name}")  # type: ignore
+            print(f"  Arguments: {interruption.raw_item.arguments}")  # type: ignore
+
+            confirmed = await confirm("\nDo you approve this tool call?")
+
+            if confirmed:
+                print(f"✓ Approved: {interruption.raw_item.name}")
+                state.approve(interruption)
+            else:
+                print(f"✗ Rejected: {interruption.raw_item.name}")
+                state.reject(interruption)
+
+        # Resume execution with streaming
+        print("\nResuming agent execution...")
+        result = Runner.run_streamed(main_agent, state)
+        async for _ in result.stream_events():
+            pass  # Process streaming events silently or could print them
+
+    print("\n" + "=" * 80)
+    print("Final Output:")
+    print("=" * 80)
+    print(result.final_output)
+    print("\nDone!")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
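
Both examples pass a predicate as `needs_approval` so that only some calls pause the run. The sketch below illustrates one way such a rule could be resolved; it is not the SDK's implementation. `ApprovalRule`, `requires_approval`, `oakland_only`, and `check` are hypothetical names, and the predicate here takes only the tool arguments, whereas the example's predicate also receives a context and a call id.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict, Union

# Hypothetical type: either a fixed flag or a predicate over the tool arguments.
ApprovalRule = Union[
    bool,
    Callable[[Dict[str, Any]], Union[bool, Awaitable[bool]]],
]


async def requires_approval(rule: ApprovalRule, params: Dict[str, Any]) -> bool:
    """Resolve a static flag or a (sync or async) predicate to a bool."""
    if isinstance(rule, bool):
        return rule
    outcome = rule(params)
    # An async predicate returns a coroutine that still has to be awaited.
    if inspect.isawaitable(outcome):
        outcome = await outcome
    return bool(outcome)


async def oakland_only(params: Dict[str, Any]) -> bool:
    """Mirror of the example's rule: pause only for Oakland."""
    return "Oakland" in params.get("city", "")


def check(rule: ApprovalRule, params: Dict[str, Any]) -> bool:
    """Synchronous helper for trying the rule outside an event loop."""
    return asyncio.run(requires_approval(rule, params))
```

Accepting both a plain bool and a callable keeps the simple case simple (always or never ask) while leaving room for argument-dependent rules like the Oakland check.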

src/agents/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@
 from .result import RunResult, RunResultStreaming
 from .run import RunConfig, Runner
 from .run_context import RunContextWrapper, TContext
-from .run_state import NextStepInterruption, RunState
+from .run_state import RunState
 from .stream_events import (
     AgentUpdatedStreamEvent,
     RawResponsesStreamEvent,

src/agents/_run_impl.py

Lines changed: 23 additions & 12 deletions
@@ -64,6 +64,7 @@
     ModelResponse,
     ReasoningItem,
     RunItem,
+    ToolApprovalItem,
     ToolCallItem,
     ToolCallOutputItem,
     TResponseInputItem,
@@ -922,18 +923,25 @@ async def run_single_tool(
 
         results = await asyncio.gather(*tasks)
 
-        function_tool_results = [
-            FunctionToolResult(
-                tool=tool_run.function_tool,
-                output=result,
-                run_item=ToolCallOutputItem(
-                    output=result,
-                    raw_item=ItemHelpers.tool_call_output_item(tool_run.tool_call, result),
-                    agent=agent,
-                ),
-            )
-            for tool_run, result in zip(tool_runs, results)
-        ]
+        function_tool_results = []
+        for tool_run, result in zip(tool_runs, results):
+            # If result is already a FunctionToolResult (e.g., from approval interruption),
+            # use it directly instead of wrapping it
+            if isinstance(result, FunctionToolResult):
+                function_tool_results.append(result)
+            else:
+                # Normal case: wrap the result in a FunctionToolResult
+                function_tool_results.append(
+                    FunctionToolResult(
+                        tool=tool_run.function_tool,
+                        output=result,
+                        run_item=ToolCallOutputItem(
+                            output=result,
+                            raw_item=ItemHelpers.tool_call_output_item(tool_run.tool_call, result),
+                            agent=agent,
+                        ),
+                    )
+                )
 
         return function_tool_results, tool_input_guardrail_results, tool_output_guardrail_results
 
@@ -1272,6 +1280,9 @@ def stream_step_items_to_queue(
                 event = RunItemStreamEvent(item=item, name="mcp_approval_response")
             elif isinstance(item, MCPListToolsItem):
                 event = RunItemStreamEvent(item=item, name="mcp_list_tools")
+            elif isinstance(item, ToolApprovalItem):
+                # Tool approval items should not be streamed - they represent interruptions
+                event = None
 
             else:
                 logger.warning(f"Unexpected item type: {type(item)}")
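
The two behavioural changes in this file can be sketched in isolation: results that are already wrapped pass through untouched, and approval items map to no stream event. The classes and functions below are hypothetical stand-ins rather than the SDK's types, and raising `TypeError` for unknown items is a simplification of the warning logged in the real code.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Sequence


@dataclass
class WrappedResult:
    """Hypothetical stand-in for FunctionToolResult."""

    tool_name: str
    output: Any


@dataclass
class ApprovalItem:
    """Hypothetical stand-in for ToolApprovalItem."""

    tool_name: str


def collect_results(
    tool_names: Sequence[str], raw_results: Sequence[Any]
) -> List[WrappedResult]:
    """Wrap plain outputs, but keep results that are already wrapped."""
    collected: List[WrappedResult] = []
    for name, result in zip(tool_names, raw_results):
        if isinstance(result, WrappedResult):
            # E.g. a result produced by an approval interruption.
            collected.append(result)
        else:
            collected.append(WrappedResult(tool_name=name, output=result))
    return collected


def event_name_for(item: Any) -> Optional[str]:
    """Return an event name, or None when the item must not be streamed."""
    if isinstance(item, ApprovalItem):
        # Approval items represent interruptions, so no event is emitted.
        return None
    if isinstance(item, WrappedResult):
        return "tool_output"
    raise TypeError(f"Unexpected item type: {type(item)}")
```

Without the `isinstance` check, an already wrapped result would be wrapped a second time and its inner approval item would be treated as ordinary tool output.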

src/agents/result.py

Lines changed: 57 additions & 0 deletions
@@ -146,6 +146,7 @@ def to_state(self) -> Any:
             result = await Runner.run(agent, state)
             ```
         """
+        from ._run_impl import NextStepInterruption
         from .run_state import RunState
 
         # Create a RunState from the current result
@@ -162,6 +163,10 @@ def to_state(self) -> Any:
         state._input_guardrail_results = self.input_guardrail_results
         state._output_guardrail_results = self.output_guardrail_results
 
+        # If there are interruptions, set the current step
+        if self.interruptions:
+            state._current_step = NextStepInterruption(interruptions=self.interruptions)
+
         return state
 
     def __str__(self) -> str:
@@ -392,3 +397,55 @@ async def _await_task_safely(self, task: asyncio.Task[Any] | None) -> None:
             except Exception:
                 # The exception will be surfaced via _check_errors() if needed.
                 pass
+
+    def to_state(self) -> Any:
+        """Create a RunState from this streaming result to resume execution.
+
+        This is useful when the run was interrupted (e.g., for tool approval). You can
+        approve or reject the tool calls on the returned state, then pass it back to
+        `Runner.run_streamed()` to continue execution.
+
+        Returns:
+            A RunState that can be used to resume the run.
+
+        Example:
+            ```python
+            # Run agent until it needs approval
+            result = Runner.run_streamed(agent, "Use the delete_file tool")
+            async for event in result.stream_events():
+                pass
+
+            if result.interruptions:
+                # Approve the tool call
+                state = result.to_state()
+                state.approve(result.interruptions[0])
+
+                # Resume the run
+                result = Runner.run_streamed(agent, state)
+                async for event in result.stream_events():
+                    pass
+            ```
+        """
+        from ._run_impl import NextStepInterruption
+        from .run_state import RunState
+
+        # Create a RunState from the current result
+        state = RunState(
+            context=self.context_wrapper,
+            original_input=self.input,
+            starting_agent=self.last_agent,
+            max_turns=self.max_turns,
+        )
+
+        # Populate the state with data from the result
+        state._generated_items = self.new_items
+        state._model_responses = self.raw_responses
+        state._input_guardrail_results = self.input_guardrail_results
+        state._output_guardrail_results = self.output_guardrail_results
+        state._current_turn = self.current_turn
+
+        # If there are interruptions, set the current step
+        if self.interruptions:
+            state._current_step = NextStepInterruption(interruptions=self.interruptions)
+
+        return state
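
The interrupt, decide, resume cycle that `to_state()` enables can be modelled without the SDK. Everything in the sketch below is hypothetical (`PendingCall`, `LoopState`, `LoopResult`, `fake_resume`, `drive`, and the rejection message text); it only assumes what the commit message states, namely that an approved call executes on resume and a rejected call yields a message instead of tool output.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class PendingCall:
    """Hypothetical record of a tool call waiting for a decision."""

    call_id: str
    tool: str
    city: str


@dataclass
class LoopState:
    pending: List[PendingCall]
    decisions: Dict[str, bool] = field(default_factory=dict)

    def approve(self, call: PendingCall) -> None:
        self.decisions[call.call_id] = True

    def reject(self, call: PendingCall) -> None:
        self.decisions[call.call_id] = False


@dataclass
class LoopResult:
    interruptions: List[PendingCall]
    outputs: List[str]

    def to_state(self) -> LoopState:
        # Carry the open interruptions over so the resume step can act on them.
        return LoopState(pending=list(self.interruptions))


def fake_resume(state: LoopState) -> LoopResult:
    """Stand-in for resuming a run: act on each decision recorded in the state."""
    outputs: List[str] = []
    still_pending: List[PendingCall] = []
    for call in state.pending:
        decision = state.decisions.get(call.call_id)
        if decision is None:
            still_pending.append(call)  # no decision yet, stay interrupted
        elif decision:
            outputs.append(f"The temperature in {call.city} is 20 C")
        else:
            outputs.append(f"Tool call {call.tool} was rejected")
    return LoopResult(interruptions=still_pending, outputs=outputs)


def drive(result: LoopResult, decide: Callable[[PendingCall], bool]) -> LoopResult:
    """Loop until no interruptions remain, as the example scripts do."""
    while result.interruptions:
        state = result.to_state()
        for call in result.interruptions:
            if decide(call):
                state.approve(call)
            else:
                state.reject(call)
        result = fake_resume(state)
    return result
```

The `while` loop matters because a resumed run may interrupt again, for instance when the model retries after a rejection.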

0 commit comments
