Skip to content
Merged
2 changes: 1 addition & 1 deletion src/escalation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod unix;

use std::process::Stdio;

pub use self::unix::Command;
pub use self::unix::{Command, EscalationMethod};

#[derive(Debug, thiserror::Error)]
pub enum EscalationError {
Expand Down
18 changes: 9 additions & 9 deletions src/herder_daemon/ipc.rs → src/herder_api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
pub mod write_verify;

use std::fmt::{Debug, Display};

use serde::{Deserialize, Serialize, de::DeserializeOwned};

pub use super::writer_process::ipc::{WriteVerifyAction, WriteVerifyError, WriteVerifyEvent};

/// Tell the herder to start a herd for performing an arbitrary action.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StartHerd<A> {
Expand Down Expand Up @@ -52,23 +52,23 @@ pub trait HerdEvent:
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, derive_more::From)]
#[non_exhaustive]
pub enum TopLevelHerdEvent {
Writer(WriteVerifyEvent),
Writer(write_verify::WriteVerifyEvent),
}

macro_rules! impl_try_from_top_level_herd_event {
($arm:ident => $event_type:ty) => {
impl TryFrom<crate::herder_daemon::ipc::TopLevelHerdEvent> for $event_type {
type Error = crate::herder_daemon::ipc::TopLevelHerdEvent;
impl TryFrom<crate::herder_api::TopLevelHerdEvent> for $event_type {
type Error = crate::herder_api::TopLevelHerdEvent;
fn try_from(
ev: crate::herder_daemon::ipc::TopLevelHerdEvent,
) -> Result<Self, crate::herder_daemon::ipc::TopLevelHerdEvent> {
ev: crate::herder_api::TopLevelHerdEvent,
) -> Result<Self, crate::herder_api::TopLevelHerdEvent> {
match ev {
crate::herder_daemon::ipc::TopLevelHerdEvent::$arm(x) => Ok(x),
crate::herder_api::TopLevelHerdEvent::$arm(x) => Ok(x),
//other => Err(other),
}
}
}
};
}

pub(super) use impl_try_from_top_level_herd_event;
pub(self) use impl_try_from_top_level_herd_event;
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::{fmt::Display, path::PathBuf};

use serde::{Deserialize, Serialize};

use super::HerdAction;
use crate::compression::CompressionFormat;
use crate::device::Type;
use crate::herder_daemon::ipc::{self, HerdAction};
use serde::{Deserialize, Serialize};
use std::{fmt::Display, path::PathBuf};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WriteVerifyAction {
Expand Down Expand Up @@ -40,9 +38,9 @@ pub enum WriteVerifyEvent {
Error(WriteVerifyError),
}

ipc::impl_try_from_top_level_herd_event!(Writer => WriteVerifyEvent);
super::impl_try_from_top_level_herd_event!(Writer => WriteVerifyEvent);

impl ipc::HerdEvent for WriteVerifyEvent {
impl super::HerdEvent for WriteVerifyEvent {
type StartInfo = WriteVerifyStart;
type Failure = WriteVerifyError;

Expand Down Expand Up @@ -74,6 +72,7 @@ pub enum WriteVerifyError {
UnexpectedTermination,
UnknownChildProcError(String),
FailedToUnmount { message: String, exit_code: i32 },
Panicked,
}

impl From<std::io::Error> for WriteVerifyError {
Expand Down Expand Up @@ -104,6 +103,7 @@ impl Display for WriteVerifyError {
f,
"Failed to unmount disk (exit code {exit_code})\n{message}"
),
WriteVerifyError::Panicked => write!(f, "Orchestrator panicked!"),
}
}
}
18 changes: 8 additions & 10 deletions src/herder_daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,21 @@ use tracing::info;
use tracing_unwrap::ResultExt;

use crate::{
herder_daemon::ipc::{TopLevelHerdEvent, WriteVerifyAction},
herder_api::{StartHerd, TopLevelHerdEvent, write_verify::WriteVerifyAction},
ipc_common::{read_msg_async, write_msg},
};

pub mod ipc;
mod writer_process;

pub async fn main() {
loop {
let msg =
match read_msg_async::<ipc::StartHerd<WriteVerifyAction>>(tokio::io::stdin()).await {
Ok(d) => d,
Err(e) => {
tracing::info!("Error received on stdin, quitting: {e}");
return;
}
};
let msg = match read_msg_async::<StartHerd<WriteVerifyAction>>(tokio::io::stdin()).await {
Ok(d) => d,
Err(e) => {
tracing::info!("Error received on stdin, quitting: {e}");
return;
}
};
info!(?msg, "Received StartAction request");

let child = writer_process::spawn_writer(
Expand Down
4 changes: 1 addition & 3 deletions src/herder_daemon/writer_process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@ use tracing_unwrap::ResultExt;

use crate::compression::CompressionFormat;
use crate::device;
use crate::herder_api::write_verify::*;

use self::utils::{CountRead, CountWrite, FileSourceReader, SyncDataFile};
use self::xplat::open_blockdev;

use ipc::*;

pub mod ipc;
#[cfg(test)]
mod tests;
mod utils;
Expand Down
6 changes: 2 additions & 4 deletions src/herder_daemon/writer_process/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,8 @@ mod helpers {

use rand::{Rng, SeedableRng, rngs::SmallRng};

use super::{
CompressionFormat, VerifyOp, WriteOp,
ipc::{WriteVerifyError, WriteVerifyEvent},
};
use super::{CompressionFormat, VerifyOp, WriteOp};
use crate::herder_api::write_verify::*;

/// Wraps an in-memory buffer and logs every single chunk of data written to it.
struct MockWrite<'a> {
Expand Down
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ mod device;
mod escalation;
mod hash;
mod hashfile;
mod herder_api;
mod herder_daemon;
mod herder_facade;
mod ipc_common;
mod logging;
mod native;
mod orchestrator;
mod tty;
mod ui;
mod util;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::herder_daemon::ipc::StartHerd;
use crate::herder_facade::DaemonError;
use super::DaemonError;
use crate::herder_api::StartHerd;
use crate::ipc_common::write_msg_async;
use serde::Serialize;
use tokio::io::AsyncWrite;
Expand All @@ -10,6 +10,7 @@ use tracing::info;
/// Literally doesn't even implement responses.
pub(super) trait HerderClient {
async fn start_writer<A: Serialize>(&mut self, id: u64, action: A) -> Result<(), DaemonError>;
async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError>;
}

/// A [HerderClient] that doesn't actually spawn the real [HerderClient] until it
Expand Down Expand Up @@ -70,6 +71,11 @@ impl<F: HerderClientFactory> HerderClient for LazyHerderClient<F> {
self.ensure_daemon().await?.start_writer(id, action).await?;
Ok(())
}

async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError> {
self.ensure_daemon().await?;
Ok(())
}
}

/// A low-level handle to a child process herder daemon.
Expand All @@ -95,6 +101,10 @@ impl<W: AsyncWrite + Unpin> HerderClient for RawHerderClient<W> {
.map_err(DaemonError::TransportFailure)?;
Ok(())
}

async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError> {
unimplemented!("not implemented")
}
}

#[cfg(test)]
Expand Down Expand Up @@ -142,6 +152,10 @@ mod tests {
self.call_count.fetch_add(1, Ordering::SeqCst);
Ok(())
}

async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError> {
unimplemented!()
}
}

#[derive(Debug, Clone)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::DaemonError;
use super::client::LazyHerderClient;
use super::client::{HerderClient, HerderClientFactory, RawHerderClient};
use super::{HerdHandle, HerderFacade, StartWriterError};
use crate::escalation::run_escalate;
use crate::herder_daemon::ipc::{HerdAction, HerdEvent, TopLevelHerdEvent};
use crate::herder_facade::DaemonError;
use crate::herder_facade::client::{HerderClient, HerderClientFactory, RawHerderClient};
use crate::herder_api::{HerdAction, HerdEvent, TopLevelHerdEvent};
use crate::ipc_common::read_msg_async;
use futures::StreamExt;
use std::collections::HashMap;
Expand Down Expand Up @@ -123,6 +123,10 @@ where
initial_info,
})
}

async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError> {
self.escalated_daemon.ensure_escalated_daemon().await
}
}

#[derive(Debug)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
//! Utilities for spawning and interacting with herder daemons.
//! WARNING: HERE THERE BE DRAGONS
//!
//! The good parts of this submodule will get assimilated into orchestrator once I get back to working
//! on the stdiomux branch. Don't rely on this module whatsoever! Orchestrator is mildly stable though.

mod client;
mod facade;

use futures::stream::BoxStream;

use crate::herder_daemon::ipc::{HerdAction, HerdEvent, TopLevelHerdEvent};
use crate::herder_api::{HerdAction, HerdEvent, TopLevelHerdEvent};

pub use facade::make_herder_facade_impl;

Expand All @@ -21,6 +24,8 @@ pub trait HerderFacade {
action: A,
escalated: bool,
) -> Result<HerdHandle<A::Event>, StartWriterError<A::Event>>;

async fn ensure_escalated_daemon(&mut self) -> Result<(), DaemonError>;
}

/// A wrapper around the events and information associated with a single herd
Expand Down
60 changes: 60 additions & 0 deletions src/orchestrator/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! Exposes the [`Orchestrator`], which is a facade that orchestrates all "high-level" work
//! and tracks the state of worker tasks.

pub use self::{
herder_facade::{DaemonError, StartWriterError},
write_verify::{WriteVerifyParams, WriteVerifyStarted, WriterState},
};
use crate::{escalation::EscalationMethod, herder_api::write_verify::*};

mod herder_facade;
mod real;
pub mod watch;
mod write_verify;

/// Main facade for UI implementations to interact with the rest of the program's logic.
///
/// This can be thought of as the glue between the UI and the backend, handling the following:
///
/// - spawning child processes and escalating them
/// - orchestrating multi-step workflows, such as write + verify
/// - reduction of event streams from the child processes into full states, along with [`Watch`]
/// handles for you to query state updates
///
/// Note that the interface is fully asynchronous. For synchronous UI implementations, you should
/// spawn a worker task as a shim between the [`Orchestrator`] and your synchronous UI threads,
/// probably using channels and such.
///
/// The API for this can be considered "mostly" stable. I'll be changing out the error types, but
/// in general, the overall shape of this API can be used for new UI developments.
pub trait Orchestrator {
/// Start a write + verify workflow.
///
/// Returns when we get an initial success message from the task group, or there was a failure.
async fn start_write_verify(
&self,
begin_params: WriteVerifyParams,
) -> Result<WriteVerifyStarted, StartWriterError<WriteVerifyEvent>>;

/// Attempt to spawn a child process as root using the provided escalation method (or [`None`] to
/// automatically guess which one to use).
///
/// Returns [`Ok`] if we successfully managed to escalate, or an error if we failed. If we were
/// already escalated before this was called, returns [`Ok`].
///
/// Once this is called, all future workflows will be routed through the escalated child process
/// rather than executing at the parent's permission level!
///
/// If your requested method involves the terminal, you should switch back to the non-alternate
/// screen before calling this.
async fn escalate(&mut self, method: Option<EscalationMethod>) -> Result<(), DaemonError>;

/// Returns whether or not we have a child process running as root.
#[expect(dead_code)]
fn is_escalated(&self) -> bool;
}

/// Make the actual prod-used orchestrator implementation.
pub fn make_orchestrator_impl(log_path: &str) -> impl Orchestrator + Send + Sync + 'static {
self::real::OrchestratorImpl::new(herder_facade::make_herder_facade_impl(log_path))
}
Loading
Loading