26 changes: 22 additions & 4 deletions src/guidellm/backends/openai/request_handlers.py
@@ -247,14 +247,16 @@ def compile_non_streaming(
text = choice.get("text", "")
input_metrics, output_metrics = self.extract_metrics(usage, text)

return GenerationResponse(
compiled = GenerationResponse(
request_id=request.request_id,
request_args=arguments.model_dump_json(),
response_id=response.get("id"), # use vLLM ID if available
text=text,
input_metrics=input_metrics,
output_metrics=output_metrics,
)
self._validate_compiled_response(compiled)
return compiled

def add_streaming_line(self, line: str) -> int | None:
"""
@@ -297,14 +299,26 @@ def compile_streaming(
text = "".join(self.streaming_texts)
input_metrics, output_metrics = self.extract_metrics(self.streaming_usage, text)

return GenerationResponse(
compiled = GenerationResponse(
request_id=request.request_id,
request_args=arguments.model_dump_json(),
response_id=self.streaming_response_id, # use vLLM ID if available
text=text,
input_metrics=input_metrics,
output_metrics=output_metrics,
)
self._validate_compiled_response(compiled)
return compiled

def _validate_compiled_response(self, response: GenerationResponse) -> None:
"""Raise when endpoint produced a terminal payload with no usable output."""
has_text = bool(response.text and response.text.strip())
output_tokens = response.output_metrics.total_tokens or 0
if not has_text and output_tokens <= 0:
raise ValueError(
"[UNUSABLE_BACKEND_RESPONSE] backend resolved without a usable "
"terminal response payload"
)

def extract_line_data(self, line: str) -> dict[str, Any] | None:
"""
@@ -541,14 +555,16 @@ def compile_non_streaming(
text = choice.get("message", {}).get("content", "")
input_metrics, output_metrics = self.extract_metrics(usage, text)

return GenerationResponse(
compiled = GenerationResponse(
request_id=request.request_id,
request_args=arguments.model_dump_json(),
response_id=response.get("id"), # use vLLM ID if available
text=text,
input_metrics=input_metrics,
output_metrics=output_metrics,
)
self._validate_compiled_response(compiled)
return compiled

def add_streaming_line(self, line: str) -> int | None:
"""
@@ -591,14 +607,16 @@ def compile_streaming(
text = "".join(self.streaming_texts)
input_metrics, output_metrics = self.extract_metrics(self.streaming_usage, text)

return GenerationResponse(
compiled = GenerationResponse(
request_id=request.request_id,
request_args=arguments.model_dump_json(),
response_id=self.streaming_response_id, # use vLLM ID if available
text=text,
input_metrics=input_metrics,
output_metrics=output_metrics,
)
self._validate_compiled_response(compiled)
return compiled


@OpenAIRequestHandlerFactory.register(
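For reviewers who want to poke at the new guard in isolation, here is a minimal, self-contained sketch of the check `_validate_compiled_response` applies. `GenerationResponse` is simplified to a dataclass carrying only the two fields the check reads; the real type in guidellm is richer.

from dataclasses import dataclass, field


@dataclass
class OutputMetrics:
    total_tokens: int | None = None


@dataclass
class GenerationResponse:
    text: str | None = ""
    output_metrics: OutputMetrics = field(default_factory=OutputMetrics)


def validate_compiled_response(response: GenerationResponse) -> None:
    """Raise when a terminal payload carries no usable output."""
    has_text = bool(response.text and response.text.strip())
    output_tokens = response.output_metrics.total_tokens or 0
    if not has_text and output_tokens <= 0:
        raise ValueError(
            "[UNUSABLE_BACKEND_RESPONSE] backend resolved without a usable "
            "terminal response payload"
        )


validate_compiled_response(GenerationResponse(text="hello"))  # ok: non-empty text
validate_compiled_response(  # ok: no text, but usage reported output tokens
    GenerationResponse(text="", output_metrics=OutputMetrics(total_tokens=3))
)
try:
    validate_compiled_response(GenerationResponse(text="   "))  # whitespace only
except ValueError as exc:
    print(exc)

Note that the text check strips whitespace, so a whitespace-only completion only passes if usage still reported output tokens; this is exactly the class of payloads removed from the happy-path parametrize cases in the tests below.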
9 changes: 1 addition & 8 deletions src/guidellm/scheduler/worker.py
@@ -397,18 +397,11 @@ async def _process_next_request( # noqa: C901
async for resp, info in self.backend.resolve( # type: ignore[attr-defined]
request, request_info, history or None
):
response = resp
request_info = info
if request_info is None:
raise RuntimeError("Received invalid request info from backend")

if (
resp is None
and request_info.timings.first_token_iteration is not None
):
self._send_update("first_token", None, request, request_info)

response = resp

# Complete the request
request_info.timings.resolve_end = time.time()
self._send_update("completed", response, request, request_info)
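The worker change is easier to read outside the diff: the loop now records the latest (resp, info) pair on every iteration and drops the separate first_token update for intermediate None responses, so the terminal payload is always whatever the final iteration yielded. A simplified sketch of the new loop shape, with a stubbed resolve generator standing in for the backend (the stub's payloads are hypothetical stand-ins for guidellm's types):

import asyncio
import time


async def resolve(request):
    # Stub backend: intermediate yields may carry resp=None; the final
    # yield carries the terminal response.
    yield None, {"stage": "streaming"}
    yield "terminal text", {"stage": "done"}


async def process_next_request(request):
    response = request_info = None
    async for resp, info in resolve(request):
        response = resp  # keep the latest pair on every iteration
        request_info = info
        if request_info is None:
            raise RuntimeError("Received invalid request info from backend")

    # Complete the request with the final iteration's payload.
    request_info["resolve_end"] = time.time()
    return response, request_info


print(asyncio.run(process_next_request("req-1")))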
103 changes: 81 additions & 22 deletions tests/unit/backends/openai/test_request_handlers.py
@@ -306,9 +306,6 @@ def test_format_ignore_eos(self, valid_instances):
10,
5,
),
({"choices": [{"text": ""}], "usage": {}}, "", None, None),
({"choices": [], "usage": {}}, "", None, None),
({}, "", None, None),
],
)
def test_non_streaming(
@@ -369,7 +366,6 @@ def test_non_streaming(
None,
None,
),
(["", "data: [DONE]"], "", None, None),
],
)
def test_streaming(
@@ -403,6 +399,47 @@ def test_streaming(
assert response.output_metrics.text_words == len(expected_text.split())
assert response.output_metrics.text_characters == len(expected_text)

@pytest.mark.regression
@pytest.mark.parametrize(
"response",
[
{"choices": [{"text": ""}], "usage": {}},
{"choices": [], "usage": {}},
{},
],
)
def test_non_streaming_raises_for_unusable_terminal_payload(
self, valid_instances, generation_request, response
):
"""Test unusable non-streaming text response raises.

### WRITTEN BY AI ###
"""
instance = valid_instances
arguments = instance.format(generation_request)

with pytest.raises(ValueError, match="UNUSABLE_BACKEND_RESPONSE"):
instance.compile_non_streaming(generation_request, arguments, response)

@pytest.mark.regression
def test_streaming_raises_for_unusable_terminal_payload(
self, valid_instances, generation_request
):
"""Test unusable streaming text response raises.

### WRITTEN BY AI ###
"""
instance = valid_instances
arguments = instance.format(generation_request)

for line in ["", "data: [DONE]"]:
result = instance.add_streaming_line(line)
if result is None:
break

with pytest.raises(ValueError, match="UNUSABLE_BACKEND_RESPONSE"):
instance.compile_streaming(generation_request, arguments)

@pytest.mark.smoke
@pytest.mark.parametrize(
("line", "expected_output"),
@@ -824,18 +861,6 @@ def test_format_multimodal(self, valid_instances):
10,
5,
),
(
{"choices": [{"message": {"content": ""}}], "usage": {}},
"",
None,
None,
),
(
{"choices": [], "usage": {}},
"",
None,
None,
),
],
)
def test_non_streaming(
@@ -889,12 +914,6 @@ def test_non_streaming(
None,
None,
),
(
["", "data: [DONE]"],
"",
None,
None,
),
],
)
def test_streaming(
@@ -926,6 +945,46 @@ def test_streaming(
assert response.input_metrics.text_tokens == expected_input_tokens
assert response.output_metrics.text_tokens == expected_output_tokens

@pytest.mark.regression
@pytest.mark.parametrize(
"response",
[
{"choices": [{"message": {"content": ""}}], "usage": {}},
{"choices": [], "usage": {}},
],
)
def test_non_streaming_raises_for_unusable_terminal_payload(
self, valid_instances, generation_request, response
):
"""Test unusable non-streaming chat response raises.

### WRITTEN BY AI ###
"""
instance = valid_instances
arguments = instance.format(generation_request)

with pytest.raises(ValueError, match="UNUSABLE_BACKEND_RESPONSE"):
instance.compile_non_streaming(generation_request, arguments, response)

@pytest.mark.regression
def test_streaming_raises_for_unusable_terminal_payload(
self, valid_instances, generation_request
):
"""Test unusable streaming chat response raises.

### WRITTEN BY AI ###
"""
instance = valid_instances
arguments = instance.format(generation_request)

for line in ["", "data: [DONE]"]:
result = instance.add_streaming_line(line)
if result is None:
break

with pytest.raises(ValueError, match="UNUSABLE_BACKEND_RESPONSE"):
instance.compile_streaming(generation_request, arguments)


class TestAudioRequestHandler:
"""Test cases for AudioRequestHandler.
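One small footnote on the assertions in the new tests: pytest applies the match= argument with re.search against the exception string, which is why the tests match the sentinel without its square brackets; the bracketed form would be parsed as a regex character class. A quick standalone check:

import re

import pytest

MESSAGE = (
    "[UNUSABLE_BACKEND_RESPONSE] backend resolved without a usable "
    "terminal response payload"
)

# match= is a regex searched within str(exc), so the brackets are omitted.
with pytest.raises(ValueError, match="UNUSABLE_BACKEND_RESPONSE"):
    raise ValueError(MESSAGE)

# Equivalent manual check:
assert re.search("UNUSABLE_BACKEND_RESPONSE", MESSAGE) is not None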