Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subgraph Composition: Cache blocks processed by firehose block stream #5631

Draft
wants to merge 1 commit into
base: krishna/switch-to-using-only-block-ptrs
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl Blockchain for Chain {
});

Ok(Box::new(FirehoseBlockStream::new(
self.chain_store(),
deployment.hash,
self.chain_client(),
store.block_ptr(),
Expand Down
1 change: 1 addition & 0 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ impl Blockchain for Chain {
});

Ok(Box::new(FirehoseBlockStream::new(
self.chain_store(),
deployment.hash,
self.chain_client(),
store.block_ptr(),
Expand Down
1 change: 1 addition & 0 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });

Ok(Box::new(FirehoseBlockStream::new(
chain.chain_store(),
deployment.hash,
chain.chain_client(),
subgraph_current_block,
Expand Down
23 changes: 21 additions & 2 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1719,10 +1719,27 @@ impl EthereumAdapterTrait for EthereumAdapter {

let mut blocks: Vec<Arc<BlockPtrExt>> = blocks_map
.into_iter()
.filter_map(|(_number, values)| {
.filter_map(|(number, values)| {
if values.len() == 1 {
json::from_value(values[0].clone()).ok()
match json::from_value(values[0].clone()) {
Ok(block) => Some(block),
Err(e) => {
debug!(
&logger,
"Failed to parse block for block number: {:?}, Error: {:?}",
number,
e
);
None
}
}
} else {
warn!(
&logger,
"Expected one block for block number {:?}, found {}",
number,
values.len()
);
None
}
})
Expand All @@ -1739,6 +1756,8 @@ impl EthereumAdapterTrait for EthereumAdapter {
"Loading {} block(s) not in the block cache",
missing_blocks.len()
);

debug!(logger, "Missing blocks {:?}", missing_blocks);
}

Box::new(
Expand Down
1 change: 1 addition & 0 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });

Ok(Box::new(FirehoseBlockStream::new(
chain.chain_store(),
deployment.hash,
chain.chain_client(),
subgraph_current_block,
Expand Down
1 change: 1 addition & 0 deletions chain/starknet/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl BlockStreamBuilder<Chain> for StarknetStreamBuilder {
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });

Ok(Box::new(FirehoseBlockStream::new(
chain.chain_store(),
deployment.hash,
chain.chain_client(),
subgraph_current_block,
Expand Down
42 changes: 41 additions & 1 deletion graph/src/blockchain/firehose_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::block_stream::{
use super::client::ChainClient;
use super::Blockchain;
use crate::blockchain::block_stream::FirehoseCursor;
use crate::blockchain::TriggerFilter;
use crate::blockchain::{Block, TriggerFilter};
use crate::prelude::*;
use crate::util::backoff::ExponentialBackoff;
use crate::{firehose, firehose::FirehoseEndpoint};
Expand Down Expand Up @@ -108,6 +108,7 @@ where
C: Blockchain,
{
pub fn new<F>(
chain_store: Arc<dyn ChainStore>,
deployment: DeploymentHash,
client: Arc<ChainClient<C>>,
subgraph_current_block: Option<BlockPtr>,
Expand All @@ -134,6 +135,7 @@ where
let metrics = FirehoseBlockStreamMetrics::new(registry, deployment.clone());
FirehoseBlockStream {
stream: Box::pin(stream_blocks(
chain_store,
client,
cursor,
deployment,
Expand All @@ -148,6 +150,7 @@ where
}

fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
chain_store: Arc<dyn ChainStore>,
client: Arc<ChainClient<C>>,
mut latest_cursor: FirehoseCursor,
deployment: DeploymentHash,
Expand Down Expand Up @@ -257,6 +260,7 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(

for await response in stream {
match process_firehose_response(
chain_store.clone(),
&endpoint,
response,
&mut check_subgraph_continuity,
Expand Down Expand Up @@ -344,6 +348,7 @@ enum BlockResponse<C: Blockchain> {
}

async fn process_firehose_response<C: Blockchain, F: FirehoseMapper<C>>(
chain_store: Arc<dyn ChainStore>,
endpoint: &Arc<FirehoseEndpoint>,
result: Result<firehose::Response, Status>,
check_subgraph_continuity: &mut bool,
Expand All @@ -359,11 +364,46 @@ async fn process_firehose_response<C: Blockchain, F: FirehoseMapper<C>>(
.await
.context("Mapping block to BlockStreamEvent failed")?;

if let BlockStreamEvent::ProcessBlock(block, _) = &event {
info!(logger, "Inserting block to cache"; "block_number" => block.block.number(), "block_hash" => format!("{:?}", block.block.hash()));

let start_time = Instant::now();

let result = chain_store
.insert_block(Arc::new(block.block.clone()))
.await;

let elapsed = start_time.elapsed();

match result {
Ok(_) => {
trace!(
logger,
"Block inserted to cache successfully";
"block_number" => block.block.number(),
"block_hash" => format!("{:?}", block.block.hash()),
"time_taken" => format!("{:?}", elapsed)
);
}
Err(e) => {
error!(
logger,
"Failed to insert block into store";
"block_number" => block.block.number(),
"block_hash" => format!("{:?}", block.block.hash()),
"error" => format!("{:?}", e),
"time_taken" => format!("{:?}", elapsed)
);
}
}
}

if *check_subgraph_continuity {
info!(logger, "Firehose started from a subgraph pointer without an existing cursor, ensuring chain continuity");

if let BlockStreamEvent::ProcessBlock(ref block, _) = event {
let previous_block_ptr = block.parent_ptr();

if previous_block_ptr.is_some() && previous_block_ptr.as_ref() != subgraph_current_block
{
warn!(&logger,
Expand Down
2 changes: 2 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ pub trait ChainStore: Send + Sync + 'static {
/// Insert a block into the store (or update if they are already present).
async fn upsert_block(&self, block: Arc<dyn Block>) -> Result<(), Error>;

async fn insert_block(&self, block: Arc<dyn Block>) -> Result<(), Error>;

fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error>;

/// Try to update the head block pointer to the block with the highest block number.
Expand Down
36 changes: 22 additions & 14 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1947,20 +1947,8 @@ impl ChainStore {

Ok(block_map)
}
}

#[async_trait]
impl ChainStoreTrait for ChainStore {
fn genesis_block_ptr(&self) -> Result<BlockPtr, Error> {
let ident = self.chain_identifier()?;

Ok(BlockPtr {
hash: ident.genesis_block_hash,
number: 0,
})
}

async fn upsert_block(&self, block: Arc<dyn Block>) -> Result<(), Error> {
async fn save_block(&self, block: Arc<dyn Block>, allow_update: bool) -> Result<(), Error> {
// We should always have the parent block available to us at this point.
if let Some(parent_hash) = block.parent_hash() {
let block = JsonBlock::new(block.ptr(), parent_hash, block.data().ok());
Expand All @@ -1973,13 +1961,33 @@ impl ChainStoreTrait for ChainStore {
pool.with_conn(move |conn, _| {
conn.transaction(|conn| {
storage
.upsert_block(conn, &network, block.as_ref(), true)
.upsert_block(conn, &network, block.as_ref(), allow_update)
.map_err(CancelableError::from)
})
})
.await
.map_err(Error::from)
}
}

#[async_trait]
impl ChainStoreTrait for ChainStore {
fn genesis_block_ptr(&self) -> Result<BlockPtr, Error> {
let ident = self.chain_identifier()?;

Ok(BlockPtr {
hash: ident.genesis_block_hash,
number: 0,
})
}

async fn upsert_block(&self, block: Arc<dyn Block>) -> Result<(), Error> {
self.save_block(block, true).await
}

async fn insert_block(&self, block: Arc<dyn Block>) -> Result<(), Error> {
self.save_block(block, false).await
}

fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error> {
let mut conn = self.pool.get()?;
Expand Down
Loading