Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry Policy POC #546

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions tower/src/retry/future.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Future types

use super::{Policy, Retry};
use super::Retry;
use crate::retry::PolicyV2;
use futures_core::ready;
use pin_project::pin_project;
use std::future::Future;
Expand All @@ -13,7 +14,7 @@ use tower_service::Service;
#[derive(Debug)]
pub struct ResponseFuture<P, S, Request>
where
P: Policy<Request, S::Response, S::Error>,
P: PolicyV2<Request, S::Response, S::Error>,
S: Service<Request>,
{
request: Option<Request>,
Expand All @@ -36,7 +37,7 @@ enum State<F, P> {

impl<P, S, Request> ResponseFuture<P, S, Request>
where
P: Policy<Request, S::Response, S::Error>,
P: PolicyV2<Request, S::Response, S::Error>,
S: Service<Request>,
{
pub(crate) fn new(
Expand All @@ -54,7 +55,7 @@ where

impl<P, S, Request> Future for ResponseFuture<P, S, Request>
where
P: Policy<Request, S::Response, S::Error> + Clone,
P: PolicyV2<Request, S::Response, S::Error> + Clone,
S: Service<Request> + Clone,
{
type Output = Result<S::Response, S::Error>;
Expand All @@ -67,11 +68,11 @@ where
StateProj::Called(future) => {
let result = ready!(future.poll(cx));
if let Some(ref req) = this.request {
match this.retry.policy.retry(req, result.as_ref()) {
Some(checking) => {
match this.retry.policy.retry(req, result) {
Err(checking) => {
this.state.set(State::Checking(checking));
}
None => return Poll::Ready(result),
Ok(result) => return Poll::Ready(result),
}
} else {
// request wasn't cloned, so no way to retry it
Expand Down
4 changes: 3 additions & 1 deletion tower/src/retry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ pub mod budget;
pub mod future;
mod layer;
mod policy;
mod policyv2;

pub use self::layer::RetryLayer;
pub use self::policy::Policy;
pub use self::policyv2::PolicyV2;

use self::future::ResponseFuture;
use pin_project::pin_project;
Expand Down Expand Up @@ -50,7 +52,7 @@ impl<P, S> Retry<P, S> {

impl<P, S, Request> Service<Request> for Retry<P, S>
where
P: Policy<Request, S::Response, S::Error> + Clone,
P: PolicyV2<Request, S::Response, S::Error> + Clone,
S: Service<Request> + Clone,
{
type Response = S::Response;
Expand Down
45 changes: 45 additions & 0 deletions tower/src/retry/policyv2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use crate::retry::Policy;
use std::future::Future;

/// TODO docs
pub trait PolicyV2<Req, Res, E>: Sized {
/// The [`Future`] type returned by [`Policy::retry`].
type Future: Future<Output = Self>;

/// Check the policy if a certain request should be retried.
///
/// This method is passed a reference to the original request, and either
/// the [`Service::Response`] or [`Service::Error`] from the inner service.
///
/// If the request should **not** be retried, return `None`.
///
/// If the request *should* be retried, return `Some` future of a new
/// policy that would apply for the next request attempt.
///
/// [`Service::Response`]: crate::Service::Response
/// [`Service::Error`]: crate::Service::Error
fn retry(&self, req: &Req, result: Result<Res, E>) -> Result<Result<Res, E>, Self::Future>;

/// Tries to clone a request before being passed to the inner service.
///
/// If the request cannot be cloned, return [`None`].
fn clone_request(&self, req: &Req) -> Option<Req>;
}

impl<T, Req, Res, E> PolicyV2<Req, Res, E> for T
where
T: Policy<Req, Res, E>,
{
type Future = T::Future;

fn retry(&self, req: &Req, result: Result<Res, E>) -> Result<Result<Res, E>, Self::Future> {
match Policy::<Req, Res, E>::retry(self, req, result.as_ref()) {
Some(fut) => Err(fut),
None => Ok(result),
}
}

fn clone_request(&self, req: &Req) -> Option<Req> {
Policy::<Req, Res, E>::clone_request(self, req)
}
}