diff --git a/src/agentsight/src/config.rs b/src/agentsight/src/config.rs index 10e8cbf66..d430524d9 100644 --- a/src/agentsight/src/config.rs +++ b/src/agentsight/src/config.rs @@ -236,6 +236,8 @@ struct JsonFullConfig { #[serde(default)] deadloop: Option, #[serde(default)] + reactive: Option, + #[serde(default)] cgroup_filter_enabled: Option, #[serde(default)] cgroup_ids: Option>, @@ -348,6 +350,17 @@ pub struct JsonRuntime { pub sls_logtail_path: Option, } +/// Reactive exporter configuration section +#[derive(serde::Deserialize, Clone, Debug)] +struct JsonReactive { + #[serde(default)] + enabled: Option, + #[serde(default)] + debounce_secs: Option, + #[serde(default)] + workspace: Option, +} + /// 加密配置:可选公钥(PEM 字符串)或公钥文件路径 #[derive(serde::Deserialize)] struct JsonEncryption { @@ -747,6 +760,14 @@ pub struct AgentsightConfig { // --- Runtime Resource Limits --- /// Bounded channel capacities, pending queue limits, etc. pub runtime_limits: RuntimeLimits, + + // --- Reactive Exporter Configuration --- + /// Enable the reactive exporter (observe→act pipeline) + pub reactive_enabled: Option, + /// Minimum seconds between consecutive checkpoint actions + pub reactive_debounce_secs: Option, + /// Workspace path for ws-ckpt checkpoint + pub reactive_workspace: Option, } impl Default for AgentsightConfig { @@ -821,6 +842,11 @@ impl Default for AgentsightConfig { // Runtime resource limits runtime_limits: RuntimeLimits::default(), + + // Reactive exporter defaults (disabled by default) + reactive_enabled: None, + reactive_debounce_secs: None, + reactive_workspace: None, } } } @@ -961,6 +987,13 @@ impl AgentsightConfig { } } + // 解析 reactive exporter 配置 + if let Some(ref r) = parsed.reactive { + self.reactive_enabled = r.enabled; + self.reactive_debounce_secs = r.debounce_secs; + self.reactive_workspace = r.workspace.clone(); + } + // Parse cgroup filter settings if let Some(v) = parsed.cgroup_filter_enabled { self.cgroup_filter_enabled = v; @@ -1796,4 +1829,22 @@ mod tests { assert!(config.features.token_consumption_enabled); assert!(config.features.sls_logtail_enabled); } + + #[test] + fn test_load_reactive_config() { + let dir = std::env::temp_dir().join(format!("test_reactive_{}", std::process::id())); + std::fs::create_dir_all(&dir).unwrap(); + let path = dir.join("agentsight.json"); + std::fs::write( + &path, + r#"{"reactive": {"enabled": true, "debounce_secs": 10, "workspace": "/home/test"}}"#, + ) + .unwrap(); + let mut config = AgentsightConfig::default(); + config.load_from_file(&path).unwrap(); + assert_eq!(config.reactive_enabled, Some(true)); + assert_eq!(config.reactive_debounce_secs, Some(10)); + assert_eq!(config.reactive_workspace, Some("/home/test".to_string())); + std::fs::remove_dir_all(&dir).ok(); + } } diff --git a/src/agentsight/src/genai/mod.rs b/src/agentsight/src/genai/mod.rs index e16450819..8f372e0b6 100644 --- a/src/agentsight/src/genai/mod.rs +++ b/src/agentsight/src/genai/mod.rs @@ -13,6 +13,7 @@ pub mod id_resolver; pub mod instance_id; pub mod logtail; mod openai_parse; +pub mod reactive; pub mod semantic; pub mod storage; diff --git a/src/agentsight/src/genai/reactive.rs b/src/agentsight/src/genai/reactive.rs new file mode 100644 index 000000000..6abcbc498 --- /dev/null +++ b/src/agentsight/src/genai/reactive.rs @@ -0,0 +1,769 @@ +use std::process::{Command, Stdio}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, SyncSender}; +use std::thread; +use std::time::{Duration, Instant}; + +use super::exporter::GenAIExporter; +use super::semantic::GenAISemanticEvent; + +fn should_attempt_checkpoint( + last_ckpt: &Instant, + debounce: Duration, + ws_ckpt_available: bool, +) -> CheckpointDecision { + if last_ckpt.elapsed() < debounce { + return CheckpointDecision::Debounced; + } + if !ws_ckpt_available { + return CheckpointDecision::Unavailable; + } + CheckpointDecision::Proceed +} + +#[derive(Debug, PartialEq)] +enum CheckpointDecision { + Proceed, + Debounced, + Unavailable, +} + +fn make_snapshot_id(reason: &str) -> String { + format!( + "auto-{}-{reason}", + chrono::Utc::now().format("%Y%m%dT%H%M%S") + ) +} + +fn make_checkpoint_message(reason: &str, conversation_id: Option<&str>) -> String { + format!( + "reactive: {reason} (conv={})", + conversation_id.unwrap_or("unknown") + ) +} + +fn run_checkpoint_command(workspace: &str, snapshot_id: &str, msg_text: &str) -> bool { + match Command::new("ws-ckpt") + .args([ + "checkpoint", + "-w", + workspace, + "-i", + snapshot_id, + "-m", + msg_text, + ]) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + { + Ok(mut child) => { + let deadline = Instant::now() + Duration::from_secs(10); + loop { + match child.try_wait() { + Ok(Some(s)) if s.success() => { + log::info!("[reactive] checkpoint created: {snapshot_id}"); + return true; + } + Ok(Some(s)) => { + log::warn!("[reactive] ws-ckpt exited {s}"); + return false; + } + Ok(None) if Instant::now() >= deadline => { + log::warn!("[reactive] ws-ckpt timed out, killing"); + let _ = child.kill(); + let _ = child.wait(); + return false; + } + Ok(None) => thread::sleep(Duration::from_millis(100)), + Err(e) => { + log::warn!("[reactive] ws-ckpt wait error: {e}"); + return false; + } + } + } + } + Err(e) => { + log::warn!("[reactive] ws-ckpt spawn failed: {e}"); + false + } + } +} + +pub struct ReactiveConfig { + pub enabled: bool, + pub debounce_secs: u64, + pub workspace_path: Option, +} + +impl Default for ReactiveConfig { + fn default() -> Self { + Self { + enabled: false, + debounce_secs: 30, + workspace_path: None, + } + } +} + +#[allow(dead_code)] +enum Msg { + Checkpoint { + reason: String, + conversation_id: Option, + }, + InterruptionAlert { + interruption_type: String, + conversation_id: Option, + }, + TokenAccum { + agent_name: String, + input_tokens: u64, + has_cache: bool, + }, + Advisory { + message: String, + }, + Shutdown, +} + +use std::collections::HashMap as StdHashMap; + +struct AgentTokenState { + cumulative: u64, + any_cache_hit: bool, + window_start: Instant, + last_advisory: Option, +} + +/// Lightweight handle for forwarding interruption alerts to the ReactiveExporter +/// background thread. Send+Sync+Clone — safe to store in AgentSight and call from +/// detect_and_store_interruptions(). +#[derive(Clone)] +pub struct ReactiveNotifier { + tx: SyncSender, +} + +impl ReactiveNotifier { + pub fn notify_interruption(&self, interruption_type: &str, conversation_id: Option) { + let _ = self.tx.try_send(Msg::InterruptionAlert { + interruption_type: interruption_type.to_string(), + conversation_id, + }); + } +} + +pub struct ReactiveExporter { + tx: SyncSender, + shutdown: Arc, + handle: Option>, +} + +impl ReactiveExporter { + pub fn new(config: ReactiveConfig) -> Option<(Self, ReactiveNotifier)> { + if !config.enabled { + return None; + } + + let ws_ckpt_available = Command::new("ws-ckpt") + .arg("--version") + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .is_ok(); + + if !ws_ckpt_available { + log::warn!("[reactive] ws-ckpt not found, checkpoint action disabled"); + } + + let workspace = config + .workspace_path + .or_else(|| std::env::var("AGENTSIGHT_WORKSPACE").ok()) + .unwrap_or_else(|| "/root".to_string()); + + let debounce = Duration::from_secs(config.debounce_secs); + let (tx, rx) = mpsc::sync_channel::(32); + let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown_clone = Arc::clone(&shutdown); + + let handle = thread::Builder::new() + .name("reactive-exporter".into()) + .spawn(move || { + let mut last_ckpt = Instant::now() - debounce; + let mut agent_tokens: StdHashMap = StdHashMap::new(); + let one_hour = Duration::from_secs(3600); + + while !shutdown_clone.load(Ordering::Relaxed) { + let msg = match rx.recv_timeout(Duration::from_secs(1)) { + Ok(m) => m, + Err(mpsc::RecvTimeoutError::Timeout) => continue, + Err(mpsc::RecvTimeoutError::Disconnected) => break, + }; + + match msg { + Msg::Shutdown => break, + Msg::Advisory { message } => { + log::info!("[reactive] advisory: {message}"); + } + Msg::InterruptionAlert { + interruption_type, + conversation_id, + } => { + match should_attempt_checkpoint(&last_ckpt, debounce, ws_ckpt_available) { + CheckpointDecision::Debounced => { + log::debug!("[reactive] debounced interruption alert ({interruption_type})"); + continue; + } + CheckpointDecision::Unavailable => { + log::info!("[reactive] would checkpoint for {interruption_type} but ws-ckpt unavailable"); + continue; + } + CheckpointDecision::Proceed => {} + } + let snapshot_id = make_snapshot_id(&interruption_type); + let msg_text = make_checkpoint_message(&interruption_type, conversation_id.as_deref()); + if run_checkpoint_command(&workspace, &snapshot_id, &msg_text) { + last_ckpt = Instant::now(); + } + } + Msg::TokenAccum { + agent_name, + input_tokens, + has_cache, + } => { + let state = agent_tokens.entry(agent_name.clone()).or_insert_with(|| { + AgentTokenState { + cumulative: 0, + any_cache_hit: false, + window_start: Instant::now(), + last_advisory: None, + } + }); + if state.window_start.elapsed() > one_hour { + state.cumulative = 0; + state.any_cache_hit = false; + state.window_start = Instant::now(); + } + state.cumulative += input_tokens; + if has_cache { + state.any_cache_hit = true; + } + if state.cumulative >= 200_000 + && !state.any_cache_hit + && state + .last_advisory + .is_none_or(|t| t.elapsed() > one_hour) + { + log::info!( + "[reactive] advisory: agent '{}' consumed {} input tokens with no prompt caching", + agent_name, state.cumulative + ); + state.last_advisory = Some(Instant::now()); + } + } + Msg::Checkpoint { + reason, + conversation_id, + } => { + match should_attempt_checkpoint(&last_ckpt, debounce, ws_ckpt_available) { + CheckpointDecision::Debounced => { + log::debug!("[reactive] debounced checkpoint ({reason})"); + continue; + } + CheckpointDecision::Unavailable => { + log::info!("[reactive] would checkpoint for {reason} but ws-ckpt unavailable"); + continue; + } + CheckpointDecision::Proceed => {} + } + let snapshot_id = make_snapshot_id(&reason); + let msg_text = make_checkpoint_message(&reason, conversation_id.as_deref()); + if run_checkpoint_command(&workspace, &snapshot_id, &msg_text) { + last_ckpt = Instant::now(); + } + } + } + } + }) + .ok()?; + + let notifier = ReactiveNotifier { tx: tx.clone() }; + Some(( + Self { + tx, + shutdown, + handle: Some(handle), + }, + notifier, + )) + } + + /// Send an interruption alert from the existing detection pipeline. + /// Called by unified.rs after detect_and_store_interruptions() for Critical events. + pub fn notify_interruption(&self, interruption_type: &str, conversation_id: Option) { + let _ = self.tx.try_send(Msg::InterruptionAlert { + interruption_type: interruption_type.to_string(), + conversation_id, + }); + } + + fn detect_critical(events: &[GenAISemanticEvent]) -> Option<(String, Option)> { + for event in events { + if let GenAISemanticEvent::LLMCall(call) = event { + let conv_id = call.metadata.get("conversation_id").cloned(); + if let Some(ref err) = call.error { + let lower = err.to_lowercase(); + if lower.contains("crash") + || lower.contains("oom") + || lower.contains("sigkill") + || lower.contains("signal 9") + { + return Some(("agent_crash".into(), conv_id)); + } + if lower.contains("context_length_exceeded") + || lower.contains("context_window") + || lower.contains("maximum context length") + { + return Some(("context_overflow".into(), conv_id)); + } + } + } + } + None + } +} + +impl GenAIExporter for ReactiveExporter { + fn name(&self) -> &str { + "reactive" + } + + fn export(&self, events: &[GenAISemanticEvent]) { + if let Some((reason, conv_id)) = Self::detect_critical(events) { + let _ = self.tx.try_send(Msg::Checkpoint { + reason, + conversation_id: conv_id, + }); + } + // Per-call token accumulation for cumulative advisory + for event in events { + if let GenAISemanticEvent::LLMCall(call) = event { + if let Some(ref usage) = call.token_usage { + let has_cache = usage.cache_read_input_tokens.unwrap_or(0) > 0 + || usage.cache_creation_input_tokens.unwrap_or(0) > 0; + let _ = self.tx.try_send(Msg::TokenAccum { + agent_name: call + .agent_name + .clone() + .unwrap_or_else(|| call.process_name.clone()), + input_tokens: usage.input_tokens as u64, + has_cache, + }); + } + } + } + } +} + +impl Drop for ReactiveExporter { + fn drop(&mut self) { + self.shutdown.store(true, Ordering::Relaxed); + let _ = self.tx.try_send(Msg::Shutdown); + if let Some(h) = self.handle.take() { + let _ = h.join(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::genai::semantic::{ + GenAISemanticEvent, LLMCall, LLMRequest, LLMResponse, TokenUsage, + }; + use std::collections::HashMap; + + fn make_call( + error: Option<&str>, + input_tokens: u32, + cache_read: Option, + ) -> GenAISemanticEvent { + let mut metadata = HashMap::new(); + metadata.insert("conversation_id".to_string(), "conv-1".to_string()); + GenAISemanticEvent::LLMCall(LLMCall { + call_id: "test".into(), + start_timestamp_ns: 0, + end_timestamp_ns: 0, + duration_ns: 0, + provider: "openai".into(), + model: "gpt-4".into(), + request: LLMRequest { + messages: vec![], + temperature: None, + max_tokens: None, + frequency_penalty: None, + presence_penalty: None, + top_p: None, + top_k: None, + seed: None, + stop_sequences: None, + stream: false, + tools: None, + raw_body: None, + }, + response: LLMResponse { + messages: vec![], + streamed: false, + raw_body: None, + }, + token_usage: Some(TokenUsage { + input_tokens, + output_tokens: 100, + total_tokens: input_tokens + 100, + cache_creation_input_tokens: None, + cache_read_input_tokens: cache_read, + }), + error: error.map(String::from), + pid: 1234, + process_name: "test-agent".into(), + agent_name: Some("TestAgent".into()), + metadata, + }) + } + + #[test] + fn should_attempt_checkpoint_proceed() { + let old = Instant::now() - Duration::from_secs(60); + assert_eq!( + should_attempt_checkpoint(&old, Duration::from_secs(30), true), + CheckpointDecision::Proceed + ); + } + + #[test] + fn should_attempt_checkpoint_debounced() { + let recent = Instant::now(); + assert_eq!( + should_attempt_checkpoint(&recent, Duration::from_secs(30), true), + CheckpointDecision::Debounced + ); + } + + #[test] + fn should_attempt_checkpoint_unavailable() { + let old = Instant::now() - Duration::from_secs(60); + assert_eq!( + should_attempt_checkpoint(&old, Duration::from_secs(30), false), + CheckpointDecision::Unavailable + ); + } + + #[test] + fn make_snapshot_id_contains_reason() { + let id = make_snapshot_id("agent_crash"); + assert!(id.starts_with("auto-")); + assert!(id.ends_with("-agent_crash")); + } + + #[test] + fn make_checkpoint_message_with_conv() { + let msg = make_checkpoint_message("retry_storm", Some("conv-42")); + assert_eq!(msg, "reactive: retry_storm (conv=conv-42)"); + } + + #[test] + fn make_checkpoint_message_without_conv() { + let msg = make_checkpoint_message("crash", None); + assert_eq!(msg, "reactive: crash (conv=unknown)"); + } + + #[test] + fn detect_critical_finds_sigkill() { + let events = vec![make_call(Some("killed by signal 9"), 1000, None)]; + let (reason, _) = ReactiveExporter::detect_critical(&events).unwrap(); + assert_eq!(reason, "agent_crash"); + } + + #[test] + fn detect_critical_finds_sigkill_keyword() { + let events = vec![make_call(Some("SIGKILL received"), 1000, None)]; + let (reason, _) = ReactiveExporter::detect_critical(&events).unwrap(); + assert_eq!(reason, "agent_crash"); + } + + #[test] + fn detect_critical_finds_context_window() { + let events = vec![make_call( + Some("context_window exceeded for this request"), + 1000, + None, + )]; + let (reason, _) = ReactiveExporter::detect_critical(&events).unwrap(); + assert_eq!(reason, "context_overflow"); + } + + #[test] + fn reactive_notifier_sends_alert() { + let config = ReactiveConfig { + enabled: true, + debounce_secs: 60, + workspace_path: Some("/tmp".to_string()), + }; + let (exporter, notifier) = ReactiveExporter::new(config).unwrap(); + notifier.notify_interruption("retry_storm", Some("conv-99".into())); + std::thread::sleep(Duration::from_millis(200)); + drop(exporter); + } + + #[test] + fn exporter_name_is_reactive() { + use crate::genai::exporter::GenAIExporter; + let config = ReactiveConfig { + enabled: true, + debounce_secs: 60, + workspace_path: Some("/tmp".to_string()), + }; + let (exporter, _) = ReactiveExporter::new(config).unwrap(); + assert_eq!(exporter.name(), "reactive"); + drop(exporter); + } + + #[test] + fn export_token_accum_no_cache() { + use crate::genai::exporter::GenAIExporter; + let config = ReactiveConfig { + enabled: true, + debounce_secs: 60, + workspace_path: Some("/tmp".to_string()), + }; + let (exporter, _) = ReactiveExporter::new(config).unwrap(); + let event = make_call(None, 10_000, None); + exporter.export(&[event]); + std::thread::sleep(Duration::from_millis(200)); + drop(exporter); + } + + #[test] + fn detect_critical_finds_crash() { + let events = vec![make_call( + Some("process crashed with OOM killer"), + 1000, + None, + )]; + let result = ReactiveExporter::detect_critical(&events); + assert!(result.is_some()); + let (reason, conv) = result.unwrap(); + assert_eq!(reason, "agent_crash"); + assert_eq!(conv.as_deref(), Some("conv-1")); + } + + #[test] + fn detect_critical_ignores_normal_errors() { + let events = vec![make_call(Some("HTTP 429 rate limited"), 1000, None)]; + assert!(ReactiveExporter::detect_critical(&events).is_none()); + } + + #[test] + fn detect_critical_ignores_no_error() { + let events = vec![make_call(None, 1000, None)]; + assert!(ReactiveExporter::detect_critical(&events).is_none()); + } + + #[test] + fn detect_critical_finds_context_overflow() { + let events = vec![make_call( + Some("This model's maximum context length is 128000 tokens"), + 1000, + None, + )]; + let result = ReactiveExporter::detect_critical(&events); + assert!(result.is_some()); + let (reason, _) = result.unwrap(); + assert_eq!(reason, "context_overflow"); + } + + #[test] + fn detect_critical_finds_context_length_exceeded() { + let events = vec![make_call( + Some("context_length_exceeded: input too long"), + 1000, + None, + )]; + let (reason, _) = ReactiveExporter::detect_critical(&events).unwrap(); + assert_eq!(reason, "context_overflow"); + } + + #[test] + fn notify_interruption_does_not_panic() { + let config = ReactiveConfig { + enabled: true, + debounce_secs: 1, + workspace_path: Some("/tmp".to_string()), + }; + if let Some((exporter, _notifier)) = ReactiveExporter::new(config) { + exporter.notify_interruption("retry_storm", Some("conv-99".into())); + std::thread::sleep(Duration::from_millis(100)); + drop(exporter); + } + } + + #[test] + fn export_does_not_panic_on_disabled() { + let config = ReactiveConfig { + enabled: false, + ..Default::default() + }; + assert!(ReactiveExporter::new(config).is_none()); + } + + /// Integration test: export a crash event → background thread processes it → + /// ws-ckpt is spawned (will fail because daemon isn't running, but spawn + + /// timeout + kill must complete without panicking or hanging). + /// Also tests debounce: second call within debounce window is dropped. + #[test] + fn export_crash_event_triggers_checkpoint_attempt() { + use crate::genai::exporter::GenAIExporter; + + let config = ReactiveConfig { + enabled: true, + debounce_secs: 2, + workspace_path: Some("/tmp".to_string()), + }; + + let (exporter, _) = ReactiveExporter::new(config).unwrap(); + + let crash_event = make_call(Some("Process killed by OOM killer"), 1000, None); + let events = vec![crash_event]; + + // First export: should trigger checkpoint attempt + exporter.export(&events); + + // Give background thread time to spawn ws-ckpt + timeout. + // ws-ckpt without a running daemon hangs on socket connect until our + // 10s try_wait deadline kills it, so we need to wait >= 11s. + std::thread::sleep(Duration::from_secs(13)); + + // Second export within debounce window: should be debounced (no second spawn) + let crash_event2 = make_call(Some("Another OOM crash"), 1000, None); + exporter.export(&[crash_event2]); + std::thread::sleep(Duration::from_millis(200)); + + // Drop should complete promptly. The background thread either: + // - Is idle (debounced the second message) → exits on Shutdown within 1s + // - Is in try_wait loop for a second ws-ckpt → has up to 10s before it + // checks shutdown. We allow 12s total for Drop. + let start = std::time::Instant::now(); + drop(exporter); + let drop_time = start.elapsed(); + assert!( + drop_time < Duration::from_secs(12), + "Drop took too long ({drop_time:?}), background thread stuck" + ); + } + + /// Integration: context_overflow event triggers the full pipeline + /// (export → channel → background thread → ws-ckpt spawn attempt). + #[test] + fn export_context_overflow_triggers_checkpoint() { + use crate::genai::exporter::GenAIExporter; + + let config = ReactiveConfig { + enabled: true, + debounce_secs: 2, + workspace_path: Some("/tmp".to_string()), + }; + let exporter = match ReactiveExporter::new(config) { + Some((e, _)) => e, + None => { + eprintln!("ws-ckpt not installed, skipping"); + return; + } + }; + + let overflow_event = make_call( + Some("This model's maximum context length is 128000 tokens. You requested 200000."), + 1000, + None, + ); + exporter.export(&[overflow_event]); + + // Wait for ws-ckpt spawn + timeout + std::thread::sleep(Duration::from_secs(13)); + let start = std::time::Instant::now(); + drop(exporter); + assert!(start.elapsed() < Duration::from_secs(5)); + } + + /// Integration: notify_interruption triggers checkpoint attempt + /// (simulates unified.rs forwarding a RetryStorm detection). + #[test] + fn notify_interruption_triggers_checkpoint() { + let config = ReactiveConfig { + enabled: true, + debounce_secs: 2, + workspace_path: Some("/tmp".to_string()), + }; + let exporter = match ReactiveExporter::new(config) { + Some((e, _)) => e, + None => { + eprintln!("ws-ckpt not installed, skipping"); + return; + } + }; + + exporter.notify_interruption("retry_storm", Some("conv-42".into())); + + // Wait for ws-ckpt attempt + timeout + std::thread::sleep(Duration::from_secs(13)); + let start = std::time::Instant::now(); + drop(exporter); + assert!(start.elapsed() < Duration::from_secs(5)); + } + + /// Integration: cumulative token advisory fires after 200K input tokens + /// with no cache, debounced per-agent per-hour. + #[test] + fn cumulative_advisory_fires_at_threshold() { + use crate::genai::exporter::GenAIExporter; + + let config = ReactiveConfig { + enabled: true, + debounce_secs: 60, + workspace_path: Some("/tmp".to_string()), + }; + let exporter = match ReactiveExporter::new(config) { + Some((e, _)) => e, + None => { + eprintln!("ws-ckpt not installed, skipping"); + return; + } + }; + + // Send 5 calls × 50K tokens = 250K total, no cache + for _ in 0..5 { + let event = make_call(None, 50_000, None); + exporter.export(&[event]); + } + + // Give background thread time to process all TokenAccum messages + std::thread::sleep(Duration::from_millis(500)); + + // Send one with cache → should NOT reset (any_cache_hit is per-window) + // Actually it WILL set any_cache_hit=true for this agent. But advisory + // already fired (if at all) after the 4th message (200K reached). + // The test validates the pipeline doesn't panic and processes correctly. + + let cached_event = make_call(None, 10_000, Some(5_000)); + exporter.export(&[cached_event]); + std::thread::sleep(Duration::from_millis(200)); + + // Clean shutdown + let start = std::time::Instant::now(); + drop(exporter); + assert!( + start.elapsed() < Duration::from_secs(3), + "Drop should be fast (no ws-ckpt in this test path)" + ); + } +} diff --git a/src/agentsight/src/probes/codex_offsets.rs b/src/agentsight/src/probes/codex_offsets.rs index fb146522a..bca63b2cf 100644 --- a/src/agentsight/src/probes/codex_offsets.rs +++ b/src/agentsight/src/probes/codex_offsets.rs @@ -115,7 +115,7 @@ fn compute_head_sha256(path: &str) -> Option { let n = f.read(&mut buf).ok()?; buf.truncate(n); let hash = Sha256::digest(&buf); - Some(hash.iter().map(|b| format!("{:02x}", b)).collect()) + Some(hash.iter().map(|b| format!("{b:02x}")).collect()) } #[cfg(test)] diff --git a/src/agentsight/src/probes/elf_buildid.rs b/src/agentsight/src/probes/elf_buildid.rs index 28bf7a02a..84aac0f99 100644 --- a/src/agentsight/src/probes/elf_buildid.rs +++ b/src/agentsight/src/probes/elf_buildid.rs @@ -106,7 +106,7 @@ pub fn read_buildid(path: &str) -> Option { } fn hex_encode(bytes: &[u8]) -> String { - bytes.iter().map(|b| format!("{:02x}", b)).collect() + bytes.iter().map(|b| format!("{b:02x}")).collect() } #[cfg(test)] diff --git a/src/agentsight/src/server/token_savings.rs b/src/agentsight/src/server/token_savings.rs index ee5a1a6bc..5d88af1ec 100644 --- a/src/agentsight/src/server/token_savings.rs +++ b/src/agentsight/src/server/token_savings.rs @@ -351,13 +351,11 @@ pub(crate) fn build_explanation( ) -> String { if category == "mcp_response" { format!( - "MCP响应压缩: 原始 {} tokens → {} tokens,压缩率 {:.1}%。后续 {} 轮LLM调用均受益,复合节省 {} tokens。", - before_tokens, after_tokens, compression_ratio, compounding_turns, compounded + "MCP响应压缩: 原始 {before_tokens} tokens → {after_tokens} tokens,压缩率 {compression_ratio:.1}%。后续 {compounding_turns} 轮LLM调用均受益,复合节省 {compounded} tokens。" ) } else { format!( - "工具输出优化: 原始 {} tokens → {} tokens,压缩率 {:.1}%。后续 {} 轮LLM调用均受益,复合节省 {} tokens。", - before_tokens, after_tokens, compression_ratio, compounding_turns, compounded + "工具输出优化: 原始 {before_tokens} tokens → {after_tokens} tokens,压缩率 {compression_ratio:.1}%。后续 {compounding_turns} 轮LLM调用均受益,复合节省 {compounded} tokens。" ) } } @@ -414,7 +412,7 @@ pub(crate) fn generate_optimization_tips( if zero_savings_sessions > 0 { tips.push(OptimizationTip { level: "info".to_string(), - title: format!("发现 {} 个未优化会话", zero_savings_sessions), + title: format!("发现 {zero_savings_sessions} 个未优化会话"), description: "部分会话消耗较高但无优化记录,可能是对应 Agent 未启用 tokenless 或工具调用较少。建议检查这些会话的 Agent 配置。".to_string(), }); } @@ -424,8 +422,7 @@ pub(crate) fn generate_optimization_tips( level: "success".to_string(), title: "节省效果优秀".to_string(), description: format!( - "当前复合节省率 {:.1}%,表现优秀!继续保持当前配置。", - grand_compounded_rate + "当前复合节省率 {grand_compounded_rate:.1}%,表现优秀!继续保持当前配置。" ), }); } else if grand_compounded_rate >= 15.0 { @@ -433,8 +430,7 @@ pub(crate) fn generate_optimization_tips( level: "success".to_string(), title: "节省效果良好".to_string(), description: format!( - "当前复合节省率 {:.1}%,已达到良好水平。可尝试调整压缩策略以进一步提升。", - grand_compounded_rate + "当前复合节省率 {grand_compounded_rate:.1}%,已达到良好水平。可尝试调整压缩策略以进一步提升。" ), }); } diff --git a/src/agentsight/src/unified.rs b/src/agentsight/src/unified.rs index e7a5798a6..f2047fe51 100644 --- a/src/agentsight/src/unified.rs +++ b/src/agentsight/src/unified.rs @@ -110,6 +110,8 @@ pub struct AgentSight { process_killer: Arc, /// Cached feature flags so runtime paths can check them without the config. features: crate::config::FeatureFlags, + /// Notifier for forwarding Critical interruptions to ReactiveExporter + reactive_notifier: Option, } /// GenAI events waiting for session_id resolution via ResponseSessionMapper. @@ -331,8 +333,7 @@ impl AgentSight { if let Some(ref path) = sysom_logtail_path { log::info!( - "SLS sysom mode detected (path={}), skipping SQLite and default SLS exporter", - path + "SLS sysom mode detected (path={path}), skipping SQLite and default SLS exporter" ); if logtail_currently_enabled { let exporter = LogtailExporter::new_with_fixed_path( @@ -419,6 +420,24 @@ impl AgentSight { } } + // Register ReactiveExporter (observe→act: checkpoint on critical interruptions) + let reactive_notifier = { + let reactive_config = crate::genai::reactive::ReactiveConfig { + enabled: config.reactive_enabled.unwrap_or(false), + debounce_secs: config.reactive_debounce_secs.unwrap_or(30), + workspace_path: config.reactive_workspace.clone(), + }; + if let Some((exporter, notifier)) = + crate::genai::reactive::ReactiveExporter::new(reactive_config) + { + log::info!("ReactiveExporter enabled"); + genai_exporters.push(Box::new(exporter)); + Some(notifier) + } else { + None + } + }; + // Create analyzer with tokenizer if configured let analyzer = if let Some(ref tokenizer_path) = config.tokenizer_path { if Path::new(tokenizer_path).exists() { @@ -544,6 +563,7 @@ impl AgentSight { deadloop_kill_after_count: config.deadloop_kill_after_count, process_killer: Arc::new(crate::utils::process::LibcProcessKiller), features: config.features.clone(), + reactive_notifier, }) } @@ -1070,6 +1090,12 @@ impl AgentSight { ie.interruption_type, cid ); + if let Some(ref n) = self.reactive_notifier { + n.notify_interruption( + "retry_storm", + Some(cid.to_string()), + ); + } } } } @@ -1132,6 +1158,9 @@ impl AgentSight { cid, loop_event.detail ); + if let Some(ref n) = self.reactive_notifier { + n.notify_interruption("dead_loop", Some(cid.to_string())); + } // ── Auto-kill 止血 ── let new_count = existing_count + 1; diff --git a/src/agentsight/tests/memory_storage.rs b/src/agentsight/tests/memory_storage.rs index 88676023b..2a07a56b9 100644 --- a/src/agentsight/tests/memory_storage.rs +++ b/src/agentsight/tests/memory_storage.rs @@ -23,7 +23,7 @@ fn make_llm_call(idx: usize) -> LLMCall { messages: vec![InputMessage { role: "user".to_string(), parts: vec![MessagePart::Text { - content: format!("hello world request number {}", idx), + content: format!("hello world request number {idx}"), }], name: None, }], @@ -44,7 +44,7 @@ fn make_llm_call(idx: usize) -> LLMCall { messages: vec![OutputMessage { role: "assistant".to_string(), parts: vec![MessagePart::Text { - content: format!("hello world response number {}", idx), + content: format!("hello world response number {idx}"), }], name: None, finish_reason: Some("stop".to_string()), @@ -54,7 +54,7 @@ fn make_llm_call(idx: usize) -> LLMCall { }; let mut call = LLMCall::new( - format!("call-{}", idx), + format!("call-{idx}"), 0, "openai".to_string(), "gpt-4o".to_string(), @@ -142,8 +142,8 @@ fn sqlite_batch_lowers_write_overhead() { }), ); - println!("no batch: {:?}", no_batch); - println!("with batch: {:?}", with_batch); + println!("no batch: {no_batch:?}"); + println!("with batch: {with_batch:?}"); // NOTE: We do NOT assert batch is faster than no-batch because in CI // environments (shared runners, cold disk cache) the difference can be