From eb1a967a92501d72af643f3433d6260e767c111e Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Sat, 12 Jul 2025 17:59:34 -0700 Subject: [PATCH 1/7] All works, just need to satisfy mypy and whatnot now --- xarray/namedarray/daskmanager.py | 54 ++++++++++++++++++++++++++++++++ xarray/tests/test_dask.py | 28 +++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/xarray/namedarray/daskmanager.py b/xarray/namedarray/daskmanager.py index 6485ba375f5..3ef0229356b 100644 --- a/xarray/namedarray/daskmanager.py +++ b/xarray/namedarray/daskmanager.py @@ -4,8 +4,10 @@ from typing import TYPE_CHECKING, Any import numpy as np +import dask from xarray.core.indexing import ImplicitToExplicitIndexingAdapter +from xarray.core.common import _contains_cftime_datetimes from xarray.namedarray.parallelcompat import ChunkManagerEntrypoint, T_ChunkedArray from xarray.namedarray.utils import is_duck_dask_array, module_available @@ -16,6 +18,7 @@ _NormalizedChunks, duckarray, ) + from xarray.namedarray.parallelcompat import _Chunks try: from dask.array import Array as DaskArray @@ -264,3 +267,54 @@ def shuffle( if chunks != "auto": raise NotImplementedError("Only chunks='auto' is supported at present.") return dask.array.shuffle(x, indexer, axis, chunks="auto") + + def rechunk( # type: ignore[override] + self, + data: T_ChunkedArray, + chunks: _NormalizedChunks | tuple[int, ...] | _Chunks, + **kwargs: Any, + ) -> Any: + """ + Changes the chunking pattern of the given array. + + Called when the .chunk method is called on an xarray object that is already chunked. + + Parameters + ---------- + data : dask array + Array to be rechunked. + chunks : int, tuple, dict or str, optional + The new block dimensions to create. -1 indicates the full size of the + corresponding dimension. Default is "auto" which automatically + determines chunk sizes. + + Returns + ------- + chunked array + + See Also + -------- + dask.array.Array.rechunk + cubed.Array.rechunk + """ + + if _contains_cftime_datetimes(data): + # Preprocess chunks if they're cftime + cftime_nbytes_approx = 64 + from dask.utils import parse_bytes + target_chunksize = parse_bytes(dask.config.get("array.chunk-size")) + + # Calculate total elements per chunk + elements_per_chunk = target_chunksize // cftime_nbytes_approx + + # Distribute elements across dimensions + # Simple approach: try to make chunks roughly cubic + ndim = data.ndim # type:ignore + shape = data.shape # type:ignore + if ndim > 0: + chunk_size_per_dim = int(elements_per_chunk ** (1.0 / ndim)) + chunks = tuple(min(chunk_size_per_dim, dim_size) for dim_size in shape) + else: + chunks = () + + return data.rechunk(chunks, **kwargs) diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py index 9024f2ae677..7f67769fd7c 100644 --- a/xarray/tests/test_dask.py +++ b/xarray/tests/test_dask.py @@ -1059,6 +1059,27 @@ def make_da(): return da +def make_da_cftime(): + yrs = np.arange(2000,2120) + cftime_dates = xr.date_range( + start=f"{yrs[0]}-01-01", + end=f"{yrs[-1]}-12-31", + freq="1YE", + use_cftime=True, + ) + yr_array = np.tile(cftime_dates.values, (10, 1)) + da = xr.DataArray( + yr_array, + dims=["x", "t"], + coords={"x": np.arange(10), "t": cftime_dates}, + name="a", + ).chunk({"x": 4, "t": 5}) + da.x.attrs["long_name"] = "x" + da.attrs["test"] = "test" + da.coords["c2"] = 0.5 + da.coords["ndcoord"] = da.x * 2 + + return da def make_ds(): map_ds = xr.Dataset() @@ -1140,6 +1161,13 @@ def test_auto_chunk_da(obj): np.testing.assert_array_equal(actual, expected) assert actual.chunks == expected.chunks +@pytest.mark.parametrize("obj", [make_da_cftime()]) +def test_auto_chunk_da_cftime(obj): + actual = obj.chunk("auto").data + expected = obj.data.rechunk({0: 10, 1: 120}) + np.testing.assert_array_equal(actual, expected) + assert actual.chunks == expected.chunks + def test_map_blocks_error(map_da, map_ds): def bad_func(darray): From 852476d127df64540298369e40ae20ce2f89aa2d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 13 Jul 2025 01:10:13 +0000 Subject: [PATCH 2/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- xarray/namedarray/daskmanager.py | 25 +++++++++++++------------ xarray/tests/test_dask.py | 7 +++++-- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/xarray/namedarray/daskmanager.py b/xarray/namedarray/daskmanager.py index 3ef0229356b..945bcf077dd 100644 --- a/xarray/namedarray/daskmanager.py +++ b/xarray/namedarray/daskmanager.py @@ -3,11 +3,11 @@ from collections.abc import Callable, Iterable, Sequence from typing import TYPE_CHECKING, Any -import numpy as np import dask +import numpy as np -from xarray.core.indexing import ImplicitToExplicitIndexingAdapter from xarray.core.common import _contains_cftime_datetimes +from xarray.core.indexing import ImplicitToExplicitIndexingAdapter from xarray.namedarray.parallelcompat import ChunkManagerEntrypoint, T_ChunkedArray from xarray.namedarray.utils import is_duck_dask_array, module_available @@ -268,12 +268,12 @@ def shuffle( raise NotImplementedError("Only chunks='auto' is supported at present.") return dask.array.shuffle(x, indexer, axis, chunks="auto") - def rechunk( # type: ignore[override] - self, - data: T_ChunkedArray, - chunks: _NormalizedChunks | tuple[int, ...] | _Chunks, - **kwargs: Any, - ) -> Any: + def rechunk( # type: ignore[override] + self, + data: T_ChunkedArray, + chunks: _NormalizedChunks | tuple[int, ...] | _Chunks, + **kwargs: Any, + ) -> Any: """ Changes the chunking pattern of the given array. @@ -302,15 +302,16 @@ def rechunk( # type: ignore[override] # Preprocess chunks if they're cftime cftime_nbytes_approx = 64 from dask.utils import parse_bytes + target_chunksize = parse_bytes(dask.config.get("array.chunk-size")) - + # Calculate total elements per chunk elements_per_chunk = target_chunksize // cftime_nbytes_approx - + # Distribute elements across dimensions # Simple approach: try to make chunks roughly cubic - ndim = data.ndim # type:ignore - shape = data.shape # type:ignore + ndim = data.ndim # type:ignore + shape = data.shape # type:ignore if ndim > 0: chunk_size_per_dim = int(elements_per_chunk ** (1.0 / ndim)) chunks = tuple(min(chunk_size_per_dim, dim_size) for dim_size in shape) diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py index 7f67769fd7c..68a93dfc9e2 100644 --- a/xarray/tests/test_dask.py +++ b/xarray/tests/test_dask.py @@ -1059,8 +1059,9 @@ def make_da(): return da + def make_da_cftime(): - yrs = np.arange(2000,2120) + yrs = np.arange(2000, 2120) cftime_dates = xr.date_range( start=f"{yrs[0]}-01-01", end=f"{yrs[-1]}-12-31", @@ -1078,9 +1079,10 @@ def make_da_cftime(): da.attrs["test"] = "test" da.coords["c2"] = 0.5 da.coords["ndcoord"] = da.x * 2 - + return da + def make_ds(): map_ds = xr.Dataset() map_ds["a"] = make_da() @@ -1161,6 +1163,7 @@ def test_auto_chunk_da(obj): np.testing.assert_array_equal(actual, expected) assert actual.chunks == expected.chunks + @pytest.mark.parametrize("obj", [make_da_cftime()]) def test_auto_chunk_da_cftime(obj): actual = obj.chunk("auto").data From 1aba531a52231043ad5db5fe6927a43792e5341c Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Sat, 12 Jul 2025 18:15:09 -0700 Subject: [PATCH 3/7] Fix moving import to be optional --- xarray/namedarray/daskmanager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xarray/namedarray/daskmanager.py b/xarray/namedarray/daskmanager.py index 945bcf077dd..c1c9cd1fa86 100644 --- a/xarray/namedarray/daskmanager.py +++ b/xarray/namedarray/daskmanager.py @@ -3,7 +3,6 @@ from collections.abc import Callable, Iterable, Sequence from typing import TYPE_CHECKING, Any -import dask import numpy as np from xarray.core.common import _contains_cftime_datetimes @@ -302,8 +301,9 @@ def rechunk( # type: ignore[override] # Preprocess chunks if they're cftime cftime_nbytes_approx = 64 from dask.utils import parse_bytes + from dask import config as dask_config - target_chunksize = parse_bytes(dask.config.get("array.chunk-size")) + target_chunksize = parse_bytes(dask_config.get("array.chunk-size")) # Calculate total elements per chunk elements_per_chunk = target_chunksize // cftime_nbytes_approx From 9429c3d0c2f43cb6cb056916f339e3f36cc39052 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 13 Jul 2025 01:15:37 +0000 Subject: [PATCH 4/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- xarray/namedarray/daskmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xarray/namedarray/daskmanager.py b/xarray/namedarray/daskmanager.py index c1c9cd1fa86..6c9128c93b3 100644 --- a/xarray/namedarray/daskmanager.py +++ b/xarray/namedarray/daskmanager.py @@ -300,8 +300,8 @@ def rechunk( # type: ignore[override] if _contains_cftime_datetimes(data): # Preprocess chunks if they're cftime cftime_nbytes_approx = 64 - from dask.utils import parse_bytes from dask import config as dask_config + from dask.utils import parse_bytes target_chunksize = parse_bytes(dask_config.get("array.chunk-size")) From 3c9d27ef7f4af34e1fbeb9c956c6c6121c0a2e91 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Sat, 12 Jul 2025 18:18:10 -0700 Subject: [PATCH 5/7] Make mypy happy --- xarray/namedarray/daskmanager.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/xarray/namedarray/daskmanager.py b/xarray/namedarray/daskmanager.py index c1c9cd1fa86..6a14c6dc2cf 100644 --- a/xarray/namedarray/daskmanager.py +++ b/xarray/namedarray/daskmanager.py @@ -300,18 +300,16 @@ def rechunk( # type: ignore[override] if _contains_cftime_datetimes(data): # Preprocess chunks if they're cftime cftime_nbytes_approx = 64 - from dask.utils import parse_bytes from dask import config as dask_config + from dask.utils import parse_bytes target_chunksize = parse_bytes(dask_config.get("array.chunk-size")) - # Calculate total elements per chunk elements_per_chunk = target_chunksize // cftime_nbytes_approx - # Distribute elements across dimensions - # Simple approach: try to make chunks roughly cubic - ndim = data.ndim # type:ignore - shape = data.shape # type:ignore + # Try to make chunks roughly cubic + ndim = data.ndim # type:ignore[attr-defined] + shape = data.shape # type:ignore[attr-defined] if ndim > 0: chunk_size_per_dim = int(elements_per_chunk ** (1.0 / ndim)) chunks = tuple(min(chunk_size_per_dim, dim_size) for dim_size in shape) From 5153d2d96717850fcbfb5137a52c566e7921a5d6 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Sat, 12 Jul 2025 20:26:05 -0700 Subject: [PATCH 6/7] Add some clarifying comments about what we need to do to optimise this --- xarray/namedarray/daskmanager.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/xarray/namedarray/daskmanager.py b/xarray/namedarray/daskmanager.py index 6a14c6dc2cf..afa6a6fa463 100644 --- a/xarray/namedarray/daskmanager.py +++ b/xarray/namedarray/daskmanager.py @@ -307,7 +307,12 @@ def rechunk( # type: ignore[override] elements_per_chunk = target_chunksize // cftime_nbytes_approx - # Try to make chunks roughly cubic + """ + Try to make chunks roughly cubic. This needs to be a bit smarter, it + really ought to account for xr.structure.chunks._getchunk and try to + use the default encoding to set the chunk size. + """ + ndim = data.ndim # type:ignore[attr-defined] shape = data.shape # type:ignore[attr-defined] if ndim > 0: From cfdc31b26cb082fee61dd4db7c3ced7a6fd54ace Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Tue, 15 Jul 2025 09:00:42 +1000 Subject: [PATCH 7/7] @dcherian's suggestions. Just need to update chunking strategy to respect disk chunks sensibly & this should be ready to go, I think --- xarray/namedarray/daskmanager.py | 23 +++++++---------------- xarray/namedarray/utils.py | 27 +++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/xarray/namedarray/daskmanager.py b/xarray/namedarray/daskmanager.py index afa6a6fa463..5466ad44737 100644 --- a/xarray/namedarray/daskmanager.py +++ b/xarray/namedarray/daskmanager.py @@ -299,26 +299,17 @@ def rechunk( # type: ignore[override] if _contains_cftime_datetimes(data): # Preprocess chunks if they're cftime - cftime_nbytes_approx = 64 + from dask import config as dask_config from dask.utils import parse_bytes + from xarray.namedarray.utils import build_chunkspec + target_chunksize = parse_bytes(dask_config.get("array.chunk-size")) - elements_per_chunk = target_chunksize // cftime_nbytes_approx - - """ - Try to make chunks roughly cubic. This needs to be a bit smarter, it - really ought to account for xr.structure.chunks._getchunk and try to - use the default encoding to set the chunk size. - """ - - ndim = data.ndim # type:ignore[attr-defined] - shape = data.shape # type:ignore[attr-defined] - if ndim > 0: - chunk_size_per_dim = int(elements_per_chunk ** (1.0 / ndim)) - chunks = tuple(min(chunk_size_per_dim, dim_size) for dim_size in shape) - else: - chunks = () + chunks = build_chunkspec( + data, + target_chunksize=target_chunksize, + ) return data.rechunk(chunks, **kwargs) diff --git a/xarray/namedarray/utils.py b/xarray/namedarray/utils.py index 96060730345..4a3bcbcdacd 100644 --- a/xarray/namedarray/utils.py +++ b/xarray/namedarray/utils.py @@ -1,6 +1,7 @@ from __future__ import annotations import importlib +import sys import warnings from collections.abc import Hashable, Iterable, Iterator, Mapping from functools import lru_cache @@ -16,6 +17,8 @@ from numpy.typing import NDArray + from xarray.namedarray.parallelcompat import T_ChunkedArray + try: from dask.array.core import Array as DaskArray from dask.typing import DaskCollection @@ -195,6 +198,30 @@ def either_dict_or_kwargs( return pos_kwargs +def build_chunkspec( + data: T_ChunkedArray, + target_chunksize: int, +) -> tuple[int, ...]: + """ + Try to make chunks roughly cubic. This needs to be a bit smarter, it + really ought to account for xr.structure.chunks._getchunk and try to + use the default encoding to set the chunk size. + """ + from xarray.core.formatting import first_n_items + + cftime_nbytes_approx: int = sys.getsizeof(first_n_items(data, 1)) # type: ignore[no-untyped-call] + elements_per_chunk = target_chunksize // cftime_nbytes_approx + ndim = data.ndim # type:ignore[attr-defined] + shape = data.shape # type:ignore[attr-defined] + if ndim > 0: + chunk_size_per_dim = int(elements_per_chunk ** (1.0 / ndim)) + chunks = tuple(min(chunk_size_per_dim, dim_size) for dim_size in shape) + else: + chunks = () + + return chunks + + class ReprObject: """Object that prints as the given value, for use with sentinel values."""