Skip to content

Commit

Permalink
chain/ethereum, graph: refactor firehose block stream to cache fireho…
Browse files Browse the repository at this point in the history
…se blocks
  • Loading branch information
incrypto32 committed Sep 5, 2024
1 parent 856f3f4 commit 10c5c95
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 17 deletions.
1 change: 1 addition & 0 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl Blockchain for Chain {
});

Ok(Box::new(FirehoseBlockStream::new(
self.chain_store(),
deployment.hash,
self.chain_client(),
store.block_ptr(),
Expand Down
1 change: 1 addition & 0 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ impl Blockchain for Chain {
});

Ok(Box::new(FirehoseBlockStream::new(
self.chain_store(),
deployment.hash,
self.chain_client(),
store.block_ptr(),
Expand Down
1 change: 1 addition & 0 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });

Ok(Box::new(FirehoseBlockStream::new(
chain.chain_store(),
deployment.hash,
chain.chain_client(),
subgraph_current_block,
Expand Down
23 changes: 21 additions & 2 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1719,10 +1719,27 @@ impl EthereumAdapterTrait for EthereumAdapter {

let mut blocks: Vec<Arc<BlockPtrExt>> = blocks_map
.into_iter()
.filter_map(|(_number, values)| {
.filter_map(|(number, values)| {
if values.len() == 1 {
json::from_value(values[0].clone()).ok()
match json::from_value(values[0].clone()) {
Ok(block) => Some(block),
Err(e) => {
debug!(
&logger,
"Failed to parse block for block number: {:?}, Error: {:?}",
number,
e
);
None
}
}
} else {
warn!(
&logger,
"Expected one block for block number {:?}, found {}",
number,
values.len()
);
None
}
})
Expand All @@ -1739,6 +1756,8 @@ impl EthereumAdapterTrait for EthereumAdapter {
"Loading {} block(s) not in the block cache",
missing_blocks.len()
);

debug!(logger, "Missing blocks {:?}", missing_blocks);
}

Box::new(
Expand Down
1 change: 1 addition & 0 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });

Ok(Box::new(FirehoseBlockStream::new(
chain.chain_store(),
deployment.hash,
chain.chain_client(),
subgraph_current_block,
Expand Down
1 change: 1 addition & 0 deletions chain/starknet/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl BlockStreamBuilder<Chain> for StarknetStreamBuilder {
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });

Ok(Box::new(FirehoseBlockStream::new(
chain.chain_store(),
deployment.hash,
chain.chain_client(),
subgraph_current_block,
Expand Down
42 changes: 41 additions & 1 deletion graph/src/blockchain/firehose_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::block_stream::{
use super::client::ChainClient;
use super::Blockchain;
use crate::blockchain::block_stream::FirehoseCursor;
use crate::blockchain::TriggerFilter;
use crate::blockchain::{Block, TriggerFilter};
use crate::prelude::*;
use crate::util::backoff::ExponentialBackoff;
use crate::{firehose, firehose::FirehoseEndpoint};
Expand Down Expand Up @@ -108,6 +108,7 @@ where
C: Blockchain,
{
pub fn new<F>(
chain_store: Arc<dyn ChainStore>,
deployment: DeploymentHash,
client: Arc<ChainClient<C>>,
subgraph_current_block: Option<BlockPtr>,
Expand All @@ -134,6 +135,7 @@ where
let metrics = FirehoseBlockStreamMetrics::new(registry, deployment.clone());
FirehoseBlockStream {
stream: Box::pin(stream_blocks(
chain_store,
client,
cursor,
deployment,
Expand All @@ -148,6 +150,7 @@ where
}

fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
chain_store: Arc<dyn ChainStore>,
client: Arc<ChainClient<C>>,
mut latest_cursor: FirehoseCursor,
deployment: DeploymentHash,
Expand Down Expand Up @@ -257,6 +260,7 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(

for await response in stream {
match process_firehose_response(
chain_store.clone(),
&endpoint,
response,
&mut check_subgraph_continuity,
Expand Down Expand Up @@ -344,6 +348,7 @@ enum BlockResponse<C: Blockchain> {
}

async fn process_firehose_response<C: Blockchain, F: FirehoseMapper<C>>(
chain_store: Arc<dyn ChainStore>,
endpoint: &Arc<FirehoseEndpoint>,
result: Result<firehose::Response, Status>,
check_subgraph_continuity: &mut bool,
Expand All @@ -359,11 +364,46 @@ async fn process_firehose_response<C: Blockchain, F: FirehoseMapper<C>>(
.await
.context("Mapping block to BlockStreamEvent failed")?;

if let BlockStreamEvent::ProcessBlock(block, _) = &event {
info!(logger, "Inserting block to cache"; "block_number" => block.block.number(), "block_hash" => format!("{:?}", block.block.hash()));

let start_time = Instant::now();

let result = chain_store
.insert_block(Arc::new(block.block.clone()))
.await;

let elapsed = start_time.elapsed();

match result {
Ok(_) => {
trace!(
logger,
"Block inserted to cache successfully";
"block_number" => block.block.number(),
"block_hash" => format!("{:?}", block.block.hash()),
"time_taken" => format!("{:?}", elapsed)
);
}
Err(e) => {
error!(
logger,
"Failed to insert block into store";
"block_number" => block.block.number(),
"block_hash" => format!("{:?}", block.block.hash()),
"error" => format!("{:?}", e),
"time_taken" => format!("{:?}", elapsed)
);
}
}
}

if *check_subgraph_continuity {
info!(logger, "Firehose started from a subgraph pointer without an existing cursor, ensuring chain continuity");

if let BlockStreamEvent::ProcessBlock(ref block, _) = event {
let previous_block_ptr = block.parent_ptr();

if previous_block_ptr.is_some() && previous_block_ptr.as_ref() != subgraph_current_block
{
warn!(&logger,
Expand Down
2 changes: 2 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ pub trait ChainStore: Send + Sync + 'static {
/// Insert a block into the store (or update if they are already present).
async fn upsert_block(&self, block: Arc<dyn Block>) -> Result<(), Error>;

async fn insert_block(&self, block: Arc<dyn Block>) -> Result<(), Error>;

fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error>;

/// Try to update the head block pointer to the block with the highest block number.
Expand Down
36 changes: 22 additions & 14 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1947,20 +1947,8 @@ impl ChainStore {

Ok(block_map)
}
}

#[async_trait]
impl ChainStoreTrait for ChainStore {
fn genesis_block_ptr(&self) -> Result<BlockPtr, Error> {
let ident = self.chain_identifier()?;

Ok(BlockPtr {
hash: ident.genesis_block_hash,
number: 0,
})
}

async fn upsert_block(&self, block: Arc<dyn Block>) -> Result<(), Error> {
async fn save_block(&self, block: Arc<dyn Block>, allow_update: bool) -> Result<(), Error> {
// We should always have the parent block available to us at this point.
if let Some(parent_hash) = block.parent_hash() {
let block = JsonBlock::new(block.ptr(), parent_hash, block.data().ok());
Expand All @@ -1973,13 +1961,33 @@ impl ChainStoreTrait for ChainStore {
pool.with_conn(move |conn, _| {
conn.transaction(|conn| {
storage
.upsert_block(conn, &network, block.as_ref(), true)
.upsert_block(conn, &network, block.as_ref(), allow_update)
.map_err(CancelableError::from)
})
})
.await
.map_err(Error::from)
}
}

#[async_trait]
impl ChainStoreTrait for ChainStore {
fn genesis_block_ptr(&self) -> Result<BlockPtr, Error> {
let ident = self.chain_identifier()?;

Ok(BlockPtr {
hash: ident.genesis_block_hash,
number: 0,
})
}

async fn upsert_block(&self, block: Arc<dyn Block>) -> Result<(), Error> {
self.save_block(block, true).await
}

async fn insert_block(&self, block: Arc<dyn Block>) -> Result<(), Error> {
self.save_block(block, false).await
}

fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error> {
let mut conn = self.pool.get()?;
Expand Down

0 comments on commit 10c5c95

Please sign in to comment.