Skip to content

Commit 643fd54

Browse files
committed
Compression thresholds configureable via env vars
When compressing large payloads we use spawn_blocking to move the job to a tread to not block the async worker thread
1 parent 7d51ce6 commit 643fd54

File tree

3 files changed

+274
-51
lines changed

3 files changed

+274
-51
lines changed

tonic/src/codec/compression.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,34 @@ use bytes::{Buf, BufMut, BytesMut};
44
use flate2::read::{GzDecoder, GzEncoder};
55
#[cfg(feature = "deflate")]
66
use flate2::read::{ZlibDecoder, ZlibEncoder};
7-
use std::{borrow::Cow, fmt};
7+
use std::{borrow::Cow, fmt, sync::OnceLock};
88
#[cfg(feature = "zstd")]
99
use zstd::stream::read::{Decoder, Encoder};
1010

1111
pub(crate) const ENCODING_HEADER: &str = "grpc-encoding";
1212
pub(crate) const ACCEPT_ENCODING_HEADER: &str = "grpc-accept-encoding";
1313

14+
/// Get the compression threshold from environment variable or default (1024 bytes)
15+
fn get_compression_threshold() -> usize {
16+
static THRESHOLD: OnceLock<usize> = OnceLock::new();
17+
*THRESHOLD.get_or_init(|| {
18+
std::env::var("TONIC_COMPRESSION_THRESHOLD")
19+
.ok()
20+
.and_then(|v| v.parse().ok())
21+
.unwrap_or(1024)
22+
})
23+
}
24+
25+
/// Get the spawn_blocking threshold from environment variable (disabled by default)
26+
fn get_spawn_blocking_threshold() -> Option<usize> {
27+
static THRESHOLD: OnceLock<Option<usize>> = OnceLock::new();
28+
*THRESHOLD.get_or_init(|| {
29+
std::env::var("TONIC_SPAWN_BLOCKING_THRESHOLD")
30+
.ok()
31+
.and_then(|v| v.parse().ok())
32+
})
33+
}
34+
1435
/// Struct used to configure which encodings are enabled on a server or channel.
1536
///
1637
/// Represents an ordered list of compression encodings that are enabled.
@@ -77,6 +98,26 @@ pub(crate) struct CompressionSettings {
7798
/// buffer_growth_interval controls memory growth for internal buffers to balance resizing cost against memory waste.
7899
/// The default buffer growth interval is 8 kilobytes.
79100
pub(crate) buffer_growth_interval: usize,
101+
/// Minimum message size (in bytes) to compress. Messages smaller than this are sent uncompressed.
102+
/// Can be configured via TONIC_COMPRESSION_THRESHOLD environment variable. Default: 1024 bytes.
103+
pub(crate) compression_threshold: usize,
104+
/// Minimum message size (in bytes) to use spawn_blocking for compression.
105+
/// If set, messages larger than this threshold will be compressed on a blocking thread pool.
106+
/// Can be configured via TONIC_SPAWN_BLOCKING_THRESHOLD environment variable. Default: None (disabled).
107+
pub(crate) spawn_blocking_threshold: Option<usize>,
108+
}
109+
110+
impl CompressionSettings {
111+
/// Create new CompressionSettings with thresholds loaded from environment variables
112+
#[inline]
113+
pub(crate) fn new(encoding: CompressionEncoding, buffer_growth_interval: usize) -> Self {
114+
Self {
115+
encoding,
116+
buffer_growth_interval,
117+
compression_threshold: get_compression_threshold(),
118+
spawn_blocking_threshold: get_spawn_blocking_threshold(),
119+
}
120+
}
80121
}
81122

82123
/// The compression encodings Tonic supports.

tonic/src/codec/decode.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,7 @@ impl StreamingInner {
213213
self.decompress_buf.clear();
214214

215215
if let Err(err) = decompress(
216-
CompressionSettings {
217-
encoding,
218-
buffer_growth_interval: buffer_settings.buffer_size,
219-
},
216+
CompressionSettings::new(encoding, buffer_settings.buffer_size),
220217
&mut self.buf,
221218
&mut self.decompress_buf,
222219
len,

0 commit comments

Comments
 (0)