Skip to content

Commit d60d06b

Browse files
committed
fix: defer parsing incomplete streamed text
1 parent 3842a5e commit d60d06b

2 files changed

Lines changed: 205 additions & 1 deletion

File tree

src/openai/lib/streaming/responses/_responses.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from typing import Any, List, Generic, Iterable, Awaitable, cast
66
from typing_extensions import Self, Callable, Iterator, AsyncIterator
77

8+
import pydantic
9+
810
from ._types import ParsedResponseSnapshot
911
from ._events import (
1012
ResponseStreamEvent,
@@ -286,7 +288,7 @@ def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEven
286288
logprobs=event.logprobs,
287289
type="response.output_text.done",
288290
text=event.text,
289-
parsed=parse_text(event.text, text_format=self._text_format),
291+
parsed=self._parse_text_done_event(event.text),
290292
)
291293
)
292294
elif event.type == "response.function_call_arguments.delta":
@@ -370,3 +372,16 @@ def _create_initial_response(self, event: RawResponseStreamEvent) -> ParsedRespo
370372
raise RuntimeError(f"Expected to have received `response.created` before `{event.type}`")
371373

372374
return construct_type_unchecked(type_=ParsedResponseSnapshot, value=event.response.to_dict())
375+
376+
def _parse_text_done_event(self, text: str) -> TextFormatT | None:
377+
try:
378+
return parse_text(text, text_format=self._text_format)
379+
except pydantic.ValidationError as exc:
380+
if not _is_json_parse_error(exc):
381+
raise
382+
383+
return None
384+
385+
386+
def _is_json_parse_error(exc: pydantic.ValidationError) -> bool:
387+
return any("json" in str(error.get("type", "")).lower() for error in exc.errors())

tests/lib/responses/test_responses.py

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,108 @@
33
from typing_extensions import TypeVar
44

55
import pytest
6+
import pydantic
67
from respx import MockRouter
78
from inline_snapshot import snapshot
89

910
from openai import OpenAI, AsyncOpenAI
1011
from openai._utils import assert_signatures_in_sync
12+
from openai._models import construct_type_unchecked
13+
from openai.types.responses import (
14+
ResponseCreatedEvent,
15+
ResponseTextDoneEvent,
16+
ResponseCompletedEvent as RawResponseCompletedEvent,
17+
ResponseIncompleteEvent,
18+
ResponseOutputItemAddedEvent,
19+
ResponseContentPartAddedEvent,
20+
)
21+
from openai.lib.streaming.responses._responses import ResponseStreamState
1122

1223
from ...conftest import base_url
1324
from ..snapshots import make_snapshot_request
1425

1526
_T = TypeVar("_T")
1627

28+
29+
class _StructuredText(pydantic.BaseModel):
30+
answer: str
31+
32+
33+
def _response_payload(*, status: str, output: list[object] | None = None) -> dict[str, object]:
34+
return {
35+
"id": "resp_test",
36+
"object": "response",
37+
"created_at": 0,
38+
"model": "gpt-4.1",
39+
"output": output or [],
40+
"parallel_tool_calls": True,
41+
"temperature": None,
42+
"tool_choice": "auto",
43+
"tools": [],
44+
"top_p": None,
45+
"status": status,
46+
}
47+
48+
49+
def _message_payload(*, text: str, status: str) -> dict[str, object]:
50+
return {
51+
"id": "msg_test",
52+
"type": "message",
53+
"role": "assistant",
54+
"status": status,
55+
"content": [
56+
{
57+
"type": "output_text",
58+
"text": text,
59+
"annotations": [],
60+
"logprobs": [],
61+
}
62+
],
63+
}
64+
65+
66+
def _start_response_stream(state: ResponseStreamState[_StructuredText]) -> None:
67+
state.handle_event(
68+
construct_type_unchecked(
69+
type_=ResponseCreatedEvent,
70+
value={
71+
"type": "response.created",
72+
"sequence_number": 0,
73+
"response": _response_payload(status="in_progress"),
74+
},
75+
)
76+
)
77+
state.handle_event(
78+
construct_type_unchecked(
79+
type_=ResponseOutputItemAddedEvent,
80+
value={
81+
"type": "response.output_item.added",
82+
"sequence_number": 1,
83+
"output_index": 0,
84+
"item": _message_payload(text="", status="in_progress"),
85+
},
86+
)
87+
)
88+
state.handle_event(
89+
construct_type_unchecked(
90+
type_=ResponseContentPartAddedEvent,
91+
value={
92+
"type": "response.content_part.added",
93+
"sequence_number": 2,
94+
"output_index": 0,
95+
"content_index": 0,
96+
"item_id": "msg_test",
97+
"part": {
98+
"type": "output_text",
99+
"text": "",
100+
"annotations": [],
101+
"logprobs": [],
102+
},
103+
},
104+
)
105+
)
106+
107+
17108
# all the snapshots in this file are auto-generated from the live API
18109
#
19110
# you can update them with
@@ -41,6 +132,104 @@ def test_output_text(client: OpenAI, respx_mock: MockRouter) -> None:
41132
)
42133

43134

135+
def test_stream_output_text_done_defers_invalid_structured_parse() -> None:
136+
state = ResponseStreamState(text_format=_StructuredText, input_tools=[])
137+
_start_response_stream(state)
138+
139+
events = state.handle_event(
140+
construct_type_unchecked(
141+
type_=ResponseTextDoneEvent,
142+
value={
143+
"type": "response.output_text.done",
144+
"sequence_number": 3,
145+
"output_index": 0,
146+
"content_index": 0,
147+
"item_id": "msg_test",
148+
"logprobs": [],
149+
"text": '{"answer":',
150+
},
151+
)
152+
)
153+
154+
assert len(events) == 1
155+
assert events[0].type == "response.output_text.done"
156+
assert events[0].parsed is None
157+
158+
incomplete_events = state.handle_event(
159+
construct_type_unchecked(
160+
type_=ResponseIncompleteEvent,
161+
value={
162+
"type": "response.incomplete",
163+
"sequence_number": 4,
164+
"response": {
165+
**_response_payload(
166+
status="incomplete",
167+
output=[_message_payload(text='{"answer":', status="incomplete")],
168+
),
169+
"incomplete_details": {"reason": "max_output_tokens"},
170+
},
171+
},
172+
)
173+
)
174+
assert incomplete_events[0].type == "response.incomplete"
175+
176+
177+
def test_stream_output_text_done_preserves_structured_validation_errors() -> None:
178+
state = ResponseStreamState(text_format=_StructuredText, input_tools=[])
179+
_start_response_stream(state)
180+
181+
with pytest.raises(pydantic.ValidationError):
182+
state.handle_event(
183+
construct_type_unchecked(
184+
type_=ResponseTextDoneEvent,
185+
value={
186+
"type": "response.output_text.done",
187+
"sequence_number": 3,
188+
"output_index": 0,
189+
"content_index": 0,
190+
"item_id": "msg_test",
191+
"logprobs": [],
192+
"text": "{}",
193+
},
194+
)
195+
)
196+
197+
198+
def test_stream_completed_response_still_raises_invalid_structured_parse() -> None:
199+
state = ResponseStreamState(text_format=_StructuredText, input_tools=[])
200+
_start_response_stream(state)
201+
202+
state.handle_event(
203+
construct_type_unchecked(
204+
type_=ResponseTextDoneEvent,
205+
value={
206+
"type": "response.output_text.done",
207+
"sequence_number": 3,
208+
"output_index": 0,
209+
"content_index": 0,
210+
"item_id": "msg_test",
211+
"logprobs": [],
212+
"text": '{"answer":',
213+
},
214+
)
215+
)
216+
217+
with pytest.raises(pydantic.ValidationError):
218+
state.handle_event(
219+
construct_type_unchecked(
220+
type_=RawResponseCompletedEvent,
221+
value={
222+
"type": "response.completed",
223+
"sequence_number": 4,
224+
"response": _response_payload(
225+
status="completed",
226+
output=[_message_payload(text='{"answer":', status="completed")],
227+
),
228+
},
229+
)
230+
)
231+
232+
44233
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
45234
def test_stream_method_definition_in_sync(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None:
46235
checking_client: OpenAI | AsyncOpenAI = client if sync else async_client

0 commit comments

Comments
 (0)