diff --git a/config.yml b/config.yml
index d8d990a4..ab350b04 100644
--- a/config.yml
+++ b/config.yml
@@ -145,6 +145,26 @@ globus:
uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3
name: nersc832
+ # 9.3.1 ENDPOINTS
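+    # Each endpoint entry defines the Globus collection UUID, the host URI, and
+    # the root path against which transfer paths are resolved.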
+
+ bl931-beegfs-data:
+ root_path: /beamline_staging/bl931/raw/
+ uri: beegfs.als.lbl.gov
+ uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a
+ name: bl931-beegfs-data
+
+ bl931-compute-dtn:
+ root_path: /
+ uri: compute-dtn.als.lbl.gov
+ uuid: 23af478e-d459-4e78-9753-5091b5fb432a
+ name: bl931-compute-dtn
+
+ bl931-nersc_alsdev_raw:
+ root_path: /global/cfs/cdirs/als/data_mover/9.3.1/raw
+ uri: nersc.gov
+ uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58
+ name: bl931-nersc_alsdev_raw
+
globus_apps:
als_transfer:
client_id: ${GLOBUS_CLIENT_ID}
diff --git a/docs/mkdocs/docs/bl931.md b/docs/mkdocs/docs/bl931.md
new file mode 100644
index 00000000..6838b3d0
--- /dev/null
+++ b/docs/mkdocs/docs/bl931.md
@@ -0,0 +1,102 @@
+# Beamline 9.3.1 Flows
+
+This page documents the workflows supported by Splash Flows at [ALS Beamline 9.3.1 (Tender X-ray Spectroscopy)](https://als.lbl.gov/beamlines/9-3-1/).
+
+## Data at 9.3.1
+
+Beamline 9.3.1 produces tender X-ray spectroscopy data; scans are written to disk on the acquisition system and picked up by the file watcher described below.
+
+## File Watcher
+
+There is a file watcher on the acquisition system that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call `dispatcher` kicks off the downstream steps:
+
+- Copy scans in real time from a Globus collection on the `compute-dtn` server to the beamline `beegfs` staging area and to `NERSC CFS` using Globus Transfer.
+- Copy project data to `NERSC HPSS` for long-term storage (TBD).
+- Analysis on HPC systems (TBD).
+- Ingest into SciCat (TBD).
+- Schedule data pruning from `compute-dtn` and `NERSC CFS`.
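+
+As a rough illustration of the trigger, a watcher can hand each finished file to the `dispatcher` deployment through the Prefect API. This is a minimal sketch, not the beamline's actual watcher: it assumes the `watchdog` package and uses the `dispatcher/run_931_dispatcher` deployment name declared in `prefect.yaml`, with a hypothetical watch path:
+
+```python
+from prefect.deployments import run_deployment
+from watchdog.events import FileSystemEventHandler
+from watchdog.observers import Observer
+
+
+class NewScanHandler(FileSystemEventHandler):
+    """Submit a dispatcher flow run for each file that finishes writing."""
+
+    def on_closed(self, event):  # inotify close-after-write (Linux backends)
+        if not event.is_directory:
+            run_deployment(
+                name="dispatcher/run_931_dispatcher",
+                parameters={"file_path": event.src_path, "is_export_control": False},
+                timeout=0,  # submit the run without waiting for it to finish
+            )
+
+
+observer = Observer()
+observer.schedule(NewScanHandler(), path="/path/to/raw", recursive=True)  # hypothetical watch path
+observer.start()
+```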
+
+## Prefect Configuration
+
+### Registered Flows
+
+#### `dispatcher.py`
+
+The Dispatcher Prefect Flow manages the order and execution of data tasks. When a new file is written, the `dispatcher()` Flow is called; currently it makes a synchronous call into `move.py`, with room for additional steps later (e.g. scheduling remote HPC analysis code), as sketched below.
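+
+A hypothetical sketch of how that step sequence could grow (the HPC-analysis call is a placeholder and does not exist in this codebase):
+
+```python
+from orchestration.flows.bl931.move import process_new_931_file_task
+
+
+def run_dispatch_steps(file_path: str, config) -> None:
+    # Step 1 (implemented): synchronous copy + prune scheduling via move.py
+    process_new_931_file_task(file_path=file_path, config=config)
+    # Step 2 (future, placeholder): kick off remote HPC analysis
+    # schedule_hpc_analysis(file_path=file_path)
+```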
+
+#### `move.py`
+
+Flow to process a new file at BL 9.3.1 (an invocation sketch follows this list):
+
+1. Copy the file from `compute-dtn` to the `beegfs` staging area and to `NERSC CFS`, and ingest the file path and metadata into SciCat (ingest TBD).
+2. Schedule pruning from `compute-dtn`.
+3. Copy the file from `NERSC CFS` to `NERSC HPSS` and ingest the archived file path into SciCat (TBD).
+4. Schedule pruning from `NERSC CFS`.
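+
+For ad-hoc testing, the flow can also be run directly in Python (a minimal sketch; the file path is hypothetical and must exist under the `compute-dtn` endpoint root):
+
+```python
+from orchestration.flows.bl931.move import process_new_931_file_flow
+
+# Copies one file to beegfs and NERSC CFS, then schedules pruning.
+process_new_931_file_flow(file_path="raw/sample_scan.h5")
+```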
+
+## VM Details
+
+The computing backend runs on a VM in the B15 server room, managed by ALS IT staff:
+
+`flow-931.als.lbl.gov`
+
+## Flow Diagram
+
+```mermaid
+sequenceDiagram
+    participant DET as Detector/<br/>File Watcher
+    participant DISP as Prefect<br/>Dispatcher
+    participant DTN as compute-dtn<br/>Storage
+    participant GLOB as Globus<br/>Transfer
+    participant CFS as NERSC<br/>CFS
+    participant CAT as SciCat<br/>Metadata
+    participant HPSS as NERSC<br/>HPSS
+
+ %% Initial Trigger
+ DET->>DET: Monitor filesystem
+ DET->>DISP: Trigger on new file
+ DISP->>DISP: Coordinate flows
+
+ %% Flow 1: new_file_931
+ rect rgb(220, 230, 255)
+ note over DISP,CAT: FLOW 1: new_file_931
+ DISP->>GLOB: Init transfer
+ activate GLOB
+ GLOB->>DTN: Read from compute-dtn
+ DTN-->>GLOB: Data
+ GLOB->>CFS: Write to NERSC CFS
+ GLOB-->>DISP: Transfer complete
+ deactivate GLOB
+
+ DISP->>CAT: Register metadata (TBD)
+ end
+
+ %% Flow 2: HPSS Archive
+ rect rgb(220, 255, 230)
+ note over DISP,HPSS: FLOW 2: HPSS Archive (TBD)
+ DISP->>GLOB: Init archive transfer
+ activate GLOB
+ GLOB->>CFS: Read from CFS
+ CFS-->>GLOB: Data
+ GLOB->>HPSS: Write to tape
+ GLOB-->>DISP: Archive complete
+ deactivate GLOB
+
+ DISP->>CAT: Update metadata (TBD)
+ end
+
+ %% Flow 3: Scheduled Pruning
+ rect rgb(255, 255, 220)
+ note over DISP,CFS: FLOW 3: Scheduled Pruning
+ DISP->>DISP: Schedule prune tasks
+
+ DISP->>DTN: Prune from compute-dtn
+ activate DTN
+ DTN->>DTN: Delete expired data
+ DTN-->>DISP: Pruning complete
+ deactivate DTN
+
+ DISP->>CFS: Prune from CFS (after HPSS)
+ activate CFS
+ CFS->>CFS: Delete expired data
+ CFS-->>DISP: Pruning complete
+ deactivate CFS
+ end
+```
\ No newline at end of file
diff --git a/docs/mkdocs/mkdocs.yml b/docs/mkdocs/mkdocs.yml
index 05e6a2d2..3ed99823 100644
--- a/docs/mkdocs/mkdocs.yml
+++ b/docs/mkdocs/mkdocs.yml
@@ -15,8 +15,10 @@ nav:
- Getting Started: getting_started.md
- Beamline Implementations:
- 7.3.3 SAXS/WAXS/GISAXS: bl733.md
- - 8.3.2 Micro Tomography - Compute at ALCF: alcf832.md
- - 8.3.2 Micro Tomography - Compute at NERSC: nersc832.md
+ - Beamline 8.3.2 - Microtomography:
+ - Compute at ALCF: alcf832.md
+ - Compute at NERSC: nersc832.md
+ - Beamline 9.3.1 - Tender X-ray Spectroscopy: bl931.md
- Orchestration: orchestration.md
- Configuration: configuration.md
# - Troubleshooting: troubleshooting.md
diff --git a/init_work_pools.py b/init_work_pools.py
index de150b67..f65aa3ea 100644
--- a/init_work_pools.py
+++ b/init_work_pools.py
@@ -49,10 +49,15 @@ def check_env() -> tuple[str, str, str]:
"""Validate required environment variables and paths."""
beamline = os.environ.get("BEAMLINE")
if not beamline:
- logger.error("Must set BEAMLINE (e.g., 832, 733)")
+ logger.error("Must set BEAMLINE (e.g., 832, 733, dichroism)")
sys.exit(1)
- prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml"
+    # Numeric beamline identifiers map to bl<NNN> flow folders; non-numeric identifiers map directly to a folder name.
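+    # e.g. BEAMLINE=832       -> orchestration/flows/bl832/prefect.yaml
+    #      BEAMLINE=dichroism -> orchestration/flows/dichroism/prefect.yaml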
+ if beamline.isdigit():
+ prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml"
+ else:
+ prefect_yaml = f"orchestration/flows/{beamline}/prefect.yaml"
+
if not os.path.isfile(prefect_yaml):
logger.error(f"[Init:{beamline}] Expected {prefect_yaml} not found!")
sys.exit(1)
diff --git a/orchestration/_tests/test_bl931/__init__.py b/orchestration/_tests/test_bl931/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/orchestration/_tests/test_bl931/test_move.py b/orchestration/_tests/test_bl931/test_move.py
new file mode 100644
index 00000000..72be36ac
--- /dev/null
+++ b/orchestration/_tests/test_bl931/test_move.py
@@ -0,0 +1,205 @@
+"""Pytest unit tests for BL931 move flow."""
+
+import logging
+import pytest
+from uuid import uuid4
+
+from prefect.testing.utilities import prefect_test_harness
+from prefect.blocks.system import Secret
+from prefect.variables import Variable
+from pytest_mock import MockFixture
+
+from orchestration._tests.test_transfer_controller import MockSecret
+from orchestration.flows.bl931.config import Config931
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+@pytest.fixture(autouse=True, scope="session")
+def prefect_test_fixture():
+ """
+ A pytest fixture that automatically sets up and tears down the Prefect test harness
+ for the entire test session. It creates and saves test secrets and configurations
+ required for Globus integration.
+
+ Yields:
+ None
+ """
+ with prefect_test_harness():
+ globus_client_id = Secret(value=str(uuid4()))
+ globus_client_id.save(name="globus-client-id", overwrite=True)
+
+ globus_client_secret = Secret(value=str(uuid4()))
+ globus_client_secret.save(name="globus-client-secret", overwrite=True)
+
+ Variable.set(
+ name="globus-settings",
+ value={"max_wait_seconds": 600},
+ overwrite=True,
+ _sync=True
+ )
+
+ Variable.set(
+ name="bl931-settings",
+ value={
+ "delete_data931_files_after_days": 180
+ },
+ overwrite=True,
+ _sync=True
+ )
+
+ yield
+
+
+# ----------------------------
+# Tests for 931
+# ----------------------------
+
+def test_process_new_931_file_task(mocker: MockFixture) -> None:
+ """
+    Test process_new_931_file_task from orchestration.flows.bl931.move.
+
+    This test verifies that:
+    - get_transfer_controller is called (patched) with the correct parameters.
+    - The returned transfer controller's copy method is called twice (beegfs and
+      NERSC CFS transfers) and prune is called once to schedule pruning.
+
+ Parameters:
+ mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
+ """
+ # Import the flow to test.
+ from orchestration.flows.bl931.move import process_new_931_file_task
+
+ # Patch the Secret.load and init_transfer_client in the configuration context.
+ mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())
+ mocker.patch(
+ "orchestration.flows.bl931.config.transfer.init_transfer_client",
+ return_value=mocker.MagicMock() # Return a dummy TransferClient
+ )
+ # Patch the schedule_prefect_flow call to avoid real Prefect interaction
+ mocker.patch(
+ "orchestration.flows.bl931.move.schedule_prefect_flow",
+ return_value=None
+ )
+
+ # Instantiate the dummy configuration.
+ mock_config = Config931()
+
+ # Generate a test file path.
+ test_file_path = f"/tmp/test_file_{uuid4()}.txt"
+
+ # Create a mock transfer controller with a mocked 'copy' method.
+ mock_transfer_controller = mocker.MagicMock()
+ mock_transfer_controller.copy.return_value = True
+
+ mock_prune = mocker.patch(
+ "orchestration.flows.bl931.move.prune",
+ return_value=None
+ )
+
+ # Patch get_transfer_controller where it is used in process_new_931_file_task.
+ mocker.patch(
+ "orchestration.flows.bl931.move.get_transfer_controller",
+ return_value=mock_transfer_controller
+ )
+
+ # Execute the move flow with the test file path and mock configuration.
+ result = process_new_931_file_task(file_path=test_file_path, config=mock_config)
+
+    # Verify that the transfer controller's copy method was called exactly twice
+    # (beegfs transfer and NERSC CFS transfer).
+ assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice"
+ assert result is None, "The flow should return None"
+ assert mock_prune.call_count == 1, "Prune function should be called exactly once"
+
+ # Reset mocks and test with config=None
+ mock_transfer_controller.copy.reset_mock()
+ mock_prune.reset_mock()
+
+ result = process_new_931_file_task(file_path=test_file_path, config=None)
+ assert mock_transfer_controller.copy.call_count == 2, "Transfer controller copy method should be called exactly twice"
+ assert result is None, "The flow should return None"
+ assert mock_prune.call_count == 1, "Prune function should be called exactly once"
+
+
+def test_dispatcher_931_flow(mocker: MockFixture) -> None:
+ """
+ Test the dispatcher flow for BL931.
+
+ This test verifies that:
+ - The process_new_931_file_task function is called with the correct parameters
+ when the dispatcher flow is executed.
+
+    Parameters:
+ mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
+ """
+ # Import the dispatcher flow to test.
+ from orchestration.flows.bl931.dispatcher import dispatcher
+
+ # Create a mock configuration object.
+ class MockConfig:
+ pass
+
+ mock_config = MockConfig()
+
+ # Generate a test file path.
+ test_file_path = f"/tmp/test_file_{uuid4()}.txt"
+
+    # Patch Secret.load and init_transfer_client to avoid real Globus interaction
+ mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())
+ mocker.patch(
+ "orchestration.flows.bl931.config.transfer.init_transfer_client",
+ return_value=mocker.MagicMock() # Return a dummy TransferClient
+ )
+ # Patch the schedule_prefect_flow call to avoid real Prefect interaction
+ mocker.patch(
+ "orchestration.flows.bl931.move.schedule_prefect_flow",
+ return_value=None
+ )
+
+ # Patch the process_new_931_file_task function to monitor its calls.
+ mock_process_new_931_file_task = mocker.patch(
+ "orchestration.flows.bl931.dispatcher.process_new_931_file_task",
+ return_value=None
+ )
+
+ # Execute the dispatcher flow with test parameters.
+ dispatcher(
+ file_path=test_file_path,
+ is_export_control=False,
+ config=mock_config
+ )
+
+ # Verify that process_new_931_file_task was called exactly once with the expected arguments.
+ mock_process_new_931_file_task.assert_called_once_with(
+ file_path=test_file_path,
+ config=mock_config
+ )
+
+ # Verify that process_new_931_file_task is called even when config is None
+ mock_process_new_931_file_task.reset_mock()
+ dispatcher(
+ file_path=test_file_path,
+ is_export_control=False,
+ config=None
+ )
+ mock_process_new_931_file_task.assert_called_once()
+
+ # Test error handling for missing file_path
+ mock_process_new_931_file_task.reset_mock()
+ with pytest.raises(ValueError):
+ dispatcher(
+ file_path=None,
+ is_export_control=False,
+ config=mock_config
+ )
+ mock_process_new_931_file_task.assert_not_called()
+
+ # Test error handling for export control flag
+ mock_process_new_931_file_task.reset_mock()
+ with pytest.raises(ValueError):
+ dispatcher(
+ file_path=test_file_path,
+ is_export_control=True,
+ config=mock_config
+ )
+ mock_process_new_931_file_task.assert_not_called()
diff --git a/orchestration/flows/bl931/__init__.py b/orchestration/flows/bl931/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/orchestration/flows/bl931/config.py b/orchestration/flows/bl931/config.py
new file mode 100644
index 00000000..23b7686c
--- /dev/null
+++ b/orchestration/flows/bl931/config.py
@@ -0,0 +1,14 @@
+from globus_sdk import TransferClient
+from orchestration.globus import transfer
+
+
+# TODO: Use BeamlineConfig base class (Waiting for PR #62 to be merged)
+class Config931:
+ def __init__(self) -> None:
+ config = transfer.get_config()
+ self.endpoints = transfer.build_endpoints(config)
+ self.apps = transfer.build_apps(config)
+ self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"])
+ self.bl931_compute_dtn = self.endpoints["bl931-compute-dtn"]
+ self.bl931_nersc_alsdev_raw = self.endpoints["bl931-nersc_alsdev_raw"]
+ self.bl931_beegfs = self.endpoints["bl931-beegfs-data"]
diff --git a/orchestration/flows/bl931/dispatcher.py b/orchestration/flows/bl931/dispatcher.py
new file mode 100644
index 00000000..e5d476ca
--- /dev/null
+++ b/orchestration/flows/bl931/dispatcher.py
@@ -0,0 +1,54 @@
+import logging
+from prefect import flow
+from typing import Optional, Union, Any
+
+from orchestration.flows.bl931.config import Config931
+from orchestration.flows.bl931.move import process_new_931_file_task
+
+logger = logging.getLogger(__name__)
+
+
+# TODO Once this PR (https://github.com/als-computing/splash_flows/pull/62) is merged, we can use config: Config931
+@flow(name="dispatcher", flow_run_name="dispatcher-{file_path}")
+def dispatcher(
+ file_path: Optional[str] = None,
+ is_export_control: bool = False,
+ config: Optional[Union[dict, Any]] = None,
+) -> None:
+ """
+    Dispatcher flow for the BL931 beamline that runs the new-file processing task for 9.3.1.
+
+ :param file_path: Path to the file to be processed.
+ :param is_export_control: Flag indicating if export control measures should be applied.
+ (Not used in the current BL931 processing)
+    :param config: Configuration settings for processing; a default Config931 is
+        created when none is provided.
+    :raises ValueError: If no file_path is provided or the data is under export control.
+ """
+
+ logger.info("Starting dispatcher flow for BL 9.3.1")
+ logger.info(f"Parameters received: file_path={file_path}, is_export_control={is_export_control}")
+
+ # Validate inputs and raise errors if necessary. The ValueErrors prevent the rest of the flow from running.
+ if file_path is None:
+ logger.error("No file_path provided to dispatcher.")
+ raise ValueError("File path is required for processing.")
+
+ if is_export_control:
+ logger.error("Data is under export control. Processing is not allowed.")
+ raise ValueError("Data is under export control. Processing is not allowed.")
+
+ if config is None:
+ logger.info("No config provided, initializing default Config931.")
+ config = Config931()
+
+ try:
+ process_new_931_file_task(
+ file_path=file_path,
+ config=config
+ )
+ logger.info("Dispatcher flow completed successfully.")
+ except Exception as e:
+ logger.error(f"Error during processing in dispatcher flow: {e}")
+ raise
diff --git a/orchestration/flows/bl931/move.py b/orchestration/flows/bl931/move.py
new file mode 100644
index 00000000..5398eec4
--- /dev/null
+++ b/orchestration/flows/bl931/move.py
@@ -0,0 +1,281 @@
+import datetime
+import logging
+from pathlib import Path  # noqa: F401 -- used by the disabled SciCat ingest block below
+from typing import Optional
+
+from prefect import flow, get_run_logger, task
+from prefect.variables import Variable
+
+from orchestration.flows.bl931.config import Config931
+from orchestration.flows.scicat.ingest import scicat_ingest_flow  # noqa: F401 -- used by the disabled SciCat ingest block below
+from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe
+from orchestration.prefect import schedule_prefect_flow
+from orchestration.transfer_controller import CopyMethod, get_transfer_controller
+
+logger = logging.getLogger(__name__)
+
+# Prune code is from the prune_controller in this PR: https://github.com/als-computing/splash_flows_globus/pulls
+# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here.
+
+
+def prune(
+    file_path: Optional[str] = None,
+    source_endpoint: Optional[GlobusEndpoint] = None,
+    check_endpoint: Optional[GlobusEndpoint] = None,
+    days_from_now: float = 0.0,
+    config: Optional[Config931] = None
+) -> bool:
+    """
+    Prune (delete) data from a Globus endpoint.
+ If days_from_now is 0, executes pruning immediately.
+ Otherwise, schedules pruning for future execution using Prefect.
+ Args:
+ file_path (str): The path to the file or directory to prune
+ source_endpoint (GlobusEndpoint): The globus endpoint containing the data
+ check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning
+        days_from_now (float): Delay before pruning; if 0, prune immediately
+        config (Optional[Config931]): Beamline configuration; a default Config931 is created if omitted
+ Returns:
+ bool: True if pruning was successful or scheduled successfully, False otherwise
+ """
+ logger = get_run_logger()
+
+ if not file_path:
+ logger.error("No file_path provided for pruning operation")
+ return False
+
+ if not source_endpoint:
+ logger.error("No source_endpoint provided for pruning operation")
+ return False
+
+ if not config:
+ config = Config931()
+
+ if days_from_now < 0:
+ raise ValueError(f"Invalid days_from_now: {days_from_now}")
+
+ logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'")
+
+ # convert float days → timedelta
+ delay: datetime.timedelta = datetime.timedelta(days=days_from_now)
+
+ # If days_from_now is 0, prune immediately
+ if delay.total_seconds() == 0:
+ logger.info(f"Executing immediate pruning of '{file_path}' from '{source_endpoint.name}'")
+ return _prune_globus_endpoint(
+ relative_path=file_path,
+ source_endpoint=source_endpoint,
+ check_endpoint=check_endpoint,
+ config=config
+ )
+ else:
+ # Otherwise, schedule pruning for future execution
+ logger.info(f"Scheduling pruning of '{file_path}' from '{source_endpoint.name}' "
+ f"in {delay.total_seconds()/86400:.1f} days")
+
+ try:
+ schedule_prefect_flow(
+ deployment_name="prune_globus_endpoint/prune_data931",
+ flow_run_name=f"prune_globus-{source_endpoint.name}-{file_path}",
+ parameters={
+ "relative_path": file_path,
+ "source_endpoint": source_endpoint,
+ "check_endpoint": check_endpoint,
+ },
+ duration_from_now=delay,
+ )
+ logger.info(f"Successfully scheduled pruning task for {delay.total_seconds()/86400:.1f} days from now")
+ return True
+ except Exception as e:
+ logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True)
+ return False
+
+
+@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{relative_path}")
+def _prune_globus_endpoint(
+ relative_path: str,
+ source_endpoint: GlobusEndpoint,
+ check_endpoint: Optional[GlobusEndpoint] = None,
+    config: Optional[Config931] = None
+) -> None:
+ """
+ Prefect flow that performs the actual Globus endpoint pruning operation.
+ Args:
+ relative_path (str): The path of the file or directory to prune
+ source_endpoint (GlobusEndpoint): The Globus endpoint to prune from
+ check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning
+        config (Optional[Config931]): Configuration object with transfer client; a default is created if omitted
+ """
+ logger = get_run_logger()
+
+ logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'")
+
+ if not config:
+ config = Config931()
+
+ globus_settings = Variable.get("globus-settings", _sync=True)
+ max_wait_seconds = globus_settings["max_wait_seconds"]
+
+ flow_name = f"prune_from_{source_endpoint.name}"
+ logger.info(f"Running flow: {flow_name}")
+ logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}")
+ prune_one_safe(
+ file=relative_path,
+ if_older_than_days=0,
+ transfer_client=config.tc,
+ source_endpoint=source_endpoint,
+ check_endpoint=check_endpoint,
+ logger=logger,
+ max_wait_seconds=max_wait_seconds
+ )
+
+
+@flow(name="new_931_file_flow", flow_run_name="process_new-{file_path}")
+def process_new_931_file_flow(
+ file_path: str,
+ config: Optional[Config931] = None
+) -> None:
+    """Deployable Prefect flow wrapper that delegates to process_new_931_file_task."""
+    process_new_931_file_task(
+ file_path=file_path,
+ config=config
+ )
+
+
+@task(name="new_931_file_task")
+def process_new_931_file_task(
+ file_path: str,
+ config: Optional[Config931] = None
+) -> None:
+ """
+    Task to process a new file at BL 9.3.1:
+    1. Copy the file from compute-dtn to beegfs.
+    2. Copy the file from compute-dtn to NERSC CFS.
+    3. Ingest the file path into SciCat (currently disabled; see the commented block below).
+    4. Schedule pruning from compute-dtn (default 180 days from now).
+    Copying from NERSC CFS to NERSC HPSS and pruning CFS are TBD (see TODOs below).
+
+ :param file_path: Path to the new file to be processed.
+ :param config: Configuration settings for processing.
+ """
+ logger = get_run_logger()
+
+ if not file_path:
+ logger.error("No file_path provided")
+ raise ValueError("No file_path provided")
+
+ logger.info(f"Processing new 931 file: {file_path}")
+
+ if not config:
+ logger.info("No config provided, creating default Config931")
+ config = Config931()
+
+ logger.info("Initializing Globus transfer controller")
+ transfer_controller = get_transfer_controller(
+ transfer_type=CopyMethod.GLOBUS,
+ config=config
+ )
+
+ logger.info(f"Step 1: Copying {file_path} from data931 to beegfs ({config.bl931_beegfs.name})")
+
+ beegfs_transfer_success = transfer_controller.copy(
+ file_path=file_path,
+ source=config.bl931_compute_dtn,
+ destination=config.bl931_beegfs
+ )
+    if not beegfs_transfer_success:
+        logger.error("Step 1 failed: Beegfs transfer was not successful")
+        raise RuntimeError("Beegfs transfer failed")
+    logger.info("Step 1 complete: File copied to beegfs")
+
+ logger.info(f"Step 2: Copying {file_path} from data931 ({config.bl931_compute_dtn.name}) "
+ f"to NERSC CFS ({config.bl931_nersc_alsdev_raw.name})")
+
+ try:
+ nersc_transfer_success = transfer_controller.copy(
+ file_path=file_path,
+ source=config.bl931_compute_dtn,
+ destination=config.bl931_nersc_alsdev_raw
+ )
+ if not nersc_transfer_success:
+ logger.error("Step 2 failed: NERSC transfer was not successful")
+ raise RuntimeError("Transfer failed")
+
+ logger.info("Step 2 complete: File copied to NERSC CFS")
+ except Exception as e:
+ logger.error(f"Step 2 failed: Could not copy file to NERSC CFS: {e}", exc_info=True)
+ raise
+
+ # logger.info(f"Step 3: Ingesting {file_path} into SciCat")
+
+ # Build beegfs path for SciCat ingestion
+ # Get relative path from source root
+ # try:
+ # rel_path = str(Path(file_path).relative_to(config.bl931_compute_dtn.root_path))
+ # except ValueError:
+ # # Already a relative path
+ # rel_path = file_path.lstrip("/")
+
+ # # Build full beegfs path
+ # beegfs_path = "/global/" + config.bl931_beegfs.root_path.strip("/") + "/" + rel_path
+
+ # logger.info(f"Beegfs path: {beegfs_path}")
+ # try:
+ # scicat_ingest_flow(dataset_path=beegfs_path, ingester_spec="als931_ingester")
+ # logger.info("Step 3 complete: SciCat ingest successful")
+ # except Exception as e:
+ # logger.error(f"SciCat ingest failed with {e}")
+
+ # Waiting for PR #62 to be merged (prune_controller)
+ # TODO: Determine scheduling days_from_now based on beamline needs
+
+ logger.info("Step 4: Scheduling pruning from data931")
+ try:
+ bl931_settings = Variable.get("bl931-settings", _sync=True)
+ days_from_now = bl931_settings.get("delete_data931_files_after_days", 180)
+ logger.info(f"Pruning scheduled for {days_from_now} days from now")
+
+ prune(
+ file_path=file_path,
+ source_endpoint=config.bl931_compute_dtn,
+ check_endpoint=config.bl931_nersc_alsdev_raw,
+ days_from_now=days_from_now
+ )
+ logger.info("Step 4 complete: Pruning scheduled")
+ except Exception as e:
+ logger.error(f"Step 4 failed: Could not schedule pruning: {e}", exc_info=True)
+ raise
+
+ logger.info(f"All steps complete for file: {file_path}")
+
+ # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years?
+ # Waiting for PR #62 to be merged (transfer_controller)
+
+
+@flow(name="move_931_flight_check", flow_run_name="move_931_flight_check-{file_path}")
+def move_931_flight_check(
+ file_path: str = "globus_test/test.txt",
+):
+ """Please keep your arms and legs inside the vehicle at all times."""
+ logger = get_run_logger()
+ logger.info("931 flight check: testing transfer from data931 to NERSC CFS")
+
+ config = Config931()
+
+ transfer_controller = get_transfer_controller(
+ transfer_type=CopyMethod.GLOBUS,
+ config=config
+ )
+
+ success = transfer_controller.copy(
+ file_path=file_path,
+ source=config.bl931_compute_dtn,
+ destination=config.bl931_nersc_alsdev_raw
+ )
+    if success:
+ logger.info("931 flight check: transfer successful")
+ else:
+ logger.error("931 flight check: transfer failed")
+ raise RuntimeError("931 flight check: transfer failed")
diff --git a/orchestration/flows/bl931/prefect.yaml b/orchestration/flows/bl931/prefect.yaml
new file mode 100644
index 00000000..50980fe5
--- /dev/null
+++ b/orchestration/flows/bl931/prefect.yaml
@@ -0,0 +1,31 @@
+name: bl931
+prefect-version: 3.4.2
+deployments:
+- name: new_file_931_flow
+ entrypoint: orchestration/flows/bl931/move.py:process_new_931_file_flow
+ work_pool:
+ name: new_file_931_pool
+ work_queue_name: new_file_931_queue
+
+- name: new_file_931_flight_check
+ entrypoint: orchestration/flows/bl931/move.py:move_931_flight_check
+ work_pool:
+ name: new_file_931_pool
+ work_queue_name: move_file_931_flight_check_queue
+ schedules:
+ - cron: "0 */12 * * *" # Every 12 hours
+ slug: "test-move-931-flight-check"
+ timezone: America/Los_Angeles
+ active: true
+
+- name: run_931_dispatcher
+ entrypoint: orchestration/flows/bl931/dispatcher.py:dispatcher
+ work_pool:
+ name: dispatcher_931_pool
+ work_queue_name: dispatcher_931_queue
+
+- name: prune_data931
+ entrypoint: orchestration/flows/bl931/move.py:_prune_globus_endpoint
+ work_pool:
+ name: prune_931_pool
+ work_queue_name: prune_931_queue
diff --git a/requirements.txt b/requirements.txt
index 3ed219ca..9254d57d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,6 @@
authlib
globus-compute-sdk @ git+https://github.com/globus/globus-compute.git@d1731340074be56861ec91d732bdff44f8e2b46e#subdirectory=compute_sdk
+fastapi==0.116.1
globus-sdk>=3.0
griffe>=0.49.0,<2.0.0
h5py