diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 324cdbafb2f..d0c59f08f3d 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -1,5 +1,5 @@ use crate::blockchain::SubgraphFilter; -use crate::data_source::subgraph; +use crate::data_source::{subgraph, CausalityRegion}; use crate::substreams::Clock; use crate::substreams_rpc::response::Message as SubstreamsMessage; use crate::substreams_rpc::BlockScopedData; @@ -433,9 +433,19 @@ async fn scan_subgraph_triggers( } } +#[derive(Debug)] +pub enum EntitySubgraphOperation { + Create, + Modify, + Delete, +} + +#[derive(Debug)] pub struct EntityWithType { + pub entity_op: EntitySubgraphOperation, pub entity_type: EntityType, pub entity: Entity, + pub vid: i64, } async fn get_entities_for_range( @@ -445,32 +455,12 @@ async fn get_entities_for_range( from: BlockNumber, to: BlockNumber, ) -> Result>, Error> { - let mut entities_by_block = BTreeMap::new(); - - for entity_name in &filter.entities { - let entity_type = schema.entity_type(entity_name)?; - - let entity_ranges = store.get_range(&entity_type, from..to)?; - - for (block_number, entity_vec) in entity_ranges { - let mut entity_vec = entity_vec - .into_iter() - .map(|e| EntityWithType { - entity_type: entity_type.clone(), - entity: e, - }) - .collect(); - - entities_by_block - .entry(block_number) - .and_modify(|existing_vec: &mut Vec| { - existing_vec.append(&mut entity_vec); - }) - .or_insert(entity_vec); - } - } - - Ok(entities_by_block) + let entity_types: Result> = filter + .entities + .iter() + .map(|name| schema.entity_type(name)) + .collect(); + Ok(store.get_range(entity_types?, CausalityRegion::ONCHAIN, from..to)?) } impl TriggersAdapterWrapper { diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 7ed6a4bc36e..cb26df66880 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use web3::types::{Address, H256}; use super::*; -use crate::blockchain::block_stream::FirehoseCursor; +use crate::blockchain::block_stream::{EntityWithType, FirehoseCursor}; use crate::blockchain::{BlockTime, ChainIdentifier}; use crate::components::metrics::stopwatch::StopwatchMetrics; use crate::components::server::index_node::VersionInfo; @@ -299,9 +299,10 @@ pub trait SourceableStore: Sync + Send + 'static { /// changed in the given block_range. fn get_range( &self, - entity_type: &EntityType, + entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, - ) -> Result>, StoreError>; + ) -> Result>, StoreError>; fn input_schema(&self) -> InputSchema; @@ -314,10 +315,11 @@ pub trait SourceableStore: Sync + Send + 'static { impl SourceableStore for Arc { fn get_range( &self, - entity_type: &EntityType, + entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, - ) -> Result>, StoreError> { - (**self).get_range(entity_type, block_range) + ) -> Result>, StoreError> { + (**self).get_range(entity_types, causality_region, block_range) } fn input_schema(&self) -> InputSchema { diff --git a/store/postgres/src/block_range.rs b/store/postgres/src/block_range.rs index 5f3c9f014bc..7dbcaa29c00 100644 --- a/store/postgres/src/block_range.rs +++ b/store/postgres/src/block_range.rs @@ -132,36 +132,52 @@ impl<'a> QueryFragment for BlockRangeUpperBoundClause<'a> { } } +#[derive(Debug, Clone, Copy)] +pub enum BoundSide { + Lower, + Upper, +} + /// Helper for generating SQL fragments for selecting entities in a specific block range #[derive(Debug, Clone, Copy)] pub enum EntityBlockRange { - Mutable(BlockRange), // TODO: check if this is a proper type here (maybe Range?) + Mutable((BlockRange, BoundSide)), Immutable(BlockRange), } impl EntityBlockRange { - pub fn new(table: &Table, block_range: std::ops::Range) -> Self { + pub fn new( + immutable: bool, + block_range: std::ops::Range, + bound_side: BoundSide, + ) -> Self { let start: Bound = Bound::Included(block_range.start); let end: Bound = Bound::Excluded(block_range.end); let block_range: BlockRange = BlockRange(start, end); - if table.immutable { + if immutable { Self::Immutable(block_range) } else { - Self::Mutable(block_range) + Self::Mutable((block_range, bound_side)) } } - /// Output SQL that matches only rows whose block range contains `block`. + /// Outputs SQL that matches only rows whose entities would trigger a change + /// event (Create, Modify, Delete) in a given interval of blocks. Otherwise said + /// a block_range border is contained in an interval of blocks. For instance + /// one of the following: + /// lower(block_range) >= $1 and lower(block_range) <= $2 + /// upper(block_range) >= $1 and upper(block_range) <= $2 + /// block$ >= $1 and block$ <= $2 pub fn contains<'b>(&'b self, out: &mut AstPass<'_, 'b, Pg>) -> QueryResult<()> { out.unsafe_to_cache_prepared(); let block_range = match self { - EntityBlockRange::Mutable(br) => br, + EntityBlockRange::Mutable((br, _)) => br, EntityBlockRange::Immutable(br) => br, }; let BlockRange(start, finish) = block_range; self.compare_column(out); - out.push_sql(" >= "); + out.push_sql(">= "); match start { Bound::Included(block) => out.push_bind_param::(block)?, Bound::Excluded(block) => { @@ -170,9 +186,9 @@ impl EntityBlockRange { } Bound::Unbounded => unimplemented!(), }; - out.push_sql(" AND "); + out.push_sql(" and"); self.compare_column(out); - out.push_sql(" <= "); + out.push_sql("<= "); match finish { Bound::Included(block) => { out.push_bind_param::(block)?; @@ -186,7 +202,12 @@ impl EntityBlockRange { pub fn compare_column(&self, out: &mut AstPass) { match self { - EntityBlockRange::Mutable(_) => out.push_sql(" lower(block_range) "), + EntityBlockRange::Mutable((_, BoundSide::Lower)) => { + out.push_sql(" lower(block_range) ") + } + EntityBlockRange::Mutable((_, BoundSide::Upper)) => { + out.push_sql(" upper(block_range) ") + } EntityBlockRange::Immutable(_) => out.push_sql(" block$ "), } } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index d0ca873009a..0350973151f 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -4,7 +4,7 @@ use diesel::pg::PgConnection; use diesel::r2d2::{ConnectionManager, PooledConnection}; use diesel::{prelude::*, sql_query}; use graph::anyhow::Context; -use graph::blockchain::block_stream::FirehoseCursor; +use graph::blockchain::block_stream::{EntityWithType, FirehoseCursor}; use graph::blockchain::BlockTime; use graph::components::store::write::RowGroup; use graph::components::store::{ @@ -1066,12 +1066,13 @@ impl DeploymentStore { pub(crate) fn get_range( &self, site: Arc, - entity_type: &EntityType, + entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, - ) -> Result>, StoreError> { + ) -> Result>, StoreError> { let mut conn = self.get_conn()?; let layout = self.layout(&mut conn, site)?; - layout.find_range(&mut conn, entity_type, block_range) + layout.find_range(&mut conn, entity_types, causality_region, block_range) } pub(crate) fn get_derived( diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 13a81876325..dc71dc8703c 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -28,6 +28,7 @@ use diesel::{connection::SimpleConnection, Connection}; use diesel::{ debug_query, sql_query, OptionalExtension, PgConnection, QueryDsl, QueryResult, RunQueryDsl, }; +use graph::blockchain::block_stream::{EntitySubgraphOperation, EntityWithType}; use graph::blockchain::BlockTime; use graph::cheap_clone::CheapClone; use graph::components::store::write::{RowGroup, WriteChunk}; @@ -57,8 +58,8 @@ use std::time::{Duration, Instant}; use crate::relational::value::{FromOidRow, OidRow}; use crate::relational_queries::{ - ConflictingEntitiesData, ConflictingEntitiesQuery, FindChangesQuery, FindDerivedQuery, - FindPossibleDeletionsQuery, ReturnedEntityData, + ConflictingEntitiesData, ConflictingEntitiesQuery, EntityDataExt, FindChangesQuery, + FindDerivedQuery, FindPossibleDeletionsQuery, ReturnedEntityData, }; use crate::{ primary::{Namespace, Site}, @@ -75,7 +76,7 @@ use graph::prelude::{ QueryExecutionError, StoreError, StoreEvent, ValueType, BLOCK_NUMBER_MAX, }; -use crate::block_range::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN}; +use crate::block_range::{BoundSide, BLOCK_COLUMN, BLOCK_RANGE_COLUMN}; pub use crate::catalog::Catalog; use crate::connection_pool::ForeignServer; use crate::{catalog, deployment}; @@ -545,21 +546,129 @@ impl Layout { pub fn find_range( &self, conn: &mut PgConnection, - entity_type: &EntityType, + entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, - ) -> Result>, StoreError> { - let table = self.table_for_entity(entity_type)?; - let mut entities: BTreeMap> = BTreeMap::new(); - if let Some(vec) = FindRangeQuery::new(table.as_ref(), block_range) - .get_results::(conn) - .optional()? - { - for e in vec { - let block = e.clone().deserialize_block_number::()?; - let en = e.deserialize_with_layout::(self, None)?; - entities.entry(block).or_default().push(en); - } + ) -> Result>, StoreError> { + let mut tables = vec![]; + for et in entity_types { + tables.push(self.table_for_entity(&et)?.as_ref()); } + let mut entities: BTreeMap> = BTreeMap::new(); + + // Collect all entities that have their 'lower(block_range)' attribute in the + // interval of blocks defined by the variable block_range. For the immutable + // entities the respective attribute is 'block$'. + // Here are all entities that are created or modified in the block_range. + let lower_vec = FindRangeQuery::new( + &tables, + causality_region, + BoundSide::Lower, + block_range.clone(), + ) + .get_results::(conn) + .optional()? + .unwrap_or_default(); + // Collect all entities that have their 'upper(block_range)' attribute in the + // interval of blocks defined by the variable block_range. For the immutable + // entities no entries are returned. + // Here are all entities that are modified or deleted in the block_range, + // but will have the previous versions, i.e. in the case of an update, it's + // the version before the update, and lower_vec will have a corresponding + // entry with the new version. + let upper_vec = + FindRangeQuery::new(&tables, causality_region, BoundSide::Upper, block_range) + .get_results::(conn) + .optional()? + .unwrap_or_default(); + let mut lower_iter = lower_vec.iter().fuse().peekable(); + let mut upper_iter = upper_vec.iter().fuse().peekable(); + let mut lower_now = lower_iter.next(); + let mut upper_now = upper_iter.next(); + // A closure to convert the entity data from the database into entity operation. + let transform = |ede: &EntityDataExt, + entity_op: EntitySubgraphOperation| + -> Result<(EntityWithType, BlockNumber), StoreError> { + let e = EntityData::new(ede.entity.clone(), ede.data.clone()); + let block = ede.block_number; + let entity_type = e.entity_type(&self.input_schema); + let entity = e.deserialize_with_layout::(self, None)?; + let vid = ede.vid; + let ewt = EntityWithType { + entity_op, + entity_type, + entity, + vid, + }; + Ok((ewt, block)) + }; + + // The algorithm is a similar to merge sort algorithm and it relays on the fact that both vectors + // are ordered by (block_number, entity_type, entity_id). It advances simultaneously entities from + // both lower_vec and upper_vec and tries to match entities that have entries in both vectors for + // a particular block. The match is successful if an entry in one array has the same values in the + // other one for the number of the block, entity type and the entity id. The comparison operation + // over the EntityDataExt implements that check. If there is a match it’s a modification operation, + // since both sides of a range are present for that block, entity type and id. If one side of the + // range exists and the other is missing it is a creation or deletion depending on which side is + // present. For immutable entities the entries in upper_vec are missing, hence they are considered + // having a lower bound at particular block and upper bound at infinity. + while lower_now.is_some() || upper_now.is_some() { + let (ewt, block) = match (lower_now, upper_now) { + (Some(lower), Some(upper)) => { + match lower.cmp(&upper) { + std::cmp::Ordering::Greater => { + // we have upper bound at this block, but no lower bounds at the same block so it's deletion + let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?; + // advance upper_vec pointer + upper_now = upper_iter.next(); + (ewt, block) + } + std::cmp::Ordering::Less => { + // we have lower bound at this block but no upper bound at the same block so its creation + let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?; + // advance lower_vec pointer + lower_now = lower_iter.next(); + (ewt, block) + } + std::cmp::Ordering::Equal => { + let (ewt, block) = transform(lower, EntitySubgraphOperation::Modify)?; + // advance both lower_vec and upper_vec pointers + lower_now = lower_iter.next(); + upper_now = upper_iter.next(); + (ewt, block) + } + } + } + (Some(lower), None) => { + // we have lower bound at this block but no upper bound at the same block so its creation + let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?; + // advance lower_vec pointer + lower_now = lower_iter.next(); + (ewt, block) + } + (None, Some(upper)) => { + let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?; + // advance upper_vec pointer + upper_now = upper_iter.next(); + (ewt, block) + } + _ => panic!("Imposible case to happen"), + }; + + match entities.get_mut(&block) { + Some(vec) => vec.push(ewt), + None => { + let _ = entities.insert(block, vec![ewt]); + } + }; + } + + // sort the elements in each blocks bucket by vid + for (_, vec) in &mut entities { + vec.sort_by(|a, b| a.vid.cmp(&b.vid)); + } + Ok(entities) } diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 55ade522dd7..b78984012e9 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -18,7 +18,6 @@ use graph::data::store::{Id, IdType, NULL}; use graph::data::store::{IdList, IdRef, QueryObject}; use graph::data::value::{Object, Word}; use graph::data_source::CausalityRegion; -use graph::prelude::regex::Regex; use graph::prelude::{ anyhow, r, serde_json, BlockNumber, ChildMultiplicity, Entity, EntityCollection, EntityFilter, EntityLink, EntityOrder, EntityOrderByChild, EntityOrderByChildInfo, EntityRange, EntityWindow, @@ -28,6 +27,7 @@ use graph::schema::{EntityType, FulltextAlgorithm, FulltextConfig, InputSchema}; use graph::{components::store::AttributeNames, data::store::scalar}; use inflector::Inflector; use itertools::Itertools; +use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::convert::TryFrom; use std::fmt::{self, Display}; @@ -36,7 +36,7 @@ use std::ops::Range; use std::str::FromStr; use std::string::ToString; -use crate::block_range::EntityBlockRange; +use crate::block_range::{BoundSide, EntityBlockRange}; use crate::relational::dsl::AtBlock; use crate::relational::{ dsl, Column, ColumnType, Layout, SqlName, Table, BYTE_ARRAY_PREFIX_SIZE, PRIMARY_KEY_COLUMN, @@ -454,40 +454,12 @@ pub struct EntityData { } impl EntityData { - pub fn entity_type(&self, schema: &InputSchema) -> EntityType { - schema.entity_type(&self.entity).unwrap() + pub fn new(entity: String, data: serde_json::Value) -> EntityData { + EntityData { entity, data } } - pub fn deserialize_block_number(self) -> Result { - use serde_json::Value as j; - match self.data { - j::Object(map) => { - let mut entries = map.into_iter().filter_map(move |(key, json)| { - if key == "block_range" { - let r = json.as_str().unwrap(); - let rx = Regex::new("\\[(?P[0-9]+),([0-9]+)?\\)").unwrap(); - let cap = rx.captures(r).unwrap(); - let start = cap - .name("start") - .map(|mtch| mtch.as_str().to_string()) - .unwrap(); - let n = start.parse::().unwrap(); - Some(n) - } else if key == "block$" { - let block = json.as_i64().unwrap() as i32; - Some(block) - } else { - None - } - }); - let en = entries.next().unwrap(); - assert!(entries.next().is_none()); // there should be just one block_range field - Ok(en) - } - _ => unreachable!( - "we use `to_json` in our queries, and will therefore always get an object back" - ), - } + pub fn entity_type(&self, schema: &InputSchema) -> EntityType { + schema.entity_type(&self.entity).unwrap() } /// Map the `EntityData` using the schema information in `Layout` @@ -562,6 +534,48 @@ impl EntityData { } } +#[derive(QueryableByName, Clone, Debug, Default, Eq)] +pub struct EntityDataExt { + #[diesel(sql_type = Text)] + pub entity: String, + #[diesel(sql_type = Jsonb)] + pub data: serde_json::Value, + #[diesel(sql_type = Integer)] + pub block_number: i32, + #[diesel(sql_type = Text)] + pub id: String, + #[diesel(sql_type = BigInt)] + pub vid: i64, +} + +impl Ord for EntityDataExt { + fn cmp(&self, other: &Self) -> Ordering { + let ord = self.block_number.cmp(&other.block_number); + if ord != Ordering::Equal { + ord + } else { + let ord = self.entity.cmp(&other.entity); + if ord != Ordering::Equal { + ord + } else { + self.id.cmp(&other.id) + } + } + } +} + +impl PartialOrd for EntityDataExt { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for EntityDataExt { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + /// The equivalent of `graph::data::store::Value` but in a form that does /// not require further transformation during `walk_ast`. This form takes /// the idiosyncrasies of how we serialize values into account (e.g., that @@ -1839,37 +1853,87 @@ impl<'a> QueryFragment for Filter<'a> { #[derive(Debug, Clone)] pub struct FindRangeQuery<'a> { - table: &'a Table, - eb_range: EntityBlockRange, + tables: &'a Vec<&'a Table>, + causality_region: CausalityRegion, + bound_side: BoundSide, + imm_range: EntityBlockRange, + mut_range: EntityBlockRange, } impl<'a> FindRangeQuery<'a> { - pub fn new(table: &'a Table, block_range: Range) -> Self { - let eb_range = EntityBlockRange::new(&table, block_range); - Self { table, eb_range } + pub fn new( + tables: &'a Vec<&Table>, + causality_region: CausalityRegion, + bound_side: BoundSide, + block_range: Range, + ) -> Self { + let imm_range = EntityBlockRange::new(true, block_range.clone(), bound_side); + let mut_range = EntityBlockRange::new(false, block_range, bound_side); + Self { + tables, + causality_region, + bound_side, + imm_range, + mut_range, + } } } impl<'a> QueryFragment for FindRangeQuery<'a> { fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { out.unsafe_to_cache_prepared(); + let mut first = true; - // Generate - // select '..' as entity, to_jsonb(e.*) as data - // from schema.table e where id = $1 - out.push_sql("select "); - out.push_bind_param::(self.table.object.as_str())?; - out.push_sql(" as entity, to_jsonb(e.*) as data\n"); - out.push_sql(" from "); - out.push_sql(self.table.qualified_name.as_str()); - out.push_sql(" e\n where "); - // TODO: do we need to care about it? - // if self.table.has_causality_region { - // out.push_sql("causality_region = "); - // out.push_bind_param::(&self.key.causality_region)?; - // out.push_sql(" and "); - // } - self.eb_range.contains(&mut out) + for table in self.tables.iter() { + // the immutable entities don't have upper range and also can't be modified or deleted + if matches!(self.bound_side, BoundSide::Lower) || !table.immutable { + if first { + first = false; + } else { + out.push_sql("\nunion all\n"); + } + + // Generate + // select '..' as entity, to_jsonb(e.*) as data, {BLOCK_STATEMENT} as block_number + // from schema.table e where ... + // Here the {BLOCK_STATEMENT} is 'block$' for immutable tables and either 'lower(block_range)' + // or 'upper(block_range)' depending on the bound_side variable. + out.push_sql("select "); + out.push_bind_param::(table.object.as_str())?; + out.push_sql(" as entity, to_jsonb(e.*) as data,"); + if table.immutable { + self.imm_range.compare_column(&mut out) + } else { + self.mut_range.compare_column(&mut out) + } + out.push_sql("as block_number, id, vid\n"); + out.push_sql(" from "); + out.push_sql(table.qualified_name.as_str()); + out.push_sql(" e\n where"); + // add casuality region to the query + if table.has_causality_region { + out.push_sql("causality_region = "); + out.push_bind_param::(&self.causality_region)?; + out.push_sql(" and "); + } + if table.immutable { + self.imm_range.contains(&mut out)?; + } else { + self.mut_range.contains(&mut out)?; + } + } + } + + if first { + // In case we have only immutable entities, the upper range will not create any + // select statement. So here we have to generate an SQL statement thet returns + // empty result. + out.push_sql("select 'dummy_entity' as entity, to_jsonb(1) as data, 1 as block_number, 1 as id, 1 as vid where false"); + } else { + out.push_sql("\norder by block_number, entity, id"); + } + + Ok(()) } } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index e1273bfe763..37c2ff8c897 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -6,7 +6,7 @@ use std::time::Instant; use std::{collections::BTreeMap, sync::Arc}; use async_trait::async_trait; -use graph::blockchain::block_stream::FirehoseCursor; +use graph::blockchain::block_stream::{EntityWithType, FirehoseCursor}; use graph::blockchain::BlockTime; use graph::components::store::{Batch, DeploymentCursorTracker, DerivedEntityQuery, ReadStore}; use graph::constraint_violation; @@ -1592,11 +1592,16 @@ impl SourceableStore { impl store::SourceableStore for SourceableStore { fn get_range( &self, - entity_type: &EntityType, + entity_types: Vec, + causality_region: CausalityRegion, block_range: Range, - ) -> Result>, StoreError> { - self.store - .get_range(self.site.clone(), entity_type, block_range) + ) -> Result>, StoreError> { + self.store.get_range( + self.site.clone(), + entity_types, + causality_region, + block_range, + ) } fn input_schema(&self) -> InputSchema { diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index d9e9ee989af..79e5aa188dd 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -1,10 +1,10 @@ -use graph::blockchain::block_stream::FirehoseCursor; +use graph::blockchain::block_stream::{EntityWithType, FirehoseCursor}; use graph::data::subgraph::schema::DeploymentCreate; use graph::data::value::Word; use graph::data_source::CausalityRegion; use graph::schema::{EntityKey, EntityType, InputSchema}; use lazy_static::lazy_static; -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use std::marker::PhantomData; use std::ops::Range; use test_store::*; @@ -137,62 +137,42 @@ async fn insert_count( deployment: &DeploymentLocator, block: u8, count: u8, - counter_type: &EntityType, - id: &str, - id2: &str, + immutable_only: bool, ) { - let count_key_local = |id: &str| counter_type.parse_key(id).unwrap(); + let count_key_local = |counter_type: &EntityType, id: &str| counter_type.parse_key(id).unwrap(); let data = entity! { TEST_SUBGRAPH_SCHEMA => - id: id, - count :count as i32, + id: "1", + count: count as i32 }; - let entity_op = EntityOperation::Set { - key: count_key_local(&data.get("id").unwrap().to_string()), - data, - }; - let data = entity! { TEST_SUBGRAPH_SCHEMA => - id: id2, - count :count as i32, + let entity_op = if block != 3 && block != 5 && block != 7 { + EntityOperation::Set { + key: count_key_local(&COUNTER_TYPE, &data.get("id").unwrap().to_string()), + data, + } + } else { + EntityOperation::Remove { + key: count_key_local(&COUNTER_TYPE, &data.get("id").unwrap().to_string()), + } }; - let entity_op2 = EntityOperation::Set { - key: count_key_local(&data.get("id").unwrap().to_string()), - data, + let mut ops = if immutable_only { + vec![] + } else { + vec![entity_op] }; - transact_entity_operations( - store, - deployment, - block_pointer(block), - vec![entity_op, entity_op2], - ) - .await - .unwrap(); -} - -async fn insert_count_mutable( - store: &Arc, - deployment: &DeploymentLocator, - block: u8, - count: u8, -) { - insert_count(store, deployment, block, count, &COUNTER_TYPE, "1", "2").await; -} - -async fn insert_count_immutable( - store: &Arc, - deployment: &DeploymentLocator, - block: u8, - count: u8, -) { - insert_count( - store, - deployment, - block, - count, - &COUNTER2_TYPE, - &(block).to_string(), - &(block + 1).to_string(), - ) - .await; + if block < 6 { + let data = entity! { TEST_SUBGRAPH_SCHEMA => + id: &block.to_string(), + count :count as i32, + }; + let entity_op = EntityOperation::Set { + key: count_key_local(&COUNTER2_TYPE, &data.get("id").unwrap().to_string()), + data, + }; + ops.push(entity_op); + } + transact_entity_operations(store, deployment, block_pointer(block), ops) + .await + .unwrap(); } async fn pause_writer(deployment: &DeploymentLocator) { @@ -220,13 +200,13 @@ where } for count in 1..4 { - insert_count_mutable(&subgraph_store, &deployment, count, count).await; + insert_count(&subgraph_store, &deployment, count, count, false).await; } // Test reading back with pending writes to the same entity pause_writer(&deployment).await; for count in 4..7 { - insert_count_mutable(&subgraph_store, &deployment, count, count).await; + insert_count(&subgraph_store, &deployment, count, count, false).await; } assert_eq!(6, read_count()); @@ -235,7 +215,7 @@ where // Test reading back with pending writes and a pending revert for count in 7..10 { - insert_count_mutable(&subgraph_store, &deployment, count, count).await; + insert_count(&subgraph_store, &deployment, count, count, false).await; } writable .revert_block_operations(block_pointer(2), FirehoseCursor::None) @@ -357,51 +337,71 @@ fn restart() { }) } -async fn read_range( - store: Arc, - writable: Arc, - sourceable: Arc, - deployment: DeploymentLocator, - mutable: bool, -) -> usize { - let subgraph_store = store.subgraph_store(); - writable.deployment_synced(block_pointer(0)).unwrap(); - - for count in 1..=7 { - if mutable { - insert_count_mutable(&subgraph_store, &deployment, 2 * count, 4 * count).await - } else { - insert_count_immutable(&subgraph_store, &deployment, 2 * count, 4 * count).await +#[test] +fn read_range_test() { + run_test(|store, writable, sourceable, deployment| async move { + let result_entities = vec![ + r#"(1, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }])"#, + r#"(2, [EntityWithType { entity_op: Modify, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(4), id: String("2") }, vid: 2 }])"#, + r#"(3, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(6), id: String("3") }, vid: 3 }])"#, + r#"(4, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 3 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(8), id: String("4") }, vid: 4 }])"#, + r#"(5, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 3 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(10), id: String("5") }, vid: 5 }])"#, + r#"(6, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 4 }])"#, + r#"(7, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 4 }])"#, + ]; + let subgraph_store = store.subgraph_store(); + writable.deployment_synced(block_pointer(0)).unwrap(); + + for count in 1..=5 { + insert_count(&subgraph_store, &deployment, count, 2 * count, false).await; } - } - writable.flush().await.unwrap(); + writable.flush().await.unwrap(); + writable.deployment_synced(block_pointer(0)).unwrap(); - let br: Range = 4..8; - let et: &EntityType = if mutable { - &COUNTER_TYPE - } else { - &COUNTER2_TYPE - }; - let e = sourceable.get_range(et, br).unwrap(); - e.iter().map(|(_, v)| v.iter()).flatten().count() + let br: Range = 0..18; + let entity_types = vec![COUNTER_TYPE.clone(), COUNTER2_TYPE.clone()]; + let e: BTreeMap> = sourceable + .get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone()) + .unwrap(); + assert_eq!(e.len(), 5); + for en in &e { + let index = *en.0 - 1; + let a = result_entities[index as usize]; + assert_eq!(a, format!("{:?}", en)); + } + for count in 6..=7 { + insert_count(&subgraph_store, &deployment, count, 2 * count, false).await; + } + writable.flush().await.unwrap(); + writable.deployment_synced(block_pointer(0)).unwrap(); + let e: BTreeMap> = sourceable + .get_range(entity_types, CausalityRegion::ONCHAIN, br) + .unwrap(); + assert_eq!(e.len(), 7); + for en in &e { + let index = *en.0 - 1; + let a = result_entities[index as usize]; + assert_eq!(a, format!("{:?}", en)); + } + }) } #[test] -fn read_range_mutable() { - run_test( - |store, writable, sourceable: Arc, deployment| async move { - let num_entities = read_range(store, writable, sourceable, deployment, true).await; - assert_eq!(num_entities, 6) // TODO: fix it - it should be 4 as the range is open - }, - ) -} +fn read_immutable_only_range_test() { + run_test(|store, writable, sourceable, deployment| async move { + let subgraph_store = store.subgraph_store(); + writable.deployment_synced(block_pointer(0)).unwrap(); -#[test] -fn read_range_immutable() { - run_test( - |store, writable, sourceable: Arc, deployment| async move { - let num_entities = read_range(store, writable, sourceable, deployment, false).await; - assert_eq!(num_entities, 6) // TODO: fix it - it should be 4 as the range is open - }, - ) + for count in 1..=4 { + insert_count(&subgraph_store, &deployment, count, 2 * count, true).await; + } + writable.flush().await.unwrap(); + writable.deployment_synced(block_pointer(0)).unwrap(); + let br: Range = 0..18; + let entity_types = vec![COUNTER2_TYPE.clone()]; + let e: BTreeMap> = sourceable + .get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone()) + .unwrap(); + assert_eq!(e.len(), 4); + }) }