Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM prefecthq/prefect:2.20.17-python3.11
FROM prefecthq/prefect:3.4.2-python3.13

WORKDIR /app
COPY ./requirements.txt /tmp/
Expand Down
17 changes: 17 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
globus:
globus_endpoints:

# 5.3.1 ENDPOINTS

bl531-nersc_alsdev:
root_path: /global/cfs/cdirs/als/gsharing/data_mover/531
uri: nersc.gov
uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3
name: bl531-nersc_alsdev

bl531-compute-dtn:
root_path: /
uri: compute-dtn.als.lbl.gov
uuid: TBD
name: bl531-compute-dtn

# 8.3.2 ENDPOINTS

spot832:
root_path: /
uri: spot832.lbl.gov
Expand Down
151 changes: 151 additions & 0 deletions init_work_pools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#!/usr/bin/env python3
"""
init_work_pools.py

Description:
Initializes Prefect work pools and deployments for the beamline defined by the BEAMLINE environment variable.
Uses orchestration/flows/bl"$BEAMLINE"/prefect.yaml as the single source of truth.

Requirements:
- BEAMLINE must be set (e.g., 832).
- A prefect.yaml file must exist in orchestration/flows/bl"$BEAMLINE"/.
- Prefect CLI must be installed and available in PATH.

Behavior:
- Waits until the Prefect server is reachable via its /health endpoint.
- Creates any missing work pools defined in the beamline's prefect.yaml.
- Deploys all flows defined in the beamline's prefect.yaml.
- Creates/updates Prefect Secret blocks for GLOBUS_CLIENT_ID and GLOBUS_CLIENT_SECRET
if the corresponding environment variables are present. Otherwise warns and continues.


Environment Variables:
BEAMLINE The beamline identifier (e.g., 832). Required.
PREFECT_API_URL Override the Prefect server API URL.
Default: http://prefect_server:4200/api
"""

import httpx
import logging
import os
import subprocess
import sys
import time
import yaml

from prefect.blocks.system import Secret


# ---------------- Logging Setup ---------------- #
# Module-wide logger; streams INFO+ messages to stdout so container
# logs (docker/k8s) capture them with timestamps and levels.
logger = logging.getLogger("init_work_pools")
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(
    fmt="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# ------------------------------------------------ #


def check_env() -> tuple[str, str, str]:
    """Validate required environment variables and paths.

    Returns:
        A ``(beamline, prefect_yaml, api_url)`` tuple on success.
        Exits the process with status 1 when BEAMLINE is unset or the
        beamline's prefect.yaml file does not exist.
    """
    beamline = os.environ.get("BEAMLINE")
    if not beamline:
        logger.error("Must set BEAMLINE (e.g., 832, 733)")
        sys.exit(1)

    yaml_path = f"orchestration/flows/bl{beamline}/prefect.yaml"
    if not os.path.isfile(yaml_path):
        logger.error(f"[Init:{beamline}] Expected {yaml_path} not found!")
        sys.exit(1)

    # Default points at the docker-compose service; override for Cloud/local.
    api_url = os.environ.get("PREFECT_API_URL", "http://prefect_server:4200/api")
    return beamline, yaml_path, api_url


def wait_for_prefect_server(api_url: str, beamline: str, timeout: int = 180):
    """Wait until Prefect server health endpoint responds (unless using Prefect Cloud).

    Polls ``{api_url}/health`` every 3 seconds until it returns HTTP 200.
    Exits the process with status 1 if the server is not healthy within
    `timeout` seconds. Prefect Cloud URLs are skipped entirely (no /health
    endpoint there).

    Parameters:
        api_url:  Base Prefect API URL (e.g. http://prefect_server:4200/api).
        beamline: Beamline identifier, used only for log prefixes.
        timeout:  Maximum seconds to wait before giving up.
    """
    if "api.prefect.cloud" in api_url:
        logger.info(f"[Init:{beamline}] Prefect Cloud detected — skipping health check.")
        return

    health_url = f"{api_url}/health"
    logger.info(f"[Init:{beamline}] Waiting for Prefect server at {health_url}...")

    # monotonic() is immune to wall-clock adjustments, unlike time.time().
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            r = httpx.get(health_url, timeout=2.0)
            if r.status_code == 200:
                logger.info(f"[Init:{beamline}] Prefect server is up.")
                return
            # Server responded but is not healthy yet.
            reason = f"HTTP {r.status_code}"
        except Exception as e:
            # Connection refused / DNS / timeout — keep the reason for the log
            # instead of silently swallowing it.
            reason = str(e) or type(e).__name__
        logger.warning(f"[Init:{beamline}] Still waiting... ({reason})")
        time.sleep(3)

    logger.error(f"[Init:{beamline}] Prefect server did not become ready in time.")
    sys.exit(1)


def ensure_work_pools(prefect_yaml: str, beamline: str):
    """Ensure that all work pools from prefect.yaml exist (create if missing).

    Reads the deployment entries from `prefect_yaml`, collects the distinct
    work-pool names they reference, and creates any pool the Prefect server
    does not already know about (as a 'process'-type pool).

    Parameters:
        prefect_yaml: Path to the beamline's prefect.yaml file.
        beamline:     Beamline identifier, used only for log prefixes.

    Raises:
        subprocess.CalledProcessError: if `prefect work-pool create` fails.
    """
    with open(prefect_yaml, "r") as f:
        # An empty/comment-only YAML file parses to None; treat it as {}.
        config = yaml.safe_load(f) or {}

    # Tolerate deployments without a work_pool entry or without a name key
    # (the old d["work_pool"]["name"] raised KeyError on a missing name).
    pools = {
        name
        for d in config.get("deployments", [])
        if (name := (d.get("work_pool") or {}).get("name"))
    }

    for pool in sorted(pools):  # deterministic order for readable logs
        logger.info(f"[Init:{beamline}] Ensuring pool: {pool}")
        try:
            # `inspect` exits non-zero when the pool does not exist.
            subprocess.run(
                ["prefect", "work-pool", "inspect", pool],
                check=True,
                capture_output=True,
            )
            logger.info(f"[Init:{beamline}] Work pool '{pool}' already exists.")
        except subprocess.CalledProcessError:
            logger.info(f"[Init:{beamline}] Creating work pool: {pool}")
            subprocess.run(
                ["prefect", "work-pool", "create", pool, "--type", "process"],
                check=True,
            )


def deploy_flows(prefect_yaml: str, beamline: str):
    """Deploy flows defined in prefect.yaml using Prefect CLI."""
    logger.info(f"[Init:{beamline}] Deploying flows from {prefect_yaml}...")
    # --no-prompt keeps the CLI non-interactive; --all deploys every
    # deployment entry in the given prefect.yaml.
    cmd = ["prefect", "--no-prompt", "deploy", "--prefect-file", prefect_yaml, "--all"]
    subprocess.run(cmd, check=True)
    logger.info(f"[Init:{beamline}] Done.")


def ensure_globus_secrets(beamline: str):
    """Create/update Prefect Secret blocks for the Globus credentials.

    Reads GLOBUS_CLIENT_ID and GLOBUS_CLIENT_SECRET from the environment and
    stores them as the 'globus-client-id' / 'globus-client-secret' Secret
    blocks. If either variable is missing, logs a warning and continues —
    deployment can proceed, but flows that need Globus will fail later.

    Parameters:
        beamline: Beamline identifier, used only for log prefixes.
    """
    globus_client_id = os.environ.get("GLOBUS_CLIENT_ID")
    globus_client_secret = os.environ.get("GLOBUS_CLIENT_SECRET")

    if globus_client_id and globus_client_secret:
        # Create or update Prefect Secret blocks for Globus credentials
        try:
            Secret(value=globus_client_id).save(name="globus-client-id", overwrite=True)
            Secret(value=globus_client_secret).save(name="globus-client-secret", overwrite=True)
            logger.info(f"[Init:{beamline}] Created/updated Prefect Secret blocks for Globus credentials.")
        except Exception as e:
            logger.warning(f"[Init:{beamline}] Failed to create/update Prefect Secret blocks: {str(e)}")
    else:
        # The module docstring promises a warning when credentials are absent;
        # previously this case was silently skipped.
        logger.warning(
            f"[Init:{beamline}] GLOBUS_CLIENT_ID and/or GLOBUS_CLIENT_SECRET not set; "
            "skipping Globus Secret blocks."
        )


def main():
    """Entry point: validate the environment, wait for the Prefect server,
    then seed secrets, work pools, and deployments for this beamline."""
    beamline, prefect_yaml, api_url = check_env()
    logger.info(f"[Init:{beamline}] Using prefect.yaml at {prefect_yaml}")
    wait_for_prefect_server(api_url, beamline)

    # Order matters: secrets before deployments so flows can resolve them.
    for step in (ensure_globus_secrets,):
        step(beamline)
    ensure_work_pools(prefect_yaml, beamline)
    deploy_flows(prefect_yaml, beamline)


if __name__ == "__main__":
    main()
Empty file.
186 changes: 186 additions & 0 deletions orchestration/_tests/test_bl531/test_move.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
'''Pytest unit tests for BL531 move flow. '''

import logging
import pytest
from uuid import uuid4

from prefect.testing.utilities import prefect_test_harness
from prefect.blocks.system import Secret, JSON
from pytest_mock import MockFixture

from orchestration._tests.test_transfer_controller import MockSecret
from orchestration.flows.bl531.config import Config531

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    """
    Session-scoped, autouse fixture that runs every test inside the Prefect
    test harness and seeds the Secret/JSON blocks the Globus integration
    expects (dummy client id/secret plus a pruning configuration).

    Yields:
        None
    """
    with prefect_test_harness():
        # Dummy Globus credentials — values only need to exist, not be valid.
        Secret(value=str(uuid4())).save(name="globus-client-id", overwrite=True)
        Secret(value=str(uuid4())).save(name="globus-client-secret", overwrite=True)

        # Pruning configuration consumed by the move flow.
        JSON(value={"max_wait_seconds": 600}).save(name="pruning-config", overwrite=True)

        yield


# ----------------------------
# Tests for 531
# ----------------------------

def test_process_new_531_file_task(mocker: MockFixture) -> None:
    """
    Test the process_new_531_file_task flow from orchestration.flows.bl531.move.

    This test verifies that:
    - The get_transfer_controller function is called (patched) with the correct parameters.
    - The returned transfer controller's copy method is called with the expected file path,
      source, and destination endpoints from the provided configuration.

    Parameters:
        mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
    """
    # Import the flow to test.
    from orchestration.flows.bl531.move import process_new_531_file_task

    # NOTE: mocker.patch() already activates the patch for the whole test and
    # undoes it at teardown. The previous `with mocker.patch(...):` form
    # entered the returned MagicMock as a context manager, whose truthy
    # __exit__ SILENTLY SUPPRESSED any exception (including failing asserts)
    # inside the block — so the test could pass vacuously. Plain calls now.
    mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())
    mocker.patch(
        "orchestration.flows.bl531.config.transfer.init_transfer_client",
        return_value=mocker.MagicMock()  # Return a dummy TransferClient
    )

    # Instantiate the dummy configuration.
    mock_config = Config531()

    # Generate a test file path.
    test_file_path = f"/tmp/test_file_{uuid4()}.txt"

    # Create a mock transfer controller with a mocked 'copy' method.
    mock_transfer_controller = mocker.MagicMock()
    mock_transfer_controller.copy.return_value = True

    mock_prune = mocker.patch(
        "orchestration.flows.bl531.move.prune",
        return_value=None
    )

    # Patch get_transfer_controller where it is used in process_new_531_file_task.
    mocker.patch(
        "orchestration.flows.bl531.move.get_transfer_controller",
        return_value=mock_transfer_controller
    )

    # Execute the move flow with the test file path and mock configuration.
    result = process_new_531_file_task(file_path=test_file_path, config=mock_config)

    # Verify that the transfer controller's copy method was called exactly once.
    assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once"
    assert result is None, "The flow should return None"
    assert mock_prune.call_count == 1, "Prune function should be called exactly once"

    # Reset mocks and test with config=None
    mock_transfer_controller.copy.reset_mock()
    mock_prune.reset_mock()

    result = process_new_531_file_task(file_path=test_file_path, config=None)
    assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once"
    assert result is None, "The flow should return None"
    assert mock_prune.call_count == 1, "Prune function should be called exactly once"


def test_dispatcher_531_flow(mocker: MockFixture) -> None:
    """
    Test the dispatcher flow for BL531.

    This test verifies that:
    - The process_new_531_file_task function is called with the correct parameters
      when the dispatcher flow is executed.
    - config=None is forwarded without error.
    - Missing file_path and export-controlled data each raise ValueError and
      never reach process_new_531_file_task.

    Parameters:
        mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
    """
    # Import the dispatcher flow to test.
    from orchestration.flows.bl531.dispatcher import dispatcher

    # Create a mock configuration object.
    class MockConfig:
        pass

    mock_config = MockConfig()

    # Generate a test file path.
    test_file_path = f"/tmp/test_file_{uuid4()}.txt"

    # NOTE: mocker.patch() already activates the patch for the whole test and
    # undoes it at teardown. The previous `with mocker.patch(...):` form
    # entered the returned MagicMock as a context manager, whose truthy
    # __exit__ SILENTLY SUPPRESSED any exception (failing asserts, missed
    # pytest.raises) inside the block — so failures went unnoticed. Plain
    # calls now.
    mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())
    mocker.patch(
        "orchestration.flows.bl531.config.transfer.init_transfer_client",
        return_value=mocker.MagicMock()  # Return a dummy TransferClient
    )
    # Patch the schedule_prefect_flow call to avoid real Prefect interaction
    mocker.patch(
        "orchestration.flows.bl531.move.schedule_prefect_flow",
        return_value=None
    )

    # Patch the process_new_531_file_task function to monitor its calls.
    mock_process_new_531_file_task = mocker.patch(
        "orchestration.flows.bl531.dispatcher.process_new_531_file_task",
        return_value=None
    )

    # Execute the dispatcher flow with test parameters.
    dispatcher(
        file_path=test_file_path,
        is_export_control=False,
        config=mock_config
    )

    # Verify that process_new_531_file_task was called exactly once with the expected arguments.
    mock_process_new_531_file_task.assert_called_once_with(
        file_path=test_file_path,
        config=mock_config
    )

    # Verify that process_new_531_file_task is called even when config is None
    mock_process_new_531_file_task.reset_mock()
    dispatcher(
        file_path=test_file_path,
        is_export_control=False,
        config=None
    )
    mock_process_new_531_file_task.assert_called_once()

    # Test error handling for missing file_path
    mock_process_new_531_file_task.reset_mock()
    with pytest.raises(ValueError):
        dispatcher(
            file_path=None,
            is_export_control=False,
            config=mock_config
        )
    mock_process_new_531_file_task.assert_not_called()

    # Test error handling for export control flag
    mock_process_new_531_file_task.reset_mock()
    with pytest.raises(ValueError):
        dispatcher(
            file_path=test_file_path,
            is_export_control=True,
            config=mock_config
        )
    mock_process_new_531_file_task.assert_not_called()
Loading