diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index c3f15e8ede7..51fa6ec13ec 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -21,7 +21,11 @@ //! Async functions driving pending and established connections in the form of a task. -use std::{convert::Infallible, pin::Pin}; +use std::{ + convert::Infallible, + pin::Pin, + task::{Context, Poll}, +}; use futures::{ channel::{mpsc, oneshot}, @@ -167,6 +171,18 @@ pub(crate) async fn new_for_pending_incoming_connection( } } +/// Result of polling the connection loop - determines what action to take next. +enum ConnectionLoopAction { + /// An event needs to be sent to the pool + SendEvent(EstablishedConnectionEvent), + /// The connection should be gracefully closed + Close, + /// The connection encountered an error + Error(ConnectionError), + /// The manager disappeared (command channel closed) + ManagerGone, +} + pub(crate) async fn new_for_established_connection( connection_id: ConnectionId, peer_id: PeerId, @@ -177,90 +193,113 @@ pub(crate) async fn new_for_established_connection( THandler: ConnectionHandler, { loop { - match futures::future::select( - command_receiver.next(), - poll_fn(|cx| Pin::new(&mut connection).poll(cx)), - ) - .await - { - Either::Left((Some(command), _)) => match command { - Command::NotifyHandler(event) => connection.on_behaviour_event(event), - Command::Close => { - command_receiver.close(); - let (remaining_events, closing_muxer) = connection.close(); + // Use a single poll_fn that polls both the command receiver and connection. + // This avoids the waker accumulation issue that occurs when using + // futures::future::select in a loop, where each iteration creates new + // futures that register additional wakers without cleaning up old ones. + let action = poll_fn(|cx: &mut Context<'_>| { + // Poll connection events first - prioritize network I/O over commands + // from the behavior layer for better responsiveness. + match Pin::new(&mut connection).poll(cx) { + Poll::Ready(Ok(connection::Event::Handler(event))) => { + return Poll::Ready(ConnectionLoopAction::SendEvent( + EstablishedConnectionEvent::Notify { + id: connection_id, + peer_id, + event, + }, + )); + } + Poll::Ready(Ok(connection::Event::AddressChange(new_address))) => { + return Poll::Ready(ConnectionLoopAction::SendEvent( + EstablishedConnectionEvent::AddressChange { + id: connection_id, + peer_id, + new_address, + }, + )); + } + Poll::Ready(Err(error)) => { + return Poll::Ready(ConnectionLoopAction::::Error(error)); + } + Poll::Pending => {} + } + + // Then poll commands from the behavior layer + loop { + match command_receiver.poll_next_unpin(cx) { + Poll::Ready(Some(Command::NotifyHandler(event))) => { + connection.on_behaviour_event(event); + // Continue polling - there may be more commands + continue; + } + Poll::Ready(Some(Command::Close)) => { + return Poll::Ready(ConnectionLoopAction::::Close); + } + Poll::Ready(None) => { + return Poll::Ready(ConnectionLoopAction::::ManagerGone); + } + Poll::Pending => break, + } + } - let _ = events - .send_all(&mut remaining_events.map(|event| { - Ok(EstablishedConnectionEvent::Notify { - id: connection_id, - event, - peer_id, - }) - })) - .await; + Poll::Pending + }) + .await; - let error = closing_muxer.await.err().map(ConnectionError::IO); + match action { + ConnectionLoopAction::SendEvent(event) => { + let _ = events.send(event).await; + } + ConnectionLoopAction::Close => { + command_receiver.close(); + let (remaining_events, closing_muxer) = connection.close(); - let _ = events - .send(EstablishedConnectionEvent::Closed { + let _ = events + .send_all(&mut remaining_events.map(|event| { + Ok(EstablishedConnectionEvent::Notify { id: connection_id, + event, peer_id, - error, }) - .await; - return; - } - }, + })) + .await; - // The manager has disappeared; abort. - Either::Left((None, _)) => return, + let error = closing_muxer.await.err().map(ConnectionError::IO); - Either::Right((event, _)) => { - match event { - Ok(connection::Event::Handler(event)) => { - let _ = events - .send(EstablishedConnectionEvent::Notify { - id: connection_id, - peer_id, - event, - }) - .await; - } - Ok(connection::Event::AddressChange(new_address)) => { - let _ = events - .send(EstablishedConnectionEvent::AddressChange { - id: connection_id, - peer_id, - new_address, - }) - .await; - } - Err(error) => { - command_receiver.close(); - let (remaining_events, _closing_muxer) = connection.close(); + let _ = events + .send(EstablishedConnectionEvent::Closed { + id: connection_id, + peer_id, + error, + }) + .await; + return; + } + ConnectionLoopAction::Error(error) => { + command_receiver.close(); + let (remaining_events, _closing_muxer) = connection.close(); - let _ = events - .send_all(&mut remaining_events.map(|event| { - Ok(EstablishedConnectionEvent::Notify { - id: connection_id, - event, - peer_id, - }) - })) - .await; + let _ = events + .send_all(&mut remaining_events.map(|event| { + Ok(EstablishedConnectionEvent::Notify { + id: connection_id, + event, + peer_id, + }) + })) + .await; - // Terminate the task with the error, dropping the connection. - let _ = events - .send(EstablishedConnectionEvent::Closed { - id: connection_id, - peer_id, - error: Some(error), - }) - .await; - return; - } - } + let _ = events + .send(EstablishedConnectionEvent::Closed { + id: connection_id, + peer_id, + error: Some(error), + }) + .await; + return; } + ConnectionLoopAction::ManagerGone => return, } } }