@@ -606,6 +606,108 @@ impl<Membership: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'sta
606
606
}
607
607
}
608
608
}
609
+
610
+ fn handle_stream_response (
611
+ peer_id : PeerId ,
612
+ tasks : & FuturesUnordered < SamplingStreamFuture > ,
613
+ to_sample : & mut HashMap < PeerId , VecDeque < ( SubnetworkId , BlobId ) > > ,
614
+ to_close : & mut VecDeque < SampleStream > ,
615
+ stream_response : SampleStreamSuccess ,
616
+ stream : SampleStream ,
617
+ ) -> Poll < ToSwarm < <Self as NetworkBehaviour >:: ToSwarm , THandlerInEvent < Self > > > {
618
+ match stream_response {
619
+ SampleStreamSuccess :: Writer ( sample_response) => {
620
+ // Handle the free stream then return the response.
621
+ Self :: schedule_outgoing_stream_task ( tasks, to_sample, to_close, stream) ;
622
+ Self :: handle_sample_response ( * sample_response, peer_id)
623
+ }
624
+ SampleStreamSuccess :: Reader => {
625
+ // Writer might be hoping to send to this stream another request, wait
626
+ // until the writer closes the stream.
627
+ let ( request_receiver, response_sender) =
628
+ Self :: schedule_incoming_stream_task ( tasks, stream) ;
629
+ Poll :: Ready ( ToSwarm :: GenerateEvent ( SamplingEvent :: IncomingSample {
630
+ request_receiver,
631
+ response_sender,
632
+ } ) )
633
+ }
634
+ }
635
+ }
636
+
637
+ fn handle_stream_error (
638
+ to_sample : & mut HashMap < PeerId , VecDeque < ( SubnetworkId , BlobId ) > > ,
639
+ to_close : & mut VecDeque < SampleStream > ,
640
+ error : SamplingError ,
641
+ maybe_stream : Option < SampleStream > ,
642
+ ) -> Option < Poll < ToSwarm < <Self as NetworkBehaviour >:: ToSwarm , THandlerInEvent < Self > > > > {
643
+ match error {
644
+ SamplingError :: Io {
645
+ error,
646
+ peer_id,
647
+ message,
648
+ } if error. kind ( ) == std:: io:: ErrorKind :: ConnectionReset => {
649
+ // Connection reset with the attached message comes from the stream that we've
650
+ // tried to write to - if connection reset happens during the write it's most
651
+ // likely because we didn't have the connection to peer or peer closed stream on
652
+ // it's end because it stopped waiting for messages through this stream.
653
+ if let Some ( peer_queue) = to_sample. get_mut ( & peer_id) {
654
+ if let Some ( m) = message {
655
+ peer_queue. push_back ( ( m. column_idx , m. blob_id ) ) ;
656
+ }
657
+ }
658
+ // Stream is useless if connection was reset.
659
+ if let Some ( stream) = maybe_stream {
660
+ to_close. push_back ( stream) ;
661
+ }
662
+ None
663
+ }
664
+ SamplingError :: Io { error, .. }
665
+ if error. kind ( ) == std:: io:: ErrorKind :: UnexpectedEof =>
666
+ {
667
+ // Eof is actually expected and is proper signal about remote closing the
668
+ // stream. Do not propagate and continue execution of behaviour poll method.
669
+ if let Some ( stream) = maybe_stream {
670
+ to_close. push_back ( stream) ;
671
+ }
672
+ None
673
+ }
674
+ error => {
675
+ if let Some ( stream) = maybe_stream {
676
+ to_close. push_back ( stream) ;
677
+ }
678
+ Some ( Poll :: Ready ( ToSwarm :: GenerateEvent (
679
+ SamplingEvent :: SamplingError { error } ,
680
+ ) ) )
681
+ }
682
+ }
683
+ }
684
+
685
+ fn poll_tasks (
686
+ cx : & mut Context < ' _ > ,
687
+ tasks : & mut FuturesUnordered < SamplingStreamFuture > ,
688
+ to_sample : & mut HashMap < PeerId , VecDeque < ( SubnetworkId , BlobId ) > > ,
689
+ to_close : & mut VecDeque < SampleStream > ,
690
+ ) -> Option < Poll < ToSwarm < <Self as NetworkBehaviour >:: ToSwarm , THandlerInEvent < Self > > > > {
691
+ if let Poll :: Ready ( Some ( future_result) ) = tasks. poll_next_unpin ( cx) {
692
+ cx. waker ( ) . wake_by_ref ( ) ;
693
+ match future_result {
694
+ Ok ( ( peer_id, stream_response, stream) ) => {
695
+ return Some ( Self :: handle_stream_response (
696
+ peer_id,
697
+ tasks,
698
+ to_sample,
699
+ to_close,
700
+ stream_response,
701
+ stream,
702
+ ) ) ;
703
+ }
704
+ Err ( ( error, maybe_stream) ) => {
705
+ Self :: handle_stream_error ( to_sample, to_close, error, maybe_stream) ;
706
+ }
707
+ }
708
+ }
709
+ None
710
+ }
609
711
}
610
712
611
713
impl < M : MembershipHandler < Id = PeerId , NetworkId = SubnetworkId > + ' static > NetworkBehaviour
@@ -670,10 +772,6 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
670
772
. on_connection_handler_event ( peer_id, connection_id, event) ;
671
773
}
672
774
673
- #[ expect(
674
- clippy:: too_many_lines,
675
- reason = "Poll method contains all the branches in small enough sections"
676
- ) ]
677
775
fn poll (
678
776
& mut self ,
679
777
cx : & mut Context < ' _ > ,
@@ -705,6 +803,7 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
705
803
control,
706
804
) ;
707
805
}
806
+
708
807
// poll incoming streams
709
808
if let Poll :: Ready ( Some ( ( peer_id, stream) ) ) = incoming_streams. poll_next_unpin ( cx) {
710
809
let sample_stream = SampleStream { stream, peer_id } ;
@@ -716,76 +815,14 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
716
815
response_sender,
717
816
} ) ) ;
718
817
}
818
+
719
819
// poll tasks
720
- if let Poll :: Ready ( Some ( future_result) ) = tasks. poll_next_unpin ( cx) {
721
- cx. waker ( ) . wake_by_ref ( ) ; // Check stream_tasks until it's empty.
722
- match future_result {
723
- Ok ( ( peer_id, stream_response, stream) ) => {
724
- match stream_response {
725
- SampleStreamSuccess :: Writer ( sample_response) => {
726
- // Schedule a new task if its available or drop the stream if not.
727
- Self :: schedule_outgoing_stream_task ( tasks, to_sample, to_close, stream) ;
728
- // handle the free stream then return the success
729
- return Self :: handle_sample_response ( * sample_response, peer_id) ;
730
- }
731
- SampleStreamSuccess :: Reader => {
732
- // Writer might be hoping to send to this stream
733
- // another request, wait
734
- // until the writer closes the stream.
735
- let ( request_receiver, response_sender) =
736
- Self :: schedule_incoming_stream_task ( tasks, stream) ;
737
- return Poll :: Ready ( ToSwarm :: GenerateEvent (
738
- SamplingEvent :: IncomingSample {
739
- request_receiver,
740
- response_sender,
741
- } ,
742
- ) ) ;
743
- }
744
- }
745
- }
746
- Err ( (
747
- SamplingError :: Io {
748
- error,
749
- peer_id,
750
- message,
751
- } ,
752
- maybe_stream,
753
- ) ) if error. kind ( ) == std:: io:: ErrorKind :: ConnectionReset => {
754
- if let Some ( peer_queue) = to_sample. get_mut ( & peer_id) {
755
- if let Some ( sampling:: SampleRequest {
756
- column_idx,
757
- blob_id,
758
- } ) = message
759
- {
760
- peer_queue. push_back ( ( column_idx, blob_id) ) ;
761
- }
762
- }
763
- if let Some ( stream) = maybe_stream {
764
- to_close. push_back ( stream) ;
765
- }
766
- }
767
- Err ( ( SamplingError :: Io { error, .. } , maybe_stream) )
768
- if error. kind ( ) == std:: io:: ErrorKind :: UnexpectedEof =>
769
- {
770
- // Eof is actually expected and is proper signal about remote closing the
771
- // stream. Do not propagate and continue execution of this poll method.
772
- if let Some ( stream) = maybe_stream {
773
- to_close. push_back ( stream) ;
774
- }
775
- }
776
- Err ( ( error, maybe_stream) ) => {
777
- if let Some ( stream) = maybe_stream {
778
- to_close. push_back ( stream) ;
779
- }
780
- return Poll :: Ready ( ToSwarm :: GenerateEvent ( SamplingEvent :: SamplingError {
781
- error,
782
- } ) ) ;
783
- }
784
- }
820
+ if let Some ( result) = Self :: poll_tasks ( cx, tasks, to_sample, to_close) {
821
+ return result;
785
822
}
823
+
786
824
// Deal with connection as the underlying behaviour would do
787
825
if let Poll :: Ready ( ToSwarm :: Dial { mut opts } ) = self . stream_behaviour . poll ( cx) {
788
- cx. waker ( ) . wake_by_ref ( ) ;
789
826
// attach known peer address if possible
790
827
if let Some ( address) = opts
791
828
. get_peer_id ( )
@@ -796,9 +833,11 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
796
833
. extend_addresses_through_behaviour ( )
797
834
. build ( ) ;
798
835
// If we dial, some outgoing task is created, poll again.
836
+ cx. waker ( ) . wake_by_ref ( ) ;
799
837
return Poll :: Ready ( ToSwarm :: Dial { opts } ) ;
800
838
}
801
839
}
840
+
802
841
// Discard stream, if still pending pushback to close later.
803
842
if let Some ( mut stream) = to_close. pop_front ( ) {
804
843
if stream. stream . close ( ) . poll_unpin ( cx) . is_pending ( ) {
0 commit comments