Skip to content
210 changes: 84 additions & 126 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,16 @@ use {
anyhow::Context as _,
axum::{body::Body, http::Request},
eth_domain_types as eth,
futures::{FutureExt, StreamExt, future::Either, stream::FuturesUnordered},
futures::{FutureExt, StreamExt, stream::FuturesUnordered},
itertools::Itertools,
simulator::{RevertError, Simulator, SimulatorError},
std::{
cmp::Reverse,
collections::{HashMap, HashSet, VecDeque},
sync::{Arc, Mutex},
time::{Duration, Instant},
},
tokio::{
sync::{mpsc, oneshot},
task,
time::Instant,
},
tokio::{sync::mpsc, task},
tracing::{Instrument, instrument},
};

Expand Down Expand Up @@ -70,6 +67,10 @@ struct SubmitterPool {
direct_slot: Arc<tokio::sync::Semaphore>,
/// EIP-7702 submission accounts. `None` in legacy single-EOA mode.
delegated: Option<DelegatedSlots>,
/// Limits total in-flight settle requests (including those waiting for a
/// pool slot). This replaces the old settle-queue-based admission check
/// and allows buffering requests beyond the number of physical slots.
admission: Arc<tokio::sync::Semaphore>,
solver_address: eth::Address,
}

Expand All @@ -80,7 +81,12 @@ struct DelegatedSlots {
}

impl SubmitterPool {
fn new(solver_address: eth::Address, submission_accounts: Vec<Account>) -> Self {
fn new(
solver_address: eth::Address,
submission_accounts: Vec<Account>,
settle_queue_size: usize,
) -> Self {
let num_delegated = submission_accounts.len();
let delegated = if submission_accounts.is_empty() {
None
} else {
Expand All @@ -94,9 +100,12 @@ impl SubmitterPool {
acquire: tokio::sync::Mutex::new(rx),
})
};
let total_slots = 1 + num_delegated;
let admission_capacity = total_slots + settle_queue_size;
Self {
direct_slot: Arc::new(tokio::sync::Semaphore::new(1)),
delegated,
admission: Arc::new(tokio::sync::Semaphore::new(admission_capacity)),
solver_address,
}
}
Expand Down Expand Up @@ -152,21 +161,10 @@ impl SubmitterPool {
})
}

fn total_slots(&self) -> usize {
// 1 slot for solver EOA + number of delegated EIP7702 accounts (if any)
1 + self
.delegated
.as_ref()
.map_or(0, |d| d.release.max_capacity())
}

fn has_capacity(&self) -> bool {
if self.direct_slot.available_permits() > 0 {
return true;
}
self.delegated
.as_ref()
.is_some_and(|d| d.release.capacity() < d.release.max_capacity())
/// Try to reserve an admission permit without blocking. Returns `None` if
/// the maximum number of in-flight settle requests has been reached.
fn try_admit(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
Arc::clone(&self.admission).try_acquire_owned().ok()
}
}

Expand Down Expand Up @@ -215,6 +213,40 @@ impl Drop for SubmitterGuard {
}
}

/// Wrapper around a spawned settlement task's [`JoinHandle`]. When dropped
/// (e.g. because the HTTP handler was cancelled by the autopilot), the task is
/// aborted after a short grace period to allow cleanup (e.g. cancelling a
/// pending mempool tx).
struct SettleTaskHandle(task::JoinHandle<Result<Settled, Error>>);

impl std::future::Future for SettleTaskHandle {
type Output = Result<Settled, Error>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
std::pin::Pin::new(&mut self.0).poll(cx).map(|join_result| {
join_result.map_err(|err| {
tracing::error!(?err, "settle task panicked");
Error::SubmissionError
})?
})
}
}

impl Drop for SettleTaskHandle {
fn drop(&mut self) {
if !self.0.is_finished() {
let handle = self.0.abort_handle();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
handle.abort();
});
}
}
}

#[derive(Debug)]
pub struct Competition {
pub solver: Solver,
Expand All @@ -228,7 +260,6 @@ pub struct Competition {
/// bad token and orders detector
pub risk_detector: Arc<risk_detector::Detector>,
fetcher: Arc<pre_processing::DataAggregator>,
settle_queue: mpsc::Sender<SettleRequest>,
order_sorting_strategies: Vec<Arc<dyn sorting::SortingStrategy>>,
submitter_pool: SubmitterPool,
}
Expand All @@ -253,33 +284,23 @@ impl Competition {
"EIP-7702 parallel submission enabled"
);
}
let submitter_pool = SubmitterPool::new(solver.address(), submission_accounts);
let queue_size = submitter_pool.total_slots().max(solver.settle_queue_size());
let (settle_sender, settle_receiver) = mpsc::channel(queue_size);
let settle_queue_size = solver.settle_queue_size();
let submitter_pool =
SubmitterPool::new(solver.address(), submission_accounts, settle_queue_size);

let competition = Arc::new(Self {
Arc::new(Self {
solver,
eth,
liquidity,
liquidity_sources_notifier,
simulator,
mempools,
settlements: Default::default(),
settle_queue: settle_sender,
risk_detector,
fetcher,
order_sorting_strategies,
submitter_pool,
});

let competition_clone = Arc::clone(&competition);
tokio::spawn(async move {
competition_clone
.process_settle_requests(settle_receiver)
.await;
});

competition
})
}

/// Solve an auction as part of this competition.
Expand Down Expand Up @@ -744,103 +765,48 @@ impl Competition {
/// Execute the solution generated as part of this competition. Use
/// [`Competition::solve`] to generate the solution.
pub async fn settle(
&self,
self: &Arc<Self>,
auction_id: auction::Id,
solution_id: u64,
submission_deadline: BlockNo,
) -> Result<Settled, Error> {
let (response_sender, response_receiver) = oneshot::channel();

let request = SettleRequest {
auction_id,
solution_id,
submission_deadline,
response_sender,
tracing_span: tracing::Span::current(),
};

self.settle_queue.try_send(request).map_err(|err| {
tracing::warn!(?err, "Failed to enqueue /settle request");
let admission_permit = self.submitter_pool.try_admit().ok_or_else(|| {
tracing::warn!("too many pending settlements; settle request rejected");
Error::TooManyPendingSettlements
})?;

response_receiver.await.map_err(|err| {
tracing::error!(?err, "Failed to dequeue /settle response");
Error::SubmissionError
})?
if self.eth.current_block().borrow().number >= submission_deadline.0 {
return Err(DeadlineExceeded.into());
}

let this = Arc::clone(self);
let tracing_span = tracing::Span::current();
// Spawn as a separate task to enable concurrent EIP-7702 submissions.
// SettleTaskHandle aborts the task (with a grace period) if the caller
// disconnects.
let handle = tokio::spawn(
Comment thread
fafk marked this conversation as resolved.
async move {
let result = this
.process_settle_request(auction_id, solution_id, submission_deadline)
.await;
observe::settled(this.solver.name(), &result);
drop(admission_permit);
result
}
.instrument(tracing_span),
);
SettleTaskHandle(handle).await
}

pub fn ensure_settle_queue_capacity(&self) -> Result<(), Error> {
if !self.submitter_pool.has_capacity() {
if self.submitter_pool.admission.available_permits() == 0 {
tracing::warn!("no idle submission slots; auction is rejected");
Err(Error::TooManyPendingSettlements)
} else {
Ok(())
}
}

async fn process_settle_requests(
self: Arc<Self>,
mut settle_receiver: mpsc::Receiver<SettleRequest>,
) {
while let Some(request) = settle_receiver.recv().await {
// When only the direct solver EOA slot exists and no delegated accounts are set
// up, the pool's acquire() blocks until the slot is free, serializing
// settlements.
let this = Arc::clone(&self);
tokio::spawn(async move {
this.handle_settle_request(request).await;
});
}
}

async fn handle_settle_request(self: &Arc<Self>, request: SettleRequest) {
let SettleRequest {
auction_id,
solution_id,
submission_deadline,
mut response_sender,
tracing_span,
} = request;
async {
if self.eth.current_block().borrow().number >= submission_deadline.0 {
if let Err(err) = response_sender.send(Err(DeadlineExceeded.into())) {
tracing::error!(
?err,
"settle deadline exceeded. unable to return a response"
);
}
return;
}

observe::settling();
let settle_fut =
Box::pin(self.process_settle_request(auction_id, solution_id, submission_deadline));
let closed_fut = Box::pin(response_sender.closed());
let result = match futures::future::select(closed_fut, settle_fut).await {
// Cancel the settlement task if the sender is closed (client likely
// disconnected). This is a fallback to recover from issues
// like a stuck driver (e.g., stalled block stream).
Either::Left((_closed, settle_fut)) => {
tracing::debug!("autopilot terminated settle call");
// Add a grace period to give driver the last chance to cancel the
// tx if needed.
tokio::time::timeout(Duration::from_secs(1), settle_fut)
.await
.unwrap_or_else(|_| {
tracing::error!("didn't finish tx submission within grace period");
Err(DeadlineExceeded.into())
})
}
Either::Right((res, _)) => res,
};
observe::settled(self.solver.name(), &result);
let _ = response_sender.send(result);
}
.instrument(tracing_span)
.await
}

async fn process_settle_request(
&self,
auction_id: auction::Id,
Expand Down Expand Up @@ -989,14 +955,6 @@ fn merge(
merged
}

struct SettleRequest {
auction_id: auction::Id,
solution_id: u64,
submission_deadline: BlockNo,
response_sender: oneshot::Sender<Result<Settled, Error>>,
tracing_span: tracing::Span,
}

/// Solution information sent to the protocol by the driver before the solution
/// ranking happens.
#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/infra/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl State {
&self.0.solver
}

fn competition(&self) -> &domain::Competition {
fn competition(&self) -> &Arc<domain::Competition> {
&self.0.competition
}

Expand Down
Loading
Loading