From e149df2e3e78d8a30cd6d60b63df7b75e381607b Mon Sep 17 00:00:00 2001 From: Antonin Amand Date: Wed, 15 May 2024 19:51:12 +0200 Subject: [PATCH] poem-grpc: message can span multiple frame When a response was big enough to be split into multiple frames, the response decoder was incorrectly checking the frame decoder for an incomplete frame before receiving the remaining bytes. --- poem-grpc/src/encoding.rs | 59 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/poem-grpc/src/encoding.rs b/poem-grpc/src/encoding.rs index 39ffec5766..4a19411047 100644 --- a/poem-grpc/src/encoding.rs +++ b/poem-grpc/src/encoding.rs @@ -193,8 +193,8 @@ pub(crate) fn create_decode_response_body( let message = decoder.decode(&data).map_err(Status::from_std_error)?; yield message; } - frame_decoder.check_incomplete()?; } else if frame.is_trailers() { + frame_decoder.check_incomplete()?; let headers = frame.into_trailers().unwrap(); status = Some(Status::from_headers(&headers)? .ok_or_else(|| Status::new(Code::Internal) @@ -209,3 +209,60 @@ pub(crate) fn create_decode_response_body( } })) } + +#[cfg(test)] +mod tests { + use bytes::Bytes; + use futures_util::TryStreamExt; + use http::HeaderMap; + use poem::Body; + use prost::Message; + + use super::create_decode_response_body; + use crate::codec::{Codec, ProstCodec}; + + #[derive(Clone, PartialEq, Message)] + struct TestMsg { + #[prost(string, tag = 1)] + value: String, + } + + #[tokio::test] + async fn msg_data_spans_multiple_frames() { + // Split and encoded message into multiple frames. + let msg = TestMsg { + value: + "A program is like a poem, you cannot write a poem without writing it. --- Dijkstra" + .into(), + }; + let encoded = msg.encode_to_vec(); + let len = encoded.len(); + + // Compression flag + u32 big endian size + let mut buffer = vec![0]; + buffer.extend((len as u32).to_be_bytes()); + buffer.extend(encoded); + + // Split the data into multiple frames. + let (first_frame, second_frame) = buffer.split_at(len / 2); + + let bytes_stream = futures_util::stream::iter(vec![ + Ok::<_, std::io::Error>(Bytes::from(first_frame.to_vec())), + Ok(Bytes::from(second_frame.to_vec())), + ]); + let body = Body::from_bytes_stream(bytes_stream); + + let mut codec = ProstCodec::::default(); + let mut streaming = + create_decode_response_body(codec.decoder(), &HeaderMap::default(), body) + .expect("streaming"); + + let stream_msg = streaming + .try_next() + .await + .expect("msg") + .expect("one message"); + + assert_eq!(msg, stream_msg); + } +}