WIP: streaming + claude reasoning fix #1073
Closed
Changes from all commits (13 commits):

- `0b6fb3a` (scosman): Proof of concept streaming API. WIP
- `9642085` (leonardmq): add integration test for streaming and fix Claude model reasoning
- `c57d241` (leonardmq): fix: merge conflicts
- `06d24fd` (leonardmq): not enforce thinking because adaptive
- `ca436de` (leonardmq): fix: test due to run config properties type union
- `36891dd` (leonardmq): refactor: make the thinking claude models into separate ones to avoid…
- `2d44c56` (leonardmq): Merge branch 'leonard/kil-428-make-autosave_runs-default-false-and-in…
- `06706c4` (leonardmq): Merge branch 'main' of github.com:Kiln-AI/Kiln into leonard/kil-420-a…
- `944fb4e` (leonardmq): fix: dependencies
- `a7442e6` (leonardmq): Merge branch 'main' of github.com:Kiln-AI/Kiln into leonard/kil-420-a…
- `c520e27` (leonardmq): fix: pin uv tools in CI and checks
- `1eb0fe6` (leonardmq): fix: pinned uv run in another workflow and mcp hooks
- `737b57b` (leonardmq): Merge branch 'leonard/kil-442-fix-pin-uv-tools-in-ci-build-is-broken'…
(One added file in this PR is empty.)
libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py (new file, 60 additions, 0 deletions)
```python
from __future__ import annotations

from typing import Any, AsyncIterator, Optional, Union

import litellm
from litellm.types.utils import (
    ModelResponse,
    ModelResponseStream,
    TextCompletionResponse,
)


class StreamingCompletion:
    """
    Async iterable wrapper around ``litellm.acompletion`` with streaming.

    Yields ``ModelResponseStream`` chunks as they arrive. After iteration
    completes, the assembled ``ModelResponse`` is available via the
    ``.response`` property.

    Usage::

        stream = StreamingCompletion(model=..., messages=...)
        async for chunk in stream:
            # handle chunk however you like (print, log, send over WS, ...)
            pass
        final = stream.response  # fully assembled ModelResponse
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        kwargs = dict(kwargs)
        kwargs.pop("stream", None)
        self._args = args
        self._kwargs = kwargs
        self._response: Optional[Union[ModelResponse, TextCompletionResponse]] = None
        self._iterated: bool = False

    @property
    def response(self) -> Optional[Union[ModelResponse, TextCompletionResponse]]:
        """The final assembled response. Only available after iteration."""
        if not self._iterated:
            raise RuntimeError(
                "StreamingCompletion has not been iterated yet. "
                "Use 'async for chunk in stream:' before accessing .response"
            )
        return self._response

    async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
        self._response = None
        self._iterated = False

        chunks: list[ModelResponseStream] = []
        stream = await litellm.acompletion(*self._args, stream=True, **self._kwargs)

        async for chunk in stream:
            chunks.append(chunk)
            yield chunk

        self._response = litellm.stream_chunk_builder(chunks)
        self._iterated = True
```
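To show the iterate-then-read contract concretely without depending on litellm, here is a minimal self-contained sketch of the same pattern. The `StreamingText` class and all names in it are illustrative stand-ins, not part of the PR:

```python
import asyncio
from typing import AsyncIterator, List, Optional


class StreamingText:
    """Toy analogue of StreamingCompletion: yields chunks, then exposes
    the assembled result via .result after iteration completes."""

    def __init__(self, chunks: List[str]) -> None:
        self._chunks = chunks
        self._result: Optional[str] = None
        self._iterated = False

    @property
    def result(self) -> Optional[str]:
        if not self._iterated:
            raise RuntimeError("iterate the stream before reading .result")
        return self._result

    async def __aiter__(self) -> AsyncIterator[str]:
        self._result = None
        self._iterated = False
        seen: List[str] = []
        for chunk in self._chunks:  # stands in for the network stream
            await asyncio.sleep(0)  # give the event loop a turn, as a real stream would
            seen.append(chunk)
            yield chunk
        self._result = "".join(seen)  # stands in for litellm.stream_chunk_builder
        self._iterated = True


async def main() -> Optional[str]:
    stream = StreamingText(["Hel", "lo", "!"])
    async for _ in stream:
        pass  # a real consumer would forward each chunk (print, websocket, ...)
    return stream.result


print(asyncio.run(main()))  # prints: Hello!
```

The design choice being exercised is that `__aiter__` is itself an async generator function, so each `async for` starts a fresh pass and the final result is only published once the loop runs to completion.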
libs/core/kiln_ai/adapters/litellm_utils/test_litellm_streaming.py (new file, 139 additions, 0 deletions)
```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any, List
from unittest.mock import MagicMock, patch

import pytest

from kiln_ai.adapters.litellm_utils.litellm_streaming import StreamingCompletion


def _make_chunk(content: str | None = None, finish_reason: str | None = None) -> Any:
    """Build a minimal chunk object matching litellm's streaming shape."""
    delta = SimpleNamespace(content=content, role="assistant")
    choice = SimpleNamespace(delta=delta, finish_reason=finish_reason, index=0)
    return SimpleNamespace(choices=[choice], id="chatcmpl-test", model="test-model")


async def _async_iter(items: List[Any]):
    """Turn a plain list into an async iterator."""
    for item in items:
        yield item


@pytest.fixture
def mock_acompletion():
    with patch("litellm.acompletion") as mock:
        yield mock


@pytest.fixture
def mock_chunk_builder():
    with patch("litellm.stream_chunk_builder") as mock:
        yield mock


class TestStreamingCompletion:
    async def test_yields_all_chunks(self, mock_acompletion, mock_chunk_builder):
        chunks = [_make_chunk("Hello"), _make_chunk(" world"), _make_chunk("!")]
        mock_acompletion.return_value = _async_iter(chunks)
        mock_chunk_builder.return_value = MagicMock(name="final_response")

        stream = StreamingCompletion(model="test", messages=[])
        received = [chunk async for chunk in stream]

        assert received == chunks

    async def test_response_available_after_iteration(
        self, mock_acompletion, mock_chunk_builder
    ):
        chunks = [_make_chunk("hi")]
        mock_acompletion.return_value = _async_iter(chunks)
        sentinel = MagicMock(name="final_response")
        mock_chunk_builder.return_value = sentinel

        stream = StreamingCompletion(model="test", messages=[])
        async for _ in stream:
            pass

        assert stream.response is sentinel

    async def test_response_raises_before_iteration(self):
        stream = StreamingCompletion(model="test", messages=[])
        with pytest.raises(RuntimeError, match="not been iterated"):
            _ = stream.response

    async def test_stream_kwarg_is_stripped(self, mock_acompletion, mock_chunk_builder):
        mock_acompletion.return_value = _async_iter([])
        mock_chunk_builder.return_value = None

        stream = StreamingCompletion(model="test", messages=[], stream=False)
        async for _ in stream:
            pass

        _, call_kwargs = mock_acompletion.call_args
        assert call_kwargs["stream"] is True

    async def test_passes_args_and_kwargs_through(
        self, mock_acompletion, mock_chunk_builder
    ):
        mock_acompletion.return_value = _async_iter([])
        mock_chunk_builder.return_value = None

        stream = StreamingCompletion(
            model="gpt-4", messages=[{"role": "user", "content": "hi"}], temperature=0.5
        )
        async for _ in stream:
            pass

        _, call_kwargs = mock_acompletion.call_args
        assert call_kwargs["model"] == "gpt-4"
        assert call_kwargs["messages"] == [{"role": "user", "content": "hi"}]
        assert call_kwargs["temperature"] == 0.5
        assert call_kwargs["stream"] is True

    async def test_chunks_passed_to_builder(self, mock_acompletion, mock_chunk_builder):
        chunks = [_make_chunk("a"), _make_chunk("b")]
        mock_acompletion.return_value = _async_iter(chunks)
        mock_chunk_builder.return_value = MagicMock()

        stream = StreamingCompletion(model="test", messages=[])
        async for _ in stream:
            pass

        mock_chunk_builder.assert_called_once_with(chunks)

    async def test_re_iteration_resets_state(
        self, mock_acompletion, mock_chunk_builder
    ):
        first_chunks = [_make_chunk("first")]
        second_chunks = [_make_chunk("second")]
        first_response = MagicMock(name="first_response")
        second_response = MagicMock(name="second_response")

        mock_acompletion.side_effect = [
            _async_iter(first_chunks),
            _async_iter(second_chunks),
        ]
        mock_chunk_builder.side_effect = [first_response, second_response]

        stream = StreamingCompletion(model="test", messages=[])

        async for _ in stream:
            pass
        assert stream.response is first_response

        async for _ in stream:
            pass
        assert stream.response is second_response

    async def test_empty_stream(self, mock_acompletion, mock_chunk_builder):
        mock_acompletion.return_value = _async_iter([])
        mock_chunk_builder.return_value = None

        stream = StreamingCompletion(model="test", messages=[])
        received = [chunk async for chunk in stream]

        assert received == []
        assert stream.response is None
```
Review comment:

Missing `try`/`finally` leaves `response` inaccessible after any interrupted iteration.

If the consumer's `async for` body raises (e.g., an `on_chunk` callback throws) or the streaming call itself fails, Python sends `GeneratorExit` into the generator at the `yield` point. The two lines after the loop never execute, so `_iterated` stays `False` and `stream.response` will always raise `RuntimeError`: callers cannot distinguish "not yet started" from "stream failed".

Additionally, the litellm `stream` object (a `CustomStreamWrapper`) won't have `.aclose()` called implicitly when the generator is abandoned without exhaustion.

🐛 Proposed fix: `try`/`finally` for guaranteed state finalization

```diff
 async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
     self._response = None
     self._iterated = False

     chunks: list[ModelResponseStream] = []
-    stream = await litellm.acompletion(*self._args, stream=True, **self._kwargs)
-
-    async for chunk in stream:
-        chunks.append(chunk)
-        yield chunk
-
-    self._response = litellm.stream_chunk_builder(chunks)
-    self._iterated = True
+    try:
+        stream = await litellm.acompletion(*self._args, stream=True, **self._kwargs)
+        async for chunk in stream:
+            chunks.append(chunk)
+            yield chunk
+    finally:
+        self._response = litellm.stream_chunk_builder(chunks) if chunks else None
+        self._iterated = True
```
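The failure mode this review describes is easy to reproduce with plain async generators. This standalone sketch (all names are illustrative) shows that code placed after the loop is skipped when the consumer stops early, while a `finally` block still runs:

```python
import asyncio

# Flags record whether each generator's post-loop / finally code ran.
state = {"plain_done": False, "guarded_done": False}


async def plain():
    # Mirrors the PR as written: finalization placed *after* the loop.
    for i in range(3):
        yield i
    state["plain_done"] = True  # never reached if the consumer stops early


async def guarded():
    # Mirrors the proposed fix: finalization in a finally block.
    try:
        for i in range(3):
            yield i
    finally:
        state["guarded_done"] = True  # runs even when the consumer stops early


async def main():
    p = plain()
    async for _ in p:
        break  # abandon the stream mid-iteration
    await p.aclose()  # GeneratorExit hits the suspended yield; post-loop code is skipped

    g = guarded()
    async for _ in g:
        break
    await g.aclose()  # finally still executes before GeneratorExit propagates
    return state


print(asyncio.run(main()))  # prints: {'plain_done': False, 'guarded_done': True}
```

The explicit `aclose()` calls make the timing deterministic; in real code the event loop delivers `GeneratorExit` whenever the abandoned generator is finalized, which is exactly why the comment argues for `try`/`finally`.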