diff --git a/Cargo.toml b/Cargo.toml index e02d3b35..e4bd0460 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ async-trait = "0.1.53" bytes = "1.0" chrono = { version = "0.4.34", default-features = false, features = ["clock"] } futures = "0.3" +futures-timer = "3" http = "1.2.0" humantime = "2.1" itertools = "0.14.0" @@ -53,7 +54,7 @@ httparse = { version = "1.8.0", default-features = false, features = ["std"], op hyper = { version = "1.2", default-features = false, optional = true } md-5 = { version = "0.10.6", default-features = false, optional = true } quick-xml = { version = "0.38.0", features = ["serialize", "overlapped-lists"], optional = true } -rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"], optional = true } +rand = { version = "0.9.1", default-features = false, features = ["std", "std_rng", "thread_rng"], optional = true } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"], optional = true } ring = { version = "0.17", default-features = false, features = ["std"], optional = true } rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true } @@ -68,6 +69,7 @@ nix = { version = "0.30.0", features = ["fs"] } [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies] web-time = { version = "1.1.0" } wasm-bindgen-futures = "0.4.18" +futures-timer = { version = "3", features = ["wasm-bindgen"] } [features] default = ["fs"] @@ -83,7 +85,7 @@ integration = ["rand"] [dev-dependencies] # In alphabetical order hyper = { version = "1.2", features = ["server"] } hyper-util = "0.1" -rand = "0.9" +rand = "0.9.1" tempfile = "3.1.0" regex = "1.11.1" # The "gzip" feature for reqwest is enabled for an integration test. diff --git a/src/client/mod.rs b/src/client/mod.rs index 58b27d28..52e30d92 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -20,6 +20,7 @@ //! [`ObjectStore`]: crate::ObjectStore pub(crate) mod backoff; +pub(crate) mod sleep; #[cfg(not(target_arch = "wasm32"))] mod dns; diff --git a/src/client/retry.rs b/src/client/retry.rs index ed9bf0c4..4904f1b8 100644 --- a/src/client/retry.rs +++ b/src/client/retry.rs @@ -20,7 +20,7 @@ use crate::PutPayload; use crate::client::backoff::{Backoff, BackoffConfig}; use crate::client::builder::HttpRequestBuilder; -use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest, HttpResponse}; +use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest, HttpResponse, sleep}; use futures::future::BoxFuture; use http::{Method, Uri}; use reqwest::StatusCode; @@ -375,15 +375,15 @@ impl RetryableRequest { return Err(self.err(RequestError::Response { body, status }, ctx)); } - let sleep = ctx.backoff(); + let sleep_duration = ctx.backoff(); info!( "Encountered a response status of {} but body contains Error, backing off for {} seconds, retry {} of {}", status, - sleep.as_secs_f32(), + sleep_duration.as_secs_f32(), ctx.retries, ctx.max_retries, ); - tokio::time::sleep(sleep).await; + sleep::sleep(sleep_duration).await; } } else if status == StatusCode::NOT_MODIFIED { return Err(self.err(RequestError::Status { status, body: None }, ctx)); @@ -420,15 +420,15 @@ impl RetryableRequest { return Err(self.err(source, ctx)); }; - let sleep = ctx.backoff(); + let sleep_duration = ctx.backoff(); info!( "Encountered server error with status {}, backing off for {} seconds, retry {} of {}", status, - sleep.as_secs_f32(), + sleep_duration.as_secs_f32(), ctx.retries, ctx.max_retries, ); - tokio::time::sleep(sleep).await; + sleep::sleep(sleep_duration).await; } } Err(e) => { @@ -445,16 +445,16 @@ impl RetryableRequest { if ctx.exhausted() || !do_retry { return Err(self.err(RequestError::Http(e), ctx)); } - let sleep = ctx.backoff(); + let sleep_duration = ctx.backoff(); info!( "Encountered transport error of kind {:?}, backing off for {} seconds, retry {} of {}: {}", e.kind(), - sleep.as_secs_f32(), + sleep_duration.as_secs_f32(), ctx.retries, ctx.max_retries, e, ); - tokio::time::sleep(sleep).await; + sleep::sleep(sleep_duration).await; } } } @@ -511,7 +511,7 @@ mod tests { use crate::RetryConfig; use crate::client::mock_server::MockServer; use crate::client::retry::{RequestError, RetryContext, RetryExt, body_contains_error}; - use crate::client::{HttpClient, HttpResponse}; + use crate::client::{HttpClient, HttpResponse, sleep}; use http::StatusCode; use hyper::Response; use hyper::header::LOCATION; @@ -734,14 +734,14 @@ mod tests { // Retries on client timeout mock.push_async_fn(|_| async move { - tokio::time::sleep(Duration::from_secs(10)).await; + sleep::sleep(Duration::from_secs(10)).await; panic!() }); do_request().await.unwrap(); // Does not retry PUT request mock.push_async_fn(|_| async move { - tokio::time::sleep(Duration::from_secs(10)).await; + sleep::sleep(Duration::from_secs(10)).await; panic!() }); let res = client.request(Method::PUT, mock.url()).send_retry(&retry); diff --git a/src/client/sleep.rs b/src/client/sleep.rs new file mode 100644 index 00000000..b689f921 --- /dev/null +++ b/src/client/sleep.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{future::Future, time::Duration}; + +use futures::FutureExt; +use futures_timer::Delay; + +/// A future that resolves after `duration`. +/// +/// Not intended for use when high resolution is required. +pub(crate) struct Sleep { + delay: Delay, +} + +/// Create a new [`Sleep`] instance that will resolve once `duration` has passed since its first awaited. +pub(crate) fn sleep(duration: Duration) -> Sleep { + Sleep { + delay: Delay::new(duration), + } +} + +impl Future for Sleep { + type Output = (); + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.as_mut().delay.poll_unpin(cx) + } +}