Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions orchestration/flows/bl832/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from pydantic import BaseModel, ValidationError, Field
from typing import Any, Optional, Union

from orchestration.flows.bl832.move import process_new_832_file_task


class FlowParameterMapper:
"""
Expand Down Expand Up @@ -81,8 +83,8 @@ def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: b
return settings


@task(name="run_specific_flow")
async def run_specific_flow(flow_name: str, parameters: dict) -> None:
@task(name="run_recon_flow_async")
async def run_recon_flow_async(flow_name: str, parameters: dict) -> None:
"""
This task is used to run a specific flow with dynamically provided parameters.

Expand All @@ -99,7 +101,7 @@ async def run_specific_flow(flow_name: str, parameters: dict) -> None:
raise


@flow(name="dispatcher")
@flow(name="dispatcher", flow_run_name="dispatcher-{file_path}")
async def dispatcher(
file_path: Optional[str] = None,
is_export_control: bool = False,
Expand All @@ -125,25 +127,28 @@ async def dispatcher(
decision_settings = await JSON.load("decision-settings")
if decision_settings.value.get("new_832_file_flow/new_file_832"):
logger.info("Running new_file_832 flow...")
await run_specific_flow("new_832_file_flow/new_file_832",
FlowParameterMapper.get_flow_parameters(
"new_832_file_flow/new_file_832",
available_params))
logger.info("Completed new_file_832 flow.")
process_new_832_file_task(
file_path=available_params.get("file_path"),
is_export_control=available_params.get("is_export_control", False),
send_to_nersc=True,
config=available_params.get("config")
)

logger.info("Completed new_file_832 task.")
except Exception as e:
logger.error(f"new_832_file_flow/new_file_832 flow failed: {e}")
logger.error(f"new_832_file_flow/new_file_832 task failed: {e}")
# Optionally, raise a specific ValueError
raise ValueError("new_file_832 flow Failed") from e
raise ValueError("new_file_832 task Failed") from e

# Prepare ALCF and NERSC flows to run asynchronously, based on settings
tasks = []
if decision_settings.value.get("alcf_recon_flow/alcf_recon_flow"):
alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params)
tasks.append(run_specific_flow("alcf_recon_flow/alcf_recon_flow", alcf_params))
tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params))

if decision_settings.value.get("nersc_recon_flow/nersc_recon_flow"):
nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params)
tasks.append(run_specific_flow("nersc_recon_flow/nersc_recon_flow", nersc_params))
tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params))

# Run ALCF and NERSC flows in parallel, if any
if tasks:
Expand All @@ -165,7 +170,7 @@ async def dispatcher(
"""
try:
# Setup decision settings based on input parameters
setup_decision_settings(alcf_recon=True, nersc_recon=False, new_file_832=False)
setup_decision_settings(alcf_recon=True, nersc_recon=True, new_file_832=True)
# Run the main decision flow with the specified parameters
# asyncio.run(dispatcher(
# config={}, # PYTEST, ALCF, NERSC
Expand Down
35 changes: 27 additions & 8 deletions orchestration/flows/bl832/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def transfer_spot_to_data(
logger.info(f"spot832 to data832 globus task_id: {task}")
return success


@task(name="transfer_data_to_nersc")
def transfer_data_to_nersc(
file_path: str,
Expand All @@ -53,17 +54,17 @@ def transfer_data_to_nersc(
nersc832: GlobusEndpoint,
):
logger = get_run_logger()

# if source_file begins with "/", it will mess up os.path.join
if file_path[0] == "/":
file_path = file_path[1:]

# Initialize config
config = Config832()

# Import here to avoid circular imports
from orchestration.transfer_controller import get_transfer_controller, CopyMethod

# Change prometheus_metrics=None if do not want to push metrics
# prometheus_metrics = None
prometheus_metrics = PrometheusMetrics()
Expand All @@ -73,7 +74,7 @@ def transfer_data_to_nersc(
config=config,
prometheus_metrics=prometheus_metrics
)

# Use transfer controller to copy the file
# The controller automatically handles metrics collection and pushing
logger.info(f"Transferring {file_path} from data832 to nersc")
Expand All @@ -85,11 +86,29 @@ def transfer_data_to_nersc(

return success


@flow(name="new_832_file_flow")
def process_new_832_file(file_path: str,
is_export_control=False,
send_to_nersc=True,
config=None):
def process_new_832_file_flow(
file_path: str,
is_export_control=False,
send_to_nersc=True,
config=None
):
process_new_832_file_task(
file_path=file_path,
is_export_control=is_export_control,
send_to_nersc=send_to_nersc,
config=config
)


@task(name="new_832_file_task")
def process_new_832_file_task(
file_path: str,
is_export_control=False,
send_to_nersc=True,
config=None
):
"""
Sends a file along a path:
- Copy from spot832 to data832
Expand Down