Skip to content

Commit

Permalink
Merge branch 'master' into unsync-boxed-clone
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz committed Jul 20, 2024
2 parents 6405287 + 0891760 commit adac860
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 77 deletions.
6 changes: 3 additions & 3 deletions guides/building-a-middleware-from-scratch.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ where
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Our middleware doesn't care about backpressure so its ready as long
// Our middleware doesn't care about backpressure, so it's ready as long
// as the inner service is ready.
self.inner.poll_ready(cx)
}
Expand Down Expand Up @@ -192,9 +192,9 @@ where

Ideally we want to write something like this:

1. First poll `self.response_future` and if its ready return the response or error it
1. First poll `self.response_future`, and if it's ready, return the response or error it
resolved to.
2. Otherwise poll `self.sleep` and if its ready return an error.
2. Otherwise, poll `self.sleep`, and if it's ready, return an error.
3. If neither future is ready return `Poll::Pending`.

We might try:
Expand Down
69 changes: 0 additions & 69 deletions tower/src/balance/p2c/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@ use crate::ready_cache::{error::Failed, ReadyCache};
use crate::util::rng::{sample_floyd2, HasherRng, Rng};
use futures_core::ready;
use futures_util::future::{self, TryFutureExt};
use pin_project_lite::pin_project;
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::oneshot;
use tower_service::Service;
use tracing::{debug, trace};

Expand Down Expand Up @@ -58,25 +55,6 @@ where
}
}

pin_project! {
/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
struct UnreadyService<K, S, Req> {
key: Option<K>,
#[pin]
cancel: oneshot::Receiver<()>,
service: Option<S>,

_req: PhantomData<Req>,
}
}

enum Error<E> {
Inner(E),
Canceled,
}

impl<D, Req> Balance<D, Req>
where
D: Discover,
Expand Down Expand Up @@ -279,50 +257,3 @@ where
.map_err(Into::into)
}
}

impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
type Output = Result<(K, S), (K, Error<S::Error>)>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

if let Poll::Ready(Ok(())) = this.cancel.poll(cx) {
let key = this.key.take().expect("polled after ready");
return Poll::Ready(Err((key, Error::Canceled)));
}

let res = ready!(this
.service
.as_mut()
.expect("poll after ready")
.poll_ready(cx));

let key = this.key.take().expect("polled after ready");
let svc = this.service.take().expect("polled after ready");

match res {
Ok(()) => Poll::Ready(Ok((key, svc))),
Err(e) => Poll::Ready(Err((key, Error::Inner(e)))),
}
}
}

impl<K, S, Req> fmt::Debug for UnreadyService<K, S, Req>
where
K: fmt::Debug,
S: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let Self {
key,
cancel,
service,
_req,
} = self;
f.debug_struct("UnreadyService")
.field("key", key)
.field("cancel", cancel)
.field("service", service)
.finish()
}
}
2 changes: 1 addition & 1 deletion tower/src/retry/backoff.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This module contains generic [backoff] utlities to be used with the retry
//! This module contains generic [backoff] utilities to be used with the retry
//! layer.
//!
//! The [`Backoff`] trait is a generic way to represent backoffs that can use
Expand Down
2 changes: 1 addition & 1 deletion tower/src/retry/budget/tps_budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct TpsBudget {
slots: Box<[AtomicIsize]>,
/// The amount of time represented by each slot.
window: Duration,
/// The changers for the current slot to be commited
/// The changers for the current slot to be committed
/// after the slot expires.
writer: AtomicIsize,
/// Amount of tokens to deposit for each put().
Expand Down
4 changes: 2 additions & 2 deletions tower/src/util/rng.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! This module provides a generic [`Rng`] trait and a [`HasherRng`] that
//! implements the trait based on [`RandomState`] or any other [`Hasher`].
//!
//! These utlities replace tower's internal usage of `rand` with these smaller,
//! These utilities replace tower's internal usage of `rand` with these smaller,
//! more lightweight methods. Most of the implementations are extracted from
//! their corresponding `rand` implementations.
//!
Expand Down Expand Up @@ -111,7 +111,7 @@ where

/// A sampler modified from the Rand implementation for use internally for the balance middleware.
///
/// It's an implemenetation of Floyd's combination algorithm. with amount fixed at 2. This uses no allocated
/// It's an implementation of Floyd's combination algorithm. with amount fixed at 2. This uses no allocated
/// memory and finishes in constant time (only 2 random calls)
///
/// ref: This was borrowed and modified from the following Rand implementation
Expand Down
2 changes: 1 addition & 1 deletion tower/tests/ready_cache/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ async fn cancelation_observed() {
let mut handles = vec![];

// NOTE This test passes at 129 items, but fails at 130 items (if coop
// schedulding interferes with cancelation).
// scheduling interferes with cancelation).
for _ in 0..130 {
let (svc, mut handle) = tower_test::mock::pair::<(), ()>();
handle.allow(1);
Expand Down

0 comments on commit adac860

Please sign in to comment.