Skip to content

Commit a1ec72e

Browse files
authored
NERSC SFAPI Reconstruction Flow (#44)
1 parent 121aef5 commit a1ec72e

15 files changed

Lines changed: 1753 additions & 22 deletions

config.yml

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,6 @@ globus:
4848
uuid: 9032dd3a-e841-4687-a163-2720da731b5b
4949
name: alcf_home832
5050

51-
nersc_test:
52-
root_path: /global/cfs/cdirs/als/data_mover/share/dabramov
53-
uri: nersc.gov
54-
uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58
55-
name: nersc_test
56-
5751
nersc_alsdev:
5852
root_path: /global/homes/a/alsdev/test_directory/
5953
uri: nersc.gov
@@ -72,6 +66,24 @@ globus:
7266
uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58
7367
name: nersc832_alsdev_scratch
7468

69+
nersc832_alsdev_pscratch_raw:
70+
root_path: /pscratch/sd/a/alsdev/8.3.2/raw
71+
uri: nersc.gov
72+
uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58
73+
name: nersc832_alsdev_pscratch_raw
74+
75+
nersc832_alsdev_pscratch_scratch:
76+
root_path: /pscratch/sd/a/alsdev/8.3.2/scratch
77+
uri: nersc.gov
78+
uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58
79+
name: nersc832_alsdev_pscratch_scratch
80+
81+
nersc832_alsdev_recon_scripts:
82+
root_path: /global/cfs/cdirs/als/data_mover/8.3.2/tomography_reconstruction_scripts
83+
uri: nersc.gov
84+
uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58
85+
name: nersc832_alsdev_recon_scripts
86+
7587
nersc832:
7688
root_path: /global/cfs/cdirs/als/data_mover/8.3.2
7789
uri: nersc.gov
@@ -95,6 +107,14 @@ globus:
95107
client_id: ${GLOBUS_CLIENT_ID}
96108
client_secret: ${GLOBUS_CLIENT_SECRET}
97109

110+
harbor_images832:
111+
recon_image: tomorecon_nersc_mpi_hdf5@sha256:cc098a2cfb6b1632ea872a202c66cb7566908da066fd8f8c123b92fa95c2a43c
112+
multires_image: tomorecon_nersc_mpi_hdf5@sha256:cc098a2cfb6b1632ea872a202c66cb7566908da066fd8f8c123b92fa95c2a43c
113+
114+
ghcr_images832:
115+
recon_image: ghcr.io/als-computing/microct:master
116+
multires_image: ghcr.io/als-computing/microct:master
117+
98118
prefect:
99119
deployments:
100120
- type_spec: new_file_832

create_deployments_832_nersc.sh

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
export $(grep -v '^#' .env | xargs)
2+
3+
# create 'nersc_flow_pool'
4+
prefect work-pool create 'nersc_flow_pool'
5+
prefect work-pool create 'nersc_prune_pool'
6+
7+
# nersc_flow_pool
8+
# in docker-compose.yaml:
9+
# command: prefect agent start --pool "nersc_flow_pool"
10+
prefect deployment build ./orchestration/flows/bl832/nersc.py:nersc_recon_flow -n nersc_recon_flow -p nersc_flow_pool -q nersc_recon_flow_queue
11+
prefect deployment apply nersc_recon_flow-deployment.yaml
12+
13+
# nersc_prune_pool
14+
# in docker-compose.yaml:
15+
# command: prefect agent start --pool "nersc_prune_pool"
16+
prefect deployment build ./orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_raw -n prune_nersc832_alsdev_pscratch_raw -p nersc_prune_pool -q prune_nersc832_pscratch_queue
17+
prefect deployment apply prune_nersc832_alsdev_pscratch_raw-deployment.yaml
18+
19+
prefect deployment build ./orchestration/flows/bl832/prune.py:prune_nersc832_alsdev_pscratch_scratch -n prune_nersc832_alsdev_pscratch_scratch -p nersc_prune_pool -q prune_nersc832_pscratch_queue
20+
prefect deployment apply prune_nersc832_alsdev_pscratch_scratch-deployment.yaml
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
# orchestration/_tests/test_sfapi_flow.py
2+
3+
from pathlib import Path
4+
import pytest
5+
from unittest.mock import MagicMock, patch, mock_open
6+
from uuid import uuid4
7+
8+
from prefect.blocks.system import Secret
9+
from prefect.testing.utilities import prefect_test_harness
10+
11+
12+
@pytest.fixture(autouse=True, scope="session")
13+
def prefect_test_fixture():
14+
"""
15+
A pytest fixture that automatically sets up and tears down the Prefect test harness
16+
for the entire test session. It creates and saves test secrets and configurations
17+
required for Globus integration.
18+
19+
Yields:
20+
None
21+
"""
22+
with prefect_test_harness():
23+
globus_client_id = Secret(value=str(uuid4()))
24+
globus_client_id.save(name="globus-client-id")
25+
globus_client_secret = Secret(value=str(uuid4()))
26+
globus_client_secret.save(name="globus-client-secret")
27+
28+
yield
29+
30+
31+
# ----------------------------
32+
# Tests for create_sfapi_client
33+
# ----------------------------
34+
35+
36+
def test_create_sfapi_client_success():
37+
"""
38+
Test successful creation of the SFAPI client.
39+
"""
40+
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
41+
42+
# Mock data for client_id and client_secret files
43+
mock_client_id = 'value'
44+
mock_client_secret = '{"key": "value"}'
45+
46+
# Create separate mock_open instances for each file
47+
mock_open_client_id = mock_open(read_data=mock_client_id)
48+
mock_open_client_secret = mock_open(read_data=mock_client_secret)
49+
50+
with patch("orchestration.flows.bl832.nersc.os.getenv") as mock_getenv, \
51+
patch("orchestration.flows.bl832.nersc.os.path.isfile") as mock_isfile, \
52+
patch("builtins.open", side_effect=[
53+
mock_open_client_id.return_value,
54+
mock_open_client_secret.return_value
55+
]), \
56+
patch("orchestration.flows.bl832.nersc.JsonWebKey.import_key") as mock_import_key, \
57+
patch("orchestration.flows.bl832.nersc.Client") as MockClient:
58+
59+
# Mock environment variables
60+
mock_getenv.side_effect = lambda x: {
61+
"PATH_NERSC_CLIENT_ID": "/path/to/client_id",
62+
"PATH_NERSC_PRI_KEY": "/path/to/client_secret"
63+
}.get(x, None)
64+
65+
# Mock file existence
66+
mock_isfile.return_value = True
67+
68+
# Mock JsonWebKey.import_key to return a mock secret
69+
mock_import_key.return_value = "mock_secret"
70+
71+
# Create the client
72+
client = NERSCTomographyHPCController.create_sfapi_client()
73+
74+
# Assert that Client was instantiated with 'value' and 'mock_secret'
75+
MockClient.assert_called_once_with("value", "mock_secret")
76+
77+
# Assert that the returned client is the mocked client
78+
assert client == MockClient.return_value, "Client should be the mocked sfapi_client.Client instance"
79+
80+
81+
def test_create_sfapi_client_missing_paths():
82+
"""
83+
Test creation of the SFAPI client with missing credential paths.
84+
"""
85+
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
86+
87+
with patch("orchestration.flows.bl832.nersc.os.getenv", return_value=None):
88+
with pytest.raises(ValueError, match="Missing NERSC credentials paths."):
89+
NERSCTomographyHPCController.create_sfapi_client()
90+
91+
92+
def test_create_sfapi_client_missing_files():
93+
"""
94+
Test creation of the SFAPI client with missing credential files.
95+
"""
96+
with (
97+
# Mock environment variables
98+
patch(
99+
"orchestration.flows.bl832.nersc.os.getenv",
100+
side_effect=lambda x: {
101+
"PATH_NERSC_CLIENT_ID": "/path/to/client_id",
102+
"PATH_NERSC_PRI_KEY": "/path/to/client_secret"
103+
}.get(x, None)
104+
),
105+
106+
# Mock file existence to simulate missing files
107+
patch("orchestration.flows.bl832.nersc.os.path.isfile", return_value=False)
108+
):
109+
# Import the module after applying patches to ensure mocks are in place
110+
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
111+
112+
# Expect a FileNotFoundError due to missing credential files
113+
with pytest.raises(FileNotFoundError, match="NERSC credential files are missing."):
114+
NERSCTomographyHPCController.create_sfapi_client()
115+
116+
# ----------------------------
117+
# Fixture for Mocking SFAPI Client
118+
# ----------------------------
119+
120+
121+
@pytest.fixture
122+
def mock_sfapi_client():
123+
"""
124+
Mock the sfapi_client.Client class with necessary methods.
125+
"""
126+
with patch("orchestration.flows.bl832.nersc.Client") as MockClient:
127+
mock_client_instance = MockClient.return_value
128+
129+
# Mock the user method
130+
mock_user = MagicMock()
131+
mock_user.name = "testuser"
132+
mock_client_instance.user.return_value = mock_user
133+
134+
# Mock the compute method to return a mocked compute object
135+
mock_compute = MagicMock()
136+
mock_job = MagicMock()
137+
mock_job.jobid = "12345"
138+
mock_job.state = "COMPLETED"
139+
mock_compute.submit_job.return_value = mock_job
140+
mock_client_instance.compute.return_value = mock_compute
141+
142+
yield mock_client_instance
143+
144+
145+
# ----------------------------
146+
# Fixture for Mocking Config832
147+
# ----------------------------
148+
149+
@pytest.fixture
150+
def mock_config832():
151+
"""
152+
Mock the Config832 class to provide necessary configurations.
153+
"""
154+
with patch("orchestration.flows.bl832.nersc.Config832") as MockConfig:
155+
mock_config = MockConfig.return_value
156+
mock_config.harbor_images832 = {
157+
"recon_image": "mock_recon_image",
158+
"multires_image": "mock_multires_image",
159+
}
160+
mock_config.apps = {"als_transfer": "some_config"}
161+
yield mock_config
162+
163+
164+
# ----------------------------
165+
# Tests for NERSCTomographyHPCController
166+
# ----------------------------
167+
168+
def test_reconstruct_success(mock_sfapi_client, mock_config832):
169+
"""
170+
Test successful reconstruction job submission.
171+
"""
172+
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
173+
from sfapi_client.compute import Machine
174+
175+
controller = NERSCTomographyHPCController(client=mock_sfapi_client, config=mock_config832)
176+
file_path = "path/to/file.h5"
177+
178+
with patch("orchestration.flows.bl832.nersc.time.sleep", return_value=None):
179+
result = controller.reconstruct(file_path=file_path)
180+
181+
# Verify that compute was called with Machine.perlmutter
182+
mock_sfapi_client.compute.assert_called_once_with(Machine.perlmutter)
183+
184+
# Verify that submit_job was called once
185+
mock_sfapi_client.compute.return_value.submit_job.assert_called_once()
186+
187+
# Verify that complete was called on the job
188+
mock_sfapi_client.compute.return_value.submit_job.return_value.complete.assert_called_once()
189+
190+
# Assert that the method returns True
191+
assert result is True, "reconstruct should return True on successful job completion."
192+
193+
194+
def test_reconstruct_submission_failure(mock_sfapi_client, mock_config832):
195+
"""
196+
Test reconstruction job submission failure.
197+
"""
198+
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
199+
200+
controller = NERSCTomographyHPCController(client=mock_sfapi_client, config=mock_config832)
201+
file_path = "path/to/file.h5"
202+
203+
# Simulate submission failure
204+
mock_sfapi_client.compute.return_value.submit_job.side_effect = Exception("Submission failed")
205+
206+
with patch("orchestration.flows.bl832.nersc.time.sleep", return_value=None):
207+
result = controller.reconstruct(file_path=file_path)
208+
209+
# Assert that the method returns False
210+
assert result is False, "reconstruct should return False on submission failure."
211+
212+
213+
def test_build_multi_resolution_success(mock_sfapi_client, mock_config832):
214+
"""
215+
Test successful multi-resolution job submission.
216+
"""
217+
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
218+
from sfapi_client.compute import Machine
219+
220+
controller = NERSCTomographyHPCController(client=mock_sfapi_client, config=mock_config832)
221+
file_path = "path/to/file.h5"
222+
223+
with patch("orchestration.flows.bl832.nersc.time.sleep", return_value=None):
224+
result = controller.build_multi_resolution(file_path=file_path)
225+
226+
# Verify that compute was called with Machine.perlmutter
227+
mock_sfapi_client.compute.assert_called_once_with(Machine.perlmutter)
228+
229+
# Verify that submit_job was called once
230+
mock_sfapi_client.compute.return_value.submit_job.assert_called_once()
231+
232+
# Verify that complete was called on the job
233+
mock_sfapi_client.compute.return_value.submit_job.return_value.complete.assert_called_once()
234+
235+
# Assert that the method returns True
236+
assert result is True, "build_multi_resolution should return True on successful job completion."
237+
238+
239+
def test_build_multi_resolution_submission_failure(mock_sfapi_client, mock_config832):
240+
"""
241+
Test multi-resolution job submission failure.
242+
"""
243+
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
244+
245+
controller = NERSCTomographyHPCController(client=mock_sfapi_client, config=mock_config832)
246+
file_path = "path/to/file.h5"
247+
248+
# Simulate submission failure
249+
mock_sfapi_client.compute.return_value.submit_job.side_effect = Exception("Submission failed")
250+
251+
with patch("orchestration.flows.bl832.nersc.time.sleep", return_value=None):
252+
result = controller.build_multi_resolution(file_path=file_path)
253+
254+
# Assert that the method returns False
255+
assert result is False, "build_multi_resolution should return False on submission failure."
256+
257+
258+
def test_job_submission(mock_sfapi_client):
259+
"""
260+
Test job submission and status updates.
261+
"""
262+
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
263+
from sfapi_client.compute import Machine
264+
265+
controller = NERSCTomographyHPCController(client=mock_sfapi_client, config=MagicMock())
266+
file_path = "path/to/file.h5"
267+
268+
# Mock Path to extract file and folder names
269+
with patch.object(Path, 'parent', new_callable=MagicMock) as mock_parent, \
270+
patch.object(Path, 'stem', new_callable=MagicMock) as mock_stem:
271+
mock_parent.name = "to"
272+
mock_stem.return_value = "file"
273+
274+
with patch("orchestration.flows.bl832.nersc.time.sleep", return_value=None):
275+
controller.reconstruct(file_path=file_path)
276+
277+
# Verify that compute was called with Machine.perlmutter
278+
mock_sfapi_client.compute.assert_called_once_with(Machine.perlmutter)
279+
280+
# Verify that submit_job was called once
281+
mock_sfapi_client.compute.return_value.submit_job.assert_called_once()
282+
283+
# Verify the returned job has the expected attributes
284+
submitted_job = mock_sfapi_client.compute.return_value.submit_job.return_value
285+
assert submitted_job.jobid == "12345", "Job ID should match the mock job ID."
286+
assert submitted_job.state == "COMPLETED", "Job state should be COMPLETED."

0 commit comments

Comments
 (0)