From 5cb42ffce61b98435a4673646e528e613a625210 Mon Sep 17 00:00:00 2001 From: Apu Islam Date: Tue, 4 Nov 2025 13:01:30 +0000 Subject: [PATCH] feat(http2): implement HTTP/2 informational responses support Add support for HTTP/2 informational responses (1xx status codes) including 103 Early Hints. This enables servers to send preliminary headers before the final response, improving client performance through early resource discovery and connection establishment. Changes include: - extend client and server APIs to handle interim informational responses - update stream state management for 1xx responses - add test for interim informational response scenarios This implementation follows RFC 7540 and RFC 8297 specifications for HTTP/2 informational responses handling. --- src/client.rs | 13 + src/codec/error.rs | 4 + src/proto/streams/recv.rs | 83 +++- src/proto/streams/send.rs | 39 ++ src/proto/streams/streams.rs | 46 ++ src/server.rs | 98 +++++ .../h2-tests/tests/informational_responses.rs | 392 ++++++++++++++++++ 7 files changed, 661 insertions(+), 14 deletions(-) create mode 100644 tests/h2-tests/tests/informational_responses.rs diff --git a/src/client.rs b/src/client.rs index 0c3415c78..f7974cfb3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1485,6 +1485,19 @@ impl ResponseFuture { pub fn stream_id(&self) -> crate::StreamId { crate::StreamId::from_internal(self.inner.stream_id()) } + + /// Polls for informational responses (1xx status codes). + /// + /// This method should be called before polling the main response future + /// to check for any informational responses that have been received. + /// + /// Returns `Poll::Ready(Some(response))` if an informational response is available, + /// `Poll::Ready(None)` if no more informational responses are expected, + /// or `Poll::Pending` if no informational response is currently available. + pub fn poll_informational(&mut self, cx: &mut Context<'_>) -> Poll, crate::Error>>> { + self.inner.poll_informational(cx).map_err(Into::into) + } + /// Returns a stream of PushPromises /// /// # Panics diff --git a/src/codec/error.rs b/src/codec/error.rs index 0acb913e5..3bcd13205 100644 --- a/src/codec/error.rs +++ b/src/codec/error.rs @@ -49,6 +49,9 @@ pub enum UserError { /// Tries to send push promise to peer who has disabled server push PeerDisabledServerPush, + + /// Invalid status code for informational response (must be 1xx) + InvalidInformationalStatusCode, } // ===== impl SendError ===== @@ -97,6 +100,7 @@ impl fmt::Display for UserError { SendPingWhilePending => "send_ping before received previous pong", SendSettingsWhilePending => "sending SETTINGS before received previous ACK", PeerDisabledServerPush => "sending PUSH_PROMISE to peer who disabled server push", + InvalidInformationalStatusCode => "invalid informational status code", }) } } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index bb044a66c..b1b696b3d 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -66,6 +66,7 @@ pub(super) enum Event { Headers(peer::PollMessage), Data(Bytes), Trailers(HeaderMap), + InformationalHeaders(peer::PollMessage), } #[derive(Debug)] @@ -264,6 +265,21 @@ impl Recv { // corresponding headers frame pushed to `stream.pending_recv`. self.pending_accept.push(stream); } + } else { + // This is an informational response (1xx status code) + // Convert to response and store it for polling + let message = counts + .peer() + .convert_poll_message(pseudo, fields, stream_id)?; + + tracing::debug!("Received informational response: {:?}", message); + + // Push the informational response onto the stream's recv buffer + // with a special event type so it can be polled separately + stream + .pending_recv + .push_back(&mut self.buffer, Event::InformationalHeaders(message)); + stream.notify_recv(); } Ok(()) @@ -324,24 +340,63 @@ impl Recv { ) -> Poll, proto::Error>> { use super::peer::PollMessage::*; - // If the buffer is not empty, then the first frame must be a HEADERS - // frame or the user violated the contract. - match stream.pending_recv.pop_front(&mut self.buffer) { - Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)), - Some(_) => panic!("poll_response called after response returned"), - None => { - if !stream.state.ensure_recv_open()? { - proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id); - return Poll::Ready(Err(Error::library_reset( - stream.id, - Reason::PROTOCOL_ERROR, - ))); + // Skip over any interim informational headers to find the main response + loop { + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Event::Headers(Client(response))) => return Poll::Ready(Ok(response)), + Some(Event::InformationalHeaders(_)) => { + // Skip interim informational headers - they should be consumed by poll_informational + continue; } + Some(_) => panic!("poll_response called after response returned"), + None => { + if !stream.state.ensure_recv_open()? { + proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id); + return Poll::Ready(Err(Error::library_reset( + stream.id, + Reason::PROTOCOL_ERROR, + ))); + } - stream.recv_task = Some(cx.waker().clone()); - Poll::Pending + stream.recv_task = Some(cx.waker().clone()); + return Poll::Pending; + } + } + } + } + + /// Called by the client to get informational responses (1xx status codes) + pub fn poll_informational( + &mut self, + cx: &Context, + stream: &mut store::Ptr, + ) -> Poll, proto::Error>>> { + use super::peer::PollMessage::*; + + // Try to pop the front event and check if it's an informational response + // If it's not, we put it back + if let Some(event) = stream.pending_recv.pop_front(&mut self.buffer) { + match event { + Event::InformationalHeaders(Client(response)) => { + // Found an informational response, return it + return Poll::Ready(Some(Ok(response))); + } + other => { + // Not an informational response, put it back at the front + stream.pending_recv.push_front(&mut self.buffer, other); + } } } + + // No informational response available at the front + if stream.state.ensure_recv_open()? { + // Request to get notified once more frames arrive + stream.recv_task = Some(cx.waker().clone()); + Poll::Pending + } else { + // No more frames will be received + Poll::Ready(None) + } } /// Transition the stream based on receiving trailers diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 6edb6b077..bab9363df 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -167,6 +167,45 @@ impl Send { Ok(()) } + /// Send interim informational headers (1xx responses) without changing stream state. + /// This allows multiple interim informational responses to be sent before the final response. + pub fn send_interim_informational_headers( + &mut self, + frame: frame::Headers, + buffer: &mut Buffer>, + stream: &mut store::Ptr, + _counts: &mut Counts, + task: &mut Option, + ) -> Result<(), UserError> { + tracing::trace!( + "send_informational_headers_direct; frame={:?}; stream_id={:?}", + frame, + frame.stream_id() + ); + + // Validate headers + Self::check_headers(frame.fields())?; + + // Ensure this is an informational response (1xx status code) + if !frame.is_informational() { + tracing::debug!("send_informational_headers_direct called with non-informational frame"); + return Err(UserError::UnexpectedFrameType); + } + + // Ensure the frame is not marked as end_stream for informational responses + if frame.is_end_stream() { + tracing::debug!("send_informational_headers_direct called with end_stream=true"); + return Err(UserError::UnexpectedFrameType); + } + + // Queue the frame for sending WITHOUT changing stream state + // This is the key difference from send_headers - we don't call stream.state.send_open() + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); + + Ok(()) + } + /// Send an explicit RST_STREAM frame pub fn send_reset( &mut self, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index af177ff4f..53e8b647c 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1150,6 +1150,42 @@ impl StreamRef { } } + pub fn send_informational_headers(&mut self, frame: frame::Headers) -> Result<(), UserError> { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.opaque.key); + let actions = &mut me.actions; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + me.counts.transition(stream, |counts, stream| { + // For informational responses (1xx), we need to send headers without + // changing the stream state. This allows multiple informational responses + // to be sent before the final response. + + // Validate that this is actually an informational response + if !frame.is_informational() { + return Err(UserError::UnexpectedFrameType); + } + + // Ensure the frame is not marked as end_stream for informational responses + if frame.is_end_stream() { + return Err(UserError::UnexpectedFrameType); + } + + // Send the interim informational headers directly to the buffer without state changes + // This bypasses the normal send_headers flow that would transition the stream state + actions.send.send_interim_informational_headers( + frame, + send_buffer, + stream, + counts, + &mut actions.task + ) + }) + } + pub fn send_response( &mut self, mut response: Response<()>, @@ -1334,6 +1370,16 @@ impl OpaqueStreamRef { me.actions.recv.poll_response(cx, &mut stream) } + + /// Called by a client to check for informational responses (1xx status codes) + pub fn poll_informational(&mut self, cx: &Context) -> Poll, proto::Error>>> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.poll_informational(cx, &mut stream) + } /// Called by a client to check for a pushed request. pub fn poll_pushed( &mut self, diff --git a/src/server.rs b/src/server.rs index 3ce3eb435..a2e630231 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1102,6 +1102,104 @@ impl Default for Builder { // ===== impl SendResponse ===== impl SendResponse { + /// Send an interim informational response (1xx status codes) + /// + /// This method can be called multiple times before calling `send_response()` + /// to send the final response. Only 1xx status codes are allowed. + /// + /// Interim informational responses are used to provide early feedback to the client + /// before the final response is ready. Common examples include: + /// - 100 Continue: Indicates the client should continue with the request + /// - 103 Early Hints: Provides early hints about resources to preload + /// + /// # Arguments + /// * `response` - HTTP response with 1xx status code and headers + /// + /// # Returns + /// * `Ok(())` - Interim Informational response sent successfully + /// * `Err(Error)` - Failed to send (invalid status code, connection error, etc.) + /// + /// # Examples + /// ```rust + /// use h2::server; + /// use http::{Response, StatusCode}; + /// + /// # async fn example(mut send_response: h2::server::SendResponse) -> Result<(), h2::Error> { + /// // Send 100 Continue before processing request body + /// let continue_response = Response::builder() + /// .status(StatusCode::CONTINUE) + /// .body(()) + /// .unwrap(); + /// send_response.send_informational(continue_response)?; + /// + /// // Later send the final response + /// let final_response = Response::builder() + /// .status(StatusCode::OK) + /// .body(()) + /// .unwrap(); + /// let _stream = send_response.send_response(final_response, false)?; + /// # Ok(()) + /// # } + /// ``` + /// + /// # Errors + /// This method will return an error if: + /// - The response status code is not in the 1xx range + /// - The final response has already been sent + /// - There is a connection-level error + pub fn send_informational(&mut self, response: Response<()>) -> Result<(), crate::Error> { + let stream_id = self.inner.stream_id(); + let status = response.status(); + + tracing::debug!( + "h2::send_informational called with status: {} on stream: {:?}", + status, + stream_id + ); + + // Validate that this is an informational response (1xx status code) + if !response.status().is_informational() { + tracing::debug!( + "h2::invalid informational status code: {} on stream: {:?}", + status, + stream_id + ); + // Return an error for invalid status codes + return Err(crate::Error::from(UserError::InvalidInformationalStatusCode)); + } + + tracing::trace!( + "h2::converting informational response to HEADERS frame for stream: {:?}", + stream_id + ); + + // Convert the response to a HEADERS frame without END_STREAM flag + // Use the proper Peer::convert_send_message method for informational responses + let frame = Peer::convert_send_message( + stream_id, + response, + false, // NOT end_of_stream for informational responses + ); + + tracing::trace!( + "h2::sending interim informational headers frame for stream: {:?}", + stream_id + ); + + // Use the proper H2 streams API for sending interim informational headers + // This bypasses the normal response flow and allows multiple informational responses + let result = self.inner + .send_informational_headers(frame) + .map_err(Into::into); + + match &result { + Ok(()) => tracing::debug!("h2::Successfully sent informational headers"), + Err(e) => tracing::debug!("h2::Failed to send informational headers: {:?}", e), + } + + result + } + /// Send a response to a client request. /// /// On success, a [`SendStream`] instance is returned. This instance can be diff --git a/tests/h2-tests/tests/informational_responses.rs b/tests/h2-tests/tests/informational_responses.rs new file mode 100644 index 000000000..50d9e5c55 --- /dev/null +++ b/tests/h2-tests/tests/informational_responses.rs @@ -0,0 +1,392 @@ +#![deny(warnings)] + +use futures::{future::poll_fn, StreamExt}; +use h2_support::prelude::*; +use http::{Response, StatusCode}; + +#[tokio::test] +async fn send_100_continue() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + + // Send a POST request + client + .send_frame(frames::headers(1).request("POST", "https://example.com/")) + .await; + + // Expect 100 Continue response first + client + .recv_frame(frames::headers(1).response(StatusCode::CONTINUE)) + .await; + + // Send request body after receiving 100 Continue + client + .send_frame(frames::data(1, &b"request body"[..]).eos()) + .await; + + // Expect final response + client + .recv_frame(frames::headers(1).response(StatusCode::OK).eos()) + .await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::POST); + + // Send 100 Continue informational response + let continue_response = Response::builder() + .status(StatusCode::CONTINUE) + .body(()) + .unwrap(); + stream.send_informational(continue_response).unwrap(); + + // Send final response + let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + + assert!(srv.next().await.is_none()); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn send_103_early_hints() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + + // Send a GET request + client + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + + // Expect 103 Early Hints response first + client + .recv_frame( + frames::headers(1) + .response(StatusCode::EARLY_HINTS) + .field("link", "; rel=preload; as=style, ; rel=preload; as=script"), + ) + .await; + + // Expect final response + client + .recv_frame(frames::headers(1).response(StatusCode::OK).eos()) + .await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::GET); + + // Send 103 Early Hints informational response + let early_hints_response = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header("link", "; rel=preload; as=style, ; rel=preload; as=script") + .body(()) + .unwrap(); + stream.send_informational(early_hints_response).unwrap(); + + // Send final response + let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + + assert!(srv.next().await.is_none()); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn send_multiple_informational_responses() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + + client + .send_frame(frames::headers(1).request("POST", "https://example.com/")) + .await; + + // Expect 100 Continue + client + .recv_frame(frames::headers(1).response(StatusCode::CONTINUE)) + .await; + + client + .send_frame(frames::data(1, &b"request body"[..]).eos()) + .await; + + // Expect 103 Early Hints + client + .recv_frame( + frames::headers(1) + .response(StatusCode::EARLY_HINTS) + .field("link", "; rel=preload; as=style"), + ) + .await; + + // Expect final response + client + .recv_frame(frames::headers(1).response(StatusCode::OK).eos()) + .await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::POST); + + // Send 100 Continue + let continue_response = Response::builder() + .status(StatusCode::CONTINUE) + .body(()) + .unwrap(); + stream.send_informational(continue_response).unwrap(); + + // Send 103 Early Hints + let early_hints_response = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header("link", "; rel=preload; as=style") + .body(()) + .unwrap(); + stream.send_informational(early_hints_response).unwrap(); + + // Send final response + let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + + assert!(srv.next().await.is_none()); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn invalid_informational_status_returns_error() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + + client + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + + // Should only receive the final response since invalid informational response errors out + client + .recv_frame(frames::headers(1).response(StatusCode::OK).eos()) + .await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::GET); + + // Try to send invalid informational response (200 is not 1xx) + // This should return an error + let invalid_response = Response::builder() + .status(StatusCode::OK) + .body(()) + .unwrap(); + let result = stream.send_informational(invalid_response); + + // Expect error for invalid status code + assert!(result.is_err()); + let err_msg = format!("{}", result.unwrap_err()); + assert!(err_msg.contains("invalid informational status code")); + + // Send actual final response after error + let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + + assert!(srv.next().await.is_none()); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn client_poll_informational_responses() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let recv_settings = srv.assert_client_handshake().await; + assert_default_settings!(recv_settings); + + srv.recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + + // Send 103 Early Hints + srv.send_frame( + frames::headers(1) + .response(StatusCode::EARLY_HINTS) + .field("link", "; rel=preload"), + ) + .await; + + // Send final response + srv.send_frame(frames::headers(1).response(StatusCode::OK).eos()) + .await; + }; + + let client = async move { + let (client, connection) = client::handshake(io).await.expect("handshake"); + + let request = Request::builder() + .method("GET") + .uri("https://example.com/") + .body(()) + .unwrap(); + + let (mut response_future, _) = client + .ready() + .await + .unwrap() + .send_request(request, true) + .unwrap(); + + let conn_fut = async move { + connection.await.expect("connection error"); + }; + + let response_fut = async move { + // Poll for informational responses + loop { + match poll_fn(|cx| response_future.poll_informational(cx)).await { + Some(Ok(info_response)) => { + assert_eq!(info_response.status(), StatusCode::EARLY_HINTS); + assert_eq!( + info_response.headers().get("link").unwrap(), + "; rel=preload" + ); + break; + } + Some(Err(e)) => panic!("Error polling informational: {:?}", e), + None => break, + } + } + + // Get the final response + let response = response_future.await.expect("response error"); + assert_eq!(response.status(), StatusCode::OK); + }; + + join(conn_fut, response_fut).await; + }; + + join(srv, client).await; +} + +#[tokio::test] +async fn informational_responses_with_body_streaming() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + + client + .send_frame(frames::headers(1).request("POST", "https://example.com/")) + .await; + + // Expect 100 Continue + client + .recv_frame(frames::headers(1).response(StatusCode::CONTINUE)) + .await; + + client + .send_frame(frames::data(1, &b"chunk1"[..])) + .await; + + // Expect 103 Early Hints while still receiving body + client + .recv_frame( + frames::headers(1) + .response(StatusCode::EARLY_HINTS) + .field("link", "; rel=preload"), + ) + .await; + + client + .send_frame(frames::data(1, &b"chunk2"[..]).eos()) + .await; + + // Expect final response with streaming body + client + .recv_frame(frames::headers(1).response(StatusCode::OK)) + .await; + + client + .recv_frame(frames::data(1, &b"response data"[..]).eos()) + .await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::POST); + + // Send 100 Continue + let continue_response = Response::builder() + .status(StatusCode::CONTINUE) + .body(()) + .unwrap(); + stream.send_informational(continue_response).unwrap(); + + // Send 103 Early Hints while processing + let early_hints_response = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header("link", "; rel=preload") + .body(()) + .unwrap(); + stream.send_informational(early_hints_response).unwrap(); + + // Send final response with body + let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); + let mut send_stream = stream.send_response(rsp, false).unwrap(); + + send_stream + .send_data("response data".into(), true) + .unwrap(); + + assert!(srv.next().await.is_none()); + }; + + join(client, srv).await; +}