Skip to content

Commit a357d87

Browse files
committed
Sample task poll function
1 parent 7f0a028 commit a357d87

File tree

3 files changed

+122
-78
lines changed

3 files changed

+122
-78
lines changed

nomos-da/network/core/src/protocols/sampling/behaviour.rs

+109-70
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,108 @@ impl<Membership: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'sta
606606
}
607607
}
608608
}
609+
610+
fn handle_stream_response(
611+
peer_id: PeerId,
612+
tasks: &FuturesUnordered<SamplingStreamFuture>,
613+
to_sample: &mut HashMap<PeerId, VecDeque<(SubnetworkId, BlobId)>>,
614+
to_close: &mut VecDeque<SampleStream>,
615+
stream_response: SampleStreamSuccess,
616+
stream: SampleStream,
617+
) -> Poll<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>> {
618+
match stream_response {
619+
SampleStreamSuccess::Writer(sample_response) => {
620+
// Handle the free stream then return the response.
621+
Self::schedule_outgoing_stream_task(tasks, to_sample, to_close, stream);
622+
Self::handle_sample_response(*sample_response, peer_id)
623+
}
624+
SampleStreamSuccess::Reader => {
625+
// Writer might be hoping to send to this stream another request, wait
626+
// until the writer closes the stream.
627+
let (request_receiver, response_sender) =
628+
Self::schedule_incoming_stream_task(tasks, stream);
629+
Poll::Ready(ToSwarm::GenerateEvent(SamplingEvent::IncomingSample {
630+
request_receiver,
631+
response_sender,
632+
}))
633+
}
634+
}
635+
}
636+
637+
fn handle_stream_error(
638+
to_sample: &mut HashMap<PeerId, VecDeque<(SubnetworkId, BlobId)>>,
639+
to_close: &mut VecDeque<SampleStream>,
640+
error: SamplingError,
641+
maybe_stream: Option<SampleStream>,
642+
) -> Option<Poll<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>> {
643+
match error {
644+
SamplingError::Io {
645+
error,
646+
peer_id,
647+
message,
648+
} if error.kind() == std::io::ErrorKind::ConnectionReset => {
649+
// Connection reset with the attached message comes from the stream that we've
650+
// tried to write to - if connection reset happens during the write it's most
651+
// likely because we didn't have the connection to peer or peer closed stream on
652+
// it's end because it stopped waiting for messages through this stream.
653+
if let Some(peer_queue) = to_sample.get_mut(&peer_id) {
654+
if let Some(m) = message {
655+
peer_queue.push_back((m.column_idx, m.blob_id));
656+
}
657+
}
658+
// Stream is useless if connection was reset.
659+
if let Some(stream) = maybe_stream {
660+
to_close.push_back(stream);
661+
}
662+
None
663+
}
664+
SamplingError::Io { error, .. }
665+
if error.kind() == std::io::ErrorKind::UnexpectedEof =>
666+
{
667+
// Eof is actually expected and is proper signal about remote closing the
668+
// stream. Do not propagate and continue execution of behaviour poll method.
669+
if let Some(stream) = maybe_stream {
670+
to_close.push_back(stream);
671+
}
672+
None
673+
}
674+
error => {
675+
if let Some(stream) = maybe_stream {
676+
to_close.push_back(stream);
677+
}
678+
Some(Poll::Ready(ToSwarm::GenerateEvent(
679+
SamplingEvent::SamplingError { error },
680+
)))
681+
}
682+
}
683+
}
684+
685+
fn poll_tasks(
686+
cx: &mut Context<'_>,
687+
tasks: &mut FuturesUnordered<SamplingStreamFuture>,
688+
to_sample: &mut HashMap<PeerId, VecDeque<(SubnetworkId, BlobId)>>,
689+
to_close: &mut VecDeque<SampleStream>,
690+
) -> Option<Poll<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>> {
691+
if let Poll::Ready(Some(future_result)) = tasks.poll_next_unpin(cx) {
692+
cx.waker().wake_by_ref();
693+
match future_result {
694+
Ok((peer_id, stream_response, stream)) => {
695+
return Some(Self::handle_stream_response(
696+
peer_id,
697+
tasks,
698+
to_sample,
699+
to_close,
700+
stream_response,
701+
stream,
702+
));
703+
}
704+
Err((error, maybe_stream)) => {
705+
Self::handle_stream_error(to_sample, to_close, error, maybe_stream);
706+
}
707+
}
708+
}
709+
None
710+
}
609711
}
610712

611713
impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> NetworkBehaviour
@@ -670,10 +772,6 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
670772
.on_connection_handler_event(peer_id, connection_id, event);
671773
}
672774

673-
#[expect(
674-
clippy::too_many_lines,
675-
reason = "Poll method contains all the branches in small enough sections"
676-
)]
677775
fn poll(
678776
&mut self,
679777
cx: &mut Context<'_>,
@@ -705,6 +803,7 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
705803
control,
706804
);
707805
}
806+
708807
// poll incoming streams
709808
if let Poll::Ready(Some((peer_id, stream))) = incoming_streams.poll_next_unpin(cx) {
710809
let sample_stream = SampleStream { stream, peer_id };
@@ -716,76 +815,14 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
716815
response_sender,
717816
}));
718817
}
818+
719819
// poll tasks
720-
if let Poll::Ready(Some(future_result)) = tasks.poll_next_unpin(cx) {
721-
cx.waker().wake_by_ref(); // Check stream_tasks until it's empty.
722-
match future_result {
723-
Ok((peer_id, stream_response, stream)) => {
724-
match stream_response {
725-
SampleStreamSuccess::Writer(sample_response) => {
726-
// Schedule a new task if its available or drop the stream if not.
727-
Self::schedule_outgoing_stream_task(tasks, to_sample, to_close, stream);
728-
// handle the free stream then return the success
729-
return Self::handle_sample_response(*sample_response, peer_id);
730-
}
731-
SampleStreamSuccess::Reader => {
732-
// Writer might be hoping to send to this stream
733-
// another request, wait
734-
// until the writer closes the stream.
735-
let (request_receiver, response_sender) =
736-
Self::schedule_incoming_stream_task(tasks, stream);
737-
return Poll::Ready(ToSwarm::GenerateEvent(
738-
SamplingEvent::IncomingSample {
739-
request_receiver,
740-
response_sender,
741-
},
742-
));
743-
}
744-
}
745-
}
746-
Err((
747-
SamplingError::Io {
748-
error,
749-
peer_id,
750-
message,
751-
},
752-
maybe_stream,
753-
)) if error.kind() == std::io::ErrorKind::ConnectionReset => {
754-
if let Some(peer_queue) = to_sample.get_mut(&peer_id) {
755-
if let Some(sampling::SampleRequest {
756-
column_idx,
757-
blob_id,
758-
}) = message
759-
{
760-
peer_queue.push_back((column_idx, blob_id));
761-
}
762-
}
763-
if let Some(stream) = maybe_stream {
764-
to_close.push_back(stream);
765-
}
766-
}
767-
Err((SamplingError::Io { error, .. }, maybe_stream))
768-
if error.kind() == std::io::ErrorKind::UnexpectedEof =>
769-
{
770-
// Eof is actually expected and is proper signal about remote closing the
771-
// stream. Do not propagate and continue execution of this poll method.
772-
if let Some(stream) = maybe_stream {
773-
to_close.push_back(stream);
774-
}
775-
}
776-
Err((error, maybe_stream)) => {
777-
if let Some(stream) = maybe_stream {
778-
to_close.push_back(stream);
779-
}
780-
return Poll::Ready(ToSwarm::GenerateEvent(SamplingEvent::SamplingError {
781-
error,
782-
}));
783-
}
784-
}
820+
if let Some(result) = Self::poll_tasks(cx, tasks, to_sample, to_close) {
821+
return result;
785822
}
823+
786824
// Deal with connection as the underlying behaviour would do
787825
if let Poll::Ready(ToSwarm::Dial { mut opts }) = self.stream_behaviour.poll(cx) {
788-
cx.waker().wake_by_ref();
789826
// attach known peer address if possible
790827
if let Some(address) = opts
791828
.get_peer_id()
@@ -796,9 +833,11 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
796833
.extend_addresses_through_behaviour()
797834
.build();
798835
// If we dial, some outgoing task is created, poll again.
836+
cx.waker().wake_by_ref();
799837
return Poll::Ready(ToSwarm::Dial { opts });
800838
}
801839
}
840+
802841
// Discard stream, if still pending pushback to close later.
803842
if let Some(mut stream) = to_close.pop_front() {
804843
if stream.stream.close().poll_unpin(cx).is_pending() {

nomos-da/network/core/src/swarm/common/monitor.rs

+9-4
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ where
259259
fn record_event(&mut self, event: Self::Event) -> Option<ConnectionMonitorOutput> {
260260
if let Some(peer_id) = event.peer_id() {
261261
tracing::info!("MONITOR EVENT: {event:?}");
262+
println!(">>>>>>>> MONITOR event {event:?}");
262263
let stats = self.peer_stats.entry(*peer_id).or_default();
263264
let now = Instant::now();
264265
match event {
@@ -375,7 +376,8 @@ mod tests {
375376
for _ in 0..4 {
376377
monitor.record_event(MonitorEvent::Sampling(SamplingError::Io {
377378
peer_id,
378-
error: std::io::Error::new(std::io::ErrorKind::Other, "Simulated I/O error"),
379+
error: std::io::Error::other("Simulated I/O error"),
380+
message: None,
379381
}));
380382
}
381383

@@ -393,7 +395,8 @@ mod tests {
393395
for _ in 0..100 {
394396
monitor.record_event(MonitorEvent::Sampling(SamplingError::Io {
395397
peer_id,
396-
error: std::io::Error::new(std::io::ErrorKind::Other, "Simulated I/O error"),
398+
error: std::io::Error::other("Simulated I/O error"),
399+
message: None,
397400
}));
398401
}
399402

@@ -411,7 +414,8 @@ mod tests {
411414
for _ in 0..4 {
412415
monitor.record_event(MonitorEvent::Sampling(SamplingError::Io {
413416
peer_id,
414-
error: std::io::Error::new(std::io::ErrorKind::Other, "Simulated I/O error"),
417+
error: std::io::Error::other("Simulated I/O error"),
418+
message: None,
415419
}));
416420
}
417421

@@ -434,7 +438,8 @@ mod tests {
434438
for _ in 0..4 {
435439
monitor.record_event(MonitorEvent::Sampling(SamplingError::Io {
436440
peer_id,
437-
error: std::io::Error::new(std::io::ErrorKind::Other, "Simulated I/O error"),
441+
error: std::io::Error::other("Simulated I/O error"),
442+
message: None,
438443
}));
439444
}
440445

tests/src/topology/configs/da.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ impl Default for DaParams {
6060
policy_settings: DAConnectionPolicySettings {
6161
min_dispersal_peers: 1,
6262
min_replication_peers: 1,
63-
max_dispersal_failures: 1,
64-
max_sampling_failures: 1,
65-
max_replication_failures: 1,
66-
malicious_threshold: 2,
63+
max_dispersal_failures: 0,
64+
max_sampling_failures: 0,
65+
max_replication_failures: 0,
66+
malicious_threshold: 0,
6767
},
6868
monitor_settings: DAConnectionMonitorSettings {
6969
failure_time_window: Duration::from_secs(5),

0 commit comments

Comments
 (0)