From 70544a26ff9ea6c038aed0c384ae0a6c70743a4b Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 7 Oct 2022 10:17:33 -0400 Subject: [PATCH 1/5] retry: Add `StandardRetryPolicy` and `standard_policy` mod This PR adds a new `standard_policy` module within the retry module that provides a batteries included policy to be used with the retry middleware. The policy combines the `Budget` type and generic backoff utlities from the `backoff` module to provide an easy to use policy with good defaults. This PR also includes a `StandardRetryPolicyBuilder` as well as two new traits `IsRetryable` and `CloneRequest`. These each have blanket impls for closures. The reason that this implementation breaks these out into two different traits is to allow `tower-http` to provide a custom `CloneRequest` implementation that will be able to clone some sort of `ReplayBody` and let the user pass in the retry decision implementation. Ref #682 --- tower/src/retry/backoff.rs | 12 +- tower/src/retry/mod.rs | 14 ++ tower/src/retry/standard_policy.rs | 297 +++++++++++++++++++++++++++++ tower/tests/retry/main.rs | 42 +++- 4 files changed, 359 insertions(+), 6 deletions(-) create mode 100644 tower/src/retry/standard_policy.rs diff --git a/tower/src/retry/backoff.rs b/tower/src/retry/backoff.rs index 306723eda..5f2d3bd80 100644 --- a/tower/src/retry/backoff.rs +++ b/tower/src/retry/backoff.rs @@ -23,7 +23,7 @@ pub trait MakeBackoff { type Backoff: Backoff; /// Constructs a new backoff type. - fn make_backoff(&mut self) -> Self::Backoff; + fn make_backoff(&self) -> Self::Backoff; } /// A backoff trait where a single mutable reference represents a single @@ -120,7 +120,7 @@ where { type Backoff = ExponentialBackoff; - fn make_backoff(&mut self) -> Self::Backoff { + fn make_backoff(&self) -> Self::Backoff { ExponentialBackoff { max: self.max, min: self.min, @@ -179,6 +179,8 @@ where self.iterations += 1; + tracing::trace!(next_backoff_ms = %next.as_millis(), "Next backoff"); + tokio::time::sleep(next) } } @@ -217,7 +219,7 @@ mod tests { let min = time::Duration::from_millis(min_ms); let max = time::Duration::from_millis(max_ms); let rng = HasherRng::default(); - let mut backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) { + let backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) { Err(_) => return TestResult::discard(), Ok(backoff) => backoff, }; @@ -231,7 +233,7 @@ mod tests { let min = time::Duration::from_millis(min_ms); let max = time::Duration::from_millis(max_ms); let rng = HasherRng::default(); - let mut backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) { + let backoff = match ExponentialBackoffMaker::new(min, max, 0.0, rng) { Err(_) => return TestResult::discard(), Ok(backoff) => backoff, }; @@ -246,7 +248,7 @@ mod tests { let base = time::Duration::from_millis(base_ms); let max = time::Duration::from_millis(max_ms); let rng = HasherRng::default(); - let mut backoff = match ExponentialBackoffMaker::new(base, max, jitter, rng) { + let backoff = match ExponentialBackoffMaker::new(base, max, jitter, rng) { Err(_) => return TestResult::discard(), Ok(backoff) => backoff, }; diff --git a/tower/src/retry/mod.rs b/tower/src/retry/mod.rs index 3abd94fca..763cb7d81 100644 --- a/tower/src/retry/mod.rs +++ b/tower/src/retry/mod.rs @@ -1,10 +1,24 @@ //! Middleware for retrying "failed" requests. +//! +//! # Batteries included features +//! +//! The [`standard_policy`] module contains a default retry [`Policy`] that can +//! be used with the [`Retry`] middleware. For more information check the module +//! docs for [`standard_policy`]. +//! +//! The [`backoff`] module contains backoff utlities that can be used in a +//! custom [`Policy`]. +//! +//! The [`budget`] module contains utilities to reduce the amount of concurrent +//! retries made by a tower middleware stack. The goal for this is to reduce +//! congestive collapse when downstream systems degrade. pub mod backoff; pub mod budget; pub mod future; mod layer; mod policy; +pub mod standard_policy; pub use self::layer::RetryLayer; pub use self::policy::Policy; diff --git a/tower/src/retry/standard_policy.rs b/tower/src/retry/standard_policy.rs new file mode 100644 index 000000000..74bf4bc36 --- /dev/null +++ b/tower/src/retry/standard_policy.rs @@ -0,0 +1,297 @@ +//! A standard retry policy that combines many of the `retry` module's utilities +//! together in a production ready retry policy. +//! +//! # Defaults +//! +//! - [`ExponentialBackoffMaker`] is the default type for `B`. +//! - [`Budget`]'s default implementation is used. +//! - [`IsRetryable`] by default will always return `false`. +//! - [`CloneRequest`] by default will always return `None`. +//! +//! # Backoff +//! +//! The [`StandardRetryPolicy`] takes some [`MakeBackoff`] and will make a +//! [`Backoff`] for each request "session" (a request session is the initial +//! request and any subsequent requests). It will then supply the backoff future +//! to the retry middlware. Usually, this future is the [`tokio::time::Sleep`] +//! type. +//! +//! # Budget +//! +//! The [`StandardRetryPolicy`] uses the [`Budget`] type to ensure that for each +//! produced policy that this client will not overwhelm downstream services. +//! Check the docs of [`Budget`] to understand what the defaults are and why they +//! were chosen. +//! +//! # Example +//! +//! ``` +//! let policy = StandardRetryPolicy::<(), (), ()>::builder() +//! .should_retry(|res: &mut Result<(), ()>| true) +//! .clone_request(|req: &()| *req) +//! .budget(Budget::default()) +//! .build(); +//! ``` +use std::{fmt, marker::PhantomData, sync::Arc}; + +use super::{ + backoff::{Backoff, ExponentialBackoffMaker, MakeBackoff}, + budget::Budget, +}; +use crate::retry::Policy; + +/// A trait to determine if the request associated with the response should be +/// retried by [`StandardRetryPolicy`]. +/// +/// # Closure +/// +/// This trait provides a blanket implementation for a closure of the type +/// `Fn(&mut Result) -> bool + Send + Sync + 'static`. +pub trait IsRetryable: Send + Sync + 'static { + /// Return `true` if the request associated with the response should be + /// retried and `false` if it should not be retried. + fn is_retryalbe(&self, response: &mut Result) -> bool; +} + +/// A trait to clone a request for the [`StandardRetryPolicy`]. +/// +/// # Closure +/// +/// This trait provides a blanket implementation for a closure of the type +/// `Fn(&Req) -> Option + Send + Sync + 'static`. +pub trait CloneRequest: Send + Sync + 'static { + /// Clone a request, if `None` is returned the request will not be retried. + fn clone_request(&self, request: &Req) -> Option; +} + +impl IsRetryable for F +where + F: Fn(&mut Result) -> bool + Send + Sync + 'static, +{ + fn is_retryalbe(&self, response: &mut Result) -> bool { + (self)(response) + } +} + +impl CloneRequest for F +where + F: Fn(&Req) -> Option + Send + Sync + 'static, +{ + fn clone_request(&self, request: &Req) -> Option { + (self)(request) + } +} + +/// A standard retry [`Policy`] that combines a retry budget and a backoff +/// mechanism to produce a safe retry policy that prevents congestive collapse +/// and retry storms. +/// +/// This type is constructed with the [`StandardRetryPolicyBuilder`]. +pub struct StandardRetryPolicy +where + B: MakeBackoff, +{ + is_retryable: Arc>, + clone_request: Arc>, + budget: Arc, + make_backoff: B, + current_backoff: B::Backoff, + _pd: PhantomData, +} + +/// Builder for [`StandardRetryPolicy`]. +/// +/// This type can constructed from the `StandardRetryPolicy::builder` function. +pub struct StandardRetryPolicyBuilder { + is_retryable: Arc>, + clone_request: Arc>, + make_backoff: B, + budget: Arc, +} + +impl StandardRetryPolicyBuilder { + /// Sets the retry decision maker. + /// + /// # Default + /// + /// By default, this will be set to an [`IsRetryable`] implementation that + /// always returns false and thus will not retry requests. + /// + /// # Example + /// + /// ``` + /// # use tower::retry::standard_policy::StandardRetryPolicy; + /// // Set the Req, Res, and E type to () for simplicity, replace these with + /// // your specific request/response/error types. + /// StandardRetryPolicy::<(), (), ()>::builder() + /// .should_retry(|res: &mut Result<(), ()>| true) + /// .build(); + /// ``` + pub fn should_retry(mut self, f: impl IsRetryable + 'static) -> Self { + self.is_retryable = Arc::new(f); + self + } + + /// Sets the clone request handler. + /// + /// # Default + /// + /// By default, this will be set to a [`CloneRequest`] implementation that + /// will never clone the request and will always return `None`. + /// + /// # Example + /// + /// ``` + /// # use tower::retry::standard_policy::StandardRetryPolicy; + /// // Set the Req, Res, and E type to () for simplicity, replace these with + /// // your specific request/response/error types. + /// StandardRetryPolicy::<(), (), ()>::builder() + /// .clone_request(|req: &()| Some(*req)) + /// .build(); + /// ``` + pub fn clone_request(mut self, f: impl CloneRequest + 'static) -> Self { + self.clone_request = Arc::new(f); + self + } + + /// Sets the backoff maker. + /// + /// # Default + /// + /// By default, this will be set to [`ExponentialBackoffMaker`]'s default + /// implementation. + /// + /// # Example + /// + /// ``` + /// # use tower::retry::standard_policy::StandardRetryPolicy; + /// # use tower::retry::backoff::ExponentialBackoffMaker; + /// // Set the Req, Res, and E type to () for simplicity, replace these with + /// // your specific request/response/error types. + /// StandardRetryPolicy::<(), (), ()>::builder() + /// .make_backoff(ExponentialBackoffMaker::default()) + /// .build(); + /// ``` + pub fn make_backoff(self, backoff: B2) -> StandardRetryPolicyBuilder { + StandardRetryPolicyBuilder { + make_backoff: backoff, + is_retryable: self.is_retryable, + clone_request: self.clone_request, + budget: self.budget, + } + } + + /// Sets the budget. + pub fn budget(mut self, budget: impl Into>) -> Self { + self.budget = budget.into(); + self + } + + /// Consume this builder and produce a [`StandardRetryPolicy`] with this + /// builders configured settings. + /// + /// # Default + /// + /// By default, this will be set to `Budget::default()`. + /// + /// # Example + /// + /// ``` + /// # use tower::retry::standard_policy::StandardRetryPolicy; + /// // Set the Req, Res, and E type to () for simplicity, replace these with + /// // your specific request/response/error types. + /// StandardRetryPolicy::<(), (), ()>::builder() + /// .budget(Budget::default()) + /// .build(); + /// ``` + pub fn build(self) -> StandardRetryPolicy { + let current_backoff = self.make_backoff.make_backoff(); + + StandardRetryPolicy { + is_retryable: self.is_retryable, + clone_request: self.clone_request, + make_backoff: self.make_backoff, + current_backoff, + budget: self.budget, + _pd: PhantomData, + } + } +} + +impl StandardRetryPolicy { + /// Create a [`StandardRetryPolicyBuilder`]. + pub fn builder() -> StandardRetryPolicyBuilder { + StandardRetryPolicyBuilder { + is_retryable: Arc::new(|_: &mut Result| false), + clone_request: Arc::new(|_: &Req| None), + make_backoff: ExponentialBackoffMaker::default(), + budget: Arc::new(Budget::default()), + } + } +} + +impl Policy for StandardRetryPolicy +where + B: MakeBackoff, + Req: 'static, + Res: 'static, + E: 'static, +{ + type Future = ::Future; + + fn retry(&mut self, _req: &mut Req, result: &mut Result) -> Option { + let can_retry = self.is_retryable.is_retryalbe(result); + + if !can_retry { + tracing::trace!("Recived non-retryable response"); + self.budget.deposit(); + return None; + } + + let can_withdraw = self.budget.withdraw().is_ok(); + + if !can_withdraw { + tracing::trace!("Unable to withdraw from budget"); + return None; + } + + tracing::trace!("Withdrew from retry budget"); + + Some(self.current_backoff.next_backoff()) + } + + fn clone_request(&mut self, req: &Req) -> Option { + self.clone_request.clone_request(req) + } +} + +impl Clone for StandardRetryPolicy +where + B: MakeBackoff + Clone, +{ + fn clone(&self) -> Self { + Self { + is_retryable: self.is_retryable.clone(), + clone_request: self.clone_request.clone(), + budget: self.budget.clone(), + make_backoff: self.make_backoff.clone(), + current_backoff: self.make_backoff.make_backoff(), + _pd: PhantomData, + } + } +} + +impl fmt::Debug for StandardRetryPolicy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StandardRetryPolicy") + .field("budget", &self.budget) + .field("make_backoff", &self.make_backoff) + .finish() + } +} + +impl fmt::Debug for StandardRetryPolicyBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StandardRetryPolicyBuilder").finish() + } +} diff --git a/tower/tests/retry/main.rs b/tower/tests/retry/main.rs index 7ce220b47..2d75bdb58 100644 --- a/tower/tests/retry/main.rs +++ b/tower/tests/retry/main.rs @@ -2,9 +2,11 @@ #[path = "../support.rs"] mod support; +use std::time::Duration; + use futures_util::future; use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task}; -use tower::retry::Policy; +use tower::retry::{standard_policy::StandardRetryPolicy, Policy}; use tower_test::{assert_request_eq, mock}; #[tokio::test(flavor = "current_thread")] @@ -218,6 +220,44 @@ where } } +#[tokio::test] +async fn basic() { + let _t = support::trace_init(); + + tokio::time::pause(); + + let policy = StandardRetryPolicy::builder() + .should_retry( + |r: &mut Result<&'static str, Box>| { + if let Err(e) = r { + if format!("{:?}", e).contains("retry me") { + return true; + } + } + + false + }, + ) + .clone_request(|r: &&'static str| Some(*r)) + .build(); + + let (mut svc, mut handle) = new_service(policy); + + assert_ready_ok!(svc.poll_ready()); + + let mut fut = task::spawn(svc.call("hello")); + + assert_request_eq!(handle, "hello").send_error("retry me"); + + assert_pending!(fut.poll()); + tokio::time::advance(Duration::from_secs(1)).await; + assert_pending!(fut.poll()); + + assert_request_eq!(handle, "hello").send_response("world"); + + assert_eq!(fut.await.unwrap(), "world"); +} + fn new_service + Clone>( policy: P, ) -> (mock::Spawn>, Handle) { From a3cdc6f0f17438cb26f1bdbefae2812d5b6aa583 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 12 Oct 2022 10:19:28 -0400 Subject: [PATCH 2/5] Fix tests --- tower/src/retry/standard_policy.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tower/src/retry/standard_policy.rs b/tower/src/retry/standard_policy.rs index 74bf4bc36..99fe4a439 100644 --- a/tower/src/retry/standard_policy.rs +++ b/tower/src/retry/standard_policy.rs @@ -26,9 +26,12 @@ //! # Example //! //! ``` +//!# use tower::retry::standard_policy::StandardRetryPolicy; +//!# use tower::retry::budget::Budget; +//! //! let policy = StandardRetryPolicy::<(), (), ()>::builder() //! .should_retry(|res: &mut Result<(), ()>| true) -//! .clone_request(|req: &()| *req) +//! .clone_request(|req: &()| Some(*req)) //! .budget(Budget::default()) //! .build(); //! ``` @@ -198,6 +201,7 @@ impl StandardRetryPolicyBuilder { /// /// ``` /// # use tower::retry::standard_policy::StandardRetryPolicy; + /// # use tower::retry::budget::Budget; /// // Set the Req, Res, and E type to () for simplicity, replace these with /// // your specific request/response/error types. /// StandardRetryPolicy::<(), (), ()>::builder() @@ -243,7 +247,7 @@ where let can_retry = self.is_retryable.is_retryalbe(result); if !can_retry { - tracing::trace!("Recived non-retryable response"); + tracing::trace!("Received non-retryable response"); self.budget.deposit(); return None; } From 2d24ccb3876cfbc9a6dd2c18dd65ecf3bc5355c6 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 12 Oct 2022 11:17:25 -0400 Subject: [PATCH 3/5] Add tracing to retry feature --- tower/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tower/Cargo.toml b/tower/Cargo.toml index acfebbd77..a8ab1935a 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -58,7 +58,7 @@ load-shed = ["__common"] make = ["futures-util", "pin-project-lite", "tokio/io-std"] ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing", "pin-project-lite"] reconnect = ["make", "tokio/io-std", "tracing"] -retry = ["__common", "tokio/time", "util"] +retry = ["__common", "tokio/time", "util", "tracing"] spawn-ready = ["__common", "futures-util", "tokio/sync", "tokio/rt", "util", "tracing"] steer = [] timeout = ["pin-project-lite", "tokio/time"] From d97afa53ed596bf85a9627bf40b9ef0e74c53f6d Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Thu, 27 Oct 2022 11:22:12 -0400 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Eliza Weisman --- tower/src/retry/mod.rs | 8 ++++---- tower/src/retry/standard_policy.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tower/src/retry/mod.rs b/tower/src/retry/mod.rs index 763cb7d81..a800f332a 100644 --- a/tower/src/retry/mod.rs +++ b/tower/src/retry/mod.rs @@ -1,13 +1,13 @@ //! Middleware for retrying "failed" requests. //! -//! # Batteries included features +//! # Batteries-Included Features //! //! The [`standard_policy`] module contains a default retry [`Policy`] that can -//! be used with the [`Retry`] middleware. For more information check the module +//! be used with the [`Retry`] middleware. For more information, see the module //! docs for [`standard_policy`]. //! -//! The [`backoff`] module contains backoff utlities that can be used in a -//! custom [`Policy`]. +//! The [`backoff`] module contains utilities for implementing backoffs — +//! which determine how long to wait between retries — in a custom [`Policy`]. //! //! The [`budget`] module contains utilities to reduce the amount of concurrent //! retries made by a tower middleware stack. The goal for this is to reduce diff --git a/tower/src/retry/standard_policy.rs b/tower/src/retry/standard_policy.rs index 99fe4a439..8c93b9459 100644 --- a/tower/src/retry/standard_policy.rs +++ b/tower/src/retry/standard_policy.rs @@ -1,5 +1,5 @@ //! A standard retry policy that combines many of the `retry` module's utilities -//! together in a production ready retry policy. +//! together in a production-ready retry policy. //! //! # Defaults //! @@ -53,7 +53,7 @@ use crate::retry::Policy; pub trait IsRetryable: Send + Sync + 'static { /// Return `true` if the request associated with the response should be /// retried and `false` if it should not be retried. - fn is_retryalbe(&self, response: &mut Result) -> bool; + fn is_retryable(&self, response: &mut Result) -> bool; } /// A trait to clone a request for the [`StandardRetryPolicy`]. From 31e3326a629fce5799d577bc860e4adb1421b091 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Thu, 27 Oct 2022 14:34:28 -0400 Subject: [PATCH 5/5] Fix rename --- tower/src/retry/standard_policy.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tower/src/retry/standard_policy.rs b/tower/src/retry/standard_policy.rs index 8c93b9459..a507b7c21 100644 --- a/tower/src/retry/standard_policy.rs +++ b/tower/src/retry/standard_policy.rs @@ -71,7 +71,7 @@ impl IsRetryable for F where F: Fn(&mut Result) -> bool + Send + Sync + 'static, { - fn is_retryalbe(&self, response: &mut Result) -> bool { + fn is_retryable(&self, response: &mut Result) -> bool { (self)(response) } } @@ -244,7 +244,7 @@ where type Future = ::Future; fn retry(&mut self, _req: &mut Req, result: &mut Result) -> Option { - let can_retry = self.is_retryable.is_retryalbe(result); + let can_retry = self.is_retryable.is_retryable(result); if !can_retry { tracing::trace!("Received non-retryable response");