diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index e52d4b0e893..0dc2d5008df 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -271,6 +271,7 @@ where .. } = self.get_mut(); + // Exhaust requested_substreams (must poll until Pending per Stream contract) loop { match requested_substreams.poll_next_unpin(cx) { Poll::Ready(Some(Ok(()))) => continue, @@ -283,16 +284,17 @@ where )); continue; } - Poll::Ready(None) | Poll::Pending => {} + Poll::Ready(None) | Poll::Pending => break, } + } - // Poll the [`ConnectionHandler`]. + // Exhaust the [`ConnectionHandler`] (must poll until exhausted). + loop { match handler.poll(cx) { - Poll::Pending => {} + Poll::Pending => break, Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { let timeout = *protocol.timeout(); let (upgrade, user_data) = protocol.into_upgrade(); - requested_substreams.push(SubstreamRequested::new(user_data, timeout, upgrade)); continue; // Poll handler until exhausted. } @@ -324,11 +326,12 @@ where continue; } } + } - // In case the [`ConnectionHandler`] can not make any more progress, poll the - // negotiating outbound streams. + // Exhaust negotiating outbound streams (must poll until Pending per Stream contract) + loop { match negotiating_out.poll_next_unpin(cx) { - Poll::Pending | Poll::Ready(None) => {} + Poll::Pending | Poll::Ready(None) => break, Poll::Ready(Some((info, Ok(protocol)))) => { handler.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound( FullyNegotiatedOutbound { protocol, info }, @@ -342,11 +345,12 @@ where continue; } } + } - // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not - // make any more progress, poll the negotiating inbound streams. + // Exhaust negotiating inbound streams (must poll until Pending per Stream contract) + loop { match negotiating_in.poll_next_unpin(cx) { - Poll::Pending | Poll::Ready(None) => {} + Poll::Pending | Poll::Ready(None) => break, Poll::Ready(Some((info, Ok(protocol)))) => { handler.on_connection_event(ConnectionEvent::FullyNegotiatedInbound( FullyNegotiatedInbound { protocol, info }, @@ -372,49 +376,57 @@ where continue; } } + } - // Check if the connection (and handler) should be shut down. - // As long as we're still negotiating substreams or have - // any active streams shutdown is always postponed. - if negotiating_in.is_empty() - && negotiating_out.is_empty() - && requested_substreams.is_empty() - && stream_counter.has_no_active_streams() + // Check if the connection (and handler) should be shut down. + // As long as we're still negotiating substreams or have + // any active streams shutdown is always postponed. + if negotiating_in.is_empty() + && negotiating_out.is_empty() + && requested_substreams.is_empty() + && stream_counter.has_no_active_streams() + { + if let Some(new_timeout) = + compute_new_shutdown(handler.connection_keep_alive(), shutdown, *idle_timeout) { - if let Some(new_timeout) = - compute_new_shutdown(handler.connection_keep_alive(), shutdown, *idle_timeout) - { - *shutdown = new_timeout; - } + *shutdown = new_timeout; + } - match shutdown { - Shutdown::None => {} - Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)), - Shutdown::Later(delay) => match Future::poll(Pin::new(delay), cx) { - Poll::Ready(_) => { - return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) - } - Poll::Pending => {} - }, - } - } else { - *shutdown = Shutdown::None; + match shutdown { + Shutdown::None => {} + Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)), + Shutdown::Later(delay) => match Future::poll(Pin::new(delay), cx) { + Poll::Ready(_) => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)), + Poll::Pending => {} + }, } + } else { + *shutdown = Shutdown::None; + } - match muxing.poll_unpin(cx)? { - Poll::Pending => {} - Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { - handler.on_connection_event(ConnectionEvent::AddressChange(AddressChange { - new_address: &address, - })); - return Poll::Ready(Ok(Event::AddressChange(address))); - } + // Poll muxer for events (only once per wake) + match muxing.poll_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { + handler.on_connection_event(ConnectionEvent::AddressChange(AddressChange { + new_address: &address, + })); + return Poll::Ready(Ok(Event::AddressChange(address))); } + } - if let Some(requested_substream) = requested_substreams.iter_mut().next() { - match muxing.poll_outbound_unpin(cx)? { - Poll::Pending => {} - Poll::Ready(substream) => { + // Try to open outbound substreams (exhaust until Pending or no more requested) + loop { + // First, let requested_substreams clean up any extracted items + while let Poll::Ready(Some(Ok(()))) = requested_substreams.poll_next_unpin(cx) { + // Item was extracted and is now removed + } + + match muxing.poll_outbound_unpin(cx)? { + Poll::Pending => break, + Poll::Ready(substream) => { + // Only extract a requested substream if muxer gave us one + if let Some(requested_substream) = requested_substreams.iter_mut().next() { let (user_data, timeout, upgrade) = requested_substream.extract(); negotiating_out.push(StreamUpgrade::new_outbound( @@ -425,50 +437,54 @@ where *substream_upgrade_protocol_override, stream_counter.clone(), )); - - // Go back to the top, - // handler can potentially make progress again. - continue; + // Continue to try to open more + } else { + // No more requested substreams, stop trying + break; } } } + } - if negotiating_in.len() < *max_negotiating_inbound_streams { - match muxing.poll_inbound_unpin(cx)? { - Poll::Pending => {} - Poll::Ready(substream) => { - let protocol = handler.listen_protocol(); + // Accept inbound substreams (exhaust until Pending or hit max) + loop { + if negotiating_in.len() >= *max_negotiating_inbound_streams { + break; + } - negotiating_in.push(StreamUpgrade::new_inbound( - substream, - protocol, - stream_counter.clone(), - )); + match muxing.poll_inbound_unpin(cx)? { + Poll::Pending => break, + Poll::Ready(substream) => { + let protocol = handler.listen_protocol(); - // Go back to the top, - // handler can potentially make progress again. - continue; - } + negotiating_in.push(StreamUpgrade::new_inbound( + substream, + protocol, + stream_counter.clone(), + )); + // Continue to accept more } } + } - let changes = ProtocolsChange::from_full_sets( - supported_protocols, - handler.listen_protocol().upgrade().protocol_info(), - protocol_buffer, - ); + // Check protocol changes (only once per wake) + let changes = ProtocolsChange::from_full_sets( + supported_protocols, + handler.listen_protocol().upgrade().protocol_info(), + protocol_buffer, + ); - if !changes.is_empty() { - for change in changes { - handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(change)); - } - // Go back to the top, handler can potentially make progress again. - continue; + if !changes.is_empty() { + for change in changes { + handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(change)); } - - // Nothing can make progress, return `Pending`. + // Wake ourselves to let handler react + cx.waker().wake_by_ref(); return Poll::Pending; } + + // All phases complete, nothing ready + Poll::Pending } #[cfg(test)]