Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
fe83cab
feat(graph): add Nozzle Flight service client
isum Sep 25, 2025
deaacc5
feat(graph): add Nozzle stream aggregator
isum Sep 25, 2025
59119fa
feat(graph): add Nozzle data decoder
isum Sep 25, 2025
93f5db1
feat(graph): add SQL query parser, resolver and validator
isum Sep 25, 2025
e91b765
feat(graph): use a new identifier type in Nozzle related modules
isum Sep 25, 2025
e211936
feat(graph): add Nozzle Subgraph schema generation
isum Sep 25, 2025
75227c8
feat(graph): add Nozzle Subgraph manifest
isum Sep 25, 2025
56e5c3a
feat(graph): add reorg handling to the Nozzle FlightClient
isum Oct 28, 2025
63e8180
feat(graph, core): extend SubgraphInstanceManager trait
isum Oct 28, 2025
b42ea17
feat(core, graph, node): allow multiple subgraph instance managers
isum Oct 28, 2025
c832650
fix(graph): update deterministic error patterns in Nozzle Flight client
isum Oct 28, 2025
4047a09
feat(graph): add Nozzle related ENV variables
isum Oct 28, 2025
66e73ba
fix(graph): make block range filter return a new query
isum Oct 28, 2025
6c7b14b
feat(graph): add decoding utilities
isum Oct 28, 2025
600fa7b
fix(graph): use decoding utilities in the stream aggregator
isum Oct 28, 2025
3f2b822
feat(graph): add more details to Nozzle data sources
isum Oct 28, 2025
99eb9f1
feat(core, graph, node): add Nozzle subgraph deployment
isum Oct 28, 2025
db4ca77
feat(graph): add a dedicated Nozzle manifest resolver
isum Oct 28, 2025
f30f8ea
feat(node): add shutdown token
isum Oct 28, 2025
aaf010c
feat(core, graph): add Nozzle subgraph runner
isum Oct 28, 2025
e4fb5a9
chore(all): rename Nozzle to Amp
isum Oct 29, 2025
6ed844a
fix(graph): produce consistent query hashes for logging
isum Nov 5, 2025
b9dffb0
fix(core, graph): simplify SQL query requirements
isum Nov 5, 2025
86ec87b
chore(graph): fix typos
isum Nov 5, 2025
912541f
fix(graph): use nozzle-resume header name
isum Nov 5, 2025
c3e75db
fix(graph): extend common column aliases
isum Nov 6, 2025
214bc56
fix(core, graph): use named streams in the stream aggregator
isum Nov 6, 2025
4161ae3
fix(core, graph): simplify working with identifiers
isum Nov 6, 2025
730afe3
fix(graph): validate query output column names
isum Nov 6, 2025
ddf9fc6
fix(graph): support all versions of the Amp server
isum Nov 6, 2025
d4e3cb7
fix(graph): extend the list of common column aliases
isum Nov 11, 2025
77d6945
test(graph): add decoder unit-tests
isum Nov 11, 2025
b596830
feat(core, graph): add Amp subgraph metrics
isum Nov 18, 2025
dbd57c5
fix(graph): allow more complex dataset and table names
isum Nov 20, 2025
e5cfec0
fix(graph): remove CTE name requirements
isum Nov 20, 2025
49bb74b
fix(graph, node): add option to authenticate Flight service requests
isum Nov 20, 2025
ffd5415
fix(graph): update temporary predefined list of source context tables
isum Nov 21, 2025
829ed84
docs: add docs for Amp-powered subgraphs
isum Nov 21, 2025
78383b4
chore(core): reuse existing metric names
isum Dec 4, 2025
e8edcf2
fix(core, graph): minor adjustments after rebase
isum Dec 4, 2025
89abad5
fix(core, graph): resolve metric conflicts, use quoted table referenc…
isum Jan 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,338 changes: 1,292 additions & 46 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ substreams = "=0.6.0"
substreams-entity-change = "2"
substreams-near-core = "=0.10.2"
rand = { version = "0.9.2", features = ["os_rng"] }
prometheus = "0.14.0"

# Dependencies related to Amp subgraphs
ahash = "0.8.11"
alloy = { version = "1.0.12", default-features = false, features = ["json-abi", "serde"] }
arrow = { version = "=55.0.0" }
arrow-flight = { version = "=55.0.0", features = ["flight-sql-experimental"] }
futures = "0.3.31"
half = "2.7.1"
indoc = "2.0.7"
lazy-regex = "3.4.1"
parking_lot = "0.12.4"
sqlparser-latest = { version = "0.57.0", package = "sqlparser", features = ["visitor"] }
tokio-util = "0.7.15"

# Incremental compilation on Rust 1.58 causes an ICE on build. As soon as graph node builds again, these can be removed.
[profile.test]
Expand Down
1 change: 1 addition & 0 deletions chain/ethereum/src/runtime/runtime_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
create_host_fns(abis, archive, call_cache, eth_adapters, eth_call_gas)
}
data_source::DataSource::Offchain(_) => vec![],
data_source::DataSource::Amp(_) => vec![],
};

Ok(host_fns)
Expand Down
13 changes: 13 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ tower = { git = "https://github.com/tower-rs/tower.git", features = ["full"] }
thiserror = { workspace = true }
anyhow = "1.0"

# Dependencies related to Amp subgraphs
alloy.workspace = true
arrow.workspace = true
chrono.workspace = true
futures.workspace = true
indoc.workspace = true
itertools.workspace = true
parking_lot.workspace = true
prometheus.workspace = true
slog.workspace = true
strum.workspace = true
tokio-util.workspace = true

[dev-dependencies]
tower-test = { git = "https://github.com/tower-rs/tower.git" }
wiremock = "0.6.5"
171 changes: 171 additions & 0 deletions core/src/amp_subgraph/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use std::sync::Arc;

use alloy::primitives::BlockNumber;
use anyhow::Context;
use async_trait::async_trait;
use graph::{
amp,
components::{
link_resolver::{LinkResolver, LinkResolverContext},
metrics::MetricsRegistry,
store::{DeploymentLocator, SubgraphStore},
subgraph::SubgraphInstanceManager,
},
env::EnvVars,
log::factory::LoggerFactory,
prelude::CheapClone,
};
use slog::{debug, error};
use tokio_util::sync::CancellationToken;

use super::{runner, Metrics, Monitor};

/// Manages Amp subgraph runner futures.
///
/// Creates and schedules Amp subgraph runner futures for execution on demand.
/// Also handles stopping previously started Amp subgraph runners.
pub struct Manager<SS, NC> {
logger_factory: LoggerFactory,
metrics_registry: Arc<MetricsRegistry>,
env_vars: Arc<EnvVars>,
monitor: Monitor,
subgraph_store: Arc<SS>,
link_resolver: Arc<dyn LinkResolver>,
amp_client: Arc<NC>,
}

impl<SS, NC> Manager<SS, NC>
where
SS: SubgraphStore,
NC: amp::Client,
{
/// Creates a new Amp subgraph manager.
pub fn new(
logger_factory: &LoggerFactory,
metrics_registry: Arc<MetricsRegistry>,
env_vars: Arc<EnvVars>,
cancel_token: &CancellationToken,
subgraph_store: Arc<SS>,
link_resolver: Arc<dyn LinkResolver>,
amp_client: Arc<NC>,
) -> Self {
let logger = logger_factory.component_logger("AmpSubgraphManager", None);
let logger_factory = logger_factory.with_parent(logger);

let monitor = Monitor::new(&logger_factory, cancel_token);

Self {
logger_factory,
metrics_registry,
env_vars,
monitor,
subgraph_store,
link_resolver,
amp_client,
}
}
}

#[async_trait]
impl<SS, NC> SubgraphInstanceManager for Manager<SS, NC>
where
SS: SubgraphStore,
NC: amp::Client + Send + Sync + 'static,
{
async fn start_subgraph(
self: Arc<Self>,
deployment: DeploymentLocator,
stop_block: Option<i32>,
) {
let manager = self.cheap_clone();

self.monitor.start(
deployment.cheap_clone(),
Box::new(move |cancel_token| {
Box::pin(async move {
let logger = manager.logger_factory.subgraph_logger(&deployment);

let store = manager
.subgraph_store
.cheap_clone()
.writable(logger.cheap_clone(), deployment.id, Vec::new().into())
.await
.context("failed to create writable store")?;

let metrics = Metrics::new(
&logger,
manager.metrics_registry.cheap_clone(),
store.cheap_clone(),
deployment.hash.cheap_clone(),
);

let link_resolver = manager
.link_resolver
.for_manifest(&deployment.hash.to_string())
.context("failed to create link resolver")?;

let manifest_bytes = link_resolver
.cat(
&LinkResolverContext::new(&deployment.hash, &logger),
&deployment.hash.to_ipfs_link(),
)
.await
.context("failed to load subgraph manifest")?;

let raw_manifest = serde_yaml::from_slice(&manifest_bytes)
.context("failed to parse subgraph manifest")?;

let mut manifest = amp::Manifest::resolve::<graph_chain_ethereum::Chain, _>(
&logger,
manager.link_resolver.cheap_clone(),
manager.amp_client.cheap_clone(),
manager.env_vars.max_spec_version.cheap_clone(),
deployment.hash.cheap_clone(),
raw_manifest,
)
.await?;

if let Some(stop_block) = stop_block {
for data_source in manifest.data_sources.iter_mut() {
data_source.source.end_block = stop_block as BlockNumber;
}
}

store
.start_subgraph_deployment(&logger)
.await
.context("failed to start subgraph deployment")?;

let runner_context = runner::Context::new(
&logger,
&manager.env_vars.amp,
manager.amp_client.cheap_clone(),
store,
deployment.hash.cheap_clone(),
manifest,
metrics,
);

let runner_result = runner::new_runner(runner_context, cancel_token).await;

match manager.subgraph_store.stop_subgraph(&deployment).await {
Ok(()) => {
debug!(logger, "Subgraph writer stopped");
}
Err(e) => {
error!(logger, "Failed to stop subgraph writer";
"e" => ?e
);
}
}

runner_result
})
}),
);
}

async fn stop_subgraph(&self, deployment: DeploymentLocator) {
self.monitor.stop(deployment);
}
}
Loading