diff --git a/Cargo.lock b/Cargo.lock
index 8fc6df4d..3c61ebd9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1890,6 +1890,7 @@ dependencies = [
  "subgraphs",
  "thiserror 2.0.17",
  "tokio",
+ "tokio-retry2",
  "tracing",
  "vrl",
  "xxhash-rust",
@@ -3430,6 +3431,26 @@ dependencies = [
  "siphasher",
 ]
 
+[[package]]
+name = "pin-project"
+version = "1.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.106",
+]
+
 [[package]]
 name = "pin-project-lite"
 version = "0.2.16"
@@ -5008,6 +5029,16 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-retry2"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7de2537bbad4f8b2d4237cdab9e7c4948a1f74744e45f54144eeccd05d3ad955"
+dependencies = [
+ "pin-project",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-rustls"
 version = "0.26.4"
diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml
index 03f41bbd..36f95c5f 100644
--- a/lib/executor/Cargo.toml
+++ b/lib/executor/Cargo.toml
@@ -47,6 +47,7 @@ itoa = "1.0.15"
 ryu = "1.0.20"
 indexmap = "2.10.0"
 bumpalo = "3.19.0"
+tokio-retry2 = "0.6.0"
 
 [dev-dependencies]
 subgraphs = { path = "../../bench/subgraphs" }
diff --git a/lib/executor/src/executors/common.rs b/lib/executor/src/executors/common.rs
index f85fc49d..adfcb300 100644
--- a/lib/executor/src/executors/common.rs
+++ b/lib/executor/src/executors/common.rs
@@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc};
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use http::HeaderMap;
+use http::{HeaderMap, StatusCode};
 
 use crate::execution::plan::ClientRequestDetails;
 
@@ -10,7 +10,7 @@ use crate::execution::plan::ClientRequestDetails;
 pub trait SubgraphExecutor {
     async fn execute<'a>(
         &self,
-        execution_request: HttpExecutionRequest<'a>,
+        execution_request: &'a HttpExecutionRequest<'a>,
     ) -> HttpExecutionResponse;
     fn to_boxed_arc<'a>(self) -> Arc>
     where
@@ -38,4 +38,5 @@ pub struct HttpExecutionRequest<'a> {
 pub struct HttpExecutionResponse {
     pub body: Bytes,
     pub headers: HeaderMap,
+    pub status: StatusCode,
 }
diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs
index b1297a9b..a06dbeef 100644
--- a/lib/executor/src/executors/http.rs
+++ b/lib/executor/src/executors/http.rs
@@ -159,19 +159,20 @@ impl HTTPSubgraphExecutor {
 impl SubgraphExecutor for HTTPSubgraphExecutor {
     async fn execute<'a>(
         &self,
-        execution_request: HttpExecutionRequest<'a>,
+        execution_request: &'a HttpExecutionRequest<'a>,
     ) -> HttpExecutionResponse {
-        let body = match self.build_request_body(&execution_request) {
+        let body = match self.build_request_body(execution_request) {
             Ok(body) => body,
             Err(e) => {
                 return HttpExecutionResponse {
                     body: error_to_graphql_bytes(&self.endpoint, e),
                     headers: Default::default(),
+                    status: http::StatusCode::BAD_REQUEST,
                 }
             }
         };
 
-        let mut headers = execution_request.headers;
+        let mut headers = execution_request.headers.clone();
         self.header_map.iter().for_each(|(key, value)| {
             headers.insert(key, value.clone());
         });
@@ -184,10 +185,12 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
                 Ok(shared_response) => HttpExecutionResponse {
                     body: shared_response.body,
                     headers: shared_response.headers,
+                    status: shared_response.status,
                 },
                 Err(e) => HttpExecutionResponse {
                     body: error_to_graphql_bytes(&self.endpoint, e),
                     headers: Default::default(),
+                    status: http::StatusCode::BAD_REQUEST,
                 },
             };
         }
@@ -223,10 +226,12 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
             Ok(shared_response) => HttpExecutionResponse {
                 body: shared_response.body.clone(),
                 headers: shared_response.headers.clone(),
+                status: shared_response.status,
             },
             Err(e) => HttpExecutionResponse {
                 body: error_to_graphql_bytes(&self.endpoint, e.clone()),
                 headers: Default::default(),
+                status: http::StatusCode::BAD_REQUEST,
             },
         }
     }
diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs
index 555fa4ba..3717dc23 100644
--- a/lib/executor/src/executors/map.rs
+++ b/lib/executor/src/executors/map.rs
@@ -18,11 +18,7 @@ use crate::{
     executors::{
         common::{
             HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc,
-        },
-        dedupe::{ABuildHasher, SharedResponse},
-        error::SubgraphExecutorError,
-        http::HTTPSubgraphExecutor,
-        timeout::TimeoutExecutor,
+        }, dedupe::{ABuildHasher, SharedResponse}, error::SubgraphExecutorError, http::HTTPSubgraphExecutor, retry::RetryExecutor, timeout::TimeoutExecutor
     },
     response::graphql_error::GraphQLError,
 };
@@ -50,7 +46,7 @@ impl SubgraphExecutorMap {
         execution_request: HttpExecutionRequest<'a>,
     ) -> HttpExecutionResponse {
         match self.inner.get(subgraph_name) {
-            Some(executor) => executor.execute(execution_request).await,
+            Some(executor) => executor.execute(&execution_request).await,
             None => {
                 let graphql_error: GraphQLError = format!(
                     "Subgraph executor not found for subgraph: {}",
@@ -67,6 +63,7 @@ impl SubgraphExecutorMap {
                 HttpExecutionResponse {
                     body: buffer.freeze(),
                     headers: Default::default(),
+                    status: http::StatusCode::BAD_REQUEST,
                 }
             }
         }
@@ -146,6 +143,11 @@ impl SubgraphExecutorMap {
                     .to_boxed_arc();
             }
 
+            if config_arc.max_retries > 0 {
+                executor = RetryExecutor::new(executor, &config_arc)
+                    .to_boxed_arc();
+            }
+
             Ok((subgraph_name, executor))
         })
         .collect::, SubgraphExecutorError>>()?;
diff --git a/lib/executor/src/executors/mod.rs b/lib/executor/src/executors/mod.rs
index 4b64bda5..a817caed 100644
--- a/lib/executor/src/executors/mod.rs
+++ b/lib/executor/src/executors/mod.rs
@@ -4,3 +4,4 @@ pub mod error;
 pub mod http;
 pub mod map;
 pub mod timeout;
+pub mod retry;
\ No newline at end of file
diff --git a/lib/executor/src/executors/retry.rs b/lib/executor/src/executors/retry.rs
new file mode 100644
index 00000000..893a245b
--- /dev/null
+++ b/lib/executor/src/executors/retry.rs
@@ -0,0 +1,55 @@
+use async_trait::async_trait;
+use hive_router_config::traffic_shaping::TrafficShapingExecutorConfig;
+use tokio_retry2::{strategy::ExponentialBackoff, Retry, RetryError};
+
+use crate::executors::common::{
+    HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc,
+};
+
+pub struct RetryExecutor {
+    pub executor: SubgraphExecutorBoxedArc,
+    pub strategy: std::iter::Take<ExponentialBackoff>,
+}
+
+impl RetryExecutor {
+    pub fn new(executor: SubgraphExecutorBoxedArc, config: &TrafficShapingExecutorConfig) -> Self {
+        let retry_delay_as_millis = config.retry_delay.as_millis();
+        let strategy = ExponentialBackoff::from_millis(retry_delay_as_millis as u64)
+            .factor(config.retry_factor)
+            .max_delay(config.retry_delay)
+            .take(config.max_retries + 1); // to account for the initial attempt
+        Self { executor, strategy }
+    }
+}
+
+#[async_trait]
+impl SubgraphExecutor for RetryExecutor {
+    async fn execute<'a>(
+        &self,
+        execution_request: &'a HttpExecutionRequest<'a>,
+    ) -> HttpExecutionResponse {
+        let action = async move || {
+            let result = self.executor.execute(execution_request).await;
+            if result.status.is_success() {
+                Ok(result)
+            } else {
+                let retry_after_header = result
+                    .headers
+                    .get("retry-after")
+                    .and_then(|value| value.to_str().ok())
+                    .and_then(|s| s.parse::<u64>().ok());
+                let retry_after = retry_after_header.map(std::time::Duration::from_secs);
+                Err(RetryError::Transient {
+                    err: result,
+                    retry_after,
+                })
+            }
+        };
+        let result = Retry::spawn(self.strategy.clone(), action).await;
+
+        match result {
+            Ok(response) => response,
+            Err(response) => response,
+        }
+    }
+}
diff --git a/lib/executor/src/executors/timeout.rs b/lib/executor/src/executors/timeout.rs
index 1f0d276b..2347cce2 100644
--- a/lib/executor/src/executors/timeout.rs
+++ b/lib/executor/src/executors/timeout.rs
@@ -145,7 +145,7 @@ impl TimeoutExecutor {
 impl SubgraphExecutor for TimeoutExecutor {
     async fn execute<'a>(
         &self,
-        execution_request: HttpExecutionRequest<'a>,
+        execution_request: &'a HttpExecutionRequest<'a>,
     ) -> HttpExecutionResponse {
         let timeout = self.get_timeout_duration(execution_request.client_request);
         let execution = self.executor.execute(execution_request);
@@ -158,6 +158,7 @@ impl SubgraphExecutor for TimeoutExecutor {
                         SubgraphExecutorError::RequestTimeout(timeout),
                     ),
                     headers: Default::default(),
+                    status: http::StatusCode::GATEWAY_TIMEOUT,
                 },
             }
         } else {
@@ -191,11 +192,12 @@ mod tests {
     impl SubgraphExecutor for MockExecutor {
         async fn execute<'a>(
             &self,
-            _execution_request: HttpExecutionRequest<'a>,
+            _execution_request: &'a HttpExecutionRequest<'a>,
         ) -> HttpExecutionResponse {
             HttpExecutionResponse {
                 body: Default::default(),
                 headers: Default::default(),
+                status: http::StatusCode::OK,
             }
         }
     }
@@ -402,7 +404,7 @@ mod tests {
         };
 
         println!("Sending request to executor with 5s timeout...");
-        let response = timeout_executor.execute(execution_request).await;
+        let response = timeout_executor.execute(&execution_request).await;
         println!("Received response from executor.");
 
         assert!(
diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs
index d634cf83..fb6d36cf 100644
--- a/lib/router-config/src/traffic_shaping.rs
+++ b/lib/router-config/src/traffic_shaping.rs
@@ -51,6 +51,15 @@ pub struct TrafficShapingExecutorConfig {
     /// ```
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub timeout: Option,
+
+    #[serde(default = "default_max_retries")]
+    pub max_retries: usize,
+
+    #[serde(deserialize_with = "humantime_serde", default = "default_retry_delay")]
+    pub retry_delay: Duration,
+
+    #[serde(default = "default_retry_factor")]
+    pub retry_factor: u64,
 }
 
 #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
@@ -74,6 +83,9 @@ impl Default for TrafficShapingExecutorConfig {
             pool_idle_timeout_seconds: default_pool_idle_timeout_seconds(),
             dedupe_enabled: default_dedupe_enabled(),
             timeout: None,
+            max_retries: 0,
+            retry_delay: default_retry_delay(),
+            retry_factor: default_retry_factor(),
         }
     }
 }
@@ -99,3 +111,15 @@ fn default_pool_idle_timeout_seconds() -> u64 {
 fn default_dedupe_enabled() -> bool {
     true
 }
+
+fn default_max_retries() -> usize {
+    0
+}
+
+fn default_retry_delay() -> Duration {
+    Duration::from_secs(1)
+}
+
+fn default_retry_factor() -> u64 {
+    1
+}
\ No newline at end of file
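
Below the diff, a minimal standalone sketch of how the new max_retries, retry_delay, and retry_factor settings expand into the delay schedule that RetryExecutor::new builds with tokio-retry2. The concrete values are hypothetical and the snippet is independent of the router crates; it assumes tokio-retry2's ExponentialBackoff iterates one Duration per retry performed after the initial attempt.

use std::time::Duration;
use tokio_retry2::strategy::ExponentialBackoff;

fn main() {
    // Hypothetical config values: max_retries = 3, retry_delay = 1s, retry_factor = 1.
    let max_retries: usize = 3;
    let retry_delay = Duration::from_secs(1);
    let retry_factor: u64 = 1;

    // Same construction as RetryExecutor::new: the exponential base and the cap
    // are both derived from retry_delay, so every delay is clamped to retry_delay.
    let delays: Vec<Duration> = ExponentialBackoff::from_millis(retry_delay.as_millis() as u64)
        .factor(retry_factor)
        .max_delay(retry_delay)
        .take(max_retries + 1) // + 1 to account for the initial attempt, as in the PR
        .collect();

    // Prints [1s, 1s, 1s, 1s]: a flat schedule, one entry per retry that
    // Retry::spawn may perform after the first call of the wrapped executor.
    println!("{delays:?}");
}

With these defaults the backoff is effectively constant; raising retry_factor or max_delay relative to retry_delay is what would make the schedule grow exponentially between attempts.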