From 1bdbe2c8dc91e4767abf212560e393c1ecfefca1 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 15 Aug 2025 15:23:02 -0700 Subject: [PATCH 01/16] Prefect 3 upgrade: flows, tests, dispatcher/BL832 updates, requirements, and Prefect YAML --- init_832_work_pools.sh | 19 ++++ orchestration/_tests/test_globus_flow.py | 20 ++-- orchestration/_tests/test_sfapi_flow.py | 4 +- .../_tests/test_transfer_controller.py | 23 +++-- orchestration/flows/bl832/alcf.py | 2 +- orchestration/flows/bl832/dispatcher.py | 6 +- orchestration/flows/bl832/move.py | 12 ++- orchestration/flows/bl832/nersc.py | 2 +- orchestration/flows/bl832/prefect.yaml | 97 +++++++++++++++++++ requirements.txt | 2 +- 10 files changed, 160 insertions(+), 27 deletions(-) create mode 100644 init_832_work_pools.sh create mode 100644 orchestration/flows/bl832/prefect.yaml diff --git a/init_832_work_pools.sh b/init_832_work_pools.sh new file mode 100644 index 00000000..e95f2836 --- /dev/null +++ b/init_832_work_pools.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +set -euo pipefail +: "${PREFECT_API_URL:=http://localhost:4200/api}" + +mk_pool () { + local pool="$1" + prefect work-pool inspect "$pool" >/dev/null 2>&1 || \ + prefect work-pool create "$pool" --type process +} + +mk_pool dispatcher_832_pool +mk_pool new_file_832_pool +mk_pool prune_832_pool +mk_pool alcf_recon_flow_pool +mk_pool nersc_recon_flow_pool +mk_pool nersc_streaming_pool + +# Create deployments; queues are optional and will be auto-created if named +prefect --no-prompt deploy --prefect-file orchestration/flows/bl832/prefect.yaml --all \ No newline at end of file diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index e0823b27..7bfc0af4 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -37,8 +37,8 @@ def prefect_test_fixture(): decision_settings = JSON(value={ "alcf_recon_flow/alcf_recon_flow": True, - "nersc_recon_flow/nersc_recon_flow": False, - "new_832_file_flow/new_file_832": False + "nersc_recon_flow/nersc_recon_flow": True, + "new_832_file_flow/new_file_832": True }) decision_settings.save(name="decision-settings") @@ -134,14 +134,13 @@ class MockSecret: mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()) - # Import decision flow after mocking the necessary components - from orchestration.flows.bl832.dispatcher import dispatcher - # Mock read_deployment_by_name with a manually defined mock class class MockDeployment: - version = "1.0.0" - flow_id = str(uuid4()) - name = "test_deployment" + def __init__(self): + self.id = str(uuid4()) # Add this line + self.version = "1.0.0" + self.flow_id = str(uuid4()) + self.name = "test_deployment" mocker.patch('prefect.client.orchestration.PrefectClient.read_deployment_by_name', return_value=MockDeployment()) @@ -150,7 +149,7 @@ class MockDeployment: async def mock_run_deployment(*args, **kwargs): return None - mocker.patch('prefect.deployments.deployments.run_deployment', new=mock_run_deployment) + mocker.patch('prefect.deployments.run_deployment', new=mock_run_deployment) # Mock asyncio.gather to avoid actual async task execution async def mock_gather(*args, **kwargs): @@ -158,6 +157,9 @@ async def mock_gather(*args, **kwargs): mocker.patch('asyncio.gather', new=mock_gather) + # Import decision flow after mocking the necessary components + from orchestration.flows.bl832.dispatcher import dispatcher + # Run the decision flow result = asyncio.run(dispatcher( 
file_path="/global/raw/transfer_tests/test.txt", diff --git a/orchestration/_tests/test_sfapi_flow.py b/orchestration/_tests/test_sfapi_flow.py index a1809686..66203d19 100644 --- a/orchestration/_tests/test_sfapi_flow.py +++ b/orchestration/_tests/test_sfapi_flow.py @@ -21,9 +21,9 @@ def prefect_test_fixture(): """ with prefect_test_harness(): globus_client_id = Secret(value=str(uuid4())) - globus_client_id.save(name="globus-client-id") + globus_client_id.save(name="globus-client-id", overwrite=True) globus_client_secret = Secret(value=str(uuid4())) - globus_client_secret.save(name="globus-client-secret") + globus_client_secret.save(name="globus-client-secret", overwrite=True) yield diff --git a/orchestration/_tests/test_transfer_controller.py b/orchestration/_tests/test_transfer_controller.py index a1cae916..ec7e094d 100644 --- a/orchestration/_tests/test_transfer_controller.py +++ b/orchestration/_tests/test_transfer_controller.py @@ -12,7 +12,6 @@ from .test_globus import MockTransferClient - @pytest.fixture(autouse=True, scope="session") def prefect_test_fixture(): """ @@ -26,10 +25,10 @@ def prefect_test_fixture(): with prefect_test_harness(): # Create ephemeral secrets in the local Prefect test database globus_client_id = Secret(value=str(uuid4())) - globus_client_id.save(name="globus-client-id") + globus_client_id.save(name="globus-client-id", overwrite=True) globus_client_secret = Secret(value=str(uuid4())) - globus_client_secret.save(name="globus-client-secret") + globus_client_secret.save(name="globus-client-secret", overwrite=True) yield @@ -191,7 +190,9 @@ def test_globus_transfer_controller_copy_failure( mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecretClass()) - with patch("orchestration.transfer_controller.start_transfer", return_value=(False, "mock-task-id")) as mock_start_transfer: + with patch("orchestration.transfer_controller.start_transfer", + return_value=(False, "mock-task-id") + ) as mock_start_transfer: controller = GlobusTransferController(mock_config832) result = controller.copy( file_path="some_dir/test_file.txt", @@ -226,6 +227,7 @@ def test_globus_transfer_controller_copy_exception( assert result is False, "Expected False when TransferAPIError is raised." 
mock_start_transfer.assert_called_once() + def test_globus_transfer_controller_with_metrics( mock_config832, mock_globus_endpoint, transfer_controller_module ): @@ -235,30 +237,30 @@ def test_globus_transfer_controller_with_metrics( GlobusTransferController = transfer_controller_module["GlobusTransferController"] from orchestration.prometheus_utils import PrometheusMetrics mock_prometheus = MagicMock(spec=PrometheusMetrics) - + with patch("orchestration.transfer_controller.start_transfer", return_value=(True, "mock-task-id")) as mock_start_transfer: # Create the controller with mock prometheus metrics controller = GlobusTransferController(mock_config832, prometheus_metrics=mock_prometheus) - + # Set up mock for get_transfer_file_info mock_transfer_info = {"bytes_transferred": 1024 * 1024} # 1MB controller.get_transfer_file_info = MagicMock(return_value=mock_transfer_info) - + # Execute the copy operation result = controller.copy( file_path="some_dir/test_file.txt", source=mock_globus_endpoint, destination=mock_globus_endpoint, ) - + # Verify transfer was successful assert result is True mock_start_transfer.assert_called_once() - + # Verify metrics were collected and pushed controller.get_transfer_file_info.assert_called_once_with("mock-task-id") mock_prometheus.push_metrics_to_prometheus.assert_called_once() - + # Verify the metrics data metrics_data = mock_prometheus.push_metrics_to_prometheus.call_args[0][0] assert metrics_data["bytes_transferred"] == 1024 * 1024 @@ -275,6 +277,7 @@ def test_globus_transfer_controller_with_metrics( # Tests for SimpleTransferController # -------------------------------------------------------------------------- + def test_simple_transfer_controller_no_file_path( mock_config832, mock_file_system_endpoint, transfer_controller_module ): diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 0b6bcb1c..3148a020 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -347,7 +347,7 @@ def schedule_pruning( return True -@flow(name="alcf_recon_flow") +@flow(name="alcf_recon_flow", flow_run_name="alcf_recon-{{ file_path | basename }}") def alcf_recon_flow( file_path: str, config: Optional[Config832] = None, diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index cd9da2f0..bbb32245 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -1,7 +1,7 @@ import asyncio from prefect import flow, task, get_run_logger from prefect.blocks.system import JSON -from prefect.deployments.deployments import run_deployment +from prefect.deployments import run_deployment from pydantic import BaseModel, ValidationError, Field from typing import Any, Optional, Union @@ -101,7 +101,11 @@ async def run_recon_flow_async(flow_name: str, parameters: dict) -> None: raise +<<<<<<< HEAD @flow(name="dispatcher", flow_run_name="dispatcher-{file_path}") +======= +@flow(name="dispatcher", flow_run_name="dispatch_flows-{{ file_path | basename }}") +>>>>>>> af01bc5 (Prefect 3 upgrade: flows, tests, dispatcher/BL832 updates, requirements, and Prefect YAML) async def dispatcher( file_path: Optional[str] = None, is_export_control: bool = False, diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index b547a5c7..79dee612 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -67,7 +67,7 @@ def transfer_data_to_nersc( # Change prometheus_metrics=None if do not want to push metrics # 
prometheus_metrics = None - prometheus_metrics = PrometheusMetrics() + prometheus_metrics = PrometheusMetrics() # Get a Globus transfer controller transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, @@ -87,6 +87,7 @@ def transfer_data_to_nersc( return success +<<<<<<< HEAD @flow(name="new_832_file_flow") def process_new_832_file_flow( file_path: str, @@ -109,6 +110,13 @@ def process_new_832_file_task( send_to_nersc=True, config=None ): +======= +@flow(name="new_832_file_flow", flow_run_name="process_new-{{ file_path | basename }}") +def process_new_832_file(file_path: str, + is_export_control=False, + send_to_nersc=True, + config=None): +>>>>>>> af01bc5 (Prefect 3 upgrade: flows, tests, dispatcher/BL832 updates, requirements, and Prefect YAML) """ Sends a file along a path: - Copy from spot832 to data832 @@ -232,4 +240,4 @@ def test_transfers_832_grafana(file_path: str = "/raw/transfer_tests/test/"): logger.info( f"File successfully transferred from data832 to NERSC {file_path}. Task {task}" - ) \ No newline at end of file + ) diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index 05680ed0..b944fd5c 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -425,7 +425,7 @@ def schedule_pruning( logger.error(f"Failed to schedule prune task: {e}") -@flow(name="nersc_recon_flow") +@flow(name="nersc_recon_flow", flow_run_name="nersc_recon-{{ file_path | basename }}") def nersc_recon_flow( file_path: str, config: Optional[Config832] = None, diff --git a/orchestration/flows/bl832/prefect.yaml b/orchestration/flows/bl832/prefect.yaml new file mode 100644 index 00000000..73ad3774 --- /dev/null +++ b/orchestration/flows/bl832/prefect.yaml @@ -0,0 +1,97 @@ +name: bl832 +prefect-version: 3.4.2 +# Ensure workers clone code and run from the repo root so imports like +pull: +- prefect.deployments.steps.git_clone: + repository: https://github.com/als-computing/splash_flows.git + branch: main + +deployments: +# Dispatcher flow -- launches new file processing flows and downstream reconstruction +- name: run_832_dispatcher + entrypoint: orchestration/flows/bl832/dispatcher.py:dispatcher + work_pool: + name: dispatcher_832_pool + work_queue_name: dispatcher_832_queue + +# Move new files to the appropriate locations (data832, nersc cfs) and ingest metadata in scicat +- name: new_file_832 + entrypoint: orchestration/flows/bl832/move.py:process_new_832_file + work_pool: + name: new_file_832_pool + work_queue_name: new_file_832_queue + +# Scheduled flow for testing file transfers +- name: test_transfers_832 + entrypoint: orchestration/flows/bl832/move.py:test_transfers_832 + work_pool: + name: new_file_832_pool + work_queue_name: test_transfers_832_queue + +# Reconstuction flows +- name: nersc_recon_flow + entrypoint: orchestration/flows/bl832/nersc.py:nersc_recon_flow + work_pool: + name: nersc_recon_flow_pool + work_queue_name: nersc_recon_flow_queue + +- name: nersc_streaming_flow + entrypoint: orchestration/flows/bl832/nersc.py:nersc_streaming_flow + work_pool: + name: nersc_streaming_pool + work_queue_name: nersc_832_streaming_flow_queue + +- name: alcf_recon_flow + entrypoint: orchestration/flows/bl832/alcf.py:alcf_recon_flow + work_pool: + name: alcf_recon_flow_pool + work_queue_name: alcf_recon_flow_queue + +# Pruning flows +- name: prune_data832_raw + entrypoint: orchestration/flows/bl832/prune.py:prune_data832_raw + work_pool: + name: prune_832_pool + work_queue_name: prune_data832_raw_queue + +- name: 
prune_data832_scratch + entrypoint: orchestration/flows/bl832/prune.py:prune_data832_scratch + work_pool: + name: prune_832_pool + work_queue_name: prune_data832_scratch_queue + +- name: prune_data832 + entrypoint: orchestration/flows/bl832/prune.py:prune_data832 + work_pool: + name: prune_832_pool + work_queue_name: prune_832_queue + +- name: prune_spot832 + entrypoint: orchestration/flows/bl832/prune.py:prune_spot832 + work_pool: + name: prune_832_pool + work_queue_name: prune_832_queue + +- name: prune_alcf832_raw + entrypoint: orchestration/flows/bl832/prune.py:prune_alcf832_raw + work_pool: + name: prune_832_pool + work_queue_name: prune_alcf832_raw_queue + +- name: prune_alcf832_scratch + entrypoint: orchestration/flows/bl832/prune.py:prune_alcf832_scratch + work_pool: + name: prune_832_pool + work_queue_name: prune_alcf832_scratch_queue + +- name: prune_nersc832_alsdev_pscratch_raw + entrypoint: orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_raw + work_pool: + name: prune_832_pool + work_queue_name: prune_nersc832_pscratch_queue + +- name: prune_nersc832_alsdev_pscratch_scratch + entrypoint: orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_scratch + work_pool: + name: prune_832_pool + work_queue_name: prune_nersc832_pscratch_queue diff --git a/requirements.txt b/requirements.txt index 24b8e9e4..6552c650 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ numpy>=1.26.4 pillow pydantic==2.11 python-dotenv -prefect==2.20.17 +prefect==3.4.2 pyscicat pyyaml authlib From c2623ea442f5598cd760ce8381d1beda5e8d9a75 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 18 Aug 2025 12:29:23 -0700 Subject: [PATCH 02/16] =?UTF-8?q?updating=20init=5F832=5Fwork=5Fpools.sh?= =?UTF-8?q?=20to=20read=20prefect.yaml=20as=20the=20ground=20truth=20for?= =?UTF-8?q?=20work=20pool=20and=20deployment=20names.=20waits=20for=20pref?= =?UTF-8?q?ect=20server=20health=20check=20before=20checking/creating=20wo?= =?UTF-8?q?rk=20pools=20and=20deploying=20flows.=20can=20be=20configured?= =?UTF-8?q?=20to=20run=20in=20a=20docker=20container,=20so=20we=20should?= =?UTF-8?q?=20no=20longer=20need=20to=20exec=20into=20worker=20containers?= =?UTF-8?q?=20to=20redeploy=20flows=20=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- init_832_work_pools.sh | 118 +++++++++++++++++++++++++++++++++++------ 1 file changed, 101 insertions(+), 17 deletions(-) diff --git a/init_832_work_pools.sh b/init_832_work_pools.sh index e95f2836..851a88c3 100644 --- a/init_832_work_pools.sh +++ b/init_832_work_pools.sh @@ -1,19 +1,103 @@ #!/usr/bin/env bash +# +# init_832_work_pools.sh +# +# Description: +# Initializes Prefect work pools and deployments for beamline 832 using orchestration/flows/bl832/prefect.yaml as the source of truth. +# This script: +# - Waits for Prefect server to become ready. +# - Creates any missing work pools defined in the prefect.yaml configuration. +# - Deploys flows using the prefect.yaml configuration. +# +# Usage: +# ./init_832_work_pools.sh +# +# Environment Variables: +# PREFECT_API_URL Override the API URL for the Prefect server. +# Default: http://prefect_server:4200/api +# +# Notes: +# - Intended to be run as a one-shot init container alongside Prefect server. +# - Idempotent: re-running will not recreate pools that already exist. 
+# - Can be included in an docker-compose setup, ex: +# prefect_init: +# build: ./splash_flows +# container_name: prefect_init +# command: ["/bin/bash", "/splash_flows/init_832_work_pools.sh"] +# volumes: +# - ./splash_flows:/splash_flows:ro +# environment: +# - PREFECT_API_URL=http://prefect_server:4200/api +# - PREFECT_LOGGING_LEVEL=DEBUG +# depends_on: +# - prefect_server +# networks: +# - prenet +# restart: "no" # run once, then stop + + set -euo pipefail -: "${PREFECT_API_URL:=http://localhost:4200/api}" - -mk_pool () { - local pool="$1" - prefect work-pool inspect "$pool" >/dev/null 2>&1 || \ - prefect work-pool create "$pool" --type process -} - -mk_pool dispatcher_832_pool -mk_pool new_file_832_pool -mk_pool prune_832_pool -mk_pool alcf_recon_flow_pool -mk_pool nersc_recon_flow_pool -mk_pool nersc_streaming_pool - -# Create deployments; queues are optional and will be auto-created if named -prefect --no-prompt deploy --prefect-file orchestration/flows/bl832/prefect.yaml --all \ No newline at end of file + +# Path to the Prefect project file +prefect_file="/splash_flows/orchestration/flows/bl832/prefect.yaml" + +# If PREFECT_API_URL is already defined in the container’s environment, it will use that value. +# If not, it falls back to the default http://prefect_server:4200/api. +: "${PREFECT_API_URL:=http://prefect_server:4200/api}" + +echo "[Init] Waiting for Prefect server at $PREFECT_API_URL..." + +# Wait for Prefect server to be ready, querying the health endpoint +python3 - <<'EOF' +import os, time, sys +import httpx + +api_url = os.environ.get("PREFECT_API_URL", "http://prefect_server:4200/api") +health_url = f"{api_url}/health" + +for _ in range(60): # try for ~3 minutes + try: + r = httpx.get(health_url, timeout=2.0) + if r.status_code == 200: + print("[Init] Prefect server is up.") + sys.exit(0) + except Exception: + pass + print("[Init] Still waiting...") + time.sleep(3) + +print("[Init] ERROR: Prefect server did not become ready in time.", file=sys.stderr) +sys.exit(1) +EOF + +echo "[Init] Creating work pools defined in $prefect_file..." + +python3 - < Date: Mon, 18 Aug 2025 12:49:33 -0700 Subject: [PATCH 03/16] making the init_work_pools.sh applicable across beamlines. now, we can set an env variable BEAMLINE=832. in the script, it will look for a prefect.yaml file in orchestration/flows/bl/prefect.yaml, which it will use as the ground truth to build the work pools and deploy flows. this way we only need to support one init script that can work across beamlines. --- init_832_work_pools.sh | 97 +++++++++++++++++++-------------- init_work_pools.sh | 120 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+), 40 deletions(-) create mode 100644 init_work_pools.sh diff --git a/init_832_work_pools.sh b/init_832_work_pools.sh index 851a88c3..bb7c425b 100644 --- a/init_832_work_pools.sh +++ b/init_832_work_pools.sh @@ -1,81 +1,99 @@ #!/usr/bin/env bash # -# init_832_work_pools.sh +# init_work_pools.sh # # Description: -# Initializes Prefect work pools and deployments for beamline 832 using orchestration/flows/bl832/prefect.yaml as the source of truth. -# This script: -# - Waits for Prefect server to become ready. -# - Creates any missing work pools defined in the prefect.yaml configuration. -# - Deploys flows using the prefect.yaml configuration. +# Initializes Prefect work pools and deployments for the beamline defined by the BEAMLINE environment variable. +# Uses orchestration/flows/bl"$BEAMLINE"/prefect.yaml as the single source of truth. 
+# +# Requirements: +# - BEAMLINE must be set (e.g., 832). +# - A prefect.yaml file must exist in orchestration/flows/bl"$BEAMLINE"/. +# +# Behavior: +# - Waits until the Prefect server is reachable via its /health endpoint. +# - Creates any missing work pools defined in the beamline's prefect.yaml. +# - Deploys all flows defined in the beamline's prefect.yaml. # # Usage: -# ./init_832_work_pools.sh +# ./init_work_pools.sh # # Environment Variables: -# PREFECT_API_URL Override the API URL for the Prefect server. +# BEAMLINE The beamline identifier (e.g., 832). Required. +# PREFECT_API_URL Override the Prefect server API URL. # Default: http://prefect_server:4200/api # # Notes: -# - Intended to be run as a one-shot init container alongside Prefect server. +# - Intended to be run as a one-shot init container alongside a Prefect server. # - Idempotent: re-running will not recreate pools that already exist. -# - Can be included in an docker-compose setup, ex: -# prefect_init: -# build: ./splash_flows -# container_name: prefect_init -# command: ["/bin/bash", "/splash_flows/init_832_work_pools.sh"] -# volumes: -# - ./splash_flows:/splash_flows:ro -# environment: -# - PREFECT_API_URL=http://prefect_server:4200/api -# - PREFECT_LOGGING_LEVEL=DEBUG -# depends_on: -# - prefect_server -# networks: -# - prenet -# restart: "no" # run once, then stop - +# - Typical docker-compose integration: +# +# prefect_init: +# build: ./splash_flows +# container_name: prefect_init +# command: ["/bin/bash", "/splash_flows/init_work_pools.sh"] +# volumes: +# - ./splash_flows:/splash_flows:ro +# environment: +# - PREFECT_API_URL=http://prefect_server:4200/api +# - PREFECT_LOGGING_LEVEL=DEBUG +# - BEAMLINE=832 +# depends_on: +# - prefect_server +# networks: +# - prenet +# restart: "no" # run once, then stop set -euo pipefail +BEAMLINE="${BEAMLINE:?Must set BEAMLINE (e.g. 832, 733)}" + # Path to the Prefect project file -prefect_file="/splash_flows/orchestration/flows/bl832/prefect.yaml" +PREFECT_YAML="/splash_flows/orchestration/flows/bl${BEAMLINE}/prefect.yaml" + +if [[ ! -f "$PREFECT_YAML" ]]; then + echo "[Init:${BEAMLINE}] ERROR: Expected $PREFECT_YAML not found!" >&2 + exit 1 +fi # If PREFECT_API_URL is already defined in the container’s environment, it will use that value. # If not, it falls back to the default http://prefect_server:4200/api. : "${PREFECT_API_URL:=http://prefect_server:4200/api}" -echo "[Init] Waiting for Prefect server at $PREFECT_API_URL..." +echo "[Init:${BEAMLINE}] Using prefect.yaml at $PREFECT_YAML" +echo "[Init:${BEAMLINE}] Waiting for Prefect server at $PREFECT_API_URL..." # Wait for Prefect server to be ready, querying the health endpoint -python3 - <<'EOF' +python3 - <&2 + exit 1 +fi + +# If PREFECT_API_URL is already defined in the container’s environment, it will use that value. +# If not, it falls back to the default http://prefect_server:4200/api. +: "${PREFECT_API_URL:=http://prefect_server:4200/api}" + +echo "[Init:${BEAMLINE}] Using prefect.yaml at $PREFECT_YAML" +echo "[Init:${BEAMLINE}] Waiting for Prefect server at $PREFECT_API_URL..." 
+ +# Wait for Prefect server to be ready, querying the health endpoint +python3 - < Date: Mon, 18 Aug 2025 14:04:18 -0700 Subject: [PATCH 04/16] adding grafana transfer test to the deployments --- orchestration/flows/bl832/prefect.yaml | 35 +++++++++++++++----------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/orchestration/flows/bl832/prefect.yaml b/orchestration/flows/bl832/prefect.yaml index 73ad3774..a6724ed1 100644 --- a/orchestration/flows/bl832/prefect.yaml +++ b/orchestration/flows/bl832/prefect.yaml @@ -1,3 +1,5 @@ +# orchestration/flows/bl832/prefect.yaml + name: bl832 prefect-version: 3.4.2 # Ensure workers clone code and run from the repo root so imports like @@ -9,89 +11,94 @@ pull: deployments: # Dispatcher flow -- launches new file processing flows and downstream reconstruction - name: run_832_dispatcher - entrypoint: orchestration/flows/bl832/dispatcher.py:dispatcher + entrypoint: /splash_flows/orchestration/flows/bl832/dispatcher.py:dispatcher work_pool: name: dispatcher_832_pool work_queue_name: dispatcher_832_queue # Move new files to the appropriate locations (data832, nersc cfs) and ingest metadata in scicat - name: new_file_832 - entrypoint: orchestration/flows/bl832/move.py:process_new_832_file + entrypoint: /splash_flows/orchestration/flows/bl832/move.py:process_new_832_file work_pool: name: new_file_832_pool work_queue_name: new_file_832_queue # Scheduled flow for testing file transfers - name: test_transfers_832 - entrypoint: orchestration/flows/bl832/move.py:test_transfers_832 + entrypoint: /splash_flows/orchestration/flows/bl832/move.py:test_transfers_832 work_pool: name: new_file_832_pool work_queue_name: test_transfers_832_queue +- name: test_832_transfers_grafana + entrypoint: /splash_flows/orchestration/flows/bl832/move.py:test_transfers_832_grafana + work_pool: + name: new_file_832_pool + work_queue_name: test_832_transfers_grafana_queue # Reconstuction flows - name: nersc_recon_flow - entrypoint: orchestration/flows/bl832/nersc.py:nersc_recon_flow + entrypoint: /splash_flows/orchestration/flows/bl832/nersc.py:nersc_recon_flow work_pool: name: nersc_recon_flow_pool work_queue_name: nersc_recon_flow_queue - name: nersc_streaming_flow - entrypoint: orchestration/flows/bl832/nersc.py:nersc_streaming_flow + entrypoint: /splash_flows/orchestration/flows/bl832/nersc.py:nersc_streaming_flow work_pool: name: nersc_streaming_pool work_queue_name: nersc_832_streaming_flow_queue - name: alcf_recon_flow - entrypoint: orchestration/flows/bl832/alcf.py:alcf_recon_flow + entrypoint: /splash_flows/orchestration/flows/bl832/alcf.py:alcf_recon_flow work_pool: name: alcf_recon_flow_pool work_queue_name: alcf_recon_flow_queue # Pruning flows - name: prune_data832_raw - entrypoint: orchestration/flows/bl832/prune.py:prune_data832_raw + entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_data832_raw work_pool: name: prune_832_pool work_queue_name: prune_data832_raw_queue - name: prune_data832_scratch - entrypoint: orchestration/flows/bl832/prune.py:prune_data832_scratch + entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_data832_scratch work_pool: name: prune_832_pool work_queue_name: prune_data832_scratch_queue - name: prune_data832 - entrypoint: orchestration/flows/bl832/prune.py:prune_data832 + entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_data832 work_pool: name: prune_832_pool work_queue_name: prune_832_queue - name: prune_spot832 - entrypoint: orchestration/flows/bl832/prune.py:prune_spot832 + 
entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_spot832 work_pool: name: prune_832_pool work_queue_name: prune_832_queue - name: prune_alcf832_raw - entrypoint: orchestration/flows/bl832/prune.py:prune_alcf832_raw + entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_alcf832_raw work_pool: name: prune_832_pool work_queue_name: prune_alcf832_raw_queue - name: prune_alcf832_scratch - entrypoint: orchestration/flows/bl832/prune.py:prune_alcf832_scratch + entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_alcf832_scratch work_pool: name: prune_832_pool work_queue_name: prune_alcf832_scratch_queue - name: prune_nersc832_alsdev_pscratch_raw - entrypoint: orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_raw + entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_raw work_pool: name: prune_832_pool work_queue_name: prune_nersc832_pscratch_queue - name: prune_nersc832_alsdev_pscratch_scratch - entrypoint: orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_scratch + entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_scratch work_pool: name: prune_832_pool work_queue_name: prune_nersc832_pscratch_queue From 50eff4150dd1c9a4867a252d0679b6d58b076fbc Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 18 Aug 2025 14:05:22 -0700 Subject: [PATCH 05/16] Moving prefect 2 deployment scripts to scripts/legacy/ --- init_832_work_pools.sh | 120 ------------------ .../create_deployment_832_dispatcher.sh | 0 .../legacy/create_deployments_832.sh | 0 .../legacy/create_deployments_832_alcf.sh | 0 .../legacy/create_deployments_832_nersc.sh | 0 5 files changed, 120 deletions(-) delete mode 100644 init_832_work_pools.sh rename create_deployment_832_dispatcher.sh => scripts/legacy/create_deployment_832_dispatcher.sh (100%) rename create_deployments_832.sh => scripts/legacy/create_deployments_832.sh (100%) rename create_deployments_832_alcf.sh => scripts/legacy/create_deployments_832_alcf.sh (100%) rename create_deployments_832_nersc.sh => scripts/legacy/create_deployments_832_nersc.sh (100%) diff --git a/init_832_work_pools.sh b/init_832_work_pools.sh deleted file mode 100644 index bb7c425b..00000000 --- a/init_832_work_pools.sh +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/bin/env bash -# -# init_work_pools.sh -# -# Description: -# Initializes Prefect work pools and deployments for the beamline defined by the BEAMLINE environment variable. -# Uses orchestration/flows/bl"$BEAMLINE"/prefect.yaml as the single source of truth. -# -# Requirements: -# - BEAMLINE must be set (e.g., 832). -# - A prefect.yaml file must exist in orchestration/flows/bl"$BEAMLINE"/. -# -# Behavior: -# - Waits until the Prefect server is reachable via its /health endpoint. -# - Creates any missing work pools defined in the beamline's prefect.yaml. -# - Deploys all flows defined in the beamline's prefect.yaml. -# -# Usage: -# ./init_work_pools.sh -# -# Environment Variables: -# BEAMLINE The beamline identifier (e.g., 832). Required. -# PREFECT_API_URL Override the Prefect server API URL. -# Default: http://prefect_server:4200/api -# -# Notes: -# - Intended to be run as a one-shot init container alongside a Prefect server. -# - Idempotent: re-running will not recreate pools that already exist. 
-# - Typical docker-compose integration: -# -# prefect_init: -# build: ./splash_flows -# container_name: prefect_init -# command: ["/bin/bash", "/splash_flows/init_work_pools.sh"] -# volumes: -# - ./splash_flows:/splash_flows:ro -# environment: -# - PREFECT_API_URL=http://prefect_server:4200/api -# - PREFECT_LOGGING_LEVEL=DEBUG -# - BEAMLINE=832 -# depends_on: -# - prefect_server -# networks: -# - prenet -# restart: "no" # run once, then stop - -set -euo pipefail - -BEAMLINE="${BEAMLINE:?Must set BEAMLINE (e.g. 832, 733)}" - -# Path to the Prefect project file -PREFECT_YAML="/splash_flows/orchestration/flows/bl${BEAMLINE}/prefect.yaml" - -if [[ ! -f "$PREFECT_YAML" ]]; then - echo "[Init:${BEAMLINE}] ERROR: Expected $PREFECT_YAML not found!" >&2 - exit 1 -fi - -# If PREFECT_API_URL is already defined in the container’s environment, it will use that value. -# If not, it falls back to the default http://prefect_server:4200/api. -: "${PREFECT_API_URL:=http://prefect_server:4200/api}" - -echo "[Init:${BEAMLINE}] Using prefect.yaml at $PREFECT_YAML" -echo "[Init:${BEAMLINE}] Waiting for Prefect server at $PREFECT_API_URL..." - -# Wait for Prefect server to be ready, querying the health endpoint -python3 - < Date: Tue, 19 Aug 2025 09:57:39 -0700 Subject: [PATCH 06/16] Fixing flow run name formatting --- orchestration/flows/bl832/alcf.py | 2 +- orchestration/flows/bl832/dispatcher.py | 4 ---- orchestration/flows/bl832/move.py | 8 -------- orchestration/flows/bl832/nersc.py | 2 +- 4 files changed, 2 insertions(+), 14 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 3148a020..c57afe50 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -347,7 +347,7 @@ def schedule_pruning( return True -@flow(name="alcf_recon_flow", flow_run_name="alcf_recon-{{ file_path | basename }}") +@flow(name="alcf_recon_flow", flow_run_name="alcf_recon-{file_path}") def alcf_recon_flow( file_path: str, config: Optional[Config832] = None, diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index bbb32245..978bea6e 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -101,11 +101,7 @@ async def run_recon_flow_async(flow_name: str, parameters: dict) -> None: raise -<<<<<<< HEAD @flow(name="dispatcher", flow_run_name="dispatcher-{file_path}") -======= -@flow(name="dispatcher", flow_run_name="dispatch_flows-{{ file_path | basename }}") ->>>>>>> af01bc5 (Prefect 3 upgrade: flows, tests, dispatcher/BL832 updates, requirements, and Prefect YAML) async def dispatcher( file_path: Optional[str] = None, is_export_control: bool = False, diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index 79dee612..fa3a0e6e 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -87,7 +87,6 @@ def transfer_data_to_nersc( return success -<<<<<<< HEAD @flow(name="new_832_file_flow") def process_new_832_file_flow( file_path: str, @@ -110,13 +109,6 @@ def process_new_832_file_task( send_to_nersc=True, config=None ): -======= -@flow(name="new_832_file_flow", flow_run_name="process_new-{{ file_path | basename }}") -def process_new_832_file(file_path: str, - is_export_control=False, - send_to_nersc=True, - config=None): ->>>>>>> af01bc5 (Prefect 3 upgrade: flows, tests, dispatcher/BL832 updates, requirements, and Prefect YAML) """ Sends a file along a path: - Copy from spot832 to data832 diff --git 
a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index b944fd5c..fd066b12 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -425,7 +425,7 @@ def schedule_pruning( logger.error(f"Failed to schedule prune task: {e}") -@flow(name="nersc_recon_flow", flow_run_name="nersc_recon-{{ file_path | basename }}") +@flow(name="nersc_recon_flow", flow_run_name="nersc_recon-{file_path}") def nersc_recon_flow( file_path: str, config: Optional[Config832] = None, From e3bb274eba400a6ec9578170899ca24447b61e69 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 19 Aug 2025 09:59:08 -0700 Subject: [PATCH 07/16] linting --- orchestration/transfer_controller.py | 43 ++++++++++++++-------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/orchestration/transfer_controller.py b/orchestration/transfer_controller.py index 09e82649..7a71605a 100644 --- a/orchestration/transfer_controller.py +++ b/orchestration/transfer_controller.py @@ -10,7 +10,6 @@ import globus_sdk from orchestration.flows.bl832.config import Config832 -from orchestration.flows.bl832.job_controller import HPC from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prometheus_utils import PrometheusMetrics @@ -135,23 +134,23 @@ def get_transfer_file_info( ) -> Optional[dict]: """ Get information about a completed transfer from the Globus API. - + Args: task_id (str): The Globus transfer task ID transfer_client (TransferClient, optional): TransferClient instance - + Returns: Optional[dict]: Task information including bytes_transferred, or None if unavailable """ if transfer_client is None: transfer_client = self.config.tc - + try: task_info = transfer_client.get_task(task_id) task_dict = task_info.data if task_dict.get('status') == 'SUCCEEDED': - bytes_transferred = task_dict.get('bytes_transferred', 0) + bytes_transferred = task_dict.get('bytes_transferred', 0) bytes_checksummed = task_dict.get('bytes_checksummed', 0) files_transferred = task_dict.get('files_transferred', 0) effective_bytes_per_second = task_dict.get('effective_bytes_per_second', 0) @@ -161,13 +160,13 @@ def get_transfer_file_info( 'files_transferred': files_transferred, 'effective_bytes_per_second': effective_bytes_per_second } - + return None - + except Exception as e: logger.error(f"Error getting transfer task info: {e}") return None - + def collect_and_push_metrics( self, start_time: float, @@ -181,7 +180,7 @@ def collect_and_push_metrics( ) -> None: """ Collect transfer metrics and push them to Prometheus. - + Args: start_time (float): Transfer start time as UNIX timestamp. end_time (float): Transfer end time as UNIX timestamp. 
@@ -201,10 +200,10 @@ def collect_and_push_metrics( end_datetime = datetime.datetime.fromtimestamp(end_time, tz=datetime.timezone.utc) start_timestamp = start_datetime.isoformat() end_timestamp = end_datetime.isoformat() - + # Calculate duration in seconds duration_seconds = end_time - start_time - + # Calculate transfer speed (bytes per second) # transfer_speed = file_size / duration_seconds if duration_seconds > 0 and file_size > 0 else 0 @@ -220,13 +219,13 @@ def collect_and_push_metrics( "status": "success" if success else "failed", "machine": machine_name } - + # Push metrics to Prometheus self.prometheus_metrics.push_metrics_to_prometheus(metrics, logger) - + except Exception as e: logger.error(f"Error collecting or pushing metrics: {e}") - + def copy( self, file_path: str = None, @@ -244,11 +243,11 @@ def copy( Returns: bool: True if the transfer was successful, False otherwise. """ - + if not file_path: logger.error("No file_path provided") return False - + if not source or not destination: logger.error("Source or destination endpoint not provided") return False @@ -268,7 +267,7 @@ def copy( success = False task_id = None # Initialize task_id here to prevent UnboundLocalError file_size = 0 # Initialize file_size here as well - + try: success, task_id = start_transfer( transfer_client=self.config.tc, @@ -284,10 +283,10 @@ def copy( logger.info("Transfer completed successfully.") else: logger.error("Transfer failed.") - + except globus_sdk.services.transfer.errors.TransferAPIError as e: logger.error(f"Failed to submit transfer: {e}") - + finally: # Stop the timer and calculate the duration transfer_end_time = time.time() @@ -300,7 +299,7 @@ def copy( transfer_speed = transfer_info.get('effective_bytes_per_second', 0) logger.info(f"Globus Task Info: Transferred {file_size} bytes ") logger.info(f"Globus Task Info: Effective speed: {transfer_speed} bytes/second") - + # Collect and push metrics if enabled if self.prometheus_metrics and file_size > 0: self.collect_and_push_metrics( @@ -313,7 +312,7 @@ def copy( transfer_speed=transfer_speed, success=success, ) - + return success @@ -432,4 +431,4 @@ def get_transfer_controller( if success: logger.info("Simple transfer succeeded.") else: - logger.error("Simple transfer failed.") \ No newline at end of file + logger.error("Simple transfer failed.") From ea5b23b22c969b3fff09e2172e88313a4e2384f9 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 10 Sep 2025 16:14:26 -0700 Subject: [PATCH 08/16] Updating prefect version in Dockerfile --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index d409f98d..3034b7dd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM prefecthq/prefect:2.20.17-python3.11 +FROM prefecthq/prefect:3.4.2-python3.11 WORKDIR /app COPY ./requirements.txt /tmp/ From 3fe95fc2833878ddb415cce005bf6597ba003244 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 09:40:14 -0800 Subject: [PATCH 09/16] Adding script to create 832 Prefect Variables blocks (JSON is deprecated) --- scripts/create_832_variable_blocks.py | 32 +++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 scripts/create_832_variable_blocks.py diff --git a/scripts/create_832_variable_blocks.py b/scripts/create_832_variable_blocks.py new file mode 100644 index 00000000..3cf0dcd6 --- /dev/null +++ b/scripts/create_832_variable_blocks.py @@ -0,0 +1,32 @@ +from prefect.variables import Variable +import asyncio + + +async def create_variables(): + variables = { + 
"alcf-allocation-root-path": {"alcf-allocation-root-path": "/eagle/IRIBeta/als"}, + "bl832-settings": { + "delete_spot832_files_after_days": 1, + "delete_data832_files_after_days": 35 + }, + "globus-settings": {"max_wait_seconds": 600}, + "pruning-config": { + "max_wait_seconds": 120, + "delete_alcf832_files_after_days": 1, + "delete_data832_files_after_days": 14, + "delete_nersc832_files_after_days": 7 + }, + "decision-settings": { + "nersc_recon_flow/nersc_recon_flow": True, + "new_832_file_flow/new_file_832": True, + "alcf_recon_flow/alcf_recon_flow": False + } + } + + for name, value in variables.items(): + await Variable.set(name=name, value=value, overwrite=True) + print(f"Created variable: {name}") + + +if __name__ == "__main__": + asyncio.run(create_variables()) From a3d5b73ad2f8cdb47bb49d0ef807a0c0c3e285e1 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 09:40:54 -0800 Subject: [PATCH 10/16] Updating 832 logic to use Prefect Variables instead of Prefect JSON Blocks for settings --- orchestration/flows/bl832/alcf.py | 7 ++++--- orchestration/flows/bl832/dispatcher.py | 19 +++++++++---------- orchestration/flows/bl832/move.py | 13 +++++++++++-- orchestration/flows/bl832/nersc.py | 4 ++-- 4 files changed, 26 insertions(+), 17 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index c57afe50..4638d9c1 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -8,7 +8,8 @@ from globus_compute_sdk import Client, Executor from globus_compute_sdk.serialize import CombinedCode from prefect import flow, task -from prefect.blocks.system import JSON, Secret +from prefect.blocks.system import Secret +from prefect.variables import Variable from orchestration.flows.bl832.config import Config832 from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController @@ -36,7 +37,7 @@ def __init__( super().__init__(config) # Load allocation root from the Prefect JSON block # The block must be registered with the name "alcf-allocation-root-path" - allocation_data = JSON.load("alcf-allocation-root-path").value + allocation_data = Variable.get("alcf-allocation-root-path", _sync=True) self.allocation_root = allocation_data.get("alcf-allocation-root-path") if not self.allocation_root: raise ValueError("Allocation root not found in JSON block 'alcf-allocation-root-path'") @@ -314,7 +315,7 @@ def schedule_pruning( Returns: bool: True if the tasks were scheduled successfully, False otherwise. 
""" - pruning_config = JSON.load("pruning-config").value + pruning_config = Variable.get("pruning-config", _sync=True) if one_minute: alcf_delay = datetime.timedelta(minutes=1) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index 978bea6e..cf1d0c64 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -1,11 +1,11 @@ import asyncio -from prefect import flow, task, get_run_logger -from prefect.blocks.system import JSON -from prefect.deployments import run_deployment from pydantic import BaseModel, ValidationError, Field from typing import Any, Optional, Union from orchestration.flows.bl832.move import process_new_832_file_task +from prefect import flow, task, get_run_logger +from prefect.deployments import run_deployment +from prefect.variables import Variable class FlowParameterMapper: @@ -75,8 +75,7 @@ def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: b "new_832_file_flow/new_file_832": new_file_832 } # Save the settings in a JSON block for later retrieval by other flows - settings_json = JSON(value=settings) - settings_json.save(name="decision-settings", overwrite=True) + Variable.set(name="decision-settings", value=settings, overwrite=True, _sync=True) except Exception as e: logger.error(f"Failed to set up decision settings: {e}") raise @@ -122,10 +121,10 @@ async def dispatcher( raise # Run new_file_832 first (synchronously) - available_params = inputs.dict() + available_params = inputs.model_dump() try: - decision_settings = await JSON.load("decision-settings") - if decision_settings.value.get("new_832_file_flow/new_file_832"): + decision_settings = await Variable.get("decision-settings") + if decision_settings.get("new_832_file_flow/new_file_832"): logger.info("Running new_file_832 flow...") process_new_832_file_task( file_path=available_params.get("file_path"), @@ -142,11 +141,11 @@ async def dispatcher( # Prepare ALCF and NERSC flows to run asynchronously, based on settings tasks = [] - if decision_settings.value.get("alcf_recon_flow/alcf_recon_flow"): + if decision_settings.get("alcf_recon_flow/alcf_recon_flow"): alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params)) - if decision_settings.value.get("nersc_recon_flow/nersc_recon_flow"): + if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index fa3a0e6e..c06c8fef 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -5,7 +5,7 @@ from globus_sdk import TransferClient from prefect import flow, task, get_run_logger -from prefect.blocks.system import JSON +from prefect.variables import Variable from orchestration.flows.scicat.ingest import ingest_dataset from orchestration.flows.bl832.config import Config832 @@ -161,7 +161,7 @@ def process_new_832_file_task( # datetime.timedelta(0.0), # ) - bl832_settings = JSON.load("bl832-settings").value + bl832_settings = Variable.get("bl832-settings", _sync=True) flow_name = f"delete spot832: {Path(file_path).name}" schedule_spot832_delete_days = bl832_settings["delete_spot832_files_after_days"] @@ -233,3 +233,12 @@ def 
test_transfers_832_grafana(file_path: str = "/raw/transfer_tests/test/"): logger.info( f"File successfully transferred from data832 to NERSC {file_path}. Task {task}" ) + + +if __name__ == "__main__": + import os + import dotenv + + dotenv.load_dotenv() + + file_path = "/raw/transfer_tests/test.txt" diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py index fd066b12..c988ce87 100644 --- a/orchestration/flows/bl832/nersc.py +++ b/orchestration/flows/bl832/nersc.py @@ -9,7 +9,7 @@ from authlib.jose import JsonWebKey from prefect import flow, get_run_logger -from prefect.blocks.system import JSON +from prefect.variables import Variable from sfapi_client import Client from sfapi_client.compute import Machine from typing import Optional @@ -313,7 +313,7 @@ def schedule_pruning( # nersc/pscratch : 1 day # nersc832/scratch : never? - pruning_config = JSON.load("pruning-config").value + pruning_config = Variable.get("pruning-config", _sync=True) data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"]) nersc832_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"]) From e3c7b9ed69033f88c80ff065d4766bc166a74092 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 09:41:16 -0800 Subject: [PATCH 11/16] Updaing pytest to reflect changes from JSON -> Variable blocks --- orchestration/_tests/test_globus_flow.py | 35 ++++++++++++++++-------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index 7bfc0af4..c52448e5 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -2,7 +2,8 @@ from uuid import uuid4 import warnings -from prefect.blocks.system import JSON, Secret +from prefect.blocks.system import Secret +from prefect.variables import Variable from prefect.testing.utilities import prefect_test_harness import pytest from pytest_mock import MockFixture @@ -32,18 +33,30 @@ def prefect_test_fixture(): globus_compute_endpoint = Secret(value=str(uuid4())) globus_compute_endpoint.save(name="globus-compute-endpoint") - pruning_config = JSON(value={"max_wait_seconds": 600}) - pruning_config.save(name="pruning-config") + Variable.set( + name="pruning-config", + value={"max_wait_seconds": 600}, + overwrite=True, + _sync=True + ) - decision_settings = JSON(value={ - "alcf_recon_flow/alcf_recon_flow": True, - "nersc_recon_flow/nersc_recon_flow": True, - "new_832_file_flow/new_file_832": True - }) - decision_settings.save(name="decision-settings") + Variable.set( + name="decision-settings", + value={ + "alcf_recon_flow/alcf_recon_flow": True, + "nersc_recon_flow/nersc_recon_flow": True, + "new_832_file_flow/new_file_832": True + }, + overwrite=True, + _sync=True + ) - alcf_allocation_root = JSON(value={"alcf-allocation-root-path": "/eagle/IRIProd/ALS"}) - alcf_allocation_root.save(name="alcf-allocation-root-path") + Variable.set( + name="alcf-allocation-root-path", + value={"alcf-allocation-root-path": "/eagle/IRIProd/ALS"}, + overwrite=True, + _sync=True + ) yield From f1324910944cc39ae24583b06cdfbcf44e83300c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 09:42:08 -0800 Subject: [PATCH 12/16] Adding the python deployment script --- init_work_pools.py | 146 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 init_work_pools.py diff --git a/init_work_pools.py b/init_work_pools.py new file mode 
100644 index 00000000..b4ceb255 --- /dev/null +++ b/init_work_pools.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +""" +init_work_pools.py +Description: + Initializes Prefect work pools and deployments for the beamline defined by the BEAMLINE environment variable. + Uses orchestration/flows/bl"$BEAMLINE"/prefect.yaml as the single source of truth. +Requirements: + - BEAMLINE must be set (e.g., 832). + - A prefect.yaml file must exist in orchestration/flows/bl"$BEAMLINE"/. + - Prefect CLI must be installed and available in PATH. +Behavior: + - Waits until the Prefect server is reachable via its /health endpoint. + - Creates any missing work pools defined in the beamline's prefect.yaml. + - Deploys all flows defined in the beamline's prefect.yaml. + - Creates/updates Prefect Secret blocks for GLOBUS_CLIENT_ID and GLOBUS_CLIENT_SECRET + if the corresponding environment variables are present. Otherwise warns and continues. +Environment Variables: + BEAMLINE The beamline identifier (e.g., 832). Required. + PREFECT_API_URL Override the Prefect server API URL. + Default: http://prefect_server:4200/api +""" + +import httpx +import logging +import os +import subprocess +import sys +import time +import yaml + +from prefect.blocks.system import Secret + + +# ---------------- Logging Setup ---------------- # +logger = logging.getLogger("init_work_pools") +handler = logging.StreamHandler(sys.stdout) +formatter = logging.Formatter( + fmt="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +handler.setFormatter(formatter) +logger.addHandler(handler) +logger.setLevel(logging.INFO) +# ------------------------------------------------ # + + +def check_env() -> tuple[str, str, str]: + """Validate required environment variables and paths.""" + beamline = os.environ.get("BEAMLINE") + if not beamline: + logger.error("Must set BEAMLINE (e.g., 832, 733)") + sys.exit(1) + + prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml" + if not os.path.isfile(prefect_yaml): + logger.error(f"[Init:{beamline}] Expected {prefect_yaml} not found!") + sys.exit(1) + + api_url = os.environ.get("PREFECT_API_URL", "http://prefect_server:4200/api") + return beamline, prefect_yaml, api_url + + +def wait_for_prefect_server(api_url: str, beamline: str, timeout: int = 180): + """Wait until Prefect server health endpoint responds (unless using Prefect Cloud).""" + if "api.prefect.cloud" in api_url: + logger.info(f"[Init:{beamline}] Prefect Cloud detected — skipping health check.") + return + + health_url = f"{api_url}/health" + logger.info(f"[Init:{beamline}] Waiting for Prefect server at {health_url}...") + + start = time.time() + while time.time() - start < timeout: + try: + r = httpx.get(health_url, timeout=2.0) + if r.status_code == 200: + logger.info(f"[Init:{beamline}] Prefect server is up.") + return + except Exception: + pass + logger.warning(f"[Init:{beamline}] Still waiting...") + time.sleep(3) + + logger.error(f"[Init:{beamline}] Prefect server did not become ready in time.") + sys.exit(1) + + +def ensure_work_pools(prefect_yaml: str, beamline: str): + """Ensure that all work pools from prefect.yaml exist (create if missing).""" + with open(prefect_yaml, "r") as f: + config = yaml.safe_load(f) + + pools = {d["work_pool"]["name"] for d in config.get("deployments", []) if "work_pool" in d} + + for pool in pools: + logger.info(f"[Init:{beamline}] Ensuring pool: {pool}") + try: + subprocess.run( + ["prefect", "work-pool", "inspect", pool], + check=True, + capture_output=True, + ) + 
logger.info(f"[Init:{beamline}] Work pool '{pool}' already exists.") + except subprocess.CalledProcessError: + logger.info(f"[Init:{beamline}] Creating work pool: {pool}") + subprocess.run( + ["prefect", "work-pool", "create", pool, "--type", "process"], + check=True, + ) + + +def deploy_flows(prefect_yaml: str, beamline: str): + """Deploy flows defined in prefect.yaml using Prefect CLI.""" + logger.info(f"[Init:{beamline}] Deploying flows from {prefect_yaml}...") + subprocess.run( + ["prefect", "--no-prompt", "deploy", "--prefect-file", prefect_yaml, "--all"], + check=True, + ) + logger.info(f"[Init:{beamline}] Done.") + + +def ensure_globus_secrets(beamline: str): + globus_client_id = os.environ.get("GLOBUS_CLIENT_ID") + globus_client_secret = os.environ.get("GLOBUS_CLIENT_SECRET") + + if globus_client_id and globus_client_secret: + # Create or update Prefect Secret blocks for Globus credentials + try: + Secret(value=globus_client_id).save(name="globus-client-id", overwrite=True) + Secret(value=globus_client_secret).save(name="globus-client-secret", overwrite=True) + logger.info(f"[Init:{beamline}] Created/updated Prefect Secret blocks for Globus credentials.") + except Exception as e: + logger.warning(f"[Init:{beamline}] Failed to create/update Prefect Secret blocks: {str(e)}") + + +def main(): + beamline, prefect_yaml, api_url = check_env() + logger.info(f"[Init:{beamline}] Using prefect.yaml at {prefect_yaml}") + wait_for_prefect_server(api_url, beamline) + ensure_globus_secrets(beamline) + ensure_work_pools(prefect_yaml, beamline) + deploy_flows(prefect_yaml, beamline) + + +if __name__ == "__main__": + main() From b4a0d1e1f90512cebbe3d69197b4eb613c0ce74f Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 10:07:51 -0800 Subject: [PATCH 13/16] bringing prefect.yaml up to spec with the 5 beamline project --- orchestration/flows/bl832/prefect.yaml | 35 +++++++++++--------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/orchestration/flows/bl832/prefect.yaml b/orchestration/flows/bl832/prefect.yaml index a6724ed1..d18e9229 100644 --- a/orchestration/flows/bl832/prefect.yaml +++ b/orchestration/flows/bl832/prefect.yaml @@ -2,103 +2,98 @@ name: bl832 prefect-version: 3.4.2 -# Ensure workers clone code and run from the repo root so imports like -pull: -- prefect.deployments.steps.git_clone: - repository: https://github.com/als-computing/splash_flows.git - branch: main deployments: # Dispatcher flow -- launches new file processing flows and downstream reconstruction - name: run_832_dispatcher - entrypoint: /splash_flows/orchestration/flows/bl832/dispatcher.py:dispatcher + entrypoint: orchestration/flows/bl832/dispatcher.py:dispatcher work_pool: name: dispatcher_832_pool work_queue_name: dispatcher_832_queue # Move new files to the appropriate locations (data832, nersc cfs) and ingest metadata in scicat - name: new_file_832 - entrypoint: /splash_flows/orchestration/flows/bl832/move.py:process_new_832_file + entrypoint: orchestration/flows/bl832/move.py:process_new_832_file work_pool: name: new_file_832_pool work_queue_name: new_file_832_queue # Scheduled flow for testing file transfers - name: test_transfers_832 - entrypoint: /splash_flows/orchestration/flows/bl832/move.py:test_transfers_832 + entrypoint: orchestration/flows/bl832/move.py:test_transfers_832 work_pool: name: new_file_832_pool work_queue_name: test_transfers_832_queue - name: test_832_transfers_grafana - entrypoint: /splash_flows/orchestration/flows/bl832/move.py:test_transfers_832_grafana 
+ entrypoint: orchestration/flows/bl832/move.py:test_transfers_832_grafana work_pool: name: new_file_832_pool work_queue_name: test_832_transfers_grafana_queue # Reconstuction flows - name: nersc_recon_flow - entrypoint: /splash_flows/orchestration/flows/bl832/nersc.py:nersc_recon_flow + entrypoint: orchestration/flows/bl832/nersc.py:nersc_recon_flow work_pool: name: nersc_recon_flow_pool work_queue_name: nersc_recon_flow_queue - name: nersc_streaming_flow - entrypoint: /splash_flows/orchestration/flows/bl832/nersc.py:nersc_streaming_flow + entrypoint: orchestration/flows/bl832/nersc.py:nersc_streaming_flow work_pool: name: nersc_streaming_pool work_queue_name: nersc_832_streaming_flow_queue - name: alcf_recon_flow - entrypoint: /splash_flows/orchestration/flows/bl832/alcf.py:alcf_recon_flow + entrypoint: orchestration/flows/bl832/alcf.py:alcf_recon_flow work_pool: name: alcf_recon_flow_pool work_queue_name: alcf_recon_flow_queue # Pruning flows - name: prune_data832_raw - entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_data832_raw + entrypoint: orchestration/flows/bl832/prune.py:prune_data832_raw work_pool: name: prune_832_pool work_queue_name: prune_data832_raw_queue - name: prune_data832_scratch - entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_data832_scratch + entrypoint: orchestration/flows/bl832/prune.py:prune_data832_scratch work_pool: name: prune_832_pool work_queue_name: prune_data832_scratch_queue - name: prune_data832 - entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_data832 + entrypoint: orchestration/flows/bl832/prune.py:prune_data832 work_pool: name: prune_832_pool work_queue_name: prune_832_queue - name: prune_spot832 - entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_spot832 + entrypoint: orchestration/flows/bl832/prune.py:prune_spot832 work_pool: name: prune_832_pool work_queue_name: prune_832_queue - name: prune_alcf832_raw - entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_alcf832_raw + entrypoint: orchestration/flows/bl832/prune.py:prune_alcf832_raw work_pool: name: prune_832_pool work_queue_name: prune_alcf832_raw_queue - name: prune_alcf832_scratch - entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_alcf832_scratch + entrypoint: orchestration/flows/bl832/prune.py:prune_alcf832_scratch work_pool: name: prune_832_pool work_queue_name: prune_alcf832_scratch_queue - name: prune_nersc832_alsdev_pscratch_raw - entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_raw + entrypoint: orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_raw work_pool: name: prune_832_pool work_queue_name: prune_nersc832_pscratch_queue - name: prune_nersc832_alsdev_pscratch_scratch - entrypoint: /splash_flows/orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_scratch + entrypoint: orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_scratch work_pool: name: prune_832_pool work_queue_name: prune_nersc832_pscratch_queue From dda5e2e55cb3034c0ddc35a670632ebc58b3e01b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 10:30:20 -0800 Subject: [PATCH 14/16] Updating pytest and transfer_to_nersc function to work after rebasing --- orchestration/_tests/test_globus_flow.py | 45 +++++++++++++++--------- orchestration/flows/bl832/move.py | 10 +++--- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index 
c52448e5..b71be618 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -58,14 +58,25 @@ def prefect_test_fixture(): _sync=True ) + Variable.set( + name="bl832-settings", + value={ + "delete_spot832_files_after_days": 1, + "delete_data832_files_after_days": 35 + }, + overwrite=True, + _sync=True + ) + yield class MockEndpoint: - def __init__(self, root_path, uuid_value=None): + def __init__(self, root_path, uuid_value=None, name=None): self.root_path = root_path self.uuid = uuid_value or str(uuid4()) self.uri = f"mock_endpoint_uri_{self.uuid}" + self.name = name or f"mock_endpoint_{self.uuid[:8]}" class MockConfig832(): @@ -147,6 +158,8 @@ class MockSecret: mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()) + mocker.patch('orchestration.flows.bl832.move.schedule_prefect_flow', return_value=None) + # Mock read_deployment_by_name with a manually defined mock class class MockDeployment: def __init__(self): @@ -197,22 +210,22 @@ def test_alcf_recon_flow(mocker: MockFixture): mock_secret = mocker.MagicMock() mock_secret.get.return_value = str(uuid4()) - with mocker.patch('prefect.blocks.system.Secret.load', return_value=mock_secret): - # 2) Patch out the calls in Config832 that do real Globus auth: - # a) init_transfer_client(...) used in the constructor - mocker.patch( - "orchestration.flows.bl832.config.transfer.init_transfer_client", - return_value=mocker.MagicMock() # pretend TransferClient - ) - # b) flows.get_flows_client(...) used in the constructor - mocker.patch( - "orchestration.flows.bl832.config.flows.get_flows_client", - return_value=mocker.MagicMock() # pretend FlowsClient - ) + mocker.patch('prefect.blocks.system.Secret.load', return_value=mock_secret) + # 2) Patch out the calls in Config832 that do real Globus auth: + # a) init_transfer_client(...) used in the constructor + mocker.patch( + "orchestration.flows.bl832.config.transfer.init_transfer_client", + return_value=mocker.MagicMock() # pretend TransferClient + ) + # b) flows.get_flows_client(...) 
used in the constructor + mocker.patch( + "orchestration.flows.bl832.config.flows.get_flows_client", + return_value=mocker.MagicMock() # pretend FlowsClient + ) - # 3) Now import the real code AFTER these patches - from orchestration.flows.bl832.alcf import alcf_recon_flow, ALCFTomographyHPCController - from orchestration.flows.bl832.config import Config832 + # 3) Now import the real code AFTER these patches + from orchestration.flows.bl832.alcf import alcf_recon_flow, ALCFTomographyHPCController + from orchestration.flows.bl832.config import Config832 # 4) Create a config => won't do real Globus calls now mock_config = Config832() diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index c06c8fef..9150c1d4 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -52,6 +52,7 @@ def transfer_data_to_nersc( transfer_client: TransferClient, data832: GlobusEndpoint, nersc832: GlobusEndpoint, + config=None, ): logger = get_run_logger() @@ -60,7 +61,8 @@ def transfer_data_to_nersc( file_path = file_path[1:] # Initialize config - config = Config832() + if not config: + config = Config832() # Import here to avoid circular imports from orchestration.transfer_controller import get_transfer_controller, CopyMethod @@ -142,7 +144,7 @@ def process_new_832_file_task( if not is_export_control and send_to_nersc: transfer_data_to_nersc( - relative_path, config.tc, config.data832, config.nersc832 + relative_path, config.tc, config.data832, config.nersc832, config ) logger.info( f"File successfully transferred from data832 to NERSC {file_path}. Task {task}" @@ -217,7 +219,7 @@ def test_transfers_832(file_path: str = "/raw/transfer_tests/test.txt"): ) logger.info(f"Transferred {spot832_path} to spot to data") - task = transfer_data_to_nersc(new_file, config.tc, config.data832, config.nersc832) + task = transfer_data_to_nersc(new_file, config.tc, config.data832, config.nersc832, config) logger.info( f"File successfully transferred from data832 to NERSC {spot832_path}. Task {task}" ) @@ -228,7 +230,7 @@ def test_transfers_832_grafana(file_path: str = "/raw/transfer_tests/test/"): logger = get_run_logger() config = Config832() - task = transfer_data_to_nersc(file_path, config.tc, config.data832, config.nersc_alsdev) + task = transfer_data_to_nersc(file_path, config.tc, config.data832, config.nersc_alsdev, config) logger.info( f"File successfully transferred from data832 to NERSC {file_path}. 
Task {task}" From 27c15f9113801b0c1a36f2b3c86bd1b6ad6671c6 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 10:40:14 -0800 Subject: [PATCH 15/16] Adding 12 hr automated scheduling for test_transfers_832 flow --- orchestration/flows/bl832/prefect.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/orchestration/flows/bl832/prefect.yaml b/orchestration/flows/bl832/prefect.yaml index d18e9229..0af99461 100644 --- a/orchestration/flows/bl832/prefect.yaml +++ b/orchestration/flows/bl832/prefect.yaml @@ -24,6 +24,12 @@ deployments: work_pool: name: new_file_832_pool work_queue_name: test_transfers_832_queue + schedules: + - cron: "0 */12 * * *" # Every 12 hours + slug: "test-move-733-flight-check" + timezone: America/Los_Angeles + active: true + - name: test_832_transfers_grafana entrypoint: orchestration/flows/bl832/move.py:test_transfers_832_grafana work_pool: From cde170b3f658ec5cefd382eff544ba8676d2df40 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 10:57:42 -0800 Subject: [PATCH 16/16] Deleting legacy prefect 2 create_deployments scripts. --- .../create_deployment_832_dispatcher.sh | 6 --- scripts/legacy/create_deployments_832.sh | 38 ------------------- scripts/legacy/create_deployments_832_alcf.sh | 35 ----------------- .../legacy/create_deployments_832_nersc.sh | 26 ------------- 4 files changed, 105 deletions(-) delete mode 100755 scripts/legacy/create_deployment_832_dispatcher.sh delete mode 100644 scripts/legacy/create_deployments_832.sh delete mode 100755 scripts/legacy/create_deployments_832_alcf.sh delete mode 100755 scripts/legacy/create_deployments_832_nersc.sh diff --git a/scripts/legacy/create_deployment_832_dispatcher.sh b/scripts/legacy/create_deployment_832_dispatcher.sh deleted file mode 100755 index afaae432..00000000 --- a/scripts/legacy/create_deployment_832_dispatcher.sh +++ /dev/null @@ -1,6 +0,0 @@ -export $(grep -v '^#' .env | xargs) - - -prefect work-pool create 'dispatcher_pool' -prefect deployment build ./orchestration/flows/bl832/dispatcher.py:dispatcher -n run_832_dispatcher -q bl832 -p dispatcher_pool -prefect deployment apply dispatcher-deployment.yaml \ No newline at end of file diff --git a/scripts/legacy/create_deployments_832.sh b/scripts/legacy/create_deployments_832.sh deleted file mode 100644 index 0c9bbd3e..00000000 --- a/scripts/legacy/create_deployments_832.sh +++ /dev/null @@ -1,38 +0,0 @@ -export $(grep -v '^#' .env | xargs) - - -# Create work pools. 
If a work pool already exists, it will throw a warning but that's no problem -prefect work-pool create 'new_file_832_flow_pool' -prefect work-pool create 'new_file_832_prune_pool' -prefect work-pool create 'scicat_ingest_pool' - -# new_file_832_flow_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "new_file_832_flow_pool" - -# file transfer from data832 to nersc832 with Grafana Monitoring enabled -prefect deployment build ./orchestration/flows/bl832/move.py:test_transfers_832_grafana -n test_transfers_832_grafana -p new_file_832_flow_pool -q test_transfers_832_queue -prefect deployment apply test_transfers_832_grafana-deployment.yaml - -prefect deployment build ./orchestration/flows/bl832/move.py:process_new_832_file -n new_file_832 -p new_file_832_flow_pool -q new_file_832_queue -prefect deployment apply process_new_832_file-deployment.yaml - -prefect deployment build ./orchestration/flows/bl832/move.py:test_transfers_832 -n test_transfers_832 -p new_file_832_flow_pool -q test_transfers_832_queue -prefect deployment apply test_transfers_832-deployment.yaml - -# new_file_832_prune_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "new_file_832_prune_pool" - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_spot832 -n prune_spot832 -p new_file_832_prune_pool -q prune_spot832_queue -prefect deployment apply prune_spot832-deployment.yaml - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_data832 -n prune_data832 -p new_file_832_prune_pool -q prune_data832_queue -prefect deployment apply prune_data832-deployment.yaml - -# scicat_ingest_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "scicat_ingest_pool" - -prefect deployment build ./orchestration/flows/scicat/ingest.py:ingest_dataset -n ingest_dataset -p scicat_ingest_pool -q ingest_dataset_queue -prefect deployment apply ingest_dataset-deployment.yaml \ No newline at end of file diff --git a/scripts/legacy/create_deployments_832_alcf.sh b/scripts/legacy/create_deployments_832_alcf.sh deleted file mode 100755 index 815ca3a3..00000000 --- a/scripts/legacy/create_deployments_832_alcf.sh +++ /dev/null @@ -1,35 +0,0 @@ -export $(grep -v '^#' .env | xargs) - - -# create 'alfc_flow_pool' -prefect work-pool create 'alcf_flow_pool' -# create 'aclf_prune_pool' -prefect work-pool create 'alcf_prune_pool' - -# alcf_flow_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "alcf_flow_pool" -prefect deployment build ./orchestration/flows/bl832/alcf.py:alcf_recon_flow -n alcf_recon_flow -p alcf_flow_pool -q alcf_recon_flow_queue -prefect deployment apply alcf_recon_flow-deployment.yaml - -# alcf_prune_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "alcf_prune_pool" -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_alcf832_raw -n prune_alcf832_raw -p alcf_prune_pool -q prune_alcf832_raw_queue -prefect deployment apply prune_alcf832_raw-deployment.yaml - - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_alcf832_scratch -n prune_alcf832_scratch -p alcf_prune_pool -q prune_alcf832_scratch_queue -prefect deployment apply prune_alcf832_scratch-deployment.yaml - - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_data832_raw -n prune_data832_raw -p alcf_prune_pool -q prune_data832_raw_queue -prefect deployment apply prune_data832_raw-deployment.yaml - - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_data832_scratch -n 
prune_data832_scratch -q bl832 -p alcf_prune_pool -q prune_data832_scratch_queue -prefect deployment apply prune_data832_scratch-deployment.yaml - - -# prefect deployment build ./orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_scratch -n prune_nersc832_alsdev_scratch -q bl832 -p alcf_prune_pool -q prune_nersc832_alsdev_scratch_queue -# prefect deployment apply prune_nersc832_alsdev_scratch-deployment.yaml \ No newline at end of file diff --git a/scripts/legacy/create_deployments_832_nersc.sh b/scripts/legacy/create_deployments_832_nersc.sh deleted file mode 100755 index 45731f1b..00000000 --- a/scripts/legacy/create_deployments_832_nersc.sh +++ /dev/null @@ -1,26 +0,0 @@ -export $(grep -v '^#' .env | xargs) - -# create 'nersc_flow_pool' -prefect work-pool create 'nersc_flow_pool' -prefect work-pool create 'nersc_prune_pool' - -# nersc_flow_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "nersc_flow_pool" -prefect deployment build ./orchestration/flows/bl832/nersc.py:nersc_recon_flow -n nersc_recon_flow -p nersc_flow_pool -q nersc_recon_flow_queue -prefect deployment apply nersc_recon_flow-deployment.yaml - -# nersc_prune_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "nersc_prune_pool" -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_raw -n prune_nersc832_alsdev_pscratch_raw -p nersc_prune_pool -q prune_nersc832_pscratch_queue -prefect deployment apply prune_nersc832_alsdev_pscratch_raw-deployment.yaml - -prefect deployment build ./orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_scratch -n prune_nersc832_alsdev_pscratch_scratch -p nersc_prune_pool -q prune_nersc832_pscratch_queue -prefect deployment apply prune_nersc832_alsdev_pscratch_scratch-deployment.yaml - -# nersc_streaming_pool - # in docker-compose.yaml: - # command: prefect agent start --pool "nersc_streaming_pool" -prefect deployment build ./orchestration/flows/bl832/nersc.py:nersc_streaming_flow -n nersc_streaming_flow -p nersc_streaming_pool -q nersc_832_streaming_flow_queue -prefect deployment apply nersc_streaming_flow-deployment.yaml \ No newline at end of file
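For reference, the per-flow `prefect deployment build` / `prefect deployment apply` pairs removed in the final patch are superseded in Prefect 3 by a single `prefect deploy` run against orchestration/flows/bl832/prefect.yaml. A minimal Python sketch of that step, mirroring the deploy_flows helper added earlier in the series; the PREFECT_API_URL default is an assumption and should point at the target Prefect server:

import os
import subprocess

# Assumes a reachable Prefect 3 server; the localhost URL is only a placeholder.
os.environ.setdefault("PREFECT_API_URL", "http://localhost:4200/api")

# One command creates or updates every bl832 deployment declared in prefect.yaml,
# replacing the per-flow build/apply pairs from the removed Prefect 2 scripts.
subprocess.run(
    ["prefect", "--no-prompt", "deploy",
     "--prefect-file", "orchestration/flows/bl832/prefect.yaml", "--all"],
    check=True,
)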
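Similarly, the move.py change in patch 14 threads an existing Config832 through transfer_data_to_nersc so the task no longer builds a second config (and a second Globus TransferClient) when its caller already has one. A minimal caller sketch under that assumption; the flow name nersc_transfer_check is hypothetical, and the globus-client-id / globus-client-secret Secret blocks must already be configured for Config832 to initialize:

from prefect import flow

from orchestration.flows.bl832.config import Config832
from orchestration.flows.bl832.move import transfer_data_to_nersc


@flow
def nersc_transfer_check(file_path: str = "/raw/transfer_tests/test.txt"):
    # Build the Globus clients and endpoint definitions once.
    config = Config832()
    # Pass the existing config through so the task does not construct another one.
    return transfer_data_to_nersc(
        file_path,
        config.tc,        # shared Globus TransferClient
        config.data832,   # source endpoint
        config.nersc832,  # destination endpoint
        config,           # optional argument introduced by this series
    )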
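The test update in the same patch flattens the mocker.patch context manager into plain calls and defers the flow imports until after the Globus entry points are mocked, so Config832() never attempts real authentication. A condensed sketch of that ordering; the test name and placeholder secret value are illustrative, and the real assertions on the reconstruction flow are elided:

def test_config832_uses_mocked_globus(mocker):
    # Mock the Prefect Secret block lookups used by Config832.
    mock_secret = mocker.MagicMock()
    mock_secret.get.return_value = "mock-secret-value"
    mocker.patch("prefect.blocks.system.Secret.load", return_value=mock_secret)

    # Patch the Globus entry points before importing code that calls them.
    mocker.patch(
        "orchestration.flows.bl832.config.transfer.init_transfer_client",
        return_value=mocker.MagicMock(),
    )
    mocker.patch(
        "orchestration.flows.bl832.config.flows.get_flows_client",
        return_value=mocker.MagicMock(),
    )

    # Import only after the patches are in place.
    from orchestration.flows.bl832.config import Config832

    config = Config832()  # constructed against the mocked clients
    assert config.tc is not None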