Skip to content

Commit

Permalink
workflow status command (#2016)
Browse files Browse the repository at this point in the history
  • Loading branch information
cjus authored Feb 5, 2025
1 parent ad43bbf commit 0c9fff9
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 8 deletions.
13 changes: 7 additions & 6 deletions apps/framework-cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ use routines::ls::{list_db, list_streaming};
use routines::metrics_console::run_console;
use routines::plan;
use routines::ps::show_processes;
use routines::scripts::init_workflow;
use routines::scripts::list_workflows;
use routines::scripts::pause_workflow;
use routines::scripts::run_workflow;
use routines::scripts::terminate_workflow;
use routines::scripts::unpause_workflow;
use routines::scripts::{
get_workflow_status, init_workflow, list_workflows, pause_workflow, run_workflow,
terminate_workflow, unpause_workflow,
};

use settings::{read_settings, Settings};
use std::cmp::Ordering;
Expand Down Expand Up @@ -1023,6 +1021,9 @@ async fn top_command_handler(
}
Some(WorkflowCommands::Pause { name }) => pause_workflow(&project, name).await,
Some(WorkflowCommands::Unpause { name }) => unpause_workflow(&project, name).await,
Some(WorkflowCommands::Status { name, id }) => {
get_workflow_status(&project, name, id.clone()).await
}
None => Err(RoutineFailure::error(Message {
action: "Workflow".to_string(),
details: "No subcommand provided".to_string(),
Expand Down
9 changes: 9 additions & 0 deletions apps/framework-cli/src/cli/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,4 +302,13 @@ pub enum WorkflowCommands {
/// Name of the workflow to unpause
name: String,
},
/// Get the status of a workflow
Status {
/// Name of the workflow
name: String,

/// Optional run ID (defaults to most recent)
#[arg(long)]
id: Option<String>,
},
}
111 changes: 109 additions & 2 deletions apps/framework-cli/src/cli/routines/scripts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use crate::infrastructure::orchestration::temporal::{
};
use crate::project::Project;
use crate::utilities::constants::{APP_DIR, SCRIPTS_DIR};
use chrono::{DateTime, Utc};
use temporal_sdk_core_protos::temporal::api::enums::v1::WorkflowExecutionStatus;
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::{
ListWorkflowExecutionsRequest, SignalWorkflowExecutionRequest,
TerminateWorkflowExecutionRequest,
DescribeWorkflowExecutionRequest, ListWorkflowExecutionsRequest,
SignalWorkflowExecutionRequest, TerminateWorkflowExecutionRequest,
};

pub async fn init_workflow(
Expand Down Expand Up @@ -352,6 +353,112 @@ pub async fn unpause_workflow(
}))
}

pub async fn get_workflow_status(
_project: &Project,
name: &str,
run_id: Option<String>,
) -> Result<RoutineSuccess, RoutineFailure> {
let mut client = get_temporal_client().await.map_err(|_| {
RoutineFailure::error(Message {
action: "Workflow".to_string(),
details: "Could not connect to Temporal.\n".to_string(),
})
})?;

// If no run_id provided, get the most recent one
let execution_id = if let Some(id) = run_id {
id
} else {
// List workflows to get most recent
let request = ListWorkflowExecutionsRequest {
namespace: DEFAULT_TEMPORTAL_NAMESPACE.to_string(),
page_size: 1,
query: format!("WorkflowId = '{}'", name),
..Default::default()
};

let response = client
.list_workflow_executions(request)
.await
.map_err(|e| {
RoutineFailure::error(Message {
action: "Workflow".to_string(),
details: format!("Could not find workflow '{}': {}\n", name, e.message()),
})
})?;

let executions = response.into_inner().executions;
if executions.is_empty() {
return Err(RoutineFailure::error(Message {
action: "Workflow".to_string(),
details: format!("No executions found for workflow '{}'\n", name),
}));
}

executions[0].execution.as_ref().unwrap().run_id.clone()
};

// Get workflow details
let request = DescribeWorkflowExecutionRequest {
namespace: DEFAULT_TEMPORTAL_NAMESPACE.to_string(),
execution: Some(
temporal_sdk_core_protos::temporal::api::common::v1::WorkflowExecution {
workflow_id: name.to_string(),
run_id: execution_id.clone(),
},
),
};

let response = client
.describe_workflow_execution(request)
.await
.map_err(|e| {
RoutineFailure::error(Message {
action: "Workflow".to_string(),
details: format!(
"Could not get status for workflow '{}': {}\n",
name,
e.message()
),
})
})?;

let info = response.into_inner().workflow_execution_info.unwrap();

// Format the output
let status_emoji = match WorkflowExecutionStatus::try_from(info.status) {
Ok(status) => match status {
WorkflowExecutionStatus::Running => "⏳",
WorkflowExecutionStatus::Completed => "✅",
WorkflowExecutionStatus::Failed => "❌",
_ => "❓",
},
Err(_) => "❓",
};

let start_time = DateTime::<Utc>::from_timestamp(
info.start_time.as_ref().unwrap().seconds,
info.start_time.as_ref().unwrap().nanos as u32,
)
.unwrap();

let execution_time = Utc::now().signed_duration_since(start_time);

let details = format!(
"Status: {}\nRun ID: {}\nStatus: {} {}\nExecution Time: {}s\n",
name,
execution_id,
info.status().as_str_name(),
status_emoji,
execution_time.num_seconds()
);

Ok(RoutineSuccess::success(Message {
action: "Workflow".to_string(),
details,
}))
}

#[cfg(test)]
mod tests {
use crate::framework::languages::SupportedLanguages;
Expand Down

0 comments on commit 0c9fff9

Please sign in to comment.