
Commit 31af63e

Merge branch 'main' into enhancement/add-streaming-inner-events
2 parents: a41d410 + dff0548

19 files changed: +502, -70 lines

examples/realtime/demo.py

Lines changed: 2 additions & 1 deletion
@@ -93,7 +93,8 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None:
             self.ui.add_transcript("Audio ended")
         elif event.type == "audio":
             np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
-            self.ui.play_audio(np_audio)
+            # Play audio in a separate thread to avoid blocking the event loop
+            await asyncio.to_thread(self.ui.play_audio, np_audio)
         elif event.type == "audio_interrupted":
             self.ui.add_transcript("Audio interrupted")
         elif event.type == "error":
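
Note: the one-line change above moves the blocking `play_audio` call off the asyncio event loop via `asyncio.to_thread`. A minimal, self-contained sketch of that pattern; the `slow_playback` helper and timings are invented stand-ins for the blocking audio write:

```python
import asyncio
import time


def slow_playback(label: str) -> None:
    # Stand-in for a blocking audio write (e.g. writing to a sounddevice stream).
    time.sleep(0.5)
    print(f"finished {label}")


async def main() -> None:
    # Run the blocking call on a worker thread so the loop keeps servicing events.
    playback = asyncio.create_task(asyncio.to_thread(slow_playback, "chunk-1"))
    await asyncio.sleep(0.1)  # other coroutines still get scheduled meanwhile
    print("event loop still responsive")
    await playback


asyncio.run(main())
```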

examples/realtime/no_ui_demo.py

Lines changed: 82 additions & 7 deletions
@@ -1,5 +1,8 @@
 import asyncio
+import queue
 import sys
+import threading
+from typing import Any
 
 import numpy as np
 import sounddevice as sd
@@ -46,14 +49,77 @@ def __init__(self) -> None:
         self.audio_player: sd.OutputStream | None = None
         self.recording = False
 
+        # Audio output state for callback system
+        self.output_queue: queue.Queue[Any] = queue.Queue(maxsize=10)  # Buffer more chunks
+        self.interrupt_event = threading.Event()
+        self.current_audio_chunk: np.ndarray | None = None  # type: ignore
+        self.chunk_position = 0
+
+    def _output_callback(self, outdata, frames: int, time, status) -> None:
+        """Callback for audio output - handles continuous audio stream from server."""
+        if status:
+            print(f"Output callback status: {status}")
+
+        # Check if we should clear the queue due to interrupt
+        if self.interrupt_event.is_set():
+            # Clear the queue and current chunk state
+            while not self.output_queue.empty():
+                try:
+                    self.output_queue.get_nowait()
+                except queue.Empty:
+                    break
+            self.current_audio_chunk = None
+            self.chunk_position = 0
+            self.interrupt_event.clear()
+            outdata.fill(0)
+            return
+
+        # Fill output buffer from queue and current chunk
+        outdata.fill(0)  # Start with silence
+        samples_filled = 0
+
+        while samples_filled < len(outdata):
+            # If we don't have a current chunk, try to get one from queue
+            if self.current_audio_chunk is None:
+                try:
+                    self.current_audio_chunk = self.output_queue.get_nowait()
+                    self.chunk_position = 0
+                except queue.Empty:
+                    # No more audio data available - this causes choppiness
+                    # Uncomment next line to debug underruns:
+                    # print(f"Audio underrun: {samples_filled}/{len(outdata)} samples filled")
+                    break
+
+            # Copy data from current chunk to output buffer
+            remaining_output = len(outdata) - samples_filled
+            remaining_chunk = len(self.current_audio_chunk) - self.chunk_position
+            samples_to_copy = min(remaining_output, remaining_chunk)
+
+            if samples_to_copy > 0:
+                chunk_data = self.current_audio_chunk[
+                    self.chunk_position : self.chunk_position + samples_to_copy
+                ]
+                # More efficient: direct assignment for mono audio instead of reshape
+                outdata[samples_filled : samples_filled + samples_to_copy, 0] = chunk_data
+                samples_filled += samples_to_copy
+                self.chunk_position += samples_to_copy
+
+            # If we've used up the entire chunk, reset for next iteration
+            if self.chunk_position >= len(self.current_audio_chunk):
+                self.current_audio_chunk = None
+                self.chunk_position = 0
+
     async def run(self) -> None:
         print("Connecting, may take a few seconds...")
 
-        # Initialize audio player
+        # Initialize audio player with callback
+        chunk_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)
         self.audio_player = sd.OutputStream(
             channels=CHANNELS,
             samplerate=SAMPLE_RATE,
             dtype=FORMAT,
+            callback=self._output_callback,
+            blocksize=chunk_size,  # Match our chunk timing for better alignment
         )
         self.audio_player.start()
 
@@ -146,15 +212,24 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None:
         elif event.type == "audio_end":
             print("Audio ended")
         elif event.type == "audio":
-            # Play audio through speakers
+            # Enqueue audio for callback-based playback
             np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
-            if self.audio_player:
-                try:
-                    self.audio_player.write(np_audio)
-                except Exception as e:
-                    print(f"Audio playback error: {e}")
+            try:
+                self.output_queue.put_nowait(np_audio)
+            except queue.Full:
+                # Queue is full - only drop if we have significant backlog
+                # This prevents aggressive dropping that could cause choppiness
+                if self.output_queue.qsize() > 8:  # Keep some buffer
+                    try:
+                        self.output_queue.get_nowait()
+                        self.output_queue.put_nowait(np_audio)
+                    except queue.Empty:
+                        pass
+                # If queue isn't too full, just skip this chunk to avoid blocking
         elif event.type == "audio_interrupted":
             print("Audio interrupted")
+            # Signal the output callback to clear its queue and state
+            self.interrupt_event.set()
         elif event.type == "error":
             print(f"Error: {event.error}")
         elif event.type == "history_updated":
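
Note: this rewrite swaps blocking `OutputStream.write` calls for a PortAudio callback that drains a bounded `queue.Queue`, so the event handler only enqueues chunks and an interrupt merely sets a `threading.Event`. A stripped-down sketch of that producer/consumer shape, independent of the demo class; the sample rate, block size, and synthetic test tone are assumptions for illustration:

```python
import queue
import threading

import numpy as np
import sounddevice as sd

SAMPLE_RATE = 24000  # assumed sample rate; the demo derives its own constants
BLOCKSIZE = SAMPLE_RATE // 10  # 100 ms blocks

audio_queue: queue.Queue[np.ndarray] = queue.Queue(maxsize=10)
interrupt = threading.Event()


def output_callback(outdata, frames, time, status) -> None:
    # Runs on PortAudio's thread; it must never block, only drain the queue.
    outdata.fill(0)  # default to silence (also covers underruns)
    if interrupt.is_set():
        while True:
            try:
                audio_queue.get_nowait()
            except queue.Empty:
                break
        interrupt.clear()
        return
    try:
        chunk = audio_queue.get_nowait()
    except queue.Empty:
        return  # underrun: this block stays silent
    n = min(frames, len(chunk))
    outdata[:n, 0] = chunk[:n]  # mono int16; leftover samples are dropped in this sketch


stream = sd.OutputStream(
    channels=1,
    samplerate=SAMPLE_RATE,
    dtype="int16",
    blocksize=BLOCKSIZE,
    callback=output_callback,
)

with stream:
    # Producer side: enqueue 100 ms chunks of a 440 Hz test tone.
    t = np.arange(SAMPLE_RATE) / SAMPLE_RATE
    tone = (np.sin(2 * np.pi * 440 * t) * 8000).astype(np.int16)
    for chunk in np.split(tone, 10):
        audio_queue.put(chunk)
    sd.sleep(1500)  # let playback finish
```

The demo keeps per-chunk read positions so partial chunks carry over between callbacks; the sketch trades that for brevity.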

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@ requires-python = ">=3.9"
 license = "MIT"
 authors = [{ name = "OpenAI", email = "[email protected]" }]
 dependencies = [
-    "openai>=1.96.0, <2",
+    "openai>=1.96.1, <2",
     "pydantic>=2.10, <3",
     "griffe>=1.5.6, <2",
     "typing-extensions>=4.12.2, <5",

src/agents/agent.py

Lines changed: 1 addition & 1 deletion
@@ -158,7 +158,7 @@ class Agent(AgentBase, Generic[TContext]):
     usable with OpenAI models, using the Responses API.
     """
 
-    handoffs: list[Agent[Any] | Handoff[TContext]] = field(default_factory=list)
+    handoffs: list[Agent[Any] | Handoff[TContext, Any]] = field(default_factory=list)
     """Handoffs are sub-agents that the agent can delegate to. You can provide a list of handoffs,
     and the agent can choose to delegate to them if relevant. Allows for separation of concerns and
     modularity.

src/agents/guardrail.py

Lines changed: 1 addition & 1 deletion
@@ -244,7 +244,7 @@ def decorator(
         return InputGuardrail(
             guardrail_function=f,
             # If not set, guardrail name uses the function's name by default.
-            name=name if name else f.__name__
+            name=name if name else f.__name__,
         )
 
     if func is not None:

src/agents/handoffs.py

Lines changed: 32 additions & 14 deletions
@@ -18,12 +18,15 @@
 from .util._types import MaybeAwaitable
 
 if TYPE_CHECKING:
-    from .agent import Agent
+    from .agent import Agent, AgentBase
 
 
 # The handoff input type is the type of data passed when the agent is called via a handoff.
 THandoffInput = TypeVar("THandoffInput", default=Any)
 
+# The agent type that the handoff returns
+TAgent = TypeVar("TAgent", bound="AgentBase[Any]", default="Agent[Any]")
+
 OnHandoffWithInput = Callable[[RunContextWrapper[Any], THandoffInput], Any]
 OnHandoffWithoutInput = Callable[[RunContextWrapper[Any]], Any]
 
@@ -52,7 +55,7 @@ class HandoffInputData:
 
 
 @dataclass
-class Handoff(Generic[TContext]):
+class Handoff(Generic[TContext, TAgent]):
     """A handoff is when an agent delegates a task to another agent.
     For example, in a customer support scenario you might have a "triage agent" that determines
     which agent should handle the user's request, and sub-agents that specialize in different
@@ -69,7 +72,7 @@ class Handoff(Generic[TContext]):
     """The JSON schema for the handoff input. Can be empty if the handoff does not take an input.
     """
 
-    on_invoke_handoff: Callable[[RunContextWrapper[Any], str], Awaitable[Agent[TContext]]]
+    on_invoke_handoff: Callable[[RunContextWrapper[Any], str], Awaitable[TAgent]]
     """The function that invokes the handoff. The parameters passed are:
     1. The handoff run context
     2. The arguments from the LLM, as a JSON string. Empty string if input_json_schema is empty.
@@ -100,20 +103,22 @@ class Handoff(Generic[TContext]):
     True, as it increases the likelihood of correct JSON input.
     """
 
-    is_enabled: bool | Callable[[RunContextWrapper[Any], Agent[Any]], MaybeAwaitable[bool]] = True
+    is_enabled: bool | Callable[[RunContextWrapper[Any], AgentBase[Any]], MaybeAwaitable[bool]] = (
+        True
+    )
     """Whether the handoff is enabled. Either a bool or a Callable that takes the run context and
     agent and returns whether the handoff is enabled. You can use this to dynamically enable/disable
    a handoff based on your context/state."""
 
-    def get_transfer_message(self, agent: Agent[Any]) -> str:
+    def get_transfer_message(self, agent: AgentBase[Any]) -> str:
         return json.dumps({"assistant": agent.name})
 
     @classmethod
-    def default_tool_name(cls, agent: Agent[Any]) -> str:
+    def default_tool_name(cls, agent: AgentBase[Any]) -> str:
         return _transforms.transform_string_function_style(f"transfer_to_{agent.name}")
 
     @classmethod
-    def default_tool_description(cls, agent: Agent[Any]) -> str:
+    def default_tool_description(cls, agent: AgentBase[Any]) -> str:
         return (
             f"Handoff to the {agent.name} agent to handle the request. "
             f"{agent.handoff_description or ''}"
@@ -128,7 +133,7 @@ def handoff(
     tool_description_override: str | None = None,
     input_filter: Callable[[HandoffInputData], HandoffInputData] | None = None,
     is_enabled: bool | Callable[[RunContextWrapper[Any], Agent[Any]], MaybeAwaitable[bool]] = True,
-) -> Handoff[TContext]: ...
+) -> Handoff[TContext, Agent[TContext]]: ...
 
 
 @overload
@@ -141,7 +146,7 @@ def handoff(
     tool_name_override: str | None = None,
     input_filter: Callable[[HandoffInputData], HandoffInputData] | None = None,
     is_enabled: bool | Callable[[RunContextWrapper[Any], Agent[Any]], MaybeAwaitable[bool]] = True,
-) -> Handoff[TContext]: ...
+) -> Handoff[TContext, Agent[TContext]]: ...
 
 
 @overload
@@ -153,7 +158,7 @@ def handoff(
     tool_name_override: str | None = None,
     input_filter: Callable[[HandoffInputData], HandoffInputData] | None = None,
     is_enabled: bool | Callable[[RunContextWrapper[Any], Agent[Any]], MaybeAwaitable[bool]] = True,
-) -> Handoff[TContext]: ...
+) -> Handoff[TContext, Agent[TContext]]: ...
 
 
 def handoff(
@@ -163,8 +168,9 @@ def handoff(
     on_handoff: OnHandoffWithInput[THandoffInput] | OnHandoffWithoutInput | None = None,
     input_type: type[THandoffInput] | None = None,
     input_filter: Callable[[HandoffInputData], HandoffInputData] | None = None,
-    is_enabled: bool | Callable[[RunContextWrapper[Any], Agent[Any]], MaybeAwaitable[bool]] = True,
-) -> Handoff[TContext]:
+    is_enabled: bool
+    | Callable[[RunContextWrapper[Any], Agent[TContext]], MaybeAwaitable[bool]] = True,
+) -> Handoff[TContext, Agent[TContext]]:
     """Create a handoff from an agent.
 
     Args:
@@ -202,7 +208,7 @@ def handoff(
 
     async def _invoke_handoff(
         ctx: RunContextWrapper[Any], input_json: str | None = None
-    ) -> Agent[Any]:
+    ) -> Agent[TContext]:
         if input_type is not None and type_adapter is not None:
             if input_json is None:
                 _error_tracing.attach_error_to_current_span(
@@ -239,12 +245,24 @@ async def _invoke_handoff(
     # If there is a need, we can make this configurable in the future
     input_json_schema = ensure_strict_json_schema(input_json_schema)
 
+    async def _is_enabled(ctx: RunContextWrapper[Any], agent_base: AgentBase[Any]) -> bool:
+        from .agent import Agent
+
+        assert callable(is_enabled), "is_enabled must be non-null here"
+        assert isinstance(agent_base, Agent), "Can't handoff to a non-Agent"
+        result = is_enabled(ctx, agent_base)
+
+        if inspect.isawaitable(result):
+            return await result
+
+        return result
+
     return Handoff(
         tool_name=tool_name,
         tool_description=tool_description,
         input_json_schema=input_json_schema,
         on_invoke_handoff=_invoke_handoff,
         input_filter=input_filter,
         agent_name=agent.name,
-        is_enabled=is_enabled,
+        is_enabled=_is_enabled if callable(is_enabled) else is_enabled,
     )
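
Note: `Handoff` is now generic over both the context and the target agent type, so `handoff()` returns `Handoff[TContext, Agent[TContext]]`, and a callable `is_enabled` gets wrapped in the async `_is_enabled` shim above. A hedged usage sketch; the agent names and the context flag are invented for illustration:

```python
from dataclasses import dataclass

from agents import Agent, RunContextWrapper, handoff


@dataclass
class SupportContext:
    billing_enabled: bool = True


billing_agent = Agent[SupportContext](name="Billing agent")


def billing_allowed(
    ctx: RunContextWrapper[SupportContext], agent: Agent[SupportContext]
) -> bool:
    # Enable or disable the handoff dynamically from run context/state.
    return ctx.context.billing_enabled


triage_agent = Agent[SupportContext](
    name="Triage agent",
    # handoff(...) now yields Handoff[SupportContext, Agent[SupportContext]];
    # plain Agent entries remain valid per Agent.handoffs' updated type.
    handoffs=[handoff(billing_agent, is_enabled=billing_allowed)],
)
```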

src/agents/models/chatcmpl_converter.py

Lines changed: 1 addition & 1 deletion
@@ -484,7 +484,7 @@ def tool_to_openai(cls, tool: Tool) -> ChatCompletionToolParam:
         )
 
     @classmethod
-    def convert_handoff_tool(cls, handoff: Handoff[Any]) -> ChatCompletionToolParam:
+    def convert_handoff_tool(cls, handoff: Handoff[Any, Any]) -> ChatCompletionToolParam:
         return {
             "type": "function",
             "function": {

src/agents/models/openai_responses.py

Lines changed: 1 addition & 1 deletion
@@ -370,7 +370,7 @@ def get_response_format(
     def convert_tools(
         cls,
         tools: list[Tool],
-        handoffs: list[Handoff[Any]],
+        handoffs: list[Handoff[Any, Any]],
     ) -> ConvertedTools:
         converted_tools: list[ToolParam] = []
         includes: list[ResponseIncludable] = []

src/agents/realtime/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -30,6 +30,7 @@
     RealtimeToolEnd,
     RealtimeToolStart,
 )
+from .handoffs import realtime_handoff
 from .items import (
     AssistantMessageItem,
     AssistantText,
@@ -92,6 +93,8 @@
     "RealtimeAgentHooks",
     "RealtimeRunHooks",
     "RealtimeRunner",
+    # Handoffs
+    "realtime_handoff",
     # Config
     "RealtimeAudioFormat",
     "RealtimeClientMessage",

src/agents/realtime/agent.py

Lines changed: 10 additions & 1 deletion
@@ -3,10 +3,11 @@
 import dataclasses
 import inspect
 from collections.abc import Awaitable
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from typing import Any, Callable, Generic, cast
 
 from ..agent import AgentBase
+from ..handoffs import Handoff
 from ..lifecycle import AgentHooksBase, RunHooksBase
 from ..logger import logger
 from ..run_context import RunContextWrapper, TContext
@@ -53,6 +54,14 @@ class RealtimeAgent(AgentBase, Generic[TContext]):
     return a string.
     """
 
+    handoffs: list[RealtimeAgent[Any] | Handoff[TContext, RealtimeAgent[Any]]] = field(
+        default_factory=list
+    )
+    """Handoffs are sub-agents that the agent can delegate to. You can provide a list of handoffs,
+    and the agent can choose to delegate to them if relevant. Allows for separation of concerns and
+    modularity.
+    """
+
     hooks: RealtimeAgentHooks | None = None
     """A class that receives callbacks on various lifecycle events for this agent.
     """
