diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d74da80a3..4a22516ee 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -290,6 +290,8 @@ jobs:
           sleep 10
           dora stop --name ci-rust-test --grace-duration 5s
           cd ..
+          dora destroy
+          dora up --working-dir examples/rust-dataflow
           dora build examples/rust-dataflow/dataflow_dynamic.yml
           dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic
           cargo run -p rust-dataflow-example-sink-dynamic
@@ -319,10 +321,13 @@ jobs:
           sleep 10
           dora stop --name ci-python-test --grace-duration 5s
           pip install opencv-python
-          dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic
-          python ../examples/python-dataflow/plot_dynamic.py
+          dora destroy
+          cd ..
+          dora up --working-dir examples/python-dataflow
+          dora start examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic
+          python examples/python-dataflow/plot_dynamic.py
           sleep 5
-          dora stop --name ci-python-test --grace-duration 5s
+          dora stop --name ci-python-dynamic --grace-duration 5s
           dora destroy

   clippy:
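
The two workflow edits above are the end-to-end exercise for this change: each
dynamic-dataflow job now tears down the previous coordinator/daemon pair and
brings up a fresh one whose daemon working directory is the example folder, so
relative paths inside the dataflow YAML resolve against the daemon rather than
against whatever directory the CLI was invoked from. The python job's final
`dora stop` also now names the dataflow it actually started (`ci-python-dynamic`
instead of `ci-python-test`). The same sequence reduced to a local sketch, using
only commands that appear in the workflow (assumes a dora checkout with the
bundled examples; the dataflow name `demo` is arbitrary):

    dora destroy                                  # drop any coordinator/daemon from an earlier run
    dora up --working-dir examples/rust-dataflow  # spawned daemon registers this directory
    dora build examples/rust-dataflow/dataflow_dynamic.yml
    dora start examples/rust-dataflow/dataflow_dynamic.yml --name demo
    dora stop --name demo --grace-duration 5s
    dora destroy
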
diff --git a/Cargo.lock b/Cargo.lock
index 74ec1d4f2..8fa9526d2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2281,6 +2281,7 @@ name = "dora-coordinator"
 version = "0.3.4"
 dependencies = [
  "ctrlc",
+ "dirs 5.0.1",
  "dora-core",
  "dora-tracing",
  "eyre",
diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs
index 97a124008..736bd01a7 100644
--- a/binaries/cli/src/main.rs
+++ b/binaries/cli/src/main.rs
@@ -41,7 +41,6 @@ struct Args {
     #[clap(subcommand)]
     command: Command,
 }
-
 /// dora-rs cli client
#[derive(Debug, clap::Subcommand)]
 enum Command {
@@ -87,6 +86,8 @@ enum Command {
         /// Use a custom configuration
         #[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
         config: Option<PathBuf>,
+        #[clap(long, default_value = ".")]
+        working_dir: PathBuf,
     },
     /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first.
     Destroy {
@@ -189,6 +190,9 @@ enum Command {
         /// Suppresses all log output to stdout.
         #[clap(long)]
         quiet: bool,
+
+        #[clap(long, default_value = ".")]
+        working_dir: PathBuf,
     },
     /// Run runtime
     Runtime,
@@ -310,8 +314,11 @@ fn run() -> eyre::Result<()> {
             args,
             internal_create_with_path_dependencies,
         } => template::create(args, internal_create_with_path_dependencies)?,
-        Command::Up { config } => {
-            up::up(config.as_deref())?;
+        Command::Up {
+            config,
+            working_dir,
+        } => {
+            up::up(config.as_deref(), working_dir)?;
         }
         Command::Logs {
             dataflow,
@@ -352,13 +359,9 @@
                 .parent()
                 .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
                 .to_owned();
-            if !coordinator_addr.is_loopback() {
-                dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?;
-            } else {
-                dataflow_descriptor
-                    .check(&working_dir)
-                    .wrap_err("Could not validate yaml")?;
-            }
+            dataflow_descriptor
+                .check(&working_dir)
+                .wrap_err("Could not validate yaml")?;

             let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
                 .wrap_err("failed to connect to dora coordinator")?;
@@ -442,7 +445,12 @@
             machine_id,
             run_dataflow,
             quiet: _,
+            working_dir,
         } => {
+            let working_dir = working_dir
+                .canonicalize()
+                .context("failed to canonicalize working dir path")?
+                .to_owned();
             let rt = Builder::new_multi_thread()
                 .enable_all()
                 .build()
@@ -458,13 +466,13 @@
                     );
                 }

-                Daemon::run_dataflow(&dataflow_path).await
+                Daemon::run_dataflow(&dataflow_path, working_dir).await
             }
             None => {
                 if coordinator_addr.ip() == LOCALHOST {
                     tracing::info!("Starting in local mode");
                 }
-                Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await
+                Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port, working_dir).await
             }
         }
     })
diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs
index d1376d7ca..fc758bdeb 100644
--- a/binaries/cli/src/up.rs
+++ b/binaries/cli/src/up.rs
@@ -1,11 +1,12 @@
 use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST};
 use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT};
 use eyre::Context;
+use std::path::PathBuf;
 use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration};
 #[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
 struct UpConfig {}

-pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
+pub(crate) fn up(config_path: Option<&Path>, working_dir: PathBuf) -> eyre::Result<()> {
     let UpConfig {} = parse_dora_config(config_path)?;
     let coordinator_addr = (LOCALHOST, DORA_COORDINATOR_PORT_CONTROL_DEFAULT).into();
     let mut session = match connect_to_coordinator(coordinator_addr) {
@@ -26,7 +27,7 @@
     };

     if !daemon_running(&mut *session)? {
-        start_daemon().wrap_err("failed to start dora-daemon")?;
+        start_daemon(working_dir).wrap_err("failed to start dora-daemon")?;

         // wait a bit until daemon is connected
         let mut i = 0;
@@ -93,11 +94,12 @@ fn start_coordinator() -> eyre::Result<()> {
     Ok(())
 }

-fn start_daemon() -> eyre::Result<()> {
+fn start_daemon(working_dir: PathBuf) -> eyre::Result<()> {
     let mut cmd =
         Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?);
     cmd.arg("daemon");
     cmd.arg("--quiet");
+    cmd.arg("--working-dir").arg(working_dir);
     cmd.spawn().wrap_err("failed to run `dora daemon`")?;

     println!("started dora daemon");
diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml
index 4c6b68877..6319fea4a 100644
--- a/binaries/coordinator/Cargo.toml
+++ b/binaries/coordinator/Cargo.toml
@@ -25,3 +25,4 @@ futures-concurrency = "7.1.0"
 serde_json = "1.0.86"
 names = "0.14.0"
 ctrlc = "3.2.5"
+dirs = "5.0.1"
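
Both new flags go through clap's derive API with `.` as the default value, so
omitting `--working-dir` keeps the old behavior of using the invoking
directory. `dora up` forwards the value verbatim to the daemon it spawns
(`cmd.arg("--working-dir").arg(working_dir)`), and the daemon canonicalizes it
before use. A minimal sketch of the flag wiring, with a hypothetical `Demo`
parser standing in for the actual dora CLI structs:

    use clap::Parser;
    use std::path::PathBuf;

    /// Stand-in for the new `--working-dir` flag on `dora up` / `dora daemon`.
    #[derive(Debug, Parser)]
    struct Demo {
        /// Directory the spawned daemon should treat as its working dir.
        #[clap(long, default_value = ".")]
        working_dir: PathBuf,
    }

    fn main() {
        let args = Demo::parse();
        // The real daemon canonicalizes this value and errors out early if the
        // directory does not exist, mirroring the `--working-dir` handling above.
        println!("working dir: {}", args.working_dir.display());
    }
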
diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs
index 3b0c64596..560395768 100644
--- a/binaries/coordinator/src/lib.rs
+++ b/binaries/coordinator/src/lib.rs
@@ -137,6 +137,7 @@ async fn start_inner(
     let mut dataflow_results: HashMap<Uuid, BTreeMap<String, Result<(), String>>> = HashMap::new();
     let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
     let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new();
+    let mut daemon_working_dirs: HashMap<String, PathBuf> = HashMap::new();

     while let Some(event) = events.next().await {
         if event.log() {
@@ -168,6 +169,7 @@
                 mut connection,
                 dora_version: daemon_version,
                 listen_port,
+                working_dir,
             } => {
                 let coordinator_version: &&str = &env!("CARGO_PKG_VERSION");
                 let version_check = if &daemon_version == coordinator_version {
@@ -207,9 +209,10 @@
                             "closing previous connection `{machine_id}` on new register"
                         );
                     }
+                    daemon_working_dirs.insert(machine_id.clone(), working_dir.clone());
                 }
                 (Err(err), _) => {
-                    tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
+                    tracing::warn!("failed to register daemon connection and daemon working_dir for machine `{machine_id}`: {err}");
                 }
                 (Ok(_), Err(err)) => {
                     tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}");
@@ -316,7 +319,6 @@
                 local_working_dir,
             } => {
                 let name = name.or_else(|| names::Generator::default().next());
-
                 let inner = async {
                     if let Some(name) = name.as_deref() {
                         // check that name is unique
@@ -329,10 +331,11 @@
                     }
                     let dataflow = start_dataflow(
                         dataflow,
-                        local_working_dir,
                         name,
                         &mut daemon_connections,
                         &clock,
+                        local_working_dir,
+                        &mut daemon_working_dirs,
                     )
                     .await?;
                     Ok(dataflow)
@@ -850,16 +853,24 @@ async fn retrieve_logs(

 async fn start_dataflow(
     dataflow: Descriptor,
-    working_dir: PathBuf,
     name: Option<String>,
     daemon_connections: &mut HashMap<String, DaemonConnection>,
     clock: &HLC,
+    local_working_dir: PathBuf,
+    daemon_working_dirs: &mut HashMap<String, PathBuf>,
 ) -> eyre::Result<RunningDataflow> {
     let SpawnedDataflow {
         uuid,
         machines,
         nodes,
-    } = spawn_dataflow(dataflow, working_dir, daemon_connections, clock).await?;
+    } = spawn_dataflow(
+        dataflow,
+        daemon_connections,
+        clock,
+        local_working_dir,
+        daemon_working_dirs,
+    )
+    .await?;
     Ok(RunningDataflow {
         uuid,
         name,
@@ -950,6 +961,7 @@ pub enum DaemonEvent {
         machine_id: String,
         connection: TcpStream,
         listen_port: u16,
+        working_dir: PathBuf,
     },
 }

diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs
index 86600a4be..01f7d5ef1 100644
--- a/binaries/coordinator/src/listener.rs
+++ b/binaries/coordinator/src/listener.rs
@@ -53,12 +53,14 @@ pub async fn handle_connection(
             machine_id,
             dora_version,
             listen_port,
+            working_dir,
         } => {
             let event = DaemonEvent::Register {
                 dora_version,
                 machine_id,
                 connection,
                 listen_port,
+                working_dir,
             };
             let _ = events_tx.send(Event::Daemon(event)).await;
             break;
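
With registration now carrying the daemon's working directory, the coordinator
keeps one directory per machine id in `daemon_working_dirs`, filled in on
`DaemonEvent::Register` and consulted on every spawn. The bookkeeping in
isolation (toy values, a plain `HashMap` instead of the coordinator's state,
same `eyre` error style as the patch):

    use eyre::eyre;
    use std::{collections::HashMap, path::PathBuf};

    fn main() -> eyre::Result<()> {
        let mut daemon_working_dirs: HashMap<String, PathBuf> = HashMap::new();

        // On register, the announced directory is stored under the machine id:
        daemon_working_dirs.insert("machine-a".to_owned(), PathBuf::from("/opt/dataflows"));

        // On spawn, a missing entry is a hard error, as in `spawn_dataflow`:
        let machine = "machine-a";
        let working_dir = daemon_working_dirs
            .get(machine)
            .ok_or_else(|| eyre!("no daemon working dir for machine `{machine}`"))?
            .to_owned();
        println!("spawning in {}", working_dir.display());
        Ok(())
    }
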
diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs
index 534b857bb..f52e73056 100644
--- a/binaries/coordinator/src/run/mod.rs
+++ b/binaries/coordinator/src/run/mod.rs
@@ -20,21 +20,12 @@ use uuid::{NoContext, Timestamp, Uuid};
 #[tracing::instrument(skip(daemon_connections, clock))]
 pub(super) async fn spawn_dataflow(
     dataflow: Descriptor,
-    working_dir: PathBuf,
     daemon_connections: &mut HashMap<String, DaemonConnection>,
     clock: &HLC,
+    working_dir: PathBuf,
+    daemon_working_dirs: &mut HashMap<String, PathBuf>,
 ) -> eyre::Result<SpawnedDataflow> {
-    let remote_machine_id: Vec<_> = daemon_connections
-        .iter()
-        .filter_map(|(id, c)| {
-            if !c.listen_socket.ip().is_loopback() {
-                Some(id.as_str())
-            } else {
-                None
-            }
-        })
-        .collect();
-    dataflow.check_in_daemon(&working_dir, &remote_machine_id, false)?;
+    dataflow.check(&working_dir)?;

     let nodes = dataflow.resolve_aliases_and_set_defaults()?;
     let uuid = Uuid::new_v7(Timestamp::now(NoContext));
@@ -49,21 +40,22 @@
                 .map(|c| (m.clone(), c.listen_socket))
         })
         .collect::<Result<BTreeMap<_, _>, _>>()?;
-
-    let spawn_command = SpawnDataflowNodes {
-        dataflow_id: uuid,
-        working_dir,
-        nodes: nodes.clone(),
-        machine_listen_ports,
-        dataflow_descriptor: dataflow,
-    };
-    let message = serde_json::to_vec(&Timestamped {
-        inner: DaemonCoordinatorEvent::Spawn(spawn_command),
-        timestamp: clock.new_timestamp(),
-    })?;
-
     for machine in &machines {
-        tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`");
+        let working_dir = daemon_working_dirs
+            .get(machine)
+            .ok_or_else(|| eyre!("no daemon working dir for machine `{machine}`"))?
+            .to_owned();
+        let spawn_command = SpawnDataflowNodes {
+            dataflow_id: uuid,
+            working_dir,
+            nodes: nodes.clone(),
+            machine_listen_ports: machine_listen_ports.clone(),
+            dataflow_descriptor: dataflow.clone(),
+        };
+        let message = serde_json::to_vec(&Timestamped {
+            inner: DaemonCoordinatorEvent::Spawn(spawn_command),
+            timestamp: clock.new_timestamp(),
+        })?;
         spawn_dataflow_on_machine(daemon_connections, machine, &message)
             .await
             .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine}`"))?;
diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs
index d2f86b3cc..e9738482f 100644
--- a/binaries/daemon/src/coordinator.rs
+++ b/binaries/daemon/src/coordinator.rs
@@ -8,7 +8,7 @@ use dora_core::{
     message::uhlc::HLC,
 };
 use eyre::{eyre, Context};
-use std::{io::ErrorKind, net::SocketAddr};
+use std::{io::ErrorKind, net::SocketAddr, path::PathBuf};
 use tokio::{
     net::TcpStream,
     sync::{mpsc, oneshot},
@@ -26,6 +26,7 @@ pub async fn register(
     machine_id: String,
     listen_port: u16,
     clock: &HLC,
+    working_dir: PathBuf,
 ) -> eyre::Result<impl Stream<Item = Timestamped<CoordinatorEvent>>> {
     let mut stream = TcpStream::connect(addr)
         .await
@@ -38,6 +39,7 @@
             dora_version: env!("CARGO_PKG_VERSION").to_owned(),
             machine_id,
             listen_port,
+            working_dir,
         },
         timestamp: clock.new_timestamp(),
     })?;
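
Because every daemon may now run in a different directory, the
`SpawnDataflowNodes` command is assembled inside the per-machine loop, at the
cost of cloning `nodes`, `machine_listen_ports`, and the descriptor once per
machine, and each daemon announces its own directory in its registration
request. A cut-down sketch of that hello message (illustrative `Request` enum
and field values; the real type is `CoordinatorRequest` in
`libraries/core/src/coordinator_messages.rs` and carries more variants):

    use serde::{Deserialize, Serialize};
    use std::path::PathBuf;

    #[derive(Debug, Serialize, Deserialize)]
    enum Request {
        Register {
            dora_version: String,
            machine_id: String,
            listen_port: u16,
            working_dir: PathBuf,
        },
    }

    fn main() -> serde_json::Result<()> {
        let hello = Request::Register {
            dora_version: env!("CARGO_PKG_VERSION").to_owned(),
            machine_id: "machine-a".to_owned(), // hypothetical values
            listen_port: 53_290,
            working_dir: PathBuf::from("/opt/dataflows"),
        };
        // Roughly the JSON the daemon serializes onto the registration stream.
        println!("{}", serde_json::to_string_pretty(&hello)?);
        Ok(())
    }
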
diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index 521b5bd89..238177844 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -89,6 +89,7 @@ impl Daemon {
         machine_id: String,
         inter_daemon_addr: SocketAddr,
         local_listen_port: u16,
+        working_dir: PathBuf,
     ) -> eyre::Result<()> {
         let clock = Arc::new(HLC::default());

@@ -105,19 +106,24 @@
         });

         // connect to the coordinator
-        let coordinator_events =
-            coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock)
-                .await
-                .wrap_err("failed to connect to dora-coordinator")?
-                .map(
-                    |Timestamped {
-                         inner: event,
-                         timestamp,
-                     }| Timestamped {
-                        inner: Event::Coordinator(event),
-                        timestamp,
-                    },
-                );
+        let coordinator_events = coordinator::register(
+            coordinator_addr,
+            machine_id.clone(),
+            listen_port,
+            &clock,
+            working_dir.clone(),
+        )
+        .await
+        .wrap_err("failed to connect to dora-coordinator")?
+        .map(
+            |Timestamped {
+                 inner: event,
+                 timestamp,
+             }| Timestamped {
+                inner: Event::Coordinator(event),
+                timestamp,
+            },
+        );

         // Spawn local listener loop
         let (events_tx, events_rx) = flume::bounded(10);
@@ -148,13 +154,7 @@
             .map(|_| ())
     }

-    pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> {
-        let working_dir = dataflow_path
-            .canonicalize()
-            .context("failed to canoncialize dataflow path")?
-            .parent()
-            .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
-            .to_owned();
+    pub async fn run_dataflow(dataflow_path: &Path, working_dir: PathBuf) -> eyre::Result<()> {
         let descriptor = Descriptor::read(dataflow_path).await?;
         descriptor.check(&working_dir)?;

diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs
index 38e9eae2e..a5ba6cb61 100644
--- a/libraries/core/src/coordinator_messages.rs
+++ b/libraries/core/src/coordinator_messages.rs
@@ -1,3 +1,5 @@
+use std::path::PathBuf;
+
 use crate::daemon_messages::DataflowId;
 use eyre::eyre;

@@ -7,6 +9,7 @@ pub enum CoordinatorRequest {
         dora_version: String,
         machine_id: String,
         listen_port: u16,
+        working_dir: PathBuf,
     },
     Event {
         machine_id: String,
diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs
index 92b3460c1..61122a256 100644
--- a/libraries/core/src/descriptor/mod.rs
+++ b/libraries/core/src/descriptor/mod.rs
@@ -134,23 +134,7 @@ impl Descriptor {
     }

     pub fn check(&self, working_dir: &Path) -> eyre::Result<()> {
-        validate::check_dataflow(self, working_dir, None, false)
-            .wrap_err("Dataflow could not be validated.")
-    }
-
-    pub fn check_in_daemon(
-        &self,
-        working_dir: &Path,
-        remote_machine_id: &[&str],
-        coordinator_is_remote: bool,
-    ) -> eyre::Result<()> {
-        validate::check_dataflow(
-            self,
-            working_dir,
-            Some(remote_machine_id),
-            coordinator_is_remote,
-        )
-        .wrap_err("Dataflow could not be validated.")
+        validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.")
     }
 }

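
`Daemon::run_dataflow` no longer derives the working directory from the
dataflow file; the caller passes it in, and the daemon subcommand now hands
over its canonicalized `--working-dir` instead. A caller that wants the old
behavior back can derive it explicitly; a sketch of that derivation
(hypothetical helper, assuming `eyre`'s `Context`/`ContextCompat` traits):

    use eyre::{Context, ContextCompat};
    use std::path::{Path, PathBuf};

    /// What `run_dataflow` used to compute internally: the dataflow file's
    /// parent directory, canonicalized.
    fn working_dir_of(dataflow_path: &Path) -> eyre::Result<PathBuf> {
        let canonical = dataflow_path
            .canonicalize()
            .context("failed to canonicalize dataflow path")?;
        Ok(canonical
            .parent()
            .context("canonicalized dataflow path has no parent")?
            .to_owned())
    }

With `check_in_daemon` removed above, `Descriptor::check` is the single
validation entry point shared by the CLI, the coordinator, and the daemon.
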
diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs
index 66368cce2..71e985952 100644
--- a/libraries/core/src/descriptor/validate.rs
+++ b/libraries/core/src/descriptor/validate.rs
@@ -1,25 +1,28 @@
 use crate::{
     adjust_shared_library_path,
     config::{DataId, Input, InputMapping, OperatorId, UserInputMapping},
-    descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION},
+    descriptor::{self, source_is_url, CoreNodeKind, OperatorSource},
     get_python_path,
 };

 use eyre::{bail, eyre, Context};
+use std::collections::HashSet;
 use std::{path::Path, process::Command};
 use tracing::info;

 use super::{resolve_path, Descriptor, DYNAMIC_SOURCE, SHELL_SOURCE};
+
 const VERSION: &str = env!("CARGO_PKG_VERSION");

-pub fn check_dataflow(
-    dataflow: &Descriptor,
-    working_dir: &Path,
-    remote_daemon_id: Option<&[&str]>,
-    coordinator_is_remote: bool,
-) -> eyre::Result<()> {
+pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> {
     let nodes = dataflow.resolve_aliases_and_set_defaults()?;
     let mut has_python_operator = false;
+    let is_local = nodes
+        .iter()
+        .map(|n| &n.deploy.machine)
+        .collect::<HashSet<_>>()
+        .len()
+        <= 1;

     // check that nodes and operators exist
     for node in &nodes {
@@ -30,32 +33,12 @@
                 source => {
                     if source_is_url(source) {
                         info!("{source} is a URL."); // TODO: Implement url check.
-                    } else if let Some(remote_daemon_id) = remote_daemon_id {
-                        if remote_daemon_id.contains(&node.deploy.machine.as_str())
-                            || coordinator_is_remote
-                        {
-                            let path = Path::new(&source);
-                            let path = if path.extension().is_none() {
-                                path.with_extension(EXE_EXTENSION)
-                            } else {
-                                path.to_owned()
-                            };
-                            if path.is_relative() {
-                                eyre::bail!(
-                                    "paths of remote nodes must be absolute (node `{}`)",
-                                    node.id
-                                );
-                            }
-                            info!("skipping path check for remote node `{}`", node.id);
-                        } else {
-                            resolve_path(source, working_dir).wrap_err_with(|| {
-                                format!("Could not find source path `{}`", source)
-                            })?;
-                        }
-                    } else {
+                    } else if is_local {
                         resolve_path(source, working_dir)
                             .wrap_err_with(|| format!("Could not find source path `{}`", source))?;
-                    };
+                    } else {
+                        info!("skipping path exist check for remote node `{}`", node.id);
+                    }
                 }
             },
             descriptor::CoreNodeKind::Runtime(node) => {
@@ -64,11 +47,16 @@
                         OperatorSource::SharedLibrary(path) => {
                             if source_is_url(path) {
                                 info!("{path} is a URL."); // TODO: Implement url check.
-                            } else {
+                            } else if is_local {
                                 let path = adjust_shared_library_path(Path::new(&path))?;
                                 if !working_dir.join(&path).exists() {
                                     bail!("no shared library at `{}`", path.display());
                                 }
+                            } else {
+                                info!(
+                                    "skipping path exist check for remote operator `{}`",
+                                    operator_definition.id
+                                );
                             }
                         }
                         OperatorSource::Python(python_source) => {
@@ -76,14 +64,28 @@
                             let path = &python_source.source;
                             if source_is_url(path) {
                                 info!("{path} is a URL."); // TODO: Implement url check.
-                            } else if !working_dir.join(path).exists() {
-                                bail!("no Python library at `{path}`");
+                            } else if is_local {
+                                if !working_dir.join(path).exists() {
+                                    bail!("no Python library at `{path}`");
+                                }
+                            } else {
+                                info!(
+                                    "skipping path exist check for remote python operator `{}`",
+                                    operator_definition.id
+                                );
                             }
                         }
                         OperatorSource::Wasm(path) => {
                             if source_is_url(path) {
                                 info!("{path} is a URL."); // TODO: Implement url check.
-                            } else if !working_dir.join(path).exists() {
-                                bail!("no WASM library at `{path}`");
+                            } else if is_local {
+                                if !working_dir.join(path).exists() {
+                                    bail!("no WASM library at `{path}`");
+                                }
+                            } else {
+                                info!(
+                                    "skipping path exist check for remote Wasm operator `{}`",
+                                    operator_definition.id
+                                );
+                            }
                             }
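
The rewritten validation hinges on a single question: does the dataflow stay on
one machine? `is_local` collapses every node's `deploy.machine` into a set and
checks that at most one distinct value remains; only then do the
path-existence checks run, since for a multi-machine dataflow the sources live
next to some remote daemon and only that daemon can resolve them. The idiom in
isolation (toy inputs; machine ids are plain strings, with unset machines
resolving to the same default value):

    use std::collections::HashSet;

    /// A dataflow is "local" when its nodes name at most one distinct machine.
    fn is_local(machines: &[&str]) -> bool {
        machines.iter().collect::<HashSet<_>>().len() <= 1
    }

    fn main() {
        assert!(is_local(&["", ""]));                    // default machine everywhere
        assert!(is_local(&["machine-a", "machine-a"]));  // one explicit machine
        assert!(!is_local(&["machine-a", "machine-b"])); // remote: checks are skipped
        println!("ok");
    }

Note that a dataflow mixing one explicit machine id with the default empty id
counts as two distinct values, so its path checks are skipped as well.
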