From 049490bedddc0cd8363f5597169cac88e9b073fa Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Fri, 2 Apr 2021 18:15:42 +0200 Subject: [PATCH] feat: Add Remote service --- tower/Cargo.toml | 2 + tower/src/lib.rs | 3 + tower/src/remote/layer.rs | 79 ++++++++++++++++++++++++ tower/src/remote/mod.rs | 25 ++++++++ tower/src/remote/service.rs | 116 ++++++++++++++++++++++++++++++++++++ tower/src/remote/spawn.rs | 66 ++++++++++++++++++++ 6 files changed, 291 insertions(+) create mode 100644 tower/src/remote/layer.rs create mode 100644 tower/src/remote/mod.rs create mode 100644 tower/src/remote/service.rs create mode 100644 tower/src/remote/spawn.rs diff --git a/tower/Cargo.toml b/tower/Cargo.toml index 9f00e6018..4ccf58d60 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -37,6 +37,7 @@ full = [ "make", "ready-cache", "reconnect", + "remote", "retry", "spawn-ready", "steer", @@ -55,6 +56,7 @@ load-shed = [] make = ["tokio/io-std", "futures-util"] ready-cache = ["futures-util", "indexmap", "tokio/sync", "tracing"] reconnect = ["make", "tokio/io-std", "tracing"] +remote = ["buffer", "futures-util"] retry = ["tokio/time"] spawn-ready = ["futures-util", "tokio/sync", "tokio/rt", "util", "tracing"] steer = ["futures-util"] diff --git a/tower/src/lib.rs b/tower/src/lib.rs index 552eff633..03bb65235 100644 --- a/tower/src/lib.rs +++ b/tower/src/lib.rs @@ -186,6 +186,9 @@ pub mod ready_cache; #[cfg(feature = "reconnect")] #[cfg_attr(docsrs, doc(cfg(feature = "reconnect")))] pub mod reconnect; +#[cfg(feature = "remote")] +#[cfg_attr(docsrs, doc(cfg(feature = "remote")))] +pub mod remote; #[cfg(feature = "retry")] #[cfg_attr(docsrs, doc(cfg(feature = "retry")))] pub mod retry; diff --git a/tower/src/remote/layer.rs b/tower/src/remote/layer.rs new file mode 100644 index 000000000..b1062feed --- /dev/null +++ b/tower/src/remote/layer.rs @@ -0,0 +1,79 @@ +use super::Remote; +use crate::BoxError; +use std::{ + fmt::{self, Debug, Formatter}, + marker::PhantomData, +}; +use tokio::runtime::Handle; +use tower_layer::Layer; +use tower_service::Service; + +/// Execute a service on a remote tokio executor. +/// +/// See the module documentation for more details. +pub struct RemoteLayer { + bound: usize, + handle: Handle, + _p: PhantomData, +} + +impl RemoteLayer { + /// Creates a new [`RemoteLayer`] with the provided `bound`. + /// + /// `bound` gives the maximal number of requests that can be queued for the service before + /// backpressure is applied to callers. + /// + /// The current Tokio executor is used to run the given service, which means that this method + /// must be called while on the Tokio runtime. + pub fn new(bound: usize) -> Self { + Self::with_handle(bound, Handle::current()) + } + + /// Creates a new [`RemoteLayer`] with the provided `bound`, spawning onto the runtime connected + /// to the given [`Handle`]. + /// + /// `bound` gives the maximal number of requests that can be queued for the service before + /// backpressure is applied to callers. + pub fn with_handle(bound: usize, handle: Handle) -> Self { + Self { + bound, + handle, + _p: PhantomData, + } + } +} + +impl Clone for RemoteLayer { + fn clone(&self) -> Self { + Self { + bound: self.bound, + handle: self.handle.clone(), + _p: self._p, + } + } +} + +impl Debug for RemoteLayer { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + f.debug_struct("RemoteLayer") + .field("bound", &self.bound) + .field("handle", &self.handle) + .finish() + } +} + +impl Layer for RemoteLayer +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: Send + 'static, + S::Error: 'static, + R: Send + 'static, + BoxError: From, +{ + type Service = Remote; + + fn layer(&self, service: S) -> Self::Service { + Remote::with_handle(service, self.bound, &self.handle) + } +} diff --git a/tower/src/remote/mod.rs b/tower/src/remote/mod.rs new file mode 100644 index 000000000..876a893dd --- /dev/null +++ b/tower/src/remote/mod.rs @@ -0,0 +1,25 @@ +//! Middleware that executes a service on a remote tokio executor. +//! +//! When multiple executors are running it's sometimes desirable to have a service execute on a +//! particular one, for example the one with the most worker threads or the one that supports +//! blocking operations via [`task::block_in_place`]. +//! +//! This module allows you to do that by placing the service behind a multi-producer, single- +//! consumer channel and spawning it onto an executor. The service then processes any requests sent +//! through the channel, spawning the futures covering their execution onto the remote executor. +//! +//! The result of a request is then transparently sent through another channel back to the client. +//! +//! [`task::block_in_place`]: tokio::task::block_in_place + +mod layer; +mod service; +mod spawn; + +pub use self::{layer::*, service::Remote}; + +/// Future types for the [`Remote`] middleware. +pub mod future { + #[doc(inline)] + pub use super::service::RemoteFuture; +} diff --git a/tower/src/remote/service.rs b/tower/src/remote/service.rs new file mode 100644 index 000000000..d2e846f5b --- /dev/null +++ b/tower/src/remote/service.rs @@ -0,0 +1,116 @@ +use super::spawn::{Spawn, SpawnFuture}; +use crate::{ + buffer::{future::ResponseFuture, Buffer}, + BoxError, +}; +use pin_project::pin_project; +use std::{ + fmt::{self, Debug, Formatter}, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::runtime::Handle; +use tower_service::Service; + +/// Execute a service on a remote tokio executor. +/// +/// See the module documentation for more details. +#[derive(Clone)] +pub struct Remote +where + T: Service, + T::Future: Send + 'static, + T::Response: Send + 'static, + T::Error: 'static, + BoxError: From, +{ + inner: Buffer, R>, +} + +/// A future that resolves to the response produced on the remote executor. +#[pin_project] +#[derive(Debug)] +pub struct RemoteFuture { + // Newtype around Buffer's future to hide the fact that we're using it under the hood. + #[pin] + inner: ResponseFuture>, +} + +impl Remote +where + T: Service + Send + 'static, + T::Future: Send + 'static, + T::Response: Send + 'static, + T::Error: 'static, + R: Send + 'static, + BoxError: From, +{ + /// Creates a new [`Remote`] wrapping `service` that spawns onto the current tokio runtime. + /// + /// `bound` gives the maximal number of requests that can be queued for the service before + /// backpressure is applied to callers. + /// + /// The current Tokio executor is used to run the given service, which means that this method + /// must be called while on the Tokio runtime. + pub fn new(service: T, bound: usize) -> Self { + Self::with_handle(service, bound, &Handle::current()) + } + + /// Creates a new [`Remote`] wrapping `service`, spawning onto the runtime that is connected + /// to the given [`Handle`]. + /// + /// `bound` gives the maximal number of requests that can be queued for the service before + /// backpressure is applied to callers. + pub fn with_handle(service: T, bound: usize, handle: &Handle) -> Self { + let (inner, worker) = Buffer::pair(Spawn::new(service, handle.clone()), bound); + handle.spawn(worker); + + Self { inner } + } +} + +impl Debug for Remote +where + T: Service, + T::Future: Send + 'static, + T::Response: Send + 'static, + T::Error: 'static, + BoxError: From, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("Remote").finish() + } +} + +impl Service for Remote +where + T: Service, + T::Future: Send + 'static, + T::Response: Send + 'static, + T::Error: 'static, + BoxError: From, +{ + type Response = T::Response; + type Error = BoxError; + type Future = RemoteFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: R) -> Self::Future { + RemoteFuture { + inner: self.inner.call(req), + } + } +} + +impl Future for RemoteFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + this.inner.poll(cx) + } +} diff --git a/tower/src/remote/spawn.rs b/tower/src/remote/spawn.rs new file mode 100644 index 000000000..2225e19b1 --- /dev/null +++ b/tower/src/remote/spawn.rs @@ -0,0 +1,66 @@ +use crate::BoxError; +use futures_core::ready; +use futures_util::TryFutureExt; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::{runtime::Handle, task::JoinHandle}; +use tower_service::Service; + +/// A service that spawns the future from the inner service on the current tokio +/// executor. +#[derive(Clone, Debug)] +pub(crate) struct Spawn { + handle: Handle, + inner: T, +} + +/// A future that covers the execution of the spawned service future. +#[derive(Debug)] +pub(crate) struct SpawnFuture { + inner: JoinHandle>, +} + +impl Spawn { + /// Creates a new spawn service. + pub(crate) fn new(service: T, handle: Handle) -> Self { + Self { + inner: service, + handle, + } + } +} + +impl Service for Spawn +where + T: Service, + T::Future: Send + 'static, + T::Response: Send + 'static, + T::Error: 'static, + BoxError: From, +{ + type Response = T::Response; + type Error = BoxError; + type Future = SpawnFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, req: R) -> Self::Future { + let future = self.inner.call(req).map_err(BoxError::from); + let spawned = self.handle.spawn(future); + SpawnFuture { inner: spawned } + } +} + +impl Future for SpawnFuture { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let response = ready!(Pin::new(&mut self.inner).poll(cx))??; + Poll::Ready(Ok(response)) + } +}