Changes from all commits
23 commits
0bb245a
init commit for bl 9.3.1
davramov Sep 10, 2025
4be78e7
Adjusting 9.3.1 endpoint configuration to use the compute-dtn globus …
davramov Oct 27, 2025
5140103
Adding pytests for bl931 flows
davramov Oct 28, 2025
ffc1964
Making Config931 optional, since it is initialized within the methods…
davramov Oct 28, 2025
cf8e6a7
Updating other tests for prefect 3 support
davramov Oct 28, 2025
c0697c1
Adding documentation for bl 9.3.1 flows
davramov Oct 28, 2025
063fbc3
changing the data description in the documentation
davramov Oct 28, 2025
d36886c
Making the move flow call the move task, and updating the pytest/disp…
davramov Nov 6, 2025
e7d5f75
Adjusting endpoint names in the test movement flow
davramov Dec 3, 2025
899c5cc
Ensuring get_run_logger() is set in the flight check
davramov Dec 3, 2025
08822b1
if the transfer test fails, it should throw a runtime error
davramov Dec 3, 2025
b8a0fd5
updating init_work_pools to handle when beamline id is a number (e.g.…
davramov Dec 4, 2025
da72455
adjusting the test directory path for 931
davramov Dec 8, 2025
b802c95
Using the python=3.13 version of the prefect image
davramov Dec 8, 2025
e55880e
Pinning fastapi==0.116.1; breaks with fastapi==0.124.0
davramov Dec 8, 2025
24db959
Switching JSON Blocks to Variable Blocks
davramov Dec 17, 2025
1f26f32
Adding flow diagram to 9.3.1 documentation
davramov Dec 23, 2025
14c7d12
Updating flight check to move globus_test/test.txt
davramov Jan 14, 2026
c8166cf
removing commented out code
davramov Jan 14, 2026
835a2d7
adding _sync=True to Variable.get() call and added missing flow_run_n…
davramov Jan 14, 2026
547c858
Adding verbose logging to process_new_931_file_task
davramov Jan 14, 2026
330fc7c
Adding transfer_success RuntimeError if False
davramov Jan 14, 2026
849867d
Fixing the deployment name when scheduling prune_globus_endpoint
davramov Jan 14, 2026
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM prefecthq/prefect:3.4.2-python3.11
FROM prefecthq/prefect:3.4.2-python3.13

WORKDIR /app
COPY ./requirements.txt /tmp/
17 changes: 17 additions & 0 deletions config.yml
@@ -1,5 +1,22 @@
globus:
  globus_endpoints:

    # 9.3.1 ENDPOINTS

    bl931-compute-dtn:
      root_path: /
      uri: compute-dtn.als.lbl.gov
      uuid: 23af478e-d459-4e78-9753-5091b5fb432a
      name: bl931-compute-dtn

    bl931-nersc_alsdev_raw:
      root_path: /global/cfs/cdirs/als/data_mover/9.3.1/raw
      uri: nersc.gov
      uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58
      name: bl931-nersc_alsdev_raw

    # 8.3.2 ENDPOINTS

    spot832:
      root_path: /
      uri: spot832.lbl.gov
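
For reference, a minimal sketch of how the new 9.3.1 endpoint entries can be read back out of `config.yml`; the repository normally goes through its own config helpers (e.g. `Config931`), so the direct `yaml.safe_load` call here is purely illustrative:

```python
# Illustrative only: the project normally builds these endpoints through its
# own config helpers (e.g. Config931), not by parsing config.yml directly.
import yaml

with open("config.yml") as f:
    config = yaml.safe_load(f)

endpoints = config["globus"]["globus_endpoints"]
source = endpoints["bl931-compute-dtn"]            # data lands here first
destination = endpoints["bl931-nersc_alsdev_raw"]  # NERSC CFS raw area

print(source["uuid"], source["root_path"])
print(destination["uuid"], destination["root_path"])
```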
102 changes: 102 additions & 0 deletions docs/mkdocs/docs/bl931.md
@@ -0,0 +1,102 @@
# Beamline 9.3.1 Flows

This page documents the workflows supported by Splash Flows at [ALS Beamline 9.3.1 (Tender X-ray Spectroscopy)](https://als.lbl.gov/beamlines/9-3-1/).

## Data at 9.3.1

Beamline 9.3.1 generates tender X-ray spectroscopy data.

## File Watcher

A file watcher on the acquisition system detects new scans once they have finished writing to disk. From there, a Prefect flow we call `dispatcher` kicks off the downstream steps:
- Copy scans in real time from a Globus collection on the `compute-dtn` server to `NERSC CFS` using Globus Transfer.
- Copy project data to `NERSC HPSS` for long-term storage (TBD).
- Run analysis on HPC systems (TBD).
- Ingest into SciCat (TBD).
- Schedule data pruning from `compute-dtn` and `NERSC CFS`.

## Prefect Configuration

### Registered Flows

#### `dispatcher.py`

The dispatcher Prefect flow manages the order and execution of the data tasks. Once a new file is written, the `dispatcher()` flow is called. At 9.3.1, the dispatcher makes a synchronous call into `move.py`, with the option to add further steps later (e.g., scheduling remote HPC analysis code), as sketched below.
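
Grounded in the unit tests added in this PR (`test_dispatcher_931_flow`), here is a minimal sketch of the dispatcher's shape; the parameter names match the test calls, but the body is illustrative rather than a copy of `dispatcher.py`:

```python
# Illustrative sketch only -- the real orchestration/flows/bl931/dispatcher.py may differ.
from prefect import flow, get_run_logger

from orchestration.flows.bl931.move import process_new_931_file_task


@flow(name="dispatcher")
def dispatcher(file_path: str, is_export_control: bool = False, config=None) -> None:
    """Route a newly written 9.3.1 file to the downstream data tasks."""
    logger = get_run_logger()

    if file_path is None:
        raise ValueError("file_path must be provided")
    if is_export_control:
        raise ValueError("Export-controlled data is not transferred")

    logger.info(f"New file detected at 9.3.1: {file_path}")
    # Synchronous hand-off to the move task; later steps (HPC analysis,
    # SciCat ingestion) can be appended here.
    process_new_931_file_task(file_path=file_path, config=config)
```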

#### `move.py`

The move flow processes a new file at BL 9.3.1 (see the sketch after this list):
1. Copy the file from `compute-dtn` to `NERSC CFS` and ingest the file path and metadata into SciCat.
2. Schedule pruning from `compute-dtn`.
3. Copy the file from `NERSC CFS` to `NERSC HPSS`. Ingest the archived file path in SciCat.
4. Schedule pruning from `NERSC CFS`.
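
Based on what `test_process_new_931_file_task` patches and asserts (a transfer controller whose `copy()` runs once, a `prune()` call, and a `RuntimeError` when the transfer fails), a minimal sketch of steps 1 and 2 might look like this; helper import paths, keyword arguments, and config attribute names are assumptions, not the actual `move.py`:

```python
# Illustrative sketch only -- helper import paths and argument names are assumptions.
from prefect import get_run_logger, task

from orchestration.flows.bl931.config import Config931
# The tests patch these names inside orchestration.flows.bl931.move; where they
# are actually imported from is not shown in this diff, so treat the paths below
# as placeholders.
from orchestration.transfer_controller import get_transfer_controller  # assumed path
from orchestration.prune import prune  # assumed path


@task(name="process_new_931_file_task")
def process_new_931_file_task(file_path: str, config: Config931 | None = None) -> None:
    """Copy one new file from compute-dtn to NERSC CFS, then schedule pruning."""
    logger = get_run_logger()
    if config is None:
        # Config931 is optional; it is initialized inside the task when omitted.
        config = Config931()

    logger.info(f"Transferring {file_path} from compute-dtn to NERSC CFS")
    controller = get_transfer_controller(config)  # assumed call signature
    transfer_success = controller.copy(
        file_path=file_path,
        source=config.bl931_compute_dtn,            # assumed config attribute names
        destination=config.bl931_nersc_alsdev_raw,
    )

    if not transfer_success:
        # A failed transfer raises rather than passing silently (see commit 330fc7c).
        raise RuntimeError(f"Globus transfer of {file_path} failed")

    # Schedule pruning of the source copy after the retention window.
    prune(file_path=file_path, config=config)  # assumed kwargs
```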

## VM Details

The computing backend runs on a VM, managed by ALS IT staff, in the B15 server room:

`flow-931.als.lbl.gov`

## Flow Diagram

```mermaid
sequenceDiagram
participant DET as Detector/<br/>File Watcher
participant DISP as Prefect<br/>Dispatcher
participant DTN as compute-dtn<br/>Storage
participant GLOB as Globus<br/>Transfer
participant CFS as NERSC<br/>CFS
participant CAT as SciCat<br/>Metadata
participant HPSS as NERSC<br/>HPSS

%% Initial Trigger
DET->>DET: Monitor filesystem
DET->>DISP: Trigger on new file
DISP->>DISP: Coordinate flows

%% Flow 1: new_file_931
rect rgb(220, 230, 255)
note over DISP,CAT: FLOW 1: new_file_931
DISP->>GLOB: Init transfer
activate GLOB
GLOB->>DTN: Read from compute-dtn
DTN-->>GLOB: Data
GLOB->>CFS: Write to NERSC CFS
GLOB-->>DISP: Transfer complete
deactivate GLOB

DISP->>CAT: Register metadata (TBD)
end

%% Flow 2: HPSS Archive
rect rgb(220, 255, 230)
note over DISP,HPSS: FLOW 2: HPSS Archive (TBD)
DISP->>GLOB: Init archive transfer
activate GLOB
GLOB->>CFS: Read from CFS
CFS-->>GLOB: Data
GLOB->>HPSS: Write to tape
GLOB-->>DISP: Archive complete
deactivate GLOB

DISP->>CAT: Update metadata (TBD)
end

%% Flow 3: Scheduled Pruning
rect rgb(255, 255, 220)
note over DISP,CFS: FLOW 3: Scheduled Pruning
DISP->>DISP: Schedule prune tasks

DISP->>DTN: Prune from compute-dtn
activate DTN
DTN->>DTN: Delete expired data
DTN-->>DISP: Pruning complete
deactivate DTN

DISP->>CFS: Prune from CFS (after HPSS)
activate CFS
CFS->>CFS: Delete expired data
CFS-->>DISP: Pruning complete
deactivate CFS
end
```
7 changes: 5 additions & 2 deletions docs/mkdocs/mkdocs.yml
@@ -13,8 +13,11 @@ nav:
  - Home: index.md
  - Installation and Requirements: install.md
  - Getting Started: getting_started.md
  - Compute at ALCF: alcf832.md
  - Compute at NERSC: nersc832.md
  - Beamline Implementations:
    - Beamline 8.3.2 - Microtomography:
      - Compute at ALCF: alcf832.md
      - Compute at NERSC: nersc832.md
    - Beamline 9.3.1 - Tender X-ray Spectroscopy: bl931.md
  - Orchestration: orchestration.md
  - Configuration: configuration.md
  # - Troubleshooting: troubleshooting.md
9 changes: 7 additions & 2 deletions init_work_pools.py
@@ -48,10 +48,15 @@ def check_env() -> tuple[str, str, str]:
"""Validate required environment variables and paths."""
beamline = os.environ.get("BEAMLINE")
if not beamline:
logger.error("Must set BEAMLINE (e.g., 832, 733)")
logger.error("Must set BEAMLINE (e.g., 832, 733, dichroism)")
sys.exit(1)

prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml"
# Check if the beamline identifier is a number or a string to get the correct flows folder name
if beamline.isdigit():
prefect_yaml = f"orchestration/flows/bl{beamline}/prefect.yaml"
else:
prefect_yaml = f"orchestration/flows/{beamline}/prefect.yaml"

if not os.path.isfile(prefect_yaml):
logger.error(f"[Init:{beamline}] Expected {prefect_yaml} not found!")
sys.exit(1)
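
To make the branch above concrete, a standalone illustration of the path resolution (not repository code):

```python
# Standalone illustration of the prefect.yaml path resolution added above.
def resolve_prefect_yaml(beamline: str) -> str:
    # Numeric identifiers keep the "bl<NNN>" folder naming;
    # string identifiers map directly to a folder of the same name.
    if beamline.isdigit():
        return f"orchestration/flows/bl{beamline}/prefect.yaml"
    return f"orchestration/flows/{beamline}/prefect.yaml"


assert resolve_prefect_yaml("931") == "orchestration/flows/bl931/prefect.yaml"
assert resolve_prefect_yaml("dichroism") == "orchestration/flows/dichroism/prefect.yaml"
```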
205 changes: 205 additions & 0 deletions orchestration/_tests/test_bl931/test_move.py
@@ -0,0 +1,205 @@
'''Pytest unit tests for BL931 move flow. '''

import logging
import pytest
from uuid import uuid4

from prefect.testing.utilities import prefect_test_harness
from prefect.blocks.system import Secret
from prefect.variables import Variable
from pytest_mock import MockFixture

from orchestration._tests.test_transfer_controller import MockSecret
from orchestration.flows.bl931.config import Config931

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
"""
A pytest fixture that automatically sets up and tears down the Prefect test harness
for the entire test session. It creates and saves test secrets and configurations
required for Globus integration.

Yields:
None
"""
with prefect_test_harness():
globus_client_id = Secret(value=str(uuid4()))
globus_client_id.save(name="globus-client-id", overwrite=True)

globus_client_secret = Secret(value=str(uuid4()))
globus_client_secret.save(name="globus-client-secret", overwrite=True)

Variable.set(
name="globus-settings",
value={"max_wait_seconds": 600},
overwrite=True,
_sync=True
)

Variable.set(
name="bl931-settings",
value={
"delete_data931_files_after_days": 180
},
overwrite=True,
_sync=True
)

yield


# ----------------------------
# Tests for 931
# ----------------------------

def test_process_new_931_file_task(mocker: MockFixture) -> None:
"""
Test the process_new_931_file flow from orchestration.flows.bl931.move.

This test verifies that:
- The get_transfer_controller function is called (patched) with the correct parameters.
- The returned transfer controller's copy method is called with the expected file path,
source, and destination endpoints from the provided configuration.

Parameters:
mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
"""
# Import the flow to test.
from orchestration.flows.bl931.move import process_new_931_file_task

# Patch the Secret.load and init_transfer_client in the configuration context.
mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())
mocker.patch(
"orchestration.flows.bl931.config.transfer.init_transfer_client",
return_value=mocker.MagicMock() # Return a dummy TransferClient
)
# Patch the schedule_prefect_flow call to avoid real Prefect interaction
mocker.patch(
"orchestration.flows.bl931.move.schedule_prefect_flow",
return_value=None
)

# Instantiate the dummy configuration.
mock_config = Config931()

# Generate a test file path.
test_file_path = f"/tmp/test_file_{uuid4()}.txt"

# Create a mock transfer controller with a mocked 'copy' method.
mock_transfer_controller = mocker.MagicMock()
mock_transfer_controller.copy.return_value = True

mock_prune = mocker.patch(
"orchestration.flows.bl931.move.prune",
return_value=None
)

# Patch get_transfer_controller where it is used in process_new_931_file_task.
mocker.patch(
"orchestration.flows.bl931.move.get_transfer_controller",
return_value=mock_transfer_controller
)

# Execute the move flow with the test file path and mock configuration.
result = process_new_931_file_task(file_path=test_file_path, config=mock_config)

# Verify that the transfer controller's copy method was called exactly once.
assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once"
assert result is None, "The flow should return None"
assert mock_prune.call_count == 1, "Prune function should be called exactly once"

# Reset mocks and test with config=None
mock_transfer_controller.copy.reset_mock()
mock_prune.reset_mock()

result = process_new_931_file_task(file_path=test_file_path, config=None)
assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once"
assert result is None, "The flow should return None"
assert mock_prune.call_count == 1, "Prune function should be called exactly once"


def test_dispatcher_931_flow(mocker: MockFixture) -> None:
"""
Test the dispatcher flow for BL931.

This test verifies that:
- The process_new_931_file_task function is called with the correct parameters
when the dispatcher flow is executed.
Parameters:
mocker (MockFixture): The pytest-mock fixture for patching and mocking objects.
"""
# Import the dispatcher flow to test.
from orchestration.flows.bl931.dispatcher import dispatcher

# Create a mock configuration object.
class MockConfig:
pass

mock_config = MockConfig()

# Generate a test file path.
test_file_path = f"/tmp/test_file_{uuid4()}.txt"

# Patch the schedule_prefect_flow call to avoid real Prefect interaction
mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())
mocker.patch(
"orchestration.flows.bl931.config.transfer.init_transfer_client",
return_value=mocker.MagicMock() # Return a dummy TransferClient
)
# Patch the schedule_prefect_flow call to avoid real Prefect interaction
mocker.patch(
"orchestration.flows.bl931.move.schedule_prefect_flow",
return_value=None
)

# Patch the process_new_931_file_task function to monitor its calls.
mock_process_new_931_file_task = mocker.patch(
"orchestration.flows.bl931.dispatcher.process_new_931_file_task",
return_value=None
)

# Execute the dispatcher flow with test parameters.
dispatcher(
file_path=test_file_path,
is_export_control=False,
config=mock_config
)

# Verify that process_new_931_file_task was called exactly once with the expected arguments.
mock_process_new_931_file_task.assert_called_once_with(
file_path=test_file_path,
config=mock_config
)

# Verify that process_new_931_file_task is called even when config is None
mock_process_new_931_file_task.reset_mock()
dispatcher(
file_path=test_file_path,
is_export_control=False,
config=None
)
mock_process_new_931_file_task.assert_called_once()

# Test error handling for missing file_path
mock_process_new_931_file_task.reset_mock()
with pytest.raises(ValueError):
dispatcher(
file_path=None,
is_export_control=False,
config=mock_config
)
mock_process_new_931_file_task.assert_not_called()

# Test error handling for export control flag
mock_process_new_931_file_task.reset_mock()
with pytest.raises(ValueError):
dispatcher(
file_path=test_file_path,
is_export_control=True,
config=mock_config
)
mock_process_new_931_file_task.assert_not_called()
6 changes: 3 additions & 3 deletions orchestration/_tests/test_globus_flow.py
@@ -25,13 +25,13 @@ def prefect_test_fixture():
"""
with prefect_test_harness():
globus_client_id = Secret(value=str(uuid4()))
globus_client_id.save(name="globus-client-id")
globus_client_id.save(name="globus-client-id", overwrite=True)

globus_client_secret = Secret(value=str(uuid4()))
globus_client_secret.save(name="globus-client-secret")
globus_client_secret.save(name="globus-client-secret", overwrite=True)

globus_compute_endpoint = Secret(value=str(uuid4()))
globus_compute_endpoint.save(name="globus-compute-endpoint")
globus_compute_endpoint.save(name="globus-compute-endpoint", overwrite=True)

Variable.set(
name="pruning-config",