Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 93 additions & 77 deletions swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ where
..
} = self.get_mut();

// Exhaust requested_substreams (must poll until Pending per Stream contract)
loop {
match requested_substreams.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(()))) => continue,
Expand All @@ -283,16 +284,17 @@ where
));
continue;
}
Poll::Ready(None) | Poll::Pending => {}
Poll::Ready(None) | Poll::Pending => break,
}
}

// Poll the [`ConnectionHandler`].
// Exhaust the [`ConnectionHandler`] (must poll until exhausted).
loop {
match handler.poll(cx) {
Poll::Pending => {}
Poll::Pending => break,
Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => {
let timeout = *protocol.timeout();
let (upgrade, user_data) = protocol.into_upgrade();

requested_substreams.push(SubstreamRequested::new(user_data, timeout, upgrade));
continue; // Poll handler until exhausted.
}
Expand Down Expand Up @@ -324,11 +326,12 @@ where
continue;
}
}
}

// In case the [`ConnectionHandler`] can not make any more progress, poll the
// negotiating outbound streams.
// Exhaust negotiating outbound streams (must poll until Pending per Stream contract)
loop {
match negotiating_out.poll_next_unpin(cx) {
Poll::Pending | Poll::Ready(None) => {}
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some((info, Ok(protocol)))) => {
handler.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
FullyNegotiatedOutbound { protocol, info },
Expand All @@ -342,11 +345,12 @@ where
continue;
}
}
}

// In case both the [`ConnectionHandler`] and the negotiating outbound streams can not
// make any more progress, poll the negotiating inbound streams.
// Exhaust negotiating inbound streams (must poll until Pending per Stream contract)
loop {
match negotiating_in.poll_next_unpin(cx) {
Poll::Pending | Poll::Ready(None) => {}
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some((info, Ok(protocol)))) => {
handler.on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
FullyNegotiatedInbound { protocol, info },
Expand All @@ -372,49 +376,57 @@ where
continue;
}
}
}

// Check if the connection (and handler) should be shut down.
// As long as we're still negotiating substreams or have
// any active streams shutdown is always postponed.
if negotiating_in.is_empty()
&& negotiating_out.is_empty()
&& requested_substreams.is_empty()
&& stream_counter.has_no_active_streams()
// Check if the connection (and handler) should be shut down.
// As long as we're still negotiating substreams or have
// any active streams shutdown is always postponed.
if negotiating_in.is_empty()
&& negotiating_out.is_empty()
&& requested_substreams.is_empty()
&& stream_counter.has_no_active_streams()
{
if let Some(new_timeout) =
compute_new_shutdown(handler.connection_keep_alive(), shutdown, *idle_timeout)
{
if let Some(new_timeout) =
compute_new_shutdown(handler.connection_keep_alive(), shutdown, *idle_timeout)
{
*shutdown = new_timeout;
}
*shutdown = new_timeout;
}

match shutdown {
Shutdown::None => {}
Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)),
Shutdown::Later(delay) => match Future::poll(Pin::new(delay), cx) {
Poll::Ready(_) => {
return Poll::Ready(Err(ConnectionError::KeepAliveTimeout))
}
Poll::Pending => {}
},
}
} else {
*shutdown = Shutdown::None;
match shutdown {
Shutdown::None => {}
Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)),
Shutdown::Later(delay) => match Future::poll(Pin::new(delay), cx) {
Poll::Ready(_) => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)),
Poll::Pending => {}
},
}
} else {
*shutdown = Shutdown::None;
}

match muxing.poll_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(StreamMuxerEvent::AddressChange(address)) => {
handler.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
new_address: &address,
}));
return Poll::Ready(Ok(Event::AddressChange(address)));
}
// Poll muxer for events (only once per wake)
match muxing.poll_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(StreamMuxerEvent::AddressChange(address)) => {
handler.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
new_address: &address,
}));
return Poll::Ready(Ok(Event::AddressChange(address)));
}
}

if let Some(requested_substream) = requested_substreams.iter_mut().next() {
match muxing.poll_outbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
// Try to open outbound substreams (exhaust until Pending or no more requested)
loop {
// First, let requested_substreams clean up any extracted items
while let Poll::Ready(Some(Ok(()))) = requested_substreams.poll_next_unpin(cx) {
// Item was extracted and is now removed
}

match muxing.poll_outbound_unpin(cx)? {
Poll::Pending => break,
Poll::Ready(substream) => {
// Only extract a requested substream if muxer gave us one
if let Some(requested_substream) = requested_substreams.iter_mut().next() {
let (user_data, timeout, upgrade) = requested_substream.extract();

negotiating_out.push(StreamUpgrade::new_outbound(
Expand All @@ -425,50 +437,54 @@ where
*substream_upgrade_protocol_override,
stream_counter.clone(),
));

// Go back to the top,
// handler can potentially make progress again.
continue;
// Continue to try to open more
} else {
// No more requested substreams, stop trying
break;
}
}
}
}

if negotiating_in.len() < *max_negotiating_inbound_streams {
match muxing.poll_inbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
let protocol = handler.listen_protocol();
// Accept inbound substreams (exhaust until Pending or hit max)
loop {
if negotiating_in.len() >= *max_negotiating_inbound_streams {
break;
}

negotiating_in.push(StreamUpgrade::new_inbound(
substream,
protocol,
stream_counter.clone(),
));
match muxing.poll_inbound_unpin(cx)? {
Poll::Pending => break,
Poll::Ready(substream) => {
let protocol = handler.listen_protocol();

// Go back to the top,
// handler can potentially make progress again.
continue;
}
negotiating_in.push(StreamUpgrade::new_inbound(
substream,
protocol,
stream_counter.clone(),
));
// Continue to accept more
}
}
}

let changes = ProtocolsChange::from_full_sets(
supported_protocols,
handler.listen_protocol().upgrade().protocol_info(),
protocol_buffer,
);
// Check protocol changes (only once per wake)
let changes = ProtocolsChange::from_full_sets(
supported_protocols,
handler.listen_protocol().upgrade().protocol_info(),
protocol_buffer,
);

if !changes.is_empty() {
for change in changes {
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
}
// Go back to the top, handler can potentially make progress again.
continue;
if !changes.is_empty() {
for change in changes {
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
}

// Nothing can make progress, return `Pending`.
// Wake ourselves to let handler react
cx.waker().wake_by_ref();
return Poll::Pending;
}

// All phases complete, nothing ready
Poll::Pending
}

#[cfg(test)]
Expand Down
Loading