Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ globus:
alcf832_raw:
root_path: /data/raw
uri: alcf.anl.gov
uuid: a89365da-e614-48fe-9ee5-f5e1698a68c8 # 55c3adf6-31f1-4647-9a38-52591642f7e7
uuid: 2f9e7035-f4d8-4aa3-a911-d110bc2c8110
name: alcf_raw

alcf832_scratch:
root_path: /data/scratch
uri: alcf.anl.gov
uuid: a89365da-e614-48fe-9ee5-f5e1698a68c8 # 55c3adf6-31f1-4647-9a38-52591642f7e7
uuid: 2f9e7035-f4d8-4aa3-a911-d110bc2c8110
name: alcf_scratch

alcf_eagle832:
Expand Down
6 changes: 5 additions & 1 deletion orchestration/_tests/test_globus_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@ def prefect_test_fixture():

decision_settings = JSON(value={
"alcf_recon_flow/alcf_recon_flow": True,
"nersc_recon/nersc_recon": False, # This is a placeholder for the NERSC reconstruction flow
"nersc_recon_flow/nersc_recon_flow": False,
"new_832_file_flow/new_file_832": False
})
decision_settings.save(name="decision-settings")

alcf_allocation_root = JSON(value={"alcf-allocation-root-path": "/eagle/IRIProd/ALS"})
alcf_allocation_root.save(name="alcf-allocation-root-path")

yield


Expand Down
29 changes: 18 additions & 11 deletions orchestration/flows/bl832/alcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ def __init__(
config: Config832
) -> None:
super().__init__(config)
# Load allocation root from the Prefect JSON block
# The block must be registered with the name "alcf-allocation-root-path"
allocation_data = JSON.load("alcf-allocation-root-path").value
self.allocation_root = allocation_data.get("alcf-allocation-root-path")
if not self.allocation_root:
raise ValueError("Allocation root not found in JSON block 'alcf-allocation-root-path'")
logger.info(f"Allocation root loaded: {self.allocation_root}")

def reconstruct(
self,
Expand All @@ -51,8 +58,8 @@ def reconstruct(
file_name = Path(file_path).stem + ".h5"
folder_name = Path(file_path).parent.name

iri_als_bl832_rundir = "/eagle/IRI-ALS-832/data/raw"
iri_als_bl832_recon_script = "/eagle/IRI-ALS-832/scripts/globus_reconstruction.py"
iri_als_bl832_rundir = f"{self.allocation_root}/data/raw"
iri_als_bl832_recon_script = f"{self.allocation_root}/scripts/globus_reconstruction.py"

gcc = Client(code_serialization_strategy=CombinedCode())

Expand All @@ -70,8 +77,8 @@ def reconstruct(

@staticmethod
def _reconstruct_wrapper(
rundir: str = "/eagle/IRI-ALS-832/data/raw",
script_path: str = "/eagle/IRI-ALS-832/scripts/globus_reconstruction.py",
rundir: str = "/eagle/IRIProd/ALS/data/raw",
script_path: str = "/eagle/IRIProd/ALS/scripts/globus_reconstruction.py",
h5_file_name: str = None,
folder_path: str = None
) -> str:
Expand Down Expand Up @@ -125,11 +132,11 @@ def build_multi_resolution(
file_name = Path(file_path).stem
folder_name = Path(file_path).parent.name

tiff_scratch_path = f"/eagle/IRI-ALS-832/data/scratch/{folder_name}/rec{file_name}/"
raw_path = f"/eagle/IRI-ALS-832/data/raw/{folder_name}/{file_name}.h5"
tiff_scratch_path = f"{self.allocation_root}/data/scratch/{folder_name}/rec{file_name}/"
raw_path = f"{self.allocation_root}/raw/{folder_name}/{file_name}.h5"

iri_als_bl832_rundir = "/eagle/IRI-ALS-832/data/raw"
iri_als_bl832_conversion_script = "/eagle/IRI-ALS-832/scripts/tiff_to_zarr.py"
iri_als_bl832_rundir = f"{self.allocation_root}/data/raw"
iri_als_bl832_conversion_script = f"{self.allocation_root}/scripts/tiff_to_zarr.py"

gcc = Client(code_serialization_strategy=CombinedCode())

Expand All @@ -147,8 +154,8 @@ def build_multi_resolution(

@staticmethod
def _build_multi_resolution_wrapper(
rundir: str = "/eagle/IRI-ALS-832/data/raw",
script_path: str = "/eagle/IRI-ALS-832/scripts/tiff_to_zarr.py",
rundir: str = "/eagle/IRIProd/ALS/data/raw",
script_path: str = "/eagle/IRIProd/ALS/scripts/tiff_to_zarr.py",
recon_path: str = None,
raw_path: str = None
) -> str:
Expand Down Expand Up @@ -461,7 +468,7 @@ def alcf_recon_flow(

if __name__ == "__main__":
folder_name = 'dabramov'
file_name = '20240425_104614_nist-sand-30-100_27keV_z8mm_n2625'
file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast'
flow_success = alcf_recon_flow(
file_path=f"/{folder_name}/{file_name}.h5",
config=Config832()
Expand Down
12 changes: 6 additions & 6 deletions orchestration/flows/bl832/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class FlowParameterMapper:
"file_path",
"is_export_control",
"config"],
# Placeholder parameters for NERSC reconstruction
"nersc_recon/nersc_recon": [
# From nersc.py
"nersc_recon_flow/nersc_recon_flow": [
"file_path",
"is_export_control",
"config"] # Placeholder parameters for NERSC reconstruction
Expand Down Expand Up @@ -71,7 +71,7 @@ def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: b
# Define which flows to run based on the input settings
settings = {
"alcf_recon_flow/alcf_recon_flow": alcf_recon,
"nersc_recon/nersc_recon": nersc_recon, # This is a placeholder for the NERSC reconstruction flow
"nersc_recon_flow/nersc_recon_flow": nersc_recon,
"new_832_file_flow/new_file_832": new_file_832
}
# Save the settings in a JSON block for later retrieval by other flows
Expand Down Expand Up @@ -143,9 +143,9 @@ async def dispatcher(
alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params)
tasks.append(run_specific_flow("alcf_recon_flow/alcf_recon_flow", alcf_params))

if decision_settings.value.get("nersc_recon/nersc_recon"):
nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon/nersc_recon", available_params)
tasks.append(run_specific_flow("nersc_recon/nersc_recon", nersc_params))
if decision_settings.value.get("nersc_recon_flow/nersc_recon_flow"):
nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params)
tasks.append(run_specific_flow("nersc_recon_flow/nersc_recon_flow", nersc_params))

# Run ALCF and NERSC flows in parallel, if any
if tasks:
Expand Down