diff --git a/config.yml b/config.yml index d0eec943..5701a1c7 100644 --- a/config.yml +++ b/config.yml @@ -27,13 +27,13 @@ globus: alcf832_raw: root_path: /data/raw uri: alcf.anl.gov - uuid: 2f9e7035-f4d8-4aa3-a911-d110bc2c8110 + uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 name: alcf_raw alcf832_scratch: root_path: /data/scratch uri: alcf.anl.gov - uuid: 2f9e7035-f4d8-4aa3-a911-d110bc2c8110 + uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 name: alcf_scratch alcf_eagle832: diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index b71be618..c5e93b6c 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -305,9 +305,9 @@ def test_alcf_recon_flow(mocker: MockFixture): mock_hpc_reconstruct.assert_called_once() mock_hpc_multires.assert_called_once() - # HPC is done, so there's 1 successful transfer (data832->alcf). - # We have not transferred tiff or zarr => total 1 copy - assert mock_transfer_controller.copy.call_count == 1 + # HPC is done, so there's 2 successful transfer (data832->alcf). + # We have not transferred tiff or zarr => total 2 copies + assert mock_transfer_controller.copy.call_count == 2 mock_schedule_pruning.assert_not_called() # Reset diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 4638d9c1..bdf96ac2 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -1,13 +1,12 @@ from concurrent.futures import Future import datetime -import logging from pathlib import Path import time from typing import Optional from globus_compute_sdk import Client, Executor from globus_compute_sdk.serialize import CombinedCode -from prefect import flow, task +from prefect import flow, task, get_run_logger from prefect.blocks.system import Secret from prefect.variables import Variable @@ -16,9 +15,6 @@ from orchestration.transfer_controller import get_transfer_controller, CopyMethod from orchestration.prefect import schedule_prefect_flow -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) - class ALCFTomographyHPCController(TomographyHPCController): """ @@ -37,6 +33,7 @@ def __init__( super().__init__(config) # Load allocation root from the Prefect JSON block # The block must be registered with the name "alcf-allocation-root-path" + logger = get_run_logger() allocation_data = Variable.get("alcf-allocation-root-path", _sync=True) self.allocation_root = allocation_data.get("alcf-allocation-root-path") if not self.allocation_root: @@ -56,7 +53,7 @@ def reconstruct( Returns: bool: True if the task completed successfully, False otherwise. """ - + logger = get_run_logger() file_name = Path(file_path).stem + ".h5" folder_name = Path(file_path).parent.name @@ -131,11 +128,13 @@ def build_multi_resolution( Returns: bool: True if the task completed successfully, False otherwise. """ + logger = get_run_logger() + file_name = Path(file_path).stem folder_name = Path(file_path).parent.name tiff_scratch_path = f"{self.allocation_root}/data/scratch/{folder_name}/rec{file_name}/" - raw_path = f"{self.allocation_root}/raw/{folder_name}/{file_name}.h5" + raw_path = f"{self.allocation_root}/data/raw/{folder_name}/{file_name}.h5" iri_als_bl832_rundir = f"{self.allocation_root}/data/raw" iri_als_bl832_conversion_script = f"{self.allocation_root}/scripts/tiff_to_zarr.py" @@ -190,7 +189,8 @@ def _build_multi_resolution_wrapper( def _wait_for_globus_compute_future( future: Future, task_name: str, - check_interval: int = 20 + check_interval: int = 20, + walltime: int = 1200 # seconds = 20 minutes ) -> bool: """ Wait for a Globus Compute task to complete, assuming that if future.done() is False, the task is running. @@ -199,16 +199,26 @@ def _wait_for_globus_compute_future( future: The future object returned from the Globus Compute Executor submit method. task_name: A descriptive name for the task being executed (used for logging). check_interval: The interval (in seconds) between status checks. + walltime: The maximum time (in seconds) to wait for the task to complete. Returns: - bool: True if the task completed successfully, False otherwise. + bool: True if the task completed successfully within walltime, False otherwise. """ + logger = get_run_logger() + start_time = time.time() success = False try: previous_state = None while not future.done(): + elapsed_time = time.time() - start_time + if elapsed_time > walltime: + logger.error(f"The {task_name} task exceeded the walltime of {walltime} seconds." + "Cancelling the Globus Compute job.") + future.cancel() + return False + # Check if the task was cancelled if future.cancelled(): logger.warning(f"The {task_name} task was cancelled.") @@ -268,6 +278,8 @@ def schedule_prune_task( Returns: bool: True if the task was scheduled successfully, False otherwise. """ + logger = get_run_logger() + try: flow_name = f"delete {location}: {Path(path).name}" schedule_prefect_flow( @@ -315,6 +327,8 @@ def schedule_pruning( Returns: bool: True if the tasks were scheduled successfully, False otherwise. """ + logger = get_run_logger() + pruning_config = Variable.get("pruning-config", _sync=True) if one_minute: @@ -363,6 +377,7 @@ def alcf_recon_flow( Returns: bool: True if the flow completed successfully, False otherwise. """ + logger = get_run_logger() if config is None: config = Config832() @@ -413,6 +428,15 @@ def alcf_recon_flow( else: logger.info("Reconstruction Successful.") + # Transfer A: Send reconstructed data (tiff) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + f"at ALCF to {config.data832_scratch} at data832") + data832_tiff_transfer_success = transfer_controller.copy( + file_path=scratch_path_tiff, + source=config.alcf832_scratch, + destination=config.data832_scratch + ) + # STEP 2B: Run the Tiff to Zarr Globus Flow logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") alcf_multi_res_success = tomography_controller.build_multi_resolution( @@ -423,27 +447,14 @@ def alcf_recon_flow( raise ValueError("Tiff to Zarr at ALCF Failed") else: logger.info("Tiff to Zarr Successful.") - - # STEP 3: Send reconstructed data (tiffs and zarr) to data832 - if alcf_reconstruction_success: - # Transfer A: Send reconstructed data (tiff) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " - f"at ALCF to {config.data832_scratch} at data832") - data832_tiff_transfer_success = transfer_controller.copy( - file_path=scratch_path_tiff, - source=config.alcf832_scratch, - destination=config.data832_scratch - ) - - if alcf_multi_res_success: - # Transfer B: Send reconstructed data (zarr) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " - f"at ALCF to {config.data832_scratch} at data832") - data832_zarr_transfer_success = transfer_controller.copy( - file_path=scratch_path_zarr, - source=config.alcf832_scratch, - destination=config.data832_scratch - ) + # Transfer B: Send reconstructed data (zarr) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + f"at ALCF to {config.data832_scratch} at data832") + data832_zarr_transfer_success = transfer_controller.copy( + file_path=scratch_path_zarr, + source=config.alcf832_scratch, + destination=config.data832_scratch + ) # Place holder in case we want to transfer to NERSC for long term storage nersc_transfer_success = False