Generic Buffered/Streaming Response Type Constraints #995


Closed
naftulikay opened this issue May 18, 2025 · 5 comments

Comments

@naftulikay

I'm writing a fairly generic all-in-one Lambda function that will handle many different event types: for example, SQS, SNS, CloudWatch scheduled events, S3 events, SES events, and of course API Gateway requests/responses. I have an existing Rust Lambda doing this, but it's incredibly old (from the crowbar days), so I'm essentially rewriting it from scratch.

I'm implementing tower::Service for my service directly, and I'm trying to define its output type so that I can return either a fully buffered response or a streaming response, giving me the flexibility of both. I'm aware that streaming is only applicable to API Gateway responses.

use bytes::Bytes;
use lambda_runtime::streaming::Body;
use lambda_runtime::{Error as LambdaError, StreamResponse};
use lambda_runtime::{FunctionResponse, LambdaEvent};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tower::Service;

#[derive(Clone)]
pub struct LambdaService(Arc<LambdaServiceInner>);

impl LambdaService {
    pub fn new(id: impl Into<String>) -> Self {
        Self(Arc::new(LambdaServiceInner { id: id.into() }))
    }
}

struct LambdaServiceInner {
    id: String,
}

/// Represents a response to the Lambda invocation, either a buffered response or a streaming 
/// response.
// FIXME here is where the issue occurs
pub type LambdaResponse = FunctionResponse<Bytes, StreamResponse<Body>>;

impl Service<LambdaEvent<serde_json::Value>> for LambdaService {
    type Response = LambdaResponse;
    type Error = LambdaError;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: LambdaEvent<serde_json::Value>) -> Self::Future {
        let cloned = self.clone();

        Box::pin(async move {
            cloned.on_event(req).await
        })
    }
}

impl LambdaService {
    pub async fn on_event(&self, event: LambdaEvent<serde_json::Value>) -> Result<LambdaResponse, LambdaError> {
        eprintln!("service id: {}; request_id: {}", self.0.id, event.context.request_id);

        let output = serde_json::json!({
            "context": event.context,
            "event": event.payload,
        });

        eprintln!("event: {}", serde_json::to_string_pretty(&output).unwrap());

        Ok(FunctionResponse::BufferedResponse(Vec::with_capacity(0).into()))
    }
}

My main function simply instantiates the service and calls lambda_runtime::run with it:

use api_lambda::LambdaService;
use lambda_runtime::Error as LambdaError;

#[tokio::main]
async fn main() -> Result<(), LambdaError> {
    lambda_runtime::run(LambdaService::new("my-id")).await
}

When I try to compile this:

error[E0277]: the trait bound `Response<Body>: futures_core::stream::Stream` is not satisfied
   --> crates/api-lambda/src/main.rs:6:70
    |
6   |     lambda_runtime::run(LambdaService::new("my-id")).await
    |                                                                      ^^^^^ the trait `futures_core::stream::Stream` is not implemented for `Response<Body>`
    |
    = help: the following other types implement trait `futures_core::stream::Stream`:
              &mut S
              AssertUnwindSafe<S>
              Body
              Box<S>
              CallAll<Svc, S>
              CallAllUnordered<Svc, S>
              Pin<P>
              async_stream::async_stream::AsyncStream<T, U>
            and 111 others
note: required by a bound in `run`
   --> /path/to/dir/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/lambda_runtime-0.13.0/src/lib.rs:119:8
    |
111 | pub async fn run<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
    |              --- required by a bound in this function
...
119 |     S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
    |        ^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `run`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `api-lambda` (bin "api-lambda") due to 2 previous errors

Note my type alias definition for the response:

lambda_runtime::FunctionResponse<bytes::Bytes, lambda_runtime::StreamResponse<lambda_runtime::streaming::Body>>;

I've tried a number of different values for StreamResponse<S>:

  • lambda_runtime::streaming::Body
  • axum::body::Body
  • axum::body::BodyDataStream

All of these fail indicating:

the trait `futures_core::stream::Stream` is not implemented for `Response<S>`.

Confusingly, all of the above types do appear to implement Stream, so I'm not sure why this error occurs.

I'm happy to contribute an example showing a generic return over both outcomes if I can get some help wrangling the types.

@naftulikay
Author

I think I've narrowed it down: it's lambda_runtime::streaming::Response that does not implement Stream, which violates the type constraints on run.

Am I correct in this reading of it?

I'm not sure how to write a Stream implementation for StreamResponse because it contains a lambda_runtime::MetadataPrelude that might need to be written first.

@bnusunny
Contributor

@naftulikay take a look at this file. It is for streaming an HTTP response.

@naftulikay
Author

@bnusunny okay, taking a look. I'll see if I can do something similar while still being able to generically return either a fully buffered response or a streaming response.

@naftulikay
Author

@bnusunny with a little bit of hammering, I was able to get the following working:

lib.rs

use bytes::Bytes;
use futures::Stream;
use lambda_runtime::{Error as LambdaError, FunctionResponse, StreamResponse};
use lambda_runtime::{LambdaEvent, MetadataPrelude};
use pin_project_lite::pin_project;
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::Service;

pub(crate) mod adapter;

#[derive(Clone)]
pub struct LambdaService(Arc<LambdaServiceInner>);

impl LambdaService {
    pub fn new(id: impl Into<String>) -> Self {
        Self(Arc::new(LambdaServiceInner { id: id.into() }))
    }
}

struct LambdaServiceInner {
    id: String,
}

/// Represents a response to the Lambda invocation, either a buffered response or a streaming
/// response.
pub type LambdaResponse = FunctionResponse<Bytes, HttpBodyStream<axum::body::Body>>;

impl Service<LambdaEvent<serde_json::Value>> for LambdaService {
    type Response = LambdaResponse;
    type Error = LambdaError;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    fn poll_ready(
        &mut self,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        // NOTE this function determines whether this Service is ready to accept a request. if, for
        //      example, the service is single-threaded, this method could indicate that a request
        //      is in process and the caller should wait before calling it
        std::task::Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: LambdaEvent<serde_json::Value>) -> Self::Future {
        let cloned = self.clone();

        Box::pin(async move { cloned.on_event(req).await })
    }
}

impl LambdaService {
    pub async fn on_event(
        &self,
        event: LambdaEvent<serde_json::Value>,
    ) -> Result<LambdaResponse, LambdaError> {
        // FIXME we need to dispatch based on what kind of event it is
        eprintln!(
            "service id: {}; request_id: {}",
            self.0.id, event.context.request_id
        );

        let output = serde_json::json!({
            "context": event.context,
            "event": event.payload,
        });

        eprintln!("event: {}", serde_json::to_string_pretty(&output).unwrap());

        let body = axum::body::Body::new(serde_json::to_string_pretty(&output).unwrap());

        // NOTE we're just doing it like this to demonstrate we can return both
        if output.is_object() {
            Ok(FunctionResponse::StreamingResponse(StreamResponse {
                metadata_prelude: MetadataPrelude::default(),
                stream: HttpBodyStream { body },
            }))
        } else {
            Ok(FunctionResponse::BufferedResponse(vec![].into()))
        }
    }
}

pin_project! {
    /// Wrapper type for converting an axum response into something that the Lambda library will
    /// work with.
    pub struct HttpBodyStream<B> {
        #[pin]
        pub(crate) body: B,
    }
}

impl<B> Stream for HttpBodyStream<B>
where
    B: axum::body::HttpBody + Unpin + Send + 'static,
    B::Data: Into<Bytes> + Send,
    B::Error: Into<LambdaError> + Send + Debug,
{
    type Item = Result<B::Data, B::Error>;

    #[inline]
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match futures_util::ready!(self.as_mut().project().body.poll_frame(cx)?) {
            Some(frame) => match frame.into_data() {
                Ok(data) => Poll::Ready(Some(Ok(data))),
                Err(_frame) => Poll::Ready(None),
            },
            None => Poll::Ready(None),
        }
    }
}

main.rs

use api_lambda::LambdaService;
use lambda_runtime::Error as LambdaError;

#[tokio::main]
async fn main() -> Result<(), LambdaError> {
    lambda_runtime::run(LambdaService::new("api.naftuli.wtf/local")).await
}

So yes, it is indeed possible to have a generic response type where we can return either a FunctionResponse::BufferedResponse or a FunctionResponse::StreamingResponse!
