Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions bindings/src/icon4py/bindings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

NDArray: TypeAlias = np.ndarray | xp.ndarray

# TODO(havogt): import needed to register MultNodeRun in get_processor_properties, does the pattern make sense?
# TODO(havogt): import needed to register MultNodeRun in get_process_properties, does the pattern make sense?
assert hasattr(mpi_decomposition, "get_multinode_properties")

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -319,7 +319,7 @@ def construct_decomposition(
.set_dimension(dims.EdgeDim, e_glb_index, e_owner_mask, None)
.set_dimension(dims.VertexDim, v_glb_index, v_owner_mask, None)
)
processor_props = definitions.get_processor_properties(definitions.MultiNodeRun(), comm_id)
exchange = definitions.create_exchange(processor_props, decomposition_info)
process_props = definitions.get_process_properties(definitions.MultiNodeRun(), comm_id)
exchange = definitions.create_exchange(process_props, decomposition_info)

return processor_props, decomposition_info, exchange
return process_props, decomposition_info, exchange
30 changes: 15 additions & 15 deletions bindings/src/icon4py/bindings/debug_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@

def print_grid_decomp_info(
icon_grid: icon.IconGrid,
processor_props: definitions.ProcessProperties,
process_props: definitions.ProcessProperties,
decomposition_info: definitions.DecompositionInfo,
num_cells: int,
num_edges: int,
num_verts: int,
) -> None:
logger.info(
"icon_grid:cell_start for rank %s is.... %s",
processor_props.rank,
process_props.rank,
"\n".join(
[
f"{k!s} - {icon_grid.start_index(k)}"
Expand All @@ -38,14 +38,14 @@ def print_grid_decomp_info(
)
logger.info(
"icon_grid:cell_end for rank %s is.... %s",
processor_props.rank,
process_props.rank,
"\n".join(
[f"{k!s} - {icon_grid.end_index(k)}" for k in h_grid.get_domains_for_dim(dims.CellDim)]
),
)
logger.info(
"icon_grid:vert_start for rank %s is.... %s",
processor_props.rank,
process_props.rank,
"\n".join(
[
f"{k!s} - {icon_grid.start_index(k)}"
Expand All @@ -55,7 +55,7 @@ def print_grid_decomp_info(
)
logger.info(
"icon_grid:vert_end for rank %s is.... %s",
processor_props.rank,
process_props.rank,
"\n".join(
[
f"{k!s} - {icon_grid.end_index(k)}"
Expand All @@ -65,7 +65,7 @@ def print_grid_decomp_info(
)
logger.info(
"icon_grid:edge_start for rank %s is.... %s",
processor_props.rank,
process_props.rank,
"\n".join(
[
f"{k!s} - {icon_grid.start_index(k)}"
Expand All @@ -75,7 +75,7 @@ def print_grid_decomp_info(
)
logger.info(
"icon_grid:edge_end for rank %s is.... %s",
processor_props.rank,
process_props.rank,
"\n".join(
[f"{k!s} - {icon_grid.end_index(k)}" for k in h_grid.get_domains_for_dim(dims.EdgeDim)]
),
Expand All @@ -84,38 +84,38 @@ def print_grid_decomp_info(
for offset, connectivity in icon_grid.connectivities.items():
if gtx_common.is_neighbor_table(connectivity):
logger.debug(
f"icon_grid:{offset} for rank {processor_props.rank} is.... {connectivity.asnumpy()}"
f"icon_grid:{offset} for rank {process_props.rank} is.... {connectivity.asnumpy()}"
)

logger.info(
"c_glb_index for rank %s is.... %s",
processor_props.rank,
process_props.rank,
decomposition_info.global_index(dims.CellDim)[0:num_cells],
)
logger.info(
"e_glb_index for rank %s is.... %s",
processor_props.rank,
process_props.rank,
decomposition_info.global_index(dims.EdgeDim)[0:num_edges],
)
logger.info(
"v_glb_index for rank %s is.... %s",
processor_props.rank,
process_props.rank,
decomposition_info.global_index(dims.VertexDim)[0:num_verts],
)

logger.info(
"c_owner_mask for rank %s is.... %s",
processor_props.rank,
process_props.rank,
decomposition_info.owner_mask(dims.CellDim)[0:num_cells],
)
logger.info(
"e_owner_mask for rank %s is.... %s",
processor_props.rank,
process_props.rank,
decomposition_info.owner_mask(dims.EdgeDim)[0:num_edges],
)
logger.info(
"v_owner_mask for rank %s is.... %s",
processor_props.rank,
process_props.rank,
decomposition_info.owner_mask(dims.VertexDim)[0:num_verts],
)

Expand All @@ -125,5 +125,5 @@ def print_grid_decomp_info(
f"local vertices = {decomposition_info.global_index(dims.VertexDim, definitions.DecompositionInfo.EntryType.ALL).shape}"
)
logger.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: GHEX context setup: from {processor_props.comm_name} with {processor_props.comm_size} nodes"
f"rank={process_props.rank}/{process_props.comm_size}: GHEX context setup: from {process_props.comm_name} with {process_props.comm_size} nodes"
)
8 changes: 4 additions & 4 deletions bindings/src/icon4py/bindings/grid_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ def grid_init(
allocator = model_backends.get_allocator(actual_backend)

if comm_id is None:
processor_props = decomposition_defs.SingleNodeProcessProperties()
process_props = decomposition_defs.SingleNodeProcessProperties()
exchange_runtime = decomposition_defs.SingleNodeExchange()
else:
# Set MultiNodeExchange as exchange runtime
(
processor_props,
process_props,
decomposition_info,
exchange_runtime,
) = wrapper_common.construct_decomposition(
Expand Down Expand Up @@ -171,14 +171,14 @@ def grid_init(
num_edges=num_edges,
vertical_size=vertical_size,
limited_area=limited_area,
distributed=not processor_props.is_single_rank(),
distributed=not process_props.is_single_rank(),
allocator=allocator,
)

if comm_id is not None:
wrapper_debug_utils.print_grid_decomp_info(
grid,
processor_props,
process_props,
decomposition_info,
num_cells,
num_edges,
Expand Down
4 changes: 2 additions & 2 deletions bindings/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
metrics_savepoint,
model_top_height,
ndyn_substeps,
processor_props,
process_props,
savepoint_diffusion_exit,
savepoint_diffusion_init,
savepoint_nonhydro_exit,
Expand Down Expand Up @@ -62,7 +62,7 @@
"metrics_savepoint",
"model_top_height",
"ndyn_substeps",
"processor_props",
"process_props",
"savepoint_diffusion_exit",
"savepoint_diffusion_init",
"savepoint_nonhydro_exit",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
icon_grid,
interpolation_savepoint,
metrics_savepoint,
processor_props,
process_props,
)

from ..fixtures import advection_exit_savepoint, advection_init_savepoint
Expand Down
2 changes: 1 addition & 1 deletion model/atmosphere/diffusion/tests/diffusion/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
metrics_savepoint,
model_top_height,
ndyn_substeps,
processor_props,
process_props,
rayleigh_coeff,
rayleigh_type,
savepoint_diffusion_exit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@
)
@pytest.mark.parametrize("ndyn_substeps", [2])
@pytest.mark.parametrize("orchestration", [False])
@pytest.mark.parametrize("processor_props", [True], indirect=True)
@pytest.mark.parametrize("process_props", [True], indirect=True)
def test_parallel_diffusion(
experiment: definitions.Experiment,
step_date_init: str,
step_date_exit: str,
linit: bool,
ndyn_substeps: int,
processor_props: decomposition.ProcessProperties,
process_props: decomposition.ProcessProperties,
decomposition_info: decomposition.DecompositionInfo,
icon_grid: icon.IconGrid,
savepoint_diffusion_init: serialbox.IconDiffusionInitSavepoint,
Expand All @@ -69,26 +69,26 @@ def test_parallel_diffusion(
if orchestration and not test_utils.is_dace(backend):
raise pytest.skip("This test is only executed for `dace` backends.")
caplog.set_level("INFO")
parallel_helpers.check_comm_size(processor_props)
parallel_helpers.check_comm_size(process_props)
_log.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: initializing diffusion for experiment '{definitions.Experiments.MCH_CH_R04B09}'"
f"rank={process_props.rank}/{process_props.comm_size}: initializing diffusion for experiment '{definitions.Experiments.MCH_CH_R04B09}'"
)
_log.info(
f"local cells = {decomposition_info.global_index(dims.CellDim, decomposition.DecompositionInfo.EntryType.ALL).shape} "
f"local edges = {decomposition_info.global_index(dims.EdgeDim, decomposition.DecompositionInfo.EntryType.ALL).shape} "
f"local vertices = {decomposition_info.global_index(dims.VertexDim, decomposition.DecompositionInfo.EntryType.ALL).shape}"
)
_log.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: GHEX context setup: from {processor_props.comm_name} with {processor_props.comm_size} nodes"
f"rank={process_props.rank}/{process_props.comm_size}: GHEX context setup: from {process_props.comm_name} with {process_props.comm_size} nodes"
)

_log.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: using local grid with {icon_grid.num_cells} Cells, {icon_grid.num_edges} Edges, {icon_grid.num_vertices} Vertices"
f"rank={process_props.rank}/{process_props.comm_size}: using local grid with {icon_grid.num_cells} Cells, {icon_grid.num_edges} Edges, {icon_grid.num_vertices} Vertices"
)
config = definitions.construct_diffusion_config(experiment, ndyn_substeps=ndyn_substeps)
dtime = savepoint_diffusion_init.get_metadata("dtime").get("dtime")
_log.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: setup: using {processor_props.comm_name} with {processor_props.comm_size} nodes"
f"rank={process_props.rank}/{process_props.comm_size}: setup: using {process_props.comm_name} with {process_props.comm_size} nodes"
)
vertical_config = v_grid.VerticalGridConfig(
icon_grid.num_levels,
Expand All @@ -101,7 +101,7 @@ def test_parallel_diffusion(
diffusion_params = diffusion_.DiffusionParams(config)
cell_geometry = grid_savepoint.construct_cell_geometry()
edge_geometry = grid_savepoint.construct_edge_geometry()
exchange = decomposition.create_exchange(processor_props, decomposition_info)
exchange = decomposition.create_exchange(process_props, decomposition_info)
diffusion = diffusion_.Diffusion(
grid=icon_grid,
config=config,
Expand All @@ -120,7 +120,7 @@ def test_parallel_diffusion(
orchestration=orchestration,
)

_log.info(f"rank={processor_props.rank}/{processor_props.comm_size}: diffusion initialized ")
_log.info(f"rank={process_props.rank}/{process_props.comm_size}: diffusion initialized ")

diagnostic_state = diffusion_states.DiffusionDiagnosticState(
hdef_ic=savepoint_diffusion_init.hdef_ic(),
Expand All @@ -142,7 +142,7 @@ def test_parallel_diffusion(
prognostic_state=prognostic_state,
dtime=dtime,
)
_log.info(f"rank={processor_props.rank}/{processor_props.comm_size}: diffusion run ")
_log.info(f"rank={process_props.rank}/{process_props.comm_size}: diffusion run ")

utils.verify_diffusion_fields(
config=config,
Expand All @@ -151,7 +151,7 @@ def test_parallel_diffusion(
diffusion_savepoint=savepoint_diffusion_exit,
)
_log.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: running diffusion step - using {processor_props.comm_name} with {processor_props.comm_size} nodes - DONE"
f"rank={process_props.rank}/{process_props.comm_size}: running diffusion step - using {process_props.comm_name} with {process_props.comm_size} nodes - DONE"
)


Expand All @@ -169,14 +169,14 @@ def test_parallel_diffusion(
],
)
@pytest.mark.parametrize("ndyn_substeps", [2])
@pytest.mark.parametrize("processor_props", [True], indirect=True)
@pytest.mark.parametrize("process_props", [True], indirect=True)
def test_parallel_diffusion_multiple_steps(
experiment: definitions.Experiment,
step_date_init: str,
step_date_exit: str,
linit: bool,
ndyn_substeps: int,
processor_props: decomposition.ProcessProperties,
process_props: decomposition.ProcessProperties,
decomposition_info: decomposition.DecompositionInfo,
icon_grid: icon.IconGrid,
savepoint_diffusion_init: serialbox.IconDiffusionInitSavepoint,
Expand All @@ -196,21 +196,21 @@ def test_parallel_diffusion_multiple_steps(
# Diffusion initialization
######################################################################
caplog.set_level("INFO")
parallel_helpers.check_comm_size(processor_props)
parallel_helpers.check_comm_size(process_props)
_log.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: initializing diffusion for experiment '{definitions.Experiments.MCH_CH_R04B09}'"
f"rank={process_props.rank}/{process_props.comm_size}: initializing diffusion for experiment '{definitions.Experiments.MCH_CH_R04B09}'"
)
_log.info(
f"local cells = {decomposition_info.global_index(dims.CellDim, decomposition.DecompositionInfo.EntryType.ALL).shape} "
f"local edges = {decomposition_info.global_index(dims.EdgeDim, decomposition.DecompositionInfo.EntryType.ALL).shape} "
f"local vertices = {decomposition_info.global_index(dims.VertexDim, decomposition.DecompositionInfo.EntryType.ALL).shape}"
)
_log.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: GHEX context setup: from {processor_props.comm_name} with {processor_props.comm_size} nodes"
f"rank={process_props.rank}/{process_props.comm_size}: GHEX context setup: from {process_props.comm_name} with {process_props.comm_size} nodes"
)

_log.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: using local grid with {icon_grid.num_cells} Cells, {icon_grid.num_edges} Edges, {icon_grid.num_vertices} Vertices"
f"rank={process_props.rank}/{process_props.comm_size}: using local grid with {icon_grid.num_cells} Cells, {icon_grid.num_edges} Edges, {icon_grid.num_vertices} Vertices"
)
cell_geometry = grid_savepoint.construct_cell_geometry()
edge_geometry = grid_savepoint.construct_edge_geometry()
Expand All @@ -226,9 +226,9 @@ def test_parallel_diffusion_multiple_steps(
diffusion_params = diffusion_.DiffusionParams(config)
dtime = savepoint_diffusion_init.get_metadata("dtime").get("dtime")
_log.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: setup: using {processor_props.comm_name} with {processor_props.comm_size} nodes"
f"rank={process_props.rank}/{process_props.comm_size}: setup: using {process_props.comm_name} with {process_props.comm_size} nodes"
)
exchange = decomposition.create_exchange(processor_props, decomposition_info)
exchange = decomposition.create_exchange(process_props, decomposition_info)

######################################################################
# DaCe NON-Orchestrated Backend
Expand All @@ -252,7 +252,7 @@ def test_parallel_diffusion_multiple_steps(
orchestration=False,
)

_log.info(f"rank={processor_props.rank}/{processor_props.comm_size}: diffusion initialized ")
_log.info(f"rank={process_props.rank}/{process_props.comm_size}: diffusion initialized ")

diagnostic_state_dace_non_orch = diffusion_states.DiffusionDiagnosticState(
hdef_ic=savepoint_diffusion_init.hdef_ic(),
Expand All @@ -276,16 +276,16 @@ def test_parallel_diffusion_multiple_steps(
prognostic_state=prognostic_state_dace_non_orch,
dtime=dtime,
)
_log.info(f"rank={processor_props.rank}/{processor_props.comm_size}: diffusion run ")
_log.info(f"rank={process_props.rank}/{process_props.comm_size}: diffusion run ")
_log.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: running diffusion step - using {processor_props.comm_name} with {processor_props.comm_size} nodes - DONE"
f"rank={process_props.rank}/{process_props.comm_size}: running diffusion step - using {process_props.comm_name} with {process_props.comm_size} nodes - DONE"
)

######################################################################
# DaCe Orchestrated Backend
######################################################################

exchange = decomposition.create_exchange(processor_props, decomposition_info)
exchange = decomposition.create_exchange(process_props, decomposition_info)
diffusion = diffusion_.Diffusion(
grid=icon_grid,
config=config,
Expand All @@ -303,7 +303,7 @@ def test_parallel_diffusion_multiple_steps(
backend=backend,
orchestration=True,
)
_log.info(f"rank={processor_props.rank}/{processor_props.comm_size}: diffusion initialized ")
_log.info(f"rank={process_props.rank}/{process_props.comm_size}: diffusion initialized ")

diagnostic_state_dace_orch = diffusion_states.DiffusionDiagnosticState(
hdef_ic=savepoint_diffusion_init.hdef_ic(),
Expand All @@ -327,9 +327,9 @@ def test_parallel_diffusion_multiple_steps(
prognostic_state=prognostic_state_dace_orch,
dtime=dtime,
)
_log.info(f"rank={processor_props.rank}/{processor_props.comm_size}: diffusion run ")
_log.info(f"rank={process_props.rank}/{process_props.comm_size}: diffusion run ")
_log.info(
f"rank={processor_props.rank}/{processor_props.comm_size}: running diffusion step - using {processor_props.comm_name} with {processor_props.comm_size} nodes - DONE"
f"rank={process_props.rank}/{process_props.comm_size}: running diffusion step - using {process_props.comm_name} with {process_props.comm_size} nodes - DONE"
)

######################################################################
Expand Down
Loading
Loading