Skip to content

Commit

Permalink
poem-grpc: message can span multiple frame
Browse files Browse the repository at this point in the history
When a response was big enough to be split into multiple frames,
the response decoder was incorrectly checking the frame decoder
for an incomplete frame before receiving the remaining bytes.
  • Loading branch information
gwik committed May 15, 2024
1 parent 393ec48 commit c067c82
Showing 1 changed file with 56 additions and 1 deletion.
57 changes: 56 additions & 1 deletion poem-grpc/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ pub(crate) fn create_decode_response_body<T: Decoder>(
let message = decoder.decode(&data).map_err(Status::from_std_error)?;
yield message;
}
frame_decoder.check_incomplete()?;
} else if frame.is_trailers() {
frame_decoder.check_incomplete()?;
let headers = frame.into_trailers().unwrap();
status = Some(Status::from_headers(&headers)?
.ok_or_else(|| Status::new(Code::Internal)
Expand All @@ -209,3 +209,58 @@ pub(crate) fn create_decode_response_body<T: Decoder>(
}
}))
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
use futures_util::TryStreamExt;
use http::HeaderMap;
use poem::Body;
use prost::Message;

use crate::codec::{Codec, ProstCodec};

use super::create_decode_response_body;

#[derive(Clone, PartialEq, Message)]
struct TestMsg {
#[prost(string, tag = 1)]
value: String,
}

#[tokio::test]
async fn msg_data_spans_multiple_frames() {
// Split and encoded message into multiple frames.
let msg = TestMsg {
value: "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx".into(),
};
let mut encoded = msg.encode_to_vec();

// Compression flag + u32 big endian size
let mut first = vec![0];
first.extend((encoded.len() as u32).to_be_bytes());

// Split the data into multiple frames.
let rest = encoded.split_off(encoded.len() / 2);
first.extend(encoded);

let bytes_stream = futures_util::stream::iter(vec![
Ok::<_, std::io::Error>(Bytes::from(first)),
Ok(Bytes::from(rest)),
]);
let body = Body::from_bytes_stream(bytes_stream);

let mut codec = ProstCodec::<TestMsg, TestMsg>::default();
let mut streaming =
create_decode_response_body(codec.decoder(), &HeaderMap::default(), body)
.expect("streaming");

let stream_msg = streaming
.try_next()
.await
.expect("msg")
.expect("one message");

assert_eq!(msg, stream_msg);
}
}

0 comments on commit c067c82

Please sign in to comment.