Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,19 @@ impl ResponseFuture {
pub fn stream_id(&self) -> crate::StreamId {
crate::StreamId::from_internal(self.inner.stream_id())
}

/// Polls for informational responses (1xx status codes).
///
/// This method should be called before polling the main response future
/// to check for any informational responses that have been received.
///
/// Returns `Poll::Ready(Some(response))` if an informational response is available,
/// `Poll::Ready(None)` if no more informational responses are expected,
/// or `Poll::Pending` if no informational response is currently available.
pub fn poll_informational(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Response<()>, crate::Error>>> {
self.inner.poll_informational(cx).map_err(Into::into)
}

/// Returns a stream of PushPromises
///
/// # Panics
Expand Down
83 changes: 69 additions & 14 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub(super) enum Event {
Headers(peer::PollMessage),
Data(Bytes),
Trailers(HeaderMap),
InformationalHeaders(peer::PollMessage),
}

#[derive(Debug)]
Expand Down Expand Up @@ -264,6 +265,21 @@ impl Recv {
// corresponding headers frame pushed to `stream.pending_recv`.
self.pending_accept.push(stream);
}
} else {
// This is an informational response (1xx status code)
// Convert to response and store it for polling
let message = counts
.peer()
.convert_poll_message(pseudo, fields, stream_id)?;

tracing::debug!("Received informational response: {:?}", message);

// Push the informational response onto the stream's recv buffer
// with a special event type so it can be polled separately
stream
.pending_recv
.push_back(&mut self.buffer, Event::InformationalHeaders(message));
stream.notify_recv();
}

Ok(())
Expand Down Expand Up @@ -324,24 +340,63 @@ impl Recv {
) -> Poll<Result<Response<()>, proto::Error>> {
use super::peer::PollMessage::*;

// If the buffer is not empty, then the first frame must be a HEADERS
// frame or the user violated the contract.
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
Some(_) => panic!("poll_response called after response returned"),
None => {
if !stream.state.ensure_recv_open()? {
proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
return Poll::Ready(Err(Error::library_reset(
stream.id,
Reason::PROTOCOL_ERROR,
)));
// Skip over any interim informational headers to find the main response
loop {
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Headers(Client(response))) => return Poll::Ready(Ok(response)),
Some(Event::InformationalHeaders(_)) => {
// Skip interim informational headers - they should be consumed by poll_informational
continue;
}
Some(_) => panic!("poll_response called after response returned"),
None => {
if !stream.state.ensure_recv_open()? {
proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
return Poll::Ready(Err(Error::library_reset(
stream.id,
Reason::PROTOCOL_ERROR,
)));
}

stream.recv_task = Some(cx.waker().clone());
Poll::Pending
stream.recv_task = Some(cx.waker().clone());
return Poll::Pending;
}
}
}
}

/// Called by the client to get informational responses (1xx status codes)
pub fn poll_informational(
&mut self,
cx: &Context,
stream: &mut store::Ptr,
) -> Poll<Option<Result<Response<()>, proto::Error>>> {
use super::peer::PollMessage::*;

// Try to pop the front event and check if it's an informational response
// If it's not, we put it back
if let Some(event) = stream.pending_recv.pop_front(&mut self.buffer) {
match event {
Event::InformationalHeaders(Client(response)) => {
// Found an informational response, return it
return Poll::Ready(Some(Ok(response)));
}
other => {
// Not an informational response, put it back at the front
stream.pending_recv.push_front(&mut self.buffer, other);
}
}
}

// No informational response available at the front
if stream.state.ensure_recv_open()? {
// Request to get notified once more frames arrive
stream.recv_task = Some(cx.waker().clone());
Poll::Pending
} else {
// No more frames will be received
Poll::Ready(None)
}
}

/// Transition the stream based on receiving trailers
Expand Down
39 changes: 39 additions & 0 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,45 @@ impl Send {
Ok(())
}

/// Send interim informational headers (1xx responses) without changing stream state.
/// This allows multiple interim informational responses to be sent before the final response.
pub fn send_interim_informational_headers<B>(
&mut self,
frame: frame::Headers,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
_counts: &mut Counts,
task: &mut Option<Waker>,
) -> Result<(), UserError> {
tracing::trace!(
"send_informational_headers_direct; frame={:?}; stream_id={:?}",
frame,
frame.stream_id()
);

// Validate headers
Self::check_headers(frame.fields())?;

// Ensure this is an informational response (1xx status code)
if !frame.is_informational() {
tracing::debug!("send_informational_headers_direct called with non-informational frame");
return Err(UserError::UnexpectedFrameType);
}

// Ensure the frame is not marked as end_stream for informational responses
if frame.is_end_stream() {
tracing::debug!("send_informational_headers_direct called with end_stream=true");
return Err(UserError::UnexpectedFrameType);
}

// Queue the frame for sending WITHOUT changing stream state
// This is the key difference from send_headers - we don't call stream.state.send_open()
self.prioritize
.queue_frame(frame.into(), buffer, stream, task);

Ok(())
}

/// Send an explicit RST_STREAM frame
pub fn send_reset<B>(
&mut self,
Expand Down
46 changes: 46 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,42 @@ impl<B> StreamRef<B> {
}
}

pub fn send_informational_headers(&mut self, frame: frame::Headers) -> Result<(), UserError> {
let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me;

let stream = me.store.resolve(self.opaque.key);
let actions = &mut me.actions;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;

me.counts.transition(stream, |counts, stream| {
// For informational responses (1xx), we need to send headers without
// changing the stream state. This allows multiple informational responses
// to be sent before the final response.

// Validate that this is actually an informational response
if !frame.is_informational() {
return Err(UserError::UnexpectedFrameType);
}

// Ensure the frame is not marked as end_stream for informational responses
if frame.is_end_stream() {
return Err(UserError::UnexpectedFrameType);
}

// Send the interim informational headers directly to the buffer without state changes
// This bypasses the normal send_headers flow that would transition the stream state
actions.send.send_interim_informational_headers(
frame,
send_buffer,
stream,
counts,
&mut actions.task
)
})
}

pub fn send_response(
&mut self,
mut response: Response<()>,
Expand Down Expand Up @@ -1334,6 +1370,16 @@ impl OpaqueStreamRef {

me.actions.recv.poll_response(cx, &mut stream)
}

/// Called by a client to check for informational responses (1xx status codes)
pub fn poll_informational(&mut self, cx: &Context) -> Poll<Option<Result<Response<()>, proto::Error>>> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let mut stream = me.store.resolve(self.key);

me.actions.recv.poll_informational(cx, &mut stream)
}
/// Called by a client to check for a pushed request.
pub fn poll_pushed(
&mut self,
Expand Down
99 changes: 99 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,105 @@ impl Default for Builder {
// ===== impl SendResponse =====

impl<B: Buf> SendResponse<B> {
/// Send an interim informational response (1xx status codes)
///
/// This method can be called multiple times before calling `send_response()`
/// to send the final response. Only 1xx status codes are allowed.
///
/// Interim informational responses are used to provide early feedback to the client
/// before the final response is ready. Common examples include:
/// - 100 Continue: Indicates the client should continue with the request
/// - 103 Early Hints: Provides early hints about resources to preload
///
/// # Arguments
/// * `response` - HTTP response with 1xx status code and headers
///
/// # Returns
/// * `Ok(())` - Interim Informational response sent successfully
/// * `Err(Error)` - Failed to send (invalid status code, connection error, etc.)
///
/// # Examples
/// ```rust
/// use h2::server;
/// use http::{Response, StatusCode};
///
/// # async fn example(mut send_response: h2::server::SendResponse<bytes::Bytes>) -> Result<(), h2::Error> {
/// // Send 100 Continue before processing request body
/// let continue_response = Response::builder()
/// .status(StatusCode::CONTINUE)
/// .body(())
/// .unwrap();
/// send_response.send_informational(continue_response)?;
///
/// // Later send the final response
/// let final_response = Response::builder()
/// .status(StatusCode::OK)
/// .body(())
/// .unwrap();
/// let _stream = send_response.send_response(final_response, false)?;
/// # Ok(())
/// # }
/// ```
///
/// # Errors
/// This method will return an error if:
/// - The response status code is not in the 1xx range
/// - The final response has already been sent
/// - There is a connection-level error
pub fn send_informational(&mut self, response: Response<()>) -> Result<(), crate::Error> {
let stream_id = self.inner.stream_id();
let status = response.status();

tracing::debug!(
"h2::send_informational called with status: {} on stream: {:?}",
status,
stream_id
);

// Validate that this is an informational response (1xx status code)
if !response.status().is_informational() {
tracing::debug!(
"h2::ignoring non-informational response with status: {} on stream: {:?}",
status,
stream_id
);
// Instead of returning an error, just ignore invalid informational responses
// This allows the stream to continue working even if invalid responses are sent
return Ok(());
}

tracing::trace!(
"h2::converting informational response to HEADERS frame for stream: {:?}",
stream_id
);

// Convert the response to a HEADERS frame without END_STREAM flag
// Use the proper Peer::convert_send_message method for informational responses
let frame = Peer::convert_send_message(
stream_id,
response,
false, // NOT end_of_stream for informational responses
);

tracing::trace!(
"h2::sending interim informational headers frame for stream: {:?}",
stream_id
);

// Use the proper H2 streams API for sending interim informational headers
// This bypasses the normal response flow and allows multiple informational responses
let result = self.inner
.send_informational_headers(frame)
.map_err(Into::into);

match &result {
Ok(()) => tracing::debug!("h2::Successfully sent informational headers"),
Err(e) => tracing::debug!("h2::Failed to send informational headers: {:?}", e),
}

result
}

/// Send a response to a client request.
///
/// On success, a [`SendStream`] instance is returned. This instance can be
Expand Down
Loading
Loading