fix(mesh): eliminate memory leaks in two-layer tree sync protocol #1011
Conversation
Revert commit 6f5fa17, which added TreeSnapshot, SnapshotNode, from_snapshot, and merge_snapshot to kv_index. This code was never integrated into the mesh crate and will be replaced by a more efficient two-layer sync protocol (tenant deltas + periodic compressed structure snapshots).

Signed-off-by: Simon Lin <simonslin@gmail.com>
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
Add lightweight tenant change types for the two-layer radix tree sync protocol. TenantDelta contains only which tenants changed at which tree nodes — no tree structure, no prompt text.

Wire size comparison: a TenantInsert with a 100-char prefix is ~150 bytes vs a TreeOperation with a 20k-char prompt at ~20 KB+. At 200 rps, this reduces per-round sync from megabytes to ~20 KB.

Types added:
- TenantDelta: batched inserts + evictions per model per gossip round
- TenantInsert: node_path + worker_url + epoch
- TenantEvict: node_path + worker_url

Signed-off-by: Simon Lin <simonslin@gmail.com>
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
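A minimal sketch of what these delta types might look like, assuming plain owned fields and a naive size estimate (field layout and the `approx_wire_bytes` helper are illustrative assumptions, not the real serde/bincode-backed types):

```rust
/// Hypothetical shape of the tenant delta types described above.
/// Only the fields named in the commit message are taken from the PR;
/// everything else here is an assumption for illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct TenantInsert {
    pub node_path: String, // matched prefix, not the full prompt
    pub worker_url: String,
    pub epoch: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TenantEvict {
    pub node_path: String,
    pub worker_url: String,
}

/// One batch per model per gossip round: inserts + evictions only,
/// no tree structure and no prompt text.
#[derive(Debug, Clone, Default)]
pub struct TenantDelta {
    pub model_id: String,
    pub inserts: Vec<TenantInsert>,
    pub evictions: Vec<TenantEvict>,
}

impl TenantDelta {
    /// Rough wire-size estimate: string bytes plus an assumed fixed
    /// per-entry framing overhead. Not the real encoder.
    pub fn approx_wire_bytes(&self) -> usize {
        let per_entry: usize = 16; // assumed framing overhead
        self.inserts
            .iter()
            .map(|i| i.node_path.len() + i.worker_url.len() + 8 + per_entry)
            .chain(
                self.evictions
                    .iter()
                    .map(|e| e.node_path.len() + e.worker_url.len() + per_entry),
            )
            .sum::<usize>()
            + self.model_id.len()
    }
}
```

Under these assumptions a single insert with a 100-char prefix lands in the ~150-byte range quoted above, which is where the megabytes-to-kilobytes claim comes from.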
On each tree operation, append a lightweight TenantInsert or TenantEvict to the tenant delta buffer in addition to the existing tree_ops_pending buffer. The collector will drain this buffer each gossip round and send TenantDelta messages instead of full TreeState.

This is the sender-side preparation for the two-layer sync protocol:
- tenant_delta_inserts: per-model buffer of (node_path, worker_url, epoch)
- tenant_delta_evictions: per-model buffer of (node_path, worker_url)

The buffers are ~100 bytes per entry vs ~200 KB for a TreeOperation with full prompt text.

Signed-off-by: Simon Lin <simonslin@gmail.com>
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
Add Phase 0 to the incremental collector that drains the tenant delta buffer and sends lightweight TenantDelta messages before the existing Phase 1 (tree_ops_pending delta) and Phase 2 (tree_configs full state).

Models emitted as tenant deltas in Phase 0 are skipped in Phase 1, avoiding redundant full TreeState serialization. Phase 1+2 still serve as fallback for reconnected peers or when the tenant delta buffer is empty (e.g., after checkpoint + buffer drain).

Wire size reduction: ~100 bytes per TenantInsert vs ~200 KB per TreeOperation with full prompt text. At 200 rps, this reduces per-round sync from megabytes to ~20 KB.

Update tests to expect the "tenant_delta" policy_type and clear tenant delta buffers in fallback tests.

Signed-off-by: Simon Lin <simonslin@gmail.com>
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
Add the receive side of the tenant delta protocol:
- TreeStateSubscriber trait: add apply_tenant_delta() with a default implementation that converts to TreeState operations for backward compatibility. CacheAwarePolicy can override with direct tree updates.
- MeshSyncManager: add apply_remote_tenant_delta() that notifies subscribers and advances the tree version counter.
- ping_server.rs: add a "tenant_delta" branch to the policy state receiver, before the existing "tree_state_delta" and "tree_state" branches.

The subscriber receives (node_path, worker_url, epoch) tuples and applies them directly to the local radix tree — no TreeState deserialization, no full tree rebuild, no CRDT operations.

Signed-off-by: Simon Lin <simonslin@gmail.com>
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
…ix tree

Wire the tenant delta receive path through to the actual radix tree:
- mesh lib.rs: re-export TenantDelta, TenantInsert, TenantEvict
- PolicyRegistry: implement apply_tenant_delta() that dispatches to CacheAwarePolicy on all policy slots (model, default, prefill, decode)
- CacheAwarePolicy: implement apply_tenant_delta() that calls tree.insert_text(node_path, worker_url) directly — no TreeState deserialization, no TreeOperation conversion, no CRDT operations

Each tenant insert goes directly from the wire to the radix tree in O(prefix_length) time. This replaces the old path: wire → bincode → TreeState → Vec<TreeOperation> → for_each → insert.

Eviction propagation is deferred to Step 6 (requires walking the tree to remove a worker from all nodes).

Signed-off-by: Simon Lin <simonslin@gmail.com>
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
Every 30 gossip rounds (~30s), skip the tenant delta for a model and let Phase 1/2 emit a full TreeState instead. This ensures:
- New nodes joining the mesh get a full tree on first sync
- Missed tenant deltas (network partition) are corrected within 30s
- Tree structure changes (edge splits, new branches) propagate

The tenant delta buffer is still drained to prevent accumulation, but the data is discarded for that round in favor of the full state.

Configurable via the STRUCTURE_SNAPSHOT_INTERVAL constant (default 30).

Signed-off-by: Simon Lin <simonslin@gmail.com>
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
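The round-based cadence reduces to a modulo check plus a drain-or-discard decision. A sketch under stated assumptions (the constant name comes from the commit; the buffer type and return labels are stand-ins):

```rust
const STRUCTURE_SNAPSHOT_INTERVAL: u64 = 30;

/// True when this gossip round should emit a full TreeState
/// for the model instead of a tenant delta.
fn is_structure_snapshot_round(round: u64) -> bool {
    round % STRUCTURE_SNAPSHOT_INTERVAL == 0
}

/// Sketch of the per-round choice. The delta buffer is always drained
/// (to prevent accumulation); on snapshot rounds its contents are
/// discarded in favor of the full state emitted by Phase 1/2.
fn plan_round(round: u64, buffered_deltas: &mut Vec<String>) -> &'static str {
    let drained = std::mem::take(buffered_deltas);
    if is_structure_snapshot_round(round) {
        drop(drained); // discard: full TreeState goes out this round
        "full_tree_state"
    } else if drained.is_empty() {
        "fallback_full_state" // nothing buffered; Phase 1/2 covers it
    } else {
        "tenant_delta"
    }
}
```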
Add Tree::remove_tenant_all() that removes a tenant from every node
in the radix tree. Used by CacheAwarePolicy::apply_tenant_delta() to
handle remote eviction events.
Previously, evictions were not propagated across mesh nodes — if node A
evicted a prefix, node B would re-introduce it on the next snapshot
merge (zombie tenant). With explicit eviction events in the tenant
delta protocol, evictions propagate correctly:
Node A evicts worker W → TenantEvict{worker_url: W} buffered
→ collector sends in TenantDelta → Node B receives
→ CacheAwarePolicy removes W from all tree nodes
Signed-off-by: Simon Lin <simonslin@gmail.com>
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
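A full-tree tenant removal of the kind described above is a straightforward recursive walk. A sketch using a toy radix-node type (not the real kv_index Tree; node layout and child keying are assumptions):

```rust
use std::collections::{HashMap, HashSet};

/// Toy radix-tree node standing in for the real kv_index tree.
#[derive(Default)]
struct Node {
    tenants: HashSet<String>,
    children: HashMap<u8, Node>,
}

impl Node {
    /// Remove a tenant (worker_url) from every node in the subtree,
    /// mirroring what Tree::remove_tenant_all is described as doing.
    /// Returns how many nodes actually held the tenant.
    fn remove_tenant_all(&mut self, worker_url: &str) -> usize {
        let mut removed = usize::from(self.tenants.remove(worker_url));
        for child in self.children.values_mut() {
            removed += child.remove_tenant_all(worker_url);
        }
        removed
    }
}
```

This is O(total nodes), which is why the commit notes that eviction needs an explicit full-tree walk rather than the O(prefix_length) path used for inserts.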
…ree-sync

Applies all changes from commits 878f0c1..0733dc1 (the other engineer's implementation of the two-layer sync design):

Layer 1 (every 1s): tenant deltas with node_path_hash (8 bytes) + path_hash_index for receiver-side resolution. No prompt text on wire.
Layer 2 (every 30s): LZ4-compressed TreeSnapshot via TreeSnapshotProvider. TreeSnapshot/from_snapshot/merge_snapshot restored from PR #974.

Key differences from our previous approach:
- TenantInsert uses ONLY node_path_hash (no truncated node_path text)
- Receiver resolves the hash via path_hash_index (populated on local inserts)
- export_tree_state on TreeStateSubscriber for checkpoint (no tree_ops_pending)
- path_hash_index stores the matched prefix (~200 chars), not the full prompt
- blake3 hash instead of xxh3 for cross-platform determinism

792 tests passing (168 kv-index + 174 mesh + 450 gateway).

Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
The ping_server snapshot exchange cloned tree_configs bytes on every ping round (~1/s per peer reconnect). With large trees from 200 rps 80k-char traffic, this caused continuous memory growth to 6+ GB on the receiver side even after traffic stopped.

Tree data is synced via Layer 1 (tenant deltas) and Layer 2 (periodic compressed snapshots). The legacy tree_configs path in the snapshot exchange is removed.

Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
checkpoint_tree_states called export_tree_state() every 10 gossip rounds, which walked the entire radix tree and reconstructed TreeState with full prompt text as TreeKey::Text for every leaf. With 2000 entries of 80k chars, this allocated ~160 MB every 10 seconds, causing 2+ GB memory usage that kept growing.

Tree data is synced via Layer 1 (tenant deltas, 8-byte hashes) and Layer 2 (LZ4-compressed TreeSnapshot every 30 rounds). The checkpoint is no longer needed — tree_configs is not used for tree data transmission.

Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
C1: checkpoint_tree_states now exports a compact TreeSnapshot (shared prefix edges) from subscribers via the new export_tree_snapshot trait method and stores it in tree_configs. Phase 2 sends it as tree_state_lz4 with LZ4 compression. No full prompt text reconstruction.

C2: the controller.rs incremental stream handler now handles the tenant_delta and tree_state_lz4 message types (it was falling through to the CRDT policy store). Mirrors the ping_server.rs logic.

C3: the select_worker_min_load text path now populates path_hash_index so tenant deltas from the imbalanced-load path are resolvable on the receiver side.

H3: hash_node_path and hash_token_path return 1 instead of 0 when the hash collides with GLOBAL_EVICTION_HASH (0 = global eviction sentinel).

Also: TreeState::from_snapshot() conversion for receivers that get TreeSnapshot bytes in tree_state_lz4 payloads. materialize_tree_state handles both TreeState and TreeSnapshot bytes in tree_configs.

Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
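The H3 fix is a one-line remap of the sentinel value. A sketch, using std's DefaultHasher purely as a stand-in for the blake3-based hash the PR actually uses:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// 0 is reserved as the global-eviction sentinel, so a real path hash
/// must never be 0.
const GLOBAL_EVICTION_HASH: u64 = 0;

/// Sketch of hash_node_path. DefaultHasher stands in for blake3 here;
/// the real code hashes with blake3 for cross-platform determinism.
fn hash_node_path(path: &str) -> u64 {
    let mut h = DefaultHasher::new();
    path.hash(&mut h);
    let v = h.finish();
    // H3: remap the (astronomically rare) collision with the sentinel,
    // so 0 unambiguously means "global eviction" on the wire.
    if v == GLOBAL_EVICTION_HASH { 1 } else { v }
}
```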
…y leak)

select_worker_min_load was inserting text.to_string() (the full 80k+ char prompt) into path_hash_index. At 200 rps under load imbalance, this accumulated 16 MB/s until the next eviction clear(). This is the same class of memory leak we fixed everywhere else.

Fix: skip path_hash_index population in the min-load path (no match result available to extract the matched prefix). Layer 2 snapshots handle convergence for these entries.

Also: the version comparison in apply_remote_tree_operation now falls back to the authoritative atomic tree_version counter when tree_configs holds TreeSnapshot bytes (which can't be deserialized as TreeState). This prevents stale remote state from overwriting newer local state.

Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
checkpoint_tree_states was calling export_tree_snapshot + to_bytes + tree_configs.insert every 10 gossip rounds unconditionally. With a large tree (~50 MB snapshot), this caused continuous memory growth from allocation churn even when no requests were arriving.

Add a last_checkpoint_gen counter to StateStores. Checkpoint now compares tree_generation against the last checkpoint — if unchanged, it returns immediately. This eliminates all per-round allocations when idle.

Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
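The idle-skip can be sketched with two atomic counters. The counter names follow the commit; the surrounding StateStores plumbing is stubbed out, and the `exports` field exists only so the sketch can count real exports:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Minimal stand-in for the StateStores fields involved in the fix.
struct Checkpointer {
    tree_generation: AtomicU64,
    last_checkpoint_gen: AtomicU64,
    exports: AtomicU64, // counts how many real exports happened (sketch only)
}

impl Checkpointer {
    fn new() -> Self {
        Self {
            tree_generation: AtomicU64::new(0),
            last_checkpoint_gen: AtomicU64::new(0),
            exports: AtomicU64::new(0),
        }
    }

    /// Called on every local tree mutation.
    fn record_tree_change(&self) {
        self.tree_generation.fetch_add(1, Ordering::Relaxed);
    }

    /// Skip the expensive export entirely when nothing changed since
    /// the last checkpoint: no snapshot, no per-round allocation.
    fn checkpoint_tree_states(&self) {
        let gen = self.tree_generation.load(Ordering::Relaxed);
        if gen == self.last_checkpoint_gen.swap(gen, Ordering::Relaxed) {
            return; // generation unchanged since last checkpoint
        }
        // Stand-in for export_tree_snapshot + to_bytes + tree_configs.insert.
        self.exports.fetch_add(1, Ordering::Relaxed);
    }
}
```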
Phase 2 in the collector was reading tree_configs (31 MB TreeSnapshot), LZ4-compressing, serializing, and pushing to updates — only to have the controller reject it for exceeding 10 MB. Since mark_sent was never called, last_sent_version stayed at 0 and the cycle repeated every round (~1/s), allocating 31 MB of garbage per second.

Fix: check the compressed size before adding to updates. If > 8 MB, skip and advance last_sent_version (via write lock upgrade) to stop the retry loop. The oversized snapshot is effectively dropped — Layer 1 tenant deltas handle steady-state sync, and the tree will eventually shrink after eviction, at which point the snapshot will fit.

Also: checkpoint_tree_states skips when tree_generation is unchanged. GC clears tree_configs to force re-export after eviction.

Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
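The size gate reduces to a threshold check that advances the send cursor either way, which is exactly what breaks the retry loop. A sketch with compression and lock handling stubbed out (the 8 MB threshold is from the commit; names are otherwise assumptions):

```rust
const MAX_COMPRESSED_BYTES: usize = 8 * 1024 * 1024;

/// Outcome of Phase 2 for one model, per the commit description.
#[derive(Debug, PartialEq)]
enum Phase2 {
    /// Compressed snapshot fits: push to updates, mark_sent advances the cursor.
    Send,
    /// Oversized: drop the snapshot but still advance last_sent_version
    /// so the same 31 MB payload is not re-compressed every round.
    SkipAndAdvance,
}

fn phase2_decision(
    compressed_len: usize,
    last_sent_version: &mut u64,
    current_version: u64,
) -> Phase2 {
    // In both branches the cursor advances; only the payload fate differs.
    *last_sent_version = current_version;
    if compressed_len > MAX_COMPRESSED_BYTES {
        // Layer 1 tenant deltas carry steady-state sync until the tree
        // shrinks after eviction and the snapshot fits again.
        Phase2::SkipAndAdvance
    } else {
        Phase2::Send
    }
}
```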
… logs
Two changes:
1. sync_insert_hash: new lightweight sync path that accepts a
pre-computed hash + tenant instead of TreeKey::Text(text.to_string()).
The main text routing path (select_worker_with_text) now uses this,
avoiding 80k+ String allocation on every request. At 200 rps that was
16 MB/s of allocator churn causing glibc malloc fragmentation
(RSS grows and never returns to OS).
The token path and min-load path still use sync_insert_operation with
TreeKey since they need different handling.
2. Memory debug instrumentation:
- Controller GC cycle logs: tree_configs entries/bytes, tree_versions,
tenant_delta_inserts/evictions counts, policy/worker CRDT sizes
- Eviction task logs: string_tree/token_tree model counts,
path_hash_index entry count
These fire every 60s (GC) and every eviction_interval_secs (300s
default), providing visibility into memory usage without code changes.
Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
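The allocation saving in change 1 hinges on hashing the borrowed `&str` instead of cloning it into a `TreeKey::Text`. A sketch contrasting the two paths (function and field names loosely follow the commit; DefaultHasher stands in for the real blake3-based path hash, and both buffers are simplified):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Buffer entry mirroring the tenant delta insert: hash + tenant only.
struct PendingInsert {
    node_path_hash: u64,
    worker_url: String,
    epoch: u64,
}

/// Old path (simplified): clones the full prompt text per request.
/// At 80k+ chars and 200 rps, this is the 16 MB/s churn described above.
fn sync_insert_text(buf: &mut Vec<(String, String)>, text: &str, worker: &str) {
    buf.push((text.to_string(), worker.to_string()));
}

/// New path: hash the borrowed &str, never clone the prompt.
fn sync_insert_hash(buf: &mut Vec<PendingInsert>, text: &str, worker: &str, epoch: u64) {
    let mut h = DefaultHasher::new();
    text.hash(&mut h); // stand-in for the blake3 path hash
    buf.push(PendingInsert {
        node_path_hash: h.finish(),
        worker_url: worker.to_string(),
        epoch,
    });
}
```

Each buffered entry in the new path is a fixed-size struct plus a short worker URL, regardless of prompt length.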
Add INFO-level logs at every allocation/lifecycle point:

Controller:
- spawn_sync_stream_handler: peer name + active connection count
- Handler exit: peer name
- IncrementalUpdateCollector creation: per-peer
- GC cycle: CRDT policy oplog length, sync_connections size, retry count

Collector:
- Phase 0 summary: tenant delta update count + total bytes
- Phase 2 summary: tree_configs update count + total bytes
- Oversized snapshot skip: promoted to INFO with structured fields

Sync:
- sync_tree_insert_hash: every 1000 calls, log total + buffer sizes
- sync_tree_operation: every 1000 calls, same
- apply_remote_tenant_delta: every 100 calls, log applied/skipped/resolved/unresolved

Eviction task:
- Tree model counts + path_hash_index size

These logs fire periodically (not on every request) to minimize overhead while providing full visibility into memory usage patterns.

Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
… per cycle

The debug log I added called tree.snapshot() every eviction cycle to log node_count and edge_bytes. With 3500+ unique 80k-char prompt leaves, snapshot() cloned 170 MB of edge text into a temporary Vec. Over 10 minutes this caused ~850 MB of allocator fragmentation that glibc never returned to the OS.

This was the 1.7 GB vs 270 Mi difference between mesh ON and mesh OFF. The snapshot call only existed in the mesh ON test image (added in snapshot-21), not in the mesh OFF baseline image (snapshot-19).

Replace it with a lightweight model count log (no tree walk, no allocation).

Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
checkpoint_tree_states called export_tree_snapshot → tree.snapshot() every 10 gossip rounds. With 3500+ unique 80k-char prompt leaves, this allocated 170 MB of edge text into a Vec<SnapshotNode>, serialized it, and stored it in tree_configs — every 10 seconds. The serialized bytes were then read by the collector, LZ4-compressed to 34 MB, found to exceed the 8 MB limit, and skipped. The entire 170 MB allocation was wasted work causing allocator fragmentation.

Tree data syncs via Layer 1 (tenant deltas, ~50 bytes per insert). Layer 2 (full tree snapshots) is deferred to future work — options include chunked snapshots or incremental tree diffs.

Also removed the tree.snapshot() call from the eviction debug log (same 170 MB allocation issue).

Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
…nitoring

Remove:
- Static atomic counters in sync.rs (TREE_INSERT_CALL_COUNT, etc.)
- Per-1000-call periodic logs in sync_tree_insert_hash/sync_tree_operation
- Per-100-call periodic logs in apply_remote_tenant_delta
- sync_connections/retry count logs in the controller GC cycle

Downgrade to DEBUG:
- spawn_sync_stream_handler / collector created / handler exited logs
- Phase 0/Phase 2 summary logs in incremental.rs (fire every round)

Keep at INFO (fires every 60s, useful for ops):
- Mesh memory: tree_configs, tree_versions, tenant_inserts, policy/worker CRDT
- CRDT policy oplog length
- Tree memory: model counts, path_hash_index size

Refs: #983
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
📝 Walkthrough

Adds tenant-level delta buffering and LZ4-compressed full-structure snapshots to mesh sync, new tenant/hash types and LZ4 helpers, subscriber/export hooks, collector/handler changes, policy integration (hash-based sync and apply_tenant_delta), a workspace lz4 dependency, and a Tree removal helper.
Sequence Diagram

```mermaid
sequenceDiagram
    actor Controller
    participant IncrementalCollector
    participant SyncManager
    participant PolicyRegistry
    participant RemoteNode
    Controller->>IncrementalCollector: collect_updates_for_store(Policy)
    alt tenant-delta path
        IncrementalCollector->>IncrementalCollector: drain tenant_delta_inserts/evictions
        IncrementalCollector->>Controller: emit PolicyState(policy_type="tenant_delta", config=TenantDelta)
        Controller->>RemoteNode: send tenant_delta
        RemoteNode->>SyncManager: apply_remote_tenant_delta(TenantDelta, actor)
        SyncManager->>PolicyRegistry: TreeStateSubscriber::apply_tenant_delta(...)
    else full-snapshot path
        IncrementalCollector->>IncrementalCollector: materialize tree_configs (TreeState|TreeSnapshot)
        IncrementalCollector->>IncrementalCollector: lz4_compress(bytes)
        IncrementalCollector->>Controller: emit PolicyState(policy_type="tree_state_lz4", config=lz4_bytes)
        Controller->>RemoteNode: send tree_state_lz4
        RemoteNode->>SyncManager: apply_remote_tree_operation(lz4 bytes)
        SyncManager->>SyncManager: lz4_decompress -> try TreeState -> fallback TreeSnapshot->TreeState
        SyncManager->>PolicyRegistry: apply_remote_tree_operation(...)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Code Review
This pull request introduces a two-layer synchronization mechanism for radix trees, utilizing lightweight tenant deltas for high-frequency updates and LZ4-compressed snapshots for periodic convergence. It aims to reduce memory churn by replacing full prompt text with path hashes and buffering changes. However, critical feedback indicates that the current implementation risks data loss and infinite retry loops by skipping mark_sent for failed tree updates. Furthermore, clearing tree_configs during garbage collection while leaving checkpoint_tree_states as a no-op effectively deletes authoritative state periodically, and the omission of hash index population in certain paths leads to permanent state divergence.
```rust
let is_tree_update =
    updates.iter().any(|u| u.key.starts_with("tree:"));
if is_tree_delta {
    // Force trim the pending buffer to reduce size for next round.
    for u in updates {
        if u.key.starts_with("tree:") {
            if let Some(mut pending) = stores_for_trim.tree_ops_pending.get_mut(&u.key) {
                let len = pending.len();
                if len > 100 {
                    pending.drain(..len / 2);
                    log::info!("Force-trimmed oversized tree pending buffer for {}: {} -> {}", u.key, len, pending.len());
                }
            }
        }
    }
} else {
    if !is_tree_update {
        collector.mark_sent(*store_type, updates);
    }
```
Skipping mark_sent for tree updates when a batch fails validation (e.g., due to size) leads to data loss for tenant_delta updates. In incremental.rs, Phase 0 drains the tenant_delta_inserts/evictions buffers during collection. If mark_sent is skipped here, last_sent is not updated, but the next collection cycle will find the buffers empty and thus won't include the missing data in the next batch. For tree_configs (Phase 2), which is not drained, this logic creates an infinite retry loop where the same oversized snapshot is serialized, compressed, and rejected every second, causing persistent CPU and memory churn. It is safer to mark the batch as sent to avoid the loop, as Layer 2 snapshots are intended to handle eventual convergence for missed deltas.
```rust
pub fn checkpoint_tree_states(&self) {
    // No-op: tree data syncs via Layer 1 (tenant deltas, ~50 bytes each).
    // Layer 2 (full tree snapshots) is deferred — the snapshot can be
    // 170+ MB for large trees, and the checkpoint allocation every 10s
    // causes allocator fragmentation. Future: chunked or incremental
    // tree diffs for convergence (options 2/3 in the design doc).
}
```
Making checkpoint_tree_states a no-op disables the export of the local radix tree to the mesh synchronization layer (Layer 2). While this avoids memory pressure from large snapshots, it breaks convergence for new nodes joining the cluster or recovery after network partitions, as they will never receive the full state of existing trees. Furthermore, since IncrementalUpdateCollector Phase 0 (tenant deltas) relies on the path_hash_index which is not populated for all request types (e.g., imbalanced load or tokens), Layer 2 snapshots are the only mechanism for these updates to eventually synchronize. Disabling them entirely leads to permanent state divergence in the mesh.
```rust
// Don't populate path_hash_index here — we don't have a
// match result and storing the full prompt text (80k+ chars)
// would recreate the memory leak. Layer 2 snapshots handle
// convergence for entries from the imbalanced-load path.
```
The decision to skip populating path_hash_index in select_worker_min_load (and similarly in select_worker_with_tokens) breaks Layer 1 synchronization for these requests. Since the receiver of a TenantDelta must resolve the node_path_hash using its local path_hash_index, it will silently drop any updates for hashes it hasn't seen in a balanced-load text path. The comment suggests that Layer 2 snapshots will handle convergence, but since checkpoint_tree_states has been made a no-op in this PR, these updates will never be synchronized, leading to inconsistent cache state across the mesh.
```rust
// Lightweight tenant delta — no tree structure, no prompt text
if let Ok(delta) =
    super::tree_ops::TenantDelta::from_bytes(
        &policy_state.config,
    )
{
    sync_manager
        .apply_remote_tenant_delta(
            delta, actor,
        );
}
```
🟡 Nit: Deserialization failures for tenant_delta are silently swallowed here (if let Ok). The equivalent handler in controller.rs (line 687-701) properly logs failures with log::warn!. This inconsistency makes debugging protocol issues harder — a corrupt delta on the ping path produces no log output.
Suggested change:

```diff
-// Lightweight tenant delta — no tree structure, no prompt text
-if let Ok(delta) =
-    super::tree_ops::TenantDelta::from_bytes(
-        &policy_state.config,
-    )
-{
-    sync_manager
-        .apply_remote_tenant_delta(
-            delta, actor,
-        );
-}
+// Lightweight tenant delta — no tree structure, no prompt text
+match super::tree_ops::TenantDelta::from_bytes(
+    &policy_state.config,
+) {
+    Ok(delta) => {
+        sync_manager
+            .apply_remote_tenant_delta(
+                delta, actor,
+            );
+    }
+    Err(e) => {
+        log::warn!(
+            "Failed to deserialize tenant delta for model {}: {e}",
+            policy_state.model_id
+        );
+    }
+}
```
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
crates/mesh/src/ping_server.rs (1)
464-476: ⚠️ Potential issue | 🟠 Major — Don't mark oversized tree batches as sent.

For policy batches that contain `tree:` updates, this suppresses the resend path when several individually-valid `tree_state_lz4` payloads exceed the gRPC cap together. `crates/mesh/src/controller.rs` already special-cases tree updates here; `sync_stream` needs the same guard or batch splitting so those snapshots are not dropped for the peer.

Possible fix:

```diff
 if let Err(e) = size_validator_clone.validate(batch_size) {
     log::warn!(
         "Incremental update too large, skipping store {:?}: {} (max: {} bytes)",
         store_type, e, size_validator_clone.max_size()
     );
-    // Mark as sent to prevent infinite retry loop.
-    // Without this, the same oversized update is re-collected,
-    // re-serialized, and re-skipped every second forever,
-    // burning CPU and memory.
-    collector.mark_sent(store_type, &updates);
+    let is_tree_update =
+        updates.iter().any(|u| u.key.starts_with("tree:"));
+    if !is_tree_update {
+        collector.mark_sent(store_type, &updates);
+    }
     continue;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mesh/src/ping_server.rs` around lines 464 - 476, The code currently marks oversized batches as sent in the sync_stream path which drops policy batches that contain tree snapshots; instead, modify the branch around size_validator_clone.validate(...) so that before calling collector.mark_sent(store_type, &updates) you check whether the batch contains any tree updates (e.g. inspect updates for keys/variants like "tree" or payloads such as tree_state_lz4) and if so do NOT mark them as sent — either leave them for resend or split the batch into smaller chunks; mirror the special-case logic used in controller.rs for tree updates by detecting tree updates in the failing batch and skipping only the mark_sent step (or invoke batch-splitting logic) so tree snapshots are not suppressed.crates/mesh/src/stores.rs (1)
858-890: ⚠️ Potential issue | 🔴 Critical — Don't clear all `tree_configs` here.

After `tree_configs.clear()`, the two `retain()` calls treat every quiescent model as orphaned and drop its `tree_versions` entry too. The next local op on a model that previously learned version N from a remote snapshot will then re-seed from 0 in `bump_tree_version()`, so peers already at version N will ignore the new `version=1` update. Invalidate only the models whose snapshot is actually stale, or preserve `tree_versions` when evicting cached bytes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mesh/src/stores.rs` around lines 858 - 890, The current unconditional self.tree_configs.clear() causes retain() to drop tree_versions for quiescent models and resets peers when bump_tree_version() later reseeds from 0; instead, only invalidate entries whose cached snapshot bytes were actually evicted or preserve their version counters when evicting. Concretely: replace the blanket clear with logic that iterates the keys you evicted and removes only those entries from tree_configs (or mark them as stale), and/or ensure that before removing a tree_configs entry you do NOT remove its tree_versions entry (keep tree_versions[k] if present) so retain() and subsequent bump_tree_version() keep the learned version; use the existing maps tree_configs, tree_versions, tree_ops_pending and the bump_tree_version() call sites to implement the selective eviction.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/kv_index/src/snapshot.rs`:
- Around line 3-8: Update the top-of-file doc comment to remove the "not yet
wired into the mesh sync protocol" phrasing and instead note a partial rollout:
document that TreeSnapshot is already consumed on the mesh receive path
(ping_server) and exposed via TreeStateSubscriber::export_tree_snapshot, and
rephrase the status line to indicate "Partial rollout / in use" while keeping
the rest of the explanation about expected wire-size savings.
In `@crates/mesh/src/controller.rs`:
- Around line 518-526: The current logic using is_tree_update to skip
collector.mark_sent for the entire updates batch blocks non-tree policy entries;
change controller.rs to split the incoming updates (the updates variable) into
tree_updates = updates.iter().filter(|u| u.key.starts_with("tree:")) and
non_tree_updates = the rest, then call collector.mark_sent(*store_type,
&non_tree_updates) for any non-tree items so they are acknowledged even when
tree_updates require retry; proceed to validate/send tree_updates separately
(and only retry those) so unrelated StoreType::Policy changes are not held up by
oversized tree payloads, leaving collector.mark_sent and is_tree_update logic
otherwise untouched.
In `@crates/mesh/src/incremental.rs`:
- Around line 272-286: The code clears tenant_delta_inserts and
tenant_delta_evictions whenever STRUCTURE_SNAPSHOT_INTERVAL triggers, but since
checkpoint_tree_states() is a no-op and Phase 2 only replays
stores.tree_configs, that unconditional removal can permanently drop local
mutations; change the branch in the loop (the block using rounds,
STRUCTURE_SNAPSHOT_INTERVAL, and model_id) to only drain/remove the tenant_delta
buffers if we can guarantee Phase 2 will emit a fresh TreeState (e.g., check
that stores.tree_configs contains model_id or that checkpoint_tree_states()
indicates a snapshot will be produced); otherwise leave the buffers intact (and
optionally log a warning) so local deltas aren’t lost.
- Around line 291-302: The code currently calls remove() on
self.stores.tenant_delta_inserts and tenant_delta_evictions before sending,
which makes deltas non-retryable if send fails; instead, do not remove the
vectors until the send succeeds (i.e., until mark_sent() is called). Change the
collector logic around tenant_delta_inserts/tenant_delta_evictions so you either
(a) obtain a reference (get_mut or entry().or_default()) and clone or take()
into a temporary buffer only when send succeeds, or (b) move the drained batch
into a retry queue owned by the collector (e.g., a per-model retry VecDeque) and
only remove the original entry after successful send/mark_sent(); ensure
sync_stream send error paths (TrySendError::Full, closed, rejected size) push
the batch into the retry queue and do not drop the original data so retries
preserve inserts/evictions.
In `@crates/mesh/src/sync.rs`:
- Around line 523-540: The current change collapses TreeKey::Tokens into a
hash-only TenantInsert (via tenant_delta_inserts), which breaks token-tree
reconstruction because CacheAwarePolicy::apply_tenant_delta and snapshot
exporters only handle text hashes into string_trees; update the sync logic so
token ops are not reduced to hash-only inserts: either route TreeKey::Tokens
into the existing full-state/TreeOperation path (preserving the original token
payload) instead of pushing into tenant_delta_inserts, or extend TenantInsert to
include a key_kind discriminator and token-tree payload and implement the
corresponding apply/export handlers (update CacheAwarePolicy::apply_tenant_delta
and the snapshot exporters to recognize the token kind and write into the
token-tree structures rather than string_trees). Ensure references to
hash_node_path/hash_token_path and tenant_delta_inserts/TenantInsert are
adjusted consistently so peers can reconstruct token affinity.
- Around line 842-846: The current loop holds the tree_state_subscribers read
guard while calling subscriber.apply_tenant_delta, which can deadlock if
callbacks try to acquire the write side; instead snapshot/clone the subscriber
list while holding the read lock and then drop the guard before iterating and
invoking callbacks. Concretely, in sync.rs replace the direct read+iteration on
self.tree_state_subscribers with code that reads the guard into a local cloned
vector (or collected Vec) and then iterates over that snapshot, invoking
apply_tenant_delta on each subscriber so the read guard is not held during
callbacks.
In `@model_gateway/src/policies/cache_aware.rs`:
- Around line 81-85: The path_hash_index field on CacheAwarePolicy is currently
a global Arc<DashMap<u64, String>> so identical prompt hashes from different
models collide; update the data structure to namespace by model_id (e.g.,
Arc<DashMap<(ModelIdType, u64), String>> or Arc<DashMap<ModelIdType,
DashMap<u64, String>>>), update all uses (construction, inserts on local
inserts, lookups in apply_tenant_delta, and eviction bounded by max_tree_size)
to include the model_id key, and ensure functions like apply_tenant_delta and
any code at the other referenced sites (lines ~403-405, ~884-886) use the new
composite key or per-model map when resolving tenant delta hashes so entries for
different models no longer overwrite each other.
In `@model_gateway/src/policies/registry.rs`:
- Around line 783-789: The current assertion using
mesh_sync.get_tree_state(UNKNOWN_MODEL_ID).is_none() is insufficient because it
can pass when the mesh hook is not wired; instead, in
test_set_mesh_sync_propagates_to_default_cache_aware_policy assert a mesh-only
side effect such as that the backing StateStores tree version advanced or that
the tenant-delta buffer was populated after calling set_mesh_sync /
sync_tree_operation. Concretely, replace or supplement the get_tree_state
assertion with a check against the backing store used by mesh_sync (e.g. query
the StateStores instance or its tree version for UNKNOWN_MODEL_ID) or assert the
tenant-delta buffer contains entries for the tenant, ensuring you reference
mesh_sync, set_mesh_sync, sync_tree_operation, UNKNOWN_MODEL_ID and the
StateStores backing object in the updated assertion.
---
Outside diff comments:
In `@crates/mesh/src/ping_server.rs`:
- Around line 464-476: The code currently marks oversized batches as sent in the
sync_stream path which drops policy batches that contain tree snapshots;
instead, modify the branch around size_validator_clone.validate(...) so that
before calling collector.mark_sent(store_type, &updates) you check whether the
batch contains any tree updates (e.g. inspect updates for keys/variants like
"tree" or payloads such as tree_state_lz4) and if so do NOT mark them as sent —
either leave them for resend or split the batch into smaller chunks; mirror the
special-case logic used in controller.rs for tree updates by detecting tree
updates in the failing batch and skipping only the mark_sent step (or invoke
batch-splitting logic) so tree snapshots are not suppressed.
In `@crates/mesh/src/stores.rs`:
- Around line 858-890: The current unconditional self.tree_configs.clear()
causes retain() to drop tree_versions for quiescent models and resets peers when
bump_tree_version() later reseeds from 0; instead, only invalidate entries whose
cached snapshot bytes were actually evicted or preserve their version counters
when evicting. Concretely: replace the blanket clear with logic that iterates
the keys you evicted and removes only those entries from tree_configs (or mark
them as stale), and/or ensure that before removing a tree_configs entry you do
NOT remove its tree_versions entry (keep tree_versions[k] if present) so
retain() and subsequent bump_tree_version() keep the learned version; use the
existing maps tree_configs, tree_versions, tree_ops_pending and the
bump_tree_version() call sites to implement the selective eviction.
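The selective eviction the prompt asks for can be sketched as follows, with plain `HashMap`s standing in for the `DashMap` fields on the real `StateStores` (field and method names are simplified assumptions):

```rust
use std::collections::HashMap;

// Simplified stand-in for StateStores: the real type uses DashMap.
struct Stores {
    tree_configs: HashMap<String, Vec<u8>>,
    tree_versions: HashMap<String, u64>,
}

impl Stores {
    // Evict only the named snapshot entries, and deliberately keep the
    // version counters, so a later bump_tree_version() continues from the
    // learned value instead of reseeding from 0 and looking like a
    // regression to peers.
    fn evict_snapshots(&mut self, evicted: &[String]) {
        for key in evicted {
            self.tree_configs.remove(key);
            // NOT touching tree_versions[key] here.
        }
    }

    fn bump_tree_version(&mut self, key: &str) -> u64 {
        let v = self.tree_versions.entry(key.to_string()).or_insert(0);
        *v += 1;
        *v
    }
}
```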
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 9a54390e-bcd7-403e-b18d-796e4377a9ac
📒 Files selected for processing (14)
- Cargo.toml
- crates/kv_index/src/snapshot.rs
- crates/kv_index/src/string_tree.rs
- crates/mesh/Cargo.toml
- crates/mesh/src/controller.rs
- crates/mesh/src/incremental.rs
- crates/mesh/src/lib.rs
- crates/mesh/src/ping_server.rs
- crates/mesh/src/stores.rs
- crates/mesh/src/sync.rs
- crates/mesh/src/tree_ops.rs
- deploy/helm/smg/templates/_helpers.tpl
- model_gateway/src/policies/cache_aware.rs
- model_gateway/src/policies/registry.rs
```rust
/// Remove a tenant from all nodes in the tree, including root.
/// Used for mesh eviction propagation — when a remote node reports
/// that a worker evicted all its cached prefixes.
pub fn remove_tenant_all(&self, tenant_id: &TenantId) {
    // collect_tenant_nodes skips root (root is never LRU-evicted),
    // but global removal must include it.
    self.remove_tenant_from_node(&self.root, tenant_id);

    let mut nodes: Vec<(NodeRef, u64)> = Vec::new();
    self.collect_tenant_nodes(&self.root, tenant_id, &mut nodes);
    for (node, _) in &nodes {
        self.remove_tenant_from_node(node, tenant_id);
    }
    self.tenant_char_count.remove(tenant_id);
}
```
🧹 Nitpick | 🔵 Trivial
Consider cleaning up empty nodes after tenant removal.
The implementation correctly removes the tenant from all nodes and clears the char count. However, unlike evict_tenant_by_size (lines 811-817), this method does not clean up nodes that become empty (no tenants AND no children) after removal.
For mesh eviction propagation, this may be acceptable if:
- Empty nodes will eventually be cleaned by subsequent LRU eviction cycles, or
- The nodes still serve as structural paths for other tenants
If this is intentional for the mesh sync use case, a brief comment would clarify. Otherwise, consider adding cleanup logic to prevent orphaned nodes from accumulating.
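A cleanup pass along these lines could prune nodes left empty by the removal. This is a minimal sketch with hypothetical node types (the real `NodeRef`/`TenantId` carry LRU and char-count state not shown here):

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical minimal node shape for illustration.
struct Node {
    tenants: HashSet<String>,
    children: HashMap<char, Node>,
}

// Remove the tenant from every node, then prune children that ended up
// with no tenants AND no children, so orphaned structure does not
// accumulate. The root itself is never pruned.
fn remove_tenant_and_prune(node: &mut Node, tenant: &str) {
    node.tenants.remove(tenant);
    for child in node.children.values_mut() {
        remove_tenant_and_prune(child, tenant);
    }
    node.children
        .retain(|_, c| !(c.tenants.is_empty() && c.children.is_empty()));
}
```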
```rust
// Mark non-tree stores as sent to prevent infinite
// retry loops (PR #808). Tree updates (tenant deltas,
// structure snapshots) are retried next round with
// updated data from the live tree.
let is_tree_update =
    updates.iter().any(|u| u.key.starts_with("tree:"));
if is_tree_delta {
    // Force trim the pending buffer to reduce size for next round.
    for u in updates {
        if u.key.starts_with("tree:") {
            if let Some(mut pending) = stores_for_trim.tree_ops_pending.get_mut(&u.key) {
                let len = pending.len();
                if len > 100 {
                    pending.drain(..len / 2);
                    log::info!("Force-trimmed oversized tree pending buffer for {}: {} -> {}", u.key, len, pending.len());
                }
            }
        }
    }
} else {
    if !is_tree_update {
        collector.mark_sent(*store_type, updates);
    }
```
Don't let oversized tree payloads block unrelated policy updates.
StoreType::Policy batches can contain both tree:* entries and regular policy configs. With is_tree_update = any(...), one oversized tree item suppresses mark_sent() for the whole batch, so unrelated policy changes can sit behind that tree retry loop indefinitely. Please split tree vs non-tree policy updates before validating/sending, or at least mark the non-tree subset as sent here.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/mesh/src/controller.rs` around lines 518 - 526, The current logic
using is_tree_update to skip collector.mark_sent for the entire updates batch
blocks non-tree policy entries; change controller.rs to split the incoming
updates (the updates variable) into tree_updates = updates.iter().filter(|u|
u.key.starts_with("tree:")) and non_tree_updates = the rest, then call
collector.mark_sent(*store_type, &non_tree_updates) for any non-tree items so
they are acknowledged even when tree_updates require retry; proceed to
validate/send tree_updates separately (and only retry those) so unrelated
StoreType::Policy changes are not held up by oversized tree payloads, leaving
collector.mark_sent and is_tree_update logic otherwise untouched.
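The split the prompt describes comes down to a partition of the batch before validation. A minimal sketch with a hypothetical `Update` shape (the real type lives in the mesh crate):

```rust
// Hypothetical Update shape for illustration.
struct Update {
    key: String,
}

// Partition a StoreType::Policy batch so an oversized tree:* entry can
// be retried next round without suppressing mark_sent() for unrelated
// policy configs in the same batch.
fn split_batch(updates: Vec<Update>) -> (Vec<Update>, Vec<Update>) {
    // Returns (tree_updates, non_tree_updates).
    updates.into_iter().partition(|u| u.key.starts_with("tree:"))
}
```

The caller would then `mark_sent` the non-tree half unconditionally and apply the retry/trim logic only to the tree half.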
```rust
let inserts = self
    .stores
    .tenant_delta_inserts
    .remove(&model_id)
    .map(|(_, v)| v)
    .unwrap_or_default();
let evictions = self
    .stores
    .tenant_delta_evictions
    .remove(&model_id)
    .map(|(_, v)| v)
    .unwrap_or_default();
```
Collecting with remove() makes tenant deltas non-retryable.
These buffers are consumed before the sender knows whether the batch was accepted. If sync_stream hits TrySendError::Full, the channel closes, or the batch is rejected for size, mark_sent() is never called but the vectors are already gone, so the peer permanently misses those inserts/evictions. Keep the drained batch until send success, or move it into a retry queue owned by the collector.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/mesh/src/incremental.rs` around lines 291 - 302, The code currently
calls remove() on self.stores.tenant_delta_inserts and tenant_delta_evictions
before sending, which makes deltas non-retryable if send fails; instead, do not
remove the vectors until the send succeeds (i.e., until mark_sent() is called).
Change the collector logic around tenant_delta_inserts/tenant_delta_evictions so
you either (a) obtain a reference (get_mut or entry().or_default()) and clone or
take() into a temporary buffer only when send succeeds, or (b) move the drained
batch into a retry queue owned by the collector (e.g., a per-model retry
VecDeque) and only remove the original entry after successful send/mark_sent();
ensure sync_stream send error paths (TrySendError::Full, closed, rejected size)
push the batch into the retry queue and do not drop the original data so retries
preserve inserts/evictions.
```rust
// Buffer a lightweight tenant delta — 24 bytes per insert (hash + epoch)
// instead of 80k+ bytes (full prompt text).
match &operation {
    TreeOperation::Insert(insert) => {
        let path_hash = match &insert.key {
            TreeKey::Text(text) => hash_node_path(text),
            TreeKey::Tokens(tokens) => hash_token_path(tokens),
        };
        self.stores
            .tenant_delta_inserts
            .entry(model_id.clone())
            .or_default()
            .push(TenantInsert {
                node_path_hash: path_hash,
                worker_url: insert.tenant.clone(),
                epoch: self.stores.tree_version(&key),
            });
    }
```
Hash-only tenant deltas lose token-tree sync.
TreeKey::Tokens is now collapsed into the same TenantInsert { node_path_hash, ... } shape as text, but the only subscriber-side resolver (CacheAwarePolicy::apply_tenant_delta) can resolve text hashes only and writes into string_trees. The new snapshot exporters also walk only string_trees. That leaves gRPC/token-tree updates with no reconstruction path on peers, so token affinity will silently diverge. Keep token ops on the old TreeOperation/full-state path, or add a key-kind plus token-tree apply/export path before hashing them.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/mesh/src/sync.rs` around lines 523 - 540, The current change collapses
TreeKey::Tokens into a hash-only TenantInsert (via tenant_delta_inserts), which
breaks token-tree reconstruction because CacheAwarePolicy::apply_tenant_delta
and snapshot exporters only handle text hashes into string_trees; update the
sync logic so token ops are not reduced to hash-only inserts: either route
TreeKey::Tokens into the existing full-state/TreeOperation path (preserving the
original token payload) instead of pushing into tenant_delta_inserts, or extend
TenantInsert to include a key_kind discriminator and token-tree payload and
implement the corresponding apply/export handlers (update
CacheAwarePolicy::apply_tenant_delta and the snapshot exporters to recognize the
token kind and write into the token-tree structures rather than string_trees).
Ensure references to hash_node_path/hash_token_path and
tenant_delta_inserts/TenantInsert are adjusted consistently so peers can
reconstruct token affinity.
```rust
/// Hash → matched prefix index for resolving tenant delta hashes.
/// Populated on local inserts with the MATCHED PREFIX from the radix
/// tree (not the full prompt text). Consumed on remote tenant delta
/// application. Bounded by eviction at `max_tree_size` entries.
path_hash_index: Arc<DashMap<u64, String>>,
```
Scope path_hash_index by model.
CacheAwarePolicy keeps separate trees per model, but this index is keyed only by u64. The same prompt hash used in two models will overwrite one another, and apply_tenant_delta() can then resolve model A’s delta against model B’s prefix and insert the tenant into the wrong node. Namespace the hash by model_id (or keep a per-model map) before using it for delta resolution.
💡 Minimal shape change

```diff
- path_hash_index: Arc<DashMap<u64, String>>,
+ path_hash_index: Arc<DashMap<(String, u64), String>>,
  ...
- if let Some(path_entry) = self.path_hash_index.get(&insert.node_path_hash) {
+ if let Some(path_entry) = self
+     .path_hash_index
+     .get(&(model_id.to_string(), insert.node_path_hash))
+ {
  ...
- self.path_hash_index.insert(path_hash, matched_prefix);
+ self.path_hash_index
+     .insert((model_id.to_string(), path_hash), matched_prefix);
```

Also applies to: 403-405, 884-886
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@model_gateway/src/policies/cache_aware.rs` around lines 81 - 85, The
path_hash_index field on CacheAwarePolicy is currently a global Arc<DashMap<u64,
String>> so identical prompt hashes from different models collide; update the
data structure to namespace by model_id (e.g., Arc<DashMap<(ModelIdType, u64),
String>> or Arc<DashMap<ModelIdType, DashMap<u64, String>>>), update all uses
(construction, inserts on local inserts, lookups in apply_tenant_delta, and
eviction bounded by max_tree_size) to include the model_id key, and ensure
functions like apply_tenant_delta and any code at the other referenced sites
(lines ~403-405, ~884-886) use the new composite key or per-model map when
resolving tenant delta hashes so entries for different models no longer
overwrite each other.
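A runnable sketch of the composite-key variant, using a plain `HashMap` in place of `Arc<DashMap<...>>` (the method names are illustrative):

```rust
use std::collections::HashMap;

// Stand-in for Arc<DashMap<(String, u64), String>>: the composite
// (model_id, hash) key prevents cross-model collisions.
struct PathHashIndex {
    map: HashMap<(String, u64), String>,
}

impl PathHashIndex {
    fn insert(&mut self, model_id: &str, hash: u64, matched_prefix: String) {
        self.map.insert((model_id.to_string(), hash), matched_prefix);
    }

    // Resolution during apply_tenant_delta only sees entries for the
    // delta's own model, so model A can never resolve against model B.
    fn resolve(&self, model_id: &str, hash: u64) -> Option<&String> {
        self.map.get(&(model_id.to_string(), hash))
    }
}
```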
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/mesh/src/sync.rs (1)
2179-2195: ⚠️ Potential issue | 🟡 Minor

Update this test to decode the new policy payload formats.

The assertion now allows `"tenant_delta"` and `"tree_state_lz4"`, but the branch below still deserializes every non-`tree_state_delta` payload as raw `TreeState`. The first such update will panic instead of validating the new format. Match on `policy_state.policy_type` and decode each payload with the same path the receiver uses.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mesh/src/sync.rs` around lines 2179 - 2195, The test assumes every non-"tree_state_delta" payload is a raw TreeState; update it to match on policy_state.policy_type and decode each format the real receiver uses: for "tree_state_delta" keep TreeStateDelta::from_bytes(&policy_state.config) and assert non-empty operations; for "tree_state_lz4" first LZ4-decompress policy_state.config then TreeState::from_bytes on the decompressed bytes; for "tree_state" call TreeState::from_bytes(&policy_state.config); for "tenant_delta" decode using the same tenant-delta path your runtime uses (e.g., the tenant delta deserializer the receiver calls) and assert its expected fields; ensure each branch uses the matching functions (PolicyState, TreeStateDelta::from_bytes, TreeState::from_bytes) and error messages remain descriptive.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/mesh/src/sync.rs`:
- Around line 853-875: checkpoint_tree_states currently does nothing so live
trees are never exported into tree_configs (path "tree_state_lz4"), breaking
Layer 2 dependent flows; restore per-tree export by iterating your
TreeStateSubscriber instances and calling
TreeStateSubscriber::export_tree_snapshot to produce a compact
kv_index::snapshot::TreeSnapshot, then lz4-compress and write those snapshots
into tree_configs under the "tree_state_lz4" key. To avoid huge allocations, add
a size gate and chunking: if the serialized snapshot exceeds a configured
threshold, either serialize/send it in chunks or skip Layer 2 for that tree but
still export smaller snapshots; ensure this logic lives inside
checkpoint_tree_states and uses the same identifiers (checkpoint_tree_states,
TreeStateSubscriber::export_tree_snapshot, tree_configs, "tree_state_lz4") so
local-only state can be repaired by Layer 2 when snapshots are small or chunked.
- Around line 822-850: The current subscriber path calls
CacheAwarePolicy::apply_tenant_delta via the tree_state_subscribers iterator but
unconditionally advances_tree_version and bumps tree_generation, which makes
transient misses durable; change apply_tenant_delta (or add a new return flag)
to report whether all inserts were successfully applied (e.g., return bool or
Result), have the loop over tree_state_subscribers.collect the result, and only
call self.stores.advance_tree_version(&key, delta.version) and
self.stores.tree_generation.fetch_add(1, Ordering::Release) when every
subscriber reports all inserts applied; if any subscriber reports unresolved
inserts, keep the delta/version pending (do not advance) so the repair_path can
resolve and then mark the version advanced.
---
Outside diff comments:
In `@crates/mesh/src/sync.rs`:
- Around line 2179-2195: The test assumes every non-"tree_state_delta" payload
is a raw TreeState; update it to match on policy_state.policy_type and decode
each format the real receiver uses: for "tree_state_delta" keep
TreeStateDelta::from_bytes(&policy_state.config) and assert non-empty
operations; for "tree_state_lz4" first LZ4-decompress policy_state.config then
TreeState::from_bytes on the decompressed bytes; for "tree_state" call
TreeState::from_bytes(&policy_state.config); for "tenant_delta" decode using the
same tenant-delta path your runtime uses (e.g., the tenant delta deserializer
the receiver calls) and assert its expected fields; ensure each branch uses the
matching functions (PolicyState, TreeStateDelta::from_bytes,
TreeState::from_bytes) and error messages remain descriptive.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 83b58432-c739-4f1f-a9cd-92798ea9ce52
📒 Files selected for processing (2)
- crates/mesh/src/sync.rs
- model_gateway/src/policies/cache_aware.rs
crates/mesh/src/sync.rs (Outdated)
```rust
// Skip stale deltas — only process if version advances local state.
let current_version = self.stores.tree_version(&key);
if delta.version <= current_version {
    debug!(
        model_id = %delta.model_id,
        delta_version = delta.version,
        current_version,
        "Skipping stale tenant delta"
    );
    return;
}

debug!(
    model_id = %delta.model_id,
    inserts = delta.inserts.len(),
    evictions = delta.evictions.len(),
    version = delta.version,
    "Applying remote tenant delta"
);

// Notify subscribers (CacheAwarePolicy) to apply directly to local tree
let subscribers = self.tree_state_subscribers.read();
for subscriber in subscribers.iter() {
    subscriber.apply_tenant_delta(&delta.model_id, &delta.inserts, &delta.evictions);
}

// Advance version and bump generation so collector re-scans
self.stores.advance_tree_version(&key, delta.version);
self.stores.tree_generation.fetch_add(1, Ordering::Release);
```
Don't mark a tenant delta as applied when subscribers can drop inserts.
CacheAwarePolicy::apply_tenant_delta() explicitly drops unknown hashes (model_gateway/src/policies/cache_aware.rs, Lines 398-406), but this method still advances tree_version and will skip any later redelivery of the same delta.version. That turns a transient hash miss into durable divergence unless a newer repair update arrives. Have the subscriber path report whether all inserts were applied, or keep unresolved inserts/version pending until the repair path succeeds.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/mesh/src/sync.rs` around lines 822 - 850, The current subscriber path
calls CacheAwarePolicy::apply_tenant_delta via the tree_state_subscribers
iterator but unconditionally advances_tree_version and bumps tree_generation,
which makes transient misses durable; change apply_tenant_delta (or add a new
return flag) to report whether all inserts were successfully applied (e.g.,
return bool or Result), have the loop over tree_state_subscribers.collect the
result, and only call self.stores.advance_tree_version(&key, delta.version) and
self.stores.tree_generation.fetch_add(1, Ordering::Release) when every
subscriber reports all inserts applied; if any subscriber reports unresolved
inserts, keep the delta/version pending (do not advance) so the repair_path can
resolve and then mark the version advanced.
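The gating logic the prompt describes reduces to: collect a success flag per subscriber and advance only when all succeeded. A minimal sketch with simplified stand-ins for the real stores/version plumbing (names are assumptions):

```rust
// Simplified stand-in for the sync manager's version state.
struct TreeSync {
    version: u64,
}

impl TreeSync {
    // Advance the version only when every subscriber applied every
    // insert; otherwise leave it pending so the same delta.version can
    // be redelivered instead of becoming durable divergence.
    fn maybe_advance(&mut self, delta_version: u64, subscriber_results: &[bool]) -> bool {
        let all_applied =
            !subscriber_results.is_empty() && subscriber_results.iter().all(|&ok| ok);
        if all_applied {
            self.version = self.version.max(delta_version);
        }
        all_applied
    }
}
```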
```rust
/// Checkpoint tree state by exporting compact snapshots from the live
/// radix tree via subscribers.
///
/// Called periodically (~every 10s) to keep `tree_configs` fresh for
/// the periodic structure snapshot (every 30 gossip rounds).
///
/// Uses [`TreeStateSubscriber::export_tree_snapshot`] to obtain a
/// compact [`kv_index::snapshot::TreeSnapshot`] that preserves shared
/// prefixes. This is much smaller than the flat `TreeState` produced
/// by `export_tree_state` (~2-4 MB vs ~40 MB for 2048 entries sharing
/// 80% prefixes) and avoids accumulating full prompt text in memory.
#[expect(
    clippy::unused_self,
    reason = "Public API called by controller — removing &self is a breaking change"
)]
pub fn checkpoint_tree_states(&self) {
    // FIXME: Layer 2 (full tree snapshots) is disabled because the
    // snapshot can be 170+ MB for large trees with long prompts, and
    // allocating it every 10s causes allocator fragmentation. Tree data
    // currently syncs via Layer 1 only (tenant deltas, ~50 bytes each).
    // TODO: implement chunked snapshots or incremental tree diffs so
    // Layer 2 works for large trees without excessive memory allocation.
}
```
Don't disable local snapshot export for every tree.
With checkpoint_tree_states() as a stub, this node never exports its live trees into tree_configs, so the "tree_state_lz4" path cannot repair local-only state. That breaks every flow in model_gateway/src/policies/cache_aware.rs that still depends on Layer 2 convergence: hash misses (Lines 398-406), HTTP min-load inserts (Lines 566-573), and token-tree inserts (Lines 812-817). Size-gate or chunk large snapshots, but keep exporting the smaller ones.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/mesh/src/sync.rs` around lines 853 - 875, checkpoint_tree_states
currently does nothing so live trees are never exported into tree_configs (path
"tree_state_lz4"), breaking Layer 2 dependent flows; restore per-tree export by
iterating your TreeStateSubscriber instances and calling
TreeStateSubscriber::export_tree_snapshot to produce a compact
kv_index::snapshot::TreeSnapshot, then lz4-compress and write those snapshots
into tree_configs under the "tree_state_lz4" key. To avoid huge allocations, add
a size gate and chunking: if the serialized snapshot exceeds a configured
threshold, either serialize/send it in chunks or skip Layer 2 for that tree but
still export smaller snapshots; ensure this logic lives inside
checkpoint_tree_states and uses the same identifiers (checkpoint_tree_states,
TreeStateSubscriber::export_tree_snapshot, tree_configs, "tree_state_lz4") so
local-only state can be repaired by Layer 2 when snapshots are small or chunked.
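The size gate the prompt proposes is the simplest piece to show. This sketch keeps small snapshots flowing while skipping (or deferring to chunking) oversized ones; the 4 MB threshold is an assumption, not a value from the PR:

```rust
// Assumed threshold — tune to the observed snapshot size distribution.
const SNAPSHOT_SIZE_GATE: usize = 4 * 1024 * 1024;

// Size-gated Layer 2 export: small serialized snapshots still ship,
// oversized ones are skipped (or would be chunked) instead of disabling
// export for every tree.
fn gate_snapshot(serialized: Vec<u8>) -> Option<Vec<u8>> {
    if serialized.len() <= SNAPSHOT_SIZE_GATE {
        // Next step: lz4-compress and write under "tree_state_lz4".
        Some(serialized)
    } else {
        // Too large: skip Layer 2 for this tree this round.
        None
    }
}
```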
1. Remove GC unconditional tree_configs.clear() — was causing version regression every 60s by wiping valid remote state and cascading to tree_versions removal.
2. Remove stale version check in apply_remote_tenant_delta — both routers independently bump tree_version on local inserts, so a peer's delta version can legitimately be lower. Tenant inserts are idempotent (max-epoch-wins), so applying "stale" deltas is safe.
3. Remove snapshot-interval skip that discarded tenant deltas every 30 rounds expecting Layer 2 to compensate — Layer 2 is a no-op, so this was a periodic data loss window.
4. Add LZ4 decompress size validation (256 MB cap) to prevent OOM from corrupted or malicious size headers.
5. Add warn log for tenant delta deserialization failures in ping_server (was silently swallowed).

Refs: #1011
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
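The 256 MB decompress cap from point 4 can be sketched as validating the declared size before allocating. The little-endian u32 size-prefix layout here is an assumption about the wire format, not taken from the PR:

```rust
// Mirrors the 256 MB cap described in the commit message above.
const MAX_DECOMPRESSED: u32 = 256 * 1024 * 1024;

// Validate the declared decompressed size before allocating, so a
// corrupted or malicious header cannot trigger an OOM-sized allocation.
fn validate_lz4_header(payload: &[u8]) -> Result<u32, &'static str> {
    let header: [u8; 4] = payload
        .get(..4)
        .and_then(|s| s.try_into().ok())
        .ok_or("payload too short for size header")?;
    let declared = u32::from_le_bytes(header);
    if declared > MAX_DECOMPRESSED {
        return Err("declared size exceeds 256 MB cap");
    }
    Ok(declared)
}
```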
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/mesh/src/sync.rs (1)
2175-2189: ⚠️ Potential issue | 🟡 Minor

Decode each accepted policy payload using its actual format in this test.

The assertion allows `tenant_delta` and `tree_state_lz4`, but the fallback branch still parses every non-`tree_state_delta` payload as raw `TreeState`. If the collector returns either accepted format, this test fails for the wrong reason.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mesh/src/sync.rs` around lines 2175 - 2189, The test currently asserts allowed policy_type values but always parses non-"tree_state_delta" payloads with TreeState::from_bytes; change the branching in the assertion block to decode based on policy_state.policy_type: if "tree_state_delta" parse with TreeStateDelta::from_bytes and check delta.operations non-empty, else if "tree_state_lz4" first decompress then parse as TreeState (or the appropriate LZ4 decode helper) and check operations, else if "tenant_delta" decode using the tenant-delta deserializer and validate its contents, otherwise fail the test; update the branches that reference policy_state.policy_type, TreeStateDelta::from_bytes and TreeState::from_bytes accordingly so each accepted format is decoded with its actual parser.
♻️ Duplicate comments (4)
crates/mesh/src/sync.rs (4)
28-37: ⚠️ Potential issue | 🟠 Major

Don't advance `tree_version` after a best-effort tenant-delta apply.

The default `TreeStateSubscriber::apply_tenant_delta()` drops every insert, so this call site cannot assume the delta was applied successfully. Advancing the version unconditionally turns a dropped insert into a permanent skip; have the subscriber path report success/unresolved hashes and gate `advance_tree_version()` on that result.

Also applies to: 842-844

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mesh/src/sync.rs` around lines 28 - 37, The call unconditionally advancing tree_version after invoking TreeStateSubscriber::apply_tenant_delta is incorrect because the default implementation drops inserts and may not have actually applied the delta; change the call site (where advance_tree_version() is invoked) to use a success/result value returned by apply_tenant_delta (or add one) and only call advance_tree_version() when the subscriber reports success/no unresolved hashes; update the TreeStateSubscriber::apply_tenant_delta signature or add a new method return type to convey success/failure (e.g., Result<bool, Error> or enum) and make implementations like CacheAwarePolicy return true on successful application so the caller can gate advance_tree_version() accordingly.
837-840: ⚠️ Potential issue | 🟠 Major

Drop the subscriber-list guard before invoking `apply_tenant_delta()`.

This reintroduces the same callback-under-lock deadlock risk already fixed in `notify_tree_state_subscribers()`.

🔓 Suggested fix

```diff
- let subscribers = self.tree_state_subscribers.read();
- for subscriber in subscribers.iter() {
+ let subscribers = self.tree_state_subscribers.read().clone();
+ for subscriber in subscribers {
      subscriber.apply_tenant_delta(&delta.model_id, &delta.inserts, &delta.evictions);
  }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mesh/src/sync.rs` around lines 837 - 840, The current code holds the RwLock read guard from self.tree_state_subscribers.read() while calling subscriber.apply_tenant_delta(...), reintroducing a callback-under-lock deadlock; fix it by taking the read guard, collecting or cloning the subscriber handles into a temporary Vec (e.g., subscribers.iter().cloned().collect()), dropping the read guard, and only then iterating over that Vec to call apply_tenant_delta on each subscriber (same change pattern used in notify_tree_state_subscribers). Ensure you reference tree_state_subscribers, the read() guard, the subscribers local, and apply_tenant_delta when implementing this change.
496-506: ⚠️ Potential issue | 🟠 Major

Preserve the `TreeKey` kind in `TenantInsert`.

Both `sync_tree_insert_hash()` and `sync_tree_operation()` reduce text and token inserts to the same `u64` hash. By the time `apply_tenant_delta()` runs, the receiver can no longer tell whether the update belongs in a string tree or token tree, so token-affinity inserts are not reconstructable on peers.

Also applies to: 525-530

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mesh/src/sync.rs` around lines 496 - 506, The TenantInsert currently only stores a u64 hash so the receiver loses whether the insert came from a string or token tree; add a TreeKey-kind field to the TenantInsert struct and populate it when creating inserts so the kind from the TreeKey (string vs token) is preserved. Update the TenantInsert definition to include a kind (enum/flag matching TreeKey kinds), set that field in sync_tree_insert_hash and sync_tree_operation when you push TenantInsert (derive the kind from the same TreeKey used to produce node_path_hash), and ensure apply_tenant_delta consumes that kind to reconstruct token-affinity vs string inserts; also update any serialization/deserialization for TenantInsert accordingly.
541-545: ⚠️ Potential issue | 🟠 Major

The over-eviction fallback is no longer self-healing.

The remove path now relies on a later structure snapshot to repair entries that were evicted too broadly, but `checkpoint_tree_states()` is empty. `crates/mesh/src/incremental.rs` (Lines 357-392) and `crates/mesh/src/controller.rs` (Lines 125-135) still assume this method keeps `tree_configs` fresh, so reconnect/full-state repair cannot restore local-only tree state.

Also applies to: 847-869

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mesh/src/sync.rs` around lines 541 - 545, The Remove branch in TreeOperation::Remove uses a global-evict fallback but checkpoint_tree_states() is currently a no-op so tree_configs never gets refreshed, breaking reconnect/full-state repair; update the remove path to either (a) compute and persist the specific prefix hash being evicted (instead of using 0) or (b) immediately update/persist tree_configs via checkpoint_tree_states() with the affected prefix state so downstream code in incremental.rs and controller.rs can see the change; specifically modify TreeOperation::Remove handling to call checkpoint_tree_states() with the actual prefix (or record the prefix into the snapshot store) and ensure checkpoint_tree_states() actually writes/restores tree_configs so reconnect and full-state repair can restore local-only tree state.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/mesh/src/ping_server.rs`:
- Around line 707-753: The tree_state_lz4 branch currently ignores errors during
lz4 decompression and deserialization; update the block that calls
super::tree_ops::lz4_decompress(&policy_state.config) and the subsequent
TreeState::from_bytes / kv_index::snapshot::TreeSnapshot::from_bytes attempts to
log warnings on failure (include model_id, policy_state.version and a short
context string), e.g., log a warning if lz4_decompress returns Err and also log
a warning if decompression succeeds but neither TreeState::from_bytes nor
TreeSnapshot::from_bytes can deserialize the bytes before returning/continuing;
use the existing sync_manager.apply_remote_tree_operation and actor variables
for context in the messages so failures are observable during debugging.
In `@crates/mesh/src/sync.rs`:
- Around line 568-593: materialize_tree_state was updated to accept both
TreeState and TreeSnapshot but apply_remote_tree_delta still assumes only
TreeState, causing valid snapshot-backed configs to be treated as corrupted;
update apply_remote_tree_delta (and the other referenced block at the second
occurrence around apply_remote_tree_delta handling) to mirror
materialize_tree_state: when decoding config bytes first try
TreeState::from_bytes, and if that fails try
kv_index::snapshot::TreeSnapshot::from_bytes and convert to a TreeState via
TreeState::from_snapshot using self.stores.tree_version(key) (or the same
version helper used in materialize_tree_state) before proceeding with the
existing delta-apply logic that expects a TreeState.
---
Outside diff comments:
In `@crates/mesh/src/sync.rs`:
- Around line 2175-2189: The test currently asserts allowed policy_type values
but always parses non-"tree_state_delta" payloads with TreeState::from_bytes;
change the branching in the assertion block to decode based on
policy_state.policy_type: if "tree_state_delta" parse with
TreeStateDelta::from_bytes and check delta.operations non-empty, else if
"tree_state_lz4" first decompress then parse as TreeState (or the appropriate
LZ4 decode helper) and check operations, else if "tenant_delta" decode using the
tenant-delta deserializer and validate its contents, otherwise fail the test;
update the branches that reference policy_state.policy_type,
TreeStateDelta::from_bytes and TreeState::from_bytes accordingly so each
accepted format is decoded with its actual parser.
---
Duplicate comments:
In `@crates/mesh/src/sync.rs`:
- Around line 28-37: The call unconditionally advancing tree_version after
invoking TreeStateSubscriber::apply_tenant_delta is incorrect because the
default implementation drops inserts and may not have actually applied the
delta; change the call site (where advance_tree_version() is invoked) to use a
success/result value returned by apply_tenant_delta (or add one) and only call
advance_tree_version() when the subscriber reports success/no unresolved hashes;
update the TreeStateSubscriber::apply_tenant_delta signature or add a new method
return type to convey success/failure (e.g., Result<bool, Error> or enum) and
make implementations like CacheAwarePolicy return true on successful application
so the caller can gate advance_tree_version() accordingly.
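If it helps, here is a compile-checked sketch of that gating idea, using simplified stand-in types (the real `TreeStateSubscriber`, delta fields, and `SyncManager` differ; `KnownHashSubscriber` is hypothetical): the subscriber reports whether it actually applied the delta, and the caller only advances the version on success.

```rust
// Simplified stand-ins for the mesh types; field names are illustrative.
struct TenantDelta {
    inserts: Vec<u64>, // node_path hashes, simplified
}

trait TreeStateSubscriber {
    /// Returns true only when every hash in the delta was resolved and
    /// applied; the default implementation applies nothing.
    fn apply_tenant_delta(&self, _delta: &TenantDelta) -> bool {
        false
    }
}

struct KnownHashSubscriber {
    known: Vec<u64>,
}

impl TreeStateSubscriber for KnownHashSubscriber {
    fn apply_tenant_delta(&self, delta: &TenantDelta) -> bool {
        delta.inserts.iter().all(|h| self.known.contains(h))
    }
}

struct SyncManager {
    version: u64,
}

impl SyncManager {
    fn apply_remote_tenant_delta(
        &mut self,
        delta: &TenantDelta,
        subs: &[&dyn TreeStateSubscriber],
    ) -> bool {
        // Only advance the version when every subscriber applied the delta;
        // an unresolved hash leaves the version untouched for a later retry.
        let all_applied = subs.iter().all(|s| s.apply_tenant_delta(delta));
        if all_applied {
            self.version += 1;
        }
        all_applied
    }
}

fn main() {
    let good = KnownHashSubscriber { known: vec![1, 2] };
    let mut mgr = SyncManager { version: 0 };
    assert!(mgr.apply_remote_tenant_delta(&TenantDelta { inserts: vec![1, 2] }, &[&good]));
    assert_eq!(mgr.version, 1);
    // A delta with an unresolved hash must not advance the version.
    assert!(!mgr.apply_remote_tenant_delta(&TenantDelta { inserts: vec![99] }, &[&good]));
    assert_eq!(mgr.version, 1);
}
```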
- Around line 837-840: The current code holds the RwLock read guard from
self.tree_state_subscribers.read() while calling
subscriber.apply_tenant_delta(...), reintroducing a callback-under-lock
deadlock; fix it by taking the read guard, collecting or cloning the subscriber
handles into a temporary Vec (e.g., subscribers.iter().cloned().collect()),
dropping the read guard, and only then iterating over that Vec to call
apply_tenant_delta on each subscriber (same change pattern used in
notify_tree_state_subscribers). Ensure you reference tree_state_subscribers, the
read() guard, the subscribers local, and apply_tenant_delta when implementing
this change.
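The clone-then-drop-guard pattern this comment asks for can be sketched with a plain `std::sync::RwLock` (the real code uses the mesh registry types; `Registry` and `notify` here are simplified stand-ins):

```rust
use std::sync::{Arc, RwLock};

type Callback = Arc<dyn Fn(&str) + Send + Sync>;

struct Registry {
    subscribers: RwLock<Vec<Callback>>,
}

impl Registry {
    fn notify(&self, delta: &str) {
        // Clone the handles while holding the read guard, then let the
        // guard drop at the end of this statement.
        let subs: Vec<Callback> = self.subscribers.read().unwrap().iter().cloned().collect();
        // No lock is held here, so a callback may safely re-enter the
        // registry (take the lock itself) without deadlocking.
        for s in &subs {
            s(delta);
        }
    }
}

fn main() {
    let reg = Arc::new(Registry { subscribers: RwLock::new(Vec::new()) });
    let hits = Arc::new(RwLock::new(0u32));
    let h = hits.clone();
    let r = reg.clone();
    reg.subscribers.write().unwrap().push(Arc::new(move |_d: &str| {
        // Re-entering the registry lock is safe: notify() is not holding it.
        let _len = r.subscribers.read().unwrap().len();
        *h.write().unwrap() += 1;
    }));
    reg.notify("delta");
    assert_eq!(*hits.read().unwrap(), 1);
}
```

Calling the subscribers while still inside `subscribers.read()` would deadlock the moment any callback tried to take the same lock, which is exactly the hazard the comment describes.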
- Around line 496-506: The TenantInsert currently only stores a u64 hash so the
receiver loses whether the insert came from a string or token tree; add a
TreeKey-kind field to the TenantInsert struct and populate it when creating
inserts so the kind from the TreeKey (string vs token) is preserved. Update the
TenantInsert definition to include a kind (enum/flag matching TreeKey kinds),
set that field in sync_tree_insert_hash and sync_tree_operation when you push
TenantInsert (derive the kind from the same TreeKey used to produce
node_path_hash), and ensure apply_tenant_delta consumes that kind to reconstruct
token-affinity vs string inserts; also update any serialization/deserialization
for TenantInsert accordingly.
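A minimal sketch of carrying the `TreeKey` kind on the wire, with simplified stand-in types (`TreeKeyKind` and `buffer_insert` are illustrative names, not the PR's actual API):

```rust
// Which tree the node_path_hash indexes: string tree vs token tree.
#[derive(Clone, Copy, Debug, PartialEq)]
enum TreeKeyKind {
    Text,
    Tokens,
}

enum TreeKey {
    Text(String),
    Tokens(Vec<u32>),
}

impl TreeKey {
    fn kind(&self) -> TreeKeyKind {
        match self {
            TreeKey::Text(_) => TreeKeyKind::Text,
            TreeKey::Tokens(_) => TreeKeyKind::Tokens,
        }
    }
}

struct TenantInsert {
    node_path_hash: u64,
    worker_url: String,
    epoch: u64,
    kind: TreeKeyKind, // new field: preserves string vs token provenance
}

fn buffer_insert(key: &TreeKey, hash: u64, worker: &str, epoch: u64) -> TenantInsert {
    TenantInsert {
        node_path_hash: hash,
        worker_url: worker.to_string(),
        epoch,
        // Derived from the same TreeKey that produced node_path_hash, so the
        // receiver can route the insert to the right tree.
        kind: key.kind(),
    }
}

fn main() {
    let ins = buffer_insert(&TreeKey::Tokens(vec![1, 2, 3]), 42, "http://w1", 7);
    assert_eq!(ins.kind, TreeKeyKind::Tokens);
    let ins2 = buffer_insert(&TreeKey::Text("hello".into()), 9, "http://w2", 8);
    assert_eq!(ins2.kind, TreeKeyKind::Text);
}
```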
- Around line 541-545: The Remove branch in TreeOperation::Remove uses a
global-evict fallback but checkpoint_tree_states() is currently a no-op so
tree_configs never gets refreshed, breaking reconnect/full-state repair; update
the remove path to either (a) compute and persist the specific prefix hash being
evicted (instead of using 0) or (b) immediately update/persist tree_configs via
checkpoint_tree_states() with the affected prefix state so downstream code in
incremental.rs and controller.rs can see the change; specifically modify
TreeOperation::Remove handling to call checkpoint_tree_states() with the actual
prefix (or record the prefix into the snapshot store) and ensure
checkpoint_tree_states() actually writes/restores tree_configs so reconnect and
full-state repair can restore local-only tree state.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: d048f88c-eb18-474e-b6ff-1995b0dcae78
📒 Files selected for processing (5)
- crates/mesh/src/incremental.rs
- crates/mesh/src/ping_server.rs
- crates/mesh/src/stores.rs
- crates/mesh/src/sync.rs
- crates/mesh/src/tree_ops.rs
```diff
-/// Load the materialized TreeState from `tree_configs`.
-/// Returns None if no checkpoint exists for this key.
-///
-/// Returns `(TreeState, pending_count)` where `pending_count` is the
-/// number of operations read from the pending buffer under the same
-/// DashMap `Ref`. Callers that need to drain the pending buffer after
-/// checkpoint should use this count (not `tree_state.operations.len()`,
-/// which also includes operations baked into the config blob).
-fn materialize_tree_state(&self, key: &str, model_id: &str) -> Option<(TreeState, usize)> {
-    // Clone bytes out of the DashMap ref so no guard is held while
-    // accessing tree_ops_pending.
-    let config_bytes = self.stores.tree_configs.get(key).map(|r| r.clone());
-    let mut tree_state = match config_bytes.as_deref() {
-        Some(bytes) if !bytes.is_empty() => TreeState::from_bytes(bytes).ok()?,
-        Some(_) => TreeState::new(model_id.to_string()),
-        None => {
-            // No tree_configs entry yet — tree may only exist in the
-            // pending buffer (sync_tree_operation no longer creates
-            // config entries on the hot path).
-            if self.stores.tree_ops_pending.contains_key(key) {
-                TreeState::new(model_id.to_string())
-            } else {
-                return None;
-            }
-        }
-    };
-    let mut pending_count = 0;
-    if let Some(pending) = self.stores.tree_ops_pending.get(key) {
-        pending_count = pending.len();
-        for op in pending.iter() {
-            tree_state.add_operation(op.clone());
-        }
+/// Handles two storage formats:
+/// - `TreeState` bytes (from remote full-state updates)
+/// - `TreeSnapshot` bytes (from local `checkpoint_tree_states`)
+fn materialize_tree_state(&self, key: &str, model_id: &str) -> Option<TreeState> {
+    let config_bytes = self.stores.tree_configs.get(key)?;
+    let bytes = config_bytes.value();
+    if bytes.is_empty() {
+        return Some(TreeState::new(model_id.to_string()));
+    }
-    Some((tree_state, pending_count))
+    // Try TreeState first (remote full-state updates store this format).
+    if let Ok(ts) = TreeState::from_bytes(bytes) {
+        return Some(ts);
+    }
+    // Fall back to TreeSnapshot (local checkpoint format).
+    if let Ok(snap) = kv_index::snapshot::TreeSnapshot::from_bytes(bytes) {
+        let version = self.stores.tree_version(key);
+        return Some(TreeState::from_snapshot(
+            model_id.to_string(),
+            &snap,
+            version,
+        ));
+    }
+    None
```
Keep delta-apply decoding in sync with tree_configs' new formats.
materialize_tree_state() now accepts both TreeState and TreeSnapshot, but apply_remote_tree_delta() still only accepts TreeState. That makes the storage contract internally inconsistent and will treat valid snapshot-backed configs as corruption.
Also applies to: 727-749
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/mesh/src/sync.rs` around lines 568 - 593, materialize_tree_state was
updated to accept both TreeState and TreeSnapshot but apply_remote_tree_delta
still assumes only TreeState, causing valid snapshot-backed configs to be
treated as corrupted; update apply_remote_tree_delta (and the other referenced
block at the second occurrence around apply_remote_tree_delta handling) to
mirror materialize_tree_state: when decoding config bytes first try
TreeState::from_bytes, and if that fails try
kv_index::snapshot::TreeSnapshot::from_bytes and convert to a TreeState via
TreeState::from_snapshot using self.stores.tree_version(key) (or the same
version helper used in materialize_tree_state) before proceeding with the
existing delta-apply logic that expects a TreeState.
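A minimal sketch of the shared fallback decoder this prompt describes, with toy byte formats (a `b"TS"`/`b"SN"` prefix) and stub types standing in for the real `TreeState`/`TreeSnapshot`; `decode_tree_config` is a hypothetical helper both call sites could share:

```rust
struct TreeState {
    model_id: String,
    version: u64,
}

struct TreeSnapshot;

impl TreeState {
    fn from_bytes(bytes: &[u8]) -> Result<TreeState, ()> {
        // Toy format: a b"TS" prefix marks a TreeState blob.
        if bytes.starts_with(b"TS") {
            Ok(TreeState { model_id: "m".into(), version: 0 })
        } else {
            Err(())
        }
    }
    fn from_snapshot(model_id: String, _snap: &TreeSnapshot, version: u64) -> TreeState {
        TreeState { model_id, version }
    }
}

impl TreeSnapshot {
    fn from_bytes(bytes: &[u8]) -> Result<TreeSnapshot, ()> {
        if bytes.starts_with(b"SN") { Ok(TreeSnapshot) } else { Err(()) }
    }
}

/// Hypothetical shared helper: both materialize_tree_state and
/// apply_remote_tree_delta would decode tree_configs bytes through this,
/// so snapshot-backed entries are never mistaken for corruption.
fn decode_tree_config(bytes: &[u8], model_id: &str, version: u64) -> Option<TreeState> {
    if let Ok(ts) = TreeState::from_bytes(bytes) {
        return Some(ts);
    }
    if let Ok(snap) = TreeSnapshot::from_bytes(bytes) {
        return Some(TreeState::from_snapshot(model_id.to_string(), &snap, version));
    }
    None // genuinely corrupt, not merely snapshot-backed
}

fn main() {
    assert!(decode_tree_config(b"TS...", "m", 3).is_some());
    let snap_backed = decode_tree_config(b"SN...", "m", 3).unwrap();
    assert_eq!(snap_backed.version, 3);
    assert_eq!(snap_backed.model_id, "m");
    assert!(decode_tree_config(b"??", "m", 3).is_none());
}
```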
1. Clone subscriber list before callbacks in apply_remote_tenant_delta to avoid holding read guard during subscriber code (same pattern as notify_tree_state_subscribers). 2. Remove dead last_checkpoint_gen field from StateStores (vestigial from disabled Layer 2 checkpoint). 3. Add warn logs for tree_state_delta and tree_state_lz4 deser/decompress failures in ping_server.rs (was silently swallowing errors). 4. Update snapshot.rs module doc — no longer "not yet wired", it IS used by controller.rs and ping_server.rs receive paths. 5. Update tree_configs GC comment — "only remove entries for models that are truly gone" matches actual behavior. 6. Add TODO for path_hash_index model scoping — deferred to follow-up to avoid wire format change. 7. Fix registry test to check tenant_delta_inserts instead of vacuous get_tree_state().is_none() assertion. Refs: #1011 Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
```diff
 model_id,
-policy_type: "tree_state".to_string(),
 config: config_bytes,
+policy_type: "tree_state_lz4".to_string(),
```
🟡 Nit: Wire format backward compatibility break during rolling upgrades. This PR introduces two new policy_type values ("tenant_delta" in Phase 0 and "tree_state_lz4" here in Phase 2) that older nodes don't recognize. Older receivers silently ignore unknown policy types — they only handle "tree_state_delta" and "tree_state".
Since Phase 1 (tree_state_delta sending) is also removed, new nodes send nothing that old nodes can process for tree sync. During a rolling upgrade, tree sync from new→old is completely broken (old→new still works because new nodes handle legacy formats).
This means old nodes will have stale tree routing data for the duration of the upgrade window, potentially causing suboptimal cache-aware routing.
Consider either:
- Adding a transitional period where new nodes also emit a `"tree_state"` fallback alongside the new formats, or
- Documenting this as a known limitation requiring a brief routing degradation during upgrades.
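The first option could look roughly like this, under stated assumptions: `collect_tree_payloads` and the `legacy_fallback` flag are hypothetical names, and the real collector builds payloads per model rather than taking them as arguments.

```rust
struct PolicyState {
    policy_type: String,
    config: Vec<u8>,
}

// While legacy_fallback is enabled (the rolling-upgrade window), each
// gossip round also carries a legacy "tree_state" payload that
// pre-upgrade receivers can decode.
fn collect_tree_payloads(
    new_payload: Vec<u8>,
    legacy_payload: Vec<u8>,
    legacy_fallback: bool,
) -> Vec<PolicyState> {
    let mut out = vec![PolicyState {
        policy_type: "tenant_delta".to_string(),
        config: new_payload,
    }];
    if legacy_fallback {
        // Old receivers only understand "tree_state"/"tree_state_delta",
        // so emit the uncompressed legacy format next to the new one.
        out.push(PolicyState {
            policy_type: "tree_state".to_string(),
            config: legacy_payload,
        });
    }
    out
}

fn main() {
    let both = collect_tree_payloads(vec![1], vec![2], true);
    assert_eq!(both.len(), 2);
    assert_eq!(both[1].policy_type, "tree_state");
    let new_only = collect_tree_payloads(vec![1], vec![2], false);
    assert_eq!(new_only.len(), 1);
}
```

The flag can be dropped (defaulted off) once all nodes in the mesh understand the new policy types.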
```rust
// Apply inserts — look up the prefix path by hash in our local index.
// If the hash is unknown (prefix doesn't exist locally), the insert is
// silently dropped. The next structure snapshot (every ~30s) will deliver
// the full tree including this prefix + its tenants.
```
🟡 Nit: This comment says "The next structure snapshot (every ~30s) will deliver the full tree" — but checkpoint_tree_states is currently a no-op (see sync.rs:871), so Layer 2 snapshots never fire. Dropped inserts are not corrected by any mechanism in the current code.
This means cross-gateway tenant delta inserts only work when the receiving gateway has already processed the same full prompt text locally (populating path_hash_index with the same hash). For unique prompts routed to different gateways, the insert is permanently lost.
The comment should be updated to reflect the current state, e.g.:
```rust
// Unknown hash — dropped. Will be corrected once Layer 2
// (periodic structure snapshots) is enabled.
```
```rust
)]
pub fn export_tree_state(&self, model_id: &str) -> Option<smg_mesh::TreeState> {
    let model_id = Self::normalize_mesh_model_id(model_id);
    let tree = self.string_trees.get(model_id)?;
    let snapshot = tree.snapshot();
    if snapshot.nodes.is_empty() {
        return None;
    }

    // Walk snapshot nodes in pre-order, reconstructing full prefix paths
```
🟡 Nit: export_tree_state contains a snapshot-walking algorithm that is identical to TreeState::from_snapshot in tree_ops.rs:234-268. Both reconstruct full prefix paths from pre-order node lists using the same stack-based traversal. If one is fixed or changed, the other must be updated in lockstep.
Consider extracting the shared traversal into a single helper (e.g., a method on TreeSnapshot that yields (prefix, tenant) pairs) to avoid divergence.
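The shared traversal could be factored roughly like this; a sketch with simplified `SnapshotNode` fields (the real kv_index node layout differs), showing the stack-based pre-order walk that reconstructs each full prefix once and yields `(prefix, tenant)` pairs:

```rust
struct SnapshotNode {
    depth: usize,         // parent always precedes children (pre-order)
    segment: String,      // edge label from parent to this node
    tenants: Vec<String>, // worker URLs attached at this node
}

fn walk_tenants(nodes: &[SnapshotNode]) -> Vec<(String, String)> {
    let mut out = Vec::new();
    // Stack of reconstructed prefixes, one entry per ancestor depth.
    let mut stack: Vec<String> = Vec::new();
    for node in nodes {
        stack.truncate(node.depth); // pop back to this node's parent
        let prefix = match stack.last() {
            Some(parent) => format!("{parent}{}", node.segment),
            None => node.segment.clone(),
        };
        for tenant in &node.tenants {
            out.push((prefix.clone(), tenant.clone()));
        }
        stack.push(prefix);
    }
    out
}

fn main() {
    // Pre-order: "ab", then its child "cd", then a sibling "ef".
    let nodes = vec![
        SnapshotNode { depth: 0, segment: "ab".into(), tenants: vec!["w1".into()] },
        SnapshotNode { depth: 1, segment: "cd".into(), tenants: vec!["w2".into()] },
        SnapshotNode { depth: 0, segment: "ef".into(), tenants: vec![] },
    ];
    let pairs = walk_tenants(&nodes);
    assert_eq!(pairs, vec![
        ("ab".to_string(), "w1".to_string()),
        ("abcd".to_string(), "w2".to_string()),
    ]);
}
```

With a helper of this shape, both `export_tree_state` and `TreeState::from_snapshot` would consume the same iterator instead of duplicating the traversal.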
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
crates/mesh/src/stores.rs (1)
849-877: ⚠️ Potential issue | 🟠 Major — Treat pending tenant deltas as tree liveness.
`gc_stale_tree_entries()` still only considers `tree_ops_pending`, but this PR moved string-tree buffering to `tenant_delta_inserts`/`tenant_delta_evictions`. A model with pending tenant deltas and no checkpointed `tree_configs` now loses its `tree_versions` entry as "stale", which can reset the next local version back to 1 and break the new idle/version tracking.
Suggested fix:
```diff
 pub fn gc_stale_tree_entries(&self) -> usize {
-    let before =
-        self.tree_configs.len() + self.tree_versions.len() + self.tree_ops_pending.len();
+    let has_pending_activity = |key: &String| {
+        self.tree_ops_pending.get(key).is_some_and(|v| !v.is_empty())
+            || self
+                .tenant_delta_inserts
+                .get(key)
+                .is_some_and(|v| !v.is_empty())
+            || self
+                .tenant_delta_evictions
+                .get(key)
+                .is_some_and(|v| !v.is_empty())
+    };
+    let before = self.tree_configs.len()
+        + self.tree_versions.len()
+        + self.tree_ops_pending.len()
+        + self.tenant_delta_inserts.len()
+        + self.tenant_delta_evictions.len();
     self.tree_versions.retain(|k, _| {
-        self.tree_configs.contains_key(k)
-            || self.tree_ops_pending.get(k).is_some_and(|v| !v.is_empty())
+        self.tree_configs.contains_key(k) || has_pending_activity(k)
     });
     self.tree_ops_pending
         .retain(|k, v| !v.is_empty() || self.tree_configs.contains_key(k));
+    self.tenant_delta_inserts.retain(|k, v| {
+        !v.is_empty() || self.tree_configs.contains_key(k) || self.tree_versions.contains_key(k)
+    });
+    self.tenant_delta_evictions.retain(|k, v| {
+        !v.is_empty() || self.tree_configs.contains_key(k) || self.tree_versions.contains_key(k)
+    });
     self.tree_configs.retain(|k, _| {
-        self.tree_versions.contains_key(k)
-            || self.tree_ops_pending.get(k).is_some_and(|v| !v.is_empty())
+        self.tree_versions.contains_key(k) || has_pending_activity(k)
     });
-    let after =
-        self.tree_configs.len() + self.tree_versions.len() + self.tree_ops_pending.len();
+    let after = self.tree_configs.len()
+        + self.tree_versions.len()
+        + self.tree_ops_pending.len()
+        + self.tenant_delta_inserts.len()
+        + self.tenant_delta_evictions.len();
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mesh/src/stores.rs` around lines 849 - 877, gc_stale_tree_entries currently treats only tree_ops_pending as evidence of liveness, but pending tenant deltas live in tenant_delta_inserts/tenant_delta_evictions; update gc_stale_tree_entries to treat a model as live if it has non-empty tenant_delta_inserts or tenant_delta_evictions (in addition to tree_ops_pending) when deciding whether to retain tree_versions and tree_configs so that models with pending tenant deltas are not pruned and their version counters are preserved; refer to gc_stale_tree_entries, tree_versions, tree_configs, tree_ops_pending, tenant_delta_inserts, and tenant_delta_evictions when making the checks.
crates/mesh/src/sync.rs (1)
2177-2191: ⚠️ Potential issue | 🟡 Minor — Decode each collected payload by its actual wire format.
This branch now allows `"tenant_delta"` and `"tree_state_lz4"`, but it still parses every non-`"tree_state_delta"` payload as raw `TreeState`. That means the test no longer validates the collector correctly once it emits a tenant delta or an LZ4 snapshot. Match on `policy_state.policy_type` and use the corresponding decoder for each case.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mesh/src/sync.rs` around lines 2177 - 2191, The test currently treats every non-"tree_state_delta" payload as a raw TreeState; update the branch on policy_state.policy_type to handle each wire format explicitly: match "tenant_delta" and "tree_state_delta" to decode with TreeStateDelta::from_bytes and assert !delta.operations.is_empty(), match "tree_state_lz4" to first decompress the bytes (using the repo's LZ4 decompression helper) then decode with TreeState::from_bytes and assert !tree.operations.is_empty(), and match "tree_state" to decode directly with TreeState::from_bytes and assert !tree.operations.is_empty(); use the existing symbols policy_state.policy_type, TreeStateDelta::from_bytes, TreeState::from_bytes and the LZ4 decompressor helper to locate and implement these changes.
♻️ Duplicate comments (3)
model_gateway/src/policies/cache_aware.rs (2)
81-91: ⚠️ Potential issue | 🟠 Major — Scope `path_hash_index` by `model_id`, not just hash.
This is still a cross-model correctness bug. The same request text in two models produces the same `hash_node_path(text)`, and `apply_tenant_delta()` later resolves that hash without `model_id`, so model A can pick up model B's matched prefix and insert the tenant into the wrong node. This does not require a BLAKE3 collision, just the same prompt string across models.
Minimal fix:
```diff
-    path_hash_index: Arc<DashMap<u64, String>>,
+    path_hash_index: Arc<DashMap<(String, u64), String>>,
 ...
-    if let Some(path_entry) = self.path_hash_index.get(&insert.node_path_hash) {
+    if let Some(path_entry) = self
+        .path_hash_index
+        .get(&(model_id.to_string(), insert.node_path_hash))
+    {
         string_tree.insert_text(path_entry.value(), &insert.worker_url);
     }
 ...
-    self.path_hash_index.insert(path_hash, matched_prefix);
+    self.path_hash_index
+        .insert((model_id.to_string(), path_hash), matched_prefix);
```
The `model_id` is already present on the tenant-delta envelope, so this fix stays wire-compatible.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model_gateway/src/policies/cache_aware.rs` around lines 81 - 91, path_hash_index currently keys only by the 64-bit text hash which allows cross-model collisions; change the map to scope keys by model_id + hash (e.g., a composite key type or by hashing model_id || hash) so lookups/insertions become (model_id, hash) operations; update all uses of path_hash_index (places that call hash_node_path, the local insert code that populates path_hash_index, and apply_tenant_delta which looks up the index) to pass and match the model_id from the tenant-delta envelope, keeping the wire format unchanged because model_id is already available.
1120-1127: ⚠️ Potential issue | 🟡 Minor — This assertion is still vacuous.
`get_tree_state(...).is_none()` passes both when the insert was correctly buffered as a tenant delta and when `select_worker()` never touched mesh at all, so it no longer protects the regression the comment describes.
Example tightening:
```diff
-    let stores = Arc::new(StateStores::with_self_name("node1".to_string()));
-    let mesh_sync = Arc::new(MeshSyncManager::new(stores, "node1".to_string()));
+    let stores = Arc::new(StateStores::with_self_name("node1".to_string()));
+    let mesh_sync = Arc::new(MeshSyncManager::new(stores.clone(), "node1".to_string()));
@@
-    assert!(
-        mesh_sync.get_tree_state(UNKNOWN_MODEL_ID).is_none(),
-        "get_tree_state should be None until checkpoint runs"
-    );
+    assert!(
+        stores
+            .tenant_delta_inserts
+            .get(UNKNOWN_MODEL_ID)
+            .is_some_and(|updates| !updates.is_empty()),
+        "select_worker should buffer at least one tenant delta"
+    );
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model_gateway/src/policies/cache_aware.rs` around lines 1120 - 1127, the current assertion using mesh_sync.get_tree_state(UNKNOWN_MODEL_ID).is_none() is vacuous; instead capture the tree version before running sync_tree_operation (e.g., pre_version = mesh_sync.get_tree_version(UNKNOWN_MODEL_ID).unwrap_or(0)), run the operation (or select_worker call), then assert the post-version (mesh_sync.get_tree_version(...)) is greater than pre_version while still asserting get_tree_state(...) is None; reference mesh_sync.get_tree_state, mesh_sync.get_tree_version (or equivalent version API), sync_tree_operation and select_worker to locate the relevant checks and replace the single vacuous assertion with the version bump assertion plus the None-state check.
crates/mesh/src/sync.rs (1)
574-593: ⚠️ Potential issue | 🟠 Major — Mirror the snapshot fallback when applying remote deltas.
`materialize_tree_state()` now accepts both `TreeState` and `TreeSnapshot`, but `apply_remote_tree_delta()` still treats any snapshot-backed `tree_configs` entry as corruption. If a checkpointed snapshot lands here, deltas for that model will be rejected even though `get_tree_state()` can read the same bytes. Please reuse the `TreeSnapshot::from_bytes` + `TreeState::from_snapshot(...)` fallback here when computing `current_version` and building the base tree.
Also applies to: 706-749
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mesh/src/sync.rs` around lines 574 - 593, apply_remote_tree_delta currently treats snapshot-backed tree_configs as corruption while materialize_tree_state accepts both TreeState and kv_index::snapshot::TreeSnapshot; update apply_remote_tree_delta to mirror that fallback: when reading config bytes for current_version and base tree inside apply_remote_tree_delta, attempt TreeState::from_bytes first and if that fails try kv_index::snapshot::TreeSnapshot::from_bytes and convert via TreeState::from_snapshot(model_id.to_string(), &snap, version) (use the same version retrieval as materialize_tree_state via self.stores.tree_version(key)); ensure both places (apply_remote_tree_delta and the other occurrence noted around lines ~706-749) use the same fallback logic so snapshot-backed entries are accepted rather than rejected as corruption.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/kv_index/src/snapshot.rs`:
- Around line 3-8: The TreeSnapshot bytes must include and preserve the tree
version: add a version field to the TreeSnapshot serialization and populate it
when exporting snapshots (the exporter that creates TreeSnapshot instances),
then update callers of TreeState::from_snapshot(...) to pass snap.version
instead of using the outer PolicyState.version placeholder; ensure the
snapshot-backed serialization path no longer uses the placeholder fallback and
that any code paths which call advance_tree_version() rely on the restored
TreeState version from snap.version.
In `@crates/mesh/src/sync.rs`:
- Around line 523-553: The remove path drops epoch metadata causing out-of-order
evictions; extend TenantEvict to include epoch (same as TenantInsert), populate
it where evictions are pushed into tenant_delta_evictions (use
self.stores.tree_version(&key)), and update downstream application logic (the
subscriber/consumer that processes tenant deltas — the code referenced around
apply of deltas) to compare eviction.epoch against the current tree version and
ignore stale evictions; alternatively, if you prefer not to order tenant-delta
removes, keep remove events on the ordered full-state path instead of pushing
them into tenant_delta_evictions so causality is preserved.
---
Outside diff comments:
In `@crates/mesh/src/stores.rs`:
- Around line 849-877: gc_stale_tree_entries currently treats only
tree_ops_pending as evidence of liveness, but pending tenant deltas live in
tenant_delta_inserts/tenant_delta_evictions; update gc_stale_tree_entries to
treat a model as live if it has non-empty tenant_delta_inserts or
tenant_delta_evictions (in addition to tree_ops_pending) when deciding whether
to retain tree_versions and tree_configs so that models with pending tenant
deltas are not pruned and their version counters are preserved; refer to
gc_stale_tree_entries, tree_versions, tree_configs, tree_ops_pending,
tenant_delta_inserts, and tenant_delta_evictions when making the checks.
In `@crates/mesh/src/sync.rs`:
- Around line 2177-2191: The test currently treats every non-"tree_state_delta"
payload as a raw TreeState; update the branch on policy_state.policy_type to
handle each wire format explicitly: match "tenant_delta" and "tree_state_delta"
to decode with TreeStateDelta::from_bytes and assert
!delta.operations.is_empty(), match "tree_state_lz4" to first decompress the
bytes (using the repo's LZ4 decompression helper) then decode with
TreeState::from_bytes and assert !tree.operations.is_empty(), and match
"tree_state" to decode directly with TreeState::from_bytes and assert
!tree.operations.is_empty(); use the existing symbols policy_state.policy_type,
TreeStateDelta::from_bytes, TreeState::from_bytes and the LZ4 decompressor
helper to locate and implement these changes.
---
Duplicate comments:
In `@crates/mesh/src/sync.rs`:
- Around line 574-593: apply_remote_tree_delta currently treats snapshot-backed
tree_configs as corruption while materialize_tree_state accepts both TreeState
and kv_index::snapshot::TreeSnapshot; update apply_remote_tree_delta to mirror
that fallback: when reading config bytes for current_version and base tree
inside apply_remote_tree_delta, attempt TreeState::from_bytes first and if that
fails try kv_index::snapshot::TreeSnapshot::from_bytes and convert via
TreeState::from_snapshot(model_id.to_string(), &snap, version) (use the same
version retrieval as materialize_tree_state via self.stores.tree_version(key));
ensure both places (apply_remote_tree_delta and the other occurrence noted
around lines ~706-749) use the same fallback logic so snapshot-backed entries
are accepted rather than rejected as corruption.
In `@model_gateway/src/policies/cache_aware.rs`:
- Around line 81-91: path_hash_index currently keys only by the 64-bit text hash
which allows cross-model collisions; change the map to scope keys by model_id +
hash (e.g., a composite key type or by hashing model_id || hash) so
lookups/insertions become (model_id, hash) operations; update all uses of
path_hash_index (places that call hash_node_path, the local insert code that
populates path_hash_index, and apply_tenant_delta which looks up the index) to
pass and match the model_id from the tenant-delta envelope, keeping the wire
format unchanged because model_id is already available.
- Around line 1120-1127: The current assertion using
mesh_sync.get_tree_state(UNKNOWN_MODEL_ID).is_none() is vacuous; instead capture
the tree version before running sync_tree_operation (e.g., pre_version =
mesh_sync.get_tree_version(UNKNOWN_MODEL_ID).unwrap_or(0)), run the operation
(or select_worker call), then assert the post-version
(mesh_sync.get_tree_version(...)) is greater than pre_version while still
asserting get_tree_state(...) is None; reference mesh_sync.get_tree_state,
mesh_sync.get_tree_version (or equivalent version API), sync_tree_operation and
select_worker to locate the relevant checks and replace the single vacuous
assertion with the version bump assertion plus the None-state check.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 8ac583a4-8a2a-4920-8f88-a2efdfb4868a
📒 Files selected for processing (6)
- crates/kv_index/src/snapshot.rs
- crates/mesh/src/ping_server.rs
- crates/mesh/src/stores.rs
- crates/mesh/src/sync.rs
- model_gateway/src/policies/cache_aware.rs
- model_gateway/src/policies/registry.rs
```rust
//! Used by the mesh sync protocol as an alternative to the flat `TreeState`
//! operation list. The structure-preserving format shares common prefixes,
//! reducing wire size from ~40 MB to ~2-4 MB for a tree with 2048 entries
//! sharing 80% prefixes. Receivers in `controller.rs` and `ping_server.rs`
//! accept both `TreeState` and `TreeSnapshot` payloads under the
//! `tree_state_lz4` policy type.
```
Preserve the tree version inside TreeSnapshot.
Now that this format is part of tree_state_lz4, the snapshot bytes themselves need to carry the tree version. Today the receiver has to trust the outer PolicyState.version, and the sender currently falls back to a placeholder when serializing snapshot-backed updates. That collapses restored TreeState versions and can regress later advance_tree_version() baselines.
Minimal shape change
pub struct TreeSnapshot {
+ /// Tree version used to seed remote version counters.
+ pub version: u64,
/// Flattened tree nodes in pre-order (parent before children).
pub nodes: Vec<SnapshotNode>,
}Populate this on export, and use snap.version at the TreeState::from_snapshot(...) call sites instead of an envelope placeholder.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/kv_index/src/snapshot.rs` around lines 3 - 8, The TreeSnapshot bytes
must include and preserve the tree version: add a version field to the
TreeSnapshot serialization and populate it when exporting snapshots (the
exporter that creates TreeSnapshot instances), then update callers of
TreeState::from_snapshot(...) to pass snap.version instead of using the outer
PolicyState.version placeholder; ensure the snapshot-backed serialization path
no longer uses the placeholder fallback and that any code paths which call
advance_tree_version() rely on the restored TreeState version from snap.version.
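A toy round-trip showing the invariant the comment asks for: the version travels inside the snapshot bytes and survives deserialization. The little-endian length-prefix layout here is illustrative, not the real kv_index encoding.

```rust
struct TreeSnapshot {
    version: u64,        // new field: seeds remote version counters
    node_bytes: Vec<u8>, // stand-in for the flattened pre-order nodes
}

impl TreeSnapshot {
    fn to_bytes(&self) -> Vec<u8> {
        // Version first, then the node payload.
        let mut out = self.version.to_le_bytes().to_vec();
        out.extend_from_slice(&self.node_bytes);
        out
    }

    fn from_bytes(bytes: &[u8]) -> Option<TreeSnapshot> {
        if bytes.len() < 8 {
            return None; // too short to carry a version header
        }
        let (ver, rest) = bytes.split_at(8);
        Some(TreeSnapshot {
            version: u64::from_le_bytes(ver.try_into().ok()?),
            node_bytes: rest.to_vec(),
        })
    }
}

fn main() {
    let snap = TreeSnapshot { version: 42, node_bytes: vec![1, 2, 3] };
    let restored = TreeSnapshot::from_bytes(&snap.to_bytes()).unwrap();
    // Receivers can now call TreeState::from_snapshot(.., restored.version)
    // instead of trusting the outer PolicyState.version placeholder.
    assert_eq!(restored.version, 42);
    assert_eq!(restored.node_bytes, vec![1, 2, 3]);
}
```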
```rust
// Buffer a lightweight tenant delta — 24 bytes per insert (hash + epoch)
// instead of 80k+ bytes (full prompt text).
match &operation {
    TreeOperation::Insert(insert) => {
        let path_hash = match &insert.key {
            TreeKey::Text(text) => hash_node_path(text),
            TreeKey::Tokens(tokens) => hash_token_path(tokens),
        };
        self.stores
            .tenant_delta_inserts
            .entry(model_id.clone())
            .or_default()
            .push(TenantInsert {
                node_path_hash: path_hash,
                worker_url: insert.tenant.clone(),
                epoch: self.stores.tree_version(&key),
            });
    }
    TreeOperation::Remove(remove) => {
        // TODO: capture the specific prefix hash being evicted.
        // For now, 0 means "evict from all nodes" (global eviction).
        // This is overly aggressive but correct — the next structure
        // snapshot will restore any wrongly evicted entries.
        self.stores
            .tenant_delta_evictions
            .entry(model_id.clone())
            .or_default()
            .push(TenantEvict {
                node_path_hash: crate::tree_ops::GLOBAL_EVICTION_HASH,
                worker_url: remove.tenant.clone(),
            });
```
Keep causal ordering on remove deltas.
Lines 535-539 preserve an epoch for inserts, but Lines 546-553 drop that metadata for evictions, and Lines 822-846 intentionally apply stale tenant deltas anyway. Inserts are idempotent; evictions are not — a delayed remove that arrives after a later reinsert will still win, and advance_tree_version() only prevents the counter from moving backward after the delete has already happened. Please either carry an epoch/version on TenantEvict and enforce it in subscribers, or keep removes on the ordered full-state path until tenant-delta removes can be ordered.
Also applies to: 822-846
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/mesh/src/sync.rs` around lines 523 - 553, The remove path drops epoch
metadata causing out-of-order evictions; extend TenantEvict to include epoch
(same as TenantInsert), populate it where evictions are pushed into
tenant_delta_evictions (use self.stores.tree_version(&key)), and update
downstream application logic (the subscriber/consumer that processes tenant
deltas — the code referenced around apply of deltas) to compare eviction.epoch
against the current tree version and ignore stale evictions; alternatively, if
you prefer not to order tenant-delta removes, keep remove events on the ordered
full-state path instead of pushing them into tenant_delta_evictions so causality
is preserved.
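The reviewer's suggested fix (carrying an epoch on TenantEvict and ignoring stale evictions on the subscriber side) could be sketched as follows. The `Subscriber` struct and `apply_eviction` method are simplified stand-ins for illustration, not the PR's actual store API; only the `TenantEvict` fields mirror the types discussed above.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone)]
struct TenantEvict {
    node_path_hash: u64,
    worker_url: String,
    /// Tree version captured when the eviction was buffered (assumption:
    /// mirrors TenantInsert::epoch so subscribers can order deltas).
    epoch: u64,
}

/// Simplified subscriber-side view: current tree version per node.
struct Subscriber {
    node_versions: HashMap<u64, u64>,
}

impl Subscriber {
    /// Apply an eviction only if it is not stale. A delayed remove whose
    /// epoch is older than the node's current version is dropped, so a
    /// later reinsert is not clobbered out of causal order.
    fn apply_eviction(&mut self, evict: &TenantEvict) -> bool {
        let current = self
            .node_versions
            .get(&evict.node_path_hash)
            .copied()
            .unwrap_or(0);
        if evict.epoch < current {
            return false; // stale: a newer insert already landed here
        }
        self.node_versions.insert(evict.node_path_hash, evict.epoch);
        true
    }
}
```

Note the `<=` vs `<` choice matters at equal epochs: this sketch lets an eviction at the same epoch win, matching "last write at this version wins"; a stricter protocol might require strictly newer epochs.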
test_cache_aware_tree_synchronization called get_tree_state() which returns None because sync_tree_operation no longer populates tree_configs. Updated to verify tenant_delta_inserts are buffered and simulate remote state via apply_remote_tree_operation directly. Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model_gateway/tests/mesh_integration_test.rs`:
- Around line 254-262: The test only asserts the count of buffered tenant
deltas; enhance it to verify actual payload contents so wrong entries aren't
missed—after retrieving stores1.tenant_delta_inserts.get("model1") (and mapping
to vec), assert that the buffered entries contain the expected tenant delta
payloads (e.g., check each entry's worker URL and tenant id or other identifying
fields) rather than just length; use the same variable (e.g., inserts_vec or
inserts) to iterate and assert specific fields/values for both expected deltas
so the test fails if incorrect entries are buffered (refer to stores1,
tenant_delta_inserts, and the model key "model1" to locate where to add these
content assertions).
📒 Files selected for processing (1)
model_gateway/tests/mesh_integration_test.rs
// Verify tenant deltas were buffered (sync_tree_operation no longer
// populates tree_configs — it buffers tenant deltas instead)
let inserts = stores1
    .tenant_delta_inserts
    .get("model1")
    .map(|v| v.len())
    .unwrap_or(0);
assert_eq!(inserts, 2, "expected 2 tenant delta inserts buffered");
🧹 Nitpick | 🔵 Trivial
Strengthen tenant-delta assertions beyond count-only.
Line [261] only checks count. Consider asserting buffered payload content (e.g., worker URLs) to avoid false positives when wrong entries are buffered.
Proposed assertion hardening
- let inserts = stores1
- .tenant_delta_inserts
- .get("model1")
- .map(|v| v.len())
- .unwrap_or(0);
- assert_eq!(inserts, 2, "expected 2 tenant delta inserts buffered");
+ let inserts = stores1
+ .tenant_delta_inserts
+ .get("model1")
+ .expect("expected tenant delta buffer for model1");
+ assert_eq!(inserts.len(), 2, "expected 2 tenant delta inserts buffered");
+ let workers: std::collections::BTreeSet<_> =
+ inserts.iter().map(|i| i.worker_url.as_str()).collect();
+ assert_eq!(
+ workers,
+ std::collections::BTreeSet::from([
+ "http://worker1:8000",
+ "http://worker2:8000",
+ ]),
+ "buffered tenant inserts should preserve expected worker URLs",
+ );
        );
    }
}
} else if policy_state.policy_type
🟡 Nit: The if-else dispatch ordering here (tenant_delta → tree_state_lz4 → tree_state_delta → tree_state) differs from the equivalent chain in ping_server.rs:693 (tenant_delta → tree_state_delta → tree_state_lz4 → tree_state). The swapped tree_state_lz4 / tree_state_delta ordering isn't a correctness issue (the branches are mutually exclusive string comparisons), but it makes it harder to diff-verify that the two dispatch sites are in sync — which is exactly the maintenance risk REVIEW.md calls out for the HTTP/gRPC dual-path contract.
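One way to remove the drift risk the nit describes is to route both dispatch sites through a single canonical parse, so the branch ordering cannot diverge between the HTTP and gRPC paths. `PolicyKind` and `parse_policy_type` below are hypothetical names sketching the idea, not the PR's actual API:

```rust
/// Canonical set of policy_type values used by both dispatch sites.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PolicyKind {
    TenantDelta,
    TreeStateDelta,
    TreeStateLz4,
    TreeState,
}

/// One shared mapping from wire string to variant. Both ping_server.rs
/// and the other dispatch site would match on the returned enum, making
/// the string comparisons (and their ordering) impossible to diverge.
fn parse_policy_type(s: &str) -> Option<PolicyKind> {
    match s {
        "tenant_delta" => Some(PolicyKind::TenantDelta),
        "tree_state_delta" => Some(PolicyKind::TreeStateDelta),
        "tree_state_lz4" => Some(PolicyKind::TreeStateLz4),
        "tree_state" => Some(PolicyKind::TreeState),
        _ => None,
    }
}
```

Since the branches are mutually exclusive string comparisons, this changes no behavior; it only centralizes the contract that REVIEW.md flags as a dual-path maintenance risk.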
@@ -840,9 +850,9 @@ impl StateStores {
    let before =
        self.tree_configs.len() + self.tree_versions.len() + self.tree_ops_pending.len();
🟡 Nit: gc_stale_tree_entries still uses tree_ops_pending as a liveness signal (lines 865, 870, 876) and includes it in the before/after accounting here. Since sync_tree_operation no longer pushes to tree_ops_pending, this map is always empty — making these checks dead code. The new tenant_delta_inserts / tenant_delta_evictions buffers don't need GC (they're drained every round by the collector), but the tree_ops_pending references here should be cleaned up to avoid confusion about what's actually live.
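The cleanup the nit suggests could look like the following sketch, where `tree_ops_pending` is dropped from the liveness accounting entirely. `StateStores` here is a simplified stand-in (plain maps, a `live_models` parameter) rather than the PR's real struct, so treat it as illustrating the shape of the change only:

```rust
use std::collections::HashMap;

/// Simplified stand-in for the mesh StateStores: only the two maps
/// that remain live signals after the tenant-delta change.
struct StateStores {
    tree_configs: HashMap<String, Vec<u8>>,
    tree_versions: HashMap<String, u64>,
}

impl StateStores {
    /// GC stale per-model entries without consulting tree_ops_pending,
    /// which sync_tree_operation no longer populates. Returns how many
    /// entries were collected across both maps.
    fn gc_stale_tree_entries(&mut self, live_models: &[&str]) -> usize {
        let before = self.tree_configs.len() + self.tree_versions.len();
        self.tree_configs
            .retain(|k, _| live_models.contains(&k.as_str()));
        self.tree_versions
            .retain(|k, _| live_models.contains(&k.as_str()));
        before - (self.tree_configs.len() + self.tree_versions.len())
    }
}
```

The tenant delta buffers stay out of this path on purpose: as the comment notes, they are drained every gossip round by the collector and never accumulate.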
…ghtseekorg#1011) Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
Description
Problem
The mesh cache-aware routing sync has been causing memory leaks in production. The issue progressed through multiple fix attempts:
What Works (This PR)
Layer 1: Tenant Deltas (every 1s, ~400 bytes/round)
Layer 2: Structure Snapshots (FIXME — deferred)
Layer 3: Hash-Based Idle Suppression
Key fixes:
Benchmark: 200 rps, 80k-char prompts, 20 workers, 2 mesh routers
Design Doc vs Implementation
Known Limitations
Related PRs
#948 #952 #956 #961 #974 #987
Test Plan
Summary by CodeRabbit
New Features
Improvements