Skip to content

Commit e8a6fb6

Browse files
authored
fix(mesh): eliminate memory leaks in two-layer tree sync protocol (#1011)
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
1 parent 687319f commit e8a6fb6

15 files changed

Lines changed: 1361 additions & 797 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ tonic = { version = "0.14.2", features = ["gzip", "transport"] }
2929
tonic-prost = "0.14.2"
3030
axum = { version = "0.8.6" }
3131
blake3 = "1.5"
32+
lz4_flex = "0.11"
3233
bytemuck = { version = "1.21" }
3334
chrono = { version = "0.4" }
3435
dashmap = "6.1.0"

crates/kv_index/src/snapshot.rs

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,11 @@
11
//! Compact serializable snapshot of a radix tree.
22
//!
3-
//! Preserves the tree structure (shared prefixes stored once) for
4-
//! efficient mesh synchronization. A tree with 2048 entries sharing
5-
//! 80% prefixes serializes to ~2-4 MB instead of ~40 MB as flat ops.
3+
//! Used by the mesh sync protocol as an alternative to the flat `TreeState`
4+
//! operation list. The structure-preserving format shares common prefixes,
5+
//! reducing wire size from ~40 MB to ~2-4 MB for a tree with 2048 entries
6+
//! sharing 80% prefixes. Receivers in `controller.rs` and `ping_server.rs`
7+
//! accept both `TreeState` and `TreeSnapshot` payloads under the
8+
//! `tree_state_lz4` policy type.
69
//!
710
//! Wire format: flattened pre-order node list. Each node stores its
811
//! edge label and tenant list. Children are implicitly ordered by

crates/kv_index/src/string_tree.rs

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -897,7 +897,23 @@ impl Tree {
897897
self.evict_tenant_entries(tenant, current_count - max_chars);
898898
}
899899

900-
/// Helper to evict a specific number of chars for a tenant
900+
/// Remove a tenant from all nodes in the tree, including root.
901+
/// Used for mesh eviction propagation — when a remote node reports
902+
/// that a worker evicted all its cached prefixes.
///
/// Clears the tenant from the root first, then from every node gathered
/// by `collect_tenant_nodes`, and finally drops the tenant's entry from
/// `tenant_char_count`.
903+
pub fn remove_tenant_all(&self, tenant_id: &TenantId) {
904+
// collect_tenant_nodes skips root (root is never LRU-evicted),
905+
// but global removal must include it.
906+
self.remove_tenant_from_node(&self.root, tenant_id);
907+
908+
// Gather the remaining nodes for this tenant, starting the walk at root.
let mut nodes: Vec<(NodeRef, u64)> = Vec::new();
909+
self.collect_tenant_nodes(&self.root, tenant_id, &mut nodes);
910+
// Only the node half of each pair is used; the paired u64 is ignored here.
for (node, _) in &nodes {
911+
self.remove_tenant_from_node(node, tenant_id);
912+
}
913+
// Drop the tenant's char-count entry once it has been removed from the collected nodes.
self.tenant_char_count.remove(tenant_id);
914+
}
915+
916+
/// Evict a specific number of chars for a tenant using LRU ordering.
901917
fn evict_tenant_entries(&self, tenant_id: &TenantId, chars_to_evict: usize) {
902918
if chars_to_evict == 0 {
903919
return;

crates/mesh/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,8 @@ categories = ["network-programming", "data-structures"]
1515
[dependencies]
1616
# Workspace dependencies
1717
anyhow.workspace = true
18+
blake3.workspace = true
19+
lz4_flex.workspace = true
1820
async-trait.workspace = true
1921
axum.workspace = true
2022
chrono.workspace = true

crates/mesh/src/controller.rs

Lines changed: 155 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -122,9 +122,9 @@ impl MeshController {
122122
};
123123
cnt += 1;
124124

125-
// Checkpoint pending tree ops every 10 rounds (~10s) to bound
126-
// the pending buffer size. With 20k-char prompts at 500 rps,
127-
// 60-round intervals accumulated ~300 MB in pending.
125+
// Checkpoint tree state every 10 rounds (~10s) by exporting
126+
// the live radix tree from CacheAwarePolicy into tree_configs.
127+
// This keeps the periodic structure snapshot fresh.
128128
if cnt.is_multiple_of(10) {
129129
self.sync_manager.checkpoint_tree_states();
130130
}
@@ -146,6 +146,53 @@ impl MeshController {
146146
self.stores.membership.len(),
147147
self.stores.app.len(),
148148
);
149+
150+
// Log all mesh data structure sizes for memory debugging.
151+
let tree_configs_bytes: usize = self
152+
.stores
153+
.tree_configs
154+
.iter()
155+
.map(|e| e.value().len())
156+
.sum();
157+
let tenant_inserts: usize = self
158+
.stores
159+
.tenant_delta_inserts
160+
.iter()
161+
.map(|e| e.value().len())
162+
.sum();
163+
let tenant_evictions: usize = self
164+
.stores
165+
.tenant_delta_evictions
166+
.iter()
167+
.map(|e| e.value().len())
168+
.sum();
169+
let tree_ops_pending: usize = self
170+
.stores
171+
.tree_ops_pending
172+
.iter()
173+
.map(|e| e.value().len())
174+
.sum();
175+
log::info!(
176+
"Mesh memory: tree_configs={} entries ({} bytes), tree_versions={}, \
177+
tenant_inserts={}, tenant_evictions={}, tree_ops_pending={}, \
178+
policy_crdt={}, worker_crdt={}",
179+
self.stores.tree_configs.len(),
180+
tree_configs_bytes,
181+
self.stores.tree_versions.len(),
182+
tenant_inserts,
183+
tenant_evictions,
184+
tree_ops_pending,
185+
self.stores.policy.len(),
186+
self.stores.worker.len(),
187+
);
188+
189+
// Log CRDT policy store operation log length for memory debugging
190+
let policy_oplog_len = self.stores.policy.get_operation_log().len();
191+
log::info!(
192+
policy_oplog_len,
193+
"GC: CRDT policy store operation log length"
194+
);
195+
149196
// Clean up retry managers for peers no longer in cluster state
150197
retry_managers.retain(|peer_name, _| map.contains_key(peer_name));
151198
}
@@ -366,6 +413,13 @@ impl MeshController {
366413
) -> tokio::task::JoinHandle<()> {
367414
let stores = self.stores.clone();
368415
let sync_manager = self.sync_manager.clone();
416+
let sync_connections = self.sync_connections.clone();
417+
418+
// Log connection lifecycle: spawn
419+
log::debug!(
420+
peer = %peer_name,
421+
"spawn_sync_stream_handler called — spawning handler task"
422+
);
369423

370424
// Create a span for the spawned task
371425
let span = tracing::info_span!(
@@ -378,7 +432,13 @@ impl MeshController {
378432
async move {
379433
use tokio_stream::StreamExt;
380434

381-
log::info!("Sync stream handler started for peer {}", peer_name);
435+
// Log active connection count at handler start
436+
let active_connections = sync_connections.lock().await.len();
437+
log::debug!(
438+
peer = %peer_name,
439+
active_connections,
440+
"Sync stream handler started"
441+
);
382442

383443
let sequence = Arc::new(AtomicU64::new(0));
384444

@@ -404,12 +464,15 @@ impl MeshController {
404464
stores.clone(),
405465
self_name.clone(),
406466
));
467+
log::debug!(
468+
peer = %peer_name,
469+
"IncrementalUpdateCollector created"
470+
);
407471
let tx_incremental = tx.clone();
408472
let self_name_incremental = self_name.clone();
409473
let peer_name_incremental = peer_name.clone();
410474
let shared_sequence = sequence.clone();
411475
let size_validator = MessageSizeValidator::default();
412-
let stores_for_trim = stores.clone();
413476

414477
#[expect(clippy::disallowed_methods, reason = "incremental sender handle is stored and aborted when the parent sync_stream handler exits")]
415478
tokio::spawn(async move {
@@ -452,26 +515,13 @@ impl MeshController {
452515
e,
453516
size_validator.max_size()
454517
);
455-
// For tree deltas, do NOT mark as sent — skip this
456-
// round and let the pending buffer trim in mark_sent
457-
// eventually reduce the size. For other stores,
458-
// mark_sent prevents an infinite retry loop (PR #808).
459-
let is_tree_delta =
518+
// Mark non-tree stores as sent to prevent infinite
519+
// retry loops (PR #808). Tree updates (tenant deltas,
520+
// structure snapshots) are retried next round with
521+
// updated data from the live tree.
522+
let is_tree_update =
460523
updates.iter().any(|u| u.key.starts_with("tree:"));
461-
if is_tree_delta {
462-
// Force trim the pending buffer to reduce size for next round.
463-
for u in updates {
464-
if u.key.starts_with("tree:") {
465-
if let Some(mut pending) = stores_for_trim.tree_ops_pending.get_mut(&u.key) {
466-
let len = pending.len();
467-
if len > 100 {
468-
pending.drain(..len / 2);
469-
log::info!("Force-trimmed oversized tree pending buffer for {}: {} -> {}", u.key, len, pending.len());
470-
}
471-
}
472-
}
473-
}
474-
} else {
524+
if !is_tree_update {
475525
collector.mark_sent(*store_type, updates);
476526
}
477527
continue;
@@ -631,6 +681,83 @@ impl MeshController {
631681
let actor = Some(state_update.actor.clone());
632682

633683
if policy_state.policy_type
684+
== "tenant_delta"
685+
{
686+
// Lightweight tenant delta — no tree structure, no prompt text
687+
match super::tree_ops::TenantDelta::from_bytes(
688+
&policy_state.config,
689+
) {
690+
Ok(delta) => {
691+
sync_manager
692+
.apply_remote_tenant_delta(
693+
delta, actor,
694+
);
695+
}
696+
Err(e) => {
697+
log::warn!(
698+
"Failed to deserialize tenant delta for model {}: {e}",
699+
policy_state.model_id
700+
);
701+
}
702+
}
703+
} else if policy_state.policy_type
704+
== "tree_state_lz4"
705+
{
706+
// LZ4-compressed snapshot (TreeState or TreeSnapshot bytes)
707+
match super::tree_ops::lz4_decompress(
708+
&policy_state.config,
709+
) {
710+
Ok(decompressed) => {
711+
// Try TreeState first (backward compat)
712+
if let Ok(tree_state) =
713+
super::tree_ops::TreeState::from_bytes(
714+
&decompressed,
715+
)
716+
{
717+
sync_manager
718+
.apply_remote_tree_operation(
719+
policy_state
720+
.model_id
721+
.clone(),
722+
tree_state,
723+
actor,
724+
);
725+
} else if let Ok(snap) =
726+
kv_index::snapshot::TreeSnapshot::from_bytes(
727+
&decompressed,
728+
)
729+
{
730+
let tree_state =
731+
super::tree_ops::TreeState::from_snapshot(
732+
policy_state
733+
.model_id
734+
.clone(),
735+
&snap,
736+
policy_state.version,
737+
);
738+
sync_manager
739+
.apply_remote_tree_operation(
740+
policy_state
741+
.model_id
742+
.clone(),
743+
tree_state,
744+
actor,
745+
);
746+
} else {
747+
log::warn!(
748+
"Failed to deserialize tree_state_lz4 payload for model {}",
749+
policy_state.model_id
750+
);
751+
}
752+
}
753+
Err(e) => {
754+
log::warn!(
755+
"Failed to LZ4-decompress tree state for model {}: {e}",
756+
policy_state.model_id
757+
);
758+
}
759+
}
760+
} else if policy_state.policy_type
634761
== "tree_state_delta"
635762
{
636763
// Delta: apply only the new operations
@@ -880,7 +1007,10 @@ impl MeshController {
8801007

8811008
incremental_sender_handle.abort();
8821009
let _ = incremental_sender_handle.await;
883-
log::info!("Sync stream handler stopped for peer {}", peer_name);
1010+
log::debug!(
1011+
peer = %peer_name,
1012+
"sync_stream_handler exited — handler dropped"
1013+
);
8841014
}
8851015
.instrument(span),
8861016
)

0 commit comments

Comments
 (0)