Skip to content

Commit

Permalink
fix(s2n-quic-transport): determine GSO capability from the segment_len (
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored May 5, 2022
1 parent 701e90f commit 3f9be0a
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 79 deletions.
84 changes: 68 additions & 16 deletions quic/s2n-quic-core/src/io/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pub enum Error {
/// The provided message did not write a payload
EmptyPayload,

/// The provided buffer was too small for the desired payload
UndersizedBuffer,

/// The transmission queue is at capacity
AtCapacity,
}
Expand Down Expand Up @@ -96,10 +99,47 @@ pub trait Message {
fn ipv6_flow_label(&mut self) -> u32;

/// Returns true if the packet can be used in a GSO packet
fn can_gso(&self) -> bool;
fn can_gso(&self, segment_len: usize) -> bool;

/// Writes the payload of the message to an output buffer
fn write_payload(&mut self, buffer: &mut [u8], gso_offset: usize) -> usize;
fn write_payload(&mut self, buffer: PayloadBuffer, gso_offset: usize) -> Result<usize, Error>;
}

#[derive(Debug)]
pub struct PayloadBuffer<'a>(&'a mut [u8]);

impl<'a> PayloadBuffer<'a> {
#[inline]
pub fn new(bytes: &'a mut [u8]) -> Self {
Self(bytes)
}

/// # Safety
///
/// This function should only be used in the case that the writer has its own safety checks in place
#[inline]
pub unsafe fn into_mut_slice(self) -> &'a mut [u8] {
self.0
}

#[track_caller]
#[inline]
pub fn write(&mut self, bytes: &[u8]) -> Result<usize, Error> {
if bytes.is_empty() {
return Err(Error::EmptyPayload);
}

if let Some(buffer) = self.0.get_mut(0..bytes.len()) {
buffer.copy_from_slice(bytes);
Ok(bytes.len())
} else {
debug_assert!(
false,
"tried to write more bytes than was available in the buffer"
);
Err(Error::UndersizedBuffer)
}
}
}

impl<Handle: path::Handle, Payload: AsRef<[u8]>> Message for (Handle, Payload) {
Expand All @@ -121,19 +161,16 @@ impl<Handle: path::Handle, Payload: AsRef<[u8]>> Message for (Handle, Payload) {
0
}

fn can_gso(&self) -> bool {
true
fn can_gso(&self, segment_len: usize) -> bool {
segment_len >= self.1.as_ref().len()
}

fn write_payload(&mut self, buffer: &mut [u8], _gso_offset: usize) -> usize {
let payload = self.1.as_ref();
let len = payload.len();
if let Some(buffer) = buffer.get_mut(..len) {
buffer.copy_from_slice(payload);
len
} else {
0
}
fn write_payload(
&mut self,
mut buffer: PayloadBuffer,
_gso_offset: usize,
) -> Result<usize, Error> {
buffer.write(self.1.as_ref())
}
}

Expand All @@ -158,9 +195,24 @@ mod tests {
assert_eq!(message.ecn(), Default::default());
assert_eq!(message.delay(), Default::default());
assert_eq!(message.ipv6_flow_label(), 0);
assert_eq!(message.write_payload(&mut buffer[..], 0), 3);
assert_eq!(
message.write_payload(PayloadBuffer::new(&mut buffer[..]), 0),
Ok(3)
);
}

#[test]
#[should_panic]
fn message_tuple_undersized_test() {
let remote_address = SocketAddressV4::new([127, 0, 0, 1], 80).into();
let local_address = SocketAddressV4::new([192, 168, 0, 1], 3000).into();
let tuple = path::Tuple {
remote_address,
local_address,
};
let mut message = (tuple, [1u8, 2, 3]);

// assert an empty buffer doesn't panic
assert_eq!(message.write_payload(&mut [][..], 0), 0);
// assert an undersized buffer panics in debug
let _ = message.write_payload(PayloadBuffer::new(&mut [][..]), 0);
}
}
8 changes: 1 addition & 7 deletions quic/s2n-quic-platform/src/io/testing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,7 @@ impl io::tx::Entry for Packet {
{
self.path.remote_address = message.path_handle().remote_address;
self.ecn = message.ecn();
let len = message.write_payload(&mut self.payload, 0);

if len == 0 {
return Err(io::tx::Error::EmptyPayload);
}

Ok(len)
message.write_payload(io::tx::PayloadBuffer::new(&mut self.payload), 0)
}

fn payload(&self) -> &[u8] {
Expand Down
7 changes: 1 addition & 6 deletions quic/s2n-quic-platform/src/message/mmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,7 @@ impl tx::Entry for Message {
) -> Result<usize, tx::Error> {
let payload = MessageTrait::payload_mut(self);

let len = message.write_payload(payload, 0);

// don't send empty payloads
if len == 0 {
return Err(tx::Error::EmptyPayload);
}
let len = message.write_payload(tx::PayloadBuffer::new(payload), 0)?;

unsafe {
debug_assert!(len <= payload.len());
Expand Down
7 changes: 1 addition & 6 deletions quic/s2n-quic-platform/src/message/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,12 +534,7 @@ impl tx::Entry for Message {
) -> Result<usize, tx::Error> {
let payload = MessageTrait::payload_mut(self);

let len = message.write_payload(payload, 0);

// don't send empty payloads
if len == 0 {
return Err(tx::Error::EmptyPayload);
}
let len = message.write_payload(tx::PayloadBuffer::new(payload), 0)?;

unsafe {
debug_assert!(len <= payload.len());
Expand Down
20 changes: 15 additions & 5 deletions quic/s2n-quic-platform/src/message/queue/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl<'a, Message: message::Message, B> Slice<'a, Message, B> {
let prev_message = &mut self.messages[gso.index];
// check to make sure the message can be GSO'd and can be included in the same
// GSO payload as the previous message
if !(message.can_gso() && prev_message.can_gso(&mut message)) {
if !(message.can_gso(gso.size) && prev_message.can_gso(&mut message)) {
self.flush_gso();
return Ok(Err(message));
}
Expand All @@ -178,16 +178,26 @@ impl<'a, Message: message::Message, B> Slice<'a, Message, B> {

// allow the message to write up to `gso.size` bytes
let buffer = &mut message::Message::payload_mut(prev_message)[payload_len..];
let buffer = tx::PayloadBuffer::new(buffer);

match message.write_payload(buffer, gso.count) {
0 => {
match message.write_payload(buffer, gso.count).and_then(|size| {
// we don't want to send empty packets
if size == 0 {
Err(tx::Error::EmptyPayload)
} else {
Ok(size)
}
}) {
Err(err) => {
unsafe {
// revert the len to what it was before
prev_message.set_payload_len(payload_len);
}
Err(tx::Error::EmptyPayload)
Err(err)
}
size => {
Ok(size) => {
debug_assert_ne!(size, 0, "payloads should never be empty");

unsafe {
debug_assert!(
gso.size >= size,
Expand Down
7 changes: 1 addition & 6 deletions quic/s2n-quic-platform/src/message/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,7 @@ impl tx::Entry for Message {
) -> Result<usize, tx::Error> {
let payload = MessageTrait::payload_mut(self);

let len = message.write_payload(payload, 0);

// don't send empty payloads
if len == 0 {
return Err(tx::Error::EmptyPayload);
}
let len = message.write_payload(tx::PayloadBuffer::new(payload), 0)?;

unsafe {
debug_assert!(len <= payload.len());
Expand Down
24 changes: 13 additions & 11 deletions quic/s2n-quic-transport/src/connection/close_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> tx::Message
}

#[inline]
fn can_gso(&self) -> bool {
true
fn can_gso(&self, segment_len: usize) -> bool {
segment_len >= self.packet.len()
}

#[inline]
Expand All @@ -162,9 +162,11 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> tx::Message
}

#[inline]
fn write_payload(&mut self, buffer: &mut [u8], gso_offset: usize) -> usize {
let len = self.packet.len();

fn write_payload(
&mut self,
mut buffer: tx::PayloadBuffer,
gso_offset: usize,
) -> Result<usize, tx::Error> {
//= https://www.rfc-editor.org/rfc/rfc9000#section-10.2.1
//# | Note: Allowing retransmission of a closing packet is an
//# | exception to the requirement that a new packet number be used
Expand All @@ -173,7 +175,7 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> tx::Message
//# | control, which are not expected to be relevant for a closed
//# | connection. Retransmitting the final packet requires less
//# | state.
buffer[..len].copy_from_slice(self.packet);
let len = buffer.write(self.packet)?;

self.path.on_bytes_transmitted(len);
*self.transmission = TransmissionState::Idle;
Expand All @@ -184,7 +186,7 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> tx::Message
gso_offset,
});

len
Ok(len)
}
}

Expand Down Expand Up @@ -361,9 +363,9 @@ mod tests {
// transmit an initial packet
assert!(sender.can_transmit(path.transmission_constraint()));
let now = s2n_quic_platform::time::now();
sender
let _ = sender
.transmission(&mut path, now, &mut publisher)
.write_payload(&mut buffer, 0);
.write_payload(tx::PayloadBuffer::new(&mut buffer), 0);

for (gap, packet_size) in events {
// get the next timer event
Expand All @@ -383,9 +385,9 @@ mod tests {
for _ in 0..3 {
let interest = sender.get_transmission_interest();
if interest.can_transmit(path.transmission_constraint()) {
sender
let _ = sender
.transmission(&mut path, now, &mut publisher)
.write_payload(&mut buffer, 0);
.write_payload(tx::PayloadBuffer::new(&mut buffer), 0);
transmission_count += 1;
}
}
Expand Down
24 changes: 20 additions & 4 deletions quic/s2n-quic-transport/src/connection/transmission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,31 @@ impl<'a, 'sub, Config: endpoint::Config> tx::Message for ConnectionTransmission<
}

#[inline]
fn can_gso(&self) -> bool {
fn can_gso(&self, segment_len: usize) -> bool {
if let Some(min_packet_len) = self.context.min_packet_len {
if segment_len < min_packet_len {
return false;
}
}

// If a packet can be GSO'd it means it's limited to the previously written packet
// size. This becomes a problem for MTU probes where they will likely exceed that amount.
// As such, if we're probing we want to let the IO layer know to not GSO the current
// packet.
!self.context.transmission_mode.is_mtu_probing()
}

fn write_payload(&mut self, buffer: &mut [u8], gso_offset: usize) -> usize {
fn write_payload(
&mut self,
buffer: tx::PayloadBuffer,
gso_offset: usize,
) -> Result<usize, tx::Error> {
let space_manager = &mut self.space_manager;
let buffer = unsafe {
// the WriterContext has its own checks for buffer capacity so convert `buffer` into a
// `&mut [u8]`
buffer.into_mut_slice()
};

let mtu = self
.context
Expand Down Expand Up @@ -361,9 +376,10 @@ impl<'a, 'sub, Config: endpoint::Config> tx::Message for ConnectionTransmission<
len: datagram_len as u16,
gso_offset,
});
Ok(datagram_len)
} else {
Err(tx::Error::EmptyPayload)
}

datagram_len
}
}

Expand Down
14 changes: 8 additions & 6 deletions quic/s2n-quic-transport/src/endpoint/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,16 @@ impl<Path: path::Handle> tx::Message for &Transmission<Path> {
}

#[inline]
fn can_gso(&self) -> bool {
true
fn can_gso(&self, segment_len: usize) -> bool {
segment_len >= self.as_ref().len()
}

#[inline]
fn write_payload(&mut self, buffer: &mut [u8], _gso_offset: usize) -> usize {
let packet = self.as_ref();
buffer[..packet.len()].copy_from_slice(packet);
packet.len()
fn write_payload(
&mut self,
mut buffer: tx::PayloadBuffer,
_gso_offset: usize,
) -> Result<usize, tx::Error> {
buffer.write(self.as_ref())
}
}
14 changes: 8 additions & 6 deletions quic/s2n-quic-transport/src/endpoint/stateless_reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,16 @@ impl<Path: path::Handle> tx::Message for &Transmission<Path> {
}

#[inline]
fn can_gso(&self) -> bool {
true
fn can_gso(&self, segment_len: usize) -> bool {
segment_len >= self.as_ref().len()
}

#[inline]
fn write_payload(&mut self, buffer: &mut [u8], _gso_offset: usize) -> usize {
let packet = self.as_ref();
buffer[..packet.len()].copy_from_slice(packet);
packet.len()
fn write_payload(
&mut self,
mut buffer: tx::PayloadBuffer,
_gso_offset: usize,
) -> Result<usize, tx::Error> {
buffer.write(self.as_ref())
}
}
14 changes: 8 additions & 6 deletions quic/s2n-quic-transport/src/endpoint/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,17 @@ impl<Path: path::Handle> tx::Message for &Transmission<Path> {
}

#[inline]
fn can_gso(&self) -> bool {
true
fn can_gso(&self, segment_len: usize) -> bool {
segment_len >= self.as_ref().len()
}

#[inline]
fn write_payload(&mut self, buffer: &mut [u8], _gso_offset: usize) -> usize {
let packet = self.as_ref();
buffer[..packet.len()].copy_from_slice(packet);
packet.len()
fn write_payload(
&mut self,
mut buffer: tx::PayloadBuffer,
_gso_offset: usize,
) -> Result<usize, tx::Error> {
buffer.write(self.as_ref())
}
}

Expand Down

0 comments on commit 3f9be0a

Please sign in to comment.