Skip to content

gnd: Support multiple subgraphs, grafting, subgraph composition in dev mode #6000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: krishna/graph-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions chain/ethereum/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ impl Transport {
.expect("Failed to connect to Ethereum IPC")
}

#[cfg(not(unix))]
pub async fn new_ipc(_ipc: &str) -> Self {
panic!("IPC connections are not supported on non-Unix platforms")
}

/// Creates a WebSocket transport.
pub async fn new_ws(ws: &str) -> Self {
ws::WebSocket::new(ws)
Expand Down
9 changes: 8 additions & 1 deletion chain/substreams/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ mod test {
blockchain::{DataSource as _, UnresolvedDataSource as _},
components::link_resolver::LinkResolver,
data::subgraph::LATEST_VERSION,
prelude::{async_trait, serde_yaml, JsonValueStream, Link},
prelude::{async_trait, serde_yaml, DeploymentHash, JsonValueStream, Link},
slog::{o, Discard, Logger},
substreams::{
module::{
Expand Down Expand Up @@ -705,6 +705,13 @@ mod test {
unimplemented!()
}

fn for_deployment(
&self,
_deployment: DeploymentHash,
) -> Result<Box<dyn LinkResolver>, Error> {
unimplemented!()
}

async fn cat(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
Ok(gen_package().encode_to_vec())
}
Expand Down
7 changes: 6 additions & 1 deletion core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,12 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?;

// Allow for infinite retries for subgraph definition files.
let link_resolver = Arc::from(self.link_resolver.with_retries());
let link_resolver = Arc::from(
self.link_resolver
.for_deployment(deployment.hash.clone())
.map_err(SubgraphRegistrarError::Unknown)?
.with_retries(),
);

// Make sure the `raw_yaml` is present on both this subgraph and the graft base.
self.subgraph_store
Expand Down
6 changes: 5 additions & 1 deletion core/src/subgraph/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss
));
}

let file_bytes = self
let link_resolver = self
.link_resolver
.for_deployment(loc.hash.clone())
.map_err(SubgraphAssignmentProviderError::ResolveError)?;

let file_bytes = link_resolver
.cat(&logger, &loc.hash.to_ipfs_link())
.await
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
Expand Down
45 changes: 34 additions & 11 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ where
start_block_override: Option<BlockPtr>,
graft_block_override: Option<BlockPtr>,
history_blocks: Option<i32>,
ignore_graft_base: bool,
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
// We don't have a location for the subgraph yet; that will be
// assigned when we deploy for real. For logging purposes, make up a
Expand All @@ -286,9 +287,14 @@ where
.logger_factory
.subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone()));

let resolver: Arc<dyn LinkResolver> = Arc::from(
self.resolver
.for_deployment(hash.clone())
.map_err(SubgraphRegistrarError::Unknown)?,
);

let raw: serde_yaml::Mapping = {
let file_bytes = self
.resolver
let file_bytes = resolver
.cat(&logger, &hash.to_ipfs_link())
.await
.map_err(|e| {
Expand Down Expand Up @@ -323,8 +329,9 @@ where
node_id,
debug_fork,
self.version_switching_mode,
&self.resolver,
&resolver,
history_blocks,
ignore_graft_base,
)
.await?
}
Expand All @@ -341,8 +348,9 @@ where
node_id,
debug_fork,
self.version_switching_mode,
&self.resolver,
&resolver,
history_blocks,
ignore_graft_base,
)
.await?
}
Expand All @@ -359,8 +367,9 @@ where
node_id,
debug_fork,
self.version_switching_mode,
&self.resolver,
&resolver,
history_blocks,
ignore_graft_base,
)
.await?
}
Expand All @@ -377,8 +386,9 @@ where
node_id,
debug_fork,
self.version_switching_mode,
&self.resolver,
&resolver,
history_blocks,
ignore_graft_base,
)
.await?
}
Expand Down Expand Up @@ -565,12 +575,14 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
version_switching_mode: SubgraphVersionSwitchingMode,
resolver: &Arc<dyn LinkResolver>,
history_blocks_override: Option<i32>,
ignore_graft_base: bool,
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
let raw_string = serde_yaml::to_string(&raw).unwrap();

let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(
deployment.clone(),
raw,
resolver,
&resolver,
logger,
ENV_VARS.max_spec_version.clone(),
)
Expand All @@ -585,10 +597,21 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
Err(StoreError::DeploymentNotFound(_)) => true,
Err(e) => return Err(SubgraphRegistrarError::StoreError(e)),
};
let manifest = unvalidated
.validate(store.cheap_clone(), should_validate)
.await
.map_err(SubgraphRegistrarError::ManifestValidationError)?;

let manifest = {
let should_validate = should_validate && !ignore_graft_base;

let mut manifest = unvalidated
.validate(store.cheap_clone(), should_validate)
.await
.map_err(SubgraphRegistrarError::ManifestValidationError)?;

if ignore_graft_base {
manifest.graft = None;
}

manifest
};

let network_name: Word = manifest.network_name().into();

Expand Down
132 changes: 128 additions & 4 deletions graph/src/components/link_resolver/file.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::Duration;

Expand All @@ -6,22 +7,35 @@ use async_trait::async_trait;
use slog::Logger;

use crate::data::subgraph::Link;
use crate::prelude::{Error, JsonValueStream, LinkResolver as LinkResolverTrait};
use crate::prelude::{DeploymentHash, Error, JsonValueStream, LinkResolver as LinkResolverTrait};

#[derive(Clone, Debug)]
pub struct FileLinkResolver {
base_dir: Option<PathBuf>,
timeout: Duration,
// This is a hashmap that maps the alias name to the path of the file that is aliased
aliases: HashMap<String, PathBuf>,
}

impl Default for FileLinkResolver {
fn default() -> Self {
Self {
base_dir: None,
timeout: Duration::from_secs(30),
aliases: HashMap::new(),
}
}
}

impl FileLinkResolver {
/// Create a new FileLinkResolver
///
/// All paths are treated as absolute paths.
pub fn new() -> Self {
pub fn new(base_dir: Option<PathBuf>, aliases: HashMap<String, PathBuf>) -> Self {
Self {
base_dir: None,
base_dir: base_dir,
timeout: Duration::from_secs(30),
aliases,
}
}

Expand All @@ -33,12 +47,18 @@ impl FileLinkResolver {
Self {
base_dir: Some(base_dir.as_ref().to_owned()),
timeout: Duration::from_secs(30),
aliases: HashMap::new(),
}
}

fn resolve_path(&self, link: &str) -> PathBuf {
let path = Path::new(link);

// If the path is an alias, use the aliased path
if let Some(aliased) = self.aliases.get(link) {
return aliased.clone();
}

// If the path is already absolute or if we don't have a base_dir, return it as is
if path.is_absolute() || self.base_dir.is_none() {
path.to_owned()
Expand All @@ -47,6 +67,42 @@ impl FileLinkResolver {
self.base_dir.as_ref().unwrap().join(link)
}
}

/// This method creates a new resolver that is scoped to a specific subgraph
/// It will set the base directory to the parent directory of the manifest path
/// This is required because paths mentioned in the subgraph manifest are relative paths
/// and we need a new resolver with the right base directory for the specific subgraph
fn clone_for_deployment(&self, deployment: DeploymentHash) -> Result<Self, Error> {
let mut resolver = self.clone();

let deployment_str = deployment.to_string();

// Create a path to the manifest based on the current resolver's
// base directory or default to using the deployment string as path
// If the deployment string is an alias, use the aliased path
let manifest_path = if let Some(aliased) = self.aliases.get(&deployment_str) {
aliased.clone()
} else {
match &resolver.base_dir {
Some(dir) => dir.join(&deployment_str),
None => PathBuf::from(deployment_str),
}
};

let canonical_manifest_path = manifest_path
.canonicalize()
.map_err(|e| Error::from(anyhow!("Failed to canonicalize manifest path: {}", e)))?;

// The manifest path is the path of the subgraph manifest file in the build directory
// We use the parent directory as the base directory for the new resolver
let base_dir = canonical_manifest_path
.parent()
.ok_or_else(|| Error::from(anyhow!("Manifest path has no parent directory")))?
.to_path_buf();

resolver.base_dir = Some(base_dir);
Ok(resolver)
}
}

pub fn remove_prefix(link: &str) -> &str {
Expand Down Expand Up @@ -87,6 +143,13 @@ impl LinkResolverTrait for FileLinkResolver {
}
}

fn for_deployment(
&self,
deployment: DeploymentHash,
) -> Result<Box<dyn LinkResolverTrait>, Error> {
Ok(Box::new(self.clone_for_deployment(deployment)?))
}

async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
Err(anyhow!("get_block is not implemented for FileLinkResolver").into())
}
Expand Down Expand Up @@ -118,7 +181,7 @@ mod tests {
file.write_all(test_content).unwrap();

// Create a resolver without a base directory
let resolver = FileLinkResolver::new();
let resolver = FileLinkResolver::default();
let logger = slog::Logger::root(slog::Discard, slog::o!());

// Test valid path resolution
Expand Down Expand Up @@ -220,4 +283,65 @@ mod tests {
let _ = fs::remove_file(test_file_path);
let _ = fs::remove_dir(temp_dir);
}

#[tokio::test]
async fn test_file_resolver_with_aliases() {
// Create a temporary directory for test files
let temp_dir = env::temp_dir().join("file_resolver_test_aliases");
let _ = fs::create_dir_all(&temp_dir);

// Create two test files with different content
let test_file1_path = temp_dir.join("file.txt");
let test_content1 = b"This is the file content";
let mut file1 = fs::File::create(&test_file1_path).unwrap();
file1.write_all(test_content1).unwrap();

let test_file2_path = temp_dir.join("another_file.txt");
let test_content2 = b"This is another file content";
let mut file2 = fs::File::create(&test_file2_path).unwrap();
file2.write_all(test_content2).unwrap();

// Create aliases mapping
let mut aliases = HashMap::new();
aliases.insert("alias1".to_string(), test_file1_path.clone());
aliases.insert("alias2".to_string(), test_file2_path.clone());
aliases.insert("deployment-id".to_string(), test_file1_path.clone());

// Create resolver with aliases
let resolver = FileLinkResolver::new(Some(temp_dir.clone()), aliases);
let logger = slog::Logger::root(slog::Discard, slog::o!());

// Test resolving by aliases
let link1 = Link {
link: "alias1".to_string(),
};
let result1 = resolver.cat(&logger, &link1).await.unwrap();
assert_eq!(result1, test_content1);

let link2 = Link {
link: "alias2".to_string(),
};
let result2 = resolver.cat(&logger, &link2).await.unwrap();
assert_eq!(result2, test_content2);

// Test that the alias works in for_deployment as well
let deployment = DeploymentHash::new("deployment-id").unwrap();
let deployment_resolver = resolver.clone_for_deployment(deployment).unwrap();

let expected_dir = test_file1_path.parent().unwrap();
let deployment_base_dir = deployment_resolver.base_dir.clone().unwrap();

let canonical_expected_dir = expected_dir.canonicalize().unwrap();
let canonical_deployment_dir = deployment_base_dir.canonicalize().unwrap();

assert_eq!(
canonical_deployment_dir, canonical_expected_dir,
"Build directory paths don't match"
);

// Clean up
let _ = fs::remove_file(test_file1_path);
let _ = fs::remove_file(test_file2_path);
let _ = fs::remove_dir(temp_dir);
}
}
Loading
Loading