diff --git a/quic/s2n-quic-transport/src/ack/interest.rs b/quic/s2n-quic-transport/src/ack/interest.rs index 23e1b534dd..13abf2970d 100644 --- a/quic/s2n-quic-transport/src/ack/interest.rs +++ b/quic/s2n-quic-transport/src/ack/interest.rs @@ -4,7 +4,6 @@ #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum Interest { None, - #[allow(dead_code)] Immediate, } diff --git a/quic/s2n-quic-transport/src/ack/mod.rs b/quic/s2n-quic-transport/src/ack/mod.rs index 1ec77780d1..41d5330180 100644 --- a/quic/s2n-quic-transport/src/ack/mod.rs +++ b/quic/s2n-quic-transport/src/ack/mod.rs @@ -9,8 +9,7 @@ mod ack_manager; pub(crate) mod ack_ranges; mod ack_transmission_state; pub mod interest; -#[allow(dead_code)] -mod pending_ack_ranges; +pub(crate) mod pending_ack_ranges; #[cfg(test)] mod tests; diff --git a/quic/s2n-quic-transport/src/ack/pending_ack_ranges.rs b/quic/s2n-quic-transport/src/ack/pending_ack_ranges.rs index b64bdbf5aa..41a6a77ca5 100644 --- a/quic/s2n-quic-transport/src/ack/pending_ack_ranges.rs +++ b/quic/s2n-quic-transport/src/ack/pending_ack_ranges.rs @@ -1,26 +1,26 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -use crate::{ack::ack_ranges::AckRanges, interval_set}; +use crate::ack::ack_ranges::AckRanges; use core::time::Duration; use s2n_quic_core::{ frame::ack::EcnCounts, packet::number::{PacketNumber, PacketNumberRange}, }; -/// Stores ACK ranges pending processing +/// Stores aggregated ACK info for delayed processing #[derive(Clone, Debug, Default)] pub struct PendingAckRanges { - ranges: AckRanges, + ack_ranges: AckRanges, ecn_counts: EcnCounts, ack_delay: Duration, } impl PendingAckRanges { #[inline] - pub fn new(ranges: AckRanges, ecn_counts: EcnCounts, ack_delay: Duration) -> Self { + pub fn new(ack_ranges: AckRanges, ecn_counts: EcnCounts, ack_delay: Duration) -> Self { PendingAckRanges { - ranges, + ack_ranges, ecn_counts, ack_delay, } @@ -48,7 +48,7 @@ impl PendingAckRanges { let mut did_insert = true; // TODO: add metrics if ack ranges are being dropped for range in acked_packets { - did_insert &= self.ranges.insert_packet_number_range(range).is_ok() + did_insert &= self.ack_ranges.insert_packet_number_range(range).is_ok() } match did_insert { @@ -57,16 +57,42 @@ impl PendingAckRanges { } } - /// Returns an iterator over all of the values contained in the ranges `IntervalSet`. + /// Returns an iterator over all values in the `AckRanges` #[inline] - pub fn iter(&self) -> interval_set::IntervalIter { - self.ranges.intervals() + pub fn iter(&self) -> impl Iterator + '_ { + self.ack_ranges + .inclusive_ranges() + .into_iter() + .map(|ack_range| PacketNumberRange::new(*ack_range.start(), *ack_range.end())) + } + + /// Returns `EcnCounts` aggregated over all the pending ACKs + #[inline] + pub fn ecn_counts(&self) -> Option { + if self.ack_ranges.is_empty() { + None + } else { + Some(self.ecn_counts) + } + } + + /// Returns the ACK delay associated with all the pending ACKs + #[inline] + pub fn ack_delay(&self) -> Duration { + self.ack_delay + } + + /// Returns the largest `PacketNumber` stored in the AckRanges. + /// + /// If no items are present in the set, `None` is returned. + pub fn max_value(&self) -> Option { + self.ack_ranges.max_value() } /// Clear the ack ranges and reset values #[inline] pub fn clear(&mut self) { - self.ranges.clear(); + self.ack_ranges.clear(); self.ecn_counts = EcnCounts::default(); self.ack_delay = Duration::default(); } @@ -74,7 +100,7 @@ impl PendingAckRanges { /// Returns if ack ranges are being tracked #[inline] pub fn is_empty(&self) -> bool { - self.ranges.is_empty() + self.ack_ranges.is_empty() } } @@ -114,7 +140,7 @@ mod tests { assert_eq!(pending_ack_ranges.ack_delay, now); assert_eq!(pending_ack_ranges.ecn_counts, ecn_counts); assert!(!pending_ack_ranges.is_empty()); - assert_eq!(pending_ack_ranges.ranges.interval_len(), 1); + assert_eq!(pending_ack_ranges.ack_ranges.interval_len(), 1); // insert new range with updated ack_delay and ecn_counts now = now.saturating_add(Duration::from_millis(1)); @@ -131,7 +157,7 @@ mod tests { assert_eq!(pending_ack_ranges.ack_delay, now); assert_eq!(pending_ack_ranges.ecn_counts, ecn_counts); assert!(!pending_ack_ranges.is_empty()); - assert_eq!(pending_ack_ranges.ranges.interval_len(), 2); + assert_eq!(pending_ack_ranges.ack_ranges.interval_len(), 2); // ensure pending_ack_ranges clear functionality works { @@ -139,9 +165,9 @@ mod tests { pending_ack_ranges.clear(); assert!(pending_ack_ranges.is_empty()); - assert_eq!(pending_ack_ranges.ranges.interval_len(), 0); - assert!(!pending_ack_ranges.ranges.contains(&pn_a)); - assert!(!pending_ack_ranges.ranges.contains(&pn_b)); + assert_eq!(pending_ack_ranges.ack_ranges.interval_len(), 0); + assert!(!pending_ack_ranges.ack_ranges.contains(&pn_a)); + assert!(!pending_ack_ranges.ack_ranges.contains(&pn_b)); } } @@ -189,7 +215,7 @@ mod tests { assert!(pending_ack_ranges .extend(range_1.into_iter(), Some(ecn_counts), now) .is_ok()); - assert_eq!(pending_ack_ranges.ranges.interval_len(), 1); + assert_eq!(pending_ack_ranges.ack_ranges.interval_len(), 1); } #[test] diff --git a/quic/s2n-quic-transport/src/connection/connection_container.rs b/quic/s2n-quic-transport/src/connection/connection_container.rs index 57ab8834b2..e9e1d06a23 100644 --- a/quic/s2n-quic-transport/src/connection/connection_container.rs +++ b/quic/s2n-quic-transport/src/connection/connection_container.rs @@ -63,7 +63,7 @@ intrusive_adapter!(WaitingForConnectionIdAdapter = Arc = Arc>: ConnectionNode { - waiting_for_connection_id_link: LinkedListLink + waiting_for_ack_link: LinkedListLink } where C: connection::Trait, L: connection::Lock); // Intrusive red black tree adapter for managing a list of `waiting_for_timeout` connections @@ -957,6 +957,7 @@ impl> ConnectionContainer { /// Iterates over all `Connection`s which are waiting to process ACKs, /// and executes the given function on each `Connection` + #[allow(dead_code)] pub fn iterate_ack_list(&mut self, mut func: F) where F: FnMut(&mut C), diff --git a/quic/s2n-quic-transport/src/connection/connection_container/tests.rs b/quic/s2n-quic-transport/src/connection/connection_container/tests.rs index 99bb6d436f..bd06a0edcc 100644 --- a/quic/s2n-quic-transport/src/connection/connection_container/tests.rs +++ b/quic/s2n-quic-transport/src/connection/connection_container/tests.rs @@ -125,7 +125,13 @@ impl connection::Trait for TestConnection { Ok(()) } - fn on_process_acks(&mut self) {} + fn on_pending_ack_ranges( + &mut self, + _timestamp: Timestamp, + _subscriber: &mut ::EventSubscriber, + ) -> Result<(), connection::Error> { + Ok(()) + } fn on_wakeup( &mut self, diff --git a/quic/s2n-quic-transport/src/connection/connection_impl.rs b/quic/s2n-quic-transport/src/connection/connection_impl.rs index d92322c470..aec8e0d052 100644 --- a/quic/s2n-quic-transport/src/connection/connection_impl.rs +++ b/quic/s2n-quic-transport/src/connection/connection_impl.rs @@ -1049,7 +1049,31 @@ impl connection::Trait for ConnectionImpl { } /// Process ACKs for the `Connection`. - fn on_process_acks(&mut self) {} + fn on_pending_ack_ranges( + &mut self, + timestamp: Timestamp, + subscriber: &mut Config::EventSubscriber, + ) -> Result<(), connection::Error> { + let mut publisher = self.event_context.publisher(timestamp, subscriber); + + // TODO: care should be taken to only delay ACK processing for the active path. + // However, the active path could change so it might be necessary to track the + // active path across some ACK delay processing. + let path_id = self.path_manager.active_path_id(); + self.space_manager + .on_pending_ack_ranges( + timestamp, + path_id, + &mut self.path_manager, + &mut self.local_id_registry, + &mut publisher, + ) + .map_err(|err| { + // TODO: publish metrics + + err.into() + }) + } /// Handles all external wakeups on the [`Connection`]. fn on_wakeup( diff --git a/quic/s2n-quic-transport/src/connection/connection_trait.rs b/quic/s2n-quic-transport/src/connection/connection_trait.rs index 2b0b85631a..e79f2a7b6b 100644 --- a/quic/s2n-quic-transport/src/connection/connection_trait.rs +++ b/quic/s2n-quic-transport/src/connection/connection_trait.rs @@ -99,7 +99,12 @@ pub trait ConnectionTrait: 'static + Send + Sized { subscriber: &mut ::EventSubscriber, ) -> Result<(), connection::Error>; - fn on_process_acks(&mut self); + /// Process pendings ACKs for the `Connection`. + fn on_pending_ack_ranges( + &mut self, + timestamp: Timestamp, + subscriber: &mut ::EventSubscriber, + ) -> Result<(), connection::Error>; /// Handles all external wakeups on the [`Connection`]. fn on_wakeup( diff --git a/quic/s2n-quic-transport/src/endpoint/mod.rs b/quic/s2n-quic-transport/src/endpoint/mod.rs index 237c1dd439..07f572db17 100644 --- a/quic/s2n-quic-transport/src/endpoint/mod.rs +++ b/quic/s2n-quic-transport/src/endpoint/mod.rs @@ -106,24 +106,34 @@ impl s2n_quic_core::endpoint::Endpoint for Endpoint { let mut now: Option = None; for entry in entries.iter_mut() { - if let Some((header, payload)) = entry.read(&local_address) { - let timestamp = match now { - Some(now) => now, - _ => { - let time = clock.get_time(); - now = Some(time); - time - } - }; + let timestamp = match now { + Some(time) => time, + None => { + now = Some(clock.get_time()); + now.expect("value should be set") + } + }; + if let Some((header, payload)) = entry.read(&local_address) { self.receive_datagram(&header, payload, timestamp) } } - // process ACKs on Connections with interest - self.connections.iterate_ack_list(|connection| { - connection.on_process_acks(); - }); + // TODO process pending acks + // let endpoint_context = self.config.context(); + // // process ACKs on Connections with interest + // self.connections.iterate_ack_list(|connection| { + // let timestamp = match now { + // Some(time) => time, + // None => { + // now = Some(clock.get_time()); + // now.expect("value should be set") + // } + // }; + // + // // handle error and potentially close the connection + // connection.on_pending_ack_ranges(timestamp, endpoint_context.event_subscriber); + // }); let len = entries.len(); queue.finish(len); diff --git a/quic/s2n-quic-transport/src/recovery/manager.rs b/quic/s2n-quic-transport/src/recovery/manager.rs index 1402eacab3..f609989b16 100644 --- a/quic/s2n-quic-transport/src/recovery/manager.rs +++ b/quic/s2n-quic-transport/src/recovery/manager.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + ack::pending_ack_ranges::PendingAckRanges, contexts::WriteContext, endpoint, path::{self, ecn::ValidationOutcome, path_event, Path}, @@ -304,6 +305,43 @@ impl Manager { self.pto.on_transmit(context) } + /// Process pending ACK information. + /// + /// Update congestion controller, timers and meta data around acked packet ranges. + pub fn on_pending_ack_ranges, Pub: event::ConnectionPublisher>( + &mut self, + timestamp: Timestamp, + pending_ack_ranges: &mut PendingAckRanges, + context: &mut Ctx, + publisher: &mut Pub, + ) -> Result<(), transport::Error> { + debug_assert!( + !pending_ack_ranges.is_empty(), + "pending_ack_ranges should be non-empty since connection indicated ack interest" + ); + + let largest_acked_packet_number = pending_ack_ranges + .max_value() + .expect("pending range should not be empty"); + let result = self.process_acks( + timestamp, + pending_ack_ranges.iter(), + largest_acked_packet_number, + pending_ack_ranges.ack_delay(), + pending_ack_ranges.ecn_counts(), + context, + publisher, + ); + + // reset pending ack information after processing + // + // If there was an error during processing its probably safer + // to clear the queue rather than try again. + pending_ack_ranges.clear(); + + result + } + /// Process ACK frame. /// /// Update congestion controller, timers and meta data around acked packet ranges. @@ -318,34 +356,52 @@ impl Manager { context: &mut Ctx, publisher: &mut Pub, ) -> Result<(), transport::Error> { - let largest_acked_packet_number = - self.space.new_packet_number(frame.largest_acknowledged()); - let mut newly_acked_packets = SmallVec::< - [SentPacketInfo; ACKED_PACKETS_INITIAL_CAPACITY], - >::new(); + let space = self.space; + let largest_acked_packet_number = space.new_packet_number(frame.largest_acknowledged()); + self.process_acks( + timestamp, + frame.ack_ranges().map(|ack_range| { + let (start, end) = ack_range.into_inner(); + PacketNumberRange::new(space.new_packet_number(start), space.new_packet_number(end)) + }), + largest_acked_packet_number, + frame.ack_delay(), + frame.ecn_counts, + context, + publisher, + )?; + + Ok(()) + } + /// Generic interface for processing ACK ranges. + #[allow(clippy::too_many_arguments)] + fn process_acks, Pub: event::ConnectionPublisher>( + &mut self, + timestamp: Timestamp, + ranges: impl Iterator, + largest_acked_packet_number: PacketNumber, + ack_delay: Duration, + ecn_counts: Option, + context: &mut Ctx, + publisher: &mut Pub, + ) -> Result<(), transport::Error> { // Update the largest acked packet if the largest packet acked in this frame is larger - let new_largest_packet = if self - .largest_acked_packet - .map_or(true, |pn| pn < largest_acked_packet_number) - { - self.largest_acked_packet = Some(largest_acked_packet_number); - true - } else { - false + let acked_new_largest_packet = match self.largest_acked_packet { + Some(current_largest) if current_largest > largest_acked_packet_number => false, + _ => { + self.largest_acked_packet = Some(largest_acked_packet_number); + true + } }; - self.largest_acked_packet = Some( - self.largest_acked_packet - .map_or(largest_acked_packet_number, |pn| { - pn.max(largest_acked_packet_number) - }), - ); - + let mut newly_acked_packets = SmallVec::< + [SentPacketInfo; ACKED_PACKETS_INITIAL_CAPACITY], + >::new(); let (largest_newly_acked, includes_ack_eliciting) = self.process_ack_range( &mut newly_acked_packets, timestamp, - &frame, + ranges, context, publisher, )?; @@ -363,7 +419,7 @@ impl Manager { largest_acked_packet_number, includes_ack_eliciting, timestamp, - &frame, + ack_delay, context, ); @@ -371,9 +427,9 @@ impl Manager { self.process_new_acked_packets( &newly_acked_packets, largest_newly_acked_info, - new_largest_packet, + acked_new_largest_packet, timestamp, - &frame, + ecn_counts, context, publisher, ); @@ -387,39 +443,27 @@ impl Manager { } // Process ack_range and return largest_newly_acked and if the packet is ack eliciting. - fn process_ack_range< - A: frame::ack::AckRanges, - Ctx: Context, - Pub: event::ConnectionPublisher, - >( + fn process_ack_range, Pub: event::ConnectionPublisher>( &mut self, newly_acked_packets: &mut SmallVec< [SentPacketInfo; ACKED_PACKETS_INITIAL_CAPACITY], >, timestamp: Timestamp, - frame: &frame::Ack, + ranges: impl Iterator, context: &mut Ctx, publisher: &mut Pub, ) -> Result<(Option>, bool), transport::Error> { let mut largest_newly_acked: Option> = None; let mut includes_ack_eliciting = false; - for ack_range in frame.ack_ranges() { - let (start, end) = ack_range.into_inner(); - - let acked_packets = PacketNumberRange::new( - self.space.new_packet_number(start), - self.space.new_packet_number(end), - ); - - context.validate_packet_ack(timestamp, &acked_packets)?; + for pn_range in ranges { + context.validate_packet_ack(timestamp, &pn_range)?; // notify components of packets acked - context.on_packet_ack(timestamp, &acked_packets); + context.on_packet_ack(timestamp, &pn_range); let mut newly_acked_range: Option<(PacketNumber, PacketNumber)> = None; - for (packet_number, acked_packet_info) in self.sent_packets.remove_range(acked_packets) - { + for (packet_number, acked_packet_info) in self.sent_packets.remove_range(pn_range) { newly_acked_packets.push(acked_packet_info); if largest_newly_acked.map_or(true, |(pn, _)| packet_number > pn) { @@ -457,13 +501,13 @@ impl Manager { Ok((largest_newly_acked, includes_ack_eliciting)) } - fn update_congestion_control>( + fn update_congestion_control>( &mut self, largest_newly_acked: PacketDetails, largest_acked_packet_number: PacketNumber, includes_ack_eliciting: bool, timestamp: Timestamp, - frame: &frame::Ack, + ack_delay: Duration, context: &mut Ctx, ) { let mut should_update_rtt = true; @@ -490,7 +534,7 @@ impl Manager { let latest_rtt = timestamp - largest_newly_acked_info.time_sent; let path = context.path_mut_by_id(largest_newly_acked_info.path_id); path.rtt_estimator.update_rtt( - frame.ack_delay(), + ack_delay, latest_rtt, timestamp, is_handshake_confirmed, @@ -507,11 +551,7 @@ impl Manager { } #[allow(clippy::too_many_arguments)] - fn process_new_acked_packets< - A: frame::ack::AckRanges, - Ctx: Context, - Pub: event::ConnectionPublisher, - >( + fn process_new_acked_packets, Pub: event::ConnectionPublisher>( &mut self, newly_acked_packets: &SmallVec< [SentPacketInfo; ACKED_PACKETS_INITIAL_CAPACITY], @@ -519,7 +559,7 @@ impl Manager { largest_newly_acked: SentPacketInfo, new_largest_packet: bool, timestamp: Timestamp, - frame: &frame::Ack, + ecn_counts: Option, context: &mut Ctx, publisher: &mut Pub, ) { @@ -585,7 +625,7 @@ impl Manager { if new_largest_packet { self.process_ecn( newly_acked_ecn_counts, - frame.ecn_counts, + ecn_counts, timestamp, context, publisher, diff --git a/quic/s2n-quic-transport/src/space/application.rs b/quic/s2n-quic-transport/src/space/application.rs index b172a3e21d..0e590b7713 100644 --- a/quic/s2n-quic-transport/src/space/application.rs +++ b/quic/s2n-quic-transport/src/space/application.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - ack::AckManager, + ack::{pending_ack_ranges::PendingAckRanges, AckManager}, connection::{self, ConnectionTransmissionContext, ProcessingError}, endpoint, path, path::{path_event, Path}, @@ -21,8 +21,8 @@ use s2n_quic_core::{ datagram::Endpoint, event::{self, ConnectionPublisher as _, IntoEvent}, frame::{ - ack::AckRanges, crypto::CryptoRef, stream::StreamRef, Ack, ConnectionClose, DataBlocked, - HandshakeDone, MaxData, MaxStreamData, MaxStreams, NewConnectionId, NewToken, + self, ack::AckRanges, crypto::CryptoRef, stream::StreamRef, Ack, ConnectionClose, + DataBlocked, HandshakeDone, MaxData, MaxStreamData, MaxStreams, NewConnectionId, NewToken, PathChallenge, PathResponse, ResetStream, RetireConnectionId, StopSending, StreamDataBlocked, StreamsBlocked, }, @@ -47,6 +47,8 @@ pub struct ApplicationSpace { /// The current state of the Spin bit /// TODO: Spin me pub spin_bit: SpinBit, + /// Aggregate ACK info stored for delayed processing + pub pending_ack_ranges: PendingAckRanges, /// The crypto suite for application data /// TODO: What about ZeroRtt? //= https://www.rfc-editor.org/rfc/rfc9001#section-6.3 @@ -98,6 +100,7 @@ impl ApplicationSpace { tx_packet_numbers: TxPacketNumbers::new(PacketNumberSpace::ApplicationData, now), ack_manager, spin_bit: SpinBit::Zero, + pending_ack_ranges: PendingAckRanges::default(), stream_manager, key_set, header_key, @@ -212,7 +215,7 @@ impl ApplicationSpace { outcome.bytes_progressed += (self.stream_manager.outgoing_bytes_progressed() - bytes_progressed).as_u64() as usize; - let (recovery_manager, mut recovery_context) = self.recovery( + let (recovery_manager, mut recovery_context, _) = self.recovery( handshake_status, context.local_id_registry, context.path_id, @@ -369,7 +372,7 @@ impl ApplicationSpace { self.ack_manager.on_timeout(timestamp); self.key_set.on_timeout(timestamp); - let (recovery_manager, mut context) = self.recovery( + let (recovery_manager, mut context, _) = self.recovery( handshake_status, local_id_registry, path_manager.active_path_id(), @@ -418,6 +421,7 @@ impl ApplicationSpace { ) -> ( &'a mut recovery::Manager, RecoveryContext<'a, Config>, + &'a mut PendingAckRanges, ) { ( &mut self.recovery_manager, @@ -431,6 +435,7 @@ impl ApplicationSpace { path_manager, tx_packet_numbers: &mut self.tx_packet_numbers, }, + &mut self.pending_ack_ranges, ) } @@ -545,6 +550,47 @@ impl ApplicationSpace { limits } + + // Store ACKs in PendingAckRanges for delayed processing + // + // Returns `Err` if the range was not inserted. + pub fn update_pending_acks( + &mut self, + frame: &frame::Ack, + pending_ack_ranges: &mut PendingAckRanges, + ) -> Result<(), ()> { + let range = frame.ack_ranges().into_iter().map(|f| { + PacketNumberRange::new( + PacketNumberSpace::ApplicationData.new_packet_number(*f.start()), + PacketNumberSpace::ApplicationData.new_packet_number(*f.end()), + ) + }); + pending_ack_ranges.extend(range, frame.ecn_counts, frame.ack_delay()) + } + + pub fn on_pending_ack_ranges( + &mut self, + timestamp: Timestamp, + path_id: path::Id, + path_manager: &mut path::Manager, + handshake_status: &mut HandshakeStatus, + local_id_registry: &mut connection::LocalIdRegistry, + publisher: &mut Pub, + ) -> Result<(), transport::Error> { + debug_assert!( + !self.pending_ack_ranges.is_empty(), + "pending_ack_ranges should be non-empty since connection indicated ack interest" + ); + + let (recovery_manager, mut context, pending_ack_ranges) = + self.recovery(handshake_status, local_id_registry, path_id, path_manager); + recovery_manager.on_pending_ack_ranges( + timestamp, + pending_ack_ranges, + &mut context, + publisher, + ) + } } impl timer::Provider for ApplicationSpace { @@ -700,10 +746,13 @@ impl PacketSpace for ApplicationSpace ) -> Result<(), transport::Error> { let path = &mut path_manager[path_id]; path.on_peer_validated(); - let (recovery_manager, mut context) = + let (recovery_manager, mut context, _) = self.recovery(handshake_status, local_id_registry, path_id, path_manager); - // TODO instead of processing ACKs, store the information and express interest + // TODO enable delayed ack processing. It might be possible to process + // the ACKs immediately if insertion into PendingAckRanges fails + // + // self.update_pending_acks(&frame, &mut self.pending_ack_ranges) recovery_manager.on_ack_frame(timestamp, frame, &mut context, publisher) } diff --git a/quic/s2n-quic-transport/src/space/mod.rs b/quic/s2n-quic-transport/src/space/mod.rs index 69e9a9503f..f6e5258732 100644 --- a/quic/s2n-quic-transport/src/space/mod.rs +++ b/quic/s2n-quic-transport/src/space/mod.rs @@ -444,11 +444,48 @@ impl PacketSpaceManager { pub fn retry_cid(&self) -> Option<&PeerId> { self.retry_cid.as_deref() } + + pub fn on_pending_ack_ranges( + &mut self, + timestamp: Timestamp, + path_id: path::Id, + path_manager: &mut path::Manager, + local_id_registry: &mut connection::LocalIdRegistry, + publisher: &mut Pub, + ) -> Result<(), transport::Error> { + debug_assert!( + self.application().is_some(), + "application space should exists since delay ACK processing is only enabled\ + post handshake complete and connection indicated ACK interest" + ); + debug_assert!( + !self.application().unwrap().pending_ack_ranges.is_empty(), + "pending_ack_ranges should be non-empty since connection indicated ACK interest" + ); + + if let Some((space, handshake_status)) = self.application_mut() { + space.on_pending_ack_ranges( + timestamp, + path_id, + path_manager, + handshake_status, + local_id_registry, + publisher, + )?; + } + + Ok(()) + } } impl ack::interest::Provider for PacketSpaceManager { #[inline] fn ack_interest(&self, query: &mut Q) -> ack::interest::Result { + if let Some(space) = self.application() { + if !space.pending_ack_ranges.is_empty() { + return query.on_interest(ack::interest::Interest::Immediate); + } + } query.on_interest(ack::interest::Interest::None) } }