diff --git a/dask_ml/cluster/k_means.py b/dask_ml/cluster/k_means.py index b961cec38..474f20571 100644 --- a/dask_ml/cluster/k_means.py +++ b/dask_ml/cluster/k_means.py @@ -20,13 +20,13 @@ ) from ..utils import _timed, _timer, check_array, row_norms +import numba # isort:skip (see https://github.com/dask/dask-ml/pull/577) + if SK_024: from ._compat import _kmeans_plusplus else: from ._compat import _k_init as _kmeans_plusplus -import numba # isort:skip (see https://github.com/dask/dask-ml/pull/577) - logger = logging.getLogger(__name__) diff --git a/dask_ml/impute.py b/dask_ml/impute.py index 76ef02578..1b1101a9b 100644 --- a/dask_ml/impute.py +++ b/dask_ml/impute.py @@ -70,12 +70,18 @@ def _fit_frame(self, X): if self.strategy == "mean": avg = X.mean(axis=0).values elif self.strategy == "median": - avg = X.quantile().values + avg = [np.median(X[col].dropna()) for col in X.columns] elif self.strategy == "constant": avg = np.full(len(X.columns), self.fill_value) else: - avg = [X[col].value_counts().nlargest(1).index for col in X.columns] - avg = np.concatenate(*dask.compute(avg)) + avg = [] + for col in X.columns: + val_counts = X[col].value_counts().reset_index() + if isinstance(X, dd.DataFrame): + x = val_counts.to_dask_array(lengths=True) + else: + x = val_counts.values + avg.append(x[(x[:, 1] == x[:, 1][0])][:, 0].min()) self.statistics_ = pd.Series(dask.compute(avg)[0], index=X.columns) diff --git a/tests/test_impute.py b/tests/test_impute.py index 130abf422..f395d10d2 100644 --- a/tests/test_impute.py +++ b/tests/test_impute.py @@ -8,7 +8,6 @@ import dask_ml.datasets import dask_ml.impute -from dask_ml._compat import DASK_2_26_0, PANDAS_1_2_0 from dask_ml.utils import assert_estimator_equal rng = np.random.RandomState(0) @@ -96,8 +95,6 @@ def test_simple_imputer_add_indicator_raises(): @pytest.mark.parametrize("daskify", [True, False]) @pytest.mark.parametrize("strategy", ["median", "most_frequent", "constant"]) def test_frame_strategies(daskify, strategy): - if strategy == "most_frequent" and PANDAS_1_2_0: - raise pytest.skip("Behavior change in pandas. Unclear.") df = pd.DataFrame({"A": [1, 1, np.nan, np.nan, 2, 2]}) if daskify: df = dd.from_pandas(df, 2) @@ -109,14 +106,12 @@ def test_frame_strategies(daskify, strategy): b = dask_ml.impute.SimpleImputer(strategy=strategy, fill_value=fill_value) b.fit(df) - if not daskify and strategy == "median": - expected = pd.Series([1.5], index=["A"]) - elif daskify and strategy == "median" and DASK_2_26_0: - # New quantile implementation in Dask - expected = pd.Series([1.0], index=["A"]) - else: - expected = pd.Series([2], index=["A"]) - tm.assert_series_equal(b.statistics_, expected, check_dtype=False) + c = sklearn.impute.SimpleImputer(strategy=strategy, fill_value=fill_value) + c.fit(df) + + tm.assert_series_equal( + b.statistics_, pd.Series(c.statistics_, index=["A"]), check_dtype=False + ) def test_impute_most_frequent():