Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic "batteries-included" retry::Policys. #414

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tower-retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ tower-layer = "0.3"
tokio = { version = "0.2", features = ["time"] }
pin-project = "0.4"
futures-core = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false }

[dev-dependencies]
tower-test = { version = "0.3", path = "../tower-test" }
tokio = { version = "0.2", features = ["macros", "test-util"] }
tokio-test = "0.2"
futures-util = { version = "0.3", default-features = false }
2 changes: 2 additions & 0 deletions tower-retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
pub mod budget;
pub mod future;
mod layer;
mod policies;
mod policy;

pub use crate::layer::RetryLayer;
pub use crate::policies::{RetryErrors, RetryLimit};
pub use crate::policy::Policy;

use crate::future::ResponseFuture;
Expand Down
61 changes: 61 additions & 0 deletions tower-retry/src/policies.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use super::Policy;
use futures_util::future;

/// A very basic retry policy with a limited number of retry attempts.
///
/// FIXME: explain why not to use this.
#[derive(Clone, Debug)]
pub struct RetryLimit {
remaining_tries: usize,
}

impl RetryLimit {
/// Create a policy with the given number of retry attempts.
pub fn new(retry_attempts: usize) -> Self {
RetryLimit {
remaining_tries: retry_attempts,
}
}
}

impl<Req: Clone, Res, E> Policy<Req, Res, E> for RetryLimit {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
if result.is_err() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is a way to traitize this? For example, http retries you may want to retry 500s but not 400s. In this case, if I were to apply this to hyper I would only be able to retry system errors not http errors. What do you think about providing some method to allow users to specify what they want to retry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One possibility would be to construct the RetryLimit with a closure F: FnMut(&E) -> bool; then callers who wanted to retry all errors could construct it as, e.g.,

let policy = RetryLimit::new(2, |_| true);

and callers who want to filter retries can stick whatever logic they want in the closure. (Does that seem like the right bound for the closure?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though since this only returns an error that wouldn't support retrying on http::Responses?

I think we could use some trait and a TraitFn adapter for a closure, what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not sure what kind of trait you're thinking of. I think I'm missing something: if the user-supplied logic has access to the whole response and error, how would that be different from just implementing Policy::retry directly?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is closer to the related vector retrylogic thing I sent on discord, but the idea is that as a user I can just provide what i'd like to retry on instead of having to implement a retry policy myself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is a way to traitize this? For example, http retries you may want to retry 500s but not 400s.

For example, finagle uses a ResponseClassifier in a similar way.

if self.remaining_tries > 0 {
Some(future::ready(RetryLimit {
remaining_tries: self.remaining_tries - 1,
}))
} else {
None
}
} else {
None
}
}

fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}

/// A very basic retry policy that always retries failed requests.
///
/// FIXME: explain why not to use this.
#[derive(Clone, Debug)]
pub struct RetryErrors;

impl<Req: Clone, Res, E> Policy<Req, Res, E> for RetryErrors {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
if result.is_err() {
Some(future::ready(RetryErrors))
} else {
None
}
}

fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}
45 changes: 2 additions & 43 deletions tower-retry/src/policy.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,6 @@
use std::future::Future;

/// A "retry policy" to classify if a request should be retried.
///
/// # Example
///
/// ```
/// use tower_retry::Policy;
/// use futures_util::future;
///
/// type Req = String;
/// type Res = String;
///
/// struct Attempts(usize);
///
/// impl<E> Policy<Req, Res, E> for Attempts {
/// type Future = future::Ready<Self>;
///
/// fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
/// match result {
/// Ok(_) => {
/// // Treat all `Response`s as success,
/// // so don't retry...
/// None
/// },
/// Err(_) => {
/// // Treat all errors as failures...
/// // But we limit the number of attempts...
/// if self.0 > 0 {
/// // Try again!
/// Some(future::ready(Attempts(self.0 - 1)))
/// } else {
/// // Used all our attempts, no retry...
/// None
/// }
/// }
/// }
/// }
///
/// fn clone_request(&self, req: &Req) -> Option<Req> {
/// Some(req.clone())
/// }
/// }
/// ```
pub trait Policy<Req, Res, E>: Sized {
/// The `Future` type returned by `Policy::retry()`.
type Future: Future<Output = Self>;
Expand All @@ -50,9 +9,9 @@ pub trait Policy<Req, Res, E>: Sized {
/// This method is passed a reference to the original request, and either
/// the `Service::Response` or `Service::Error` from the inner service.
///
/// If the request should **not** be retried, return `None`.
/// If the request **should not** be retried, return `None`.
///
/// If the request *should* be retried, return `Some` future of a new
/// If the request **should** be retried, return `Some` future of a new
/// policy that would apply for the next request attempt.
fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future>;
/// Tries to clone a request before being passed to the inner service.
Expand Down
40 changes: 2 additions & 38 deletions tower-retry/tests/retry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures_util::future;
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task};
use tower_retry::Policy;
use tower_retry::{Policy, RetryErrors, RetryLimit};
use tower_test::{assert_request_eq, mock};

#[tokio::test]
Expand All @@ -22,7 +22,7 @@ async fn retry_errors() {

#[tokio::test]
async fn retry_limit() {
let (mut service, mut handle) = new_service(Limit(2));
let (mut service, mut handle) = new_service(RetryLimit::new(2));

assert_ready_ok!(service.poll_ready());

Expand Down Expand Up @@ -83,42 +83,6 @@ type Error = Box<dyn std::error::Error + Send + Sync>;
type Mock = mock::Mock<Req, Res>;
type Handle = mock::Handle<Req, Res>;

#[derive(Clone)]
struct RetryErrors;

impl Policy<Req, Res, Error> for RetryErrors {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
if result.is_err() {
Some(future::ready(RetryErrors))
} else {
None
}
}

fn clone_request(&self, req: &Req) -> Option<Req> {
Some(*req)
}
}

#[derive(Clone)]
struct Limit(usize);

impl Policy<Req, Res, Error> for Limit {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
if result.is_err() && self.0 > 0 {
Some(future::ready(Limit(self.0 - 1)))
} else {
None
}
}

fn clone_request(&self, req: &Req) -> Option<Req> {
Some(*req)
}
}

#[derive(Clone)]
struct UnlessErr(InnerError);

Expand Down