diff --git a/orchestration/_tests/test_bl733/test_move.py b/orchestration/_tests/test_bl733/test_move.py
index 75f26153..3cae2e96 100644
--- a/orchestration/_tests/test_bl733/test_move.py
+++ b/orchestration/_tests/test_bl733/test_move.py
@@ -68,17 +68,30 @@ def test_process_new_733_file_task(mocker: MockFixture) -> None:
     Parameters:
         mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
     """
-    # Import the flow to test.
-    from orchestration.flows.bl733.move import process_new_733_file_task
-
     # Patch the Secret.load and init_transfer_client in the configuration context.
     mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())
+    # Patch ALL three transfer functions BEFORE importing/instantiating Config733
+    mocker.patch(
+        "orchestration.flows.bl733.config.transfer.build_endpoints",
+        return_value={
+            "bl733-als-data733": mocker.MagicMock(root_path="/mock/data733"),
+            "bl733-als-data733_raw": mocker.MagicMock(root_path="/mock/data733_raw"),
+            "bl733-nersc-alsdev_raw": mocker.MagicMock(root_path="/mock/nersc_raw"),
+            "bl733-beegfs-data": mocker.MagicMock(root_path="/mock/beegfs"),
+        }
+    )
+    mocker.patch(
+        "orchestration.flows.bl733.config.transfer.build_apps",
+        return_value={"als_transfer": "mock_app"}
+    )
+
     mocker.patch(
         "orchestration.flows.bl733.config.transfer.init_transfer_client",
-        return_value=mocker.MagicMock()  # Return a dummy TransferClient
+        return_value=mocker.MagicMock()  # Return a mock TransferClient
     )
-    # Instantiate the dummy configuration.
+    from orchestration.flows.bl733.move import process_new_733_file_task
+    # Instantiate the mock configuration.
     mock_config = Config733()
 
     # Generate a test file path.
@@ -88,11 +101,11 @@ def test_process_new_733_file_task(mocker: MockFixture) -> None:
     mock_transfer_controller = mocker.MagicMock()
     mock_transfer_controller.copy.return_value = True
 
-    mock_prune = mocker.patch(
-        "orchestration.flows.bl733.move.prune",
-        return_value=None
+    mock_prune_controller = mocker.MagicMock()
+    mocker.patch(
+        "orchestration.flows.bl733.move.get_prune_controller",
+        return_value=mock_prune_controller
     )
-
     # Patch get_transfer_controller where it is used in process_new_733_file_task.
     mocker.patch(
         "orchestration.flows.bl733.move.get_transfer_controller",
@@ -105,16 +118,15 @@ def test_process_new_733_file_task(mocker: MockFixture) -> None:
 
     # Verify that the transfer controller's copy method was called exactly twice.
    assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice"
     assert result is None, "The flow should return None"
-    assert mock_prune.call_count == 1, "Prune function should be called exactly once"
+    assert mock_prune_controller.prune.call_count == 1, "Prune controller's prune method should be called exactly once"
 
     # Reset mocks and test with config=None
     mock_transfer_controller.copy.reset_mock()
-    mock_prune.reset_mock()
-
+    mock_prune_controller.prune.reset_mock()
     result = process_new_733_file_task(file_path=test_file_path, config=None)
     assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice"
     assert result is None, "The flow should return None"
-    assert mock_prune.call_count == 1, "Prune function should be called exactly once"
+    assert mock_prune_controller.prune.call_count == 1, "Prune controller's prune method should be called exactly once"
 
 
 def test_dispatcher_733_flow(mocker: MockFixture) -> None:
@@ -141,14 +153,25 @@ class MockConfig:
-    # Patch the schedule_prefect_flow call to avoid real Prefect interaction
+    # Patch Secret.load to avoid real Prefect interaction
     mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())
+
+    # Patch ALL three transfer functions BEFORE importing Config733
     mocker.patch(
-        "orchestration.flows.bl733.config.transfer.init_transfer_client",
-        return_value=mocker.MagicMock()  # Return a dummy TransferClient
+        "orchestration.flows.bl733.config.transfer.build_endpoints",
+        return_value={
+            "bl733-als-data733": mocker.MagicMock(root_path="/mock/data733"),
+            "bl733-als-data733_raw": mocker.MagicMock(root_path="/mock/data733_raw"),
+            "bl733-nersc-alsdev_raw": mocker.MagicMock(root_path="/mock/nersc_raw"),
+            "bl733-beegfs-data": mocker.MagicMock(root_path="/mock/beegfs"),
+        }
     )
-    # Patch the schedule_prefect_flow call to avoid real Prefect interaction
     mocker.patch(
-        "orchestration.flows.bl733.move.schedule_prefect_flow",
-        return_value=None
+        "orchestration.flows.bl733.config.transfer.build_apps",
+        return_value={"als_transfer": "mock_app"}
+    )
+
+    mocker.patch(
+        "orchestration.flows.bl733.config.transfer.init_transfer_client",
+        return_value=mocker.MagicMock()  # Return a mock TransferClient
     )
 
     # Patch the process_new_733_file_task function to monitor its calls.
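[Reviewer note] The `call_count` / `reset_mock` assertions above lean on standard `unittest.mock` behavior: attribute access on a `MagicMock` auto-creates a child mock that records its calls, which is why patching `get_prune_controller` to return a bare `MagicMock` is enough to observe `.prune`. A minimal self-contained sketch of that pattern (illustrative only, not part of the diff):

```python
from unittest.mock import MagicMock

controller = MagicMock()
controller.prune(file_path="a.h5")       # the child mock `prune` records this call
assert controller.prune.call_count == 1
controller.prune.reset_mock()            # clear recorded calls between scenarios
assert controller.prune.call_count == 0
```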
diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py
index 42e7c6c3..46cc22b0 100644
--- a/orchestration/_tests/test_globus_flow.py
+++ b/orchestration/_tests/test_globus_flow.py
@@ -231,6 +231,29 @@ def test_alcf_recon_flow(mocker: MockFixture):
     mock_secret.get.return_value = str(uuid4())
     mocker.patch('prefect.blocks.system.Secret.load', return_value=mock_secret)
 
+    mocker.patch(
+        "orchestration.flows.bl832.config.transfer.build_endpoints",
+        return_value={
+            "spot832": mocker.MagicMock(),
+            "data832": mocker.MagicMock(),
+            "data832_raw": mocker.MagicMock(),
+            "data832_scratch": mocker.MagicMock(),
+            "nersc832": mocker.MagicMock(),
+            "nersc_alsdev": mocker.MagicMock(),
+            "nersc832_alsdev_raw": mocker.MagicMock(),
+            "nersc832_alsdev_scratch": mocker.MagicMock(),
+            "nersc832_alsdev_pscratch_raw": mocker.MagicMock(),
+            "nersc832_alsdev_pscratch_scratch": mocker.MagicMock(),
+            "nersc832_alsdev_recon_scripts": mocker.MagicMock(),
+            "alcf832_raw": mocker.MagicMock(),
+            "alcf832_scratch": mocker.MagicMock(),
+        }
+    )
+    mocker.patch(
+        "orchestration.flows.bl832.config.transfer.build_apps",
+        return_value={"als_transfer": "mock_app"}
+    )
+
     # 2) Patch out the calls in Config832 that do real Globus auth:
     #    a) init_transfer_client(...) used in the constructor
     mocker.patch(
@@ -243,6 +266,13 @@ def test_alcf_recon_flow(mocker: MockFixture):
         return_value=mocker.MagicMock()  # pretend FlowsClient
     )
 
+    mock_settings = mocker.MagicMock()
+    mock_settings.__getitem__ = lambda self, key: {"scicat": "mock_scicat", "ghcr_images832": "mock_ghcr"}[key]
+    mocker.patch(
+        "orchestration.config.settings",
+        mock_settings
+    )
+
     # 3) Now import the real code AFTER these patches
     from orchestration.flows.bl832.alcf import alcf_recon_flow, ALCFTomographyHPCController
     from orchestration.flows.bl832.config import Config832
diff --git a/orchestration/config.py b/orchestration/config.py
index 6abf5fc3..f912c27d 100644
--- a/orchestration/config.py
+++ b/orchestration/config.py
@@ -1,10 +1,17 @@
+from abc import ABC, abstractmethod
 import collections
 import builtins
 from pathlib import Path
+import re
 import os
-
 import yaml
+from dynaconf import Dynaconf
+
+settings = Dynaconf(
+    settings_files=["config.yml"],
+)
+
 
 def get_config():
     return read_config(config_file=Path(__file__).parent.parent / "config.yml")
@@ -41,3 +48,46 @@ def expand_environment_variables(config):
         return type(config)([expand_environment_variables(v) for v in config])
     else:
         return config
+
+
+class BeamlineConfig(ABC):
+    """
+    Base class for beamline configurations.
+
+    This class loads the shared Dynaconf settings and validates the beamline
+    identifier. Beamline-specific subclasses must override the
+    _beam_specific_config() method to build their own endpoints, apps,
+    clients, and other attributes.
+
+    :param beamline_id: Beamline number identifier with periods (e.g. "8.3.2" or "7.3.3").
+    """
+
+    def __init__(
+        self,
+        beamline_id: str
+    ) -> None:
+        """
+        Initialize the BeamlineConfig with beamline-specific settings.
+        :param beamline_id: Beamline number identifier with periods (e.g. "8.3.2" or "7.3.3").
+        :raises ValueError: If beamline_id format is invalid.
+        :return: None
+        """
+        pattern = r'^\d+(\.\d+)+$'
+        if not re.match(pattern, beamline_id):
+            raise ValueError(f"Invalid beamline_id format: '{beamline_id}'. "
+                             f"Expected format: digits separated by dots (e.g., '8.3.2', '7.0.1.2', '12.3')")
+        self.beamline_id = beamline_id
+        self.config = settings
+        self._beam_specific_config()
+        self.config = None  # Clear reference to config after beam-specific setup
+
+    @abstractmethod
+    def _beam_specific_config(self) -> None:
+        """
+        Set up beamline-specific configuration attributes.
+
+        This method must be implemented by subclasses. Typical assignments
+        include selecting endpoints (using keys that include the beamline ID),
+        and other beamline-specific parameters.
+        """
+        pass
diff --git a/orchestration/flows/bl733/config.py b/orchestration/flows/bl733/config.py
index 49d1606e..93738745 100644
--- a/orchestration/flows/bl733/config.py
+++ b/orchestration/flows/bl733/config.py
@@ -1,13 +1,16 @@
 from globus_sdk import TransferClient
+
+from orchestration.config import BeamlineConfig
 from orchestration.globus import transfer
 
-# TODO: Use BeamlineConfig base class (Waiting for PR #62 to be merged)
-class Config733:
+class Config733(BeamlineConfig):
     def __init__(self) -> None:
-        config = transfer.get_config()
-        self.endpoints = transfer.build_endpoints(config)
-        self.apps = transfer.build_apps(config)
+        super().__init__(beamline_id="7.3.3")
+
+    def _beam_specific_config(self) -> None:
+        self.endpoints = transfer.build_endpoints(self.config)
+        self.apps = transfer.build_apps(self.config)
         self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"])
         self.data733 = self.endpoints["bl733-als-data733"]
         self.data733_raw = self.endpoints["bl733-als-data733_raw"]
diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py
index 9029e2c8..43310624 100644
--- a/orchestration/flows/bl733/move.py
+++ b/orchestration/flows/bl733/move.py
@@ -1,4 +1,3 @@
-import datetime
 import logging
 from pathlib import Path
 import os
@@ -9,8 +8,7 @@
 from orchestration.flows.scicat.ingest import scicat_ingest_flow
 from orchestration.flows.bl733.config import Config733
-from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe
-from orchestration.prefect import schedule_prefect_flow
+from orchestration.prune_controller import PruneMethod, get_prune_controller
 from orchestration.transfer_controller import CopyMethod, get_transfer_controller
 
 logger = logging.getLogger(__name__)
@@ -37,120 +35,6 @@ def get_common_parent_path(file_paths: list[str]) -> str:
     return os.path.commonpath(file_paths)
 
 
-def prune(
-    file_path: str = None,
-    source_endpoint: GlobusEndpoint = None,
-    check_endpoint: Optional[GlobusEndpoint] = None,
-    days_from_now: float = 0.0,
-    config: Config733 = None
-) -> bool:
-    """
-    Prune (delete) data from a globus endpoint.
-    If days_from_now is 0, executes pruning immediately.
-    Otherwise, schedules pruning for future execution using Prefect.
-    Args:
-        file_path (str): The path to the file or directory to prune
-        source_endpoint (GlobusEndpoint): The globus endpoint containing the data
-        check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning
-        days_from_now (float): Delay before pruning; if 0, prune immediately
-    Returns:
-        bool: True if pruning was successful or scheduled successfully, False otherwise
-    """
-    logger = get_run_logger()
-
-    if not file_path:
-        logger.error("No file_path provided for pruning operation")
-        return False
-
-    if not source_endpoint:
-        logger.error("No source_endpoint provided for pruning operation")
-        return False
-
-    if not config:
-        config = Config733()
-
-    if days_from_now < 0:
-        raise ValueError(f"Invalid days_from_now: {days_from_now}")
-
-    logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'")
-
-    # convert float days → timedelta
-    delay: datetime.timedelta = datetime.timedelta(days=days_from_now)
-
-    # If days_from_now is 0, prune immediately
-    if delay.total_seconds() == 0:
-        logger.info(f"Executing immediate pruning of '{file_path}' from '{source_endpoint.name}'")
-        return _prune_globus_endpoint(
-            relative_path=file_path,
-            source_endpoint=source_endpoint,
-            check_endpoint=check_endpoint,
-            config=config
-        )
-    else:
-        # Otherwise, schedule pruning for future execution
-        logger.info(f"Scheduling pruning of '{file_path}' from '{source_endpoint.name}' "
-                    f"in {delay.total_seconds()/86400:.1f} days")
-
-        try:
-            schedule_prefect_flow(
-                deployment_name="prune_globus_endpoint/prune_data733",
-                flow_run_name=f"prune_globus-{source_endpoint.name}-{file_path}",
-                parameters={
-                    "relative_path": file_path,
-                    "source_endpoint": source_endpoint,
-                    "check_endpoint": check_endpoint,
-                },
-                duration_from_now=delay,
-            )
-            logger.info(f"Successfully scheduled pruning task for {delay.total_seconds()/86400:.1f} days from now")
-            return True
-        except Exception as e:
-            logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True)
-            return False
-
-# Prune code is from the prune_controller in this PR: https://github.com/als-computing/splash_flows_globus/pulls
-# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here.
-
-
-@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{relative_path}")
-def _prune_globus_endpoint(
-    relative_path: str,
-    source_endpoint: GlobusEndpoint,
-    check_endpoint: Optional[GlobusEndpoint] = None,
-    config: Config733 = None
-) -> None:
-    """
-    Prefect flow that performs the actual Globus endpoint pruning operation.
-    Args:
-        relative_path (str): The path of the file or directory to prune
-        source_endpoint (GlobusEndpoint): The Globus endpoint to prune from
-        check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning
-        config (BeamlineConfig): Configuration object with transfer client
-    """
-    logger = get_run_logger()
-
-    logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'")
-
-    if not config:
-        config = Config733()
-
-    globus_settings = Variable.get("globus-settings", _sync=True)
-    max_wait_seconds = globus_settings["max_wait_seconds"]
-
-    flow_name = f"prune_from_{source_endpoint.name}"
-    logger.info(f"Running flow: {flow_name}")
-    logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}")
-    prune_one_safe(
-        file=relative_path,
-        if_older_than_days=0,
-        transfer_client=config.tc,
-        source_endpoint=source_endpoint,
-        check_endpoint=check_endpoint,
-        logger=logger,
-        max_wait_seconds=max_wait_seconds
-    )
-
-
 @flow(name="new_733_file_flow", flow_run_name="process_new-{file_path[0]}")
 def process_new_733_file_flow(
     file_path: list[str],
@@ -261,9 +145,14 @@ def process_new_733_file_task(
 
     logger.info("Step 4: Scheduling pruning from data733")
-    # Waiting for PR #62 to be merged (prune_controller)
+    # Prune via the shared prune_controller module (added in PR #62)
+    prune_controller = get_prune_controller(
+        prune_type=PruneMethod.GLOBUS,
+        config=config
+    )
+
     bl733_settings = Variable.get("bl733-settings", _sync=True)
     for file_path in file_paths:
-        prune(
+        prune_controller.prune(
             file_path=file_path,
             source_endpoint=config.data733_raw,
             check_endpoint=config.nersc733_alsdev_raw,
diff --git a/orchestration/flows/bl733/prefect.yaml b/orchestration/flows/bl733/prefect.yaml
index bf66d2a6..f94b3771 100644
--- a/orchestration/flows/bl733/prefect.yaml
+++ b/orchestration/flows/bl733/prefect.yaml
@@ -24,8 +24,8 @@ deployments:
     name: dispatcher_733_pool
     work_queue_name: dispatcher_733_queue
 
-- name: prune_data733
-  entrypoint: orchestration/flows/bl733/move.py:_prune_globus_endpoint
+- name: prune_globus_endpoint
+  entrypoint: orchestration/prune_controller.py:prune_globus_endpoint
   work_pool:
     name: prune_733_pool
     work_queue_name: prune_733_queue
diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py
index ff19a9c3..788eef4a 100644
--- a/orchestration/flows/bl832/config.py
+++ b/orchestration/flows/bl832/config.py
@@ -1,12 +1,16 @@
 from globus_sdk import TransferClient
+
+from orchestration.config import BeamlineConfig
 from orchestration.globus import transfer, flows
 
-class Config832:
+class Config832(BeamlineConfig):
     def __init__(self) -> None:
-        config = transfer.get_config()
-        self.endpoints = transfer.build_endpoints(config)
-        self.apps = transfer.build_apps(config)
+        super().__init__(beamline_id="8.3.2")
+
+    def _beam_specific_config(self) -> None:
+        self.endpoints = transfer.build_endpoints(self.config)
+        self.apps = transfer.build_apps(self.config)
         self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"])
         self.flow_client = flows.get_flows_client()
         self.spot832 = self.endpoints["spot832"]
@@ -22,5 +26,5 @@ def __init__(self) -> None:
         self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"]
         self.alcf832_raw = self.endpoints["alcf832_raw"]
         self.alcf832_scratch = self.endpoints["alcf832_scratch"]
-        self.scicat = config["scicat"]
-        self.ghcr_images832 = config["ghcr_images832"]
+        self.scicat = self.config["scicat"]
+        self.ghcr_images832 = self.config["ghcr_images832"]
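[Reviewer note] The `BeamlineConfig` pattern generalizes cleanly: a subclass passes its ID to the base constructor and does all endpoint/app/client wiring in `_beam_specific_config()`, where `self.config` temporarily holds the shared Dynaconf settings before the base class clears the reference. A minimal sketch for a hypothetical new beamline (4.0.2 is made up; the endpoint keys would have to exist in config.yml):

```python
from orchestration.config import BeamlineConfig
from orchestration.globus import transfer


class Config402(BeamlineConfig):  # hypothetical beamline 4.0.2
    def __init__(self) -> None:
        super().__init__(beamline_id="4.0.2")

    def _beam_specific_config(self) -> None:
        # self.config is the shared Dynaconf settings while this hook runs;
        # the base class sets it back to None afterwards.
        self.endpoints = transfer.build_endpoints(self.config)
        self.apps = transfer.build_apps(self.config)
        self.tc = transfer.init_transfer_client(self.apps["als_transfer"])
```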
diff --git a/orchestration/prune_controller.py b/orchestration/prune_controller.py
new file mode 100644
index 00000000..685c92b2
--- /dev/null
+++ b/orchestration/prune_controller.py
@@ -0,0 +1,385 @@
+from abc import ABC, abstractmethod
+import datetime
+from enum import Enum
+import logging
+import os
+import shutil
+from typing import Generic, Optional, TypeVar
+
+from prefect import flow
+from prefect.variables import Variable
+
+from orchestration.config import BeamlineConfig
+from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe
+from orchestration.prefect import schedule_prefect_flow
+from orchestration.transfer_endpoints import FileSystemEndpoint, TransferEndpoint
+
+
+logger = logging.getLogger(__name__)
+
+Endpoint = TypeVar("Endpoint", bound=TransferEndpoint)
+
+
+class PruneController(Generic[Endpoint], ABC):
+    """
+    Abstract base class for pruning controllers.
+
+    This class defines the common interface that all prune controllers must implement,
+    regardless of the specific pruning mechanism they use.
+
+    :param config: Configuration object containing endpoints and credentials
+    """
+    def __init__(
+        self,
+        config: BeamlineConfig,
+    ) -> None:
+        """
+        Initialize the prune controller with configuration.
+
+        :param config: Configuration object containing endpoints and credentials
+        """
+        self.config = config
+        logger.debug(f"Initialized {self.__class__.__name__} with config for beamline {config.beamline_id}")
+
+    @abstractmethod
+    def prune(
+        self,
+        file_path: Optional[str] = None,
+        source_endpoint: Optional[Endpoint] = None,
+        check_endpoint: Optional[Endpoint] = None,
+        days_from_now: float = 0.0
+    ) -> bool:
+        """
+        Prune (delete) data from the source endpoint.
+
+        This method either executes the pruning immediately or schedules it for future execution,
+        depending on the days_from_now parameter.
+
+        :param file_path: The path to the file or directory to prune
+        :param source_endpoint: The endpoint containing the data to be pruned
+        :param check_endpoint: If provided, verify data exists here before pruning
+        :param days_from_now: Delay in days before pruning; if 0.0, prune immediately.
+
+        :return: True if pruning was successful or scheduled successfully, False otherwise
+        """
+        pass
+
+
+class FileSystemPruneController(PruneController[FileSystemEndpoint]):
+    """
+    Controller for pruning files from local file systems.
+
+    This controller handles pruning operations on local or mounted file systems
+    using standard file system operations.
+
+    :param config: Configuration object containing file system paths
+    """
+    def __init__(
+        self,
+        config: BeamlineConfig
+    ) -> None:
+        """
+        Initialize the file system prune controller.
+
+        :param config: Configuration object containing file system paths
+        """
+        super().__init__(config)
+        logger.debug(f"Initialized FileSystemPruneController for beamline {config.beamline_id}")
+
+    def prune(
+        self,
+        file_path: Optional[str] = None,
+        source_endpoint: Optional[FileSystemEndpoint] = None,
+        check_endpoint: Optional[FileSystemEndpoint] = None,
+        days_from_now: float = 0.0,
+    ) -> bool:
+        """
+        Prune (delete) data from a file system endpoint.
+
+        If days_from_now is 0, executes pruning immediately.
+        Otherwise, schedules pruning for future execution using Prefect.
+
+        :param file_path: The path to the file or directory to prune
+        :param source_endpoint: The file system endpoint containing the data
+        :param check_endpoint: If provided, verify data exists here before pruning
+        :param days_from_now: Delay in days before pruning; if 0.0, prune immediately.
+            If negative, the method logs an error and returns False.
+        :return: True if pruning was successful or scheduled successfully, False otherwise
+        """
+        if not file_path:
+            logger.error("No file_path provided for pruning operation")
+            return False
+
+        if not source_endpoint:
+            logger.error("No source_endpoint provided for pruning operation")
+            return False
+
+        if days_from_now < 0:
+            logger.error("days_from_now cannot be negative")
+            return False
+
+        flow_name = f"prune_from_{source_endpoint.name}"
+        logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'")
+
+        # convert float days → timedelta
+        delay = datetime.timedelta(days=days_from_now)
+
+        # If the delay is zero, prune immediately
+        if delay.total_seconds() == 0:
+            logger.info(f"Executing immediate pruning of '{file_path}' from '{source_endpoint.name}'")
+            try:
+                result = prune_filesystem_endpoint(
+                    relative_path=file_path,
+                    source_endpoint=source_endpoint,
+                    check_endpoint=check_endpoint,
+                    config=self.config
+                )
+                return bool(result)
+            except Exception as e:
+                logger.error(f"Failed to prune file: {str(e)}", exc_info=True)
+                return False
+        else:
+            # Otherwise, schedule pruning for future execution
+            logger.info(f"Scheduling pruning of '{file_path}' from '{source_endpoint.name}' "
+                        f"in {days_from_now:.1f} days")
+
+            try:
+                schedule_prefect_flow(
+                    deployment_name="prune_filesystem_endpoint/prune_filesystem_endpoint",
+                    flow_run_name=flow_name,
+                    parameters={
+                        "relative_path": file_path,
+                        "source_endpoint": source_endpoint,
+                        "check_endpoint": check_endpoint,
+                        "config": self.config
+                    },
+                    duration_from_now=delay,
+                )
+                logger.info(f"Successfully scheduled pruning task for {days_from_now:.1f} days from now")
+                return True
+            except Exception as e:
+                logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True)
+                return False
+
+
+@flow(name="prune_filesystem_endpoint")
+def prune_filesystem_endpoint(
+    relative_path: str,
+    source_endpoint: FileSystemEndpoint,
+    check_endpoint: Optional[FileSystemEndpoint] = None,
+    config: Optional[BeamlineConfig] = None
+) -> bool:
+    """
+    Prefect flow that performs the actual filesystem pruning operation.
+
+    :param relative_path: The path of the file or directory to prune
+    :param source_endpoint: The source endpoint to prune from
+    :param check_endpoint: If provided, verify data exists here before pruning
+    :param config: Configuration object (unused here; accepted for parameter parity with the Globus flow)
+    :return: True if pruning was successful, False otherwise
+    """
+    logger.info(f"Running flow: prune_from_{source_endpoint.name}")
+    logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}")
+
+    # Check if the file exists at the source endpoint using os.path
+    source_full_path = source_endpoint.full_path(relative_path)
+    if not os.path.exists(source_full_path):
+        logger.warning(f"File {relative_path} does not exist at the source: {source_endpoint.name}.")
+        return False
+
+    # If check_endpoint is provided, verify file exists there before pruning
+    if check_endpoint is not None:
+        check_full_path = check_endpoint.full_path(relative_path)
+        if os.path.exists(check_full_path):
+            logger.info(f"File {relative_path} exists on the check point: {check_endpoint.name}.")
+            logger.info("Safe to prune.")
+        else:
+            logger.warning(f"File {relative_path} does not exist at the check point: {check_endpoint.name}.")
+            logger.warning("Not safe to prune.")
+            return False
+
+    # Now perform the pruning operation
+    if os.path.isdir(source_full_path):
+        logger.info(f"Pruning directory {relative_path}")
+        shutil.rmtree(source_full_path)
+    else:
+        logger.info(f"Pruning file {relative_path}")
+        os.remove(source_full_path)
+
+    logger.info(f"Successfully pruned {relative_path} from {source_endpoint.name}")
+    return True
+
+
+class GlobusPruneController(PruneController[GlobusEndpoint]):
+    """
+    Controller for pruning files from Globus endpoints.
+
+    This controller handles pruning operations on Globus endpoints using
+    the Globus Transfer API.
+
+    :param config: Configuration object containing Globus endpoints and credentials
+    """
+    def __init__(
+        self,
+        config: BeamlineConfig
+    ) -> None:
+        """
+        Initialize the Globus prune controller.
+
+        :param config: Configuration object containing Globus endpoints and credentials
+        """
+        super().__init__(config)
+        logger.debug(f"Initialized GlobusPruneController for beamline {config.beamline_id}")
+
+    def prune(
+        self,
+        file_path: Optional[str] = None,
+        source_endpoint: Optional[GlobusEndpoint] = None,
+        check_endpoint: Optional[GlobusEndpoint] = None,
+        days_from_now: float = 0.0
+    ) -> bool:
+        """
+        Prune (delete) data from a Globus endpoint.
+
+        If days_from_now is 0, executes pruning immediately.
+        Otherwise, schedules pruning for future execution using Prefect.
+
+        :param file_path: The path to the file or directory to prune
+        :param source_endpoint: The Globus endpoint containing the data
+        :param check_endpoint: If provided, verify data exists here before pruning
+        :param days_from_now: Delay in days before pruning; if 0, prune immediately.
+            If negative, the method logs an error and returns False.
+        :return: True if pruning was successful or scheduled successfully, False otherwise
+        """
+        if not file_path:
+            logger.error("No file_path provided for pruning operation")
+            return False
+
+        if not source_endpoint:
+            logger.error("No source_endpoint provided for pruning operation")
+            return False
+
+        if days_from_now < 0:
+            logger.error("days_from_now cannot be negative")
+            return False
+
+        flow_name = f"prune_from_{source_endpoint.name}"
+        logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'")
+
+        # convert float days → timedelta
+        delay = datetime.timedelta(days=days_from_now)
+
+        # If the delay is zero, prune immediately
+        if delay.total_seconds() == 0:
+            logger.info(f"Executing immediate pruning of '{file_path}' from '{source_endpoint.name}'")
+            try:
+                prune_globus_endpoint(
+                    relative_path=file_path,
+                    source_endpoint=source_endpoint,
+                    check_endpoint=check_endpoint,
+                    config=self.config
+                )
+                return True
+            except Exception as e:
+                logger.error(f"Failed to prune file: {str(e)}", exc_info=True)
+                return False
+        else:
+            # Otherwise, schedule pruning for future execution
+            logger.info(f"Scheduling pruning of '{file_path}' from '{source_endpoint.name}' "
+                        f"in {days_from_now:.1f} days")
+
+            try:
+                schedule_prefect_flow(
+                    deployment_name="prune_globus_endpoint/prune_globus_endpoint",
+                    flow_run_name=flow_name,
+                    parameters={
+                        "relative_path": file_path,
+                        "source_endpoint": source_endpoint,
+                        "check_endpoint": check_endpoint,
+                        "config": self.config
+                    },
+                    duration_from_now=delay,
+                )
+                logger.info(f"Successfully scheduled pruning task for {days_from_now:.1f} days from now")
+                return True
+            except Exception as e:
+                logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True)
+                return False
+
+
+@flow(name="prune_globus_endpoint")
+def prune_globus_endpoint(
+    relative_path: str,
+    source_endpoint: GlobusEndpoint,
+    check_endpoint: Optional[GlobusEndpoint] = None,
+    config: Optional[BeamlineConfig] = None
+) -> None:
+    """
+    Prefect flow that performs the actual Globus endpoint pruning operation.
+
+    :param relative_path: The path of the file or directory to prune
+    :param source_endpoint: The Globus endpoint to prune from
+    :param check_endpoint: If provided, verify data exists here before pruning
+    :param config: Configuration object with an initialized Globus TransferClient (required)
+    """
+    logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'")
+
+    if config is None:
+        raise ValueError("A BeamlineConfig with an initialized TransferClient is required")
+
+    globus_settings = Variable.get("globus-settings", _sync=True)
+    max_wait_seconds = globus_settings["max_wait_seconds"]
+    flow_name = f"prune_from_{source_endpoint.name}"
+    logger.info(f"Running flow: {flow_name}")
+    logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}")
+    prune_one_safe(
+        file=relative_path,
+        if_older_than_days=0,
+        transfer_client=config.tc,
+        source_endpoint=source_endpoint,
+        check_endpoint=check_endpoint,
+        logger=logger,
+        max_wait_seconds=max_wait_seconds
+    )
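+
+
+# Usage sketch (illustrative only; the endpoint attributes below follow
+# Config733 as wired in move.py in this PR, and the path is hypothetical):
+#
+#     controller = get_prune_controller(PruneMethod.GLOBUS, config)
+#     controller.prune(
+#         file_path="raw/scan_0001.h5",           # hypothetical relative path
+#         source_endpoint=config.data733_raw,
+#         check_endpoint=config.nersc733_alsdev_raw,
+#         days_from_now=0.0,                      # 0.0 prunes now; > 0 schedules the
+#     )                                           # deployed flow via schedule_prefect_flow()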
+
+
+class PruneMethod(Enum):
+    """
+    Enum representing different prune methods.
+
+    These values are used to select the appropriate prune controller
+    through the factory function get_prune_controller().
+
+    Attributes:
+        GLOBUS: Use Globus Transfer API for pruning operations
+        SIMPLE: Use local file system operations for pruning
+    """
+    GLOBUS = "globus"
+    SIMPLE = "simple"
+
+
+def get_prune_controller(
+    prune_type: PruneMethod,
+    config: BeamlineConfig
+) -> PruneController:
+    """
+    Factory function to get the appropriate prune controller based on the prune type.
+
+    :param prune_type: The type of pruning to perform
+    :param config: The configuration object containing endpoint information
+
+    :return: The appropriate prune controller instance
+
+    :raises ValueError: If an invalid prune type is provided
+    """
+    logger.debug(f"Creating prune controller of type: {prune_type.name}")
+
+    if prune_type == PruneMethod.GLOBUS:
+        logger.debug("Returning GlobusPruneController")
+        return GlobusPruneController(config)
+    elif prune_type == PruneMethod.SIMPLE:
+        logger.debug("Returning FileSystemPruneController")
+        return FileSystemPruneController(config)
+    else:
+        error_msg = f"Invalid prune type: {prune_type}"
+        logger.error(error_msg)
+        raise ValueError(error_msg)
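[Reviewer note] With both factories now taking a `BeamlineConfig`, the bl733 flow composes the transfer and prune controllers symmetrically. A condensed sketch of the move.py wiring (the path and the 7.0-day delay are illustrative; the real flow reads its delay from the `bl733-settings` Prefect Variable):

```python
from orchestration.flows.bl733.config import Config733
from orchestration.prune_controller import PruneMethod, get_prune_controller
from orchestration.transfer_controller import CopyMethod, get_transfer_controller

config = Config733()
transfer_controller = get_transfer_controller(CopyMethod.GLOBUS, config)
prune_controller = get_prune_controller(prune_type=PruneMethod.GLOBUS, config=config)

for file_path in ["raw/scan_0001.h5"]:  # hypothetical paths
    prune_controller.prune(
        file_path=file_path,
        source_endpoint=config.data733_raw,
        check_endpoint=config.nersc733_alsdev_raw,
        days_from_now=7.0,  # illustrative; taken from bl733-settings in the flow
    )
```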
diff --git a/orchestration/transfer_controller.py b/orchestration/transfer_controller.py
index 7a71605a..afd4bebb 100644
--- a/orchestration/transfer_controller.py
+++ b/orchestration/transfer_controller.py
@@ -9,7 +9,7 @@
 
 import globus_sdk
 
-from orchestration.flows.bl832.config import Config832
+from orchestration.config import BeamlineConfig
 from orchestration.globus.transfer import GlobusEndpoint, start_transfer
 from orchestration.prometheus_utils import PrometheusMetrics
 
@@ -87,7 +87,7 @@ class TransferController(Generic[Endpoint], ABC):
     """
     def __init__(
         self,
-        config: Config832
+        config: BeamlineConfig
     ) -> None:
         self.config = config
 
@@ -121,7 +121,7 @@ class GlobusTransferController(TransferController[GlobusEndpoint]):
     """
     def __init__(
         self,
-        config: Config832,
+        config: BeamlineConfig,
         prometheus_metrics: Optional[PrometheusMetrics] = None
     ) -> None:
         super().__init__(config)
@@ -317,7 +317,7 @@ def copy(
 
 class SimpleTransferController(TransferController[FileSystemEndpoint]):
-    def __init__(self, config: Config832) -> None:
+    def __init__(self, config: BeamlineConfig) -> None:
         super().__init__(config)
     """
     Use a simple 'cp' command to move data within the same system.
@@ -390,7 +390,7 @@ class CopyMethod(Enum):
 
 def get_transfer_controller(
     transfer_type: CopyMethod,
-    config: Config832,
+    config: BeamlineConfig,
     prometheus_metrics: Optional[PrometheusMetrics] = None
 ) -> TransferController:
     """
@@ -409,26 +409,3 @@ def get_transfer_controller(
         return SimpleTransferController(config)
     else:
         raise ValueError(f"Invalid transfer type: {transfer_type}")
-
-
-if __name__ == "__main__":
-    config = Config832()
-    transfer_type = CopyMethod.GLOBUS
-    globus_transfer_controller = get_transfer_controller(transfer_type, config)
-    globus_transfer_controller.copy(
-        file_path="dabramov/test.txt",
-        source=config.alcf832_raw,
-        destination=config.alcf832_scratch
-    )
-
-    simple_transfer_controller = get_transfer_controller(CopyMethod.SIMPLE, config)
-    success = simple_transfer_controller.copy(
-        file_path="test.rtf",
-        source=FileSystemEndpoint("source", "/Users/david/Documents/copy_test/test_source/"),
-        destination=FileSystemEndpoint("destination", "/Users/david/Documents/copy_test/test_destination/")
-    )
-
-    if success:
-        logger.info("Simple transfer succeeded.")
-    else:
-        logger.error("Simple transfer failed.")
diff --git a/orchestration/transfer_endpoints.py b/orchestration/transfer_endpoints.py
new file mode 100644
index 00000000..6f15e5ac
--- /dev/null
+++ b/orchestration/transfer_endpoints.py
@@ -0,0 +1,98 @@
+# orchestration/transfer_endpoints.py
+from abc import ABC
+from pathlib import Path
+
+
+class TransferEndpoint(ABC):
+    """
+    Abstract base class for endpoints.
+
+    Instances expose three public attributes: name (a human-readable or
+    reference name), root_path (the base directory for the endpoint),
+    and uri (the URI for the endpoint).
+    """
+    def __init__(
+        self,
+        name: str,
+        root_path: str,
+        uri: str
+    ) -> None:
+        """
+        Initializes a TransferEndpoint.
+
+        :param name: A human-readable or reference name for the endpoint.
+        :param root_path: Root path or base directory for this endpoint.
+        :param uri: URI for this endpoint.
+        """
+        self.name = name
+        self.root_path = root_path
+        self.uri = uri
+
+
+class FileSystemEndpoint(TransferEndpoint):
+    """
+    A file system endpoint.
+    """
+    def __init__(
+        self,
+        name: str,
+        root_path: str,
+        uri: str
+    ) -> None:
+        super().__init__(name, root_path, uri)
+
+    def full_path(
+        self,
+        path_suffix: str
+    ) -> str:
+        """
+        Constructs the full path by appending the path_suffix to the root_path.
+
+        :param path_suffix: The relative path to append.
+        :return: The full absolute path.
+        """
+        return str(Path(self.root_path) / path_suffix)
+
+
+class HPSSEndpoint(TransferEndpoint):
+    """
+    An HPSS endpoint.
+    """
+    def __init__(
+        self,
+        name: str,
+        root_path: str,
+        uri: str
+    ) -> None:
+        super().__init__(name, root_path, uri)
+
+    def full_path(self, path_suffix: str) -> str:
+        """
+        Constructs the full path by appending the path_suffix to the HPSS endpoint's root_path.
+        This is used by the HPSS transfer controllers to compute the absolute path on HPSS.
+
+        :param path_suffix: The relative path to append.
+        :return: The full absolute path.
+        """
+        if path_suffix.startswith("/"):
+            path_suffix = path_suffix[1:]
+        return f"{self.root_path.rstrip('/')}/{path_suffix}"
diff --git a/requirements.txt b/requirements.txt
index 3ed219ca..5f72de3d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
 authlib
+dynaconf
 globus-compute-sdk @ git+https://github.com/globus/globus-compute.git@d1731340074be56861ec91d732bdff44f8e2b46e#subdirectory=compute_sdk
 globus-sdk>=3.0
 griffe>=0.49.0,<2.0.0
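[Reviewer note] The two `full_path` implementations resolve a relative suffix against the endpoint root in different ways, and the difference matters for absolute suffixes: `Path.__truediv__` discards the root when the right-hand side is absolute, while the HPSS version strips the leading slash first. A quick sketch with hypothetical paths:

```python
from orchestration.transfer_endpoints import FileSystemEndpoint, HPSSEndpoint

fs = FileSystemEndpoint("beegfs", "/mnt/beegfs/data", "file://beegfs")  # hypothetical
hpss = HPSSEndpoint("archive", "/home/a/alsdev/", "hpss://archive")     # hypothetical

print(fs.full_path("raw/scan_0001.h5"))     # /mnt/beegfs/data/raw/scan_0001.h5
print(fs.full_path("/raw/scan_0001.h5"))    # /raw/scan_0001.h5  (root dropped!)
print(hpss.full_path("/raw/scan_0001.h5"))  # /home/a/alsdev/raw/scan_0001.h5
```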