diff --git a/src/escalation/mod.rs b/src/escalation/mod.rs index 3d2f468e..4f45c7ef 100644 --- a/src/escalation/mod.rs +++ b/src/escalation/mod.rs @@ -5,7 +5,7 @@ mod unix; use std::process::Stdio; -pub use self::unix::Command; +pub use self::unix::{Command, EscalationMethod}; #[derive(Debug, thiserror::Error)] pub enum EscalationError { diff --git a/src/herder_daemon/ipc.rs b/src/herder_api/mod.rs similarity index 77% rename from src/herder_daemon/ipc.rs rename to src/herder_api/mod.rs index 593e82b5..d2fac3b5 100644 --- a/src/herder_daemon/ipc.rs +++ b/src/herder_api/mod.rs @@ -1,9 +1,9 @@ +pub mod write_verify; + use std::fmt::{Debug, Display}; use serde::{Deserialize, Serialize, de::DeserializeOwned}; -pub use super::writer_process::ipc::{WriteVerifyAction, WriteVerifyError, WriteVerifyEvent}; - /// Tell the herder to start a herd for performing an arbitrary action. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct StartHerd { @@ -52,18 +52,18 @@ pub trait HerdEvent: #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, derive_more::From)] #[non_exhaustive] pub enum TopLevelHerdEvent { - Writer(WriteVerifyEvent), + Writer(write_verify::WriteVerifyEvent), } macro_rules! impl_try_from_top_level_herd_event { ($arm:ident => $event_type:ty) => { - impl TryFrom for $event_type { - type Error = crate::herder_daemon::ipc::TopLevelHerdEvent; + impl TryFrom for $event_type { + type Error = crate::herder_api::TopLevelHerdEvent; fn try_from( - ev: crate::herder_daemon::ipc::TopLevelHerdEvent, - ) -> Result { + ev: crate::herder_api::TopLevelHerdEvent, + ) -> Result { match ev { - crate::herder_daemon::ipc::TopLevelHerdEvent::$arm(x) => Ok(x), + crate::herder_api::TopLevelHerdEvent::$arm(x) => Ok(x), //other => Err(other), } } @@ -71,4 +71,4 @@ macro_rules! impl_try_from_top_level_herd_event { }; } -pub(super) use impl_try_from_top_level_herd_event; +pub(self) use impl_try_from_top_level_herd_event; diff --git a/src/herder_daemon/writer_process/ipc.rs b/src/herder_api/write_verify.rs similarity index 92% rename from src/herder_daemon/writer_process/ipc.rs rename to src/herder_api/write_verify.rs index 17ac0ba8..fabb745c 100644 --- a/src/herder_daemon/writer_process/ipc.rs +++ b/src/herder_api/write_verify.rs @@ -1,10 +1,8 @@ -use std::{fmt::Display, path::PathBuf}; - -use serde::{Deserialize, Serialize}; - +use super::HerdAction; use crate::compression::CompressionFormat; use crate::device::Type; -use crate::herder_daemon::ipc::{self, HerdAction}; +use serde::{Deserialize, Serialize}; +use std::{fmt::Display, path::PathBuf}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct WriteVerifyAction { @@ -40,9 +38,9 @@ pub enum WriteVerifyEvent { Error(WriteVerifyError), } -ipc::impl_try_from_top_level_herd_event!(Writer => WriteVerifyEvent); +super::impl_try_from_top_level_herd_event!(Writer => WriteVerifyEvent); -impl ipc::HerdEvent for WriteVerifyEvent { +impl super::HerdEvent for WriteVerifyEvent { type StartInfo = WriteVerifyStart; type Failure = WriteVerifyError; @@ -74,6 +72,7 @@ pub enum WriteVerifyError { UnexpectedTermination, UnknownChildProcError(String), FailedToUnmount { message: String, exit_code: i32 }, + Panicked, } impl From for WriteVerifyError { @@ -104,6 +103,7 @@ impl Display for WriteVerifyError { f, "Failed to unmount disk (exit code {exit_code})\n{message}" ), + WriteVerifyError::Panicked => write!(f, "Orchestrator panicked!"), } } } diff --git a/src/herder_daemon/mod.rs b/src/herder_daemon/mod.rs index 53089214..6adb8201 100644 --- a/src/herder_daemon/mod.rs +++ b/src/herder_daemon/mod.rs @@ -8,23 +8,21 @@ use tracing::info; use tracing_unwrap::ResultExt; use crate::{ - herder_daemon::ipc::{TopLevelHerdEvent, WriteVerifyAction}, + herder_api::{StartHerd, TopLevelHerdEvent, write_verify::WriteVerifyAction}, ipc_common::{read_msg_async, write_msg}, }; -pub mod ipc; mod writer_process; pub async fn main() { loop { - let msg = - match read_msg_async::>(tokio::io::stdin()).await { - Ok(d) => d, - Err(e) => { - tracing::info!("Error received on stdin, quitting: {e}"); - return; - } - }; + let msg = match read_msg_async::>(tokio::io::stdin()).await { + Ok(d) => d, + Err(e) => { + tracing::info!("Error received on stdin, quitting: {e}"); + return; + } + }; info!(?msg, "Received StartAction request"); let child = writer_process::spawn_writer( diff --git a/src/herder_daemon/writer_process/mod.rs b/src/herder_daemon/writer_process/mod.rs index 746d6859..a350272e 100644 --- a/src/herder_daemon/writer_process/mod.rs +++ b/src/herder_daemon/writer_process/mod.rs @@ -17,13 +17,11 @@ use tracing_unwrap::ResultExt; use crate::compression::CompressionFormat; use crate::device; +use crate::herder_api::write_verify::*; use self::utils::{CountRead, CountWrite, FileSourceReader, SyncDataFile}; use self::xplat::open_blockdev; -use ipc::*; - -pub mod ipc; #[cfg(test)] mod tests; mod utils; diff --git a/src/herder_daemon/writer_process/tests.rs b/src/herder_daemon/writer_process/tests.rs index b6f235cc..f9b70882 100644 --- a/src/herder_daemon/writer_process/tests.rs +++ b/src/herder_daemon/writer_process/tests.rs @@ -251,10 +251,8 @@ mod helpers { use rand::{Rng, SeedableRng, rngs::SmallRng}; - use super::{ - CompressionFormat, VerifyOp, WriteOp, - ipc::{WriteVerifyError, WriteVerifyEvent}, - }; + use super::{CompressionFormat, VerifyOp, WriteOp}; + use crate::herder_api::write_verify::*; /// Wraps an in-memory buffer and logs every single chunk of data written to it. struct MockWrite<'a> { diff --git a/src/main.rs b/src/main.rs index 99bc4c6d..70883cae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,11 +7,12 @@ mod device; mod escalation; mod hash; mod hashfile; +mod herder_api; mod herder_daemon; -mod herder_facade; mod ipc_common; mod logging; mod native; +mod orchestrator; mod tty; mod ui; mod util; diff --git a/src/herder_facade/client.rs b/src/orchestrator/herder_facade/client.rs similarity index 94% rename from src/herder_facade/client.rs rename to src/orchestrator/herder_facade/client.rs index 4d2836d2..454aa311 100644 --- a/src/herder_facade/client.rs +++ b/src/orchestrator/herder_facade/client.rs @@ -1,5 +1,5 @@ -use crate::herder_daemon::ipc::StartHerd; -use crate::herder_facade::DaemonError; +use super::DaemonError; +use crate::herder_api::StartHerd; use crate::ipc_common::write_msg_async; use serde::Serialize; use tokio::io::AsyncWrite; @@ -10,6 +10,7 @@ use tracing::info; /// Literally doesn't even implement responses. pub(super) trait HerderClient { async fn start_writer(&mut self, id: u64, action: A) -> Result<(), DaemonError>; + async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError>; } /// A [HerderClient] that doesn't actually spawn the real [HerderClient] until it @@ -70,6 +71,11 @@ impl HerderClient for LazyHerderClient { self.ensure_daemon().await?.start_writer(id, action).await?; Ok(()) } + + async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError> { + self.ensure_daemon().await?; + Ok(()) + } } /// A low-level handle to a child process herder daemon. @@ -95,6 +101,10 @@ impl HerderClient for RawHerderClient { .map_err(DaemonError::TransportFailure)?; Ok(()) } + + async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError> { + unimplemented!("not implemented") + } } #[cfg(test)] @@ -142,6 +152,10 @@ mod tests { self.call_count.fetch_add(1, Ordering::SeqCst); Ok(()) } + + async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError> { + unimplemented!() + } } #[derive(Debug, Clone)] diff --git a/src/herder_facade/facade.rs b/src/orchestrator/herder_facade/facade.rs similarity index 95% rename from src/herder_facade/facade.rs rename to src/orchestrator/herder_facade/facade.rs index 2a8747d1..b6f1fb9f 100644 --- a/src/herder_facade/facade.rs +++ b/src/orchestrator/herder_facade/facade.rs @@ -1,9 +1,9 @@ +use super::DaemonError; use super::client::LazyHerderClient; +use super::client::{HerderClient, HerderClientFactory, RawHerderClient}; use super::{HerdHandle, HerderFacade, StartWriterError}; use crate::escalation::run_escalate; -use crate::herder_daemon::ipc::{HerdAction, HerdEvent, TopLevelHerdEvent}; -use crate::herder_facade::DaemonError; -use crate::herder_facade::client::{HerderClient, HerderClientFactory, RawHerderClient}; +use crate::herder_api::{HerdAction, HerdEvent, TopLevelHerdEvent}; use crate::ipc_common::read_msg_async; use futures::StreamExt; use std::collections::HashMap; @@ -123,6 +123,10 @@ where initial_info, }) } + + async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError> { + self.escalated_daemon.ensure_escalated_daemon().await + } } #[derive(Debug)] diff --git a/src/herder_facade/mod.rs b/src/orchestrator/herder_facade/mod.rs similarity index 80% rename from src/herder_facade/mod.rs rename to src/orchestrator/herder_facade/mod.rs index 88f602fa..248f9b84 100644 --- a/src/herder_facade/mod.rs +++ b/src/orchestrator/herder_facade/mod.rs @@ -1,11 +1,14 @@ -//! Utilities for spawning and interacting with herder daemons. +//! WARNING: HERE THERE BE DRAGONS +//! +//! The good parts of this submodule will get assimilated into orchestrator once I get back to working +//! on the stdiomux branch. Don't rely on this module whatsoever! Orchestrator is mildly stable though. mod client; mod facade; use futures::stream::BoxStream; -use crate::herder_daemon::ipc::{HerdAction, HerdEvent, TopLevelHerdEvent}; +use crate::herder_api::{HerdAction, HerdEvent, TopLevelHerdEvent}; pub use facade::make_herder_facade_impl; @@ -21,6 +24,8 @@ pub trait HerderFacade { action: A, escalated: bool, ) -> Result, StartWriterError>; + + async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError>; } /// A wrapper around the events and information associated with a single herd diff --git a/src/orchestrator/mod.rs b/src/orchestrator/mod.rs new file mode 100644 index 00000000..8b832285 --- /dev/null +++ b/src/orchestrator/mod.rs @@ -0,0 +1,60 @@ +//! Exposes the [`Orchestrator`], which is a facade that orchestrates all "high-level" work +//! and tracks the state of worker tasks. + +pub use self::{ + herder_facade::{DaemonError, StartWriterError}, + write_verify::{WriteVerifyParams, WriteVerifyStarted, WriterState}, +}; +use crate::{escalation::EscalationMethod, herder_api::write_verify::*}; + +mod herder_facade; +mod real; +pub mod watch; +mod write_verify; + +/// Main facade for UI implementations to interact with the rest of the program's logic. +/// +/// This can be thought of as the glue between the UI and the backend, handling the following: +/// +/// - spawning child processes and escalating them +/// - orchestrating multi-step workflows, such as write + verify +/// - reduction of event streams from the child processes into full states, along with [`Watch`] +/// handles for you to query state updates +/// +/// Note that the interface is fully asynchronous. For synchronous UI implementations, you should +/// spawn a worker task as a shim between the [`Orchestrator`] and your synchronous UI threads, +/// probably using channels and such. +/// +/// The API for this can be considered "mostly" stable. I'll be changing out the error types, but +/// in general, the overall shape of this API can be used for new UI developments. +pub trait Orchestrator { + /// Start a write + verify workflow. + /// + /// Returns when we get an initial success message from the task group, or there was a failure. + async fn start_write_verify( + &self, + begin_params: WriteVerifyParams, + ) -> Result>; + + /// Attempt to spawn a child process as root using the provided escalation method (or [`None`] to + /// automatically guess which one to use). + /// + /// Returns [`Ok`] if we successfully managed to escalate, or an error if we failed. If we were + /// already escalated before this was called, returns [`Ok`]. + /// + /// Once this is called, all future workflows will be routed through the escalated child process + /// rather than executing at the parent's permission level! + /// + /// If your requested method involves the terminal, you should switch back to the non-alternate + /// screen before calling this. + async fn escalate(&mut self, method: Option) -> Result<(), DaemonError>; + + /// Returns whether or not we have a child process running as root. + #[expect(dead_code)] + fn is_escalated(&self) -> bool; +} + +/// Make the actual prod-used orchestrator implementation. +pub fn make_orchestrator_impl(log_path: &str) -> impl Orchestrator + Send + Sync + 'static { + self::real::OrchestratorImpl::new(herder_facade::make_herder_facade_impl(log_path)) +} diff --git a/src/orchestrator/real.rs b/src/orchestrator/real.rs new file mode 100644 index 00000000..073e276e --- /dev/null +++ b/src/orchestrator/real.rs @@ -0,0 +1,74 @@ +use std::{sync::Arc, time::Instant}; + +use futures::StreamExt; + +use super::herder_facade::{DaemonError, HerderFacade, StartWriterError}; +use crate::{ + escalation::EscalationMethod, + herder_api::write_verify::WriteVerifyEvent, + orchestrator::{Orchestrator, WriteVerifyParams, WriteVerifyStarted, WriterState}, +}; + +/// Actual orchestrator implementation used by Caligula. +pub struct OrchestratorImpl { + // TODO: get rid of the entire herder facade thing altogether. just assimilate the good parts + // into orchestrator. + h: Arc>, + escalation: Option>, +} + +impl OrchestratorImpl { + pub fn new(h: H) -> Self { + Self { + h: Arc::new(tokio::sync::Mutex::new(h)), + escalation: None, + } + } +} + +impl Orchestrator for OrchestratorImpl { + async fn start_write_verify( + &self, + params: WriteVerifyParams, + ) -> Result> { + // request the herder to start the action + let mut lock = self.h.lock().await; + let handle = lock + .start_herd(params.make_child_config(), self.escalation.is_some()) + .await?; + drop(lock); + + // create state reduction task + let (tx_state, rx_state) = tokio::sync::watch::channel(WriterState::initial( + Instant::now(), + !params.compression.is_identity(), + handle.initial_info.input_file_bytes, + )); + let mut events = handle.events; + let _jh = tokio::spawn(async move { + while !tx_state.borrow().is_finished() && !tx_state.is_closed() { + let event = events.next().await; + tx_state.send_modify(move |state| { + *state = std::mem::take(state).on_status(Instant::now(), event); + }); + } + }); + let state = super::watch::Watch { rx: rx_state }; + + Ok(WriteVerifyStarted { + start: handle.initial_info, + state, + }) + } + + async fn escalate(&mut self, method: Option) -> Result<(), DaemonError> { + self.escalation = Some(method); + let mut lock = self.h.lock().await; + lock.ensure_escalated_daemon().await?; + Ok(()) + } + + fn is_escalated(&self) -> bool { + todo!() + } +} diff --git a/src/orchestrator/watch.rs b/src/orchestrator/watch.rs new file mode 100644 index 00000000..4deef7bd --- /dev/null +++ b/src/orchestrator/watch.rs @@ -0,0 +1,63 @@ +use std::{fmt::Debug, ops::Deref}; +use tokio::sync::watch; + +/// A handle for you to watch state changes. +/// +/// This is used because state change events may arrive from the child process at a +/// much faster rate than a UI should reasonably draw them. On UI updates, you can +/// query this object for new +/// +/// Technically speaking, this is just a thin wrapper around [`watch::Receiver`]. +/// We may change the underlying implementation of this later, so I'm wrapping it +/// like so in order to prevent us from needing to do more refactors later. +#[derive(Clone)] +pub struct Watch { + pub(super) rx: watch::Receiver, +} + +impl Debug for Watch { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("WatchState").field(&*self.borrow()).finish() + } +} + +/// A borrowed reference from a [`Watch`]. +pub struct Ref<'a, S> { + r: watch::Ref<'a, S>, +} + +#[derive(Debug, thiserror::Error)] +#[error("Worker halted before state reached the expected condition!")] +pub struct ClosedEarly; + +impl Watch { + /// Returns a reference to the most recent state. + /// + /// WARNING: Outstanding borrows hold a read lock on the inner value! If you hold this, + /// the state reducer may not be able to update the state! + pub fn borrow(&self) -> Ref<'_, S> { + Ref { + r: self.rx.borrow(), + } + } + + /// Wait until the state matches the given predicate. + /// + /// Fails if state never matches the predicate. + #[expect(dead_code)] + pub async fn wait_until( + &mut self, + pred: impl FnMut(&S) -> bool, + ) -> Result, ClosedEarly> { + let r = self.rx.wait_for(pred).await.map_err(|_| ClosedEarly)?; + Ok(Ref { r }) + } +} + +impl<'a, S> Deref for Ref<'a, S> { + type Target = S; + + fn deref(&self) -> &Self::Target { + self.r.deref() + } +} diff --git a/src/ui/writer_tracking.rs b/src/orchestrator/write_verify.rs similarity index 84% rename from src/ui/writer_tracking.rs rename to src/orchestrator/write_verify.rs index fb54b4f2..4729c16e 100644 --- a/src/ui/writer_tracking.rs +++ b/src/orchestrator/write_verify.rs @@ -1,11 +1,57 @@ -use std::time::Instant; - -use tracing::{info, trace}; - +use super::watch::Watch; use crate::{ byteseries::{ByteSeries, EstimatedTime}, - herder_daemon::ipc::{WriteVerifyError, WriteVerifyEvent}, + compression::CompressionFormat, + device::WriteTarget, + herder_api::write_verify::*, }; +use bytesize::ByteSize; +use std::time::Instant; +use std::{fs::File, path::PathBuf}; +use tracing::{info, trace}; + +/// Params for starting a write + verify workflow. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct WriteVerifyParams { + pub input_file: PathBuf, + pub input_file_size: ByteSize, + pub compression: CompressionFormat, + pub target: WriteTarget, +} + +/// Result of starting a write + verify workflow. +#[derive(Debug, Clone)] +pub struct WriteVerifyStarted { + pub start: WriteVerifyStart, + pub state: Watch, +} + +impl WriteVerifyParams { + pub fn new( + input_file: PathBuf, + compression: CompressionFormat, + target: WriteTarget, + ) -> std::io::Result { + let input_file_size = ByteSize::b(File::open(&input_file)?.metadata()?.len()); + Ok(Self { + input_file, + input_file_size, + compression, + target, + }) + } + + pub fn make_child_config(&self) -> WriteVerifyAction { + WriteVerifyAction { + dest: self.target.devnode.clone(), + src: self.input_file.clone(), + verify: true, + compression: self.compression, + target_type: self.target.target_type, + block_size: self.target.block_size.0.map(|s| s.as_u64()), + } + } +} /// A state machine for tracking the state of the writer, based on received /// messages. @@ -128,6 +174,20 @@ impl WriterState { } } +impl Default for WriterState { + /// Suitable value to put into the cell when [`std::mem::take()`] is called. + fn default() -> Self { + let now = Instant::now(); + Self::Finished { + finish_time: now, + error: Some(WriteVerifyError::Panicked), + write_hist: ByteSeries::new(now), + verify_hist: None, + total_write_bytes: 0, + } + } +} + #[derive(Debug, Clone, PartialEq)] pub struct Writing { pub write_hist: ByteSeries, @@ -192,10 +252,7 @@ impl Writing { mod tests { use std::time::{Duration, Instant}; - use crate::{ - byteseries::ByteSeries, - herder_daemon::ipc::{WriteVerifyError, WriteVerifyEvent}, - }; + use crate::{byteseries::ByteSeries, herder_api::write_verify::*}; use super::WriterState; diff --git a/src/ui/fancy_ui/display.rs b/src/ui/fancy_ui/display.rs index a07740e0..45b3524f 100644 --- a/src/ui/fancy_ui/display.rs +++ b/src/ui/fancy_ui/display.rs @@ -1,20 +1,16 @@ -use std::{sync::Arc, time::Instant}; +use std::time::Instant; -use crossterm::event::EventStream; -use futures::{StreamExt, stream::BoxStream}; +use futures::{Stream, StreamExt}; use ratatui::{ Terminal, backend::Backend, layout::{Constraint, Direction, Layout, Rect}, widgets::{Block, Borders, Paragraph, Wrap}, }; -use tokio::{select, time}; use crate::{ - herder_daemon::ipc::WriteVerifyEvent, - herder_facade::HerdHandle, logging::LogPaths, - ui::{start::BeginParams, writer_tracking::WriterState}, + orchestrator::{WriterState, watch::Watch}, }; use super::{ @@ -22,38 +18,23 @@ use super::{ widgets::{SpeedChart, WriterProgressBar, WritingInfoTable}, }; -pub struct FancyUI<'a, B> +pub struct FancyUI<'a, B, S> where B: Backend, + S: Stream + Unpin + 'a, { - terminal: &'a mut Terminal, - events: EventStream, - handle: Option>, - state: State, - log_paths: Arc, + pub terminal: &'a mut Terminal, + pub events: S, + pub child_state: Watch, + pub state: State, + pub log_paths: &'a LogPaths, } -impl<'a, B> FancyUI<'a, B> +impl<'a, B, S> FancyUI<'a, B, S> where B: Backend, + S: Stream + Unpin + 'a, { - #[tracing::instrument(skip_all)] - pub fn new( - params: &BeginParams, - handle: HerdHandle, - terminal: &'a mut Terminal, - log_paths: Arc, - ) -> Self { - let input_file_bytes = handle.initial_info.input_file_bytes; - Self { - terminal, - handle: Some(handle), - events: EventStream::new(), - state: State::initial(Instant::now(), params, input_file_bytes), - log_paths, - } - } - #[tracing::instrument(skip_all, level = "debug")] pub async fn show(mut self) -> anyhow::Result<()> { loop { @@ -68,46 +49,14 @@ where } #[tracing::instrument(skip_all, level = "trace")] - async fn get_and_handle_events(mut self) -> anyhow::Result> { - let msg = { - if let Some(handle) = &mut self.handle { - get_event_child_active(&mut self.events, &mut handle.events).await - } else { - get_event_child_dead(&mut self.events).await - }? - }; - self.state = self.state.on_event(msg)?; - - // Drop handle/process if process died - if self.state.child.is_finished() { - self.handle = None; - } - - draw(&mut self.state, self.terminal, &self.log_paths)?; - Ok(self) - } -} - -async fn get_event_child_dead(ui_events: &mut EventStream) -> anyhow::Result { - Ok(UIEvent::RecvTermEvent(ui_events.next().await.unwrap()?)) -} + async fn get_and_handle_events(mut self) -> anyhow::Result { + while let Some(event) = self.events.next().await { + let child = self.child_state.borrow(); + self.state = self.state.on_event(&child, event)?; -#[tracing::instrument(skip_all, level = "trace")] -async fn get_event_child_active( - ui_events: &mut EventStream, - child_events: &mut BoxStream<'static, WriteVerifyEvent>, -) -> anyhow::Result { - let sleep = tokio::time::sleep(time::Duration::from_millis(250)); - select! { - _ = sleep => { - return Ok(UIEvent::SleepTimeout); - } - msg = child_events.next() => { - return Ok(UIEvent::RecvChildStatus(Instant::now(), msg)); - } - event = ui_events.next() => { - return Ok(UIEvent::RecvTermEvent(event.unwrap()?)); + draw(&mut self.state, &child, self.terminal, &self.log_paths)?; } + Ok(self) } } @@ -165,17 +114,18 @@ fn centered_rect(r: Rect, w: u16, h: u16) -> Rect { pub fn draw( state: &mut State, + child: &WriterState, terminal: &mut Terminal, log_paths: &LogPaths, ) -> anyhow::Result<()> { - let progress_bar = WriterProgressBar::from_writer(&state.child); + let progress_bar = WriterProgressBar::from_writer(&child); - let final_time = match state.child { - WriterState::Finished { finish_time, .. } => finish_time, + let final_time = match child { + WriterState::Finished { finish_time, .. } => *finish_time, _ => Instant::now(), }; - let error = match &state.child { + let error = match &child { WriterState::Finished { error, .. } => error.as_ref(), _ => None, }; @@ -183,11 +133,11 @@ pub fn draw( let info_table = WritingInfoTable { input_filename: &state.input_filename, target_filename: &state.target_filename, - state: &state.child, + state: &child, }; let speed_chart = SpeedChart { - state: &state.child, + state: &child, final_time, }; diff --git a/src/ui/fancy_ui/mod.rs b/src/ui/fancy_ui/mod.rs index 71739939..a0e4a941 100644 --- a/src/ui/fancy_ui/mod.rs +++ b/src/ui/fancy_ui/mod.rs @@ -1,5 +1,60 @@ +use futures::{Stream, StreamExt as _, stream}; +use ratatui::{Terminal, prelude::Backend}; + +use self::state::UIEvent; +use crate::{ + logging::LogPaths, + orchestrator::{WriteVerifyParams, WriterState, watch::Watch}, + ui::fancy_ui::{display::FancyUI, state::State}, +}; +use std::time::Duration; + mod display; mod state; mod widgets; -pub use display::FancyUI; +/// How often we refresh the display +const REFRESH_PERIOD: Duration = Duration::from_millis(250); + +pub struct Params<'a, B, T> +where + B: Backend + 'a, + T: Stream> + 'a, +{ + pub terminal: &'a mut Terminal, + pub begin: &'a WriteVerifyParams, + pub child_state: Watch, + pub terminal_events: T, + pub log_paths: &'a LogPaths, +} + +/// Run the fancy TUI. +#[tracing::instrument(skip_all)] +pub async fn run<'a, B, T>(params: Params<'a, B, T>) -> anyhow::Result<()> +where + B: Backend, + T: Stream> + 'a, +{ + let terminal_events = + params + .terminal_events + .map(|e: std::io::Result| { + UIEvent::RecvTermEvent(e.map_err(|e| (e.to_string(), e.kind()))) + }); + let timeout_events = + stream::unfold(tokio::time::interval(REFRESH_PERIOD), |mut i| async move { + i.tick().await; + Some((UIEvent::SleepTimeout, i)) + }); + let events = Box::pin(stream::select(terminal_events, timeout_events)); + + let ui = FancyUI { + terminal: params.terminal, + events, + child_state: params.child_state, + state: State::initial(params.begin), + log_paths: params.log_paths, + }; + + ui.show().await +} diff --git a/src/ui/fancy_ui/state.rs b/src/ui/fancy_ui/state.rs index dbaf4abc..ab6a3bea 100644 --- a/src/ui/fancy_ui/state.rs +++ b/src/ui/fancy_ui/state.rs @@ -1,68 +1,69 @@ -use std::time::Instant; - use crossterm::event::{Event, KeyCode, KeyEvent, KeyEventKind, KeyModifiers}; use tracing::info; -use crate::{ - herder_daemon::ipc::WriteVerifyEvent, - ui::{start::BeginParams, writer_tracking::WriterState}, -}; +use crate::orchestrator::{WriteVerifyParams, WriterState}; use super::widgets::{QuitModal, QuitModalResult, SpeedChartState}; #[derive(Debug, PartialEq, Clone)] pub enum UIEvent { SleepTimeout, - RecvChildStatus(Instant, Option), - RecvTermEvent(Event), + RecvTermEvent(Result), } #[derive(Debug, Clone)] pub struct State { pub input_filename: String, pub target_filename: String, - pub child: WriterState, pub graph_state: SpeedChartState, pub quit_modal: Option, } impl State { - pub fn initial(now: Instant, params: &BeginParams, input_file_bytes: u64) -> Self { + pub fn initial(params: &WriteVerifyParams) -> Self { State { input_filename: params.input_file.to_string_lossy().to_string(), target_filename: params.target.devnode.to_string_lossy().to_string(), - child: WriterState::initial(now, !params.compression.is_identity(), input_file_bytes), graph_state: SpeedChartState::default(), quit_modal: None, } } #[tracing::instrument(skip_all, level = "debug", fields(ev))] - pub fn on_event(self, ev: UIEvent) -> anyhow::Result { + pub fn on_event(self, child: &WriterState, ev: UIEvent) -> anyhow::Result { Ok(match ev { UIEvent::SleepTimeout => self, - UIEvent::RecvChildStatus(t, m) => Self { - child: self.child.on_status(t, m), - ..self - }, - UIEvent::RecvTermEvent(e) => self.on_term_event(e)?, + UIEvent::RecvTermEvent(e) => self.on_term_event(child, e)?, }) } #[tracing::instrument(skip_all, level = "debug", fields(ev))] - fn on_term_event(self, ev: Event) -> anyhow::Result { + fn on_term_event( + self, + child: &WriterState, + ev: Result, + ) -> anyhow::Result { match ev { - Event::Key(KeyEvent { + Ok(Event::Key(KeyEvent { kind: KeyEventKind::Press, code, modifiers, .. - }) => self.handle_key_down((code, modifiers)), + })) => self.handle_key_down(child, code, modifiers), + Err((msg, kind)) => { + tracing::error!("Error getting term event ({kind}): {msg}"); + Err(Quit)? + } _ => Ok(self), } } - fn handle_key_down(mut self, (kc, km): (KeyCode, KeyModifiers)) -> anyhow::Result { + fn handle_key_down( + mut self, + child: &WriterState, + kc: KeyCode, + km: KeyModifiers, + ) -> anyhow::Result { if let Some(qm) = &self.quit_modal { return match qm.handle_key_down(kc) { Some(QuitModalResult::Quit) => Err(Quit.into()), @@ -78,7 +79,7 @@ impl State { (KeyCode::Char('c'), KeyModifiers::CONTROL) | (KeyCode::Esc, _) | (KeyCode::Char('q'), _) => { - if self.child.is_finished() { + if child.is_finished() { info!("Writing and verification finished; quitting immediately"); Err(Quit.into()) } else { diff --git a/src/ui/fancy_ui/widgets.rs b/src/ui/fancy_ui/widgets.rs index aa03e31b..14d87041 100644 --- a/src/ui/fancy_ui/widgets.rs +++ b/src/ui/fancy_ui/widgets.rs @@ -13,7 +13,7 @@ use ratatui::{ }, }; -use crate::ui::writer_tracking::WriterState; +use crate::orchestrator::WriterState; pub struct SpeedChart<'a> { pub state: &'a WriterState, diff --git a/src/ui/mod.rs b/src/ui/mod.rs index faa754e3..6d404d23 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -3,15 +3,14 @@ mod fancy_ui; mod simple_ui; mod start; mod utils; -mod writer_tracking; use std::{fs::File, path::Path, sync::Arc}; pub use self::cli::BurnArgs; pub use self::utils::ByteSpeed; use crate::{ - herder_facade::make_herder_facade_impl, logging::LogPaths, + orchestrator::make_orchestrator_impl, tty::TermiosRestore, ui::{ simple_ui::do_setup_wizard, @@ -40,10 +39,10 @@ pub async fn main( return Ok(()); }; - let mut herder = make_herder_facade_impl(log_paths.main()); + let mut orc = make_orchestrator_impl(log_paths.main()); let handle = try_start_burn( - &mut herder, - &begin_params.make_child_config(), + &mut orc, + &begin_params, args.root, args.interactive.is_interactive(), ) diff --git a/src/ui/simple_ui/ask_outfile.rs b/src/ui/simple_ui/ask_outfile.rs index d66f13af..0c738666 100644 --- a/src/ui/simple_ui/ask_outfile.rs +++ b/src/ui/simple_ui/ask_outfile.rs @@ -6,7 +6,8 @@ use tracing::debug; use crate::{ compression::{AVAILABLE_FORMATS, CompressionArg, CompressionFormat}, device::{self, Removable, WriteTarget, enumerate_devices}, - ui::{cli::BurnArgs, start::BeginParams}, + orchestrator::WriteVerifyParams, + ui::cli::BurnArgs, }; #[tracing::instrument(skip_all)] @@ -77,7 +78,10 @@ pub fn ask_outfile(args: &BurnArgs) -> anyhow::Result { } #[tracing::instrument(skip_all)] -pub fn confirm_write(args: &BurnArgs, begin_params: &BeginParams) -> Result { +pub fn confirm_write( + args: &BurnArgs, + begin_params: &WriteVerifyParams, +) -> Result { if args.force { debug!("Skipping confirm because of --force"); Ok(true) diff --git a/src/ui/simple_ui/mod.rs b/src/ui/simple_ui/mod.rs index ffe42aa9..60b7ca46 100644 --- a/src/ui/simple_ui/mod.rs +++ b/src/ui/simple_ui/mod.rs @@ -4,16 +4,14 @@ //! As pretty as ratatui looks, sometimes you can't use a full-featured terminal. //! This is what this module is for. -use std::time::Instant; +use std::time::Duration; -use futures::StreamExt; use indicatif::ProgressBar; use indicatif::ProgressStyle; -use crate::compression::CompressionFormat; use crate::device::WriteTarget; -use crate::herder_daemon::ipc::WriteVerifyEvent; -use crate::ui::writer_tracking::WriterState; +use crate::herder_api::write_verify::WriteVerifyStart; +use crate::orchestrator::{WriteVerifyParams, WriterState, watch::Watch}; use self::ask_hash::ask_hash; use self::ask_outfile::ask_compression; @@ -21,22 +19,23 @@ use self::ask_outfile::ask_outfile; use self::ask_outfile::confirm_write; use super::cli::BurnArgs; -use super::start::BeginParams; -use crate::herder_facade::HerdHandle; mod ask_hash; mod ask_outfile; +/// How often we refresh the display +const REFRESH_PERIOD: Duration = Duration::from_millis(250); + /// Returns the [BeginParams] if the user confirms, and None if the user doesn't. #[tracing::instrument(skip_all)] -pub fn do_setup_wizard(args: &BurnArgs) -> Result, anyhow::Error> { +pub fn do_setup_wizard(args: &BurnArgs) -> Result, anyhow::Error> { let compression = ask_compression(args)?; let _hash_info = ask_hash(args, compression)?; let target = match &args.out { Some(f) => WriteTarget::try_from(f.as_ref())?, None => ask_outfile(args)?, }; - let begin_params = BeginParams::new(args.image.clone(), compression, target)?; + let begin_params = WriteVerifyParams::new(args.image.clone(), compression, target)?; if !confirm_write(args, &begin_params)? { eprintln!("Aborting."); return Ok(None); @@ -44,12 +43,15 @@ pub fn do_setup_wizard(args: &BurnArgs) -> Result, anyhow::E Ok(Some(begin_params)) } +pub struct Params<'a> { + pub initial_info: &'a WriteVerifyStart, + pub child_state: Watch, +} + +/// Run the simple TUI. #[tracing::instrument(skip_all)] -pub async fn run_simple_burning_ui( - mut handle: HerdHandle, - cf: CompressionFormat, -) -> anyhow::Result<()> { - let input_file_bytes = handle.initial_info.input_file_bytes; +pub async fn run<'a>(params: Params<'a>) -> anyhow::Result<()> { + let input_file_bytes = params.initial_info.input_file_bytes; let write_progress = ProgressBar::new(100).with_message("Burning").with_style( ProgressStyle::with_template( "[{elapsed_precise}] {msg:>10} {wide_bar:.green/black} {percent:>3}%", @@ -63,12 +65,12 @@ pub async fn run_simple_burning_ui( .unwrap(), ); - let mut child_state = WriterState::initial(Instant::now(), !cf.is_identity(), input_file_bytes); - + let mut interval = tokio::time::interval(REFRESH_PERIOD); loop { - let x = handle.events.next().await; - child_state = child_state.on_status(Instant::now(), x); - match &child_state { + interval.tick().await; + + let child_state = params.child_state.borrow(); + match &*child_state { WriterState::Writing(b) => { write_progress.set_position((b.approximate_ratio() * 1000.0) as u64) } diff --git a/src/ui/start.rs b/src/ui/start.rs index 9d8ff0b1..3d34e7d7 100644 --- a/src/ui/start.rs +++ b/src/ui/start.rs @@ -1,66 +1,27 @@ -use std::{fmt::Display, fs::File, path::PathBuf, sync::Arc}; +use std::{fmt::Display, sync::Arc}; -use bytesize::ByteSize; use inquire::Confirm; use tracing::debug; use crate::{ - compression::CompressionFormat, - device::{self, WriteTarget}, - herder_daemon::ipc::{WriteVerifyAction, WriteVerifyError, WriteVerifyEvent}, - herder_facade::{HerdHandle, HerderFacade, StartWriterError}, + device, + herder_api::write_verify::{WriteVerifyError, WriteVerifyEvent}, logging::LogPaths, + orchestrator::{Orchestrator, StartWriterError, WriteVerifyParams, WriteVerifyStarted}, ui::{ cli::{Interactive, UseSudo}, - fancy_ui::FancyUI, - simple_ui::run_simple_burning_ui, utils::TUICapture, }, }; -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct BeginParams { - pub input_file: PathBuf, - pub input_file_size: ByteSize, - pub compression: CompressionFormat, - pub target: WriteTarget, -} - -impl BeginParams { - pub fn new( - input_file: PathBuf, - compression: CompressionFormat, - target: WriteTarget, - ) -> std::io::Result { - let input_file_size = ByteSize::b(File::open(&input_file)?.metadata()?.len()); - Ok(Self { - input_file, - input_file_size, - compression, - target, - }) - } - - pub fn make_child_config(&self) -> WriteVerifyAction { - WriteVerifyAction { - dest: self.target.devnode.clone(), - src: self.input_file.clone(), - verify: true, - compression: self.compression, - target_type: self.target.target_type, - block_size: self.target.block_size.0.map(|s| s.as_u64()), - } - } -} - #[tracing::instrument(skip_all, fields(root, interactive))] pub async fn try_start_burn( - herder: &mut impl HerderFacade, - args: &WriteVerifyAction, + orc: &mut impl Orchestrator, + args: &WriteVerifyParams, root: UseSudo, interactive: bool, -) -> anyhow::Result> { - let err = match herder.start_herd(args.clone(), false).await { +) -> Result> { + let err = match orc.start_write_verify(args.clone()).await { Ok(p) => { return Ok(p); } @@ -74,20 +35,23 @@ pub async fn try_start_burn( let response = Confirm::new(&format!( "We don't have permissions on {}. Escalate using sudo?", - args.dest.to_string_lossy() + args.target.name )) .with_default(true) .with_help_message( "We will use the sudo command, which may prompt you for a password.", ) - .prompt()?; + .prompt() + .expect("prompting the user should not fail"); if response { - return Ok(herder.start_herd(args.clone(), true).await?); + orc.escalate(None).await?; + return Ok(orc.start_write_verify(args.clone()).await?); } } (UseSudo::Always, _) => { - return Ok(herder.start_herd(args.clone(), true).await?); + orc.escalate(None).await?; + return Ok(orc.start_write_verify(args.clone()).await?); } _ => {} } @@ -98,30 +62,40 @@ pub async fn try_start_burn( pub async fn begin_writing( interactive: Interactive, - params: BeginParams, - handle: HerdHandle, + params: WriteVerifyParams, + started: WriteVerifyStarted, log_paths: Arc, ) -> anyhow::Result<()> { debug!("Opening TUI"); + if interactive.is_interactive() { debug!("Using fancy interactive TUI"); let mut tui = TUICapture::new()?; let terminal = tui.terminal(); // create app and run it - FancyUI::new(¶ms, handle, terminal, log_paths) - .show() - .await?; + super::fancy_ui::run(super::fancy_ui::Params { + terminal, + begin: ¶ms, + child_state: started.state, + terminal_events: crossterm::event::EventStream::new(), + log_paths: &log_paths, + }) + .await?; debug!("Closing TUI"); } else { debug!("Using simple TUI"); - run_simple_burning_ui(handle, params.compression).await?; + super::simple_ui::run(super::simple_ui::Params { + initial_info: &started.start, + child_state: started.state, + }) + .await?; } Ok(()) } -impl Display for BeginParams { +impl Display for WriteVerifyParams { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "Input: {}", self.input_file.to_string_lossy())?; if self.compression.is_identity() {