Skip to content

Commit 76a268b

Browse files
committed
tonic: add compression threshold for message encoding
Messages smaller than 1024 bytes are no longer compressed even when compression is enabled. This optimization avoids the overhead of compression for small messages where the compression ratio would be minimal or potentially increase the message size. The threshold is set to 1024 bytes (matching UNCOMPRESSED_MIN_BODY_SIZE used in tests). Messages below this threshold are sent uncompressed with the compression flag unset in the gRPC header.
1 parent c9cc210 commit 76a268b

File tree

3 files changed

+32
-16
lines changed

3 files changed

+32
-16
lines changed

tests/compression/src/lib.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,15 @@ impl test_server::Test for Svc {
6666
&self,
6767
_req: Request<()>,
6868
) -> Result<Response<Self::CompressOutputServerStreamStream>, Status> {
69-
let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
70-
let stream = tokio_stream::iter(std::iter::repeat(SomeData { data }))
71-
.take(2)
72-
.map(Ok::<_, Status>);
69+
// Messages smaller than 1024 don't get compressed and we want to
70+
// test that the first message doesn't get compressed
71+
let small = vec![0u8; UNCOMPRESSED_MIN_BODY_SIZE - 100];
72+
let big = vec![0u8; UNCOMPRESSED_MIN_BODY_SIZE];
73+
74+
let stream = tokio_stream::iter([
75+
Ok::<_, Status>(SomeData { data: small }),
76+
Ok::<_, Status>(SomeData { data: big }),
77+
]);
7378
Ok(self.prepare_response(Response::new(Box::pin(stream))))
7479
}
7580

tests/compression/src/server_stream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ async fn client_enabled_server_enabled(encoding: CompressionEncoding) {
5858
.await
5959
.expect("stream empty")
6060
.expect("item was error");
61-
assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
61+
// The first message shouldn't get compressed because it's below the threshold
62+
assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE - 100);
6263

6364
stream
6465
.next()

tonic/src/codec/encode.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,16 @@ fn encode_item<T>(
142142
where
143143
T: Encoder<Error = Status>,
144144
{
145+
const COMPRESSION_THRESHOLD: usize = 1024;
145146
let offset = buf.len();
146147

147148
buf.reserve(HEADER_SIZE);
148149
unsafe {
149150
buf.advance_mut(HEADER_SIZE);
150151
}
151152

153+
let mut was_compressed = false;
154+
152155
if let Some(encoding) = compression_encoding {
153156
uncompression_buf.clear();
154157

@@ -158,24 +161,31 @@ where
158161

159162
let uncompressed_len = uncompression_buf.len();
160163

161-
compress(
162-
CompressionSettings {
163-
encoding,
164-
buffer_growth_interval: buffer_settings.buffer_size,
165-
},
166-
uncompression_buf,
167-
buf,
168-
uncompressed_len,
169-
)
170-
.map_err(|err| Status::internal(format!("Error compressing: {err}")))?;
164+
if uncompressed_len >= COMPRESSION_THRESHOLD {
165+
compress(
166+
CompressionSettings {
167+
encoding,
168+
buffer_growth_interval: buffer_settings.buffer_size,
169+
},
170+
uncompression_buf,
171+
buf,
172+
uncompressed_len,
173+
)
174+
.map_err(|err| Status::internal(format!("Error compressing: {err}")))?;
175+
was_compressed = true;
176+
} else {
177+
buf.reserve(uncompressed_len);
178+
buf.extend_from_slice(&uncompression_buf[..]);
179+
}
171180
} else {
172181
encoder
173182
.encode(item, &mut EncodeBuf::new(buf))
174183
.map_err(|err| Status::internal(format!("Error encoding: {err}")))?;
175184
}
176185

177186
// now that we know length, we can write the header
178-
finish_encoding(compression_encoding, max_message_size, &mut buf[offset..])
187+
let final_compression = if was_compressed { compression_encoding } else { None };
188+
finish_encoding(final_compression, max_message_size, &mut buf[offset..])
179189
}
180190

181191
fn finish_encoding(

0 commit comments

Comments
 (0)