Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 125 additions & 30 deletions src/bytes_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use crate::bytes::Vtable;
#[allow(unused)]
use crate::loom::sync::atomic::AtomicMut;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::try_alloc::{
AllocStrategy, FallibleAllocStrategy, NoAllocStrategy, PanickingAllocStrategy, TryReserveError,
};
use crate::{Buf, BufMut, Bytes, TryGetError};

/// A unique reference to a contiguous slice of memory.
Expand Down Expand Up @@ -599,12 +602,92 @@ impl BytesMut {
}

// will always succeed
let _ = self.reserve_inner(additional, true);
let _ = self.reserve_inner(additional, PanickingAllocStrategy);
}

// In separate function to allow the short-circuits in `reserve` and `try_reclaim` to
// be inline-able. Significantly helps performance. Returns false if it did not succeed.
fn reserve_inner(&mut self, additional: usize, allocate: bool) -> bool {
/// Reserves capacity for at least `additional` more bytes to be inserted
/// into the given `BytesMut`.
///
/// More than `additional` bytes may be reserved in order to avoid frequent
/// reallocations. A call to `try_reserve` may result in an allocation.
///
/// Before allocating fresh memory, this first tries to reclaim space in
/// the buffer it already references: if this handle is a view into a
/// larger original buffer and every other handle into that buffer has
/// been dropped, the current view can be copied/shifted to the front and
/// the handle can take ownership of the full buffer, provided the full
/// buffer is large enough for the requested additional capacity.
///
/// Reclaiming only happens when shifting the current view to the front is
/// cheap enough in (amortized) time. The precise condition is subject to
/// change; as of now, the data being shifted must be at least as long as
/// the distance it is shifted by. If the current view is empty and the
/// original buffer can hold the requested additional capacity, then
/// reallocations will never happen.
///
/// # Examples
///
/// In the following example, a new buffer is allocated.
///
/// ```
/// # fn main() -> Result<(), bytes::TryReserveError> {
/// use bytes::BytesMut;
///
/// let mut buf = BytesMut::from(&b"hello"[..]);
/// buf.try_reserve(64)?;
/// assert!(buf.capacity() >= 69);
/// # Ok(())
/// # }
/// ```
///
/// In the following example, the existing buffer is reclaimed.
///
/// ```
/// # fn main() -> Result<(), bytes::TryReserveError> {
/// use bytes::{BytesMut, BufMut};
///
/// let mut buf = BytesMut::with_capacity(128);
/// buf.put(&[0; 64][..]);
///
/// let ptr = buf.as_ptr();
/// let other = buf.split();
///
/// assert!(buf.is_empty());
/// assert_eq!(buf.capacity(), 64);
///
/// drop(other);
/// buf.try_reserve(128)?;
///
/// assert_eq!(buf.capacity(), 128);
/// assert_eq!(buf.as_ptr(), ptr);
/// # Ok(())
/// # }
/// ```
///
/// # Errors
///
/// Errors if the new capacity overflows `usize` or the underlying allocator fails.
#[inline]
pub fn try_reserve(&mut self, additional: usize) -> Result<(), TryReserveError> {
    // Fast path: the spare capacity already covers the request, so no
    // reclaiming or allocation work is needed.
    let spare = self.capacity() - self.len();
    if additional <= spare {
        return Ok(());
    }

    // Slow path: reclaim or grow the buffer, surfacing allocation
    // failure as an error instead of panicking.
    self.reserve_inner(additional, FallibleAllocStrategy)
        .map(|_| ())
}

fn reserve_inner<S>(&mut self, additional: usize, strategy: S) -> Result<(), S::Err>
where
S: AllocStrategy,
{
let len = self.len();
let kind = self.kind();

Expand Down Expand Up @@ -649,22 +732,19 @@ impl BytesMut {
// can gain capacity back.
self.cap += off;
} else {
if !allocate {
return false;
}
// Not enough space, or reusing might be too much overhead:
// allocate more space!
let mut v =
ManuallyDrop::new(rebuild_vec(self.ptr.as_ptr(), self.len, self.cap, off));
v.reserve(additional);
strategy.fallible_reserve(&mut v, additional)?;

// Update the info
self.ptr = vptr(v.as_mut_ptr().add(off));
self.cap = v.capacity() - off;
debug_assert_eq!(self.len, v.len() - off);
}

return true;
return Ok(());
}
}

Expand All @@ -675,11 +755,9 @@ impl BytesMut {
// allocating a new vector with the requested capacity.
//
// Compute the new capacity
let mut new_cap = match len.checked_add(additional) {
Some(new_cap) => new_cap,
None if !allocate => return false,
None => panic!("overflow"),
};
let mut new_cap = len
.checked_add(additional)
.ok_or_else(|| strategy.capacity_overflow())?;

unsafe {
// First, try to reclaim the buffer. This is possible if the current
Expand Down Expand Up @@ -710,17 +788,16 @@ impl BytesMut {
self.ptr = vptr(ptr);
self.cap = v.capacity();
} else {
if !allocate {
return false;
}
// calculate offset
let off = (self.ptr.as_ptr() as usize) - (v.as_ptr() as usize);

// new_cap is calculated in terms of `BytesMut`, not the underlying
// `Vec`, so it does not take the offset into account.
//
// Thus we have to manually add it here.
new_cap = new_cap.checked_add(off).expect("overflow");
new_cap = new_cap
.checked_add(off)
.ok_or_else(|| strategy.capacity_overflow())?;

// The vector capacity is not sufficient. The reserve request is
// asking for more than the initial buffer capacity. Allocate more
Expand All @@ -742,29 +819,31 @@ impl BytesMut {
// the unused capacity of the vector is copied over to the new
// allocation, so we need to ensure that we don't have any data we
// care about in the unused capacity before calling `reserve`.
debug_assert!(off + len <= v.capacity());
v.set_len(off + len);
v.reserve(new_cap - v.len());
let new_len = off + len;
debug_assert!(new_len <= v.capacity());
let original_len = v.len();
v.set_len(new_len);
let additional = new_cap - v.len();
let guard = SetLenOnDrop::new(v, original_len);
strategy.fallible_reserve(guard.vec, additional)?;
mem::forget(guard);

// Update the info
self.ptr = vptr(v.as_mut_ptr().add(off));
self.cap = v.capacity() - off;
}

return true;
return Ok(());
}
}
if !allocate {
return false;
}

let original_capacity_repr = unsafe { (*shared).original_capacity_repr };
let original_capacity = original_capacity_from_repr(original_capacity_repr);

new_cap = cmp::max(new_cap, original_capacity);

// Create a new vector to store the data
let mut v = ManuallyDrop::new(Vec::with_capacity(new_cap));
let mut v = ManuallyDrop::new(strategy.fallible_with_capacity(new_cap)?);

// Copy the bytes
v.extend_from_slice(self.as_ref());
Expand All @@ -779,7 +858,7 @@ impl BytesMut {
self.ptr = vptr(v.as_mut_ptr());
self.cap = v.capacity();
debug_assert_eq!(self.len, v.len());
true
Ok(())
}

/// Attempts to cheaply reclaim already allocated capacity for at least `additional` more
Expand Down Expand Up @@ -825,8 +904,6 @@ impl BytesMut {
/// assert_eq!(true, split.try_reclaim(64));
/// assert_eq!(64, split.capacity());
/// ```
// I tried splitting out try_reclaim_inner after the short circuits, but it was inlined
// regardless with Rust 1.78.0 so probably not worth it
#[inline]
#[must_use = "consider BytesMut::reserve if you need an infallible reservation"]
pub fn try_reclaim(&mut self, additional: usize) -> bool {
Expand All @@ -839,7 +916,7 @@ impl BytesMut {
return true;
}

self.reserve_inner(additional, false)
self.reserve_inner(additional, NoAllocStrategy).is_ok()
}

/// Appends given bytes to this `BytesMut`.
Expand Down Expand Up @@ -1866,6 +1943,24 @@ unsafe fn shared_v_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize)
});
}

/// Scope guard that rewinds a vector's length when dropped.
///
/// NOTE(review): used by `reserve_inner` while the vector's length is
/// temporarily inflated around a fallible `reserve`; if the reserve
/// fails or panics, dropping the guard restores `len` instead of leaving
/// the inflated length in place — confirm against the call site.
struct SetLenOnDrop<'a> {
    // The vector whose length is restored on drop.
    vec: &'a mut Vec<u8>,
    // The length to restore; must not exceed `vec.capacity()`.
    len: usize,
}

impl<'a> SetLenOnDrop<'a> {
    /// Creates the guard.
    ///
    /// # Safety
    ///
    /// The caller must ensure that setting the vector's length to `len`
    /// is sound when the guard drops: `len <= vec.capacity()` must hold
    /// (the element type is `u8`, so any byte pattern up to `len` is a
    /// valid value). Only checked in debug builds.
    unsafe fn new(vec: &'a mut Vec<u8>, len: usize) -> Self {
        debug_assert!(len <= vec.capacity());
        Self { vec, len }
    }
}

impl Drop for SetLenOnDrop<'_> {
    fn drop(&mut self) {
        // SAFETY: upheld by the contract of `SetLenOnDrop::new`.
        unsafe { self.vec.set_len(self.len) }
    }
}

// compile-fails

/// ```compile_fail
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ mod bytes;
mod bytes_mut;
mod fmt;
mod loom;
mod try_alloc;
pub use crate::bytes::Bytes;
pub use crate::bytes_mut::BytesMut;
pub use crate::try_alloc::TryReserveError;

// Optional Serde support
#[cfg(feature = "serde")]
Expand Down
108 changes: 108 additions & 0 deletions src/try_alloc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use alloc::vec::Vec;
use core::{
convert::Infallible,
fmt::{self, Display},
};
#[cfg(feature = "std")]
use std::error::Error;

/// The error type for try_reserve methods.
#[derive(Debug)]
pub struct TryReserveError(TryReserveErrorInner);

/// Internal representation: either a standard-library reservation error
/// or a capacity computation that overflowed `usize`.
#[derive(Debug)]
pub(crate) enum TryReserveErrorInner {
    Std(alloc::collections::TryReserveError),
    Overflow,
}

impl Display for TryReserveError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the std error's message when we have one; the
        // overflow text mirrors the std library's wording.
        if let TryReserveErrorInner::Std(err) = &self.0 {
            Display::fmt(err, f)
        } else {
            f.write_str("memory allocation failed because the computed capacity exceeded the collection's maximum")
        }
    }
}

#[cfg(feature = "std")]
impl Error for TryReserveError {}

/// The allocation strategy
///
/// # Safety
///
/// `fallible_with_capacity` must behave the same as
/// `Vec::with_capacity`, `Vec::try_with_capacity` or always fail.
///
/// `fallible_reserve` must behave the same as `Vec::reserve`,
/// `Vec::try_reserve` or always fail.
pub(crate) unsafe trait AllocStrategy {
    /// Error produced when allocation is refused or fails.
    type Err;

    /// Creates a new vector with at least `capacity` spare slots, or
    /// fails according to this strategy.
    fn fallible_with_capacity<T>(&self, capacity: usize) -> Result<Vec<T>, Self::Err>;

    /// Reserves room for at least `additional` more elements in `vec`,
    /// or fails according to this strategy.
    fn fallible_reserve<T>(&self, vec: &mut Vec<T>, additional: usize) -> Result<(), Self::Err>;

    /// Produces the error (or panics) used when a capacity computation
    /// overflows `usize`.
    fn capacity_overflow(&self) -> Self::Err;
}

/// Strategy backing the classic `reserve`: allocation failures abort the
/// process and capacity overflow panics, so no error ever reaches the
/// caller (`Err = Infallible`).
pub(crate) struct PanickingAllocStrategy;

// SAFETY: both allocating operations forward directly to the infallible
// `Vec` APIs, satisfying the `AllocStrategy` contract.
unsafe impl AllocStrategy for PanickingAllocStrategy {
    type Err = Infallible;

    fn fallible_with_capacity<T>(&self, capacity: usize) -> Result<Vec<T>, Self::Err> {
        Ok(Vec::with_capacity(capacity))
    }

    fn fallible_reserve<T>(&self, target: &mut Vec<T>, additional: usize) -> Result<(), Self::Err> {
        target.reserve(additional);
        Ok(())
    }

    #[track_caller]
    fn capacity_overflow(&self) -> Self::Err {
        panic!("overflow")
    }
}

/// Strategy backing `try_reserve`: allocation failures and capacity
/// overflow are reported as `TryReserveError` instead of panicking.
pub(crate) struct FallibleAllocStrategy;

// SAFETY: allocation goes through `Vec::try_reserve`, matching the
// "`Vec::try_with_capacity` / `Vec::try_reserve`" half of the
// `AllocStrategy` contract.
unsafe impl AllocStrategy for FallibleAllocStrategy {
    type Err = TryReserveError;

    fn fallible_with_capacity<T>(&self, capacity: usize) -> Result<Vec<T>, Self::Err> {
        // `Vec::try_with_capacity` is not stable, so emulate it by
        // fallibly growing an empty vector.
        let mut fresh = Vec::new();
        self.fallible_reserve(&mut fresh, capacity)?;
        Ok(fresh)
    }

    fn fallible_reserve<T>(&self, target: &mut Vec<T>, additional: usize) -> Result<(), Self::Err> {
        match target.try_reserve(additional) {
            Ok(()) => Ok(()),
            Err(err) => Err(TryReserveError(TryReserveErrorInner::Std(err))),
        }
    }

    fn capacity_overflow(&self) -> Self::Err {
        TryReserveError(TryReserveErrorInner::Overflow)
    }
}

/// Strategy backing `try_reclaim`: every allocating operation refuses,
/// so `reserve_inner` can only succeed by reclaiming existing capacity.
pub(crate) struct NoAllocStrategy;

// SAFETY: every allocating operation unconditionally fails, which the
// `AllocStrategy` contract explicitly permits.
unsafe impl AllocStrategy for NoAllocStrategy {
    type Err = ();

    fn fallible_with_capacity<T>(&self, _capacity: usize) -> Result<Vec<T>, Self::Err> {
        Err(())
    }

    fn fallible_reserve<T>(&self, _vec: &mut Vec<T>, _additional: usize) -> Result<(), Self::Err> {
        Err(())
    }

    fn capacity_overflow(&self) -> Self::Err {}
}
12 changes: 12 additions & 0 deletions tests/test_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,18 @@ fn bytes_reserve_overflow() {
bytes.reserve(usize::MAX);
}

#[test]
fn bytes_try_reserve_overflow() {
    // A request of usize::MAX cannot be satisfied: len + additional
    // overflows, which must surface as the overflow error variant.
    let mut bytes = BytesMut::with_capacity(1024);
    bytes.put_slice(b"hello world");

    let err = bytes.try_reserve(usize::MAX).unwrap_err();
    let msg = err.to_string();
    assert_eq!(
        "memory allocation failed because the computed capacity exceeded the collection's maximum",
        msg
    );
}

#[test]
fn bytes_with_capacity_but_empty() {
// See https://github.com/tokio-rs/bytes/issues/340
Expand Down