Skip to content

Commit

Permalink
update to reqwest 0.12
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Apr 30, 2024
1 parent 5746454 commit 78ab848
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 107 deletions.
231 changes: 169 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ prost = "0.11"
rand = "0.8"
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.4.7", features = ["opentelemetry_0_20"] }
reqwest-middleware = "0.2.0"
reqwest-retry = "0.2.2"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_20"] }
reqwest-middleware = "0.3.0"
reqwest-retry = "0.5"
routerify = "3"
rpds = "0.13"
rustc-hash = "1.1.0"
Expand Down Expand Up @@ -177,6 +177,7 @@ tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tower-service = "0.3.2"
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.20.0"
Expand Down
4 changes: 2 additions & 2 deletions control_plane/src/storage_controller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{background_process, local_env::LocalEnv};
use camino::{Utf8Path, Utf8PathBuf};
use hyper::Method;
use pageserver_api::{
controller_api::{
NodeConfigureRequest, NodeRegisterRequest, TenantCreateResponse, TenantLocateResponse,
Expand All @@ -14,6 +13,7 @@ use pageserver_api::{
};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{fs, str::FromStr};
use tokio::process::Command;
Expand Down Expand Up @@ -379,7 +379,7 @@ impl StorageController {
/// Simple HTTP request wrapper for calling into storage controller
async fn dispatch<RQ, RS>(
&self,
method: hyper::Method,
method: reqwest::Method,
path: String,
body: Option<RQ>,
) -> anyhow::Result<RS>
Expand Down
5 changes: 2 additions & 3 deletions control_plane/storcon_cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{collections::HashMap, str::FromStr, time::Duration};

use clap::{Parser, Subcommand};
use hyper::{Method, StatusCode};
use pageserver_api::{
controller_api::{
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy,
Expand All @@ -14,7 +13,7 @@ use pageserver_api::{
shard::{ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api::{self, ResponseErrorMessageExt};
use reqwest::Url;
use reqwest::{Method, StatusCode, Url};
use serde::{de::DeserializeOwned, Serialize};
use utils::id::{NodeId, TenantId};

Expand Down Expand Up @@ -232,7 +231,7 @@ impl Client {
/// Simple HTTP request wrapper for calling into storage controller
async fn dispatch<RQ, RS>(
&self,
method: hyper::Method,
method: Method,
path: String,
body: Option<RQ>,
) -> mgmt_api::Result<RS>
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/control_plane_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl ControlPlaneClient {
let mut client = reqwest::ClientBuilder::new();

if let Some(jwt) = &conf.control_plane_api_token {
let mut headers = hyper::HeaderMap::new();
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
"Authorization",
format!("Bearer {}", jwt.get_contents()).parse().unwrap(),
Expand Down
5 changes: 3 additions & 2 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ prometheus.workspace = true
rand.workspace = true
regex.workspace = true
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
reqwest = { workspace = true, features = ["json"] }
reqwest-middleware.workspace = true
reqwest.workspace = true
reqwest-middleware = { workspace = true, features = ["json"] }
reqwest-retry.workspace = true
reqwest-tracing.workspace = true
routerify.workspace = true
Expand All @@ -84,6 +84,7 @@ tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio = { workspace = true, features = ["signal"] }
tower-service.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
Expand Down
17 changes: 9 additions & 8 deletions proxy/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
pub mod health_server;

use std::{sync::Arc, time::Duration};
use std::{str::FromStr, sync::Arc, time::Duration};

use futures::FutureExt;
pub use reqwest::{Request, Response, StatusCode};
Expand Down Expand Up @@ -103,12 +103,12 @@ impl Endpoint {
}
}

/// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
use hyper::{
client::connect::dns::{GaiResolver as HyperGaiResolver, Name},
service::Service,
use hyper_util::client::legacy::connect::dns::{
GaiResolver as HyperGaiResolver, Name as HyperName,
};
use reqwest::dns::{Addrs, Resolve, Resolving};
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
/// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
use tower_service::Service;
#[derive(Debug)]
pub struct GaiResolver(HyperGaiResolver);

Expand All @@ -121,11 +121,12 @@ impl Default for GaiResolver {
impl Resolve for GaiResolver {
fn resolve(&self, name: Name) -> Resolving {
let this = &mut self.0.clone();
let hyper_name = HyperName::from_str(name.as_str()).expect("name should be valid");
let start = Instant::now();
Box::pin(
Service::<Name>::call(this, name.clone()).map(move |result| {
Service::<HyperName>::call(this, hyper_name).map(move |result| {
let resolve_duration = start.elapsed();
trace!(duration = ?resolve_duration, addr = %name, "resolve host complete");
trace!(duration = ?resolve_duration, addr = %name.as_str(), "resolve host complete");
result
.map(|addrs| -> Addrs { Box::new(addrs) })
.map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/proxy/wake_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::metrics::{
WakeupFailureKind,
};
use crate::proxy::retry::retry_after;
use hyper::StatusCode;
use hyper1::StatusCode;
use std::ops::ControlFlow;
use tracing::{error, info, warn};

Expand Down
2 changes: 1 addition & 1 deletion storage_controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ once_cell.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_connection.workspace = true
reqwest.workspace = true
reqwest = { workspace = true, features = ["stream"] }
routerify.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
36 changes: 24 additions & 12 deletions storage_controller/src/compute_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{collections::HashMap, time::Duration};
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
use hyper::{Method, StatusCode};
use hyper::StatusCode;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -328,7 +328,7 @@ impl ComputeHook {
reconfigure_request: &ComputeHookNotifyRequest,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let req = self.client.request(Method::PUT, url);
let req = self.client.request(reqwest::Method::PUT, url);
let req = if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
Expand All @@ -347,8 +347,10 @@ impl ComputeHook {
};

// Treat all 2xx responses as success
if response.status() >= StatusCode::OK && response.status() < StatusCode::MULTIPLE_CHOICES {
if response.status() != StatusCode::OK {
if response.status() >= reqwest::StatusCode::OK
&& response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
{
if response.status() != reqwest::StatusCode::OK {
// Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
// log a warning.
tracing::warn!(
Expand All @@ -362,7 +364,7 @@ impl ComputeHook {

// Error response codes
match response.status() {
StatusCode::TOO_MANY_REQUESTS => {
reqwest::StatusCode::TOO_MANY_REQUESTS => {
// TODO: 429 handling should be global: set some state visible to other requests
// so that they will delay before starting, rather than all notifications trying
// once before backing off.
Expand All @@ -371,20 +373,30 @@ impl ComputeHook {
.ok();
Err(NotifyError::SlowDown)
}
StatusCode::LOCKED => {
reqwest::StatusCode::LOCKED => {
// We consider this fatal, because it's possible that the operation blocking the control one is
// also the one that is waiting for this reconcile. We should let the reconciler calling
// this hook fail, to give control plane a chance to un-lock.
tracing::info!("Control plane reports tenant is locked, dropping out of notify");
Err(NotifyError::Busy)
}
StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT
| StatusCode::BAD_GATEWAY => Err(NotifyError::Unavailable(response.status())),
StatusCode::BAD_REQUEST | StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
Err(NotifyError::Fatal(response.status()))
reqwest::StatusCode::SERVICE_UNAVAILABLE => {
Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
}
_ => Err(NotifyError::Unexpected(response.status())),
reqwest::StatusCode::GATEWAY_TIMEOUT => {
Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
}
reqwest::StatusCode::BAD_GATEWAY => {
Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
}

reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
status => Err(NotifyError::Unexpected(
hyper::StatusCode::from_u16(status.as_u16())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
)),
}
}

Expand Down
25 changes: 18 additions & 7 deletions storage_controller/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::metrics::{
};
use crate::reconciler::ReconcileError;
use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
use anyhow::Context;
use futures::Future;
use hyper::header::CONTENT_TYPE;
use hyper::{Body, Request, Response};
Expand Down Expand Up @@ -258,6 +259,12 @@ async fn handle_tenant_time_travel_remote_storage(
json_response(StatusCode::OK, ())
}

pub(crate) fn map_reqwest_hyper_status(status: reqwest::StatusCode) -> Result<hyper::StatusCode, ApiError> {
hyper::StatusCode::from_u16(status.as_u16())
.context("invalid status code")
.map_err(ApiError::InternalServerError)
}

async fn handle_tenant_secondary_download(
service: Arc<Service>,
req: Request<Body>,
Expand All @@ -266,7 +273,7 @@ async fn handle_tenant_secondary_download(
let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);

let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
json_response(status, progress)
json_response(map_reqwest_hyper_status(status)?, progress)
}

async fn handle_tenant_delete(
Expand All @@ -277,7 +284,10 @@ async fn handle_tenant_delete(
check_permissions(&req, Scope::PageServerApi)?;

deletion_wrapper(service, move |service| async move {
service.tenant_delete(tenant_id).await
service
.tenant_delete(tenant_id)
.await
.and_then(map_reqwest_hyper_status)
})
.await
}
Expand Down Expand Up @@ -308,7 +318,10 @@ async fn handle_tenant_timeline_delete(
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;

deletion_wrapper(service, move |service| async move {
service.tenant_timeline_delete(tenant_id, timeline_id).await
service
.tenant_timeline_delete(tenant_id, timeline_id)
.await
.and_then(map_reqwest_hyper_status)
})
.await
}
Expand Down Expand Up @@ -371,11 +384,9 @@ async fn handle_tenant_timeline_passthrough(
}

// We have a reqest::Response, would like a http::Response
let mut builder = hyper::Response::builder()
.status(resp.status())
.version(resp.version());
let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
for (k, v) in resp.headers() {
builder = builder.header(k, v);
builder = builder.header(k.as_str(), v.as_bytes());
}

let response = builder
Expand Down
2 changes: 1 addition & 1 deletion storage_controller/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{str::FromStr, time::Duration};

use hyper::StatusCode;
use reqwest::StatusCode;
use pageserver_api::{
controller_api::{
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
Expand Down
2 changes: 1 addition & 1 deletion storage_controller/src/reconciler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::service;
use hyper::StatusCode;
use reqwest::StatusCode;
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
Expand Down
2 changes: 1 addition & 1 deletion storage_controller/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use control_plane::storage_controller::{
};
use diesel::result::DatabaseErrorKind;
use futures::{stream::FuturesUnordered, StreamExt};
use hyper::StatusCode;
use itertools::Itertools;
use pageserver_api::{
controller_api::{
Expand All @@ -33,6 +32,7 @@ use pageserver_api::{
},
models::{SecondaryProgress, TenantConfigRequest},
};
use reqwest::StatusCode;

use crate::pageserver_client::PageserverClient;
use pageserver_api::{
Expand Down
3 changes: 2 additions & 1 deletion workspace_hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "default-tls", "json", "multipart", "rustls-tls", "stream"] }
reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "default-tls", "json", "rustls-tls", "stream"] }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
Expand Down

0 comments on commit 78ab848

Please sign in to comment.