Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
3f83187
refactor: support multiturn (existing conversation history with task …
leonardmq Feb 26, 2026
6c7cc58
refactor: use correct typing in chat formatter
leonardmq Feb 27, 2026
04748e4
refactor: retrieve task_run one level up
leonardmq Feb 27, 2026
57896a8
Proof of concept streaming API
scosman Feb 18, 2026
3d6ced2
test: paid integration test for streaming
leonardmq Mar 3, 2026
1464fb8
test: add test for session + streaming together
leonardmq Mar 3, 2026
dee10b3
Update libs/core/kiln_ai/adapters/model_adapters/test_litellm_adapter…
leonardmq Mar 3, 2026
456088d
refactor: stream with support for AI SDK (with tool events) and OpenA…
leonardmq Mar 8, 2026
0ea65b4
refactor: ai sdk events as pydantic models
leonardmq Mar 8, 2026
a98d886
fix: model_dump implementation and remove to_see to leave transport s…
leonardmq Mar 8, 2026
e989dca
fix: should reset before next round of toolcalls
leonardmq Mar 8, 2026
11710f2
refactor: take in a trace instead of a task_run for session continuation
leonardmq Mar 8, 2026
3ad6a27
refactor: remove ability to continue task run at api level
leonardmq Mar 8, 2026
3f08ed5
Merge branch 'main' of github.com:Kiln-AI/Kiln into leonard/kil-420-a…
leonardmq Mar 8, 2026
4d8e99f
refactor: wrap stream iterators to allow exposing task run at the end
leonardmq Mar 8, 2026
eb537ed
fix: remove autosave_runs hardcoded
leonardmq Mar 8, 2026
66b3151
fix: close text when opening toolcall
leonardmq Mar 8, 2026
4460580
fix: reset fully
leonardmq Mar 8, 2026
c516dca
Merge branch 'main' of github.com:Kiln-AI/Kiln into leonard/kil-447-f…
leonardmq Mar 10, 2026
89133f6
feat: support nesting task runs into each other
leonardmq Mar 13, 2026
dc74f47
chore: remove redundant test
leonardmq Mar 13, 2026
79d3806
chore: lint unused import
leonardmq Mar 13, 2026
8aa5da1
refactor: allow for subclasses to declare parent types
leonardmq Mar 16, 2026
27e0393
fix: throw on task run parent error
leonardmq Mar 16, 2026
243da49
test: add mega test
leonardmq Mar 16, 2026
1accfe0
refactor: iterative walk back up the tree instead of recursion
leonardmq Mar 16, 2026
7d536d3
refactor: remove unused arg
leonardmq Mar 16, 2026
0540e50
Merge branch 'main' of github.com:Kiln-AI/Kiln into leonard/kil-447-f…
leonardmq Mar 16, 2026
880ae77
fix: paid test with thinking level
leonardmq Mar 16, 2026
f707004
fix: finish_step must come before finish in AI SDK protocol
leonardmq Mar 16, 2026
61b1249
test: more coverage for streaming
leonardmq Mar 16, 2026
5f81cfc
fix: do not use only the cached parent
leonardmq Mar 16, 2026
72dde7a
refactor: remove dead code, add test
leonardmq Mar 16, 2026
a5a902c
Merge branch 'leonard/kil-447-feat-stream-multiturn-ai-sdk-openai-pro…
leonardmq Mar 16, 2026
e26e52d
cr: use isinstance instead of class name
leonardmq Mar 17, 2026
2c120de
refactor: actually saving task_run under its parent during invoke
leonardmq Mar 17, 2026
f9af9d1
Merge pull request #1126 from Kiln-AI/leonard/kil-461-feat-nesting-ta…
leonardmq Mar 17, 2026
a712d9a
Merge branch 'main' of github.com:Kiln-AI/Kiln into leonard/kil-447-f…
leonardmq Mar 18, 2026
8e4a7f9
Merge branch 'leonard/kil-447-feat-stream-multiturn-ai-sdk-openai-pro…
leonardmq Mar 18, 2026
cb4401c
Merge branch 'main' of github.com:Kiln-AI/Kiln into leonard/kil-447-f…
leonardmq Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ libs/core/build
libs/server/build

dist/
test_output/

.mcp.json

Expand Down
10 changes: 10 additions & 0 deletions app/web_ui/src/lib/api_schema.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7031,6 +7031,11 @@ export interface components {
*
* Contains the input used, its source, the output produced, and optional
* repair information if the output needed correction.
*
* Can be nested under another TaskRun; nested runs are stored as child runs
* in a "runs" subfolder (same relationship name as Task's runs).
*
* Accepts both Task and TaskRun as parents (polymorphic).
*/
"TaskRun-Input": {
/**
Expand Down Expand Up @@ -7093,6 +7098,11 @@ export interface components {
*
* Contains the input used, its source, the output produced, and optional
* repair information if the output needed correction.
*
* Can be nested under another TaskRun; nested runs are stored as child runs
* in a "runs" subfolder (same relationship name as Task's runs).
*
* Accepts both Task and TaskRun as parents (polymorphic).
*/
"TaskRun-Output": {
/**
Expand Down
4 changes: 4 additions & 0 deletions libs/core/kiln_ai/adapters/chat/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from .chat_formatter import (
BasicChatMessage,
ChatCompletionMessageIncludingLiteLLM,
ChatFormatter,
ChatMessage,
ChatStrategy,
MultiturnFormatter,
ToolCallMessage,
ToolResponseMessage,
get_chat_formatter,
Expand All @@ -11,9 +13,11 @@

__all__ = [
"BasicChatMessage",
"ChatCompletionMessageIncludingLiteLLM",
"ChatFormatter",
"ChatMessage",
"ChatStrategy",
"MultiturnFormatter",
"ToolCallMessage",
"ToolResponseMessage",
"build_tool_call_messages",
Expand Down
61 changes: 59 additions & 2 deletions libs/core/kiln_ai/adapters/chat/chat_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,25 @@
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Literal, Optional, Sequence, Union
from typing import Dict, List, Literal, Optional, Sequence, TypeAlias, Union

from litellm.types.utils import Message as LiteLLMMessage

from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType
from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error
from kiln_ai.utils.open_ai_types import ChatCompletionMessageToolCallParam
from kiln_ai.utils.open_ai_types import (
ChatCompletionMessageParam,
ChatCompletionMessageToolCallParam,
)

COT_FINAL_ANSWER_PROMPT = "Considering the above, return a final result."


# Message type used to seed a conversation: either an OpenAI-style
# ChatCompletionMessageParam dict or a LiteLLM Message object (prior traces
# may contain either representation — see ChatFormatter.initial_messages).
ChatCompletionMessageIncludingLiteLLM: TypeAlias = Union[
    ChatCompletionMessageParam, LiteLLMMessage
]


@dataclass
class BasicChatMessage:
role: Literal["system", "assistant", "user"]
Expand Down Expand Up @@ -90,6 +100,10 @@ def intermediate_outputs(self) -> Dict[str, str]:
"""Get the intermediate outputs from the chat formatter."""
return self._intermediate_outputs

def initial_messages(self) -> list[ChatCompletionMessageIncludingLiteLLM]:
    """Return the messages that seed the conversation.

    Fresh runs start from nothing, so the base implementation returns an
    empty list; continuation formatters override this to supply the prior
    trace.
    """
    no_seed_messages: list[ChatCompletionMessageIncludingLiteLLM] = []
    return no_seed_messages

@abstractmethod
def next_turn(self, previous_output: str | None = None) -> Optional[ChatTurn]:
"""Advance the conversation and return the next messages if any."""
Expand Down Expand Up @@ -236,6 +250,49 @@ def next_turn(self, previous_output: str | None = None) -> Optional[ChatTurn]:
return None


class MultiturnFormatter(ChatFormatter):
    """Chat formatter that continues an existing multi-turn conversation.

    Seeds the chat with ``prior_trace`` (the conversation so far) and
    contributes exactly one new turn: the latest user message. Tool calls and
    multi-turn model responses are handled by _run_model_turn's internal loop.
    """

    def __init__(
        self,
        prior_trace: list[ChatCompletionMessageParam],
        user_input: InputType,
    ) -> None:
        # The prior trace already carries its own system message, so none is
        # provided here.
        super().__init__(
            system_message="",
            user_input=user_input,
            thinking_instructions=None,
        )
        self._prior_trace = prior_trace

    def initial_messages(self) -> list[ChatCompletionMessageIncludingLiteLLM]:
        """Return a copy of the prior trace used to seed the conversation."""
        return list(self._prior_trace)

    def next_turn(self, previous_output: str | None = None) -> Optional[ChatTurn]:
        state = self._state

        if state == "start":
            # The seeded trace already includes the system prompt and all
            # earlier turns; only the newest user message must be appended.
            new_user_message = BasicChatMessage(
                "user", format_user_message(self.user_input)
            )
            self._state = "awaiting_final"
            self._messages.append(new_user_message)
            return ChatTurn(messages=[new_user_message], final_call=True)

        if state == "awaiting_final":
            if previous_output is None:
                raise ValueError("previous_output required for final step")
            self._messages.append(BasicChatMessage("assistant", previous_output))
            self._state = "done"
            return None

        # Conversation already finished.
        return None


def get_chat_formatter(
strategy: ChatStrategy,
system_message: str,
Expand Down
71 changes: 71 additions & 0 deletions libs/core/kiln_ai/adapters/chat/test_chat_formatter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from kiln_ai.adapters.chat import ChatStrategy, get_chat_formatter
from kiln_ai.adapters.chat.chat_formatter import (
COT_FINAL_ANSWER_PROMPT,
MultiturnFormatter,
format_user_message,
)

Expand Down Expand Up @@ -119,6 +120,76 @@ def test_chat_formatter_r1_style():
assert formatter.intermediate_outputs() == {}


def test_multiturn_formatter_initial_messages():
    # initial_messages should echo back the trace that seeds the conversation.
    history = [
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": "hello"},
    ]
    formatter = MultiturnFormatter(prior_trace=history, user_input="new input")
    assert formatter.initial_messages() == history


def test_multiturn_formatter_next_turn():
    history = [
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": "hello"},
    ]
    formatter = MultiturnFormatter(prior_trace=history, user_input="follow-up")

    # The first turn carries exactly the new user message and is the final call.
    turn = formatter.next_turn()
    assert turn is not None
    assert turn.final_call
    assert len(turn.messages) == 1
    only_message = turn.messages[0]
    assert only_message.role == "user"
    assert only_message.content == "follow-up"

    # Feeding the assistant's response back ends the conversation.
    assert formatter.next_turn("assistant response") is None


def test_multiturn_formatter_preserves_tool_call_messages():
    # A trace containing assistant tool calls and tool results must survive
    # seeding untouched.
    prior_trace = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "4"},
        {
            "role": "assistant",
            "content": "",
            "reasoning_content": "Let me multiply 4 by 7.\n",
            "tool_calls": [
                {
                    "id": "call_abc123",
                    "function": {"arguments": '{"a": 4, "b": 7}', "name": "multiply"},
                    "type": "function",
                }
            ],
        },
        {
            "content": "28",
            "role": "tool",
            "tool_call_id": "call_abc123",
            "kiln_task_tool_data": None,
        },
        {
            "role": "assistant",
            "content": "4 multiplied by 7 is 28.",
            "reasoning_content": "Done.\n",
        },
    ]
    formatter = MultiturnFormatter(prior_trace=prior_trace, user_input="now double it")

    seeded = formatter.initial_messages()
    assert seeded == prior_trace
    tool_call = seeded[2]["tool_calls"][0]
    assert tool_call["id"] == "call_abc123"
    assert tool_call["function"]["name"] == "multiply"
    tool_result = seeded[3]
    assert tool_result["role"] == "tool"
    assert tool_result["tool_call_id"] == "call_abc123"

    turn = formatter.next_turn()
    assert turn is not None
    assert turn.final_call
    assert len(turn.messages) == 1
    assert turn.messages[0].role == "user"
    assert turn.messages[0].content == "now double it"


def test_format_user_message():
# String
assert format_user_message("test input") == "test input"
Expand Down
Empty file.
60 changes: 60 additions & 0 deletions libs/core/kiln_ai/adapters/litellm_utils/litellm_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

from typing import Any, AsyncIterator, Optional, Union

import litellm
from litellm.types.utils import (
ModelResponse,
ModelResponseStream,
TextCompletionResponse,
)


class StreamingCompletion:
    """Async-iterable wrapper around ``litellm.acompletion`` with streaming.

    Iterating yields each ``ModelResponseStream`` chunk as it arrives; once
    iteration completes, the collected chunks are assembled into a full
    response exposed via the ``.response`` property.

    Usage::

        stream = StreamingCompletion(model=..., messages=...)
        async for chunk in stream:
            # handle chunk however you like (print, log, send over WS, …)
            pass
        final = stream.response  # fully assembled ModelResponse
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Streaming is forced on in __aiter__, so strip any caller-provided
        # flag rather than letting it conflict.
        cleaned_kwargs = dict(kwargs)
        cleaned_kwargs.pop("stream", None)
        self._args = args
        self._kwargs = cleaned_kwargs
        self._response: Optional[Union[ModelResponse, TextCompletionResponse]] = None
        self._iterated: bool = False

    @property
    def response(self) -> Optional[Union[ModelResponse, TextCompletionResponse]]:
        """The final assembled response. Only available after iteration."""
        if not self._iterated:
            raise RuntimeError(
                "StreamingCompletion has not been iterated yet. "
                "Use 'async for chunk in stream:' before accessing .response"
            )
        return self._response

    async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
        # Reset state so the wrapper can be iterated again (each iteration
        # issues a fresh completion call).
        self._response = None
        self._iterated = False

        received: list[ModelResponseStream] = []
        upstream = await litellm.acompletion(*self._args, stream=True, **self._kwargs)

        async for chunk in upstream:
            received.append(chunk)
            yield chunk

        # Fold the streamed chunks back into one complete response.
        self._response = litellm.stream_chunk_builder(received)
        self._iterated = True
Loading
Loading