diff --git a/Dockerfile b/Dockerfile index d409f98d..3034b7dd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM prefecthq/prefect:2.20.17-python3.11 +FROM prefecthq/prefect:3.4.2-python3.11 WORKDIR /app COPY ./requirements.txt /tmp/ diff --git a/create_deployment_832_dispatcher.sh b/create_deployment_832_dispatcher.sh deleted file mode 100755 index afaae432..00000000 --- a/create_deployment_832_dispatcher.sh +++ /dev/null @@ -1,6 +0,0 @@ -export $(grep -v '^#' .env | xargs) - - -prefect work-pool create 'dispatcher_pool' -prefect deployment build ./orchestration/flows/bl832/dispatcher.py:dispatcher -n run_832_dispatcher -q bl832 -p dispatcher_pool -prefect deployment apply dispatcher-deployment.yaml \ No newline at end of file diff --git a/create_deployments_832.sh b/create_deployments_832.sh deleted file mode 100644 index 0c9bbd3e..00000000 --- a/create_deployments_832.sh +++ /dev/null @@ -1,38 +0,0 @@ -export $(grep -v '^#' .env | xargs) - - -# Create work pools. If a work pool already exists, it will throw a warning but that's no problem -prefect work-pool create 'new_file_832_flow_pool' -prefect work-pool create 'new_file_832_prune_pool' -prefect work-pool create 'scicat_ingest_pool' - -# new_file_832_flow_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "new_file_832_flow_pool" - -# file transfer from data832 to nersc832 with Grafana Monitoring enabled -prefect deployment build ./orchestration/flows/bl832/move.py:test_transfers_832_grafana -n test_transfers_832_grafana -p new_file_832_flow_pool -q test_transfers_832_queue -prefect deployment apply test_transfers_832_grafana-deployment.yaml - -prefect deployment build ./orchestration/flows/bl832/move.py:process_new_832_file -n new_file_832 -p new_file_832_flow_pool -q new_file_832_queue -prefect deployment apply process_new_832_file-deployment.yaml - -prefect deployment build ./orchestration/flows/bl832/move.py:test_transfers_832 -n test_transfers_832 -p new_file_832_flow_pool -q test_transfers_832_queue -prefect deployment apply test_transfers_832-deployment.yaml - -# new_file_832_prune_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "new_file_832_prune_pool" - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_spot832 -n prune_spot832 -p new_file_832_prune_pool -q prune_spot832_queue -prefect deployment apply prune_spot832-deployment.yaml - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_data832 -n prune_data832 -p new_file_832_prune_pool -q prune_data832_queue -prefect deployment apply prune_data832-deployment.yaml - -# scicat_ingest_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "scicat_ingest_pool" - -prefect deployment build ./orchestration/flows/scicat/ingest.py:ingest_dataset -n ingest_dataset -p scicat_ingest_pool -q ingest_dataset_queue -prefect deployment apply ingest_dataset-deployment.yaml \ No newline at end of file diff --git a/create_deployments_832_alcf.sh b/create_deployments_832_alcf.sh deleted file mode 100755 index 815ca3a3..00000000 --- a/create_deployments_832_alcf.sh +++ /dev/null @@ -1,35 +0,0 @@ -export $(grep -v '^#' .env | xargs) - - -# create 'alfc_flow_pool' -prefect work-pool create 'alcf_flow_pool' -# create 'aclf_prune_pool' -prefect work-pool create 'alcf_prune_pool' - -# alcf_flow_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "alcf_flow_pool" -prefect deployment build ./orchestration/flows/bl832/alcf.py:alcf_recon_flow -n alcf_recon_flow -p 
alcf_flow_pool -q alcf_recon_flow_queue -prefect deployment apply alcf_recon_flow-deployment.yaml - -# alcf_prune_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "alcf_prune_pool" -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_alcf832_raw -n prune_alcf832_raw -p alcf_prune_pool -q prune_alcf832_raw_queue -prefect deployment apply prune_alcf832_raw-deployment.yaml - - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_alcf832_scratch -n prune_alcf832_scratch -p alcf_prune_pool -q prune_alcf832_scratch_queue -prefect deployment apply prune_alcf832_scratch-deployment.yaml - - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_data832_raw -n prune_data832_raw -p alcf_prune_pool -q prune_data832_raw_queue -prefect deployment apply prune_data832_raw-deployment.yaml - - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_data832_scratch -n prune_data832_scratch -q bl832 -p alcf_prune_pool -q prune_data832_scratch_queue -prefect deployment apply prune_data832_scratch-deployment.yaml - - -# prefect deployment build ./orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_scratch -n prune_nersc832_alsdev_scratch -q bl832 -p alcf_prune_pool -q prune_nersc832_alsdev_scratch_queue -# prefect deployment apply prune_nersc832_alsdev_scratch-deployment.yaml \ No newline at end of file diff --git a/create_deployments_832_nersc.sh b/create_deployments_832_nersc.sh deleted file mode 100755 index 45731f1b..00000000 --- a/create_deployments_832_nersc.sh +++ /dev/null @@ -1,26 +0,0 @@ -export $(grep -v '^#' .env | xargs) - -# create 'nersc_flow_pool' -prefect work-pool create 'nersc_flow_pool' -prefect work-pool create 'nersc_prune_pool' - -# nersc_flow_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "nersc_flow_pool" -prefect deployment build ./orchestration/flows/bl832/nersc.py:nersc_recon_flow -n nersc_recon_flow -p nersc_flow_pool -q nersc_recon_flow_queue -prefect deployment apply nersc_recon_flow-deployment.yaml - -# nersc_prune_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "nersc_prune_pool" -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_raw -n prune_nersc832_alsdev_pscratch_raw -p nersc_prune_pool -q prune_nersc832_pscratch_queue -prefect deployment apply prune_nersc832_alsdev_pscratch_raw-deployment.yaml - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_scratch -n prune_nersc832_alsdev_pscratch_scratch -p nersc_prune_pool -q prune_nersc832_pscratch_queue -prefect deployment apply prune_nersc832_alsdev_pscratch_scratch-deployment.yaml - -# nersc_streaming_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "nersc_streaming_pool" -prefect deployment build ./orchestration/flows/bl832/nersc.py:nersc_streaming_flow -n nersc_streaming_flow -p nersc_streaming_pool -q nersc_832_streaming_flow_queue -prefect deployment apply nersc_streaming_flow-deployment.yaml \ No newline at end of file diff --git a/init_work_pools.py b/init_work_pools.py new file mode 100644 index 00000000..b4ceb255 --- /dev/null +++ b/init_work_pools.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +""" +init_work_pools.py +Description: + Initializes Prefect work pools and deployments for the beamline defined by the BEAMLINE environment variable. + Uses orchestration/flows/bl"$BEAMLINE"/prefect.yaml as the single source of truth. +Requirements: + - BEAMLINE must be set (e.g., 832). 
+ - A prefect.yaml file must exist in orchestration/flows/bl"$BEAMLINE"/. + - Prefect CLI must be installed and available in PATH. +Behavior: + - Waits until the Prefect server is reachable via its /health endpoint. + - Creates any missing work pools defined in the beamline's prefect.yaml. + - Deploys all flows defined in the beamline's prefect.yaml. + - Creates/updates Prefect Secret blocks for GLOBUS_CLIENT_ID and GLOBUS_CLIENT_SECRET + if the corresponding environment variables are present. Otherwise warns and continues. +Environment Variables: + BEAMLINE The beamline identifier (e.g., 832). Required. + PREFECT_API_URL Override the Prefect server API URL. + Default: http://prefect_server:4200/api +""" + +import httpx +import logging +import os +import subprocess +import sys +import time +import yaml + +from prefect.blocks.system import Secret + + +# ---------------- Logging Setup ---------------- # +logger = logging.getLogger("init_work_pools") +handler = logging.StreamHandler(sys.stdout) +formatter = logging.Formatter( + fmt="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +handler.setFormatter(formatter) +logger.addHandler(handler) +logger.setLevel(logging.INFO) +# ------------------------------------------------ # + + +def check_env() -> tuple[str, str, str]: + """Validate required environment variables and paths.""" + beamline = os.environ.get("BEAMLINE") + if not beamline: + logger.error("Must set BEAMLINE (e.g., 832, 733)") + sys.exit(1) + + prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml" + if not os.path.isfile(prefect_yaml): + logger.error(f"[Init:{beamline}] Expected {prefect_yaml} not found!") + sys.exit(1) + + api_url = os.environ.get("PREFECT_API_URL", "http://prefect_server:4200/api") + return beamline, prefect_yaml, api_url + + +def wait_for_prefect_server(api_url: str, beamline: str, timeout: int = 180): + """Wait until Prefect server health endpoint responds (unless using Prefect Cloud).""" + if "api.prefect.cloud" in api_url: + logger.info(f"[Init:{beamline}] Prefect Cloud detected — skipping health check.") + return + + health_url = f"{api_url}/health" + logger.info(f"[Init:{beamline}] Waiting for Prefect server at {health_url}...") + + start = time.time() + while time.time() - start < timeout: + try: + r = httpx.get(health_url, timeout=2.0) + if r.status_code == 200: + logger.info(f"[Init:{beamline}] Prefect server is up.") + return + except Exception: + pass + logger.warning(f"[Init:{beamline}] Still waiting...") + time.sleep(3) + + logger.error(f"[Init:{beamline}] Prefect server did not become ready in time.") + sys.exit(1) + + +def ensure_work_pools(prefect_yaml: str, beamline: str): + """Ensure that all work pools from prefect.yaml exist (create if missing).""" + with open(prefect_yaml, "r") as f: + config = yaml.safe_load(f) + + pools = {d["work_pool"]["name"] for d in config.get("deployments", []) if "work_pool" in d} + + for pool in pools: + logger.info(f"[Init:{beamline}] Ensuring pool: {pool}") + try: + subprocess.run( + ["prefect", "work-pool", "inspect", pool], + check=True, + capture_output=True, + ) + logger.info(f"[Init:{beamline}] Work pool '{pool}' already exists.") + except subprocess.CalledProcessError: + logger.info(f"[Init:{beamline}] Creating work pool: {pool}") + subprocess.run( + ["prefect", "work-pool", "create", pool, "--type", "process"], + check=True, + ) + + +def deploy_flows(prefect_yaml: str, beamline: str): + """Deploy flows defined in prefect.yaml using Prefect CLI.""" + 
logger.info(f"[Init:{beamline}] Deploying flows from {prefect_yaml}...") + subprocess.run( + ["prefect", "--no-prompt", "deploy", "--prefect-file", prefect_yaml, "--all"], + check=True, + ) + logger.info(f"[Init:{beamline}] Done.") + + +def ensure_globus_secrets(beamline: str): + globus_client_id = os.environ.get("GLOBUS_CLIENT_ID") + globus_client_secret = os.environ.get("GLOBUS_CLIENT_SECRET") + + if globus_client_id and globus_client_secret: + # Create or update Prefect Secret blocks for Globus credentials + try: + Secret(value=globus_client_id).save(name="globus-client-id", overwrite=True) + Secret(value=globus_client_secret).save(name="globus-client-secret", overwrite=True) + logger.info(f"[Init:{beamline}] Created/updated Prefect Secret blocks for Globus credentials.") + except Exception as e: + logger.warning(f"[Init:{beamline}] Failed to create/update Prefect Secret blocks: {str(e)}") + + +def main(): + beamline, prefect_yaml, api_url = check_env() + logger.info(f"[Init:{beamline}] Using prefect.yaml at {prefect_yaml}") + wait_for_prefect_server(api_url, beamline) + ensure_globus_secrets(beamline) + ensure_work_pools(prefect_yaml, beamline) + deploy_flows(prefect_yaml, beamline) + + +if __name__ == "__main__": + main() diff --git a/init_work_pools.sh b/init_work_pools.sh new file mode 100644 index 00000000..bb7c425b --- /dev/null +++ b/init_work_pools.sh @@ -0,0 +1,120 @@ +#!/usr/bin/env bash +# +# init_work_pools.sh +# +# Description: +# Initializes Prefect work pools and deployments for the beamline defined by the BEAMLINE environment variable. +# Uses orchestration/flows/bl"$BEAMLINE"/prefect.yaml as the single source of truth. +# +# Requirements: +# - BEAMLINE must be set (e.g., 832). +# - A prefect.yaml file must exist in orchestration/flows/bl"$BEAMLINE"/. +# +# Behavior: +# - Waits until the Prefect server is reachable via its /health endpoint. +# - Creates any missing work pools defined in the beamline's prefect.yaml. +# - Deploys all flows defined in the beamline's prefect.yaml. +# +# Usage: +# ./init_work_pools.sh +# +# Environment Variables: +# BEAMLINE The beamline identifier (e.g., 832). Required. +# PREFECT_API_URL Override the Prefect server API URL. +# Default: http://prefect_server:4200/api +# +# Notes: +# - Intended to be run as a one-shot init container alongside a Prefect server. +# - Idempotent: re-running will not recreate pools that already exist. +# - Typical docker-compose integration: +# +# prefect_init: +# build: ./splash_flows +# container_name: prefect_init +# command: ["/bin/bash", "/splash_flows/init_work_pools.sh"] +# volumes: +# - ./splash_flows:/splash_flows:ro +# environment: +# - PREFECT_API_URL=http://prefect_server:4200/api +# - PREFECT_LOGGING_LEVEL=DEBUG +# - BEAMLINE=832 +# depends_on: +# - prefect_server +# networks: +# - prenet +# restart: "no" # run once, then stop + +set -euo pipefail + +BEAMLINE="${BEAMLINE:?Must set BEAMLINE (e.g. 832, 733)}" + +# Path to the Prefect project file +PREFECT_YAML="/splash_flows/orchestration/flows/bl${BEAMLINE}/prefect.yaml" + +if [[ ! -f "$PREFECT_YAML" ]]; then + echo "[Init:${BEAMLINE}] ERROR: Expected $PREFECT_YAML not found!" >&2 + exit 1 +fi + +# If PREFECT_API_URL is already defined in the container’s environment, it will use that value. +# If not, it falls back to the default http://prefect_server:4200/api. 
+: "${PREFECT_API_URL:=http://prefect_server:4200/api}" + +echo "[Init:${BEAMLINE}] Using prefect.yaml at $PREFECT_YAML" +echo "[Init:${BEAMLINE}] Waiting for Prefect server at $PREFECT_API_URL..." + +# Wait for Prefect server to be ready, querying the health endpoint +python3 - < won't do real Globus calls now mock_config = Config832() diff --git a/orchestration/_tests/test_sfapi_flow.py b/orchestration/_tests/test_sfapi_flow.py index a1809686..66203d19 100644 --- a/orchestration/_tests/test_sfapi_flow.py +++ b/orchestration/_tests/test_sfapi_flow.py @@ -21,9 +21,9 @@ def prefect_test_fixture(): """ with prefect_test_harness(): globus_client_id = Secret(value=str(uuid4())) - globus_client_id.save(name="globus-client-id") + globus_client_id.save(name="globus-client-id", overwrite=True) globus_client_secret = Secret(value=str(uuid4())) - globus_client_secret.save(name="globus-client-secret") + globus_client_secret.save(name="globus-client-secret", overwrite=True) yield diff --git a/orchestration/_tests/test_transfer_controller.py b/orchestration/_tests/test_transfer_controller.py index a1cae916..ec7e094d 100644 --- a/orchestration/_tests/test_transfer_controller.py +++ b/orchestration/_tests/test_transfer_controller.py @@ -12,7 +12,6 @@ from .test_globus import MockTransferClient - @pytest.fixture(autouse=True, scope="session") def prefect_test_fixture(): """ @@ -26,10 +25,10 @@ def prefect_test_fixture(): with prefect_test_harness(): # Create ephemeral secrets in the local Prefect test database globus_client_id = Secret(value=str(uuid4())) - globus_client_id.save(name="globus-client-id") + globus_client_id.save(name="globus-client-id", overwrite=True) globus_client_secret = Secret(value=str(uuid4())) - globus_client_secret.save(name="globus-client-secret") + globus_client_secret.save(name="globus-client-secret", overwrite=True) yield @@ -191,7 +190,9 @@ def test_globus_transfer_controller_copy_failure( mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecretClass()) - with patch("orchestration.transfer_controller.start_transfer", return_value=(False, "mock-task-id")) as mock_start_transfer: + with patch("orchestration.transfer_controller.start_transfer", + return_value=(False, "mock-task-id") + ) as mock_start_transfer: controller = GlobusTransferController(mock_config832) result = controller.copy( file_path="some_dir/test_file.txt", @@ -226,6 +227,7 @@ def test_globus_transfer_controller_copy_exception( assert result is False, "Expected False when TransferAPIError is raised." 
mock_start_transfer.assert_called_once() + def test_globus_transfer_controller_with_metrics( mock_config832, mock_globus_endpoint, transfer_controller_module ): @@ -235,30 +237,30 @@ def test_globus_transfer_controller_with_metrics( GlobusTransferController = transfer_controller_module["GlobusTransferController"] from orchestration.prometheus_utils import PrometheusMetrics mock_prometheus = MagicMock(spec=PrometheusMetrics) - + with patch("orchestration.transfer_controller.start_transfer", return_value=(True, "mock-task-id")) as mock_start_transfer: # Create the controller with mock prometheus metrics controller = GlobusTransferController(mock_config832, prometheus_metrics=mock_prometheus) - + # Set up mock for get_transfer_file_info mock_transfer_info = {"bytes_transferred": 1024 * 1024} # 1MB controller.get_transfer_file_info = MagicMock(return_value=mock_transfer_info) - + # Execute the copy operation result = controller.copy( file_path="some_dir/test_file.txt", source=mock_globus_endpoint, destination=mock_globus_endpoint, ) - + # Verify transfer was successful assert result is True mock_start_transfer.assert_called_once() - + # Verify metrics were collected and pushed controller.get_transfer_file_info.assert_called_once_with("mock-task-id") mock_prometheus.push_metrics_to_prometheus.assert_called_once() - + # Verify the metrics data metrics_data = mock_prometheus.push_metrics_to_prometheus.call_args[0][0] assert metrics_data["bytes_transferred"] == 1024 * 1024 @@ -275,6 +277,7 @@ def test_globus_transfer_controller_with_metrics( # Tests for SimpleTransferController # -------------------------------------------------------------------------- + def test_simple_transfer_controller_no_file_path( mock_config832, mock_file_system_endpoint, transfer_controller_module ): diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 0b6bcb1c..4638d9c1 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -8,7 +8,8 @@ from globus_compute_sdk import Client, Executor from globus_compute_sdk.serialize import CombinedCode from prefect import flow, task -from prefect.blocks.system import JSON, Secret +from prefect.blocks.system import Secret +from prefect.variables import Variable from orchestration.flows.bl832.config import Config832 from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController @@ -36,7 +37,7 @@ def __init__( super().__init__(config) # Load allocation root from the Prefect JSON block # The block must be registered with the name "alcf-allocation-root-path" - allocation_data = JSON.load("alcf-allocation-root-path").value + allocation_data = Variable.get("alcf-allocation-root-path", _sync=True) self.allocation_root = allocation_data.get("alcf-allocation-root-path") if not self.allocation_root: raise ValueError("Allocation root not found in JSON block 'alcf-allocation-root-path'") @@ -314,7 +315,7 @@ def schedule_pruning( Returns: bool: True if the tasks were scheduled successfully, False otherwise. 
""" - pruning_config = JSON.load("pruning-config").value + pruning_config = Variable.get("pruning-config", _sync=True) if one_minute: alcf_delay = datetime.timedelta(minutes=1) @@ -347,7 +348,7 @@ def schedule_pruning( return True -@flow(name="alcf_recon_flow") +@flow(name="alcf_recon_flow", flow_run_name="alcf_recon-{file_path}") def alcf_recon_flow( file_path: str, config: Optional[Config832] = None, diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index cd9da2f0..cf1d0c64 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -1,11 +1,11 @@ import asyncio -from prefect import flow, task, get_run_logger -from prefect.blocks.system import JSON -from prefect.deployments.deployments import run_deployment from pydantic import BaseModel, ValidationError, Field from typing import Any, Optional, Union from orchestration.flows.bl832.move import process_new_832_file_task +from prefect import flow, task, get_run_logger +from prefect.deployments import run_deployment +from prefect.variables import Variable class FlowParameterMapper: @@ -75,8 +75,7 @@ def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: b "new_832_file_flow/new_file_832": new_file_832 } # Save the settings in a JSON block for later retrieval by other flows - settings_json = JSON(value=settings) - settings_json.save(name="decision-settings", overwrite=True) + Variable.set(name="decision-settings", value=settings, overwrite=True, _sync=True) except Exception as e: logger.error(f"Failed to set up decision settings: {e}") raise @@ -122,10 +121,10 @@ async def dispatcher( raise # Run new_file_832 first (synchronously) - available_params = inputs.dict() + available_params = inputs.model_dump() try: - decision_settings = await JSON.load("decision-settings") - if decision_settings.value.get("new_832_file_flow/new_file_832"): + decision_settings = await Variable.get("decision-settings") + if decision_settings.get("new_832_file_flow/new_file_832"): logger.info("Running new_file_832 flow...") process_new_832_file_task( file_path=available_params.get("file_path"), @@ -142,11 +141,11 @@ async def dispatcher( # Prepare ALCF and NERSC flows to run asynchronously, based on settings tasks = [] - if decision_settings.value.get("alcf_recon_flow/alcf_recon_flow"): + if decision_settings.get("alcf_recon_flow/alcf_recon_flow"): alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params)) - if decision_settings.value.get("nersc_recon_flow/nersc_recon_flow"): + if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index b547a5c7..9150c1d4 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -5,7 +5,7 @@ from globus_sdk import TransferClient from prefect import flow, task, get_run_logger -from prefect.blocks.system import JSON +from prefect.variables import Variable from orchestration.flows.scicat.ingest import ingest_dataset from orchestration.flows.bl832.config import Config832 @@ -52,6 +52,7 @@ def transfer_data_to_nersc( transfer_client: TransferClient, data832: GlobusEndpoint, nersc832: 
GlobusEndpoint, + config=None, ): logger = get_run_logger() @@ -60,14 +61,15 @@ def transfer_data_to_nersc( file_path = file_path[1:] # Initialize config - config = Config832() + if not config: + config = Config832() # Import here to avoid circular imports from orchestration.transfer_controller import get_transfer_controller, CopyMethod # Change prometheus_metrics=None if do not want to push metrics # prometheus_metrics = None - prometheus_metrics = PrometheusMetrics() + prometheus_metrics = PrometheusMetrics() # Get a Globus transfer controller transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, @@ -142,7 +144,7 @@ def process_new_832_file_task( if not is_export_control and send_to_nersc: transfer_data_to_nersc( - relative_path, config.tc, config.data832, config.nersc832 + relative_path, config.tc, config.data832, config.nersc832, config ) logger.info( f"File successfully transferred from data832 to NERSC {file_path}. Task {task}" @@ -161,7 +163,7 @@ def process_new_832_file_task( # datetime.timedelta(0.0), # ) - bl832_settings = JSON.load("bl832-settings").value + bl832_settings = Variable.get("bl832-settings", _sync=True) flow_name = f"delete spot832: {Path(file_path).name}" schedule_spot832_delete_days = bl832_settings["delete_spot832_files_after_days"] @@ -217,7 +219,7 @@ def test_transfers_832(file_path: str = "/raw/transfer_tests/test.txt"): ) logger.info(f"Transferred {spot832_path} to spot to data") - task = transfer_data_to_nersc(new_file, config.tc, config.data832, config.nersc832) + task = transfer_data_to_nersc(new_file, config.tc, config.data832, config.nersc832, config) logger.info( f"File successfully transferred from data832 to NERSC {spot832_path}. Task {task}" ) @@ -228,8 +230,17 @@ def test_transfers_832_grafana(file_path: str = "/raw/transfer_tests/test/"): logger = get_run_logger() config = Config832() - task = transfer_data_to_nersc(file_path, config.tc, config.data832, config.nersc_alsdev) + task = transfer_data_to_nersc(file_path, config.tc, config.data832, config.nersc_alsdev, config) logger.info( f"File successfully transferred from data832 to NERSC {file_path}. Task {task}" - ) \ No newline at end of file + ) + + +if __name__ == "__main__": + import os + import dotenv + + dotenv.load_dotenv() + + file_path = "/raw/transfer_tests/test.txt" diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index 05680ed0..c988ce87 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -9,7 +9,7 @@ from authlib.jose import JsonWebKey from prefect import flow, get_run_logger -from prefect.blocks.system import JSON +from prefect.variables import Variable from sfapi_client import Client from sfapi_client.compute import Machine from typing import Optional @@ -313,7 +313,7 @@ def schedule_pruning( # nersc/pscratch : 1 day # nersc832/scratch : never? 
- pruning_config = JSON.load("pruning-config").value + pruning_config = Variable.get("pruning-config", _sync=True) data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"]) nersc832_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"]) @@ -425,7 +425,7 @@ def schedule_pruning( logger.error(f"Failed to schedule prune task: {e}") -@flow(name="nersc_recon_flow") +@flow(name="nersc_recon_flow", flow_run_name="nersc_recon-{file_path}") def nersc_recon_flow( file_path: str, config: Optional[Config832] = None, diff --git a/orchestration/flows/bl832/prefect.yaml b/orchestration/flows/bl832/prefect.yaml new file mode 100644 index 00000000..0af99461 --- /dev/null +++ b/orchestration/flows/bl832/prefect.yaml @@ -0,0 +1,105 @@ +# orchestration/flows/bl832/prefect.yaml + +name: bl832 +prefect-version: 3.4.2 + +deployments: +# Dispatcher flow -- launches new file processing flows and downstream reconstruction +- name: run_832_dispatcher + entrypoint: orchestration/flows/bl832/dispatcher.py:dispatcher + work_pool: + name: dispatcher_832_pool + work_queue_name: dispatcher_832_queue + +# Move new files to the appropriate locations (data832, nersc cfs) and ingest metadata in scicat +- name: new_file_832 + entrypoint: orchestration/flows/bl832/move.py:process_new_832_file + work_pool: + name: new_file_832_pool + work_queue_name: new_file_832_queue + +# Scheduled flow for testing file transfers +- name: test_transfers_832 + entrypoint: orchestration/flows/bl832/move.py:test_transfers_832 + work_pool: + name: new_file_832_pool + work_queue_name: test_transfers_832_queue + schedules: + - cron: "0 */12 * * *" # Every 12 hours + slug: "test-move-733-flight-check" + timezone: America/Los_Angeles + active: true + +- name: test_832_transfers_grafana + entrypoint: orchestration/flows/bl832/move.py:test_transfers_832_grafana + work_pool: + name: new_file_832_pool + work_queue_name: test_832_transfers_grafana_queue + +# Reconstruction flows +- name: nersc_recon_flow + entrypoint: orchestration/flows/bl832/nersc.py:nersc_recon_flow + work_pool: + name: nersc_recon_flow_pool + work_queue_name: nersc_recon_flow_queue + +- name: nersc_streaming_flow + entrypoint: orchestration/flows/bl832/nersc.py:nersc_streaming_flow + work_pool: + name: nersc_streaming_pool + work_queue_name: nersc_832_streaming_flow_queue + +- name: alcf_recon_flow + entrypoint: orchestration/flows/bl832/alcf.py:alcf_recon_flow + work_pool: + name: alcf_recon_flow_pool + work_queue_name: alcf_recon_flow_queue + +# Pruning flows +- name: prune_data832_raw + entrypoint: orchestration/flows/bl832/prune.py:prune_data832_raw + work_pool: + name: prune_832_pool + work_queue_name: prune_data832_raw_queue + +- name: prune_data832_scratch + entrypoint: orchestration/flows/bl832/prune.py:prune_data832_scratch + work_pool: + name: prune_832_pool + work_queue_name: prune_data832_scratch_queue + +- name: prune_data832 + entrypoint: orchestration/flows/bl832/prune.py:prune_data832 + work_pool: + name: prune_832_pool + work_queue_name: prune_832_queue + +- name: prune_spot832 + entrypoint: orchestration/flows/bl832/prune.py:prune_spot832 + work_pool: + name: prune_832_pool + work_queue_name: prune_832_queue + +- name: prune_alcf832_raw + entrypoint: orchestration/flows/bl832/prune.py:prune_alcf832_raw + work_pool: + name: prune_832_pool + work_queue_name: prune_alcf832_raw_queue + +- name: prune_alcf832_scratch + entrypoint: orchestration/flows/bl832/prune.py:prune_alcf832_scratch + work_pool: + name: 
prune_832_pool + work_queue_name: prune_alcf832_scratch_queue + +- name: prune_nersc832_alsdev_pscratch_raw + entrypoint: orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_raw + work_pool: + name: prune_832_pool + work_queue_name: prune_nersc832_pscratch_queue + +- name: prune_nersc832_alsdev_pscratch_scratch + entrypoint: orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_scratch + work_pool: + name: prune_832_pool + work_queue_name: prune_nersc832_pscratch_queue diff --git a/orchestration/transfer_controller.py b/orchestration/transfer_controller.py index 09e82649..7a71605a 100644 --- a/orchestration/transfer_controller.py +++ b/orchestration/transfer_controller.py @@ -10,7 +10,6 @@ import globus_sdk from orchestration.flows.bl832.config import Config832 -from orchestration.flows.bl832.job_controller import HPC from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prometheus_utils import PrometheusMetrics @@ -135,23 +134,23 @@ def get_transfer_file_info( ) -> Optional[dict]: """ Get information about a completed transfer from the Globus API. - + Args: task_id (str): The Globus transfer task ID transfer_client (TransferClient, optional): TransferClient instance - + Returns: Optional[dict]: Task information including bytes_transferred, or None if unavailable """ if transfer_client is None: transfer_client = self.config.tc - + try: task_info = transfer_client.get_task(task_id) task_dict = task_info.data if task_dict.get('status') == 'SUCCEEDED': - bytes_transferred = task_dict.get('bytes_transferred', 0) + bytes_transferred = task_dict.get('bytes_transferred', 0) bytes_checksummed = task_dict.get('bytes_checksummed', 0) files_transferred = task_dict.get('files_transferred', 0) effective_bytes_per_second = task_dict.get('effective_bytes_per_second', 0) @@ -161,13 +160,13 @@ def get_transfer_file_info( 'files_transferred': files_transferred, 'effective_bytes_per_second': effective_bytes_per_second } - + return None - + except Exception as e: logger.error(f"Error getting transfer task info: {e}") return None - + def collect_and_push_metrics( self, start_time: float, @@ -181,7 +180,7 @@ def collect_and_push_metrics( ) -> None: """ Collect transfer metrics and push them to Prometheus. - + Args: start_time (float): Transfer start time as UNIX timestamp. end_time (float): Transfer end time as UNIX timestamp. @@ -201,10 +200,10 @@ def collect_and_push_metrics( end_datetime = datetime.datetime.fromtimestamp(end_time, tz=datetime.timezone.utc) start_timestamp = start_datetime.isoformat() end_timestamp = end_datetime.isoformat() - + # Calculate duration in seconds duration_seconds = end_time - start_time - + # Calculate transfer speed (bytes per second) # transfer_speed = file_size / duration_seconds if duration_seconds > 0 and file_size > 0 else 0 @@ -220,13 +219,13 @@ def collect_and_push_metrics( "status": "success" if success else "failed", "machine": machine_name } - + # Push metrics to Prometheus self.prometheus_metrics.push_metrics_to_prometheus(metrics, logger) - + except Exception as e: logger.error(f"Error collecting or pushing metrics: {e}") - + def copy( self, file_path: str = None, @@ -244,11 +243,11 @@ def copy( Returns: bool: True if the transfer was successful, False otherwise. 
""" - + if not file_path: logger.error("No file_path provided") return False - + if not source or not destination: logger.error("Source or destination endpoint not provided") return False @@ -268,7 +267,7 @@ def copy( success = False task_id = None # Initialize task_id here to prevent UnboundLocalError file_size = 0 # Initialize file_size here as well - + try: success, task_id = start_transfer( transfer_client=self.config.tc, @@ -284,10 +283,10 @@ def copy( logger.info("Transfer completed successfully.") else: logger.error("Transfer failed.") - + except globus_sdk.services.transfer.errors.TransferAPIError as e: logger.error(f"Failed to submit transfer: {e}") - + finally: # Stop the timer and calculate the duration transfer_end_time = time.time() @@ -300,7 +299,7 @@ def copy( transfer_speed = transfer_info.get('effective_bytes_per_second', 0) logger.info(f"Globus Task Info: Transferred {file_size} bytes ") logger.info(f"Globus Task Info: Effective speed: {transfer_speed} bytes/second") - + # Collect and push metrics if enabled if self.prometheus_metrics and file_size > 0: self.collect_and_push_metrics( @@ -313,7 +312,7 @@ def copy( transfer_speed=transfer_speed, success=success, ) - + return success @@ -432,4 +431,4 @@ def get_transfer_controller( if success: logger.info("Simple transfer succeeded.") else: - logger.error("Simple transfer failed.") \ No newline at end of file + logger.error("Simple transfer failed.") diff --git a/requirements.txt b/requirements.txt index 24b8e9e4..6552c650 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ numpy>=1.26.4 pillow pydantic==2.11 python-dotenv -prefect==2.20.17 +prefect==3.4.2 pyscicat pyyaml authlib diff --git a/scripts/create_832_variable_blocks.py b/scripts/create_832_variable_blocks.py new file mode 100644 index 00000000..3cf0dcd6 --- /dev/null +++ b/scripts/create_832_variable_blocks.py @@ -0,0 +1,32 @@ +from prefect.variables import Variable +import asyncio + + +async def create_variables(): + variables = { + "alcf-allocation-root-path": {"alcf-allocation-root-path": "/eagle/IRIBeta/als"}, + "bl832-settings": { + "delete_spot832_files_after_days": 1, + "delete_data832_files_after_days": 35 + }, + "globus-settings": {"max_wait_seconds": 600}, + "pruning-config": { + "max_wait_seconds": 120, + "delete_alcf832_files_after_days": 1, + "delete_data832_files_after_days": 14, + "delete_nersc832_files_after_days": 7 + }, + "decision-settings": { + "nersc_recon_flow/nersc_recon_flow": True, + "new_832_file_flow/new_file_832": True, + "alcf_recon_flow/alcf_recon_flow": False + } + } + + for name, value in variables.items(): + await Variable.set(name=name, value=value, overwrite=True) + print(f"Created variable: {name}") + + +if __name__ == "__main__": + asyncio.run(create_variables())