diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 55e2da2da60..505353c15dd 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -21,7 +21,7 @@ use crate::components::store::{BlockNumber, DeploymentLocator, WritableStore}; use crate::data::subgraph::UnifiedMappingApiVersion; use crate::firehose::{self, FirehoseEndpoint}; use crate::futures03::stream::StreamExt as _; -use crate::schema::{EntityType, InputSchema}; +use crate::schema::InputSchema; use crate::substreams_rpc::response::Message; use crate::{prelude::*, prometheus::labels}; @@ -433,18 +433,22 @@ async fn get_entities_for_range( from: BlockNumber, to: BlockNumber, ) -> Result>, Error> { - let entity_types: Vec = filter - .entities - .iter() - .map(|e| schema.entity_type(e).unwrap()) - .collect(); - let mut entities = BTreeMap::new(); - for entity_type in entity_types { - let range = from..to; - let mut entities_for_type = store.get_range(&entity_type, range)?; - entities.append(&mut entities_for_type); + let mut entities_by_block = BTreeMap::new(); + + for entity_name in &filter.entities { + let entity_type = schema.entity_type(entity_name)?; + + let entity_ranges = store.get_range(&entity_type, from..to)?; + + for (block_number, mut entity_vec) in entity_ranges { + entities_by_block + .entry(block_number) + .and_modify(|existing_vec: &mut Vec| existing_vec.append(&mut entity_vec)) + .or_insert(entity_vec); + } } - Ok(entities) + + Ok(entities_by_block) } impl TriggersAdapterWrapper {