From 401225fde6c4db12300c01e12b40b7a695748ff0 Mon Sep 17 00:00:00 2001 From: jxom <7336481+jxom@users.noreply.github.com> Date: Wed, 1 Apr 2026 09:45:39 +1300 Subject: [PATCH] fix: checked arithmetic for RPC data decoding Fix four High severity audit findings related to unchecked arithmetic: 1. Timestamp overflow in decode_block: timestamp_secs * 1000 can overflow for large timestamps. Now uses checked_mul with i64::try_from validation. 2. Log index narrowing in decode_log: log_index and transaction_index were cast with unchecked `as i32`. Now uses i32::try_from() and returns error on overflow. 3. Receipt index narrowing in decode_receipt: transaction_index was cast with unchecked `as i32`. Now uses i32::try_from(). 4. eth_blockNumber u64::MAX overflow in tick_realtime: current_to + 1 and tip_num + 1 could overflow at u64::MAX. Now uses saturating_add. All three decode functions now return Result, propagating errors to callers. Added unit tests for overflow cases and normal value decoding. 
Amp-Thread-ID: https://ampcode.com/threads/T-019d458d-01b9-76ca-9fb1-b40dd6c6a615 Co-authored-by: Amp --- src/sync/decoder.rs | 130 +++++++++++++++++++++++++++---- src/sync/engine.rs | 184 +++++++++++++++++++++----------------------- 2 files changed, 203 insertions(+), 111 deletions(-) diff --git a/src/sync/decoder.rs b/src/sync/decoder.rs index 2c8ecc0..bc26dc7 100644 --- a/src/sync/decoder.rs +++ b/src/sync/decoder.rs @@ -1,6 +1,7 @@ use alloy::consensus::transaction::Recovered; use alloy::consensus::{Transaction as TransactionTrait, Typed2718}; use alloy::network::{ReceiptResponse, TransactionResponse}; +use anyhow::Result; use chrono::{DateTime, TimeZone, Utc}; use tempo_alloy::primitives::transaction::SignatureType; @@ -13,12 +14,18 @@ pub fn timestamp_from_secs(secs: u64) -> DateTime<Utc> { .unwrap_or_else(|| Utc.timestamp_opt(0, 0).single().unwrap()) } -pub fn decode_block(block: &Block) -> BlockRow { +pub fn decode_block(block: &Block) -> Result<BlockRow> { let timestamp_secs = block.header.timestamp; let timestamp = timestamp_from_secs(timestamp_secs); - let timestamp_ms = (timestamp_secs * 1000) as i64; + let timestamp_ms = timestamp_secs + .checked_mul(1000) + .and_then(|v| i64::try_from(v).ok()) + .ok_or_else(|| anyhow::anyhow!( + "timestamp_secs {} overflows when converting to milliseconds", + timestamp_secs + ))?; - BlockRow { + Ok(BlockRow { num: block.header.number as i64, hash: block.header.hash.as_slice().to_vec(), parent_hash: block.header.parent_hash.as_slice().to_vec(), @@ -28,7 +35,7 @@ pub fn decode_block(block: &Block) -> BlockRow { gas_used: block.header.gas_used as i64, miner: block.header.beneficiary.as_slice().to_vec(), extra_data: Some(block.header.extra_data.to_vec()), - } + }) } pub fn decode_transaction(tx: &Transaction, block: &Block, idx: u32) -> TxRow { @@ -82,15 +89,20 @@ pub fn decode_transaction(tx: &Transaction, block: &Block, idx: u32) -> TxRow { } } -pub fn decode_log(log: &Log, block_timestamp: DateTime<Utc>) -> LogRow { +pub fn 
decode_log(log: &Log, block_timestamp: DateTime<Utc>) -> Result<LogRow> { let topics = log.topics(); let selector = topics.first().map(|s| s.as_slice().to_vec()); - LogRow { + let log_idx = i32::try_from(log.log_index.unwrap_or(0)) + .map_err(|_| anyhow::anyhow!("log_index {} overflows i32", log.log_index.unwrap_or(0)))?; + let tx_idx = i32::try_from(log.transaction_index.unwrap_or(0)) + .map_err(|_| anyhow::anyhow!("transaction_index {} overflows i32", log.transaction_index.unwrap_or(0)))?; + + Ok(LogRow { block_num: log.block_number.unwrap_or(0) as i64, block_timestamp, - log_idx: log.log_index.unwrap_or(0) as i32, - tx_idx: log.transaction_index.unwrap_or(0) as i32, + log_idx, + tx_idx, tx_hash: log .transaction_hash .map(|h| h.as_slice().to_vec()) @@ -102,7 +114,7 @@ pub fn decode_log(log: &Log, block_timestamp: DateTime<Utc>) -> LogRow { topic2: topics.get(2).map(|t| t.as_slice().to_vec()), topic3: topics.get(3).map(|t| t.as_slice().to_vec()), data: log.data().data.to_vec(), - } + }) } /// Enrich transaction rows with fields that come from receipts (gas_used, fee_payer). 
@@ -186,6 +198,92 @@ mod tests { assert!(txs.is_empty()); } + fn make_test_block(timestamp: u64, number: u64) -> crate::tempo::Block { + use alloy::primitives::{Address, B256, Bytes}; + + let inner = alloy::consensus::Header { + timestamp, + number, + parent_hash: B256::ZERO, + gas_limit: 30_000_000, + gas_used: 21_000, + beneficiary: Address::ZERO, + extra_data: Bytes::new(), + ..Default::default() + }; + + crate::tempo::Block { + header: alloy::rpc::types::Header { + hash: B256::ZERO, + inner, + total_difficulty: None, + size: None, + }, + transactions: Default::default(), + ..Default::default() + } + } + + #[test] + fn decode_block_timestamp_overflow_returns_error() { + let block = make_test_block(u64::MAX, 1); + let result = decode_block(&block); + assert!(result.is_err(), "decode_block should error on u64::MAX timestamp"); + assert!( + result.unwrap_err().to_string().contains("overflows"), + "error message should mention overflow" + ); + } + + #[test] + fn decode_block_normal_timestamp_succeeds() { + let block = make_test_block(1_700_000_000, 42); + let result = decode_block(&block).unwrap(); + assert_eq!(result.num, 42); + assert_eq!(result.timestamp_ms, 1_700_000_000_000); + } + + fn make_test_log(log_index: u64, transaction_index: u64) -> crate::tempo::Log { + use alloy::primitives::{Address, LogData, B256}; + + crate::tempo::Log { + inner: alloy::primitives::Log { + address: Address::ZERO, + data: LogData::new(vec![], Default::default()).unwrap(), + }, + block_hash: Some(B256::ZERO), + block_number: Some(1), + block_timestamp: None, + transaction_hash: Some(B256::ZERO), + transaction_index: Some(transaction_index), + log_index: Some(log_index), + removed: false, + } + } + + #[test] + fn decode_log_index_overflow_returns_error() { + let log = make_test_log(u64::MAX, 0); + let result = decode_log(&log, chrono::Utc::now()); + assert!(result.is_err(), "decode_log should error when log_index overflows i32"); + } + + #[test] + fn 
decode_log_tx_index_overflow_returns_error() { + let log = make_test_log(0, u64::MAX); + let result = decode_log(&log, chrono::Utc::now()); + assert!(result.is_err(), "decode_log should error when transaction_index overflows i32"); + } + + #[test] + fn decode_log_normal_values_succeeds() { + let log = make_test_log(10, 5); + let result = decode_log(&log, chrono::Utc::now()).unwrap(); + assert_eq!(result.log_idx, 10); + assert_eq!(result.tx_idx, 5); + assert_eq!(result.block_num, 1); + } + #[test] fn enrich_multi_block_batch() { let mut txs = vec![ @@ -210,11 +308,17 @@ mod tests { } } -pub fn decode_receipt(receipt: &Receipt, block_timestamp: DateTime<Utc>) -> ReceiptRow { - ReceiptRow { +pub fn decode_receipt(receipt: &Receipt, block_timestamp: DateTime<Utc>) -> Result<ReceiptRow> { + let tx_idx = i32::try_from(receipt.transaction_index().unwrap_or(0)) + .map_err(|_| anyhow::anyhow!( + "receipt transaction_index {} overflows i32", + receipt.transaction_index().unwrap_or(0) + ))?; + + Ok(ReceiptRow { block_num: receipt.block_number().unwrap_or(0) as i64, block_timestamp, - tx_idx: receipt.transaction_index().unwrap_or(0) as i32, + tx_idx, tx_hash: receipt.transaction_hash().as_slice().to_vec(), from: receipt.from().as_slice().to_vec(), to: receipt.to().map(|a| a.as_slice().to_vec()), @@ -224,5 +328,5 @@ pub fn decode_receipt(receipt: &Receipt, block_timestamp: DateTime<Utc>) -> Rece effective_gas_price: Some(receipt.effective_gas_price().to_string()), status: if receipt.status() { Some(1) } else { Some(0) }, fee_payer: Some(receipt.fee_payer.as_slice().to_vec()), - } + }) } diff --git a/src/sync/engine.rs b/src/sync/engine.rs index d9d649e..5158d65 100644 --- a/src/sync/engine.rs +++ b/src/sync/engine.rs @@ -298,7 +298,7 @@ impl SyncEngine { // Jump to near head immediately, don't catch up sequentially let start_from = if state.tip_num >= remote_head.saturating_sub(TAIL_WINDOW) { - state.tip_num + 1 + state.tip_num.saturating_add(1) } else { let jump_to = 
remote_head.saturating_sub(TAIL_WINDOW); if state.tip_num > 0 && jump_to > state.tip_num { @@ -347,8 +347,8 @@ impl SyncEngine { *logs_per_block.entry(log.block_num).or_insert(0_u64) += 1; } - let next_from = current_to + 1; - let next_to = (next_from + BATCH_SIZE - 1).min(remote_head); + let next_from = current_to.saturating_add(1); + let next_to = (next_from.saturating_add(BATCH_SIZE - 1)).min(remote_head); let has_next = next_from <= remote_head; // Pipeline: fetch next batch while writing current @@ -563,7 +563,7 @@ impl SyncEngine { .map(|b| (b.header.number, timestamp_from_secs(b.header.timestamp))) .collect(); - let block_rows: Vec<_> = blocks.iter().map(decode_block).collect(); + let block_rows: Vec<_> = blocks.iter().map(|b| decode_block(b)).collect::<Result<Vec<_>>>()?; let mut all_txs: Vec<_> = blocks .iter() @@ -576,35 +576,23 @@ impl SyncEngine { }) .collect(); - let all_logs: Vec<_> = receipts - .iter() - .flatten() - .flat_map(|receipt| { - let block_num = receipt.block_number().unwrap_or(0); - block_timestamps - .get(&block_num) - .map(|&ts| { - receipt - .inner - .logs() - .iter() - .map(move |log| decode_log(log, ts)) - }) - .into_iter() - .flatten() - }) - .collect(); + let mut all_logs = Vec::new(); + for receipt in receipts.iter().flatten() { + let block_num = receipt.block_number().unwrap_or(0); + if let Some(&ts) = block_timestamps.get(&block_num) { + for log in receipt.inner.logs() { + all_logs.push(decode_log(log, ts)?); + } + } + } - let all_receipts: Vec<_> = receipts - .iter() - .flatten() - .filter_map(|receipt| { - let block_num = receipt.block_number().unwrap_or(0); - block_timestamps - .get(&block_num) - .map(|&ts| decode_receipt(receipt, ts)) - }) - .collect(); + let mut all_receipts = Vec::new(); + for receipt in receipts.iter().flatten() { + let block_num = receipt.block_number().unwrap_or(0); + if let Some(&ts) = block_timestamps.get(&block_num) { + all_receipts.push(decode_receipt(receipt, ts)?); + } + } enrich_txs_from_receipts(&mut all_txs, 
&all_receipts); @@ -628,7 +616,7 @@ impl SyncEngine { self.realtime_rpc.get_block_receipts(num) )?; - let block_row = decode_block(&block); + let block_row = decode_block(&block)?; let block_ts = timestamp_from_secs(block.header.timestamp); let mut txs: Vec<_> = block .transactions @@ -637,15 +625,17 @@ .map(|(i, tx)| decode_transaction(tx, &block, i as u32)) .collect(); - let log_rows: Vec<_> = receipts - .iter() - .flat_map(|r| r.inner.logs().iter().map(|log| decode_log(log, block_ts))) - .collect(); + let mut log_rows = Vec::new(); + for r in &receipts { + for log in r.inner.logs() { + log_rows.push(decode_log(log, block_ts)?); + } + } - let receipt_rows: Vec<_> = receipts - .iter() - .map(|r| decode_receipt(r, block_ts)) - .collect(); + let mut receipt_rows = Vec::new(); + for r in &receipts { + receipt_rows.push(decode_receipt(r, block_ts)?); + } enrich_txs_from_receipts(&mut txs, &receipt_rows); @@ -1346,7 +1336,7 @@ async fn sync_range_standalone(sinks: &SinkSet, rpc: &RpcClient, from: u64, to: .map(|b| (b.header.number, timestamp_from_secs(b.header.timestamp))) .collect(); - let block_rows: Vec<_> = blocks.iter().map(decode_block).collect(); + let block_rows: Vec<_> = blocks.iter().map(|b| decode_block(b)).collect::<Result<Vec<_>>>()?; let mut all_txs: Vec<_> = blocks .iter() @@ -1359,35 +1349,23 @@ async fn sync_range_standalone(sinks: &SinkSet, rpc: &RpcClient, from: u64, to: }) .collect(); - let all_logs: Vec<_> = receipts - .iter() - .flatten() - .flat_map(|receipt| { - let block_num = receipt.block_number().unwrap_or(0); - block_timestamps - .get(&block_num) - .map(|&ts| { - receipt - .inner - .logs() - .iter() - .map(move |log| decode_log(log, ts)) - }) - .into_iter() - .flatten() - }) - .collect(); + let mut all_logs = Vec::new(); + for receipt in receipts.iter().flatten() { + let block_num = receipt.block_number().unwrap_or(0); + if let Some(&ts) = block_timestamps.get(&block_num) { + for log in receipt.inner.logs() { + 
all_logs.push(decode_log(log, ts)?); + } + } + } - let all_receipts: Vec<_> = receipts - .iter() - .flatten() - .filter_map(|receipt| { - let block_num = receipt.block_number().unwrap_or(0); - block_timestamps - .get(&block_num) - .map(|&ts| decode_receipt(receipt, ts)) - }) - .collect(); + let mut all_receipts = Vec::new(); + for receipt in receipts.iter().flatten() { + let block_num = receipt.block_number().unwrap_or(0); + if let Some(&ts) = block_timestamps.get(&block_num) { + all_receipts.push(decode_receipt(receipt, ts)?); + } + } enrich_txs_from_receipts(&mut all_txs, &all_receipts); @@ -1490,35 +1468,23 @@ async fn tick_receipt_backfill(sinks: &SinkSet, rpc: &RpcClient, chain_id: u64) .collect(); // Decode logs and receipts - let all_logs: Vec<_> = receipts - .iter() - .flatten() - .flat_map(|receipt| { - let block_num = receipt.block_number().unwrap_or(0); - block_timestamps - .get(&block_num) - .map(|&ts| { - receipt - .inner - .logs() - .iter() - .map(move |log| decode_log(log, ts)) - }) - .into_iter() - .flatten() - }) - .collect(); + let mut all_logs = Vec::new(); + for receipt in receipts.iter().flatten() { + let block_num = receipt.block_number().unwrap_or(0); + if let Some(&ts) = block_timestamps.get(&block_num) { + for log in receipt.inner.logs() { + all_logs.push(decode_log(log, ts)?); + } + } + } - let all_receipts: Vec<_> = receipts - .iter() - .flatten() - .filter_map(|receipt| { - let block_num = receipt.block_number().unwrap_or(0); - block_timestamps - .get(&block_num) - .map(|&ts| decode_receipt(receipt, ts)) - }) - .collect(); + let mut all_receipts = Vec::new(); + for receipt in receipts.iter().flatten() { + let block_num = receipt.block_number().unwrap_or(0); + if let Some(&ts) = block_timestamps.get(&block_num) { + all_receipts.push(decode_receipt(receipt, ts)?); + } + } let log_count = all_logs.len(); let receipt_count = all_receipts.len(); @@ -1630,6 +1596,28 @@ mod tests { assert_eq!(group_consecutive_blocks(&[5, 1, 3, 2, 4]), vec![(1, 
5)]); } + #[test] + fn test_saturating_add_prevents_u64_max_overflow() { + // Simulates the tick_realtime arithmetic with u64::MAX as remote_head + let current_to: u64 = u64::MAX; + let next_from = current_to.saturating_add(1); + // saturating_add prevents overflow - stays at u64::MAX + assert_eq!(next_from, u64::MAX); + // has_next should be false because next_from > remote_head is impossible + // (they're both u64::MAX), so next_from <= remote_head is true but + // the loop terminates because current_from > remote_head after advancing + let remote_head = u64::MAX; + let batch_size: u64 = 10; + let next_to = next_from.saturating_add(batch_size - 1).min(remote_head); + assert_eq!(next_to, u64::MAX); + + // Verify tip_num + 1 also saturates + let tip_num: u64 = u64::MAX; + let start_from = tip_num.saturating_add(1); + assert_eq!(start_from, u64::MAX); + // start_from > remote_head would be false, preventing infinite loops + } + #[test] fn test_group_consecutive_blocks_splits_large_ranges() { // 20 consecutive blocks should be split into ranges of MAX_RANGE_SIZE