Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 117 additions & 13 deletions src/sync/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use alloy::consensus::transaction::Recovered;
use alloy::consensus::{Transaction as TransactionTrait, Typed2718};
use alloy::network::{ReceiptResponse, TransactionResponse};
use anyhow::Result;
use chrono::{DateTime, TimeZone, Utc};
use tempo_alloy::primitives::transaction::SignatureType;

Expand All @@ -13,12 +14,18 @@ pub fn timestamp_from_secs(secs: u64) -> DateTime<Utc> {
.unwrap_or_else(|| Utc.timestamp_opt(0, 0).single().unwrap())
}

pub fn decode_block(block: &Block) -> BlockRow {
pub fn decode_block(block: &Block) -> Result<BlockRow> {
let timestamp_secs = block.header.timestamp;
let timestamp = timestamp_from_secs(timestamp_secs);
let timestamp_ms = (timestamp_secs * 1000) as i64;
let timestamp_ms = timestamp_secs
.checked_mul(1000)
.and_then(|v| i64::try_from(v).ok())
.ok_or_else(|| anyhow::anyhow!(
"timestamp_secs {} overflows when converting to milliseconds",
timestamp_secs
))?;

BlockRow {
Ok(BlockRow {
num: block.header.number as i64,
hash: block.header.hash.as_slice().to_vec(),
parent_hash: block.header.parent_hash.as_slice().to_vec(),
Expand All @@ -28,7 +35,7 @@ pub fn decode_block(block: &Block) -> BlockRow {
gas_used: block.header.gas_used as i64,
miner: block.header.beneficiary.as_slice().to_vec(),
extra_data: Some(block.header.extra_data.to_vec()),
}
})
}

pub fn decode_transaction(tx: &Transaction, block: &Block, idx: u32) -> TxRow {
Expand Down Expand Up @@ -82,15 +89,20 @@ pub fn decode_transaction(tx: &Transaction, block: &Block, idx: u32) -> TxRow {
}
}

pub fn decode_log(log: &Log, block_timestamp: DateTime<Utc>) -> LogRow {
pub fn decode_log(log: &Log, block_timestamp: DateTime<Utc>) -> Result<LogRow> {
let topics = log.topics();
let selector = topics.first().map(|s| s.as_slice().to_vec());

LogRow {
let log_idx = i32::try_from(log.log_index.unwrap_or(0))
.map_err(|_| anyhow::anyhow!("log_index {} overflows i32", log.log_index.unwrap_or(0)))?;
let tx_idx = i32::try_from(log.transaction_index.unwrap_or(0))
.map_err(|_| anyhow::anyhow!("transaction_index {} overflows i32", log.transaction_index.unwrap_or(0)))?;

Ok(LogRow {
block_num: log.block_number.unwrap_or(0) as i64,
block_timestamp,
log_idx: log.log_index.unwrap_or(0) as i32,
tx_idx: log.transaction_index.unwrap_or(0) as i32,
log_idx,
tx_idx,
tx_hash: log
.transaction_hash
.map(|h| h.as_slice().to_vec())
Expand All @@ -102,7 +114,7 @@ pub fn decode_log(log: &Log, block_timestamp: DateTime<Utc>) -> LogRow {
topic2: topics.get(2).map(|t| t.as_slice().to_vec()),
topic3: topics.get(3).map(|t| t.as_slice().to_vec()),
data: log.data().data.to_vec(),
}
})
}

/// Enrich transaction rows with fields that come from receipts (gas_used, fee_payer).
Expand Down Expand Up @@ -186,6 +198,92 @@ mod tests {
assert!(txs.is_empty());
}

fn make_test_block(timestamp: u64, number: u64) -> crate::tempo::Block {
use alloy::primitives::{Address, B256, Bytes};

let inner = alloy::consensus::Header {
timestamp,
number,
parent_hash: B256::ZERO,
gas_limit: 30_000_000,
gas_used: 21_000,
beneficiary: Address::ZERO,
extra_data: Bytes::new(),
..Default::default()
};

crate::tempo::Block {
header: alloy::rpc::types::Header {
hash: B256::ZERO,
inner,
total_difficulty: None,
size: None,
},
transactions: Default::default(),
..Default::default()
}
}

#[test]
fn decode_block_timestamp_overflow_returns_error() {
let block = make_test_block(u64::MAX, 1);
let result = decode_block(&block);
assert!(result.is_err(), "decode_block should error on u64::MAX timestamp");
assert!(
result.unwrap_err().to_string().contains("overflows"),
"error message should mention overflow"
);
}

#[test]
fn decode_block_normal_timestamp_succeeds() {
let block = make_test_block(1_700_000_000, 42);
let result = decode_block(&block).unwrap();
assert_eq!(result.num, 42);
assert_eq!(result.timestamp_ms, 1_700_000_000_000);
}

fn make_test_log(log_index: u64, transaction_index: u64) -> crate::tempo::Log {
use alloy::primitives::{Address, LogData, B256};

crate::tempo::Log {
inner: alloy::primitives::Log {
address: Address::ZERO,
data: LogData::new(vec![], Default::default()).unwrap(),
},
block_hash: Some(B256::ZERO),
block_number: Some(1),
block_timestamp: None,
transaction_hash: Some(B256::ZERO),
transaction_index: Some(transaction_index),
log_index: Some(log_index),
removed: false,
}
}

#[test]
fn decode_log_index_overflow_returns_error() {
let log = make_test_log(u64::MAX, 0);
let result = decode_log(&log, chrono::Utc::now());
assert!(result.is_err(), "decode_log should error when log_index overflows i32");
}

#[test]
fn decode_log_tx_index_overflow_returns_error() {
let log = make_test_log(0, u64::MAX);
let result = decode_log(&log, chrono::Utc::now());
assert!(result.is_err(), "decode_log should error when transaction_index overflows i32");
}

#[test]
fn decode_log_normal_values_succeeds() {
let log = make_test_log(10, 5);
let result = decode_log(&log, chrono::Utc::now()).unwrap();
assert_eq!(result.log_idx, 10);
assert_eq!(result.tx_idx, 5);
assert_eq!(result.block_num, 1);
}

#[test]
fn enrich_multi_block_batch() {
let mut txs = vec![
Expand All @@ -210,11 +308,17 @@ mod tests {
}
}

pub fn decode_receipt(receipt: &Receipt, block_timestamp: DateTime<Utc>) -> ReceiptRow {
ReceiptRow {
pub fn decode_receipt(receipt: &Receipt, block_timestamp: DateTime<Utc>) -> Result<ReceiptRow> {
let tx_idx = i32::try_from(receipt.transaction_index().unwrap_or(0))
.map_err(|_| anyhow::anyhow!(
"receipt transaction_index {} overflows i32",
receipt.transaction_index().unwrap_or(0)
))?;

Ok(ReceiptRow {
block_num: receipt.block_number().unwrap_or(0) as i64,
block_timestamp,
tx_idx: receipt.transaction_index().unwrap_or(0) as i32,
tx_idx,
tx_hash: receipt.transaction_hash().as_slice().to_vec(),
from: receipt.from().as_slice().to_vec(),
to: receipt.to().map(|a| a.as_slice().to_vec()),
Expand All @@ -224,5 +328,5 @@ pub fn decode_receipt(receipt: &Receipt, block_timestamp: DateTime<Utc>) -> Rece
effective_gas_price: Some(receipt.effective_gas_price().to_string()),
status: if receipt.status() { Some(1) } else { Some(0) },
fee_payer: Some(receipt.fee_payer.as_slice().to_vec()),
}
})
}
Loading
Loading