Skip to content

Commit

Permalink
Subgraph composition: sql more entities
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Jan 31, 2025
1 parent 1fc017f commit 868060b
Show file tree
Hide file tree
Showing 8 changed files with 412 additions and 220 deletions.
44 changes: 17 additions & 27 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::blockchain::SubgraphFilter;
use crate::data_source::subgraph;
use crate::data_source::{subgraph, CausalityRegion};
use crate::substreams::Clock;
use crate::substreams_rpc::response::Message as SubstreamsMessage;
use crate::substreams_rpc::BlockScopedData;
Expand Down Expand Up @@ -433,9 +433,19 @@ async fn scan_subgraph_triggers<C: Blockchain>(
}
}

/// The kind of change an entity of a source subgraph underwent in a block.
///
/// The store derives the operation from which bounds of an entity version's
/// block range fall on the block in question.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntitySubgraphOperation {
    /// Only a lower bound is present at the block: the entity was created.
    Create,
    /// Both a lower and an upper bound are present at the block: the entity
    /// was modified.
    Modify,
    /// Only an upper bound is present at the block: the entity was deleted.
    Delete,
}

/// A single entity change, paired with the entity's type and the operation
/// that produced it.
///
/// Values of this type are what `SourceableStore::get_range` returns, grouped
/// by block number.
#[derive(Debug)]
pub struct EntityWithType {
/// Whether the entity was created, modified or deleted.
pub entity_op: EntitySubgraphOperation,
/// The type of `entity`.
pub entity_type: EntityType,
/// The entity data. As populated by `Layout::find_range`, this is the new
/// version for `Create` and `Modify`, and the last version before removal
/// for `Delete`.
pub entity: Entity,
/// The `vid` of the underlying row; `Layout::find_range` sorts the entries
/// of each block by this value.
pub vid: i64,
}

async fn get_entities_for_range(
Expand All @@ -445,32 +455,12 @@ async fn get_entities_for_range(
from: BlockNumber,
to: BlockNumber,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, Error> {
let mut entities_by_block = BTreeMap::new();

for entity_name in &filter.entities {
let entity_type = schema.entity_type(entity_name)?;

let entity_ranges = store.get_range(&entity_type, from..to)?;

for (block_number, entity_vec) in entity_ranges {
let mut entity_vec = entity_vec
.into_iter()
.map(|e| EntityWithType {
entity_type: entity_type.clone(),
entity: e,
})
.collect();

entities_by_block
.entry(block_number)
.and_modify(|existing_vec: &mut Vec<EntityWithType>| {
existing_vec.append(&mut entity_vec);
})
.or_insert(entity_vec);
}
}

Ok(entities_by_block)
let entity_types: Result<Vec<EntityType>> = filter
.entities
.iter()
.map(|name| schema.entity_type(name))
.collect();
Ok(store.get_range(entity_types?, CausalityRegion::ONCHAIN, from..to)?)
}

impl<C: Blockchain> TriggersAdapterWrapper<C> {
Expand Down
14 changes: 8 additions & 6 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use async_trait::async_trait;
use web3::types::{Address, H256};

use super::*;
use crate::blockchain::block_stream::FirehoseCursor;
use crate::blockchain::block_stream::{EntityWithType, FirehoseCursor};
use crate::blockchain::{BlockTime, ChainIdentifier};
use crate::components::metrics::stopwatch::StopwatchMetrics;
use crate::components::server::index_node::VersionInfo;
Expand Down Expand Up @@ -299,9 +299,10 @@ pub trait SourceableStore: Sync + Send + 'static {
/// changed in the given block_range.
fn get_range(
&self,
entity_type: &EntityType,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError>;
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError>;

fn input_schema(&self) -> InputSchema;

Expand All @@ -314,10 +315,11 @@ pub trait SourceableStore: Sync + Send + 'static {
impl<T: ?Sized + SourceableStore> SourceableStore for Arc<T> {
fn get_range(
&self,
entity_type: &EntityType,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
(**self).get_range(entity_type, block_range)
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
(**self).get_range(entity_types, causality_region, block_range)
}

fn input_schema(&self) -> InputSchema {
Expand Down
41 changes: 31 additions & 10 deletions store/postgres/src/block_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,36 +132,52 @@ impl<'a> QueryFragment<Pg> for BlockRangeUpperBoundClause<'a> {
}
}

/// Selects which bound of a mutable entity's `block_range` column is compared
/// against an interval of blocks.
#[derive(Debug, Clone, Copy)]
pub enum BoundSide {
/// Compare `lower(block_range)`: matches versions that start in the interval.
Lower,
/// Compare `upper(block_range)`: matches versions that end in the interval.
Upper,
}

/// Helper for generating SQL fragments for selecting entities in a specific block range
#[derive(Debug, Clone, Copy)]
pub enum EntityBlockRange {
Mutable(BlockRange), // TODO: check if this is a proper type here (maybe Range<BlockNumber>?)
Mutable((BlockRange, BoundSide)),
Immutable(BlockRange),
}

impl EntityBlockRange {
pub fn new(table: &Table, block_range: std::ops::Range<BlockNumber>) -> Self {
pub fn new(
immutable: bool,
block_range: std::ops::Range<BlockNumber>,
bound_side: BoundSide,
) -> Self {
let start: Bound<BlockNumber> = Bound::Included(block_range.start);
let end: Bound<BlockNumber> = Bound::Excluded(block_range.end);
let block_range: BlockRange = BlockRange(start, end);
if table.immutable {
if immutable {
Self::Immutable(block_range)
} else {
Self::Mutable(block_range)
Self::Mutable((block_range, bound_side))
}
}

/// Output SQL that matches only rows whose block range contains `block`.
/// Outputs SQL that matches only rows whose entities would trigger a change
/// event (Create, Modify, Delete) in a given interval of blocks. In other words,
/// rows for which a block_range bound lies within that interval of blocks. For
/// instance, one of the following:
/// lower(block_range) >= $1 and lower(block_range) <= $2
/// upper(block_range) >= $1 and upper(block_range) <= $2
/// block$ >= $1 and block$ <= $2
pub fn contains<'b>(&'b self, out: &mut AstPass<'_, 'b, Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();
let block_range = match self {
EntityBlockRange::Mutable(br) => br,
EntityBlockRange::Mutable((br, _)) => br,
EntityBlockRange::Immutable(br) => br,
};
let BlockRange(start, finish) = block_range;

self.compare_column(out);
out.push_sql(" >= ");
out.push_sql(">= ");
match start {
Bound::Included(block) => out.push_bind_param::<Integer, _>(block)?,
Bound::Excluded(block) => {
Expand All @@ -170,9 +186,9 @@ impl EntityBlockRange {
}
Bound::Unbounded => unimplemented!(),
};
out.push_sql(" AND ");
out.push_sql(" and");
self.compare_column(out);
out.push_sql(" <= ");
out.push_sql("<= ");
match finish {
Bound::Included(block) => {
out.push_bind_param::<Integer, _>(block)?;
Expand All @@ -186,7 +202,12 @@ impl EntityBlockRange {

pub fn compare_column(&self, out: &mut AstPass<Pg>) {
match self {
EntityBlockRange::Mutable(_) => out.push_sql(" lower(block_range) "),
EntityBlockRange::Mutable((_, BoundSide::Lower)) => {
out.push_sql(" lower(block_range) ")
}
EntityBlockRange::Mutable((_, BoundSide::Upper)) => {
out.push_sql(" upper(block_range) ")
}
EntityBlockRange::Immutable(_) => out.push_sql(" block$ "),
}
}
Expand Down
9 changes: 5 additions & 4 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, PooledConnection};
use diesel::{prelude::*, sql_query};
use graph::anyhow::Context;
use graph::blockchain::block_stream::FirehoseCursor;
use graph::blockchain::block_stream::{EntityWithType, FirehoseCursor};
use graph::blockchain::BlockTime;
use graph::components::store::write::RowGroup;
use graph::components::store::{
Expand Down Expand Up @@ -1066,12 +1066,13 @@ impl DeploymentStore {
pub(crate) fn get_range(
&self,
site: Arc<Site>,
entity_type: &EntityType,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
let mut conn = self.get_conn()?;
let layout = self.layout(&mut conn, site)?;
layout.find_range(&mut conn, entity_type, block_range)
layout.find_range(&mut conn, entity_types, causality_region, block_range)
}

pub(crate) fn get_derived(
Expand Down
141 changes: 125 additions & 16 deletions store/postgres/src/relational.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use diesel::{connection::SimpleConnection, Connection};
use diesel::{
debug_query, sql_query, OptionalExtension, PgConnection, QueryDsl, QueryResult, RunQueryDsl,
};
use graph::blockchain::block_stream::{EntitySubgraphOperation, EntityWithType};
use graph::blockchain::BlockTime;
use graph::cheap_clone::CheapClone;
use graph::components::store::write::{RowGroup, WriteChunk};
Expand Down Expand Up @@ -57,8 +58,8 @@ use std::time::{Duration, Instant};

use crate::relational::value::{FromOidRow, OidRow};
use crate::relational_queries::{
ConflictingEntitiesData, ConflictingEntitiesQuery, FindChangesQuery, FindDerivedQuery,
FindPossibleDeletionsQuery, ReturnedEntityData,
ConflictingEntitiesData, ConflictingEntitiesQuery, EntityDataExt, FindChangesQuery,
FindDerivedQuery, FindPossibleDeletionsQuery, ReturnedEntityData,
};
use crate::{
primary::{Namespace, Site},
Expand All @@ -75,7 +76,7 @@ use graph::prelude::{
QueryExecutionError, StoreError, StoreEvent, ValueType, BLOCK_NUMBER_MAX,
};

use crate::block_range::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
use crate::block_range::{BoundSide, BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
pub use crate::catalog::Catalog;
use crate::connection_pool::ForeignServer;
use crate::{catalog, deployment};
Expand Down Expand Up @@ -545,21 +546,129 @@ impl Layout {
pub fn find_range(
&self,
conn: &mut PgConnection,
entity_type: &EntityType,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
let table = self.table_for_entity(entity_type)?;
let mut entities: BTreeMap<BlockNumber, Vec<Entity>> = BTreeMap::new();
if let Some(vec) = FindRangeQuery::new(table.as_ref(), block_range)
.get_results::<EntityData>(conn)
.optional()?
{
for e in vec {
let block = e.clone().deserialize_block_number::<Entity>()?;
let en = e.deserialize_with_layout::<Entity>(self, None)?;
entities.entry(block).or_default().push(en);
}
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
let mut tables = vec![];
for et in entity_types {
tables.push(self.table_for_entity(&et)?.as_ref());
}
let mut entities: BTreeMap<BlockNumber, Vec<EntityWithType>> = BTreeMap::new();

// Collect all entities that have their 'lower(block_range)' attribute in the
// interval of blocks defined by the variable block_range. For the immutable
// entities the respective attribute is 'block$'.
// Here are all entities that are created or modified in the block_range.
let lower_vec = FindRangeQuery::new(
&tables,
causality_region,
BoundSide::Lower,
block_range.clone(),
)
.get_results::<EntityDataExt>(conn)
.optional()?
.unwrap_or_default();
// Collect all entities that have their 'upper(block_range)' attribute in the
// interval of blocks defined by the variable block_range. For the immutable
// entities no entries are returned.
// Here are all entities that are modified or deleted in the block_range,
// but will have the previous versions, i.e. in the case of an update, it's
// the version before the update, and lower_vec will have a corresponding
// entry with the new version.
let upper_vec =
FindRangeQuery::new(&tables, causality_region, BoundSide::Upper, block_range)
.get_results::<EntityDataExt>(conn)
.optional()?
.unwrap_or_default();
let mut lower_iter = lower_vec.iter().fuse().peekable();
let mut upper_iter = upper_vec.iter().fuse().peekable();
let mut lower_now = lower_iter.next();
let mut upper_now = upper_iter.next();
// A closure to convert the entity data from the database into entity operation.
let transform = |ede: &EntityDataExt,
entity_op: EntitySubgraphOperation|
-> Result<(EntityWithType, BlockNumber), StoreError> {
let e = EntityData::new(ede.entity.clone(), ede.data.clone());
let block = ede.block_number;
let entity_type = e.entity_type(&self.input_schema);
let entity = e.deserialize_with_layout::<Entity>(self, None)?;
let vid = ede.vid;
let ewt = EntityWithType {
entity_op,
entity_type,
entity,
vid,
};
Ok((ewt, block))
};

// The algorithm is similar to the merge step of merge sort and relies on the fact that both vectors
// are ordered by (block_number, entity_type, entity_id). It advances through the entries of
// lower_vec and upper_vec simultaneously and tries to match entities that have entries in both
// vectors for a particular block. The match is successful if an entry in one vector has the same
// block number, entity type and entity id as an entry in the other one. The comparison operation
// over EntityDataExt implements that check. If there is a match it's a modification operation,
// since both sides of a range are present for that block, entity type and id. If one side of the
// range exists and the other is missing, it is a creation or a deletion depending on which side is
// present. For immutable entities the entries in upper_vec are missing, hence they are considered
// to have a lower bound at a particular block and an upper bound at infinity.
while lower_now.is_some() || upper_now.is_some() {
let (ewt, block) = match (lower_now, upper_now) {
(Some(lower), Some(upper)) => {
match lower.cmp(&upper) {
std::cmp::Ordering::Greater => {
// we have an upper bound at this block but no lower bound at the same block, so it's a deletion
let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?;
// advance upper_vec pointer
upper_now = upper_iter.next();
(ewt, block)
}
std::cmp::Ordering::Less => {
// we have a lower bound at this block but no upper bound at the same block, so it's a creation
let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?;
// advance lower_vec pointer
lower_now = lower_iter.next();
(ewt, block)
}
std::cmp::Ordering::Equal => {
let (ewt, block) = transform(lower, EntitySubgraphOperation::Modify)?;
// advance both lower_vec and upper_vec pointers
lower_now = lower_iter.next();
upper_now = upper_iter.next();
(ewt, block)
}
}
}
(Some(lower), None) => {
// we have a lower bound at this block but no upper bound at the same block, so it's a creation
let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?;
// advance lower_vec pointer
lower_now = lower_iter.next();
(ewt, block)
}
(None, Some(upper)) => {
let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?;
// advance upper_vec pointer
upper_now = upper_iter.next();
(ewt, block)
}
_ => panic!("Imposible case to happen"),
};

match entities.get_mut(&block) {
Some(vec) => vec.push(ewt),
None => {
let _ = entities.insert(block, vec![ewt]);
}
};
}

// sort the elements in each blocks bucket by vid
for (_, vec) in &mut entities {
vec.sort_by(|a, b| a.vid.cmp(&b.vid));
}

Ok(entities)
}

Expand Down
Loading

0 comments on commit 868060b

Please sign in to comment.