Skip to content

Commit 5aa700e

Browse files
committed
feature flags for anything that uses tokio since it might now be available
1 parent 643fd54 commit 5aa700e

File tree

2 files changed

+78
-49
lines changed

2 files changed

+78
-49
lines changed

tonic/src/codec/decode.rs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -210,25 +210,34 @@ impl StreamingInner {
210210
}
211211

212212
let decode_buf = if let Some(encoding) = compression {
213-
self.decompress_buf.clear();
214-
215-
if let Err(err) = decompress(
216-
CompressionSettings::new(encoding, buffer_settings.buffer_size),
217-
&mut self.buf,
218-
&mut self.decompress_buf,
219-
len,
220-
) {
221-
let message = if let Direction::Response(status) = self.direction {
222-
format!(
223-
"Error decompressing: {err}, while receiving response with status: {status}"
224-
)
225-
} else {
226-
format!("Error decompressing: {err}, while sending request")
227-
};
228-
return Err(Status::internal(message));
213+
#[cfg(any(feature = "gzip", feature = "deflate", feature = "zstd"))]
214+
{
215+
self.decompress_buf.clear();
216+
217+
if let Err(err) = decompress(
218+
CompressionSettings::new(encoding, buffer_settings.buffer_size),
219+
&mut self.buf,
220+
&mut self.decompress_buf,
221+
len,
222+
) {
223+
let message = if let Direction::Response(status) = self.direction {
224+
format!(
225+
"Error decompressing: {err}, while receiving response with status: {status}"
226+
)
227+
} else {
228+
format!("Error decompressing: {err}, while sending request")
229+
};
230+
return Err(Status::internal(message));
231+
}
232+
let decompressed_len = self.decompress_buf.len();
233+
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
234+
}
235+
#[cfg(not(any(feature = "gzip", feature = "deflate", feature = "zstd")))]
236+
{
237+
// This branch is unreachable when no compression features are enabled
238+
// because CompressionEncoding has no variants
239+
unreachable!("Compression encoding without compression features")
229240
}
230-
let decompressed_len = self.decompress_buf.len();
231-
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
232241
} else {
233242
DecodeBuf::new(&mut self.buf, len)
234243
};

tonic/src/codec/encode.rs

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::{
1212
pin::Pin,
1313
task::{ready, Context, Poll},
1414
};
15+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
1516
use tokio::task::JoinHandle;
1617
use tokio_stream::{adapters::Fuse, Stream, StreamExt};
1718

@@ -38,6 +39,7 @@ struct EncodedBytes<T, U> {
3839
buf: BytesMut,
3940
uncompression_buf: BytesMut,
4041
error: Option<Status>,
42+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
4143
#[pin]
4244
compression_task: Option<JoinHandle<Result<CompressionResult, Status>>>,
4345
}
@@ -74,6 +76,7 @@ impl<T: Encoder, U: Stream> EncodedBytes<T, U> {
7476
buf,
7577
uncompression_buf,
7678
error: None,
79+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
7780
compression_task: None,
7881
}
7982
}
@@ -113,6 +116,7 @@ where
113116
uncompression_buf: &mut BytesMut,
114117
compression_encoding: Option<CompressionEncoding>,
115118
max_message_size: Option<usize>,
119+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
116120
compression_task: &mut Pin<&mut Option<JoinHandle<Result<CompressionResult, Status>>>>,
117121
buffer_settings: &BufferSettings,
118122
) -> Result<bool, Status> {
@@ -127,6 +131,8 @@ where
127131

128132
let uncompressed_len = uncompression_buf.len();
129133

134+
// Check if we should use spawn_blocking (only when tokio is available)
135+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
130136
if let Some(spawn_threshold) = settings.spawn_blocking_threshold {
131137
if uncompressed_len >= spawn_threshold
132138
&& uncompressed_len >= settings.compression_threshold
@@ -156,6 +162,7 @@ where
156162
Ok(false)
157163
}
158164

165+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
159166
fn poll_compression_task(
160167
compression_task: &mut Pin<&mut Option<JoinHandle<Result<CompressionResult, Status>>>>,
161168
buf: &mut BytesMut,
@@ -226,6 +233,7 @@ where
226233
buf,
227234
uncompression_buf,
228235
error,
236+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
229237
mut compression_task,
230238
} = self.project();
231239
let buffer_settings = encoder.buffer_settings();
@@ -235,17 +243,20 @@ where
235243
}
236244

237245
// Check if we have an in-flight compression task
238-
match Self::poll_compression_task(
239-
&mut compression_task,
240-
buf,
241-
*max_message_size,
242-
&buffer_settings,
243-
cx,
244-
) {
245-
Poll::Ready(Some(result)) => return Poll::Ready(Some(result)),
246-
Poll::Pending => return Poll::Pending,
247-
Poll::Ready(None) => {
248-
// Task completed, continue processing
246+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
247+
{
248+
match Self::poll_compression_task(
249+
&mut compression_task,
250+
buf,
251+
*max_message_size,
252+
&buffer_settings,
253+
cx,
254+
) {
255+
Poll::Ready(Some(result)) => return Poll::Ready(Some(result)),
256+
Poll::Pending => return Poll::Pending,
257+
Poll::Ready(None) => {
258+
// Task completed, continue processing
259+
}
249260
}
250261
}
251262

@@ -268,32 +279,41 @@ where
268279
uncompression_buf,
269280
*compression_encoding,
270281
*max_message_size,
282+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
271283
&mut compression_task,
272284
&buffer_settings,
273285
) {
274286
Ok(true) => {
275-
// We just spawned/armed the blocking compression task.
276-
// Poll it once right away so it can capture our waker.
277-
match Self::poll_compression_task(
278-
&mut compression_task,
279-
buf,
280-
*max_message_size,
281-
&buffer_settings,
282-
cx,
283-
) {
284-
Poll::Ready(Some(result)) => {
285-
return Poll::Ready(Some(result));
286-
}
287-
Poll::Ready(None) => {
288-
if buf.len() >= buffer_settings.yield_threshold {
289-
return Poll::Ready(Some(Ok(buf
290-
.split_to(buf.len())
291-
.freeze())));
287+
#[cfg(any(feature = "transport", feature = "channel", feature = "server"))]
288+
{
289+
// We just spawned/armed the blocking compression task.
290+
// Poll it once right away so it can capture our waker.
291+
match Self::poll_compression_task(
292+
&mut compression_task,
293+
buf,
294+
*max_message_size,
295+
&buffer_settings,
296+
cx,
297+
) {
298+
Poll::Ready(Some(result)) => {
299+
return Poll::Ready(Some(result));
300+
}
301+
Poll::Ready(None) => {
302+
if buf.len() >= buffer_settings.yield_threshold {
303+
return Poll::Ready(Some(Ok(buf
304+
.split_to(buf.len())
305+
.freeze())));
306+
}
307+
}
308+
Poll::Pending => {
309+
return Poll::Pending;
292310
}
293311
}
294-
Poll::Pending => {
295-
return Poll::Pending;
296-
}
312+
}
313+
#[cfg(not(any(feature = "transport", feature = "channel", feature = "server")))]
314+
{
315+
// This shouldn't happen when tokio is not available
316+
unreachable!("spawn_blocking returned true without tokio")
297317
}
298318
}
299319
Ok(false) => {

0 commit comments

Comments
 (0)