diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 3c5f08851f2..ee69083588e 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -1,4 +1,4 @@ -use crate::data::store::scalar; +use crate::blockchain::SubgraphFilter; use crate::data_source::subgraph; use crate::substreams::Clock; use crate::substreams_rpc::response::Message as SubstreamsMessage; @@ -7,17 +7,16 @@ use anyhow::Error; use async_stream::stream; use futures03::Stream; use prost_types::Any; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashSet}; use std::fmt; +use std::ops::Range; use std::sync::Arc; use std::time::Instant; use thiserror::Error; use tokio::sync::mpsc::{self, Receiver, Sender}; use super::substreams_block_stream::SubstreamsLogData; -use super::{ - Block, BlockPtr, BlockTime, Blockchain, SubgraphFilter, Trigger, TriggerFilterWrapper, -}; +use super::{Block, BlockPtr, BlockTime, Blockchain, Trigger, TriggerFilterWrapper}; use crate::anyhow::Result; use crate::components::store::{BlockNumber, DeploymentLocator, WritableStore}; use crate::data::subgraph::UnifiedMappingApiVersion; @@ -350,11 +349,36 @@ impl TriggersAdapterWrapper { filter: &Arc>, ) -> Result<(Vec>, BlockNumber), Error> { if !filter.subgraph_filter.is_empty() { - return self - .subgraph_triggers(Logger::root(slog::Discard, o!()), from, to, filter) - .await; + // TODO: handle empty range, or empty entity set bellow + + if let Some(SubgraphFilter { + subgraph: dh, + start_block: _sb, + entities: ent, + }) = filter.subgraph_filter.first() + { + if let Some((dh2, store)) = self.source_subgraph_stores.first() { + if dh == dh2 { + let schema = crate::components::store::ReadStore::input_schema(store); + if let Some(entity_type) = ent.first() { + let et = schema.entity_type(entity_type).unwrap(); + + let br: Range = from..to; + let entities = store.get_range(&et, br)?; + return self + .subgraph_triggers( + Logger::root(slog::Discard, o!()), + from, + to, + filter, + entities, + ) + .await; + } + } + } + } } - self.adapter .scan_triggers(from, to, &filter.chain_filter) .await @@ -381,70 +405,54 @@ impl TriggersAdapterWrapper { self.adapter.chain_head_ptr().await } - // TODO(krishna): Currently this is a mock implementation of subgraph triggers. - // This will be replaced with the actual implementation which will use the filters to - // query the database of the source subgraph and return the entity triggers. async fn subgraph_triggers( &self, logger: Logger, from: BlockNumber, to: BlockNumber, filter: &Arc>, + entities: BTreeMap>, ) -> Result<(Vec>, BlockNumber), Error> { let logger2 = logger.cheap_clone(); let adapter = self.adapter.clone(); - // let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?; - // let to = to_ptr.block_number(); - let first_filter = filter.subgraph_filter.first().unwrap(); - let blocks = adapter - .load_blocks_by_numbers(logger, HashSet::from_iter(from..=to)) + .load_blocks_by_numbers(logger, HashSet::from_iter(from..to)) .await? .into_iter() .map(|block| { - let trigger_data = vec![Self::create_mock_subgraph_trigger(first_filter, &block)]; - BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger2) + let key = block.number(); + match entities.get(&key) { + Some(e) => { + let trigger_data = + Self::create_subgraph_trigger_from_entity(first_filter, e); + Some(BlockWithTriggers::new_with_subgraph_triggers( + block, + trigger_data, + &logger2, + )) + } + None => None, + } }) + .flatten() .collect(); Ok((blocks, to)) } - fn create_mock_subgraph_trigger( + fn create_subgraph_trigger_from_entity( filter: &SubgraphFilter, - block: &C::Block, - ) -> subgraph::TriggerData { - let mock_entity = Self::create_mock_entity(block); - subgraph::TriggerData { - source: filter.subgraph.clone(), - entity: mock_entity, - entity_type: filter.entities.first().unwrap().clone(), - } - } - - fn create_mock_entity(block: &C::Block) -> Entity { - let id = DeploymentHash::new("test").unwrap(); - let data_schema = InputSchema::parse_latest( - "type Block @entity { id: Bytes!, number: BigInt!, hash: Bytes! }", - id.clone(), - ) - .unwrap(); - - let block = block.ptr(); - let hash = Value::Bytes(scalar::Bytes::from(block.hash_slice().to_vec())); - let data = data_schema - .make_entity(vec![ - ("id".into(), hash.clone()), - ( - "number".into(), - Value::BigInt(scalar::BigInt::from(block.block_number())), - ), - ("hash".into(), hash), - ]) - .unwrap(); - - data + entity: &Vec, + ) -> Vec { + entity + .iter() + .map(|e| subgraph::TriggerData { + source: filter.subgraph.clone(), + entity: e.clone(), + entity_type: filter.entities.first().unwrap().clone(), + }) + .collect() } } diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 647d1cc6f76..fdc82b03510 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -522,16 +522,15 @@ async fn subgraph_data_sources(ctx: TestContext) -> anyhow::Result<()> { assert!(subgraph.healthy); let expected_response = json!({ "mirrorBlocks": [ - { "number": "0" }, - { "number": "1" }, + { "number": "1" }, { "number": "2" }, - { "number": "3" }, - { "number": "4" }, - { "number": "5" }, + { "number": "3" }, + { "number": "4" }, + { "number": "5" }, { "number": "6" }, - { "number": "7" }, - { "number": "8" }, - { "number": "9" }, + { "number": "7" }, + { "number": "8" }, + { "number": "9" }, ] });