retry: Add StandardRetryPolicy and standard_policy mod #698

Open
wants to merge 6 commits into
base: master
2 changes: 1 addition & 1 deletion tower/Cargo.toml
@@ -58,7 +58,7 @@ load-shed = ["__common"]
make = ["futures-util", "pin-project-lite", "tokio/io-std"]
ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing", "pin-project-lite"]
reconnect = ["make", "tokio/io-std", "tracing"]
-retry = ["__common", "tokio/time", "util"]
+retry = ["__common", "tokio/time", "util", "tracing"]
spawn-ready = ["__common", "futures-util", "tokio/sync", "tokio/rt", "util", "tracing"]
steer = []
timeout = ["pin-project-lite", "tokio/time"]
12 changes: 7 additions & 5 deletions tower/src/retry/backoff.rs
@@ -23,7 +23,7 @@ pub trait MakeBackoff {
type Backoff: Backoff;

/// Constructs a new backoff type.
-fn make_backoff(&mut self) -> Self::Backoff;
+fn make_backoff(&self) -> Self::Backoff;
}

/// A backoff trait where a single mutable reference represents a single
@@ -120,7 +120,7 @@ where
{
type Backoff = ExponentialBackoff<R>;

-fn make_backoff(&mut self) -> Self::Backoff {
+fn make_backoff(&self) -> Self::Backoff {
ExponentialBackoff {
max: self.max,
min: self.min,
@@ -179,6 +179,8 @@ where

self.iterations += 1;

+tracing::trace!(next_backoff_ms = %next.as_millis(), "Next backoff");
+
tokio::time::sleep(next)
}
}
@@ -217,7 +219,7 @@ mod tests {
let min = time::Duration::from_millis(min_ms);
let max = time::Duration::from_millis(max_ms);
let rng = HasherRng::default();
-let mut backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) {
+let backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) {
Err(_) => return TestResult::discard(),
Ok(backoff) => backoff,
};
@@ -231,7 +233,7 @@ mod tests {
let min = time::Duration::from_millis(min_ms);
let max = time::Duration::from_millis(max_ms);
let rng = HasherRng::default();
-let mut backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) {
+let backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) {
Err(_) => return TestResult::discard(),
Ok(backoff) => backoff,
};
@@ -246,7 +248,7 @@ mod tests {
let base = time::Duration::from_millis(base_ms);
let max = time::Duration::from_millis(max_ms);
let rng = HasherRng::default();
-let mut backoff = match ExponentialBackoffMaker::new(base, max, jitter, rng) {
+let backoff = match ExponentialBackoffMaker::new(base, max, jitter, rng) {
Err(_) => return TestResult::discard(),
Ok(backoff) => backoff,
};
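For readers following the tests above, which check that a produced backoff stays between its minimum and maximum, here is a minimal, jitter-free sketch of exponential backoff using only the standard library. `SimpleBackoff` and `next_delay` are hypothetical names made up for this illustration; they are not part of tower, and the real `ExponentialBackoff` also applies jitter and returns a sleep future rather than a `Duration`.

```rust
use std::time::Duration;

/// Hypothetical, jitter-free model of an exponential backoff.
/// `min` is the first delay and `max` caps every later delay.
struct SimpleBackoff {
    min: Duration,
    max: Duration,
    iterations: u32,
}

impl SimpleBackoff {
    /// Returns `None` when `min` exceeds `max`, since no valid delay exists.
    fn new(min: Duration, max: Duration) -> Option<Self> {
        if min > max {
            return None;
        }
        Some(SimpleBackoff { min, max, iterations: 0 })
    }

    /// Computes `min * 2^iterations`, clamps it to `max`, and then
    /// advances the iteration counter for the next call.
    fn next_delay(&mut self) -> Duration {
        let factor = 2u32.saturating_pow(self.iterations);
        let delay = self
            .min
            .checked_mul(factor)
            .unwrap_or(self.max) // overflow is treated as "at least max"
            .min(self.max);
        self.iterations = self.iterations.saturating_add(1);
        delay
    }
}
```

With a minimum of 100 ms and a maximum of 500 ms, successive calls in this sketch yield 100 ms, 200 ms, 400 ms, and then 500 ms for every later call.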
14 changes: 14 additions & 0 deletions tower/src/retry/mod.rs
@@ -1,10 +1,24 @@
//! Middleware for retrying "failed" requests.
//!
//! # Batteries-Included Features
//!
//! The [`standard_policy`] module contains a default retry [`Policy`] that can
Member
I'm not sure if I would call the standard_policy module a "default" unless it's used by default. calling it "batteries-included" or something might be better?

//! be used with the [`Retry`] middleware. For more information, see the module
//! docs for [`standard_policy`].
//!
//! The [`backoff`] module contains utilities for implementing backoffs —
//! which determine how long to wait between retries — in a custom [`Policy`].
//!
//! The [`budget`] module contains utilities to reduce the number of concurrent
//! retries made by a tower middleware stack. The goal for this is to reduce
//! congestive collapse when downstream systems degrade.
Member
could maybe be nice to make "congestive collapse" a link to Wikipedia or something?


pub mod backoff;
pub mod budget;
pub mod future;
mod layer;
mod policy;
pub mod standard_policy;
Member
I'm not sure how I feel about calling this "standard policy"; I might prefer a name that actually describes what the policy does, rather than just saying it's "standard"...


pub use self::layer::RetryLayer;
pub use self::policy::Policy;
Expand Down
301 changes: 301 additions & 0 deletions tower/src/retry/standard_policy.rs
@@ -0,0 +1,301 @@
//! A standard retry policy that combines many of the `retry` module's utilities
//! together in a production-ready retry policy.
//!
//! # Defaults
//!
//! - [`ExponentialBackoffMaker`] is the default type for `B`.
//! - [`Budget`]'s default implementation is used.
//! - [`IsRetryable`] by default will always return `false`.
//! - [`CloneRequest`] by default will always return `None`.
Comment on lines +8 to +9
Member
It seems kind of unfortunate to me that the default behavior for the batteries-included retry policy is to...never retry anything. In particular, defaulting to a CloneRequest implementation that always returns None feels a bit weird to me.

What do you think about an API where the default implementation expects that the request type implements Clone, and just calls its Clone method? That way, users whose requests implement Clone never have to think about the CloneRequest trait, and providing a CloneRequest implementation is only necessary if you want to retry !Clone requests?

//!
//! # Backoff
//!
//! The [`StandardRetryPolicy`] takes some [`MakeBackoff`] and will make a
//! [`Backoff`] for each request "session" (a request session is the initial
//! request and any subsequent requests). It will then supply the backoff future
//! to the retry middleware. Usually, this future is the [`tokio::time::Sleep`]
//! type.
//!
//! # Budget
//!
//! The [`StandardRetryPolicy`] uses the [`Budget`] type to ensure that, across
//! every policy it produces, this client will not overwhelm downstream services.
//! Check the docs of [`Budget`] to understand what the defaults are and why they
//! were chosen.
//!
//! # Example
//!
//! ```
//! # use tower::retry::standard_policy::StandardRetryPolicy;
//! # use tower::retry::budget::Budget;
//!
//! let policy = StandardRetryPolicy::<(), (), ()>::builder()
//! .should_retry(|res: &mut Result<(), ()>| true)
//! .clone_request(|req: &()| Some(*req))
//! .budget(Budget::default())
//! .build();
//! ```
use std::{fmt, marker::PhantomData, sync::Arc};

use super::{
backoff::{Backoff, ExponentialBackoffMaker, MakeBackoff},
budget::Budget,
};
use crate::retry::Policy;

/// A trait to determine if the request associated with the response should be
/// retried by [`StandardRetryPolicy`].
///
/// # Closure
///
/// This trait provides a blanket implementation for a closure of the type
/// `Fn(&mut Result<Res, E>) -> bool + Send + Sync + 'static`.
pub trait IsRetryable<Res, E>: Send + Sync + 'static {
/// Return `true` if the request associated with the response should be
/// retried and `false` if it should not be retried.
fn is_retryable(&self, response: &mut Result<Res, E>) -> bool;
}

/// A trait to clone a request for the [`StandardRetryPolicy`].
///
/// # Closure
///
/// This trait provides a blanket implementation for a closure of the type
/// `Fn(&Req) -> Option<Req> + Send + Sync + 'static`.
pub trait CloneRequest<Req>: Send + Sync + 'static {
/// Clone a request. If `None` is returned, the request will not be retried.
fn clone_request(&self, request: &Req) -> Option<Req>;
}
Comment on lines +59 to +68
Member
it seems like there probably ought to be an implementation of this that's impl CloneRequest<Req> where Req: Clone that just calls Clone. Perhaps that ought to be the default?
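One way to read the suggestion above, sketched with the standard library only: a unit struct that implements `CloneRequest` for every `Req: Clone` by calling `Clone::clone`. The trait is copied from the diff; `CloneViaClone` is a hypothetical name invented for this illustration and does not exist in tower.

```rust
/// Same shape as the `CloneRequest` trait in the diff.
pub trait CloneRequest<Req>: Send + Sync + 'static {
    /// Clone a request. If `None` is returned, the request is not retried.
    fn clone_request(&self, request: &Req) -> Option<Req>;
}

/// Hypothetical default handler: clones any request that implements `Clone`.
#[derive(Clone, Copy, Debug, Default)]
pub struct CloneViaClone;

impl<Req: Clone> CloneRequest<Req> for CloneViaClone {
    fn clone_request(&self, request: &Req) -> Option<Req> {
        // Always succeeds, so retries are never blocked by cloning.
        Some(request.clone())
    }
}
```

Under this assumption, users whose request type is `Clone` would not need to supply a handler at all, and a custom `CloneRequest` would only be needed for request types that cannot be cloned directly.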


impl<Res, E, F> IsRetryable<Res, E> for F
where
F: Fn(&mut Result<Res, E>) -> bool + Send + Sync + 'static,
{
fn is_retryable(&self, response: &mut Result<Res, E>) -> bool {
(self)(response)
}
}

impl<Req, F> CloneRequest<Req> for F
where
F: Fn(&Req) -> Option<Req> + Send + Sync + 'static,
{
fn clone_request(&self, request: &Req) -> Option<Req> {
(self)(request)
}
}

/// A standard retry [`Policy`] that combines a retry budget and a backoff
Member
maybe "retry budget" and "backoff" here should link to the respective modules?

/// mechanism to produce a safe retry policy that prevents congestive collapse
/// and retry storms.
///
/// This type is constructed with the [`StandardRetryPolicyBuilder`].
pub struct StandardRetryPolicy<Req, Res, E, B = ExponentialBackoffMaker>
where
B: MakeBackoff,
{
is_retryable: Arc<dyn IsRetryable<Res, E>>,
clone_request: Arc<dyn CloneRequest<Req>>,
Comment on lines +97 to +98
Member
a little bit on the fence about whether always Arcing these is the right choice or not --- a question that's definitely come up in the past with other middleware that needs to clone a Fn value. for example, MapRequest is generic over a F: FnMut(...) + Clone, and MapResponse is generic over a FnOnce + Clone, rather than Arcing a Fn...

F: FnMut(R1) -> R2 + Clone,

in this case, since the functions are always cloned into each request, it wouldn't make sense to allow FnMut + Clone, since the mutable state wouldn't be shared across instances of the closure. but, it would allow the use of function pointers (not closures) to avoid allocating an Arc and bumping two reference counts on every request...
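The trade-off described above can be sketched as follows, using only the standard library. `ErasedPolicy` stores the handler as `Arc<dyn Fn>` the way the diff does, while `GenericPolicy` stores an `F: Fn + Clone` inline. Both struct names and `clone_u32` are hypothetical and exist only for this illustration.

```rust
use std::sync::Arc;

/// Type-erased variant: cloning bumps an atomic reference count.
struct ErasedPolicy<Req> {
    clone_request: Arc<dyn Fn(&Req) -> Option<Req> + Send + Sync>,
}

impl<Req> Clone for ErasedPolicy<Req> {
    fn clone(&self) -> Self {
        ErasedPolicy { clone_request: Arc::clone(&self.clone_request) }
    }
}

/// Generic variant: the handler is stored inline, so cloning the policy
/// clones `F` itself and involves no allocation or reference counting.
#[derive(Clone)]
struct GenericPolicy<F> {
    clone_request: F,
}

impl<F> GenericPolicy<F> {
    /// Calls the stored handler.
    fn clone_request<Req>(&self, req: &Req) -> Option<Req>
    where
        F: Fn(&Req) -> Option<Req>,
    {
        (self.clone_request)(req)
    }
}

/// A plain function with no captured state; usable as a function pointer.
fn clone_u32(req: &u32) -> Option<u32> {
    Some(*req)
}
```

With the generic variant, `GenericPolicy { clone_request: clone_u32 as fn(&u32) -> Option<u32> }` is cloned by copying a function pointer. The cost is an extra type parameter on the policy type, which the type-erased version avoids.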

Member
also, maybe a microoptimization, but it could be worth considering combining the IsRetryable and CloneRequest impls into a single struct that's then Arced, so that we only increment/decrement one atomic ref count on each request, rather than two...but, OTOH, we would probably have to either box them or make the struct generic, if we did that, and boxing them would mean two heap ptr derefs...:woman_shrugging:
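A rough sketch of the single-`Arc` idea, under the assumption that the two handlers are bundled in a generic struct and then erased behind one trait object. All names here (`Handlers`, `RetryHandlers`, `SketchPolicy`) are hypothetical and written for this illustration only; this is one possible shape, not the design the PR settled on.

```rust
use std::sync::Arc;

/// Hypothetical bundle holding both handlers in one allocation.
struct Handlers<R, C> {
    is_retryable: R,
    clone_request: C,
}

/// Object-safe view over the bundle, so a policy needs one `Arc<dyn ...>`.
trait RetryHandlers<Req, Res, E>: Send + Sync {
    fn is_retryable(&self, result: &mut Result<Res, E>) -> bool;
    fn clone_request(&self, req: &Req) -> Option<Req>;
}

impl<Req, Res, E, R, C> RetryHandlers<Req, Res, E> for Handlers<R, C>
where
    R: Fn(&mut Result<Res, E>) -> bool + Send + Sync,
    C: Fn(&Req) -> Option<Req> + Send + Sync,
{
    fn is_retryable(&self, result: &mut Result<Res, E>) -> bool {
        (self.is_retryable)(result)
    }

    fn clone_request(&self, req: &Req) -> Option<Req> {
        (self.clone_request)(req)
    }
}

/// Policy stand-in that carries a single reference-counted pointer.
struct SketchPolicy<Req, Res, E> {
    handlers: Arc<dyn RetryHandlers<Req, Res, E>>,
}

impl<Req, Res, E> Clone for SketchPolicy<Req, Res, E> {
    fn clone(&self) -> Self {
        // One atomic increment per clone instead of two.
        SketchPolicy { handlers: Arc::clone(&self.handlers) }
    }
}
```

In this shape the closures live inline in the bundle, so calling either handler goes through the `Arc` and one vtable lookup rather than through a second boxed pointer.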

budget: Arc<Budget>,
make_backoff: B,
current_backoff: B::Backoff,
_pd: PhantomData<fn(Req, Res, E)>,
}

/// Builder for [`StandardRetryPolicy`].
///
/// This type can be constructed from the `StandardRetryPolicy::builder` function.
pub struct StandardRetryPolicyBuilder<Req, Res, E, B = ExponentialBackoffMaker> {
is_retryable: Arc<dyn IsRetryable<Res, E>>,
clone_request: Arc<dyn CloneRequest<Req>>,
make_backoff: B,
budget: Arc<Budget>,
}

impl<Req, Res, E, B: MakeBackoff> StandardRetryPolicyBuilder<Req, Res, E, B> {
/// Sets the retry decision maker.
///
/// # Default
///
/// By default, this will be set to an [`IsRetryable`] implementation that
/// always returns false and thus will not retry requests.
///
/// # Example
///
/// ```
/// # use tower::retry::standard_policy::StandardRetryPolicy;
/// // Set the Req, Res, and E type to () for simplicity, replace these with
/// // your specific request/response/error types.
/// StandardRetryPolicy::<(), (), ()>::builder()
/// .should_retry(|res: &mut Result<(), ()>| true)
/// .build();
/// ```
pub fn should_retry(mut self, f: impl IsRetryable<Res, E> + 'static) -> Self {
self.is_retryable = Arc::new(f);
self
}

/// Sets the clone request handler.
///
/// # Default
///
/// By default, this will be set to a [`CloneRequest`] implementation that
/// will never clone the request and will always return `None`.
///
/// # Example
///
/// ```
/// # use tower::retry::standard_policy::StandardRetryPolicy;
/// // Set the Req, Res, and E type to () for simplicity, replace these with
/// // your specific request/response/error types.
/// StandardRetryPolicy::<(), (), ()>::builder()
/// .clone_request(|req: &()| Some(*req))
/// .build();
/// ```
pub fn clone_request(mut self, f: impl CloneRequest<Req> + 'static) -> Self {
self.clone_request = Arc::new(f);
self
}

/// Sets the backoff maker.
///
/// # Default
///
/// By default, this will be set to [`ExponentialBackoffMaker`]'s default
/// implementation.
///
/// # Example
///
/// ```
/// # use tower::retry::standard_policy::StandardRetryPolicy;
/// # use tower::retry::backoff::ExponentialBackoffMaker;
/// // Set the Req, Res, and E type to () for simplicity, replace these with
/// // your specific request/response/error types.
/// StandardRetryPolicy::<(), (), ()>::builder()
/// .make_backoff(ExponentialBackoffMaker::default())
/// .build();
/// ```
pub fn make_backoff<B2>(self, backoff: B2) -> StandardRetryPolicyBuilder<Req, Res, E, B2> {
StandardRetryPolicyBuilder {
make_backoff: backoff,
is_retryable: self.is_retryable,
clone_request: self.clone_request,
budget: self.budget,
}
}

/// Sets the budget.
///
/// # Default
///
/// By default, this will be set to `Budget::default()`.
///
/// # Example
///
/// ```
/// # use tower::retry::standard_policy::StandardRetryPolicy;
/// # use tower::retry::budget::Budget;
/// // Set the Req, Res, and E type to () for simplicity, replace these with
/// // your specific request/response/error types.
/// StandardRetryPolicy::<(), (), ()>::builder()
/// .budget(Budget::default())
/// .build();
/// ```
pub fn budget(mut self, budget: impl Into<Arc<Budget>>) -> Self {
self.budget = budget.into();
self
}

/// Consume this builder and produce a [`StandardRetryPolicy`] with this
/// builder's configured settings.
pub fn build(self) -> StandardRetryPolicy<Req, Res, E, B> {
let current_backoff = self.make_backoff.make_backoff();

StandardRetryPolicy {
is_retryable: self.is_retryable,
clone_request: self.clone_request,
make_backoff: self.make_backoff,
current_backoff,
budget: self.budget,
_pd: PhantomData,
}
}
}

impl<Req, Res, E> StandardRetryPolicy<Req, Res, E, ExponentialBackoffMaker> {
/// Create a [`StandardRetryPolicyBuilder`].
pub fn builder() -> StandardRetryPolicyBuilder<Req, Res, E> {
StandardRetryPolicyBuilder {
is_retryable: Arc::new(|_: &mut Result<Res, E>| false),
clone_request: Arc::new(|_: &Req| None),
make_backoff: ExponentialBackoffMaker::default(),
budget: Arc::new(Budget::default()),
}
}
}

impl<Req, Res, E, B> Policy<Req, Res, E> for StandardRetryPolicy<Req, Res, E, B>
where
B: MakeBackoff,
Req: 'static,
Res: 'static,
E: 'static,
{
type Future = <B::Backoff as Backoff>::Future;

fn retry(&mut self, _req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
let can_retry = self.is_retryable.is_retryable(result);

if !can_retry {
tracing::trace!("Received non-retryable response");
self.budget.deposit();
return None;
}

let can_withdraw = self.budget.withdraw().is_ok();

if !can_withdraw {
tracing::trace!("Unable to withdraw from budget");
return None;
}

tracing::trace!("Withdrew from retry budget");

Some(self.current_backoff.next_backoff())
}

fn clone_request(&mut self, req: &Req) -> Option<Req> {
self.clone_request.clone_request(req)
}
}
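The decision order in `retry` above (check retryability, deposit on a non-retryable result, otherwise try to withdraw before backing off) can be modelled with the standard library alone. `ToyBudget`, `Decision`, and `decide` are hypothetical names for this sketch; `ToyBudget` is a plain token counter and does not reproduce the behaviour of tower's `Budget`. The sketch also reports the two "no retry" cases separately, whereas the code in the diff returns `None` for both.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

/// Toy stand-in for a retry budget: a shared counter of retry tokens.
struct ToyBudget {
    tokens: AtomicUsize,
}

impl ToyBudget {
    fn new(initial: usize) -> Self {
        ToyBudget { tokens: AtomicUsize::new(initial) }
    }

    /// Credits one token; called when a result does not need a retry.
    fn deposit(&self) {
        self.tokens.fetch_add(1, Ordering::Relaxed);
    }

    /// Spends one token, or returns `false` when no tokens are left.
    fn withdraw(&self) -> bool {
        self.tokens
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |t| t.checked_sub(1))
            .is_ok()
    }
}

/// Outcome of one retry decision in this sketch.
#[derive(Debug, PartialEq)]
enum Decision {
    DoNotRetry,
    BudgetExhausted,
    RetryAfter(Duration),
}

/// Follows the same order of checks as the `retry` method in the diff.
fn decide<Res, E>(
    budget: &ToyBudget,
    is_retryable: impl Fn(&Result<Res, E>) -> bool,
    result: &Result<Res, E>,
    backoff: Duration,
) -> Decision {
    if !is_retryable(result) {
        budget.deposit();
        return Decision::DoNotRetry;
    }
    if !budget.withdraw() {
        return Decision::BudgetExhausted;
    }
    Decision::RetryAfter(backoff)
}
```

Starting from a single token, two consecutive retryable failures give one retry and then an exhausted budget. A later non-retryable result deposits a token, which allows one more retry.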

impl<Req, Res, E, B> Clone for StandardRetryPolicy<Req, Res, E, B>
where
B: MakeBackoff + Clone,
{
fn clone(&self) -> Self {
Self {
is_retryable: self.is_retryable.clone(),
clone_request: self.clone_request.clone(),
budget: self.budget.clone(),
make_backoff: self.make_backoff.clone(),
current_backoff: self.make_backoff.make_backoff(),
_pd: PhantomData,
}
}
}
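The `Clone` impl above depends on `make_backoff` taking `&self`, since `clone` only has a shared reference to the policy. A reduced, standard-library-only model of that pattern is sketched below. `AttemptCounter`, `CounterMaker`, and `SessionPolicy` are hypothetical stand-ins for the backoff, the backoff maker, and the policy.

```rust
/// Hypothetical per-session state: counts attempts made so far.
#[derive(Debug, Default)]
struct AttemptCounter {
    attempts: u32,
}

impl AttemptCounter {
    /// Records one more attempt and returns the running total.
    fn record_attempt(&mut self) -> u32 {
        self.attempts += 1;
        self.attempts
    }
}

/// Hypothetical maker; `make` takes `&self`, like `make_backoff` in the diff.
#[derive(Clone, Debug, Default)]
struct CounterMaker;

impl CounterMaker {
    fn make(&self) -> AttemptCounter {
        AttemptCounter::default()
    }
}

/// Policy stand-in that owns a maker plus the state of the current session.
struct SessionPolicy {
    maker: CounterMaker,
    current: AttemptCounter,
}

impl Clone for SessionPolicy {
    fn clone(&self) -> Self {
        // A shared reference is enough because `make` does not need
        // `&mut self`; the clone starts with fresh session state.
        SessionPolicy {
            maker: self.maker.clone(),
            current: self.maker.make(),
        }
    }
}
```

In this model a clone does not inherit the attempt count of the policy it was cloned from, which matches the "one backoff per request session" description in the module docs.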

impl<Req, Res, E, B: MakeBackoff + fmt::Debug> fmt::Debug for StandardRetryPolicy<Req, Res, E, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StandardRetryPolicy")
.field("budget", &self.budget)
.field("make_backoff", &self.make_backoff)
.finish()
}
}

impl<Req, Res, E, B: fmt::Debug> fmt::Debug for StandardRetryPolicyBuilder<Req, Res, E, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StandardRetryPolicyBuilder").finish()
}
}