Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try out tokens #631

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions guides/building-a-middleware-from-scratch.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,16 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Token = S::Token;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>> {
// Our middleware doesn't care about backpressure so its ready as long
// as the inner service is ready.
self.inner.poll_ready(cx)
}

fn call(&mut self, request: Request) -> Self::Future {
fn call(&mut self, token: Self::Token, request: Request) -> Self::Future {
self.inner.call(request)
}
}
Expand All @@ -102,7 +103,7 @@ Creating both futures is done like this:
```rust
use tokio::time::sleep;

fn call(&mut self, request: Request) -> Self::Future {
fn call(&mut self, token: Self::Token, request: Request) -> Self::Future {
let response_future = self.inner.call(request);

// This variable has type `tokio::time::Sleep`.
Expand Down Expand Up @@ -151,11 +152,11 @@ where
// Use our new `ResponseFuture` type.
type Future = ResponseFuture<S::Future>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: Request) -> Self::Future {
fn call(&mut self, token: Self::Token, request: Request) -> Self::Future {
let response_future = self.inner.call(request);
let sleep = tokio::time::sleep(self.timeout);

Expand Down Expand Up @@ -502,12 +503,12 @@ where
type Error = BoxError;
type Future = ResponseFuture<S::Future>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>> {
// Have to map the error type here as well.
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, request: Request) -> Self::Future {
fn call(&mut self, token: Self::Token, request: Request) -> Self::Future {
let response_future = self.inner.call(request);
let sleep = tokio::time::sleep(self.timeout);

Expand Down Expand Up @@ -559,11 +560,11 @@ where
type Error = BoxError;
type Future = ResponseFuture<S::Future>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, request: Request) -> Self::Future {
fn call(&mut self, token: Self::Token, request: Request) -> Self::Future {
let response_future = self.inner.call(request);
let sleep = tokio::time::sleep(self.timeout);

Expand Down
4 changes: 2 additions & 2 deletions tower-layer/src/layer_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ use std::fmt;
/// type Error = S::Error;
/// type Future = S::Future;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>> {
/// self.service.poll_ready(cx)
/// }
///
/// fn call(&mut self, request: Request) -> Self::Future {
/// fn call(&mut self, token: Self::Token, request: Request) -> Self::Future {
/// // Log the request
/// println!("request = {:?}, target = {:?}", request, self.target);
///
Expand Down
4 changes: 2 additions & 2 deletions tower-layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ pub use self::{
/// type Error = S::Error;
/// type Future = S::Future;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>> {
/// self.service.poll_ready(cx)
/// }
///
/// fn call(&mut self, request: Request) -> Self::Future {
/// fn call(&mut self, token: Self::Token, request: Request) -> Self::Future {
/// // Insert log statement here or other functionality
/// println!("request = {:?}, target = {:?}", request, self.target);
/// self.service.call(request)
Expand Down
38 changes: 22 additions & 16 deletions tower-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//! request / response clients and servers. It is simple but powerful and is
//! used as the foundation for the rest of Tower.

use std::any::Any;
use std::future::Future;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -56,11 +57,11 @@ use std::task::{Context, Poll};
/// type Error = http::Error;
/// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>> {
/// Poll::Ready(Ok(()))
/// }
///
/// fn call(&mut self, req: Request<Vec<u8>>) -> Self::Future {
/// fn call(&mut self, token: Self::Token, req: Request<Vec<u8>>) -> Self::Future {
/// // create the body
/// let body: Vec<u8> = "hello, world!\n"
/// .as_bytes()
Expand Down Expand Up @@ -166,13 +167,13 @@ use std::task::{Context, Poll};
/// type Error = Box<dyn Error + Send + Sync>;
/// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>> {
/// // Our timeout service is ready if the inner service is ready.
/// // This is how backpressure can be propagated through a tree of nested services.
/// self.inner.poll_ready(cx).map_err(Into::into)
/// }
///
/// fn call(&mut self, req: Request) -> Self::Future {
/// fn call(&mut self, token: Self::Token, req: Request) -> Self::Future {
/// // Create a future that completes after `self.timeout`
/// let timeout = tokio::time::sleep(self.timeout);
///
Expand Down Expand Up @@ -259,11 +260,11 @@ use std::task::{Context, Poll};
/// type Error = S::Error;
/// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>> {
/// Poll::Ready(Ok(()))
/// }
///
/// fn call(&mut self, req: R) -> Self::Future {
/// fn call(&mut self, token: Self::Token, req: R) -> Self::Future {
/// let mut inner = self.inner.clone();
/// Box::pin(async move {
/// // `inner` might not be ready since its a clone
Expand Down Expand Up @@ -294,11 +295,11 @@ use std::task::{Context, Poll};
/// type Error = S::Error;
/// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>> {
/// Poll::Ready(Ok(()))
/// }
///
/// fn call(&mut self, req: R) -> Self::Future {
/// fn call(&mut self, token: Self::Token, req: R) -> Self::Future {
/// let clone = self.inner.clone();
/// // take the service that was ready
/// let mut inner = std::mem::replace(&mut self.inner, clone);
Expand All @@ -315,6 +316,9 @@ pub trait Service<Request> {
/// Errors produced by the service.
type Error;

/// A token that allows you to `call` once.
type Token;

/// The future response value.
type Future: Future<Output = Result<Self::Response, Self::Error>>;

Expand All @@ -331,7 +335,7 @@ pub trait Service<Request> {
/// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a request may be dispatched to the
/// service using `call`. Until a request is dispatched, repeated calls to
/// `poll_ready` must return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))`.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>>;

/// Process the request and return the response asynchronously.
///
Expand All @@ -345,7 +349,7 @@ pub trait Service<Request> {
///
/// Implementations are permitted to panic if `call` is invoked without
/// obtaining `Poll::Ready(Ok(()))` from `poll_ready`.
fn call(&mut self, req: Request) -> Self::Future;
fn call(&mut self, token: Self::Token, req: Request) -> Self::Future;
}

impl<'a, S, Request> Service<Request> for &'a mut S
Expand All @@ -354,14 +358,15 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Token = S::Token;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, S::Error>> {
(**self).poll_ready(cx)
}

fn call(&mut self, request: Request) -> S::Future {
(**self).call(request)
fn call(&mut self, token: Self::Token, request: Request) -> S::Future {
(**self).call(token, request)
}
}

Expand All @@ -371,13 +376,14 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Token = S::Token;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, S::Error>> {
(**self).poll_ready(cx)
}

fn call(&mut self, request: Request) -> S::Future {
(**self).call(request)
fn call(&mut self, token: Self::Token, request: Request) -> S::Future {
(**self).call(token, request)
}
}
45 changes: 20 additions & 25 deletions tower-test/src/mock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub struct Mock<T, U> {
id: u64,
tx: Mutex<Tx<T, U>>,
state: Arc<Mutex<State>>,
can_send: bool,
}

/// Handle to the `Mock`.
Expand Down Expand Up @@ -107,20 +106,32 @@ pub fn pair<T, U>() -> (Mock<T, U>, Handle<T, U>) {
id: 0,
tx,
state: state.clone(),
can_send: false,
};

let handle = Handle { rx, state };

(mock, handle)
}

#[derive(Debug)]
pub struct Token(Arc<Mutex<State>>);

impl Drop for Token {
fn drop(&mut self) {
// TODO: Should probably avoid aborting if we're already panicking
let mut state = self.0.lock().unwrap();
// Give back a call, as this was dropped without calling call.
state.rem += 1;
}
}

impl<T, U> Service<T> for Mock<T, U> {
type Response = U;
type Error = Error;
type Token = Token;
type Future = ResponseFuture<U>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Token, Self::Error>> {
let mut state = self.state.lock().unwrap();

if state.is_closed {
Expand All @@ -131,17 +142,10 @@ impl<T, U> Service<T> for Mock<T, U> {
return Poll::Ready(Err(e));
}

if self.can_send {
return Poll::Ready(Ok(()));
}

if state.rem > 0 {
assert!(!state.tasks.contains_key(&self.id));

// Returning `Ready` means the next call to `call` must succeed.
self.can_send = true;

Poll::Ready(Ok(()))
state.rem -= 1;
Poll::Ready(Ok(Token(self.state.clone())))
} else {
// Bit weird... but whatevz
*state
Expand All @@ -153,24 +157,16 @@ impl<T, U> Service<T> for Mock<T, U> {
}
}

fn call(&mut self, request: T) -> Self::Future {
fn call(&mut self, token: Token, request: T) -> Self::Future {
// Make sure that the service has capacity
let mut state = self.state.lock().unwrap();
let state = self.state.lock().unwrap();

if state.is_closed {
return ResponseFuture::closed();
}

if !self.can_send {
panic!("service not ready; poll_ready must be called first");
}

self.can_send = false;

// Decrement the number of remaining requests that can be sent
if state.rem > 0 {
state.rem -= 1;
}
// "Use up" the token so that rem stays decremented.
std::mem::forget(token);

let (tx, rx) = oneshot::channel();
let send_response = SendResponse { tx };
Expand Down Expand Up @@ -204,7 +200,6 @@ impl<T, U> Clone for Mock<T, U> {
id,
tx,
state: self.state.clone(),
can_send: false,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions tower-test/src/mock/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl<T> Spawn<T> {
}

/// Poll this service ready.
pub fn poll_ready<Request>(&mut self) -> Poll<Result<(), T::Error>>
pub fn poll_ready<Request>(&mut self) -> Poll<Result<T::Token, T::Error>>
where
T: Service<Request>,
{
Expand All @@ -42,11 +42,11 @@ impl<T> Spawn<T> {
}

/// Call the inner Service.
pub fn call<Request>(&mut self, req: Request) -> T::Future
pub fn call<Request>(&mut self, token: T::Token, req: Request) -> T::Future
where
T: Service<Request>,
{
self.inner.call(req)
self.inner.call(token, req)
}

/// Get the inner service.
Expand Down
9 changes: 3 additions & 6 deletions tower-test/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,20 @@ async fn single_request_ready() {

assert_pending!(handle.poll_request());

assert_ready!(service.poll_ready()).unwrap();
let token = assert_ready!(service.poll_ready()).unwrap();

let response = service.call("hello");
let response = service.call(token, "hello");

assert_request_eq!(handle, "hello").send_response("world");

assert_eq!(response.await.unwrap(), "world");
}

#[tokio::test(flavor = "current_thread")]
#[should_panic]
async fn backpressure() {
let (mut service, mut handle) = mock::spawn::<_, ()>();
let (mut service, mut handle) = mock::spawn::<&'static str, ()>();

handle.allow(0);

assert_pending!(service.poll_ready());

service.call("hello").await.unwrap();
}
7 changes: 4 additions & 3 deletions tower/src/balance/p2c/make.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,16 @@ where
{
type Response = Balance<S::Response, Req>;
type Error = S::Error;
type Token = S::Token;
type Future = MakeFuture<S::Future, Req>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Token, Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, target: Target) -> Self::Future {
fn call(&mut self, token: Self::Token, target: Target) -> Self::Future {
MakeFuture {
inner: self.inner.call(target),
inner: self.inner.call(token, target),
_marker: PhantomData,
}
}
Expand Down
Loading