From 52c9d37f59faede14adabe8dc82b346826e5c407 Mon Sep 17 00:00:00 2001 From: XxChang Date: Fri, 7 Jun 2024 00:46:00 +0800 Subject: [PATCH 01/13] refuse pass relative path to remote daemon --- binaries/coordinator/src/run/mod.rs | 12 ++++++++++- libraries/core/src/descriptor/mod.rs | 11 +++++++++- libraries/core/src/descriptor/validate.rs | 26 ++++++++++++++++++++--- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index f37613581..889340cea 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -24,7 +24,17 @@ pub(super) async fn spawn_dataflow( daemon_connections: &mut HashMap, clock: &HLC, ) -> eyre::Result { - dataflow.check(&working_dir)?; + let remote_machine_id: Vec<_> = daemon_connections + .iter() + .filter_map(|(id, c)| { + if !c.listen_socket.ip().is_loopback() { + Some(id.as_str()) + } else { + None + } + }) + .collect(); + dataflow.check_in_daemon(&working_dir, &remote_machine_id)?; let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index fc836412a..59fafae99 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -133,7 +133,16 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.") + validate::check_dataflow(self, working_dir, None).wrap_err("Dataflow could not be validated.") + } + + pub fn check_in_daemon( + &self, + working_dir: &Path, + remote_machine_id: &[&str], + ) -> eyre::Result<()> { + validate::check_dataflow(self, working_dir, Some(remote_machine_id)) + .wrap_err("Dataflow could not be validated.") } } diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 
fe5580967..fdf883130 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -1,7 +1,7 @@ use crate::{ adjust_shared_library_path, config::{DataId, Input, InputMapping, OperatorId, UserInputMapping}, - descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, + descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION}, get_python_path, }; @@ -12,18 +12,38 @@ use tracing::info; use super::{resolve_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { +pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path, remote_daemon_id: Option<&[&str]>) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; // check that nodes and operators exist for node in &nodes { match &node.kind { - descriptor::CoreNodeKind::Custom(node) => match node.source.as_str() { + descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() { SHELL_SOURCE => (), source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. 
+ } else if let Some(remote_daemon_id) = remote_daemon_id { + if remote_daemon_id.contains(&node.deploy.machine.as_str()) { + let path = Path::new(&source); + let path = if path.extension().is_none() { + path.with_extension(EXE_EXTENSION) + } else { + path.to_owned() + }; + if path.is_relative() { + eyre::bail!( + "paths of remote nodes must be absolute (node `{}`)", + node.id + ); + } + info!("skipping path check for remote node `{}`", node.id); + } else { + resolve_path(source, working_dir).wrap_err_with(|| { + format!("Could not find source path `{}`", source) + })?; + } } else { resolve_path(source, working_dir) .wrap_err_with(|| format!("Could not find source path `{}`", source))?; From ef0455678736f53ca8743fd204205963082daea5 Mon Sep 17 00:00:00 2001 From: XxChang Date: Fri, 7 Jun 2024 00:52:08 +0800 Subject: [PATCH 02/13] fix fmt --- .vscode/settings.json | 3 +++ libraries/core/src/descriptor/mod.rs | 3 ++- libraries/core/src/descriptor/validate.rs | 6 +++++- 3 files changed, 10 insertions(+), 2 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000..415b3c10e --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "cmake.sourceDirectory": "/home/xuchang/github/dora/examples/cmake-dataflow" +} \ No newline at end of file diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 59fafae99..04276244c 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -133,7 +133,8 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir, None).wrap_err("Dataflow could not be validated.") + validate::check_dataflow(self, working_dir, None) + .wrap_err("Dataflow could not be validated.") } pub fn check_in_daemon( diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 
fdf883130..f72add968 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -12,7 +12,11 @@ use tracing::info; use super::{resolve_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path, remote_daemon_id: Option<&[&str]>) -> eyre::Result<()> { +pub fn check_dataflow( + dataflow: &Descriptor, + working_dir: &Path, + remote_daemon_id: Option<&[&str]>, +) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; From 3567796ecea42576c687af17e673b996043cc29d Mon Sep 17 00:00:00 2001 From: XxChang Date: Fri, 7 Jun 2024 00:52:42 +0800 Subject: [PATCH 03/13] del vscode --- .vscode/settings.json | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 415b3c10e..000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "cmake.sourceDirectory": "/home/xuchang/github/dora/examples/cmake-dataflow" -} \ No newline at end of file From 135bb7c854091239c352d7fefba2bd57f174fd96 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Fri, 7 Jun 2024 11:57:18 +0800 Subject: [PATCH 04/13] add daemon_working_dirs manage and skip check on multiple-daemon completely --- binaries/cli/src/main.rs | 23 ++++++++++++---- binaries/coordinator/src/lib.rs | 13 +++++---- binaries/coordinator/src/listener.rs | 2 ++ binaries/coordinator/src/run/mod.rs | 32 ++++++++++++---------- binaries/daemon/src/coordinator.rs | 4 ++- binaries/daemon/src/lib.rs | 7 +++-- libraries/core/src/coordinator_messages.rs | 3 ++ libraries/core/src/daemon_messages.rs | 2 +- 8 files changed, 55 insertions(+), 31 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 8ac75485e..7b3a8926f 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ 
-37,7 +37,6 @@ struct Args { #[clap(subcommand)] command: Command, } - /// dora-rs cli client #[derive(Debug, clap::Subcommand)] enum Command { @@ -120,6 +119,11 @@ enum Command { #[clap(long)] run_dataflow: Option, + + #[clap(long, default_value = ".")] + working_dir: PathBuf, + + }, /// Run runtime Runtime, @@ -192,7 +196,10 @@ fn run() -> eyre::Result<()> { .parent() .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); - Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; + let dataflow_descriptor = Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; + if dataflow_descriptor.deploy.machine.is_none() { + dataflow_descriptor.check(&working_dir).wrap_err("could not validate yaml dataflow"); + } check::check_environment()? } None => check::check_environment()?, @@ -245,9 +252,12 @@ fn run() -> eyre::Result<()> { .parent() .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); - dataflow_descriptor - .check(&working_dir) - .wrap_err("Could not validate yaml")?; + if dataflow_descriptor.deploy.machine.is_none() { // TODO: decide based on the number of machines used by the nodes + dataflow_descriptor + .check(&working_dir) + .wrap_err("Could not validate yaml")?; + + } let mut session = connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?; let dataflow_id = start_dataflow( @@ -304,6 +314,7 @@ fn run() -> eyre::Result<()> { addr, machine_id, run_dataflow, + working_dir, } => { let rt = Builder::new_multi_thread() .enable_all() @@ -328,7 +339,7 @@ fn run() -> eyre::Result<()> { let localhost = Ipv4Addr::new(127, 0, 0, 1); (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() }); - Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await + Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr, working_dir).await } } }) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 0a30ac103..73dfe54c2 100644 --- a/binaries/coordinator/src/lib.rs +++
b/binaries/coordinator/src/lib.rs @@ -141,6 +141,7 @@ async fn start_inner( let mut dataflow_results: HashMap>> = HashMap::new(); let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); + let mut daemon_working_dirs: HashMap = HashMap::new(); while let Some(event) = events.next().await { if event.log() { @@ -172,6 +173,7 @@ async fn start_inner( mut connection, dora_version: daemon_version, listen_port, + working_dir, } => { let coordinator_version: &&str = &env!("CARGO_PKG_VERSION"); let version_check = if &daemon_version == coordinator_version { @@ -211,9 +213,10 @@ async fn start_inner( "closing previous connection `{machine_id}` on new register" ); } + daemon_working_dirs.insert(machine_id.clone(), working_dir.clone()); } (Err(err), _) => { - tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}"); + tracing::warn!("failed to register daemon connection and daemon working_dir for machine `{machine_id}`: {err}"); } (Ok(_), Err(err)) => { tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}"); @@ -320,7 +323,6 @@ async fn start_inner( local_working_dir, } => { let name = name.or_else(|| names::Generator::default().next()); - let inner = async { if let Some(name) = name.as_deref() { // check that name is unique @@ -333,10 +335,10 @@ async fn start_inner( } let dataflow = start_dataflow( dataflow, - local_working_dir, name, &mut daemon_connections, &clock, + &mut daemon_working_dirs, ) .await?; Ok(dataflow) @@ -854,16 +856,16 @@ async fn retrieve_logs( async fn start_dataflow( dataflow: Descriptor, - working_dir: PathBuf, name: Option, daemon_connections: &mut HashMap, clock: &HLC, + daemon_working_dirs: &mut HashMap, ) -> eyre::Result { let SpawnedDataflow { uuid, machines, nodes, - } = spawn_dataflow(dataflow, working_dir, daemon_connections, clock).await?; + } = spawn_dataflow(dataflow, daemon_connections, clock, 
daemon_working_dirs).await?; Ok(RunningDataflow { uuid, name, @@ -954,6 +956,7 @@ pub enum DaemonEvent { machine_id: String, connection: TcpStream, listen_port: u16, + working_dir: PathBuf, }, } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 86600a4be..01f7d5ef1 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -53,12 +53,14 @@ pub async fn handle_connection( machine_id, dora_version, listen_port, + working_dir, } => { let event = DaemonEvent::Register { dora_version, machine_id, connection, listen_port, + working_dir, }; let _ = events_tx.send(Event::Daemon(event)).await; break; diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 0d8301841..7e30de03b 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -20,11 +20,11 @@ use uuid::{NoContext, Timestamp, Uuid}; #[tracing::instrument(skip(daemon_connections, clock))] pub(super) async fn spawn_dataflow( dataflow: Descriptor, - working_dir: PathBuf, daemon_connections: &mut HashMap, clock: &HLC, + daemon_working_dirs: &mut HashMap, ) -> eyre::Result { - dataflow.check(&working_dir)?; + //dataflow.check(&working_dir)?; let nodes = dataflow.resolve_aliases_and_set_defaults(); let uuid = Uuid::new_v7(Timestamp::now(NoContext)); @@ -39,20 +39,22 @@ pub(super) async fn spawn_dataflow( .map(|c| (m.clone(), c.listen_socket)) }) .collect::, _>>()?; - - let spawn_command = SpawnDataflowNodes { - dataflow_id: uuid, - working_dir, - nodes: nodes.clone(), - machine_listen_ports, - dataflow_descriptor: dataflow, - }; - let message = serde_json::to_vec(&Timestamped { - inner: DaemonCoordinatorEvent::Spawn(spawn_command), - timestamp: clock.new_timestamp(), - })?; - + for machine in &machines { + let working_dir = daemon_working_dirs + .get_mut(machine) + .wrap_err_with(|| format!("no daemon working_dir for machine `{machine}`"))?; + let spawn_command = 
SpawnDataflowNodes { + dataflow_id: uuid, + working_dir: working_dir.clone(), + nodes: nodes.clone(), + machine_listen_ports: machine_listen_ports.clone(), + dataflow_descriptor: dataflow.to_owned(), + }; + let message = serde_json::to_vec(&Timestamped { + inner: DaemonCoordinatorEvent::Spawn(spawn_command), + timestamp: clock.new_timestamp(), + })?; tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`"); spawn_dataflow_on_machine(daemon_connections, machine, &message) .await diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index d2f86b3cc..8a13a1469 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -8,7 +8,7 @@ use dora_core::{ message::uhlc::HLC, }; use eyre::{eyre, Context}; -use std::{io::ErrorKind, net::SocketAddr}; +use std::{io::ErrorKind, net::SocketAddr, path::PathBuf}; use tokio::{ net::TcpStream, sync::{mpsc, oneshot}, @@ -26,6 +26,7 @@ pub async fn register( machine_id: String, listen_port: u16, clock: &HLC, + working_dir:PathBuf, ) -> eyre::Result>> { let mut stream = TcpStream::connect(addr) .await @@ -38,6 +39,7 @@ pub async fn register( dora_version: env!("CARGO_PKG_VERSION").to_owned(), machine_id, listen_port, + working_dir, }, timestamp: clock.new_timestamp(), })?; diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index aa4e32794..50e1d01f2 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -82,6 +82,7 @@ impl Daemon { coordinator_addr: SocketAddr, machine_id: String, bind_addr: SocketAddr, + working_dir: PathBuf, ) -> eyre::Result<()> { let clock = Arc::new(HLC::default()); @@ -90,7 +91,7 @@ impl Daemon { // spawn listen loop let (events_tx, events_rx) = flume::bounded(10); let listen_port = - inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?; + inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?; // TODO: not sure how to handle this yet let daemon_events =
events_rx.into_stream().map(|e| Timestamped { inner: Event::Daemon(e.inner), timestamp: e.timestamp, @@ -98,7 +99,7 @@ impl Daemon { // connect to the coordinator let coordinator_events = - coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock) + coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock, working_dir.clone()) .await .wrap_err("failed to connect to dora-coordinator")? .map( @@ -316,7 +317,7 @@ impl Daemon { working_dir, nodes, machine_listen_ports, - dataflow_descriptor, + dataflow_descriptor, }) => { match dataflow_descriptor.communication.remote { dora_core::config::RemoteCommunicationConfig::Tcp => {} diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 38e9eae2e..a5ba6cb61 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -1,3 +1,5 @@ +use std::path::PathBuf; + use crate::daemon_messages::DataflowId; use eyre::eyre; @@ -7,6 +9,7 @@ pub enum CoordinatorRequest { dora_version: String, machine_id: String, listen_port: u16, + working_dir: PathBuf, }, Event { machine_id: String, diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 91c634cca..6fbd7c57f 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, HashMap}, fmt, net::SocketAddr, path::PathBuf, From cd4fbe9403960f171c1b30edaca89584f31ea74a Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Mon, 10 Jun 2024 14:22:30 +0800 Subject: [PATCH 05/13] add: default working dir to remote daemon --- binaries/coordinator/src/lib.rs | 12 ++++++++ binaries/coordinator/src/run/mod.rs | 41 +++++++++++++++++----------- binaries/daemon/src/spawn.rs | 14 ++++++---- libraries/core/src/descriptor/mod.rs | 18 +++++++----- 4 files changed, 57 insertions(+), 28
deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 5032d6a5f..467ecae61 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -137,6 +137,7 @@ async fn start_inner( let mut dataflow_results: HashMap>> = HashMap::new(); let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); + let mut daemon_working_dirs: HashMap = HashMap::new(); while let Some(event) = events.next().await { if event.log() { @@ -207,6 +208,17 @@ async fn start_inner( "closing previous connection `{machine_id}` on new register" ); } + if ip.is_loopback() { + daemon_working_dirs.insert( + machine_id.clone(), + PathBuf::from("/tmp/"), //TODO: Register Daemon working directory + ); + } else { + daemon_working_dirs.insert( + machine_id.clone(), + PathBuf::from("/tmp/"), //TODO: Register Daemon working directory + ); + } } (Err(err), _) => { tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}"); diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 889340cea..08f233629 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -1,7 +1,4 @@ -use crate::{ - tcp_utils::{tcp_receive, tcp_send}, - DaemonConnection, -}; +use crate::{tcp_utils::{tcp_receive, tcp_send}, DaemonConnection}; use dora_core::{ daemon_messages::{ @@ -16,6 +13,7 @@ use std::{ path::PathBuf, }; use uuid::{NoContext, Timestamp, Uuid}; +const DEFAULT_WORKING_DIR: &str = "/tmp"; #[tracing::instrument(skip(daemon_connections, clock))] pub(super) async fn spawn_dataflow( @@ -50,19 +48,21 @@ pub(super) async fn spawn_dataflow( }) .collect::, _>>()?; - let spawn_command = SpawnDataflowNodes { - dataflow_id: uuid, - working_dir, - nodes: nodes.clone(), - machine_listen_ports, - dataflow_descriptor: dataflow, - }; - let message = serde_json::to_vec(&Timestamped { - inner: 
DaemonCoordinatorEvent::Spawn(spawn_command), - timestamp: clock.new_timestamp(), - })?; - + for machine in &machines { + let working_dir = find_working_dir(daemon_connections, machine, working_dir.clone()); + + let spawn_command = SpawnDataflowNodes { + dataflow_id: uuid, + working_dir, + nodes: nodes.clone(), + machine_listen_ports: machine_listen_ports.clone(), + dataflow_descriptor: dataflow.clone(), + }; + let message = serde_json::to_vec(&Timestamped { + inner: DaemonCoordinatorEvent::Spawn(spawn_command), + timestamp: clock.new_timestamp(), + })?; tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`"); spawn_dataflow_on_machine(daemon_connections, machine, &message) .await @@ -78,6 +78,15 @@ pub(super) async fn spawn_dataflow( }) } +fn find_working_dir(daemon_connections: &mut HashMap, machine: &str, working_dir: PathBuf) -> PathBuf { + if daemon_connections.get_mut(machine).unwrap().listen_socket.ip().is_loopback() { + working_dir + } else { + PathBuf::from(DEFAULT_WORKING_DIR) + } + +} + async fn spawn_dataflow_on_machine( daemon_connections: &mut HashMap, machine: &str, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 90751c5bf..98505a9ee 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -8,8 +8,7 @@ use dora_core::{ config::{DataId, NodeRunConfig}, daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ - resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, - ResolvedNode, SHELL_SOURCE, + resolve_path, source_is_url, source_to_path, Descriptor, OperatorDefinition, OperatorSource, PythonSource, ResolvedNode, SHELL_SOURCE }, get_python_path, message::uhlc::HLC, @@ -88,9 +87,14 @@ pub async fn spawn_node( .wrap_err("failed to download custom node")?; target_path.clone() } else { - resolve_path(source, working_dir).wrap_err_with(|| { - format!("failed to resolve node source `{}`", source) - })? 
+ let path = source_to_path(source); + if path.is_absolute(){ + path + } else{ + resolve_path(source, working_dir).wrap_err_with(|| { + format!("failed to resolve node source `{}`", source) + })? + } }; // If extension is .py, use python to run the script diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 04276244c..a9872e7ef 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -9,7 +9,7 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, env::consts::EXE_EXTENSION, fmt, - path::{Path, PathBuf}, + path::{self, Path, PathBuf}, }; use tracing::warn; pub use visualize::collect_dora_timers; @@ -427,12 +427,7 @@ pub fn source_is_url(source: &str) -> bool { } pub fn resolve_path(source: &str, working_dir: &Path) -> Result { - let path = Path::new(&source); - let path = if path.extension().is_none() { - path.with_extension(EXE_EXTENSION) - } else { - path.to_owned() - }; + let path = source_to_path(source); // Search path within current working directory if let Ok(abs_path) = working_dir.join(&path).canonicalize() { @@ -444,6 +439,15 @@ pub fn resolve_path(source: &str, working_dir: &Path) -> Result { bail!("Could not find source path {}", path.display()) } } +pub fn source_to_path(source: &str) -> PathBuf { + let path = Path::new(&source); + let path = if path.extension().is_none() { + path.with_extension(EXE_EXTENSION) + } else { + path.to_owned() + }; + path +} #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(deny_unknown_fields)] From 8a67bfd773b17efd2ce02457f8642b1e21711eaa Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Mon, 10 Jun 2024 15:09:37 +0800 Subject: [PATCH 06/13] fix: skip path check in all multiple machine --- binaries/coordinator/src/run/mod.rs | 12 +------- libraries/core/src/descriptor/mod.rs | 13 ++------- libraries/core/src/descriptor/validate.rs | 34 ++++++++--------------- 3 files changed, 15 insertions(+), 44 deletions(-) diff 
--git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 08f233629..617025499 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -22,17 +22,7 @@ pub(super) async fn spawn_dataflow( daemon_connections: &mut HashMap, clock: &HLC, ) -> eyre::Result { - let remote_machine_id: Vec<_> = daemon_connections - .iter() - .filter_map(|(id, c)| { - if !c.listen_socket.ip().is_loopback() { - Some(id.as_str()) - } else { - None - } - }) - .collect(); - dataflow.check_in_daemon(&working_dir, &remote_machine_id)?; + dataflow.check(&working_dir)?; let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index a9872e7ef..80671025d 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -9,7 +9,7 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, env::consts::EXE_EXTENSION, fmt, - path::{self, Path, PathBuf}, + path::{Path, PathBuf}, }; use tracing::warn; pub use visualize::collect_dora_timers; @@ -133,18 +133,11 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir, None) - .wrap_err("Dataflow could not be validated.") - } - pub fn check_in_daemon( - &self, - working_dir: &Path, - remote_machine_id: &[&str], - ) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir, Some(remote_machine_id)) + validate::check_dataflow(self, working_dir) .wrap_err("Dataflow could not be validated.") } + } #[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index f72add968..77827a787 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -1,7 +1,7 @@ use crate::{ adjust_shared_library_path, 
config::{DataId, Input, InputMapping, OperatorId, UserInputMapping}, - descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION}, + descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, get_python_path, }; @@ -9,13 +9,12 @@ use eyre::{bail, eyre, Context}; use std::{path::Path, process::Command}; use tracing::info; -use super::{resolve_path, Descriptor, SHELL_SOURCE}; +use super::{resolve_path, source_to_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn check_dataflow( dataflow: &Descriptor, working_dir: &Path, - remote_daemon_id: Option<&[&str]>, ) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; @@ -28,30 +27,19 @@ pub fn check_dataflow( source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. - } else if let Some(remote_daemon_id) = remote_daemon_id { - if remote_daemon_id.contains(&node.deploy.machine.as_str()) { - let path = Path::new(&source); - let path = if path.extension().is_none() { - path.with_extension(EXE_EXTENSION) - } else { - path.to_owned() - }; - if path.is_relative() { - eyre::bail!( - "paths of remote nodes must be absolute (node `{}`)", - node.id - ); - } - info!("skipping path check for remote node `{}`", node.id); - } else { + } else if node.deploy.machine.is_empty() { resolve_path(source, working_dir).wrap_err_with(|| { format!("Could not find source path `{}`", source) })?; + + } else { + let path = source_to_path(source); + if path.is_relative() { + eyre::bail!("paths of remote nodes must be absolute (node `{}`)", node.id); + } + info!("skipping path check for remote node `{}`", node.id); + } - } else { - resolve_path(source, working_dir) - .wrap_err_with(|| format!("Could not find source path `{}`", source))?; - }; } }, descriptor::CoreNodeKind::Runtime(node) => { From 513ac1bdd98d225d46aa488d60adcf1738f3b251 Mon Sep 17 00:00:00 2001 From: Gege-Wang 
<2891067867@qq.com> Date: Mon, 10 Jun 2024 16:42:19 +0800 Subject: [PATCH 07/13] add: skip check operator in multiple-daemons --- binaries/coordinator/src/run/mod.rs | 10 +----- libraries/core/src/descriptor/validate.rs | 39 +++++++++++++++++++---- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 617025499..948265597 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -40,7 +40,7 @@ pub(super) async fn spawn_dataflow( for machine in &machines { - let working_dir = find_working_dir(daemon_connections, machine, working_dir.clone()); + let working_dir = PathBuf::from(DEFAULT_WORKING_DIR); let spawn_command = SpawnDataflowNodes { dataflow_id: uuid, @@ -68,14 +68,6 @@ pub(super) async fn spawn_dataflow( }) } -fn find_working_dir(daemon_connections: &mut HashMap, machine: &str, working_dir: PathBuf) -> PathBuf { - if daemon_connections.get_mut(machine).unwrap().listen_socket.ip().is_loopback() { - working_dir - } else { - PathBuf::from(DEFAULT_WORKING_DIR) - } - -} async fn spawn_dataflow_on_machine( daemon_connections: &mut HashMap, diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 77827a787..6e0d33592 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -8,6 +8,7 @@ use crate::{ use eyre::{bail, eyre, Context}; use std::{path::Path, process::Command}; use tracing::info; +use std::collections::HashSet; use super::{resolve_path, source_to_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -18,6 +19,7 @@ pub fn check_dataflow( ) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; + let is_multiple = nodes.iter().map(|n| &n.deploy.machine).collect::>().len() > 1; // check that nodes and operators exist for node in &nodes { @@ -27,7 
+29,7 @@ pub fn check_dataflow( source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. - } else if node.deploy.machine.is_empty() { + } else if !is_multiple { resolve_path(source, working_dir).wrap_err_with(|| { format!("Could not find source path `{}`", source) })?; @@ -48,11 +50,18 @@ pub fn check_dataflow( OperatorSource::SharedLibrary(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else { + } else if !is_multiple{ let path = adjust_shared_library_path(Path::new(&path))?; if !working_dir.join(&path).exists() { bail!("no shared library at `{}`", path.display()); } + }else { + let path = source_to_path(path); + if path.is_relative() { + eyre::bail!("paths of operator must be absolute (operator `{}`)", operator_definition.id); + } + info!("skipping path check for remote operator `{}`", operator_definition.id); + } } OperatorSource::Python(python_source) => { @@ -60,15 +69,33 @@ pub fn check_dataflow( let path = &python_source.source; if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if !working_dir.join(path).exists() { - bail!("no Python library at `{path}`"); + } else if !is_multiple { + if !working_dir.join(path).exists() { + bail!("no Python library at `{path}`"); + } + } else { + let path = source_to_path(path); + if path.is_relative() { + eyre::bail!("paths of python operator must be absolute (operator `{}`)", operator_definition.id); + } + info!("skipping path check for remote python operator `{}`", operator_definition.id); + } } OperatorSource::Wasm(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. 
- } else if !working_dir.join(path).exists() { - bail!("no WASM library at `{path}`"); + } else if !is_multiple { + if !working_dir.join(path).exists() { + bail!("no WASM library at `{path}`"); + } + } else { + let path = source_to_path(path); + if path.is_relative() { + eyre::bail!("paths of Wasm operator must be absolute (operator `{}`)", operator_definition.id); + } + info!("skipping path check for remote Wasm operator `{}`", operator_definition.id); + } } } From 9dc07b1e4997948d88dfc47ce3c249904483c4e7 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Mon, 10 Jun 2024 20:17:34 +0800 Subject: [PATCH 08/13] fix: local daemon deploy --- binaries/coordinator/src/lib.rs | 12 ---- binaries/coordinator/src/run/mod.rs | 36 ++++++----- binaries/daemon/src/spawn.rs | 7 ++- examples/multiple-daemons/dataflow.yml | 6 +- libraries/core/src/descriptor/mod.rs | 10 +--- libraries/core/src/descriptor/validate.rs | 73 ++++++++++++++--------- 6 files changed, 75 insertions(+), 69 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 467ecae61..5032d6a5f 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -137,7 +137,6 @@ async fn start_inner( let mut dataflow_results: HashMap>> = HashMap::new(); let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); - let mut daemon_working_dirs: HashMap = HashMap::new(); while let Some(event) = events.next().await { if event.log() { @@ -208,17 +207,6 @@ async fn start_inner( "closing previous connection `{machine_id}` on new register" ); } - if ip.is_loopback() { - daemon_working_dirs.insert( - machine_id.clone(), - PathBuf::from("/tmp/"), //TODO: Register Daemon working directory - ); - } else { - daemon_working_dirs.insert( - machine_id.clone(), - PathBuf::from("/tmp/"), //TODO: Register Daemon working directory - ); - } } (Err(err), _) => { tracing::warn!("failed to register 
daemon connection for machine `{machine_id}`: {err}"); diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 948265597..a2e06a792 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -1,4 +1,7 @@ -use crate::{tcp_utils::{tcp_receive, tcp_send}, DaemonConnection}; +use crate::{ + tcp_utils::{tcp_receive, tcp_send}, + DaemonConnection, +}; use dora_core::{ daemon_messages::{ @@ -37,22 +40,24 @@ pub(super) async fn spawn_dataflow( .map(|c| (m.clone(), c.listen_socket)) }) .collect::, _>>()?; + let working_dir = if machines.len() > 1 { + PathBuf::from(DEFAULT_WORKING_DIR) + } else { + working_dir + }; + let spawn_command = SpawnDataflowNodes { + dataflow_id: uuid, + working_dir, + nodes: nodes.clone(), + machine_listen_ports, + dataflow_descriptor: dataflow, + }; + let message = serde_json::to_vec(&Timestamped { + inner: DaemonCoordinatorEvent::Spawn(spawn_command), + timestamp: clock.new_timestamp(), + })?; - for machine in &machines { - let working_dir = PathBuf::from(DEFAULT_WORKING_DIR); - - let spawn_command = SpawnDataflowNodes { - dataflow_id: uuid, - working_dir, - nodes: nodes.clone(), - machine_listen_ports: machine_listen_ports.clone(), - dataflow_descriptor: dataflow.clone(), - }; - let message = serde_json::to_vec(&Timestamped { - inner: DaemonCoordinatorEvent::Spawn(spawn_command), - timestamp: clock.new_timestamp(), - })?; tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`"); spawn_dataflow_on_machine(daemon_connections, machine, &message) .await @@ -68,7 +73,6 @@ pub(super) async fn spawn_dataflow( }) } - async fn spawn_dataflow_on_machine( daemon_connections: &mut HashMap, machine: &str, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 98505a9ee..f2390a207 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -8,7 +8,8 @@ use dora_core::{ config::{DataId, NodeRunConfig}, daemon_messages::{DataMessage, 
DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ - resolve_path, source_is_url, source_to_path, Descriptor, OperatorDefinition, OperatorSource, PythonSource, ResolvedNode, SHELL_SOURCE + resolve_path, source_is_url, source_to_path, Descriptor, OperatorDefinition, + OperatorSource, PythonSource, ResolvedNode, SHELL_SOURCE, }, get_python_path, message::uhlc::HLC, @@ -88,9 +89,9 @@ pub async fn spawn_node( target_path.clone() } else { let path = source_to_path(source); - if path.is_absolute(){ + if path.is_absolute() { path - } else{ + } else { resolve_path(source, working_dir).wrap_err_with(|| { format!("failed to resolve node source `{}`", source) })? diff --git a/examples/multiple-daemons/dataflow.yml b/examples/multiple-daemons/dataflow.yml index 363b98bba..22b69dc7a 100644 --- a/examples/multiple-daemons/dataflow.yml +++ b/examples/multiple-daemons/dataflow.yml @@ -4,7 +4,7 @@ nodes: machine: A custom: build: cargo build -p multiple-daemons-example-node - source: ../../target/debug/multiple-daemons-example-node + source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-node inputs: tick: dora/timer/millis/10 outputs: @@ -15,7 +15,7 @@ nodes: operators: - id: rust-operator build: cargo build -p multiple-daemons-example-operator - shared-library: ../../target/debug/multiple_daemons_example_operator + shared-library: /home/runner/work/dora/dora/target/debug/multiple_daemons_example_operator inputs: tick: dora/timer/millis/100 random: rust-node/random @@ -26,6 +26,6 @@ nodes: machine: B custom: build: cargo build -p multiple-daemons-example-sink - source: ../../target/debug/multiple-daemons-example-sink + source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-sink inputs: message: runtime-node/rust-operator/status diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 80671025d..db4eeed0a 100644 --- a/libraries/core/src/descriptor/mod.rs +++ 
b/libraries/core/src/descriptor/mod.rs @@ -133,11 +133,8 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - - validate::check_dataflow(self, working_dir) - .wrap_err("Dataflow could not be validated.") + validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.") } - } #[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] @@ -434,12 +431,11 @@ pub fn resolve_path(source: &str, working_dir: &Path) -> Result { } pub fn source_to_path(source: &str) -> PathBuf { let path = Path::new(&source); - let path = if path.extension().is_none() { + if path.extension().is_none() { path.with_extension(EXE_EXTENSION) } else { path.to_owned() - }; - path + } } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 6e0d33592..fd61cc5ff 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -6,20 +6,22 @@ use crate::{ }; use eyre::{bail, eyre, Context}; +use std::collections::HashSet; use std::{path::Path, process::Command}; use tracing::info; -use std::collections::HashSet; use super::{resolve_path, source_to_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn check_dataflow( - dataflow: &Descriptor, - working_dir: &Path, -) -> eyre::Result<()> { +pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; - let is_multiple = nodes.iter().map(|n| &n.deploy.machine).collect::>().len() > 1; + let is_multiple = nodes + .iter() + .map(|n| &n.deploy.machine) + .collect::>() + .len() + > 1; // check that nodes and operators exist for node in &nodes { @@ -30,18 +32,18 @@ pub fn check_dataflow( if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. 
} else if !is_multiple { - resolve_path(source, working_dir).wrap_err_with(|| { - format!("Could not find source path `{}`", source) - })?; - - } else { - let path = source_to_path(source); - if path.is_relative() { - eyre::bail!("paths of remote nodes must be absolute (node `{}`)", node.id); - } - info!("skipping path check for remote node `{}`", node.id); - + resolve_path(source, working_dir) + .wrap_err_with(|| format!("Could not find source path `{}`", source))?; + } else { + let path = source_to_path(source); + if path.is_relative() { + eyre::bail!( + "paths of remote nodes must be absolute (node `{}`)", + node.id + ); } + info!("skipping path check for remote node `{}`", node.id); + } } }, descriptor::CoreNodeKind::Runtime(node) => { @@ -50,18 +52,23 @@ pub fn check_dataflow( OperatorSource::SharedLibrary(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if !is_multiple{ + } else if !is_multiple { let path = adjust_shared_library_path(Path::new(&path))?; if !working_dir.join(&path).exists() { bail!("no shared library at `{}`", path.display()); } - }else { + } else { let path = source_to_path(path); if path.is_relative() { - eyre::bail!("paths of operator must be absolute (operator `{}`)", operator_definition.id); + eyre::bail!( + "paths of operator must be absolute (operator `{}`)", + operator_definition.id + ); } - info!("skipping path check for remote operator `{}`", operator_definition.id); - + info!( + "skipping path check for remote operator `{}`", + operator_definition.id + ); } } OperatorSource::Python(python_source) => { @@ -76,10 +83,15 @@ pub fn check_dataflow( } else { let path = source_to_path(path); if path.is_relative() { - eyre::bail!("paths of python operator must be absolute (operator `{}`)", operator_definition.id); + eyre::bail!( + "paths of python operator must be absolute (operator `{}`)", + operator_definition.id + ); } - info!("skipping path check for remote python operator `{}`", 
operator_definition.id); - + info!( + "skipping path check for remote python operator `{}`", + operator_definition.id + ); } } OperatorSource::Wasm(path) => { @@ -92,10 +104,15 @@ pub fn check_dataflow( } else { let path = source_to_path(path); if path.is_relative() { - eyre::bail!("paths of Wasm operator must be absolute (operator `{}`)", operator_definition.id); + eyre::bail!( + "paths of Wasm operator must be absolute (operator `{}`)", + operator_definition.id + ); } - info!("skipping path check for remote Wasm operator `{}`", operator_definition.id); - + info!( + "skipping path check for remote Wasm operator `{}`", + operator_definition.id + ); } } } From 213d9a041cd8c93ca05ff9f425eaff1236a696b1 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Tue, 11 Jun 2024 17:14:04 +0800 Subject: [PATCH 09/13] add: skip all the check --- binaries/cli/src/main.rs | 10 ++++-- binaries/coordinator/src/lib.rs | 4 ++- binaries/coordinator/src/run/mod.rs | 47 +++++++++++++++++++++-------- 3 files changed, 44 insertions(+), 17 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 324a43415..d77b82f37 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -336,9 +336,9 @@ fn run() -> eyre::Result<()> { .parent() .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); - dataflow_descriptor - .check(&working_dir) - .wrap_err("Could not validate yaml")?; + //dataflow_descriptor + // .check(&working_dir) + // .wrap_err("Could not validate yaml")?; let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; @@ -419,6 +419,10 @@ fn run() -> eyre::Result<()> { run_dataflow, working_dir, } => { + let working_dir = working_dir + .canonicalize() + .context("failed to canonicalize working dir path")? 
+ .to_owned(); let rt = Builder::new_multi_thread() .enable_all() .build() diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 507ceea2a..a4d493a84 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -334,6 +334,7 @@ async fn start_inner( name, &mut daemon_connections, &clock, + local_working_dir, &mut daemon_working_dirs, ) .await?; @@ -855,13 +856,14 @@ async fn start_dataflow( name: Option, daemon_connections: &mut HashMap, clock: &HLC, + local_working_dir: PathBuf, daemon_working_dirs: &mut HashMap, ) -> eyre::Result { let SpawnedDataflow { uuid, machines, nodes, - } = spawn_dataflow(dataflow, daemon_connections, clock, daemon_working_dirs).await?; + } = spawn_dataflow(dataflow, daemon_connections, clock, local_working_dir, daemon_working_dirs).await?; Ok(RunningDataflow { uuid, name, diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index e9059fbb4..81cf7ce7d 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -16,13 +16,15 @@ use std::{ path::PathBuf, }; use uuid::{NoContext, Timestamp, Uuid}; +const DEFAULT_WORKING_DIR: &str = "/tmp"; #[tracing::instrument(skip(daemon_connections, clock))] pub(super) async fn spawn_dataflow( dataflow: Descriptor, daemon_connections: &mut HashMap, clock: &HLC, - daemon_working_dirs: &mut HashMap, + working_dir: PathBuf, + daemon_working_dirs: &mut HashMap, ) -> eyre::Result { //dataflow.check(&working_dir)?; @@ -39,27 +41,46 @@ pub(super) async fn spawn_dataflow( .map(|c| (m.clone(), c.listen_socket)) }) .collect::, _>>()?; - - for machine in &machines { - let working_dir = daemon_working_dirs - .get_mut(machine) - .wrap_err_with(|| format!("no daemon working_dir for machine `{machine}`"))?; + if machines.len() > 1 { + for machine in &machines { + let working_dir = daemon_working_dirs.get(machine).ok_or_else(|| eyre!("no daemon working dir for machine `{machine}`"))?.to_owned(); + 
println!("Working dir: {:?}", working_dir); + let spawn_command = SpawnDataflowNodes { + dataflow_id: uuid, + working_dir: working_dir, + nodes: nodes.clone(), + machine_listen_ports: machine_listen_ports.clone(), + dataflow_descriptor: dataflow.clone(), + }; + let message = serde_json::to_vec(&Timestamped { + inner: DaemonCoordinatorEvent::Spawn(spawn_command), + timestamp: clock.new_timestamp(), + })?; + spawn_dataflow_on_machine(daemon_connections, machine, &message) + .await + .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine}`"))?; + } + + } else { let spawn_command = SpawnDataflowNodes { dataflow_id: uuid, - working_dir: working_dir.clone(), + working_dir: working_dir, nodes: nodes.clone(), - machine_listen_ports: machine_listen_ports.clone(), - dataflow_descriptor: dataflow.to_owned(), + machine_listen_ports, + dataflow_descriptor: dataflow, }; let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::Spawn(spawn_command), timestamp: clock.new_timestamp(), })?; - tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`"); - spawn_dataflow_on_machine(daemon_connections, machine, &message) - .await - .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine}`"))?; + for machine in &machines { + tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`"); + spawn_dataflow_on_machine(daemon_connections, machine, &message) + .await + .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine}`"))?; + } } + tracing::info!("successfully spawned dataflow `{uuid}`"); From 21576c77b5845a48486e62e2a547fd85b5c6ffd1 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Thu, 13 Jun 2024 17:03:37 +0800 Subject: [PATCH 10/13] fix remote working_dir platform-cross --- Cargo.lock | 1 + binaries/coordinator/Cargo.toml | 1 + binaries/coordinator/src/run/mod.rs | 6 ++++-- examples/multiple-daemons/dataflow.yml | 3 +++ libraries/core/src/descriptor/validate.rs | 12 
++++++------ 5 files changed, 15 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa1044954..96402de7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2269,6 +2269,7 @@ name = "dora-coordinator" version = "0.3.4" dependencies = [ "ctrlc", + "dirs 5.0.1", "dora-core", "dora-tracing", "eyre", diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 4c6b68877..6319fea4a 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -25,3 +25,4 @@ futures-concurrency = "7.1.0" serde_json = "1.0.86" names = "0.14.0" ctrlc = "3.2.5" +dirs = "5.0.1" diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index a2e06a792..10c035af5 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -16,7 +16,7 @@ use std::{ path::PathBuf, }; use uuid::{NoContext, Timestamp, Uuid}; -const DEFAULT_WORKING_DIR: &str = "/tmp"; + #[tracing::instrument(skip(daemon_connections, clock))] pub(super) async fn spawn_dataflow( @@ -41,7 +41,9 @@ pub(super) async fn spawn_dataflow( }) .collect::, _>>()?; let working_dir = if machines.len() > 1 { - PathBuf::from(DEFAULT_WORKING_DIR) + dirs::home_dir() + .ok_or_else(|| eyre!("could not create working directory for multiple-daemons dataflow!")) + .map(|home| home)? 
} else { working_dir }; diff --git a/examples/multiple-daemons/dataflow.yml b/examples/multiple-daemons/dataflow.yml index 22b69dc7a..c0d0eab1d 100644 --- a/examples/multiple-daemons/dataflow.yml +++ b/examples/multiple-daemons/dataflow.yml @@ -4,6 +4,7 @@ nodes: machine: A custom: build: cargo build -p multiple-daemons-example-node + # The path is for CI, replace it with your source absolute path here source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-node inputs: tick: dora/timer/millis/10 @@ -15,6 +16,7 @@ nodes: operators: - id: rust-operator build: cargo build -p multiple-daemons-example-operator + # The path is for CI, replace it with your source absolute path here shared-library: /home/runner/work/dora/dora/target/debug/multiple_daemons_example_operator inputs: tick: dora/timer/millis/100 @@ -26,6 +28,7 @@ nodes: machine: B custom: build: cargo build -p multiple-daemons-example-sink + # The path is for CI, replace it with your source absolute path here source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-sink inputs: message: runtime-node/rust-operator/status diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index fd61cc5ff..8c57c73d5 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -16,12 +16,12 @@ const VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; - let is_multiple = nodes + let is_local = nodes .iter() .map(|n| &n.deploy.machine) .collect::>() .len() - > 1; + <= 1; // check that nodes and operators exist for node in &nodes { @@ -31,7 +31,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. 
- } else if !is_multiple { + } else if is_local { resolve_path(source, working_dir) .wrap_err_with(|| format!("Could not find source path `{}`", source))?; } else { @@ -52,7 +52,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result OperatorSource::SharedLibrary(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if !is_multiple { + } else if is_local { let path = adjust_shared_library_path(Path::new(&path))?; if !working_dir.join(&path).exists() { bail!("no shared library at `{}`", path.display()); @@ -76,7 +76,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result let path = &python_source.source; if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. - } else if !is_multiple { + } else if is_local { if !working_dir.join(path).exists() { bail!("no Python library at `{path}`"); } @@ -97,7 +97,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result OperatorSource::Wasm(path) => { if source_is_url(path) { info!("{path} is a URL."); // TODO: Implement url check. 
- } else if !is_multiple { + } else if is_local { if !working_dir.join(path).exists() { bail!("no WASM library at `{path}`"); } From 83f0b25c9da547f3b84d65c0714a236c7ba2fe05 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Fri, 14 Jun 2024 10:58:54 +0800 Subject: [PATCH 11/13] fix: skip all path exist check --- binaries/cli/src/main.rs | 14 +++---- binaries/coordinator/src/lib.rs | 9 +++- binaries/coordinator/src/run/mod.rs | 51 +++++------------------ binaries/daemon/src/coordinator.rs | 2 +- binaries/daemon/src/lib.rs | 33 ++++++++------- binaries/daemon/src/spawn.rs | 15 +++---- examples/multiple-daemons/dataflow.yml | 4 -- libraries/core/src/daemon_messages.rs | 2 +- libraries/core/src/descriptor/mod.rs | 15 +++---- libraries/core/src/descriptor/validate.rs | 38 +++-------------- 10 files changed, 62 insertions(+), 121 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 4de4ee3ad..e8af1ea07 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -191,8 +191,6 @@ enum Command { #[clap(long, default_value = ".")] working_dir: PathBuf, - - }, /// Run runtime Runtime, @@ -356,9 +354,9 @@ fn run() -> eyre::Result<()> { .parent() .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); - //dataflow_descriptor - // .check(&working_dir) - // .wrap_err("Could not validate yaml")?; + dataflow_descriptor + .check(&working_dir) + .wrap_err("Could not validate yaml")?; let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; @@ -445,9 +443,9 @@ fn run() -> eyre::Result<()> { working_dir, } => { let working_dir = working_dir - .canonicalize() - .context("failed to canonicalize working dir path")? - .to_owned(); + .canonicalize() + .context("failed to canonicalize working dir path")? 
+ .to_owned(); let rt = Builder::new_multi_thread() .enable_all() .build() diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 0e54d5880..560395768 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -863,7 +863,14 @@ async fn start_dataflow( uuid, machines, nodes, - } = spawn_dataflow(dataflow, daemon_connections, clock, local_working_dir, daemon_working_dirs).await?; + } = spawn_dataflow( + dataflow, + daemon_connections, + clock, + local_working_dir, + daemon_working_dirs, + ) + .await?; Ok(RunningDataflow { uuid, name, diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 94a58a807..f52e73056 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -16,7 +16,6 @@ use std::{ path::PathBuf, }; use uuid::{NoContext, Timestamp, Uuid}; -const DEFAULT_WORKING_DIR: &str = "/tmp"; #[tracing::instrument(skip(daemon_connections, clock))] pub(super) async fn spawn_dataflow( @@ -26,7 +25,7 @@ pub(super) async fn spawn_dataflow( working_dir: PathBuf, daemon_working_dirs: &mut HashMap, ) -> eyre::Result { - //dataflow.check(&working_dir)?; + dataflow.check(&working_dir)?; let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); @@ -41,54 +40,26 @@ pub(super) async fn spawn_dataflow( .map(|c| (m.clone(), c.listen_socket)) }) .collect::, _>>()?; - if machines.len() > 1 { - for machine in &machines { - let working_dir = daemon_working_dirs.get(machine).ok_or_else(|| eyre!("no daemon working dir for machine `{machine}`"))?.to_owned(); - println!("Working dir: {:?}", working_dir); - let spawn_command = SpawnDataflowNodes { - dataflow_id: uuid, - working_dir: working_dir, - nodes: nodes.clone(), - machine_listen_ports: machine_listen_ports.clone(), - dataflow_descriptor: dataflow.clone(), - }; - let message = serde_json::to_vec(&Timestamped { - inner: 
DaemonCoordinatorEvent::Spawn(spawn_command), - timestamp: clock.new_timestamp(), - })?; - spawn_dataflow_on_machine(daemon_connections, machine, &message) - .await - .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine}`"))?; - } - - } else { let working_dir = if machines.len() > 1 { - dirs::home_dir() - .ok_or_else(|| { - eyre!("could not create working directory for multiple-daemons dataflow!") - }) - .map(|home| home)? - } else { - working_dir - }; + for machine in &machines { + let working_dir = daemon_working_dirs + .get(machine) + .ok_or_else(|| eyre!("no daemon working dir for machine `{machine}`"))? + .to_owned(); let spawn_command = SpawnDataflowNodes { dataflow_id: uuid, working_dir: working_dir, nodes: nodes.clone(), - machine_listen_ports, - dataflow_descriptor: dataflow, + machine_listen_ports: machine_listen_ports.clone(), + dataflow_descriptor: dataflow.clone(), }; let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::Spawn(spawn_command), timestamp: clock.new_timestamp(), })?; - for machine in &machines { - tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`"); - spawn_dataflow_on_machine(daemon_connections, machine, &message) - .await - .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine}`"))?; - } + spawn_dataflow_on_machine(daemon_connections, machine, &message) + .await + .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine}`"))?; } - tracing::info!("successfully spawned dataflow `{uuid}`"); diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index 8a13a1469..e9738482f 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -26,7 +26,7 @@ pub async fn register( machine_id: String, listen_port: u16, clock: &HLC, - working_dir:PathBuf, + working_dir: PathBuf, ) -> eyre::Result>> { let mut stream = TcpStream::connect(addr) .await diff --git a/binaries/daemon/src/lib.rs 
b/binaries/daemon/src/lib.rs index de8ca94fa..15c92aded 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -106,19 +106,24 @@ impl Daemon { }); // connect to the coordinator - let coordinator_events = - coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock, working_dir.clone()) - .await - .wrap_err("failed to connect to dora-coordinator")? - .map( - |Timestamped { - inner: event, - timestamp, - }| Timestamped { - inner: Event::Coordinator(event), - timestamp, - }, - ); + let coordinator_events = coordinator::register( + coordinator_addr, + machine_id.clone(), + listen_port, + &clock, + working_dir.clone(), + ) + .await + .wrap_err("failed to connect to dora-coordinator")? + .map( + |Timestamped { + inner: event, + timestamp, + }| Timestamped { + inner: Event::Coordinator(event), + timestamp, + }, + ); // Spawn local listener loop let (events_tx, events_rx) = flume::bounded(10); @@ -344,7 +349,7 @@ impl Daemon { working_dir, nodes, machine_listen_ports, - dataflow_descriptor, + dataflow_descriptor, }) => { match dataflow_descriptor.communication.remote { dora_core::config::RemoteCommunicationConfig::Tcp => {} diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 2a6907b53..55a673842 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -8,8 +8,8 @@ use dora_core::{ config::DataId, daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ - resolve_path, source_is_url, source_to_path, Descriptor, OperatorDefinition, - OperatorSource, PythonSource, ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE, + resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, + ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE, }, get_python_path, message::uhlc::HLC, @@ -103,14 +103,9 @@ pub async fn spawn_node( .wrap_err("failed to download custom node")?; target_path.clone() } else { - let path = source_to_path(source); - if 
path.is_absolute() { - path - } else { - resolve_path(source, working_dir).wrap_err_with(|| { - format!("failed to resolve node source `{}`", source) - })? - } + resolve_path(source, working_dir).wrap_err_with(|| { + format!("failed to resolve node source `{}`", source) + })? }; // If extension is .py, use python to run the script diff --git a/examples/multiple-daemons/dataflow.yml b/examples/multiple-daemons/dataflow.yml index 5509b17f6..363b98bba 100644 --- a/examples/multiple-daemons/dataflow.yml +++ b/examples/multiple-daemons/dataflow.yml @@ -4,8 +4,6 @@ nodes: machine: A custom: build: cargo build -p multiple-daemons-example-node - # The path is for CI, replace it with your source absolute path here - #source: /home/runner/work/dora/dora/target/debug/multiple-daemons-example-node source: ../../target/debug/multiple-daemons-example-node inputs: tick: dora/timer/millis/10 @@ -17,7 +15,6 @@ nodes: operators: - id: rust-operator build: cargo build -p multiple-daemons-example-operator - # The path is for CI, replace it with your source absolute path here shared-library: ../../target/debug/multiple_daemons_example_operator inputs: tick: dora/timer/millis/100 @@ -29,7 +26,6 @@ nodes: machine: B custom: build: cargo build -p multiple-daemons-example-sink - # The path is for CI, replace it with your source absolute path here source: ../../target/debug/multiple-daemons-example-sink inputs: message: runtime-node/rust-operator/status diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index a46225672..ae6fc2625 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, + collections::{BTreeMap, BTreeSet}, fmt, net::SocketAddr, path::PathBuf, diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 501f8805e..61122a256 100644 --- a/libraries/core/src/descriptor/mod.rs +++ 
b/libraries/core/src/descriptor/mod.rs @@ -464,7 +464,12 @@ pub fn source_is_url(source: &str) -> bool { } pub fn resolve_path(source: &str, working_dir: &Path) -> Result { - let path = source_to_path(source); + let path = Path::new(&source); + let path = if path.extension().is_none() { + path.with_extension(EXE_EXTENSION) + } else { + path.to_owned() + }; // Search path within current working directory if let Ok(abs_path) = working_dir.join(&path).canonicalize() { @@ -476,14 +481,6 @@ pub fn resolve_path(source: &str, working_dir: &Path) -> Result { bail!("Could not find source path {}", path.display()) } } -pub fn source_to_path(source: &str) -> PathBuf { - let path = Path::new(&source); - if path.extension().is_none() { - path.with_extension(EXE_EXTENSION) - } else { - path.to_owned() - } -} #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(deny_unknown_fields)] diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index e7939b528..71e985952 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -10,7 +10,7 @@ use std::collections::HashSet; use std::{path::Path, process::Command}; use tracing::info; -use super::{resolve_path, source_to_path, Descriptor, DYNAMIC_SOURCE, SHELL_SOURCE}; +use super::{resolve_path, Descriptor, DYNAMIC_SOURCE, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -37,14 +37,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result resolve_path(source, working_dir) .wrap_err_with(|| format!("Could not find source path `{}`", source))?; } else { - let path = source_to_path(source); - if path.is_relative() { - eyre::bail!( - "paths of remote nodes must be absolute (node `{}`)", - node.id - ); - } - info!("skipping path check for remote node `{}`", node.id); + info!("skipping path exist check for remote node `{}`", node.id); } } }, @@ -60,15 +53,8 @@ pub fn check_dataflow(dataflow: &Descriptor, 
working_dir: &Path) -> eyre::Result bail!("no shared library at `{}`", path.display()); } } else { - let path = source_to_path(path); - if path.is_relative() { - eyre::bail!( - "paths of operator must be absolute (operator `{}`)", - operator_definition.id - ); - } info!( - "skipping path check for remote operator `{}`", + "skipping path exist check for remote operator `{}`", operator_definition.id ); } @@ -83,15 +69,8 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result bail!("no Python library at `{path}`"); } } else { - let path = source_to_path(path); - if path.is_relative() { - eyre::bail!( - "paths of python operator must be absolute (operator `{}`)", - operator_definition.id - ); - } info!( - "skipping path check for remote python operator `{}`", + "skipping path exist check for remote python operator `{}`", operator_definition.id ); } @@ -104,15 +83,8 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result bail!("no WASM library at `{path}`"); } } else { - let path = source_to_path(path); - if path.is_relative() { - eyre::bail!( - "paths of Wasm operator must be absolute (operator `{}`)", - operator_definition.id - ); - } info!( - "skipping path check for remote Wasm operator `{}`", + "skipping path exist check for remote Wasm operator `{}`", operator_definition.id ); } From db240693a40aa46b200a062e22848b9755fff9f6 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Fri, 14 Jun 2024 13:28:29 +0800 Subject: [PATCH 12/13] fix: dora up for dynamic node? --- .github/workflows/ci.yml | 2 ++ binaries/cli/src/main.rs | 11 ++++++++--- binaries/cli/src/up.rs | 8 +++++--- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d74da80a3..fc2bad7ae 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -290,6 +290,8 @@ jobs: sleep 10 dora stop --name ci-rust-test --grace-duration 5s cd .. 
+ dora destroy + dora up --working-dir ./example/rust-dataflow dora build examples/rust-dataflow/dataflow_dynamic.yml dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic cargo run -p rust-dataflow-example-sink-dynamic diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index e8af1ea07..4a43e1df9 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -86,6 +86,8 @@ enum Command { /// Use a custom configuration #[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] config: Option, + #[clap(long, default_value = ".")] + working_dir: PathBuf, }, /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. Destroy { @@ -312,8 +314,11 @@ fn run() -> eyre::Result<()> { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, - Command::Up { config } => { - up::up(config.as_deref())?; + Command::Up { + config, + working_dir + } => { + up::up(config.as_deref(), working_dir)?; } Command::Logs { dataflow, @@ -461,7 +466,7 @@ fn run() -> eyre::Result<()> { ); } - Daemon::run_dataflow(&dataflow_path).await + Daemon::run_dataflow(&working_dir).await } None => { if coordinator_addr.ip() == LOCALHOST { diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index d1376d7ca..6b4a13d4e 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -2,10 +2,11 @@ use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST}; use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT}; use eyre::Context; use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; +use std::path::PathBuf; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} -pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> { +pub(crate) fn up(config_path: Option<&Path>, working_dir: PathBuf) -> eyre::Result<()> { let UpConfig 
{} = parse_dora_config(config_path)?; let coordinator_addr = (LOCALHOST, DORA_COORDINATOR_PORT_CONTROL_DEFAULT).into(); let mut session = match connect_to_coordinator(coordinator_addr) { @@ -26,7 +27,7 @@ pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> { }; if !daemon_running(&mut *session)? { - start_daemon().wrap_err("failed to start dora-daemon")?; + start_daemon(working_dir).wrap_err("failed to start dora-daemon")?; // wait a bit until daemon is connected let mut i = 0; @@ -93,11 +94,12 @@ fn start_coordinator() -> eyre::Result<()> { Ok(()) } -fn start_daemon() -> eyre::Result<()> { +fn start_daemon(working_dir: PathBuf) -> eyre::Result<()> { let mut cmd = Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?); cmd.arg("daemon"); cmd.arg("--quiet"); + cmd.arg("--working-dir").arg(working_dir); cmd.spawn().wrap_err("failed to run `dora daemon`")?; println!("started dora daemon"); From f477d6bc0cd76aa64672bacf00b06ffbe0795ca5 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Fri, 14 Jun 2024 14:25:08 +0800 Subject: [PATCH 13/13] fix: local run-dataflow --- .github/workflows/ci.yml | 11 +++++++---- binaries/cli/src/main.rs | 6 +++--- binaries/cli/src/up.rs | 2 +- binaries/daemon/src/lib.rs | 14 +++++++------- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fc2bad7ae..4a22516ee 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -291,7 +291,7 @@ jobs: dora stop --name ci-rust-test --grace-duration 5s cd .. 
dora destroy - dora up --working-dir ./example/rust-dataflow + dora up --working-dir examples/rust-dataflow dora build examples/rust-dataflow/dataflow_dynamic.yml dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic cargo run -p rust-dataflow-example-sink-dynamic @@ -321,10 +321,13 @@ jobs: sleep 10 dora stop --name ci-python-test --grace-duration 5s pip install opencv-python - dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic - python ../examples/python-dataflow/plot_dynamic.py + dora destroy + cd .. + dora up --working-dir examples/python-dataflow + dora start examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic + python examples/python-dataflow/plot_dynamic.py sleep 5 - dora stop --name ci-python-test --grace-duration 5s + dora stop --name ci-python-dynamic --grace-duration 5s dora destroy clippy: diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 4a43e1df9..736bd01a7 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -314,9 +314,9 @@ fn run() -> eyre::Result<()> { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, - Command::Up { + Command::Up { config, - working_dir + working_dir, } => { up::up(config.as_deref(), working_dir)?; } @@ -466,7 +466,7 @@ fn run() -> eyre::Result<()> { ); } - Daemon::run_dataflow(&working_dir).await + Daemon::run_dataflow(&dataflow_path, working_dir).await } None => { if coordinator_addr.ip() == LOCALHOST { diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index 6b4a13d4e..fc758bdeb 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,8 +1,8 @@ use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST}; use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT}; use eyre::Context; -use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; use std::path::PathBuf; +use 
std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 15c92aded..238177844 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -154,13 +154,13 @@ impl Daemon { .map(|_| ()) } - pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> { - let working_dir = dataflow_path - .canonicalize() - .context("failed to canoncialize dataflow path")? - .parent() - .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))? - .to_owned(); + pub async fn run_dataflow(dataflow_path: &Path, working_dir: PathBuf) -> eyre::Result<()> { + // let working_dir = dataflow_path + // .canonicalize() + // .context("failed to canoncialize dataflow path")? + // .parent() + // .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))? + // .to_owned(); let descriptor = Descriptor::read(dataflow_path).await?; descriptor.check(&working_dir)?;