From 20661790b2e75e0e49d3ac2ea0b0958907ac9ed7 Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Fri, 3 Oct 2025 13:23:52 -0700
Subject: [PATCH 1/3] new_file_832_flow is now a wrapper for new_file_832_task,
 which is called directly in dispatcher

---
 orchestration/flows/bl832/dispatcher.py | 23 +++++++++-------
 orchestration/flows/bl832/move.py       | 35 +++++++++++++++++++------
 2 files changed, 41 insertions(+), 17 deletions(-)

diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py
index 7591c7e9..f316442e 100644
--- a/orchestration/flows/bl832/dispatcher.py
+++ b/orchestration/flows/bl832/dispatcher.py
@@ -5,6 +5,8 @@
 from pydantic import BaseModel, ValidationError, Field
 from typing import Any, Optional, Union
 
+from orchestration.flows.bl832.move import process_new_832_file_task
+
 
 class FlowParameterMapper:
     """
@@ -99,7 +101,7 @@ async def run_specific_flow(flow_name: str, parameters: dict) -> None:
         raise
 
 
-@flow(name="dispatcher")
+@flow(name="dispatcher", flow_run_name="dispatcher-{file_path}")
 async def dispatcher(
     file_path: Optional[str] = None,
     is_export_control: bool = False,
@@ -125,15 +127,18 @@ async def dispatcher(
         decision_settings = await JSON.load("decision-settings")
         if decision_settings.value.get("new_832_file_flow/new_file_832"):
             logger.info("Running new_file_832 flow...")
-            await run_specific_flow("new_832_file_flow/new_file_832",
-                                    FlowParameterMapper.get_flow_parameters(
-                                        "new_832_file_flow/new_file_832",
-                                        available_params))
-            logger.info("Completed new_file_832 flow.")
+            process_new_832_file_task(
+                file_path=available_params.get("file_path"),
+                is_export_control=available_params.get("is_export_control", False),
+                send_to_nersc=not available_params.get("is_export_control", False),  # Infer from is_export_control
+                config=available_params.get("config")
+            )
+
+            logger.info("Completed new_file_832 task.")
     except Exception as e:
-        logger.error(f"new_832_file_flow/new_file_832 flow failed: {e}")
+        logger.error(f"new_832_file_flow/new_file_832 task failed: {e}")
         # Optionally, raise a specific ValueError
-        raise ValueError("new_file_832 flow Failed") from e
+        raise ValueError("new_file_832 task Failed") from e
 
     # Prepare ALCF and NERSC flows to run asynchronously, based on settings
     tasks = []
@@ -165,7 +170,7 @@ async def dispatcher(
     """
     try:
         # Setup decision settings based on input parameters
-        setup_decision_settings(alcf_recon=True, nersc_recon=False, new_file_832=False)
+        setup_decision_settings(alcf_recon=True, nersc_recon=True, new_file_832=True)
         # Run the main decision flow with the specified parameters
         # asyncio.run(dispatcher(
         #     config={},  # PYTEST, ALCF, NERSC
diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py
index 6c61a90c..b547a5c7 100644
--- a/orchestration/flows/bl832/move.py
+++ b/orchestration/flows/bl832/move.py
@@ -45,6 +45,7 @@ def transfer_spot_to_data(
     logger.info(f"spot832 to data832 globus task_id: {task}")
     return success
 
+
 @task(name="transfer_data_to_nersc")
 def transfer_data_to_nersc(
     file_path: str,
@@ -53,17 +54,17 @@ def transfer_data_to_nersc(
     nersc832: GlobusEndpoint,
 ):
     logger = get_run_logger()
-    
+
     # if source_file begins with "/", it will mess up os.path.join
     if file_path[0] == "/":
         file_path = file_path[1:]
 
     # Initialize config
     config = Config832()
-    
+
     # Import here to avoid circular imports
     from orchestration.transfer_controller import get_transfer_controller, CopyMethod
-    
+
     # Change prometheus_metrics=None if do not want to push metrics
     # prometheus_metrics = None
     prometheus_metrics = PrometheusMetrics()
@@ -73,7 +74,7 @@ def transfer_data_to_nersc(
         config=config,
         prometheus_metrics=prometheus_metrics
     )
-    
+
     # Use transfer controller to copy the file
     # The controller automatically handles metrics collection and pushing
     logger.info(f"Transferring {file_path} from data832 to nersc")
@@ -85,11 +86,29 @@ def transfer_data_to_nersc(
 
     return success
 
+
 @flow(name="new_832_file_flow")
-def process_new_832_file(file_path: str,
-                         is_export_control=False,
-                         send_to_nersc=True,
-                         config=None):
+def process_new_832_file_flow(
+    file_path: str,
+    is_export_control=False,
+    send_to_nersc=True,
+    config=None
+):
+    process_new_832_file_task(
+        file_path=file_path,
+        is_export_control=is_export_control,
+        send_to_nersc=send_to_nersc,
+        config=config
+    )
+
+
+@task(name="new_832_file_task")
+def process_new_832_file_task(
+    file_path: str,
+    is_export_control=False,
+    send_to_nersc=True,
+    config=None
+):
     """
     Sends a file along a path:
         - Copy from spot832 to data832

From 33b9ddeb092ae9c16aa9cdcc0fe3b9d7c467887e Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Fri, 3 Oct 2025 14:04:30 -0700
Subject: [PATCH 2/3] Renaming runs_specific_flow to run_recon_flow_async so
 it's more obvious

---
 orchestration/flows/bl832/dispatcher.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py
index f316442e..d2bf53cc 100644
--- a/orchestration/flows/bl832/dispatcher.py
+++ b/orchestration/flows/bl832/dispatcher.py
@@ -83,8 +83,8 @@ def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: b
     return settings
 
 
-@task(name="run_specific_flow")
-async def run_specific_flow(flow_name: str, parameters: dict) -> None:
+@task(name="run_recon_flow_async")
+async def run_recon_flow_async(flow_name: str, parameters: dict) -> None:
     """
     This task is used to run a specific flow with dynamically provided parameters.
 
@@ -144,11 +144,11 @@ async def dispatcher(
     tasks = []
     if decision_settings.value.get("alcf_recon_flow/alcf_recon_flow"):
         alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params)
-        tasks.append(run_specific_flow("alcf_recon_flow/alcf_recon_flow", alcf_params))
+        tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params))
 
     if decision_settings.value.get("nersc_recon_flow/nersc_recon_flow"):
         nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params)
-        tasks.append(run_specific_flow("nersc_recon_flow/nersc_recon_flow", nersc_params))
+        tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params))
 
     # Run ALCF and NERSC flows in parallel, if any
     if tasks:

From 77381baea38e609856a07b316fdb4b0ae90e8eb9 Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Fri, 3 Oct 2025 14:05:57 -0700
Subject: [PATCH 3/3] setting send_to_nersc=true

---
 orchestration/flows/bl832/dispatcher.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py
index d2bf53cc..cd9da2f0 100644
--- a/orchestration/flows/bl832/dispatcher.py
+++ b/orchestration/flows/bl832/dispatcher.py
@@ -130,7 +130,7 @@ async def dispatcher(
             process_new_832_file_task(
                 file_path=available_params.get("file_path"),
                 is_export_control=available_params.get("is_export_control", False),
-                send_to_nersc=not available_params.get("is_export_control", False),  # Infer from is_export_control
+                send_to_nersc=True,
                 config=available_params.get("config")
             )
 