Skip to content

Commit 10d408c

Browse files
committed
Subgraph Retry
1 parent aa71421 commit 10d408c

File tree

9 files changed

+136
-14
lines changed

9 files changed

+136
-14
lines changed

Cargo.lock

Lines changed: 31 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/executor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ itoa = "1.0.15"
4747
ryu = "1.0.20"
4848
indexmap = "2.10.0"
4949
bumpalo = "3.19.0"
50+
tokio-retry2 = "0.6.0"
5051

5152
[dev-dependencies]
5253
subgraphs = { path = "../../bench/subgraphs" }

lib/executor/src/executors/common.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ use std::{collections::HashMap, sync::Arc};
22

33
use async_trait::async_trait;
44
use bytes::Bytes;
5-
use http::HeaderMap;
5+
use http::{HeaderMap, StatusCode};
66

77
use crate::execution::plan::ClientRequestDetails;
88

99
#[async_trait]
1010
pub trait SubgraphExecutor {
1111
async fn execute<'a>(
1212
&self,
13-
execution_request: HttpExecutionRequest<'a>,
13+
execution_request: &'a HttpExecutionRequest<'a>,
1414
) -> HttpExecutionResponse;
1515
fn to_boxed_arc<'a>(self) -> Arc<Box<dyn SubgraphExecutor + Send + Sync + 'a>>
1616
where
@@ -38,4 +38,5 @@ pub struct HttpExecutionRequest<'a> {
3838
pub struct HttpExecutionResponse {
3939
pub body: Bytes,
4040
pub headers: HeaderMap,
41+
pub status: StatusCode,
4142
}

lib/executor/src/executors/http.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,19 +159,20 @@ impl HTTPSubgraphExecutor {
159159
impl SubgraphExecutor for HTTPSubgraphExecutor {
160160
async fn execute<'a>(
161161
&self,
162-
execution_request: HttpExecutionRequest<'a>,
162+
execution_request: &'a HttpExecutionRequest<'a>,
163163
) -> HttpExecutionResponse {
164-
let body = match self.build_request_body(&execution_request) {
164+
let body = match self.build_request_body(execution_request) {
165165
Ok(body) => body,
166166
Err(e) => {
167167
return HttpExecutionResponse {
168168
body: error_to_graphql_bytes(&self.endpoint, e),
169169
headers: Default::default(),
170+
status: http::StatusCode::BAD_REQUEST,
170171
}
171172
}
172173
};
173174

174-
let mut headers = execution_request.headers;
175+
let mut headers = execution_request.headers.clone();
175176
self.header_map.iter().for_each(|(key, value)| {
176177
headers.insert(key, value.clone());
177178
});
@@ -184,10 +185,12 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
184185
Ok(shared_response) => HttpExecutionResponse {
185186
body: shared_response.body,
186187
headers: shared_response.headers,
188+
status: shared_response.status,
187189
},
188190
Err(e) => HttpExecutionResponse {
189191
body: error_to_graphql_bytes(&self.endpoint, e),
190192
headers: Default::default(),
193+
status: http::StatusCode::BAD_REQUEST,
191194
},
192195
};
193196
}
@@ -223,10 +226,12 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
223226
Ok(shared_response) => HttpExecutionResponse {
224227
body: shared_response.body.clone(),
225228
headers: shared_response.headers.clone(),
229+
status: shared_response.status,
226230
},
227231
Err(e) => HttpExecutionResponse {
228232
body: error_to_graphql_bytes(&self.endpoint, e.clone()),
229233
headers: Default::default(),
234+
status: http::StatusCode::BAD_REQUEST,
230235
},
231236
}
232237
}

lib/executor/src/executors/map.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,7 @@ use crate::{
1616
executors::{
1717
common::{
1818
HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc,
19-
},
20-
dedupe::{ABuildHasher, SharedResponse},
21-
error::SubgraphExecutorError,
22-
http::HTTPSubgraphExecutor,
23-
timeout::TimeoutExecutor,
19+
}, dedupe::{ABuildHasher, SharedResponse}, error::SubgraphExecutorError, http::HTTPSubgraphExecutor, retry::RetryExecutor, timeout::TimeoutExecutor
2420
},
2521
response::graphql_error::GraphQLError,
2622
};
@@ -48,7 +44,7 @@ impl SubgraphExecutorMap {
4844
execution_request: HttpExecutionRequest<'a>,
4945
) -> HttpExecutionResponse {
5046
match self.inner.get(subgraph_name) {
51-
Some(executor) => executor.execute(execution_request).await,
47+
Some(executor) => executor.execute(&execution_request).await,
5248
None => {
5349
let graphql_error: GraphQLError = format!(
5450
"Subgraph executor not found for subgraph: {}",
@@ -65,6 +61,7 @@ impl SubgraphExecutorMap {
6561
HttpExecutionResponse {
6662
body: buffer.freeze(),
6763
headers: Default::default(),
64+
status: http::StatusCode::BAD_REQUEST,
6865
}
6966
}
7067
}
@@ -145,6 +142,11 @@ impl SubgraphExecutorMap {
145142
.to_boxed_arc();
146143
}
147144

145+
if config_arc.max_retries > 0 {
146+
executor = RetryExecutor::new(executor, &config_arc)
147+
.to_boxed_arc();
148+
}
149+
148150
Ok((subgraph_name, executor))
149151
})
150152
.collect::<Result<HashMap<_, _>, SubgraphExecutorError>>()?;

lib/executor/src/executors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ pub mod error;
44
pub mod http;
55
pub mod map;
66
pub mod timeout;
7+
pub mod retry;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use async_trait::async_trait;
2+
use hive_router_config::traffic_shaping::TrafficShapingExecutorConfig;
3+
use tokio_retry2::{strategy::ExponentialBackoff, Retry, RetryError};
4+
5+
use crate::executors::common::{
6+
HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc,
7+
};
8+
9+
pub struct RetryExecutor {
10+
pub executor: SubgraphExecutorBoxedArc,
11+
pub strategy: std::iter::Take<ExponentialBackoff>,
12+
}
13+
14+
impl RetryExecutor {
15+
pub fn new(executor: SubgraphExecutorBoxedArc, config: &TrafficShapingExecutorConfig) -> Self {
16+
let retry_delay_as_millis = config.retry_delay.as_millis();
17+
let strategy = ExponentialBackoff::from_millis(retry_delay_as_millis as u64)
18+
.factor(config.retry_factor)
19+
.max_delay(config.retry_delay)
20+
.take(config.max_retries + 1); // to account for the initial attempt
21+
Self { executor, strategy }
22+
}
23+
}
24+
25+
#[async_trait]
26+
impl SubgraphExecutor for RetryExecutor {
27+
async fn execute<'a>(
28+
&self,
29+
execution_request: &'a HttpExecutionRequest<'a>,
30+
) -> HttpExecutionResponse {
31+
let action = async move || {
32+
let result = self.executor.execute(execution_request).await;
33+
if result.status.is_success() {
34+
Ok(result)
35+
} else {
36+
let retry_after_header = result
37+
.headers
38+
.get("retry-after")
39+
.and_then(|value| value.to_str().ok())
40+
.and_then(|s| s.parse::<u64>().ok());
41+
let retry_after = retry_after_header.map(std::time::Duration::from_secs);
42+
Err(RetryError::Transient {
43+
err: result,
44+
retry_after,
45+
})
46+
}
47+
};
48+
let result = Retry::spawn(self.strategy.clone(), action).await;
49+
50+
match result {
51+
Ok(response) => response,
52+
Err(response) => response,
53+
}
54+
}
55+
}

lib/executor/src/executors/timeout.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ impl TimeoutExecutor {
145145
impl SubgraphExecutor for TimeoutExecutor {
146146
async fn execute<'a>(
147147
&self,
148-
execution_request: HttpExecutionRequest<'a>,
148+
execution_request: &'a HttpExecutionRequest<'a>,
149149
) -> HttpExecutionResponse {
150150
let timeout = self.get_timeout_duration(execution_request.client_request);
151151
let execution = self.executor.execute(execution_request);
@@ -158,6 +158,7 @@ impl SubgraphExecutor for TimeoutExecutor {
158158
SubgraphExecutorError::RequestTimeout(timeout),
159159
),
160160
headers: Default::default(),
161+
status: http::StatusCode::GATEWAY_TIMEOUT,
161162
},
162163
}
163164
} else {
@@ -191,11 +192,12 @@ mod tests {
191192
impl SubgraphExecutor for MockExecutor {
192193
async fn execute<'a>(
193194
&self,
194-
_execution_request: HttpExecutionRequest<'a>,
195+
_execution_request: &'a HttpExecutionRequest<'a>,
195196
) -> HttpExecutionResponse {
196197
HttpExecutionResponse {
197198
body: Default::default(),
198199
headers: Default::default(),
200+
status: http::StatusCode::OK,
199201
}
200202
}
201203
}
@@ -402,7 +404,7 @@ mod tests {
402404
};
403405

404406
println!("Sending request to executor with 5s timeout...");
405-
let response = timeout_executor.execute(execution_request).await;
407+
let response = timeout_executor.execute(&execution_request).await;
406408

407409
println!("Received response from executor.");
408410
assert!(

lib/router-config/src/traffic_shaping.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,15 @@ pub struct TrafficShapingExecutorConfig {
4040
/// ```
4141
#[serde(default, skip_serializing_if = "Option::is_none")]
4242
pub timeout: Option<SubgraphTimeoutConfig>,
43+
44+
#[serde(default = "default_max_retries")]
45+
pub max_retries: usize,
46+
47+
#[serde(deserialize_with = "humantime_serde", default = "default_retry_delay")]
48+
pub retry_delay: Duration,
49+
50+
#[serde(default = "default_retry_factor")]
51+
pub retry_factor: u64,
4352
}
4453

4554
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
@@ -64,6 +73,9 @@ impl Default for TrafficShapingExecutorConfig {
6473
pool_idle_timeout_seconds: default_pool_idle_timeout_seconds(),
6574
dedupe_enabled: default_dedupe_enabled(),
6675
timeout: None,
76+
max_retries: 0,
77+
retry_delay: default_retry_delay(),
78+
retry_factor: default_retry_factor(),
6779
}
6880
}
6981
}
@@ -79,3 +91,15 @@ fn default_pool_idle_timeout_seconds() -> u64 {
7991
/// Serde default for `dedupe_enabled`: enabled.
fn default_dedupe_enabled() -> bool {
    true
}
94+
95+
/// Serde default for `max_retries`: zero.
fn default_max_retries() -> usize {
    0_usize
}
98+
99+
/// Serde default for `retry_delay`: one second.
fn default_retry_delay() -> Duration {
    Duration::from_millis(1_000)
}
102+
103+
/// Serde default for `retry_factor`: a multiplier of one.
fn default_retry_factor() -> u64 {
    1_u64
}

0 commit comments

Comments
 (0)