From 52c9d37f59faede14adabe8dc82b346826e5c407 Mon Sep 17 00:00:00 2001 From: XxChang Date: Fri, 7 Jun 2024 00:46:00 +0800 Subject: [PATCH 01/15] refuse pass relative path to remote daemon --- binaries/coordinator/src/run/mod.rs | 12 ++++++++++- libraries/core/src/descriptor/mod.rs | 11 +++++++++- libraries/core/src/descriptor/validate.rs | 26 ++++++++++++++++++++--- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index f37613581..889340cea 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -24,7 +24,17 @@ pub(super) async fn spawn_dataflow( daemon_connections: &mut HashMap, clock: &HLC, ) -> eyre::Result { - dataflow.check(&working_dir)?; + let remote_machine_id: Vec<_> = daemon_connections + .iter() + .filter_map(|(id, c)| { + if !c.listen_socket.ip().is_loopback() { + Some(id.as_str()) + } else { + None + } + }) + .collect(); + dataflow.check_in_daemon(&working_dir, &remote_machine_id)?; let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index fc836412a..59fafae99 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -133,7 +133,16 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.") + validate::check_dataflow(self, working_dir, None).wrap_err("Dataflow could not be validated.") + } + + pub fn check_in_daemon( + &self, + working_dir: &Path, + remote_machine_id: &[&str], + ) -> eyre::Result<()> { + validate::check_dataflow(self, working_dir, Some(remote_machine_id)) + .wrap_err("Dataflow could not be validated.") } } diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 
fe5580967..fdf883130 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -1,7 +1,7 @@ use crate::{ adjust_shared_library_path, config::{DataId, Input, InputMapping, OperatorId, UserInputMapping}, - descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, + descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION}, get_python_path, }; @@ -12,18 +12,38 @@ use tracing::info; use super::{resolve_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { +pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path, remote_daemon_id: Option<&[&str]>) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; // check that nodes and operators exist for node in &nodes { match &node.kind { - descriptor::CoreNodeKind::Custom(node) => match node.source.as_str() { + descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() { SHELL_SOURCE => (), source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. 
+ } else if let Some(remote_daemon_id) = remote_daemon_id { + if remote_daemon_id.contains(&node.deploy.machine.as_str()) { + let path = Path::new(&source); + let path = if path.extension().is_none() { + path.with_extension(EXE_EXTENSION) + } else { + path.to_owned() + }; + if path.is_relative() { + eyre::bail!( + "paths of remote nodes must be absolute (node `{}`)", + node.id + ); + } + info!("skipping path check for remote node `{}`", node.id); + } else { + resolve_path(source, working_dir).wrap_err_with(|| { + format!("Could not find source path `{}`", source) + })?; + } } else { resolve_path(source, working_dir) .wrap_err_with(|| format!("Could not find source path `{}`", source))?; From ef0455678736f53ca8743fd204205963082daea5 Mon Sep 17 00:00:00 2001 From: XxChang Date: Fri, 7 Jun 2024 00:52:08 +0800 Subject: [PATCH 02/15] fix fmt --- .vscode/settings.json | 3 +++ libraries/core/src/descriptor/mod.rs | 3 ++- libraries/core/src/descriptor/validate.rs | 6 +++++- 3 files changed, 10 insertions(+), 2 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000..415b3c10e --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "cmake.sourceDirectory": "/home/xuchang/github/dora/examples/cmake-dataflow" +} \ No newline at end of file diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 59fafae99..04276244c 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -133,7 +133,8 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir, None).wrap_err("Dataflow could not be validated.") + validate::check_dataflow(self, working_dir, None) + .wrap_err("Dataflow could not be validated.") } pub fn check_in_daemon( diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 
fdf883130..f72add968 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -12,7 +12,11 @@ use tracing::info; use super::{resolve_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path, remote_daemon_id: Option<&[&str]>) -> eyre::Result<()> { +pub fn check_dataflow( + dataflow: &Descriptor, + working_dir: &Path, + remote_daemon_id: Option<&[&str]>, +) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; From 3567796ecea42576c687af17e673b996043cc29d Mon Sep 17 00:00:00 2001 From: XxChang Date: Fri, 7 Jun 2024 00:52:42 +0800 Subject: [PATCH 03/15] del vscode --- .vscode/settings.json | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 415b3c10e..000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "cmake.sourceDirectory": "/home/xuchang/github/dora/examples/cmake-dataflow" -} \ No newline at end of file From 25e5771e5decf605303800affae5f800da511823 Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 09:55:44 +0800 Subject: [PATCH 04/15] add local ip checker in dora start --- binaries/cli/src/main.rs | 13 ++++++++++--- libraries/core/src/descriptor/validate.rs | 2 +- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 099aed2e8..1dda3ad13 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -332,9 +332,16 @@ fn run() -> eyre::Result<()> { .parent() .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? 
.to_owned(); - dataflow_descriptor - .check(&working_dir) - .wrap_err("Could not validate yaml")?; + if !coordinator_addr.is_loopback() { + // use empty string to indicate that all nodes should be + // referred to as remote node, such that all paths are + // checked as absolute + dataflow_descriptor.check_in_daemon(&working_dir, &[&String::default()])?; + } else { + dataflow_descriptor + .check(&working_dir) + .wrap_err("Could not validate yaml")?; + } let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index f72add968..83453bf6a 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -29,7 +29,7 @@ pub fn check_dataflow( if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. } else if let Some(remote_daemon_id) = remote_daemon_id { - if remote_daemon_id.contains(&node.deploy.machine.as_str()) { + if remote_daemon_id.contains(&node.deploy.machine.as_str()) || remote_daemon_id.contains(&String::default().as_str()) { let path = Path::new(&source); let path = if path.extension().is_none() { path.with_extension(EXE_EXTENSION) From 22007af669deccaddda3653e16cab4b6dbab44dd Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 09:56:24 +0800 Subject: [PATCH 05/15] fix fmt --- binaries/cli/src/main.rs | 4 ++-- libraries/core/src/descriptor/validate.rs | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 1dda3ad13..ce1db69fb 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -333,8 +333,8 @@ fn run() -> eyre::Result<()> { .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? 
.to_owned(); if !coordinator_addr.is_loopback() { - // use empty string to indicate that all nodes should be - // referred to as remote node, such that all paths are + // use empty string to indicate that all nodes should be + // referred to as remote node, such that all paths are // checked as absolute dataflow_descriptor.check_in_daemon(&working_dir, &[&String::default()])?; } else { diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 83453bf6a..5c90f8fb2 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -29,7 +29,9 @@ pub fn check_dataflow( if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. } else if let Some(remote_daemon_id) = remote_daemon_id { - if remote_daemon_id.contains(&node.deploy.machine.as_str()) || remote_daemon_id.contains(&String::default().as_str()) { + if remote_daemon_id.contains(&node.deploy.machine.as_str()) + || remote_daemon_id.contains(&String::default().as_str()) + { let path = Path::new(&source); let path = if path.extension().is_none() { path.with_extension(EXE_EXTENSION) From f237b4b5f3c0a12c696fe952bf5b17282abf937e Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 09:59:53 +0800 Subject: [PATCH 06/15] minor --- binaries/cli/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index ce1db69fb..8f199752b 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -335,7 +335,8 @@ fn run() -> eyre::Result<()> { if !coordinator_addr.is_loopback() { // use empty string to indicate that all nodes should be // referred to as remote node, such that all paths are - // checked as absolute + // checked as absolute, the machine id of remote daemon + // should not be empty string. 
dataflow_descriptor.check_in_daemon(&working_dir, &[&String::default()])?; } else { dataflow_descriptor From 607391e5b734abdc73e08afb1d688b8cd64f884a Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 10:36:20 +0800 Subject: [PATCH 07/15] del dotnet and ghc to left enough space --- .github/workflows/ci.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d9aee9303..fafa1e2ff 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -140,7 +140,9 @@ jobs: with: python-version: "3.10" - name: "Python Dataflow example" - run: cargo run --example python-dataflow + run: | + sudo rm -rf "$AGENT_TOOLSDIRECTORY" + cargo run --example python-dataflow - uses: conda-incubator/setup-miniconda@v3 with: auto-activate-base: true From 3dfe4c87013d8b874afd8dd15e07ebc56ae65638 Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 10:44:06 +0800 Subject: [PATCH 08/15] del dotnet and ghc to left enough space --- .github/workflows/ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fafa1e2ff..89d5f6c1d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -141,7 +141,8 @@ jobs: python-version: "3.10" - name: "Python Dataflow example" run: | - sudo rm -rf "$AGENT_TOOLSDIRECTORY" + sudo rm -rf /usr/share/dotnet + sudo rm -rf /opt/ghc cargo run --example python-dataflow - uses: conda-incubator/setup-miniconda@v3 with: From b0c4e13e1e9b85083830060b37f0545fe0d4e72a Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 10:49:19 +0800 Subject: [PATCH 09/15] del large-packages to release enough space --- .github/workflows/ci.yml | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 89d5f6c1d..d1d14431b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -94,7 +94,7 @@ jobs: android: 
true dotnet: true haskell: true - large-packages: false + large-packages: true docker-images: true swap-storage: false - name: Free disk Space (Windows) @@ -140,10 +140,7 @@ jobs: with: python-version: "3.10" - name: "Python Dataflow example" - run: | - sudo rm -rf /usr/share/dotnet - sudo rm -rf /opt/ghc - cargo run --example python-dataflow + run: cargo run --example python-dataflow - uses: conda-incubator/setup-miniconda@v3 with: auto-activate-base: true From 5a007021f044703f85c0e4c6125c62fe1c2c3df7 Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 10:58:02 +0800 Subject: [PATCH 10/15] try to del swap-storage to release enough space --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d1d14431b..25ab27d9a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -94,9 +94,9 @@ jobs: android: true dotnet: true haskell: true - large-packages: true + large-packages: false docker-images: true - swap-storage: false + swap-storage: true - name: Free disk Space (Windows) if: runner.os == 'Windows' run: | From cf37cbe926bbb0794d8b5f23b760d03131df069f Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 11:38:17 +0800 Subject: [PATCH 11/15] del tool-cache to left enough space --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 25ab27d9a..41cc90d08 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -87,7 +87,7 @@ jobs: with: # this might remove tools that are actually needed, # if set to "true" but frees about 6 GB - tool-cache: false + tool-cache: tru # all of these default to true, but feel free to set to # "false" if necessary for your workflow From aa61fa8027845143f8807c1fc1dcadfa0488e9af Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 11:56:05 +0800 Subject: [PATCH 12/15] try to del tool-cache to 
release enough space --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 41cc90d08..e90bb6bea 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -87,7 +87,7 @@ jobs: with: # this might remove tools that are actually needed, # if set to "true" but frees about 6 GB - tool-cache: tru + tool-cache: true # all of these default to true, but feel free to set to # "false" if necessary for your workflow From adf20fb5a9704d5f803e8fd1d084a55d4056ba9a Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 17:52:57 +0800 Subject: [PATCH 13/15] add coordinator_is_remote argument --- binaries/cli/src/main.rs | 6 +----- binaries/coordinator/src/run/mod.rs | 2 +- libraries/core/src/descriptor/mod.rs | 5 +++-- libraries/core/src/descriptor/validate.rs | 5 +++-- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 8f199752b..f1fbc3aab 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -333,11 +333,7 @@ fn run() -> eyre::Result<()> { .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); if !coordinator_addr.is_loopback() { - // use empty string to indicate that all nodes should be - // referred to as remote node, such that all paths are - // checked as absolute, the machine id of remote daemon - // should not be empty string. 
- dataflow_descriptor.check_in_daemon(&working_dir, &[&String::default()])?; + dataflow_descriptor.check_in_daemon(&working_dir, &vec![], true)?; } else { dataflow_descriptor .check(&working_dir) diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 889340cea..534b857bb 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -34,7 +34,7 @@ pub(super) async fn spawn_dataflow( } }) .collect(); - dataflow.check_in_daemon(&working_dir, &remote_machine_id)?; + dataflow.check_in_daemon(&working_dir, &remote_machine_id, false)?; let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 04276244c..8f0428e85 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -133,7 +133,7 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir, None) + validate::check_dataflow(self, working_dir, None, false) .wrap_err("Dataflow could not be validated.") } @@ -141,8 +141,9 @@ impl Descriptor { &self, working_dir: &Path, remote_machine_id: &[&str], + coordinator_is_remote: bool, ) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir, Some(remote_machine_id)) + validate::check_dataflow(self, working_dir, Some(remote_machine_id), coordinator_is_remote) .wrap_err("Dataflow could not be validated.") } } diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 5c90f8fb2..92ba57143 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -16,10 +16,11 @@ pub fn check_dataflow( dataflow: &Descriptor, working_dir: &Path, remote_daemon_id: Option<&[&str]>, + coordinator_is_remote: bool, ) -> eyre::Result<()> { let nodes = 
dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; - + // check that nodes and operators exist for node in &nodes { match &node.kind { @@ -30,7 +31,7 @@ pub fn check_dataflow( info!("{source} is a URL."); // TODO: Implement url check. } else if let Some(remote_daemon_id) = remote_daemon_id { if remote_daemon_id.contains(&node.deploy.machine.as_str()) - || remote_daemon_id.contains(&String::default().as_str()) + || coordinator_is_remote { let path = Path::new(&source); let path = if path.extension().is_none() { From 1a9686a0986b812101b0ce724f8c15a3c0d1d1c5 Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 17:55:48 +0800 Subject: [PATCH 14/15] fix fmt --- libraries/core/src/descriptor/mod.rs | 9 +++++++-- libraries/core/src/descriptor/validate.rs | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 8f0428e85..5300eeec4 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -143,8 +143,13 @@ impl Descriptor { remote_machine_id: &[&str], coordinator_is_remote: bool, ) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir, Some(remote_machine_id), coordinator_is_remote) - .wrap_err("Dataflow could not be validated.") + validate::check_dataflow( + self, + working_dir, + Some(remote_machine_id), + coordinator_is_remote, + ) + .wrap_err("Dataflow could not be validated.") } } diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 92ba57143..9c4e89dde 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -20,7 +20,7 @@ pub fn check_dataflow( ) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; - + // check that nodes and operators exist for node in &nodes { match &node.kind { From 
9ebd17200459321f283df8561b07bc07b54b48ea Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 12 Jun 2024 18:19:28 +0800 Subject: [PATCH 15/15] minor --- binaries/cli/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index f1fbc3aab..413fc1d0f 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -333,7 +333,7 @@ fn run() -> eyre::Result<()> { .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); if !coordinator_addr.is_loopback() { - dataflow_descriptor.check_in_daemon(&working_dir, &vec![], true)?; + dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?; } else { dataflow_descriptor .check(&working_dir)