Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 41 additions & 18 deletions orchestration/_tests/test_bl733/test_move.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,30 @@ def test_process_new_733_file_task(mocker: MockFixture) -> None:
Parameters:
mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
"""
# Import the flow to test.
from orchestration.flows.bl733.move import process_new_733_file_task

# Patch the Secret.load and init_transfer_client in the configuration context.
mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())
# Patch ALL three transfer functions BEFORE importing/instantiating Config733
mocker.patch(
"orchestration.flows.bl733.config.transfer.build_endpoints",
return_value={
"bl733-als-data733": mocker.MagicMock(root_path="/mock/data733"),
"bl733-als-data733_raw": mocker.MagicMock(root_path="/mock/data733_raw"),
"bl733-nersc-alsdev_raw": mocker.MagicMock(root_path="/mock/nersc_raw"),
"bl733-beegfs-data": mocker.MagicMock(root_path="/mock/beegfs"),
}
)
mocker.patch(
"orchestration.flows.bl733.config.transfer.build_apps",
return_value={"als_transfer": "mock_app"}
)

mocker.patch(
"orchestration.flows.bl733.config.transfer.init_transfer_client",
return_value=mocker.MagicMock() # Return a dummy TransferClient
return_value=mocker.MagicMock() # Return a mock TransferClient
)

# Instantiate the dummy configuration.
from orchestration.flows.bl733.move import process_new_733_file_task
# Instantiate the mock configuration.
mock_config = Config733()

# Generate a test file path.
Expand All @@ -88,11 +101,11 @@ def test_process_new_733_file_task(mocker: MockFixture) -> None:
mock_transfer_controller = mocker.MagicMock()
mock_transfer_controller.copy.return_value = True

mock_prune = mocker.patch(
"orchestration.flows.bl733.move.prune",
return_value=None
mock_prune_controller = mocker.MagicMock()
mocker.patch(
"orchestration.flows.bl733.move.get_prune_controller",
return_value=mock_prune_controller
)

# Patch get_transfer_controller where it is used in process_new_733_file_task.
mocker.patch(
"orchestration.flows.bl733.move.get_transfer_controller",
Expand All @@ -105,16 +118,15 @@ def test_process_new_733_file_task(mocker: MockFixture) -> None:
# Verify that the transfer controller's copy method was called exactly twice.
assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice"
assert result is None, "The flow should return None"
assert mock_prune.call_count == 1, "Prune function should be called exactly once"
assert mock_prune_controller.prune.call_count == 1, "Prune function should be called exactly once"

# Reset mocks and test with config=None
mock_transfer_controller.copy.reset_mock()
mock_prune.reset_mock()

mock_prune_controller.prune.reset_mock()
result = process_new_733_file_task(file_path=test_file_path, config=None)
assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice"
assert result is None, "The flow should return None"
assert mock_prune.call_count == 1, "Prune function should be called exactly once"
assert mock_prune_controller.prune.call_count == 1, "Prune function should be called exactly once"


def test_dispatcher_733_flow(mocker: MockFixture) -> None:
Expand All @@ -141,14 +153,25 @@ class MockConfig:

# Patch the schedule_prefect_flow call to avoid real Prefect interaction
mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())

# Patch ALL three transfer functions BEFORE importing Config733
mocker.patch(
"orchestration.flows.bl733.config.transfer.init_transfer_client",
return_value=mocker.MagicMock() # Return a dummy TransferClient
"orchestration.flows.bl733.config.transfer.build_endpoints",
return_value={
"bl733-als-data733": mocker.MagicMock(root_path="/mock/data733"),
"bl733-als-data733_raw": mocker.MagicMock(root_path="/mock/data733_raw"),
"bl733-nersc-alsdev_raw": mocker.MagicMock(root_path="/mock/nersc_raw"),
"bl733-beegfs-data": mocker.MagicMock(root_path="/mock/beegfs"),
}
)
# Patch the schedule_prefect_flow call to avoid real Prefect interaction
mocker.patch(
"orchestration.flows.bl733.move.schedule_prefect_flow",
return_value=None
"orchestration.flows.bl733.config.transfer.build_apps",
return_value={"als_transfer": "mock_app"}
)

mocker.patch(
"orchestration.flows.bl733.config.transfer.init_transfer_client",
return_value=mocker.MagicMock() # Return a mock TransferClient
)

# Patch the process_new_733_file_task function to monitor its calls.
Expand Down
30 changes: 30 additions & 0 deletions orchestration/_tests/test_globus_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,29 @@ def test_alcf_recon_flow(mocker: MockFixture):
mock_secret.get.return_value = str(uuid4())

mocker.patch('prefect.blocks.system.Secret.load', return_value=mock_secret)
mocker.patch(
"orchestration.flows.bl832.config.transfer.build_endpoints",
return_value={
"spot832": mocker.MagicMock(),
"data832": mocker.MagicMock(),
"data832_raw": mocker.MagicMock(),
"data832_scratch": mocker.MagicMock(),
"nersc832": mocker.MagicMock(),
"nersc_alsdev": mocker.MagicMock(),
"nersc832_alsdev_raw": mocker.MagicMock(),
"nersc832_alsdev_scratch": mocker.MagicMock(),
"nersc832_alsdev_pscratch_raw": mocker.MagicMock(),
"nersc832_alsdev_pscratch_scratch": mocker.MagicMock(),
"nersc832_alsdev_recon_scripts": mocker.MagicMock(),
"alcf832_raw": mocker.MagicMock(),
"alcf832_scratch": mocker.MagicMock(),
}
)
mocker.patch(
"orchestration.flows.bl832.config.transfer.build_apps",
return_value={"als_transfer": "mock_app"}
)

# 2) Patch out the calls in Config832 that do real Globus auth:
# a) init_transfer_client(...) used in the constructor
mocker.patch(
Expand All @@ -243,6 +266,13 @@ def test_alcf_recon_flow(mocker: MockFixture):
return_value=mocker.MagicMock() # pretend FlowsClient
)

mock_settings = mocker.MagicMock()
mock_settings.__getitem__ = lambda self, key: {"scicat": "mock_scicat", "ghcr_images832": "mock_ghcr"}[key]
mocker.patch(
"orchestration.config.settings",
mock_settings
)

# 3) Now import the real code AFTER these patches
from orchestration.flows.bl832.alcf import alcf_recon_flow, ALCFTomographyHPCController
from orchestration.flows.bl832.config import Config832
Expand Down
52 changes: 51 additions & 1 deletion orchestration/config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
from abc import ABC, abstractmethod
import collections
import builtins
from pathlib import Path
import re
import os

import yaml

from dynaconf import Dynaconf

# Module-level Dynaconf settings object, created at import time from
# "config.yml". BeamlineConfig.__init__ exposes it to subclasses as
# ``self.config`` while their _beam_specific_config() hook runs.
# NOTE(review): "config.yml" is a relative name here, whereas get_config()
# builds an explicit path from __file__ — confirm both resolve to the same file.
settings = Dynaconf(
settings_files=["config.yml"],
)


def get_config():
    """
    Load the ``config.yml`` that sits one level above this module's directory.

    :return: Whatever ``read_config`` returns for that file.
    """
    config_path = Path(__file__).parent.parent / "config.yml"
    return read_config(config_file=config_path)
Expand Down Expand Up @@ -41,3 +48,46 @@ def expand_environment_variables(config):
return type(config)([expand_environment_variables(v) for v in config])
else:
return config


class BeamlineConfig(ABC):
    """
    Base class for beamline configurations.

    On construction the beamline identifier is validated, the module-level
    ``settings`` object is exposed as ``self.config``, the subclass hook
    ``_beam_specific_config()`` is called, and ``self.config`` is then reset
    to ``None``. Beamline-specific subclasses must override
    ``_beam_specific_config()`` to assign their own attributes.

    :param beamline_id: Beamline number identifier with periods (e.g. "8.3.2" or "7.3.3").
    """

    def __init__(
        self,
        beamline_id: str
    ) -> None:
        """
        Initialize the BeamlineConfig with beamline-specific settings.

        :param beamline_id: Beamline number identifier with periods (e.g. "8.3.2" or "7.3.3").
        :raises ValueError: If beamline_id format is invalid.
        :return: None
        """
        # fullmatch (rather than match with ^...$) so that an identifier with a
        # trailing newline such as "8.3.2\n" is rejected: "$" also matches
        # just before a final newline.
        pattern = r'\d+(\.\d+)+'
        if not re.fullmatch(pattern, beamline_id):
            raise ValueError(
                f"Invalid beamline_id format: '{beamline_id}'. "
                f"Expected format: digits separated by dots (e.g., '8.3.2', '7.0.1.2', '12.3')"
            )
        self.beamline_id = beamline_id
        # The shared settings are only reachable through self.config while the
        # subclass hook runs.
        self.config = settings
        self._beam_specific_config()
        self.config = None  # Clear reference to config after beam-specific setup

    @abstractmethod
    def _beam_specific_config(self) -> None:
        """
        Set up beamline-specific configuration attributes.

        This method must be implemented by subclasses. Typical assignments
        include selecting endpoints (using keys that include the beamline ID),
        and other beamline-specific parameters. ``self.config`` holds the
        loaded settings for the duration of this call only.
        """
        pass
13 changes: 8 additions & 5 deletions orchestration/flows/bl733/config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from globus_sdk import TransferClient

from orchestration.config import BeamlineConfig
from orchestration.globus import transfer


# TODO: Use BeamlineConfig base class (Waiting for PR #62 to be merged)
class Config733:
class Config733(BeamlineConfig):
def __init__(self) -> None:
config = transfer.get_config()
self.endpoints = transfer.build_endpoints(config)
self.apps = transfer.build_apps(config)
super().__init__(beamline_id="7.3.3")

def _beam_specific_config(self) -> None:
self.endpoints = transfer.build_endpoints(self.config)
self.apps = transfer.build_apps(self.config)
self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"])
self.data733 = self.endpoints["bl733-als-data733"]
self.data733_raw = self.endpoints["bl733-als-data733_raw"]
Expand Down
125 changes: 7 additions & 118 deletions orchestration/flows/bl733/move.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import datetime
import logging
from pathlib import Path
import os
Expand All @@ -9,8 +8,7 @@

from orchestration.flows.scicat.ingest import scicat_ingest_flow
from orchestration.flows.bl733.config import Config733
from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe
from orchestration.prefect import schedule_prefect_flow
from orchestration.prune_controller import PruneMethod, get_prune_controller
from orchestration.transfer_controller import CopyMethod, get_transfer_controller

logger = logging.getLogger(__name__)
Expand All @@ -37,120 +35,6 @@ def get_common_parent_path(file_paths: list[str]) -> str:
return os.path.commonpath(file_paths)


def prune(
    file_path: str = None,
    source_endpoint: GlobusEndpoint = None,
    check_endpoint: Optional[GlobusEndpoint] = None,
    days_from_now: float = 0.0,
    config: Config733 = None
) -> bool:
    """
    Prune (delete) data from a globus endpoint.

    If days_from_now is 0, executes pruning immediately.
    Otherwise, schedules pruning for future execution using Prefect.

    Args:
        file_path (str): The path to the file or directory to prune
        source_endpoint (GlobusEndpoint): The globus endpoint containing the data
        check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning
        days_from_now (float): Delay before pruning; if 0, prune immediately
        config (Config733): Configuration handed to the immediate pruning flow;
            a new Config733 is created when omitted. Not used when scheduling.

    Returns:
        bool: True if pruning ran or was scheduled successfully, False if a
        required argument is missing or scheduling failed.

    Raises:
        ValueError: If days_from_now is negative.
    """
    logger = get_run_logger()

    if not file_path:
        logger.error("No file_path provided for pruning operation")
        return False

    if not source_endpoint:
        logger.error("No source_endpoint provided for pruning operation")
        return False

    # Reject a bad delay before any configuration object is constructed.
    if days_from_now < 0:
        raise ValueError(f"Invalid days_from_now: {days_from_now}")

    logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'")

    # convert float days → timedelta
    delay: datetime.timedelta = datetime.timedelta(days=days_from_now)

    # If days_from_now is 0, prune immediately
    if delay.total_seconds() == 0:
        # Only the immediate path needs a config, so build it lazily here.
        if not config:
            config = Config733()
        logger.info(f"Executing immediate pruning of '{file_path}' from '{source_endpoint.name}'")
        # The flow is declared to return None; report success explicitly so
        # the documented bool contract holds. Errors raised by the flow
        # still propagate to the caller.
        _prune_globus_endpoint(
            relative_path=file_path,
            source_endpoint=source_endpoint,
            check_endpoint=check_endpoint,
            config=config
        )
        return True

    # Otherwise, schedule pruning for future execution
    delay_days = delay.total_seconds() / 86400
    logger.info(f"Scheduling pruning of '{file_path}' from '{source_endpoint.name}' "
                f"in {delay_days:.1f} days")

    try:
        schedule_prefect_flow(
            deployment_name="prune_globus_endpoint/prune_data733",
            flow_run_name=f"prune_globus-{source_endpoint.name}-{file_path}",
            parameters={
                "relative_path": file_path,
                "source_endpoint": source_endpoint,
                "check_endpoint": check_endpoint,
            },
            duration_from_now=delay,
        )
    except Exception as e:
        # Best-effort scheduling: log with traceback and signal failure.
        logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True)
        return False

    logger.info(f"Successfully scheduled pruning task for {delay_days:.1f} days from now")
    return True

# Prune code is from the prune_controller in this PR: https://github.com/als-computing/splash_flows_globus/pulls
# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here.


@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{relative_path}")
def _prune_globus_endpoint(
    relative_path: str,
    source_endpoint: GlobusEndpoint,
    check_endpoint: Optional[GlobusEndpoint] = None,
    config: Config733 = None
) -> None:
    """
    Prefect flow that prunes a single path from a Globus endpoint.

    Reads ``max_wait_seconds`` from the "globus-settings" Prefect Variable and
    delegates the deletion to ``prune_one_safe`` with ``if_older_than_days=0``,
    using the transfer client held by ``config``.

    Args:
        relative_path (str): The path of the file or directory to prune
        source_endpoint (GlobusEndpoint): The Globus endpoint to prune from
        check_endpoint (Optional[GlobusEndpoint]): Forwarded to ``prune_one_safe``;
            if provided, verify data exists here before pruning
        config (Config733): Configuration object with transfer client; a new
            Config733 is created when omitted
    """
    run_logger = get_run_logger()

    run_logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'")

    # Fall back to a freshly built configuration when none was supplied.
    config = config or Config733()

    wait_limit = Variable.get("globus-settings", _sync=True)["max_wait_seconds"]

    flow_label = f"prune_from_{source_endpoint.name}"
    run_logger.info(f"Running flow: {flow_label}")
    run_logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}")

    prune_one_safe(
        file=relative_path,
        if_older_than_days=0,
        transfer_client=config.tc,
        source_endpoint=source_endpoint,
        check_endpoint=check_endpoint,
        logger=run_logger,
        max_wait_seconds=wait_limit
    )


@flow(name="new_733_file_flow", flow_run_name="process_new-{file_path[0]}")
def process_new_733_file_flow(
file_path: list[str],
Expand Down Expand Up @@ -261,9 +145,14 @@ def process_new_733_file_task(
logger.info("Step 4: Scheduling pruning from data733")

# Waiting for PR #62 to be merged (prune_controller)
prune_controller = get_prune_controller(
prune_type=PruneMethod.GLOBUS,
config=config
)

bl733_settings = Variable.get("bl733-settings", _sync=True)
for file_path in file_paths:
prune(
prune_controller.prune(
file_path=file_path,
source_endpoint=config.data733_raw,
check_endpoint=config.nersc733_alsdev_raw,
Expand Down
4 changes: 2 additions & 2 deletions orchestration/flows/bl733/prefect.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ deployments:
name: dispatcher_733_pool
work_queue_name: dispatcher_733_queue

- name: prune_data733
entrypoint: orchestration/flows/bl733/move.py:_prune_globus_endpoint
- name: prune_globus_endpoint
entrypoint: orchestration/prune_controller.py:prune_globus_endpoint
work_pool:
name: prune_733_pool
work_queue_name: prune_733_queue
Loading