From dcaa9ff2d24d48fdc608b4f4df05a0aa78faa529 Mon Sep 17 00:00:00 2001 From: anais Date: Wed, 17 Jun 2026 15:31:22 -0700 Subject: [PATCH 1/3] improve app-server thread list and resume rpc paths --- codex-rs/app-server/src/request_processors.rs | 17 +- .../request_processors/thread_lifecycle.rs | 20 +- .../request_processors/thread_processor.rs | 252 ++++++++++------ .../request_processors/token_usage_replay.rs | 56 ++-- codex-rs/app-server/src/thread_status.rs | 16 + codex-rs/core/src/codex_delegate.rs | 1 + codex-rs/core/src/session/mod.rs | 37 +++ codex-rs/core/src/session/session.rs | 32 +- codex-rs/core/src/session/tests.rs | 3 + .../core/src/session/tests/guardian_tests.rs | 1 + codex-rs/core/src/thread_manager.rs | 52 ++++ codex-rs/rollout/src/list.rs | 177 ++++++++--- codex-rs/rollout/src/recorder.rs | 275 +++++++++++++----- codex-rs/rollout/src/recorder_tests.rs | 2 + codex-rs/rollout/src/session_index.rs | 43 +-- codex-rs/rollout/src/state_db.rs | 140 +++++++-- codex-rs/rollout/src/tests.rs | 11 + .../0040_thread_spawn_edges_parent_child.sql | 2 + codex-rs/state/src/lib.rs | 2 + codex-rs/state/src/model/mod.rs | 2 + codex-rs/state/src/model/thread_metadata.rs | 50 ++++ codex-rs/state/src/runtime.rs | 2 + codex-rs/state/src/runtime/threads.rs | 176 ++++++++++- codex-rs/thread-store/src/live_thread.rs | 34 ++- codex-rs/thread-store/src/local/helpers.rs | 107 ++++++- .../thread-store/src/local/list_threads.rs | 119 ++++++-- codex-rs/thread-store/src/local/mod.rs | 8 + .../thread-store/src/local/read_thread.rs | 116 +++++--- .../src/local/update_thread_metadata.rs | 2 + .../thread-store/src/thread_metadata_sync.rs | 55 +++- codex-rs/thread-store/src/types.rs | 3 + 31 files changed, 1451 insertions(+), 362 deletions(-) create mode 100644 codex-rs/state/migrations/0040_thread_spawn_edges_parent_child.sql diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 8c202a806d02..1a5f570ef0e5 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -530,8 +530,11 @@ use crate::thread_state::ConnectionCapabilities; use crate::thread_state::ThreadListenerCommand; use crate::thread_state::ThreadState; use crate::thread_state::ThreadStateManager; +use token_usage_replay::TokenUsageTurnOwner; +use token_usage_replay::latest_token_usage_turn_id_from_owner; use token_usage_replay::latest_token_usage_turn_id_from_rollout_items; use token_usage_replay::send_thread_token_usage_update_to_connection; +use token_usage_replay::token_usage_turn_owner_for_rollout_item; fn resolve_request_cwd(cwd: Option) -> Result, JSONRPCErrorError> { cwd.map(|cwd| { @@ -606,11 +609,23 @@ pub(crate) use self::thread_summary::thread_settings_from_config_snapshot; pub(crate) use self::thread_summary::thread_settings_from_core_snapshot; pub(crate) fn build_api_turns_from_rollout_items(items: &[RolloutItem]) -> Vec { + build_api_turns_from_rollout_items_with_token_usage_owner(items).0 +} + +fn build_api_turns_from_rollout_items_with_token_usage_owner( + items: &[RolloutItem], +) -> (Vec, Option) { let mut builder = ThreadHistoryBuilder::new(); + let mut token_usage_turn_owner = None; + for item in items { + if let Some(owner) = token_usage_turn_owner_for_rollout_item(&builder, item) { + token_usage_turn_owner = Some(owner); + } if is_persisted_rollout_item(item) { builder.handle_rollout_item(item); } } - builder.finish() + + (builder.finish(), token_usage_turn_owner) } diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index fd9e93e1894c..5237fa770daf 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -558,13 +558,15 @@ pub(super) async fn handle_pending_thread_resume_request( let request_id = pending.request_id; let connection_id = request_id.connection_id; let mut thread = pending.thread_summary; - if pending.include_turns { + let token_usage_turn_id = if pending.include_turns { populate_thread_turns_from_history( &mut thread, &pending.history_items, active_turn.as_ref(), - ); - } + ) + } else { + None + }; let thread_status = thread_watch_manager .loaded_status_for_thread(&thread.id) @@ -667,10 +669,6 @@ pub(super) async fn handle_pending_thread_resume_request( // Match cold resume: metadata-only resume should attach the listener without // paying the cost of turn reconstruction for historical usage replay. if let Some(token_usage_thread) = token_usage_thread { - let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( - &pending.history_items, - token_usage_thread.turns.as_slice(), - ); // Rejoining a loaded thread has the same UI contract as a cold resume, but // uses the live conversation state instead of reconstructing a new session. send_thread_token_usage_update_to_connection( @@ -742,12 +740,16 @@ pub(crate) fn populate_thread_turns_from_history( thread: &mut Thread, items: &[RolloutItem], active_turn: Option<&Turn>, -) { - let mut turns = build_api_turns_from_rollout_items(items); +) -> Option { + let (mut turns, token_usage_turn_owner) = + build_api_turns_from_rollout_items_with_token_usage_owner(items); if let Some(active_turn) = active_turn { merge_turn_history_with_active_turn(&mut turns, active_turn.clone()); } + let token_usage_turn_id = + latest_token_usage_turn_id_from_owner(token_usage_turn_owner, turns.as_slice()); thread.turns = turns; + token_usage_turn_id } pub(super) async fn resolve_pending_server_request( diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 63883cbdfe5c..14e0323e413f 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -159,6 +159,26 @@ fn merge_persisted_resume_metadata( } } +fn merge_stored_thread_resume_metadata( + request_overrides: &mut Option>, + typesafe_overrides: &mut ConfigOverrides, + stored_thread: &StoredThread, +) { + if has_model_resume_override(request_overrides.as_ref(), typesafe_overrides) { + return; + } + + typesafe_overrides.model = stored_thread.model.clone(); + typesafe_overrides.model_provider = Some(stored_thread.model_provider.clone()); + + if let Some(reasoning_effort) = stored_thread.reasoning_effort.as_ref() { + request_overrides.get_or_insert_with(HashMap::new).insert( + "model_reasoning_effort".to_string(), + serde_json::Value::String(reasoning_effort.to_string()), + ); + } +} + fn normalize_thread_list_cwd_filters( cwd: Option, ) -> Result>, JSONRPCErrorError> { @@ -172,7 +192,7 @@ fn normalize_thread_list_cwd_filters( }; let mut normalized_cwds = Vec::with_capacity(cwds.len()); for cwd in cwds { - let cwd = AbsolutePathBuf::relative_to_current_dir(cwd.as_str()) + let cwd = AbsolutePathBuf::from_absolute_path(cwd.as_str()) .map(AbsolutePathBuf::into_path_buf) .map_err(|err| { invalid_params(format!("invalid thread/list cwd filter `{cwd}`: {err}")) @@ -372,6 +392,40 @@ enum RunningThreadResumeResult { NotRunning(Option>), } +enum ResumeResponseHistory { + Resumed { + conversation_id: ThreadId, + items: Option>, + }, + Forked(Vec), + Missing, +} + +impl ResumeResponseHistory { + fn from_initial_history(history: &InitialHistory, keep_resumed_items: bool) -> Self { + match history { + InitialHistory::Resumed(resumed) => Self::Resumed { + conversation_id: resumed.conversation_id, + items: keep_resumed_items.then(|| resumed.history.clone()), + }, + InitialHistory::Forked(items) => Self::Forked(items.clone()), + InitialHistory::New | InitialHistory::Cleared => Self::Missing, + } + } + + fn is_resumed(&self) -> bool { + matches!(self, Self::Resumed { .. }) + } + + fn rollout_items(&self) -> Option<&[RolloutItem]> { + match self { + Self::Resumed { items, .. } => items.as_deref(), + Self::Forked(items) => Some(items.as_slice()), + Self::Missing => None, + } + } +} + impl ThreadRequestProcessor { #[allow(clippy::too_many_arguments)] pub(crate) fn new( @@ -1936,28 +1990,25 @@ impl ThreadRequestProcessor { let backwards_cursor = stored_threads.first().and_then(|thread| { thread_backwards_cursor_for_sort_key(thread, store_sort_key, sort_direction) }); - let mut threads = Vec::with_capacity(stored_threads.len()); - let mut status_ids = Vec::with_capacity(stored_threads.len()); + let status_ids = stored_threads + .iter() + .map(|thread| thread.thread_id.to_string()) + .collect::>(); let fallback_provider = self.config.model_provider_id.clone(); - for stored_thread in stored_threads { - let (thread, _) = thread_from_stored_thread( - stored_thread, - fallback_provider.as_str(), - &self.config.cwd, - ); - status_ids.push(thread.id.clone()); - threads.push(thread); - } - let statuses = self .thread_watch_manager - .loaded_statuses_for_threads(status_ids) + .loaded_status_overrides_for_threads(status_ids) .await; - let data: Vec<_> = threads + let data: Vec<_> = stored_threads .into_iter() - .map(|mut thread| { + .map(|stored_thread| { + let (mut thread, _) = thread_from_stored_thread( + stored_thread, + fallback_provider.as_str(), + &self.config.cwd, + ); if let Some(status) = statuses.get(&thread.id) { thread.status = status.clone(); } @@ -2078,7 +2129,7 @@ impl ThreadRequestProcessor { } let statuses = self .thread_watch_manager - .loaded_statuses_for_threads(status_ids) + .loaded_status_overrides_for_threads(status_ids) .await; let data = results .into_iter() @@ -2599,9 +2650,10 @@ impl ThreadRequestProcessor { .await .map(|thread_history| (thread_history, None)) } else if let Some(stored_thread) = stored_thread_from_running_probe { - self.stored_thread_to_initial_history(&stored_thread) + let mut stored_thread = *stored_thread; + self.stored_thread_to_initial_history(&mut stored_thread) .await - .map(|thread_history| (thread_history, Some(*stored_thread))) + .map(|thread_history| (thread_history, Some(stored_thread))) } else { self.resume_thread_from_rollout(&thread_id, path.as_ref()) .await @@ -2631,12 +2683,23 @@ impl ThreadRequestProcessor { developer_instructions, personality, ); - self.load_and_apply_persisted_resume_metadata( - &thread_history, - &mut request_overrides, - &mut typesafe_overrides, - ) - .await; + if let Some(stored_thread) = resume_source_thread + .as_ref() + .filter(|stored_thread| stored_thread.model.is_some()) + { + merge_stored_thread_resume_metadata( + &mut request_overrides, + &mut typesafe_overrides, + stored_thread, + ); + } else { + self.load_and_apply_persisted_resume_metadata( + &thread_history, + &mut request_overrides, + &mut typesafe_overrides, + ) + .await; + } // Derive a Config using the same logic as new conversation, honoring overrides if provided. let config = match self @@ -2652,11 +2715,14 @@ impl ThreadRequestProcessor { } }; - let response_history = thread_history.clone(); + let response_history = ResumeResponseHistory::from_initial_history( + &thread_history, + include_turns || initial_turns_page.is_some(), + ); match self .thread_manager - .resume_thread_with_history( + .resume_thread_with_history_without_initial_messages( config, thread_history, self.auth_manager.clone(), @@ -2702,7 +2768,7 @@ impl ThreadRequestProcessor { "thread", ); - let mut thread = match self + let (mut thread, token_usage_turn_id) = match self .load_thread_from_resume_source_or_send_internal( thread_id, codex_thread.as_ref(), @@ -2713,7 +2779,7 @@ impl ThreadRequestProcessor { ) .await { - Ok(thread) => thread, + Ok((thread, token_usage_turn_id)) => (thread, token_usage_turn_id), Err(message) => { self.outgoing .send_error(request_id, internal_error(message)) @@ -2751,8 +2817,17 @@ impl ThreadRequestProcessor { ); let token_usage_thread = include_turns.then(|| thread.clone()); let mut initial_turns_page = if let Some(params) = initial_turns_page.as_ref() { + let Some(history_items) = response_history.rollout_items() else { + self.outgoing + .send_error( + request_id, + internal_error("resume history missing for initial turns page"), + ) + .await; + return Ok(()); + }; match build_thread_resume_initial_turns_page( - &response_history.get_rollout_items(), + history_items, thread.status.clone(), /*has_live_running_thread*/ false, /*active_turn*/ None, @@ -2795,10 +2870,6 @@ impl ThreadRequestProcessor { // `excludeTurns` is explicitly the cheap resume path, so avoid // rebuilding history only to attribute a replayed usage update. if let Some(token_usage_thread) = token_usage_thread { - let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( - &response_history.get_rollout_items(), - token_usage_thread.turns.as_slice(), - ); // The client needs restored usage before it starts another turn. // Sending after the response preserves JSON-RPC request ordering while // still filling the status line before the next turn lifecycle begins. @@ -2851,6 +2922,8 @@ impl ThreadRequestProcessor { app_server_client_name: Option, app_server_client_version: Option, ) -> Result { + let include_turns = !params.exclude_turns; + let needs_persisted_history = include_turns || params.initial_turns_page.is_some(); let running_thread = if params.history.is_some() { if let Ok(existing_thread_id) = ThreadId::from_string(¶ms.thread_id) && self @@ -2871,7 +2944,7 @@ impl ThreadRequestProcessor { .read_stored_thread_for_resume( ¶ms.thread_id, /*path*/ None, - /*include_history*/ true, + needs_persisted_history, ) .await?; Some((existing_thread_id, existing_thread, source_thread)) @@ -2954,15 +3027,20 @@ impl ThreadRequestProcessor { } let redact_resume_payloads = should_redact_thread_resume_payloads(app_server_client_name.as_deref()); - let history_items = source_thread - .history - .as_ref() - .map(|history| history.items.clone()) - .ok_or_else(|| { - internal_error(format!( - "thread {existing_thread_id} did not include persisted history" - )) - })?; + let mut summary_source_thread = source_thread; + let history_items = if needs_persisted_history { + summary_source_thread + .history + .take() + .map(|history| history.items) + .ok_or_else(|| { + internal_error(format!( + "thread {existing_thread_id} did not include persisted history" + )) + })? + } else { + Vec::new() + }; let thread_state = self .thread_state_manager @@ -2981,8 +3059,6 @@ impl ThreadRequestProcessor { ) .await?; - let mut summary_source_thread = source_thread; - summary_source_thread.history = None; let mut thread_summary = self.stored_thread_to_api_thread( summary_source_thread, config_snapshot.model_provider_id.as_str(), @@ -3015,7 +3091,7 @@ impl ThreadRequestProcessor { thread_summary, emit_thread_goal_update, thread_goal_state_db, - include_turns: !params.exclude_turns, + include_turns, initial_turns_page: params.initial_turns_page.clone(), redact_resume_payloads, }), @@ -3053,11 +3129,11 @@ impl ThreadRequestProcessor { thread_id: &str, path: Option<&PathBuf>, ) -> Result<(InitialHistory, StoredThread), JSONRPCErrorError> { - let stored_thread = self + let mut stored_thread = self .read_stored_thread_for_resume(thread_id, path, /*include_history*/ true) .await?; let history = self - .stored_thread_to_initial_history(&stored_thread) + .stored_thread_to_initial_history(&mut stored_thread) .await?; Ok((history, stored_thread)) } @@ -3105,21 +3181,17 @@ impl ThreadRequestProcessor { #[tracing::instrument(level = "trace", skip_all)] async fn stored_thread_to_initial_history( &self, - stored_thread: &StoredThread, + stored_thread: &mut StoredThread, ) -> Result { let thread_id = stored_thread.thread_id; - let history = stored_thread - .history - .as_ref() - .map(|history| history.items.clone()) - .ok_or_else(|| { - internal_error(format!( - "thread {thread_id} did not include persisted history" - )) - })?; + let history = stored_thread.history.take().ok_or_else(|| { + internal_error(format!( + "thread {thread_id} did not include persisted history" + )) + })?; Ok(InitialHistory::Resumed(ResumedHistory { conversation_id: thread_id, - history, + history: history.items, rollout_path: stored_thread.rollout_path.clone(), })) } @@ -3161,15 +3233,18 @@ impl ThreadRequestProcessor { &self, thread_id: ThreadId, thread: &CodexThread, - thread_history: &InitialHistory, + thread_history: &ResumeResponseHistory, rollout_path: &Path, resume_source_thread: Option, include_turns: bool, - ) -> std::result::Result { + ) -> std::result::Result<(Thread, Option), String> { let config_snapshot = thread.config_snapshot().await; let session_id = thread.session_configured().session_id.to_string(); + let needs_thread_name_lookup = !thread_history.is_resumed(); let thread = match thread_history { - InitialHistory::Resumed(resumed) => { + ResumeResponseHistory::Resumed { + conversation_id, .. + } => { let fallback_provider = config_snapshot.model_provider_id.as_str(); if let Some(stored_thread) = resume_source_thread { let stored_thread = @@ -3208,7 +3283,7 @@ impl ThreadRequestProcessor { match self .thread_store .read_thread(StoreReadThreadParams { - thread_id: resumed.conversation_id, + thread_id: *conversation_id, include_archived: true, include_history: false, }) @@ -3226,7 +3301,7 @@ impl ThreadRequestProcessor { } } } - InitialHistory::Forked(items) => { + ResumeResponseHistory::Forked(items) => { let mut thread = build_thread_from_snapshot( thread_id, session_id.clone(), @@ -3236,7 +3311,7 @@ impl ThreadRequestProcessor { thread.preview = preview_from_rollout_items(items); Ok(thread) } - InitialHistory::New | InitialHistory::Cleared => Err(format!( + ResumeResponseHistory::Missing => Err(format!( "failed to build resume response for thread {thread_id}: initial history missing" )), }; @@ -3244,16 +3319,24 @@ impl ThreadRequestProcessor { thread.id = thread_id.to_string(); thread.session_id = session_id; thread.path = Some(rollout_path.to_path_buf()); - if include_turns { - let history_items = thread_history.get_rollout_items(); + let token_usage_turn_id = if include_turns { + let Some(history_items) = thread_history.rollout_items() else { + return Err(format!( + "failed to build resume response for thread {thread_id}: history items missing" + )); + }; populate_thread_turns_from_history( &mut thread, - &history_items, + history_items, /*active_turn*/ None, - ); + ) + } else { + None + }; + if needs_thread_name_lookup { + self.attach_thread_name(thread_id, &mut thread).await; } - self.attach_thread_name(thread_id, &mut thread).await; - Ok(thread) + Ok((thread, token_usage_turn_id)) } async fn attach_thread_name(&self, thread_id: ThreadId, thread: &mut Thread) { @@ -3627,6 +3710,7 @@ impl ThreadRequestProcessor { SortDirection::Asc => StoreSortDirection::Asc, SortDirection::Desc => StoreSortDirection::Desc, }; + let store_applies_cwd_filters = self.thread_store.as_any().is::(); while remaining > 0 { let page_size = remaining.min(THREAD_LIST_MAX_LIMIT); @@ -3658,11 +3742,12 @@ impl ThreadRequestProcessor { if source_kind_filter .as_ref() .is_none_or(|filter| source_kind_matches(&source, filter)) - && cwd_filters.as_ref().is_none_or(|expected_cwds| { - expected_cwds.iter().any(|expected_cwd| { - path_utils::paths_match_after_normalization(&it.cwd, expected_cwd) - }) - }) + && (store_applies_cwd_filters + || cwd_filters.as_ref().is_none_or(|expected_cwds| { + expected_cwds.iter().any(|expected_cwd| { + path_utils::paths_match_after_normalization(&it.cwd, expected_cwd) + }) + })) { filtered.push(it); if filtered.len() >= remaining { @@ -4134,13 +4219,12 @@ pub(crate) fn thread_from_stored_thread( branch: info.branch, origin_url: info.repository_url, }); - let cwd = AbsolutePathBuf::relative_to_current_dir(path_utils::normalize_for_native_workdir( - thread.cwd, - )) - .unwrap_or_else(|err| { - warn!("failed to normalize thread cwd while reading stored thread: {err}"); - fallback_cwd.clone() - }); + let cwd = + AbsolutePathBuf::from_absolute_path(path_utils::normalize_for_native_workdir(thread.cwd)) + .unwrap_or_else(|err| { + warn!("failed to normalize thread cwd while reading stored thread: {err}"); + fallback_cwd.clone() + }); let source = with_thread_spawn_agent_metadata( thread.source, thread.agent_nickname.clone(), diff --git a/codex-rs/app-server/src/request_processors/token_usage_replay.rs b/codex-rs/app-server/src/request_processors/token_usage_replay.rs index d4d2228d4b44..579fb7398b22 100644 --- a/codex-rs/app-server/src/request_processors/token_usage_replay.rs +++ b/codex-rs/app-server/src/request_processors/token_usage_replay.rs @@ -13,7 +13,6 @@ use std::sync::Arc; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::Thread; -use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::ThreadTokenUsage; use codex_app_server_protocol::ThreadTokenUsageUpdatedNotification; use codex_app_server_protocol::Turn; @@ -61,40 +60,57 @@ pub(super) async fn send_thread_token_usage_update_to_connection( /// /// The id is preferred when it still appears in the rebuilt thread. The position is a /// fallback for histories whose implicit turn ids are regenerated during reconstruction. -struct TokenUsageTurnOwner { +pub(super) struct TokenUsageTurnOwner { id: String, position: Option, } +pub(super) fn token_usage_turn_owner_for_rollout_item( + builder: &codex_app_server_protocol::ThreadHistoryBuilder, + item: &RolloutItem, +) -> Option { + if !matches!(item, RolloutItem::EventMsg(EventMsg::TokenCount(_))) { + return None; + } + + builder + .active_turn_snapshot() + .map(|turn| TokenUsageTurnOwner { + id: turn.id, + position: builder.active_turn_position(), + }) +} + +pub(super) fn latest_token_usage_turn_id_from_owner( + owner: Option, + turns: &[Turn], +) -> Option { + let owner = owner?; + if turns.iter().any(|turn| turn.id == owner.id) { + Some(owner.id) + } else { + owner + .position + .and_then(|position| turns.get(position)) + .map(|turn| turn.id.clone()) + } +} + pub(super) fn latest_token_usage_turn_id_from_rollout_items( rollout_items: &[RolloutItem], turns: &[Turn], ) -> Option { - let mut builder = ThreadHistoryBuilder::new(); + let mut builder = codex_app_server_protocol::ThreadHistoryBuilder::new(); let mut token_usage_turn_owner = None; for item in rollout_items { - if matches!(item, RolloutItem::EventMsg(EventMsg::TokenCount(_))) { - token_usage_turn_owner = - builder - .active_turn_snapshot() - .map(|turn| TokenUsageTurnOwner { - id: turn.id, - position: builder.active_turn_position(), - }); + if let Some(owner) = token_usage_turn_owner_for_rollout_item(&builder, item) { + token_usage_turn_owner = Some(owner); } builder.handle_rollout_item(item); } - let owner = token_usage_turn_owner?; - if turns.iter().any(|turn| turn.id == owner.id) { - Some(owner.id) - } else { - owner - .position - .and_then(|position| turns.get(position)) - .map(|turn| turn.id.clone()) - } + latest_token_usage_turn_id_from_owner(token_usage_turn_owner, turns) } /// Chooses a fallback turn id that should own a replayed token usage update. diff --git a/codex-rs/app-server/src/thread_status.rs b/codex-rs/app-server/src/thread_status.rs index a5997a58f5d7..f73cbd7862d4 100644 --- a/codex-rs/app-server/src/thread_status.rs +++ b/codex-rs/app-server/src/thread_status.rs @@ -113,6 +113,7 @@ impl ThreadWatchManager { self.state.lock().await.loaded_status_for_thread(thread_id) } + #[cfg(test)] pub(crate) async fn loaded_statuses_for_threads( &self, thread_ids: Vec, @@ -127,6 +128,21 @@ impl ThreadWatchManager { .collect() } + pub(crate) async fn loaded_status_overrides_for_threads( + &self, + thread_ids: Vec, + ) -> HashMap { + let state = self.state.lock().await; + thread_ids + .into_iter() + .filter_map(|thread_id| { + state.status_for(&thread_id).and_then(|status| { + (!matches!(status, ThreadStatus::NotLoaded)).then_some((thread_id, status)) + }) + }) + .collect() + } + #[cfg(test)] pub(crate) async fn running_turn_count(&self) -> usize { self.state diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index dab902ec3741..37ad218dcbc6 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -99,6 +99,7 @@ pub(crate) async fn run_codex_thread_interactive( mcp_manager: Arc::clone(&parent_session.services.mcp_manager), extensions: Arc::clone(&parent_session.services.extensions), conversation_history, + include_initial_messages: true, session_source: SessionSource::SubAgent(subagent_source.clone()), forked_from_thread_id, parent_thread_id: Some(parent_session.thread_id), diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index b55c4b11d771..5c47747d9167 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -419,6 +419,7 @@ pub(crate) struct CodexSpawnArgs { pub(crate) mcp_manager: Arc, pub(crate) extensions: Arc>, pub(crate) conversation_history: InitialHistory, + pub(crate) include_initial_messages: bool, pub(crate) session_source: SessionSource, pub(crate) forked_from_thread_id: Option, pub(crate) parent_thread_id: Option, @@ -507,6 +508,7 @@ impl Codex { mcp_manager, extensions, conversation_history, + include_initial_messages, session_source, forked_from_thread_id, parent_thread_id, @@ -659,6 +661,7 @@ impl Codex { tx_event.clone(), agent_status_tx.clone(), conversation_history, + include_initial_messages, session_source_clone, skills_service, plugins_manager, @@ -896,8 +899,13 @@ pub(crate) fn session_loop_termination_from_handle( async fn thread_title_from_thread_store( live_thread: Option<&LiveThread>, thread_store: &Arc, + state_db_ctx: Option<&codex_state::StateRuntime>, conversation_id: ThreadId, ) -> Option { + if let Some(title) = thread_title_from_state_db(state_db_ctx, conversation_id).await { + return Some(title); + } + let thread = match live_thread { Some(live_thread) => { live_thread @@ -941,6 +949,35 @@ fn push_prompt_fragment( } } +async fn thread_title_from_state_db( + state_db_ctx: Option<&codex_state::StateRuntime>, + conversation_id: ThreadId, +) -> Option { + let metadata = state_db_ctx? + .get_thread(conversation_id) + .await + .ok() + .flatten()?; + distinct_metadata_title( + metadata.title.as_str(), + metadata.preview.as_deref(), + metadata.first_user_message.as_deref(), + ) +} + +fn distinct_metadata_title( + title: &str, + preview: Option<&str>, + first_user_message: Option<&str>, +) -> Option { + let title = title.trim(); + if title.is_empty() || first_user_message.map(str::trim) == Some(title) { + return None; + } + let preview = preview.or(first_user_message).unwrap_or_default().trim(); + (preview != title).then(|| title.to_string()) +} + impl Session { pub(crate) async fn app_server_client_metadata(&self) -> AppServerClientMetadata { let state = self.state.lock().await; diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index af2d1abeb89f..393ec22d4c86 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -478,6 +478,7 @@ impl Session { tx_event: Sender, agent_status: watch::Sender, initial_history: InitialHistory, + include_initial_messages: bool, session_source: SessionSource, skills_service: Arc, plugins_manager: Arc, @@ -562,10 +563,14 @@ impl Session { LiveThread::create(Arc::clone(&thread_store), params).await? } InitialHistory::Resumed(resumed_history) => { + let defer_metadata_history_load = + thread_store.as_any().is::(); let params = ResumeThreadParams { thread_id: resumed_history.conversation_id, rollout_path: resumed_history.rollout_path.clone(), - history: Some(resumed_history.history.clone()), + history: (!defer_metadata_history_load) + .then(|| resumed_history.history.clone()), + defer_metadata_history_load, include_archived: true, metadata: ThreadPersistenceMetadata { cwd: Some(config.cwd.to_path_buf()), @@ -852,12 +857,17 @@ impl Session { ); } let thread_name = - thread_title_from_thread_store(live_thread_init.as_ref(), &thread_store, thread_id) - .instrument(info_span!( - "session_init.thread_name_lookup", - otel.name = "session_init.thread_name_lookup", - )) - .await; + thread_title_from_thread_store( + live_thread_init.as_ref(), + &thread_store, + state_db_ctx.as_deref(), + thread_id, + ) + .instrument(info_span!( + "session_init.thread_name_lookup", + otel.name = "session_init.thread_name_lookup", + )) + .await; session_configuration.thread_name = thread_name.clone(); validate_config_lock_if_configured(&session_configuration).await?; export_config_lock_if_configured(&session_configuration, thread_id).await?; @@ -1075,8 +1085,12 @@ impl Session { *guard = Arc::downgrade(&sess); } // Dispatch the SessionConfiguredEvent first and then report any errors. - // If resuming, include converted initial messages in the payload so UIs can render them immediately. - let initial_messages = initial_history.get_event_msgs(); + // If requested for a resumed session, include converted initial messages in the payload so UIs can render them immediately. + let initial_messages = if include_initial_messages { + initial_history.get_event_msgs() + } else { + None + }; let events = std::iter::once(Event { id: INITIAL_SUBMIT_ID.to_owned(), msg: EventMsg::SessionConfigured(SessionConfiguredEvent { diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 0b1cc045c44a..4597d9c4b65c 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -4850,6 +4850,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_packaged_zsh() { tx_event, agent_status_tx, InitialHistory::New, + /*include_initial_messages*/ true, SessionSource::Exec, skills_service, plugins_manager, @@ -5201,6 +5202,7 @@ async fn make_session_with_config_and_rx( tx_event, agent_status_tx, InitialHistory::New, + /*include_initial_messages*/ true, SessionSource::Exec, skills_service, plugins_manager, @@ -5307,6 +5309,7 @@ async fn make_session_with_history_source_and_agent_control_and_rx( tx_event, agent_status_tx, initial_history, + /*include_initial_messages*/ true, session_source, skills_service, plugins_manager, diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index 5de331ee1fb1..a18e9c1b3302 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -716,6 +716,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { mcp_manager, extensions: codex_extension_api::empty_extension_registry(), conversation_history: InitialHistory::New, + include_initial_messages: true, session_source: SessionSource::SubAgent(SubAgentSource::Other( GUARDIAN_REVIEWER_NAME.to_string(), )), diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 814c8fe764d4..9a5275eddf55 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -652,6 +652,7 @@ impl ThreadManager { options.environments, options.thread_extension_init, options.supports_openai_form_elicitation, + /*include_initial_messages*/ true, /*user_shell_override*/ None, )) .await @@ -721,6 +722,46 @@ impl ThreadManager { auth_manager: Arc, parent_trace: Option, supports_openai_form_elicitation: bool, + ) -> CodexResult { + self.resume_thread_with_history_inner( + config, + initial_history, + auth_manager, + parent_trace, + supports_openai_form_elicitation, + /*include_initial_messages*/ true, + ) + .await + } + + #[instrument(level = "trace", skip_all)] + pub async fn resume_thread_with_history_without_initial_messages( + &self, + config: Config, + initial_history: InitialHistory, + auth_manager: Arc, + parent_trace: Option, + supports_openai_form_elicitation: bool, + ) -> CodexResult { + self.resume_thread_with_history_inner( + config, + initial_history, + auth_manager, + parent_trace, + supports_openai_form_elicitation, + /*include_initial_messages*/ false, + ) + .await + } + + async fn resume_thread_with_history_inner( + &self, + config: Config, + initial_history: InitialHistory, + auth_manager: Arc, + parent_trace: Option, + supports_openai_form_elicitation: bool, + include_initial_messages: bool, ) -> CodexResult { let agent_control = self.agent_control_for_config(&config); let environments = default_thread_environment_selections( @@ -747,6 +788,7 @@ impl ThreadManager { environments, /*thread_extension_init*/ ExtensionDataInit::default(), supports_openai_form_elicitation, + include_initial_messages, /*user_shell_override*/ None, )) .await @@ -777,6 +819,7 @@ impl ThreadManager { environments, /*thread_extension_init*/ ExtensionDataInit::default(), supports_openai_form_elicitation, + /*include_initial_messages*/ true, /*user_shell_override*/ Some(user_shell_override), )) .await @@ -816,6 +859,7 @@ impl ThreadManager { environments, /*thread_extension_init*/ ExtensionDataInit::default(), supports_openai_form_elicitation, + /*include_initial_messages*/ true, /*user_shell_override*/ Some(user_shell_override), )) .await @@ -997,6 +1041,7 @@ impl ThreadManager { environments, /*thread_extension_init*/ ExtensionDataInit::default(), supports_openai_form_elicitation, + /*include_initial_messages*/ true, /*user_shell_override*/ None, )) .await @@ -1258,6 +1303,7 @@ impl ThreadManagerState { environments, /*thread_extension_init*/ ExtensionDataInit::default(), /*supports_openai_form_elicitation*/ false, + /*include_initial_messages*/ true, /*user_shell_override*/ None, )) .await @@ -1296,6 +1342,7 @@ impl ThreadManagerState { environments, /*thread_extension_init*/ ExtensionDataInit::default(), /*supports_openai_form_elicitation*/ false, + /*include_initial_messages*/ true, /*user_shell_override*/ None, )) .await @@ -1335,6 +1382,7 @@ impl ThreadManagerState { environments, /*thread_extension_init*/ ExtensionDataInit::default(), /*supports_openai_form_elicitation*/ false, + /*include_initial_messages*/ true, /*user_shell_override*/ None, )) .await @@ -1357,6 +1405,7 @@ impl ThreadManagerState { environments: Vec, thread_extension_init: ExtensionDataInit, supports_openai_form_elicitation: bool, + include_initial_messages: bool, user_shell_override: Option, ) -> CodexResult { Box::pin(self.spawn_thread_with_source( @@ -1376,6 +1425,7 @@ impl ThreadManagerState { environments, thread_extension_init, supports_openai_form_elicitation, + include_initial_messages, user_shell_override, )) .await @@ -1400,6 +1450,7 @@ impl ThreadManagerState { environments: Vec, thread_extension_init: ExtensionDataInit, supports_openai_form_elicitation: bool, + include_initial_messages: bool, user_shell_override: Option, ) -> CodexResult { let is_resumed_thread = matches!(&initial_history, InitialHistory::Resumed(_)); @@ -1453,6 +1504,7 @@ impl ThreadManagerState { mcp_manager: Arc::clone(&self.mcp_manager), extensions: Arc::clone(&self.extensions), conversation_history: initial_history, + include_initial_messages, session_source, forked_from_thread_id, parent_thread_id, diff --git a/codex-rs/rollout/src/list.rs b/codex-rs/rollout/src/list.rs index f5013e7aaec7..de720c87da52 100644 --- a/codex-rs/rollout/src/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -52,6 +52,8 @@ pub struct ThreadItem { pub first_user_message: Option, /// Best available user-facing preview for discovery and list display. pub preview: Option, + /// User-facing title from persisted metadata, when available. + pub title: Option, /// Working directory from session metadata. pub cwd: Option, /// Git branch from session metadata. @@ -94,6 +96,7 @@ struct HeadTailSummary { thread_id: Option, first_user_message: Option, preview: Option, + title: Option, cwd: Option, git_branch: Option, git_sha: Option, @@ -266,30 +269,6 @@ impl<'a> RolloutFileVisitor for FilesByCreatedAtVisitor<'a> { } } -/// Collects lightweight file candidates (path + id + mtime). -/// Sorting after mtime happens after all files are collected. -struct FilesByUpdatedAtVisitor<'a> { - candidates: &'a mut Vec, -} - -impl<'a> RolloutFileVisitor for FilesByUpdatedAtVisitor<'a> { - async fn visit( - &mut self, - _ts: OffsetDateTime, - id: Uuid, - path: PathBuf, - _scanned: usize, - ) -> ControlFlow<()> { - let updated_at = file_modified_time(&path).await.unwrap_or(None); - self.candidates.push(ThreadCandidate { - path, - id, - updated_at, - }); - ControlFlow::Continue(()) - } -} - impl serde::Serialize for Cursor { fn serialize(&self, serializer: S) -> Result where @@ -801,6 +780,7 @@ async fn build_thread_item( thread_id, first_user_message, preview, + title, cwd, git_branch, git_sha, @@ -823,6 +803,7 @@ async fn build_thread_item( thread_id, first_user_message, preview, + title, cwd, git_branch, git_sha, @@ -981,49 +962,97 @@ async fn collect_files_by_updated_at( root: &Path, scanned_files: &mut usize, ) -> io::Result> { - let mut candidates = Vec::new(); - let mut visitor = FilesByUpdatedAtVisitor { - candidates: &mut candidates, - }; - walk_rollout_files(root, scanned_files, &mut visitor).await?; - + let root = root.to_path_buf(); + let initial_scanned_files = *scanned_files; + let (candidates, scanned) = tokio::task::spawn_blocking(move || { + let mut scanned_files = initial_scanned_files; + let candidates = collect_files_by_updated_at_blocking(root.as_path(), &mut scanned_files)?; + Ok::<_, io::Error>((candidates, scanned_files)) + }) + .await + .map_err(io::Error::other)??; + *scanned_files = scanned; Ok(candidates) } async fn collect_flat_files_by_updated_at( root: &Path, scanned_files: &mut usize, +) -> io::Result> { + let root = root.to_path_buf(); + let initial_scanned_files = *scanned_files; + let (candidates, scanned) = tokio::task::spawn_blocking(move || { + let mut scanned_files = initial_scanned_files; + let candidates = + collect_flat_files_by_updated_at_blocking(root.as_path(), &mut scanned_files)?; + Ok::<_, io::Error>((candidates, scanned_files)) + }) + .await + .map_err(io::Error::other)??; + *scanned_files = scanned; + Ok(candidates) +} + +fn collect_files_by_updated_at_blocking( + root: &Path, + scanned_files: &mut usize, ) -> io::Result> { let mut candidates = Vec::new(); - let mut dir = tokio::fs::read_dir(root).await?; - while let Some(entry) = dir.next_entry().await? { + let year_dirs = collect_dirs_desc_blocking(root, |s| s.parse::().ok())?; + + 'outer: for (_year, year_path) in year_dirs.iter() { if *scanned_files >= MAX_SCAN_FILES { break; } - if !entry - .file_type() - .await - .map(|ft| ft.is_file()) - .unwrap_or(false) - { - continue; + let month_dirs = collect_dirs_desc_blocking(year_path, |s| s.parse::().ok())?; + for (_month, month_path) in month_dirs.iter() { + if *scanned_files >= MAX_SCAN_FILES { + break 'outer; + } + let day_dirs = collect_dirs_desc_blocking(month_path, |s| s.parse::().ok())?; + for (_day, day_path) in day_dirs.iter() { + if *scanned_files >= MAX_SCAN_FILES { + break 'outer; + } + let day_files = collect_rollout_day_file_candidates_blocking(day_path)?; + for (_ts, id, path, updated_at) in day_files.into_iter() { + *scanned_files += 1; + if *scanned_files > MAX_SCAN_FILES { + break 'outer; + } + candidates.push(ThreadCandidate { + path, + id, + updated_at, + }); + } + } } - let Some(rollout_file) = compression::RolloutFile::from_path(entry.path()) else { - continue; - }; - let Some((_ts, id)) = parse_timestamp_uuid_from_filename(rollout_file.plain_file_name()) - else { + } + + Ok(candidates) +} + +fn collect_flat_files_by_updated_at_blocking( + root: &Path, + scanned_files: &mut usize, +) -> io::Result> { + let mut candidates = Vec::new(); + let dir = std::fs::read_dir(root)?; + for entry in dir { + if *scanned_files >= MAX_SCAN_FILES { + break; + } + let entry = entry?; + let Some((_ts, id, path, updated_at)) = rollout_candidate_from_dir_entry(&entry) else { continue; }; *scanned_files += 1; if *scanned_files > MAX_SCAN_FILES { break; } - let updated_at = file_modified_time(rollout_file.path()) - .await - .unwrap_or(None); candidates.push(ThreadCandidate { - path: rollout_file.into_path(), + path, id, updated_at, }); @@ -1032,6 +1061,58 @@ async fn collect_flat_files_by_updated_at( Ok(candidates) } +fn collect_dirs_desc_blocking(parent: &Path, parse: F) -> io::Result> +where + T: Ord + Copy, + F: Fn(&str) -> Option, +{ + let mut vec: Vec<(T, PathBuf)> = Vec::new(); + let dir = std::fs::read_dir(parent)?; + for entry in dir { + let entry = entry?; + if entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false) + && let Some(s) = entry.file_name().to_str() + && let Some(v) = parse(s) + { + vec.push((v, entry.path())); + } + } + vec.sort_by_key(|(v, _)| Reverse(*v)); + Ok(vec) +} + +fn collect_rollout_day_file_candidates_blocking( + day_path: &Path, +) -> io::Result)>> { + let mut day_files = Vec::new(); + let dir = std::fs::read_dir(day_path)?; + for entry in dir { + let entry = entry?; + if let Some(candidate) = rollout_candidate_from_dir_entry(&entry) { + day_files.push(candidate); + } + } + day_files.sort_by_key(|(ts, sid, _path, _updated_at)| (Reverse(*ts), Reverse(*sid))); + Ok(day_files) +} + +fn rollout_candidate_from_dir_entry( + entry: &std::fs::DirEntry, +) -> Option<(OffsetDateTime, Uuid, PathBuf, Option)> { + if !entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) { + return None; + } + let rollout_file = compression::RolloutFile::from_path(entry.path())?; + let (ts, id) = parse_timestamp_uuid_from_filename(rollout_file.plain_file_name())?; + let updated_at = entry + .metadata() + .ok() + .and_then(|meta| meta.modified().ok()) + .map(OffsetDateTime::from) + .and_then(truncate_to_millis); + Some((ts, id, rollout_file.into_path(), updated_at)) +} + async fn walk_rollout_files( root: &Path, scanned_files: &mut usize, diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 7ba111e944a9..0c251b03444e 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -379,6 +379,7 @@ impl RolloutRecorder { /*parent_thread_id*/ None, archived, search_term, + /*validate_rollout_paths*/ false, ) .await .map(Into::into) @@ -452,8 +453,8 @@ impl RolloutRecorder { // Warm the DB by repairing every filesystem hit before querying SQLite. Source/provider/cwd // filters are already validated from rollout head metadata, so lightweight read-repair is // enough there. Search can depend on full title metadata, so keep full reconciliation. - for item in &fs_page.items { - if search_term.is_some() { + if search_term.is_some() { + for item in &fs_page.items { state_db::reconcile_rollout( state_db_ctx.as_deref(), item.path.as_path(), @@ -464,15 +465,19 @@ impl RolloutRecorder { /*new_thread_memory_mode*/ None, ) .await; - } else { - state_db::read_repair_rollout_path( - state_db_ctx.as_deref(), - item.thread_id, - Some(archived), - item.path.as_path(), - ) - .await; } + } else { + let repair_paths = fs_page + .items + .iter() + .map(|item| (item.thread_id, item.path.clone())) + .collect::>(); + state_db::read_repair_rollout_paths( + state_db_ctx.as_deref(), + repair_paths.as_slice(), + Some(archived), + ) + .await; } let db_page = state_db::list_threads_db( @@ -488,6 +493,7 @@ impl RolloutRecorder { /*parent_thread_id*/ None, archived, search_term, + /*validate_rollout_paths*/ true, ) .await; if let Some(db_page) = db_page { @@ -517,6 +523,7 @@ impl RolloutRecorder { /*parent_thread_id*/ None, archived, search_term, + /*validate_rollout_paths*/ true, ) .await { @@ -557,6 +564,7 @@ impl RolloutRecorder { /*parent_thread_id*/ None, archived, search_term, + /*validate_rollout_paths*/ true, ) .await { @@ -635,6 +643,7 @@ impl RolloutRecorder { /*parent_thread_id*/ None, /*archived*/ false, /*search_term*/ None, + /*validate_rollout_paths*/ true, ) .await else { @@ -868,59 +877,38 @@ impl RolloutRecorder { path: &Path, ) -> std::io::Result<(Vec, Option, usize)> { trace!("Resuming rollout from {path:?}"); + if let Some(existing_path) = compression::existing_rollout_path(path).await + && compression::RolloutFile::from_path(existing_path.clone()) + .is_some_and(|rollout_file| !rollout_file.is_compressed()) + { + match tokio::task::spawn_blocking(move || { + load_plain_rollout_items_blocking(existing_path.as_path()) + }) + .await + .map_err(IoError::other)? + { + Ok(result) => return Ok(result), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => {} + Err(err) => return Err(err), + } + } + let mut items: Vec = Vec::new(); let mut thread_id: Option = None; let mut parse_errors = 0usize; let mut reader = compression::open_rollout_line_reader(path).await?; let mut saw_non_empty_line = false; while let Some(line) = reader.next_line().await? { - if line.trim().is_empty() { - continue; - } - saw_non_empty_line = true; - let mut v: Value = match serde_json::from_str(&line) { - Ok(v) => v, - Err(e) => { - warn!("failed to parse line as JSON: {line:?}, error: {e}"); - parse_errors = parse_errors.saturating_add(1); - continue; - } - }; - if strip_legacy_ghost_snapshot_rollout_line(&mut v) { - trace!("skipping legacy ghost_snapshot rollout line"); - continue; - } - - // Parse the rollout line structure - match serde_json::from_value::(v.clone()) { - Ok(rollout_line) => { - let item = rollout_line.item; - // Use the FIRST SessionMeta encountered in the file as the canonical - // thread id and main session information. Keep all items intact. - if thread_id.is_none() - && let RolloutItem::SessionMeta(session_meta_line) = &item - { - thread_id = Some(session_meta_line.meta.id); - } - items.push(item); - } - Err(e) => { - trace!("failed to parse rollout line: {e}"); - parse_errors = parse_errors.saturating_add(1); - } - } - } - if !saw_non_empty_line { - return Err(IoError::other("empty session file")); + parse_rollout_item_line( + line.as_str(), + &mut items, + &mut thread_id, + &mut parse_errors, + &mut saw_non_empty_line, + ); } - tracing::debug!( - "Resumed rollout with {} items, thread ID: {:?}, parse errors: {}", - items.len(), - thread_id, - parse_errors, - ); - Ok((items, thread_id, parse_errors)) + finish_loaded_rollout(items, thread_id, parse_errors, saw_non_empty_line) } pub async fn get_rollout_history(path: &Path) -> std::io::Result { @@ -968,6 +956,91 @@ impl RolloutRecorder { } } +fn load_plain_rollout_items_blocking( + path: &Path, +) -> std::io::Result<(Vec, Option, usize)> { + let contents = std::fs::read_to_string(path)?; + let mut items: Vec = Vec::new(); + let mut thread_id: Option = None; + let mut parse_errors = 0usize; + let mut saw_non_empty_line = false; + for line in contents.lines() { + parse_rollout_item_line( + line, + &mut items, + &mut thread_id, + &mut parse_errors, + &mut saw_non_empty_line, + ); + } + finish_loaded_rollout(items, thread_id, parse_errors, saw_non_empty_line) +} + +fn parse_rollout_item_line( + line: &str, + items: &mut Vec, + thread_id: &mut Option, + parse_errors: &mut usize, + saw_non_empty_line: &mut bool, +) { + if line.trim().is_empty() { + return; + } + *saw_non_empty_line = true; + let rollout_line = if line.contains("ghost_snapshot") { + let mut v: Value = match serde_json::from_str(line) { + Ok(v) => v, + Err(e) => { + warn!("failed to parse line as JSON: {line:?}, error: {e}"); + *parse_errors = (*parse_errors).saturating_add(1); + return; + } + }; + if strip_legacy_ghost_snapshot_rollout_line(&mut v) { + trace!("skipping legacy ghost_snapshot rollout line"); + return; + } + serde_json::from_value::(v) + } else { + serde_json::from_str::(line) + }; + + match rollout_line { + Ok(rollout_line) => { + let item = rollout_line.item; + if thread_id.is_none() + && let RolloutItem::SessionMeta(session_meta_line) = &item + { + *thread_id = Some(session_meta_line.meta.id); + } + items.push(item); + } + Err(e) => { + trace!("failed to parse rollout line: {e}"); + *parse_errors = (*parse_errors).saturating_add(1); + } + } +} + +fn finish_loaded_rollout( + items: Vec, + thread_id: Option, + parse_errors: usize, + saw_non_empty_line: bool, +) -> std::io::Result<(Vec, Option, usize)> { + if !saw_non_empty_line { + return Err(IoError::other("empty session file")); + } + + tracing::debug!( + "Resumed rollout with {} items, thread ID: {:?}, parse errors: {}", + items.len(), + thread_id, + parse_errors, + ); + Ok((items, thread_id, parse_errors)) +} + fn strip_legacy_ghost_snapshot_rollout_line(value: &mut Value) -> bool { match value.get("type").and_then(Value::as_str) { Some("response_item") => value @@ -1037,19 +1110,43 @@ async fn fill_missing_thread_item_metadata_from_state_db( return page; }; + let thread_ids = page + .items + .iter() + .filter_map(|item| item.thread_id) + .collect::>(); + let mut metadata_by_id = match state_db_ctx.get_threads(thread_ids.as_slice()).await { + Ok(metadata_by_id) => metadata_by_id, + Err(err) => { + warn!( + "state db get_threads failed while overlaying filesystem scan thread metadata: {err}" + ); + for item in &mut page.items { + let Some(thread_id) = item.thread_id else { + continue; + }; + let metadata = match state_db_ctx.get_thread(thread_id).await { + Ok(Some(metadata)) => metadata, + Ok(None) => continue, + Err(err) => { + warn!( + "state db get_thread failed while overlaying filesystem scan thread metadata: {err}" + ); + continue; + } + }; + fill_missing_thread_item_metadata(item, thread_item_from_state_metadata(metadata)); + } + return page; + } + }; + for item in &mut page.items { let Some(thread_id) = item.thread_id else { continue; }; - let metadata = match state_db_ctx.get_thread(thread_id).await { - Ok(Some(metadata)) => metadata, - Ok(None) => continue, - Err(err) => { - warn!( - "state db get_thread failed while overlaying filesystem scan thread metadata: {err}" - ); - continue; - } + let Some(metadata) = metadata_by_id.remove(&thread_id) else { + continue; }; fill_missing_thread_item_metadata(item, thread_item_from_state_metadata(metadata)); } @@ -1063,6 +1160,7 @@ fn fill_missing_thread_item_metadata(item: &mut ThreadItem, state_item: ThreadIt thread_id: _state_thread_id, first_user_message, preview, + title, cwd, git_branch, git_sha, @@ -1084,6 +1182,9 @@ fn fill_missing_thread_item_metadata(item: &mut ThreadItem, state_item: ThreadIt if item.preview.is_none() { item.preview = preview; } + if item.title.is_none() { + item.title = title; + } if item.cwd.is_none() { item.cwd = cwd; } @@ -1741,6 +1842,22 @@ impl JsonlWriter { } } +impl From for ThreadsPage { + fn from(db_page: codex_state::ThreadListPage) -> Self { + let items = db_page + .items + .into_iter() + .map(thread_item_from_state_list_item) + .collect(); + Self { + items, + next_cursor: db_page.next_anchor.map(Into::into), + num_scanned_files: db_page.num_scanned_rows, + reached_scan_cap: false, + } + } +} + impl From for ThreadsPage { fn from(db_page: codex_state::ThreadsPage) -> Self { let items = db_page @@ -1757,12 +1874,40 @@ impl From for ThreadsPage { } } +fn thread_item_from_state_list_item(item: codex_state::ThreadListItem) -> ThreadItem { + ThreadItem { + path: item.rollout_path, + thread_id: Some(item.id), + first_user_message: item.first_user_message, + preview: item.preview, + title: Some(item.title), + cwd: Some(item.cwd), + git_branch: item.git_branch, + git_sha: item.git_sha, + git_origin_url: item.git_origin_url, + source: Some( + serde_json::from_str(item.source.as_str()) + .or_else(|_| serde_json::from_value(Value::String(item.source))) + .unwrap_or(SessionSource::Unknown), + ), + parent_thread_id: None, + agent_nickname: item.agent_nickname, + agent_role: item.agent_role, + model_provider: Some(item.model_provider), + cli_version: Some(item.cli_version), + created_at: Some(item.created_at.to_rfc3339_opts(SecondsFormat::Secs, true)), + updated_at: Some(item.updated_at.to_rfc3339_opts(SecondsFormat::Millis, true)), + recency_at: Some(item.recency_at.to_rfc3339_opts(SecondsFormat::Millis, true)), + } +} + fn thread_item_from_state_metadata(item: codex_state::ThreadMetadata) -> ThreadItem { ThreadItem { path: item.rollout_path, thread_id: Some(item.id), first_user_message: item.first_user_message, preview: item.preview, + title: Some(item.title), cwd: Some(item.cwd), git_branch: item.git_branch, git_sha: item.git_sha, @@ -1837,7 +1982,7 @@ async fn resume_candidate_matches_cwd( } async fn select_resume_path_from_db_page( - page: &codex_state::ThreadsPage, + page: &codex_state::ThreadListPage, filter_cwd: Option<&Path>, default_provider: &str, ) -> Option { diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 2558750f1516..2a54ccd76d6e 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -973,6 +973,7 @@ fn fill_missing_thread_item_metadata_preserves_identity_and_prefers_state_git_fi thread_id: Some(filesystem_thread_id), first_user_message: Some("filesystem message".to_string()), preview: Some("filesystem preview".to_string()), + title: None, cwd: None, git_branch: Some("filesystem-branch".to_string()), git_sha: Some("filesystem-sha".to_string()), @@ -992,6 +993,7 @@ fn fill_missing_thread_item_metadata_preserves_identity_and_prefers_state_git_fi thread_id: Some(state_thread_id), first_user_message: Some("state message".to_string()), preview: Some("state preview".to_string()), + title: None, cwd: Some(PathBuf::from("/tmp/state-cwd")), git_branch: Some("state-branch".to_string()), git_sha: Some("state-sha".to_string()), diff --git a/codex-rs/rollout/src/session_index.rs b/codex-rs/rollout/src/session_index.rs index 96ff9d825e01..aa846870fb56 100644 --- a/codex-rs/rollout/src/session_index.rs +++ b/codex-rs/rollout/src/session_index.rs @@ -15,7 +15,6 @@ use codex_protocol::ThreadId; use codex_protocol::protocol::SessionMetaLine; use serde::Deserialize; use serde::Serialize; -use tokio::io::AsyncBufReadExt; const SESSION_INDEX_FILE: &str = "session_index.jsonl"; const READ_CHUNK_SIZE: usize = 8192; @@ -130,26 +129,10 @@ pub async fn find_thread_names_by_ids( return Ok(HashMap::new()); } - let file = tokio::fs::File::open(&path).await?; - let reader = tokio::io::BufReader::new(file); - let mut lines = reader.lines(); - let mut names = HashMap::with_capacity(thread_ids.len()); - - while let Some(line) = lines.next_line().await? { - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - let Ok(entry) = serde_json::from_str::(trimmed) else { - continue; - }; - let name = entry.thread_name.trim(); - if !name.is_empty() && thread_ids.contains(&entry.id) { - names.insert(entry.id, name.to_string()); - } - } - - Ok(names) + let thread_ids = thread_ids.clone(); + tokio::task::spawn_blocking(move || scan_index_from_end_by_ids(&path, &thread_ids)) + .await + .map_err(std::io::Error::other)? } /// Locate a recorded thread rollout and read its session metadata by thread name. @@ -205,6 +188,24 @@ fn scan_index_from_end_by_id( scan_index_from_end(path, |entry| entry.id == *thread_id) } +fn scan_index_from_end_by_ids( + path: &Path, + thread_ids: &HashSet, +) -> std::io::Result> { + let mut names = HashMap::with_capacity(thread_ids.len()); + scan_index_from_end_for_each(path, |entry| { + let name = entry.thread_name.trim(); + if !name.is_empty() && thread_ids.contains(&entry.id) && !names.contains_key(&entry.id) { + names.insert(entry.id, name.to_string()); + if names.len() == thread_ids.len() { + return Ok(Some(entry.clone())); + } + } + Ok(None) + })?; + Ok(names) +} + fn stream_thread_ids_from_end_by_name( path: &Path, name: &str, diff --git a/codex-rs/rollout/src/state_db.rs b/codex-rs/rollout/src/state_db.rs index 96fbf7af1c73..9564e7566009 100644 --- a/codex-rs/rollout/src/state_db.rs +++ b/codex-rs/rollout/src/state_db.rs @@ -12,6 +12,7 @@ use codex_protocol::ThreadId; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; pub use codex_state::LogEntry; +use codex_state::ThreadMetadata; use codex_state::ThreadMetadataBuilder; use codex_utils_path::normalize_for_path_comparison; use serde_json::Value; @@ -355,7 +356,7 @@ pub async fn list_thread_ids_db( } } -/// List thread metadata from SQLite without rollout directory traversal. +/// List thread summaries from SQLite without rollout directory traversal. #[allow(clippy::too_many_arguments)] pub async fn list_threads_db( context: Option<&codex_state::StateRuntime>, @@ -370,7 +371,8 @@ pub async fn list_threads_db( parent_thread_id: Option, archived: bool, search_term: Option<&str>, -) -> Option { + validate_rollout_paths: bool, +) -> Option { let ctx = context?; if ctx.codex_home() != codex_home { warn!( @@ -415,15 +417,15 @@ pub async fn list_threads_db( }; let page = match parent_thread_id { Some(parent_thread_id) => { - ctx.list_threads_by_parent(page_size, parent_thread_id, filters) + ctx.list_thread_summaries_by_parent(page_size, parent_thread_id, filters) .await } - None => ctx.list_threads(page_size, filters).await, + None => ctx.list_thread_summaries(page_size, filters).await, }; match page { Ok(mut page) => { - // Parent-filtered listings intentionally treat persisted state as authoritative. - if parent_thread_id.is_some() { + // Parent-filtered and DB-only listings intentionally treat persisted state as authoritative. + if parent_thread_id.is_some() || !validate_rollout_paths { return Some(page); } let mut valid_items = Vec::with_capacity(page.items.len()); @@ -559,6 +561,67 @@ pub async fn reconcile_rollout( } } +/// Repair a batch of thread rollout paths after filesystem fallback succeeds. +pub async fn read_repair_rollout_paths( + context: Option<&codex_state::StateRuntime>, + rollout_paths: &[(Option, PathBuf)], + archived_only: Option, +) { + let Some(ctx) = context else { + return; + }; + if rollout_paths.is_empty() { + return; + } + + let mut seen_thread_ids = std::collections::HashSet::with_capacity(rollout_paths.len()); + let mut thread_ids = Vec::with_capacity(rollout_paths.len()); + for (thread_id, _) in rollout_paths { + if let Some(thread_id) = thread_id + && seen_thread_ids.insert(*thread_id) + { + thread_ids.push(*thread_id); + } + } + + let metadata_by_id = match ctx.get_threads(&thread_ids).await { + Ok(metadata_by_id) => metadata_by_id, + Err(err) => { + warn!("state db batch read-repair lookup failed: {err}"); + for (thread_id, rollout_path) in rollout_paths { + read_repair_rollout_path(Some(ctx), *thread_id, archived_only, rollout_path).await; + } + return; + } + }; + + for (thread_id, rollout_path) in rollout_paths { + if let Some(thread_id) = thread_id { + if let Some(metadata) = metadata_by_id.get(thread_id) { + if read_repair_existing_rollout_metadata( + ctx, + metadata, + archived_only, + rollout_path.as_path(), + ) + .await + { + continue; + } + } else { + warn!( + "state db discrepancy during read_repair_rollout_path: upsert_needed (slow path)" + ); + } + } else { + warn!( + "state db discrepancy during read_repair_rollout_path: upsert_needed (slow path)" + ); + } + reconcile_rollout_path_from_contents(ctx, rollout_path.as_path(), archived_only).await; + } +} + /// Repair a thread's rollout path after filesystem fallback succeeds. pub async fn read_repair_rollout_path( context: Option<&codex_state::StateRuntime>, @@ -577,28 +640,8 @@ pub async fn read_repair_rollout_path( && let Ok(Some(metadata)) = ctx.get_thread(thread_id).await { saw_existing_metadata = true; - let mut repaired = metadata.clone(); - repaired.rollout_path = rollout_path.to_path_buf(); - repaired.cwd = normalize_cwd_for_state_db(&repaired.cwd); - match archived_only { - Some(true) if repaired.archived_at.is_none() => { - repaired.archived_at = Some(repaired.updated_at); - } - Some(false) => { - repaired.archived_at = None; - } - Some(true) | None => {} - } - if repaired == metadata { - return; - } - warn!("state db discrepancy during read_repair_rollout_path: upsert_needed (fast path)"); - if let Err(err) = ctx.upsert_thread(&repaired).await { - warn!( - "state db read-repair upsert failed for {}: {err}", - rollout_path.display() - ); - } else { + if read_repair_existing_rollout_metadata(ctx, &metadata, archived_only, rollout_path).await + { return; } } @@ -608,6 +651,47 @@ pub async fn read_repair_rollout_path( if !saw_existing_metadata { warn!("state db discrepancy during read_repair_rollout_path: upsert_needed (slow path)"); } + reconcile_rollout_path_from_contents(ctx, rollout_path, archived_only).await; +} + +async fn read_repair_existing_rollout_metadata( + ctx: &codex_state::StateRuntime, + metadata: &ThreadMetadata, + archived_only: Option, + rollout_path: &Path, +) -> bool { + let mut repaired = metadata.clone(); + repaired.rollout_path = rollout_path.to_path_buf(); + repaired.cwd = normalize_cwd_for_state_db(&repaired.cwd); + match archived_only { + Some(true) if repaired.archived_at.is_none() => { + repaired.archived_at = Some(repaired.updated_at); + } + Some(false) => { + repaired.archived_at = None; + } + Some(true) | None => {} + } + if repaired == *metadata { + return true; + } + warn!("state db discrepancy during read_repair_rollout_path: upsert_needed (fast path)"); + if let Err(err) = ctx.upsert_thread(&repaired).await { + warn!( + "state db read-repair upsert failed for {}: {err}", + rollout_path.display() + ); + false + } else { + true + } +} + +async fn reconcile_rollout_path_from_contents( + ctx: &codex_state::StateRuntime, + rollout_path: &Path, + archived_only: Option, +) { let default_provider = crate::list::read_session_meta_line(rollout_path) .await .ok() diff --git a/codex-rs/rollout/src/tests.rs b/codex-rs/rollout/src/tests.rs index 0e3ae912dd91..cb31541870d7 100644 --- a/codex-rs/rollout/src/tests.rs +++ b/codex-rs/rollout/src/tests.rs @@ -601,6 +601,7 @@ async fn test_list_conversations_latest_first() { thread_id: Some(thread_id_from_uuid(u3)), first_user_message: Some("Hello from user".to_string()), preview: Some("Hello from user".to_string()), + title: None, cwd: Some(Path::new(".").to_path_buf()), git_branch: None, git_sha: None, @@ -620,6 +621,7 @@ async fn test_list_conversations_latest_first() { thread_id: Some(thread_id_from_uuid(u2)), first_user_message: Some("Hello from user".to_string()), preview: Some("Hello from user".to_string()), + title: None, cwd: Some(Path::new(".").to_path_buf()), git_branch: None, git_sha: None, @@ -639,6 +641,7 @@ async fn test_list_conversations_latest_first() { thread_id: Some(thread_id_from_uuid(u1)), first_user_message: Some("Hello from user".to_string()), preview: Some("Hello from user".to_string()), + title: None, cwd: Some(Path::new(".").to_path_buf()), git_branch: None, git_sha: None, @@ -751,6 +754,7 @@ async fn test_pagination_cursor() { thread_id: Some(thread_id_from_uuid(u5)), first_user_message: Some("Hello from user".to_string()), preview: Some("Hello from user".to_string()), + title: None, cwd: Some(Path::new(".").to_path_buf()), git_branch: None, git_sha: None, @@ -770,6 +774,7 @@ async fn test_pagination_cursor() { thread_id: Some(thread_id_from_uuid(u4)), first_user_message: Some("Hello from user".to_string()), preview: Some("Hello from user".to_string()), + title: None, cwd: Some(Path::new(".").to_path_buf()), git_branch: None, git_sha: None, @@ -825,6 +830,7 @@ async fn test_pagination_cursor() { thread_id: Some(thread_id_from_uuid(u3)), first_user_message: Some("Hello from user".to_string()), preview: Some("Hello from user".to_string()), + title: None, cwd: Some(Path::new(".").to_path_buf()), git_branch: None, git_sha: None, @@ -844,6 +850,7 @@ async fn test_pagination_cursor() { thread_id: Some(thread_id_from_uuid(u2)), first_user_message: Some("Hello from user".to_string()), preview: Some("Hello from user".to_string()), + title: None, cwd: Some(Path::new(".").to_path_buf()), git_branch: None, git_sha: None, @@ -891,6 +898,7 @@ async fn test_pagination_cursor() { thread_id: Some(thread_id_from_uuid(u1)), first_user_message: Some("Hello from user".to_string()), preview: Some("Hello from user".to_string()), + title: None, cwd: Some(Path::new(".").to_path_buf()), git_branch: None, git_sha: None, @@ -1063,6 +1071,7 @@ async fn test_get_thread_contents() { thread_id: Some(thread_id_from_uuid(uuid)), first_user_message: Some("Hello from user".to_string()), preview: Some("Hello from user".to_string()), + title: None, cwd: Some(Path::new(".").to_path_buf()), git_branch: None, git_sha: None, @@ -1419,6 +1428,7 @@ async fn test_timestamp_only_cursor_skips_same_second_filesystem_ties() { thread_id: Some(thread_id_from_uuid(u3)), first_user_message: Some("Hello from user".to_string()), preview: Some("Hello from user".to_string()), + title: None, cwd: Some(Path::new(".").to_path_buf()), git_branch: None, git_sha: None, @@ -1438,6 +1448,7 @@ async fn test_timestamp_only_cursor_skips_same_second_filesystem_ties() { thread_id: Some(thread_id_from_uuid(u2)), first_user_message: Some("Hello from user".to_string()), preview: Some("Hello from user".to_string()), + title: None, cwd: Some(Path::new(".").to_path_buf()), git_branch: None, git_sha: None, diff --git a/codex-rs/state/migrations/0040_thread_spawn_edges_parent_child.sql b/codex-rs/state/migrations/0040_thread_spawn_edges_parent_child.sql new file mode 100644 index 000000000000..0c5690b19a32 --- /dev/null +++ b/codex-rs/state/migrations/0040_thread_spawn_edges_parent_child.sql @@ -0,0 +1,2 @@ +CREATE INDEX idx_thread_spawn_edges_parent_child + ON thread_spawn_edges(parent_thread_id, child_thread_id); diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index 6b5b907653a7..6bd35df3afff 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -53,6 +53,8 @@ pub use model::Stage1Output; pub use model::Stage1StartupClaimParams; pub use model::ThreadGoal; pub use model::ThreadGoalStatus; +pub use model::ThreadListItem; +pub use model::ThreadListPage; pub use model::ThreadMetadata; pub use model::ThreadMetadataBuilder; pub use model::ThreadsPage; diff --git a/codex-rs/state/src/model/mod.rs b/codex-rs/state/src/model/mod.rs index 2c725e50aba6..ffeca9e4b6c6 100644 --- a/codex-rs/state/src/model/mod.rs +++ b/codex-rs/state/src/model/mod.rs @@ -31,6 +31,8 @@ pub use thread_metadata::BackfillStats; pub use thread_metadata::ExtractionOutcome; pub use thread_metadata::SortDirection; pub use thread_metadata::SortKey; +pub use thread_metadata::ThreadListItem; +pub use thread_metadata::ThreadListPage; pub use thread_metadata::ThreadMetadata; pub use thread_metadata::ThreadMetadataBuilder; pub use thread_metadata::ThreadsPage; diff --git a/codex-rs/state/src/model/thread_metadata.rs b/codex-rs/state/src/model/thread_metadata.rs index c25c458986e3..d28bb23d639b 100644 --- a/codex-rs/state/src/model/thread_metadata.rs +++ b/codex-rs/state/src/model/thread_metadata.rs @@ -49,6 +49,56 @@ pub struct ThreadsPage { pub num_scanned_rows: usize, } +/// A single page of lightweight thread list results. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ThreadListPage { + /// The thread summary items in this page. + pub items: Vec, + /// The next anchor to use for pagination, if any. + pub next_anchor: Option, + /// The number of rows scanned to produce this page. + pub num_scanned_rows: usize, +} + +/// Thread metadata fields needed to render list/search pages. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ThreadListItem { + /// The thread identifier. + pub id: ThreadId, + /// The absolute rollout path on disk. + pub rollout_path: PathBuf, + /// The creation timestamp. + pub created_at: DateTime, + /// The last update timestamp. + pub updated_at: DateTime, + /// The product recency timestamp used by recent-thread listings. + pub recency_at: DateTime, + /// The session source (stringified enum). + pub source: String, + /// Optional random unique nickname assigned to an AgentControl-spawned sub-agent. + pub agent_nickname: Option, + /// Optional role (agent_role) assigned to an AgentControl-spawned sub-agent. + pub agent_role: Option, + /// The model provider identifier. + pub model_provider: String, + /// The working directory for the thread. + pub cwd: PathBuf, + /// Version of the CLI that created the thread. + pub cli_version: String, + /// A best-effort thread title. + pub title: String, + /// Best available user-facing preview for discovery and list display. + pub preview: Option, + /// First user message observed for this thread, if any. + pub first_user_message: Option, + /// The git commit SHA, if known. + pub git_sha: Option, + /// The git branch name, if known. + pub git_branch: Option, + /// The git origin URL, if known. + pub git_origin_url: Option, +} + /// The outcome of extracting metadata from a rollout. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ExtractionOutcome { diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 4b1d264d0b82..44c2b8ec2039 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -13,6 +13,8 @@ use crate::LogRow; use crate::MEMORIES_DB_FILENAME; use crate::STATE_DB_FILENAME; use crate::SortKey; +use crate::ThreadListItem; +use crate::ThreadListPage; use crate::ThreadMetadata; use crate::ThreadMetadataBuilder; use crate::ThreadsPage; diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 576f7602642e..32ae4674cc44 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -45,6 +45,36 @@ WHERE threads.id = ? .transpose() } + pub async fn get_threads( + &self, + ids: &[ThreadId], + ) -> anyhow::Result> { + if ids.is_empty() { + return Ok(std::collections::HashMap::new()); + } + + const MAX_SQL_BINDINGS: usize = 900; + let mut metadata_by_id = std::collections::HashMap::with_capacity(ids.len()); + for chunk in ids.chunks(MAX_SQL_BINDINGS) { + let mut builder = QueryBuilder::::new(""); + push_thread_select_columns(&mut builder); + builder.push(" FROM threads WHERE threads.id IN ("); + let mut separated = builder.separated(", "); + for id in chunk { + separated.push_bind(id.to_string()); + } + separated.push_unseparated(")"); + + let rows = builder.build().fetch_all(self.pool.as_ref()).await?; + for row in rows { + let metadata = ThreadRow::try_from_row(&row).and_then(ThreadMetadata::try_from)?; + metadata_by_id.insert(metadata.id, metadata); + } + } + + Ok(metadata_by_id) + } + pub async fn get_thread_memory_mode(&self, id: ThreadId) -> anyhow::Result> { let row = sqlx::query("SELECT memory_mode FROM threads WHERE id = ?") .bind(id.to_string()) @@ -416,6 +446,59 @@ ON CONFLICT(child_thread_id) DO NOTHING .await } + /// List lightweight thread summaries using the underlying database. + pub async fn list_thread_summaries( + &self, + page_size: usize, + filters: ThreadFilterOptions<'_>, + ) -> anyhow::Result { + self.list_thread_summaries_matching(page_size, filters, /*parent_thread_id*/ None) + .await + } + + /// List lightweight direct children of `parent_thread_id` using persisted spawn edges. + pub async fn list_thread_summaries_by_parent( + &self, + page_size: usize, + parent_thread_id: ThreadId, + filters: ThreadFilterOptions<'_>, + ) -> anyhow::Result { + self.list_thread_summaries_matching(page_size, filters, Some(parent_thread_id)) + .await + } + + async fn list_thread_summaries_matching( + &self, + page_size: usize, + filters: ThreadFilterOptions<'_>, + parent_thread_id: Option, + ) -> anyhow::Result { + let limit = page_size.saturating_add(1); + + let mut builder = QueryBuilder::::new(""); + push_list_thread_summaries_query(&mut builder, filters, parent_thread_id, limit); + + let rows = builder.build().fetch_all(self.pool.as_ref()).await?; + let mut items = rows + .into_iter() + .map(|row| thread_list_item_from_row(&row)) + .collect::, _>>()?; + let num_scanned_rows = items.len(); + let next_anchor = if items.len() > page_size { + items.pop(); + items + .last() + .and_then(|item| anchor_from_list_item(item, filters.sort_key)) + } else { + None + }; + Ok(ThreadListPage { + items, + next_anchor, + num_scanned_rows, + }) + } + async fn list_threads_matching( &self, page_size: usize, @@ -1094,6 +1177,16 @@ fn one_thread_id_from_rows( } } +fn push_list_thread_summaries_query( + builder: &mut QueryBuilder, + filters: ThreadFilterOptions<'_>, + parent_thread_id: Option, + limit: usize, +) { + push_thread_list_select_columns(builder); + push_list_threads_from_filter_order(builder, filters, parent_thread_id, limit); +} + fn push_list_threads_query( builder: &mut QueryBuilder, filters: ThreadFilterOptions<'_>, @@ -1101,14 +1194,25 @@ fn push_list_threads_query( limit: usize, ) { push_thread_select_columns(builder); - builder.push(" FROM threads"); + push_list_threads_from_filter_order(builder, filters, parent_thread_id, limit); +} + +fn push_list_threads_from_filter_order( + builder: &mut QueryBuilder, + filters: ThreadFilterOptions<'_>, + parent_thread_id: Option, + limit: usize, +) { + if parent_thread_id.is_some() { + builder.push(" FROM thread_spawn_edges CROSS JOIN threads"); + } else { + builder.push(" FROM threads"); + } push_thread_filters(builder, filters); if let Some(parent_thread_id) = parent_thread_id { - builder.push( - " AND threads.id IN (SELECT child_thread_id FROM thread_spawn_edges WHERE parent_thread_id = ", - ); + builder.push(" AND thread_spawn_edges.parent_thread_id = "); builder.push_bind(parent_thread_id.to_string()); - builder.push(")"); + builder.push(" AND threads.id = thread_spawn_edges.child_thread_id"); } let order_by_index = match filters.cwd_filters { // Multi-cwd listing is supported but at the time of writing has no current use in production. @@ -1125,6 +1229,31 @@ fn push_list_threads_query( ); } +fn push_thread_list_select_columns(builder: &mut QueryBuilder) { + builder.push( + r#" +SELECT + threads.id, + threads.rollout_path, + threads.created_at_ms AS created_at, + threads.updated_at_ms AS updated_at, + threads.recency_at_ms AS recency_at, + threads.source, + threads.agent_nickname, + threads.agent_role, + threads.model_provider, + threads.cwd, + threads.cli_version, + threads.title, + threads.preview, + threads.first_user_message, + threads.git_sha, + threads.git_branch, + threads.git_origin_url +"#, + ); +} + pub(super) fn push_thread_select_columns(builder: &mut QueryBuilder) { builder.push( r#" @@ -1325,6 +1454,43 @@ pub(super) fn push_thread_order_and_limit( builder.push_bind(limit as i64); } +fn thread_list_item_from_row(row: &sqlx::sqlite::SqliteRow) -> anyhow::Result { + let id: String = row.try_get("id")?; + let preview: String = row.try_get("preview")?; + let first_user_message: String = row.try_get("first_user_message")?; + Ok(ThreadListItem { + id: ThreadId::try_from(id)?, + rollout_path: PathBuf::from(row.try_get::("rollout_path")?), + created_at: epoch_millis_to_datetime(row.try_get("created_at")?)?, + updated_at: epoch_millis_to_datetime(row.try_get("updated_at")?)?, + recency_at: epoch_millis_to_datetime(row.try_get("recency_at")?)?, + source: row.try_get("source")?, + agent_nickname: row.try_get("agent_nickname")?, + agent_role: row.try_get("agent_role")?, + model_provider: row.try_get("model_provider")?, + cwd: PathBuf::from(row.try_get::("cwd")?), + cli_version: row.try_get("cli_version")?, + title: row.try_get("title")?, + preview: (!preview.is_empty()).then_some(preview), + first_user_message: (!first_user_message.is_empty()).then_some(first_user_message), + git_sha: row.try_get("git_sha")?, + git_branch: row.try_get("git_branch")?, + git_origin_url: row.try_get("git_origin_url")?, + }) +} + +fn anchor_from_list_item(item: &ThreadListItem, sort_key: SortKey) -> Option { + let ts = match sort_key { + SortKey::CreatedAt => item.created_at, + SortKey::UpdatedAt => item.updated_at, + SortKey::RecencyAt => item.recency_at, + }; + Some(crate::Anchor { + ts, + id: (sort_key == SortKey::RecencyAt).then_some(item.id), + }) +} + fn metadata_preview(metadata: &crate::ThreadMetadata) -> &str { metadata .preview diff --git a/codex-rs/thread-store/src/live_thread.rs b/codex-rs/thread-store/src/live_thread.rs index 88d347f57ba9..64b44838553c 100644 --- a/codex-rs/thread-store/src/live_thread.rs +++ b/codex-rs/thread-store/src/live_thread.rs @@ -32,6 +32,7 @@ pub struct LiveThread { thread_id: ThreadId, thread_store: Arc, metadata_sync: Arc>, + resume_history_include_archived: bool, } /// Owns a live thread while session initialization is still fallible. @@ -95,16 +96,18 @@ impl LiveThread { thread_id, thread_store, metadata_sync: Arc::new(Mutex::new(metadata_sync)), + resume_history_include_archived: true, }) } pub async fn resume( thread_store: Arc, - mut params: ResumeThreadParams, + params: ResumeThreadParams, ) -> ThreadStoreResult { let thread_id = params.thread_id; - let should_load_history = params.history.is_none(); let include_archived = params.include_archived; + let should_load_history = params.history.is_none() && !params.defer_metadata_history_load; + let mut params = params; thread_store.resume_thread(params.clone()).await?; if should_load_history { match thread_store @@ -130,6 +133,7 @@ impl LiveThread { thread_id, thread_store, metadata_sync: Arc::new(Mutex::new(metadata_sync)), + resume_history_include_archived: include_archived, }) } @@ -138,6 +142,9 @@ impl LiveThread { if items.is_empty() { return Ok(()); } + if !canonical_items.is_empty() { + self.observe_deferred_resume_history().await?; + } self.thread_store .append_items(AppendThreadItemsParams { thread_id: self.thread_id, @@ -189,6 +196,29 @@ impl LiveThread { self.thread_store.discard_thread(self.thread_id).await } + async fn observe_deferred_resume_history(&self) -> ThreadStoreResult<()> { + if !self + .metadata_sync + .lock() + .await + .needs_resume_history_observation() + { + return Ok(()); + } + let history = self + .thread_store + .load_history(LoadThreadHistoryParams { + thread_id: self.thread_id, + include_archived: self.resume_history_include_archived, + }) + .await?; + let mut metadata_sync = self.metadata_sync.lock().await; + if metadata_sync.needs_resume_history_observation() { + metadata_sync.observe_loaded_resume_history(history.items.as_slice()); + } + Ok(()) + } + pub async fn load_history( &self, include_archived: bool, diff --git a/codex-rs/thread-store/src/local/helpers.rs b/codex-rs/thread-store/src/local/helpers.rs index f48a0136c099..22be50048231 100644 --- a/codex-rs/thread-store/src/local/helpers.rs +++ b/codex-rs/thread-store/src/local/helpers.rs @@ -6,6 +6,7 @@ use std::path::PathBuf; use std::time::SystemTime; use chrono::DateTime; +use chrono::Timelike; use chrono::Utc; use codex_git_utils::GitSha; use codex_protocol::ThreadId; @@ -17,6 +18,7 @@ use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_rollout::ARCHIVED_SESSIONS_SUBDIR; use codex_rollout::ThreadItem; +use codex_state::ThreadListItem; use codex_state::ThreadMetadata; use crate::StoredThread; @@ -119,6 +121,14 @@ pub(super) fn stored_thread_from_rollout_item( .clone() .or_else(|| item.first_user_message.clone()) .unwrap_or_default(); + let name = item + .title + .as_deref() + .map(str::trim) + .filter(|title| !title.is_empty()) + .filter(|title| item.first_user_message.as_deref().map(str::trim) != Some(*title)) + .filter(|title| preview.trim() != *title) + .map(str::to_string); let rollout_path = codex_rollout::plain_rollout_path(item.path.as_path()); Some(StoredThread { @@ -128,7 +138,7 @@ pub(super) fn stored_thread_from_rollout_item( forked_from_id: None, parent_thread_id: item.parent_thread_id, preview, - name: None, + name, model_provider: item .model_provider .filter(|provider| !provider.is_empty()) @@ -155,6 +165,86 @@ pub(super) fn stored_thread_from_rollout_item( }) } +pub(super) fn stored_thread_from_state_list_item( + item: ThreadListItem, + archived: bool, + parent_thread_id: Option, + default_provider: &str, +) -> StoredThread { + let ThreadListItem { + id, + rollout_path, + created_at, + updated_at, + recency_at, + source, + agent_nickname, + agent_role, + model_provider, + cwd, + cli_version, + title, + preview, + first_user_message, + git_sha, + git_branch, + git_origin_url, + } = item; + + let preview = preview + .or_else(|| first_user_message.clone()) + .unwrap_or_default(); + let title_trimmed = title.trim(); + let name = if title_trimmed.is_empty() + || first_user_message.as_deref().map(str::trim) == Some(title_trimmed) + || preview.trim() == title_trimmed + { + None + } else { + Some(title_trimmed.to_string()) + }; + let git_info = git_info_from_parts(git_sha, git_branch, git_origin_url); + let rollout_path = codex_rollout::plain_rollout_path(rollout_path.as_path()); + let model_provider = if model_provider.is_empty() { + default_provider.to_string() + } else { + model_provider + }; + let created_at = truncate_datetime_to_seconds(created_at); + let updated_at = truncate_datetime_to_millis(updated_at); + let recency_at = truncate_datetime_to_millis(recency_at); + + StoredThread { + thread_id: id, + extra_config: None, + rollout_path: Some(rollout_path), + forked_from_id: None, + parent_thread_id, + preview, + name, + model_provider, + model: None, + reasoning_effort: None, + created_at, + updated_at, + recency_at, + archived_at: archived.then_some(updated_at), + cwd, + cli_version, + source: parse_session_source(&source), + thread_source: None, + agent_nickname, + agent_role, + agent_path: None, + git_info, + approval_mode: AskForApproval::OnRequest, + permission_profile: PermissionProfile::read_only(), + token_usage: None, + first_user_message, + history: None, + } +} + pub(super) fn permission_profile_from_metadata_value(value: &str, cwd: &Path) -> PermissionProfile { serde_json::from_str::(value) .or_else(|_| { @@ -192,6 +282,21 @@ pub(super) fn set_thread_name_from_title(thread: &mut StoredThread, title: Strin thread.name = Some(title); } +pub(super) fn parse_session_source(source: &str) -> SessionSource { + serde_json::from_str(source) + .or_else(|_| serde_json::from_value(serde_json::Value::String(source.to_string()))) + .unwrap_or(SessionSource::Unknown) +} + +fn truncate_datetime_to_seconds(timestamp: DateTime) -> DateTime { + DateTime::::from_timestamp(timestamp.timestamp(), 0).unwrap_or(timestamp) +} + +fn truncate_datetime_to_millis(timestamp: DateTime) -> DateTime { + let nanos = timestamp.nanosecond() / 1_000_000 * 1_000_000; + DateTime::::from_timestamp(timestamp.timestamp(), nanos).unwrap_or(timestamp) +} + fn parse_rfc3339(value: Option<&str>) -> Option> { DateTime::parse_from_rfc3339(value?) .ok() diff --git a/codex-rs/thread-store/src/local/list_threads.rs b/codex-rs/thread-store/src/local/list_threads.rs index 03ae163afea9..a1543a82db60 100644 --- a/codex-rs/thread-store/src/local/list_threads.rs +++ b/codex-rs/thread-store/src/local/list_threads.rs @@ -1,18 +1,17 @@ -use std::collections::HashMap; use std::collections::HashSet; -use codex_protocol::ThreadId; use codex_rollout::RolloutConfig; use codex_rollout::RolloutRecorder; use codex_rollout::find_thread_names_by_ids; use codex_rollout::parse_cursor; use super::LocalThreadStore; -use super::helpers::distinct_thread_metadata_title; use super::helpers::set_thread_name_from_title; use super::helpers::stored_thread_from_rollout_item; +use super::helpers::stored_thread_from_state_list_item; use crate::ListThreadsParams; use crate::SortDirection; +use crate::StoredThread; use crate::ThreadPage; use crate::ThreadSortKey; use crate::ThreadStoreError; @@ -41,6 +40,18 @@ pub(super) async fn list_threads( SortDirection::Desc => codex_rollout::SortDirection::Desc, }; let state_db = store.state_db().await; + if params.parent_thread_id.is_some() || params.use_state_db_only { + return list_threads_from_state_db( + store, + state_db.as_deref(), + ¶ms, + cursor.as_ref(), + sort_key, + sort_direction, + ) + .await; + } + let rollout_config = RolloutConfig { codex_home: store.config.codex_home.clone(), sqlite_home: store.config.sqlite_home.clone(), @@ -76,36 +87,91 @@ pub(super) async fn list_threads( }) .collect::>(); - let thread_ids = items + fill_legacy_thread_names(store, &mut items).await; + + Ok(ThreadPage { items, next_cursor }) +} + +async fn list_threads_from_state_db( + store: &LocalThreadStore, + state_db: Option<&codex_state::StateRuntime>, + params: &ListThreadsParams, + cursor: Option<&codex_rollout::Cursor>, + sort_key: codex_rollout::ThreadSortKey, + sort_direction: codex_rollout::SortDirection, +) -> ThreadStoreResult { + let page = codex_rollout::state_db::list_threads_db( + state_db, + store.config.codex_home.as_path(), + params.page_size, + cursor, + sort_key, + sort_direction, + params.allowed_sources.as_slice(), + params.model_providers.as_deref(), + params.cwd_filters.as_deref(), + params.parent_thread_id, + params.archived, + params.search_term.as_deref(), + /*validate_rollout_paths*/ false, + ) + .await; + + let Some(page) = page else { + if params.parent_thread_id.is_some() { + return Err(ThreadStoreError::Internal { + message: "state DB unavailable for parent-filtered thread listing".to_string(), + }); + } + return Ok(ThreadPage { + items: Vec::new(), + next_cursor: None, + }); + }; + + let codex_state::ThreadListPage { + items, next_anchor, .. + } = page; + let next_cursor = next_anchor + .map(codex_rollout::Cursor::from) + .as_ref() + .and_then(|cursor| serde_json::to_value(cursor).ok()) + .and_then(|value| value.as_str().map(str::to_owned)); + let mut items = items + .into_iter() + .map(|metadata| { + stored_thread_from_state_list_item( + metadata, + params.archived, + params.parent_thread_id, + store.config.default_model_provider_id.as_str(), + ) + }) + .collect::>(); + fill_legacy_thread_names(store, &mut items).await; + + Ok(ThreadPage { items, next_cursor }) +} + +async fn fill_legacy_thread_names(store: &LocalThreadStore, items: &mut [StoredThread]) { + let thread_ids_missing_names = items .iter() + .filter(|thread| thread.name.is_none()) .map(|thread| thread.thread_id) .collect::>(); - let mut names = HashMap::::with_capacity(thread_ids.len()); - if let Some(state_db_ctx) = store.state_db().await { - for &thread_id in &thread_ids { - let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await else { - continue; - }; - if let Some(title) = distinct_thread_metadata_title(&metadata) { - names.insert(thread_id, title); - } - } - } - if names.len() < thread_ids.len() + if !thread_ids_missing_names.is_empty() && let Ok(legacy_names) = - find_thread_names_by_ids(store.config.codex_home.as_path(), &thread_ids).await + find_thread_names_by_ids(store.config.codex_home.as_path(), &thread_ids_missing_names) + .await { - for (thread_id, title) in legacy_names { - names.entry(thread_id).or_insert(title); - } - } - for thread in &mut items { - if let Some(title) = names.get(&thread.thread_id).cloned() { - set_thread_name_from_title(thread, title); + for thread in items { + if thread.name.is_none() + && let Some(title) = legacy_names.get(&thread.thread_id).cloned() + { + set_thread_name_from_title(thread, title); + } } } - - Ok(ThreadPage { items, next_cursor }) } pub(super) async fn list_rollout_threads( @@ -131,6 +197,7 @@ pub(super) async fn list_rollout_threads( Some(parent_thread_id), params.archived, params.search_term.as_deref(), + /*validate_rollout_paths*/ false, ) .await .ok_or_else(|| ThreadStoreError::Internal { diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index 19a0d2955bc9..1e1c2083327a 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -643,6 +643,7 @@ mod tests { thread_id, rollout_path: Some(rollout_path), history: None, + defer_metadata_history_load: false, include_archived: false, metadata: ThreadPersistenceMetadata { cwd: Some(home.path().to_path_buf()), @@ -697,6 +698,7 @@ mod tests { thread_id, rollout_path: Some(rollout_path), history: None, + defer_metadata_history_load: false, include_archived: false, metadata: ThreadPersistenceMetadata { cwd: Some(home.path().to_path_buf()), @@ -826,6 +828,7 @@ mod tests { thread_id, rollout_path: None, history: None, + defer_metadata_history_load: false, include_archived: true, metadata: thread_metadata(), }) @@ -886,6 +889,7 @@ mod tests { thread_id, rollout_path: Some(rollout_path), history: None, + defer_metadata_history_load: false, include_archived: true, metadata: thread_metadata(), }) @@ -908,6 +912,7 @@ mod tests { thread_id, rollout_path: Some(rollout_path), history: None, + defer_metadata_history_load: false, include_archived: true, metadata: ThreadPersistenceMetadata { cwd: None, @@ -937,6 +942,7 @@ mod tests { thread_id, rollout_path: Some(rollout_path), history: None, + defer_metadata_history_load: false, include_archived: true, metadata: thread_metadata(), }) @@ -985,6 +991,7 @@ mod tests { thread_id, rollout_path: Some(rollout_path.clone()), history: None, + defer_metadata_history_load: false, include_archived: true, metadata: thread_metadata(), }) @@ -1023,6 +1030,7 @@ mod tests { thread_id, rollout_path: Some(rollout_path), history: None, + defer_metadata_history_load: false, include_archived: true, metadata: thread_metadata(), }) diff --git a/codex-rs/thread-store/src/local/read_thread.rs b/codex-rs/thread-store/src/local/read_thread.rs index a1e1c5919035..0da467c8e93a 100644 --- a/codex-rs/thread-store/src/local/read_thread.rs +++ b/codex-rs/thread-store/src/local/read_thread.rs @@ -2,6 +2,7 @@ use chrono::DateTime; use chrono::Utc; use codex_protocol::models::PermissionProfile; use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; use codex_rollout::RolloutRecorder; @@ -39,15 +40,17 @@ pub(super) async fn read_thread( metadata.rollout_path.as_path(), ))) && (!params.include_history - || sqlite_rollout_path_can_load_history_for_thread( - store, - &metadata.rollout_path, - thread_id, - ) - .await) + || codex_rollout::existing_rollout_path(metadata.rollout_path.as_path()) + .await + .is_some()) { let metadata_sandbox_policy = metadata.sandbox_policy.clone(); - let mut thread = stored_thread_from_sqlite_metadata(store, metadata).await; + let mut thread = stored_thread_from_sqlite_metadata( + store, + metadata, + /*include_session_meta*/ !params.include_history, + ) + .await; if !params.include_history && let Some(rollout_path) = thread.rollout_path.clone() && let Ok(mut rollout_thread) = read_thread_from_rollout_path(store, rollout_path).await @@ -66,8 +69,22 @@ pub(super) async fn read_thread( ); thread = rollout_thread; } - attach_history_if_requested(&mut thread, params.include_history).await?; - return Ok(thread); + if params.include_history { + let Some(rollout_path) = thread.rollout_path.clone() else { + return Err(ThreadStoreError::Internal { + message: format!("failed to load thread history for thread {thread_id}"), + }); + }; + if let Some(history) = + load_history_for_thread(rollout_path.as_path(), thread_id).await? + { + apply_session_meta_from_history(&mut thread, &history.items); + thread.history = Some(history); + return Ok(thread); + } + } else { + return Ok(thread); + } } let path = resolve_rollout_path(store, thread_id, params.include_archived) @@ -86,22 +103,6 @@ pub(super) async fn read_thread( Ok(thread) } -async fn sqlite_rollout_path_can_load_history_for_thread( - store: &LocalThreadStore, - path: &std::path::Path, - thread_id: codex_protocol::ThreadId, -) -> bool { - if codex_rollout::existing_rollout_path(path).await.is_none() { - return false; - } - // SQLite metadata can outlive a moved/recreated rollout path. When history is - // requested, verify the path still resolves to the requested thread before - // trusting it as the source replay. - read_thread_from_rollout_path(store, path.to_path_buf()) - .await - .is_ok_and(|thread| thread.thread_id == thread_id) -} - pub(super) async fn read_thread_by_rollout_path( store: &LocalThreadStore, rollout_path: std::path::PathBuf, @@ -117,6 +118,9 @@ pub(super) async fn read_thread_by_rollout_path( } if let Some(metadata) = read_sqlite_metadata(store, thread.thread_id).await { thread.recency_at = metadata.recency_at; + if let Some(title) = distinct_thread_metadata_title(&metadata) { + set_thread_name_from_title(&mut thread, title); + } let existing_git_info = thread.git_info.take(); let (fallback_sha, fallback_branch, fallback_origin_url) = match existing_git_info { Some(info) => ( @@ -190,9 +194,17 @@ async fn attach_history_if_requested( message: format!("failed to load thread history for thread {thread_id}"), }); }; - let items = load_history_items(&path).await?; - thread.history = Some(StoredThreadHistory { thread_id, items }); - Ok(()) + match load_history_for_thread(&path, thread_id).await? { + Some(history) => { + thread.history = Some(history); + Ok(()) + } + None => Err(ThreadStoreError::InvalidRequest { + message: format!( + "rollout history for thread {thread_id} belongs to a different thread" + ), + }), + } } async fn resolve_rollout_path( @@ -280,15 +292,34 @@ async fn read_thread_from_rollout_path( Ok(thread) } -async fn load_history_items( +fn apply_session_meta_from_history(thread: &mut StoredThread, items: &[RolloutItem]) { + if let Some(meta_line) = items.iter().find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => Some(meta_line), + RolloutItem::ResponseItem(_) + | RolloutItem::InterAgentCommunication(_) + | RolloutItem::Compacted(_) + | RolloutItem::TurnContext(_) + | RolloutItem::EventMsg(_) => None, + }) { + thread.forked_from_id = meta_line.meta.forked_from_id; + thread.parent_thread_id = meta_line.meta.parent_thread_id; + } +} + +async fn load_history_for_thread( path: &std::path::Path, -) -> ThreadStoreResult> { - let (items, _, _) = RolloutRecorder::load_rollout_items(path) - .await - .map_err(|err| ThreadStoreError::Internal { - message: format!("failed to load thread history {}: {err}", path.display()), - })?; - Ok(items) + thread_id: codex_protocol::ThreadId, +) -> ThreadStoreResult> { + let (items, history_thread_id, _) = + RolloutRecorder::load_rollout_items(path) + .await + .map_err(|err| ThreadStoreError::Internal { + message: format!("failed to load thread history {}: {err}", path.display()), + })?; + if history_thread_id != Some(thread_id) { + return Ok(None); + } + Ok(Some(StoredThreadHistory { thread_id, items })) } async fn read_sqlite_metadata( @@ -302,6 +333,7 @@ async fn read_sqlite_metadata( async fn stored_thread_from_sqlite_metadata( store: &LocalThreadStore, metadata: ThreadMetadata, + include_session_meta: bool, ) -> StoredThread { let name = match distinct_thread_metadata_title(&metadata) { Some(title) => Some(title), @@ -311,10 +343,14 @@ async fn stored_thread_from_sqlite_metadata( .flatten() .filter(|title| !title.trim().is_empty()), }; - let session_meta = read_session_meta_line(metadata.rollout_path.as_path()) - .await - .ok() - .map(|meta_line| meta_line.meta); + let session_meta = if include_session_meta { + read_session_meta_line(metadata.rollout_path.as_path()) + .await + .ok() + .map(|meta_line| meta_line.meta) + } else { + None + }; let rollout_path = codex_rollout::plain_rollout_path(metadata.rollout_path.as_path()); let forked_from_id = session_meta.as_ref().and_then(|meta| meta.forked_from_id); let parent_thread_id = session_meta.as_ref().and_then(|meta| meta.parent_thread_id); diff --git a/codex-rs/thread-store/src/local/update_thread_metadata.rs b/codex-rs/thread-store/src/local/update_thread_metadata.rs index d1c76ad327a6..2a428adbd466 100644 --- a/codex-rs/thread-store/src/local/update_thread_metadata.rs +++ b/codex-rs/thread-store/src/local/update_thread_metadata.rs @@ -803,6 +803,7 @@ mod tests { thread_id, rollout_path: Some(path.clone()), history: None, + defer_metadata_history_load: false, include_archived: true, metadata: test_thread_metadata(), }) @@ -1606,6 +1607,7 @@ mod tests { thread_id, rollout_path: Some(archived_path.clone()), history: None, + defer_metadata_history_load: false, include_archived: true, metadata: test_thread_metadata(), }) diff --git a/codex-rs/thread-store/src/thread_metadata_sync.rs b/codex-rs/thread-store/src/thread_metadata_sync.rs index b2953d7ef98c..dac339ae6ccc 100644 --- a/codex-rs/thread-store/src/thread_metadata_sync.rs +++ b/codex-rs/thread-store/src/thread_metadata_sync.rs @@ -41,6 +41,7 @@ pub(crate) struct ThreadMetadataSync { last_touch_persisted_at: Option, defer_create_update_until_history_exists: bool, defer_resume_update_until_append: bool, + needs_resume_history_observation: bool, } pub(crate) struct PendingThreadMetadataPatch { @@ -87,6 +88,7 @@ impl ThreadMetadataSync { last_touch_persisted_at: None, defer_create_update_until_history_exists: true, defer_resume_update_until_append: false, + needs_resume_history_observation: false, } } @@ -106,15 +108,28 @@ impl ThreadMetadataSync { last_touch_persisted_at: None, defer_create_update_until_history_exists: false, defer_resume_update_until_append: false, + needs_resume_history_observation: params.history.is_none(), }; if let Some(history) = params.history.as_deref() { - let update = sync.observe_resume_history(history); - sync.merge_pending_update(update); - sync.defer_resume_update_until_append = sync.pending_update.is_some(); + sync.observe_loaded_resume_history(history); } sync } + pub(crate) fn needs_resume_history_observation(&self) -> bool { + self.needs_resume_history_observation + } + + pub(crate) fn observe_loaded_resume_history(&mut self, items: &[RolloutItem]) { + if !self.needs_resume_history_observation && self.pending_update.is_some() { + return; + } + let update = self.observe_resume_history(items); + self.merge_pending_update(update); + self.defer_resume_update_until_append = self.pending_update.is_some(); + self.needs_resume_history_observation = false; + } + pub(crate) fn take_pending_update(&self) -> Option { self.pending_update .clone() @@ -127,6 +142,9 @@ impl ThreadMetadataSync { pub(crate) fn take_pending_update_for_existing_history( &self, ) -> Option { + if self.needs_resume_history_observation { + return None; + } if self.defer_create_update_until_history_exists { return None; } @@ -548,11 +566,42 @@ mod tests { ); } + #[test] + fn deferred_resume_history_observation_keeps_existing_preview_fields() { + let thread_id = ThreadId::new(); + let mut params = resume_params(thread_id, Vec::new()); + params.history = None; + params.defer_metadata_history_load = true; + let mut sync = ThreadMetadataSync::for_resume(¶ms); + + assert!(sync.needs_resume_history_observation()); + assert!(sync.take_pending_update_for_existing_history().is_none()); + + sync.observe_loaded_resume_history(&[RolloutItem::EventMsg(EventMsg::UserMessage( + user_message("first user text"), + ))]); + let update = sync + .observe_appended_items(&[RolloutItem::EventMsg(EventMsg::UserMessage(user_message( + "later user text", + )))]) + .expect("first append should flush resume metadata"); + + assert!(!sync.needs_resume_history_observation()); + assert_eq!(update.patch.preview.as_deref(), Some("first user text")); + assert_eq!(update.patch.title.as_deref(), Some("first user text")); + assert_eq!( + update.patch.first_user_message.as_deref(), + Some("first user text") + ); + assert!(update.patch.updated_at.is_some()); + } + fn resume_params(thread_id: ThreadId, history: Vec) -> ResumeThreadParams { ResumeThreadParams { thread_id, rollout_path: None, history: Some(history), + defer_metadata_history_load: false, include_archived: false, metadata: ThreadPersistenceMetadata { cwd: None, diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index 2e3c60723feb..f70891bc308a 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -94,6 +94,9 @@ pub struct ResumeThreadParams { pub rollout_path: Option, /// Known replay history for the resumed thread, if already loaded by the caller. pub history: Option>, + /// Defer loading history for metadata repair until the next append. + #[serde(default)] + pub defer_metadata_history_load: bool, /// Whether archived threads may be reopened. pub include_archived: bool, /// Metadata for future writes appended to the resumed live thread. From ce3329b3e5bda2a55ab6cdefbf8c6ec8fa1b5484 Mon Sep 17 00:00:00 2001 From: anais Date: Wed, 17 Jun 2026 22:33:52 -0700 Subject: [PATCH 2/3] address app-server review feedback --- codex-rs/rollout/src/recorder.rs | 20 +++++++++++--------- codex-rs/thread-store/src/local/helpers.rs | 5 +++-- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 0c251b03444e..9a53aecac266 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -3,6 +3,7 @@ use std::collections::HashSet; use std::fs; use std::fs::File; +use std::io::BufRead; use std::io::Error as IoError; use std::path::Path; use std::path::PathBuf; @@ -959,14 +960,15 @@ impl RolloutRecorder { fn load_plain_rollout_items_blocking( path: &Path, ) -> std::io::Result<(Vec, Option, usize)> { - let contents = std::fs::read_to_string(path)?; + let reader = std::io::BufReader::new(File::open(path)?); let mut items: Vec = Vec::new(); let mut thread_id: Option = None; let mut parse_errors = 0usize; let mut saw_non_empty_line = false; - for line in contents.lines() { + for line in reader.lines() { + let line = line?; parse_rollout_item_line( - line, + line.as_str(), &mut items, &mut thread_id, &mut parse_errors, @@ -1875,6 +1877,10 @@ impl From for ThreadsPage { } fn thread_item_from_state_list_item(item: codex_state::ThreadListItem) -> ThreadItem { + let source = serde_json::from_str(item.source.as_str()) + .or_else(|_| serde_json::from_value(Value::String(item.source))) + .unwrap_or(SessionSource::Unknown); + let parent_thread_id = source.parent_thread_id(); ThreadItem { path: item.rollout_path, thread_id: Some(item.id), @@ -1885,12 +1891,8 @@ fn thread_item_from_state_list_item(item: codex_state::ThreadListItem) -> Thread git_branch: item.git_branch, git_sha: item.git_sha, git_origin_url: item.git_origin_url, - source: Some( - serde_json::from_str(item.source.as_str()) - .or_else(|_| serde_json::from_value(Value::String(item.source))) - .unwrap_or(SessionSource::Unknown), - ), - parent_thread_id: None, + source: Some(source), + parent_thread_id, agent_nickname: item.agent_nickname, agent_role: item.agent_role, model_provider: Some(item.model_provider), diff --git a/codex-rs/thread-store/src/local/helpers.rs b/codex-rs/thread-store/src/local/helpers.rs index 22be50048231..1f76a6692081 100644 --- a/codex-rs/thread-store/src/local/helpers.rs +++ b/codex-rs/thread-store/src/local/helpers.rs @@ -210,6 +210,7 @@ pub(super) fn stored_thread_from_state_list_item( } else { model_provider }; + let source = parse_session_source(&source); let created_at = truncate_datetime_to_seconds(created_at); let updated_at = truncate_datetime_to_millis(updated_at); let recency_at = truncate_datetime_to_millis(recency_at); @@ -219,7 +220,7 @@ pub(super) fn stored_thread_from_state_list_item( extra_config: None, rollout_path: Some(rollout_path), forked_from_id: None, - parent_thread_id, + parent_thread_id: parent_thread_id.or_else(|| source.parent_thread_id()), preview, name, model_provider, @@ -231,7 +232,7 @@ pub(super) fn stored_thread_from_state_list_item( archived_at: archived.then_some(updated_at), cwd, cli_version, - source: parse_session_source(&source), + source, thread_source: None, agent_nickname, agent_role, From 17171be10865fa438bcde65fede61c35bd9f8a36 Mon Sep 17 00:00:00 2001 From: anais Date: Thu, 18 Jun 2026 14:07:31 -0700 Subject: [PATCH 3/3] fix thread resume and cwd regressions --- .../request_processors/thread_processor.rs | 15 ++-- .../tests/suite/v2/thread_resume.rs | 71 +++++++++++++++++++ 2 files changed, 79 insertions(+), 7 deletions(-) diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 14e0323e413f..f248645a01f9 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -192,7 +192,7 @@ fn normalize_thread_list_cwd_filters( }; let mut normalized_cwds = Vec::with_capacity(cwds.len()); for cwd in cwds { - let cwd = AbsolutePathBuf::from_absolute_path(cwd.as_str()) + let cwd = AbsolutePathBuf::relative_to_current_dir(cwd.as_str()) .map(AbsolutePathBuf::into_path_buf) .map_err(|err| { invalid_params(format!("invalid thread/list cwd filter `{cwd}`: {err}")) @@ -4219,12 +4219,13 @@ pub(crate) fn thread_from_stored_thread( branch: info.branch, origin_url: info.repository_url, }); - let cwd = - AbsolutePathBuf::from_absolute_path(path_utils::normalize_for_native_workdir(thread.cwd)) - .unwrap_or_else(|err| { - warn!("failed to normalize thread cwd while reading stored thread: {err}"); - fallback_cwd.clone() - }); + let cwd = AbsolutePathBuf::relative_to_current_dir(path_utils::normalize_for_native_workdir( + thread.cwd, + )) + .unwrap_or_else(|err| { + warn!("failed to normalize thread cwd while reading stored thread: {err}"); + fallback_cwd.clone() + }); let source = with_thread_spawn_agent_metadata( thread.source, thread.agent_nickname.clone(), diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index c191263d1cbd..c0f3f5312455 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -258,6 +258,77 @@ async fn thread_resume_with_empty_path_uses_running_thread_id() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_resume_then_turn_preserves_first_user_metadata() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + let first_user_message = "original first user message"; + let thread_id = create_fake_rollout( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + first_user_message, + Some("mock_provider"), + /*git_info*/ None, + )?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread_id.clone(), + exclude_turns: true, + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { + thread: resumed, .. + } = to_response::(resume_resp)?; + + let turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: resumed.id, + client_user_message_id: None, + input: vec![UserInput::Text { + text: "later user message".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let state_db = + StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?; + let thread_metadata = state_db + .get_thread(ThreadId::from_string(&thread_id)?) + .await? + .expect("resumed thread metadata should exist"); + assert_eq!(thread_metadata.preview.as_deref(), Some(first_user_message)); + assert_eq!( + thread_metadata.first_user_message.as_deref(), + Some(first_user_message) + ); + + Ok(()) +} + #[tokio::test] async fn thread_resume_running_thread_uses_cached_instruction_sources() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await;