Skip to content

Commit

Permalink
feat(s2n-quic-dc): update MTU on dc path when MTU is updated
Browse files Browse the repository at this point in the history
  • Loading branch information
WesleyRosenblum committed Sep 21, 2024
1 parent a88ae41 commit 4a9033f
Show file tree
Hide file tree
Showing 15 changed files with 198 additions and 64 deletions.
30 changes: 21 additions & 9 deletions dc/s2n-quic-dc/src/path/secret/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl Map {
state.mark_live(self.state.cleaner.epoch());

let (sealer, credentials) = state.uni_sealer();
Some((sealer, credentials, state.parameters))
Some((sealer, credentials, state.parameters.clone()))
}

pub fn open_once(
Expand All @@ -362,7 +362,7 @@ impl Map {

let keys = state.bidi_local(features);

Some((keys, state.parameters))
Some((keys, state.parameters.clone()))
}

pub fn pair_for_credentials(
Expand All @@ -373,7 +373,7 @@ impl Map {
) -> Option<(Bidirectional, ApplicationParams)> {
let state = self.pre_authentication(credentials, control_out)?;

let params = state.parameters;
let params = state.parameters.clone();
let keys = state.bidi_remote(self.clone(), credentials, features);

Some((keys, params))
Expand Down Expand Up @@ -696,13 +696,15 @@ impl Entry {
secret: schedule::Secret,
sender: sender::State,
receiver: receiver::State,
mut parameters: ApplicationParams,
parameters: ApplicationParams,
rehandshake_time: Duration,
) -> Self {
// clamp max datagram size to a well-known value
parameters.max_datagram_size = parameters
.max_datagram_size
.min(crate::stream::MAX_DATAGRAM_SIZE as _);
let max_datagram_size = parameters.max_datagram_size.load(Ordering::Relaxed);
parameters.max_datagram_size.store(
max_datagram_size.min(crate::stream::MAX_DATAGRAM_SIZE as _),
Ordering::Relaxed,
);

assert!(rehandshake_time.as_secs() <= u32::MAX as u64);
Self {
Expand Down Expand Up @@ -941,7 +943,7 @@ impl HandshakingPath {
Self {
peer: connection_info.remote_address.clone().into(),
dc_version: connection_info.dc_version,
parameters: connection_info.application_params,
parameters: connection_info.application_params.clone(),
endpoint_type,
secret: None,
map,
Expand Down Expand Up @@ -1035,12 +1037,22 @@ impl dc::Path for HandshakingPath {
.expect("peer tokens are only received after secrets are ready"),
sender,
receiver,
self.parameters,
self.parameters.clone(),
self.map.state.rehandshake_period,
);
let entry = Arc::new(entry);
self.map.insert(entry);
}

fn on_mtu_updated(&mut self, mtu: u16) {
let peers_guard = self.map.state.peers.guard();
if let Some(entry) = self.map.state.peers.get(&self.peer, &peers_guard) {
entry
.parameters
.max_datagram_size
.store(mtu, Ordering::Relaxed);
}
}
}

#[cfg(test)]
Expand Down
7 changes: 5 additions & 2 deletions dc/s2n-quic-dc/src/stream/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use s2n_quic_core::{
inet::{ExplicitCongestionNotification, SocketAddress},
varint::VarInt,
};
use std::{io, sync::Arc};
use std::{
io,
sync::{atomic::Ordering, Arc},
};
use tracing::{debug_span, Instrument as _};

type Result<T = (), E = io::Error> = core::result::Result<T, E>;
Expand Down Expand Up @@ -193,7 +196,7 @@ where
let flow = flow::non_blocking::State::new(flow_offset);

let path = send::path::Info {
max_datagram_size: parameters.max_datagram_size,
max_datagram_size: parameters.max_datagram_size.load(Ordering::Relaxed),
send_quantum,
ecn: ExplicitCongestionNotification::Ect0,
next_expected_control_packet: VarInt::ZERO,
Expand Down
7 changes: 5 additions & 2 deletions dc/s2n-quic-dc/src/stream/send/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ use s2n_quic_core::{
varint::VarInt,
};
use slotmap::SlotMap;
use std::collections::{BinaryHeap, VecDeque};
use std::{
collections::{BinaryHeap, VecDeque},
sync::atomic::Ordering,
};
use tracing::{debug, trace};

pub mod probe;
Expand Down Expand Up @@ -118,7 +121,7 @@ pub struct PeerActivity {
impl State {
#[inline]
pub fn new(stream_id: stream::Id, params: &ApplicationParams) -> Self {
let max_datagram_size = params.max_datagram_size;
let max_datagram_size = params.max_datagram_size.load(Ordering::Relaxed);
let initial_max_data = params.remote_max_data;
let local_max_data = params.local_send_max_data;

Expand Down
20 changes: 17 additions & 3 deletions quic/s2n-quic-core/src/dc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
varint::VarInt,
};
use core::time::Duration;
use std::sync::atomic::{AtomicU16, Ordering};

mod disabled;
mod traits;
Expand Down Expand Up @@ -91,25 +92,38 @@ impl<'a> DatagramInfo<'a> {
}

/// Various settings relevant to the dc path
#[derive(Clone, Copy, Debug)]
#[derive(Debug)]
#[non_exhaustive]
pub struct ApplicationParams {
pub max_datagram_size: u16,
pub max_datagram_size: AtomicU16,
pub remote_max_data: VarInt,
pub local_send_max_data: VarInt,
pub local_recv_max_data: VarInt,
pub max_idle_timeout: Option<Duration>,
pub max_ack_delay: Duration,
}

impl Clone for ApplicationParams {
fn clone(&self) -> Self {
Self {
max_datagram_size: AtomicU16::new(self.max_datagram_size.load(Ordering::Relaxed)),
remote_max_data: Default::default(),
local_send_max_data: Default::default(),
local_recv_max_data: Default::default(),
max_idle_timeout: None,
max_ack_delay: Default::default(),
}
}
}

impl ApplicationParams {
pub fn new(
max_datagram_size: u16,
peer_flow_control_limits: &InitialFlowControlLimits,
limits: &Limits,
) -> Self {
Self {
max_datagram_size,
max_datagram_size: AtomicU16::new(max_datagram_size),
remote_max_data: peer_flow_control_limits.max_data,
local_send_max_data: limits.initial_stream_limits().max_data_bidi_local,
local_recv_max_data: limits.initial_stream_limits().max_data_bidi_remote,
Expand Down
4 changes: 4 additions & 0 deletions quic/s2n-quic-core/src/dc/disabled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ impl Path for () {
) {
unimplemented!()
}

fn on_mtu_updated(&mut self, _mtu: u16) {
unimplemented!()
}
}
16 changes: 13 additions & 3 deletions quic/s2n-quic-core/src/dc/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use core::time::Duration;
use std::sync::{
atomic::{AtomicU8, Ordering},
atomic::{AtomicU16, AtomicU8, Ordering},
Arc,
};

Expand All @@ -36,14 +36,19 @@ pub struct MockDcPath {
pub on_peer_stateless_reset_tokens_count: u8,
pub stateless_reset_tokens: Vec<stateless_reset::Token>,
pub peer_stateless_reset_tokens: Vec<stateless_reset::Token>,
pub mtu: u16,
}

impl dc::Endpoint for MockDcEndpoint {
type Path = MockDcPath;

fn new_path(&mut self, _connection_info: &ConnectionInfo) -> Option<Self::Path> {
fn new_path(&mut self, connection_info: &ConnectionInfo) -> Option<Self::Path> {
Some(MockDcPath {
stateless_reset_tokens: self.stateless_reset_tokens.clone(),
mtu: connection_info
.application_params
.max_datagram_size
.load(Ordering::Relaxed),
..Default::default()
})
}
Expand Down Expand Up @@ -76,10 +81,15 @@ impl dc::Path for MockDcPath {
self.peer_stateless_reset_tokens
.extend(stateless_reset_tokens);
}

fn on_mtu_updated(&mut self, mtu: u16) {
self.mtu = mtu
}
}

#[allow(clippy::declare_interior_mutable_const)]
pub const TEST_APPLICATION_PARAMS: ApplicationParams = ApplicationParams {
max_datagram_size: 1472,
max_datagram_size: AtomicU16::new(1472),
remote_max_data: VarInt::from_u32(1u32 << 25),
local_send_max_data: VarInt::from_u32(1u32 << 25),
local_recv_max_data: VarInt::from_u32(1u32 << 25),
Expand Down
10 changes: 10 additions & 0 deletions quic/s2n-quic-core/src/dc/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub trait Path: 'static + Send {
&mut self,
stateless_reset_tokens: impl Iterator<Item = &'a stateless_reset::Token>,
);

/// Called when the MTU has been updated for the path
fn on_mtu_updated(&mut self, mtu: u16);
}

impl<P: Path> Path for Option<P> {
Expand All @@ -69,4 +72,11 @@ impl<P: Path> Path for Option<P> {
path.on_peer_stateless_reset_tokens(stateless_reset_tokens)
}
}

#[inline]
fn on_mtu_updated(&mut self, max_datagram_size: u16) {
if let Some(path) = self {
path.on_mtu_updated(max_datagram_size)
}
}
}
43 changes: 34 additions & 9 deletions quic/s2n-quic-core/src/path/mtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,12 @@ impl Builder {
}
}

#[derive(Eq, PartialEq, Debug)]
pub enum MtuResult {
NoChange,
MtuUpdated(u16),
}

#[derive(Clone, Debug)]
pub struct Controller {
state: State,
Expand Down Expand Up @@ -608,7 +614,7 @@ impl Controller {
congestion_controller: &mut CC,
path_id: path::Id,
publisher: &mut Pub,
) {
) -> MtuResult {
if self.state.is_early_search_requested() && sent_bytes > self.base_plpmtu {
if self.is_next_probe_size_above_threshold() {
// Early probing has succeeded, but the max MTU is higher still so
Expand All @@ -629,10 +635,13 @@ impl Controller {
}

// no need to process anything in the disabled state
ensure!(self.state != State::Disabled);
ensure!(self.state != State::Disabled, MtuResult::NoChange);

// MTU probes are only sent in application data space
ensure!(packet_number.space().is_application_data());
ensure!(
packet_number.space().is_application_data(),
MtuResult::NoChange
);

if sent_bytes >= self.plpmtu
&& self
Expand Down Expand Up @@ -671,8 +680,12 @@ impl Controller {
cause: MtuUpdatedCause::ProbeAcknowledged,
search_complete: self.state.is_search_complete(),
});

return MtuResult::MtuUpdated(self.plpmtu);
}
}

MtuResult::NoChange
}

//= https://www.rfc-editor.org/rfc/rfc8899#section-3
Expand All @@ -690,12 +703,13 @@ impl Controller {
congestion_controller: &mut CC,
path_id: path::Id,
publisher: &mut Pub,
) {
) -> MtuResult {
// MTU probes are only sent in the application data space, but since early packet
// spaces will use the `InitialMtu` prior to MTU probing being enabled, we need
// to check for potentially MTU-related packet loss if an early search has been requested
ensure!(
self.state.is_early_search_requested() || packet_number.space().is_application_data()
self.state.is_early_search_requested() || packet_number.space().is_application_data(),
MtuResult::NoChange
);

match &self.state {
Expand Down Expand Up @@ -725,7 +739,9 @@ impl Controller {
mtu: self.plpmtu,
cause: MtuUpdatedCause::InitialMtuPacketLost,
search_complete: self.state.is_search_complete(),
})
});

return MtuResult::MtuUpdated(self.plpmtu);
}
State::Searching(probe_pn, _) if *probe_pn == packet_number => {
// The MTU probe was lost
Expand Down Expand Up @@ -763,10 +779,17 @@ impl Controller {
}

if self.black_hole_counter > BLACK_HOLE_THRESHOLD {
self.on_black_hole_detected(now, congestion_controller, path_id, publisher);
return self.on_black_hole_detected(
now,
congestion_controller,
path_id,
publisher,
);
}
}
}

MtuResult::NoChange
}

/// Gets the currently validated maximum QUIC datagram size
Expand Down Expand Up @@ -837,7 +860,7 @@ impl Controller {
congestion_controller: &mut CC,
path_id: path::Id,
publisher: &mut Pub,
) {
) -> MtuResult {
self.black_hole_counter = Default::default();
self.largest_acked_mtu_sized_packet = None;
// Reset the plpmtu back to the base_plpmtu and notify the congestion controller
Expand All @@ -856,7 +879,9 @@ impl Controller {
mtu: self.plpmtu,
cause: MtuUpdatedCause::Blackhole,
search_complete: self.state.is_search_complete(),
})
});

MtuResult::MtuUpdated(self.plpmtu)
}

/// Arm the PMTU Raise Timer if there is still room to increase the
Expand Down
Loading

0 comments on commit 4a9033f

Please sign in to comment.