From 53ccafab1087f415033c65c360d30ea5e808ac5e Mon Sep 17 00:00:00 2001 From: "Igoshev, Iaroslav" Date: Tue, 21 May 2024 14:35:15 +0000 Subject: [PATCH] FEAT-#7283: Introduce MinRowPartitionSize and MinColumnPartitionSize Signed-off-by: Igoshev, Iaroslav --- .../partitioning_examples.svg | 4 +- docs/usage_guide/benchmarking.rst | 12 +-- docs/usage_guide/optimization_notes/index.rst | 7 +- .../range_partitioning_ops.rst | 6 +- modin/config/__init__.py | 4 + modin/config/envvars.py | 97 +++++++++++++++++++ modin/conftest.py | 10 +- .../dataframe/pandas/dataframe/dataframe.py | 22 +++-- .../pandas/partitioning/axis_partition.py | 14 ++- .../pandas/partitioning/partition_manager.py | 12 ++- .../partitioning/partition_manager.py | 4 +- .../column_stores/column_store_dispatcher.py | 6 +- .../io/column_stores/parquet_dispatcher.py | 11 ++- modin/core/io/text/text_file_dispatcher.py | 4 +- modin/core/storage_formats/cudf/parser.py | 9 +- modin/core/storage_formats/pandas/merge.py | 5 +- modin/core/storage_formats/pandas/parsers.py | 9 +- modin/core/storage_formats/pandas/utils.py | 6 +- modin/tests/config/test_envvars.py | 9 +- .../storage_formats/pandas/test_internals.py | 19 ++-- modin/tests/pandas/dataframe/test_indexing.py | 4 +- .../pandas/dataframe/test_map_metadata.py | 4 +- modin/tests/pandas/dataframe/test_udf.py | 4 +- .../pandas/internals/test_repartition.py | 2 +- modin/tests/pandas/test_io.py | 4 +- modin/tests/pandas/test_series.py | 12 +-- modin/tests/pandas/utils.py | 9 +- 27 files changed, 228 insertions(+), 81 deletions(-) diff --git a/docs/img/partitioning_mechanism/partitioning_examples.svg b/docs/img/partitioning_mechanism/partitioning_examples.svg index 27a57f95680..6e084cec32c 100644 --- a/docs/img/partitioning_mechanism/partitioning_examples.svg +++ b/docs/img/partitioning_mechanism/partitioning_examples.svg @@ -1,4 +1,4 @@ - + -
modin.pandas.DataFrame
shape: 30 rows, 10 columns
modin.pandas.DataFrame...
modin.pandas.DataFrame
shape: 256 rows, 10 columns
modin.pandas.DataFrame...
modin.pandas.DataFrame
shape: 256 rows, 256 columns
modin.pandas.DataFrame...
1. Nor of the axes length is greater than the MinPartitionSize, so the whole frame is fit into a single partition.
1. Nor of the axes length is greater th...
2. Only row axis length is greater than the MinPartitionSize and so this is the only axis to be split. The total amount of partitions is equal to the NPartitions.
2. Only row axis length is greater than the...
3. Row and columns axes are greater than the MinPartitionSize and so the frame is split along both axes: each onto the NPartitions number of partitions. The total amount of partitions is the square of NPartitions.
3. Row and columns axes are greater than the MinPar...
Examples of partitioning with the following parameters being set:
Examples of partitioning with the follo...
NPartitions = 4
MinPartitionSize = 32
NPartitions = 4MinPartitionSize = 32
Viewer does not support full SVG 1.1
\ No newline at end of file +
modin.pandas.DataFrame
shape: 30 rows, 10 columns
modin.pandas.DataFrame...
modin.pandas.DataFrame
shape: 256 rows, 10 columns
modin.pandas.DataFrame...
modin.pandas.DataFrame
shape: 256 rows, 256 columns
modin.pandas.DataFrame...
1. The entire frame fits into a single partition.
1. The entire frame fits into a single p...
2. Row axis length is greater than the MinRowPartitionSize and so this is the only axis to be split. The total amount of partitions is equal to the NPartitions.
2. Row axis length is greater than the MinRo...
3. Row and columns axes are greater than the MinRowPartitionSize and MinColumnPartitionSize, respectively, so the frame is split along both axes: each into the NPartitions number of partitions. The total amount of partitions is the square of NPartitions.
3. Row and columns axes are greater than the MinRow...
Examples of partitioning with the following parameters being set:
Examples of partitioning with the follo...
NPartitions = 4
MinRowPartitionSize = 32
MinColumnPartitionSize = 32
NPartitions = 4MinRowPartitionSize = 32...
Text is not SVG - cannot display
\ No newline at end of file diff --git a/docs/usage_guide/benchmarking.rst b/docs/usage_guide/benchmarking.rst index f26a9dac3ec..535eb52ed86 100644 --- a/docs/usage_guide/benchmarking.rst +++ b/docs/usage_guide/benchmarking.rst @@ -31,13 +31,13 @@ Consider the following ipython script: .. code-block:: python import modin.pandas as pd - from modin.config import MinPartitionSize + from modin.config import MinRowPartitionSize import time import ray # Look at the Ray documentation with respect to the Ray configuration suited to you most. ray.init() - df = pd.DataFrame(list(range(MinPartitionSize.get() * 2))) + df = pd.DataFrame(list(range(MinRowPartitionSize.get() * 2))) %time result = df.map(lambda x: time.sleep(0.1) or x) %time print(result) @@ -82,12 +82,12 @@ following script with benchmark mode on: from io import BytesIO import modin.pandas as pd - from modin.config import BenchmarkMode, MinPartitionSize + from modin.config import BenchmarkMode, MinRowPartitionSize BenchmarkMode.put(True) start = time.time() - df = pd.DataFrame(list(range(MinPartitionSize.get())), columns=['A']) + df = pd.DataFrame(list(range(MinRowPartitionSize.get())), columns=['A']) result1 = df.map(lambda x: time.sleep(0.2) or x + 1) result2 = df.map(lambda x: time.sleep(0.2) or x + 2) result1.to_parquet(BytesIO()) @@ -136,10 +136,10 @@ That will typically block on any asynchronous computation: from io import BytesIO import modin.pandas as pd - from modin.config import MinPartitionSize, NPartitions + from modin.config import MinRowPartitionSize, NPartitions import modin.utils - MinPartitionSize.put(32) + MinRowPartitionSize.put(32) NPartitions.put(16) def slow_add_one(x): diff --git a/docs/usage_guide/optimization_notes/index.rst b/docs/usage_guide/optimization_notes/index.rst index 97841b50470..aadd813e318 100644 --- a/docs/usage_guide/optimization_notes/index.rst +++ b/docs/usage_guide/optimization_notes/index.rst @@ -49,11 +49,14 @@ How Modin partitions a dataframe Modin uses a 
partitioning scheme that partitions a dataframe along both axes, resulting in a matrix of partitions. The row and column chunk sizes are computed independently based on the length of the appropriate axis and Modin's special :doc:`configuration variables ` -(``NPartitions`` and ``MinPartitionSize``): +(``NPartitions``, ``MinRowPartitionSize`` and ``MinColumnPartitionSize``): - ``NPartitions`` is the maximum number of splits along an axis; by default, it equals to the number of cores on your local machine or cluster of nodes. -- ``MinPartitionSize`` is the minimum number of rows/columns to do a split. For instance, if ``MinPartitionSize`` +- ``MinRowPartitionSize`` is the minimum number of rows to do a split. For instance, if ``MinRowPartitionSize`` + is 32, the row axis will not be split unless the amount of rows is greater than 32. If it is greater, for example, 34, + then the row axis is sliced into two partitions: containing 32 and 2 rows accordingly. +- ``MinColumnPartitionSize`` is the minimum number of columns to do a split. For instance, if ``MinColumnPartitionSize`` is 32, the column axis will not be split unless the amount of columns is greater than 32. If it is is greater, for example, 34, then the column axis is sliced into two partitions: containing 32 and 2 columns accordingly. 
diff --git a/docs/usage_guide/optimization_notes/range_partitioning_ops.rst b/docs/usage_guide/optimization_notes/range_partitioning_ops.rst index 2c352f49c20..2aeee3ad273 100644 --- a/docs/usage_guide/optimization_notes/range_partitioning_ops.rst +++ b/docs/usage_guide/optimization_notes/range_partitioning_ops.rst @@ -105,7 +105,7 @@ by reading its source code: else: raise NotImplementedError(dtype) - pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())).to_numpy() + pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinRowPartitionSize.get())).to_numpy() nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000, 50_000_000, 100_000_000] duplicate_rate = [0, 0.1, 0.5, 0.95] @@ -192,7 +192,7 @@ micro-benchmark by reading its source code: cfg.CpuCount.put(16) - pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())).to_numpy() + pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinRowPartitionSize.get())).to_numpy() nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000] duplicate_rate = [0, 0.1, 0.5, 0.95] @@ -312,7 +312,7 @@ by reading its source code: else: raise NotImplementedError(dtype) - pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())).to_numpy() + pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinRowPartitionSize.get())).to_numpy() nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000, 50_000_000, 100_000_000] duplicate_rate = [0, 0.1, 0.5, 0.95] diff --git a/modin/config/__init__.py b/modin/config/__init__.py index 86ae3abcb17..35e48f90dc2 100644 --- a/modin/config/__init__.py +++ b/modin/config/__init__.py @@ -37,7 +37,9 @@ LogMemoryInterval, LogMode, Memory, + MinColumnPartitionSize, MinPartitionSize, + MinRowPartitionSize, ModinNumpy, NPartitions, PersistentPickle, @@ -82,6 +84,8 @@ # Partitioning "NPartitions", "MinPartitionSize", + "MinRowPartitionSize", + "MinColumnPartitionSize", # ASV specific "TestDatasetSize", "AsvImplementation", diff --git a/modin/config/envvars.py 
b/modin/config/envvars.py index 59f1ca265ed..92813a4165a 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -644,6 +644,8 @@ def put(cls, value: int) -> None: if value <= 0: raise ValueError(f"Min partition size should be > 0, passed value {value}") super().put(value) + MinRowPartitionSize.put(value) + MinColumnPartitionSize.put(value) @classmethod def get(cls) -> int: @@ -654,6 +656,13 @@ def get(cls) -> int: ------- int """ + from modin.error_message import ErrorMessage + + ErrorMessage.single_warning( + "`MinPartitionSize` is deprecated and will be removed in a future version. " + + "Use `MinRowPartitionSize` and `MinColumnPartitionSize` instead.", + FutureWarning, + ) min_partition_size = super().get() if min_partition_size <= 0: raise ValueError( @@ -662,6 +671,94 @@ def get(cls) -> int: return min_partition_size +class MinRowPartitionSize(EnvironmentVariable, type=int): + """ + Minimum number of rows in a single pandas partition split. + + Once a partition for a pandas dataframe has more than this many elements, + Modin adds another partition. + """ + + varname = "MODIN_MIN_ROW_PARTITION_SIZE" + default = 32 + + @classmethod + def put(cls, value: int) -> None: + """ + Set ``MinRowPartitionSize`` with extra checks. + + Parameters + ---------- + value : int + Config value to set. + """ + if value <= 0: + raise ValueError( + f"Min row partition size should be > 0, passed value {value}" + ) + super().put(value) + + @classmethod + def get(cls) -> int: + """ + Get ``MinRowPartitionSize`` with extra checks. + + Returns + ------- + int + """ + min_row_partition_size = super().get() + if min_row_partition_size <= 0: + raise ValueError( + f"`MinRowPartitionSize` should be > 0; current value: {min_row_partition_size}" + ) + return min_row_partition_size + + +class MinColumnPartitionSize(EnvironmentVariable, type=int): + """ + Minimum number of columns in a single pandas partition split. 
+ + Once a partition for a pandas dataframe has more than this many elements, + Modin adds another partition. + """ + + varname = "MODIN_MIN_COLUMN_PARTITION_SIZE" + default = 32 + + @classmethod + def put(cls, value: int) -> None: + """ + Set ``MinColumnPartitionSize`` with extra checks. + + Parameters + ---------- + value : int + Config value to set. + """ + if value <= 0: + raise ValueError( + f"Min column partition size should be > 0, passed value {value}" + ) + super().put(value) + + @classmethod + def get(cls) -> int: + """ + Get ``MinColumnPartitionSize`` with extra checks. + + Returns + ------- + int + """ + min_column_partition_size = super().get() + if min_column_partition_size <= 0: + raise ValueError( + f"`MinColumnPartitionSize` should be > 0; current value: {min_column_partition_size}" + ) + return min_column_partition_size + + class TestReadFromSqlServer(EnvironmentVariable, type=bool): """Set to true to test reading from SQL server.""" diff --git a/modin/conftest.py b/modin/conftest.py index e45fd0f99c6..a025b9783c7 100644 --- a/modin/conftest.py +++ b/modin/conftest.py @@ -60,7 +60,7 @@ def _saving_make_api_url(token, _make_api_url=modin.utils._make_api_url): BenchmarkMode, GithubCI, IsExperimental, - MinPartitionSize, + MinRowPartitionSize, NPartitions, ) from modin.core.execution.dispatching.factories import factories # noqa: E402 @@ -487,11 +487,11 @@ def set_async_read_mode(request): @pytest.fixture -def set_min_partition_size(request): - old_min_partition_size = MinPartitionSize.get() - MinPartitionSize.put(request.param) +def set_min_row_partition_size(request): + old_min_row_partition_size = MinRowPartitionSize.get() + MinRowPartitionSize.put(request.param) yield - MinPartitionSize.put(old_min_partition_size) + MinRowPartitionSize.put(old_min_row_partition_size) ray_client_server = None diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 3b56c5762f6..ad20b884e58 100644 
--- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -32,7 +32,13 @@ from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype from pandas.core.indexes.api import Index, RangeIndex -from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions +from modin.config import ( + Engine, + IsRayCluster, + MinColumnPartitionSize, + MinRowPartitionSize, + NPartitions, +) from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index from modin.core.dataframe.pandas.dataframe.utils import ( @@ -1592,7 +1598,7 @@ def _reorder_labels(self, row_positions=None, col_positions=None): new_lengths = get_length_list( axis_len=len(row_idx), num_splits=ordered_rows.shape[0], - min_block_size=MinPartitionSize.get(), + min_block_size=MinRowPartitionSize.get(), ) else: # If the frame's partitioning was preserved then @@ -1630,7 +1636,7 @@ def _reorder_labels(self, row_positions=None, col_positions=None): new_widths = get_length_list( axis_len=len(col_idx), num_splits=ordered_cols.shape[1], - min_block_size=MinPartitionSize.get(), + min_block_size=MinColumnPartitionSize.get(), ) else: # If the frame's partitioning was preserved then @@ -2635,10 +2641,10 @@ def _apply_func_to_range_partitioning( # algorithm how many partitions we want to end up with, so it samples and finds pivots # according to that. 
if sampling_probability >= 1: - from modin.config import MinPartitionSize + from modin.config import MinRowPartitionSize - ideal_num_new_partitions = round(len(grouper) / MinPartitionSize.get()) - if len(grouper) < MinPartitionSize.get() or ideal_num_new_partitions < 2: + ideal_num_new_partitions = round(len(grouper) / MinRowPartitionSize.get()) + if len(grouper) < MinRowPartitionSize.get() or ideal_num_new_partitions < 2: # If the data is too small, we shouldn't try reshuffling/repartitioning but rather # simply combine all partitions and apply the sorting to the whole dataframe return grouper.combine_and_apply(func=func) @@ -3582,7 +3588,7 @@ def broadcast_apply_full_axis( kw["row_lengths"] = get_length_list( axis_len=len(new_index), num_splits=new_partitions.shape[0], - min_block_size=MinPartitionSize.get(), + min_block_size=MinRowPartitionSize.get(), ) elif axis == 1: if self._row_lengths_cache is not None and len(new_index) == sum( @@ -3594,7 +3600,7 @@ def broadcast_apply_full_axis( kw["column_widths"] = get_length_list( axis_len=len(new_columns), num_splits=new_partitions.shape[1], - min_block_size=MinPartitionSize.get(), + min_block_size=MinColumnPartitionSize.get(), ) elif axis == 0: if self._column_widths_cache is not None and len( diff --git a/modin/core/dataframe/pandas/partitioning/axis_partition.py b/modin/core/dataframe/pandas/partitioning/axis_partition.py index c89b012b1ca..33532009626 100644 --- a/modin/core/dataframe/pandas/partitioning/axis_partition.py +++ b/modin/core/dataframe/pandas/partitioning/axis_partition.py @@ -18,7 +18,7 @@ import numpy as np import pandas -from modin.config import MinPartitionSize +from modin.config import MinColumnPartitionSize, MinRowPartitionSize from modin.core.dataframe.base.partitioning.axis_partition import ( BaseDataframeAxisPartition, ) @@ -277,7 +277,11 @@ def apply( for part in axis_partition.list_of_blocks ] ), - min_block_size=MinPartitionSize.get(), + min_block_size=( + MinRowPartitionSize.get() + if 
self.axis == 0 + else MinColumnPartitionSize.get() + ), ) ) result = self._wrap_partitions( @@ -289,7 +293,11 @@ def apply( num_splits, maintain_partitioning, *self.list_of_blocks, - min_block_size=MinPartitionSize.get(), + min_block_size=( + MinRowPartitionSize.get() + if self.axis == 0 + else MinColumnPartitionSize.get() + ), lengths=lengths, manual_partition=manual_partition, ) diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 79b3d9ccf75..5b74018696a 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -31,7 +31,8 @@ BenchmarkMode, CpuCount, Engine, - MinPartitionSize, + MinColumnPartitionSize, + MinRowPartitionSize, NPartitions, PersistentPickle, ProgressBar, @@ -1024,9 +1025,12 @@ def from_pandas(cls, df, return_dims=False): A NumPy array with partitions (with dimensions or not). """ num_splits = NPartitions.get() - min_block_size = MinPartitionSize.get() - row_chunksize = compute_chunksize(df.shape[0], num_splits, min_block_size) - col_chunksize = compute_chunksize(df.shape[1], num_splits, min_block_size) + min_row_block_size = MinRowPartitionSize.get() + min_column_block_size = MinColumnPartitionSize.get() + row_chunksize = compute_chunksize(df.shape[0], num_splits, min_row_block_size) + col_chunksize = compute_chunksize( + df.shape[1], num_splits, min_column_block_size + ) bar_format = ( "{l_bar}{bar}{r_bar}" diff --git a/modin/core/execution/ray/implementations/cudf_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/cudf_on_ray/partitioning/partition_manager.py index 4fc5e84a181..d5a69f30195 100644 --- a/modin/core/execution/ray/implementations/cudf_on_ray/partitioning/partition_manager.py +++ b/modin/core/execution/ray/implementations/cudf_on_ray/partitioning/partition_manager.py @@ -16,7 +16,7 @@ import numpy as np import ray -from 
modin.config import GpuCount, MinPartitionSize +from modin.config import GpuCount, MinRowPartitionSize from modin.core.execution.ray.common import RayWrapper from modin.core.execution.ray.generic.partitioning import ( GenericRayDataframePartitionManager, @@ -126,7 +126,7 @@ def from_pandas(cls, df, return_dims=False): put_func = cls._partition_class.put # For now, we default to row partitioning pandas_dfs = split_result_of_axis_func_pandas( - 0, num_splits, df, min_block_size=MinPartitionSize.get() + 0, num_splits, df, min_block_size=MinRowPartitionSize.get() ) keys = [ put_func(cls._get_gpu_managers()[i], pandas_dfs[i]) diff --git a/modin/core/io/column_stores/column_store_dispatcher.py b/modin/core/io/column_stores/column_store_dispatcher.py index 94d801ac807..d77d7687ff6 100644 --- a/modin/core/io/column_stores/column_store_dispatcher.py +++ b/modin/core/io/column_stores/column_store_dispatcher.py @@ -22,7 +22,7 @@ import numpy as np import pandas -from modin.config import MinPartitionSize, NPartitions +from modin.config import MinColumnPartitionSize, MinRowPartitionSize, NPartitions from modin.core.io.file_dispatcher import FileDispatcher from modin.core.storage_formats.pandas.utils import compute_chunksize @@ -130,7 +130,7 @@ def build_index(cls, partition_ids): index = index_len index_len = len(index) num_partitions = NPartitions.get() - min_block_size = MinPartitionSize.get() + min_block_size = MinRowPartitionSize.get() index_chunksize = compute_chunksize(index_len, num_partitions, min_block_size) if index_chunksize > index_len: row_lengths = [index_len] + [0 for _ in range(num_partitions - 1)] @@ -177,7 +177,7 @@ def build_columns(cls, columns, num_row_parts=None): else: num_remaining_parts = round(NPartitions.get() / num_row_parts) min_block_size = min( - columns_length // num_remaining_parts, MinPartitionSize.get() + columns_length // num_remaining_parts, MinColumnPartitionSize.get() ) column_splits = compute_chunksize( columns_length, NPartitions.get(), 
max(1, min_block_size) diff --git a/modin/core/io/column_stores/parquet_dispatcher.py b/modin/core/io/column_stores/parquet_dispatcher.py index 067f08cd60e..1899df8c7ab 100644 --- a/modin/core/io/column_stores/parquet_dispatcher.py +++ b/modin/core/io/column_stores/parquet_dispatcher.py @@ -30,7 +30,7 @@ from packaging import version from pandas.io.common import stringify_path -from modin.config import MinPartitionSize, NPartitions +from modin.config import MinColumnPartitionSize, MinRowPartitionSize, NPartitions from modin.core.io.column_stores.column_store_dispatcher import ColumnStoreDispatcher from modin.error_message import ErrorMessage from modin.utils import _inherit_docstrings @@ -659,7 +659,7 @@ def _normalize_partitioning(cls, remote_parts, row_lengths, column_widths): if row_lengths is not None: desired_row_nparts = max( - 1, min(sum(row_lengths) // MinPartitionSize.get(), NPartitions.get()) + 1, min(sum(row_lengths) // MinRowPartitionSize.get(), NPartitions.get()) ) else: desired_row_nparts = actual_row_nparts @@ -703,14 +703,15 @@ def _normalize_partitioning(cls, remote_parts, row_lengths, column_widths): new_parts[offset + i].append(split[i]) new_row_lengths.extend( - get_length_list(part_len, num_splits, MinPartitionSize.get()) + get_length_list(part_len, num_splits, MinRowPartitionSize.get()) ) remote_parts = np.array(new_parts) row_lengths = new_row_lengths desired_col_nparts = max( - 1, min(sum(column_widths) // MinPartitionSize.get(), NPartitions.get()) + 1, + min(sum(column_widths) // MinColumnPartitionSize.get(), NPartitions.get()), ) # only repartition along cols if the actual number of col splits 1.5 times BIGGER than desired if 1.5 * desired_col_nparts < remote_parts.shape[1]: @@ -729,7 +730,7 @@ def _normalize_partitioning(cls, remote_parts, row_lengths, column_widths): ] ) column_widths = get_length_list( - sum(column_widths), desired_col_nparts, MinPartitionSize.get() + sum(column_widths), desired_col_nparts, MinColumnPartitionSize.get() 
) return remote_parts, row_lengths, column_widths diff --git a/modin/core/io/text/text_file_dispatcher.py b/modin/core/io/text/text_file_dispatcher.py index db583700fea..6f7e68393b9 100644 --- a/modin/core/io/text/text_file_dispatcher.py +++ b/modin/core/io/text/text_file_dispatcher.py @@ -30,7 +30,7 @@ from pandas.core.dtypes.common import is_list_like from pandas.io.common import stringify_path -from modin.config import MinPartitionSize, NPartitions +from modin.config import MinColumnPartitionSize, NPartitions from modin.core.io.file_dispatcher import FileDispatcher, OpenFile from modin.core.io.text.utils import CustomNewlineIterator from modin.core.storage_formats.pandas.utils import compute_chunksize @@ -571,7 +571,7 @@ def _define_metadata( """ # This is the number of splits for the columns num_splits = min(len(column_names) or 1, NPartitions.get()) - min_block_size = MinPartitionSize.get() + min_block_size = MinColumnPartitionSize.get() column_chunksize = compute_chunksize(df.shape[1], num_splits, min_block_size) if column_chunksize > len(column_names): column_widths = [len(column_names)] diff --git a/modin/core/storage_formats/cudf/parser.py b/modin/core/storage_formats/cudf/parser.py index dc428079a35..062d71e0e59 100644 --- a/modin/core/storage_formats/cudf/parser.py +++ b/modin/core/storage_formats/cudf/parser.py @@ -20,7 +20,7 @@ from pandas.core.dtypes.concat import union_categoricals from pandas.io.common import infer_compression -from modin.config import MinPartitionSize +from modin.config import MinColumnPartitionSize, MinRowPartitionSize from modin.core.execution.ray.implementations.cudf_on_ray.partitioning.partition_manager import ( GPU_MANAGERS, ) @@ -41,7 +41,12 @@ def _split_result_for_readers(axis, num_splits, df): # pragma: no cover A list of pandas DataFrames. 
""" splits = split_result_of_axis_func_pandas( - axis, num_splits, df, min_block_size=MinPartitionSize.get() + axis, + num_splits, + df, + min_block_size=( + MinRowPartitionSize.get() if axis == 0 else MinColumnPartitionSize.get() + ), ) if not isinstance(splits, list): splits = [splits] diff --git a/modin/core/storage_formats/pandas/merge.py b/modin/core/storage_formats/pandas/merge.py index 37a9c325bd0..ca983785926 100644 --- a/modin/core/storage_formats/pandas/merge.py +++ b/modin/core/storage_formats/pandas/merge.py @@ -21,7 +21,7 @@ from pandas.core.dtypes.common import is_list_like from pandas.errors import MergeError -from modin.config import MinPartitionSize, NPartitions +from modin.config import MinRowPartitionSize, NPartitions from modin.core.dataframe.base.dataframe.utils import join_columns from modin.core.dataframe.pandas.metadata import ModinDtypes @@ -190,7 +190,8 @@ def map_func( if ( left._modin_frame._partitions.shape[0] < 0.3 * NPartitions.get() # to avoid empty partitions after repartition; can materialize index - and len(left._modin_frame) > NPartitions.get() * MinPartitionSize.get() + and len(left._modin_frame) + > NPartitions.get() * MinRowPartitionSize.get() ): left = left.repartition(axis=0) diff --git a/modin/core/storage_formats/pandas/parsers.py b/modin/core/storage_formats/pandas/parsers.py index c5478672b0b..7910dbe13fe 100644 --- a/modin/core/storage_formats/pandas/parsers.py +++ b/modin/core/storage_formats/pandas/parsers.py @@ -54,7 +54,7 @@ from pandas.io.common import infer_compression from pandas.util._decorators import doc -from modin.config import MinPartitionSize +from modin.config import MinColumnPartitionSize, MinRowPartitionSize from modin.core.io.file_dispatcher import OpenFile from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas from modin.db_conn import ModinDatabaseConnection @@ -116,7 +116,12 @@ def _split_result_for_readers(axis, num_splits, df): # pragma: no cover A list of pandas 
DataFrames. """ splits = split_result_of_axis_func_pandas( - axis, num_splits, df, min_block_size=MinPartitionSize.get() + axis, + num_splits, + df, + min_block_size=( + MinRowPartitionSize.get() if axis == 0 else MinColumnPartitionSize.get() + ), ) if not isinstance(splits, list): splits = [splits] diff --git a/modin/core/storage_formats/pandas/utils.py b/modin/core/storage_formats/pandas/utils.py index ac3297cb5ac..b560befde69 100644 --- a/modin/core/storage_formats/pandas/utils.py +++ b/modin/core/storage_formats/pandas/utils.py @@ -22,7 +22,7 @@ import numpy as np import pandas -from modin.config import MinPartitionSize, NPartitions +from modin.config import MinColumnPartitionSize, MinRowPartitionSize, NPartitions def compute_chunksize(axis_len: int, num_splits: int, min_block_size: int) -> int: @@ -254,7 +254,9 @@ def merge_partitioning(left, right, axis=1): chunk_size = compute_chunksize( axis_len=res_shape, num_splits=NPartitions.get(), - min_block_size=MinPartitionSize.get(), + min_block_size=( + MinRowPartitionSize.get() if axis == 0 else MinColumnPartitionSize.get() + ), ) return ceil(res_shape / chunk_size) else: diff --git a/modin/tests/config/test_envvars.py b/modin/tests/config/test_envvars.py index 4341a64d79d..03bdc01fa9e 100644 --- a/modin/tests/config/test_envvars.py +++ b/modin/tests/config/test_envvars.py @@ -329,7 +329,14 @@ def test_context_manager_update_config(modify_config): @pytest.mark.parametrize( "config_name", - ["NPartitions", "CpuCount", "LogMemoryInterval", "LogFileSize", "MinPartitionSize"], + [ + "NPartitions", + "CpuCount", + "LogMemoryInterval", + "LogFileSize", + "MinRowPartitionSize", + "MinColumnPartitionSize", + ], ) def test_wrong_values(config_name): config: cfg.EnvironmentVariable = getattr(cfg, config_name) diff --git a/modin/tests/core/storage_formats/pandas/test_internals.py b/modin/tests/core/storage_formats/pandas/test_internals.py index 9a16022a371..71523677a81 100644 --- 
a/modin/tests/core/storage_formats/pandas/test_internals.py +++ b/modin/tests/core/storage_formats/pandas/test_internals.py @@ -23,7 +23,8 @@ from modin.config import ( CpuCount, Engine, - MinPartitionSize, + MinColumnPartitionSize, + MinRowPartitionSize, NPartitions, RangePartitioning, context, @@ -151,7 +152,7 @@ def construct_modin_df_by_scheme(pandas_df, partitioning_scheme): axis=0, num_splits=len(row_lengths), result=pandas_df, - min_block_size=MinPartitionSize.get(), + min_block_size=MinRowPartitionSize.get(), length_list=row_lengths, ) partitions = [ @@ -159,7 +160,7 @@ def construct_modin_df_by_scheme(pandas_df, partitioning_scheme): axis=1, num_splits=len(column_widths), result=row_part, - min_block_size=MinPartitionSize.get(), + min_block_size=MinColumnPartitionSize.get(), length_list=column_widths, ) for row_part in row_partitions @@ -717,7 +718,7 @@ def test_reorder_labels_dtypes(): @pytest.mark.parametrize( "left_partitioning, right_partitioning, ref_with_cache_available, ref_with_no_cache", - # Note: this test takes into consideration that `MinPartitionSize == 32` and `NPartitions == 4` + # Note: this test takes into consideration that `MinRowPartitionSize == 32`, MinColumnPartitionSize == 32` and `NPartitions == 4` [ ( [2], @@ -752,7 +753,9 @@ def test_reorder_labels_dtypes(): ], ) @pytest.mark.parametrize( - "modify_config", [{NPartitions: 4, MinPartitionSize: 32}], indirect=True + "modify_config", + [{NPartitions: 4, MinRowPartitionSize: 32, MinColumnPartitionSize: 32}], + indirect=True, ) def test_merge_partitioning( left_partitioning, @@ -2655,7 +2658,7 @@ def remote_func(): ], ) def test_map_approaches(partitioning_scheme, expected_map_approach): - data_size = MinPartitionSize.get() * CpuCount.get() + data_size = MinRowPartitionSize.get() * CpuCount.get() data = {f"col{i}": np.ones(data_size) for i in range(data_size)} df = pandas.DataFrame(data) @@ -2674,8 +2677,8 @@ def test_map_approaches(partitioning_scheme, expected_map_approach): def 
test_map_partitions_joined_by_column(): with context(NPartitions=CpuCount.get() * 2): - ncols = MinPartitionSize.get() - nrows = MinPartitionSize.get() * CpuCount.get() * 2 + ncols = MinColumnPartitionSize.get() + nrows = MinRowPartitionSize.get() * CpuCount.get() * 2 data = {f"col{i}": np.ones(nrows) for i in range(ncols)} df = pd.DataFrame(data) partitions = df._query_compiler._modin_frame._partitions diff --git a/modin/tests/pandas/dataframe/test_indexing.py b/modin/tests/pandas/dataframe/test_indexing.py index b3f1bed2dc9..eb94a0ec44b 100644 --- a/modin/tests/pandas/dataframe/test_indexing.py +++ b/modin/tests/pandas/dataframe/test_indexing.py @@ -21,7 +21,7 @@ from pandas._testing import ensure_clean import modin.pandas as pd -from modin.config import MinPartitionSize, NPartitions +from modin.config import MinRowPartitionSize, NPartitions from modin.pandas.indexing import is_range_like from modin.pandas.testing import assert_index_equal from modin.tests.pandas.utils import ( @@ -2634,7 +2634,7 @@ def test__getitem_bool_with_empty_partition(): # This test case comes from # https://github.com/modin-project/modin/issues/5188 - size = MinPartitionSize.get() + size = MinRowPartitionSize.get() pandas_series = pandas.Series([True if i % 2 else False for i in range(size)]) modin_series = pd.Series(pandas_series) diff --git a/modin/tests/pandas/dataframe/test_map_metadata.py b/modin/tests/pandas/dataframe/test_map_metadata.py index 9c6ed0cc2af..e46dfe9754f 100644 --- a/modin/tests/pandas/dataframe/test_map_metadata.py +++ b/modin/tests/pandas/dataframe/test_map_metadata.py @@ -19,7 +19,7 @@ import pytest import modin.pandas as pd -from modin.config import MinPartitionSize, NPartitions, StorageFormat +from modin.config import MinRowPartitionSize, NPartitions, StorageFormat from modin.core.dataframe.pandas.metadata import LazyProxyCategoricalDtype from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas from modin.pandas.testing import 
assert_index_equal, assert_series_equal @@ -586,7 +586,7 @@ def _get_lazy_proxy(): axis=0, num_splits=nchunks, result=pandas_df, - min_block_size=MinPartitionSize.get(), + min_block_size=MinRowPartitionSize.get(), length_list=[2, 2, 2], ) diff --git a/modin/tests/pandas/dataframe/test_udf.py b/modin/tests/pandas/dataframe/test_udf.py index 32dd0d4ff61..a3474103a34 100644 --- a/modin/tests/pandas/dataframe/test_udf.py +++ b/modin/tests/pandas/dataframe/test_udf.py @@ -18,7 +18,7 @@ from pandas.core.dtypes.common import is_list_like import modin.pandas as pd -from modin.config import MinPartitionSize, NPartitions +from modin.config import MinRowPartitionSize, NPartitions from modin.tests.pandas.utils import ( agg_func_except_keys, agg_func_except_values, @@ -201,7 +201,7 @@ def test_explode_all_partitions(column, ignore_index): # expand every row in the input data into two rows. It's especially # important that the input data has list-like elements that must be # expanded at the boundaries of the partitions, e.g. at row 31. 
- num_rows = NPartitions.get() * MinPartitionSize.get() + num_rows = NPartitions.get() * MinRowPartitionSize.get() data = {"A": [[3, 4]] * num_rows, "C": [["a", "b"]] * num_rows} eval_general( *create_test_dfs(data), diff --git a/modin/tests/pandas/internals/test_repartition.py b/modin/tests/pandas/internals/test_repartition.py index cfbd7128a3c..9a186d19b64 100644 --- a/modin/tests/pandas/internals/test_repartition.py +++ b/modin/tests/pandas/internals/test_repartition.py @@ -62,6 +62,6 @@ def test_repartition(axis, dtype): def test_repartition_7170(): - with context(MinPartitionSize=102, NPartitions=5): + with context(MinColumnPartitionSize=102, NPartitions=5): df = pd.DataFrame(np.random.rand(10000, 100)) _ = df._repartition(axis=1).to_numpy() diff --git a/modin/tests/pandas/test_io.py b/modin/tests/pandas/test_io.py index e1afab5a1bd..3088bc1ab65 100644 --- a/modin/tests/pandas/test_io.py +++ b/modin/tests/pandas/test_io.py @@ -39,7 +39,7 @@ AsyncReadMode, Engine, IsExperimental, - MinPartitionSize, + MinRowPartitionSize, ReadSqlEngine, StorageFormat, TestDatasetSize, @@ -1556,7 +1556,7 @@ def test_read_parquet_indexing_by_column(self, tmp_path, engine, make_parquet_fi # which fails when min_partition_size < nrows < min_partition_size * (num_partitions - 1) nrows = ( - MinPartitionSize.get() + 1 + MinRowPartitionSize.get() + 1 ) # Use the minimal guaranteed failing value for nrows. 
unique_filename = get_unique_filename(extension="parquet", data_dir=tmp_path) make_parquet_file(filename=unique_filename, nrows=nrows) diff --git a/modin/tests/pandas/test_series.py b/modin/tests/pandas/test_series.py index b7be43c62dc..b8dae2a2f70 100644 --- a/modin/tests/pandas/test_series.py +++ b/modin/tests/pandas/test_series.py @@ -1110,12 +1110,12 @@ def test_astype_categorical(data): @pytest.mark.parametrize("data", [["a", "a", "b", "c", "c", "d", "b", "d"]]) @pytest.mark.parametrize( - "set_min_partition_size", + "set_min_row_partition_size", [2, 4], - ids=["four_partitions", "two_partitions"], + ids=["four_row_partitions", "two_row_partitions"], indirect=True, ) -def test_astype_categorical_issue5722(data, set_min_partition_size): +def test_astype_categorical_issue5722(data, set_min_row_partition_size): modin_series, pandas_series = create_test_series(data) modin_result = modin_series.astype("category") @@ -4906,12 +4906,12 @@ def test_cat_codes(data): @pytest.mark.parametrize( - "set_min_partition_size", + "set_min_row_partition_size", [1, 2], - ids=["four_partitions", "two_partitions"], + ids=["four_row_partitions", "two_row_partitions"], indirect=True, ) -def test_cat_codes_issue5650(set_min_partition_size): +def test_cat_codes_issue5650(set_min_row_partition_size): data = {"name": ["abc", "def", "ghi", "jkl"]} pandas_df = pandas.DataFrame(data) pandas_df = pandas_df.astype("category") diff --git a/modin/tests/pandas/utils.py b/modin/tests/pandas/utils.py index 2dd4346c814..17af73d295b 100644 --- a/modin/tests/pandas/utils.py +++ b/modin/tests/pandas/utils.py @@ -41,7 +41,8 @@ import modin.pandas as pd from modin.config import ( Engine, - MinPartitionSize, + MinColumnPartitionSize, + MinRowPartitionSize, NPartitions, RangePartitioning, TestDatasetSize, @@ -248,11 +249,11 @@ # Fully fill all of the partitions used in tests. 
test_data_large_categorical_dataframe = { - i: pandas.Categorical(np.arange(NPartitions.get() * MinPartitionSize.get())) - for i in range(NPartitions.get() * MinPartitionSize.get()) + i: pandas.Categorical(np.arange(NPartitions.get() * MinRowPartitionSize.get())) + for i in range(NPartitions.get() * MinColumnPartitionSize.get()) } test_data_large_categorical_series_values = [ - pandas.Categorical(np.arange(NPartitions.get() * MinPartitionSize.get())) + pandas.Categorical(np.arange(NPartitions.get() * MinRowPartitionSize.get())) ] test_data_large_categorical_series_keys = ["categorical_series"]