Skip to content

Commit

Permalink
Replace congestion control implementations with google/quiche congest…
Browse files Browse the repository at this point in the history
…ion control implementations
  • Loading branch information
vkrasnov committed Mar 12, 2024
1 parent 23e2ce6 commit 2ca0301
Show file tree
Hide file tree
Showing 42 changed files with 8,660 additions and 7,504 deletions.
1 change: 1 addition & 0 deletions apps/src/bin/quiche-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ fn main() {
client.loss_rate = loss_rate;
}

#[allow(deprecated)]
let max_send_burst =
client.conn.send_quantum().min(client.max_send_burst) /
client.max_datagram_size *
Expand Down
1 change: 1 addition & 0 deletions quiche/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ cmake = "0.1"

[dependencies]
either = { version = "1.8", default-features = false }
enum_dispatch = "0.3"
log = { version = "0.4", features = ["std"] }
libc = "0.2"
libm = "0.2"
Expand Down
10 changes: 8 additions & 2 deletions quiche/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,10 @@ pub extern fn quiche_conn_peer_streams_left_uni(conn: &Connection) -> u64 {

#[no_mangle]
pub extern fn quiche_conn_send_quantum(conn: &Connection) -> size_t {
conn.send_quantum() as size_t
#[allow(deprecated)]
{
conn.send_quantum() as size_t
}
}

#[no_mangle]
Expand Down Expand Up @@ -1540,7 +1543,10 @@ pub extern fn quiche_conn_send_quantum_on_path(
let local = std_addr_from_c(local, local_len);
let peer = std_addr_from_c(peer, peer_len);

conn.send_quantum_on_path(local, peer) as size_t
#[allow(deprecated)]
{
conn.send_quantum_on_path(local, peer) as size_t
}
}

#[no_mangle]
Expand Down
144 changes: 51 additions & 93 deletions quiche/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ use std::time;
use std::sync::Arc;

use std::net::SocketAddr;

use std::str::FromStr;

use std::collections::HashSet;
Expand Down Expand Up @@ -3502,7 +3501,6 @@ impl Connection {
}
}

let is_app_limited = self.delivery_rate_check_if_app_limited();
let n_paths = self.paths.len();
let path = self.paths.get_mut(send_pid)?;
let flow_control = &mut self.flow_control;
Expand Down Expand Up @@ -3598,16 +3596,13 @@ impl Connection {
// in the output buffer.
//
// This usually happens when we try to send a new packet but
// failed because cwnd is almost full. In such case app_limited
// is set to false here to make cwnd grow when ACK is received.
path.recovery.update_app_limited(false);
// failed because cwnd is almost full.
return Err(Error::Done);
},
}

// Make sure there is enough space for the minimum payload length.
if left < PAYLOAD_MIN_LEN {
path.recovery.update_app_limited(false);
return Err(Error::Done);
}

Expand Down Expand Up @@ -3738,10 +3733,6 @@ impl Connection {
//
// This usually happens when we try to send a new packet
// but failed because cwnd is almost full.
//
// In such case app_limited is set to false here to make
// cwnd grow when ACK is received.
active_path.recovery.update_app_limited(false);
return Err(Error::Done);
},
}
Expand Down Expand Up @@ -4354,10 +4345,12 @@ impl Connection {
path.recovery.ping_sent(epoch);
}

if !has_data && cwnd_available > frame::MAX_STREAM_OVERHEAD {
path.recovery.on_app_limited();
}

if frames.is_empty() {
// When we reach this point we are not able to write more, so set
// app_limited to false.
path.recovery.update_app_limited(false);
// When we reach this point we are not able to write more
return Err(Error::Done);
}

Expand Down Expand Up @@ -4479,26 +4472,12 @@ impl Connection {
let sent_pkt = recovery::Sent {
pkt_num: pn,
frames,
time_sent: now,
time_acked: None,
time_lost: None,
size: if ack_eliciting { written } else { 0 },
ack_eliciting,
in_flight,
delivered: 0,
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data,
pmtud: pmtud_probe,
};

if in_flight && is_app_limited {
path.recovery.delivery_rate_update_app_limited(true);
}

self.next_pkt_num += 1;

let handshake_status = recovery::HandshakeStatus {
Expand Down Expand Up @@ -4532,10 +4511,6 @@ impl Connection {
path.sent_count += 1;
path.sent_bytes += written as u64;

if self.dgram_send_queue.byte_size() > path.recovery.cwnd_available() {
path.recovery.update_app_limited(false);
}

path.max_send_bytes = path.max_send_bytes.saturating_sub(written);

// On the client, drop initial state after sending an Handshake packet.
Expand Down Expand Up @@ -4574,6 +4549,7 @@ impl Connection {
/// offloading mechanisms as the maximum limit for outgoing aggregates of
/// multiple packets.
#[inline]
#[deprecated = "For accurate pacing use `get_next_release_time` instead"]
pub fn send_quantum(&self) -> usize {
match self.paths.get_active() {
Ok(p) => p.recovery.send_quantum(),
Expand All @@ -4592,6 +4568,7 @@ impl Connection {
///
/// If the (`local_addr`, peer_addr`) 4-tuple relates to a non-existing
/// path, this method returns 0.
#[deprecated = "For accurate pacing use `get_next_release_time` instead"]
pub fn send_quantum_on_path(
&self, local_addr: SocketAddr, peer_addr: SocketAddr,
) -> usize {
Expand All @@ -4602,6 +4579,36 @@ impl Connection {
.unwrap_or(0)
}

/// Get the preferred send time for the next packet.
#[inline]
pub fn get_next_release_time(&self) -> Option<ReleaseDecision> {
self.paths
.get_active()
.map(|path| path.recovery.get_next_release_time())
.ok()
}

/// Get the preferred send time for the next packet.
#[inline]
pub fn get_next_release_time_on_path(
&self, local_addr: SocketAddr, peer_addr: SocketAddr,
) -> Option<ReleaseDecision> {
self.paths
.path_id_from_addrs(&(local_addr, peer_addr))
.and_then(|pid| self.paths.get(pid).ok())
.map(|path| path.recovery.get_next_release_time())
}

/// The maximum pacing into the future, equals 1/8 of the smoothed rtt, but
/// not greater than 5ms
pub fn max_release_into_future(&self) -> time::Duration {
self.paths
.get_active()
.map(|p| p.recovery.rtt().mul_f64(0.125))
.unwrap_or(time::Duration::from_millis(1))
.min(time::Duration::from_millis(5))
}

/// Reads contiguous data from a stream into the provided slice.
///
/// The slice must be sized by the caller and will be populated up to its
Expand Down Expand Up @@ -5567,14 +5574,6 @@ impl Connection {

self.dgram_send_queue.push(buf.to_vec())?;

let active_path = self.paths.get_active_mut()?;

if self.dgram_send_queue.byte_size() >
active_path.recovery.cwnd_available()
{
active_path.recovery.update_app_limited(false);
}

Ok(())
}

Expand All @@ -5597,14 +5596,6 @@ impl Connection {

self.dgram_send_queue.push(buf)?;

let active_path = self.paths.get_active_mut()?;

if self.dgram_send_queue.byte_size() >
active_path.recovery.cwnd_available()
{
active_path.recovery.update_app_limited(false);
}

Ok(())
}

Expand Down Expand Up @@ -6843,21 +6834,15 @@ impl Connection {

let handshake_status = self.handshake_status();

let is_app_limited = self.delivery_rate_check_if_app_limited();

for (_, p) in self.paths.iter_mut() {
if is_app_limited {
p.recovery.delivery_rate_update_app_limited(true);
}

let (lost_packets, lost_bytes) = p.recovery.on_ack_received(
&ranges,
ack_delay,
epoch,
handshake_status,
now,
&self.trace_id,
)?;
);

self.lost_count += lost_packets;
self.lost_bytes += lost_bytes as u64;
Expand Down Expand Up @@ -7384,35 +7369,6 @@ impl Connection {
cmp::min(cwin_available, self.max_tx_data - self.tx_data) as usize;
}

fn delivery_rate_check_if_app_limited(&self) -> bool {
// Enter the app-limited phase of delivery rate when these conditions
// are met:
//
// - The remaining capacity is higher than available bytes in cwnd (there
// is more room to send).
// - New data since the last send() is smaller than available bytes in
// cwnd (we queued less than what we can send).
// - There is room to send more data in cwnd.
//
// In application-limited phases the transmission rate is limited by the
// application rather than the congestion control algorithm.
//
// Note that this is equivalent to CheckIfApplicationLimited() from the
// delivery rate draft. This is also separate from `recovery.app_limited`
// and only applies to delivery rate calculation.
let cwin_available = self
.paths
.iter()
.filter(|&(_, p)| p.active())
.map(|(_, p)| p.recovery.cwnd_available())
.sum();

((self.tx_buffered + self.dgram_send_queue_byte_size()) < cwin_available) &&
(self.tx_data.saturating_sub(self.last_tx_data)) <
cwin_available as u64 &&
cwin_available > 0
}

fn set_initial_dcid(
&mut self, cid: ConnectionId<'static>, reset_token: Option<u128>,
path_id: usize,
Expand Down Expand Up @@ -13874,20 +13830,13 @@ mod tests {
assert_eq!(pipe.client.dgram_send(&send_buf), Ok(()));
}

assert!(!pipe
.client
.paths
.get_active()
.expect("no active")
.recovery
.app_limited());
assert_eq!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);

let (len, _) = pipe.client.send(&mut buf).unwrap();

assert_ne!(pipe.client.dgram_send_queue.byte_size(), 0);
assert_ne!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);
assert!(!pipe
assert!(pipe
.client
.paths
.get_active()
Expand All @@ -13898,6 +13847,14 @@ mod tests {
assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));

let flight = testing::emit_flight(&mut pipe.client).unwrap();

assert!(!pipe
.client
.paths
.get_active()
.expect("no active")
.recovery
.app_limited());
testing::process_flight(&mut pipe.server, flight).unwrap();

let flight = testing::emit_flight(&mut pipe.server).unwrap();
Expand All @@ -13906,7 +13863,7 @@ mod tests {
assert_ne!(pipe.client.dgram_send_queue.byte_size(), 0);
assert_ne!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);

assert!(!pipe
assert!(pipe
.client
.paths
.get_active()
Expand Down Expand Up @@ -16800,7 +16757,8 @@ pub use crate::path::PathEvent;
pub use crate::path::PathStats;
pub use crate::path::SocketAddrIter;

pub use crate::recovery::congestion::CongestionControlAlgorithm;
pub use crate::recovery::CongestionControlAlgorithm;
pub use crate::recovery::ReleaseDecision;

pub use crate::stream::StreamIter;

Expand Down
Loading

0 comments on commit 2ca0301

Please sign in to comment.