From 1ce283e79fd1643da5154db05289c480874b68c6 Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Fri, 26 Jul 2024 00:37:14 +0300 Subject: [PATCH 1/6] read entities from database --- graph/src/blockchain/block_stream.rs | 91 +++++++++++++++++++++++++--- 1 file changed, 83 insertions(+), 8 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 3c5f08851f2..967e0cda1d8 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -1,3 +1,4 @@ +use crate::blockchain::SubgraphFilter; use crate::data::store::scalar; use crate::data_source::subgraph; use crate::substreams::Clock; @@ -7,17 +8,16 @@ use anyhow::Error; use async_stream::stream; use futures03::Stream; use prost_types::Any; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashSet}; use std::fmt; +use std::ops::Range; use std::sync::Arc; use std::time::Instant; use thiserror::Error; use tokio::sync::mpsc::{self, Receiver, Sender}; use super::substreams_block_stream::SubstreamsLogData; -use super::{ - Block, BlockPtr, BlockTime, Blockchain, SubgraphFilter, Trigger, TriggerFilterWrapper, -}; +use super::{Block, BlockPtr, BlockTime, Blockchain, Trigger, TriggerFilterWrapper}; use crate::anyhow::Result; use crate::components::store::{BlockNumber, DeploymentLocator, WritableStore}; use crate::data::subgraph::UnifiedMappingApiVersion; @@ -350,9 +350,44 @@ impl TriggersAdapterWrapper { filter: &Arc>, ) -> Result<(Vec>, BlockNumber), Error> { if !filter.subgraph_filter.is_empty() { - return self - .subgraph_triggers(Logger::root(slog::Discard, o!()), from, to, filter) - .await; + // TODO: handle empty range, or empty entity set bellow + if to <= from { + return self + .mock_subgraph_triggers(Logger::root(slog::Discard, o!()), from, to, filter) + .await; + } + + if let Some(SubgraphFilter { + subgraph: dh, + start_block: _sb, + entities: ent, + }) = filter.subgraph_filter.first() + { + if let Some((dh2, store)) = self.source_subgraph_stores.first() { + if dh == dh2 { + let schema = crate::components::store::ReadStore::input_schema(store); + if let Some(entity_type) = ent.first() { + let et = schema.entity_type(entity_type).unwrap(); + + let f: u32 = from as u32; + let t: u32 = to as u32; + let br: Range = f..t; + let entities = store.get_range(&et, br)?; + if !entities.is_empty() { + return self + .subgraph_triggers( + Logger::root(slog::Discard, o!()), + from, + to, + filter, + entities, + ) + .await; + } + } + } + } + } } self.adapter @@ -381,10 +416,50 @@ impl TriggersAdapterWrapper { self.adapter.chain_head_ptr().await } + async fn subgraph_triggers( + &self, + logger: Logger, + from: BlockNumber, + to: BlockNumber, + filter: &Arc>, + entities: BTreeMap, + ) -> Result<(Vec>, BlockNumber), Error> { + let logger2 = logger.cheap_clone(); + let adapter = self.adapter.clone(); + let first_filter = filter.subgraph_filter.first().unwrap(); + let blocks = adapter + .load_blocks_by_numbers(logger, HashSet::from_iter(from..to)) + .await? + .into_iter() + .map(|block| { + let key = block.number(); + let entity = entities.get(&key).unwrap(); + let trigger_data = vec![Self::create_subgraph_trigger_from_entity( + first_filter, + entity, + )]; + BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger2) + }) + .collect(); + + Ok((blocks, to)) + } + + fn create_subgraph_trigger_from_entity( + filter: &SubgraphFilter, + entity: &Entity, + ) -> subgraph::TriggerData { + subgraph::TriggerData { + source: filter.subgraph.clone(), + entity: entity.clone(), + entity_type: filter.entities.first().unwrap().clone(), + } + } + // TODO(krishna): Currently this is a mock implementation of subgraph triggers. // This will be replaced with the actual implementation which will use the filters to // query the database of the source subgraph and return the entity triggers. - async fn subgraph_triggers( + async fn mock_subgraph_triggers( &self, logger: Logger, from: BlockNumber, From 34535512a8244d91021a0226f76160d309e23cd8 Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Mon, 29 Jul 2024 15:46:04 +0300 Subject: [PATCH 2/6] clean up --- graph/src/blockchain/block_stream.rs | 56 ++++++---------------------- 1 file changed, 12 insertions(+), 44 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 967e0cda1d8..b3acabcc289 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -352,8 +352,15 @@ impl TriggersAdapterWrapper { if !filter.subgraph_filter.is_empty() { // TODO: handle empty range, or empty entity set bellow if to <= from { + let entities = BTreeMap::::new(); return self - .mock_subgraph_triggers(Logger::root(slog::Discard, o!()), from, to, filter) + .subgraph_triggers( + Logger::root(slog::Discard, o!()), + from, + to, + filter, + entities, + ) .await; } @@ -433,7 +440,10 @@ impl TriggersAdapterWrapper { .into_iter() .map(|block| { let key = block.number(); - let entity = entities.get(&key).unwrap(); + let entity = match entities.get(&key) { + Some(e) => e, + None => &Self::create_mock_entity(&block), + }; let trigger_data = vec![Self::create_subgraph_trigger_from_entity( first_filter, entity, @@ -456,48 +466,6 @@ impl TriggersAdapterWrapper { } } - // TODO(krishna): Currently this is a mock implementation of subgraph triggers. - // This will be replaced with the actual implementation which will use the filters to - // query the database of the source subgraph and return the entity triggers. - async fn mock_subgraph_triggers( - &self, - logger: Logger, - from: BlockNumber, - to: BlockNumber, - filter: &Arc>, - ) -> Result<(Vec>, BlockNumber), Error> { - let logger2 = logger.cheap_clone(); - let adapter = self.adapter.clone(); - // let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?; - // let to = to_ptr.block_number(); - - let first_filter = filter.subgraph_filter.first().unwrap(); - - let blocks = adapter - .load_blocks_by_numbers(logger, HashSet::from_iter(from..=to)) - .await? - .into_iter() - .map(|block| { - let trigger_data = vec![Self::create_mock_subgraph_trigger(first_filter, &block)]; - BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger2) - }) - .collect(); - - Ok((blocks, to)) - } - - fn create_mock_subgraph_trigger( - filter: &SubgraphFilter, - block: &C::Block, - ) -> subgraph::TriggerData { - let mock_entity = Self::create_mock_entity(block); - subgraph::TriggerData { - source: filter.subgraph.clone(), - entity: mock_entity, - entity_type: filter.entities.first().unwrap().clone(), - } - } - fn create_mock_entity(block: &C::Block) -> Entity { let id = DeploymentHash::new("test").unwrap(); let data_schema = InputSchema::parse_latest( From cd389f9ce424cf9a11bf3af66c1714918fd4c73a Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Tue, 30 Jul 2024 00:09:50 +0300 Subject: [PATCH 3/6] simplify --- graph/src/blockchain/block_stream.rs | 33 ++++++++-------------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index b3acabcc289..10bbbf1d544 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -351,18 +351,6 @@ impl TriggersAdapterWrapper { ) -> Result<(Vec>, BlockNumber), Error> { if !filter.subgraph_filter.is_empty() { // TODO: handle empty range, or empty entity set bellow - if to <= from { - let entities = BTreeMap::::new(); - return self - .subgraph_triggers( - Logger::root(slog::Discard, o!()), - from, - to, - filter, - entities, - ) - .await; - } if let Some(SubgraphFilter { subgraph: dh, @@ -380,23 +368,20 @@ impl TriggersAdapterWrapper { let t: u32 = to as u32; let br: Range = f..t; let entities = store.get_range(&et, br)?; - if !entities.is_empty() { - return self - .subgraph_triggers( - Logger::root(slog::Discard, o!()), - from, - to, - filter, - entities, - ) - .await; - } + return self + .subgraph_triggers( + Logger::root(slog::Discard, o!()), + from, + to, + filter, + entities, + ) + .await; } } } } } - self.adapter .scan_triggers(from, to, &filter.chain_filter) .await From 66e37baf522e325ae16d7c1462811b216d4c9e42 Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Tue, 30 Jul 2024 14:38:50 +0300 Subject: [PATCH 4/6] remove mocks --- graph/src/blockchain/block_stream.rs | 47 ++++++++-------------------- tests/tests/integration_tests.rs | 15 +++++---- 2 files changed, 20 insertions(+), 42 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 10bbbf1d544..21154fa682b 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -1,5 +1,4 @@ use crate::blockchain::SubgraphFilter; -use crate::data::store::scalar; use crate::data_source::subgraph; use crate::substreams::Clock; use crate::substreams_rpc::response::Message as SubstreamsMessage; @@ -425,16 +424,20 @@ impl TriggersAdapterWrapper { .into_iter() .map(|block| { let key = block.number(); - let entity = match entities.get(&key) { - Some(e) => e, - None => &Self::create_mock_entity(&block), - }; - let trigger_data = vec![Self::create_subgraph_trigger_from_entity( - first_filter, - entity, - )]; - BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger2) + match entities.get(&key) { + Some(e) => { + let trigger_data = + vec![Self::create_subgraph_trigger_from_entity(first_filter, e)]; + Some(BlockWithTriggers::new_with_subgraph_triggers( + block, + trigger_data, + &logger2, + )) + } + None => None, + } }) + .flatten() .collect(); Ok((blocks, to)) @@ -450,30 +453,6 @@ impl TriggersAdapterWrapper { entity_type: filter.entities.first().unwrap().clone(), } } - - fn create_mock_entity(block: &C::Block) -> Entity { - let id = DeploymentHash::new("test").unwrap(); - let data_schema = InputSchema::parse_latest( - "type Block @entity { id: Bytes!, number: BigInt!, hash: Bytes! }", - id.clone(), - ) - .unwrap(); - - let block = block.ptr(); - let hash = Value::Bytes(scalar::Bytes::from(block.hash_slice().to_vec())); - let data = data_schema - .make_entity(vec![ - ("id".into(), hash.clone()), - ( - "number".into(), - Value::BigInt(scalar::BigInt::from(block.block_number())), - ), - ("hash".into(), hash), - ]) - .unwrap(); - - data - } } #[async_trait] diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 647d1cc6f76..fdc82b03510 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -522,16 +522,15 @@ async fn subgraph_data_sources(ctx: TestContext) -> anyhow::Result<()> { assert!(subgraph.healthy); let expected_response = json!({ "mirrorBlocks": [ - { "number": "0" }, - { "number": "1" }, + { "number": "1" }, { "number": "2" }, - { "number": "3" }, - { "number": "4" }, - { "number": "5" }, + { "number": "3" }, + { "number": "4" }, + { "number": "5" }, { "number": "6" }, - { "number": "7" }, - { "number": "8" }, - { "number": "9" }, + { "number": "7" }, + { "number": "8" }, + { "number": "9" }, ] }); From b21706b5c1b97b857d7f67e20bde8a6c93b36226 Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Tue, 30 Jul 2024 17:06:10 +0300 Subject: [PATCH 5/6] fixes --- graph/src/blockchain/block_stream.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 21154fa682b..c1b409bef47 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -445,13 +445,16 @@ impl TriggersAdapterWrapper { fn create_subgraph_trigger_from_entity( filter: &SubgraphFilter, - entity: &Entity, - ) -> subgraph::TriggerData { - subgraph::TriggerData { - source: filter.subgraph.clone(), - entity: entity.clone(), - entity_type: filter.entities.first().unwrap().clone(), - } + entity: &Vec, + ) -> Vec { + entity + .iter() + .map(|e| subgraph::TriggerData { + source: filter.subgraph.clone(), + entity: e.clone(), + entity_type: filter.entities.first().unwrap().clone(), + }) + .collect() } } From 44c0385bea7182369c9c0d82f186b143dfe1272a Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Tue, 13 Aug 2024 14:33:33 +0300 Subject: [PATCH 6/6] fixes --- graph/src/blockchain/block_stream.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index c1b409bef47..ee69083588e 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -363,9 +363,7 @@ impl TriggersAdapterWrapper { if let Some(entity_type) = ent.first() { let et = schema.entity_type(entity_type).unwrap(); - let f: u32 = from as u32; - let t: u32 = to as u32; - let br: Range = f..t; + let br: Range = from..to; let entities = store.get_range(&et, br)?; return self .subgraph_triggers( @@ -413,7 +411,7 @@ impl TriggersAdapterWrapper { from: BlockNumber, to: BlockNumber, filter: &Arc>, - entities: BTreeMap, + entities: BTreeMap>, ) -> Result<(Vec>, BlockNumber), Error> { let logger2 = logger.cheap_clone(); let adapter = self.adapter.clone(); @@ -427,7 +425,7 @@ impl TriggersAdapterWrapper { match entities.get(&key) { Some(e) => { let trigger_data = - vec![Self::create_subgraph_trigger_from_entity(first_filter, e)]; + Self::create_subgraph_trigger_from_entity(first_filter, e); Some(BlockWithTriggers::new_with_subgraph_triggers( block, trigger_data,