docs/src/whatsnew/latest.rst (4 additions, 0 deletions)
@@ -78,6 +78,10 @@ This document explains the changes made to Iris for this release
is hoped that a future ``libnetcdf`` release will recover the original
performance. See `netcdf-c#3183`_ for more details. (:pull:`6747`)

#. `@stephenworsley`_ reduced the memory load of regridding and other operations
   that use :func:`~iris._lazy_data.map_complete_blocks` when the output chunks
   would exceed the optimum chunk size set in the Dask config. (:pull:`6730`)


🔥 Deprecations
===============
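For illustration only, here is a minimal sketch (plain Dask, hypothetical sizes, not Iris code) of the problem the whatsnew entry above describes: a block-wise regrid that enlarges the mapped dimensions can push each output chunk past Dask's "array.chunk-size" limit unless the input is first split further along its other dimensions:

    # Hypothetical sizes; shows why enlarging the mapped dims can exceed the limit.
    import dask
    import dask.array as da
    import numpy as np
    from dask.utils import parse_bytes

    src = da.ones((100, 500, 500), chunks=(10, 500, 500))  # hypothetical source
    out_ny, out_nx = 2000, 2000  # hypothetical regrid target

    limit = parse_bytes(dask.config.get("array.chunk-size"))  # 128 MiB by default
    per_chunk = 10 * out_ny * out_nx * np.dtype("float64").itemsize  # ~305 MiB
    print(per_chunk > limit)  # True with the default config: output chunks too large

    # Splitting the leading (non-regridded) dimension keeps output chunks within
    # the limit; the change in this PR makes Iris do an analogous rechunk itself.
    src = src.rechunk((2, 500, 500))
    per_chunk = 2 * out_ny * out_nx * np.dtype("float64").itemsize  # ~61 MiB
    print(per_chunk <= limit)  # True with the default config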
lib/iris/_lazy_data.py (38 additions, 4 deletions)
@@ -626,6 +626,14 @@ def map_complete_blocks(src, func, dims, out_sizes, dtype, *args, **kwargs):
--------
:func:`dask.array.map_blocks` : The function used for the mapping.

    Notes
    -----
    .. note::

        If the output chunks would be larger than the maximum chunk size set
        in the Dask config, the input is rechunked, where possible, to
        optimise the output chunk size.

"""
data = None
result = None
@@ -640,17 +648,43 @@ def map_complete_blocks(src, func, dims, out_sizes, dtype, *args, **kwargs):
else:
data = src.lazy_data()

    shape = list(src.shape)

    if result is None and data is not None:
        # Ensure dims are not chunked
        in_chunks = list(data.chunks)
        for dim in dims:
-            in_chunks[dim] = src.shape[dim]
-        data = data.rechunk(in_chunks)
+            in_chunks[dim] = (src.shape[dim],)

        # Determine output chunks
-        out_chunks = list(data.chunks)
+        out_chunks = in_chunks.copy()
        for dim, size in zip(dims, out_sizes):
-            out_chunks[dim] = size
+            out_chunks[dim] = (size,)
+            shape[dim] = size

        # Ensure the chunksize of the output is a reasonable size.
        max_outchunks = [max(chunk) for chunk in out_chunks]
        df = [False] * len(max_outchunks)
        for dim in dims:
            df[dim] = True
        df = tuple(df)
        dtype = np.dtype(dtype)
        opt_outchunks = _optimum_chunksize(
            max_outchunks, shape, dtype=dtype, dims_fixed=df
        )
        for i, (chunk, max_out, opt_out) in enumerate(
            zip(out_chunks, max_outchunks, opt_outchunks)
        ):
            if opt_out < max_out:
                new_chunks = []
                for c in chunk:
                    new_chunks.extend((c // opt_out) * [opt_out])
                    if chunk_end := c % opt_out:
                        new_chunks.append(chunk_end)
                in_chunks[i] = tuple(new_chunks)
                out_chunks[i] = tuple(new_chunks)

        data = data.rechunk(in_chunks)

# Assume operation preserves mask.
meta = da.utils.meta_from_array(data).astype(dtype)
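The splitting loop in the hunk above can be read in isolation. Here is a minimal standalone sketch of the same idea; the helper name and signature are hypothetical, not Iris API:

    # Hypothetical standalone version of the splitting step: cut each existing
    # chunk into pieces of the optimum size, keeping any remainder as a final,
    # smaller piece.
    def split_chunks(chunks: tuple[int, ...], opt: int) -> tuple[int, ...]:
        new_chunks: list[int] = []
        for c in chunks:
            new_chunks.extend((c // opt) * [opt])
            if remainder := c % opt:
                new_chunks.append(remainder)
        return tuple(new_chunks)

    # A dimension of length 9 chunked as (5, 4), split with an optimum piece
    # size of 2, becomes (2, 2, 1, 2, 2) - the pattern checked in the new test.
    assert split_chunks((5, 4), 2) == (2, 2, 1, 2, 2)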
lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py (22 additions, 0 deletions)
@@ -7,6 +7,7 @@
from unittest.mock import Mock, PropertyMock

import dask.array as da
import dask.config
import numpy as np

from iris._lazy_data import is_lazy_data, map_complete_blocks
@@ -134,3 +135,24 @@ def test_multidimensional_input(self):
)
assert is_lazy_data(result)
assert_array_equal(result.compute(), array + 1)

def test_rechunking(self):
# Choose a dask array with an irregularly chunked dimension to be rechunked.
lazy_array = da.ones((5, 9, 10, 10), chunks=(2, 5, 10, 5))
cube, _ = create_mock_cube(lazy_array)

result = map_complete_blocks(
cube, self.func, dims=(2, 3), out_sizes=(30, 40), dtype=lazy_array.dtype
)
assert is_lazy_data(result)
        # Reduce the dask chunk-size limit, and with it the optimum chunksize.
with dask.config.set({"array.chunk-size": "32KiB"}):
result = map_complete_blocks(
cube, self.func, dims=(2, 3), out_sizes=(30, 40), dtype=lazy_array.dtype
)
assert is_lazy_data(result)
expected_chunksize = (1, 2, 30, 40)
assert result.chunksize == expected_chunksize
        # Along dim 1 the original chunks are (5, 4): the 5 is split irregularly
        # into (2, 2, 1), while the 4 splits evenly into (2, 2).
expected_chunk = (2, 2, 1, 2, 2)
assert result.chunks[1] == expected_chunk
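The expected values in this test can be sanity-checked by hand. A short back-of-the-envelope check, assuming float64 data from da.ones and the 32 KiB limit set above:

    # Rough arithmetic behind the expected chunks (illustration only).
    import numpy as np

    itemsize = np.dtype("float64").itemsize  # 8 bytes for da.ones
    full_chunk = 2 * 5 * 30 * 40 * itemsize  # 96 000 B, well above 32 KiB
    optimised = 1 * 2 * 30 * 40 * itemsize   # 19 200 B, below 32 KiB
    assert optimised <= 32 * 1024 < full_chunk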