diff --git a/quic/s2n-quic-core/src/io/tx.rs b/quic/s2n-quic-core/src/io/tx.rs index 1c60ab9ca8..c8fc7a4982 100644 --- a/quic/s2n-quic-core/src/io/tx.rs +++ b/quic/s2n-quic-core/src/io/tx.rs @@ -54,6 +54,9 @@ pub enum Error { /// The provided message did not write a payload EmptyPayload, + /// The provided buffer was too small for the desired payload + UndersizedBuffer, + /// The transmission queue is at capacity AtCapacity, } @@ -96,10 +99,47 @@ pub trait Message { fn ipv6_flow_label(&mut self) -> u32; /// Returns true if the packet can be used in a GSO packet - fn can_gso(&self) -> bool; + fn can_gso(&self, segment_len: usize) -> bool; /// Writes the payload of the message to an output buffer - fn write_payload(&mut self, buffer: &mut [u8], gso_offset: usize) -> usize; + fn write_payload(&mut self, buffer: PayloadBuffer, gso_offset: usize) -> Result; +} + +#[derive(Debug)] +pub struct PayloadBuffer<'a>(&'a mut [u8]); + +impl<'a> PayloadBuffer<'a> { + #[inline] + pub fn new(bytes: &'a mut [u8]) -> Self { + Self(bytes) + } + + /// # Safety + /// + /// This function should only be used in the case that the writer has its own safety checks in place + #[inline] + pub unsafe fn into_mut_slice(self) -> &'a mut [u8] { + self.0 + } + + #[track_caller] + #[inline] + pub fn write(&mut self, bytes: &[u8]) -> Result { + if bytes.is_empty() { + return Err(Error::EmptyPayload); + } + + if let Some(buffer) = self.0.get_mut(0..bytes.len()) { + buffer.copy_from_slice(bytes); + Ok(bytes.len()) + } else { + debug_assert!( + false, + "tried to write more bytes than was available in the buffer" + ); + Err(Error::UndersizedBuffer) + } + } } impl> Message for (Handle, Payload) { @@ -121,19 +161,16 @@ impl> Message for (Handle, Payload) { 0 } - fn can_gso(&self) -> bool { - true + fn can_gso(&self, segment_len: usize) -> bool { + segment_len >= self.1.as_ref().len() } - fn write_payload(&mut self, buffer: &mut [u8], _gso_offset: usize) -> usize { - let payload = self.1.as_ref(); - let len = payload.len(); - if let Some(buffer) = buffer.get_mut(..len) { - buffer.copy_from_slice(payload); - len - } else { - 0 - } + fn write_payload( + &mut self, + mut buffer: PayloadBuffer, + _gso_offset: usize, + ) -> Result { + buffer.write(self.1.as_ref()) } } @@ -158,9 +195,24 @@ mod tests { assert_eq!(message.ecn(), Default::default()); assert_eq!(message.delay(), Default::default()); assert_eq!(message.ipv6_flow_label(), 0); - assert_eq!(message.write_payload(&mut buffer[..], 0), 3); + assert_eq!( + message.write_payload(PayloadBuffer::new(&mut buffer[..]), 0), + Ok(3) + ); + } + + #[test] + #[should_panic] + fn message_tuple_undersized_test() { + let remote_address = SocketAddressV4::new([127, 0, 0, 1], 80).into(); + let local_address = SocketAddressV4::new([192, 168, 0, 1], 3000).into(); + let tuple = path::Tuple { + remote_address, + local_address, + }; + let mut message = (tuple, [1u8, 2, 3]); - // assert an empty buffer doesn't panic - assert_eq!(message.write_payload(&mut [][..], 0), 0); + // assert an undersized buffer panics in debug + let _ = message.write_payload(PayloadBuffer::new(&mut [][..]), 0); } } diff --git a/quic/s2n-quic-platform/src/io/testing/network.rs b/quic/s2n-quic-platform/src/io/testing/network.rs index 158be0f48d..17254bfcfb 100644 --- a/quic/s2n-quic-platform/src/io/testing/network.rs +++ b/quic/s2n-quic-platform/src/io/testing/network.rs @@ -348,13 +348,7 @@ impl io::tx::Entry for Packet { { self.path.remote_address = message.path_handle().remote_address; self.ecn = message.ecn(); - let len = message.write_payload(&mut self.payload, 0); - - if len == 0 { - return Err(io::tx::Error::EmptyPayload); - } - - Ok(len) + message.write_payload(io::tx::PayloadBuffer::new(&mut self.payload), 0) } fn payload(&self) -> &[u8] { diff --git a/quic/s2n-quic-platform/src/message/mmsg.rs b/quic/s2n-quic-platform/src/message/mmsg.rs index 37ead12266..00a6e21bf0 100644 --- a/quic/s2n-quic-platform/src/message/mmsg.rs +++ b/quic/s2n-quic-platform/src/message/mmsg.rs @@ -203,12 +203,7 @@ impl tx::Entry for Message { ) -> Result { let payload = MessageTrait::payload_mut(self); - let len = message.write_payload(payload, 0); - - // don't send empty payloads - if len == 0 { - return Err(tx::Error::EmptyPayload); - } + let len = message.write_payload(tx::PayloadBuffer::new(payload), 0)?; unsafe { debug_assert!(len <= payload.len()); diff --git a/quic/s2n-quic-platform/src/message/msg.rs b/quic/s2n-quic-platform/src/message/msg.rs index 2c4044488c..9bf5f400f7 100644 --- a/quic/s2n-quic-platform/src/message/msg.rs +++ b/quic/s2n-quic-platform/src/message/msg.rs @@ -534,12 +534,7 @@ impl tx::Entry for Message { ) -> Result { let payload = MessageTrait::payload_mut(self); - let len = message.write_payload(payload, 0); - - // don't send empty payloads - if len == 0 { - return Err(tx::Error::EmptyPayload); - } + let len = message.write_payload(tx::PayloadBuffer::new(payload), 0)?; unsafe { debug_assert!(len <= payload.len()); diff --git a/quic/s2n-quic-platform/src/message/queue/slice.rs b/quic/s2n-quic-platform/src/message/queue/slice.rs index a8910d60cf..8cfa13dae0 100644 --- a/quic/s2n-quic-platform/src/message/queue/slice.rs +++ b/quic/s2n-quic-platform/src/message/queue/slice.rs @@ -156,7 +156,7 @@ impl<'a, Message: message::Message, B> Slice<'a, Message, B> { let prev_message = &mut self.messages[gso.index]; // check to make sure the message can be GSO'd and can be included in the same // GSO payload as the previous message - if !(message.can_gso() && prev_message.can_gso(&mut message)) { + if !(message.can_gso(gso.size) && prev_message.can_gso(&mut message)) { self.flush_gso(); return Ok(Err(message)); } @@ -178,16 +178,26 @@ impl<'a, Message: message::Message, B> Slice<'a, Message, B> { // allow the message to write up to `gso.size` bytes let buffer = &mut message::Message::payload_mut(prev_message)[payload_len..]; + let buffer = tx::PayloadBuffer::new(buffer); - match message.write_payload(buffer, gso.count) { - 0 => { + match message.write_payload(buffer, gso.count).and_then(|size| { + // we don't want to send empty packets + if size == 0 { + Err(tx::Error::EmptyPayload) + } else { + Ok(size) + } + }) { + Err(err) => { unsafe { // revert the len to what it was before prev_message.set_payload_len(payload_len); } - Err(tx::Error::EmptyPayload) + Err(err) } - size => { + Ok(size) => { + debug_assert_ne!(size, 0, "payloads should never be empty"); + unsafe { debug_assert!( gso.size >= size, diff --git a/quic/s2n-quic-platform/src/message/simple.rs b/quic/s2n-quic-platform/src/message/simple.rs index b90738a142..7725730fe4 100644 --- a/quic/s2n-quic-platform/src/message/simple.rs +++ b/quic/s2n-quic-platform/src/message/simple.rs @@ -181,12 +181,7 @@ impl tx::Entry for Message { ) -> Result { let payload = MessageTrait::payload_mut(self); - let len = message.write_payload(payload, 0); - - // don't send empty payloads - if len == 0 { - return Err(tx::Error::EmptyPayload); - } + let len = message.write_payload(tx::PayloadBuffer::new(payload), 0)?; unsafe { debug_assert!(len <= payload.len()); diff --git a/quic/s2n-quic-transport/src/connection/close_sender.rs b/quic/s2n-quic-transport/src/connection/close_sender.rs index 4cc2f64ff2..eb22021248 100644 --- a/quic/s2n-quic-transport/src/connection/close_sender.rs +++ b/quic/s2n-quic-transport/src/connection/close_sender.rs @@ -152,8 +152,8 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> tx::Message } #[inline] - fn can_gso(&self) -> bool { - true + fn can_gso(&self, segment_len: usize) -> bool { + segment_len >= self.packet.len() } #[inline] @@ -162,9 +162,11 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> tx::Message } #[inline] - fn write_payload(&mut self, buffer: &mut [u8], gso_offset: usize) -> usize { - let len = self.packet.len(); - + fn write_payload( + &mut self, + mut buffer: tx::PayloadBuffer, + gso_offset: usize, + ) -> Result { //= https://www.rfc-editor.org/rfc/rfc9000#section-10.2.1 //# | Note: Allowing retransmission of a closing packet is an //# | exception to the requirement that a new packet number be used @@ -173,7 +175,7 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> tx::Message //# | control, which are not expected to be relevant for a closed //# | connection. Retransmitting the final packet requires less //# | state. - buffer[..len].copy_from_slice(self.packet); + let len = buffer.write(self.packet)?; self.path.on_bytes_transmitted(len); *self.transmission = TransmissionState::Idle; @@ -184,7 +186,7 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> tx::Message gso_offset, }); - len + Ok(len) } } @@ -361,9 +363,9 @@ mod tests { // transmit an initial packet assert!(sender.can_transmit(path.transmission_constraint())); let now = s2n_quic_platform::time::now(); - sender + let _ = sender .transmission(&mut path, now, &mut publisher) - .write_payload(&mut buffer, 0); + .write_payload(tx::PayloadBuffer::new(&mut buffer), 0); for (gap, packet_size) in events { // get the next timer event @@ -383,9 +385,9 @@ mod tests { for _ in 0..3 { let interest = sender.get_transmission_interest(); if interest.can_transmit(path.transmission_constraint()) { - sender + let _ = sender .transmission(&mut path, now, &mut publisher) - .write_payload(&mut buffer, 0); + .write_payload(tx::PayloadBuffer::new(&mut buffer), 0); transmission_count += 1; } } diff --git a/quic/s2n-quic-transport/src/connection/transmission.rs b/quic/s2n-quic-transport/src/connection/transmission.rs index 08b95bea65..500b497b83 100644 --- a/quic/s2n-quic-transport/src/connection/transmission.rs +++ b/quic/s2n-quic-transport/src/connection/transmission.rs @@ -72,7 +72,13 @@ impl<'a, 'sub, Config: endpoint::Config> tx::Message for ConnectionTransmission< } #[inline] - fn can_gso(&self) -> bool { + fn can_gso(&self, segment_len: usize) -> bool { + if let Some(min_packet_len) = self.context.min_packet_len { + if segment_len < min_packet_len { + return false; + } + } + // If a packet can be GSO'd it means it's limited to the previously written packet // size. This becomes a problem for MTU probes where they will likely exceed that amount. // As such, if we're probing we want to let the IO layer know to not GSO the current @@ -80,8 +86,17 @@ impl<'a, 'sub, Config: endpoint::Config> tx::Message for ConnectionTransmission< !self.context.transmission_mode.is_mtu_probing() } - fn write_payload(&mut self, buffer: &mut [u8], gso_offset: usize) -> usize { + fn write_payload( + &mut self, + buffer: tx::PayloadBuffer, + gso_offset: usize, + ) -> Result { let space_manager = &mut self.space_manager; + let buffer = unsafe { + // the WriterContext has its own checks for buffer capacity so convert `buffer` into a + // `&mut [u8]` + buffer.into_mut_slice() + }; let mtu = self .context @@ -361,9 +376,10 @@ impl<'a, 'sub, Config: endpoint::Config> tx::Message for ConnectionTransmission< len: datagram_len as u16, gso_offset, }); + Ok(datagram_len) + } else { + Err(tx::Error::EmptyPayload) } - - datagram_len } } diff --git a/quic/s2n-quic-transport/src/endpoint/retry.rs b/quic/s2n-quic-transport/src/endpoint/retry.rs index 511ecece71..9896736196 100644 --- a/quic/s2n-quic-transport/src/endpoint/retry.rs +++ b/quic/s2n-quic-transport/src/endpoint/retry.rs @@ -157,14 +157,16 @@ impl tx::Message for &Transmission { } #[inline] - fn can_gso(&self) -> bool { - true + fn can_gso(&self, segment_len: usize) -> bool { + segment_len >= self.as_ref().len() } #[inline] - fn write_payload(&mut self, buffer: &mut [u8], _gso_offset: usize) -> usize { - let packet = self.as_ref(); - buffer[..packet.len()].copy_from_slice(packet); - packet.len() + fn write_payload( + &mut self, + mut buffer: tx::PayloadBuffer, + _gso_offset: usize, + ) -> Result { + buffer.write(self.as_ref()) } } diff --git a/quic/s2n-quic-transport/src/endpoint/stateless_reset.rs b/quic/s2n-quic-transport/src/endpoint/stateless_reset.rs index 89f7e4d3ed..4707787cce 100644 --- a/quic/s2n-quic-transport/src/endpoint/stateless_reset.rs +++ b/quic/s2n-quic-transport/src/endpoint/stateless_reset.rs @@ -144,14 +144,16 @@ impl tx::Message for &Transmission { } #[inline] - fn can_gso(&self) -> bool { - true + fn can_gso(&self, segment_len: usize) -> bool { + segment_len >= self.as_ref().len() } #[inline] - fn write_payload(&mut self, buffer: &mut [u8], _gso_offset: usize) -> usize { - let packet = self.as_ref(); - buffer[..packet.len()].copy_from_slice(packet); - packet.len() + fn write_payload( + &mut self, + mut buffer: tx::PayloadBuffer, + _gso_offset: usize, + ) -> Result { + buffer.write(self.as_ref()) } } diff --git a/quic/s2n-quic-transport/src/endpoint/version.rs b/quic/s2n-quic-transport/src/endpoint/version.rs index d391a1876a..b71e8c8c2d 100644 --- a/quic/s2n-quic-transport/src/endpoint/version.rs +++ b/quic/s2n-quic-transport/src/endpoint/version.rs @@ -259,15 +259,17 @@ impl tx::Message for &Transmission { } #[inline] - fn can_gso(&self) -> bool { - true + fn can_gso(&self, segment_len: usize) -> bool { + segment_len >= self.as_ref().len() } #[inline] - fn write_payload(&mut self, buffer: &mut [u8], _gso_offset: usize) -> usize { - let packet = self.as_ref(); - buffer[..packet.len()].copy_from_slice(packet); - packet.len() + fn write_payload( + &mut self, + mut buffer: tx::PayloadBuffer, + _gso_offset: usize, + ) -> Result { + buffer.write(self.as_ref()) } }