From 9a9e29ef65dcb6a4ccee92ff6d41392d8498f597 Mon Sep 17 00:00:00 2001 From: Andrew Luka Date: Mon, 15 Dec 2025 16:57:31 +0200 Subject: [PATCH] fix(swarm): more optimized code --- swarm/src/connection/pool/task.rs | 210 ++++++++++++++++++++++-------- 1 file changed, 155 insertions(+), 55 deletions(-) diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index c3f15e8ede7..67562cc4ff3 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -25,7 +25,7 @@ use std::{convert::Infallible, pin::Pin}; use futures::{ channel::{mpsc, oneshot}, - future::{poll_fn, Either, Future}, + future::{poll_fn, Either, Future, FutureExt}, SinkExt, StreamExt, }; use libp2p_core::muxing::StreamMuxerBox; @@ -177,17 +177,47 @@ pub(crate) async fn new_for_established_connection( THandler: ConnectionHandler, { loop { - match futures::future::select( - command_receiver.next(), - poll_fn(|cx| Pin::new(&mut connection).poll(cx)), - ) - .await - { - Either::Left((Some(command), _)) => match command { - Command::NotifyHandler(event) => connection.on_behaviour_event(event), - Command::Close => { + // Phase 1: Exhaust connection events (high priority - process all available data) + loop { + match poll_fn(|cx| Pin::new(&mut connection).poll(cx)).now_or_never() { + Some(Ok(connection::Event::Handler(event))) => { + // Try non-blocking send first + match events.try_send(EstablishedConnectionEvent::Notify { + id: connection_id, + peer_id, + event, + }) { + Ok(()) => continue, // Successfully sent, drain more events + Err(e) if e.is_full() => { + // Channel full, must await + let _ = events.send(e.into_inner()).await; + // Continue draining after send completes + continue; + } + Err(_) => return, // Channel closed, exit task + } + } + Some(Ok(connection::Event::AddressChange(new_address))) => { + // Try non-blocking send first + match events.try_send(EstablishedConnectionEvent::AddressChange { + id: connection_id, + peer_id, + new_address, + }) { + Ok(()) => continue, // Successfully sent, drain more events + Err(e) if e.is_full() => { + // Channel full, must await + let _ = events.send(e.into_inner()).await; + // Continue draining after send completes + continue; + } + Err(_) => return, // Channel closed, exit task + } + } + Some(Err(error)) => { + // Connection error, close and terminate command_receiver.close(); - let (remaining_events, closing_muxer) = connection.close(); + let (remaining_events, _closing_muxer) = connection.close(); let _ = events .send_all(&mut remaining_events.map(|event| { @@ -199,65 +229,135 @@ pub(crate) async fn new_for_established_connection( })) .await; - let error = closing_muxer.await.err().map(ConnectionError::IO); - let _ = events .send(EstablishedConnectionEvent::Closed { id: connection_id, peer_id, - error, + error: Some(error), }) .await; return; } - }, + None => break, // Connection would block, move to phase 2 + } + } - // The manager has disappeared; abort. - Either::Left((None, _)) => return, + // Phase 2: Check for commands (lower priority) + match command_receiver.try_next() { + Ok(Some(Command::NotifyHandler(event))) => { + connection.on_behaviour_event(event); + // Loop back to phase 1 - handler might have generated events + continue; + } + Ok(Some(Command::Close)) => { + command_receiver.close(); + let (remaining_events, closing_muxer) = connection.close(); - Either::Right((event, _)) => { - match event { - Ok(connection::Event::Handler(event)) => { - let _ = events - .send(EstablishedConnectionEvent::Notify { - id: connection_id, - peer_id, - event, - }) - .await; - } - Ok(connection::Event::AddressChange(new_address)) => { - let _ = events - .send(EstablishedConnectionEvent::AddressChange { - id: connection_id, - peer_id, - new_address, - }) - .await; - } - Err(error) => { - command_receiver.close(); - let (remaining_events, _closing_muxer) = connection.close(); + let _ = events + .send_all(&mut remaining_events.map(|event| { + Ok(EstablishedConnectionEvent::Notify { + id: connection_id, + event, + peer_id, + }) + })) + .await; + + let error = closing_muxer.await.err().map(ConnectionError::IO); + + let _ = events + .send(EstablishedConnectionEvent::Closed { + id: connection_id, + peer_id, + error, + }) + .await; + return; + } + Ok(None) => return, // Manager disappeared, exit task + Err(_) => { + // No command ready, both futures would block + // Fall back to select to await either future + match futures::future::select( + command_receiver.next(), + poll_fn(|cx| Pin::new(&mut connection).poll(cx)), + ) + .await + { + Either::Left((Some(command), _)) => match command { + Command::NotifyHandler(event) => connection.on_behaviour_event(event), + Command::Close => { + command_receiver.close(); + let (remaining_events, closing_muxer) = connection.close(); + + let _ = events + .send_all(&mut remaining_events.map(|event| { + Ok(EstablishedConnectionEvent::Notify { + id: connection_id, + event, + peer_id, + }) + })) + .await; - let _ = events - .send_all(&mut remaining_events.map(|event| { - Ok(EstablishedConnectionEvent::Notify { + let error = closing_muxer.await.err().map(ConnectionError::IO); + + let _ = events + .send(EstablishedConnectionEvent::Closed { id: connection_id, - event, peer_id, + error, }) - })) - .await; + .await; + return; + } + }, + Either::Left((None, _)) => return, // Manager disappeared + Either::Right((event, _)) => { + // Connection event ready, handle it + match event { + Ok(connection::Event::Handler(event)) => { + let _ = events + .send(EstablishedConnectionEvent::Notify { + id: connection_id, + peer_id, + event, + }) + .await; + } + Ok(connection::Event::AddressChange(new_address)) => { + let _ = events + .send(EstablishedConnectionEvent::AddressChange { + id: connection_id, + peer_id, + new_address, + }) + .await; + } + Err(error) => { + command_receiver.close(); + let (remaining_events, _closing_muxer) = connection.close(); - // Terminate the task with the error, dropping the connection. - let _ = events - .send(EstablishedConnectionEvent::Closed { - id: connection_id, - peer_id, - error: Some(error), - }) - .await; - return; + let _ = events + .send_all(&mut remaining_events.map(|event| { + Ok(EstablishedConnectionEvent::Notify { + id: connection_id, + event, + peer_id, + }) + })) + .await; + + let _ = events + .send(EstablishedConnectionEvent::Closed { + id: connection_id, + peer_id, + error: Some(error), + }) + .await; + return; + } + } } } }