From 13f4930c44a9bf4d2d28ea4d62f34b603ec7add0 Mon Sep 17 00:00:00 2001 From: Alexey Leshchenko Date: Tue, 26 May 2026 14:10:35 +0300 Subject: [PATCH] fix(telegram): preserve auto-titled sessions and bind chat on switch Fixes #121. Stop label drift from overwriting auto-titled DM sessions; resolve sessions via chat_sessions before suffix fallback; apply idle timeout on the bound path; remove dead extra_sessions map. Adds session_resolve helpers and unit tests. --- src/channels/telegram/agent.rs | 23 +-- src/channels/telegram/handler.rs | 126 ++++++++++---- src/channels/telegram/mod.rs | 1 + src/channels/telegram/session_resolve.rs | 177 +++++++++++++++++++ src/tests/mod.rs | 1 + src/tests/telegram_session_resolve_test.rs | 193 +++++++++++++++++++++ 6 files changed, 472 insertions(+), 49 deletions(-) create mode 100644 src/channels/telegram/session_resolve.rs create mode 100644 src/tests/telegram_session_resolve_test.rs diff --git a/src/channels/telegram/agent.rs b/src/channels/telegram/agent.rs index 10be8a43..c866dd53 100644 --- a/src/channels/telegram/agent.rs +++ b/src/channels/telegram/agent.rs @@ -8,7 +8,6 @@ use crate::brain::agent::AgentService; use crate::config::Config; use crate::db::ChannelMessageRepository; use crate::services::{ServiceContext, SessionService}; -use std::collections::HashMap; use std::sync::Arc; use teloxide::prelude::*; use tokio::sync::Mutex; @@ -107,9 +106,6 @@ impl TelegramAgent { } } - // Per-user session tracking for non-owner users (owner shares TUI session) - let extra_sessions: Arc>> = - Arc::new(Mutex::new(HashMap::new())); let agent = self.agent_service.clone(); let session_svc = self.session_service.clone(); let bot_token = Arc::new(token); @@ -179,14 +175,12 @@ impl TelegramAgent { let agent = agent.clone(); let session_svc = session_svc.clone(); let shared_session = shared_session.clone(); - let extra_sessions = extra_sessions.clone(); let config_rx = config_rx.clone(); move |bot: Bot, query: CallbackQuery| { let state = telegram_state.clone(); let agent = agent.clone(); let session_svc = session_svc.clone(); let shared_session = shared_session.clone(); - let extra_sessions = extra_sessions.clone(); let config_rx = config_rx.clone(); async move { if let Some(data) = query.data.as_deref() { @@ -382,15 +376,16 @@ impl TelegramAgent { if is_owner { *shared_session.lock().await = Some(new_id); - } else { - extra_sessions.lock().await.insert( - caller_id, - (new_id, std::time::Instant::now()), - ); } - state - .register_session_chat(new_id, query.message.as_ref().map(|m| m.chat().id.0).unwrap_or(caller_id)) - .await; + // Owner and guest: bind chat_id β†’ session_id for + // handle_message (issue #121). Guest extra_sessions + // map was removed β€” it was never read on ingest. + let switch_chat_id = query + .message + .as_ref() + .map(|m| m.chat().id.0) + .unwrap_or(caller_id); + state.register_session_chat(new_id, switch_chat_id).await; // Touch updated_at so find_session_by_title_suffix returns this session on next message if let Ok(Some(s)) = session_svc.get_session(new_id).await { diff --git a/src/channels/telegram/handler.rs b/src/channels/telegram/handler.rs index 03d87941..310c2dcf 100644 --- a/src/channels/telegram/handler.rs +++ b/src/channels/telegram/handler.rs @@ -3,6 +3,7 @@ //! Processes incoming messages: text, voice (STT/TTS), photos, image documents, allowlist enforcement. //! Supports live streaming (edit-based) and Telegram-native approval inline keyboards. +use super::session_resolve; use super::TelegramState; use crate::brain::agent::{AgentService, ProgressCallback, ProgressEvent}; use crate::config::{Config, RespondTo}; @@ -847,7 +848,6 @@ pub(crate) async fn handle_message( } }); - // Resolve session: owner shares the TUI session, other users get their own. // Owner = first user in the config's allowed_users list (Vec order, not HashSet). let owner_id = tg_cfg .allowed_users @@ -880,25 +880,84 @@ pub(crate) async fn handle_message( // 2026-04-25: a "πŸ¦€ KRAB-INCEPTION πŸ¦€" group renamed to "πŸ¦€ HEY IOLO // BUILD πŸ¦€" produced two distinct DB rows under the old title-only // lookup. The chat_id suffix prevents that. - let chat_id_suffix = format!("[chat:{}]", msg.chat.id.0); - let session_title = if is_dm { - format!( - "Telegram: DM {} ({}) {}", - user.first_name, user_id, chat_id_suffix - ) - } else { - format!("Telegram: {} {}", chat_title, chat_id_suffix) - }; + let chat_id = msg.chat.id.0; + let chat_id_suffix = session_resolve::chat_id_suffix(chat_id); + let session_title = session_resolve::build_session_title( + is_dm, + &user.first_name, + user_id, + &chat_title, + chat_id, + ); // Legacy title format used before the chat_id suffix was added. - // Kept so the first message after the upgrade matches the existing - // row instead of orphaning it. - let legacy_title = if is_dm { - format!("Telegram: DM {} ({})", user.first_name, user_id) - } else { - format!("Telegram: {}", chat_title) - }; + let legacy_title = + session_resolve::build_legacy_session_title(is_dm, &user.first_name, user_id, &chat_title); let session_id = { + // Resolve policy (chat map β†’ suffix β†’ create): see + // session_resolve::choose_resolve_source and telegram_session_resolve_test. + // 0) Explicit chatβ†’session binding from /sessions switch or prior message. + // Policy: choose_resolve_source (tests) β€” ChatBound when map β†’ live row. + if let Some(bound_id) = telegram_state.chat_session(chat_id).await + && let Ok(Some(bound)) = session_svc.get_session(bound_id).await + && !bound.is_archived() + && matches!( + session_resolve::choose_resolve_source(Some(bound_id), false, None), + session_resolve::ResolveSource::ChatBound + ) + { + if session_resolve::session_idle_expired(bound.updated_at, idle_timeout_hours) { + if let Err(e) = session_svc.archive_session(bound.id).await { + tracing::error!( + "Telegram: failed to archive idle chat-bound session {}: {}", + bound.id, + e + ); + } + match crate::channels::session_init::create_channel_session( + &session_svc, + Some(session_title.clone()), + ) + .await + { + Ok(new_session) => { + tracing::info!( + "Telegram: idle-timeout reset (chat-bound) β€” new session {} for \"{}\"", + new_session.id, + session_title, + ); + new_session.id + } + Err(e) => { + tracing::error!("Telegram: failed to create session: {}", e); + bot.send_message(msg.chat.id, "Internal error creating session.") + .await?; + return Ok(()); + } + } + } else { + if session_resolve::should_refresh_label( + bound.title.as_deref().unwrap_or(""), + &session_title, + ) { + let mut renamed = bound.clone(); + renamed.title = Some(session_title.clone()); + if let Err(e) = session_svc.update_session(&renamed).await { + tracing::warn!( + "Telegram: failed to refresh session {} label: {}", + bound_id, + e + ); + } + } + tracing::debug!( + "Telegram: using chat-bound session {} for chat_id={}", + bound_id, + chat_id + ); + bound_id + } + } else { // 1) Stable lookup: any session whose title ends with the chat_id // suffix is THIS chat regardless of how the label has changed. // 2) Legacy fallback: pre-suffix sessions match the bare title. @@ -934,10 +993,7 @@ pub(crate) async fn handle_message( } if let Some(session) = existing { - if idle_timeout_hours.is_some_and(|h| { - let elapsed = (chrono::Utc::now() - session.updated_at).num_seconds(); - elapsed > (h * 3600.0) as i64 - }) { + if session_resolve::session_idle_expired(session.updated_at, idle_timeout_hours) { if let Err(e) = session_svc.archive_session(session.id).await { tracing::error!("Telegram: failed to archive session {}: {}", session.id, e); } @@ -963,12 +1019,12 @@ pub(crate) async fn handle_message( } } } else { - // Label drift: the chat was renamed since this session - // was created. The chat_id suffix kept us pointing at - // the right row, but the stored title still shows the - // old label. Update it so /sessions etc. show the - // user's current name for the chat. - if session.title.as_deref() != Some(session_title.as_str()) { + // Label drift: refresh display label when appropriate (issue #121: + // do not revert auto-titled DM sessions to the default template). + if session_resolve::should_refresh_label( + session.title.as_deref().unwrap_or(""), + &session_title, + ) { let mut renamed = session.clone(); let prev_title = renamed.title.clone().unwrap_or_default(); renamed.title = Some(session_title.clone()); @@ -1019,6 +1075,7 @@ pub(crate) async fn handle_message( } } } + } }; tracing::info!( @@ -1093,14 +1150,13 @@ pub(crate) async fn handle_message( // via `find_session_by_title_suffix` and resolution // reverts to the previously-bound session β€” i.e. /new // appears to do nothing (issue #89). - let session_title = if is_dm { - format!( - "Telegram: DM {} ({}) {}", - user.first_name, user_id, chat_id_suffix - ) - } else { - format!("Telegram: {} {}", chat_title, chat_id_suffix) - }; + let session_title = session_resolve::build_session_title( + is_dm, + &user.first_name, + user_id, + &chat_title, + chat_id, + ); // Archive the previous session on /new, except for the owner β€” // owner sessions stay non-archived so they remain visible in // /sessions for history review. Guest sessions get archived diff --git a/src/channels/telegram/mod.rs b/src/channels/telegram/mod.rs index 24e7ce64..c75f8ad2 100644 --- a/src/channels/telegram/mod.rs +++ b/src/channels/telegram/mod.rs @@ -6,6 +6,7 @@ mod agent; pub(crate) mod follow_up_question; pub(crate) mod handler; +pub(crate) mod session_resolve; pub use agent::TelegramAgent; diff --git a/src/channels/telegram/session_resolve.rs b/src/channels/telegram/session_resolve.rs new file mode 100644 index 00000000..c8aac2c5 --- /dev/null +++ b/src/channels/telegram/session_resolve.rs @@ -0,0 +1,177 @@ +//! Pure Telegram session title + label-drift helpers (testable without teloxide). +//! +//! Issue #121: naive full-title comparison reverted auto-titled DM sessions back +//! to the default `Telegram: DM …` template on every subsequent message. + +/// Build the canonical session title for a Telegram chat. +pub fn build_session_title( + is_dm: bool, + user_name: &str, + user_id: i64, + chat_title: &str, + chat_id: i64, +) -> String { + let chat_id_suffix = format!("[chat:{chat_id}]"); + if is_dm { + format!("Telegram: DM {user_name} ({user_id}) {chat_id_suffix}") + } else { + format!("Telegram: {chat_title} {chat_id_suffix}") + } +} + +/// Legacy title format (pre suffix) for migration lookups. +pub fn build_legacy_session_title(is_dm: bool, user_name: &str, user_id: i64, chat_title: &str) -> String { + if is_dm { + format!("Telegram: DM {user_name} ({user_id})") + } else { + format!("Telegram: {chat_title}") + } +} + +pub fn chat_id_suffix(chat_id: i64) -> String { + format!("[chat:{chat_id}]") +} + +/// True when a session exceeded the configured idle window (same rule as handler suffix path). +pub fn session_idle_expired(updated_at: chrono::DateTime, idle_hours: Option) -> bool { + idle_hours.is_some_and(|h| { + let elapsed = (chrono::Utc::now() - updated_at).num_seconds(); + elapsed > (h * 3600.0) as i64 + }) +} + +/// Handler resolve policy: explicit chat binding wins over suffix `updated_at` winner. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ResolveSource { + ChatBound, + Suffix, + Create, +} + +pub fn choose_resolve_source( + chat_bound: Option, + bound_archived: bool, + suffix_match: Option, +) -> ResolveSource { + if chat_bound.is_some() && !bound_archived { + ResolveSource::ChatBound + } else if suffix_match.is_some() { + ResolveSource::Suffix + } else { + ResolveSource::Create + } +} + +/// Whether to overwrite a stored session title with the freshly built template. +/// +/// - Default DM titles: refresh when the template default changed (display name). +/// - Auto-titled / custom DM titles: never clobber (issue #121). +/// - Telegram groups: refresh when the visible group label changed (suffix stable). +pub fn should_refresh_label(stored: &str, template: &str) -> bool { + if stored == template { + return false; + } + + if crate::brain::agent::service::AgentService::is_default_channel_title(stored) { + return crate::brain::agent::service::AgentService::is_default_channel_title(template) + && stored != template; + } + + if is_telegram_group_session_title(stored) && is_telegram_group_session_title(template) { + return telegram_middle_label(stored) != telegram_middle_label(template); + } + + false +} + +fn is_telegram_group_session_title(title: &str) -> bool { + let Some(rest) = title.strip_prefix("Telegram: ") else { + return false; + }; + !rest.starts_with("DM ") && title.contains("[chat:") +} + +fn telegram_middle_label(title: &str) -> String { + let body = title + .strip_prefix("Telegram: ") + .unwrap_or(title) + .trim(); + let suffix = crate::brain::agent::service::AgentService::extract_chat_id_suffix(title); + if suffix.is_empty() { + return body.to_string(); + } + body.strip_suffix(suffix) + .unwrap_or(body) + .trim() + .to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn dm_template_format() { + let t = build_session_title(true, "Alice", 123, "", 456); + assert_eq!(t, "Telegram: DM Alice (123) [chat:456]"); + } + + #[test] + fn should_not_clobber_auto_titled_dm() { + let auto = "Telegram: Fix deploy [chat:133526395]"; + let template = build_session_title(true, "Alexey", 133526395, "", 133526395); + assert!(!should_refresh_label(auto, template)); + } + + #[test] + fn should_refresh_group_rename() { + let old = "Telegram: Old Group [chat:-1]"; + let new = "Telegram: New Group [chat:-1]"; + assert!(should_refresh_label(old, new)); + } + + #[test] + fn default_dm_still_refreshes_on_name_change() { + let old = build_session_title(true, "Alice", 1, "", 99); + let new = build_session_title(true, "Bob", 1, "", 99); + assert!(should_refresh_label(&old, &new)); + } + + #[test] + fn chat_bound_wins_over_suffix_candidate() { + let a = uuid::Uuid::new_v4(); + let b = uuid::Uuid::new_v4(); + assert_eq!( + choose_resolve_source(Some(a), false, Some(b)), + ResolveSource::ChatBound + ); + } + + #[test] + fn archived_bound_falls_through_to_suffix() { + let a = uuid::Uuid::new_v4(); + let b = uuid::Uuid::new_v4(); + assert_eq!( + choose_resolve_source(Some(a), true, Some(b)), + ResolveSource::Suffix + ); + } + + #[test] + fn session_idle_expired_within_and_past_window() { + let recent = chrono::Utc::now() - chrono::Duration::minutes(30); + assert!(!session_idle_expired(recent, Some(1.0))); + + let stale = chrono::Utc::now() - chrono::Duration::hours(2); + assert!(session_idle_expired(stale, Some(1.0))); + assert!(!session_idle_expired(stale, None)); + } + + #[test] + fn session_idle_expired_boundary_not_yet_expired() { + let at_limit = chrono::Utc::now() - chrono::Duration::seconds(3600); + assert!(!session_idle_expired(at_limit, Some(1.0))); + let past_limit = chrono::Utc::now() - chrono::Duration::seconds(3601); + assert!(session_idle_expired(past_limit, Some(1.0))); + } +} diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 65684351..5cf761f2 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -122,6 +122,7 @@ pub mod slash_autocomplete_dimensions_test; pub mod split_pane_test; pub mod subagent_test; pub mod telegram_resume_test; +pub mod telegram_session_resolve_test; pub mod token_tracking_test; pub mod tool_execution_repo_test; pub mod tool_loop_helpers_test; diff --git a/src/tests/telegram_session_resolve_test.rs b/src/tests/telegram_session_resolve_test.rs new file mode 100644 index 00000000..c0956ff6 --- /dev/null +++ b/src/tests/telegram_session_resolve_test.rs @@ -0,0 +1,193 @@ +//! Integration tests for Telegram session title + label drift (issue #121). + +use crate::channels::telegram::session_resolve::{ + build_session_title, chat_id_suffix, choose_resolve_source, session_idle_expired, + should_refresh_label, ResolveSource, +}; +use crate::channels::telegram::TelegramState; +use uuid::Uuid; +use crate::db::Database; +use crate::db::models::Session; +use crate::db::repository::SessionRepository; +use crate::services::{ServiceContext, SessionService}; + +async fn fresh_repo() -> (Database, SessionRepository) { + let db = Database::connect_in_memory() + .await + .expect("in-memory DB connect"); + db.run_migrations().await.expect("migrations"); + let repo = SessionRepository::new(db.pool().clone()); + (db, repo) +} + +#[test] +fn resolve_policy_prefers_chat_bound_over_suffix_winner() { + let bound = Uuid::new_v4(); + let suffix = Uuid::new_v4(); + assert_eq!( + choose_resolve_source(Some(bound), false, Some(suffix)), + ResolveSource::ChatBound + ); +} + +#[tokio::test] +async fn telegram_state_chat_map_survives_suffix_competition() { + let state = TelegramState::new(); + let chat_id = 4242_i64; + let bound = Uuid::new_v4(); + let suffix_winner = Uuid::new_v4(); + state.register_session_chat(bound, chat_id).await; + assert_eq!(state.chat_session(chat_id).await, Some(bound)); + assert_eq!( + choose_resolve_source(state.chat_session(chat_id).await, false, Some(suffix_winner)), + ResolveSource::ChatBound + ); +} + +#[test] +fn should_not_clobber_auto_titled_dm_title() { + let auto = "Telegram: Fix deploy pipeline [chat:133526395]"; + let template = build_session_title(true, "Alexey", 133526395, "", 133526395); + assert!( + !should_refresh_label(auto, &template), + "auto-titled DM must not revert to default template" + ); +} + +#[test] +fn group_rename_still_refreshes() { + let old = "Telegram: Old Group [chat:-5246593256]"; + let new = "Telegram: New Group [chat:-5246593256]"; + assert!(should_refresh_label(old, new)); +} + +#[tokio::test] +async fn suffix_lookup_after_switch_touch_picks_switched_row() { + let (_db, repo) = fresh_repo().await; + let chat_id = 42_i64; + let suffix = chat_id_suffix(chat_id); + let title = build_session_title(true, "U", 1, "", chat_id); + + let older = Session::new(Some(title.clone()), None, None); + repo.create(&older).await.expect("create older"); + + let mut newer = Session::new(Some(title), None, None); + newer.updated_at = older.updated_at + chrono::Duration::seconds(1); + repo.create(&newer).await.expect("create newer"); + + // Simulate /sessions switch to older session (touch updated_at) + let mut switched = older.clone(); + switched.updated_at = newer.updated_at + chrono::Duration::seconds(1); + repo.update(&switched).await.expect("touch older"); + + let hit = repo + .find_by_title_suffix(&suffix) + .await + .expect("query") + .expect("hit"); + assert_eq!(hit.id, older.id); +} + +#[tokio::test] +async fn auto_titled_title_survives_should_refresh_check() { + let template = build_session_title(true, "Alice", 1, "", 99); + let auto_titled = format!( + "Telegram: Deploy fix {}", + chat_id_suffix(99) + ); + assert!(!should_refresh_label(&auto_titled, &template)); +} + +/// Mirrors handler chat-bound idle branch: archive stale bound row, create replacement. +/// Guest /sessions switch only needs register_session_chat (extra_sessions map removed). +#[tokio::test] +async fn register_session_chat_binds_guest_dm() { + let state = TelegramState::new(); + let guest_chat_id = 9988_i64; + let session_id = Uuid::new_v4(); + state.register_session_chat(session_id, guest_chat_id).await; + assert_eq!(state.chat_session(guest_chat_id).await, Some(session_id)); + assert_eq!( + choose_resolve_source(state.chat_session(guest_chat_id).await, false, None), + ResolveSource::ChatBound + ); +} + +#[tokio::test] +async fn archived_chat_map_entry_uses_suffix_not_bound() { + let bound = Uuid::new_v4(); + let suffix = Uuid::new_v4(); + assert_eq!( + choose_resolve_source(Some(bound), true, Some(suffix)), + ResolveSource::Suffix + ); +} + +#[tokio::test] +async fn suffix_path_when_chat_map_empty() { + let suffix = Uuid::new_v4(); + assert_eq!( + choose_resolve_source(None, false, Some(suffix)), + ResolveSource::Suffix + ); + assert_eq!( + choose_resolve_source(None, false, None), + ResolveSource::Create + ); +} + +#[tokio::test] +async fn chat_bound_idle_archives_and_creates_new_session() { + let (db, repo) = fresh_repo().await; + let ctx = ServiceContext::new(db.pool().clone()); + let svc = SessionService::new(ctx.clone()); + let chat_id = 77_i64; + let title = build_session_title(true, "U", 1, "", chat_id); + + let mut bound = Session::new(Some(title.clone()), None, None); + bound.updated_at = chrono::Utc::now() - chrono::Duration::hours(48); + repo.create(&bound).await.expect("create bound"); + assert!(session_idle_expired(bound.updated_at, Some(1.0))); + + repo.archive(bound.id).await.expect("archive idle bound"); + let new_session = svc + .create_session(Some(title)) + .await + .expect("create replacement"); + + assert_ne!(new_session.id, bound.id); + let archived = svc + .get_session(bound.id) + .await + .expect("get") + .expect("row"); + assert!(archived.is_archived()); +} + +#[tokio::test] +async fn service_update_session_title_preserves_suffix() { + let db = Database::connect_in_memory() + .await + .expect("connect"); + db.run_migrations().await.expect("migrations"); + let ctx = ServiceContext::new(db.pool().clone()); + let svc = SessionService::new(ctx); + + let title = build_session_title(true, "U", 1, "", 77); + let session = svc + .create_session(Some(title.clone())) + .await + .expect("create"); + + let new_title = format!("Telegram: Custom topic {}", chat_id_suffix(77)); + svc.update_session_title(session.id, Some(new_title.clone())) + .await + .expect("rename"); + + let loaded = svc.get_session(session.id).await.expect("get").expect("row"); + assert_eq!(loaded.title.as_deref(), Some(new_title.as_str())); + assert!( + loaded.title.as_ref().unwrap().ends_with("[chat:77]"), + "suffix must remain for lookup" + ); +}