Skip to content

Commit

Permalink
Subgraph Composition: Entity Ops Detection in Handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Jan 31, 2025
1 parent 868060b commit cf4951c
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 51 deletions.
32 changes: 13 additions & 19 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,13 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {

fn create_subgraph_trigger_from_entities(
filter: &SubgraphFilter,
entities: &Vec<EntityWithType>,
entities: Vec<EntityWithType>,
) -> Vec<subgraph::TriggerData> {
entities
.iter()
.map(|e| subgraph::TriggerData {
.into_iter()
.map(|entity| subgraph::TriggerData {
source: filter.subgraph.clone(),
entity: e.entity.clone(),
entity_type: e.entity_type.as_str().to_string(),
entity,
})
.collect()
}
Expand All @@ -373,25 +372,20 @@ async fn create_subgraph_triggers<C: Blockchain>(
logger: Logger,
blocks: Vec<C::Block>,
filter: &SubgraphFilter,
entities: BTreeMap<BlockNumber, Vec<EntityWithType>>,
mut entities: BTreeMap<BlockNumber, Vec<EntityWithType>>,
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
let logger_clone = logger.cheap_clone();

let blocks: Vec<BlockWithTriggers<C>> = blocks
.into_iter()
.map(|block| {
let block_number = block.number();
match entities.get(&block_number) {
Some(e) => {
let trigger_data = create_subgraph_trigger_from_entities(filter, e);
BlockWithTriggers::new_with_subgraph_triggers(
block,
trigger_data,
&logger_clone,
)
}
None => BlockWithTriggers::new_with_subgraph_triggers(block, vec![], &logger_clone),
}
let trigger_data = entities
.remove(&block_number)
.map(|e| create_subgraph_trigger_from_entities(filter, e))
.unwrap_or_else(Vec::new);

BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger_clone)
})
.collect();

Expand Down Expand Up @@ -433,14 +427,14 @@ async fn scan_subgraph_triggers<C: Blockchain>(
}
}

#[derive(Debug)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum EntitySubgraphOperation {
Create,
Modify,
Delete,
}

#[derive(Debug)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EntityWithType {
pub entity_op: EntitySubgraphOperation,
pub entity_type: EntityType,
Expand Down
24 changes: 10 additions & 14 deletions graph/src/data_source/subgraph.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::{
blockchain::{Block, Blockchain},
components::{
link_resolver::LinkResolver,
store::{BlockNumber, Entity},
},
blockchain::{block_stream::EntityWithType, Block, Blockchain},
components::{link_resolver::LinkResolver, store::BlockNumber},
data::{subgraph::SPEC_VERSION_1_3_0, value::Word},
data_source,
prelude::{DataSourceContext, DeploymentHash, Link},
Expand Down Expand Up @@ -76,7 +73,7 @@ impl DataSource {
}

let trigger_ref = self.mapping.handlers.iter().find_map(|handler| {
if handler.entity != trigger.entity_type {
if handler.entity != trigger.entity_type() {
return None;
}

Expand Down Expand Up @@ -281,17 +278,16 @@ impl UnresolvedDataSourceTemplate {
#[derive(Clone, PartialEq, Eq)]
pub struct TriggerData {
pub source: DeploymentHash,
pub entity: Entity,
pub entity_type: String,
pub entity: EntityWithType,
}

impl TriggerData {
pub fn new(source: DeploymentHash, entity: Entity, entity_type: String) -> Self {
Self {
source,
entity,
entity_type,
}
pub fn new(source: DeploymentHash, entity: EntityWithType) -> Self {
Self { source, entity }
}

pub fn entity_type(&self) -> &str {
self.entity.entity_type.as_str()
}
}

Expand Down
3 changes: 3 additions & 0 deletions graph/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ pub enum IndexForAscTypeId {
// ...
// LastStarknetType = 4499,

// Subgraph Data Source types
AscEntityTrigger = 4500,

// Reserved discriminant space for a future blockchain type IDs: [4,500, 5,499]
//
// Generated with the following shell script:
Expand Down
2 changes: 1 addition & 1 deletion runtime/wasm/src/module/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl ToAscPtr for subgraph::TriggerData {
heap: &mut H,
gas: &GasCounter,
) -> Result<AscPtr<()>, HostExportError> {
asc_new(heap, &self.entity.sorted_ref(), gas).map(|ptr| ptr.erase())
asc_new(heap, &self.entity, gas).map(|ptr| ptr.erase())
}
}

Expand Down
45 changes: 44 additions & 1 deletion runtime/wasm/src/to_from/external.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use ethabi;

use graph::blockchain::block_stream::{EntitySubgraphOperation, EntityWithType};
use graph::data::store::scalar::Timestamp;
use graph::data::value::Word;
use graph::prelude::{BigDecimal, BigInt};
use graph::runtime::gas::GasCounter;
use graph::runtime::{
asc_get, asc_new, AscIndexId, AscPtr, AscType, AscValue, HostExportError, ToAscObj,
asc_get, asc_new, AscIndexId, AscPtr, AscType, AscValue, HostExportError, IndexForAscTypeId,
ToAscObj,
};
use graph::{data::store, runtime::DeterministicHostError};
use graph::{prelude::serde_json, runtime::FromAscObj};
use graph::{prelude::web3::types as web3, runtime::AscHeap};
use graph_runtime_derive::AscType;

use crate::asc_abi::class::*;

Expand Down Expand Up @@ -463,3 +466,43 @@ where
})
}
}

#[derive(Debug, Clone, Eq, PartialEq, AscType)]
pub enum AscSubgraphEntityOp {
Create,
Modify,
Delete,
}

#[derive(AscType)]
pub struct AscEntityTrigger {
pub entity_op: AscSubgraphEntityOp,
pub entity_type: AscPtr<AscString>,
pub entity: AscPtr<AscEntity>,
pub vid: i64,
}

impl ToAscObj<AscEntityTrigger> for EntityWithType {
fn to_asc_obj<H: AscHeap + ?Sized>(
&self,
heap: &mut H,
gas: &GasCounter,
) -> Result<AscEntityTrigger, HostExportError> {
let entity_op = match self.entity_op {
EntitySubgraphOperation::Create => AscSubgraphEntityOp::Create,
EntitySubgraphOperation::Modify => AscSubgraphEntityOp::Modify,
EntitySubgraphOperation::Delete => AscSubgraphEntityOp::Delete,
};

Ok(AscEntityTrigger {
entity_op,
entity_type: asc_new(heap, &self.entity_type.as_str(), gas)?,
entity: asc_new(heap, &self.entity.sorted_ref(), gas)?,
vid: self.vid,
})
}
}

impl AscIndexId for AscEntityTrigger {
const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::AscEntityTrigger;
}
3 changes: 2 additions & 1 deletion tests/integration-tests/source-subgraph/schema.graphql
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@

type Block @entity {
id: ID!
number: BigInt!
hash: Bytes!
testMessage: String
}

type Block2 @entity {
id: ID!
number: BigInt!
hash: Bytes!
testMessage: String
}
34 changes: 33 additions & 1 deletion tests/integration-tests/source-subgraph/src/mapping.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ethereum, log } from '@graphprotocol/graph-ts';
import { ethereum, log, store } from '@graphprotocol/graph-ts';
import { Block, Block2 } from '../generated/schema';
import { BigInt } from '@graphprotocol/graph-ts';

Expand All @@ -22,4 +22,36 @@ export function handleBlock(block: ethereum.Block): void {
blockEntity3.number = block.number;
blockEntity3.hash = block.hash;
blockEntity3.save();

if (block.number.equals(BigInt.fromI32(1))) {
let id = 'TEST';
let entity = new Block(id);
entity.number = block.number;
entity.hash = block.hash;
entity.testMessage = 'Created at block 1';
log.info('Created entity at block 1', []);
entity.save();
}

if (block.number.equals(BigInt.fromI32(2))) {
let id = 'TEST';
let blockEntity1 = Block.load(id);
if (blockEntity1) {
// Update the block entity
blockEntity1.testMessage = 'Updated at block 2';
log.info('Updated entity at block 2', []);
blockEntity1.save();
}
}

if (block.number.equals(BigInt.fromI32(3))) {
let id = 'TEST';
let blockEntity1 = Block.load(id);
if (blockEntity1) {
blockEntity1.testMessage = 'Deleted at block 3';
log.info('Deleted entity at block 3', []);
blockEntity1.save();
store.remove('Block', id);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ type MirrorBlock @entity {
id: String!
number: BigInt!
hash: Bytes!
testMessage: String
}
37 changes: 34 additions & 3 deletions tests/integration-tests/subgraph-data-sources/src/mapping.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,46 @@
import { Entity, log } from '@graphprotocol/graph-ts';
import { Entity, log, store } from '@graphprotocol/graph-ts';
import { MirrorBlock } from '../generated/schema';

export function handleEntity(blockEntity: Entity): void {
export class EntityTrigger {
constructor(
public entityOp: u32,
public entityType: string,
public entity: Entity,
public vid: i64,
) {}
}

export function handleEntity(trigger: EntityTrigger): void {
let blockEntity = trigger.entity;
let blockNumber = blockEntity.getBigInt('number');
let blockHash = blockEntity.getBytes('hash');
let testMessage = blockEntity.get('testMessage');
let id = blockEntity.getString('id');

log.info('Block number: {}', [blockNumber.toString()]);

let block = new MirrorBlock(id);
if (trigger.entityOp == 2) {
log.info('Removing block entity with id: {}', [id]);
store.remove('MirrorBlock', id);
return;
}

let block = loadOrCreateMirrorBlock(id);
block.number = blockNumber;
block.hash = blockHash;
if (testMessage) {
block.testMessage = testMessage.toString();
}

block.save();
}

export function loadOrCreateMirrorBlock(id: string): MirrorBlock {
let block = MirrorBlock.load(id);
if (!block) {
log.info('Creating new block entity with id: {}', [id]);
block = new MirrorBlock(id);
}

return block;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dataSources:
name: Contract
network: test
source:
address: 'QmeZhEiJuBusu7GxCe6AytvqSsgwV8QxkbSYx5ojSFB28a'
address: 'Qmaqf8cRxfxbduZppSHKG9DMuX5JZPMoGuwGb2DQuo48sq'
startBlock: 0
mapping:
apiVersion: 0.0.7
Expand Down
33 changes: 31 additions & 2 deletions tests/runner-tests/subgraph-data-sources/src/mapping.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,35 @@
import { Entity, log } from '@graphprotocol/graph-ts';

export function handleBlock(content: Entity): void {
let stringContent = content.getString('val');
export const SubgraphEntityOpCreate: u32 = 0;
export const SubgraphEntityOpModify: u32 = 1;
export const SubgraphEntityOpDelete: u32 = 2;

export class EntityTrigger {
constructor(
public entityOp: u32,
public entityType: string,
public entity: Entity,
public vid: i64,
) {}
}

export function handleBlock(content: EntityTrigger): void {
let stringContent = content.entity.getString('val');
log.info('Content: {}', [stringContent]);
log.info('EntityOp: {}', [content.entityOp.toString()]);

switch (content.entityOp) {
case SubgraphEntityOpCreate: {
log.info('Entity created: {}', [content.entityType]);
break
}
case SubgraphEntityOpModify: {
log.info('Entity modified: {}', [content.entityType]);
break;
}
case SubgraphEntityOpDelete: {
log.info('Entity deleted: {}', [content.entityType]);
break;
}
}
}
Loading

0 comments on commit cf4951c

Please sign in to comment.