Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions tests/compression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,15 @@ impl test_server::Test for Svc {
&self,
_req: Request<()>,
) -> Result<Response<Self::CompressOutputServerStreamStream>, Status> {
let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = tokio_stream::iter(std::iter::repeat(SomeData { data }))
.take(2)
.map(Ok::<_, Status>);
// Messages smaller than 1024 don't get compressed and we want to
// test that the first message doesn't get compressed
let small = vec![0u8; UNCOMPRESSED_MIN_BODY_SIZE - 100];
let big = vec![0u8; UNCOMPRESSED_MIN_BODY_SIZE];

let stream = tokio_stream::iter([
Ok::<_, Status>(SomeData { data: small }),
Ok::<_, Status>(SomeData { data: big }),
]);
Ok(self.prepare_response(Response::new(Box::pin(stream))))
}

Expand Down
3 changes: 2 additions & 1 deletion tests/compression/src/server_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ async fn client_enabled_server_enabled(encoding: CompressionEncoding) {
.await
.expect("stream empty")
.expect("item was error");
assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
// The first message shouldn't get compressed because it's below the threshold
assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE - 100);

stream
.next()
Expand Down
44 changes: 43 additions & 1 deletion tonic/src/codec/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,34 @@ use bytes::{Buf, BufMut, BytesMut};
use flate2::read::{GzDecoder, GzEncoder};
#[cfg(feature = "deflate")]
use flate2::read::{ZlibDecoder, ZlibEncoder};
use std::{borrow::Cow, fmt};
use std::{borrow::Cow, fmt, sync::OnceLock};
#[cfg(feature = "zstd")]
use zstd::stream::read::{Decoder, Encoder};

pub(crate) const ENCODING_HEADER: &str = "grpc-encoding";
pub(crate) const ACCEPT_ENCODING_HEADER: &str = "grpc-accept-encoding";

/// Get the compression threshold from environment variable or default (1024 bytes)
fn get_compression_threshold() -> usize {
static THRESHOLD: OnceLock<usize> = OnceLock::new();
*THRESHOLD.get_or_init(|| {
std::env::var("TONIC_COMPRESSION_THRESHOLD")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(1024)
})
}

/// Get the spawn_blocking threshold from environment variable (disabled by default)
fn get_spawn_blocking_threshold() -> Option<usize> {
static THRESHOLD: OnceLock<Option<usize>> = OnceLock::new();
*THRESHOLD.get_or_init(|| {
std::env::var("TONIC_SPAWN_BLOCKING_THRESHOLD")
.ok()
.and_then(|v| v.parse().ok())
})
}

/// Struct used to configure which encodings are enabled on a server or channel.
///
/// Represents an ordered list of compression encodings that are enabled.
Expand Down Expand Up @@ -77,6 +98,26 @@ pub(crate) struct CompressionSettings {
/// buffer_growth_interval controls memory growth for internal buffers to balance resizing cost against memory waste.
/// The default buffer growth interval is 8 kilobytes.
pub(crate) buffer_growth_interval: usize,
/// Minimum message size (in bytes) to compress. Messages smaller than this are sent uncompressed.
/// Can be configured via TONIC_COMPRESSION_THRESHOLD environment variable. Default: 1024 bytes.
pub(crate) compression_threshold: usize,
/// Minimum message size (in bytes) to use spawn_blocking for compression.
/// If set, messages larger than this threshold will be compressed on a blocking thread pool.
/// Can be configured via TONIC_SPAWN_BLOCKING_THRESHOLD environment variable. Default: None (disabled).
pub(crate) spawn_blocking_threshold: Option<usize>,
}

impl CompressionSettings {
/// Create new CompressionSettings with thresholds loaded from environment variables
#[inline]
pub(crate) fn new(encoding: CompressionEncoding, buffer_growth_interval: usize) -> Self {
Self {
encoding,
buffer_growth_interval,
compression_threshold: get_compression_threshold(),
spawn_blocking_threshold: get_spawn_blocking_threshold(),
}
}
}

/// The compression encodings Tonic supports.
Expand Down Expand Up @@ -252,6 +293,7 @@ pub(crate) fn compress(
}

/// Decompress `len` bytes from `compressed_buf` into `out_buf`.
#[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
#[allow(unused_variables, unreachable_code)]
pub(crate) fn decompress(
settings: CompressionSettings,
Expand Down
61 changes: 38 additions & 23 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::compression::{decompress, CompressionEncoding, CompressionSettings};
use super::compression::CompressionEncoding;
#[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
use super::compression::{decompress, CompressionSettings};
use super::{BufferSettings, DecodeBuf, Decoder, DEFAULT_MAX_RECV_MESSAGE_SIZE, HEADER_SIZE};
use crate::{body::Body, metadata::MetadataMap, Code, Status};
use bytes::{Buf, BufMut, BytesMut};
Expand Down Expand Up @@ -30,6 +32,7 @@ struct StreamingInner {
direction: Direction,
buf: BytesMut,
trailers: Option<HeaderMap>,
#[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
decompress_buf: BytesMut,
encoding: Option<CompressionEncoding>,
max_message_size: Option<usize>,
Expand Down Expand Up @@ -136,6 +139,7 @@ impl<T> Streaming<T> {
direction,
buf: BytesMut::with_capacity(buffer_size),
trailers: None,
#[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
decompress_buf: BytesMut::new(),
encoding,
max_message_size,
Expand All @@ -147,6 +151,10 @@ impl<T> Streaming<T> {
impl StreamingInner {
fn decode_chunk(
&mut self,
#[cfg_attr(
not(any(feature = "gzip", feature = "deflate", feature = "zstd")),
allow(unused_variables)
)]
buffer_settings: BufferSettings,
) -> Result<Option<DecodeBuf<'_>>, Status> {
if let State::ReadHeader = self.state {
Expand Down Expand Up @@ -209,29 +217,36 @@ impl StreamingInner {
return Ok(None);
}

let decode_buf = if let Some(encoding) = compression {
self.decompress_buf.clear();

if let Err(err) = decompress(
CompressionSettings {
encoding,
buffer_growth_interval: buffer_settings.buffer_size,
},
&mut self.buf,
&mut self.decompress_buf,
len,
) {
let message = if let Direction::Response(status) = self.direction {
format!(
"Error decompressing: {err}, while receiving response with status: {status}"
)
} else {
format!("Error decompressing: {err}, while sending request")
};
return Err(Status::internal(message));
let decode_buf = if let Some(_encoding) = compression {
#[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
{
let encoding = _encoding;
self.decompress_buf.clear();

if let Err(err) = decompress(
CompressionSettings::new(encoding, buffer_settings.buffer_size),
&mut self.buf,
&mut self.decompress_buf,
len,
) {
let message = if let Direction::Response(status) = self.direction {
format!(
"Error decompressing: {err}, while receiving response with status: {status}"
)
} else {
format!("Error decompressing: {err}, while sending request")
};
return Err(Status::internal(message));
}
let decompressed_len = self.decompress_buf.len();
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
}
#[cfg(not(any(feature = "gzip", feature = "deflate", feature = "zstd")))]
{
// This branch is unreachable when no compression features are enabled
// because CompressionEncoding has no variants
unreachable!("Compression encoding without compression features")
}
let decompressed_len = self.decompress_buf.len();
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
} else {
DecodeBuf::new(&mut self.buf, len)
};
Expand Down
Loading
Loading