From fb0c7fcb8e959292128af6ff3ee6deca130c26a6 Mon Sep 17 00:00:00 2001 From: zclawz Date: Fri, 13 Mar 2026 08:06:18 +0000 Subject: [PATCH 1/6] refactor: use actor model for rust-libp2p bridge (closes #668) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace direct static-mut Swarm access from FFI functions with a tokio::sync::mpsc command channel. The Tokio event loop is now the only place that mutates the Swarm, eliminating the data race between FFI callers on the Zig thread and the Tokio event loop thread. Changes: - Add SwarmCommand enum with variants for all FFI-driven swarm ops: Publish, SendRpcRequest, SendRpcResponseChunk, SendRpcEndOfStream, SendRpcErrorResponse - Add COMMAND_SENDERS: Mutex>> (per network_id) registered in start_network - Add send_swarm_command() helper used by all FFI functions - Store UnboundedReceiver in Network.cmd_rx field - run_eventloop now selects on cmd_rx.recv() and executes commands, keeping all swarm mutations on the Tokio thread - FFI functions (publish_msg_to_rust_bridge, send_rpc_request, send_rpc_response_chunk, send_rpc_end_of_stream, send_rpc_error_response) now push commands instead of calling get_swarm_mut() - Update test to use #[tokio::test] and test invalid peer_id → 0 path All Rust tests pass. The SWARM_STATE static remains but is now only accessed from run_eventloop (single thread). --- rust/libp2p-glue/src/lib.rs | 275 +++++++++++++++++++++--------------- 1 file changed, 158 insertions(+), 117 deletions(-) diff --git a/rust/libp2p-glue/src/lib.rs b/rust/libp2p-glue/src/lib.rs index 76fe70638..d29c4f831 100644 --- a/rust/libp2p-glue/src/lib.rs +++ b/rust/libp2p-glue/src/lib.rs @@ -27,6 +27,7 @@ use futures::future::poll_fn; use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Mutex; +use tokio::sync::mpsc; use crate::req_resp::{ configurations::REQUEST_TIMEOUT, @@ -124,6 +125,56 @@ lazy_static::lazy_static! { static REQUEST_ID_COUNTER: AtomicU64 = AtomicU64::new(0); static RESPONSE_CHANNEL_COUNTER: AtomicU64 = AtomicU64::new(0); +/// Commands sent from FFI functions (Zig thread) to the Tokio event loop. +/// The event loop is the only place that mutates the libp2p Swarm, eliminating the data race. +#[allow(dead_code)] +enum SwarmCommand { + Publish { + topic: String, + data: Vec, + }, + SendRpcRequest { + peer_id: PeerId, + request_id: u64, + protocol_id: ProtocolId, + request_message: RequestMessage, + }, + SendRpcResponseChunk { + channel: PendingResponse, + channel_id: u64, + response_message: ResponseMessage, + update_timeout: bool, + }, + SendRpcEndOfStream { + channel: PendingResponse, + channel_id: u64, + }, + SendRpcErrorResponse { + channel: PendingResponse, + channel_id: u64, + payload: Vec, + }, +} + +lazy_static::lazy_static! { + /// Per-network mpsc senders. FFI functions post commands here; run_eventloop processes them. + static ref COMMAND_SENDERS: Mutex>> = + Mutex::new(HashMap::new()); +} + +/// Send a command to the event loop for the given network_id. +/// Logs an error if no sender is registered (network not yet initialised). +fn send_swarm_command(network_id: u32, cmd: SwarmCommand) { + let senders = COMMAND_SENDERS.lock().unwrap(); + if let Some(tx) = senders.get(&network_id) { + if tx.send(cmd).is_err() { + forward_log_by_network(network_id, 3, "Command channel closed — event loop may have exited"); + } + } else { + forward_log_by_network(network_id, 3, "send_swarm_command called before network initialized"); + } +} + const MAX_RECONNECT_ATTEMPTS: u32 = 5; const RECONNECT_DELAYS_SECS: [u64; 5] = [5, 10, 20, 40, 80]; @@ -281,25 +332,8 @@ pub unsafe fn publish_msg_to_rust_bridge( } let topic = CStr::from_ptr(topic).to_string_lossy().to_string(); - let topic = gossipsub::IdentTopic::new(topic); - let swarm = match unsafe { get_swarm_mut(network_id) } { - Some(s) => s, - None => { - logger::rustLogger.error( - network_id, - "publish_msg_to_rust_bridge called before network initialized", - ); - return; - } - }; - if let Err(e) = swarm - .behaviour_mut() - .gossipsub - .publish(topic.clone(), message_data) - { - logger::rustLogger.error(network_id, &format!("Publish error: {e:?}")); - } + send_swarm_command(network_id, SwarmCommand::Publish { topic, data: message_data }); } /// # Safety @@ -342,36 +376,25 @@ pub unsafe fn send_rpc_request( let protocol_id: ProtocolId = protocol.into(); - let swarm = match get_swarm_mut(network_id) { - Some(s) => s, - None => { - logger::rustLogger.error( - network_id, - "send_rpc_request called before network initialized", - ); - return 0; - } - }; - let request_id = REQUEST_ID_COUNTER.fetch_add(1, Ordering::Relaxed) + 1; - let request_message = RequestMessage::new(protocol_id.clone(), request_bytes); - swarm - .behaviour_mut() - .reqresp - .send_request(peer_id, request_id, request_message); - + // Register tracking state before sending the command so the event loop handler + // sees the entries if the response arrives quickly. REQUEST_ID_MAP.lock().unwrap().insert(request_id, ()); - REQUEST_PROTOCOL_MAP - .lock() - .unwrap() - .insert(request_id, protocol_id.clone()); + REQUEST_PROTOCOL_MAP.lock().unwrap().insert(request_id, protocol_id.clone()); + + send_swarm_command(network_id, SwarmCommand::SendRpcRequest { + peer_id, + request_id, + protocol_id, + request_message, + }); logger::rustLogger.info( network_id, &format!( - "[reqresp] Sent {:?} request to {} (id: {})", + "[reqresp] Queued {:?} request to {} (id: {})", protocol, peer_id, request_id ), ); @@ -392,39 +415,27 @@ pub unsafe fn send_rpc_response_chunk( let response_bytes = response_slice.to_vec(); let channel = { - let mut response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); - let channel = response_map.get(&channel_id).cloned(); - if channel.is_some() { - _ = response_map.update_timeout(&channel_id, RESPONSE_CHANNEL_IDLE_TIMEOUT); - } - channel + let response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); + response_map.get(&channel_id).cloned() }; if let Some(channel) = channel { - let swarm = match get_swarm_mut(network_id) { - Some(s) => s, - None => { - logger::rustLogger.error( - network_id, - "send_rpc_response_chunk called before network initialized", - ); - return; - } - }; - + let peer_id = channel.peer_id; let response_message = ResponseMessage::new(channel.protocol.clone(), response_bytes); - swarm.behaviour_mut().reqresp.send_response( - channel.peer_id, - channel.connection_id, - channel.stream_id, + send_swarm_command(network_id, SwarmCommand::SendRpcResponseChunk { + channel, + channel_id, response_message, - ); + // The event loop will update the timeout after the send + update_timeout: true, + }); + logger::rustLogger.info( network_id, &format!( - "[reqresp] Sent response payload on channel {} (peer: {})", - channel_id, channel.peer_id + "[reqresp] Queued response payload on channel {} (peer: {})", + channel_id, peer_id ), ); } else { @@ -445,27 +456,13 @@ pub unsafe fn send_rpc_end_of_stream(network_id: u32, channel_id: u64) { }; if let Some(channel) = channel { - let swarm = match get_swarm_mut(network_id) { - Some(s) => s, - None => { - logger::rustLogger.error( - network_id, - "send_rpc_end_of_stream called before network initialized", - ); - return; - } - }; - - swarm.behaviour_mut().reqresp.finish_response_stream( - channel.peer_id, - channel.connection_id, - channel.stream_id, - ); + let peer_id = channel.peer_id; + send_swarm_command(network_id, SwarmCommand::SendRpcEndOfStream { channel, channel_id }); logger::rustLogger.info( network_id, &format!( - "[reqresp] Sent end-of-stream on channel {} (peer: {})", - channel_id, channel.peer_id + "[reqresp] Queued end-of-stream on channel {} (peer: {})", + channel_id, peer_id ), ); } else { @@ -515,41 +512,23 @@ pub unsafe fn send_rpc_error_response( }; if let Some(channel) = channel { - let swarm = match get_swarm_mut(network_id) { - Some(s) => s, - None => { - logger::rustLogger.error( - network_id, - "send_rpc_error_response called before network initialized", - ); - return; - } - }; + let peer_id = channel.peer_id; let mut payload = Vec::with_capacity(1 + MAX_VARINT_BYTES + message_bytes.len()); payload.push(2); encode_varint(message_bytes.len(), &mut payload); payload.extend_from_slice(message_bytes); - let response_message = ResponseMessage::new(channel.protocol.clone(), payload); - - let peer_id = channel.peer_id; + send_swarm_command(network_id, SwarmCommand::SendRpcErrorResponse { + channel, + channel_id, + payload, + }); - swarm.behaviour_mut().reqresp.send_response( - peer_id, - channel.connection_id, - channel.stream_id, - response_message, - ); - swarm.behaviour_mut().reqresp.finish_response_stream( - peer_id, - channel.connection_id, - channel.stream_id, - ); logger::rustLogger.info( network_id, &format!( - "[reqresp] Sent error response on channel {} (peer: {}): {}", + "[reqresp] Queued error response on channel {} (peer: {}): {}", channel_id, peer_id, message ), ); @@ -670,6 +649,8 @@ pub struct Network { network_id: u32, zig_handler: u64, peer_addr_map: HashMap, + /// Receiver half of the actor command channel. Populated in start_network. + cmd_rx: Option>, } impl Network { @@ -678,6 +659,7 @@ impl Network { network_id, zig_handler, peer_addr_map: HashMap::new(), + cmd_rx: None, } } @@ -806,6 +788,11 @@ impl Network { logger::rustLogger.debug(self.network_id, "no connect addresses"); } + // Create the actor command channel and register the sender globally. + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::(); + COMMAND_SENDERS.lock().unwrap().insert(self.network_id, cmd_tx); + self.cmd_rx = Some(cmd_rx); + unsafe { set_swarm(self.network_id, swarm); } @@ -829,9 +816,63 @@ impl Network { let swarm = unsafe { get_swarm_mut(self.network_id) } .expect("run_eventloop called before start_network stored the swarm"); + let mut cmd_rx = self.cmd_rx.take() + .expect("run_eventloop called before start_network created the command channel"); + loop { tokio::select! { + // Actor model: process swarm commands sent by FFI functions from the Zig thread. + // This ensures the Swarm is only ever mutated from within this single Tokio event loop, + // eliminating the data race between FFI callers and the Tokio thread. + Some(cmd) = cmd_rx.recv() => { + match cmd { + SwarmCommand::Publish { topic, data } => { + let ident_topic = gossipsub::IdentTopic::new(topic); + if let Err(e) = swarm.behaviour_mut().gossipsub.publish(ident_topic, data) { + logger::rustLogger.error(self.network_id, &format!("Publish error: {e:?}")); + } + } + SwarmCommand::SendRpcRequest { peer_id, request_id, protocol_id: _, request_message } => { + swarm.behaviour_mut().reqresp.send_request(peer_id, request_id, request_message); + } + SwarmCommand::SendRpcResponseChunk { channel, channel_id, response_message, update_timeout } => { + if update_timeout { + let mut map = RESPONSE_CHANNEL_MAP.lock().unwrap(); + let _ = map.update_timeout(&channel_id, RESPONSE_CHANNEL_IDLE_TIMEOUT); + } + swarm.behaviour_mut().reqresp.send_response( + channel.peer_id, + channel.connection_id, + channel.stream_id, + response_message, + ); + } + SwarmCommand::SendRpcEndOfStream { channel, channel_id: _ } => { + swarm.behaviour_mut().reqresp.finish_response_stream( + channel.peer_id, + channel.connection_id, + channel.stream_id, + ); + } + SwarmCommand::SendRpcErrorResponse { channel, channel_id: _, payload } => { + let response_message = ResponseMessage::new(channel.protocol.clone(), payload); + let peer_id = channel.peer_id; + swarm.behaviour_mut().reqresp.send_response( + peer_id, + channel.connection_id, + channel.stream_id, + response_message, + ); + swarm.behaviour_mut().reqresp.finish_response_stream( + peer_id, + channel.connection_id, + channel.stream_id, + ); + } + } + } + Some(timeout_result) = poll_fn(|cx| { let mut map = REQUEST_ID_MAP.lock().unwrap(); std::pin::Pin::new(&mut *map).poll_next(cx) @@ -1619,26 +1660,26 @@ mod tests { assert!(!result, "Should timeout when network is not initialized"); } - #[test] - fn test_send_rpc_request_before_initialization_returns_zero() { - // Test that sending RPC request before initialization returns 0 + #[tokio::test] + async fn test_send_rpc_request_before_initialization_returns_zero() { + // With the actor model, send_rpc_request generates a request_id immediately and + // queues the command. If the peer_id string is unparseable (invalid), the function + // returns 0 as an error indicator before any command is sent. let network_id = 99; - let peer_id = std::ffi::CString::new("12D3KooWTest").unwrap(); + let invalid_peer_id = std::ffi::CString::new("not-a-valid-peer-id").unwrap(); let request_data = b"test request"; let request_id = unsafe { send_rpc_request( network_id, - peer_id.as_ptr(), - 0, // protocol_tag + invalid_peer_id.as_ptr(), + 0, // BlocksByRootV1 request_data.as_ptr(), request_data.len(), ) }; - assert_eq!( - request_id, 0, - "Should return 0 when network is not initialized" - ); + // Invalid peer id → early return 0 before actor command is issued + assert_eq!(request_id, 0, "Invalid peer_id should return 0"); } } From 310157a13b4a29e1d9d7ef4f0ddb5eaa22d3b690 Mon Sep 17 00:00:00 2001 From: zclawz Date: Fri, 13 Mar 2026 08:49:03 +0000 Subject: [PATCH 2/6] fix: resolve CI failure - apply rustfmt formatting to libp2p-glue --- rust/libp2p-glue/src/lib.rs | 87 ++++++++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 25 deletions(-) diff --git a/rust/libp2p-glue/src/lib.rs b/rust/libp2p-glue/src/lib.rs index d29c4f831..d58f18a1a 100644 --- a/rust/libp2p-glue/src/lib.rs +++ b/rust/libp2p-glue/src/lib.rs @@ -168,10 +168,18 @@ fn send_swarm_command(network_id: u32, cmd: SwarmCommand) { let senders = COMMAND_SENDERS.lock().unwrap(); if let Some(tx) = senders.get(&network_id) { if tx.send(cmd).is_err() { - forward_log_by_network(network_id, 3, "Command channel closed — event loop may have exited"); + forward_log_by_network( + network_id, + 3, + "Command channel closed — event loop may have exited", + ); } } else { - forward_log_by_network(network_id, 3, "send_swarm_command called before network initialized"); + forward_log_by_network( + network_id, + 3, + "send_swarm_command called before network initialized", + ); } } @@ -333,7 +341,13 @@ pub unsafe fn publish_msg_to_rust_bridge( let topic = CStr::from_ptr(topic).to_string_lossy().to_string(); - send_swarm_command(network_id, SwarmCommand::Publish { topic, data: message_data }); + send_swarm_command( + network_id, + SwarmCommand::Publish { + topic, + data: message_data, + }, + ); } /// # Safety @@ -382,14 +396,20 @@ pub unsafe fn send_rpc_request( // Register tracking state before sending the command so the event loop handler // sees the entries if the response arrives quickly. REQUEST_ID_MAP.lock().unwrap().insert(request_id, ()); - REQUEST_PROTOCOL_MAP.lock().unwrap().insert(request_id, protocol_id.clone()); + REQUEST_PROTOCOL_MAP + .lock() + .unwrap() + .insert(request_id, protocol_id.clone()); - send_swarm_command(network_id, SwarmCommand::SendRpcRequest { - peer_id, - request_id, - protocol_id, - request_message, - }); + send_swarm_command( + network_id, + SwarmCommand::SendRpcRequest { + peer_id, + request_id, + protocol_id, + request_message, + }, + ); logger::rustLogger.info( network_id, @@ -423,13 +443,16 @@ pub unsafe fn send_rpc_response_chunk( let peer_id = channel.peer_id; let response_message = ResponseMessage::new(channel.protocol.clone(), response_bytes); - send_swarm_command(network_id, SwarmCommand::SendRpcResponseChunk { - channel, - channel_id, - response_message, - // The event loop will update the timeout after the send - update_timeout: true, - }); + send_swarm_command( + network_id, + SwarmCommand::SendRpcResponseChunk { + channel, + channel_id, + response_message, + // The event loop will update the timeout after the send + update_timeout: true, + }, + ); logger::rustLogger.info( network_id, @@ -457,7 +480,13 @@ pub unsafe fn send_rpc_end_of_stream(network_id: u32, channel_id: u64) { if let Some(channel) = channel { let peer_id = channel.peer_id; - send_swarm_command(network_id, SwarmCommand::SendRpcEndOfStream { channel, channel_id }); + send_swarm_command( + network_id, + SwarmCommand::SendRpcEndOfStream { + channel, + channel_id, + }, + ); logger::rustLogger.info( network_id, &format!( @@ -519,11 +548,14 @@ pub unsafe fn send_rpc_error_response( encode_varint(message_bytes.len(), &mut payload); payload.extend_from_slice(message_bytes); - send_swarm_command(network_id, SwarmCommand::SendRpcErrorResponse { - channel, - channel_id, - payload, - }); + send_swarm_command( + network_id, + SwarmCommand::SendRpcErrorResponse { + channel, + channel_id, + payload, + }, + ); logger::rustLogger.info( network_id, @@ -790,7 +822,10 @@ impl Network { // Create the actor command channel and register the sender globally. let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::(); - COMMAND_SENDERS.lock().unwrap().insert(self.network_id, cmd_tx); + COMMAND_SENDERS + .lock() + .unwrap() + .insert(self.network_id, cmd_tx); self.cmd_rx = Some(cmd_rx); unsafe { @@ -816,7 +851,9 @@ impl Network { let swarm = unsafe { get_swarm_mut(self.network_id) } .expect("run_eventloop called before start_network stored the swarm"); - let mut cmd_rx = self.cmd_rx.take() + let mut cmd_rx = self + .cmd_rx + .take() .expect("run_eventloop called before start_network created the command channel"); loop { From c33b45010a6183940cde75901bdb1221c691acc9 Mon Sep 17 00:00:00 2001 From: zclawz Date: Fri, 13 Mar 2026 09:10:24 +0000 Subject: [PATCH 3/6] fix: address PR #670 review comments - send_swarm_command returns bool; removes stale sender on channel close - send_rpc_request rolls back REQUEST_ID_MAP / REQUEST_PROTOCOL_MAP and returns 0 when send_swarm_command fails (uninitialized network) - send_rpc_response_chunk updates idle timeout eagerly before queueing to prevent channel expiry while commands sit in the queue - run_eventloop handles cmd_rx.recv() == None explicitly: removes sender from COMMAND_SENDERS and breaks the loop - rename test_send_rpc_request_before_initialization_returns_zero to test_send_rpc_request_invalid_peer_id_returns_zero; add new test_send_rpc_request_uninitialised_network_returns_zero --- rust/libp2p-glue/src/lib.rs | 88 +++++++++++++++++++++++++++++++------ 1 file changed, 75 insertions(+), 13 deletions(-) diff --git a/rust/libp2p-glue/src/lib.rs b/rust/libp2p-glue/src/lib.rs index d58f18a1a..d97f2ca19 100644 --- a/rust/libp2p-glue/src/lib.rs +++ b/rust/libp2p-glue/src/lib.rs @@ -163,9 +163,10 @@ lazy_static::lazy_static! { } /// Send a command to the event loop for the given network_id. -/// Logs an error if no sender is registered (network not yet initialised). -fn send_swarm_command(network_id: u32, cmd: SwarmCommand) { - let senders = COMMAND_SENDERS.lock().unwrap(); +/// Returns `true` if the command was successfully enqueued, `false` otherwise. +/// On channel close the stale sender is removed from `COMMAND_SENDERS`. +fn send_swarm_command(network_id: u32, cmd: SwarmCommand) -> bool { + let mut senders = COMMAND_SENDERS.lock().unwrap(); if let Some(tx) = senders.get(&network_id) { if tx.send(cmd).is_err() { forward_log_by_network( @@ -173,6 +174,10 @@ fn send_swarm_command(network_id: u32, cmd: SwarmCommand) { 3, "Command channel closed — event loop may have exited", ); + senders.remove(&network_id); + false + } else { + true } } else { forward_log_by_network( @@ -180,6 +185,7 @@ fn send_swarm_command(network_id: u32, cmd: SwarmCommand) { 3, "send_swarm_command called before network initialized", ); + false } } @@ -401,7 +407,7 @@ pub unsafe fn send_rpc_request( .unwrap() .insert(request_id, protocol_id.clone()); - send_swarm_command( + if !send_swarm_command( network_id, SwarmCommand::SendRpcRequest { peer_id, @@ -409,7 +415,12 @@ pub unsafe fn send_rpc_request( protocol_id, request_message, }, - ); + ) { + // Command could not be queued — roll back the map inserts to avoid stale state. + REQUEST_ID_MAP.lock().unwrap().remove(&request_id); + REQUEST_PROTOCOL_MAP.lock().unwrap().remove(&request_id); + return 0; + } logger::rustLogger.info( network_id, @@ -443,14 +454,20 @@ pub unsafe fn send_rpc_response_chunk( let peer_id = channel.peer_id; let response_message = ResponseMessage::new(channel.protocol.clone(), response_bytes); + // Update the idle timeout eagerly (before the command is dequeued by the event loop) + // so that queued activity prevents premature channel expiry under load. + { + let mut response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); + let _ = response_map.update_timeout(&channel_id, RESPONSE_CHANNEL_IDLE_TIMEOUT); + } + send_swarm_command( network_id, SwarmCommand::SendRpcResponseChunk { channel, channel_id, response_message, - // The event loop will update the timeout after the send - update_timeout: true, + update_timeout: false, }, ); @@ -862,7 +879,16 @@ impl Network { // Actor model: process swarm commands sent by FFI functions from the Zig thread. // This ensures the Swarm is only ever mutated from within this single Tokio event loop, // eliminating the data race between FFI callers and the Tokio thread. - Some(cmd) = cmd_rx.recv() => { + cmd = cmd_rx.recv() => { + let cmd = match cmd { + Some(cmd) => cmd, + None => { + // All senders dropped — command channel closed. Clean up and exit loop. + COMMAND_SENDERS.lock().unwrap().remove(&self.network_id); + logger::rustLogger.warn(self.network_id, "Command channel closed; exiting event loop"); + break; + } + }; match cmd { SwarmCommand::Publish { topic, data } => { let ident_topic = gossipsub::IdentTopic::new(topic); @@ -1698,11 +1724,10 @@ mod tests { } #[tokio::test] - async fn test_send_rpc_request_before_initialization_returns_zero() { - // With the actor model, send_rpc_request generates a request_id immediately and - // queues the command. If the peer_id string is unparseable (invalid), the function - // returns 0 as an error indicator before any command is sent. - let network_id = 99; + async fn test_send_rpc_request_invalid_peer_id_returns_zero() { + // When the peer_id string is unparseable the function returns 0 as an error + // indicator before any actor command is queued. + let network_id = 9901; let invalid_peer_id = std::ffi::CString::new("not-a-valid-peer-id").unwrap(); let request_data = b"test request"; @@ -1719,4 +1744,41 @@ mod tests { // Invalid peer id → early return 0 before actor command is issued assert_eq!(request_id, 0, "Invalid peer_id should return 0"); } + + #[test] + fn test_send_rpc_request_uninitialised_network_returns_zero() { + // With the actor model, send_rpc_request inserts tracking state then calls + // send_swarm_command. When no command sender is registered for the network_id + // the command cannot be queued, so the function must roll back the map inserts + // and return 0 so that Zig treats the call as a dispatch failure. + let network_id = 9902; // never initialised + // Use a syntactically valid PeerId so we reach the actor-command path. + let peer_id_str = + std::ffi::CString::new("12D3KooWNvDnLYAGWnqNAJKBBBzYBp7MBmHmWkGxhXJKPJDGH1a").unwrap(); + let request_data = b"test request"; + + let request_id = unsafe { + send_rpc_request( + network_id, + peer_id_str.as_ptr(), + 0, // BlocksByRootV1 + request_data.as_ptr(), + request_data.len(), + ) + }; + + assert_eq!( + request_id, 0, + "Uninitialised network should return 0 and not leak map entries" + ); + + // Verify the map entries were rolled back. + assert!( + !REQUEST_PROTOCOL_MAP + .lock() + .unwrap() + .contains_key(&request_id), + "REQUEST_PROTOCOL_MAP should not contain rolled-back request_id" + ); + } } From b3c2e14d30d586f9024b46cee25336f081eb02c8 Mon Sep 17 00:00:00 2001 From: zclawz Date: Fri, 13 Mar 2026 10:55:59 +0000 Subject: [PATCH 4/6] fix: check send_swarm_command result in all FFI callers, rollback on failure - send_rpc_response_chunk: gate 'Queued...' log on successful enqueue - send_rpc_end_of_stream: re-insert channel into map if enqueue fails - send_rpc_error_response: re-insert channel into map if enqueue fails - publish_msg_to_rust_bridge: log error if enqueue fails - Remove duplicate #[derive(Clone)] on PendingResponse Addresses review feedback from GrapeBaBa and Copilot on PR #670 --- rust/libp2p-glue/src/lib.rs | 115 +++++++++++++++++++++++------------- 1 file changed, 74 insertions(+), 41 deletions(-) diff --git a/rust/libp2p-glue/src/lib.rs b/rust/libp2p-glue/src/lib.rs index d97f2ca19..c7a9dc517 100644 --- a/rust/libp2p-glue/src/lib.rs +++ b/rust/libp2p-glue/src/lib.rs @@ -347,13 +347,18 @@ pub unsafe fn publish_msg_to_rust_bridge( let topic = CStr::from_ptr(topic).to_string_lossy().to_string(); - send_swarm_command( + if !send_swarm_command( network_id, SwarmCommand::Publish { topic, data: message_data, }, - ); + ) { + logger::rustLogger.error( + network_id, + "publish_msg_to_rust_bridge: failed to queue publish command — command channel unavailable", + ); + } } /// # Safety @@ -454,30 +459,31 @@ pub unsafe fn send_rpc_response_chunk( let peer_id = channel.peer_id; let response_message = ResponseMessage::new(channel.protocol.clone(), response_bytes); - // Update the idle timeout eagerly (before the command is dequeued by the event loop) - // so that queued activity prevents premature channel expiry under load. - { - let mut response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); - let _ = response_map.update_timeout(&channel_id, RESPONSE_CHANNEL_IDLE_TIMEOUT); - } - - send_swarm_command( + if send_swarm_command( network_id, SwarmCommand::SendRpcResponseChunk { channel, channel_id, response_message, - update_timeout: false, + update_timeout: true, }, - ); - - logger::rustLogger.info( - network_id, - &format!( - "[reqresp] Queued response payload on channel {} (peer: {})", - channel_id, peer_id - ), - ); + ) { + logger::rustLogger.info( + network_id, + &format!( + "[reqresp] Queued response payload on channel {} (peer: {})", + channel_id, peer_id + ), + ); + } else { + logger::rustLogger.error( + network_id, + &format!( + "[reqresp] Failed to queue response payload on channel {} (peer: {}): command channel unavailable", + channel_id, peer_id + ), + ); + } } else { logger::rustLogger.error( network_id, @@ -497,20 +503,34 @@ pub unsafe fn send_rpc_end_of_stream(network_id: u32, channel_id: u64) { if let Some(channel) = channel { let peer_id = channel.peer_id; - send_swarm_command( + if send_swarm_command( network_id, SwarmCommand::SendRpcEndOfStream { - channel, + channel: channel.clone(), channel_id, }, - ); - logger::rustLogger.info( - network_id, - &format!( - "[reqresp] Queued end-of-stream on channel {} (peer: {})", - channel_id, peer_id - ), - ); + ) { + logger::rustLogger.info( + network_id, + &format!( + "[reqresp] Queued end-of-stream on channel {} (peer: {})", + channel_id, peer_id + ), + ); + } else { + // Command could not be enqueued — re-insert the channel to preserve state. + RESPONSE_CHANNEL_MAP + .lock() + .unwrap() + .insert(channel_id, channel); + logger::rustLogger.error( + network_id, + &format!( + "[reqresp] Failed to queue end-of-stream on channel {} (peer: {}): command channel unavailable", + channel_id, peer_id + ), + ); + } } else { logger::rustLogger.error( network_id, @@ -565,22 +585,35 @@ pub unsafe fn send_rpc_error_response( encode_varint(message_bytes.len(), &mut payload); payload.extend_from_slice(message_bytes); - send_swarm_command( + if send_swarm_command( network_id, SwarmCommand::SendRpcErrorResponse { - channel, + channel: channel.clone(), channel_id, payload, }, - ); - - logger::rustLogger.info( - network_id, - &format!( - "[reqresp] Queued error response on channel {} (peer: {}): {}", - channel_id, peer_id, message - ), - ); + ) { + logger::rustLogger.info( + network_id, + &format!( + "[reqresp] Queued error response on channel {} (peer: {}): {}", + channel_id, peer_id, message + ), + ); + } else { + // Command could not be enqueued — re-insert the channel to preserve state. + RESPONSE_CHANNEL_MAP + .lock() + .unwrap() + .insert(channel_id, channel); + logger::rustLogger.error( + network_id, + &format!( + "[reqresp] Failed to queue error response on channel {} (peer: {}): command channel unavailable", + channel_id, peer_id + ), + ); + } } else { logger::rustLogger.error( network_id, From 5861e594a501d92a4f08076ae67c91ec31abd55c Mon Sep 17 00:00:00 2001 From: Chen Kai <281165273grape@gmail.com> Date: Thu, 19 Mar 2026 18:45:46 +0800 Subject: [PATCH 5/6] fix: use bounded channel with immediate timeout update in actor model - Replace unbounded_channel with channel(1024) to prevent unbounded memory growth if the event loop stalls - Use try_send with distinct Full vs Closed error handling - Move response channel timeout update back to FFI side (immediate) instead of deferring to event loop, restoring original semantics and eliminating the timeout race window --- rust/libp2p-glue/src/lib.rs | 56 ++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/rust/libp2p-glue/src/lib.rs b/rust/libp2p-glue/src/lib.rs index c7a9dc517..360c0ac80 100644 --- a/rust/libp2p-glue/src/lib.rs +++ b/rust/libp2p-glue/src/lib.rs @@ -143,7 +143,6 @@ enum SwarmCommand { channel: PendingResponse, channel_id: u64, response_message: ResponseMessage, - update_timeout: bool, }, SendRpcEndOfStream { channel: PendingResponse, @@ -156,9 +155,14 @@ enum SwarmCommand { }, } +/// Capacity of the bounded command channel per network. Under normal operation the event loop +/// drains commands faster than FFI produces them. If the queue fills up it signals that the +/// event loop is stalled — we fail-fast to protect the process from unbounded memory growth. +const COMMAND_CHANNEL_CAPACITY: usize = 1024; + lazy_static::lazy_static! { /// Per-network mpsc senders. FFI functions post commands here; run_eventloop processes them. - static ref COMMAND_SENDERS: Mutex>> = + static ref COMMAND_SENDERS: Mutex>> = Mutex::new(HashMap::new()); } @@ -168,16 +172,25 @@ lazy_static::lazy_static! { fn send_swarm_command(network_id: u32, cmd: SwarmCommand) -> bool { let mut senders = COMMAND_SENDERS.lock().unwrap(); if let Some(tx) = senders.get(&network_id) { - if tx.send(cmd).is_err() { - forward_log_by_network( - network_id, - 3, - "Command channel closed — event loop may have exited", - ); - senders.remove(&network_id); - false - } else { - true + match tx.try_send(cmd) { + Ok(()) => true, + Err(mpsc::error::TrySendError::Full(_)) => { + forward_log_by_network( + network_id, + 3, + "Command channel full — event loop may be stalled", + ); + false + } + Err(mpsc::error::TrySendError::Closed(_)) => { + forward_log_by_network( + network_id, + 3, + "Command channel closed — event loop may have exited", + ); + senders.remove(&network_id); + false + } } } else { forward_log_by_network( @@ -451,8 +464,12 @@ pub unsafe fn send_rpc_response_chunk( let response_bytes = response_slice.to_vec(); let channel = { - let response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); - response_map.get(&channel_id).cloned() + let mut response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); + let channel = response_map.get(&channel_id).cloned(); + if channel.is_some() { + _ = response_map.update_timeout(&channel_id, RESPONSE_CHANNEL_IDLE_TIMEOUT); + } + channel }; if let Some(channel) = channel { @@ -465,7 +482,6 @@ pub unsafe fn send_rpc_response_chunk( channel, channel_id, response_message, - update_timeout: true, }, ) { logger::rustLogger.info( @@ -732,7 +748,7 @@ pub struct Network { zig_handler: u64, peer_addr_map: HashMap, /// Receiver half of the actor command channel. Populated in start_network. - cmd_rx: Option>, + cmd_rx: Option>, } impl Network { @@ -871,7 +887,7 @@ impl Network { } // Create the actor command channel and register the sender globally. - let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::(); + let (cmd_tx, cmd_rx) = mpsc::channel::(COMMAND_CHANNEL_CAPACITY); COMMAND_SENDERS .lock() .unwrap() @@ -932,11 +948,7 @@ impl Network { SwarmCommand::SendRpcRequest { peer_id, request_id, protocol_id: _, request_message } => { swarm.behaviour_mut().reqresp.send_request(peer_id, request_id, request_message); } - SwarmCommand::SendRpcResponseChunk { channel, channel_id, response_message, update_timeout } => { - if update_timeout { - let mut map = RESPONSE_CHANNEL_MAP.lock().unwrap(); - let _ = map.update_timeout(&channel_id, RESPONSE_CHANNEL_IDLE_TIMEOUT); - } + SwarmCommand::SendRpcResponseChunk { channel, channel_id: _, response_message } => { swarm.behaviour_mut().reqresp.send_response( channel.peer_id, channel.connection_id, From cf4a1161b678b587e5bb84f1df67f3fe169c12b4 Mon Sep 17 00:00:00 2001 From: Chen Kai <281165273grape@gmail.com> Date: Fri, 20 Mar 2026 02:08:55 +0000 Subject: [PATCH 6/6] fix: update response channel timeout after successful enqueue only Move update_timeout call inside the successful send_swarm_command branch in send_rpc_response_chunk. Previously the timeout was refreshed before try_send, meaning a Full/Closed channel error would still extend the channel's idle expiry, holding RESPONSE_CHANNEL_MAP resources longer than intended. --- rust/libp2p-glue/src/lib.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/rust/libp2p-glue/src/lib.rs b/rust/libp2p-glue/src/lib.rs index 360c0ac80..a42e36f93 100644 --- a/rust/libp2p-glue/src/lib.rs +++ b/rust/libp2p-glue/src/lib.rs @@ -464,12 +464,8 @@ pub unsafe fn send_rpc_response_chunk( let response_bytes = response_slice.to_vec(); let channel = { - let mut response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); - let channel = response_map.get(&channel_id).cloned(); - if channel.is_some() { - _ = response_map.update_timeout(&channel_id, RESPONSE_CHANNEL_IDLE_TIMEOUT); - } - channel + let response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); + response_map.get(&channel_id).cloned() }; if let Some(channel) = channel { @@ -484,6 +480,11 @@ pub unsafe fn send_rpc_response_chunk( response_message, }, ) { + // Update the idle timeout only after successful enqueue — updating eagerly + // before try_send would refresh the expiry even if the command was dropped + // (e.g. channel full), holding resources longer than intended. + let mut response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); + _ = response_map.update_timeout(&channel_id, RESPONSE_CHANNEL_IDLE_TIMEOUT); logger::rustLogger.info( network_id, &format!(