From 61f84ad03347e9a0782065e590885cf9d0d7fcd6 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 8 Apr 2025 15:18:50 -0700 Subject: [PATCH 1/2] Updating alcf flow to read the Prefect JSON block alcf-allocation-root-path for pointing towards script locations on Eagle. Also updating dispatcher.py to use the correct name for nersc_recon_flow. --- config.yml | 4 ++-- orchestration/flows/bl832/alcf.py | 29 +++++++++++++++---------- orchestration/flows/bl832/dispatcher.py | 12 +++++----- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/config.yml b/config.yml index 3624ef3e..e9936adc 100644 --- a/config.yml +++ b/config.yml @@ -27,13 +27,13 @@ globus: alcf832_raw: root_path: /data/raw uri: alcf.anl.gov - uuid: a89365da-e614-48fe-9ee5-f5e1698a68c8 # 55c3adf6-31f1-4647-9a38-52591642f7e7 + uuid: 2f9e7035-f4d8-4aa3-a911-d110bc2c8110 name: alcf_raw alcf832_scratch: root_path: /data/scratch uri: alcf.anl.gov - uuid: a89365da-e614-48fe-9ee5-f5e1698a68c8 # 55c3adf6-31f1-4647-9a38-52591642f7e7 + uuid: 2f9e7035-f4d8-4aa3-a911-d110bc2c8110 name: alcf_scratch alcf_eagle832: diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 7ecd6e1a..087e5bf5 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -33,6 +33,13 @@ def __init__( config: Config832 ) -> None: super().__init__(config) + # Load allocation root from the Prefect JSON block + # The block must be registered with the name "alcf-allocation-root-path" + allocation_data = JSON.load("alcf-allocation-root-path").value + self.allocation_root = allocation_data.get("alcf-allocation-root-path") + if not self.allocation_root: + raise ValueError("Allocation root not found in JSON block 'alcf-allocation-root-path'") + logger.info(f"Allocation root loaded: {self.allocation_root}") def reconstruct( self, @@ -51,8 +58,8 @@ def reconstruct( file_name = Path(file_path).stem + ".h5" folder_name = Path(file_path).parent.name - iri_als_bl832_rundir = "/eagle/IRI-ALS-832/data/raw" - iri_als_bl832_recon_script = "/eagle/IRI-ALS-832/scripts/globus_reconstruction.py" + iri_als_bl832_rundir = f"{self.allocation_root}/data/raw" + iri_als_bl832_recon_script = f"{self.allocation_root}/scripts/globus_reconstruction.py" gcc = Client(code_serialization_strategy=CombinedCode()) @@ -70,8 +77,8 @@ def reconstruct( @staticmethod def _reconstruct_wrapper( - rundir: str = "/eagle/IRI-ALS-832/data/raw", - script_path: str = "/eagle/IRI-ALS-832/scripts/globus_reconstruction.py", + rundir: str = "/eagle/IRIProd/ALS/data/raw", + script_path: str = "/eagle/IRIProd/ALS/scripts/globus_reconstruction.py", h5_file_name: str = None, folder_path: str = None ) -> str: @@ -125,11 +132,11 @@ def build_multi_resolution( file_name = Path(file_path).stem folder_name = Path(file_path).parent.name - tiff_scratch_path = f"/eagle/IRI-ALS-832/data/scratch/{folder_name}/rec{file_name}/" - raw_path = f"/eagle/IRI-ALS-832/data/raw/{folder_name}/{file_name}.h5" + tiff_scratch_path = f"{self.allocation_root}/data/scratch/{folder_name}/rec{file_name}/" + raw_path = f"{self.allocation_root}/raw/{folder_name}/{file_name}.h5" - iri_als_bl832_rundir = "/eagle/IRI-ALS-832/data/raw" - iri_als_bl832_conversion_script = "/eagle/IRI-ALS-832/scripts/tiff_to_zarr.py" + iri_als_bl832_rundir = f"{self.allocation_root}/data/raw" + iri_als_bl832_conversion_script = f"{self.allocation_root}/scripts/tiff_to_zarr.py" gcc = Client(code_serialization_strategy=CombinedCode()) @@ -147,8 +154,8 @@ def build_multi_resolution( @staticmethod def _build_multi_resolution_wrapper( - rundir: str = "/eagle/IRI-ALS-832/data/raw", - script_path: str = "/eagle/IRI-ALS-832/scripts/tiff_to_zarr.py", + rundir: str = "/eagle/IRIProd/ALS/data/raw", + script_path: str = "/eagle/IRIProd/ALS/scripts/tiff_to_zarr.py", recon_path: str = None, raw_path: str = None ) -> str: @@ -461,7 +468,7 @@ def alcf_recon_flow( if __name__ == "__main__": folder_name = 'dabramov' - file_name = '20240425_104614_nist-sand-30-100_27keV_z8mm_n2625' + file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast' flow_success = alcf_recon_flow( file_path=f"/{folder_name}/{file_name}.h5", config=Config832() diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index baed1e23..475fe821 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -21,8 +21,8 @@ class FlowParameterMapper: "file_path", "is_export_control", "config"], - # Placeholder parameters for NERSC reconstruction - "nersc_recon/nersc_recon": [ + # From nersc.py + "nersc_recon_flow/nersc_recon_flow": [ "file_path", "is_export_control", "config"] # Placeholder parameters for NERSC reconstruction @@ -71,7 +71,7 @@ def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: b # Define which flows to run based on the input settings settings = { "alcf_recon_flow/alcf_recon_flow": alcf_recon, - "nersc_recon/nersc_recon": nersc_recon, # This is a placeholder for the NERSC reconstruction flow + "nersc_recon_flow/nersc_recon_flow": nersc_recon, "new_832_file_flow/new_file_832": new_file_832 } # Save the settings in a JSON block for later retrieval by other flows @@ -143,9 +143,9 @@ async def dispatcher( alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) tasks.append(run_specific_flow("alcf_recon_flow/alcf_recon_flow", alcf_params)) - if decision_settings.value.get("nersc_recon/nersc_recon"): - nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon/nersc_recon", available_params) - tasks.append(run_specific_flow("nersc_recon/nersc_recon", nersc_params)) + if decision_settings.value.get("nersc_recon_flow/nersc_recon_flow"): + nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) + tasks.append(run_specific_flow("nersc_recon_flow/nersc_recon_flow", nersc_params)) # Run ALCF and NERSC flows in parallel, if any if tasks: From 8fbaa65433be73e67dfb86ab24f22183a8443efd Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 8 Apr 2025 15:56:56 -0700 Subject: [PATCH 2/2] adding prefect block alcf-allocation-root-path to test_globus_flow.py pytest --- orchestration/_tests/test_globus_flow.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index dd411238..e0823b27 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -37,10 +37,14 @@ def prefect_test_fixture(): decision_settings = JSON(value={ "alcf_recon_flow/alcf_recon_flow": True, - "nersc_recon/nersc_recon": False, # This is a placeholder for the NERSC reconstruction flow + "nersc_recon_flow/nersc_recon_flow": False, "new_832_file_flow/new_file_832": False }) decision_settings.save(name="decision-settings") + + alcf_allocation_root = JSON(value={"alcf-allocation-root-path": "/eagle/IRIProd/ALS"}) + alcf_allocation_root.save(name="alcf-allocation-root-path") + yield