2 changes: 2 additions & 0 deletions crates/cli/src/agents/anto.rs
@@ -21,6 +21,8 @@ struct GetTimeParams {
 impl Agent for Anto {
     type Chunk = StreamChunk;
 
+    const NAME: &str = "Anto";
+
     const SYSTEM_PROMPT: &str = "You are Anto, a helpful assistant. You can get the current time.";
 
     fn tools() -> Vec<Tool> {
2 changes: 1 addition & 1 deletion crates/cli/src/chat.rs
@@ -125,7 +125,7 @@ impl ChatCmd {
             println!("REASONING\n{reasoning_content}");
         }
 
-        if let Some(content) = response.message() {
+        if let Some(content) = response.content() {
            println!("\n\nCONTENT\n{content}");
        }
    }
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -19,3 +19,4 @@ futures-core.workspace = true
 futures-util.workspace = true
 reqwest.workspace = true
 schemars.workspace = true
+tracing.workspace = true
16 changes: 16 additions & 0 deletions crates/core/src/agent.rs
@@ -10,6 +10,9 @@ pub trait Agent: Clone {
     /// The parsed chunk from [StreamChunk]
     type Chunk;
 
+    /// The name of the agent
+    const NAME: &str;
+
     /// The system prompt for the agent
     const SYSTEM_PROMPT: &str;
 
@@ -23,6 +26,17 @@
         ToolChoice::Auto
     }
 
+    /// Compact the chat history to reduce token usage.
+    ///
+    /// This method is called before each LLM request (both `send` and `stream`).
+    /// Agents can override this to remove redundant data from historical messages,
+    /// such as outdated candle data or large context that's no longer relevant.
+    ///
+    /// The default implementation returns messages unchanged.
+    fn compact(&self, messages: Vec<Message>) -> Vec<Message> {
+        messages
+    }
+
     /// Dispatch tool calls
     fn dispatch(&self, tools: &[ToolCall]) -> impl Future<Output = Vec<Message>> {
         async move {
@@ -45,6 +59,8 @@
 impl Agent for () {
     type Chunk = StreamChunk;
 
+    const NAME: &str = "Default";
+
     const SYSTEM_PROMPT: &str = "You are a helpful assistant.";
 
     async fn chunk(&self, chunk: &StreamChunk) -> Result<Self::Chunk> {
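The new `compact` hook runs before every request, so an agent can bound its context without mutating the stored transcript. Below is a minimal sketch of an agent that overrides it, assuming the `cydonia` umbrella re-exports shown later in this PR and the trait defaults for `tools`/`tool_choice`; the eight-message cutoff and the pass-through `chunk` body are illustrative only:

```rust
use anyhow::Result;
use cydonia::{Agent, Message, StreamChunk};

#[derive(Clone)]
struct Trimmer;

impl Agent for Trimmer {
    type Chunk = StreamChunk;

    const NAME: &str = "Trimmer";

    const SYSTEM_PROMPT: &str = "You are a terse assistant.";

    /// Keep only the most recent eight messages. `Chat::messages` calls this
    /// before each `send`/`stream`, so the stored transcript stays intact.
    fn compact(&self, messages: Vec<Message>) -> Vec<Message> {
        let skip = messages.len().saturating_sub(8);
        messages.into_iter().skip(skip).collect()
    }

    async fn chunk(&self, chunk: &StreamChunk) -> Result<Self::Chunk> {
        Ok(chunk.clone())
    }
}
```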
93 changes: 38 additions & 55 deletions crates/core/src/chat.rs
@@ -1,12 +1,9 @@
 //! Chat abstractions for the unified LLM Interfaces
 
-use crate::{
-    Agent, Config, FinishReason, General, LLM, Response, Role, ToolCall, message::Message,
-};
+use crate::{Agent, Config, FinishReason, General, LLM, Response, Role, message::Message};
 use anyhow::Result;
 use futures_core::Stream;
 use futures_util::StreamExt;
-use std::collections::HashMap;
 
 const MAX_TOOL_CALLS: usize = 16;
 
@@ -43,10 +40,18 @@ impl<P: LLM> Chat<P, ()> {
 }
 
 impl<P: LLM, A: Agent> Chat<P, A> {
+    /// Get a mutable reference to the agent
+    pub fn agent_mut(&mut self) -> &mut A {
+        &mut self.agent
+    }
+
     /// Get the chat messages for API requests.
+    ///
+    /// This applies agent-specific compaction to reduce token usage,
+    /// then strips reasoning content from non-tool-call messages.
     pub fn messages(&self) -> Vec<Message> {
-        self.messages
-            .clone()
+        self.agent
+            .compact(self.messages.clone())
             .into_iter()
             .map(|mut m| {
                 if m.tool_calls.is_empty() {
@@ -89,20 +94,17 @@ impl<P: LLM, A: Agent> Chat<P, A> {
         self.messages.push(message);
         for _ in 0..MAX_TOOL_CALLS {
             let response = self.provider.send(&config, &self.messages()).await?;
-            if let Some(message) = response.message() {
-                self.messages.push(Message::assistant(
-                    message,
-                    response.reasoning().cloned(),
-                    response.tool_calls(),
-                ));
-            }
-
-            let Some(tool_calls) = response.tool_calls() else {
+            let Some(message) = response.message() else {
                 return Ok(response);
             };
 
-            let result = self.agent.dispatch(tool_calls).await;
-            self.messages.extend(result);
+            if message.tool_calls.is_empty() {
+                self.messages.push(message);
+                return Ok(response);
+            }
+
+            let result = self.agent.dispatch(&message.tool_calls).await;
+            self.messages.extend([vec![message], result].concat());
         }
 
         anyhow::bail!("max tool calls reached");
@@ -121,38 +123,20 @@
         async_stream::try_stream! {
             for _ in 0..MAX_TOOL_CALLS {
                 let messages = self.messages();
+                let mut builder = Message::builder(Role::Assistant);
 
                 // Stream the chunks
                 let inner = self.provider.stream(config.clone(), &messages, self.usage);
                 futures_util::pin_mut!(inner);
 
-                let mut tool_calls: HashMap<u32, ToolCall> = HashMap::new();
-                let mut message = String::new();
-                let mut reasoning = String::new();
-                while let Some(chunk) = inner.next().await {
-                    let chunk = chunk?;
-                    if let Some(calls) = chunk.tool_calls() {
-                        for call in calls {
-                            let entry = tool_calls.entry(call.index).or_default();
-                            if !call.id.is_empty() {
-                                entry.id.clone_from(&call.id);
-                            }
-                            if !call.call_type.is_empty() {
-                                entry.call_type.clone_from(&call.call_type);
-                            }
-                            if !call.function.name.is_empty() {
-                                entry.function.name.clone_from(&call.function.name);
-                            }
-                            entry.function.arguments.push_str(&call.function.arguments);
-                        }
-                    }
-
-                    if let Some(content) = chunk.content() {
-                        message.push_str(content);
-                    }
-
-                    if let Some(reason) = chunk.reasoning_content() {
-                        reasoning.push_str(reason);
-                    }
-
+                while let Some(result) = inner.next().await {
+                    let chunk = match result {
+                        Ok(chunk) => chunk,
+                        Err(e) => {
+                            tracing::error!("Error in LLM stream: {:?}", e);
+                            Err(e)?
+                        }
+                    };
+                    builder.accept(&chunk);
                     yield self.agent.chunk(&chunk).await?;
                     if let Some(reason) = chunk.reason() {
                         match reason {
@@ -163,17 +147,16 @@
                     }
                 }
 
-                let reasoning = if reasoning.is_empty() { None } else { Some(reasoning) };
-                if tool_calls.is_empty() {
-                    self.messages.push(Message::assistant(&message, reasoning, None));
+                // Build the message and dispatch tool calls
+                let message = builder.build();
+                if message.tool_calls.is_empty() {
+                    self.messages.push(message);
                     break;
-                } else {
-                    let mut calls: Vec<_> = tool_calls.into_values().collect();
-                    calls.sort_by_key(|c| c.index);
-                    self.messages.push(Message::assistant(&message, reasoning, Some(&calls)));
-                    let result = self.agent.dispatch(&calls).await;
-                    self.messages.extend(result);
                 }
 
+                let result = self.agent.dispatch(&message.tool_calls).await;
+                self.messages.extend([vec![message], result].concat());
             }
         }
     }
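After this refactor, `send` and `stream` share one shape: accumulate a single assistant `Message`, then either stop when it carries no tool calls or push `[assistant message, tool results]` and loop, bounded by `MAX_TOOL_CALLS`. A hypothetical driver for the streaming side — the `Chat` constructor and the exact `stream` parameters are not visible in this diff, so those calls are assumptions for illustration:

```rust
use anyhow::Result;
use futures_util::{StreamExt, pin_mut};

async fn drive(chat: &mut cydonia::Chat<cydonia::DeepSeek, ()>) -> Result<()> {
    // Assumed signature: a config plus the user's message.
    let stream = chat.stream(cydonia::Config::default(), "hello");
    pin_mut!(stream);
    while let Some(chunk) = stream.next().await {
        // Each item is the agent-parsed chunk; tool-call rounds are handled
        // transparently inside the loop.
        let chunk = chunk?;
        if let Some(content) = chunk.content() {
            print!("{content}");
        }
    }
    Ok(())
}
```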
57 changes: 56 additions & 1 deletion crates/core/src/message.rs
@@ -1,6 +1,8 @@
 //! Turbofish LLM message
 
-use crate::ToolCall;
+use std::collections::BTreeMap;
+
+use crate::{StreamChunk, ToolCall};
 use serde::{Deserialize, Serialize};
 
 /// A message in the chat
@@ -77,6 +79,59 @@
             ..Default::default()
         }
     }
+
+    /// Create a new message builder
+    pub fn builder(role: Role) -> MessageBuilder {
+        MessageBuilder::new(role)
+    }
 }
+
+/// A builder for messages
+pub struct MessageBuilder {
+    /// The message
+    message: Message,
+
+    /// The tool calls
+    calls: BTreeMap<u32, ToolCall>,
+}
+
+impl MessageBuilder {
+    /// Create a new message builder
+    pub fn new(role: Role) -> Self {
+        Self {
+            message: Message {
+                role,
+                ..Default::default()
+            },
+            calls: BTreeMap::new(),
+        }
+    }
+
+    /// Accept a chunk from the stream
+    pub fn accept(&mut self, chunk: &StreamChunk) {
+        if let Some(calls) = chunk.tool_calls() {
+            for call in calls {
+                let entry = self.calls.entry(call.index).or_default();
+                entry.merge(call);
+            }
+        }
+
+        if let Some(content) = chunk.content() {
+            self.message.content.push_str(content);
+        }
+
+        if let Some(reason) = chunk.reasoning_content() {
+            self.message.reasoning_content.push_str(reason);
+        }
+    }
+
+    /// Build the message
+    pub fn build(mut self) -> Message {
+        if !self.calls.is_empty() {
+            self.message.tool_calls = self.calls.into_values().collect();
+        }
+        self.message
+    }
+}
 
 /// The role of a message
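The builder centralizes what `Chat::stream` previously did inline: content and reasoning deltas are appended, and tool-call fragments are merged into a `BTreeMap` keyed by `call.index`, so `build()` yields them already in index order with no explicit sort. A short sketch of that contract (obtaining real `StreamChunk`s is left out, since constructing one by hand isn't shown in this diff):

```rust
use cydonia::{Message, Role, StreamChunk};

/// Fold a finished stream of chunks into one assistant message.
fn collect(chunks: &[StreamChunk]) -> Message {
    let mut builder = Message::builder(Role::Assistant);
    for chunk in chunks {
        // Appends content/reasoning text; merges tool-call fragments by index.
        builder.accept(chunk);
    }
    // `tool_calls` is populated (in index order) only if fragments arrived.
    builder.build()
}
```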
13 changes: 11 additions & 2 deletions crates/core/src/response.rs
@@ -1,6 +1,6 @@
 //! Chat response abstractions for the unified LLM Interfaces
 
-use crate::{Role, tool::ToolCall};
+use crate::{Message, Role, tool::ToolCall};
 use serde::{Deserialize, Serialize};
 
 /// A chat completion response from the LLM
@@ -29,8 +29,17 @@
 }
 
 impl Response {
+    pub fn message(&self) -> Option<Message> {
+        let choice = self.choices.first()?;
+        Some(Message::assistant(
+            choice.message.content.clone().unwrap_or_default(),
+            choice.message.reasoning_content.clone(),
+            choice.message.tool_calls.as_deref(),
+        ))
+    }
+
     /// Get the first message from the response
-    pub fn message(&self) -> Option<&String> {
+    pub fn content(&self) -> Option<&String> {
         self.choices
             .first()
             .and_then(|choice| choice.message.content.as_ref())
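The rename separates two jobs: `content()` keeps the old borrow-the-first-choice-text behavior, while the new `message()` builds an owned assistant `Message` (content, reasoning, and tool calls together) that `Chat::send` can push straight onto the transcript. A usage sketch, assuming `Response` is importable from the core crate root the way `chat.rs` imports it internally:

```rust
use ccore::Response;

fn summarize(response: &Response) {
    // Borrowed text of the first choice, if present.
    if let Some(text) = response.content() {
        println!("content: {text}");
    }
    // Owned assistant message, ready to push onto a transcript.
    if let Some(message) = response.message() {
        println!("tool calls: {}", message.tool_calls.len());
    }
}
```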
16 changes: 16 additions & 0 deletions crates/core/src/tool.rs
@@ -38,6 +38,22 @@ pub struct ToolCall {
     pub function: FunctionCall,
 }
 
+impl ToolCall {
+    /// Merge two tool calls into one
+    pub fn merge(&mut self, call: &Self) {
+        if !call.id.is_empty() {
+            self.id.clone_from(&call.id);
+        }
+        if !call.call_type.is_empty() {
+            self.call_type.clone_from(&call.call_type);
+        }
+        if !call.function.name.is_empty() {
+            self.function.name.clone_from(&call.function.name);
+        }
+        self.function.arguments.push_str(&call.function.arguments);
+    }
+}
+
 /// A function call within a tool call
 #[derive(Debug, Clone, Default, Deserialize, Serialize)]
 pub struct FunctionCall {
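`merge` encodes the streaming convention that only a call's first fragment carries `id`, `call_type`, and the function name, while any fragment may append argument text. A self-contained illustration with fabricated fragments (it assumes the `id` and `call_type` fields are public like `function`, as the builder's usage suggests):

```rust
use ccore::ToolCall;

fn main() {
    let mut first = ToolCall::default();
    first.id = "call_0".to_string();
    first.function.name = "get_time".to_string();
    first.function.arguments = r#"{"tz":"#.to_string();

    let mut rest = ToolCall::default();
    rest.function.arguments = r#""UTC"}"#.to_string();

    // Empty fields never overwrite earlier values; arguments accumulate.
    let mut acc = ToolCall::default();
    acc.merge(&first);
    acc.merge(&rest);
    assert_eq!(acc.id, "call_0");
    assert_eq!(acc.function.arguments, r#"{"tz":"UTC"}"#);
}
```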
2 changes: 1 addition & 1 deletion crates/cydonia/src/lib.rs
@@ -3,6 +3,6 @@
 //! This is the umbrella crate that re-exports all ullm components.
 
 pub use ccore::{
-    self, Agent, Chat, Client, Config, General, LLM, Message, StreamChunk, Tool, ToolCall,
+    self, Agent, Chat, Client, Config, General, LLM, Message, Role, StreamChunk, Tool, ToolCall,
 };
 pub use deepseek::DeepSeek;
21 changes: 10 additions & 11 deletions llm/deepseek/src/llm.rs
@@ -42,7 +42,6 @@ impl LLM for DeepSeek {
             .text()
             .await?;
 
-        tracing::debug!("response: {text}");
         serde_json::from_str(&text).map_err(Into::into)
         // self.client
         //     .request(Method::POST, ENDPOINT)
@@ -63,25 +62,25 @@
         usage: bool,
     ) -> impl Stream<Item = Result<StreamChunk>> {
         let body = req.messages(messages).stream(usage);
-        tracing::debug!(
-            "request: {}",
-            serde_json::to_string(&body).unwrap_or_default()
-        );
         let request = self
             .client
             .request(Method::POST, ENDPOINT)
             .headers(self.headers.clone())
             .json(&body);
 
         try_stream! {
-            let mut stream = request.send().await?.bytes_stream();
-            while let Some(chunk) = stream.next().await {
-                let text = String::from_utf8_lossy(&chunk?).into_owned();
+            let response = request.send().await?;
+            let mut stream = response.bytes_stream();
+            while let Some(Ok(bytes)) = stream.next().await {
+                let text = String::from_utf8_lossy(&bytes).into_owned();
                 for data in text.split("data: ").skip(1).filter(|s| !s.starts_with("[DONE]")) {
-                    tracing::debug!("response: {}", data.trim());
-                    match serde_json::from_str::<StreamChunk>(data.trim()) {
+                    let trimmed = data.trim();
+                    if trimmed.is_empty() {
+                        continue;
+                    }
+                    match serde_json::from_str::<StreamChunk>(trimmed) {
                         Ok(chunk) => yield chunk,
-                        Err(e) => tracing::warn!("failed to parse chunk: {e}, data: {}", data.trim()),
+                        Err(e) => tracing::warn!("Failed to parse chunk: {e}, data: {}", trimmed),
                     }
                 }
            }
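The new `trimmed.is_empty()` guard matters because splitting a byte chunk on `"data: "` can yield whitespace-only segments (for example when a network chunk ends exactly at an event boundary), which previously went straight to `serde_json` and produced warn-level noise. A standalone sketch of the framing logic over a fabricated SSE payload:

```rust
fn main() {
    // Two events followed by the stream terminator.
    let frame = "data: {\"id\":\"1\"}\n\ndata: {\"id\":\"2\"}\n\ndata: [DONE]\n\n";
    let events: Vec<&str> = frame
        .split("data: ")
        .skip(1)                              // text before the first marker
        .filter(|s| !s.starts_with("[DONE]")) // drop the terminator
        .map(str::trim)
        .filter(|s| !s.is_empty())            // the guard added in this PR
        .collect();
    assert_eq!(events, ["{\"id\":\"1\"}", "{\"id\":\"2\"}"]);
}
```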