
Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories.

base repository: graphprotocol/graph-node
base: c7e3e8594f4be42e08cfc7dad19cd0b56c7c9567
head repository: graphprotocol/graph-node
compare: 86c318820e026eb71252e06cbaec964c2b5a9e9c
Showing with 1,095 additions and 402 deletions.
  1. +10 −12 chain/arweave/src/chain.rs
  2. +10 −12 chain/cosmos/src/chain.rs
  3. +10 −12 chain/ethereum/src/chain.rs
  4. +8 −4 chain/ethereum/src/runtime/runtime_adapter.rs
  5. +19 −18 chain/near/src/chain.rs
  6. +3 −0 chain/substreams/examples/substreams.rs
  7. +1 −0 chain/substreams/proto/codec.proto
  8. +3 −0 chain/substreams/src/block_ingestor.rs
  9. +40 −19 chain/substreams/src/block_stream.rs
  10. +99 −11 chain/substreams/src/data_source.rs
  11. +68 −14 chain/substreams/src/mapper.rs
  12. +6 −1 chain/substreams/src/trigger.rs
  13. +63 −3 core/src/subgraph/context.rs
  14. +413 −187 core/src/subgraph/runner.rs
  15. +2 −2 docs/sharding.md
  16. +22 −15 graph/src/blockchain/block_stream.rs
  17. +5 −4 graph/src/blockchain/substreams_block_stream.rs
  18. +12 −0 graph/src/components/subgraph/host.rs
  19. +5 −0 graph/src/env/store.rs
  20. +98 −8 runtime/wasm/src/host.rs
  21. +1 −1 runtime/wasm/src/host_exports.rs
  22. +74 −16 runtime/wasm/src/mapping.rs
  23. +15 −0 runtime/wasm/src/module/mod.rs
  24. +23 −0 store/postgres/migrations/2023-12-14-160628_change_health_column_format/down.sql
  25. +27 −0 store/postgres/migrations/2023-12-14-160628_change_health_column_format/up.sql
  26. +55 −2 store/postgres/src/chain_store.rs
  27. +1 −0 store/postgres/src/deployment.rs
  28. +0 −1 substreams/substreams-head-tracker/Cargo.toml
  29. +0 −1 substreams/substreams-trigger-filter/Cargo.toml
  30. +1 −59 substreams/substreams-trigger-filter/src/lib.rs
  31. BIN substreams/substreams-trigger-filter/substreams-trigger-filter-v0.1.0.spkg
  32. +1 −0 substreams/substreams-trigger-filter/substreams.yaml
22 changes: 10 additions & 12 deletions chain/arweave/src/chain.rs
@@ -35,7 +35,7 @@ use crate::{
codec,
data_source::{DataSource, UnresolvedDataSource},
};
use graph::blockchain::block_stream::{BlockStream, FirehoseCursor, SubstreamsMapper};
use graph::blockchain::block_stream::{BlockStream, BlockStreamMapper, FirehoseCursor};

pub struct Chain {
logger_factory: LoggerFactory,
@@ -258,13 +258,10 @@ pub struct FirehoseMapper {
}

#[async_trait]
impl SubstreamsMapper<Chain> for FirehoseMapper {
fn decode_block(
&self,
output: Option<&prost_types::Any>,
) -> Result<Option<codec::Block>, Error> {
impl BlockStreamMapper<Chain> for FirehoseMapper {
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<codec::Block>, Error> {
let block = match output {
Some(block) => codec::Block::decode(block.value.as_ref())?,
Some(block) => codec::Block::decode(block)?,
None => anyhow::bail!("Arweave mapper is expected to always have a block"),
};

@@ -280,12 +277,13 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {
.triggers_in_block(logger, block, self.filter.as_ref())
.await
}
async fn decode_triggers(
async fn handle_substreams_block(
&self,
_logger: &Logger,
_clock: &Clock,
_block: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
_clock: Clock,
_cursor: FirehoseCursor,
_block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
unimplemented!()
}
}
@@ -321,7 +319,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
// define a slimmed down stuct that would decode only a few fields and ignore all the rest.
// unwrap: Input cannot be None so output will be error or block.
let block = self.decode_block(Some(&any_block))?.unwrap();
let block = self.decode_block(Some(&any_block.value.as_ref()))?.unwrap();

use ForkStep::*;
match step {
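The hunks above, together with the matching changes in the cosmos, ethereum, and near chains below, replace the old `SubstreamsMapper` trait with a reshaped `BlockStreamMapper`: `decode_block` now takes raw bytes instead of a `prost_types::Any`, and `decode_triggers` becomes `handle_substreams_block`, which also receives the cursor and returns a full `BlockStreamEvent`. The trait itself is defined in `graph/src/blockchain/block_stream.rs` (changed in this compare but not expanded here), so the outline below is only a sketch inferred from the impl blocks in the diff; the `block_with_triggers` signature and the trait bounds are assumptions.

```rust
// Sketch of the reshaped trait, inferred from the per-chain impls in this
// compare; not copied from graph/src/blockchain/block_stream.rs.
use anyhow::Error;
use async_trait::async_trait;
use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor};
use graph::blockchain::Blockchain;
use graph::prelude::Logger;
use graph::substreams::Clock;

#[async_trait]
pub trait BlockStreamMapper<C: Blockchain>: Send + Sync {
    /// Decodes a block from raw bytes (previously `Option<&prost_types::Any>`).
    fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<C::Block>, Error>;

    /// Turns a decoded block into its triggers (exact signature assumed).
    async fn block_with_triggers(
        &self,
        logger: &Logger,
        block: C::Block,
    ) -> Result<BlockWithTriggers<C>, Error>;

    /// Replaces `decode_triggers`: owns the payload and the cursor and yields
    /// a complete stream event instead of just `BlockWithTriggers`.
    async fn handle_substreams_block(
        &self,
        logger: &Logger,
        clock: Clock,
        cursor: FirehoseCursor,
        block: Vec<u8>,
    ) -> Result<BlockStreamEvent<C>, Error>;
}
```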
22 changes: 10 additions & 12 deletions chain/cosmos/src/chain.rs
@@ -5,7 +5,7 @@ use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
use std::sync::Arc;

use graph::blockchain::block_stream::{FirehoseCursor, SubstreamsMapper};
use graph::blockchain::block_stream::{BlockStreamMapper, FirehoseCursor};
use graph::blockchain::client::ChainClient;
use graph::blockchain::{BasicBlockchainBuilder, BlockchainBuilder, NoopRuntimeAdapter};
use graph::cheap_clone::CheapClone;
@@ -331,13 +331,10 @@ pub struct FirehoseMapper {
}

#[async_trait]
impl SubstreamsMapper<Chain> for FirehoseMapper {
fn decode_block(
&self,
output: Option<&prost_types::Any>,
) -> Result<Option<crate::Block>, Error> {
impl BlockStreamMapper<Chain> for FirehoseMapper {
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<crate::Block>, Error> {
let block = match output {
Some(block) => crate::Block::decode(block.value.as_ref())?,
Some(block) => crate::Block::decode(block)?,
None => anyhow::bail!("cosmos mapper is expected to always have a block"),
};

@@ -354,12 +351,13 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {
.await
}

async fn decode_triggers(
async fn handle_substreams_block(
&self,
_logger: &Logger,
_clock: &Clock,
_block: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
_clock: Clock,
_cursor: FirehoseCursor,
_block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
unimplemented!()
}
}
@@ -395,7 +393,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
// define a slimmed down struct that would decode only a few fields and ignore all the rest.
// unwrap: Input cannot be None so output will be error or block.
let block = self.decode_block(Some(&any_block))?.unwrap();
let block = self.decode_block(Some(any_block.value.as_ref()))?.unwrap();

match step {
ForkStep::StepNew => Ok(BlockStreamEvent::ProcessBlock(
22 changes: 10 additions & 12 deletions chain/ethereum/src/chain.rs
@@ -55,7 +55,7 @@ use crate::{
SubgraphEthRpcMetrics, TriggerFilter, ENV_VARS,
};
use graph::blockchain::block_stream::{
BlockStream, BlockStreamBuilder, FirehoseCursor, SubstreamsMapper,
BlockStream, BlockStreamBuilder, BlockStreamMapper, FirehoseCursor,
};

/// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320
@@ -734,13 +734,10 @@ pub struct FirehoseMapper {
}

#[async_trait]
impl SubstreamsMapper<Chain> for FirehoseMapper {
fn decode_block(
&self,
output: Option<&prost_types::Any>,
) -> Result<Option<BlockFinality>, Error> {
impl BlockStreamMapper<Chain> for FirehoseMapper {
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<BlockFinality>, Error> {
let block = match output {
Some(block) => codec::Block::decode(block.value.as_ref())?,
Some(block) => codec::Block::decode(block)?,
None => anyhow::bail!("ethereum mapper is expected to always have a block"),
};

@@ -761,12 +758,13 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {
.await
}

async fn decode_triggers(
async fn handle_substreams_block(
&self,
_logger: &Logger,
_clock: &Clock,
_block: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
_clock: Clock,
_cursor: FirehoseCursor,
_block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
unimplemented!()
}
}
@@ -806,7 +804,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
match step {
StepNew => {
// unwrap: Input cannot be None so output will be error or block.
let block = self.decode_block(Some(&any_block))?.unwrap();
let block = self.decode_block(Some(any_block.value.as_ref()))?.unwrap();
let block_with_triggers = self.block_with_triggers(logger, block).await?;

Ok(BlockStreamEvent::ProcessBlock(
12 changes: 8 additions & 4 deletions chain/ethereum/src/runtime/runtime_adapter.rs
@@ -155,7 +155,8 @@ fn eth_call(
of the subgraph manifest",
unresolved_call.contract_name
)
})?
})
.map_err(HostExportError::Deterministic)?
.contract
.clone();

@@ -170,7 +171,8 @@ fn eth_call(
"Unknown function \"{}::{}\" called from WASM runtime",
unresolved_call.contract_name, unresolved_call.function_name
)
})?,
})
.map_err(HostExportError::Deterministic)?,

// Behavior for apiVersion >= 0.0.04: look up function by signature of
// the form `functionName(uint256,string) returns (bytes32,string)`; this
@@ -182,7 +184,8 @@ fn eth_call(
"Unknown function \"{}::{}\" called from WASM runtime",
unresolved_call.contract_name, unresolved_call.function_name
)
})?
})
.map_err(HostExportError::Deterministic)?
.iter()
.find(|f| function_signature == &f.signature())
.with_context(|| {
@@ -193,7 +196,8 @@ fn eth_call(
unresolved_call.function_name,
function_signature,
)
})?,
})
.map_err(HostExportError::Deterministic)?,
};

let call = EthereumContractCall {
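Every hunk in this file applies the same pattern: the `with_context` lookups that used to surface as generic host-export errors are now explicitly wrapped in `HostExportError::Deterministic`, because a contract or function that is absent from the subgraph manifest will still be absent on retry. Below is a minimal, self-contained sketch of that classification pattern, using a local stand-in type rather than graph-node's real `HostExportError`.

```rust
use anyhow::{Context, Error};

// Local stand-in for graph-node's HostExportError; only the variant used here.
enum HostExportError {
    Deterministic(Error),
}

// A name missing from the manifest is a deterministic failure: retrying the
// same block can never make it appear, so it is classified as such instead of
// being propagated as a generic (possibly transient) error.
fn lookup<'a>(names: &'a [String], wanted: &str) -> Result<&'a String, HostExportError> {
    names
        .iter()
        .find(|n| n.as_str() == wanted)
        .with_context(|| format!("Unknown function \"{wanted}\" called from WASM runtime"))
        .map_err(HostExportError::Deterministic)
}

fn main() {
    let names = vec!["transfer".to_string()];
    assert!(lookup(&names, "transfer").is_ok());
    assert!(matches!(
        lookup(&names, "approve"),
        Err(HostExportError::Deterministic(_))
    ));
}
```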
37 changes: 19 additions & 18 deletions chain/near/src/chain.rs
@@ -40,11 +40,11 @@ use crate::{
data_source::{DataSource, UnresolvedDataSource},
};
use graph::blockchain::block_stream::{
BlockStream, BlockStreamBuilder, FirehoseCursor, SubstreamsMapper,
BlockStream, BlockStreamBuilder, BlockStreamMapper, FirehoseCursor,
};

const NEAR_FILTER_MODULE_NAME: &str = "near_filter";
const SUBSTREAMS_TRIGGER_FILTER_BYTES: &[u8; 497306] = include_bytes!(
const SUBSTREAMS_TRIGGER_FILTER_BYTES: &[u8; 510162] = include_bytes!(
"../../../substreams/substreams-trigger-filter/substreams-trigger-filter-v0.1.0.spkg"
);

@@ -407,13 +407,10 @@ pub struct FirehoseMapper {
}

#[async_trait]
impl SubstreamsMapper<Chain> for FirehoseMapper {
fn decode_block(
&self,
output: Option<&prost_types::Any>,
) -> Result<Option<codec::Block>, Error> {
impl BlockStreamMapper<Chain> for FirehoseMapper {
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<codec::Block>, Error> {
let block = match output {
Some(block) => codec::Block::decode(block.value.as_ref())?,
Some(block) => codec::Block::decode(block)?,
None => anyhow::bail!("near mapper is expected to always have a block"),
};

@@ -430,17 +427,18 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {
.await
}

async fn decode_triggers(
async fn handle_substreams_block(
&self,
_logger: &Logger,
_clock: &Clock,
message: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
_clock: Clock,
cursor: FirehoseCursor,
message: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
let BlockAndReceipts {
block,
outcome,
receipt,
} = BlockAndReceipts::decode(message.value.as_ref())?;
} = BlockAndReceipts::decode(message.as_ref())?;
let block = block.ok_or_else(|| anyhow!("near block is mandatory on substreams"))?;
let arc_block = Arc::new(block.clone());

@@ -456,10 +454,13 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {
})
.collect();

Ok(BlockWithTriggers {
block,
trigger_data,
})
Ok(BlockStreamEvent::ProcessBlock(
BlockWithTriggers {
block,
trigger_data,
},
cursor,
))
}
}

@@ -494,7 +495,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
// define a slimmed down stuct that would decode only a few fields and ignore all the rest.
// unwrap: Input cannot be None so output will be error or block.
let block = self.decode_block(Some(&any_block))?.unwrap();
let block = self.decode_block(Some(any_block.value.as_ref()))?.unwrap();

use ForkStep::*;
match step {
3 changes: 3 additions & 0 deletions chain/substreams/examples/substreams.rs
@@ -88,6 +88,9 @@ async fn main() -> Result<(), Error> {
Some(event) => match event {
Err(_) => {}
Ok(block_stream_event) => match block_stream_event {
BlockStreamEvent::ProcessWasmBlock(_, _, _, _) => {
unreachable!("Cannot happen with this mapper")
}
BlockStreamEvent::Revert(_, _) => {}
BlockStreamEvent::ProcessBlock(block_with_trigger, _) => {
for change in block_with_trigger.block.changes.entity_changes {
1 change: 1 addition & 0 deletions chain/substreams/proto/codec.proto
@@ -44,3 +44,4 @@ message Field {
optional Value new_value = 3;
optional Value old_value = 5;
}

3 changes: 3 additions & 0 deletions chain/substreams/src/block_ingestor.rs
@@ -74,6 +74,9 @@ impl SubstreamsBlockIngestor {

while let Some(message) = stream.next().await {
let (block, cursor) = match message {
Ok(BlockStreamEvent::ProcessWasmBlock(_block_ptr, _data, _handler, _cursor)) => {
unreachable!("Block ingestor should never receive raw blocks");
}
Ok(BlockStreamEvent::ProcessBlock(triggers, cursor)) => {
(Arc::new(triggers.block), cursor)
}
59 changes: 40 additions & 19 deletions chain/substreams/src/block_stream.rs
@@ -16,7 +16,10 @@ use graph::{
slog::o,
};

use crate::{mapper::Mapper, Chain, TriggerFilter};
use crate::{
mapper::{Mapper, WasmBlockMapper},
Chain, TriggerFilter,
};

pub struct BlockStreamBuilder {}

@@ -40,29 +43,47 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
subgraph_current_block: Option<BlockPtr>,
filter: Arc<<Chain as Blockchain>::TriggerFilter>,
) -> Result<Box<dyn BlockStream<Chain>>> {
let mapper = Arc::new(Mapper {
schema: Some(schema),
skip_empty_blocks: true,
});

let logger = chain
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "SubstreamsBlockStream"));

Ok(Box::new(SubstreamsBlockStream::new(
deployment.hash,
chain.chain_client(),
subgraph_current_block,
block_cursor.as_ref().clone(),
mapper,
filter.modules.clone(),
filter.module_name.clone(),
filter.start_block.map(|x| vec![x]).unwrap_or_default(),
vec![],
logger,
chain.metrics_registry.clone(),
)))
let stream = match &filter.mapping_handler {
Some(handler) => SubstreamsBlockStream::new(
deployment.hash,
chain.chain_client(),
subgraph_current_block,
block_cursor.as_ref().clone(),
Arc::new(WasmBlockMapper {
handler: handler.clone(),
}),
filter.modules.clone(),
filter.module_name.clone(),
filter.start_block.map(|x| vec![x]).unwrap_or_default(),
vec![],
logger,
chain.metrics_registry.clone(),
),

None => SubstreamsBlockStream::new(
deployment.hash,
chain.chain_client(),
subgraph_current_block,
block_cursor.as_ref().clone(),
Arc::new(Mapper {
schema: Some(schema),
skip_empty_blocks: true,
}),
filter.modules.clone(),
filter.module_name.clone(),
filter.start_block.map(|x| vec![x]).unwrap_or_default(),
vec![],
logger,
chain.metrics_registry.clone(),
),
};

Ok(Box::new(stream))
}

async fn build_firehose(