diff --git a/poem-grpc/src/encoding.rs b/poem-grpc/src/encoding.rs index 39ffec5766..21fe260290 100644 --- a/poem-grpc/src/encoding.rs +++ b/poem-grpc/src/encoding.rs @@ -193,8 +193,8 @@ pub(crate) fn create_decode_response_body( let message = decoder.decode(&data).map_err(Status::from_std_error)?; yield message; } - frame_decoder.check_incomplete()?; } else if frame.is_trailers() { + frame_decoder.check_incomplete()?; let headers = frame.into_trailers().unwrap(); status = Some(Status::from_headers(&headers)? .ok_or_else(|| Status::new(Code::Internal) @@ -209,3 +209,58 @@ pub(crate) fn create_decode_response_body( } })) } + +#[cfg(test)] +mod tests { + use bytes::Bytes; + use futures_util::TryStreamExt; + use http::HeaderMap; + use poem::Body; + use prost::Message; + + use crate::codec::{Codec, ProstCodec}; + + use super::create_decode_response_body; + + #[derive(Clone, PartialEq, Message)] + struct TestMsg { + #[prost(string, tag = 1)] + value: String, + } + + #[tokio::test] + async fn msg_data_spans_multiple_frames() { + // Split and encoded message into multiple frames. + let msg = TestMsg { + value: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx".into(), + }; + let mut encoded = msg.encode_to_vec(); + + // Compression flag + u32 big endian size + let mut first = vec![0]; + first.extend((encoded.len() as u32).to_be_bytes()); + + // Split the data into multiple frames. + let rest = encoded.split_off(encoded.len() / 2); + first.extend(encoded); + + let bytes_stream = futures_util::stream::iter(vec![ + Ok::<_, std::io::Error>(Bytes::from(first)), + Ok(Bytes::from(rest)), + ]); + let body = Body::from_bytes_stream(bytes_stream); + + let mut codec = ProstCodec::::default(); + let mut streaming = + create_decode_response_body(codec.decoder(), &HeaderMap::default(), body) + .expect("streaming"); + + let stream_msg = streaming + .try_next() + .await + .expect("msg") + .expect("one message"); + + assert_eq!(msg, stream_msg); + } +}