From e03a54919965439f109c14e5df95a23272bfa2fa Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Fri, 6 Sep 2024 19:29:59 +0300 Subject: [PATCH] Add casuality region to the SQL querry --- graph/src/blockchain/block_stream.rs | 4 ++-- graph/src/components/store/mod.rs | 1 + graph/src/components/store/traits.rs | 4 +++- store/postgres/src/deployment_store.rs | 3 ++- store/postgres/src/relational.rs | 5 +++-- store/postgres/src/relational_queries.rs | 15 +++++++++------ store/postgres/src/writable.rs | 17 +++++++++++++---- store/test-store/tests/graph/entity_cache.rs | 1 + store/test-store/tests/postgres/writable.rs | 6 ++++-- 9 files changed, 38 insertions(+), 18 deletions(-) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 9f8b6fce8a9..9cc7e4b710e 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -1,5 +1,5 @@ use crate::blockchain::SubgraphFilter; -use crate::data_source::subgraph; +use crate::data_source::{subgraph, CausalityRegion}; use crate::substreams::Clock; use crate::substreams_rpc::response::Message as SubstreamsMessage; use crate::substreams_rpc::BlockScopedData; @@ -453,7 +453,7 @@ async fn get_entities_for_range( let entity_type = schema.entity_type(entity_name)?; entity_types.push(entity_type); } - Ok(store.get_range(entity_types, from..to)?) + Ok(store.get_range(entity_types, CausalityRegion::ONCHAIN, from..to)?) } impl TriggersAdapterWrapper { diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index ccaa78a9a4f..dc7dfe8150c 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -1043,6 +1043,7 @@ impl ReadStore for EmptyStore { fn get_range( &self, _entity_types: Vec, + _causality_region: CausalityRegion, _block_range: Range, ) -> Result>, StoreError> { Ok(BTreeMap::new()) diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index a7cecad129b..1fb46e344f7 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -232,6 +232,7 @@ pub trait ReadStore: Send + Sync + 'static { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError>; @@ -260,9 +261,10 @@ impl ReadStore for Arc { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { - (**self).get_range(entity_types, block_range) + (**self).get_range(entity_types, causality_region, block_range) } fn get_derived( diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index faa50a0d1ad..9870179a620 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1060,11 +1060,12 @@ impl DeploymentStore { &self, site: Arc, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { let mut conn = self.get_conn()?; let layout = self.layout(&mut conn, site)?; - layout.find_range(&mut conn, entity_types, block_range) + layout.find_range(&mut conn, entity_types, causality_region, block_range) } pub(crate) fn get_derived( diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 4402dcefb6c..be9f155e4ba 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -520,6 +520,7 @@ impl Layout { &self, conn: &mut PgConnection, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { let mut tables = vec![]; @@ -529,11 +530,11 @@ impl Layout { et_map.insert(et.to_string(), Arc::new(et)); } let mut entities: BTreeMap> = BTreeMap::new(); - let lower_vec = FindRangeQuery::new(&tables, false, block_range.clone()) + let lower_vec = FindRangeQuery::new(&tables, causality_region, false, block_range.clone()) .get_results::(conn) .optional()? .unwrap_or_default(); - let upper_vec = FindRangeQuery::new(&tables, true, block_range) + let upper_vec = FindRangeQuery::new(&tables, causality_region, true, block_range) .get_results::(conn) .optional()? .unwrap_or_default(); diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 9776aef9d01..94ae41eb262 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -2024,6 +2024,7 @@ impl<'a, Conn> RunQueryDsl for FindQuery<'a> {} #[derive(Debug, Clone)] pub struct FindRangeQuery<'a> { tables: &'a Vec<&'a Table>, + causality_region: CausalityRegion, is_upper_range: bool, imm_range: EntityBlockRange, mut_range: EntityBlockRange, @@ -2032,6 +2033,7 @@ pub struct FindRangeQuery<'a> { impl<'a> FindRangeQuery<'a> { pub fn new( tables: &'a Vec<&Table>, + causality_region: CausalityRegion, is_upper_range: bool, block_range: Range, ) -> Self { @@ -2039,6 +2041,7 @@ impl<'a> FindRangeQuery<'a> { let mut_range = EntityBlockRange::new(false, block_range, is_upper_range); Self { tables, + causality_region, is_upper_range, imm_range, mut_range, @@ -2075,12 +2078,12 @@ impl<'a> QueryFragment for FindRangeQuery<'a> { out.push_sql(" from "); out.push_sql(table.qualified_name.as_str()); out.push_sql(" e\n where"); - // TODO: add casuality region to the query - // if self.table.has_causality_region { - // out.push_sql("causality_region = "); - // out.push_bind_param::(&self.key.causality_region)?; - // out.push_sql(" and "); - // } + // add casuality region to the query + if table.has_causality_region { + out.push_sql("causality_region = "); + out.push_bind_param::(&self.causality_region)?; + out.push_sql(" and "); + } if table.immutable { self.imm_range.contains(&mut out)?; } else { diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 3436db04004..ae89bc56e58 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -355,12 +355,14 @@ impl SyncStore { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { retry::forever(&self.logger, "get_range", || { self.writable.get_range( self.site.cheap_clone(), entity_types.clone(), + causality_region, block_range.clone(), ) }) @@ -1234,9 +1236,11 @@ impl Queue { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { - self.store.get_range(entity_types, block_range) + self.store + .get_range(entity_types, causality_region, block_range) } fn get_derived( @@ -1454,11 +1458,14 @@ impl Writer { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { match self { - Writer::Sync(store) => store.get_range(entity_types, block_range), - Writer::Async { queue, .. } => queue.get_range(entity_types, block_range), + Writer::Sync(store) => store.get_range(entity_types, causality_region, block_range), + Writer::Async { queue, .. } => { + queue.get_range(entity_types, causality_region, block_range) + } } } @@ -1593,9 +1600,11 @@ impl ReadStore for WritableStore { fn get_range( &self, entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { - self.writer.get_range(entity_types, block_range) + self.writer + .get_range(entity_types, causality_region, block_range) } fn get_derived( diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 15315d89333..9762dcf68a9 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -70,6 +70,7 @@ impl ReadStore for MockStore { fn get_range( &self, _entity_types: Vec, + _causality_region: CausalityRegion, _block_range: Range, ) -> Result>, StoreError> { todo!() diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index ef619c13323..2540c62f966 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -343,7 +343,7 @@ fn read_range_test() { let br: Range = 0..18; let entity_types = vec![COUNTER_TYPE.clone(), COUNTER2_TYPE.clone()]; let e: BTreeMap> = writable - .get_range(entity_types.clone(), br.clone()) + .get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone()) .unwrap(); assert_eq!(e.len(), 5); for en in &e { @@ -356,7 +356,9 @@ fn read_range_test() { } writable.flush().await.unwrap(); writable.deployment_synced().unwrap(); - let e: BTreeMap> = writable.get_range(entity_types, br).unwrap(); + let e: BTreeMap> = writable + .get_range(entity_types, CausalityRegion::ONCHAIN, br) + .unwrap(); assert_eq!(e.len(), 7); for en in &e { let index = *en.0 - 1;