From 550e254a7f90491703db95c7e6fb103bbb495230 Mon Sep 17 00:00:00 2001
From: "Igoshev, Iaroslav"
Date: Wed, 24 Apr 2024 16:15:16 +0000
Subject: [PATCH] Add from_map

Signed-off-by: Igoshev, Iaroslav
---
 .../implementations/pandas_on_dask/io/io.py   | 45 ++++++++++++++++++
 .../dispatching/factories/dispatcher.py       |  5 ++
 .../dispatching/factories/factories.py        | 19 ++++++++
 .../implementations/pandas_on_ray/io/io.py    | 47 +++++++++++++++++++
 .../pandas_on_unidist/io/io.py                | 46 ++++++++++++++++++
 modin/core/io/io.py                           | 28 +++++++++++
 modin/pandas/io.py                            | 30 ++++++++++++
 modin/tests/pandas/test_io.py                 | 19 +++++++-
 8 files changed, 238 insertions(+), 1 deletion(-)

diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py
index b68ad983715..217bf8e7e0c 100644
--- a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py
+++ b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py
@@ -13,6 +13,8 @@
 
 """Module houses class that implements ``BaseIO`` using Dask as an execution engine."""
 
+import numpy as np
+import pandas
 from distributed.client import default_client
 
 from modin.core.execution.dask.common import DaskWrapper
@@ -188,3 +190,46 @@ def df_to_series(df):
 
         partitions = [client.submit(df_to_series, part) for part in partitions]
         return from_delayed(partitions)
+
+    @classmethod
+    def from_map(cls, func, iterable, *args, **kwargs):
+        """
+        Create a Modin `query_compiler` from a map function.
+
+        This method will construct a Modin `query_compiler` split by row partitions.
+        The number of row partitions matches the number of elements in the iterable object.
+
+        Parameters
+        ----------
+        func : callable
+            Function to map across the iterable object.
+        iterable : Iterable
+            An iterable object.
+        *args : tuple
+            Positional arguments to pass in `func`.
+        **kwargs : dict
+            Keyword arguments to pass in `func`.
+
+        Returns
+        -------
+        BaseQueryCompiler
+            QueryCompiler containing data returned by map function.
+        """
+        partitions = np.array(
+            [
+                [
+                    cls.frame_partition_cls(
+                        DaskWrapper.deploy(deploy_map_func, f_args=(func, obj, *args), f_kwargs=kwargs)
+                    )
+                ]
+                for obj in iterable
+            ]
+        )
+        return cls.query_compiler_cls(cls.frame_cls(partitions))
+
+
+def deploy_map_func(func, obj, *args, **kwargs):
+    result = func(obj, *args, **kwargs)
+    if not isinstance(result, pandas.DataFrame):
+        result = pandas.DataFrame(result)
+    return result
diff --git a/modin/core/execution/dispatching/factories/dispatcher.py b/modin/core/execution/dispatching/factories/dispatcher.py
index 99c6153264c..0bbe84af9aa 100644
--- a/modin/core/execution/dispatching/factories/dispatcher.py
+++ b/modin/core/execution/dispatching/factories/dispatcher.py
@@ -191,6 +191,11 @@ def from_ray(cls, ray_obj):
     def from_dask(cls, dask_obj):
         return cls.get_factory()._from_dask(dask_obj)
 
+    @classmethod
+    @_inherit_docstrings(factories.BaseFactory._from_map)
+    def from_map(cls, func, iterable, *args, **kwargs):
+        return cls.get_factory()._from_map(func, iterable, *args, **kwargs)
+
     @classmethod
     @_inherit_docstrings(factories.BaseFactory._read_parquet)
     def read_parquet(cls, **kwargs):
diff --git a/modin/core/execution/dispatching/factories/factories.py b/modin/core/execution/dispatching/factories/factories.py
index d521a40f2da..98c11b7be8e 100644
--- a/modin/core/execution/dispatching/factories/factories.py
+++ b/modin/core/execution/dispatching/factories/factories.py
@@ -221,6 +221,25 @@ def _from_ray(cls, ray_obj):
     def _from_dask(cls, dask_obj):
         return cls.io_cls.from_dask(dask_obj)
 
+    @classmethod
+    @doc(
+        _doc_io_method_template,
+        source="a map function",
+        params="""
+        func : callable
+            Function to map across the iterable object.
+        iterable : Iterable
+            An iterable object.
+        *args : tuple
+            Positional arguments to pass in `func`.
+        **kwargs : dict
+            Keyword arguments to pass in `func`.
+ """, + method="from_map", + ) + def _from_map(cls, func, iterable, *args, **kwargs): + return cls.io_cls.from_map(func, iterable, *args, **kwargs) + @classmethod @doc( _doc_io_method_template, diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index ec88dda60d9..267806945bf 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -15,8 +15,10 @@ import io +import numpy as np import pandas from pandas.io.common import get_handle, stringify_path +import ray from ray.data import from_pandas_refs from modin.config import RayTaskCustomResources @@ -68,6 +70,7 @@ class PandasOnRayIO(RayIO): """Factory providing methods for performing I/O operations using pandas as storage format on Ray as engine.""" frame_cls = PandasOnRayDataframe + frame_partition_cls = PandasOnRayDataframePartition query_compiler_cls = PandasQueryCompiler build_args = dict( frame_partition_cls=PandasOnRayDataframePartition, @@ -302,3 +305,47 @@ def to_ray(cls, modin_obj): """ parts = unwrap_partitions(modin_obj, axis=0) return from_pandas_refs(parts) + + @classmethod + def from_map(cls, func, iterable, *args, **kwargs): + """ + Create a Modin `query_compiler` from a map function. + + This method will construct a Modin `query_compiler` split by row partitions. + The number of row partitions matches the number of elements in the iterable object. + + Parameters + ---------- + func : callable + Function to map across the iterable object. + iterable : Iterable + An iterable object. + *args : tuple + Positional arguments to pass in `func`. + **kwargs : dict + Keyword arguments to pass in `func`. + + Returns + ------- + BaseQueryCompiler + QueryCompiler containing data returned by map function. 
+ """ + partitions = np.array( + [ + [ + cls.frame_partition_cls( + deploy_map_func.remote(func, obj, *args, **kwargs) + ) + ] + for obj in iterable + ] + ) + return cls.query_compiler_cls(cls.frame_cls(partitions)) + + +@ray.remote +def deploy_map_func(func, obj, *args, **kwargs): + result = func(obj, *args, **kwargs) + if not isinstance(result, pandas.DataFrame): + result = pandas.DataFrame(result) + return result diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py index c5bc772ad7f..2a6a84bfb9e 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py @@ -15,8 +15,10 @@ import io +import numpy as np import pandas from pandas.io.common import get_handle, stringify_path +import unidist from modin.core.execution.unidist.common import SignalActor, UnidistWrapper from modin.core.execution.unidist.generic.io import UnidistIO @@ -258,3 +260,47 @@ def func(df, **kw): # pragma: no cover UnidistWrapper.materialize( [part.list_of_blocks[0] for row in result for part in row] ) + + @classmethod + def from_map(cls, func, iterable, *args, **kwargs): + """ + Create a Modin `query_compiler` from a map function. + + This method will construct a Modin `query_compiler` split by row partitions. + The number of row partitions matches the number of elements in the iterable object. + + Parameters + ---------- + func : callable + Function to map across the iterable object. + iterable : Iterable + An iterable object. + *args : tuple + Positional arguments to pass in `func`. + **kwargs : dict + Keyword arguments to pass in `func`. + + Returns + ------- + BaseQueryCompiler + QueryCompiler containing data returned by map function. 
+ """ + partitions = np.array( + [ + [ + cls.frame_partition_cls( + deploy_map_func.remote(func, obj, *args, **kwargs) + ) + ] + for obj in iterable + ] + ) + return cls.query_compiler_cls(cls.frame_cls(partitions)) + + +@unidist.remote +def deploy_map_func(func, obj, *args, **kwargs): + result = func(obj, *args, **kwargs) + if not isinstance(result, pandas.DataFrame): + result = pandas.DataFrame(result) + return result diff --git a/modin/core/io/io.py b/modin/core/io/io.py index 56f6f353de0..cb7647e2207 100644 --- a/modin/core/io/io.py +++ b/modin/core/io/io.py @@ -164,6 +164,34 @@ def from_dask(cls, dask_obj): "Modin DataFrame can only be converted to a Dask DataFrame if Modin uses a Dask engine." ) + @classmethod + def from_map(cls, func, iterable, *args, **kwargs): + """ + Create a Modin `query_compiler` from a map function. + + This method will construct a Modin `query_compiler` split by row partitions. + The number of row partitions matches the number of elements in the iterable object. + + Parameters + ---------- + func : callable + Function to map across the iterable object. + iterable : Iterable + An iterable object. + *args : tuple + Positional arguments to pass in `func`. + **kwargs : dict + Keyword arguments to pass in `func`. + + Returns + ------- + BaseQueryCompiler + QueryCompiler containing data returned by map function. + """ + raise RuntimeError( + "Modin DataFrame can only be created if Modin uses Ray, Dask or MPI engine." + ) + @classmethod @_inherit_docstrings(pandas.read_parquet, apilink="pandas.read_parquet") @doc( diff --git a/modin/pandas/io.py b/modin/pandas/io.py index e29629ebd46..bbfdcce68f3 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -1109,6 +1109,36 @@ def from_dask(dask_obj) -> DataFrame: return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_dask(dask_obj)) +def from_map(func, iterable, *args, **kwargs) -> DataFrame: + """ + Create a Modin DataFrame from map function applied to an iterable object. 
+
+    This method will construct a Modin DataFrame split by row partitions.
+    The number of row partitions matches the number of elements in the iterable object.
+
+    Parameters
+    ----------
+    func : callable
+        Function to map across the iterable object.
+    iterable : Iterable
+        An iterable object.
+    *args : tuple
+        Positional arguments to pass in `func`.
+    **kwargs : dict
+        Keyword arguments to pass in `func`.
+
+    Returns
+    -------
+    DataFrame
+        A new Modin DataFrame object.
+    """
+    from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher
+
+    return ModinObjects.DataFrame(
+        query_compiler=FactoryDispatcher.from_map(func, iterable, *args, **kwargs)
+    )
+
+
 def to_pandas(modin_obj: SupportsPublicToPandas) -> DataFrame | Series:
     """
     Convert a Modin DataFrame/Series to a pandas DataFrame/Series.
diff --git a/modin/tests/pandas/test_io.py b/modin/tests/pandas/test_io.py
index 08e92d12d17..9dbafde7623 100644
--- a/modin/tests/pandas/test_io.py
+++ b/modin/tests/pandas/test_io.py
@@ -47,7 +47,7 @@
     TestReadFromSqlServer,
 )
 from modin.db_conn import ModinDatabaseConnection, UnsupportedDatabaseException
-from modin.pandas.io import from_arrow, from_dask, from_ray, to_pandas
+from modin.pandas.io import from_arrow, from_dask, from_map, from_ray, to_pandas
 from modin.tests.test_utils import warns_that_defaulting_to_pandas
 
 from .utils import (
@@ -3461,3 +3461,20 @@ def test_from_dask():
 
     result_df = from_dask(dask_df)
     df_equals(result_df, modin_df)
+
+
+@pytest.mark.skipif(
+    condition=Engine.get() not in ("Ray", "Dask", "Unidist"),
+    reason="Modin DataFrame can only be created if Modin uses Ray, Dask or MPI engine.",
+)
+@pytest.mark.filterwarnings(default_to_pandas_ignore_string)
+def test_from_map():
+    factor = 3
+    data = [1] * factor + [2] * factor + [3] * factor
+    expected_df = pd.DataFrame(data, index=[0, 1, 2] * factor)
+
+    def map_func(x, factor):
+        return [x] * factor
+
+    result_df = from_map(map_func, [1, 2, 3], 3)
+    df_equals(result_df, expected_df)