Skip to content

Commit

Permalink
Add casuality region to the SQL querry
Browse files Browse the repository at this point in the history
  • Loading branch information
zorancv authored and incrypto32 committed Sep 12, 2024
1 parent 74093de commit e03a549
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 18 deletions.
4 changes: 2 additions & 2 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::blockchain::SubgraphFilter;
use crate::data_source::subgraph;
use crate::data_source::{subgraph, CausalityRegion};
use crate::substreams::Clock;
use crate::substreams_rpc::response::Message as SubstreamsMessage;
use crate::substreams_rpc::BlockScopedData;
Expand Down Expand Up @@ -453,7 +453,7 @@ async fn get_entities_for_range(
let entity_type = schema.entity_type(entity_name)?;
entity_types.push(entity_type);
}
Ok(store.get_range(entity_types, from..to)?)
Ok(store.get_range(entity_types, CausalityRegion::ONCHAIN, from..to)?)
}

impl<C: Blockchain> TriggersAdapterWrapper<C> {
Expand Down
1 change: 1 addition & 0 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,7 @@ impl ReadStore for EmptyStore {
fn get_range(
&self,
_entity_types: Vec<EntityType>,
_causality_region: CausalityRegion,
_block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
Ok(BTreeMap::new())
Expand Down
4 changes: 3 additions & 1 deletion graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ pub trait ReadStore: Send + Sync + 'static {
fn get_range(
&self,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError>;

Expand Down Expand Up @@ -260,9 +261,10 @@ impl<T: ?Sized + ReadStore> ReadStore for Arc<T> {
fn get_range(
&self,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
(**self).get_range(entity_types, block_range)
(**self).get_range(entity_types, causality_region, block_range)
}

fn get_derived(
Expand Down
3 changes: 2 additions & 1 deletion store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1060,11 +1060,12 @@ impl DeploymentStore {
&self,
site: Arc<Site>,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
let mut conn = self.get_conn()?;
let layout = self.layout(&mut conn, site)?;
layout.find_range(&mut conn, entity_types, block_range)
layout.find_range(&mut conn, entity_types, causality_region, block_range)
}

pub(crate) fn get_derived(
Expand Down
5 changes: 3 additions & 2 deletions store/postgres/src/relational.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ impl Layout {
&self,
conn: &mut PgConnection,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
let mut tables = vec![];
Expand All @@ -529,11 +530,11 @@ impl Layout {
et_map.insert(et.to_string(), Arc::new(et));
}
let mut entities: BTreeMap<BlockNumber, Vec<EntityWithType>> = BTreeMap::new();
let lower_vec = FindRangeQuery::new(&tables, false, block_range.clone())
let lower_vec = FindRangeQuery::new(&tables, causality_region, false, block_range.clone())
.get_results::<EntityDataExt>(conn)
.optional()?
.unwrap_or_default();
let upper_vec = FindRangeQuery::new(&tables, true, block_range)
let upper_vec = FindRangeQuery::new(&tables, causality_region, true, block_range)
.get_results::<EntityDataExt>(conn)
.optional()?
.unwrap_or_default();
Expand Down
15 changes: 9 additions & 6 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2024,6 +2024,7 @@ impl<'a, Conn> RunQueryDsl<Conn> for FindQuery<'a> {}
#[derive(Debug, Clone)]
pub struct FindRangeQuery<'a> {
tables: &'a Vec<&'a Table>,
causality_region: CausalityRegion,
is_upper_range: bool,
imm_range: EntityBlockRange,
mut_range: EntityBlockRange,
Expand All @@ -2032,13 +2033,15 @@ pub struct FindRangeQuery<'a> {
impl<'a> FindRangeQuery<'a> {
pub fn new(
tables: &'a Vec<&Table>,
causality_region: CausalityRegion,
is_upper_range: bool,
block_range: Range<BlockNumber>,
) -> Self {
let imm_range = EntityBlockRange::new(true, block_range.clone(), false);
let mut_range = EntityBlockRange::new(false, block_range, is_upper_range);
Self {
tables,
causality_region,
is_upper_range,
imm_range,
mut_range,
Expand Down Expand Up @@ -2075,12 +2078,12 @@ impl<'a> QueryFragment<Pg> for FindRangeQuery<'a> {
out.push_sql(" from ");
out.push_sql(table.qualified_name.as_str());
out.push_sql(" e\n where");
// TODO: add casuality region to the query
// if self.table.has_causality_region {
// out.push_sql("causality_region = ");
// out.push_bind_param::<Integer, _>(&self.key.causality_region)?;
// out.push_sql(" and ");
// }
// add casuality region to the query
if table.has_causality_region {
out.push_sql("causality_region = ");
out.push_bind_param::<Integer, _>(&self.causality_region)?;
out.push_sql(" and ");
}
if table.immutable {
self.imm_range.contains(&mut out)?;
} else {
Expand Down
17 changes: 13 additions & 4 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,14 @@ impl SyncStore {
fn get_range(
&self,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
retry::forever(&self.logger, "get_range", || {
self.writable.get_range(
self.site.cheap_clone(),
entity_types.clone(),
causality_region,
block_range.clone(),
)
})
Expand Down Expand Up @@ -1234,9 +1236,11 @@ impl Queue {
fn get_range(
&self,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
self.store.get_range(entity_types, block_range)
self.store
.get_range(entity_types, causality_region, block_range)
}

fn get_derived(
Expand Down Expand Up @@ -1454,11 +1458,14 @@ impl Writer {
fn get_range(
&self,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
match self {
Writer::Sync(store) => store.get_range(entity_types, block_range),
Writer::Async { queue, .. } => queue.get_range(entity_types, block_range),
Writer::Sync(store) => store.get_range(entity_types, causality_region, block_range),
Writer::Async { queue, .. } => {
queue.get_range(entity_types, causality_region, block_range)
}
}
}

Expand Down Expand Up @@ -1593,9 +1600,11 @@ impl ReadStore for WritableStore {
fn get_range(
&self,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
self.writer.get_range(entity_types, block_range)
self.writer
.get_range(entity_types, causality_region, block_range)
}

fn get_derived(
Expand Down
1 change: 1 addition & 0 deletions store/test-store/tests/graph/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl ReadStore for MockStore {
fn get_range(
&self,
_entity_types: Vec<EntityType>,
_causality_region: CausalityRegion,
_block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
todo!()
Expand Down
6 changes: 4 additions & 2 deletions store/test-store/tests/postgres/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ fn read_range_test() {
let br: Range<BlockNumber> = 0..18;
let entity_types = vec![COUNTER_TYPE.clone(), COUNTER2_TYPE.clone()];
let e: BTreeMap<i32, Vec<EntityWithType>> = writable
.get_range(entity_types.clone(), br.clone())
.get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone())
.unwrap();
assert_eq!(e.len(), 5);
for en in &e {
Expand All @@ -356,7 +356,9 @@ fn read_range_test() {
}
writable.flush().await.unwrap();
writable.deployment_synced().unwrap();
let e: BTreeMap<i32, Vec<EntityWithType>> = writable.get_range(entity_types, br).unwrap();
let e: BTreeMap<i32, Vec<EntityWithType>> = writable
.get_range(entity_types, CausalityRegion::ONCHAIN, br)
.unwrap();
assert_eq!(e.len(), 7);
for en in &e {
let index = *en.0 - 1;
Expand Down

0 comments on commit e03a549

Please sign in to comment.