Skip to content

Commit a0e2838

Browse files
fix: Fix block processing bottleneck (#395)
* feat: Adding tolerance for fast chains
* test: Adding unit test cases
* chore: Adding tolerance to network examples
* fix: Removing old logic and moving to sync pipeline
* fix: Logic for checking missing blocks
* fix: Removing tracing info logs
* test: Fixing tests
* fix: Improving logic to save fetched blocks
* fix: Improving logic to detect duplicates/unordered
* test: Adding test cases
* fix: Block tracking for fast networks
* fix: Minor fix and test cases
* fix: Extending test cases
* test: Improve integration tests

---------

Co-authored-by: shahnami <[email protected]>
1 parent 97d717d commit a0e2838

File tree

7 files changed

+1757
-284
lines changed

7 files changed

+1757
-284
lines changed

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ async fn main() -> Result<()> {
345345
file_block_storage.clone(),
346346
block_handler,
347347
trigger_handler,
348-
Arc::new(BlockTracker::new(1000, Some(file_block_storage.clone()))),
348+
Arc::new(BlockTracker::new(1000)),
349349
)
350350
.await?;
351351

src/services/blockwatcher/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ pub use service::{
1616
process_new_blocks, BlockWatcherService, JobSchedulerTrait, NetworkBlockWatcher,
1717
};
1818
pub use storage::{BlockStorage, FileBlockStorage};
19-
pub use tracker::{BlockTracker, BlockTrackerTrait};
19+
pub use tracker::{BlockCheckResult, BlockTracker, BlockTrackerTrait};

src/services/blockwatcher/service.rs

Lines changed: 106 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{
2020
blockwatcher::{
2121
error::BlockWatcherError,
2222
storage::BlockStorage,
23-
tracker::{BlockTracker, BlockTrackerTrait},
23+
tracker::{BlockCheckResult, BlockTracker, BlockTrackerTrait},
2424
},
2525
},
2626
};
@@ -77,7 +77,7 @@ where
7777
pub block_handler: Arc<H>,
7878
pub trigger_handler: Arc<T>,
7979
pub scheduler: J,
80-
pub block_tracker: Arc<BlockTracker<S>>,
80+
pub block_tracker: Arc<BlockTracker>,
8181
}
8282

8383
/// Map of active block watchers
@@ -101,7 +101,7 @@ where
101101
pub block_handler: Arc<H>,
102102
pub trigger_handler: Arc<T>,
103103
pub active_watchers: Arc<RwLock<BlockWatchersMap<S, H, T, J>>>,
104-
pub block_tracker: Arc<BlockTracker<S>>,
104+
pub block_tracker: Arc<BlockTracker>,
105105
}
106106

107107
impl<S, H, T, J> NetworkBlockWatcher<S, H, T, J>
@@ -125,7 +125,7 @@ where
125125
block_storage: Arc<S>,
126126
block_handler: Arc<H>,
127127
trigger_handler: Arc<T>,
128-
block_tracker: Arc<BlockTracker<S>>,
128+
block_tracker: Arc<BlockTracker>,
129129
) -> Result<Self, BlockWatcherError> {
130130
let scheduler = J::new().await.map_err(|e| {
131131
BlockWatcherError::scheduler_error(
@@ -255,7 +255,7 @@ where
255255
block_storage: Arc<S>,
256256
block_handler: Arc<H>,
257257
trigger_handler: Arc<T>,
258-
block_tracker: Arc<BlockTracker<S>>,
258+
block_tracker: Arc<BlockTracker>,
259259
) -> Result<Self, BlockWatcherError> {
260260
Ok(BlockWatcherService {
261261
block_storage,
@@ -333,7 +333,7 @@ pub async fn process_new_blocks<
333333
C: BlockChainClient + Send + Clone + 'static,
334334
H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
335335
T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
336-
TR: BlockTrackerTrait<S>,
336+
TR: BlockTrackerTrait + Send + Sync + 'static,
337337
>(
338338
network: &Network,
339339
rpc_client: &C,
@@ -403,6 +403,34 @@ pub async fn process_new_blocks<
403403
})?;
404404
}
405405

406+
// Reset expected_next to start_block to ensure synchronization with this execution
407+
// This prevents false out-of-order warnings when reprocessing blocks or restarting
408+
block_tracker
409+
.reset_expected_next(network, start_block)
410+
.await;
411+
412+
// Detect missing blocks using BlockTracker
413+
let missed_blocks = block_tracker.detect_missing_blocks(network, &blocks).await;
414+
415+
// Log and save missed blocks if any
416+
if !missed_blocks.is_empty() {
417+
tracing::error!(
418+
network = %network.slug,
419+
count = missed_blocks.len(),
420+
"Missed {} blocks: {:?}",
421+
missed_blocks.len(),
422+
missed_blocks
423+
);
424+
425+
// Save missed blocks in batch
426+
if network.store_blocks.unwrap_or(false) {
427+
block_storage
428+
.save_missed_blocks(&network.slug, &missed_blocks)
429+
.await
430+
.with_context(|| format!("Failed to save {} missed blocks", missed_blocks.len()))?;
431+
}
432+
}
433+
406434
// Create channels for our pipeline
407435
let (process_tx, process_rx) = mpsc::channel::<(BlockType, u64)>(blocks.len() * 2);
408436
let (trigger_tx, trigger_rx) = mpsc::channel::<ProcessedBlock>(blocks.len() * 2);
@@ -437,21 +465,60 @@ pub async fn process_new_blocks<
437465

438466
// Stage 2: Trigger Pipeline
439467
let trigger_handle = tokio::spawn({
468+
let network = network.clone();
440469
let trigger_handler = trigger_handler.clone();
470+
let block_tracker = block_tracker.clone();
441471

442472
async move {
443473
let mut trigger_rx = trigger_rx;
444474
let mut pending_blocks = BTreeMap::new();
445475
let mut next_block_number = Some(start_block);
476+
let block_tracker = block_tracker.clone();
446477

447478
// Process all incoming blocks
448479
while let Some(processed_block) = trigger_rx.next().await {
449480
let block_number = processed_block.block_number;
481+
482+
// Buffer the block - we'll check and execute in order
450483
pending_blocks.insert(block_number, processed_block);
451484

452485
// Process blocks in order as long as we have the next expected block
453486
while let Some(expected) = next_block_number {
454487
if let Some(block) = pending_blocks.remove(&expected) {
488+
// Check for duplicate or out-of-order blocks when actually executing
489+
// This ensures we're checking the execution order, not arrival order
490+
match block_tracker
491+
.check_processed_block(&network, expected)
492+
.await
493+
{
494+
BlockCheckResult::Ok => {
495+
// Block is valid, execute it
496+
}
497+
BlockCheckResult::Duplicate { last_seen } => {
498+
tracing::error!(
499+
network = %network.slug,
500+
block_number = expected,
501+
last_seen = last_seen,
502+
"Duplicate block detected: received block {} again (last seen: {})",
503+
expected,
504+
last_seen
505+
);
506+
}
507+
BlockCheckResult::OutOfOrder {
508+
expected: exp,
509+
received,
510+
} => {
511+
tracing::warn!(
512+
network = %network.slug,
513+
block_number = received,
514+
expected = exp,
515+
"Out of order block detected: received {} but expected {}",
516+
received,
517+
exp
518+
);
519+
}
520+
}
521+
455522
(trigger_handler)(&block);
456523
next_block_number = Some(expected + 1);
457524
} else {
@@ -463,6 +530,39 @@ pub async fn process_new_blocks<
463530
// Process any remaining blocks in order after the channel is closed
464531
while let Some(min_block) = pending_blocks.keys().next().copied() {
465532
if let Some(block) = pending_blocks.remove(&min_block) {
533+
// Check for duplicate or out-of-order blocks when executing
534+
match block_tracker
535+
.check_processed_block(&network, min_block)
536+
.await
537+
{
538+
BlockCheckResult::Ok => {
539+
// Block is valid, execute it
540+
}
541+
BlockCheckResult::Duplicate { last_seen } => {
542+
tracing::error!(
543+
network = %network.slug,
544+
block_number = min_block,
545+
last_seen = last_seen,
546+
"Duplicate block detected: received block {} again (last seen: {})",
547+
min_block,
548+
last_seen
549+
);
550+
}
551+
BlockCheckResult::OutOfOrder {
552+
expected: exp,
553+
received,
554+
} => {
555+
tracing::warn!(
556+
network = %network.slug,
557+
block_number = received,
558+
expected = exp,
559+
"Out of order block detected: received {} but expected {}",
560+
received,
561+
exp
562+
);
563+
}
564+
}
565+
466566
(trigger_handler)(&block);
467567
}
468568
}
@@ -472,15 +572,10 @@ pub async fn process_new_blocks<
472572

473573
// Feed blocks into the pipeline
474574
futures::future::join_all(blocks.iter().map(|block| {
475-
let network = network.clone();
476-
let block_tracker = block_tracker.clone();
477575
let mut process_tx = process_tx.clone();
478576
async move {
479577
let block_number = block.number().unwrap_or(0);
480578

481-
// Record block in tracker
482-
block_tracker.record_block(&network, block_number).await?;
483-
484579
// Send block to processing pipeline
485580
process_tx
486581
.send((block.clone(), block_number))

src/services/blockwatcher/storage.rs

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,19 @@ pub trait BlockStorage: Clone + Send + Sync {
6767
/// * `Result<(), anyhow::Error>` - Success or error
6868
async fn delete_blocks(&self, network_id: &str) -> Result<(), anyhow::Error>;
6969

70-
/// Saves a missed block for a network
70+
/// Saves multiple missed blocks for a network in a single operation
7171
///
7272
/// # Arguments
7373
/// * `network_id` - Unique identifier for the network
74-
/// * `block` - Block number to save
74+
/// * `blocks` - Slice of block numbers to save
7575
///
7676
/// # Returns
7777
/// * `Result<(), anyhow::Error>` - Success or error
78-
async fn save_missed_block(&self, network_id: &str, block: u64) -> Result<(), anyhow::Error>;
78+
async fn save_missed_blocks(
79+
&self,
80+
network_id: &str,
81+
blocks: &[u64],
82+
) -> Result<(), anyhow::Error>;
7983
}
8084

8185
/// File-based implementation of block storage
@@ -197,15 +201,23 @@ impl BlockStorage for FileBlockStorage {
197201
Ok(())
198202
}
199203

200-
/// Saves a missed block for a network
204+
/// Saves multiple missed blocks for a network in a single operation
201205
///
202206
/// # Arguments
203207
/// * `network_id` - Unique identifier for the network
204-
/// * `block` - Block number to save
208+
/// * `blocks` - Slice of block numbers to save
205209
///
206210
/// # Returns
207211
/// * `Result<(), anyhow::Error>` - Success or error
208-
async fn save_missed_block(&self, network_id: &str, block: u64) -> Result<(), anyhow::Error> {
212+
async fn save_missed_blocks(
213+
&self,
214+
network_id: &str,
215+
blocks: &[u64],
216+
) -> Result<(), anyhow::Error> {
217+
if blocks.is_empty() {
218+
return Ok(());
219+
}
220+
209221
let file_path = self
210222
.storage_path
211223
.join(format!("{}_missed_blocks.txt", network_id));
@@ -218,10 +230,14 @@ impl BlockStorage for FileBlockStorage {
218230
.await
219231
.map_err(|e| anyhow::anyhow!("Failed to create missed block file: {}", e))?;
220232

221-
// Write the block number followed by a newline
222-
tokio::io::AsyncWriteExt::write_all(&mut file, format!("{}\n", block).as_bytes())
233+
// Write all block numbers at once, one per line
234+
let content = blocks
235+
.iter()
236+
.map(|block| format!("{}\n", block))
237+
.collect::<String>();
238+
tokio::io::AsyncWriteExt::write_all(&mut file, content.as_bytes())
223239
.await
224-
.map_err(|e| anyhow::anyhow!("Failed to save missed block: {}", e))?;
240+
.map_err(|e| anyhow::anyhow!("Failed to save missed blocks: {}", e))?;
225241

226242
Ok(())
227243
}
@@ -379,12 +395,12 @@ mod tests {
379395
}
380396

381397
#[tokio::test]
382-
async fn test_save_missed_block() {
398+
async fn test_save_missed_blocks() {
383399
let temp_dir = tempfile::tempdir().unwrap();
384400
let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
385401

386-
// Test 1: Normal save
387-
let result = storage.save_missed_block("test", 100).await;
402+
// Test 1: Normal save with single block
403+
let result = storage.save_missed_blocks("test", &[100]).await;
388404
assert!(result.is_ok());
389405

390406
// Verify the content
@@ -393,7 +409,21 @@ mod tests {
393409
.unwrap();
394410
assert_eq!(content, "100\n");
395411

396-
// Test 2: Save with invalid path
412+
// Test 2: Save multiple blocks
413+
let result = storage.save_missed_blocks("test", &[101, 102, 103]).await;
414+
assert!(result.is_ok());
415+
416+
// Verify the content (should append)
417+
let content = tokio::fs::read_to_string(temp_dir.path().join("test_missed_blocks.txt"))
418+
.await
419+
.unwrap();
420+
assert_eq!(content, "100\n101\n102\n103\n");
421+
422+
// Test 3: Save empty slice (should be no-op)
423+
let result = storage.save_missed_blocks("test", &[]).await;
424+
assert!(result.is_ok());
425+
426+
// Test 4: Save with invalid path
397427
#[cfg(unix)]
398428
{
399429
use std::os::unix::fs::PermissionsExt;
@@ -404,7 +434,7 @@ mod tests {
404434
std::fs::set_permissions(&readonly_dir, perms).unwrap();
405435

406436
let readonly_storage = FileBlockStorage::new(readonly_dir);
407-
let result = readonly_storage.save_missed_block("test", 100).await;
437+
let result = readonly_storage.save_missed_blocks("test", &[100]).await;
408438
assert!(result.is_err());
409439
let err = result.unwrap_err();
410440

0 commit comments

Comments
 (0)