Skip to content
4 changes: 2 additions & 2 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ globus:
alcf832_raw:
root_path: /data/raw
uri: alcf.anl.gov
uuid: 2f9e7035-f4d8-4aa3-a911-d110bc2c8110
uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7
name: alcf_raw

alcf832_scratch:
root_path: /data/scratch
uri: alcf.anl.gov
uuid: 2f9e7035-f4d8-4aa3-a911-d110bc2c8110
uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7
name: alcf_scratch

alcf_eagle832:
Expand Down
6 changes: 3 additions & 3 deletions orchestration/_tests/test_globus_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,9 @@ def test_alcf_recon_flow(mocker: MockFixture):

mock_hpc_reconstruct.assert_called_once()
mock_hpc_multires.assert_called_once()
# HPC is done, so there's 1 successful transfer (data832->alcf).
# We have not transferred tiff or zarr => total 1 copy
assert mock_transfer_controller.copy.call_count == 1
# HPC is done, so there are 2 successful transfers: raw data (data832->alcf)
# and the reconstructed tiff (alcf->data832), which is now sent right after reconstruction.
# We have not transferred zarr => total 2 copies
assert mock_transfer_controller.copy.call_count == 2
mock_schedule_pruning.assert_not_called()

# Reset
Expand Down
71 changes: 41 additions & 30 deletions orchestration/flows/bl832/alcf.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from concurrent.futures import Future
import datetime
import logging
from pathlib import Path
import time
from typing import Optional

from globus_compute_sdk import Client, Executor
from globus_compute_sdk.serialize import CombinedCode
from prefect import flow, task
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
from prefect.variables import Variable

Expand All @@ -16,9 +15,6 @@
from orchestration.transfer_controller import get_transfer_controller, CopyMethod
from orchestration.prefect import schedule_prefect_flow

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class ALCFTomographyHPCController(TomographyHPCController):
"""
Expand All @@ -37,6 +33,7 @@ def __init__(
super().__init__(config)
# Load allocation root from the Prefect JSON block
# The block must be registered with the name "alcf-allocation-root-path"
logger = get_run_logger()
allocation_data = Variable.get("alcf-allocation-root-path", _sync=True)
self.allocation_root = allocation_data.get("alcf-allocation-root-path")
if not self.allocation_root:
Expand All @@ -56,7 +53,7 @@ def reconstruct(
Returns:
bool: True if the task completed successfully, False otherwise.
"""

logger = get_run_logger()
file_name = Path(file_path).stem + ".h5"
folder_name = Path(file_path).parent.name

Expand Down Expand Up @@ -131,11 +128,13 @@ def build_multi_resolution(
Returns:
bool: True if the task completed successfully, False otherwise.
"""
logger = get_run_logger()

file_name = Path(file_path).stem
folder_name = Path(file_path).parent.name

tiff_scratch_path = f"{self.allocation_root}/data/scratch/{folder_name}/rec{file_name}/"
raw_path = f"{self.allocation_root}/raw/{folder_name}/{file_name}.h5"
raw_path = f"{self.allocation_root}/data/raw/{folder_name}/{file_name}.h5"

iri_als_bl832_rundir = f"{self.allocation_root}/data/raw"
iri_als_bl832_conversion_script = f"{self.allocation_root}/scripts/tiff_to_zarr.py"
Expand Down Expand Up @@ -190,7 +189,8 @@ def _build_multi_resolution_wrapper(
def _wait_for_globus_compute_future(
future: Future,
task_name: str,
check_interval: int = 20
check_interval: int = 20,
walltime: int = 1200 # seconds = 20 minutes
) -> bool:
"""
Wait for a Globus Compute task to complete, assuming that if future.done() is False, the task is running.
Expand All @@ -199,16 +199,26 @@ def _wait_for_globus_compute_future(
future: The future object returned from the Globus Compute Executor submit method.
task_name: A descriptive name for the task being executed (used for logging).
check_interval: The interval (in seconds) between status checks.
walltime: The maximum time (in seconds) to wait for the task to complete.

Returns:
bool: True if the task completed successfully, False otherwise.
bool: True if the task completed successfully within walltime, False otherwise.
"""
logger = get_run_logger()

start_time = time.time()
success = False

try:
previous_state = None
while not future.done():
elapsed_time = time.time() - start_time
if elapsed_time > walltime:
logger.error(f"The {task_name} task exceeded the walltime of {walltime} seconds."
"Cancelling the Globus Compute job.")
future.cancel()
return False

# Check if the task was cancelled
if future.cancelled():
logger.warning(f"The {task_name} task was cancelled.")
Expand Down Expand Up @@ -268,6 +278,8 @@ def schedule_prune_task(
Returns:
bool: True if the task was scheduled successfully, False otherwise.
"""
logger = get_run_logger()

try:
flow_name = f"delete {location}: {Path(path).name}"
schedule_prefect_flow(
Expand Down Expand Up @@ -315,6 +327,8 @@ def schedule_pruning(
Returns:
bool: True if the tasks were scheduled successfully, False otherwise.
"""
logger = get_run_logger()

pruning_config = Variable.get("pruning-config", _sync=True)

if one_minute:
Expand Down Expand Up @@ -363,6 +377,7 @@ def alcf_recon_flow(
Returns:
bool: True if the flow completed successfully, False otherwise.
"""
logger = get_run_logger()

if config is None:
config = Config832()
Expand Down Expand Up @@ -413,6 +428,15 @@ def alcf_recon_flow(
else:
logger.info("Reconstruction Successful.")

# Transfer A: Send reconstructed data (tiff) to data832
logger.info(f"Transferring {file_name} from {config.alcf832_scratch} "
f"at ALCF to {config.data832_scratch} at data832")
data832_tiff_transfer_success = transfer_controller.copy(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Step 2B (the multi-res flow below) depend on data832_tiff_transfer_success being true? If so, you could wrap 2B in an if/else statement so that it doesn't try to run and crash.

Alternatively, raise a ValueError in the same way you do here, if that's the right behavior (if I have this right: when alcf_multi_res_success is not true, you don't attempt the zarr transfer, "Transfer B", further below):

            if not alcf_multi_res_success:
                logger.error("Tiff to Zarr Failed.")
                raise ValueError("Tiff to Zarr at ALCF Failed")
            else:
                logger.info("Tiff to Zarr Successful.")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The multiresolution step does not rely on the TIFF transfer being successful, just that reconstruction was successful. I switched the transfer order so that users have the option to view the TIFF stack at the beamline before waiting for the Zarr conversion to complete (an extra 10+ minute overhead).

file_path=scratch_path_tiff,
source=config.alcf832_scratch,
destination=config.data832_scratch
)

# STEP 2B: Run the Tiff to Zarr Globus Flow
logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}")
alcf_multi_res_success = tomography_controller.build_multi_resolution(
Expand All @@ -423,27 +447,14 @@ def alcf_recon_flow(
raise ValueError("Tiff to Zarr at ALCF Failed")
else:
logger.info("Tiff to Zarr Successful.")

# STEP 3: Send reconstructed data (tiffs and zarr) to data832
if alcf_reconstruction_success:
# Transfer A: Send reconstructed data (tiff) to data832
logger.info(f"Transferring {file_name} from {config.alcf832_scratch} "
f"at ALCF to {config.data832_scratch} at data832")
data832_tiff_transfer_success = transfer_controller.copy(
file_path=scratch_path_tiff,
source=config.alcf832_scratch,
destination=config.data832_scratch
)

if alcf_multi_res_success:
# Transfer B: Send reconstructed data (zarr) to data832
logger.info(f"Transferring {file_name} from {config.alcf832_scratch} "
f"at ALCF to {config.data832_scratch} at data832")
data832_zarr_transfer_success = transfer_controller.copy(
file_path=scratch_path_zarr,
source=config.alcf832_scratch,
destination=config.data832_scratch
)
# Transfer B: Send reconstructed data (zarr) to data832
logger.info(f"Transferring {file_name} from {config.alcf832_scratch} "
f"at ALCF to {config.data832_scratch} at data832")
data832_zarr_transfer_success = transfer_controller.copy(
file_path=scratch_path_zarr,
source=config.alcf832_scratch,
destination=config.data832_scratch
)

# Placeholder in case we want to transfer to NERSC for long-term storage
nersc_transfer_success = False
Expand Down