17 changes: 5 additions & 12 deletions config.yml
@@ -41,18 +41,11 @@ globus:
uuid: 26b3d2cf-fd80-4a64-a78f-38a155aca926
name: data733_raw

bl733-lamarr-global:
root_path: /bl733/
uri: lamarr.als.lbl.gov
uuid: dbaac176-b1f7-4134-979a-0b1668786d11
name: bl733-lamarr-global

bl733-lamarr-beamlines:
root_path: /bl733/
uri: lamarr.als.lbl.gov
uuid: aee983fc-826e-4081-bfb2-62529970540d
name: bl733-lamarr-beamlines

bl733-beegfs-data:
root_path: /beamline_staging/bl733/raw/
uri: beegfs.als.lbl.gov
uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a
name: bl733-beegfs-data
# 8.3.2 ENDPOINTS

spot832:
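
The new bl733-beegfs-data entry above replaces the two Lamarr endpoints. As a minimal sketch (editor's illustration, not part of the diff), the entry can be read back with PyYAML, assuming the endpoint entries sit directly under the globus: mapping shown in the hunk header:

    import yaml

    with open("config.yml") as f:
        cfg = yaml.safe_load(f)

    beegfs = cfg["globus"]["bl733-beegfs-data"]
    print(beegfs["uri"])        # beegfs.als.lbl.gov
    print(beegfs["root_path"])  # /beamline_staging/bl733/raw/
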
2 changes: 1 addition & 1 deletion orchestration/flows/bl733/config.py
@@ -12,4 +12,4 @@ def __init__(self) -> None:
self.data733 = self.endpoints["bl733-als-data733"]
self.data733_raw = self.endpoints["bl733-als-data733_raw"]
self.nersc733_alsdev_raw = self.endpoints["bl733-nersc-alsdev_raw"]
self.lamarr733 = self.endpoints["bl733-lamarr-beamlines"]
self.beegfs733 = self.endpoints["bl733-beegfs-data"]
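
A hedged sketch of how the rename is consumed downstream (illustrative only; instantiating Config733 outside the orchestration environment is an assumption, while the attribute names come from the diff above):

    from orchestration.flows.bl733.config import Config733

    config = Config733()
    print(config.beegfs733.name)       # expected: "bl733-beegfs-data"
    print(config.beegfs733.root_path)  # expected: "/beamline_staging/bl733/raw/"
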
40 changes: 33 additions & 7 deletions orchestration/flows/bl733/dispatcher.py
@@ -1,5 +1,6 @@
import logging
from prefect import flow
from pathlib import Path
from prefect import flow, runtime
from typing import Optional, Union, Any

from orchestration.flows.bl733.move import process_new_733_file_task
@@ -8,29 +9,54 @@
logger = logging.getLogger(__name__)


def generate_flow_run_name() -> str:
"""Generate flow run name from runtime parameters."""
params = runtime.flow_run.parameters
file_path = params.get("file_path")
if not file_path:  # covers None and an empty list
return "dispatcher-no_file"
elif isinstance(file_path, str):
return f"dispatcher-{Path(file_path).name}"
elif len(file_path) == 1:
return f"dispatcher-{Path(file_path[0]).name}"
else:
return f"dispatcher-{Path(file_path[0]).name}_+{len(file_path)-1}_more"
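
# Editor's illustration (hypothetical inputs): the helper above produces run names such as
#   file_path=None                            -> "dispatcher-no_file"
#   file_path="/data/scan_001.edf"            -> "dispatcher-scan_001.edf"
#   file_path=["/data/a.edf", "/data/b.edf"]  -> "dispatcher-a.edf_+1_more"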


# TODO Once this PR (https://github.com/als-computing/splash_flows/pull/62) is merged, we can use config: Config733
@flow(name="dispatcher", flow_run_name="dispatcher-{file_path}")
@flow(name="dispatcher", flow_run_name=generate_flow_run_name)
def dispatcher(
file_path: Optional[str] = None,
file_path: Optional[Union[str, list[str]]] = None,
is_export_control: bool = False,
config: Optional[Union[dict, Any]] = None,
) -> None:
"""
Dispatcher flow for BL733 beamline that launches the new_733_file_flow.

:param file_path: Path to the file to be processed.
:param file_path: Path(s) to the file(s) to be processed. Can be a single path (str)
or multiple paths (list[str]).
:param is_export_control: Flag indicating if export control measures should be applied.
(Not used in the current BL733 processing)
:param config: Configuration settings for processing.
If not provided, a default Config733 is instantiated.
:raises ValueError: If no file_path is provided.
"""

logger.info("Starting dispatcher flow for BL 7.3.3")
# Normalize file_path to a list
if file_path is None:
file_paths = []
elif isinstance(file_path, str):
file_paths = [file_path]
else:
file_paths = file_path

num_files = len(file_paths)

logger.info(f"Starting dispatcher flow for BL 7.3.3 with {num_files} file(s)")
logger.info(f"Parameters received: file_path={file_path}, is_export_control={is_export_control}")

# Validate inputs and raise errors if necessary. The ValueErrors prevent the rest of the flow from running.
if file_path is None:
if not file_paths:  # an empty list means nothing to process
logger.error("No file_path provided to dispatcher.")
raise ValueError("File path is required for processing.")

Expand All @@ -46,7 +72,7 @@ def dispatcher(
file_path=file_path,
config=config
)
logger.info("Dispatcher flow completed successfully.")
logger.info(f"Dispatcher flow completed successfully for {num_files} file(s).")
except Exception as e:
logger.error(f"Error during processing in dispatcher flow: {e}")
raise
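
The str-or-list normalization above is the core of the multi-file support. A self-contained sketch of the same pattern (editor's illustration with hypothetical paths):

    from typing import Optional, Union

    def normalize(file_path: Optional[Union[str, list[str]]]) -> list[str]:
        # None -> no files; a bare string -> one-element list; a list passes through.
        if file_path is None:
            return []
        if isinstance(file_path, str):
            return [file_path]
        return list(file_path)

    assert normalize(None) == []
    assert normalize("/data/a.edf") == ["/data/a.edf"]
    assert normalize(["/data/a.edf", "/data/b.edf"]) == ["/data/a.edf", "/data/b.edf"]
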
107 changes: 85 additions & 22 deletions orchestration/flows/bl733/move.py
@@ -1,6 +1,7 @@
import datetime
import logging
from pathlib import Path
import os
from typing import Optional

from prefect import flow, get_run_logger, task
@@ -18,6 +19,24 @@
# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here.


def get_common_parent_path(file_paths: list[str]) -> str:
"""
Find the highest common parent directory for a list of file paths.

:param file_paths: List of file paths
:return: Common parent directory path
"""
if not file_paths:
raise ValueError("No file paths provided")

if len(file_paths) == 1:
# Single file - return its parent directory
return str(Path(file_paths[0]).parent)

# Use os.path.commonpath for multiple files
return os.path.commonpath(file_paths)
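
# Editor's illustration (hypothetical paths):
#   get_common_parent_path(["/data/raw/run1/a.edf"])                         -> "/data/raw/run1"
#   get_common_parent_path(["/data/raw/run1/a.edf", "/data/raw/run2/b.edf"]) -> "/data/raw"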


def prune(
file_path: str = None,
source_endpoint: GlobusEndpoint = None,
@@ -74,13 +93,12 @@ def prune(

try:
schedule_prefect_flow(
deployment_name="prune_globus_endpoint/prune_globus_endpoint",
flow_name=f"prune_globus-{source_endpoint.name}-{file_path}",
deployment_name="prune_globus_endpoint/prune_data733",
flow_run_name=f"prune_globus-{source_endpoint.name}-{file_path}",
parameters={
"relative_path": file_path,
"source_endpoint": source_endpoint,
"check_endpoint": check_endpoint,
"config": config
},
duration_from_now=delay,
)
@@ -133,9 +151,9 @@ def _prune_globus_endpoint(
)


@flow(name="new_733_file_flow", flow_run_name="process_new-{file_path}")
@flow(name="new_733_file_flow", flow_run_name="process_new-{file_path[0]}")
def process_new_733_file_flow(
file_path: str,
file_path: list[str],
config: Optional[Config733] = None
) -> None:
"""
@@ -145,7 +163,7 @@ def process_new_733_file_flow(
3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat.
4. Schedule pruning from NERSC CFS.

:param file_path: Path to the new file to be processed.
:param file_path: Paths to the new files to be processed.
:param config: Configuration settings for processing.
:return: None
"""
@@ -157,58 +175,103 @@

@task(name="new_733_file_task")
def process_new_733_file_task(
file_path: str,
file_path: list[str],
config: Optional[Config733] = None
) -> None:
"""
Task to process new data at BL 7.3.3
1. Copy the data from data733 to Lamarr (our common staging area).
1. Copy the data from data733 to beegfs (our common staging area).
2. Copy the file from the data733 to NERSC CFS.
3. Ingest the data from Lamarr into SciCat.
3. Ingest the data from beegfs into SciCat.
4. Schedule pruning from data733 for 6 months from now.
5. Archive the file from NERSC CFS to NERSC HPSS at some point in the future.

:param file_path: Path to the new file to be processed.
:param file_path: Path(s) to the new file(s) to be processed.
:param config: Configuration settings for processing.
"""
logger = get_run_logger()

logger.info(f"Processing new 733 file: {file_path}")
if file_path is None:
file_paths = []
elif isinstance(file_path, str):
file_paths = [file_path]
else:
file_paths = file_path

if not file_paths:
logger.error("No file_paths provided")
raise ValueError("No file_paths provided")

logger.info(f"Processing new 733 files: {file_paths}")

if not config:
logger.info("No config provided, creating default Config733")
config = Config733()

common_path = get_common_parent_path(file_paths)
logger.info(f"Common parent path: {common_path}")

logger.info("Initializing Globus transfer controller")
transfer_controller = get_transfer_controller(
transfer_type=CopyMethod.GLOBUS,
config=config
)

logger.info(f"Step 1: Copying {common_path} from data733 to beegfs ({config.beegfs733.name})")

transfer_controller.copy(
file_path=file_path,
file_path=common_path,
source=config.data733_raw,
destination=config.lamarr733
destination=config.beegfs733
)
logger.info("Step 1 complete: File copied to beegfs")

logger.info(f"Step 2: Copying {common_path} from data733 to NERSC CFS ({config.nersc733_alsdev_raw.name})")
transfer_controller.copy(
file_path=file_path,
file_path=common_path,
source=config.data733_raw,
destination=config.nersc733_alsdev_raw
)

# Note that the SciCat ingester assumes the data is on Lamarr.
logger.info("Step 2 complete: File copied to NERSC CFS")

logger.info(f"Step 3: Ingesting {len(file_paths)} file(s) into SciCat")

# Build beegfs paths for SciCat ingestion
beegfs_file_paths = []
for fp in file_paths:
# Get relative path from source root
try:
rel_path = str(Path(fp).relative_to(config.data733_raw.root_path))
except ValueError:
# Already a relative path
rel_path = fp.lstrip("/")

# Build full beegfs path
beegfs_path = "/global/" + config.beegfs733.root_path.strip("/") + "/" + rel_path
beegfs_file_paths.append(beegfs_path)
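
# Editor's illustration (the data733_raw root_path of "/data733/raw/" is assumed here;
# the beegfs root comes from the config.yml change above): a source file
#   "/data733/raw/sample1/scan_001.edf"
# maps to the SciCat ingest path
#   "/global/beamline_staging/bl733/raw/sample1/scan_001.edf"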

logger.info(f"Beegfs paths: {beegfs_file_paths}")
try:
scicat_ingest_flow(dataset_path=Path(file_path), ingester_spec="als733_saxs")
scicat_ingest_flow(file_paths=beegfs_file_paths, ingester_spec="als733_saxs")
logger.info("Step 3 complete: SciCat ingest successful")
except Exception as e:
logger.error(f"SciCat ingest failed with {e}")

logger.info("Step 4: Scheduling pruning from data733")

# Waiting for PR #62 to be merged (prune_controller)
bl733_settings = Variable.get("bl733-settings", _sync=True)
prune(
file_path=file_path,
source_endpoint=config.data733_raw,
check_endpoint=config.nersc733_alsdev_raw,
days_from_now=bl733_settings["delete_data733_files_after_days"] # 180 days
)
for file_path in file_paths:
prune(
file_path=file_path,
source_endpoint=config.data733_raw,
check_endpoint=config.nersc733_alsdev_raw,
days_from_now=bl733_settings["delete_data733_files_after_days"] # 180 days
)
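
# Editor's illustration (hypothetical file): for file_path="raw/sample1/scan_001.edf" and
# the data733_raw source endpoint, the loop above schedules a run named
#   "prune_globus-data733_raw-raw/sample1/scan_001.edf"
# on the prune_globus_endpoint/prune_data733 deployment, delayed by
# delete_data733_files_after_days (180 days per the inline comment).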

logger.info("Step 4 complete: Pruning scheduled")
logger.info(f"All steps complete for {len(file_paths)} file(s)")

# TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years?
# Waiting for PR #62 to be merged (transfer_controller)