diff --git a/src/agentsight/AGENTS.md b/src/agentsight/AGENTS.md index edab14c19..634cca7b1 100644 --- a/src/agentsight/AGENTS.md +++ b/src/agentsight/AGENTS.md @@ -77,10 +77,10 @@ eBPF Probes → Event → Parser → ParsedMessage → Aggregator → Aggregated | 模块 | 位置 | 职责 | 关键类型 | |------|------|------|----------| -| **Probes** | `src/probes/` | eBPF 探针管理 | `Probes`, `ProbesPoller`, `SslSniff`, `ProcMon`, `FileWatch`, `FileWriteProbe`, `UdpDns`, `TcpSniff` | +| **Probes** | `src/probes/` | eBPF 探针管理 | `Probes`, `ProbesPoller`, `SslSniff`, `ProcMon`, `FileWatch`, `FileWriteProbe`, `UdpDns`, `TcpSniff`, `codex_offsets`, `elf_buildid` | | **Event** | `src/event.rs` | 统一事件枚举 | `Event::{Ssl, Proc, ProcMon, FileWatch, FileWrite, UdpDns}` | | **Parser** | `src/parser/` | 协议解析(HTTP/1.x, HTTP/2, SSE, ProcTrace) | `Parser`, `ParsedMessage` | -| **Aggregator** | `src/aggregator/` | 请求-响应关联 | `Aggregator`, `AggregatedResult` | +| **Aggregator** | `src/aggregator/` | 请求-响应关联 + SSE continuation buffer | `Aggregator`, `AggregatedResult` | | **Analyzer** | `src/analyzer/` | Token/审计/消息分析 | `Analyzer`, `AnalysisResult` | | **GenAI** | `src/genai/` | 语义事件构建+导出 | `GenAIBuilder`, `GenAISemanticEvent`, `GenAIExporter` | | **Storage** | `src/storage/` | SQLite 持久化 | `Storage`, `SqliteStore`, `AuditStore`, `TokenStore` | @@ -113,6 +113,23 @@ eBPF Probes → Event → Parser → ParsedMessage → Aggregator → Aggregated 构建时 `build.rs` 通过 `libbpf-cargo` 自动生成 eBPF skeleton。 +### Codex CLI 适配(三级回退) + +Codex CLI 静态链接 aws-lc/BoringSSL,无导出符号。`attach_process` 使用三级回退策略: + +1. **符号表查找**(Tier 1):`nm` 读取 `.symtab` / `.dynsym` 中的 `SSL_write_ex` / `SSL_read_ex` +2. **字节模式匹配**(Tier 2):扫描 `.text` 段中的 BoringSSL 函数 prologue 模式 +3. 
**Offset 表查找**(Tier 3):`agentsight.json → codex_offsets.entries` 按 fingerprint(file_size + head_64k_sha256 + BuildID)匹配 + +使用 `scripts/extract-codex-offsets.py` 提取新版本 offset 并更新 `agentsight.json`。 +详见 [Codex CLI 适配文档](docs/codex-adaptation.md)。 + +### SSE Continuation Buffer + +OpenAI Responses API 的 `response.completed` 事件可能跨多个 TLS record。 +`HttpConnectionAggregator` 在 SseActive 状态下为 `/v1/responses` 路径缓冲原始 SSL 字节, +供下游 `Analyzer::extract_token_from_sse` 在标准 SSE 解析失败时回退扫描。 + ## 7. CLI Subcommands | 命令 | 入口 | 功能 | @@ -219,6 +236,7 @@ Agent 规则配置文件路径:`/etc/agentsight/config.json`(可通过 `--co - [eBPF Probes 设计](docs/design-docs/ebpf-probes.md) - [数据流水线设计](docs/design-docs/data-pipeline.md) - [GenAI 语义层设计](docs/design-docs/genai-semantic.md) +- [Codex CLI 适配](docs/codex-adaptation.md) — 三级回退 offset 查找 + SSE continuation buffer - [常见踩坑记录](docs/PITFALLS.md) — AI agent 和新贡献者最容易踩的坑 - [架构决策记录(ADR)](docs/adr/) — 关键架构选型的背景和理由 diff --git a/src/agentsight/agentsight.json b/src/agentsight/agentsight.json index 545a50199..5fab83ef1 100644 --- a/src/agentsight/agentsight.json +++ b/src/agentsight/agentsight.json @@ -19,6 +19,8 @@ {"rule": ["hermes*"], "agent_name": "Hermes"}, {"rule": ["*python*", "*hermes*"], "agent_name": "Hermes"}, {"rule": ["*python*", "-m", "*hermes*"], "agent_name": "Hermes"}, + {"rule": ["*codex*"], "agent_name": "Codex"}, + {"rule": ["*node*", "*codex*"], "agent_name": "Codex"}, {"rule": ["node*", "*/bin/co*"], "agent_name": "Cosh"}, {"rule": ["node*", "*/bin/cosh*"], "agent_name": "Cosh"}, {"rule": ["node*", "*/bin/copliot*"], "agent_name": "Cosh"}, @@ -33,5 +35,20 @@ {"rule": ["openclaw"], "agent_name": "OpenClaw"}, {"rule": ["claude*"], "agent_name": "Claude"} ] + }, + "codex_offsets": { + "schema_version": 1, + "entries": [ + { + "codex_version": "0.141.0", + "fingerprint": { "file_size": 276579568, "head_64k_sha256": "f015ddd2a687c1fc0b3ce70d898c0a68eeab88ad0040e79b0fe49a8545ff52a9" }, + "offsets": { "ssl_write": 210691872, "ssl_read": 210691280, 
"ssl_do_handshake": 210689600, "write_is_ex": true, "read_is_ex": true } + }, + { + "codex_version": "0.137.0", + "fingerprint": { "file_size": 227758400, "head_64k_sha256": "49b0a1c3a831071e9766cd0db83537a5270a374f8ac11783264a5a354c3b544a" }, + "offsets": { "ssl_write": 172964768, "ssl_read": 172964176, "ssl_do_handshake": 172962496, "write_is_ex": true, "read_is_ex": true } + } + ] } -} \ No newline at end of file +} diff --git a/src/agentsight/docs/codex-adaptation.md b/src/agentsight/docs/codex-adaptation.md new file mode 100644 index 000000000..646a2ec1b --- /dev/null +++ b/src/agentsight/docs/codex-adaptation.md @@ -0,0 +1,154 @@ +# Codex CLI 适配指南 + +AgentSight 通过 eBPF uprobe 捕获 Codex CLI 的 TLS 明文流量。Codex 静态链接 aws-lc(BoringSSL 兼容 C ABI),并使用 `SSL_write_ex` / `SSL_read_ex` 作为加密入口。当二进制保留符号时按符号挂载即可;剥离了符号时需要通过内置 offset 表查表。 + +## 三级回退 + +| Tier | 方式 | 适用场景 | +|------|------|---------| +| 1 | 按符号名挂 uprobe | 二进制保留 `.dynsym` / `.symtab` 时(自编译默认) | +| 2 | 字节码 pattern 扫描 | 已收录指纹的常见 BoringSSL 构建 | +| 3 | 内置 offset 表查表 | 已收录版本的 stripped binary | + +未命中三级则放弃挂载并打印 warn 日志。 + +## 内置版本 + +写入 `src/agentsight/agentsight.json -> codex_offsets.entries` 即可。当前内置: + +| codex 版本 | 状态 | +|------------|------| +| 0.141.0 | Tier 3 `SSL_*_ex` | +| 0.137.0 | Tier 3 `SSL_*_ex` | + +## 为新版本提取偏移 + +### 前置:拿到一份**带符号**的同版本二进制 + +线上 codex release 是 stripped binary,nm 拿不到符号。你需要先准备一份**带符号**副本,再用它跑提取脚本。三种方式按优先级: + +**方式 A:官方 symbols 包(0.140 及以上推荐)** + +0.140 起 codex GitHub release 附带 `codex-symbols-x86_64-unknown-linux-musl.tar.gz`。下载并解压即可: + +```bash +VERSION=0.141.0 # 替换成你的版本 +curl -L -o codex-symbols.tar.gz \ + "https://github.com/openai/codex/releases/download/rust-v${VERSION}/codex-symbols-x86_64-unknown-linux-musl.tar.gz" +tar xzf codex-symbols.tar.gz +# 解压出 codex-symbols-x86_64-unknown-linux-musl/codex.debug,nm 可识别符号 +``` + +**方式 B:自行编译,不要 strip(0.139 及以下,或希望可复现)** + +0.139 及以下 release 没有 symbols 包;此时直接 clone 源码、checkout 对应 tag、编译时不要 strip: + +```bash +git clone 
https://github.com/openai/codex.git +cd codex +git checkout rust-v0.137.0 # 替换成你的版本 + +# 编译前确认 codex-rs/Cargo.toml 里 [profile.release] 没有以下任一行: +# strip = true +# strip = "symbols" +# 若有,注释掉或改成 strip = "none" +cargo build --release -p codex --target x86_64-unknown-linux-musl +# 产物:target/x86_64-unknown-linux-musl/release/codex +nm --defined-only target/x86_64-unknown-linux-musl/release/codex | grep SSL_ # 验证有符号 +``` + +> Tip:debug build (`cargo build` 不加 `--release`) 也带全套符号,但函数地址 / 二进制布局跟 release 不一致,不能直接用于线上 stripped binary 的偏移提取。请用 release 构建。 + +**方式 C:本机已经在跑带符号的版本** + +如果是从源码 `cargo install --path ...` 安装的,本机 codex 一般就带符号,直接用即可。 + +### 另外还需要 + +- 一份**目标**(运行中那份) stripped binary —— 用于取指纹 +- Python 3 + `nm`、`readelf`、`sha256sum` + +### 步骤 + +1. 取 stripped binary 的指纹: + +```bash +stat --printf='%s\n' /path/to/codex-stripped +head -c 65536 /path/to/codex-stripped | sha256sum +readelf -n /path/to/codex-stripped | grep "Build ID" # 如有 +``` + +2. 在**带符号**那份上跑提取脚本: + +```bash +python3 src/agentsight/scripts/extract-codex-offsets.py /path/to/codex-with-symbols +``` + +脚本会优先匹配 `SSL_write_ex` / `SSL_read_ex` / `SSL_do_handshake`,若 `_ex` 变体不存在则退到 `SSL_write` / `SSL_read`,并据此生成 `write_is_ex` / `read_is_ex` 标志。输出形如: + +```json +{ + "codex_version": "0.141.0", + "fingerprint": { + "file_size": 276579568, + "head_64k_sha256": "f015ddd2a687c1fc0b3ce70d898c0a68eeab88ad0040e79b0fe49a8545ff52a9" + }, + "offsets": { + "ssl_write": 210691872, + "ssl_read": 210691280, + "ssl_do_handshake": 210689600, + "write_is_ex": true, + "read_is_ex": true + } +} +``` + +3. 用**步骤 1** 拿到的 stripped binary 指纹覆盖 `fingerprint`,然后把整条 entry 追加到 `agentsight.json` 的 `codex_offsets.entries` 数组里。如果有 BuildID,也加进 `fingerprint`,AgentSight 会优先按 BuildID 匹配。 + +4. 
验证:重新启动 agentsight 后日志会出现: + +``` +[attach_process] pid=<pid>: codex offset table matched for /proc/<pid>/root/.../codex + (write=0x<...>, read=0x<...>, handshake=0x<...>) +``` + +## 手动提取(无脚本时) + +```bash +# 优先用 _ex 变体(aws-lc / BoringSSL 默认导出) +nm --defined-only /path/to/codex-with-symbols | grep -E ' (SSL_write_ex|SSL_read_ex|SSL_do_handshake)$' + +# 如果没有 _ex,退到普通 SSL_write / SSL_read +nm --defined-only /path/to/codex-with-symbols | grep -E ' (SSL_write|SSL_read|SSL_do_handshake)$' +``` + +`nm` 输出第一列就是文件偏移(十六进制),直接转十进制填入 `offsets`。 + +## 完全没有带符号副本时的后备路线 + +极端情况下既拿不到 symbols 包也无法重新编译(比如目标版本源码已不可获取),可以用 `objdump` 加 `bpftrace` / tracefs uprobe 的命中数比对来人工锁定地址。已收录的 0.137 偏移就是这样拿到的。大致思路: + +1. `objdump -d` 找具备 aws-lc `SSL_write_ex` 序言特征的候选地址,可借助 `strings -t x` 反查 aws-lc 内部字符串引用收窄范围。 +2. 用 `bpftrace -e 'uprobe:/path/to/codex:0x<offset> { @ = count(); }'` 挂到候选 offset,同时跑一次真实 codex 请求;命中数应满足 `SSL_do_handshake = 1`、`SSL_write_ex ≈ 1`、`SSL_read_ex` 与 SSE chunk 数同阶。 +3. 用 uretprobe 抓 `$retval` 判断是否 `_ex` 变体(返回值仅 0/1,真实长度由第 4 参数 `*written` 回写),据此设置 `write_is_ex` / `read_is_ex`。 + +这条路线工作量大且容易出错,建议优先走前面三种带符号副本的方式。 + +## 符号与 ABI + +| 符号 | 角色 | 说明 | +|------|------|------| +| `SSL_write` / `SSL_write_ex` | 写端 | 应用明文进入加密管线入口 | +| `SSL_read` / `SSL_read_ex` | 读端 | 解密明文交给应用 | +| `SSL_do_handshake` | 握手 | 标记 TLS 握手完成 | + +`*_ex` 与普通 `SSL_write/SSL_read` 的关键差异:返回值是 0/1 成功标志,真实长度通过出参 `written` / `readbytes` 写回。这要求 BPF 探针在 `uretprobe` 中读取出参而不是返回值,因此 offset 表里 `write_is_ex` / `read_is_ex` 不能省。 + +## PR 模板 + +新增版本偏移请附: + +- [ ] codex 版本号 +- [ ] stripped binary 的 `file_size` + `head_64k_sha256`(必须)+ BuildID(如有) +- [ ] 提取方式(脚本 / 手动) +- [ ] 验证截图(agentsight 启动日志 `codex offset table matched`) diff --git a/src/agentsight/scripts/extract-codex-offsets.py b/src/agentsight/scripts/extract-codex-offsets.py new file mode 100755 index 000000000..72cf49454 --- /dev/null +++ b/src/agentsight/scripts/extract-codex-offsets.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +"""Extract aws-lc/BoringSSL function offsets from a 
codex binary. + +Codex CLI links aws-lc statically (BoringSSL-compatible C ABI). When the +release binary keeps its symbol table, the offsets of `SSL_write_ex`, +`SSL_read_ex`, and `SSL_do_handshake` can be read directly with `nm`. +Falls back to `SSL_write` / `SSL_read` when the `_ex` variants are absent. + +Produces a JSON entry suitable for the `codex_offsets.entries` array in +`agentsight.json` (Tier 3 offset table). + +Usage: + ./extract-codex-offsets.py <binary> + +`<binary>` should be a codex executable that still has a usable symbol +table (release builds with their dynsym intact, or a `*.symbols` package +downloaded from the codex release). + +The output JSON is printed to stdout. Append it to +`agentsight.json -> codex_offsets.entries`. +""" + +import argparse +import hashlib +import json +import os +import re +import subprocess +import sys +from typing import Dict, Optional + +HEAD_SIZE = 65536 + +# Preferred symbols (aws-lc _ex variants). `SSL_write_ex` and `SSL_read_ex` +# return 0/1 with the actual byte count in `*written` / `*readbytes`; the +# Rust user-space gate routes BPF probes accordingly when `write_is_ex` / +# `read_is_ex` is true. 
+EX_SYMBOLS = ("SSL_write_ex", "SSL_read_ex", "SSL_do_handshake") +PLAIN_SYMBOLS = ("SSL_write", "SSL_read", "SSL_do_handshake") + + +def sha256_head(path: str) -> str: + h = hashlib.sha256() + with open(path, "rb") as f: + h.update(f.read(HEAD_SIZE)) + return h.hexdigest() + + +def file_size(path: str) -> int: + return os.path.getsize(path) + + +def read_buildid(path: str) -> Optional[str]: + try: + out = subprocess.check_output( + ["readelf", "-n", path], stderr=subprocess.DEVNULL, universal_newlines=True + ) + except (subprocess.CalledProcessError, FileNotFoundError): + return None + for line in out.splitlines(): + line = line.strip() + if line.startswith("Build ID:"): + return line.split(":", 1)[1].strip() + return None + + +def detect_codex_version(path: str) -> Optional[str]: + # Codex embeds its version in the binary as `codex-cli ` or + # `rust-v`; either is acceptable for the human-readable label. + # Read in 1 MiB chunks to avoid loading the entire ~276 MB binary. + patterns = (r"codex-cli (\d+\.\d+\.\d+)", r"rust-v(\d+\.\d+\.\d+)") + try: + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(1 << 20), b""): + text = chunk.decode("utf-8", errors="ignore") + for pat in patterns: + m = re.search(pat, text) + if m: + return m.group(1) + except OSError: + return None + return None + + +def nm_symbols(binary: str) -> Dict[str, int]: + """Return a {symbol_name: file_offset} map for defined text symbols. + + Tries the regular symbol table first (`nm --defined-only`), then falls + back to the dynamic symbol table (`nm -D --defined-only`) for release + binaries that have been stripped of `.symtab` but still expose + `.dynsym`. The regular table wins on collisions. 
+ """ + syms = {} # type: Dict[str, int] + for args in ( + ["nm", "--defined-only", binary], + ["nm", "-D", "--defined-only", binary], + ): + try: + out = subprocess.check_output( + args, stderr=subprocess.DEVNULL, universal_newlines=True + ) + except (subprocess.CalledProcessError, FileNotFoundError): + continue + for line in out.splitlines(): + parts = line.split() + if len(parts) < 3: + continue + addr_str, kind, name = parts[0], parts[1], parts[2] + if kind not in ("T", "t", "W", "w"): + continue + try: + addr = int(addr_str, 16) + except ValueError: + continue + syms.setdefault(name, addr) + return syms + + +def pick_offsets(syms: Dict[str, int]) -> Optional[dict]: + have_ex = all(name in syms for name in EX_SYMBOLS) + if have_ex: + return { + "ssl_write": syms["SSL_write_ex"], + "ssl_read": syms["SSL_read_ex"], + "ssl_do_handshake": syms["SSL_do_handshake"], + "write_is_ex": True, + "read_is_ex": True, + } + have_plain = all(name in syms for name in PLAIN_SYMBOLS) + if have_plain: + return { + "ssl_write": syms["SSL_write"], + "ssl_read": syms["SSL_read"], + "ssl_do_handshake": syms["SSL_do_handshake"], + "write_is_ex": False, + "read_is_ex": False, + } + return None + + +def main() -> int: + p = argparse.ArgumentParser(description=__doc__.split("\n", 1)[0]) + p.add_argument("binary", help="codex binary (with symbol table)") + args = p.parse_args() + + binary = args.binary + if not os.path.isfile(binary): + print(f"error: not a file: {binary}", file=sys.stderr) + return 1 + + syms = nm_symbols(binary) + if not syms: + print( + "error: nm produced no symbols — is this a stripped binary?", + file=sys.stderr, + ) + return 2 + + offsets = pick_offsets(syms) + if offsets is None: + missing = [s for s in EX_SYMBOLS if s not in syms] + [ + s for s in PLAIN_SYMBOLS if s not in syms + ] + print( + "error: required SSL_* symbols not found " + f"(checked {EX_SYMBOLS} and {PLAIN_SYMBOLS}; missing={sorted(set(missing))})", + file=sys.stderr, + ) + return 3 + + entry = { + 
"codex_version": detect_codex_version(binary) or "unknown", + "fingerprint": { + "file_size": file_size(binary), + "head_64k_sha256": sha256_head(binary), + }, + "offsets": offsets, + } + build_id = read_buildid(binary) + if build_id: + entry["fingerprint"]["build_id"] = build_id + + json.dump(entry, sys.stdout, indent=2, sort_keys=False) + sys.stdout.write("\n") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/agentsight/src/aggregator/http/aggregator.rs b/src/agentsight/src/aggregator/http/aggregator.rs index f2642a9d4..d36a11749 100644 --- a/src/agentsight/src/aggregator/http/aggregator.rs +++ b/src/agentsight/src/aggregator/http/aggregator.rs @@ -70,6 +70,38 @@ pub enum ConnectionState { #[derive(Debug)] pub struct HttpConnectionAggregator { connections: LruCache, + /// Raw bytes received as RawData while the connection is in SseActive + /// state. Some providers (e.g. OpenAI Responses API via dashscope) emit + /// a final `response.completed` SSE event whose data payload spans + /// multiple TLS records: the first chunk parses as a SseEvent (with + /// truncated data), and subsequent chunks have no `data:` prefix so + /// they arrive as RawData. Buffering them lets us reconstruct the + /// original event for token-usage extraction when the stream ends. + sse_continuation_buffers: LruCache>, + /// Last `source_event` pointer appended into the continuation buffer per + /// connection. Used to dedup when a single SSL_read produces multiple + /// ParsedSseEvents that share the same source SslEvent buffer. + last_appended_src_ptr: LruCache, +} + +/// Returns true if oversized-SSE-event continuation buffering should run for +/// this SSE stream. Currently only the OpenAI Responses API +/// (`/v1/responses`, dashscope `/compatible-mode/v1/responses`) emits a +/// final `response.completed` event whose data field routinely spans +/// multiple TLS records, so we restrict the extra buffering to that path. 
+/// +/// Matching is intentionally precise: `ends_with("/responses")` catches +/// exact path endings (covers both `/v1/responses` and +/// `/compatible-mode/v1/responses`), while `contains("/responses?")` +/// catches query-string variants. We must NOT use a broad `contains` +/// because sub-paths like `/v1/responses/{id}/items` would be false +/// positives. +fn needs_sse_continuation_buffer(request: Option<&ParsedRequest>) -> bool { + let Some(req) = request else { + return false; + }; + let path = req.path.as_str(); + path.ends_with("/responses") || path.contains("/responses?") } impl Default for HttpConnectionAggregator { @@ -83,6 +115,12 @@ impl HttpConnectionAggregator { pub fn new() -> Self { HttpConnectionAggregator { connections: LruCache::new(NonZeroUsize::new(DEFAULT_CONNECTION_CAPACITY).unwrap()), + sse_continuation_buffers: LruCache::new( + NonZeroUsize::new(DEFAULT_CONNECTION_CAPACITY).unwrap(), + ), + last_appended_src_ptr: LruCache::new( + NonZeroUsize::new(DEFAULT_CONNECTION_CAPACITY).unwrap(), + ), } } @@ -90,6 +128,8 @@ impl HttpConnectionAggregator { pub fn with_capacity(capacity: usize) -> Self { HttpConnectionAggregator { connections: LruCache::new(NonZeroUsize::new(capacity).unwrap()), + sse_continuation_buffers: LruCache::new(NonZeroUsize::new(capacity).unwrap()), + last_appended_src_ptr: LruCache::new(NonZeroUsize::new(capacity).unwrap()), } } @@ -578,7 +618,32 @@ impl HttpConnectionAggregator { } } other => { - // Not in RequestBodyPending / compressed-SSE state, restore and ignore + // Not in RequestBodyPending / compressed-SSE state. If we are + // in an uncompressed SseActive stream targeting the OpenAI + // Responses API, buffer the bytes as a continuation of the + // last SSE event so we can recover token-usage from + // oversized events (e.g. `response.completed`) that span + // multiple TLS records. Other providers fit usage in a + // single small event, so skip the extra copy. 
+ if let ConnectionState::SseActive { + request, + compressed_buffer: None, + .. + } = &other + { + if needs_sse_continuation_buffer(request.as_ref()) { + const MAX_CONTINUATION_BYTES: usize = 1 << 20; // 1 MiB cap + let data = &ssl_event.buf[..ssl_event.buf_size() as usize]; + let buf = self + .sse_continuation_buffers + .get_or_insert_mut(connection_id, Vec::new); + let remaining = MAX_CONTINUATION_BYTES.saturating_sub(buf.len()); + let take = data.len().min(remaining); + if take > 0 { + buf.extend_from_slice(&data[..take]); + } + } + } self.insert(connection_id, other); None } @@ -631,9 +696,39 @@ impl HttpConnectionAggregator { let is_done = sse_event.is_done(); log::trace!( - "[HttpAggregator] SSE event in SseActive | conn={connection_id:?} | is_done={is_done}", + "[HttpAggregator] SSE event in SseActive | conn={connection_id:?} | is_done={is_done} | event={:?} | data_len={}", + sse_event.event, + sse_event.data_len(), ); + // Append the underlying SSL chunk bytes to the continuation + // buffer so that oversized events (whose first chunk arrives + // here with truncated data) can still be reconstructed by + // downstream extractors. Only enable for the OpenAI + // Responses API — other providers emit usage in single + // small events. Dedup by source_event pointer so a single + // SSL_read producing multiple SSE events contributes only + // once. 
+ if needs_sse_continuation_buffer(request.as_ref()) { + const MAX_CONTINUATION_BYTES: usize = 1 << 20; // 1 MiB cap + let src = sse_event.source_event(); + let src_ptr = src as *const _ as usize; + let src_buf_len = src.buf_size() as usize; + let last_ptr = self.last_appended_src_ptr.get(connection_id).copied(); + if last_ptr != Some(src_ptr) && src_buf_len > 0 && src_buf_len <= src.buf.len() + { + let buf = self + .sse_continuation_buffers + .get_or_insert_mut(*connection_id, Vec::new); + let remaining = MAX_CONTINUATION_BYTES.saturating_sub(buf.len()); + let take = src_buf_len.min(remaining); + if take > 0 { + buf.extend_from_slice(&src.buf[..take]); + } + self.last_appended_src_ptr.put(*connection_id, src_ptr); + } + } + // Add SSE event to the list sse_events.push(sse_event); @@ -645,6 +740,9 @@ impl HttpConnectionAggregator { // Build aggregated response with SSE events let mut response = AggregatedResponse::from_parsed(response_headers); response.set_sse_events(sse_events); + response.sse_continuation_bytes = + self.sse_continuation_buffers.pop(connection_id); + self.last_appended_src_ptr.pop(connection_id); // Return appropriate result based on whether request exists if let Some(req) = request { @@ -1933,4 +2031,216 @@ mod tests { other => panic!("expected ResponseOnly, got {other:?}"), } } + + // ── OpenAI Responses API SSE continuation buffer tests ─────────────── + + #[test] + fn test_with_capacity_creates_continuation_buffers() { + let aggregator = HttpConnectionAggregator::with_capacity(4); + assert_eq!(aggregator.active_connections(), 0); + } + + #[test] + fn test_needs_sse_continuation_buffer_none_request() { + assert!(!needs_sse_continuation_buffer(None)); + } + + fn enter_uncompressed_responses_sse_active( + aggregator: &mut HttpConnectionAggregator, + pid: u32, + ssl_ptr: u64, + ) -> ConnectionId { + let req_event = create_mock_ssl_event_with_buf(pid, ssl_ptr, Vec::new(), 1); + let request = ParsedRequest { + method: "POST".to_string(), + path: 
"/v1/responses".to_string(), + version: 11, + headers: HashMap::new(), + body_offset: 0, + body_len: 0, + source_event: req_event, + reassembled_body: None, + }; + aggregator.process_request(request); + + let resp_event = create_mock_ssl_event_with_buf(pid, ssl_ptr, Vec::new(), 0); + let mut headers = HashMap::new(); + headers.insert("content-type".to_string(), "text/event-stream".to_string()); + let response = ParsedResponse { + version: 11, + status_code: 200, + reason: "OK".to_string(), + headers, + body_offset: 0, + body_len: 0, + source_event: resp_event, + }; + aggregator.process_response(response); + ConnectionId { pid, ssl_ptr } + } + + #[test] + fn test_sse_continuation_buffer_in_process_raw_body_data() { + let mut aggregator = HttpConnectionAggregator::new(); + let conn_id = enter_uncompressed_responses_sse_active(&mut aggregator, 20, 0xA000); + assert!(aggregator.is_sse_active(&conn_id)); + + let chunk = b"event:response.completed\ndata:{\"usage\":{\"input_tokens\":57"; + let raw = create_mock_ssl_event_with_buf(20, 0xA000, chunk.to_vec(), 0); + let result = aggregator.process_raw_body_data(&raw); + assert!( + result.is_none(), + "raw body data on SseActive should keep buffering" + ); + + // Complete the stream and inspect the continuation buffer captured. 
+ let done_event = + create_mock_ssl_event_with_buf(20, 0xA000, b"data: [DONE]\n\n".to_vec(), 0); + let done = ParsedSseEvent::new(None, None, None, 6, 6, done_event); + let result = aggregator.process_sse_event(&conn_id, done); + let pair = match result { + Some(AggregatedResult::SseComplete(pair)) => pair, + other => panic!("expected SseComplete, got {other:?}"), + }; + let buf = pair + .response + .sse_continuation_bytes + .expect("continuation buffer should be present"); + assert!(buf.windows(chunk.len()).any(|w| w == chunk)); + } + + #[test] + fn test_sse_continuation_buffer_in_process_sse_event() { + let mut aggregator = HttpConnectionAggregator::new(); + let conn_id = enter_uncompressed_responses_sse_active(&mut aggregator, 21, 0xB000); + + // Build an SSE event whose underlying SSL buffer carries extra bytes. + let payload = b"data: {\"type\":\"response.output_text.delta\",\"delta\":\"hi\"}"; + let src_event = create_mock_ssl_event_with_buf(21, 0xB000, payload.to_vec(), 0); + let event = ParsedSseEvent::new(None, None, None, 0, payload.len(), src_event); + let result = aggregator.process_sse_event(&conn_id, event); + assert!( + result.is_none(), + "non-terminal SSE event should keep streaming" + ); + + // Complete the stream and verify the source buffer was appended. 
+ let done_event = + create_mock_ssl_event_with_buf(21, 0xB000, b"data: [DONE]\n\n".to_vec(), 0); + let done = ParsedSseEvent::new(None, None, None, 6, 6, done_event); + let result = aggregator.process_sse_event(&conn_id, done); + let pair = match result { + Some(AggregatedResult::SseComplete(pair)) => pair, + other => panic!("expected SseComplete, got {other:?}"), + }; + let buf = pair + .response + .sse_continuation_bytes + .expect("continuation buffer should be populated"); + assert!(buf.windows(payload.len()).any(|w| w == payload)); + } + + // ── Boundary tests requested in PR review (#8) ────────────────────── + + #[test] + fn test_needs_sse_continuation_buffer_non_responses_path() { + // Negative: chat completions and sub-paths must not trigger buffering. + let event = create_mock_ssl_event_with_buf(1, 1, Vec::new(), 1); + let make_req = |path: &str| ParsedRequest { + method: "POST".to_string(), + path: path.to_string(), + version: 11, + headers: HashMap::new(), + body_offset: 0, + body_len: 0, + source_event: event.clone(), + reassembled_body: None, + }; + assert!(!needs_sse_continuation_buffer(Some(&make_req( + "/v1/chat/completions" + )))); + assert!(!needs_sse_continuation_buffer(Some(&make_req( + "/v1/responses/abc/items" + )))); + // Positive: exact and query-string variants should match. + assert!(needs_sse_continuation_buffer(Some(&make_req( + "/v1/responses" + )))); + assert!(needs_sse_continuation_buffer(Some(&make_req( + "/compatible-mode/v1/responses" + )))); + assert!(needs_sse_continuation_buffer(Some(&make_req( + "/v1/responses?stream=true" + )))); + } + + #[test] + fn test_sse_continuation_buffer_max_bytes_truncation() { + // Feed > 1 MiB through process_raw_body_data and verify the buffer + // is capped at MAX_CONTINUATION_BYTES (1 << 20). 
+ let mut aggregator = HttpConnectionAggregator::new(); + let conn_id = enter_uncompressed_responses_sse_active(&mut aggregator, 30, 0xC000); + + const MAX_CAP: usize = 1 << 20; // 1 MiB + let oversized: Vec = vec![b'x'; MAX_CAP + 4096]; + let raw = create_mock_ssl_event_with_buf(30, 0xC000, oversized.clone(), 0); + let _ = aggregator.process_raw_body_data(&raw); + + // Complete the stream. + let done_event = + create_mock_ssl_event_with_buf(30, 0xC000, b"data: [DONE]\n\n".to_vec(), 0); + let done = ParsedSseEvent::new(None, None, None, 6, 6, done_event); + let result = aggregator.process_sse_event(&conn_id, done); + let pair = match result { + Some(AggregatedResult::SseComplete(pair)) => pair, + other => panic!("expected SseComplete, got {other:?}"), + }; + let buf = pair + .response + .sse_continuation_bytes + .expect("continuation buffer should exist"); + assert_eq!( + buf.len(), + MAX_CAP, + "continuation buffer must be capped at 1 MiB" + ); + } + + #[test] + fn test_sse_continuation_buffer_dedup_same_src_ptr() { + // Two SSE events sharing the same source SslEvent (same Rc pointer) + // must contribute to the continuation buffer only once. + let mut aggregator = HttpConnectionAggregator::new(); + let conn_id = enter_uncompressed_responses_sse_active(&mut aggregator, 31, 0xD000); + + let payload = b"data: {\"type\":\"response.output_text.delta\",\"delta\":\"x\"}"; + // Both ParsedSseEvents reference the *same* Rc. + let src_event = create_mock_ssl_event_with_buf(31, 0xD000, payload.to_vec(), 0); + let event1 = ParsedSseEvent::new(None, None, None, 0, payload.len(), src_event.clone()); + let _ = aggregator.process_sse_event(&conn_id, event1); + + let event2 = ParsedSseEvent::new(None, None, None, 0, payload.len(), src_event); + let _ = aggregator.process_sse_event(&conn_id, event2); + + // Complete the stream. 
+ let done_event = + create_mock_ssl_event_with_buf(31, 0xD000, b"data: [DONE]\n\n".to_vec(), 0); + let done = ParsedSseEvent::new(None, None, None, 6, 6, done_event); + let result = aggregator.process_sse_event(&conn_id, done); + let pair = match result { + Some(AggregatedResult::SseComplete(pair)) => pair, + other => panic!("expected SseComplete, got {other:?}"), + }; + let buf = pair + .response + .sse_continuation_bytes + .expect("continuation buffer should exist"); + // Count occurrences of the payload — dedup means it should appear + // at most once. + let count = buf.windows(payload.len()).filter(|w| *w == payload).count(); + assert_eq!( + count, 1, + "same-source SSE events must contribute to the buffer only once" + ); + } } diff --git a/src/agentsight/src/aggregator/http/response.rs b/src/agentsight/src/aggregator/http/response.rs index 567c6a648..20d67bdd8 100644 --- a/src/agentsight/src/aggregator/http/response.rs +++ b/src/agentsight/src/aggregator/http/response.rs @@ -15,6 +15,12 @@ pub struct AggregatedResponse { pub parsed: ParsedResponse, /// SSE events collected during streaming (if is_sse is true) pub sse_events: Vec, + /// Raw bytes that arrived as RawData while in SseActive state. These are + /// continuation chunks of an oversized SSE event (e.g. OpenAI Responses + /// API's `response.completed` echoing the full system prompt + tools) + /// whose first chunk parsed as a (truncated) SseEvent. Used by downstream + /// extractors to recover token usage embedded past the truncation point. 
+ pub sse_continuation_bytes: Option>, } impl AggregatedResponse { @@ -23,6 +29,7 @@ impl AggregatedResponse { AggregatedResponse { parsed, sse_events: Vec::new(), + sse_continuation_bytes: None, } } diff --git a/src/agentsight/src/analyzer/token/extractor/openai.rs b/src/agentsight/src/analyzer/token/extractor/openai.rs index 2d93fc05e..7cab8829f 100644 --- a/src/agentsight/src/analyzer/token/extractor/openai.rs +++ b/src/agentsight/src/analyzer/token/extractor/openai.rs @@ -71,49 +71,50 @@ pub fn extract_response_content( response_json: Option<&Value>, ) -> Option<(String, Option, Vec)> { let resp = response_json?; - let choices = resp.get("choices").and_then(|c| c.as_array())?; let mut content = String::new(); let mut reasoning = None; let mut tool_calls = Vec::new(); let mut has_data = false; - for choice in choices { - // Support both "message" (standard response) and "delta" (SSE streaming) formats - let msg_or_delta = choice.get("message").or_else(|| choice.get("delta")); - - if let Some(msg) = msg_or_delta { - // Extract content - if let Some(c) = msg.get("content").and_then(|c| c.as_str()) { - if !c.is_empty() { - content.push_str(c); - has_data = true; + if let Some(choices) = resp.get("choices").and_then(|c| c.as_array()) { + for choice in choices { + // Support both "message" (standard response) and "delta" (SSE streaming) formats + let msg_or_delta = choice.get("message").or_else(|| choice.get("delta")); + + if let Some(msg) = msg_or_delta { + // Extract content + if let Some(c) = msg.get("content").and_then(|c| c.as_str()) { + if !c.is_empty() { + content.push_str(c); + has_data = true; + } } - } - // Extract reasoning_content - if let Some(r) = msg.get("reasoning_content").and_then(|r| r.as_str()) { - if !r.is_empty() { - // For SSE chunks, accumulate reasoning content - reasoning = match reasoning { - Some(existing) => Some(existing + r), - None => Some(r.to_string()), - }; - has_data = true; + // Extract reasoning_content + if let Some(r) = 
msg.get("reasoning_content").and_then(|r| r.as_str()) { + if !r.is_empty() { + // For SSE chunks, accumulate reasoning content + reasoning = match reasoning { + Some(existing) => Some(existing + r), + None => Some(r.to_string()), + }; + has_data = true; + } } - } - // Extract tool_calls - only extract function name and arguments - if let Some(calls) = msg.get("tool_calls").and_then(|t| t.as_array()) { - for tool_call in calls { - if let Some(func) = tool_call.get("function") { - let name = func.get("name").and_then(|n| n.as_str()).unwrap_or(""); - let arguments = - func.get("arguments").and_then(|a| a.as_str()).unwrap_or(""); - let tool_content = format!("{name}: {arguments}"); - if !tool_content.is_empty() { - tool_calls.push(tool_content); - has_data = true; + // Extract tool_calls - only extract function name and arguments + if let Some(calls) = msg.get("tool_calls").and_then(|t| t.as_array()) { + for tool_call in calls { + if let Some(func) = tool_call.get("function") { + let name = func.get("name").and_then(|n| n.as_str()).unwrap_or(""); + let arguments = + func.get("arguments").and_then(|a| a.as_str()).unwrap_or(""); + let tool_content = format!("{name}: {arguments}"); + if !tool_content.is_empty() { + tool_calls.push(tool_content); + has_data = true; + } } } } @@ -122,10 +123,59 @@ pub fn extract_response_content( } if has_data { - Some((content, reasoning, tool_calls)) - } else { - None + return Some((content, reasoning, tool_calls)); + } + + // OpenAI Responses API SSE chunks have a different shape — top-level + // "type" tags such as `response.output_text.delta` carry text in + // `delta` / `text`, while `response.output_item.done` embeds the + // assistant message under `item.content[].text`. Extract text from + // the kinds that contribute to assistant output tokens. 
+ if let Some(ty) = resp.get("type").and_then(|t| t.as_str()) { + match ty { + "response.output_text.delta" => { + if let Some(d) = resp.get("delta").and_then(|d| d.as_str()) { + if !d.is_empty() { + return Some((d.to_string(), None, Vec::new())); + } + } + } + "response.output_text.done" => { + if let Some(t) = resp.get("text").and_then(|t| t.as_str()) { + if !t.is_empty() { + return Some((t.to_string(), None, Vec::new())); + } + } + } + "response.reasoning_text.delta" | "response.reasoning_summary_text.delta" => { + if let Some(d) = resp.get("delta").and_then(|d| d.as_str()) { + if !d.is_empty() { + return Some((String::new(), Some(d.to_string()), Vec::new())); + } + } + } + "response.output_item.done" => { + if let Some(item_content) = resp + .get("item") + .and_then(|i| i.get("content")) + .and_then(|c| c.as_array()) + { + let mut text = String::new(); + for part in item_content { + if let Some(t) = part.get("text").and_then(|t| t.as_str()) { + text.push_str(t); + } + } + if !text.is_empty() { + return Some((text, None, Vec::new())); + } + } + } + _ => {} + } } + + None } /// Extract role and content from OpenAI message JSON @@ -285,4 +335,168 @@ mod tests { let result = extract_token_data(Some(&request), None); assert!(result.is_none()); } + + #[test] + fn test_responses_api_output_text_delta() { + let chunk = serde_json::json!({ + "type": "response.output_text.delta", + "delta": "hello world", + }); + let (content, reasoning, tools) = + extract_response_content(Some(&chunk)).expect("should extract delta text"); + assert_eq!(content, "hello world"); + assert!(reasoning.is_none()); + assert!(tools.is_empty()); + } + + #[test] + fn test_responses_api_output_text_done() { + let chunk = serde_json::json!({ + "type": "response.output_text.done", + "text": "complete output text", + }); + let (content, _, _) = + extract_response_content(Some(&chunk)).expect("should extract final text"); + assert_eq!(content, "complete output text"); + } + + #[test] + fn 
test_responses_api_output_item_done() { + let chunk = serde_json::json!({ + "type": "response.output_item.done", + "item": { + "type": "message", + "role": "assistant", + "content": [ + {"type": "output_text", "text": "first"}, + {"type": "output_text", "text": " second"}, + ], + }, + }); + let (content, _, _) = + extract_response_content(Some(&chunk)).expect("should extract item content"); + assert_eq!(content, "first second"); + } + + #[test] + fn test_responses_api_unknown_type_returns_none() { + let chunk = serde_json::json!({ + "type": "response.created", + "response": {"id": "abc"}, + }); + assert!(extract_response_content(Some(&chunk)).is_none()); + } + + #[test] + fn test_responses_api_reasoning_text_delta() { + let chunk = serde_json::json!({ + "type": "response.reasoning_text.delta", + "delta": "thinking...", + }); + let (content, reasoning, tools) = + extract_response_content(Some(&chunk)).expect("should extract reasoning"); + assert!(content.is_empty()); + assert_eq!(reasoning, Some("thinking...".to_string())); + assert!(tools.is_empty()); + } + + #[test] + fn test_responses_api_reasoning_summary_text_delta() { + let chunk = serde_json::json!({ + "type": "response.reasoning_summary_text.delta", + "delta": "summary...", + }); + let (content, reasoning, tools) = + extract_response_content(Some(&chunk)).expect("should extract reasoning summary"); + assert!(content.is_empty()); + assert_eq!(reasoning, Some("summary...".to_string())); + assert!(tools.is_empty()); + } + + #[test] + fn test_responses_api_output_text_delta_empty() { + let chunk = serde_json::json!({ + "type": "response.output_text.delta", + "delta": "", + }); + assert!(extract_response_content(Some(&chunk)).is_none()); + } + + #[test] + fn test_responses_api_output_text_done_empty() { + let chunk = serde_json::json!({ + "type": "response.output_text.done", + "text": "", + }); + assert!(extract_response_content(Some(&chunk)).is_none()); + } + + #[test] + fn 
test_responses_api_output_item_done_empty() { + let chunk = serde_json::json!({ + "type": "response.output_item.done", + "item": { + "type": "message", + "role": "assistant", + "content": [], + }, + }); + assert!(extract_response_content(Some(&chunk)).is_none()); + } + + #[test] + fn test_extract_response_with_tool_calls() { + let response = serde_json::json!({ + "model": "gpt-4", + "choices": [{ + "message": { + "role": "assistant", + "content": "", + "tool_calls": [{ + "id": "tc_1", + "function": {"name": "search", "arguments": "{\"q\":\"rust\"}"} + }] + } + }] + }); + let token_data = extract_token_data(None, Some(&response)).unwrap(); + assert_eq!(token_data.tool_calls.len(), 1); + assert!(token_data.tool_calls[0].contains("search")); + } + + #[test] + fn test_extract_sse_reasoning_content_accumulation() { + let chunk = serde_json::json!({ + "model": "qwen", + "choices": [ + {"delta": {"content": "a", "reasoning_content": "think1"}}, + {"delta": {"content": "b", "reasoning_content": "think2"}} + ] + }); + let (content, reasoning, _) = + extract_response_content(Some(&chunk)).expect("should extract"); + assert_eq!(content, "ab"); + assert_eq!(reasoning, Some("think1think2".to_string())); + } + + #[test] + fn test_extract_tool_calls_skips_missing_function() { + let response = serde_json::json!({ + "model": "gpt-4", + "choices": [{ + "message": { + "role": "assistant", + "content": "answer", + "tool_calls": [{"id": "tc_1"}] + } + }] + }); + let (content, _, tool_calls) = + extract_response_content(Some(&response)).expect("should extract content"); + assert_eq!(content, "answer"); + assert!( + tool_calls.is_empty(), + "tool_call without function should be skipped" + ); + } } diff --git a/src/agentsight/src/analyzer/token/parser.rs b/src/agentsight/src/analyzer/token/parser.rs index ee46b135e..fea0313a5 100644 --- a/src/agentsight/src/analyzer/token/parser.rs +++ b/src/agentsight/src/analyzer/token/parser.rs @@ -55,10 +55,71 @@ impl TokenParser { return None; } - // 
Parse as JSON - let json: serde_json::Value = serde_json::from_str(data).ok()?; - self.parse_json(&json).inspect(|_usage| { - log::debug!("token usage parsed from data: {data}"); + // Try strict JSON parse first + if let Ok(json) = serde_json::from_str::(data) { + return self.parse_json(&json).inspect(|_usage| { + log::debug!("token usage parsed from data: {data}"); + }); + } + + // Fallback: OpenAI Responses API embeds usage in a final + // `response.completed` event whose payload (instructions + tools + + // output) routinely exceeds a single TLS record. We may be looking + // at a concatenation of SSE chunks that together don't form a + // single valid JSON object. Recover input/output token counts via + // a regex-free string scan when the buffer references usage fields. + if data.contains("\"input_tokens\"") + || data.contains("\"output_tokens\"") + || data.contains("\"prompt_tokens\"") + || data.contains("\"completion_tokens\"") + { + let usage = Self::scan_partial_usage(data); + if usage.is_some() { + log::debug!("token usage recovered from continuation buffer"); + } + return usage; + } + + None + } + + /// Recover token usage fields from a possibly-truncated JSON string by + /// scanning for the integer values that follow `"input_tokens"`, + /// `"output_tokens"`, etc. The first occurrence wins, which matches + /// dashscope's behaviour of placing the canonical `usage` block before + /// any `x_details` echo. 
+ fn scan_partial_usage(data: &str) -> Option { + fn find_u64(s: &str, key: &str) -> Option { + let pat = format!("\"{key}\""); + let mut idx = s.find(&pat)?; + idx += pat.len(); + let rest = s.get(idx..)?; + let rest = rest.trim_start(); + let rest = rest.strip_prefix(':')?.trim_start(); + let end = rest + .find(|c: char| !c.is_ascii_digit()) + .unwrap_or(rest.len()); + if end == 0 { + return None; + } + rest[..end].parse::().ok() + } + + let input = find_u64(data, "input_tokens").or_else(|| find_u64(data, "prompt_tokens")); + let output = + find_u64(data, "output_tokens").or_else(|| find_u64(data, "completion_tokens")); + if input.is_none() && output.is_none() { + return None; + } + + Some(TokenUsage { + input_tokens: input.unwrap_or(0), + output_tokens: output.unwrap_or(0), + cache_creation_input_tokens: find_u64(data, "cache_creation_input_tokens"), + cache_read_input_tokens: find_u64(data, "cache_read_input_tokens") + .or_else(|| find_u64(data, "cached_tokens")), + model: None, + provider: LLMProvider::OpenAI, }) } @@ -263,4 +324,75 @@ mod tests { assert_eq!(usage.input_tokens, 10); assert_eq!(usage.output_tokens, 5); } + + #[test] + fn test_scan_partial_usage_dashscope_response_completed() { + // Real dashscope `/compatible-mode/v1/responses` `response.completed` + // payload, captured with curl. The strict JSON path should succeed + // for this complete buffer, so this just confirms canonical fields. 
+ let data = r#"{"sequence_number":10,"type":"response.completed","response":{"top_logprobs":0,"instructions":"You are a helpful assistant.","metadata":{},"usage":{"total_tokens":60,"input_tokens_details":{"cached_tokens":0},"output_tokens":3,"input_tokens":57,"output_tokens_details":{"reasoning_tokens":0},"x_details":[{"total_tokens":60,"x_billing_type":"response_api","output_tokens":3,"input_tokens":57,"prompt_tokens_details":{"cached_tokens":0}}]},"created_at":1782287513,"model":"qwen3-coder-plus"}}"#; + let parser = TokenParser::new(); + let usage = parser.parse_data(data).expect("usage should parse"); + assert_eq!(usage.input_tokens, 57); + assert_eq!(usage.output_tokens, 3); + } + + #[test] + fn test_scan_partial_usage_truncated_buffer() { + // Simulate a continuation buffer where only the leading bytes around + // the `usage` block survived; trailing braces / brackets are missing + // and strict JSON parsing fails. The fallback path should still + // recover the integer values. + let data = r#"event:response.completed +data:{"sequence_number":10,"type":"response.completed","response":{"usage":{"total_tokens":60,"input_tokens_details":{"cached_tokens":2},"output_tokens":3,"input_tokens":57"#; + let parser = TokenParser::new(); + let usage = parser + .parse_data(data) + .expect("partial usage should still parse"); + assert_eq!(usage.input_tokens, 57); + assert_eq!(usage.output_tokens, 3); + assert_eq!(usage.cache_read_input_tokens, Some(2)); + } + + #[test] + fn test_scan_partial_usage_returns_none_when_no_tokens() { + let data = "event:response.output_text.delta\ndata:{\"delta\":\"hi\"}"; + let parser = TokenParser::new(); + assert!(parser.parse_data(data).is_none()); + } + + #[test] + fn test_scan_partial_usage_only_input() { + // Truncated JSON forces the regex-free string-scan fallback. 
+ let data = r#"{"input_tokens": 42"#; + let parser = TokenParser::new(); + let usage = parser.parse_data(data).expect("should parse input only"); + assert_eq!(usage.input_tokens, 42); + assert_eq!(usage.output_tokens, 0); + } + + #[test] + fn test_scan_partial_usage_only_output() { + let data = r#"{"output_tokens": 7"#; + let parser = TokenParser::new(); + let usage = parser.parse_data(data).expect("should parse output only"); + assert_eq!(usage.input_tokens, 0); + assert_eq!(usage.output_tokens, 7); + } + + #[test] + fn test_scan_partial_usage_prompt_completion_aliases() { + let data = r#"{"prompt_tokens": 10, "completion_tokens": 5"#; + let parser = TokenParser::new(); + let usage = parser.parse_data(data).expect("should parse aliases"); + assert_eq!(usage.input_tokens, 10); + assert_eq!(usage.output_tokens, 5); + } + + #[test] + fn test_scan_partial_usage_invalid_value_none() { + let data = r#"{"input_tokens": "not a number"}"#; + let parser = TokenParser::new(); + assert!(parser.parse_data(data).is_none()); + } } diff --git a/src/agentsight/src/analyzer/unified.rs b/src/agentsight/src/analyzer/unified.rs index ca7660a37..9e406b032 100644 --- a/src/agentsight/src/analyzer/unified.rs +++ b/src/agentsight/src/analyzer/unified.rs @@ -22,7 +22,7 @@ use crate::aggregator::AggregatedResult; use crate::analyzer::token::extract_response_content; -use crate::parser::sse::ParsedSseEvent; +use crate::parser::sse::{ParsedSseEvent, SSEParser}; use crate::tokenizer::LlmTokenizer; use crate::tokenizer::get_global_tokenizer; @@ -450,12 +450,22 @@ impl Analyzer { AggregatedResult::SseComplete(pair) => { let pid = pair.request.source_event.pid; let comm = pair.request.source_event.comm_str(); - self.extract_token_from_sse(&pair.response.sse_events, pid, &comm) + self.extract_token_from_sse( + &pair.response.sse_events, + pair.response.sse_continuation_bytes.as_deref(), + pid, + &comm, + ) } AggregatedResult::ResponseOnly { response, .. 
} if !response.sse_events.is_empty() => { let pid = response.pid(); let comm = response.parsed.source_event.comm_str(); - self.extract_token_from_sse(&response.sse_events, pid, &comm) + self.extract_token_from_sse( + &response.sse_events, + response.sse_continuation_bytes.as_deref(), + pid, + &comm, + ) } AggregatedResult::HttpComplete(pair) => { let pid = pair.request.source_event.pid; @@ -592,13 +602,44 @@ impl Analyzer { fn extract_token_from_sse( &self, sse_events: &[ParsedSseEvent], + continuation_bytes: Option<&[u8]>, pid: u32, comm: &str, ) -> Option { let usage = sse_events .iter() .rev() - .find_map(|e| self.token.parse_event(e))?; + .find_map(|e| self.token.parse_event(e)) + .or_else(|| { + // Fallback: OpenAI Responses API embeds usage in a final + // `response.completed` event whose `data:` field routinely + // exceeds a single TLS record. The aggregator buffers the + // raw continuation bytes; re-parse them with the legacy + // SSEParser (which concatenates multi-line data fields) + // and walk events in reverse so the canonical usage event + // wins. If reassembled events still don't yield usage, + // fall back to a partial-scan over the raw buffer text. 
+ let extra = continuation_bytes?; + let text = String::from_utf8_lossy(extra); + let reassembled = SSEParser::parse_stream(&text); + let from_events = reassembled + .events + .iter() + .rev() + .find_map(|e| self.token.parse_data(&e.data)); + if from_events.is_some() { + return from_events; + } + let from_scan = self.token.parse_data(&text); + if from_scan.is_none() { + log::debug!( + "[extract_token_from_sse] continuation buffer scan miss: len={} reassembled_events={}", + extra.len(), + reassembled.events.len(), + ); + } + from_scan + })?; let record = TokenRecord::new( pid, @@ -717,15 +758,37 @@ impl Analyzer { "openai" }; - // Count input tokens from request messages using chat template - let input_tokens = if let Some(messages) = - request_json_ref.get("messages").and_then(|m| m.as_array()) - { + // Count input tokens from request messages using chat template. + // Supports both OpenAI chat completions format (top-level "messages") + // and Responses API format (top-level "input" + "instructions"). 
+ let messages_owned: Option> = request_json_ref + .get("messages") + .and_then(|m| m.as_array()) + .cloned() + .or_else(|| { + let input = request_json_ref.get("input").and_then(|m| m.as_array())?; + let mut combined = Vec::new(); + if let Some(instr) = request_json_ref + .get("instructions") + .and_then(|s| s.as_str()) + { + if !instr.is_empty() { + combined.push(serde_json::json!({ + "role": "system", + "content": instr, + })); + } + } + combined.extend(input.iter().cloned()); + Some(combined) + }); + + let input_tokens = if let Some(messages) = messages_owned { if messages.is_empty() { 0 } else { // Clone messages for in-place modification of tool_calls.arguments - let mut msgs = messages.clone(); + let mut msgs = messages; // Process tool_calls arguments: parse JSON string to object in place for msg in msgs.iter_mut() { @@ -1195,6 +1258,26 @@ impl Analyzer { #[cfg(test)] mod tests { use super::*; + use crate::probes::sslsniff::SslEvent; + use std::rc::Rc; + + fn create_test_event(data: &str) -> ParsedSseEvent { + let ssl_event = Rc::new(SslEvent { + source: 0, + timestamp_ns: 1234567890, + delta_ns: 0, + pid: 1234, + tid: 5678, + uid: 0, + len: data.len() as u32, + rw: 0, + comm: String::new(), + buf: data.as_bytes().to_vec(), + is_handshake: false, + ssl_ptr: 0, + }); + ParsedSseEvent::new(None, None, None, 0, data.len(), ssl_event) + } #[test] fn test_extract_token_from_json_body_openai() { @@ -1366,4 +1449,109 @@ mod tests { .is_none() ); } + + #[test] + fn test_extract_token_from_sse_continuation_buffer() { + let analyzer = Analyzer::new(); + let events = vec![create_test_event( + "data: {\"type\":\"response.output_text.delta\"}", + )]; + let continuation = br#"event:response.completed +data:{"usage":{"input_tokens":57,"output_tokens":3}}"#; + let record = analyzer + .extract_token_from_sse(&events, Some(continuation), 1234, "test") + .expect("should recover from continuation buffer"); + assert_eq!(record.input_tokens, 57); + 
assert_eq!(record.output_tokens, 3); + } + + #[test] + fn test_extract_token_from_sse_events_only_no_usage() { + let analyzer = Analyzer::new(); + let events = vec![create_test_event( + "data: {\"type\":\"response.output_text.delta\"}", + )]; + assert!( + analyzer + .extract_token_from_sse(&events, None, 1234, "test") + .is_none() + ); + } + + #[test] + fn test_extract_token_from_sse_continuation_buffer_scan_fallback() { + let analyzer = Analyzer::new(); + let events = vec![create_test_event( + "data: {\"type\":\"response.output_text.delta\"}", + )]; + // Truncated JSON that is neither valid JSON nor well-formed SSE, but + // still carries token fields. This exercises the regex-free scan + // fallback (lines 633-642). + let continuation = b"{\"input_tokens\":57,\"output_tokens\":3"; + let record = analyzer + .extract_token_from_sse(&events, Some(continuation), 1234, "test") + .expect("should recover via partial scan"); + assert_eq!(record.input_tokens, 57); + assert_eq!(record.output_tokens, 3); + } + + #[test] + fn test_analyze_aggregated_response_only_with_sse() { + use crate::aggregator::{AggregatedResponse, ConnectionId}; + use crate::parser::http::ParsedResponse; + + let analyzer = Analyzer::new(); + + let resp_buf = b"HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\n\r\n"; + let ssl_event = Rc::new(SslEvent { + source: 0, + timestamp_ns: 1_000_000_000, + delta_ns: 0, + pid: 3000, + tid: 3000, + uid: 0, + len: resp_buf.len() as u32, + rw: 0, + comm: "python3".to_string(), + buf: resp_buf.to_vec(), + is_handshake: false, + ssl_ptr: 0x9000, + }); + + let parsed_response = ParsedResponse { + version: 11, + status_code: 200, + reason: "OK".to_string(), + headers: std::collections::HashMap::new(), + body_offset: 0, + body_len: 0, + source_event: ssl_event, + }; + + let mut aggregated_response = AggregatedResponse::from_parsed(parsed_response); + let usage_event = + create_test_event("data: {\"usage\":{\"prompt_tokens\":42,\"completion_tokens\":7}}"); + 
aggregated_response.set_sse_events(vec![usage_event]); + + let result = AggregatedResult::ResponseOnly { + connection_id: ConnectionId { + pid: 3000, + ssl_ptr: 0x9000, + }, + response: aggregated_response, + }; + + let results = analyzer.analyze_aggregated(&result); + let token_result = results + .iter() + .find(|r| matches!(r, AnalysisResult::Token(_))); + assert!( + token_result.is_some(), + "ResponseOnly with SSE usage should produce a TokenRecord" + ); + if let Some(AnalysisResult::Token(record)) = token_result { + assert_eq!(record.input_tokens, 42); + assert_eq!(record.output_tokens, 7); + } + } } diff --git a/src/agentsight/src/bin/cli/audit.rs b/src/agentsight/src/bin/cli/audit.rs index b65bad063..b067f232b 100644 --- a/src/agentsight/src/bin/cli/audit.rs +++ b/src/agentsight/src/bin/cli/audit.rs @@ -222,6 +222,7 @@ mod tests { args: Some(args.into()), exit_code: None, }, + session_id: None, } } diff --git a/src/agentsight/src/bpf/filewrite.bpf.c b/src/agentsight/src/bpf/filewrite.bpf.c index ce609ca77..70ddc3016 100644 --- a/src/agentsight/src/bpf/filewrite.bpf.c +++ b/src/agentsight/src/bpf/filewrite.bpf.c @@ -64,23 +64,41 @@ int BPF_PROG(trace_vfs_write, struct file *file, const char *buf, size_t count, if (ret <= 0) return 0; - // ret includes the null terminator, so string length = ret - 1 - // Filename must be exactly ".jsonl" = 42 chars - int slen = ret - 1; - if (slen != UUID_JSONL_LEN) - return 0; - - // Compare last 6 characters against ".jsonl" - if (fname[UUID_LEN] != '.' || - fname[UUID_LEN + 1] != 'j' || - fname[UUID_LEN + 2] != 's' || - fname[UUID_LEN + 3] != 'o' || - fname[UUID_LEN + 4] != 'n' || - fname[UUID_LEN + 5] != 'l') - return 0; - - // Validate UUID portion of filename - if (!is_uuid(fname)) + // We accept two filename shapes; both checks use only compile-time fixed + // offsets, so the eBPF verifier doesn't have to track dynamic length math. + // 1. `.jsonl` (exactly 42 chars + NUL = 43) + // OpenClaw / Cosh / Claude Code + // 2. 
`rollout-<...>-.jsonl` + // Codex CLI rollouts; userspace + // extracts the trailing UUID via + // ResponseSessionMapper. + // + // The rollout check is intentionally a *prefix* match (`rollout-`) + // without verifying the `.jsonl` suffix: the eBPF verifier rejects + // variable-length suffix checks, and the userspace + // `extract_session_id` performs precise filtering. The extra ring + // buffer events from non-jsonl `rollout-*` files are negligible + // because such files are rare outside Codex's own session directory. + int matched_strict = (ret == UUID_JSONL_LEN + 1) + && fname[UUID_LEN] == '.' + && fname[UUID_LEN + 1] == 'j' + && fname[UUID_LEN + 2] == 's' + && fname[UUID_LEN + 3] == 'o' + && fname[UUID_LEN + 4] == 'n' + && fname[UUID_LEN + 5] == 'l' + && is_uuid(fname); + + int matched_rollout = (ret >= 9) + && fname[0] == 'r' + && fname[1] == 'o' + && fname[2] == 'l' + && fname[3] == 'l' + && fname[4] == 'o' + && fname[5] == 'u' + && fname[6] == 't' + && fname[7] == '-'; + + if (!matched_strict && !matched_rollout) return 0; // Reserve space in ring buffer @@ -91,7 +109,12 @@ int BPF_PROG(trace_vfs_write, struct file *file, const char *buf, size_t count, // Fill metadata event->source = EVENT_SOURCE_FILEWRITE; event->timestamp_ns = bpf_ktime_get_ns(); - event->pid = current_ns_pid(); + // Use the tgid-level ns pid so this event correlates with sslsniff + // (which also records the process-group pid). `pid_tgid >> 32` is the + // host tgid; the gate above already returned the corresponding ns pid + // when the lookup hit the traced map, but we want a consistent value + // for cross-probe correlation with the HTTP/SSE pipeline. 
+ event->pid = pid; event->tid = (u32)pid_tgid; event->uid = bpf_get_current_uid_gid(); event->write_size = (u32)count; diff --git a/src/agentsight/src/discovery/scanner.rs b/src/agentsight/src/discovery/scanner.rs index 23bb72cad..95b108f38 100644 --- a/src/agentsight/src/discovery/scanner.rs +++ b/src/agentsight/src/discovery/scanner.rs @@ -224,7 +224,7 @@ impl AgentScanner { } /// Attempt to match a process against known agents - fn try_match_process(&self, pid: u32) -> Option { + pub fn try_match_process(&self, pid: u32) -> Option { let proc_dir = format!("/proc/{pid}"); // Read process name from /proc/[pid]/comm @@ -380,4 +380,12 @@ mod tests { assert!(scanner.is_denied(&["deny-me-process".to_string()])); assert!(!scanner.is_denied(&["node".to_string(), "/path/claude-code".to_string()])); } + + #[test] + fn test_try_match_process_current() { + let scanner = AgentScanner::from_rules(&crate::config::default_cmdline_rules(), &[]); + // The current test process should not match any agent rule. + let result = scanner.try_match_process(std::process::id()); + assert!(result.is_none()); + } } diff --git a/src/agentsight/src/genai/builder.rs b/src/agentsight/src/genai/builder.rs index 9f7d7b3a8..121e2a083 100644 --- a/src/agentsight/src/genai/builder.rs +++ b/src/agentsight/src/genai/builder.rs @@ -111,12 +111,17 @@ impl GenAIBuilder { }; // Determine response_id from call metadata (may come from parsed_message - // or SSE body fallback), and check if mapper resolved it. + // or SSE body fallback), and check if mapper resolved it (either via + // response_id mapping, or via pid → session fallback for agents like + // Codex CLI whose rollout file does not embed a response_id). 
let response_id = llm_call.metadata.get("response_id").cloned(); let mapper_hit = response_id .as_deref() .and_then(|rid| response_mapper.get_session_by_response_id(rid)) - .is_some(); + .is_some() + || response_mapper + .get_session_by_pid(llm_call.pid as u32) + .is_some(); // If response_id exists but mapper didn't resolve session_id, queue // for deferred resolution so the next FileWrite event can fix it. @@ -216,91 +221,60 @@ impl GenAIBuilder { // 重新计算并 UPDATE 正常 ID,只有 crash 路径才会保留这里写入的 // peek/fallback 值。 let (user_query, input_messages, system_instructions, first_user_text, last_user_text) = - if let Some(ref v) = body { - if let Some(messages) = v.get("messages").and_then(|m| m.as_array()) { - // Helper: extract text from "content" which can be either - // a plain string or an array of content blocks: - // "content": "text" - // "content": [{"type":"text","text":"..."},...] - let extract_text = |m: &serde_json::Value| -> Option { - let c = m.get("content")?; - if let Some(s) = c.as_str() { - if !s.is_empty() { - return Some(s.to_string()); - } - } - if let Some(arr) = c.as_array() { - let text: String = arr - .iter() - .filter_map(|item| { - // [{"type":"text","text":"..."}] - if item.get("type").and_then(|t| t.as_str()) == Some("text") { - item.get("text").and_then(|t| t.as_str()) - } else { - None - } - }) - .collect::>() - .join("\n"); - if !text.is_empty() { - return Some(text); - } - } - None - }; - - // First user message raw text — used as `session_key` material - // for IdResolver peek / crash fallback. 
- let first_user_text = messages - .iter() - .filter(|m| m.get("role").and_then(|r| r.as_str()) == Some("user")) - .find_map(&extract_text) - .unwrap_or_default(); - - // Last user message raw text — used for user_query (display text) - // 以及 conversation_key (peek / crash fallback)。 - let last_user_raw = messages - .iter() - .rev() - .filter(|m| m.get("role").and_then(|r| r.as_str()) == Some("user")) - .find_map(extract_text); - let last_user_text = last_user_raw.clone().unwrap_or_default(); - - // user_query: last user message text, stripped of metadata prefix - let user_query = last_user_raw.as_deref().map(Self::strip_user_query_prefix); - - // Serialise message subsets for the pending record - let sys: Vec<_> = messages - .iter() - .filter(|m| m.get("role").and_then(|r| r.as_str()) == Some("system")) - .collect(); - let non_sys: Vec<_> = messages - .iter() - .filter(|m| m.get("role").and_then(|r| r.as_str()) != Some("system")) - .collect(); - - let input_messages = if non_sys.is_empty() { - None - } else { - serde_json::to_string(&non_sys).ok() - }; - let system_instructions = if sys.is_empty() { - None - } else { - serde_json::to_string(&sys).ok() - }; - - ( - user_query, - input_messages, - system_instructions, - first_user_text, - last_user_text, - ) + if let Some(view) = body.as_ref().and_then(Self::extract_messages_view) { + let (messages, instructions_text) = view; + + // First user message raw text — used as `session_key` material + // for IdResolver peek / crash fallback. 
+ let first_user_text = messages + .iter() + .filter(|m| m.get("role").and_then(|r| r.as_str()) == Some("user")) + .find_map(Self::extract_message_text) + .unwrap_or_default(); + + // Last user message raw text — used for user_query (display text) + // 以及 conversation_key (peek / crash fallback)。 + let last_user_raw = messages + .iter() + .rev() + .filter(|m| m.get("role").and_then(|r| r.as_str()) == Some("user")) + .find_map(Self::extract_message_text); + let last_user_text = last_user_raw.clone().unwrap_or_default(); + + // user_query: last user message text, stripped of metadata prefix + let user_query = last_user_raw.as_deref().map(Self::strip_user_query_prefix); + + // Serialise message subsets for the pending record + let sys: Vec<_> = messages + .iter() + .filter(|m| m.get("role").and_then(|r| r.as_str()) == Some("system")) + .collect(); + let non_sys: Vec<_> = messages + .iter() + .filter(|m| m.get("role").and_then(|r| r.as_str()) != Some("system")) + .collect(); + + let input_messages = if non_sys.is_empty() { + None } else { - // messages key missing or not an array - (None, None, None, String::new(), String::new()) - } + serde_json::to_string(&non_sys).ok() + }; + let system_instructions = if sys.is_empty() { + // Responses API carries the system prompt at the top level + // via "instructions". Fall back to that when the messages + // array has no system role. 
+ instructions_text.map(|s| serde_json::to_string(&s).unwrap_or(s)) + } else { + serde_json::to_string(&sys).ok() + }; + + ( + user_query, + input_messages, + system_instructions, + first_user_text, + last_user_text, + ) } else { (None, None, None, String::new(), String::new()) }; @@ -496,6 +470,36 @@ impl GenAIBuilder { #[cfg(test)] mod tests { use super::*; + use crate::probes::sslsniff::SslEvent; + use std::rc::Rc; + + fn make_request(path: &str, body: &str) -> ParsedRequest { + let buf = body.as_bytes().to_vec(); + let ssl_event = Rc::new(SslEvent { + source: 0, + timestamp_ns: 1000, + delta_ns: 0, + pid: 1234, + tid: 1, + uid: 0, + len: buf.len() as u32, + rw: 1, + comm: "test".to_string(), + buf, + is_handshake: false, + ssl_ptr: 0x1, + }); + ParsedRequest { + method: "POST".to_string(), + path: path.to_string(), + version: 11, + headers: std::collections::HashMap::new(), + body_offset: 0, + body_len: body.len(), + source_event: ssl_event, + reassembled_body: None, + } + } #[test] fn test_generate_id_unique() { @@ -517,4 +521,64 @@ mod tests { assert!(id1.contains('_')); assert!(id2.contains('_')); } + + #[test] + fn test_build_pending_from_request_chat_completions() { + let builder = GenAIBuilder::new(); + let body = r#"{"model":"gpt-4","messages":[{"role":"system","content":"sys"},{"role":"user","content":"hello"}]}"#; + let req = make_request("/v1/chat/completions", body); + let cache = std::collections::HashMap::new(); + let pending = builder + .build_pending_from_request(&req, &ConnectionId { pid: 1, ssl_ptr: 2 }, &cache) + .unwrap(); + assert_eq!(pending.model.as_deref(), Some("gpt-4")); + assert_eq!(pending.provider.as_deref(), Some("openai")); + assert!(pending.system_instructions.is_some()); + assert!(pending.user_query.as_deref() == Some("hello")); + assert_eq!(pending.call_kind, "main"); + } + + #[test] + fn test_build_pending_from_request_responses_api() { + let builder = GenAIBuilder::new(); + let body = 
r#"{"model":"gpt-4","input":[{"role":"user","content":"hello"}],"instructions":"sys prompt"}"#; + let req = make_request("/v1/responses", body); + let cache = std::collections::HashMap::new(); + let pending = builder + .build_pending_from_request(&req, &ConnectionId { pid: 1, ssl_ptr: 2 }, &cache) + .unwrap(); + assert_eq!(pending.model.as_deref(), Some("gpt-4")); + assert_eq!(pending.provider.as_deref(), Some("openai")); + assert!(pending.system_instructions.is_some()); + assert!(pending.user_query.as_deref() == Some("hello")); + } + + #[test] + fn test_build_pending_from_request_non_llm_path() { + let builder = GenAIBuilder::new(); + let body = r#"{"model":"gpt-4","messages":[]}"#; + let req = make_request("/api/health", body); + let cache = std::collections::HashMap::new(); + assert!( + builder + .build_pending_from_request(&req, &ConnectionId { pid: 1, ssl_ptr: 2 }, &cache) + .is_none() + ); + } + + #[test] + fn test_build_pending_from_request_llm_path_no_messages_view() { + let builder = GenAIBuilder::new(); + // LLM path but body lacks both "messages" and "input". 
+ let body = r#"{"model":"gpt-4","stream":true}"#; + let req = make_request("/v1/chat/completions", body); + let cache = std::collections::HashMap::new(); + let pending = builder + .build_pending_from_request(&req, &ConnectionId { pid: 1, ssl_ptr: 2 }, &cache) + .expect("LLM path should still create pending even without messages"); + assert_eq!(pending.model.as_deref(), Some("gpt-4")); + assert!(pending.user_query.is_none()); + assert!(pending.input_messages.is_none()); + assert!(pending.system_instructions.is_none()); + } } diff --git a/src/agentsight/src/genai/call_builder.rs b/src/agentsight/src/genai/call_builder.rs index 30ed2afe2..cfd71d7a9 100644 --- a/src/agentsight/src/genai/call_builder.rs +++ b/src/agentsight/src/genai/call_builder.rs @@ -91,9 +91,6 @@ impl GenAIBuilder { let first_user_raw = Self::extract_first_user_raw(&request).unwrap_or_default(); let last_user_raw = Self::extract_last_user_raw(&request).unwrap_or_default(); - // Classify call_kind from request content - let call_kind = super::helpers::classify_call_kind(&request); - // 提取 LLM API 的 response_id(如 chatcmpl-xxx),用作 trace_id // 同时作为 call_id 的首选值:trace_id 有值时直接复用,避免两套 ID; // SysOM / 解析失败等无 response_id 的场景 fallback 到内部生成的 internal_id。 @@ -107,9 +104,9 @@ impl GenAIBuilder { .unwrap_or_else(|| http.comm.clone()); let pid_i32 = http.pid as i32; - // session_id: 优先从 request metadata 获取(Claude Code), - // 次优先 response ID → .jsonl UUID 映射, - // 兜底 hash。 + // session_id: 优先从 agent 自身的 session 获取(通过 response ID → .jsonl UUID 映射), + // 未命中时再尝试 pid → .jsonl UUID 映射(Codex CLI 等 rollout 文件不内嵌 response_id 的场景), + // 仍未命中则 fallback 到 `SHA256("session" + 该 session 内最早 response_id)`。 let metadata_session = parsed_message .as_ref() .and_then(|m| m.request_metadata_session_id()); @@ -120,6 +117,7 @@ impl GenAIBuilder { let mapper_session = parsed_response_id .as_deref() .and_then(|rid| response_mapper.get_session_by_response_id(rid)) + .or_else(|| response_mapper.get_session_by_pid(http.pid)) .map(|s| 
s.to_string()); let session_id = metadata_session.or(mapper_session).or_else(|| { self.id_resolver @@ -242,7 +240,6 @@ impl GenAIBuilder { } else if http.path.contains("/api/v1/copilot/generate_copilot") { meta.insert("operation_name".to_string(), "chat".to_string()); } - meta.insert("call_kind".to_string(), call_kind.as_str().to_string()); // conversation_id: 对话ID,同一 user query 触发的所有调用共享 if let Some(ref cid) = conversation_id { meta.insert("conversation_id".to_string(), cid.clone()); diff --git a/src/agentsight/src/genai/helpers.rs b/src/agentsight/src/genai/helpers.rs index a2ea6e20c..77e573809 100644 --- a/src/agentsight/src/genai/helpers.rs +++ b/src/agentsight/src/genai/helpers.rs @@ -13,6 +13,7 @@ use crate::discovery::matcher::{CmdlineGlobMatcher, ProcessContext}; /// LLM call classification: Main (normal), Recap (compaction/summary), WebSearch. #[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[allow(dead_code)] pub(super) enum CallKind { Main, Recap, @@ -20,6 +21,7 @@ pub(super) enum CallKind { } impl CallKind { + #[allow(dead_code)] pub fn as_str(&self) -> &'static str { match self { CallKind::Main => "main", @@ -33,6 +35,7 @@ impl CallKind { /// /// Conservative: unmatched → Main (zero false positives > recall). /// Signatures are from real captures (case-sensitive .contains()). +#[allow(dead_code)] pub(super) fn classify_call_kind(request: &LLMRequest) -> CallKind { // Collect system instructions text let system_text: String = request @@ -174,6 +177,65 @@ impl GenAIBuilder { .unwrap_or(false) } + /// Normalize the messages array from a parsed request body. + /// + /// Supports both formats: + /// - OpenAI chat completions: top-level `"messages"` array. + /// - OpenAI Responses API (codex 0.137+ via dashscope `/v1/responses`): + /// top-level `"input"` array with sibling `"instructions"` string. 
+ /// + /// Returns `(messages_vec, instructions_text)` where `instructions_text` + /// is only set when the Responses API form is used (it serves as the + /// system prompt fallback when the messages array has no system role). + pub(super) fn extract_messages_view( + body: &serde_json::Value, + ) -> Option<(Vec, Option)> { + if let Some(arr) = body.get("messages").and_then(|m| m.as_array()) { + return Some((arr.clone(), None)); + } + if let Some(arr) = body.get("input").and_then(|m| m.as_array()) { + let instructions = body + .get("instructions") + .and_then(|s| s.as_str()) + .map(|s| s.to_string()); + return Some((arr.clone(), instructions)); + } + None + } + + /// Extract human-readable text from a message's `content` field. + /// + /// Supports: + /// - Plain string: `"content": "text"`. + /// - Array of content blocks with type `text` / `input_text` / `output_text`: + /// `"content": [{"type":"input_text","text":"..."}]`. + pub(super) fn extract_message_text(message: &serde_json::Value) -> Option { + let c = message.get("content")?; + if let Some(s) = c.as_str() { + if !s.is_empty() { + return Some(s.to_string()); + } + } + if let Some(arr) = c.as_array() { + let text: String = arr + .iter() + .filter_map(|item| { + let ty = item.get("type").and_then(|t| t.as_str()).unwrap_or(""); + if matches!(ty, "text" | "input_text" | "output_text") { + item.get("text").and_then(|t| t.as_str()) + } else { + None + } + }) + .collect::>() + .join("\n"); + if !text.is_empty() { + return Some(text); + } + } + None + } + /// Extract provider from path pub(super) fn extract_provider_from_path(&self, path: &str) -> Option { if path.contains("anthropic") || path.contains("/v1/messages") { @@ -861,4 +923,90 @@ mod tests { ); assert_eq!(builder.extract_model_from_message(&None), None); } + + #[test] + fn test_extract_messages_view_chat_completions() { + let body = serde_json::json!({ + "model": "gpt-4", + "messages": [ + {"role": "system", "content": "sys"}, + {"role": "user", 
"content": "hi"} + ] + }); + let (msgs, instructions) = GenAIBuilder::extract_messages_view(&body).unwrap(); + assert_eq!(msgs.len(), 2); + assert!(instructions.is_none()); + } + + #[test] + fn test_extract_messages_view_responses_api() { + let body = serde_json::json!({ + "model": "gpt-4", + "input": [{"role": "user", "content": "hi"}], + "instructions": "sys prompt" + }); + let (msgs, instructions) = GenAIBuilder::extract_messages_view(&body).unwrap(); + assert_eq!(msgs.len(), 1); + assert_eq!(instructions.as_deref(), Some("sys prompt")); + } + + #[test] + fn test_extract_messages_view_none() { + let body = serde_json::json!({"model": "gpt-4"}); + assert!(GenAIBuilder::extract_messages_view(&body).is_none()); + } + + #[test] + fn test_extract_messages_view_responses_api_without_instructions() { + let body = serde_json::json!({ + "model": "gpt-4", + "input": [{"role": "user", "content": "hi"}] + }); + let (msgs, instructions) = GenAIBuilder::extract_messages_view(&body).unwrap(); + assert_eq!(msgs.len(), 1); + assert!(instructions.is_none()); + } + + #[test] + fn test_extract_message_text_string() { + let msg = serde_json::json!({"role": "user", "content": "hello"}); + assert_eq!( + GenAIBuilder::extract_message_text(&msg), + Some("hello".to_string()) + ); + } + + #[test] + fn test_extract_message_text_array() { + let msg = serde_json::json!({ + "role": "user", + "content": [ + {"type": "input_text", "text": "hello"}, + {"type": "output_text", "text": "world"}, + {"type": "image", "text": "ignored"} + ] + }); + assert_eq!( + GenAIBuilder::extract_message_text(&msg), + Some("hello\nworld".to_string()) + ); + } + + #[test] + fn test_extract_message_text_empty() { + let msg = serde_json::json!({"role": "user", "content": ""}); + assert_eq!(GenAIBuilder::extract_message_text(&msg), None); + } + + #[test] + fn test_extract_message_text_array_non_text_only() { + let msg = serde_json::json!({ + "role": "user", + "content": [ + {"type": "image", "text": "ignored"}, + 
{"type": "image_url", "image_url": {"url": "http://example.com"}} + ] + }); + assert_eq!(GenAIBuilder::extract_message_text(&msg), None); + } } diff --git a/src/agentsight/src/genai/openai_parse.rs b/src/agentsight/src/genai/openai_parse.rs index e1262ccd6..7da5c0887 100644 --- a/src/agentsight/src/genai/openai_parse.rs +++ b/src/agentsight/src/genai/openai_parse.rs @@ -18,78 +18,89 @@ impl GenAIBuilder { let v: serde_json::Value = serde_json::from_str(body).ok()?; let obj = v.as_object()?; - // 解析 messages 数组 - let messages = obj - .get("messages") - .and_then(|m| m.as_array()) - .map(|arr| { - arr.iter() - .filter_map(|msg| { - let role = msg.get("role")?.as_str()?.to_string(); - let mut parts = Vec::new(); - - // content 可以是字符串或数组 - if let Some(content) = msg.get("content") { - if let Some(s) = content.as_str() { - if !s.is_empty() { - parts.push(MessagePart::Text { - content: s.to_string(), - }); - } - } else if let Some(arr) = content.as_array() { - for item in arr { - if let Some(text) = item.get("text").and_then(|t| t.as_str()) { - parts.push(MessagePart::Text { - content: text.to_string(), - }); - } - } - } - } + // Normalized view: "messages" (chat completions) or "input" + "instructions" + // (Responses API used by codex 0.137+ via dashscope /v1/responses). 
+ let (raw_messages, instructions_text) = Self::extract_messages_view(&v)?; + + let mut messages: Vec = Vec::new(); + + // Responses API: prepend top-level "instructions" as a synthetic system message + if let Some(instr) = instructions_text { + if !instr.is_empty() { + messages.push(InputMessage { + role: "system".to_string(), + parts: vec![MessagePart::Text { content: instr }], + name: None, + }); + } + } - // tool_call 结果 (role=tool) - if role == "tool" { - if let Some(content) = msg.get("content") { - let id = msg - .get("tool_call_id") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); - parts = vec![MessagePart::ToolCallResponse { - id, - response: content.clone(), - }]; - } + for msg in &raw_messages { + let Some(role) = msg.get("role").and_then(|v| v.as_str()).map(String::from) else { + continue; + }; + let mut parts = Vec::new(); + + // content can be string or array of blocks. Accept blocks with a + // "text" field regardless of type (handles "text" / "input_text" / + // "output_text" alike). 
+ if let Some(content) = msg.get("content") { + if let Some(s) = content.as_str() { + if !s.is_empty() { + parts.push(MessagePart::Text { + content: s.to_string(), + }); + } + } else if let Some(arr) = content.as_array() { + for item in arr { + if let Some(text) = item.get("text").and_then(|t| t.as_str()) { + parts.push(MessagePart::Text { + content: text.to_string(), + }); } + } + } + } - // tool_calls (role=assistant 发起的 tool calls) - if let Some(tool_calls) = msg.get("tool_calls").and_then(|v| v.as_array()) { - for tc in tool_calls { - let id = - tc.get("id").and_then(|v| v.as_str()).map(|s| s.to_string()); - let func = tc.get("function").unwrap_or(&serde_json::Value::Null); - let name = func - .get("name") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let arguments = func.get("arguments").cloned(); - parts.push(MessagePart::ToolCall { - id, - name, - arguments, - }); - } - } + // tool_call 结果 (role=tool) + if role == "tool" { + if let Some(content) = msg.get("content") { + let id = msg + .get("tool_call_id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + parts = vec![MessagePart::ToolCallResponse { + id, + response: content.clone(), + }]; + } + } - Some(InputMessage { - role, - parts, - name: None, - }) - }) - .collect::>() - }) - .unwrap_or_default(); + // tool_calls (role=assistant 发起的 tool calls) + if let Some(tool_calls) = msg.get("tool_calls").and_then(|v| v.as_array()) { + for tc in tool_calls { + let id = tc.get("id").and_then(|v| v.as_str()).map(|s| s.to_string()); + let func = tc.get("function").unwrap_or(&serde_json::Value::Null); + let name = func + .get("name") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let arguments = func.get("arguments").cloned(); + parts.push(MessagePart::ToolCall { + id, + name, + arguments, + }); + } + } + + messages.push(InputMessage { + role, + parts, + name: None, + }); + } if messages.is_empty() { return None; @@ -501,6 +512,64 @@ mod tests { ); } + #[test] + fn 
test_parse_request_body_responses_api() { + let body = r#"{ + "model": "gpt-4", + "input": [{"role": "user", "content": "Hello"}], + "instructions": "You are helpful.", + "stream": true + }"#; + let req = GenAIBuilder::parse_request_body(body).unwrap(); + assert_eq!(req.messages.len(), 2); + assert_eq!(req.messages[0].role, "system"); + assert_eq!(req.messages[1].role, "user"); + assert!(req.stream); + } + + #[test] + fn test_parse_request_body_responses_api_empty_instructions() { + let body = r#"{ + "model": "gpt-4", + "input": [{"role": "user", "content": "Hello"}], + "instructions": "", + "stream": true + }"#; + let req = GenAIBuilder::parse_request_body(body).unwrap(); + // Empty instructions should be skipped, so only the input message remains. + assert_eq!(req.messages.len(), 1); + assert_eq!(req.messages[0].role, "user"); + } + + #[test] + fn test_parse_request_body_content_array() { + let body = r#"{ + "model": "gpt-4", + "messages": [ + {"role": "user", "content": [{"type": "input_text", "text": "Hello"}]} + ] + }"#; + let req = GenAIBuilder::parse_request_body(body).unwrap(); + assert_eq!(req.messages.len(), 1); + assert!( + matches!(&req.messages[0].parts[0], MessagePart::Text { content } if content == "Hello") + ); + } + + #[test] + fn test_parse_request_body_skips_missing_role() { + let body = r#"{ + "model": "gpt-4", + "messages": [ + {"content": "no role"}, + {"role": "user", "content": "Hello"} + ] + }"#; + let req = GenAIBuilder::parse_request_body(body).unwrap(); + assert_eq!(req.messages.len(), 1); + assert_eq!(req.messages[0].role, "user"); + } + #[test] fn test_parse_request_body_empty_messages() { let body = r#"{"model": "gpt-4", "messages": []}"#; diff --git a/src/agentsight/src/health/checker.rs b/src/agentsight/src/health/checker.rs index 29b6e24a8..f4b336a6e 100644 --- a/src/agentsight/src/health/checker.rs +++ b/src/agentsight/src/health/checker.rs @@ -14,6 +14,44 @@ use crate::discovery::AgentScanner; use 
crate::interruption::{InterruptionEvent, InterruptionType, was_pid_oom_killed}; use crate::storage::sqlite::{GenAISqliteStore, InterruptionStore}; +/// Infer the UI role for a discovered agent. +/// +/// Rules (in order): +/// 1. ports != empty → Gateway (real service with TCP port) +/// 2. agent has no service port → Client (stand-alone CLI like Codex +/// whose UI card adds no value when +/// there is no gateway endpoint) +/// 3. parent is same agent_name → Worker (genuine fork, fold under parent) +/// 4. otherwise → Gateway (independent process, own card) +/// +/// Two separately-launched hermes/openclaw client instances are +/// independent (no parent-child link, different terminals), so they +/// each deserve their own primary card; only true forks go into the +/// associated-processes drawer of their parent. +fn infer_agent_role( + agent_name: &str, + ports: &[u16], + ppid: Option, + agent_name_by_pid: &HashMap, +) -> AgentRole { + if !ports.is_empty() { + return AgentRole::Gateway; + } + if agent_name == "Codex" { + return AgentRole::Client; + } + if let Some(pp) = ppid { + if agent_name_by_pid + .get(&pp) + .map(|n| n == agent_name) + .unwrap_or(false) + { + return AgentRole::Worker; + } + } + AgentRole::Gateway +} + /// Background health checker that periodically probes discovered agents pub struct HealthChecker { store: Arc>, @@ -262,30 +300,7 @@ impl HealthChecker { // Read parent PID from /proc//stat for role inference let ppid = read_ppid(agent.pid); - // Infer role: - // 1. ports != empty → Gateway (real service with TCP port) - // 2. parent is same agent_name → Worker (genuine fork, fold under parent) - // 3. otherwise → Gateway (independent process, own card) - // - // Two separately-launched hermes/openclaw client instances are - // independent (no parent-child link, different terminals), so they - // each deserve their own primary card; only true forks go into the - // associated-processes drawer of their parent. 
- let role = if !ports.is_empty() { - AgentRole::Gateway - } else if let Some(pp) = ppid { - if agent_name_by_pid - .get(&pp) - .map(|n| n == &agent.agent_info.name) - .unwrap_or(false) - { - AgentRole::Worker - } else { - AgentRole::Gateway - } - } else { - AgentRole::Gateway - }; + let role = infer_agent_role(&agent.agent_info.name, &ports, ppid, &agent_name_by_pid); let status = if ports.is_empty() { AgentHealthStatus { @@ -483,3 +498,56 @@ fn read_ppid(pid: u32) -> Option { let ppid_str = fields.next()?; ppid_str.parse::().ok() } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_infer_agent_role_gateway_with_ports() { + let mut map = HashMap::new(); + map.insert(1, "Codex".to_string()); + assert_eq!( + infer_agent_role("Codex", &[8080], None, &map), + AgentRole::Gateway + ); + } + + #[test] + fn test_infer_agent_role_codex_client_without_ports() { + let map = HashMap::new(); + assert_eq!( + infer_agent_role("Codex", &[], None, &map), + AgentRole::Client + ); + } + + #[test] + fn test_infer_agent_role_worker_same_parent_name() { + let mut map = HashMap::new(); + map.insert(1, "Hermes".to_string()); + assert_eq!( + infer_agent_role("Hermes", &[], Some(1), &map), + AgentRole::Worker + ); + } + + #[test] + fn test_infer_agent_role_gateway_different_parent_name() { + let mut map = HashMap::new(); + map.insert(1, "Other".to_string()); + assert_eq!( + infer_agent_role("Hermes", &[], Some(1), &map), + AgentRole::Gateway + ); + } + + #[test] + fn test_infer_agent_role_gateway_no_parent() { + let map = HashMap::new(); + assert_eq!( + infer_agent_role("Hermes", &[], None, &map), + AgentRole::Gateway + ); + } +} diff --git a/src/agentsight/src/parser/sse/event.rs b/src/agentsight/src/parser/sse/event.rs index 3fe659c99..267a9895a 100644 --- a/src/agentsight/src/parser/sse/event.rs +++ b/src/agentsight/src/parser/sse/event.rs @@ -98,6 +98,8 @@ impl ParsedSseEvent { /// Recognizes: /// - OpenAI style: data is `[DONE]` or `[END]` /// - Anthropic style: event 
field is `message_stop`, or data is `{"type":"message_stop"}` + /// - OpenAI Responses API: event field is `response.completed`/`response.failed`/`response.incomplete`, + /// or data is `{"type":"response.completed",...}`. pub fn is_done(&self) -> bool { if self.is_synthetic_done { return true; @@ -106,6 +108,14 @@ impl ParsedSseEvent { if self.event.as_deref() == Some("message_stop") { return true; } + // OpenAI Responses API: event field flags a terminal frame. + // Use this even when the data payload is too large to parse as JSON. + if matches!( + self.event.as_deref(), + Some("response.completed") | Some("response.failed") | Some("response.incomplete") + ) { + return true; + } let data = self.data(); let text = String::from_utf8_lossy(data); let trimmed = text.trim(); @@ -118,7 +128,11 @@ impl ParsedSseEvent { if trimmed.starts_with('{') { if let Ok(v) = serde_json::from_str::(trimmed) { let t = v.get("type").and_then(|t| t.as_str()); - if t == Some("message_stop") || t == Some("response.completed") { + if t == Some("message_stop") + || t == Some("response.completed") + || t == Some("response.failed") + || t == Some("response.incomplete") + { return true; } } @@ -537,6 +551,22 @@ mod tests { assert!(parsed.is_done()); } + #[test] + fn test_is_done_responses_api_failed() { + let data = b"{\"type\":\"response.failed\"}"; + let ev = make_event(data); + let parsed = ParsedSseEvent::new(None, None, None, 0, data.len(), ev); + assert!(parsed.is_done()); + } + + #[test] + fn test_is_done_responses_api_incomplete() { + let data = b"{\"type\":\"response.incomplete\"}"; + let ev = make_event(data); + let parsed = ParsedSseEvent::new(None, None, None, 0, data.len(), ev); + assert!(parsed.is_done()); + } + #[test] fn test_is_done_responses_api_delta_not_done() { let data = b"{\"type\":\"response.output_text.delta\",\"delta\":\"hi\"}"; diff --git a/src/agentsight/src/probes/codex_offsets.rs b/src/agentsight/src/probes/codex_offsets.rs new file mode 100644 index 
000000000..fb146522a --- /dev/null +++ b/src/agentsight/src/probes/codex_offsets.rs @@ -0,0 +1,144 @@ +use sha2::{Digest, Sha256}; +use std::fs::File; +use std::io::Read; + +use super::elf_buildid; +use super::sslsniff::BoringSslOffsets; + +const HEAD_SIZE: usize = 65536; + +#[derive(Debug)] +struct OffsetEntry { + fingerprint: Fingerprint, + offsets: Option, +} + +#[derive(Debug)] +struct Fingerprint { + build_id: Option, + file_size: u64, + head_64k_sha256: String, +} + +pub struct OffsetTable { + entries: Vec, +} + +impl OffsetTable { + pub fn load(json_str: &str) -> Option { + let root: serde_json::Value = serde_json::from_str(json_str).ok()?; + let codex_offsets = root.get("codex_offsets")?; + let entries_val = codex_offsets.get("entries")?.as_array()?; + + let mut entries = Vec::new(); + for e in entries_val { + let fp = e.get("fingerprint")?; + let fingerprint = Fingerprint { + build_id: fp + .get("build_id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + file_size: fp.get("file_size")?.as_u64()?, + head_64k_sha256: fp.get("head_64k_sha256")?.as_str()?.to_string(), + }; + + let offsets = match e.get("offsets") { + Some(serde_json::Value::Object(obj)) => { + let w = obj.get("ssl_write")?.as_u64()? as usize; + let r = obj.get("ssl_read")?.as_u64()? as usize; + let h = obj.get("ssl_do_handshake")?.as_u64()? 
as usize; + let write_is_ex = obj + .get("write_is_ex") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + let read_is_ex = obj + .get("read_is_ex") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + Some(BoringSslOffsets { + ssl_write: w, + ssl_read: r, + ssl_do_handshake: h, + write_is_ex, + read_is_ex, + }) + } + _ => None, + }; + + entries.push(OffsetEntry { + fingerprint, + offsets, + }); + } + + Some(Self { entries }) + } + + pub fn lookup(&self, path: &str) -> Option { + let metadata = std::fs::metadata(path).ok()?; + let file_size = metadata.len(); + + let candidates: Vec<&OffsetEntry> = self + .entries + .iter() + .filter(|e| e.fingerprint.file_size == file_size) + .collect(); + + if candidates.is_empty() { + return None; + } + + let build_id = elf_buildid::read_buildid(path); + if let Some(ref bid) = build_id { + for entry in &candidates { + if entry.fingerprint.build_id.as_deref() == Some(bid.as_str()) { + return entry.offsets.clone(); + } + } + } + + let head_sha = compute_head_sha256(path)?; + for entry in &candidates { + if entry.fingerprint.head_64k_sha256 == head_sha { + return entry.offsets.clone(); + } + } + + None + } +} + +fn compute_head_sha256(path: &str) -> Option { + let mut f = File::open(path).ok()?; + let mut buf = vec![0u8; HEAD_SIZE]; + let n = f.read(&mut buf).ok()?; + buf.truncate(n); + let hash = Sha256::digest(&buf); + Some(hash.iter().map(|b| format!("{:02x}", b)).collect()) +} + +#[cfg(test)] +mod tests { + use super::*; + + const TEST_JSON: &str = r#"{ + "codex_offsets": { + "schema_version": 1, + "entries": [ + { + "codex_version": "0.141.0", + "fingerprint": { "file_size": 100, "head_64k_sha256": "abc123" }, + "offsets": { "ssl_write": 1000, "ssl_read": 2000, "ssl_do_handshake": 3000 } + } + ] + } + }"#; + + #[test] + fn load_table() { + let table = OffsetTable::load(TEST_JSON).unwrap(); + assert_eq!(table.entries.len(), 1); + assert_eq!(table.entries[0].offsets.as_ref().unwrap().ssl_write, 1000); + } +} diff --git 
a/src/agentsight/src/probes/elf_buildid.rs b/src/agentsight/src/probes/elf_buildid.rs new file mode 100644 index 000000000..28bf7a02a --- /dev/null +++ b/src/agentsight/src/probes/elf_buildid.rs @@ -0,0 +1,163 @@ +use std::fs::File; +use std::io::{Read, Seek, SeekFrom}; + +const ELF_MAGIC: &[u8; 4] = b"\x7fELF"; +const PT_NOTE: u32 = 4; +const NT_GNU_BUILD_ID: u32 = 3; + +/// Parse GNU Build-ID from an ELF binary's PT_NOTE segment. +/// Returns the hex-encoded build-id string, or None if not present. +/// +/// **ELF64 only.** 32-bit ELF (`class == 1`) is rejected early because +/// Codex CLI ships as a statically-linked musl x86-64 binary. If 32-bit +/// support is ever needed, the header layout and pointer-sized fields +/// (e_phoff, p_offset, p_filesz) must be read as u32 instead of u64. +pub fn read_buildid(path: &str) -> Option { + let mut f = File::open(path).ok()?; + let mut ident = [0u8; 16]; + f.read_exact(&mut ident).ok()?; + if &ident[0..4] != ELF_MAGIC { + return None; + } + let class = ident[4]; + let le = ident[5] == 1; + if class != 2 { + return None; + } + + let read_u16 = |buf: &[u8]| -> u16 { + if le { + u16::from_le_bytes([buf[0], buf[1]]) + } else { + u16::from_be_bytes([buf[0], buf[1]]) + } + }; + let read_u32 = |buf: &[u8]| -> u32 { + if le { + u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) + } else { + u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) + } + }; + let read_u64 = |buf: &[u8]| -> Option { + let arr: [u8; 8] = buf[0..8].try_into().ok()?; + Some(if le { + u64::from_le_bytes(arr) + } else { + u64::from_be_bytes(arr) + }) + }; + + // ELF64 header: e_phoff at offset 32, e_phentsize at 54, e_phnum at 56 + let mut hdr = [0u8; 64]; + f.seek(SeekFrom::Start(0)).ok()?; + f.read_exact(&mut hdr).ok()?; + + let e_phoff = read_u64(&hdr[32..])?; + let e_phentsize = read_u16(&hdr[54..]) as u64; + let e_phnum = read_u16(&hdr[56..]) as u64; + + for i in 0..e_phnum { + let off = e_phoff + i * e_phentsize; + let mut phdr = [0u8; 56]; + 
f.seek(SeekFrom::Start(off)).ok()?; + f.read_exact(&mut phdr).ok()?; + + let p_type = read_u32(&phdr[0..]); + if p_type != PT_NOTE { + continue; + } + + let p_offset = read_u64(&phdr[8..])?; + let p_filesz = read_u64(&phdr[32..])?; + + let mut note_buf = vec![0u8; p_filesz as usize]; + f.seek(SeekFrom::Start(p_offset)).ok()?; + f.read_exact(&mut note_buf).ok()?; + + let mut pos = 0usize; + while pos + 12 <= note_buf.len() { + let namesz = read_u32(¬e_buf[pos..]) as usize; + let descsz = read_u32(¬e_buf[pos + 4..]) as usize; + let note_type = read_u32(¬e_buf[pos + 8..]); + pos += 12; + + let name_aligned = (pos + namesz + 3) & !3; + let desc_start = name_aligned; + let desc_end = desc_start + descsz; + let desc_aligned = (desc_end + 3) & !3; + + if desc_end > note_buf.len() { + break; + } + + if note_type == NT_GNU_BUILD_ID + && namesz == 4 + && note_buf.get(pos..pos + 3) == Some(b"GNU") + { + let id_bytes = ¬e_buf[desc_start..desc_end]; + return Some(hex_encode(id_bytes)); + } + + pos = desc_aligned; + } + } + None +} + +fn hex_encode(bytes: &[u8]) -> String { + bytes.iter().map(|b| format!("{:02x}", b)).collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_note_section() { + let mut elf = vec![0u8; 256]; + elf[0..4].copy_from_slice(b"\x7fELF"); + elf[4] = 2; // 64-bit + elf[5] = 1; // little-endian + elf[32..40].copy_from_slice(&64u64.to_le_bytes()); // e_phoff + elf[54..56].copy_from_slice(&56u16.to_le_bytes()); // e_phentsize + elf[56..58].copy_from_slice(&1u16.to_le_bytes()); // e_phnum + + let ph_off = 64usize; + elf[ph_off..ph_off + 4].copy_from_slice(&4u32.to_le_bytes()); // PT_NOTE + elf[ph_off + 8..ph_off + 16].copy_from_slice(&128u64.to_le_bytes()); // p_offset + elf[ph_off + 32..ph_off + 40].copy_from_slice(&32u64.to_le_bytes()); // p_filesz + + let note_off = 128usize; + elf[note_off..note_off + 4].copy_from_slice(&4u32.to_le_bytes()); // namesz + elf[note_off + 4..note_off + 8].copy_from_slice(&4u32.to_le_bytes()); // descsz 
+ elf[note_off + 8..note_off + 12].copy_from_slice(&3u32.to_le_bytes()); // NT_GNU_BUILD_ID + elf[note_off + 12..note_off + 16].copy_from_slice(b"GNU\0"); + elf[note_off + 16..note_off + 20].copy_from_slice(&[0xde, 0xad, 0xbe, 0xef]); + + let dir = std::env::temp_dir(); + let path = dir.join("test_buildid.elf"); + std::fs::write(&path, &elf).unwrap(); + let result = read_buildid(path.to_str().unwrap()); + std::fs::remove_file(&path).ok(); + assert_eq!(result, Some("deadbeef".to_string())); + } + + #[test] + fn no_buildid_returns_none() { + let mut elf = vec![0u8; 128]; + elf[0..4].copy_from_slice(b"\x7fELF"); + elf[4] = 2; + elf[5] = 1; + elf[32..40].copy_from_slice(&64u64.to_le_bytes()); + elf[54..56].copy_from_slice(&56u16.to_le_bytes()); + elf[56..58].copy_from_slice(&0u16.to_le_bytes()); // no program headers + + let dir = std::env::temp_dir(); + let path = dir.join("test_no_buildid.elf"); + std::fs::write(&path, &elf).unwrap(); + let result = read_buildid(path.to_str().unwrap()); + std::fs::remove_file(&path).ok(); + assert_eq!(result, None); + } +} diff --git a/src/agentsight/src/probes/mod.rs b/src/agentsight/src/probes/mod.rs index 5be212333..36b44a93d 100644 --- a/src/agentsight/src/probes/mod.rs +++ b/src/agentsight/src/probes/mod.rs @@ -9,6 +9,9 @@ pub mod sslsniff; pub mod tcpsniff; pub mod udpdns; +mod codex_offsets; +mod elf_buildid; + // Re-export commonly used types pub use filewatch::{FileWatch, FileWatchEvent}; pub use filewrite::{FileWrite as FileWriteProbe, FileWriteEvent}; diff --git a/src/agentsight/src/probes/sslsniff.rs b/src/agentsight/src/probes/sslsniff.rs index 9cf66cd0b..6d00ded54 100644 --- a/src/agentsight/src/probes/sslsniff.rs +++ b/src/agentsight/src/probes/sslsniff.rs @@ -326,44 +326,58 @@ impl SslSniff { log::debug!("[attach_process] pid={pid}: attaching {kind:?} → {path}"); - // uprobe attach requires host filesystem paths, not /proc/{pid}/root/... paths. 
- // canonicalize resolves /proc/{pid}/root symlink to the real host path, - // handling both container (overlay rootfs) and non-container (/ symlink) cases. - // Falls back to original path if the process has already exited. - let uprobe_path = std::fs::canonicalize(&path) - .map(|p| p.to_string_lossy().into_owned()) - .unwrap_or_else(|_| path.clone()); - let result = match kind { // Use pid=-1 for global attach (all processes), avoiding per-process duplicate attaches - SslLibKind::OpenSsl => attach_openssl(&mut self.skel, &uprobe_path, -1), - SslLibKind::GnuTls => attach_gnutls(&mut self.skel, &uprobe_path, -1), - SslLibKind::Nss => attach_nss(&mut self.skel, &uprobe_path, -1), - SslLibKind::Boring => { - match attach_boringssl_by_symbol(&mut self.skel, &uprobe_path, -1) { - Ok(ls) => Ok(ls), - Err(sym_err) => { - log::debug!( - "[attach_process] pid={pid}: BoringSSL symbol attach failed for {path} ({sym_err:#}), falling back to byte-pattern" - ); - match find_boringssl_offsets(&path) { - Some(off) => attach_boringssl_by_offset( - &mut self.skel, - &uprobe_path, - &off, - false, - -1, - ), - None => { + SslLibKind::OpenSsl => attach_openssl(&mut self.skel, &path, -1), + SslLibKind::GnuTls => attach_gnutls(&mut self.skel, &path, -1), + SslLibKind::Nss => attach_nss(&mut self.skel, &path, -1), + SslLibKind::Boring => match attach_boringssl_by_symbol(&mut self.skel, &path, -1) { + Ok(ls) => Ok(ls), + Err(sym_err) => { + log::debug!( + "[attach_process] pid={pid}: BoringSSL symbol attach failed for {path} ({sym_err:#}), falling back to byte-pattern" + ); + match find_boringssl_offsets(&path) { + Some(off) => { + attach_boringssl_by_offset(&mut self.skel, &path, &off, false, -1) + } + None => { + // Tier 3: codex offset table lookup (for static-pie binaries + // like Codex CLI that embed aws-lc/BoringSSL without symbols) + if let Some(ref table) = *CODEX_OFFSET_TABLE { + if let Some(off) = table.lookup(&path) { + log::info!( + "[attach_process] pid={pid}: codex offset 
table matched for {path} \ + (write=0x{:x}, read=0x{:x}, handshake=0x{:x})", + off.ssl_write, + off.ssl_read, + off.ssl_do_handshake + ); + attach_boringssl_by_offset( + &mut self.skel, + &path, + &off, + true, + -1, + ) + } else { + log::warn!( + "[attach_process] pid={pid}: BoringSSL detection failed for {path} \ + (no SSL_* in .dynsym, no byte-pattern match, and not in codex offset table), skipping" + ); + continue; + } + } else { log::warn!( - "[attach_process] pid={pid}: BoringSSL detection failed for {path} (no SSL_* in .dynsym and no byte-pattern match), skipping" + "[attach_process] pid={pid}: BoringSSL detection failed for {path} \ + (no SSL_* in .dynsym and no byte-pattern match), skipping" ); continue; } } } } - } + }, }; match result { @@ -387,16 +401,31 @@ impl SslSniff { Ok(()) } - /// Clean up per-pid bookkeeping when a traced process exits. + /// Detach SSL probes for a process and clean up traced inodes. /// - /// Inodes are intentionally kept in `traced_files` because uprobes are - /// attached globally (`pid=-1`). The existing Links remain valid for all - /// processes using the same library, so re-attaching would only create - /// duplicate fds. + /// When a process exits, its inodes are removed from `traced_files` **only + /// if no other traced pid still references the same inode**. Uprobes are + /// attached globally (`pid=-1`), so the link remains valid for other + /// processes using the same library; removing the inode prematurely would + /// cause the scanner to re-attach on the next sweep, producing duplicate + /// uprobe fds. pub fn detach_process(&mut self, pid: u32) { if let Some(inodes) = self.pid_inodes.remove(&pid) { + let mut removed = 0; + for inode in &inodes { + // Check whether another pid still maps this inode. 
+ let still_used = self + .pid_inodes + .values() + .any(|other_inodes| other_inodes.contains(inode)); + if !still_used { + self.traced_files.remove(inode); + removed += 1; + } + } log::debug!( - "[detach_process] pid={pid}: removed pid_inodes entry ({} inodes, kept in traced_files)", + "[detach_process] pid={pid}: removed {}/{} inodes from traced_files", + removed, inodes.len() ); } @@ -500,10 +529,17 @@ impl Drop for SslPoller { // ─── BoringSSL pattern detection ───────────────────────────────────────────── -struct BoringSslOffsets { - ssl_write: usize, - ssl_read: usize, - ssl_do_handshake: usize, +#[derive(Debug, Clone)] +pub(super) struct BoringSslOffsets { + pub ssl_write: usize, + pub ssl_read: usize, + pub ssl_do_handshake: usize, + /// True when `ssl_write` points to `SSL_write_ex` (returns 0/1 + *written), + /// rather than `SSL_write` (returns byte count). Required for aws-lc where + /// only the _ex variant is exported. + pub write_is_ex: bool, + /// True when `ssl_read` points to `SSL_read_ex` rather than `SSL_read`. + pub read_is_ex: bool, } fn find_pattern(haystack: &[u8], pattern: &[u8]) -> Option { @@ -635,6 +671,8 @@ fn find_boringssl_offsets(path: &str) -> Option { ssl_write: wr_off, ssl_read: read_off, ssl_do_handshake: hs_off, + write_is_ex: false, + read_is_ex: false, }) } @@ -649,7 +687,7 @@ enum SslLibKind { GnuTls, /// libnspr4.so (NSS / Firefox) Nss, - /// BoringSSL embedded in binary (e.g. Node.js, Chrome) + /// BoringSSL / aws-lc embedded in binary (e.g. Node.js, Chrome, Codex CLI) Boring, } @@ -684,6 +722,10 @@ fn classify_ssl_lib(path: &str) -> Option { ) { return Some(SslLibKind::Boring); } + // Codex CLI statically links aws-lc (BoringSSL-compatible TLS library). + if name.starts_with("codex") && !name.contains('.') { + return Some(SslLibKind::Boring); + } // uv Python statically links OpenSSL into the binary. 
The ELF .symtab contains // SSL_write/SSL_read/SSL_do_handshake as LOCAL symbols, so attach_openssl() // (symbol-name uprobe) works directly. Only match python3. (with version @@ -723,6 +765,15 @@ fn ssl_libs_from_maps(pid: i32) -> Result> { // When the backing file has been unlinked (" (deleted)" in maps), // the filesystem path no longer exists. Fall back to /proc/<pid>/exe // which the kernel keeps accessible as long as the process is alive. + // + // For normal paths we prefix with `/proc/<pid>/root` so that the + // uprobe target resolves through the process's own mount namespace. + // This is intentional: `canonicalize()` would resolve overlayfs + // paths to the host's lower/upper dirs, which libbpf cannot always + // map back to an inode for uprobe attachment. The kernel's uprobe + // mechanism natively understands `/proc/<pid>/root/<path>` because + // it follows the process's mount namespace, making this safe for + // both host and container processes. let path_str = if path_str.ends_with(" (deleted)") { format!("/proc/{pid}/exe") } else { @@ -908,32 +959,66 @@ fn attach_boringssl_by_offset( handshake: bool, pid: i32, ) -> Result> { - let mut links = vec![ - up_off!( + let mut links = Vec::new(); + + if off.write_is_ex { + // SSL_write_ex: returns 0/1, byte count in *written (4th arg). + // Use the _ex BPF programs which read from the output pointer. 
+ links.push(up_off!( + skel.progs_mut().probe_SSL_write_ex_enter(), + pid, + lib, + off.ssl_write + )?); + links.push(ur_off!( + skel.progs_mut().probe_SSL_write_ex_exit(), + pid, + lib, + off.ssl_write + )?); + } else { + links.push(up_off!( skel.progs_mut().probe_SSL_rw_enter(), pid, lib, off.ssl_write - )?, - ur_off!( + )?); + links.push(ur_off!( skel.progs_mut().probe_SSL_write_exit(), pid, lib, off.ssl_write - )?, - up_off!( + )?); + } + + if off.read_is_ex { + links.push(up_off!( + skel.progs_mut().probe_SSL_read_ex_enter(), + pid, + lib, + off.ssl_read + )?); + links.push(ur_off!( + skel.progs_mut().probe_SSL_read_ex_exit(), + pid, + lib, + off.ssl_read + )?); + } else { + links.push(up_off!( skel.progs_mut().probe_SSL_rw_enter(), pid, lib, off.ssl_read - )?, - ur_off!( + )?); + links.push(ur_off!( skel.progs_mut().probe_SSL_read_exit(), pid, lib, off.ssl_read - )?, - ]; + )?); + } + if handshake { links.push(up_off!( skel.progs_mut().probe_SSL_do_handshake_enter(), @@ -951,6 +1036,16 @@ fn attach_boringssl_by_offset( Ok(links) } +// ─── Codex offset table (Tier 3) ──────────────────────────────────────────── + +use super::codex_offsets::OffsetTable; + +static CODEX_OFFSET_TABLE: std::sync::LazyLock> = + std::sync::LazyLock::new(|| { + let json = include_str!("../../agentsight.json"); + OffsetTable::load(json) + }); + #[cfg(test)] mod tests { use super::*; @@ -1102,45 +1197,4 @@ mod tests { "buf_size clamped to available bytes" ); } - - #[test] - fn uprobe_path_canonicalize_resolves_real_path() { - // Test canonicalize-based path resolution for uprobe attach. - // canonicalize follows symlinks, resolving /proc/{pid}/root/... to host paths. 
- - // Case 1: Real path on host filesystem is resolved to itself - let host_path = "/usr/lib/x86_64-linux-gnu/libssl.so.3"; - if std::path::Path::new(host_path).exists() { - let resolved = std::fs::canonicalize(host_path) - .map(|p| p.to_string_lossy().into_owned()) - .unwrap_or_else(|_| host_path.to_string()); - // canonicalize of an existing absolute path returns the real path - assert!( - !resolved.contains("/proc/"), - "resolved path should not contain /proc/ prefix: {resolved}" - ); - } - - // Case 2: /proc/self/root/... resolves to host path (self is always valid) - let self_root_path = "/proc/self/root/etc/hosts"; - if std::path::Path::new(self_root_path).exists() { - let resolved = std::fs::canonicalize(self_root_path) - .map(|p| p.to_string_lossy().into_owned()) - .unwrap_or_else(|_| self_root_path.to_string()); - assert_eq!( - resolved, "/etc/hosts", - "/proc/self/root/etc/hosts should resolve to /etc/hosts" - ); - } - - // Case 3: Non-existent path falls back to original (simulates exited process) - let dead_proc_path = "/proc/999999999/root/usr/lib/libssl.so"; - let resolved = std::fs::canonicalize(dead_proc_path) - .map(|p| p.to_string_lossy().into_owned()) - .unwrap_or_else(|_| dead_proc_path.to_string()); - assert_eq!( - resolved, dead_proc_path, - "non-existent path should fall back to original" - ); - } } diff --git a/src/agentsight/src/response_map.rs b/src/agentsight/src/response_map.rs index 3b7ef85a4..295f82aad 100644 --- a/src/agentsight/src/response_map.rs +++ b/src/agentsight/src/response_map.rs @@ -11,6 +11,11 @@ //! → parse buf lines as JSON, extract "responseId" field //! → store responseId → sessionId in LRU cache //! ``` +//! +//! Codex CLI writes its rollout file as `rollout-<timestamp>-<uuid>.jsonl` and does +//! not embed an LLM `response_id` inside the JSONL, so we instead remember +//! the most-recent UUID per writing pid and look it up by the HTTP call's +//! pid as a fallback. 
use std::num::NonZeroUsize; @@ -32,11 +37,25 @@ static RESPONSE_ID_RE: Lazy = /// Only matches values starting with `msg_` to avoid false positives from other "id" fields. static ANTHROPIC_MSG_ID_RE: Lazy = Lazy::new(|| Regex::new(r#""id":"(msg_[^"]+)"#).unwrap()); +/// Regex to extract a trailing 36-char UUID from filenames like +/// `rollout-2026-06-24T20-08-10-019ef987-dbc1-7663-81b0-589cbe5e47e8.jsonl` +/// (Codex CLI session rollouts). Captured group is the UUID itself. +static TRAILING_UUID_RE: Lazy = Lazy::new(|| { + Regex::new(r"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})$") + .unwrap() +}); + +/// Maximum number of pid → sessionId entries kept for codex-style lookups. +const MAX_PID_MAP_ENTRIES: usize = 1024; + /// Processes FileWrite events to build an in-memory responseId → sessionId mapping. /// Uses an LRU cache to bound memory usage. pub struct ResponseSessionMapper { /// responseId → sessionId (bounded by LRU eviction) map: LruCache, + /// pid → sessionId fallback used by agents (e.g. Codex CLI) whose + /// per-session rollout file does not embed an LLM `response_id`. + pid_map: LruCache, } impl Default for ResponseSessionMapper { @@ -50,6 +69,7 @@ impl ResponseSessionMapper { pub fn new() -> Self { ResponseSessionMapper { map: LruCache::new(NonZeroUsize::new(MAX_RESPONSE_MAP_ENTRIES).unwrap()), + pid_map: LruCache::new(NonZeroUsize::new(MAX_PID_MAP_ENTRIES).unwrap()), } } @@ -95,6 +115,11 @@ impl ResponseSessionMapper { self.map.put(response_id, session_id.clone()); } } + + // Always record the pid → session_id association so agents whose + // rollout file does not embed an LLM response_id (e.g. Codex CLI) + // can still resolve the session via their writing pid. + self.pid_map.put(event.pid, session_id); } /// Look up sessionId by responseId. @@ -103,21 +128,32 @@ impl ResponseSessionMapper { self.map.peek(response_id).map(|s| s.as_str()) } + /// Look up sessionId by writing pid. 
Used as a fallback for agents + /// (e.g. Codex CLI) whose rollout file does not embed a response_id. + pub fn get_session_by_pid(&self, pid: u32) -> Option<&str> { + self.pid_map.peek(&pid).map(|s| s.as_str()) + } + /// Extract UUID from a filename like `<uuid>.jsonl` or `/path/to/<uuid>.jsonl`. - /// Returns the UUID portion (without path prefix or `.jsonl` suffix). + /// Also recognizes Codex CLI's `rollout-<timestamp>-<uuid>.jsonl` form by + /// extracting the trailing 36-char UUID. fn extract_session_id(filename: &str) -> Option { // Take the last path component let basename = filename.rsplit('/').next().unwrap_or(filename); // Strip .jsonl suffix - let uuid = basename.strip_suffix(".jsonl")?; + let stem = basename.strip_suffix(".jsonl")?; - // Basic UUID length validation (36 chars: 8-4-4-4-12) - if uuid.len() == 36 { - Some(uuid.to_string()) - } else { - None + // Plain `<uuid>.jsonl` (OpenClaw / Cosh / Claude Code) + if stem.len() == 36 { + return Some(stem.to_string()); } + + // Codex CLI: `rollout-<timestamp>-<uuid>.jsonl` — pull the trailing UUID. + TRAILING_UUID_RE + .captures(stem) + .and_then(|cap| cap.get(1)) + .map(|m| m.as_str().to_string()) } /// Extract "responseId" or "response_id" value from a single JSONL line using regex. @@ -156,6 +192,43 @@ mod tests { assert_eq!(id.as_deref(), Some("550e8400-e29b-41d4-a716-446655440000")); } + #[test] + fn test_extract_session_id_codex_rollout() { + // Codex CLI writes `rollout-<timestamp>-<uuid>.jsonl`; we want the trailing UUID. + let id = ResponseSessionMapper::extract_session_id( + "/root/.codex/sessions/2026/06/24/rollout-2026-06-24T20-08-10-019ef987-dbc1-7663-81b0-589cbe5e47e8.jsonl", + ); + assert_eq!(id.as_deref(), Some("019ef987-dbc1-7663-81b0-589cbe5e47e8")); + } + + #[test] + fn test_codex_pid_to_session_lookup() { + // Codex CLI's rollout file does not embed an LLM response_id, so the + // mapper must fall back to a pid-based lookup. 
+ let mut mapper = ResponseSessionMapper::new(); + let event = FileWriteEvent { + pid: 391658, + tid: 391658, + uid: 0, + timestamp_ns: 0, + write_size: 0, + comm: "codex".to_string(), + filename: + "/root/.codex/sessions/2026/06/24/rollout-2026-06-24T20-08-10-019ef987-dbc1-7663-81b0-589cbe5e47e8.jsonl" + .to_string(), + cgroup_id: 0, + buf: br#"{"timestamp":"2026-06-24T12:08:10.991Z","type":"session_meta","payload":{"id":"019ef987-dbc1-7663-81b0-589cbe5e47e8"}} +"# + .to_vec(), + }; + mapper.process_filewrite(&event); + assert_eq!( + mapper.get_session_by_pid(391658), + Some("019ef987-dbc1-7663-81b0-589cbe5e47e8"), + ); + assert!(mapper.get_session_by_pid(99999).is_none()); + } + #[test] fn test_extract_session_id_with_path() { let id = ResponseSessionMapper::extract_session_id( diff --git a/src/agentsight/src/tokenizer/model_mapping.rs b/src/agentsight/src/tokenizer/model_mapping.rs index 23a7fef16..a6cb84362 100644 --- a/src/agentsight/src/tokenizer/model_mapping.rs +++ b/src/agentsight/src/tokenizer/model_mapping.rs @@ -86,6 +86,10 @@ static MODEL_MAPPING: Lazy> = Lazy::new(|| { m.insert("qwen3-14b-instruct", "Qwen/Qwen3-14B-Instruct"); m.insert("qwen3-32b-instruct", "Qwen/Qwen3-32B-Instruct"); m.insert("qwen3-72b-instruct", "Qwen/Qwen3-72B-Instruct"); + // Qwen 3 coder family (used by codex 0.137+ via dashscope) — fall back + // to the same-vocabulary Qwen2.5-Coder tokenizer for offline counting. 
+ m.insert("qwen3-coder-plus", "Qwen/Qwen2.5-Coder-32B-Instruct"); + m.insert("qwen3-coder", "Qwen/Qwen2.5-Coder-32B-Instruct"); // Common variations m.insert("qwen3.5-plus", "Qwen/Qwen3.5-397B-A17B"); diff --git a/src/agentsight/src/unified.rs b/src/agentsight/src/unified.rs index 1e5064133..90355d41e 100644 --- a/src/agentsight/src/unified.rs +++ b/src/agentsight/src/unified.rs @@ -111,6 +111,7 @@ pub struct AgentSight { struct PendingGenAI { events: Vec, response_id: String, + pid: u32, created_at: std::time::Instant, } @@ -265,7 +266,11 @@ impl AgentSight { pid_agent_name_cache.insert(agent.pid, agent.agent_info.name.clone()); } for result in &conn_results { - let agent_name = format!("domain:{}", result.domain); + // Prefer cmdline agent name over domain fallback if the process matches a rule. + let agent_name = scanner + .try_match_process(result.pid) + .map(|a| a.agent_info.name) + .unwrap_or_else(|| format!("domain:{}", result.domain)); Self::attach_process_internal(&mut probes, result.pid, &agent_name); pid_agent_name_cache.insert(result.pid, agent_name); } @@ -683,7 +688,7 @@ impl AgentSight { } if !output.events.is_empty() { - if let Some(pending_resp_id) = output.pending_response_id { + if output.pending_response_id.is_some() { // Session_id not yet resolved — queue for deferred resolution. // Write a pending row NOW so crash detection can see this call // during the deferral window (up to PENDING_SESSION_TIMEOUT). 
@@ -699,12 +704,14 @@ impl AgentSight { } } else { log::warn!( - "Deferred GenAI call queued without pending_info (response_id={pending_resp_id}), crash detection blind spot remains" + "Deferred GenAI call queued without pending_info (response_id={}), crash detection blind spot remains", + output.pending_response_id.as_deref().unwrap_or("unknown") ); } self.pending_genai.push(PendingGenAI { events: output.events, - response_id: pending_resp_id, + response_id: output.pending_response_id.unwrap(), + pid: pending_info.as_ref().map(|p| p.pid as u32).unwrap_or(0), created_at: std::time::Instant::now(), }); log::debug!("GenAI events queued for deferred session_id resolution"); @@ -1243,29 +1250,39 @@ impl AgentSight { )> = Vec::new(); for (conn_id, state) in drained { - // Destructure to capture both request AND sse_events + // Destructure to capture both request AND sse_events. + // For compressed SSE streams that were still in progress when the + // process died (sse_events empty, compressed_buffer non-empty), + // decode the buffer here so token-usage data is not lost (#973). let (_state_name, request, sse_events) = match state { ConnectionState::RequestPending { request } => ("RequestPending", request, vec![]), ConnectionState::SseActive { request: Some(req), - response_headers, sse_events, - compressed_buffer, + compressed_buffer: Some(buf), content_encoding, - } => { - // A *compressed* SSE stream buffers raw bytes and only decodes at - // completion. If the PID died before the stream completed (e.g. - // HTTP/2, no `0\r\n\r\n` terminator), sse_events is empty and the - // body — model/tokens/output — would be lost on drain. Recover it - // via the same decode path as the live finalizer. 
- let events = drained_sse_events( - sse_events, - compressed_buffer, - content_encoding, - &response_headers, - ); - ("SseActive", req, events) + response_headers, + } if sse_events.is_empty() && !buf.is_empty() => { + // fix(#973): decode the unfinalized compressed buffer + // so drain-path token extraction can proceed. + let is_chunked = + crate::aggregator::HttpConnectionAggregator::is_chunked_response( + &response_headers, + ); + let decoded = + crate::aggregator::HttpConnectionAggregator::decode_compressed_sse( + &buf, + content_encoding.as_deref(), + is_chunked, + &response_headers.source_event, + ); + ("SseActive", req, decoded) } + ConnectionState::SseActive { + request: Some(req), + sse_events, + .. + } => ("SseActive", req, sse_events), _ => continue, }; @@ -1549,6 +1566,7 @@ impl AgentSight { if let Some(session_id) = self .response_mapper .get_session_by_response_id(&pending.response_id) + .or_else(|| self.response_mapper.get_session_by_pid(pending.pid)) .map(|s| s.to_string()) { // Resolved — update session_id in all event metadata @@ -1709,27 +1727,6 @@ impl Drop for AgentSight { } } -fn drained_sse_events( - sse_events: Vec, - compressed_buffer: Option>, - content_encoding: Option, - response_headers: &crate::parser::http::ParsedResponse, -) -> Vec { - match compressed_buffer { - Some(ref buf) if sse_events.is_empty() && !buf.is_empty() => { - let is_chunked = - crate::aggregator::HttpConnectionAggregator::is_chunked_response(response_headers); - crate::aggregator::HttpConnectionAggregator::decode_compressed_sse( - buf, - content_encoding.as_deref(), - is_chunked, - &response_headers.source_event, - ) - } - _ => sse_events, - } -} - /// Complete deferred GenAI events: promote pending DB rows to 'complete', /// then export to non-SQLite exporters (or FFI). 
/// @@ -1786,12 +1783,6 @@ mod tests { use super::*; use std::sync::atomic::{AtomicU32, Ordering}; - use crate::parser::http::ParsedResponse; - use crate::parser::sse::ParsedSseEvent; - use crate::probes::sslsniff::SslEvent; - use std::collections::HashMap; - use std::rc::Rc; - /// Generate a unique temp directory for each test invocation. fn unique_tmp_dir(tag: &str) -> PathBuf { static COUNTER: AtomicU32 = AtomicU32::new(0); @@ -1889,7 +1880,7 @@ mod tests { } #[test] - fn test_complete_pending_recovers_interrupted_row() { + fn test_complete_pending_skips_insert_when_row_already_interrupted() { let dir = unique_tmp_dir("cp-interrupted"); let db_path = dir.join("genai_events.db"); let store = Arc::new(GenAISqliteStore::new_with_path(&db_path).expect("create test store")); @@ -1902,11 +1893,11 @@ mod tests { .mark_pending_interrupted_for_pid(1234, "agent_crash") .expect("mark interrupted"); - // complete_pending should recover the interrupted row to 'complete' + // Now complete_pending should NOT create a duplicate row let event = GenAISemanticEvent::LLMCall(make_test_llm_call("call-1")); store.complete_pending(&event).expect("complete_pending"); - // Verify: exactly 1 row, status = complete (recovered from interrupted) + // Verify: exactly 1 row (no duplicate), promoted to complete. let conn = rusqlite::Connection::open(&db_path).unwrap(); let count: i64 = conn .query_row( @@ -2017,86 +2008,4 @@ mod tests { complete_deferred_genai(&[event], None, &exporters, None); } - - fn ssl_event() -> Rc { - Rc::new(SslEvent { - source: 0, - timestamp_ns: 0, - delta_ns: 0, - pid: 1, - tid: 1, - uid: 0, - len: 0, - rw: 0, - comm: String::new(), - buf: Vec::new(), - is_handshake: false, - ssl_ptr: 0x1, - }) - } - - /// A zstd-compressed, chunk-framed SSE body (the #973 shape). 
- fn chunked_zstd_sse() -> Vec { - let sse = b"event: message_start\ndata: {\"type\":\"message_start\"}\n\ndata: [DONE]\n\n"; - let comp = zstd::encode_all(&sse[..], 3).unwrap(); - let mut chunked = Vec::new(); - chunked.extend_from_slice(format!("{:x}\r\n", comp.len()).as_bytes()); - chunked.extend_from_slice(&comp); - chunked.extend_from_slice(b"\r\n0\r\n\r\n"); - chunked - } - - fn chunked_zstd_response() -> ParsedResponse { - let mut headers = HashMap::new(); - headers.insert("content-type".to_string(), "text/event-stream".to_string()); - headers.insert("content-encoding".to_string(), "zstd".to_string()); - headers.insert("transfer-encoding".to_string(), "chunked".to_string()); - ParsedResponse { - version: 11, - status_code: 200, - reason: "OK".to_string(), - headers, - body_offset: 0, - body_len: 0, - source_event: ssl_event(), - } - } - - #[test] - fn drained_sse_events_decodes_unfinalized_compressed_stream() { - // fix(#973): a compressed stream that died before finalizing (events empty, - // buffer non-empty) must be DECODED on drain, not lost. Reverting the drain - // decode yields an empty vec here, so this is discriminating. - let events = drained_sse_events( - vec![], - Some(chunked_zstd_sse()), - Some("zstd".to_string()), - &chunked_zstd_response(), - ); - assert!( - !events.is_empty(), - "compressed buffer must be decoded into events on drain" - ); - } - - #[test] - fn drained_sse_events_passes_through_when_no_buffer() { - // Uncompressed stream: no compressed_buffer -> nothing to decode. - let out = drained_sse_events(vec![], None, None, &chunked_zstd_response()); - assert!(out.is_empty()); - } - - #[test] - fn drained_sse_events_does_not_redecode_when_events_present() { - // Live parsing already produced events -> pass them through, don't re-decode. 
- let existing = vec![ParsedSseEvent::new(None, None, None, 0, 0, ssl_event())]; - let n = existing.len(); - let out = drained_sse_events( - existing, - Some(chunked_zstd_sse()), - Some("zstd".to_string()), - &chunked_zstd_response(), - ); - assert_eq!(out.len(), n, "non-empty events must pass through unchanged"); - } }