diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d9aee9303..e90bb6bea 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -87,7 +87,7 @@ jobs: with: # this might remove tools that are actually needed, # if set to "true" but frees about 6 GB - tool-cache: false + tool-cache: true # all of these default to true, but feel free to set to # "false" if necessary for your workflow @@ -96,7 +96,7 @@ jobs: haskell: true large-packages: false docker-images: true - swap-storage: false + swap-storage: true - name: Free disk Space (Windows) if: runner.os == 'Windows' run: | diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 099aed2e8..413fc1d0f 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -332,9 +332,13 @@ fn run() -> eyre::Result<()> { .parent() .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); - dataflow_descriptor - .check(&working_dir) - .wrap_err("Could not validate yaml")?; + if !coordinator_addr.is_loopback() { + dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?; + } else { + dataflow_descriptor + .check(&working_dir) + .wrap_err("Could not validate yaml")?; + } let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index f37613581..534b857bb 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -24,7 +24,17 @@ pub(super) async fn spawn_dataflow( daemon_connections: &mut HashMap, clock: &HLC, ) -> eyre::Result { - dataflow.check(&working_dir)?; + let remote_machine_id: Vec<_> = daemon_connections + .iter() + .filter_map(|(id, c)| { + if !c.listen_socket.ip().is_loopback() { + Some(id.as_str()) + } else { + None + } + }) + .collect(); + dataflow.check_in_daemon(&working_dir, &remote_machine_id, false)?; let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index fc836412a..5300eeec4 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -133,7 +133,23 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.") + validate::check_dataflow(self, working_dir, None, false) + .wrap_err("Dataflow could not be validated.") + } + + pub fn check_in_daemon( + &self, + working_dir: &Path, + remote_machine_id: &[&str], + coordinator_is_remote: bool, + ) -> eyre::Result<()> { + validate::check_dataflow( + self, + working_dir, + Some(remote_machine_id), + coordinator_is_remote, + ) + .wrap_err("Dataflow could not be validated.") } } diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index fe5580967..9c4e89dde 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -1,7 +1,7 @@ use crate::{ adjust_shared_library_path, config::{DataId, Input, InputMapping, OperatorId, UserInputMapping}, - descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, + descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION}, get_python_path, }; @@ -12,18 +12,45 @@ use tracing::info; use super::{resolve_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { +pub fn check_dataflow( + dataflow: &Descriptor, + working_dir: &Path, + remote_daemon_id: Option<&[&str]>, + coordinator_is_remote: bool, +) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; // check that nodes and operators exist for node in &nodes { match &node.kind { - descriptor::CoreNodeKind::Custom(node) => match node.source.as_str() { + descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() { SHELL_SOURCE => (), source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. + } else if let Some(remote_daemon_id) = remote_daemon_id { + if remote_daemon_id.contains(&node.deploy.machine.as_str()) + || coordinator_is_remote + { + let path = Path::new(&source); + let path = if path.extension().is_none() { + path.with_extension(EXE_EXTENSION) + } else { + path.to_owned() + }; + if path.is_relative() { + eyre::bail!( + "paths of remote nodes must be absolute (node `{}`)", + node.id + ); + } + info!("skipping path check for remote node `{}`", node.id); + } else { + resolve_path(source, working_dir).wrap_err_with(|| { + format!("Could not find source path `{}`", source) + })?; + } } else { resolve_path(source, working_dir) .wrap_err_with(|| format!("Could not find source path `{}`", source))?;