Skip to content

Commit

Permalink
Subgraph composition: Use block cache to get blocks for subgraph trig…
Browse files Browse the repository at this point in the history
…gers
  • Loading branch information
incrypto32 committed Jan 31, 2025
1 parent cf4951c commit 998dbd2
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 16 deletions.
103 changes: 92 additions & 11 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,45 @@ impl EthereumAdapter {
.buffered(ENV_VARS.block_batch_size)
}

/// Request blocks by number through JSON-RPC.
fn load_blocks_by_numbers_rpc(
&self,
logger: Logger,
numbers: Vec<BlockNumber>,
) -> impl Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send {
let web3 = self.web3.clone();

stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| {
let web3 = web3.clone();
retry(format!("load block {}", number), &logger)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Box::pin(
web3.eth()
.block_with_txs(BlockId::Number(Web3BlockNumber::Number(
number.into(),
))),
)
.compat()
.from_err::<Error>()
.and_then(move |block| {
block.map(Arc::new).ok_or_else(|| {
anyhow::anyhow!(
"Ethereum node did not find block with number {:?}",
number
)
})
})
.compat()
})
.boxed()
.compat()
.from_err()
}))
.buffered(ENV_VARS.block_batch_size)
}

/// Request blocks ptrs for numbers through JSON-RPC.
///
/// Reorg safety: If ids are numbers, they must be a final blocks.
Expand Down Expand Up @@ -1650,26 +1689,68 @@ impl EthereumAdapterTrait for EthereumAdapter {
Ok(decoded)
}

// This is a ugly temporary implementation to get the block ptrs for a range of blocks
/// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream.
async fn load_blocks_by_numbers(
&self,
logger: Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send> {
let block_hashes = block_numbers
let blocks_map: BTreeMap<i32, Vec<json::Value>> = chain_store
.cheap_clone()
.blocks_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
.await
.map_err(|e| {
error!(&logger, "Error accessing block cache {}", e);
e
})
.unwrap_or_default();

let mut blocks: Vec<Arc<LightEthereumBlock>> = blocks_map
.into_iter()
.map(|number| {
chain_store
.block_hashes_by_block_number(number)
.unwrap()
.first()
.unwrap()
.as_h256()
.filter_map(|(_number, values)| {
if values.len() == 1 {
json::from_value(values[0].clone()).ok()
} else {
None
}
})
.collect::<HashSet<_>>();
.collect::<Vec<_>>();

self.load_blocks(logger, chain_store, block_hashes).await
let missing_blocks: Vec<i32> = block_numbers
.into_iter()
.filter(|&number| !blocks.iter().any(|block| block.number() == number))
.collect();

if !missing_blocks.is_empty() {
debug!(
logger,
"Loading {} block(s) not in the block cache",
missing_blocks.len()
);
}

Box::new(
self.load_blocks_by_numbers_rpc(logger.clone(), missing_blocks)
.collect()
.map(move |new_blocks| {
let upsert_blocks: Vec<_> = new_blocks
.iter()
.map(|block| BlockFinality::Final(block.clone()))
.collect();
let block_refs: Vec<_> = upsert_blocks
.iter()
.map(|block| block as &dyn graph::blockchain::Block)
.collect();
if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) {
error!(logger, "Error writing to block cache {}", e);
}
blocks.extend(new_blocks);
blocks.sort_by_key(|block| block.number);
stream::iter_ok(blocks)
})
.flatten_stream(),
)
}

/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
Expand Down
2 changes: 1 addition & 1 deletion graph/src/blockchain/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<C: Blockchain> ChainClient<C> {
pub fn rpc(&self) -> anyhow::Result<&C::Client> {
match self {
Self::Rpc(rpc) => Ok(rpc),
_ => Err(anyhow!("rpc endpoint requested on firehose chain client")),
Self::Firehose(_) => Err(anyhow!("rpc endpoint requested on firehose chain client")),
}
}
}
6 changes: 6 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,12 @@ pub trait ChainStore: Send + Sync + 'static {
hashes: Vec<BlockHash>,
) -> Result<Vec<serde_json::Value>, Error>;

/// Returns the blocks present in the store for the given block numbers.
async fn blocks_by_numbers(
self: Arc<Self>,
numbers: Vec<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<serde_json::Value>>, Error>;

/// Get the `offset`th ancestor of `block_hash`, where offset=0 means the block matching
/// `block_hash` and offset=1 means its parent. If `root` is passed, short-circuit upon finding
/// a child of `root`. Returns None if unable to complete due to missing blocks in the chain
Expand Down
Loading

0 comments on commit 998dbd2

Please sign in to comment.