Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 113 additions & 74 deletions swarm/src/connection/pool/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@

//! Async functions driving pending and established connections in the form of a task.

use std::{convert::Infallible, pin::Pin};
use std::{
convert::Infallible,
pin::Pin,
task::{Context, Poll},
};

use futures::{
channel::{mpsc, oneshot},
Expand Down Expand Up @@ -167,6 +171,18 @@ pub(crate) async fn new_for_pending_incoming_connection<TFut>(
}
}

/// Result of polling the connection loop - determines what action to take next.
enum ConnectionLoopAction<THandler: ConnectionHandler> {
/// An event needs to be sent to the pool
SendEvent(EstablishedConnectionEvent<THandler::ToBehaviour>),
/// The connection should be gracefully closed
Close,
/// The connection encountered an error
Error(ConnectionError),
/// The manager disappeared (command channel closed)
ManagerGone,
}

pub(crate) async fn new_for_established_connection<THandler>(
connection_id: ConnectionId,
peer_id: PeerId,
Expand All @@ -177,90 +193,113 @@ pub(crate) async fn new_for_established_connection<THandler>(
THandler: ConnectionHandler,
{
loop {
match futures::future::select(
command_receiver.next(),
poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
)
.await
{
Either::Left((Some(command), _)) => match command {
Command::NotifyHandler(event) => connection.on_behaviour_event(event),
Command::Close => {
command_receiver.close();
let (remaining_events, closing_muxer) = connection.close();
// Use a single poll_fn that polls both the command receiver and connection.
// This avoids the waker accumulation issue that occurs when using
// futures::future::select in a loop, where each iteration creates new
// futures that register additional wakers without cleaning up old ones.
let action = poll_fn(|cx: &mut Context<'_>| {
// Poll connection events first - prioritize network I/O over commands
// from the behavior layer for better responsiveness.
match Pin::new(&mut connection).poll(cx) {
Poll::Ready(Ok(connection::Event::Handler(event))) => {
return Poll::Ready(ConnectionLoopAction::SendEvent(
EstablishedConnectionEvent::Notify {
id: connection_id,
peer_id,
event,
},
));
}
Poll::Ready(Ok(connection::Event::AddressChange(new_address))) => {
return Poll::Ready(ConnectionLoopAction::SendEvent(
EstablishedConnectionEvent::AddressChange {
id: connection_id,
peer_id,
new_address,
},
));
}
Poll::Ready(Err(error)) => {
return Poll::Ready(ConnectionLoopAction::<THandler>::Error(error));
}
Poll::Pending => {}
}

// Then poll commands from the behavior layer
loop {
match command_receiver.poll_next_unpin(cx) {
Poll::Ready(Some(Command::NotifyHandler(event))) => {
connection.on_behaviour_event(event);
// Continue polling - there may be more commands
continue;
}
Poll::Ready(Some(Command::Close)) => {
return Poll::Ready(ConnectionLoopAction::<THandler>::Close);
}
Poll::Ready(None) => {
return Poll::Ready(ConnectionLoopAction::<THandler>::ManagerGone);
}
Poll::Pending => break,
}
}

let _ = events
.send_all(&mut remaining_events.map(|event| {
Ok(EstablishedConnectionEvent::Notify {
id: connection_id,
event,
peer_id,
})
}))
.await;
Poll::Pending
})
.await;

let error = closing_muxer.await.err().map(ConnectionError::IO);
match action {
ConnectionLoopAction::SendEvent(event) => {
let _ = events.send(event).await;
}
ConnectionLoopAction::Close => {
command_receiver.close();
let (remaining_events, closing_muxer) = connection.close();

let _ = events
.send(EstablishedConnectionEvent::Closed {
let _ = events
.send_all(&mut remaining_events.map(|event| {
Ok(EstablishedConnectionEvent::Notify {
id: connection_id,
event,
peer_id,
error,
})
.await;
return;
}
},
}))
.await;

// The manager has disappeared; abort.
Either::Left((None, _)) => return,
let error = closing_muxer.await.err().map(ConnectionError::IO);

Either::Right((event, _)) => {
match event {
Ok(connection::Event::Handler(event)) => {
let _ = events
.send(EstablishedConnectionEvent::Notify {
id: connection_id,
peer_id,
event,
})
.await;
}
Ok(connection::Event::AddressChange(new_address)) => {
let _ = events
.send(EstablishedConnectionEvent::AddressChange {
id: connection_id,
peer_id,
new_address,
})
.await;
}
Err(error) => {
command_receiver.close();
let (remaining_events, _closing_muxer) = connection.close();
let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
peer_id,
error,
})
.await;
return;
}
ConnectionLoopAction::Error(error) => {
command_receiver.close();
let (remaining_events, _closing_muxer) = connection.close();

let _ = events
.send_all(&mut remaining_events.map(|event| {
Ok(EstablishedConnectionEvent::Notify {
id: connection_id,
event,
peer_id,
})
}))
.await;
let _ = events
.send_all(&mut remaining_events.map(|event| {
Ok(EstablishedConnectionEvent::Notify {
id: connection_id,
event,
peer_id,
})
}))
.await;

// Terminate the task with the error, dropping the connection.
let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
peer_id,
error: Some(error),
})
.await;
return;
}
}
let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
peer_id,
error: Some(error),
})
.await;
return;
}
ConnectionLoopAction::ManagerGone => return,
}
}
}
Loading