From 10c5c95542d94b7f6e56835e1529432be5d1857a Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Thu, 5 Sep 2024 11:59:10 +0530 Subject: [PATCH] chain/ethereum, graph: refactor firehose block stream to cache firehose blocks --- chain/arweave/src/chain.rs | 1 + chain/cosmos/src/chain.rs | 1 + chain/ethereum/src/chain.rs | 1 + chain/ethereum/src/ethereum_adapter.rs | 23 +++++++++- chain/near/src/chain.rs | 1 + chain/starknet/src/chain.rs | 1 + graph/src/blockchain/firehose_block_stream.rs | 42 ++++++++++++++++++- graph/src/components/store/traits.rs | 2 + store/postgres/src/chain_store.rs | 36 +++++++++------- 9 files changed, 91 insertions(+), 17 deletions(-) diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs index 8cf9aeaac2d..285374bffbf 100644 --- a/chain/arweave/src/chain.rs +++ b/chain/arweave/src/chain.rs @@ -144,6 +144,7 @@ impl Blockchain for Chain { }); Ok(Box::new(FirehoseBlockStream::new( + self.chain_store(), deployment.hash, self.chain_client(), store.block_ptr(), diff --git a/chain/cosmos/src/chain.rs b/chain/cosmos/src/chain.rs index bbd7a1ac410..e72c4d77a39 100644 --- a/chain/cosmos/src/chain.rs +++ b/chain/cosmos/src/chain.rs @@ -137,6 +137,7 @@ impl Blockchain for Chain { }); Ok(Box::new(FirehoseBlockStream::new( + self.chain_store(), deployment.hash, self.chain_client(), store.block_ptr(), diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index c55a3ebaf77..4e4f26fd9b1 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -100,6 +100,7 @@ impl BlockStreamBuilder for EthereumStreamBuilder { let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter }); Ok(Box::new(FirehoseBlockStream::new( + chain.chain_store(), deployment.hash, chain.chain_client(), subgraph_current_block, diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 117783cf815..fb125e4138a 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1719,10 +1719,27 @@ impl EthereumAdapterTrait for EthereumAdapter { let mut blocks: Vec> = blocks_map .into_iter() - .filter_map(|(_number, values)| { + .filter_map(|(number, values)| { if values.len() == 1 { - json::from_value(values[0].clone()).ok() + match json::from_value(values[0].clone()) { + Ok(block) => Some(block), + Err(e) => { + debug!( + &logger, + "Failed to parse block for block number: {:?}, Error: {:?}", + number, + e + ); + None + } + } } else { + warn!( + &logger, + "Expected one block for block number {:?}, found {}", + number, + values.len() + ); None } }) @@ -1739,6 +1756,8 @@ impl EthereumAdapterTrait for EthereumAdapter { "Loading {} block(s) not in the block cache", missing_blocks.len() ); + + debug!(logger, "Missing blocks {:?}", missing_blocks); } Box::new( diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index 238551df841..db134fef9e5 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -136,6 +136,7 @@ impl BlockStreamBuilder for NearStreamBuilder { let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter }); Ok(Box::new(FirehoseBlockStream::new( + chain.chain_store(), deployment.hash, chain.chain_client(), subgraph_current_block, diff --git a/chain/starknet/src/chain.rs b/chain/starknet/src/chain.rs index 826e1fe2c91..0721a3a9c6f 100644 --- a/chain/starknet/src/chain.rs +++ b/chain/starknet/src/chain.rs @@ -227,6 +227,7 @@ impl BlockStreamBuilder for StarknetStreamBuilder { let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter }); Ok(Box::new(FirehoseBlockStream::new( + chain.chain_store(), deployment.hash, chain.chain_client(), subgraph_current_block, diff --git a/graph/src/blockchain/firehose_block_stream.rs b/graph/src/blockchain/firehose_block_stream.rs index 254ccd42f82..5f4fc02fcfc 100644 --- a/graph/src/blockchain/firehose_block_stream.rs +++ b/graph/src/blockchain/firehose_block_stream.rs @@ -4,7 +4,7 @@ use super::block_stream::{ use super::client::ChainClient; use super::Blockchain; use crate::blockchain::block_stream::FirehoseCursor; -use crate::blockchain::TriggerFilter; +use crate::blockchain::{Block, TriggerFilter}; use crate::prelude::*; use crate::util::backoff::ExponentialBackoff; use crate::{firehose, firehose::FirehoseEndpoint}; @@ -108,6 +108,7 @@ where C: Blockchain, { pub fn new( + chain_store: Arc, deployment: DeploymentHash, client: Arc>, subgraph_current_block: Option, @@ -134,6 +135,7 @@ where let metrics = FirehoseBlockStreamMetrics::new(registry, deployment.clone()); FirehoseBlockStream { stream: Box::pin(stream_blocks( + chain_store, client, cursor, deployment, @@ -148,6 +150,7 @@ where } fn stream_blocks>( + chain_store: Arc, client: Arc>, mut latest_cursor: FirehoseCursor, deployment: DeploymentHash, @@ -257,6 +260,7 @@ fn stream_blocks>( for await response in stream { match process_firehose_response( + chain_store.clone(), &endpoint, response, &mut check_subgraph_continuity, @@ -344,6 +348,7 @@ enum BlockResponse { } async fn process_firehose_response>( + chain_store: Arc, endpoint: &Arc, result: Result, check_subgraph_continuity: &mut bool, @@ -359,11 +364,46 @@ async fn process_firehose_response>( .await .context("Mapping block to BlockStreamEvent failed")?; + if let BlockStreamEvent::ProcessBlock(block, _) = &event { + info!(logger, "Inserting block to cache"; "block_number" => block.block.number(), "block_hash" => format!("{:?}", block.block.hash())); + + let start_time = Instant::now(); + + let result = chain_store + .insert_block(Arc::new(block.block.clone())) + .await; + + let elapsed = start_time.elapsed(); + + match result { + Ok(_) => { + trace!( + logger, + "Block inserted to cache successfully"; + "block_number" => block.block.number(), + "block_hash" => format!("{:?}", block.block.hash()), + "time_taken" => format!("{:?}", elapsed) + ); + } + Err(e) => { + error!( + logger, + "Failed to insert block into store"; + "block_number" => block.block.number(), + "block_hash" => format!("{:?}", block.block.hash()), + "error" => format!("{:?}", e), + "time_taken" => format!("{:?}", elapsed) + ); + } + } + } + if *check_subgraph_continuity { info!(logger, "Firehose started from a subgraph pointer without an existing cursor, ensuring chain continuity"); if let BlockStreamEvent::ProcessBlock(ref block, _) = event { let previous_block_ptr = block.parent_ptr(); + if previous_block_ptr.is_some() && previous_block_ptr.as_ref() != subgraph_current_block { warn!(&logger, diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 55f0b6b7b65..411b6eb33ee 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -443,6 +443,8 @@ pub trait ChainStore: Send + Sync + 'static { /// Insert a block into the store (or update if they are already present). async fn upsert_block(&self, block: Arc) -> Result<(), Error>; + async fn insert_block(&self, block: Arc) -> Result<(), Error>; + fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error>; /// Try to update the head block pointer to the block with the highest block number. diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index bd37739eec7..592a126056c 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -1947,20 +1947,8 @@ impl ChainStore { Ok(block_map) } -} - -#[async_trait] -impl ChainStoreTrait for ChainStore { - fn genesis_block_ptr(&self) -> Result { - let ident = self.chain_identifier()?; - - Ok(BlockPtr { - hash: ident.genesis_block_hash, - number: 0, - }) - } - async fn upsert_block(&self, block: Arc) -> Result<(), Error> { + async fn save_block(&self, block: Arc, allow_update: bool) -> Result<(), Error> { // We should always have the parent block available to us at this point. if let Some(parent_hash) = block.parent_hash() { let block = JsonBlock::new(block.ptr(), parent_hash, block.data().ok()); @@ -1973,13 +1961,33 @@ impl ChainStoreTrait for ChainStore { pool.with_conn(move |conn, _| { conn.transaction(|conn| { storage - .upsert_block(conn, &network, block.as_ref(), true) + .upsert_block(conn, &network, block.as_ref(), allow_update) .map_err(CancelableError::from) }) }) .await .map_err(Error::from) } +} + +#[async_trait] +impl ChainStoreTrait for ChainStore { + fn genesis_block_ptr(&self) -> Result { + let ident = self.chain_identifier()?; + + Ok(BlockPtr { + hash: ident.genesis_block_hash, + number: 0, + }) + } + + async fn upsert_block(&self, block: Arc) -> Result<(), Error> { + self.save_block(block, true).await + } + + async fn insert_block(&self, block: Arc) -> Result<(), Error> { + self.save_block(block, false).await + } fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error> { let mut conn = self.pool.get()?;