Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ type MaybeDecompressionBody<T> = tower_http::decompression::DecompressionBody<T>

type ClientService = Timeout<
ConfigService<
MaybeDecompression<
Retry<RetryPolicy, FollowRedirect<HttpClient<Connector, Body>, FollowRedirectPolicy>>,
>,
MaybeDecompression<Retry<RetryPolicy, FollowRedirect<HttpClient<Connector, Body>>>>,
>,
>;

Expand Down
108 changes: 48 additions & 60 deletions src/client/layer/redirect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,38 @@ use std::{

use futures_util::future::Either;
use http::{Request, Response};
use http_body::Body;
use http_body::Body as HttpBody;
use tower::{BoxError, Layer, Service};

use self::future::ResponseFuture;
pub use self::policy::{Action, Attempt, Policy};
pub use self::policy::{Action, Attempt};
use crate::{client::body::Body, redirect::FollowRedirectPolicy};

enum BodyRepr<B> {
Some(B),
Empty,
None,
}

impl<B> BodyRepr<B>
where
B: Body + Default,
{
fn take(&mut self) -> Option<B> {
impl BodyRepr<Body> {
fn take(&mut self) -> Option<Body> {
match mem::replace(self, BodyRepr::None) {
BodyRepr::Some(body) => Some(body),
BodyRepr::Empty => {
*self = BodyRepr::Empty;
Some(B::default())
Some(Body::default())
}
BodyRepr::None => None,
}
}

fn try_clone_from<P, E>(&mut self, body: &B, policy: &P)
where
P: Policy<B, E>,
{
fn try_clone_from(&mut self, body: &Body) {
match self {
BodyRepr::Some(_) | BodyRepr::Empty => {}
BodyRepr::None => {
if body.size_hint().exact() == Some(0) {
*self = BodyRepr::Some(B::default());
} else if let Some(cloned) = policy.clone_body(body) {
*self = BodyRepr::Some(Body::default());
} else if let Some(cloned) = body.try_clone() {
*self = BodyRepr::Some(cloned);
}
}
Expand All @@ -55,25 +50,24 @@ where
}

/// [`Layer`] for retrying requests with a [`Service`] to follow redirection responses.
#[derive(Clone, Copy, Default)]
pub struct FollowRedirectLayer<P> {
policy: P,
#[derive(Clone)]
pub struct FollowRedirectLayer {
policy: FollowRedirectPolicy,
}

impl<P> FollowRedirectLayer<P> {
/// Create a new [`FollowRedirectLayer`] with the given redirection [`Policy`].
impl FollowRedirectLayer {
/// Create a new [`FollowRedirectLayer`] with the given redirection policy.
#[inline(always)]
pub fn with_policy(policy: P) -> Self {
pub(crate) fn with_policy(policy: FollowRedirectPolicy) -> Self {
FollowRedirectLayer { policy }
}
}

impl<S, P> Layer<S> for FollowRedirectLayer<P>
impl<S> Layer<S> for FollowRedirectLayer
where
S: Clone,
P: Clone,
{
type Service = FollowRedirect<S, P>;
type Service = FollowRedirect<S>;

#[inline(always)]
fn layer(&self, inner: S) -> Self::Service {
Expand All @@ -82,63 +76,57 @@ where
}

/// Middleware that retries requests with a [`Service`] to follow redirection responses.
#[derive(Clone, Copy)]
pub struct FollowRedirect<S, P> {
#[derive(Clone)]
pub struct FollowRedirect<S> {
inner: S,
policy: P,
policy: FollowRedirectPolicy,
}

impl<S, P> FollowRedirect<S, P>
where
P: Clone,
{
/// Create a new [`FollowRedirect`] with the given redirection [`Policy`].
impl<S> FollowRedirect<S> {
/// Create a new [`FollowRedirect`] with the given redirection policy.
#[inline(always)]
pub fn with_policy(inner: S, policy: P) -> Self {
fn with_policy(inner: S, policy: FollowRedirectPolicy) -> Self {
FollowRedirect { inner, policy }
}
}

impl<ReqBody, ResBody, S, P> Service<Request<ReqBody>> for FollowRedirect<S, P>
impl<ResBody, S> Service<Request<Body>> for FollowRedirect<S>
where
S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone,
S: Service<Request<Body>, Response = Response<ResBody>> + Clone,
S::Error: From<BoxError>,
P: Policy<ReqBody, S::Error> + Clone,
ReqBody: Body + Default,
{
type Response = Response<ResBody>;
type Error = S::Error;
type Future = ResponseFuture<S, ReqBody, P>;
type Future = ResponseFuture<S>;

#[inline(always)]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, mut req: Request<ReqBody>) -> Self::Future {
if self.policy.follow_redirects(&mut req) {
let service = self.inner.clone();
let mut service = mem::replace(&mut self.inner, service);
let mut policy = self.policy.clone();

let mut body_repr = BodyRepr::None;
body_repr.try_clone_from(req.body(), &policy);
policy.on_request(&mut req);

let (parts, body) = req.into_parts();
let req = Request::from_parts(parts.clone(), body);
ResponseFuture::Redirect {
future: Either::Left(service.call(req)),
pending_future: None,
service,
policy,
parts,
body_repr,
}
} else {
ResponseFuture::Direct {
fn call(&mut self, mut req: Request<Body>) -> Self::Future {
let Some(mut policy) = self.policy.for_request(&mut req) else {
return ResponseFuture::Direct {
future: self.inner.call(req),
}
};
};

let service = self.inner.clone();
let mut service = mem::replace(&mut self.inner, service);

let mut body_repr = BodyRepr::None;
body_repr.try_clone_from(req.body());
policy.on_request(&mut req);

let (parts, body) = req.into_parts();
let req = Request::from_parts(parts.clone(), body);
ResponseFuture::Redirect {
future: Either::Left(service.call(req)),
pending_future: None,
service,
policy,
parts,
body_repr,
}
}
}
70 changes: 28 additions & 42 deletions src/client/layer/redirect/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,39 @@ use http::{
header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE, LOCATION, TRANSFER_ENCODING},
request::Parts,
};
use http_body::Body;
use pin_project_lite::pin_project;
use tower::{BoxError, Service, util::Oneshot};
use url::Url;

use super::{
BodyRepr,
policy::{Action, Attempt, Policy},
policy::{Action, Attempt},
};
use crate::{Error, ext::RequestUri, into_uri::IntoUriSealed};
use crate::{Body, ext::RequestUri, into_uri::IntoUriSealed, redirect::FollowRedirectPolicy};

/// Pending future state for handling redirects.
pub struct Pending<ReqBody, Response> {
pub struct Pending<Response> {
future: Pin<Box<dyn Future<Output = Action> + Send>>,
location: Uri,
body: ReqBody,
body: Body,
res: Response,
}

pin_project! {
/// Response future for [`FollowRedirect`].
#[project = ResponseFutureProj]
pub enum ResponseFuture<S, B, P>
pub enum ResponseFuture<S>
where
S: Service<Request<B>>,
S: Service<Request<Body>>,
{
Redirect {
#[pin]
future: Either<S::Future, Oneshot<S, Request<B>>>,
pending_future: Option<Pending<B, S::Response>>,
future: Either<S::Future, Oneshot<S, Request<Body>>>,
pending_future: Option<Pending<S::Response>>,
service: S,
policy: P,
policy: FollowRedirectPolicy,
parts: Parts,
body_repr: BodyRepr<B>,
body_repr: BodyRepr<Body>,
},

Direct {
Expand All @@ -54,14 +53,12 @@ pin_project! {
}
}

impl<S, ReqBody, ResBody, P> Future for ResponseFuture<S, ReqBody, P>
impl<S, B> Future for ResponseFuture<S>
where
S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone,
S: Service<Request<Body>, Response = Response<B>> + Clone,
S::Error: From<BoxError>,
P: Policy<ReqBody, S::Error>,
ReqBody: Body + Default,
{
type Output = Result<Response<ResBody>, S::Error>;
type Output = Result<Response<B>, S::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
Expand Down Expand Up @@ -215,41 +212,36 @@ fn drop_payload_headers(headers: &mut HeaderMap) {
}
}

type RedirectFuturePin<'a, S, ReqBody> =
Pin<&'a mut Either<<S as Service<Request<ReqBody>>>::Future, Oneshot<S, Request<ReqBody>>>>;
type RedirectFuturePin<'a, S> =
Pin<&'a mut Either<<S as Service<Request<Body>>>::Future, Oneshot<S, Request<Body>>>>;

struct RedirectAction<'a, S, ReqBody, ResBody, P>
struct RedirectAction<'a, S, B>
where
S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone,
P: Policy<ReqBody, S::Error>,
S: Service<Request<Body>, Response = Response<B>> + Clone,
{
action: Action,
future: &'a mut RedirectFuturePin<'a, S, ReqBody>,
future: &'a mut RedirectFuturePin<'a, S>,
service: &'a S,
policy: &'a mut P,
policy: &'a mut FollowRedirectPolicy,
parts: &'a mut Parts,
body: ReqBody,
body_repr: &'a mut BodyRepr<ReqBody>,
res: Response<ResBody>,
body: Body,
body_repr: &'a mut BodyRepr<Body>,
res: Response<B>,
location: Uri,
}

fn handle_action<S, ReqBody, ResBody, P>(
fn handle_action<S, B>(
cx: &mut Context<'_>,
redirect: RedirectAction<'_, S, ReqBody, ResBody, P>,
) -> Poll<Result<Response<ResBody>, S::Error>>
redirect: RedirectAction<'_, S, B>,
) -> Poll<Result<Response<B>, S::Error>>
where
S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone,
S: Service<Request<Body>, Response = Response<B>> + Clone,
S::Error: From<BoxError>,
P: Policy<ReqBody, S::Error>,
ReqBody: Body + Default,
{
match redirect.action {
Action::Follow => {
redirect.parts.uri = redirect.location;
redirect
.body_repr
.try_clone_from(&redirect.body, redirect.policy);
redirect.body_repr.try_clone_from(&redirect.body);
Comment on lines 242 to +244

This comment was marked as spam.


let mut req = Request::from_parts(redirect.parts.clone(), redirect.body);
redirect.policy.on_request(&mut req);
Expand All @@ -261,13 +253,7 @@ where
Poll::Pending
}
Action::Stop => Poll::Ready(Ok(redirect.res)),
Action::Pending(_) => Poll::Ready(Err(S::Error::from(
Error::redirect(
"Nested pending Action is not supported in redirect policy",
redirect.parts.uri.clone(),
)
.into(),
))),
Action::Error(err) => Poll::Ready(Err(err.into())),
Action::Pending(_) => unreachable!(),
}
Comment thread
0x676e67 marked this conversation as resolved.
}
28 changes: 2 additions & 26 deletions src/client/layer/redirect/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,10 @@

use std::{fmt, pin::Pin};

use http::{HeaderMap, Request, Response, StatusCode, Uri};
use http::{HeaderMap, StatusCode, Uri};
Comment thread
0x676e67 marked this conversation as resolved.

use crate::error::BoxError;

/// Trait for the policy on handling redirection responses.
pub trait Policy<B, E> {
/// Invoked when the service received a response with a redirection status code (`3xx`).
///
/// This method returns an [`Action`] which indicates whether the service should follow
/// the redirection.
fn redirect(&mut self, attempt: Attempt<'_>) -> Result<Action, E>;

/// Returns whether redirection is currently permitted by this policy.
///
/// This method is called to determine whether the client should follow redirects at all.
/// It allows policies to enable or disable redirection behavior based on the [`Request`].
fn follow_redirects(&mut self, _request: &mut Request<B>) -> bool;

/// Invoked right before the service makes a [`Request`].
fn on_request(&mut self, _request: &mut Request<B>);

/// Invoked right after the service received a [`Response`].
fn on_response<Body>(&mut self, _response: &mut Response<Body>);

/// Try to clone a request body before the service makes a redirected request.
fn clone_body(&self, _body: &B) -> Option<B>;
}

/// A type that holds information on a redirection attempt.
pub struct Attempt<'a> {
pub(crate) status: StatusCode,
Expand All @@ -38,7 +14,7 @@ pub struct Attempt<'a> {
pub(crate) previous: &'a Uri,
}

/// A value returned by [`Policy::redirect`] which indicates the action
/// A value which indicates the action
/// [`FollowRedirect`][super::FollowRedirect] should take for a redirection response.
pub enum Action {
/// Follow the redirection.
Expand Down
Loading
Loading