From b35754e24f2ef20c8277ad4bec85c1c95ca6444e Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Thu, 22 Aug 2024 18:03:15 +0530 Subject: [PATCH] graph: fix subgraph filter mismatch bug --- graph/src/blockchain/block_stream.rs | 31 +++++++++++++++++++++------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 505353c15dd..7704d772731 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -21,7 +21,7 @@ use crate::components::store::{BlockNumber, DeploymentLocator, WritableStore}; use crate::data::subgraph::UnifiedMappingApiVersion; use crate::firehose::{self, FirehoseEndpoint}; use crate::futures03::stream::StreamExt as _; -use crate::schema::InputSchema; +use crate::schema::{EntityType, InputSchema}; use crate::substreams_rpc::response::Message; use crate::{prelude::*, prometheus::labels}; @@ -350,14 +350,14 @@ impl TriggersAdapterWrapper { fn create_subgraph_trigger_from_entities( filter: &SubgraphFilter, - entities: &Vec, + entities: &Vec, ) -> Vec { entities .iter() .map(|e| subgraph::TriggerData { source: filter.subgraph.clone(), - entity: e.clone(), - entity_type: filter.entities.first().unwrap().clone(), + entity: e.entity.clone(), + entity_type: e.entity_type.as_str().to_string(), }) .collect() } @@ -366,7 +366,7 @@ async fn create_subgraph_triggers( logger: Logger, blocks: Vec, filter: &SubgraphFilter, - entities: BTreeMap>, + entities: BTreeMap>, ) -> Result>, Error> { let logger_clone = logger.cheap_clone(); @@ -426,13 +426,18 @@ async fn scan_subgraph_triggers( } } +pub struct EntityWithType { + pub entity_type: EntityType, + pub entity: Entity, +} + async fn get_entities_for_range( store: &Arc, filter: &SubgraphFilter, schema: &InputSchema, from: BlockNumber, to: BlockNumber, -) -> Result>, Error> { +) -> Result>, Error> { let mut entities_by_block = BTreeMap::new(); for entity_name in &filter.entities { @@ -440,10 +445,20 @@ async fn get_entities_for_range( let entity_ranges = store.get_range(&entity_type, from..to)?; - for (block_number, mut entity_vec) in entity_ranges { + for (block_number, entity_vec) in entity_ranges { + let mut entity_vec = entity_vec + .into_iter() + .map(|e| EntityWithType { + entity_type: entity_type.clone(), + entity: e, + }) + .collect(); + entities_by_block .entry(block_number) - .and_modify(|existing_vec: &mut Vec| existing_vec.append(&mut entity_vec)) + .and_modify(|existing_vec: &mut Vec| { + existing_vec.append(&mut entity_vec); + }) .or_insert(entity_vec); } }