Split the subgraph_deployment table into two #6003

Merged: 5 commits merged on May 16, 2025
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
@@ -79,7 +79,8 @@ jobs:
ports:
- 5432:5432
env:
RUSTFLAGS: "-C link-arg=-fuse-ld=lld -D warnings"
GRAPH_IPFS_REQUEST_TIMEOUT: "60"
RUSTFLAGS: "-C link-arg=-fuse-ld=lld -D warnings --cfg test_with_ipfs"
RUNNER_TESTS_WAIT_FOR_SYNC_SECS: "600"
steps:
- name: Tune GitHub hosted runner to reduce flakiness
85 changes: 50 additions & 35 deletions docs/implementation/metadata.md
Expand Up @@ -7,7 +7,7 @@
List of all known subgraph names. Maintained in the primary, but there is a background job that periodically copies the table from the primary to all other shards. Those copies are used for queries when the primary is down.

| Column | Type | Use |
|-------------------|--------------|-------------------------------------------|
| ----------------- | ------------ | ----------------------------------------- |
| `id` | `text!` | primary key, UUID |
| `name` | `text!` | user-chosen name |
| `current_version` | `text` | `subgraph_version.id` for current version |
@@ -18,29 +18,27 @@ List of all known subgraph names. Maintained in the primary, but there is a back

The `id` is used by the hosted explorer to reference the subgraph.
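
For illustration, a minimal query sketch of the name-to-deployment mapping, assuming the name table is `subgraphs.subgraph` as the column references below suggest; the subgraph name is a hypothetical placeholder:

```sql
-- Resolve a user-chosen subgraph name to the IPFS hash of its current version.
SELECT s.name, v.deployment
  FROM subgraphs.subgraph s
  JOIN subgraphs.subgraph_version v ON v.id = s.current_version
 WHERE s.name = 'example/subgraph-name';  -- hypothetical name
```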


### `subgraphs.subgraph_version`

Mapping of subgraph names from `subgraph` to IPFS hashes. Maintained in the primary, but there is a background job that periodically copies the table from the primary to all other shards. Those copies are used for queries when the primary is down.

| Column | Type | Use |
|---------------|--------------|-------------------------|
| ------------- | ------------ | ----------------------- |
| `id` | `text!` | primary key, UUID |
| `subgraph` | `text!` | `subgraph.id` |
| `deployment` | `text!` | IPFS hash of deployment |
| `created_at` | `numeric` | UNIX timestamp |
| `vid` | `int8!` | unused |
| `block_range` | `int4range!` | unused |


## Managing a deployment

Directory of all deployments. Maintained in the primary, but there is a background job that periodically copies the table from the primary to all other shards. Those copies are used for queries when the primary is down.

### `public.deployment_schemas`

| Column | Type | Use |
|--------------|----------------|----------------------------------------------|
| ------------ | -------------- | -------------------------------------------- |
| `id` | `int4!` | primary key |
| `subgraph` | `text!` | IPFS hash of deployment |
| `name` | `text!` | name of `sgdNNN` schema |
@@ -52,36 +50,53 @@ Directory of all deployments. Maintained in the primary, but there is a backgrou

There can be multiple copies of the same deployment, but at most one per shard. The `active` flag indicates which of these copies will be used for queries; `graph-node` makes sure that there is always exactly one for each IPFS hash.
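
As an illustration of the `active` flag, a minimal sketch of finding the copy that serves queries for one deployment; the hash literal is a hypothetical placeholder:

```sql
-- At most one copy per IPFS hash has active = true; that copy answers queries.
SELECT id, name
  FROM public.deployment_schemas
 WHERE subgraph = 'Qm...'  -- hypothetical deployment hash
   AND active;
```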

### `subgraphs.subgraph_deployment`
### `subgraphs.head`

Details about a deployment that change on every block. Maintained in the
shard alongside the deployment's data in `sgdNNN`.

| Column | Type | Use |
| ----------------- | ---------- | -------------------------------------------- |
| `id` | `integer!` | primary key, same as `deployment_schemas.id` |
| `block_hash` | `bytea` | current subgraph head |
| `block_number` | `numeric` | |
| `entity_count` | `numeric!` | total number of entities |
| `firehose_cursor` | `text` | |

The head block pointer in `block_number` and `block_hash` is the latest
block that has been fully processed by the deployment. It is `null` until
the deployment is fully initialized and is only set once the deployment has
processed its first block. For deployments that are grafted or being copied,
the head block pointer stays `null` until the graft/copy has finished,
which can take considerable time.
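
As a sketch of how the new table might be read, assuming plain SQL access and a hypothetical deployment hash:

```sql
-- Read the head block of the active copy of a deployment. block_hash and
-- block_number stay NULL until the first block has been fully processed.
SELECT h.block_number, h.block_hash, h.entity_count
  FROM public.deployment_schemas ds
  JOIN subgraphs.head h ON h.id = ds.id
 WHERE ds.subgraph = 'Qm...'  -- hypothetical deployment hash
   AND ds.active;
```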

### `subgraphs.deployment`

Details about a deployment to track sync progress etc. Maintained in the
shard alongside the deployment's data in `sgdNNN`. The table should only
contain frequently changing data, but for historical reasons contains also
static data.

| Column | Type | Use |
|--------------------------------------|------------|----------------------------------------------|
| `id` | `integer!` | primary key, same as `deployment_schemas.id` |
| `deployment` | `text!` | IPFS hash |
| `failed` | `boolean!` | |
| `synced` | `boolean!` | |
| `earliest_block_number` | `integer!` | earliest block for which we have data |
| `latest_ethereum_block_hash` | `bytea` | current subgraph head |
| `latest_ethereum_block_number` | `numeric` | |
| `entity_count` | `numeric!` | total number of entities |
| `graft_base` | `text` | IPFS hash of graft base |
| `graft_block_hash` | `bytea` | graft block |
| `graft_block_number` | `numeric` | |
| `reorg_count` | `integer!` | |
| `current_reorg_depth` | `integer!` | |
| `max_reorg_depth` | `integer!` | |
| `fatal_error` | `text` | |
| `non_fatal_errors` | `text[]` | |
| `health` | `health!` | |
| `last_healthy_ethereum_block_hash` | `bytea` | |
| `last_healthy_ethereum_block_number` | `numeric` | |
| `firehose_cursor` | `text` | |
| `debug_fork` | `text` | |
contain data that changes fairly infrequently, but for historical reasons
also contains static data.

| Column | Type | Use |
| ------------------------------------ | ------------- | ---------------------------------------------------- |
| `id` | `integer!` | primary key, same as `deployment_schemas.id` |
| `subgraph` | `text!` | IPFS hash |
| `earliest_block_number` | `integer!` | earliest block for which we have data |
| `health` | `health!` | |
| `failed` | `boolean!` | |
| `fatal_error` | `text` | |
| `non_fatal_errors` | `text[]` | |
| `graft_base` | `text` | IPFS hash of graft base |
| `graft_block_hash` | `bytea` | graft block |
| `graft_block_number` | `numeric` | |
| `reorg_count` | `integer!` | |
| `current_reorg_depth` | `integer!` | |
| `max_reorg_depth` | `integer!` | |
| `last_healthy_ethereum_block_hash` | `bytea` | |
| `last_healthy_ethereum_block_number` | `numeric` | |
| `debug_fork` | `text` | |
| `synced_at`                          | `timestamptz` | time when the deployment first reached the chain head |
| `synced_at_block_number`             | `integer`     | block number at which the deployment first reached the chain head |

The columns `reorg_count`, `current_reorg_depth`, and `max_reorg_depth` are
set during indexing. They are used to determine whether a reorg happened
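
To contrast with the per-block data in `subgraphs.head`, a minimal sketch of a status check against the slow-changing table, assuming plain SQL access and a hypothetical deployment hash:

```sql
-- Indexing health and sync progress now live in subgraphs.deployment.
SELECT d.subgraph, d.health, d.failed, d.fatal_error, d.synced_at
  FROM subgraphs.deployment d
  JOIN public.deployment_schemas ds ON ds.id = d.id
 WHERE ds.active
   AND d.subgraph = 'Qm...';  -- hypothetical deployment hash
```
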
@@ -94,7 +109,7 @@ Details about a deployment that rarely change. Maintained in the
shard alongside the deployment's data in `sgdNNN`.

| Column | Type | Use |
|-------------------------|------------|------------------------------------------------------|
| ----------------------- | ---------- | ---------------------------------------------------- |
| `id` | `integer!` | primary key, same as `deployment_schemas.id` |
| `spec_version` | `text!` | |
| `description` | `text` | |
@@ -115,7 +130,7 @@ but there is a background job that periodically copies the table from the
primary to all other shards.

| Column | Type | Use |
|---------|-------|---------------------------------------------|
| ------- | ----- | ------------------------------------------- |
| id | int4! | primary key, ref to `deployment_schemas.id` |
| node_id | text! | name of index node |

@@ -147,7 +162,7 @@ should have the 'account-like' optimization turned on.
Details about features that a deployment uses. Maintained in the primary.

| Column | Type | Use |
|----------------|-----------|-------------|
| -------------- | --------- | ----------- |
| `id` | `text!` | primary key |
| `spec_version` | `text!` | |
| `api_version` | `text` | |
18 changes: 18 additions & 0 deletions graph/src/env/mod.rs
@@ -248,6 +248,11 @@ pub struct EnvVars {
/// Set by the environment variable `GRAPH_FIREHOSE_BLOCK_BATCH_SIZE`.
/// The default value is 10.
pub firehose_block_batch_size: usize,
/// Timeouts to use for various IPFS requests set by
/// `GRAPH_IPFS_REQUEST_TIMEOUT`. Defaults to 60 seconds for release
/// builds and one second for debug builds to speed up tests. The value
/// is in seconds.
pub ipfs_request_timeout: Duration,
}

impl EnvVars {
@@ -256,6 +261,16 @@ impl EnvVars {
let graphql = InnerGraphQl::init_from_env()?.into();
let mapping_handlers = InnerMappingHandlers::init_from_env()?.into();
let store = InnerStore::init_from_env()?.try_into()?;
let ipfs_request_timeout = match inner.ipfs_request_timeout {
Some(timeout) => Duration::from_secs(timeout),
None => {
if cfg!(debug_assertions) {
Duration::from_secs(1)
} else {
Duration::from_secs(60)
}
}
};

Ok(Self {
graphql,
@@ -330,6 +345,7 @@ impl EnvVars {
firehose_block_fetch_retry_limit: inner.firehose_block_fetch_retry_limit,
firehose_block_fetch_timeout: inner.firehose_block_fetch_timeout,
firehose_block_batch_size: inner.firehose_block_fetch_batch_size,
ipfs_request_timeout,
})
}

@@ -510,6 +526,8 @@ struct Inner {
firehose_block_fetch_timeout: u64,
#[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_BATCH_SIZE", default = "10")]
firehose_block_fetch_batch_size: usize,
#[envconfig(from = "GRAPH_IPFS_REQUEST_TIMEOUT")]
ipfs_request_timeout: Option<u64>,
}

#[derive(Clone, Debug)]
10 changes: 4 additions & 6 deletions graph/src/ipfs/gateway_client.rs
@@ -1,5 +1,4 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use async_trait::async_trait;
@@ -9,6 +8,7 @@ use http::header::CACHE_CONTROL;
use reqwest::StatusCode;
use slog::Logger;

use crate::env::ENV_VARS;
use crate::ipfs::IpfsClient;
use crate::ipfs::IpfsError;
use crate::ipfs::IpfsRequest;
@@ -17,10 +17,6 @@ use crate::ipfs::IpfsResult;
use crate::ipfs::RetryPolicy;
use crate::ipfs::ServerAddress;

/// The request that verifies that the IPFS gateway is accessible is generally fast because
/// it does not involve querying the distributed network.
const TEST_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);

/// A client that connects to an IPFS gateway.
///
/// Reference: <https://specs.ipfs.tech/http-gateways/path-gateway>
@@ -99,7 +95,7 @@ impl IpfsGatewayClient {
}
});

let ok = tokio::time::timeout(TEST_REQUEST_TIMEOUT, fut)
let ok = tokio::time::timeout(ENV_VARS.ipfs_request_timeout, fut)
.await
.map_err(|_| anyhow!("request timed out"))??;

@@ -151,6 +147,8 @@ impl IpfsClient for IpfsGatewayClient {

#[cfg(test)]
mod tests {
use std::time::Duration;

use bytes::BytesMut;
use futures03::TryStreamExt;
use wiremock::matchers as m;
7 changes: 1 addition & 6 deletions graph/src/ipfs/retry_policy.rs
@@ -1,15 +1,10 @@
use std::time::Duration;

use slog::Logger;

use crate::ipfs::error::IpfsError;
use crate::prelude::*;
use crate::util::futures::retry;
use crate::util::futures::RetryConfig;

/// The default maximum delay between retries.
const DEFAULT_MAX_DELAY: Duration = Duration::from_secs(60);

/// Describes retry behavior when IPFS requests fail.
#[derive(Clone, Copy, Debug)]
pub enum RetryPolicy {
@@ -33,7 +28,7 @@ impl RetryPolicy {
) -> RetryConfig<O, IpfsError> {
retry(operation_name, logger)
.limit(ENV_VARS.mappings.ipfs_max_attempts)
.max_delay(DEFAULT_MAX_DELAY)
.max_delay(ENV_VARS.ipfs_request_timeout)
.when(move |result: &Result<O, IpfsError>| match result {
Ok(_) => false,
Err(err) => match self {
9 changes: 3 additions & 6 deletions graph/src/ipfs/rpc_client.rs
@@ -9,6 +9,7 @@ use reqwest::Response;
use reqwest::StatusCode;
use slog::Logger;

use crate::env::ENV_VARS;
use crate::ipfs::IpfsClient;
use crate::ipfs::IpfsError;
use crate::ipfs::IpfsRequest;
@@ -17,10 +18,6 @@ use crate::ipfs::IpfsResult;
use crate::ipfs::RetryPolicy;
use crate::ipfs::ServerAddress;

/// The request that verifies that the IPFS RPC API is accessible is generally fast because
/// it does not involve querying the distributed network.
const TEST_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);

/// A client that connects to an IPFS RPC API.
///
/// Reference: <https://docs.ipfs.tech/reference/kubo/rpc>
@@ -60,7 +57,7 @@ impl IpfsRpcClient {
server_address: ServerAddress::new(server_address)?,
http_client: reqwest::Client::new(),
logger: logger.to_owned(),
test_request_timeout: TEST_REQUEST_TIMEOUT,
test_request_timeout: ENV_VARS.ipfs_request_timeout,
})
}

@@ -88,7 +85,7 @@ impl IpfsRpcClient {
}
});

let ok = tokio::time::timeout(TEST_REQUEST_TIMEOUT, fut)
let ok = tokio::time::timeout(ENV_VARS.ipfs_request_timeout, fut)
.await
.map_err(|_| anyhow!("request timed out"))??;

2 changes: 1 addition & 1 deletion node/src/manager/commands/unused_deployments.rs
@@ -64,7 +64,7 @@ pub fn record(store: Arc<SubgraphStore>) -> Result<(), Error> {
let recorded = store.record_unused_deployments()?;

for unused in store.list_unused_deployments(unused::Filter::New)? {
if recorded.iter().any(|r| r.deployment == unused.deployment) {
if recorded.iter().any(|r| r.subgraph == unused.deployment) {
add_row(&mut list, unused);
}
}