Skip to content

Commit

Permalink
Add from_map
Browse files Browse the repository at this point in the history
Signed-off-by: Igoshev, Iaroslav <[email protected]>
  • Loading branch information
YarShev committed Apr 24, 2024
1 parent bbb136d commit 550e254
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 1 deletion.
45 changes: 45 additions & 0 deletions modin/core/execution/dask/implementations/pandas_on_dask/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

"""Module houses class that implements ``BaseIO`` using Dask as an execution engine."""

import numpy as np
import pandas
from distributed.client import default_client

from modin.core.execution.dask.common import DaskWrapper
Expand Down Expand Up @@ -188,3 +190,46 @@ def df_to_series(df):
partitions = [client.submit(df_to_series, part) for part in partitions]

return from_delayed(partitions)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
"""
Create a Modin `query_compiler` from a map function.
This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.
Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
partitions = np.array(
[
[
cls.frame_partition_cls(
deploy_map_func.remote(func, obj, *args, **kwargs)
)
]
for obj in iterable
]
)
return cls.query_compiler_cls(cls.frame_cls(partitions))


def deploy_map_func(func, obj, *args, **kwargs):
result = func(obj, *args, **kwargs)
if not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result
5 changes: 5 additions & 0 deletions modin/core/execution/dispatching/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ def from_ray(cls, ray_obj):
def from_dask(cls, dask_obj):
return cls.get_factory()._from_dask(dask_obj)

@classmethod
@_inherit_docstrings(factories.BaseFactory._from_map)
def from_map(cls, func, iterable, *args, **kwargs):
return cls.get_factory()._from_map(func, iterable, *args, **kwargs)

@classmethod
@_inherit_docstrings(factories.BaseFactory._read_parquet)
def read_parquet(cls, **kwargs):
Expand Down
19 changes: 19 additions & 0 deletions modin/core/execution/dispatching/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,25 @@ def _from_ray(cls, ray_obj):
def _from_dask(cls, dask_obj):
return cls.io_cls.from_dask(dask_obj)

@classmethod
@doc(
_doc_io_method_template,
source="a map function",
params="""
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
""",
method="from_map",
)
def _from_map(cls, func, iterable, *args, **kwargs):
return cls.io_cls.from_map(func, iterable, *args, **kwargs)

@classmethod
@doc(
_doc_io_method_template,
Expand Down
47 changes: 47 additions & 0 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

import io

import numpy as np
import pandas
from pandas.io.common import get_handle, stringify_path
import ray
from ray.data import from_pandas_refs

from modin.config import RayTaskCustomResources
Expand Down Expand Up @@ -68,6 +70,7 @@ class PandasOnRayIO(RayIO):
"""Factory providing methods for performing I/O operations using pandas as storage format on Ray as engine."""

frame_cls = PandasOnRayDataframe
frame_partition_cls = PandasOnRayDataframePartition
query_compiler_cls = PandasQueryCompiler
build_args = dict(
frame_partition_cls=PandasOnRayDataframePartition,
Expand Down Expand Up @@ -302,3 +305,47 @@ def to_ray(cls, modin_obj):
"""
parts = unwrap_partitions(modin_obj, axis=0)
return from_pandas_refs(parts)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
"""
Create a Modin `query_compiler` from a map function.
This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.
Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
partitions = np.array(
[
[
cls.frame_partition_cls(
deploy_map_func.remote(func, obj, *args, **kwargs)
)
]
for obj in iterable
]
)
return cls.query_compiler_cls(cls.frame_cls(partitions))


@ray.remote
def deploy_map_func(func, obj, *args, **kwargs):
result = func(obj, *args, **kwargs)
if not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

import io

import numpy as np
import pandas
from pandas.io.common import get_handle, stringify_path
import unidist

from modin.core.execution.unidist.common import SignalActor, UnidistWrapper
from modin.core.execution.unidist.generic.io import UnidistIO
Expand Down Expand Up @@ -258,3 +260,47 @@ def func(df, **kw): # pragma: no cover
UnidistWrapper.materialize(
[part.list_of_blocks[0] for row in result for part in row]
)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
"""
Create a Modin `query_compiler` from a map function.
This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.
Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
partitions = np.array(
[
[
cls.frame_partition_cls(
deploy_map_func.remote(func, obj, *args, **kwargs)
)
]
for obj in iterable
]
)
return cls.query_compiler_cls(cls.frame_cls(partitions))


@unidist.remote
def deploy_map_func(func, obj, *args, **kwargs):
result = func(obj, *args, **kwargs)
if not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result
28 changes: 28 additions & 0 deletions modin/core/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,34 @@ def from_dask(cls, dask_obj):
"Modin DataFrame can only be converted to a Dask DataFrame if Modin uses a Dask engine."
)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
"""
Create a Modin `query_compiler` from a map function.
This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.
Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
raise RuntimeError(
"Modin DataFrame can only be created if Modin uses Ray, Dask or MPI engine."
)

@classmethod
@_inherit_docstrings(pandas.read_parquet, apilink="pandas.read_parquet")
@doc(
Expand Down
30 changes: 30 additions & 0 deletions modin/pandas/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,36 @@ def from_dask(dask_obj) -> DataFrame:
return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_dask(dask_obj))


def from_map(func, iterable, *args, **kwargs) -> DataFrame:
"""
Create a Modin DataFrame from map function applied to an iterable object.
This method will construct a Modin DataFrame split by row partitions.
The number of row partitions matches the number of elements in the iterable object.
Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
Returns
-------
DataFrame
A new Modin DataFrame object.
"""
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

return ModinObjects.DataFrame(
query_compiler=FactoryDispatcher.from_map(func, iterable, *args, *kwargs)
)


def to_pandas(modin_obj: SupportsPublicToPandas) -> DataFrame | Series:
"""
Convert a Modin DataFrame/Series to a pandas DataFrame/Series.
Expand Down
19 changes: 18 additions & 1 deletion modin/tests/pandas/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
TestReadFromSqlServer,
)
from modin.db_conn import ModinDatabaseConnection, UnsupportedDatabaseException
from modin.pandas.io import from_arrow, from_dask, from_ray, to_pandas
from modin.pandas.io import from_arrow, from_dask, from_ray, to_pandas, from_map
from modin.tests.test_utils import warns_that_defaulting_to_pandas

from .utils import (
Expand Down Expand Up @@ -3461,3 +3461,20 @@ def test_from_dask():

result_df = from_dask(dask_df)
df_equals(result_df, modin_df)


@pytest.mark.skipif(
condition=Engine.get() not in ("Ray", "Dask", "Unidist"),
reason="Dask DataFrame can only be created if Modin uses Ray, Dask or MPI engine.",
)
@pytest.mark.filterwarnings(default_to_pandas_ignore_string)
def test_from_map():
factor = 3
data = [1] * factor + [2] * factor + [3] * factor
expected_df = pd.DataFrame(data, index=[0, 1, 2] * factor)

def map_func(x, factor):
return [x] * factor

result_df = from_map(map_func, [1, 2, 3], 3)
df_equals(result_df, expected_df)

0 comments on commit 550e254

Please sign in to comment.