Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions src/channels/telegram/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::brain::agent::AgentService;
use crate::config::Config;
use crate::db::ChannelMessageRepository;
use crate::services::{ServiceContext, SessionService};
use std::collections::HashMap;
use std::sync::Arc;
use teloxide::prelude::*;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -107,9 +106,6 @@ impl TelegramAgent {
}
}

// Per-user session tracking for non-owner users (owner shares TUI session)
let extra_sessions: Arc<Mutex<HashMap<i64, (Uuid, std::time::Instant)>>> =
Arc::new(Mutex::new(HashMap::new()));
let agent = self.agent_service.clone();
let session_svc = self.session_service.clone();
let bot_token = Arc::new(token);
Expand Down Expand Up @@ -179,14 +175,12 @@ impl TelegramAgent {
let agent = agent.clone();
let session_svc = session_svc.clone();
let shared_session = shared_session.clone();
let extra_sessions = extra_sessions.clone();
let config_rx = config_rx.clone();
move |bot: Bot, query: CallbackQuery| {
let state = telegram_state.clone();
let agent = agent.clone();
let session_svc = session_svc.clone();
let shared_session = shared_session.clone();
let extra_sessions = extra_sessions.clone();
let config_rx = config_rx.clone();
async move {
if let Some(data) = query.data.as_deref() {
Expand Down Expand Up @@ -382,15 +376,16 @@ impl TelegramAgent {

if is_owner {
*shared_session.lock().await = Some(new_id);
} else {
extra_sessions.lock().await.insert(
caller_id,
(new_id, std::time::Instant::now()),
);
}
state
.register_session_chat(new_id, query.message.as_ref().map(|m| m.chat().id.0).unwrap_or(caller_id))
.await;
// Owner and guest: bind chat_id → session_id for
// handle_message (issue #121). Guest extra_sessions
// map was removed — it was never read on ingest.
let switch_chat_id = query
.message
.as_ref()
.map(|m| m.chat().id.0)
.unwrap_or(caller_id);
state.register_session_chat(new_id, switch_chat_id).await;

// Touch updated_at so find_session_by_title_suffix returns this session on next message
if let Ok(Some(s)) = session_svc.get_session(new_id).await {
Expand Down
126 changes: 91 additions & 35 deletions src/channels/telegram/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Processes incoming messages: text, voice (STT/TTS), photos, image documents, allowlist enforcement.
//! Supports live streaming (edit-based) and Telegram-native approval inline keyboards.

use super::session_resolve;
use super::TelegramState;
use crate::brain::agent::{AgentService, ProgressCallback, ProgressEvent};
use crate::config::{Config, RespondTo};
Expand Down Expand Up @@ -847,7 +848,6 @@ pub(crate) async fn handle_message(
}
});

// Resolve session: owner shares the TUI session, other users get their own.
// Owner = first user in the config's allowed_users list (Vec order, not HashSet).
let owner_id = tg_cfg
.allowed_users
Expand Down Expand Up @@ -880,25 +880,84 @@ pub(crate) async fn handle_message(
// 2026-04-25: a "🦀 KRAB-INCEPTION 🦀" group renamed to "🦀 HEY IOLO
// BUILD 🦀" produced two distinct DB rows under the old title-only
// lookup. The chat_id suffix prevents that.
let chat_id_suffix = format!("[chat:{}]", msg.chat.id.0);
let session_title = if is_dm {
format!(
"Telegram: DM {} ({}) {}",
user.first_name, user_id, chat_id_suffix
)
} else {
format!("Telegram: {} {}", chat_title, chat_id_suffix)
};
let chat_id = msg.chat.id.0;
let chat_id_suffix = session_resolve::chat_id_suffix(chat_id);
let session_title = session_resolve::build_session_title(
is_dm,
&user.first_name,
user_id,
&chat_title,
chat_id,
);
// Legacy title format used before the chat_id suffix was added.
// Kept so the first message after the upgrade matches the existing
// row instead of orphaning it.
let legacy_title = if is_dm {
format!("Telegram: DM {} ({})", user.first_name, user_id)
} else {
format!("Telegram: {}", chat_title)
};
let legacy_title =
session_resolve::build_legacy_session_title(is_dm, &user.first_name, user_id, &chat_title);

let session_id = {
// Resolve policy (chat map → suffix → create): see
// session_resolve::choose_resolve_source and telegram_session_resolve_test.
// 0) Explicit chat→session binding from /sessions switch or prior message.
// Policy: choose_resolve_source (tests) — ChatBound when map → live row.
if let Some(bound_id) = telegram_state.chat_session(chat_id).await
&& let Ok(Some(bound)) = session_svc.get_session(bound_id).await
&& !bound.is_archived()
&& matches!(
session_resolve::choose_resolve_source(Some(bound_id), false, None),
session_resolve::ResolveSource::ChatBound
)
{
if session_resolve::session_idle_expired(bound.updated_at, idle_timeout_hours) {
if let Err(e) = session_svc.archive_session(bound.id).await {
tracing::error!(
"Telegram: failed to archive idle chat-bound session {}: {}",
bound.id,
e
);
}
match crate::channels::session_init::create_channel_session(
&session_svc,
Some(session_title.clone()),
)
.await
{
Ok(new_session) => {
tracing::info!(
"Telegram: idle-timeout reset (chat-bound) — new session {} for \"{}\"",
new_session.id,
session_title,
);
new_session.id
}
Err(e) => {
tracing::error!("Telegram: failed to create session: {}", e);
bot.send_message(msg.chat.id, "Internal error creating session.")
.await?;
return Ok(());
}
}
} else {
if session_resolve::should_refresh_label(
bound.title.as_deref().unwrap_or(""),
&session_title,
) {
let mut renamed = bound.clone();
renamed.title = Some(session_title.clone());
if let Err(e) = session_svc.update_session(&renamed).await {
tracing::warn!(
"Telegram: failed to refresh session {} label: {}",
bound_id,
e
);
}
}
tracing::debug!(
"Telegram: using chat-bound session {} for chat_id={}",
bound_id,
chat_id
);
bound_id
}
} else {
// 1) Stable lookup: any session whose title ends with the chat_id
// suffix is THIS chat regardless of how the label has changed.
// 2) Legacy fallback: pre-suffix sessions match the bare title.
Expand Down Expand Up @@ -934,10 +993,7 @@ pub(crate) async fn handle_message(
}

if let Some(session) = existing {
if idle_timeout_hours.is_some_and(|h| {
let elapsed = (chrono::Utc::now() - session.updated_at).num_seconds();
elapsed > (h * 3600.0) as i64
}) {
if session_resolve::session_idle_expired(session.updated_at, idle_timeout_hours) {
if let Err(e) = session_svc.archive_session(session.id).await {
tracing::error!("Telegram: failed to archive session {}: {}", session.id, e);
}
Expand All @@ -963,12 +1019,12 @@ pub(crate) async fn handle_message(
}
}
} else {
// Label drift: the chat was renamed since this session
// was created. The chat_id suffix kept us pointing at
// the right row, but the stored title still shows the
// old label. Update it so /sessions etc. show the
// user's current name for the chat.
if session.title.as_deref() != Some(session_title.as_str()) {
// Label drift: refresh display label when appropriate (issue #121:
// do not revert auto-titled DM sessions to the default template).
if session_resolve::should_refresh_label(
session.title.as_deref().unwrap_or(""),
&session_title,
) {
let mut renamed = session.clone();
let prev_title = renamed.title.clone().unwrap_or_default();
renamed.title = Some(session_title.clone());
Expand Down Expand Up @@ -1019,6 +1075,7 @@ pub(crate) async fn handle_message(
}
}
}
}
};

tracing::info!(
Expand Down Expand Up @@ -1093,14 +1150,13 @@ pub(crate) async fn handle_message(
// via `find_session_by_title_suffix` and resolution
// reverts to the previously-bound session — i.e. /new
// appears to do nothing (issue #89).
let session_title = if is_dm {
format!(
"Telegram: DM {} ({}) {}",
user.first_name, user_id, chat_id_suffix
)
} else {
format!("Telegram: {} {}", chat_title, chat_id_suffix)
};
let session_title = session_resolve::build_session_title(
is_dm,
&user.first_name,
user_id,
&chat_title,
chat_id,
);
// Archive the previous session on /new, except for the owner —
// owner sessions stay non-archived so they remain visible in
// /sessions for history review. Guest sessions get archived
Expand Down
1 change: 1 addition & 0 deletions src/channels/telegram/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
mod agent;
pub(crate) mod follow_up_question;
pub(crate) mod handler;
pub(crate) mod session_resolve;

pub use agent::TelegramAgent;

Expand Down
Loading