From 52c9d37f59faede14adabe8dc82b346826e5c407 Mon Sep 17 00:00:00 2001 From: XxChang Date: Fri, 7 Jun 2024 00:46:00 +0800 Subject: [PATCH 01/13] refuse pass relative path to remote daemon --- binaries/coordinator/src/run/mod.rs | 12 ++++++++++- libraries/core/src/descriptor/mod.rs | 11 +++++++++- libraries/core/src/descriptor/validate.rs | 26 ++++++++++++++++++++--- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index f37613581..889340cea 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -24,7 +24,17 @@ pub(super) async fn spawn_dataflow( daemon_connections: &mut HashMap, clock: &HLC, ) -> eyre::Result { - dataflow.check(&working_dir)?; + let remote_machine_id: Vec<_> = daemon_connections + .iter() + .filter_map(|(id, c)| { + if !c.listen_socket.ip().is_loopback() { + Some(id.as_str()) + } else { + None + } + }) + .collect(); + dataflow.check_in_daemon(&working_dir, &remote_machine_id)?; let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index fc836412a..59fafae99 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -133,7 +133,16 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.") + validate::check_dataflow(self, working_dir, None).wrap_err("Dataflow could not be validated.") + } + + pub fn check_in_daemon( + &self, + working_dir: &Path, + remote_machine_id: &[&str], + ) -> eyre::Result<()> { + validate::check_dataflow(self, working_dir, Some(remote_machine_id)) + .wrap_err("Dataflow could not be validated.") } } diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index fe5580967..fdf883130 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -1,7 +1,7 @@ use crate::{ adjust_shared_library_path, config::{DataId, Input, InputMapping, OperatorId, UserInputMapping}, - descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, + descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION}, get_python_path, }; @@ -12,18 +12,38 @@ use tracing::info; use super::{resolve_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { +pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path, remote_daemon_id: Option<&[&str]>) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; // check that nodes and operators exist for node in &nodes { match &node.kind { - descriptor::CoreNodeKind::Custom(node) => match node.source.as_str() { + descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() { SHELL_SOURCE => (), source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. + } else if let Some(remote_daemon_id) = remote_daemon_id { + if remote_daemon_id.contains(&node.deploy.machine.as_str()) { + let path = Path::new(&source); + let path = if path.extension().is_none() { + path.with_extension(EXE_EXTENSION) + } else { + path.to_owned() + }; + if path.is_relative() { + eyre::bail!( + "paths of remote nodes must be absolute (node `{}`)", + node.id + ); + } + info!("skipping path check for remote node `{}`", node.id); + } else { + resolve_path(source, working_dir).wrap_err_with(|| { + format!("Could not find source path `{}`", source) + })?; + } } else { resolve_path(source, working_dir) .wrap_err_with(|| format!("Could not find source path `{}`", source))?; From ef0455678736f53ca8743fd204205963082daea5 Mon Sep 17 00:00:00 2001 From: XxChang Date: Fri, 7 Jun 2024 00:52:08 +0800 Subject: [PATCH 02/13] fix fmt --- .vscode/settings.json | 3 +++ libraries/core/src/descriptor/mod.rs | 3 ++- libraries/core/src/descriptor/validate.rs | 6 +++++- 3 files changed, 10 insertions(+), 2 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000..415b3c10e --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "cmake.sourceDirectory": "/home/xuchang/github/dora/examples/cmake-dataflow" +} \ No newline at end of file diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 59fafae99..04276244c 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -133,7 +133,8 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir, None).wrap_err("Dataflow could not be validated.") + validate::check_dataflow(self, working_dir, None) + .wrap_err("Dataflow could not be validated.") } pub fn check_in_daemon( diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index fdf883130..f72add968 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -12,7 +12,11 @@ use tracing::info; use super::{resolve_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path, remote_daemon_id: Option<&[&str]>) -> eyre::Result<()> { +pub fn check_dataflow( + dataflow: &Descriptor, + working_dir: &Path, + remote_daemon_id: Option<&[&str]>, +) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; From 3567796ecea42576c687af17e673b996043cc29d Mon Sep 17 00:00:00 2001 From: XxChang Date: Fri, 7 Jun 2024 00:52:42 +0800 Subject: [PATCH 03/13] del vscode --- .vscode/settings.json | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 415b3c10e..000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "cmake.sourceDirectory": "/home/xuchang/github/dora/examples/cmake-dataflow" -} \ No newline at end of file From cd4fbe9403960f171c1b30edaca89584f31ea74a Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Mon, 10 Jun 2024 14:22:30 +0800 Subject: [PATCH 04/13] add: default working dir to remote deamon --- binaries/coordinator/src/lib.rs | 12 ++++++++ binaries/coordinator/src/run/mod.rs | 41 +++++++++++++++++----------- binaries/daemon/src/spawn.rs | 14 ++++++---- libraries/core/src/descriptor/mod.rs | 18 +++++++----- 4 files changed, 57 insertions(+), 28 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 5032d6a5f..467ecae61 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -137,6 +137,7 @@ async fn start_inner( let mut dataflow_results: HashMap>> = HashMap::new(); let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); + let mut daemon_working_dirs: HashMap = HashMap::new(); while let Some(event) = events.next().await { if event.log() { @@ -207,6 +208,17 @@ async fn start_inner( "closing previous connection `{machine_id}` on new register" ); } + if ip.is_loopback() { + daemon_working_dirs.insert( + machine_id.clone(), + PathBuf::from("/tmp/"), //TODO: Register Daemon working directory + ); + } else { + daemon_working_dirs.insert( + machine_id.clone(), + PathBuf::from("/tmp/"), //TODO: Register Daemon working directory + ); + } } (Err(err), _) => { tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}"); diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 889340cea..08f233629 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -1,7 +1,4 @@ -use crate::{ - tcp_utils::{tcp_receive, tcp_send}, - DaemonConnection, -}; +use crate::{tcp_utils::{tcp_receive, tcp_send}, DaemonConnection}; use dora_core::{ daemon_messages::{ @@ -16,6 +13,7 @@ use std::{ path::PathBuf, }; use uuid::{NoContext, Timestamp, Uuid}; +const DEFAULT_WORKING_DIR: &str = "/tmp"; #[tracing::instrument(skip(daemon_connections, clock))] pub(super) async fn spawn_dataflow( @@ -50,19 +48,21 @@ pub(super) async fn spawn_dataflow( }) .collect::, _>>()?; - let spawn_command = SpawnDataflowNodes { - dataflow_id: uuid, - working_dir, - nodes: nodes.clone(), - machine_listen_ports, - dataflow_descriptor: dataflow, - }; - let message = serde_json::to_vec(&Timestamped { - inner: DaemonCoordinatorEvent::Spawn(spawn_command), - timestamp: clock.new_timestamp(), - })?; - + for machine in &machines { + let working_dir = find_working_dir(daemon_connections, machine, working_dir.clone()); + + let spawn_command = SpawnDataflowNodes { + dataflow_id: uuid, + working_dir, + nodes: nodes.clone(), + machine_listen_ports: machine_listen_ports.clone(), + dataflow_descriptor: dataflow.clone(), + }; + let message = serde_json::to_vec(&Timestamped { + inner: DaemonCoordinatorEvent::Spawn(spawn_command), + timestamp: clock.new_timestamp(), + })?; tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`"); spawn_dataflow_on_machine(daemon_connections, machine, &message) .await @@ -78,6 +78,15 @@ pub(super) async fn spawn_dataflow( }) } +fn find_working_dir(daemon_connections: &mut HashMap, machine: &str, working_dir: PathBuf) -> PathBuf { + if daemon_connections.get_mut(machine).unwrap().listen_socket.ip().is_loopback() { + working_dir + } else { + PathBuf::from(DEFAULT_WORKING_DIR) + } + +} + async fn spawn_dataflow_on_machine( daemon_connections: &mut HashMap, machine: &str, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 90751c5bf..98505a9ee 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -8,8 +8,7 @@ use dora_core::{ config::{DataId, NodeRunConfig}, daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ - resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, - ResolvedNode, SHELL_SOURCE, + resolve_path, source_is_url, source_to_path, Descriptor, OperatorDefinition, OperatorSource, PythonSource, ResolvedNode, SHELL_SOURCE }, get_python_path, message::uhlc::HLC, @@ -88,9 +87,14 @@ pub async fn spawn_node( .wrap_err("failed to download custom node")?; target_path.clone() } else { - resolve_path(source, working_dir).wrap_err_with(|| { - format!("failed to resolve node source `{}`", source) - })? + let path = source_to_path(source); + if path.is_absolute(){ + path + } else{ + resolve_path(source, working_dir).wrap_err_with(|| { + format!("failed to resolve node source `{}`", source) + })? + } }; // If extension is .py, use python to run the script diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 04276244c..a9872e7ef 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -9,7 +9,7 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, env::consts::EXE_EXTENSION, fmt, - path::{Path, PathBuf}, + path::{self, Path, PathBuf}, }; use tracing::warn; pub use visualize::collect_dora_timers; @@ -427,12 +427,7 @@ pub fn source_is_url(source: &str) -> bool { } pub fn resolve_path(source: &str, working_dir: &Path) -> Result { - let path = Path::new(&source); - let path = if path.extension().is_none() { - path.with_extension(EXE_EXTENSION) - } else { - path.to_owned() - }; + let path = source_to_path(source); // Search path within current working directory if let Ok(abs_path) = working_dir.join(&path).canonicalize() { @@ -444,6 +439,15 @@ pub fn resolve_path(source: &str, working_dir: &Path) -> Result { bail!("Could not find source path {}", path.display()) } } +pub fn source_to_path(source: &str) -> PathBuf { + let path = Path::new(&source); + let path = if path.extension().is_none() { + path.with_extension(EXE_EXTENSION) + } else { + path.to_owned() + }; + path +} #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(deny_unknown_fields)] From 8a67bfd773b17efd2ce02457f8642b1e21711eaa Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Mon, 10 Jun 2024 15:09:37 +0800 Subject: [PATCH 05/13] fix: skip path check in all multiple machine --- binaries/coordinator/src/run/mod.rs | 12 +------- libraries/core/src/descriptor/mod.rs | 13 ++------- libraries/core/src/descriptor/validate.rs | 34 ++++++++--------------- 3 files changed, 15 insertions(+), 44 deletions(-) diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 08f233629..617025499 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -22,17 +22,7 @@ pub(super) async fn spawn_dataflow( daemon_connections: &mut HashMap, clock: &HLC, ) -> eyre::Result { - let remote_machine_id: Vec<_> = daemon_connections - .iter() - .filter_map(|(id, c)| { - if !c.listen_socket.ip().is_loopback() { - Some(id.as_str()) - } else { - None - } - }) - .collect(); - dataflow.check_in_daemon(&working_dir, &remote_machine_id)?; + dataflow.check(&working_dir)?; let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index a9872e7ef..80671025d 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -9,7 +9,7 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, env::consts::EXE_EXTENSION, fmt, - path::{self, Path, PathBuf}, + path::{Path, PathBuf}, }; use tracing::warn; pub use visualize::collect_dora_timers; @@ -133,18 +133,11 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir, None) - .wrap_err("Dataflow could not be validated.") - } - pub fn check_in_daemon( - &self, - working_dir: &Path, - remote_machine_id: &[&str], - ) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir, Some(remote_machine_id)) + validate::check_dataflow(self, working_dir) .wrap_err("Dataflow could not be validated.") } + } #[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index f72add968..77827a787 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -1,7 +1,7 @@ use crate::{ adjust_shared_library_path, config::{DataId, Input, InputMapping, OperatorId, UserInputMapping}, - descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION}, + descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, get_python_path, }; @@ -9,13 +9,12 @@ use eyre::{bail, eyre, Context}; use std::{path::Path, process::Command}; use tracing::info; -use super::{resolve_path, Descriptor, SHELL_SOURCE}; +use super::{resolve_path, source_to_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn check_dataflow( dataflow: &Descriptor, working_dir: &Path, - remote_daemon_id: Option<&[&str]>, ) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; @@ -28,30 +27,19 @@ pub fn check_dataflow( source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. - } else if let Some(remote_daemon_id) = remote_daemon_id { - if remote_daemon_id.contains(&node.deploy.machine.as_str()) { - let path = Path::new(&source); - let path = if path.extension().is_none() { - path.with_extension(EXE_EXTENSION) - } else { - path.to_owned() - }; - if path.is_relative() { - eyre::bail!( - "paths of remote nodes must be absolute (node `{}`)", - node.id - ); - } - info!("skipping path check for remote node `{}`", node.id); - } else { + } else if node.deploy.machine.is_empty() { resolve_path(source, working_dir).wrap_err_with(|| { format!("Could not find source path `{}`", source) })?; + + } else { + let path = source_to_path(source); + if path.is_relative() { + eyre::bail!("paths of remote nodes must be absolute (node `{}`)", node.id); + } + info!("skipping path check for remote node `{}`", node.id); + } - } else { - resolve_path(source, working_dir) - .wrap_err_with(|| format!("Could not find source path `{}`", source))?; - }; } }, descriptor::CoreNodeKind::Runtime(node) => { From 513ac1bdd98d225d46aa488d60adcf1738f3b251 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Mon, 10 Jun 2024 16:42:19 +0800 Subject: [PATCH 06/13] add: skip check operator in multiple-daemons --- binaries/coordinator/src/run/mod.rs | 10 +----- libraries/core/src/descriptor/validate.rs | 39 +++++++++++++++++++---- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 617025499..948265597 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -40,7 +40,7 @@ pub(super) async fn spawn_dataflow( for machine in &machines { - let working_dir = find_working_dir(daemon_connections, machine, working_dir.clone()); + let working_dir = PathBuf::from(DEFAULT_WORKING_DIR); let spawn_command = SpawnDataflowNodes { dataflow_id: uuid, @@ -68,14 +68,6 @@ pub(super) async fn spawn_dataflow( }) } -fn find_working_dir(daemon_connections: &mut HashMap, machine: &str, working_dir: PathBuf) -> PathBuf { - if daemon_connections.get_mut(machine).unwrap().listen_socket.ip().is_loopback() { - working_dir - } else { - PathBuf::from(DEFAULT_WORKING_DIR) - } - -} async fn spawn_dataflow_on_machine( daemon_connections: &mut HashMap, diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 77827a787..6e0d33592 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -8,6 +8,7 @@ use crate::{ use eyre::{bail, eyre, Context}; use std::{path::Path, process::Command}; use tracing::info; +use std::collections::HashSet; use super::{resolve_path, source_to_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -18,6 +19,7 @@ pub fn check_dataflow( ) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; + let is_multiple = nodes.iter().map(|n| &n.deploy.machine).collect::>().len() > 1; // check that nodes and operators exist for node in &nodes { @@ -27,7 +29,7 @@ pub fn check_dataflow( source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. - } else if node.deploy.machine.is_empty() { + } else if !is_multiple { resolve_path(source, working_dir).wrap_err_with(|| { format!("Could not find source path `{}`", source) })?; @@ -48,11 +50,18 @@ pub fn check_dataflow( OperatorSource::SharedLibrary(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else { + } else if !is_multiple{ let path = adjust_shared_library_path(Path::new(&path))?; if !working_dir.join(&path).exists() { bail!("no shared library at `{}`", path.display()); } + }else { + let path = source_to_path(path); + if path.is_relative() { + eyre::bail!("paths of operator must be absolute (operator `{}`)", operator_definition.id); + } + info!("skipping path check for remote operator `{}`", operator_definition.id); + } } OperatorSource::Python(python_source) => { @@ -60,15 +69,33 @@ pub fn check_dataflow( let path = &python_source.source; if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if !working_dir.join(path).exists() { - bail!("no Python library at `{path}`"); + } else if !is_multiple { + if !working_dir.join(path).exists() { + bail!("no Python library at `{path}`"); + } + } else { + let path = source_to_path(path); + if path.is_relative() { + eyre::bail!("paths of python operator must be absolute (operator `{}`)", operator_definition.id); + } + info!("skipping path check for remote python operator `{}`", operator_definition.id); + } } OperatorSource::Wasm(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if !working_dir.join(path).exists() { - bail!("no WASM library at `{path}`"); + } else if !is_multiple { + if !working_dir.join(path).exists() { + bail!("no WASM library at `{path}`"); + } + } else { + let path = source_to_path(path); + if path.is_relative() { + eyre::bail!("paths of Wasm operator must be absolute (operator `{}`)", operator_definition.id); + } + info!("skipping path check for remote Wasm operator `{}`", operator_definition.id); + } } } From 9dc07b1e4997948d88dfc47ce3c249904483c4e7 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Mon, 10 Jun 2024 20:17:34 +0800 Subject: [PATCH 07/13] fix: local daemon deploy --- binaries/coordinator/src/lib.rs | 12 ---- binaries/coordinator/src/run/mod.rs | 36 ++++++----- binaries/daemon/src/spawn.rs | 7 ++- examples/multiple-daemons/dataflow.yml | 6 +- libraries/core/src/descriptor/mod.rs | 10 +--- libraries/core/src/descriptor/validate.rs | 73 ++++++++++++++--------- 6 files changed, 75 insertions(+), 69 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 467ecae61..5032d6a5f 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -137,7 +137,6 @@ async fn start_inner( let mut dataflow_results: HashMap>> = HashMap::new(); let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); - let mut daemon_working_dirs: HashMap = HashMap::new(); while let Some(event) = events.next().await { if event.log() { @@ -208,17 +207,6 @@ async fn start_inner( "closing previous connection `{machine_id}` on new register" ); } - if ip.is_loopback() { - daemon_working_dirs.insert( - machine_id.clone(), - PathBuf::from("/tmp/"), //TODO: Register Daemon working directory - ); - } else { - daemon_working_dirs.insert( - machine_id.clone(), - PathBuf::from("/tmp/"), //TODO: Register Daemon working directory - ); - } } (Err(err), _) => { tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}"); diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 948265597..a2e06a792 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -1,4 +1,7 @@ -use crate::{tcp_utils::{tcp_receive, tcp_send}, DaemonConnection}; +use crate::{ + tcp_utils::{tcp_receive, tcp_send}, + DaemonConnection, +}; use dora_core::{ daemon_messages::{ @@ -37,22 +40,24 @@ pub(super) async fn spawn_dataflow( .map(|c| (m.clone(), c.listen_socket)) }) .collect::, _>>()?; + let working_dir = if machines.len() > 1 { + PathBuf::from(DEFAULT_WORKING_DIR) + } else { + working_dir + }; + let spawn_command = SpawnDataflowNodes { + dataflow_id: uuid, + working_dir, + nodes: nodes.clone(), + machine_listen_ports, + dataflow_descriptor: dataflow, + }; + let message = serde_json::to_vec(&Timestamped { + inner: DaemonCoordinatorEvent::Spawn(spawn_command), + timestamp: clock.new_timestamp(), + })?; - for machine in &machines { - let working_dir = PathBuf::from(DEFAULT_WORKING_DIR); - - let spawn_command = SpawnDataflowNodes { - dataflow_id: uuid, - working_dir, - nodes: nodes.clone(), - machine_listen_ports: machine_listen_ports.clone(), - dataflow_descriptor: dataflow.clone(), - }; - let message = serde_json::to_vec(&Timestamped { - inner: DaemonCoordinatorEvent::Spawn(spawn_command), - timestamp: clock.new_timestamp(), - })?; tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`"); spawn_dataflow_on_machine(daemon_connections, machine, &message) .await @@ -68,7 +73,6 @@ pub(super) async fn spawn_dataflow( }) } - async fn spawn_dataflow_on_machine( daemon_connections: &mut HashMap, machine: &str, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 98505a9ee..f2390a207 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -8,7 +8,8 @@ use dora_core::{ config::{DataId, NodeRunConfig}, daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ - resolve_path, source_is_url, source_to_path, Descriptor, OperatorDefinition, OperatorSource, PythonSource, ResolvedNode, SHELL_SOURCE + resolve_path, source_is_url, source_to_path, Descriptor, OperatorDefinition, + OperatorSource, PythonSource, ResolvedNode, SHELL_SOURCE, }, get_python_path, message::uhlc::HLC, @@ -88,9 +89,9 @@ pub async fn spawn_node( target_path.clone() } else { let path = source_to_path(source); - if path.is_absolute(){ + if path.is_absolute() { path - } else{ + } else { resolve_path(source, working_dir).wrap_err_with(|| { format!("failed to resolve node source `{}`", source) })? diff --git a/examples/multiple-daemons/dataflow.yml b/examples/multiple-daemons/dataflow.yml index 363b98bba..22b69dc7a 100644 --- a/examples/multiple-daemons/dataflow.yml +++ b/examples/multiple-daemons/dataflow.yml @@ -4,7 +4,7 @@ nodes: machine: A custom: build: cargo build -p multiple-daemons-example-node - source: ../../target/debug/multiple-daemons-example-node + source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-node inputs: tick: dora/timer/millis/10 outputs: @@ -15,7 +15,7 @@ nodes: operators: - id: rust-operator build: cargo build -p multiple-daemons-example-operator - shared-library: ../../target/debug/multiple_daemons_example_operator + shared-library: /home/runner/work/dora/dora/target/debug/multiple_daemons_example_operator inputs: tick: dora/timer/millis/100 random: rust-node/random @@ -26,6 +26,6 @@ nodes: machine: B custom: build: cargo build -p multiple-daemons-example-sink - source: ../../target/debug/multiple-daemons-example-sink + source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-sink inputs: message: runtime-node/rust-operator/status diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 80671025d..db4eeed0a 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -133,11 +133,8 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - - validate::check_dataflow(self, working_dir) - .wrap_err("Dataflow could not be validated.") + validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.") } - } #[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] @@ -434,12 +431,11 @@ pub fn resolve_path(source: &str, working_dir: &Path) -> Result { } pub fn source_to_path(source: &str) -> PathBuf { let path = Path::new(&source); - let path = if path.extension().is_none() { + if path.extension().is_none() { path.with_extension(EXE_EXTENSION) } else { path.to_owned() - }; - path + } } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 6e0d33592..fd61cc5ff 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -6,20 +6,22 @@ use crate::{ }; use eyre::{bail, eyre, Context}; +use std::collections::HashSet; use std::{path::Path, process::Command}; use tracing::info; -use std::collections::HashSet; use super::{resolve_path, source_to_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn check_dataflow( - dataflow: &Descriptor, - working_dir: &Path, -) -> eyre::Result<()> { +pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; - let is_multiple = nodes.iter().map(|n| &n.deploy.machine).collect::>().len() > 1; + let is_multiple = nodes + .iter() + .map(|n| &n.deploy.machine) + .collect::>() + .len() + > 1; // check that nodes and operators exist for node in &nodes { @@ -30,18 +32,18 @@ pub fn check_dataflow( if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. } else if !is_multiple { - resolve_path(source, working_dir).wrap_err_with(|| { - format!("Could not find source path `{}`", source) - })?; - - } else { - let path = source_to_path(source); - if path.is_relative() { - eyre::bail!("paths of remote nodes must be absolute (node `{}`)", node.id); - } - info!("skipping path check for remote node `{}`", node.id); - + resolve_path(source, working_dir) + .wrap_err_with(|| format!("Could not find source path `{}`", source))?; + } else { + let path = source_to_path(source); + if path.is_relative() { + eyre::bail!( + "paths of remote nodes must be absolute (node `{}`)", + node.id + ); } + info!("skipping path check for remote node `{}`", node.id); + } } }, descriptor::CoreNodeKind::Runtime(node) => { @@ -50,18 +52,23 @@ pub fn check_dataflow( OperatorSource::SharedLibrary(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if !is_multiple{ + } else if !is_multiple { let path = adjust_shared_library_path(Path::new(&path))?; if !working_dir.join(&path).exists() { bail!("no shared library at `{}`", path.display()); } - }else { + } else { let path = source_to_path(path); if path.is_relative() { - eyre::bail!("paths of operator must be absolute (operator `{}`)", operator_definition.id); + eyre::bail!( + "paths of operator must be absolute (operator `{}`)", + operator_definition.id + ); } - info!("skipping path check for remote operator `{}`", operator_definition.id); - + info!( + "skipping path check for remote operator `{}`", + operator_definition.id + ); } } OperatorSource::Python(python_source) => { @@ -76,10 +83,15 @@ pub fn check_dataflow( } else { let path = source_to_path(path); if path.is_relative() { - eyre::bail!("paths of python operator must be absolute (operator `{}`)", operator_definition.id); + eyre::bail!( + "paths of python operator must be absolute (operator `{}`)", + operator_definition.id + ); } - info!("skipping path check for remote python operator `{}`", operator_definition.id); - + info!( + "skipping path check for remote python operator `{}`", + operator_definition.id + ); } } OperatorSource::Wasm(path) => { @@ -92,10 +104,15 @@ pub fn check_dataflow( } else { let path = source_to_path(path); if path.is_relative() { - eyre::bail!("paths of Wasm operator must be absolute (operator `{}`)", operator_definition.id); + eyre::bail!( + "paths of Wasm operator must be absolute (operator `{}`)", + operator_definition.id + ); } - info!("skipping path check for remote Wasm operator `{}`", operator_definition.id); - + info!( + "skipping path check for remote Wasm operator `{}`", + operator_definition.id + ); } } } From 21576c77b5845a48486e62e2a547fd85b5c6ffd1 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Thu, 13 Jun 2024 17:03:37 +0800 Subject: [PATCH 08/13] fix remote working_dir platform-cross --- Cargo.lock | 1 + binaries/coordinator/Cargo.toml | 1 + binaries/coordinator/src/run/mod.rs | 6 ++++-- examples/multiple-daemons/dataflow.yml | 3 +++ libraries/core/src/descriptor/validate.rs | 12 ++++++------ 5 files changed, 15 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa1044954..96402de7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2269,6 +2269,7 @@ name = "dora-coordinator" version = "0.3.4" dependencies = [ "ctrlc", + "dirs 5.0.1", "dora-core", "dora-tracing", "eyre", diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 4c6b68877..6319fea4a 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -25,3 +25,4 @@ futures-concurrency = "7.1.0" serde_json = "1.0.86" names = "0.14.0" ctrlc = "3.2.5" +dirs = "5.0.1" diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index a2e06a792..10c035af5 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -16,7 +16,7 @@ use std::{ path::PathBuf, }; use uuid::{NoContext, Timestamp, Uuid}; -const DEFAULT_WORKING_DIR: &str = "/tmp"; + #[tracing::instrument(skip(daemon_connections, clock))] pub(super) async fn spawn_dataflow( @@ -41,7 +41,9 @@ pub(super) async fn spawn_dataflow( }) .collect::, _>>()?; let working_dir = if machines.len() > 1 { - PathBuf::from(DEFAULT_WORKING_DIR) + dirs::home_dir() + .ok_or_else(|| eyre!("could not create working directory for multiple-daemons dataflow!")) + .map(|home| home)? } else { working_dir }; diff --git a/examples/multiple-daemons/dataflow.yml b/examples/multiple-daemons/dataflow.yml index 22b69dc7a..c0d0eab1d 100644 --- a/examples/multiple-daemons/dataflow.yml +++ b/examples/multiple-daemons/dataflow.yml @@ -4,6 +4,7 @@ nodes: machine: A custom: build: cargo build -p multiple-daemons-example-node + # The path is for CI, replace it with your source absolute path here source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-node inputs: tick: dora/timer/millis/10 @@ -15,6 +16,7 @@ nodes: operators: - id: rust-operator build: cargo build -p multiple-daemons-example-operator + # The path is for CI, replace it with your source absolute path here shared-library: /home/runner/work/dora/dora/target/debug/multiple_daemons_example_operator inputs: tick: dora/timer/millis/100 @@ -26,6 +28,7 @@ nodes: machine: B custom: build: cargo build -p multiple-daemons-example-sink + # The path is for CI, replace it with your source absolute path here source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-sink inputs: message: runtime-node/rust-operator/status diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index fd61cc5ff..8c57c73d5 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -16,12 +16,12 @@ const VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; - let is_multiple = nodes + let is_local = nodes .iter() .map(|n| &n.deploy.machine) .collect::>() .len() - > 1; + <= 1; // check that nodes and operators exist for node in &nodes { @@ -31,7 +31,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. - } else if !is_multiple { + } else if is_local { resolve_path(source, working_dir) .wrap_err_with(|| format!("Could not find source path `{}`", source))?; } else { @@ -52,7 +52,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result OperatorSource::SharedLibrary(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if !is_multiple { + } else if is_local { let path = adjust_shared_library_path(Path::new(&path))?; if !working_dir.join(&path).exists() { bail!("no shared library at `{}`", path.display()); @@ -76,7 +76,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result let path = &python_source.source; if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if !is_multiple { + } else if is_local { if !working_dir.join(path).exists() { bail!("no Python library at `{path}`"); } @@ -97,7 +97,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result OperatorSource::Wasm(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if !is_multiple { + } else if is_local { if !working_dir.join(path).exists() { bail!("no WASM library at `{path}`"); } From d40a34d8a815b757c80ea5019826137e0f9017f4 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Mon, 24 Jun 2024 15:09:44 +0800 Subject: [PATCH 09/13] change the check rules and configure working-dir --- Cargo.lock | 1 + binaries/coordinator/src/run/mod.rs | 11 +- binaries/daemon/Cargo.toml | 1 + binaries/daemon/src/lib.rs | 13 ++- examples/multiple-daemons/dataflow.yml | 14 ++- libraries/core/src/descriptor/mod.rs | 10 +- libraries/core/src/descriptor/validate.rs | 117 ++++++++++++---------- 7 files changed, 98 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 44702fd18..4b7737d59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2322,6 +2322,7 @@ dependencies = [ "async-trait", "bincode", "ctrlc", + "dirs 5.0.1", "dora-arrow-convert", "dora-core", "dora-download", diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 6efdcef27..3069775e8 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -24,7 +24,7 @@ pub(super) async fn spawn_dataflow( daemon_connections: &mut HashMap, clock: &HLC, ) -> eyre::Result { - dataflow.check(&working_dir)?; + //dataflow.check(&working_dir)?; let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); @@ -39,15 +39,6 @@ pub(super) async fn spawn_dataflow( .map(|c| (m.clone(), c.listen_socket)) }) .collect::, _>>()?; - let working_dir = if machines.len() > 1 { - dirs::home_dir() - .ok_or_else(|| { - eyre!("could not create working directory for multiple-daemons dataflow!") - }) - .map(|home| home)? - } else { - working_dir - }; let spawn_command = SpawnDataflowNodes { dataflow_id: uuid, working_dir, diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index a53607f9e..c78450c8f 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -39,3 +39,4 @@ aligned-vec = "0.5.0" ctrlc = "3.2.5" which = "5.0.0" sysinfo = "0.30.11" +dirs = "5.0.1" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 521b5bd89..4d895644d 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -572,7 +572,7 @@ impl Daemon { bail!("there is already a running dataflow with ID `{dataflow_id}`") } }; - + let mut working_dir = working_dir; for node in nodes { let local = node.deploy.machine == self.machine_id; @@ -614,6 +614,17 @@ impl Daemon { dataflow.pending_nodes.insert(node.id.clone()); let node_id = node.id.clone(); + match &node.deploy.working_dir { + Some(local_working_dir) => { + working_dir = PathBuf::from(local_working_dir); + } + None => { + if !node.deploy.local { + working_dir = dirs::home_dir().wrap_err("failed to get home dir and change working dir")?; + } + tracing::debug!("As you don't specify working_dir in remote machine, change the home dir as working dir: {working_dir:?}"); + } + } match spawn::spawn_node( dataflow_id, &working_dir, diff --git a/examples/multiple-daemons/dataflow.yml b/examples/multiple-daemons/dataflow.yml index c0d0eab1d..e23ac86a9 100644 --- a/examples/multiple-daemons/dataflow.yml +++ b/examples/multiple-daemons/dataflow.yml @@ -2,22 +2,25 @@ nodes: - id: rust-node _unstable_deploy: machine: A + local: true custom: build: cargo build -p multiple-daemons-example-node # The path is for CI, replace it with your source absolute path here - source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-node + source: ../../target/debug/multiple-daemons-example-node inputs: tick: dora/timer/millis/10 outputs: - random - id: runtime-node _unstable_deploy: - machine: A + machine: B + local: false + working_dir: /home/miyamo/dora/examples/multiple-daemons operators: - id: rust-operator build: cargo build -p multiple-daemons-example-operator # The path is for CI, replace it with your source absolute path here - shared-library: /home/runner/work/dora/dora/target/debug/multiple_daemons_example_operator + shared-library: ../../target/debug/multiple_daemons_example_operator inputs: tick: dora/timer/millis/100 random: rust-node/random @@ -25,10 +28,11 @@ nodes: - status - id: rust-sink _unstable_deploy: - machine: B + machine: A + local: true custom: build: cargo build -p multiple-daemons-example-sink # The path is for CI, replace it with your source absolute path here - source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-sink + source: ../../target/debug/multiple-daemons-example-sink inputs: message: runtime-node/rust-operator/status diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 501f8805e..ea77d4b5b 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -142,6 +142,8 @@ impl Descriptor { #[serde(deny_unknown_fields)] pub struct Deploy { pub machine: Option, + pub local: Option, + pub working_dir: Option, } /// Dora Node @@ -299,6 +301,8 @@ impl ResolvedNode { #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ResolvedDeploy { pub machine: String, + pub local: bool, + pub working_dir: Option, } impl ResolvedDeploy { fn new(deploy: Deploy, descriptor: &Descriptor) -> Self { @@ -307,7 +311,11 @@ impl ResolvedDeploy { Some(m) => m, None => default_machine.to_owned(), }; - Self { machine } + Self { + machine, + local: deploy.local.unwrap_or(true), + working_dir: deploy.working_dir.clone(), + } } } diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index e7939b528..ffa37f0d6 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -6,7 +6,6 @@ use crate::{ }; use eyre::{bail, eyre, Context}; -use std::collections::HashSet; use std::{path::Path, process::Command}; use tracing::info; @@ -17,12 +16,6 @@ const VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; - let is_local = nodes - .iter() - .map(|n| &n.deploy.machine) - .collect::>() - .len() - <= 1; // check that nodes and operators exist for node in &nodes { @@ -33,93 +26,113 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. - } else if is_local { + } else if node.deploy.local { resolve_path(source, working_dir) .wrap_err_with(|| format!("Could not find source path `{}`", source))?; } else { - let path = source_to_path(source); - if path.is_relative() { - eyre::bail!( - "paths of remote nodes must be absolute (node `{}`)", - node.id - ); + match &node.deploy.working_dir { + Some(_working_dir) => {}, + None => { + let path = source_to_path(source); + if path.is_relative() { + eyre::bail!( + "paths of remote nodes must be absolute (node `{}`) unless you specify a working_dir for remote machine", + node.id + ); + } + info!("skipping path check for remote node `{}`", node.id); + } } - info!("skipping path check for remote node `{}`", node.id); } } }, - descriptor::CoreNodeKind::Runtime(node) => { - for operator_definition in &node.operators { + descriptor::CoreNodeKind::Runtime(runtime) => { + for operator_definition in &runtime.operators { match &operator_definition.config.source { OperatorSource::SharedLibrary(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if is_local { + } else if node.deploy.local { let path = adjust_shared_library_path(Path::new(&path))?; if !working_dir.join(&path).exists() { bail!("no shared library at `{}`", path.display()); } } else { - let path = source_to_path(path); - if path.is_relative() { - eyre::bail!( - "paths of operator must be absolute (operator `{}`)", - operator_definition.id - ); + match &node.deploy.working_dir { + Some(_working_dir) => {}, + None => { + let path = source_to_path(path); + if path.is_relative() { + eyre::bail!( + "paths of operator must be absolute (operator `{}`) unless you specify a working_dir for remote machine", + operator_definition.id + ); + } + info!( + "skipping path check for remote operator `{}`", + operator_definition.id + ); + } } - info!( - "skipping path check for remote operator `{}`", - operator_definition.id - ); } } OperatorSource::Python(python_source) => { - has_python_operator = true; let path = &python_source.source; if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if is_local { + } else if node.deploy.local { if !working_dir.join(path).exists() { bail!("no Python library at `{path}`"); } + has_python_operator = true; } else { - let path = source_to_path(path); - if path.is_relative() { - eyre::bail!( - "paths of python operator must be absolute (operator `{}`)", - operator_definition.id - ); + match &node.deploy.working_dir { + Some(_working_dir) => {}, + None => { + let path = source_to_path(path); + if path.is_relative() { + eyre::bail!( + "paths of python operator must be absolute (operator `{}`) unless you specify a working_dir for remote machine", + operator_definition.id + ); + } + info!( + "skipping path check for remote python operator `{}`", + operator_definition.id + ); + } } - info!( - "skipping path check for remote python operator `{}`", - operator_definition.id - ); } } OperatorSource::Wasm(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if is_local { + } else if node.deploy.local { if !working_dir.join(path).exists() { bail!("no WASM library at `{path}`"); } } else { - let path = source_to_path(path); - if path.is_relative() { - eyre::bail!( - "paths of Wasm operator must be absolute (operator `{}`)", - operator_definition.id - ); + match &node.deploy.working_dir { + Some(_working_dir) => {}, + None => { + let path = source_to_path(path); + if path.is_relative() { + eyre::bail!( + "paths of Wasm operator must be absolute (operator `{}`) unless you specify a working_dir for remote machine", + operator_definition.id + ); + } + info!( + "skipping path check for remote Wasm operator `{}`", + operator_definition.id + ); + } } - info!( - "skipping path check for remote Wasm operator `{}`", - operator_definition.id - ); } } } } - } + }, } } From 8d35617181ff996d1ab620b801a2535fdbefa2c4 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Mon, 24 Jun 2024 17:06:04 +0800 Subject: [PATCH 10/13] fix inter-daemon peer address --- binaries/daemon/src/lib.rs | 16 +++++++++++++--- libraries/core/src/descriptor/mod.rs | 2 +- libraries/core/src/descriptor/validate.rs | 10 +++++----- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 4d895644d..1035bfa32 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -348,7 +348,16 @@ impl Daemon { match dataflow_descriptor.communication.remote { dora_core::config::RemoteCommunicationConfig::Tcp => {} } - for (machine_id, socket) in machine_listen_ports { + for (machine_id, mut socket) in machine_listen_ports { + if socket.ip().is_loopback() { + if let Some(ref coordinator_socket) = self.coordinator_connection { + let new_ip = coordinator_socket + .peer_addr() + .wrap_err("failed to get peer address of coordinator")? + .ip(); + socket = SocketAddr::new(new_ip, socket.port()); + } + } match self.inter_daemon_connections.entry(machine_id) { std::collections::btree_map::Entry::Vacant(entry) => { entry.insert(InterDaemonConnection::new(socket)); @@ -620,11 +629,12 @@ impl Daemon { } None => { if !node.deploy.local { - working_dir = dirs::home_dir().wrap_err("failed to get home dir and change working dir")?; + working_dir = dirs::home_dir() + .wrap_err("failed to get home dir and change working dir")?; } tracing::debug!("As you don't specify working_dir in remote machine, change the home dir as working dir: {working_dir:?}"); + } } - } match spawn::spawn_node( dataflow_id, &working_dir, diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index ea77d4b5b..9dbbf5e9c 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -311,7 +311,7 @@ impl ResolvedDeploy { Some(m) => m, None => default_machine.to_owned(), }; - Self { + Self { machine, local: deploy.local.unwrap_or(true), working_dir: deploy.working_dir.clone(), diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index ffa37f0d6..a54a7910e 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -31,7 +31,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result .wrap_err_with(|| format!("Could not find source path `{}`", source))?; } else { match &node.deploy.working_dir { - Some(_working_dir) => {}, + Some(_working_dir) => {} None => { let path = source_to_path(source); if path.is_relative() { @@ -59,7 +59,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result } } else { match &node.deploy.working_dir { - Some(_working_dir) => {}, + Some(_working_dir) => {} None => { let path = source_to_path(path); if path.is_relative() { @@ -87,7 +87,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result has_python_operator = true; } else { match &node.deploy.working_dir { - Some(_working_dir) => {}, + Some(_working_dir) => {} None => { let path = source_to_path(path); if path.is_relative() { @@ -113,7 +113,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result } } else { match &node.deploy.working_dir { - Some(_working_dir) => {}, + Some(_working_dir) => {} None => { let path = source_to_path(path); if path.is_relative() { @@ -132,7 +132,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result } } } - }, + } } } From 78d316d73f899b4f0c33da8d092b69fd1f11cc46 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Mon, 24 Jun 2024 18:21:55 +0800 Subject: [PATCH 11/13] try ubuntu CI --- examples/multiple-daemons/dataflow.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/multiple-daemons/dataflow.yml b/examples/multiple-daemons/dataflow.yml index e23ac86a9..9d7e9aef6 100644 --- a/examples/multiple-daemons/dataflow.yml +++ b/examples/multiple-daemons/dataflow.yml @@ -15,7 +15,7 @@ nodes: _unstable_deploy: machine: B local: false - working_dir: /home/miyamo/dora/examples/multiple-daemons + working_dir: /Users/runner/work/dora/dora/examples/multiple-daemons operators: - id: rust-operator build: cargo build -p multiple-daemons-example-operator From ece7e41a54de80a7c9541687a8ee8276f363e98e Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Wed, 26 Jun 2024 11:49:04 +0800 Subject: [PATCH 12/13] fix: remote dora logs --- binaries/coordinator/src/lib.rs | 1 + binaries/daemon/src/lib.rs | 18 ++++++++++++++---- examples/multiple-daemons/dataflow.yml | 6 ++---- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 3b0c64596..28b8c6825 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -618,6 +618,7 @@ fn dataflow_result( } } +#[derive(Debug)] struct DaemonConnection { stream: TcpStream, listen_socket: SocketAddr, diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 1035bfa32..a67ac0882 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -573,10 +573,7 @@ impl Daemon { ) -> eyre::Result<()> { let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone()); let dataflow = match self.running.entry(dataflow_id) { - std::collections::hash_map::Entry::Vacant(entry) => { - self.working_dir.insert(dataflow_id, working_dir.clone()); - entry.insert(dataflow) - } + std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow), std::collections::hash_map::Entry::Occupied(_) => { bail!("there is already a running dataflow with ID `{dataflow_id}`") } @@ -635,6 +632,19 @@ impl Daemon { tracing::debug!("As you don't specify working_dir in remote machine, change the home dir as working dir: {working_dir:?}"); } } + match self.working_dir.entry(dataflow_id) { + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(working_dir.clone()); + } + std::collections::hash_map::Entry::Occupied(entry) => { + tracing::info!( + "working_dir for dataflow {} in daemon {} already exists: {:?}", + dataflow_id, + node.deploy.machine, + entry.get() + ); + } + } match spawn::spawn_node( dataflow_id, &working_dir, diff --git a/examples/multiple-daemons/dataflow.yml b/examples/multiple-daemons/dataflow.yml index 9d7e9aef6..8a26dfbdf 100644 --- a/examples/multiple-daemons/dataflow.yml +++ b/examples/multiple-daemons/dataflow.yml @@ -5,7 +5,6 @@ nodes: local: true custom: build: cargo build -p multiple-daemons-example-node - # The path is for CI, replace it with your source absolute path here source: ../../target/debug/multiple-daemons-example-node inputs: tick: dora/timer/millis/10 @@ -15,11 +14,11 @@ nodes: _unstable_deploy: machine: B local: false - working_dir: /Users/runner/work/dora/dora/examples/multiple-daemons + # This path is for CI, replace it with your remote daemon working_dir here + working_dir: /home/runner/work/dora/dora/examples/multiple-daemons operators: - id: rust-operator build: cargo build -p multiple-daemons-example-operator - # The path is for CI, replace it with your source absolute path here shared-library: ../../target/debug/multiple_daemons_example_operator inputs: tick: dora/timer/millis/100 @@ -32,7 +31,6 @@ nodes: local: true custom: build: cargo build -p multiple-daemons-example-sink - # The path is for CI, replace it with your source absolute path here source: ../../target/debug/multiple-daemons-example-sink inputs: message: runtime-node/rust-operator/status From 96f4f7660ee099be9c83f4e0a744b94ea8f4f859 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Wed, 26 Jun 2024 13:57:03 +0800 Subject: [PATCH 13/13] don't check the dataflow in coordinator --- binaries/coordinator/src/run/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 3069775e8..71c3dd440 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -24,8 +24,6 @@ pub(super) async fn spawn_dataflow( daemon_connections: &mut HashMap, clock: &HLC, ) -> eyre::Result { - //dataflow.check(&working_dir)?; - let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext));