Skip to content

Commit

Permalink
feat(s2n-quic-transport): impl interface so connections can aggregate…
Browse files Browse the repository at this point in the history
…/process ack info (#1293)
  • Loading branch information
toidiu authored May 5, 2022
1 parent 4d8e936 commit 701e90f
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 96 deletions.
1 change: 0 additions & 1 deletion quic/s2n-quic-transport/src/ack/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum Interest {
None,
#[allow(dead_code)]
Immediate,
}

Expand Down
3 changes: 1 addition & 2 deletions quic/s2n-quic-transport/src/ack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ mod ack_manager;
pub(crate) mod ack_ranges;
mod ack_transmission_state;
pub mod interest;
#[allow(dead_code)]
mod pending_ack_ranges;
pub(crate) mod pending_ack_ranges;

#[cfg(test)]
mod tests;
60 changes: 43 additions & 17 deletions quic/s2n-quic-transport/src/ack/pending_ack_ranges.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{ack::ack_ranges::AckRanges, interval_set};
use crate::ack::ack_ranges::AckRanges;
use core::time::Duration;
use s2n_quic_core::{
frame::ack::EcnCounts,
packet::number::{PacketNumber, PacketNumberRange},
};

/// Stores ACK ranges pending processing
/// Stores aggregated ACK info for delayed processing
#[derive(Clone, Debug, Default)]
pub struct PendingAckRanges {
ranges: AckRanges,
ack_ranges: AckRanges,
ecn_counts: EcnCounts,
ack_delay: Duration,
}

impl PendingAckRanges {
#[inline]
pub fn new(ranges: AckRanges, ecn_counts: EcnCounts, ack_delay: Duration) -> Self {
pub fn new(ack_ranges: AckRanges, ecn_counts: EcnCounts, ack_delay: Duration) -> Self {
PendingAckRanges {
ranges,
ack_ranges,
ecn_counts,
ack_delay,
}
Expand Down Expand Up @@ -48,7 +48,7 @@ impl PendingAckRanges {
let mut did_insert = true;
// TODO: add metrics if ack ranges are being dropped
for range in acked_packets {
did_insert &= self.ranges.insert_packet_number_range(range).is_ok()
did_insert &= self.ack_ranges.insert_packet_number_range(range).is_ok()
}

match did_insert {
Expand All @@ -57,24 +57,50 @@ impl PendingAckRanges {
}
}

/// Returns an iterator over all of the values contained in the ranges `IntervalSet`.
/// Returns an iterator over all values in the `AckRanges`
#[inline]
pub fn iter(&self) -> interval_set::IntervalIter<PacketNumber> {
self.ranges.intervals()
pub fn iter(&self) -> impl Iterator<Item = PacketNumberRange> + '_ {
self.ack_ranges
.inclusive_ranges()
.into_iter()
.map(|ack_range| PacketNumberRange::new(*ack_range.start(), *ack_range.end()))
}

/// Returns `EcnCounts` aggregated over all the pending ACKs
#[inline]
pub fn ecn_counts(&self) -> Option<EcnCounts> {
if self.ack_ranges.is_empty() {
None
} else {
Some(self.ecn_counts)
}
}

/// Returns the ACK delay associated with all the pending ACKs
#[inline]
pub fn ack_delay(&self) -> Duration {
self.ack_delay
}

/// Returns the largest `PacketNumber` stored in the AckRanges.
///
/// If no items are present in the set, `None` is returned.
pub fn max_value(&self) -> Option<PacketNumber> {
self.ack_ranges.max_value()
}

/// Clear the ack ranges and reset values
#[inline]
pub fn clear(&mut self) {
self.ranges.clear();
self.ack_ranges.clear();
self.ecn_counts = EcnCounts::default();
self.ack_delay = Duration::default();
}

/// Returns if ack ranges are being tracked
#[inline]
pub fn is_empty(&self) -> bool {
self.ranges.is_empty()
self.ack_ranges.is_empty()
}
}

Expand Down Expand Up @@ -114,7 +140,7 @@ mod tests {
assert_eq!(pending_ack_ranges.ack_delay, now);
assert_eq!(pending_ack_ranges.ecn_counts, ecn_counts);
assert!(!pending_ack_ranges.is_empty());
assert_eq!(pending_ack_ranges.ranges.interval_len(), 1);
assert_eq!(pending_ack_ranges.ack_ranges.interval_len(), 1);

// insert new range with updated ack_delay and ecn_counts
now = now.saturating_add(Duration::from_millis(1));
Expand All @@ -131,17 +157,17 @@ mod tests {
assert_eq!(pending_ack_ranges.ack_delay, now);
assert_eq!(pending_ack_ranges.ecn_counts, ecn_counts);
assert!(!pending_ack_ranges.is_empty());
assert_eq!(pending_ack_ranges.ranges.interval_len(), 2);
assert_eq!(pending_ack_ranges.ack_ranges.interval_len(), 2);

// ensure pending_ack_ranges clear functionality works
{
assert!(!pending_ack_ranges.is_empty());
pending_ack_ranges.clear();

assert!(pending_ack_ranges.is_empty());
assert_eq!(pending_ack_ranges.ranges.interval_len(), 0);
assert!(!pending_ack_ranges.ranges.contains(&pn_a));
assert!(!pending_ack_ranges.ranges.contains(&pn_b));
assert_eq!(pending_ack_ranges.ack_ranges.interval_len(), 0);
assert!(!pending_ack_ranges.ack_ranges.contains(&pn_a));
assert!(!pending_ack_ranges.ack_ranges.contains(&pn_b));
}
}

Expand Down Expand Up @@ -189,7 +215,7 @@ mod tests {
assert!(pending_ack_ranges
.extend(range_1.into_iter(), Some(ecn_counts), now)
.is_ok());
assert_eq!(pending_ack_ranges.ranges.interval_len(), 1);
assert_eq!(pending_ack_ranges.ack_ranges.interval_len(), 1);
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ intrusive_adapter!(WaitingForConnectionIdAdapter<C, L> = Arc<ConnectionNode<C, L
// Intrusive list adapter for managing the list of
// `waiting_for_ack` connections
intrusive_adapter!(WaitingForAckAdapter<C, L> = Arc<ConnectionNode<C, L>>: ConnectionNode<C, L> {
waiting_for_connection_id_link: LinkedListLink
waiting_for_ack_link: LinkedListLink
} where C: connection::Trait, L: connection::Lock<C>);

// Intrusive red black tree adapter for managing a list of `waiting_for_timeout` connections
Expand Down Expand Up @@ -957,6 +957,7 @@ impl<C: connection::Trait, L: connection::Lock<C>> ConnectionContainer<C, L> {

/// Iterates over all `Connection`s which are waiting to process ACKs,
/// and executes the given function on each `Connection`
#[allow(dead_code)]
pub fn iterate_ack_list<F>(&mut self, mut func: F)
where
F: FnMut(&mut C),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,13 @@ impl connection::Trait for TestConnection {
Ok(())
}

fn on_process_acks(&mut self) {}
fn on_pending_ack_ranges(
&mut self,
_timestamp: Timestamp,
_subscriber: &mut <Self::Config as endpoint::Config>::EventSubscriber,
) -> Result<(), connection::Error> {
Ok(())
}

fn on_wakeup(
&mut self,
Expand Down
26 changes: 25 additions & 1 deletion quic/s2n-quic-transport/src/connection/connection_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,31 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
}

/// Process ACKs for the `Connection`.
fn on_process_acks(&mut self) {}
fn on_pending_ack_ranges(
&mut self,
timestamp: Timestamp,
subscriber: &mut Config::EventSubscriber,
) -> Result<(), connection::Error> {
let mut publisher = self.event_context.publisher(timestamp, subscriber);

// TODO: care should be taken to only delay ACK processing for the active path.
// However, the active path could change so it might be necessary to track the
// active path across some ACK delay processing.
let path_id = self.path_manager.active_path_id();
self.space_manager
.on_pending_ack_ranges(
timestamp,
path_id,
&mut self.path_manager,
&mut self.local_id_registry,
&mut publisher,
)
.map_err(|err| {
// TODO: publish metrics

err.into()
})
}

/// Handles all external wakeups on the [`Connection`].
fn on_wakeup(
Expand Down
7 changes: 6 additions & 1 deletion quic/s2n-quic-transport/src/connection/connection_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ pub trait ConnectionTrait: 'static + Send + Sized {
subscriber: &mut <Self::Config as endpoint::Config>::EventSubscriber,
) -> Result<(), connection::Error>;

fn on_process_acks(&mut self);
/// Process pendings ACKs for the `Connection`.
fn on_pending_ack_ranges(
&mut self,
timestamp: Timestamp,
subscriber: &mut <Self::Config as endpoint::Config>::EventSubscriber,
) -> Result<(), connection::Error>;

/// Handles all external wakeups on the [`Connection`].
fn on_wakeup(
Expand Down
36 changes: 23 additions & 13 deletions quic/s2n-quic-transport/src/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,24 +106,34 @@ impl<Cfg: Config> s2n_quic_core::endpoint::Endpoint for Endpoint<Cfg> {
let mut now: Option<Timestamp> = None;

for entry in entries.iter_mut() {
if let Some((header, payload)) = entry.read(&local_address) {
let timestamp = match now {
Some(now) => now,
_ => {
let time = clock.get_time();
now = Some(time);
time
}
};
let timestamp = match now {
Some(time) => time,
None => {
now = Some(clock.get_time());
now.expect("value should be set")
}
};

if let Some((header, payload)) = entry.read(&local_address) {
self.receive_datagram(&header, payload, timestamp)
}
}

// process ACKs on Connections with interest
self.connections.iterate_ack_list(|connection| {
connection.on_process_acks();
});
// TODO process pending acks
// let endpoint_context = self.config.context();
// // process ACKs on Connections with interest
// self.connections.iterate_ack_list(|connection| {
// let timestamp = match now {
// Some(time) => time,
// None => {
// now = Some(clock.get_time());
// now.expect("value should be set")
// }
// };
//
// // handle error and potentially close the connection
// connection.on_pending_ack_ranges(timestamp, endpoint_context.event_subscriber);
// });

let len = entries.len();
queue.finish(len);
Expand Down
Loading

0 comments on commit 701e90f

Please sign in to comment.