Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 155 additions & 55 deletions swarm/src/connection/pool/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{convert::Infallible, pin::Pin};

use futures::{
channel::{mpsc, oneshot},
future::{poll_fn, Either, Future},
future::{poll_fn, Either, Future, FutureExt},
SinkExt, StreamExt,
};
use libp2p_core::muxing::StreamMuxerBox;
Expand Down Expand Up @@ -177,17 +177,47 @@ pub(crate) async fn new_for_established_connection<THandler>(
THandler: ConnectionHandler,
{
loop {
match futures::future::select(
command_receiver.next(),
poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
)
.await
{
Either::Left((Some(command), _)) => match command {
Command::NotifyHandler(event) => connection.on_behaviour_event(event),
Command::Close => {
// Phase 1: Exhaust connection events (high priority - process all available data)
loop {
match poll_fn(|cx| Pin::new(&mut connection).poll(cx)).now_or_never() {
Some(Ok(connection::Event::Handler(event))) => {
// Try non-blocking send first
match events.try_send(EstablishedConnectionEvent::Notify {
id: connection_id,
peer_id,
event,
}) {
Ok(()) => continue, // Successfully sent, drain more events
Err(e) if e.is_full() => {
// Channel full, must await
let _ = events.send(e.into_inner()).await;
// Continue draining after send completes
continue;
}
Err(_) => return, // Channel closed, exit task
}
}
Some(Ok(connection::Event::AddressChange(new_address))) => {
// Try non-blocking send first
match events.try_send(EstablishedConnectionEvent::AddressChange {
id: connection_id,
peer_id,
new_address,
}) {
Ok(()) => continue, // Successfully sent, drain more events
Err(e) if e.is_full() => {
// Channel full, must await
let _ = events.send(e.into_inner()).await;
// Continue draining after send completes
continue;
}
Err(_) => return, // Channel closed, exit task
}
}
Some(Err(error)) => {
// Connection error, close and terminate
command_receiver.close();
let (remaining_events, closing_muxer) = connection.close();
let (remaining_events, _closing_muxer) = connection.close();

let _ = events
.send_all(&mut remaining_events.map(|event| {
Expand All @@ -199,65 +229,135 @@ pub(crate) async fn new_for_established_connection<THandler>(
}))
.await;

let error = closing_muxer.await.err().map(ConnectionError::IO);

let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
peer_id,
error,
error: Some(error),
})
.await;
return;
}
},
None => break, // Connection would block, move to phase 2
}
}

// The manager has disappeared; abort.
Either::Left((None, _)) => return,
// Phase 2: Check for commands (lower priority)
match command_receiver.try_next() {
Ok(Some(Command::NotifyHandler(event))) => {
connection.on_behaviour_event(event);
// Loop back to phase 1 - handler might have generated events
continue;
}
Ok(Some(Command::Close)) => {
command_receiver.close();
let (remaining_events, closing_muxer) = connection.close();

Either::Right((event, _)) => {
match event {
Ok(connection::Event::Handler(event)) => {
let _ = events
.send(EstablishedConnectionEvent::Notify {
id: connection_id,
peer_id,
event,
})
.await;
}
Ok(connection::Event::AddressChange(new_address)) => {
let _ = events
.send(EstablishedConnectionEvent::AddressChange {
id: connection_id,
peer_id,
new_address,
})
.await;
}
Err(error) => {
command_receiver.close();
let (remaining_events, _closing_muxer) = connection.close();
let _ = events
.send_all(&mut remaining_events.map(|event| {
Ok(EstablishedConnectionEvent::Notify {
id: connection_id,
event,
peer_id,
})
}))
.await;

let error = closing_muxer.await.err().map(ConnectionError::IO);

let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
peer_id,
error,
})
.await;
return;
}
Ok(None) => return, // Manager disappeared, exit task
Err(_) => {
// No command ready, both futures would block
// Fall back to select to await either future
match futures::future::select(
command_receiver.next(),
poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
)
.await
{
Either::Left((Some(command), _)) => match command {
Command::NotifyHandler(event) => connection.on_behaviour_event(event),
Command::Close => {
command_receiver.close();
let (remaining_events, closing_muxer) = connection.close();

let _ = events
.send_all(&mut remaining_events.map(|event| {
Ok(EstablishedConnectionEvent::Notify {
id: connection_id,
event,
peer_id,
})
}))
.await;

let _ = events
.send_all(&mut remaining_events.map(|event| {
Ok(EstablishedConnectionEvent::Notify {
let error = closing_muxer.await.err().map(ConnectionError::IO);

let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
event,
peer_id,
error,
})
}))
.await;
.await;
return;
}
},
Either::Left((None, _)) => return, // Manager disappeared
Either::Right((event, _)) => {
// Connection event ready, handle it
match event {
Ok(connection::Event::Handler(event)) => {
let _ = events
.send(EstablishedConnectionEvent::Notify {
id: connection_id,
peer_id,
event,
})
.await;
}
Ok(connection::Event::AddressChange(new_address)) => {
let _ = events
.send(EstablishedConnectionEvent::AddressChange {
id: connection_id,
peer_id,
new_address,
})
.await;
}
Err(error) => {
command_receiver.close();
let (remaining_events, _closing_muxer) = connection.close();

// Terminate the task with the error, dropping the connection.
let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
peer_id,
error: Some(error),
})
.await;
return;
let _ = events
.send_all(&mut remaining_events.map(|event| {
Ok(EstablishedConnectionEvent::Notify {
id: connection_id,
event,
peer_id,
})
}))
.await;

let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
peer_id,
error: Some(error),
})
.await;
return;
}
}
}
}
}
Expand Down
Loading