Skip to content

Commit db4fdbd

Browse files
committed
refactor: misc cleaning on the submit task
1 parent eb33cb3 commit db4fdbd

File tree

1 file changed

+95
-73
lines changed

1 file changed

+95
-73
lines changed

src/tasks/submit.rs

+95-73
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ use alloy::{
1515
sol_types::{SolCall, SolError},
1616
transports::TransportError,
1717
};
18-
use eyre::{bail, eyre};
18+
use eyre::{Context, bail, eyre};
1919
use init4_bin_base::deps::{
2020
metrics::{counter, histogram},
21-
tracing::{self, debug, error, info, instrument, trace, warn},
21+
tracing::{self, Instrument, debug, debug_span, error, info, instrument, trace, warn},
2222
};
2323
use oauth2::TokenResponse;
2424
use signet_sim::BuiltBlock;
@@ -178,7 +178,10 @@ impl SubmitTask {
178178
"error in transaction submission"
179179
);
180180

181-
if e.as_revert_data() == Some(IncorrectHostBlock::SELECTOR.into()) {
181+
if e.as_revert_data()
182+
.map(|data| data.starts_with(&IncorrectHostBlock::SELECTOR))
183+
.unwrap_or_default()
184+
{
182185
return Ok(ControlFlow::Retry);
183186
}
184187

@@ -234,15 +237,33 @@ impl SubmitTask {
234237
Ok(ControlFlow::Done)
235238
}
236239

237-
#[instrument(skip_all, err)]
240+
/// Sign with a local signer if available, otherwise ask quincey
241+
/// for a signature (politely).
242+
#[instrument(skip_all, fields(is_local = self.sequencer_signer.is_some()))]
243+
async fn get_signature(&self, req: SignRequest) -> eyre::Result<SignResponse> {
244+
let sig = if let Some(signer) = &self.sequencer_signer {
245+
signer.sign_hash(&req.signing_hash()).await?
246+
} else {
247+
self.sup_quincey(&req)
248+
.await
249+
.wrap_err("failed to get signature from quincey")
250+
.inspect(|_| {
251+
counter!("builder.quincey_signature_acquired").increment(1);
252+
})?
253+
.sig
254+
};
255+
256+
debug!(sig = hex::encode(sig.as_bytes()), "acquired signature");
257+
Ok(SignResponse { req, sig })
258+
}
259+
260+
#[instrument(skip_all)]
238261
async fn handle_inbound(&self, block: &BuiltBlock) -> eyre::Result<ControlFlow> {
239262
info!(txns = block.tx_count(), "handling inbound block");
240-
let sig_request = match self.construct_sig_request(block).await {
241-
Ok(sig_request) => sig_request,
242-
Err(e) => {
243-
error!(error = %e, "error constructing signature request");
244-
return Ok(ControlFlow::Skip);
245-
}
263+
let Ok(sig_request) = self.construct_sig_request(block).await.inspect_err(|e| {
264+
error!(error = %e, "error constructing signature request");
265+
}) else {
266+
return Ok(ControlFlow::Skip);
246267
};
247268

248269
debug!(
@@ -251,76 +272,77 @@ impl SubmitTask {
251272
"constructed signature request for host block"
252273
);
253274

254-
// If configured with a local signer, we use it. Otherwise, we ask
255-
// quincey (politely)
256-
let signed = if let Some(signer) = &self.sequencer_signer {
257-
let sig = signer.sign_hash(&sig_request.signing_hash()).await?;
258-
debug!(sig = hex::encode(sig.as_bytes()), "acquired signature from local signer");
259-
SignResponse { req: sig_request, sig }
260-
} else {
261-
let resp: SignResponse = match self.sup_quincey(&sig_request).await {
262-
Ok(resp) => resp,
263-
Err(e) => {
264-
error!(error = %e, "error acquiring signature from quincey");
265-
return Ok(ControlFlow::Retry);
266-
}
267-
};
268-
debug!(sig = hex::encode(resp.sig.as_bytes()), "acquired signature from quincey");
269-
counter!("builder.quincey_signature_acquired").increment(1);
270-
resp
271-
};
275+
let signed = self.get_signature(sig_request).await?;
272276

273277
self.submit_transaction(&signed, block).await
274278
}
275279

276-
/// Spawns the in progress block building task
277-
pub fn spawn(self) -> (mpsc::UnboundedSender<BuiltBlock>, JoinHandle<()>) {
278-
let (sender, mut inbound) = mpsc::unbounded_channel();
279-
let handle = tokio::spawn(async move {
280-
loop {
281-
if let Some(in_progress) = inbound.recv().await {
282-
let building_start_time = Instant::now();
283-
let mut retries = 0;
284-
loop {
285-
match self.handle_inbound(&in_progress).await {
286-
Ok(ControlFlow::Retry) => {
287-
retries += 1;
288-
if retries > 3 {
289-
counter!("builder.building_too_many_retries").increment(1);
290-
histogram!("builder.block_build_time")
291-
.record(building_start_time.elapsed().as_millis() as f64);
292-
error!("error handling inbound block: too many retries");
293-
break;
294-
}
295-
error!("error handling inbound block: retrying");
296-
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
297-
}
298-
Ok(ControlFlow::Skip) => {
299-
histogram!("builder.block_build_time")
300-
.record(building_start_time.elapsed().as_millis() as f64);
301-
counter!("builder.skipped_blocks").increment(1);
302-
info!("skipping block");
303-
break;
304-
}
305-
Ok(ControlFlow::Done) => {
306-
histogram!("builder.block_build_time")
307-
.record(building_start_time.elapsed().as_millis() as f64);
308-
counter!("builder.submitted_successful_blocks").increment(1);
309-
info!("block landed successfully");
310-
break;
311-
}
312-
Err(e) => {
313-
error!(error = %e, "error handling inbound block");
314-
break;
315-
}
316-
}
280+
async fn retrying_handle_inbound(
281+
&self,
282+
block: &BuiltBlock,
283+
retry_limit: usize,
284+
) -> eyre::Result<ControlFlow> {
285+
let mut retries = 0;
286+
let building_start_time = Instant::now();
287+
288+
let result = loop {
289+
let span = debug_span!("SubmitTask::retrying_handle_inbound", retries);
290+
291+
let result =
292+
self.handle_inbound(block).instrument(span.clone()).await.inspect_err(|e| {
293+
error!(error = %e, "error handling inbound block");
294+
})?;
295+
296+
let guard = span.entered();
297+
298+
match result {
299+
ControlFlow::Retry => {
300+
retries += 1;
301+
if retries > retry_limit {
302+
counter!("builder.building_too_many_retries").increment(1);
303+
return Ok(ControlFlow::Skip);
317304
}
318-
} else {
319-
debug!("upstream task gone");
320-
break;
305+
error!("error handling inbound block: retrying");
306+
drop(guard);
307+
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
308+
309+
continue;
310+
}
311+
ControlFlow::Skip => {
312+
counter!("builder.skipped_blocks").increment(1);
313+
break result;
314+
}
315+
ControlFlow::Done => {
316+
counter!("builder.submitted_successful_blocks").increment(1);
317+
break result;
321318
}
322319
}
323-
});
320+
};
321+
322+
// This is reached when `Done` or `Skip` is returned
323+
histogram!("builder.block_build_time")
324+
.record(building_start_time.elapsed().as_millis() as f64);
325+
info!(?result, "finished block building");
326+
Ok(result)
327+
}
328+
329+
async fn task_future(self, mut inbound: mpsc::UnboundedReceiver<BuiltBlock>) {
330+
loop {
331+
let Some(block) = inbound.recv().await else {
332+
debug!("upstream task gone");
333+
break;
334+
};
335+
336+
if self.retrying_handle_inbound(&block, 3).await.is_err() {
337+
break;
338+
}
339+
}
340+
}
341+
342+
/// Spawns the in progress block building task
343+
pub fn spawn(self) -> (mpsc::UnboundedSender<BuiltBlock>, JoinHandle<()>) {
344+
let (sender, inbound) = mpsc::unbounded_channel();
345+
let handle = tokio::spawn(self.task_future(inbound));
324346

325347
(sender, handle)
326348
}

0 commit comments

Comments
 (0)