refactor: use actor model for rust-libp2p bridge #670
Replace direct static-mut Swarm access from FFI functions with a tokio::sync::mpsc command channel. The Tokio event loop is now the only place that mutates the Swarm, eliminating the data race between FFI callers on the Zig thread and the Tokio event loop thread.

Changes:
- Add SwarmCommand enum with variants for all FFI-driven swarm ops: Publish, SendRpcRequest, SendRpcResponseChunk, SendRpcEndOfStream, SendRpcErrorResponse
- Add COMMAND_SENDERS: Mutex<HashMap<u32, UnboundedSender<SwarmCommand>>> (per network_id) registered in start_network
- Add send_swarm_command() helper used by all FFI functions
- Store UnboundedReceiver<SwarmCommand> in Network.cmd_rx field
- run_eventloop now selects on cmd_rx.recv() and executes commands, keeping all swarm mutations on the Tokio thread
- FFI functions (publish_msg_to_rust_bridge, send_rpc_request, send_rpc_response_chunk, send_rpc_end_of_stream, send_rpc_error_response) now push commands instead of calling get_swarm_mut()
- Update test to use #[tokio::test] and test invalid peer_id → 0 path

All Rust tests pass. The SWARM_STATE static remains but is now only accessed from run_eventloop (single thread).
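The command-channel design described above can be sketched in a few lines. This is only an illustration under stated assumptions: it uses `std::sync::mpsc` and a plain thread in place of `tokio::sync::mpsc` and the Tokio event loop so that it runs without dependencies, and `FakeSwarm`, `publish`, and `demo` are hypothetical names that do not appear in the PR.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

// Hypothetical stand-in for the libp2p Swarm: it only records what was published.
#[derive(Default)]
struct FakeSwarm {
    published: Vec<(String, Vec<u8>)>,
}

// Reduced command set for illustration; the PR's SwarmCommand has more variants.
enum SwarmCommand {
    Publish { topic: String, data: Vec<u8> },
}

// The actor: the only code that ever holds a mutable reference to the swarm.
// The loop ends when every sender has been dropped.
fn run_eventloop(rx: Receiver<SwarmCommand>) -> FakeSwarm {
    let mut swarm = FakeSwarm::default();
    while let Ok(cmd) = rx.recv() {
        match cmd {
            SwarmCommand::Publish { topic, data } => swarm.published.push((topic, data)),
        }
    }
    swarm
}

// What an FFI entry point does in this design: enqueue a command, never mutate.
fn publish(tx: &SyncSender<SwarmCommand>, topic: &str, data: &[u8]) -> bool {
    tx.send(SwarmCommand::Publish {
        topic: topic.to_string(),
        data: data.to_vec(),
    })
    .is_ok()
}

// Spawns the actor, queues two messages from the caller thread, and returns
// how many publishes the actor executed.
fn demo() -> usize {
    let (tx, rx) = sync_channel::<SwarmCommand>(16);
    let actor = thread::spawn(move || run_eventloop(rx));
    let queued = publish(&tx, "blocks", b"a") && publish(&tx, "blocks", b"b");
    drop(tx); // closing the channel lets the actor loop finish
    let swarm = actor.join().unwrap();
    if queued { swarm.published.len() } else { 0 }
}
```

Because the caller thread only ever owns a sender, there is no path by which it can reach the swarm state directly, which is the property the refactor relies on.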
@codex review
Pull request overview
This PR refactors the Rust libp2p FFI bridge to an actor-model design, ensuring the Tokio event loop is the sole mutator of the libp2p Swarm to eliminate a cross-thread data race that could stall gossip.
Changes:
- Introduces SwarmCommand plus a per-network_id command channel (COMMAND_SENDERS) and send_swarm_command() helper.
- Updates FFI entrypoints (publish + req/resp send APIs) to enqueue commands instead of directly mutating the swarm.
- Extends Network to own a command receiver and processes commands inside run_eventloop via tokio::select!.
rust/libp2p-glue/src/lib.rs
Outdated
| let request_id = REQUEST_ID_COUNTER.fetch_add(1, Ordering::Relaxed) + 1; | ||
|
|
||
| let request_message = RequestMessage::new(protocol_id.clone(), request_bytes); | ||
|
|
||
| swarm | ||
| .behaviour_mut() | ||
| .reqresp | ||
| .send_request(peer_id, request_id, request_message); | ||
|
|
||
| // Register tracking state before sending the command so the event loop handler | ||
| // sees the entries if the response arrives quickly. | ||
| REQUEST_ID_MAP.lock().unwrap().insert(request_id, ()); | ||
| REQUEST_PROTOCOL_MAP | ||
| .lock() | ||
| .unwrap() | ||
| .insert(request_id, protocol_id.clone()); | ||
| REQUEST_PROTOCOL_MAP.lock().unwrap().insert(request_id, protocol_id.clone()); | ||
|
|
||
| send_swarm_command(network_id, SwarmCommand::SendRpcRequest { | ||
| peer_id, | ||
| request_id, | ||
| protocol_id, | ||
| request_message, | ||
| }); |
Good catch — fixed. Now checking the return value and logging a hard error if the command channel is not initialized. If the enqueue fails, we re-insert the channel back into RESPONSE_CHANNEL_MAP to preserve state.
rust/libp2p-glue/src/lib.rs
Outdated
| let senders = COMMAND_SENDERS.lock().unwrap(); | ||
| if let Some(tx) = senders.get(&network_id) { | ||
| if tx.send(cmd).is_err() { | ||
| forward_log_by_network(network_id, 3, "Command channel closed — event loop may have exited"); | ||
| } | ||
| } else { | ||
| forward_log_by_network(network_id, 3, "send_swarm_command called before network initialized"); |
Fixed in b3c2e14. send_swarm_command returns bool and all FFI callers now check the result: send_rpc_request rolls back map state on failure, send_rpc_end_of_stream re-inserts the channel on failure, send_rpc_response_chunk/send_rpc_error_response log an error.
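A minimal sketch of the pattern this reply describes, where the enqueue helper returns a bool and `send_rpc_request` undoes its bookkeeping on failure. The assumptions are significant: a hypothetical `Bridge` struct replaces the PR's global statics, `std::sync::mpsc::Sender` replaces the Tokio sender, and the command carries only a `request_id`.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Mutex;

// Hypothetical, reduced command type; the PR's variant carries more fields.
#[derive(Debug, PartialEq)]
enum SwarmCommand {
    SendRpcRequest { request_id: u64 },
}

// Hypothetical container standing in for COMMAND_SENDERS, REQUEST_ID_MAP
// and REQUEST_ID_COUNTER.
#[derive(Default)]
struct Bridge {
    command_senders: Mutex<HashMap<u32, Sender<SwarmCommand>>>,
    request_id_map: Mutex<HashMap<u64, ()>>,
    request_id_counter: AtomicU64,
}

impl Bridge {
    // Returns true only if the command reached the queue. A sender whose
    // receiver is gone is removed so it does not linger in the registry.
    fn send_swarm_command(&self, network_id: u32, cmd: SwarmCommand) -> bool {
        let mut senders = self.command_senders.lock().unwrap();
        let delivered = match senders.get(&network_id) {
            Some(tx) => tx.send(cmd).is_ok(),
            None => return false,
        };
        if !delivered {
            senders.remove(&network_id);
        }
        delivered
    }

    // Registers tracking state first, then rolls it back and returns 0
    // if the command could not be queued.
    fn send_rpc_request(&self, network_id: u32) -> u64 {
        let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed) + 1;
        self.request_id_map.lock().unwrap().insert(request_id, ());
        if !self.send_swarm_command(network_id, SwarmCommand::SendRpcRequest { request_id }) {
            self.request_id_map.lock().unwrap().remove(&request_id);
            return 0;
        }
        request_id
    }
}
```

Returning 0 keeps the existing FFI contract, in which the Zig caller treats 0 as a dispatch failure.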
rust/libp2p-glue/src/lib.rs
Outdated
| let channel = { | ||
| let mut response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); | ||
| let channel = response_map.get(&channel_id).cloned(); | ||
| if channel.is_some() { | ||
| _ = response_map.update_timeout(&channel_id, RESPONSE_CHANNEL_IDLE_TIMEOUT); | ||
| } | ||
| channel | ||
| let response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); | ||
| response_map.get(&channel_id).cloned() | ||
| }; | ||
|
|
||
| if let Some(channel) = channel { | ||
| let swarm = match get_swarm_mut(network_id) { | ||
| Some(s) => s, | ||
| None => { | ||
| logger::rustLogger.error( | ||
| network_id, | ||
| "send_rpc_response_chunk called before network initialized", | ||
| ); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| let peer_id = channel.peer_id; | ||
| let response_message = ResponseMessage::new(channel.protocol.clone(), response_bytes); | ||
|
|
||
| swarm.behaviour_mut().reqresp.send_response( | ||
| channel.peer_id, | ||
| channel.connection_id, | ||
| channel.stream_id, | ||
| send_swarm_command(network_id, SwarmCommand::SendRpcResponseChunk { | ||
| channel, | ||
| channel_id, | ||
| response_message, | ||
| ); | ||
| // The event loop will update the timeout after the send | ||
| update_timeout: true, | ||
| }); |
Addressed in c33b450: the update_timeout flag is now passed as a field in the SendRpcResponseChunk command variant and only applied inside the event loop when the command is executed — avoiding the deferred-timeout race.
rust/libp2p-glue/src/lib.rs
Outdated
| Some(cmd) = cmd_rx.recv() => { | ||
| match cmd { | ||
| SwarmCommand::Publish { topic, data } => { | ||
| let ident_topic = gossipsub::IdentTopic::new(topic); | ||
| if let Err(e) = swarm.behaviour_mut().gossipsub.publish(ident_topic, data) { | ||
| logger::rustLogger.error(self.network_id, &format!("Publish error: {e:?}")); | ||
| } | ||
| } | ||
| SwarmCommand::SendRpcRequest { peer_id, request_id, protocol_id: _, request_message } => { | ||
| swarm.behaviour_mut().reqresp.send_request(peer_id, request_id, request_message); | ||
| } | ||
| SwarmCommand::SendRpcResponseChunk { channel, channel_id, response_message, update_timeout } => { | ||
| if update_timeout { | ||
| let mut map = RESPONSE_CHANNEL_MAP.lock().unwrap(); | ||
| let _ = map.update_timeout(&channel_id, RESPONSE_CHANNEL_IDLE_TIMEOUT); | ||
| } | ||
| swarm.behaviour_mut().reqresp.send_response( | ||
| channel.peer_id, | ||
| channel.connection_id, | ||
| channel.stream_id, | ||
| response_message, | ||
| ); | ||
| } | ||
| SwarmCommand::SendRpcEndOfStream { channel, channel_id: _ } => { | ||
| swarm.behaviour_mut().reqresp.finish_response_stream( | ||
| channel.peer_id, | ||
| channel.connection_id, | ||
| channel.stream_id, | ||
| ); | ||
| } | ||
| SwarmCommand::SendRpcErrorResponse { channel, channel_id: _, payload } => { | ||
| let response_message = ResponseMessage::new(channel.protocol.clone(), payload); | ||
| let peer_id = channel.peer_id; | ||
| swarm.behaviour_mut().reqresp.send_response( | ||
| peer_id, | ||
| channel.connection_id, | ||
| channel.stream_id, | ||
| response_message, | ||
| ); | ||
| swarm.behaviour_mut().reqresp.finish_response_stream( | ||
| peer_id, | ||
| channel.connection_id, | ||
| channel.stream_id, | ||
| ); | ||
| } |
Fixed in c33b450: the None branch now removes the sender from COMMAND_SENDERS and breaks out of the event loop with a warning log, so the network_id entry doesn't linger after the channel closes.
| #[tokio::test] | ||
| async fn test_send_rpc_request_before_initialization_returns_zero() { | ||
| // With the actor model, send_rpc_request generates a request_id immediately and | ||
| // queues the command. If the peer_id string is unparseable (invalid), the function | ||
| // returns 0 as an error indicator before any command is sent. | ||
| let network_id = 99; | ||
| let peer_id = std::ffi::CString::new("12D3KooWTest").unwrap(); | ||
| let invalid_peer_id = std::ffi::CString::new("not-a-valid-peer-id").unwrap(); | ||
| let request_data = b"test request"; | ||
|
|
||
| let request_id = unsafe { | ||
| send_rpc_request( | ||
| network_id, | ||
| peer_id.as_ptr(), | ||
| 0, // protocol_tag | ||
| invalid_peer_id.as_ptr(), | ||
| 0, // BlocksByRootV1 | ||
| request_data.as_ptr(), | ||
| request_data.len(), | ||
| ) | ||
| }; | ||
|
|
||
| assert_eq!( | ||
| request_id, 0, | ||
| "Should return 0 when network is not initialized" | ||
| ); | ||
| // Invalid peer id → early return 0 before actor command is issued | ||
| assert_eq!(request_id, 0, "Invalid peer_id should return 0"); |
Addressed: test renamed to test_send_rpc_request_uninitialised_network_returns_zero and updated to verify rollback of the pre-inserted map entries when no command sender is registered.
- send_swarm_command returns bool; removes stale sender on channel close
- send_rpc_request rolls back REQUEST_ID_MAP / REQUEST_PROTOCOL_MAP and returns 0 when send_swarm_command fails (uninitialized network)
- send_rpc_response_chunk updates idle timeout eagerly before queueing to prevent channel expiry while commands sit in the queue
- run_eventloop handles cmd_rx.recv() == None explicitly: removes sender from COMMAND_SENDERS and breaks the loop
- rename test_send_rpc_request_before_initialization_returns_zero to test_send_rpc_request_invalid_peer_id_returns_zero; add new test_send_rpc_request_uninitialised_network_returns_zero
Pull request overview
Refactors the rust/libp2p-glue FFI bridge to an actor-model design so that only the Tokio event loop thread mutates the libp2p Swarm, addressing the reported data race/stall in gossip handling.
Changes:
- Introduces a SwarmCommand enum and a per-network_id tokio::sync::mpsc::UnboundedSender registry to route FFI-triggered operations to the event loop.
- Updates FFI entrypoints (publish_msg_to_rust_bridge, send_rpc_*) to enqueue commands instead of calling get_swarm_mut().
- Extends the event loop (run_eventloop) to select! on the command receiver and execute queued swarm mutations.
Comments suppressed due to low confidence (2)
rust/libp2p-glue/src/lib.rs:589
send_rpc_error_response removes the response channel from RESPONSE_CHANNEL_MAP and then ignores whether send_swarm_command actually enqueued the error/finish command. If the enqueue fails, the channel bookkeeping is dropped without a response ever being sent. Check the return value and, on failure, re-insert the removed channel (or otherwise ensure the caller can retry or the channel can be cleaned up deterministically).
let channel = {
let mut response_map = RESPONSE_CHANNEL_MAP.lock().unwrap();
response_map.remove(&channel_id)
};
if let Some(channel) = channel {
let peer_id = channel.peer_id;
let mut payload = Vec::with_capacity(1 + MAX_VARINT_BYTES + message_bytes.len());
payload.push(2);
encode_varint(message_bytes.len(), &mut payload);
payload.extend_from_slice(message_bytes);
send_swarm_command(
network_id,
SwarmCommand::SendRpcErrorResponse {
channel,
channel_id,
payload,
},
);
logger::rustLogger.info(
network_id,
&format!(
"[reqresp] Queued error response on channel {} (peer: {}): {}",
channel_id, peer_id, message
),
);
} else {
logger::rustLogger.error(
network_id,
&format!("No response channel found for id {}", channel_id),
);
}
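The payload built in the snippet above is a one-byte code (2), a varint length prefix, and the message bytes. A sketch of that framing follows, assuming `encode_varint` is an unsigned LEB128 encoder and that `MAX_VARINT_BYTES` is 10; neither definition is shown in this PR, so both are assumptions, and `build_error_payload` is a hypothetical helper name.

```rust
// Assumed bound: a 64-bit value needs at most ceil(64 / 7) = 10 varint bytes.
const MAX_VARINT_BYTES: usize = 10;

// Unsigned LEB128: 7 payload bits per byte, low bits first, with the high bit
// set on every byte except the last. Assumed to match the PR's encode_varint.
fn encode_varint(mut value: usize, out: &mut Vec<u8>) {
    loop {
        let byte = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            out.push(byte);
            return;
        }
        out.push(byte | 0x80);
    }
}

// Mirrors the framing in the reviewed snippet: code byte, length prefix, message.
fn build_error_payload(message_bytes: &[u8]) -> Vec<u8> {
    let mut payload = Vec::with_capacity(1 + MAX_VARINT_BYTES + message_bytes.len());
    payload.push(2);
    encode_varint(message_bytes.len(), &mut payload);
    payload.extend_from_slice(message_bytes);
    payload
}
```

Under these assumptions a four-byte message produces six bytes in total, and a length of 300 encodes as the two bytes 0xAC 0x02.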
rust/libp2p-glue/src/lib.rs:516
send_rpc_end_of_stream removes the entry from RESPONSE_CHANNEL_MAP before attempting to enqueue the command, and then ignores the send_swarm_command return value. If the network isn't initialized or the command channel is closed, the end-of-stream will be dropped and the channel state is lost. Handle enqueue failure (check the bool), and on failure either re-insert the removed channel entry (with an updated timeout) or surface/log a hard error so the caller can recover.
let channel = {
let mut response_map = RESPONSE_CHANNEL_MAP.lock().unwrap();
response_map.remove(&channel_id)
};
if let Some(channel) = channel {
let peer_id = channel.peer_id;
send_swarm_command(
network_id,
SwarmCommand::SendRpcEndOfStream {
channel,
channel_id,
},
);
logger::rustLogger.info(
network_id,
&format!(
"[reqresp] Queued end-of-stream on channel {} (peer: {})",
channel_id, peer_id
),
);
} else {
logger::rustLogger.error(
network_id,
rust/libp2p-glue/src/lib.rs
Outdated
| if let Some(channel) = channel { | ||
| let swarm = match get_swarm_mut(network_id) { | ||
| Some(s) => s, | ||
| None => { | ||
| logger::rustLogger.error( | ||
| network_id, | ||
| "send_rpc_error_response called before network initialized", | ||
| ); | ||
| return; | ||
| } | ||
| }; | ||
| let peer_id = channel.peer_id; | ||
|
|
||
| let mut payload = Vec::with_capacity(1 + MAX_VARINT_BYTES + message_bytes.len()); | ||
| payload.push(2); | ||
| encode_varint(message_bytes.len(), &mut payload); | ||
| payload.extend_from_slice(message_bytes); | ||
|
|
||
| let response_message = ResponseMessage::new(channel.protocol.clone(), payload); | ||
|
|
||
| let peer_id = channel.peer_id; | ||
|
|
||
| swarm.behaviour_mut().reqresp.send_response( | ||
| peer_id, | ||
| channel.connection_id, | ||
| channel.stream_id, | ||
| response_message, | ||
| ); | ||
| swarm.behaviour_mut().reqresp.finish_response_stream( | ||
| peer_id, | ||
| channel.connection_id, | ||
| channel.stream_id, | ||
| send_swarm_command( | ||
| network_id, | ||
| SwarmCommand::SendRpcErrorResponse { | ||
| channel, | ||
| channel_id, | ||
| payload, | ||
| }, | ||
| ); |
| SwarmCommand::SendRpcErrorResponse { channel, channel_id: _, payload } => { | ||
| let response_message = ResponseMessage::new(channel.protocol.clone(), payload); | ||
| let peer_id = channel.peer_id; | ||
| swarm.behaviour_mut().reqresp.send_response( | ||
| peer_id, | ||
| channel.connection_id, | ||
| channel.stream_id, | ||
| response_message, | ||
| ); | ||
| swarm.behaviour_mut().reqresp.finish_response_stream( | ||
| peer_id, | ||
| channel.connection_id, | ||
| channel.stream_id, | ||
| ); |
| #[test] | ||
| fn test_send_rpc_request_before_initialization_returns_zero() { | ||
| // Test that sending RPC request before initialization returns 0 | ||
| let network_id = 99; | ||
| let peer_id = std::ffi::CString::new("12D3KooWTest").unwrap(); | ||
| fn test_send_rpc_request_uninitialised_network_returns_zero() { | ||
| // With the actor model, send_rpc_request inserts tracking state then calls | ||
| // send_swarm_command. When no command sender is registered for the network_id | ||
| // the command cannot be queued, so the function must roll back the map inserts | ||
| // and return 0 so that Zig treats the call as a dispatch failure. | ||
| let network_id = 9902; // never initialised | ||
| // Use a syntactically valid PeerId so we reach the actor-command path. | ||
| let peer_id_str = | ||
| std::ffi::CString::new("12D3KooWNvDnLYAGWnqNAJKBBBzYBp7MBmHmWkGxhXJKPJDGH1a").unwrap(); | ||
| let request_data = b"test request"; | ||
|
|
||
| let request_id = unsafe { | ||
| send_rpc_request( | ||
| network_id, | ||
| peer_id.as_ptr(), | ||
| 0, // protocol_tag | ||
| peer_id_str.as_ptr(), | ||
| 0, // BlocksByRootV1 | ||
| request_data.as_ptr(), | ||
| request_data.len(), | ||
| ) | ||
| }; | ||
|
|
||
| assert_eq!( | ||
| request_id, 0, | ||
| "Should return 0 when network is not initialized" | ||
| "Uninitialised network should return 0 and not leak map entries" | ||
| ); | ||
|
|
||
| // Verify the map entries were rolled back. | ||
| assert!( | ||
| !REQUEST_PROTOCOL_MAP | ||
| .lock() | ||
| .unwrap() | ||
| .contains_key(&request_id), | ||
| "REQUEST_PROTOCOL_MAP should not contain rolled-back request_id" | ||
| ); |
| } | ||
| }; | ||
|
|
||
| let peer_id = channel.peer_id; |
rust/libp2p-glue/src/lib.rs
Outdated
| // Update the idle timeout eagerly (before the command is dequeued by the event loop) | ||
| // so that queued activity prevents premature channel expiry under load. | ||
| { | ||
| let mut response_map = RESPONSE_CHANNEL_MAP.lock().unwrap(); | ||
| let _ = response_map.update_timeout(&channel_id, RESPONSE_CHANNEL_IDLE_TIMEOUT); | ||
| } | ||
|
|
||
| send_swarm_command( | ||
| network_id, | ||
| SwarmCommand::SendRpcResponseChunk { | ||
| channel, | ||
| channel_id, | ||
| response_message, | ||
| update_timeout: false, | ||
| }, | ||
| ); | ||
|
|
||
| logger::rustLogger.info( | ||
| network_id, | ||
| &format!( | ||
| "[reqresp] Sent response payload on channel {} (peer: {})", | ||
| channel_id, channel.peer_id | ||
| "[reqresp] Queued response payload on channel {} (peer: {})", | ||
| channel_id, peer_id | ||
| ), | ||
| ); |
@zclawz yes, still some functions ignored the send command result
Fixed in b3c2e14. All FFI callers now check the bool result of send_swarm_command:
- send_rpc_response_chunk: logs error instead of "Queued..." on failure (no state to roll back since channel stays in map)
- send_rpc_end_of_stream: re-inserts channel into RESPONSE_CHANNEL_MAP if enqueue fails
- send_rpc_error_response: re-inserts channel into RESPONSE_CHANNEL_MAP if enqueue fails
- publish_msg_to_rust_bridge: logs error on failure
Also removed duplicate #[derive(Clone)] on PendingResponse (was already derived from the previous commit).
Yes, confirmed — the review comment from Copilot is correct. The send_swarm_command return value was being ignored, meaning if the network was not initialized, the channel would be silently dropped. Now fixed: we check the return, re-insert the channel on failure, and log a hard error.
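The re-insert-on-failure behaviour can be sketched as follows. This is an illustration rather than the PR's code: it uses a plain `HashMap` where the PR uses a map with idle timeouts, `std::sync::mpsc` instead of Tokio, an optional sender to model an uninitialized network, and a reduced `PendingResponse` with a made-up `peer` field.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{SendError, Sender};

// Hypothetical, reduced version of the PR's PendingResponse.
#[derive(Debug, Clone, PartialEq)]
struct PendingResponse {
    peer: String,
}

enum SwarmCommand {
    SendRpcEndOfStream { channel: PendingResponse, channel_id: u64 },
}

// Removes the channel, tries to queue the command, and restores the entry if
// queueing fails. `tx` is None when no sender is registered for the network.
fn send_rpc_end_of_stream(
    map: &mut HashMap<u64, PendingResponse>,
    tx: Option<&Sender<SwarmCommand>>,
    channel_id: u64,
) -> bool {
    let channel = match map.remove(&channel_id) {
        Some(c) => c,
        None => return false,
    };
    let cmd = SwarmCommand::SendRpcEndOfStream { channel, channel_id };
    // On failure, std's SendError hands the unsent command back, so the
    // channel can be recovered without cloning it up front.
    let rejected = match tx {
        Some(tx) => match tx.send(cmd) {
            Ok(()) => return true,
            Err(SendError(cmd)) => cmd,
        },
        None => cmd,
    };
    let SwarmCommand::SendRpcEndOfStream { channel, channel_id } = rejected;
    map.insert(channel_id, channel);
    false
}
```

With a timed map, the re-insert would also need to choose a timeout for the restored entry, which this sketch leaves out.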
rust/libp2p-glue/src/lib.rs
Outdated
| if let Some(channel) = channel { | ||
| let swarm = match get_swarm_mut(network_id) { | ||
| Some(s) => s, | ||
| None => { | ||
| logger::rustLogger.error( | ||
| network_id, | ||
| "send_rpc_end_of_stream called before network initialized", | ||
| ); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| swarm.behaviour_mut().reqresp.finish_response_stream( | ||
| channel.peer_id, | ||
| channel.connection_id, | ||
| channel.stream_id, | ||
| let peer_id = channel.peer_id; | ||
| send_swarm_command( | ||
| network_id, | ||
| SwarmCommand::SendRpcEndOfStream { | ||
| channel, | ||
| channel_id, | ||
| }, | ||
| ); |
…failure
- send_rpc_response_chunk: gate 'Queued...' log on successful enqueue
- send_rpc_end_of_stream: re-insert channel into map if enqueue fails
- send_rpc_error_response: re-insert channel into map if enqueue fails
- publish_msg_to_rust_bridge: log error if enqueue fails
- Remove duplicate #[derive(Clone)] on PendingResponse

Addresses review feedback from GrapeBaBa and Copilot on PR #670
Addressed all review feedback in latest commits: b3c2e14 — check
Previous commits already addressed:
CI is ✅ green on the latest commit.
- Replace unbounded_channel with channel(1024) to prevent unbounded memory growth if the event loop stalls
- Use try_send with distinct Full vs Closed error handling
- Move response channel timeout update back to FFI side (immediate) instead of deferring to event loop, restoring original semantics and eliminating the timeout race window
Move update_timeout call inside the successful send_swarm_command branch in send_rpc_response_chunk. Previously the timeout was refreshed before try_send, meaning a Full/Closed channel error would still extend the channel's idle expiry, holding RESPONSE_CHANNEL_MAP resources longer than intended.
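A sketch of the resulting control flow: a non-blocking send on a bounded channel that distinguishes a full queue from a closed one, with the idle deadline refreshed only after a successful enqueue. It assumes `std::sync::mpsc::SyncSender` as a stand-in for the bounded Tokio sender (std names the closed case `Disconnected`), and `EnqueueError`, `try_enqueue`, and `send_chunk` are hypothetical names.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};
use std::time::{Duration, Instant};

// Hypothetical error type separating back-pressure from shutdown.
#[derive(Debug, PartialEq)]
enum EnqueueError {
    Full,   // queue at capacity: the event loop is behind, caller may retry
    Closed, // receiver dropped: the event loop has exited
}

// Non-blocking enqueue that maps the channel's error onto the two cases.
fn try_enqueue<T>(tx: &SyncSender<T>, cmd: T) -> Result<(), EnqueueError> {
    match tx.try_send(cmd) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(_)) => Err(EnqueueError::Full),
        Err(TrySendError::Disconnected(_)) => Err(EnqueueError::Closed),
    }
}

// Queues a response chunk and extends the idle deadline only on success, so a
// rejected send does not keep the response channel alive longer than intended.
fn send_chunk(
    tx: &SyncSender<Vec<u8>>,
    deadline: &mut Instant,
    idle_timeout: Duration,
    chunk: Vec<u8>,
) -> Result<(), EnqueueError> {
    try_enqueue(tx, chunk)?;
    *deadline = Instant::now() + idle_timeout;
    Ok(())
}
```

Keeping the two error cases separate lets a caller log back-pressure differently from shutdown, which appears to be the intent of the distinct Full vs Closed handling in the commit message.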
Closes #668
Summary
Fixes the data race on the libp2p Swarm object that caused the node to stall and stop receiving gossip messages while connections remained alive.

Root cause: Five FFI functions called from the Zig thread accessed static mut SWARM_STATE via get_swarm_mut() without synchronization, while the Tokio event loop on a separate thread held a mutable reference to the same swarm. This corrupted internal gossipsub state.

Fix: Replace direct swarm mutation from FFI functions with a tokio::sync::mpsc command channel per network. The Tokio event loop is now the only place that mutates the Swarm.

Changes
- SwarmCommand enum with variants for all FFI-driven swarm operations: Publish, SendRpcRequest, SendRpcResponseChunk, SendRpcEndOfStream, SendRpcErrorResponse
- COMMAND_SENDERS: Mutex<HashMap<u32, UnboundedSender<SwarmCommand>>> (per network_id) registered in start_network
- send_swarm_command() helper used by all FFI functions
- UnboundedReceiver<SwarmCommand> in Network.cmd_rx field
- run_eventloop now selects on cmd_rx.recv() and executes commands, keeping all swarm mutations on the Tokio thread
- FFI functions (publish_msg_to_rust_bridge, send_rpc_request, send_rpc_response_chunk, send_rpc_end_of_stream, send_rpc_error_response) now push commands instead of calling get_swarm_mut()

Testing
- cargo test passes (all 4 Rust unit tests)
- zig build test (pre-existing io_uring sandbox failures unrelated to this change)