Skip to content

Commit 776e026

Browse files
squadgazzzcow-protocol
authored andcommitted
Log original request ID when reusing shared in-flight quote requests (#4300)
## Background Fast and optimal quoters share the same `ExternalTradeFinder` instance (via `Arc<dyn TradeFinding>`), so `RequestSharing` already deduplicates identical concurrent requests. However, when a hitchhiker reused an in-flight request, there was no trace linking it back to the original HTTP call, making it hard to debug why the optimal quoter failed to provide solutions, while the fast one succeeded for basically the same quote competition or vice versa. ## Changes - Extended `RequestSharing::shared_or_else` to return a `SharedResult` that indicates whether an existing in-flight request was reused (`is_shared` flag), improving observability of request deduplication. - In `ExternalTradeFinder`, the HTTP request ID (`X-REQUEST-ID`) is now embedded in the shared future's result. When a caller (e.g. the fast or optimal quoter) reuses an in-flight request instead of sending a new one, a debug log is emitted with the `original_request_id` — making it possible to trace which HTTP request produced a given quote result. - All other `shared_or_else` callers updated to use the new `.future` field (no behavioral change). ## How to test Using staging, use the cowswap UI to get some quotes. Search for the new log.
1 parent 0a42e7c commit 776e026

File tree

3 files changed

+98
-47
lines changed

3 files changed

+98
-47
lines changed

crates/liquidity-sources/src/recent_block_cache.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ where
265265
let retries = self.maximum_retries;
266266
let delay = self.delay_between_retries;
267267
let fetcher = self.fetcher.clone();
268-
let fut = self.requests.shared_or_else((key, block), |entry| {
268+
let shared = self.requests.shared_or_else((key, block), |entry| {
269269
let (key, block) = entry.clone();
270270
async move {
271271
for _ in 0..=retries {
@@ -280,7 +280,7 @@ where
280280
}
281281
.boxed()
282282
});
283-
fut.await.context("could not fetch liquidity")
283+
shared.await.context("could not fetch liquidity")
284284
}
285285

286286
async fn fetch(&self, keys: impl IntoIterator<Item = K>, block: Block) -> Result<Vec<V>> {

crates/price-estimation/src/trade_finding/external.rs

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,28 @@ use {
2121
futures::FutureExt,
2222
observe::tracing::distributed::headers::tracing_headers,
2323
request_sharing::{BoxRequestSharing, RequestSharing},
24-
reqwest::{Client, header},
24+
reqwest::{Client, StatusCode, header},
2525
tracing::instrument,
2626
url::Url,
2727
};
2828

29+
/// Wraps a trade result with the request ID of the HTTP request that produced
30+
/// it, so that consumers reusing a shared in-flight request can identify the
31+
/// original request.
32+
#[derive(Clone)]
33+
struct SharedTradeResponse {
34+
result: Result<TradeKind, PriceEstimationError>,
35+
request_id: Option<String>,
36+
}
37+
2938
pub struct ExternalTradeFinder {
3039
/// URL to call to in the driver to get a quote with call data for a trade.
3140
quote_endpoint: Url,
3241

3342
/// Utility to make sure no 2 identical requests are in-flight at the same
3443
/// time. Instead of issuing a duplicated request this awaits the
3544
/// response of the in-flight request.
36-
sharing: BoxRequestSharing<Query, Result<TradeKind, PriceEstimationError>>,
45+
sharing: BoxRequestSharing<Query, SharedTradeResponse>,
3746

3847
/// Client to issue http requests with.
3948
client: Client,
@@ -83,39 +92,55 @@ impl ExternalTradeFinder {
8392
request = request.header("X-Current-Block-Hash", block_hash.to_string())
8493
}
8594

86-
if let Some(id) = id {
87-
request = request.header("X-REQUEST-ID", id);
95+
if let Some(ref id) = id {
96+
request = request.header("X-REQUEST-ID", id.clone());
8897
}
8998

90-
let response = request
91-
.timeout(timeout)
92-
.send()
93-
.await
94-
.map_err(|err| PriceEstimationError::EstimatorInternal(anyhow!(err)))?;
95-
if response.status() == 429 {
96-
return Err(PriceEstimationError::RateLimited);
99+
let result = async {
100+
let response = request
101+
.timeout(timeout)
102+
.send()
103+
.await
104+
.map_err(|err| PriceEstimationError::EstimatorInternal(anyhow!(err)))?;
105+
if response.status() == StatusCode::TOO_MANY_REQUESTS {
106+
return Err(PriceEstimationError::RateLimited);
107+
}
108+
let text = response
109+
.text()
110+
.await
111+
.map_err(|err| PriceEstimationError::EstimatorInternal(anyhow!(err)))?;
112+
serde_json::from_str::<dto::QuoteKind>(&text)
113+
.map(TradeKind::from)
114+
.map_err(|err| {
115+
serde_json::from_str::<dto::Error>(&text)
116+
.map(PriceEstimationError::from)
117+
.unwrap_or_else(|_| {
118+
PriceEstimationError::EstimatorInternal(anyhow!(err))
119+
})
120+
})
121+
}
122+
.await;
123+
124+
SharedTradeResponse {
125+
result,
126+
request_id: id,
97127
}
98-
let text = response
99-
.text()
100-
.await
101-
.map_err(|err| PriceEstimationError::EstimatorInternal(anyhow!(err)))?;
102-
serde_json::from_str::<dto::QuoteKind>(&text)
103-
.map(TradeKind::from)
104-
.map_err(|err| {
105-
if let Ok(err) = serde_json::from_str::<dto::Error>(&text) {
106-
PriceEstimationError::from(err)
107-
} else {
108-
PriceEstimationError::EstimatorInternal(anyhow!(err))
109-
}
110-
})
111128
}
112129
.boxed()
113130
};
114131

115-
self.sharing
116-
.shared_or_else(query.clone(), fut)
117-
.await
118-
.map_err(TradeError::from)
132+
let shared = self.sharing.shared_or_else(query.clone(), fut);
133+
let is_shared = shared.is_shared;
134+
let response = shared.await;
135+
136+
if is_shared {
137+
tracing::debug!(
138+
original_request_id = ?response.request_id,
139+
"reusing in-flight quote request"
140+
);
141+
}
142+
143+
response.result.map_err(TradeError::from)
119144
}
120145
}
121146

crates/request-sharing/src/lib.rs

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use {
1111
collections::HashMap,
1212
future::Future,
1313
hash::Hash,
14+
pin::Pin,
1415
sync::{Arc, Mutex},
16+
task::{Context, Poll},
1517
time::Duration,
1618
},
1719
};
@@ -39,6 +41,28 @@ pub type BoxRequestSharing<Request, Response> =
3941
/// A boxed shared future.
4042
pub type BoxShared<T> = Shared<BoxFuture<'static, T>>;
4143

44+
/// Result of [`RequestSharing::shared_or_else`] indicating whether an
45+
/// already in-flight future was reused or a new one was created.
46+
///
47+
/// Implements [`Future`] so it can be awaited directly.
48+
pub struct SharedResult<Fut: Future> {
49+
future: Shared<Fut>,
50+
/// `true` when an existing in-flight request was reused instead of
51+
/// starting a new one.
52+
pub is_shared: bool,
53+
}
54+
55+
impl<Fut: Future> Future for SharedResult<Fut>
56+
where
57+
Fut::Output: Clone,
58+
{
59+
type Output = Fut::Output;
60+
61+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62+
Pin::new(&mut self.future).poll(cx)
63+
}
64+
}
65+
4266
type Cache<Request, Response> = Arc<Mutex<HashMap<Request, WeakShared<Response>>>>;
4367

4468
impl<Request: Send + 'static, Fut: Future + Send + 'static> RequestSharing<Request, Fut>
@@ -100,7 +124,7 @@ where
100124
{
101125
/// Returns an existing in flight future or creates and uses a new future
102126
/// from the specified closure.
103-
pub fn shared_or_else<F>(&self, request: Request, future: F) -> Shared<Fut>
127+
pub fn shared_or_else<F>(&self, request: Request, future: F) -> SharedResult<Fut>
104128
where
105129
F: FnOnce(&Request) -> Fut,
106130
{
@@ -113,7 +137,10 @@ where
113137
.request_sharing_access
114138
.with_label_values(&[self.request_label.as_str(), "hits"])
115139
.inc();
116-
return existing;
140+
return SharedResult {
141+
future: existing,
142+
is_shared: true,
143+
};
117144
}
118145

119146
Metrics::get()
@@ -129,7 +156,10 @@ where
129156
.request_sharing_cached_items
130157
.with_label_values(&[&self.request_label])
131158
.set(in_flight.len() as u64);
132-
shared
159+
SharedResult {
160+
future: shared,
161+
is_shared: false,
162+
}
133163
}
134164
}
135165

@@ -165,30 +195,26 @@ mod tests {
165195
request_label: label.clone(),
166196
};
167197

168-
let shared0 = sharing.shared_or_else(0, |_| futures::future::ready(0).boxed());
169-
let shared1 = sharing.shared_or_else(0, |_| async { panic!() }.boxed());
198+
let result0 = sharing.shared_or_else(0, |_| futures::future::ready(0).boxed());
199+
let result1 = sharing.shared_or_else(0, |_| async { panic!() }.boxed());
170200

171-
assert!(shared0.ptr_eq(&shared1));
172-
assert_eq!(shared0.strong_count().unwrap(), 2);
173-
assert_eq!(shared1.strong_count().unwrap(), 2);
174-
assert_eq!(shared0.weak_count().unwrap(), 1);
201+
assert!(!result0.is_shared);
202+
assert!(result1.is_shared);
175203

176-
// complete first shared
177-
assert_eq!(shared0.now_or_never().unwrap(), 0);
178-
assert_eq!(shared1.strong_count().unwrap(), 1);
179-
assert_eq!(shared1.weak_count().unwrap(), 1);
204+
// Complete first shared — result1 still holds a reference.
205+
assert_eq!(result0.await, 0);
180206

181-
// GC does not delete any keys because some tasks still use the future
207+
// GC does not delete because result1 still references the future.
182208
RequestSharing::collect_garbage(&sharing.in_flight, &label);
183209
assert_eq!(sharing.in_flight.lock().unwrap().len(), 1);
184210
assert!(sharing.in_flight.lock().unwrap().get(&0).is_some());
185211

186-
// complete second shared
187-
assert_eq!(shared1.now_or_never().unwrap(), 0);
212+
// Complete second shared — proves sharing since its factory would panic.
213+
assert_eq!(result1.await, 0);
188214

189215
RequestSharing::collect_garbage(&sharing.in_flight, &label);
190216

191-
// GC deleted all now unused futures
217+
// GC deleted all now unused futures.
192218
assert!(sharing.in_flight.lock().unwrap().is_empty());
193219
}
194220
}

0 commit comments

Comments
 (0)