Skip to content

Commit

Permalink
rename step to task (#2048)
Browse files Browse the repository at this point in the history
  • Loading branch information
DatGuyJonathan authored Feb 18, 2025
1 parent 0076b67 commit 873de0c
Show file tree
Hide file tree
Showing 14 changed files with 61 additions and 56 deletions.
4 changes: 2 additions & 2 deletions apps/framework-cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,8 +1004,8 @@ async fn top_command_handler(
let project = load_project()?;

match &workflow_args.command {
Some(WorkflowCommands::Init { name, steps, step }) => {
init_workflow(&project, name, steps.clone(), step.clone()).await
Some(WorkflowCommands::Init { name, tasks, task }) => {
init_workflow(&project, name, tasks.clone(), task.clone()).await
}
Some(WorkflowCommands::Run { name, input }) => {
run_workflow(&project, name, input.clone()).await
Expand Down
12 changes: 6 additions & 6 deletions apps/framework-cli/src/cli/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,13 @@ pub enum WorkflowCommands {
/// Name of your workflow
name: String,

/// Comma-separated list of step names
/// Comma-separated list of task names
#[arg(long)]
steps: Option<String>,
tasks: Option<String>,

/// Individual step names (can be specified multiple times)
/// Individual task names (can be specified multiple times)
#[arg(long)]
step: Option<Vec<String>>,
task: Option<Vec<String>>,
},
/// Run a workflow
Run {
Expand All @@ -272,12 +272,12 @@ pub enum WorkflowCommands {
#[arg(short, long)]
input: Option<String>,
},
/// Resume a workflow from a specific step
/// Resume a workflow from a specific task
Resume {
/// Name of the workflow to resume
name: String,

/// Step to resume from
/// Task to resume from
#[arg(long)]
from: String,
},
Expand Down
29 changes: 18 additions & 11 deletions apps/framework-cli/src/cli/routines/scripts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ use temporal_sdk_core_protos::temporal::api::workflowservice::v1::{
pub async fn init_workflow(
project: &Project,
name: &str,
steps: Option<String>,
step: Option<Vec<String>>,
tasks: Option<String>,
task: Option<Vec<String>>,
) -> Result<RoutineSuccess, RoutineFailure> {
// Convert steps string to vector if present
let step_vec = if let Some(steps_str) = steps {
steps_str.split(',').map(|s| s.trim().to_string()).collect()
let task_vec = if let Some(tasks_str) = tasks {
tasks_str.split(',').map(|s| s.trim().to_string()).collect()
} else {
step.unwrap_or_default()
task.unwrap_or_default()
};

// Initialize the workflow using the existing Workflow::init method
Workflow::init(project, name, &step_vec).map_err(|e| {
Workflow::init(project, name, &task_vec).map_err(|e| {
RoutineFailure::new(
Message {
action: "Workflow Init Failed".to_string(),
Expand All @@ -47,7 +47,7 @@ pub async fn init_workflow(
Ok(RoutineSuccess::success(Message {
action: "Created".to_string(),
details: format!(
"Workflow '{}' initialized successfully\n\nNext Steps:\n1. cd {}/{}/{}\n2. Edit your workflow steps\n3. Run with: moose-cli workflow run {}",
"Workflow '{}' initialized successfully\n\nNext Steps:\n1. cd {}/{}/{}\n2. Edit your workflow tasks\n3. Run with: moose-cli workflow run {}",
name, APP_DIR, SCRIPTS_DIR, name, name
),
}))
Expand Down Expand Up @@ -733,7 +733,7 @@ mod tests {
}

#[tokio::test]
async fn test_workflow_init_with_steps() {
async fn test_workflow_init_with_tasks() {
let project = setup();

let result = init_workflow(
Expand All @@ -749,12 +749,19 @@ mod tests {

let workflow_dir = project.app_dir().join(SCRIPTS_DIR).join("daily-etl");

for (i, step) in ["extract", "transform", "load"].iter().enumerate() {
let file_path = workflow_dir.join(format!("{}.{}.py", i + 1, step));
assert!(file_path.exists(), "Step file {} should exist", step);
for (i, task) in ["extract", "transform", "load"].iter().enumerate() {
let file_path = workflow_dir.join(format!("{}.{}.py", i + 1, task));
assert!(file_path.exists(), "Task file {} should exist", task);

let content = fs::read_to_string(&file_path).unwrap();
assert!(content.contains("@task()"));

let expected_string = format!(r#""task": "{}""#, task);
assert!(
content.contains(&expected_string),
"Content should contain '{}'",
expected_string
);
}
}

Expand Down
8 changes: 4 additions & 4 deletions apps/framework-cli/src/framework/python/templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,11 @@ def {{name}}(): # The name of your script
# The return value is the output of the script.
# The return value should be a dictionary with at least:
# - step: the step name (e.g., "extract", "transform")
# - data: the actual data being passed to the next step
# - task: the task name (e.g., "extract", "transform")
# - data: the actual data being passed to the next task
return {
"step": "{{name}}", # The step name is the name of the script
"data": None # The data being passed to the next step (4MB limit)
"task": "{{name}}", # The task name is the name of the script
"data": None # The data being passed to the next task (4MB limit)
}
"#;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async def dynamic_activity(execution_input: ScriptExecutionInput) -> WorkflowSte
if not task_func:
raise ValueError("No @task() function found in script.")

log.info(f"Activity received input: {execution_input.input_data}")
log.info(f"Task received input: {execution_input.input_data}")

# Pass the input data directly if it exists
input_data = execution_input.input_data if execution_input.input_data else {}
Expand All @@ -66,10 +66,10 @@ async def dynamic_activity(execution_input: ScriptExecutionInput) -> WorkflowSte

# Validate and encode result
if not isinstance(result, dict):
raise ValueError("Task must return a dictionary with 'step' and 'data' keys")
raise ValueError("Task must return a dictionary with 'task' and 'data' keys")

if "step" not in result or "data" not in result:
raise ValueError("Task result must contain 'step' and 'data' keys")
if "task" not in result or "data" not in result:
raise ValueError("Task result must contain 'task' and 'data' keys")

# Encode the result using our custom encoder
encoded_result = json.loads(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def collect_activities(workflow_dir: str) -> List[str]:
"""
Recursively collect all Python files from the workflow directory and its subdirectories,
"""
log.info(f"Collecting activities from {workflow_dir}")
log.info(f"Collecting tasks from {workflow_dir}")
script_paths = []
for root, _, files in os.walk(workflow_dir):
# Skip any folders named 'python_wrapper'
Expand Down Expand Up @@ -73,13 +73,13 @@ async def register_workflows(script_dir: str) -> Optional[Worker]:
act = create_activity_for_script(activity_name)
dynamic_activities.append(act)
_ALREADY_REGISTERED.add(activity_name)
log.info(f"Registered activity {activity_name}")
log.info(f"Registered task {activity_name}")

if len(dynamic_activities) == 0:
log.warning(f"No activities found in {script_dir}")
log.warning(f"No tasks found in {script_dir}")
return None
else:
log.info(f"Found {len(dynamic_activities)} activity(ies) in {script_dir}")
log.info(f"Found {len(dynamic_activities)} task(s) in {script_dir}")

# TODO: This should be configurable
log.info("Connecting to Temporal server...")
Expand Down
4 changes: 2 additions & 2 deletions apps/framework-cli/src/framework/scripts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ impl Workflow {

std::fs::create_dir_all(&workflow_dir)?;

// Create config.toml with workflow name and steps
// Create config.toml with workflow name and tasks
let config = if scripts.is_empty() {
WorkflowConfig::new(name.to_string())
} else {
WorkflowConfig::with_steps(name.to_string(), scripts.to_vec())
WorkflowConfig::with_tasks(name.to_string(), scripts.to_vec())
};
config.save(workflow_dir.join("config.toml"))?;

Expand Down
10 changes: 5 additions & 5 deletions apps/framework-cli/src/framework/scripts/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ pub struct WorkflowConfig {
#[serde(default = "default_timeout")]
pub timeout: String,

// Optional steps configuration
// Optional tasks configuration
#[serde(default)]
pub steps: Option<Vec<String>>,
pub tasks: Option<Vec<String>>,
}

impl WorkflowConfig {
Expand All @@ -24,13 +24,13 @@ impl WorkflowConfig {
schedule: default_schedule(),
retries: default_retries(),
timeout: default_timeout(),
steps: None,
tasks: None,
}
}

pub fn with_steps(name: String, steps: Vec<String>) -> Self {
pub fn with_tasks(name: String, tasks: Vec<String>) -> Self {
let mut config = Self::new(name);
config.steps = Some(steps);
config.tasks = Some(tasks);
config
}

Expand Down
6 changes: 3 additions & 3 deletions apps/framework-cli/src/framework/typescript/templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ const {{name}}: TaskFunction = async () => {
// The return value is the output of the script.
// The return value should be a dictionary with at least:
// - step: the step name (e.g., "extract", "transform")
// - data: the actual data being passed to the next step
// - task: the task name (e.g., "extract", "transform")
// - data: the actual data being passed to the next task
return {
step: "{{name}}",
task: "{{name}}",
data: {}
};
};
Expand Down
6 changes: 3 additions & 3 deletions packages/py-moose-lib/moose_lib/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ def task(func: Callable[..., T] = None, *, retries: int = 3) -> Callable[..., T]
def validate_result(result: Any) -> None:
"""Ensure proper return format"""
if not isinstance(result, dict):
raise ValueError("Task must return a dictionary with 'step' and 'data' keys")
if "step" not in result or "data" not in result:
raise ValueError("Task result must contain 'step' and 'data' keys")
raise ValueError("Task must return a dictionary with 'task' and 'data' keys")
if "task" not in result or "data" not in result:
raise ValueError("Task result must contain 'task' and 'data' keys")

def decorator(f: Callable[..., T]) -> Callable[..., T]:
if asyncio.iscoroutinefunction(f):
Expand Down
6 changes: 3 additions & 3 deletions packages/ts-moose-lib/src/scripts/activity.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { log as logger } from "@temporalio/activity";
import * as fs from "fs";
import { WorkflowStepResult } from "./types";
import { WorkflowTaskResult } from "./types";

export interface ScriptExecutionInput {
scriptPath: string;
Expand All @@ -10,11 +10,11 @@ export interface ScriptExecutionInput {
export const activities = {
async executeScript(
input: ScriptExecutionInput,
): Promise<WorkflowStepResult> {
): Promise<WorkflowTaskResult> {
try {
const { scriptPath, inputData } = input;

logger.info(`Activity received input: ${JSON.stringify(inputData)}`);
logger.info(`Task received input: ${JSON.stringify(inputData)}`);

// TODO: Handle initial input data & passing data between steps
const processedInput = (inputData || {})?.data || {};
Expand Down
10 changes: 4 additions & 6 deletions packages/ts-moose-lib/src/scripts/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ function collectActivities(
logger: DefaultLogger,
workflowDir: string,
): string[] {
logger.info(`Collecting activities from ${workflowDir}`);
logger.info(`Collecting tasks from ${workflowDir}`);
const scriptPaths: string[] = [];

function walkDir(dir: string) {
Expand Down Expand Up @@ -83,18 +83,16 @@ async function registerWorkflows(
const activity = await createActivityForScript(activityName);
dynamicActivities.push(activity);
ALREADY_REGISTERED.add(activityName);
logger.info(`Registered activity ${activityName}`);
logger.info(`Registered task ${activityName}`);
}
}

if (dynamicActivities.length === 0) {
logger.info(`No activities found in ${scriptDir}`);
logger.info(`No tasks found in ${scriptDir}`);
return null;
}

logger.info(
`Found ${dynamicActivities.length} activity(ies) in ${scriptDir}`,
);
logger.info(`Found ${dynamicActivities.length} task(s) in ${scriptDir}`);

// TODO: Make this configurable
logger.info("Connecting to Temporal server...");
Expand Down
2 changes: 1 addition & 1 deletion packages/ts-moose-lib/src/scripts/task.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export interface TaskFunction {
(): Promise<{ step: string; data: Record<string, any> }>;
(): Promise<{ task: string; data: Record<string, any> }>;
}

export interface TaskConfig {
Expand Down
4 changes: 2 additions & 2 deletions packages/ts-moose-lib/src/scripts/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export interface WorkflowState {
inputData: any | null;
}

export interface WorkflowStepResult {
step: string;
export interface WorkflowTaskResult {
task: string;
data: any;
}

0 comments on commit 873de0c

Please sign in to comment.