Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service: Impl Service for async functions #657

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 12 additions & 26 deletions tower-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,36 +348,22 @@ pub trait Service<Request> {
fn call(&mut self, req: Request) -> Self::Future;
}

impl<'a, S, Request> Service<Request> for &'a mut S
impl<Request, F, Fut, T, E> Service<Request> for F
where
S: Service<Request> + 'a,
F: FnMut(Request) -> Fut,
Fut: Future<Output = Result<T, E>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
type Response = T;
type Error = E;
type Future = Fut;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
(**self).poll_ready(cx)
#[inline]
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved
}

fn call(&mut self, request: Request) -> S::Future {
(**self).call(request)
}
}

impl<S, Request> Service<Request> for Box<S>
where
S: Service<Request> + ?Sized,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
(**self).poll_ready(cx)
}

fn call(&mut self, request: Request) -> S::Future {
(**self).call(request)
#[inline]
fn call(&mut self, req: Request) -> Self::Future {
self(req)
}
}
4 changes: 2 additions & 2 deletions tower/examples/tower-balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ fn gen_disco() -> impl Discover<
.iter()
.enumerate()
.map(|(instance, latency)| {
let svc = tower::service_fn(move |_| {
let svc = move |_| {
let start = Instant::now();

let maxms = u64::from(latency.subsec_millis())
Expand All @@ -131,7 +131,7 @@ fn gen_disco() -> impl Discover<
let latency = start.elapsed();
Ok(Rsp { latency, instance })
}
});
};

(instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
})
Expand Down
52 changes: 0 additions & 52 deletions tower/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,58 +509,6 @@ impl<L> ServiceBuilder<L> {
self.layer.layer(service)
}

/// Wrap the async function `F` with the middleware provided by this [`ServiceBuilder`]'s
/// [`Layer`]s, returning a new [`Service`].
///
/// This is a convenience method which is equivalent to calling
/// [`ServiceBuilder::service`] with a [`service_fn`], like this:
///
/// ```rust
/// # use tower::{ServiceBuilder, service_fn};
/// # async fn handler_fn(_: ()) -> Result<(), ()> { Ok(()) }
/// # let _ = {
/// ServiceBuilder::new()
/// // ...
/// .service(service_fn(handler_fn))
/// # };
/// ```
///
/// # Example
///
/// ```rust
/// use std::time::Duration;
/// use tower::{ServiceBuilder, ServiceExt, BoxError, service_fn};
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), BoxError> {
/// async fn handle(request: &'static str) -> Result<&'static str, BoxError> {
/// Ok(request)
/// }
///
/// let svc = ServiceBuilder::new()
/// .buffer(1024)
/// .timeout(Duration::from_secs(10))
/// .service_fn(handle);
///
/// let response = svc.oneshot("foo").await?;
///
/// assert_eq!(response, "foo");
/// # Ok(())
/// # }
/// ```
///
/// [`Layer`]: crate::Layer
/// [`Service`]: crate::Service
/// [`service_fn`]: crate::service_fn
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn service_fn<F>(self, f: F) -> L::Service
where
L: Layer<crate::util::ServiceFn<F>>,
{
self.service(crate::util::service_fn(f))
}

/// Check that the builder implements `Clone`.
///
/// This can be useful when debugging type errors in `ServiceBuilder`s with lots of layers.
Expand Down
2 changes: 1 addition & 1 deletion tower/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ pub mod layer;
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
#[doc(inline)]
pub use self::util::{service_fn, ServiceExt};
pub use self::util::ServiceExt;

#[doc(inline)]
pub use crate::builder::ServiceBuilder;
Expand Down
5 changes: 2 additions & 3 deletions tower/src/make/make_service/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ opaque_future! {
mod tests {
use super::*;
use crate::make::MakeService;
use crate::service_fn;
use futures::future::poll_fn;

async fn echo<R>(req: R) -> Result<R, Infallible> {
Expand All @@ -115,7 +114,7 @@ mod tests {

#[tokio::test]
async fn as_make_service() {
let mut shared = Shared::new(service_fn(echo::<&'static str>));
let mut shared = Shared::new(echo::<&'static str>);

poll_fn(|cx| MakeService::<(), _>::poll_ready(&mut shared, cx))
.await
Expand All @@ -130,7 +129,7 @@ mod tests {

#[tokio::test]
async fn as_make_service_into_service() {
let shared = Shared::new(service_fn(echo::<&'static str>));
let shared = Shared::new(echo::<&'static str>);
let mut shared = MakeService::<(), _>::into_service(shared);

poll_fn(|cx| Service::<()>::poll_ready(&mut shared, cx))
Expand Down
29 changes: 29 additions & 0 deletions tower/src/util/by_ref.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::task::{Context, Poll};

use tower_service::Service;

/// TODO
#[derive(Debug)]
pub struct ByRef<'a, S>(&'a mut S);

impl<'a, S> ByRef<'a, S> {
pub(crate) fn new(service: &'a mut S) -> Self {
Self(service)
}
}

impl<'a, S, Request> Service<Request> for ByRef<'a, S> where S: Service<Request> {
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
(*self.0).poll_ready(cx)
}

#[inline]
fn call(&mut self, req: Request) -> Self::Future {
(*self.0).call(req)
}
}
28 changes: 18 additions & 10 deletions tower/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@ mod map_request;
mod map_response;
mod map_result;

mod by_ref;
mod map_future;
mod oneshot;
mod optional;
mod ready;
mod service_fn;
mod then;

pub use self::{
and_then::{AndThen, AndThenLayer},
boxed::{BoxLayer, BoxService, UnsyncBoxService},
boxed_clone::BoxCloneService,
by_ref::ByRef,
either::Either,
future_service::{future_service, FutureService},
map_err::{MapErr, MapErrLayer},
Expand All @@ -33,7 +34,6 @@ pub use self::{
oneshot::Oneshot,
optional::Optional,
ready::{Ready, ReadyOneshot},
service_fn::{service_fn, ServiceFn},
then::{Then, ThenLayer},
};

Expand Down Expand Up @@ -951,17 +951,17 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # Example
///
/// ```
/// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxService};
/// use tower::{Service, ServiceExt, BoxError, util::BoxService};
/// #
/// # struct Request;
/// # struct Response;
/// # impl Response {
/// # fn new() -> Self { Self }
/// # }
///
/// let service = service_fn(|req: Request| async {
/// Ok::<_, BoxError>(Response::new())
/// });
/// async fn service(req: Request) -> Result<Response, BoxError> {
/// Ok(Response::new())
/// }
///
/// let service: BoxService<Request, Response, BoxError> = service
/// .map_request(|req| {
Expand Down Expand Up @@ -997,17 +997,17 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
/// # Example
///
/// ```
/// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService};
/// use tower::{Service, ServiceExt, BoxError, util::BoxCloneService};
/// #
/// # struct Request;
/// # struct Response;
/// # impl Response {
/// # fn new() -> Self { Self }
/// # }
///
/// let service = service_fn(|req: Request| async {
/// Ok::<_, BoxError>(Response::new())
/// });
/// async fn service(req: Request) -> Result<Response, BoxError> {
/// Ok(Response::new())
/// }
///
/// let service: BoxCloneService<Request, Response, BoxError> = service
/// .map_request(|req| {
Expand Down Expand Up @@ -1036,6 +1036,14 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
{
BoxCloneService::new(self)
}

/// TODO
fn by_ref(&mut self) -> ByRef<'_, Self>
where
Self: Sized,
{
ByRef::new(self)
}
}

impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {}
Expand Down
20 changes: 16 additions & 4 deletions tower/src/util/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ where
/// [`Ready`] values are produced by [`ServiceExt::ready`].
///
/// [`ServiceExt::ready`]: crate::util::ServiceExt::ready
pub struct Ready<'a, T, Request>(ReadyOneshot<&'a mut T, Request>);
pub struct Ready<'a, T, Request> {
inner: Option<&'a mut T>,
_p: PhantomData<fn() -> Request>,
}

// Safety: This is safe for the same reason that the impl for ReadyOneshot is safe.
impl<'a, T, Request> Unpin for Ready<'a, T, Request> {}
Expand All @@ -78,7 +81,10 @@ where
{
#[allow(missing_docs)]
pub fn new(service: &'a mut T) -> Self {
Self(ReadyOneshot::new(service))
Self {
inner: Some(service),
_p: PhantomData,
}
}
}

Expand All @@ -89,7 +95,13 @@ where
type Output = Result<&'a mut T, T::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
ready!(self
.inner
.as_mut()
.expect("poll after Poll::Ready")
.poll_ready(cx))?;

Poll::Ready(Ok(self.inner.take().expect("poll after Poll::Ready")))
}
}

Expand All @@ -98,6 +110,6 @@ where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("Ready").field(&self.0).finish()
f.debug_tuple("Ready").field(&self.inner).finish()
}
}
Loading