From 7ec827eaeb6f5bac1a09b5480721b2050f512211 Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Sun, 2 Mar 2025 17:43:46 +0700 Subject: [PATCH 1/9] [PoC] Allow JIT compilation with an internal API --- pandas/core/bodo_patched.py | 57 +++++++++++++++++++++++++++++++++++++ pandas/core/frame.py | 35 ++++++++++++++++++++++- 2 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 pandas/core/bodo_patched.py diff --git a/pandas/core/bodo_patched.py b/pandas/core/bodo_patched.py new file mode 100644 index 0000000000000..fe9569fbe270f --- /dev/null +++ b/pandas/core/bodo_patched.py @@ -0,0 +1,57 @@ +""" +This file is here as an example, this code will live in the Numba and +Bodo libraries. +""" +from __future__ import annotations +from collections.abc import Callable +from typing import TYPE_CHECKING, Literal, Any + +import pandas as pd +import bodo + +if TYPE_CHECKING: + from pandas._typing import Axis, AggFuncType + +def __pandas_udf__( + jit_decorator: Callable, + obj: pd.Series | pd.DataFrame, + method: Literal["apply", "map"], + func: AggFuncType, + axis: Axis, + raw: bool, + result_type: Literal["expand", "reduce", "broadcast"] | None, + args: tuple, + kwargs: dict[str, Any], + by_row: Literal[False, "compat"], +): + + if isinstance(obj, pd.DataFrame) and method == "apply": + if result_type is not None: + raise NotImplementedError( + "engine='bodo' not supported when result_type is not None" + ) + + if raw: + raise NotImplementedError( + "engine='bodo' not supported when raw=True" + ) + if isinstance(func, str) and axis != 1: + raise NotImplementedError( + "engine='bodo' only supports axis=1 when func is the name of a " + "user-defined function" + ) + if args or kwargs: + raise NotImplementedError( + "engine='bodo' not supported when args or kwargs are specified" + ) + @jit_decorator + def jit_func(df, func, axis): + return df.apply(func, axis=axis) + + return jit_func(obj, func, axis) + else: + raise NotImplementedError( + f"engine='bodo' not supported for {obj.__class__.__name__}.{method}" + ) + +bodo.jit.__pandas_udf__ = __pandas_udf__ diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 4a86048bc20e2..6c3402edfb26f 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -10256,6 +10256,7 @@ def apply( by_row: Literal[False, "compat"] = "compat", engine: Literal["python", "numba"] = "python", engine_kwargs: dict[str, bool] | None = None, + jit: Callable | None = None, **kwargs, ): """ @@ -10345,6 +10346,12 @@ def apply( Pass keyword arguments to the engine. This is currently only used by the numba engine, see the documentation for the engine argument for more information. + + jit : function, optional + Numba or Bodo decorator to JIT compile the execution. The main available + options are ``numba.jit``, ``numba.njit`` or ``bodo.jit``. Parameters can + be used in the same way as the decorators ``numba.jit(parallel=True)`` etc. + **kwargs Additional keyword arguments to pass as keywords arguments to `func`. @@ -10435,7 +10442,33 @@ def apply( 0 1 2 1 1 2 2 1 2 - """ + + Advanced users can speed up their code by using a Just-in-time (JIT) compiler + with ``apply``. The main JIT compilers available for pandas are Numba and Bodo. + In general, JIT compilation is only possible when the function passed to + ``apply`` has type stability (variables in the function do not change their + type during the execution). + + >>> import bodo + >>> df.apply(lambda x: x.A + x.B, axis=1, jit=bodo.jit(parallel=True)) + + Note that JIT compilation is only recommended for functions that take a + significant amount of time to run. Fast functions are unlikely to run faster + with JIT compilation. + """ + if hasattr(jit, "__pandas_udf__"): + return jit.__pandas_udf__( + jit_decorator=jit, + obj=self, + method="apply", + func=func, + axis=axis, + raw=raw, + result_type=result_type, + by_row=by_row, + args=args, + kwargs=kwargs) + from pandas.core.apply import frame_apply op = frame_apply( From bc2a17828324ee85b7d3b36c2d74c76350db8b8c Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Mon, 3 Mar 2025 00:07:53 +0700 Subject: [PATCH 2/9] Improving the documentation --- pandas/core/frame.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 6c3402edfb26f..78dc9822ed558 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -10348,9 +10348,12 @@ def apply( see the documentation for the engine argument for more information. jit : function, optional - Numba or Bodo decorator to JIT compile the execution. The main available - options are ``numba.jit``, ``numba.njit`` or ``bodo.jit``. Parameters can - be used in the same way as the decorators ``numba.jit(parallel=True)`` etc. + Decorator to JIT compile the execution. The main available options are + ``numba.jit``, ``numba.njit`` or ``bodo.jit``. Parameters can be used in + the same way as the decorators, for example ``numba.jit(parallel=True)``. + + Refer to the the [1]_ and [2]_ documentation to learn about limitations + on what code can be JIT compiled. **kwargs Additional keyword arguments to pass as keywords arguments to @@ -10374,6 +10377,12 @@ def apply( behavior or errors and are not supported. See :ref:`gotchas.udf-mutation` for more details. + References + ---------- + .. [1] `Numba documentation + `_ + .. [2] `Bodo documentation + `/ Examples -------- >>> df = pd.DataFrame([[4, 9]] * 3, columns=["A", "B"]) @@ -10462,12 +10471,13 @@ def apply( obj=self, method="apply", func=func, + args=args, + kwargs=kwargs, axis=axis, raw=raw, result_type=result_type, by_row=by_row, - args=args, - kwargs=kwargs) + ) from pandas.core.apply import frame_apply @@ -10600,9 +10610,11 @@ def _append( index = Index( [other.name], - name=self.index.names - if isinstance(self.index, MultiIndex) - else self.index.name, + name=( + self.index.names + if isinstance(self.index, MultiIndex) + else self.index.name + ), ) row_df = other.to_frame().T # infer_objects is needed for From 8b420ccaef679aed700502d03925e2271d1cecf4 Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Mon, 3 Mar 2025 00:50:08 +0700 Subject: [PATCH 3/9] CI --- pandas/core/bodo_patched.py | 29 ++++++++++++++++++++--------- pandas/core/frame.py | 1 + 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/pandas/core/bodo_patched.py b/pandas/core/bodo_patched.py index fe9569fbe270f..434497bac6ba5 100644 --- a/pandas/core/bodo_patched.py +++ b/pandas/core/bodo_patched.py @@ -2,15 +2,27 @@ This file is here as an example, this code will live in the Numba and Bodo libraries. """ + from __future__ import annotations -from collections.abc import Callable -from typing import TYPE_CHECKING, Literal, Any -import pandas as pd +from typing import ( + TYPE_CHECKING, + Any, + Literal, +) + import bodo +import pandas as pd + if TYPE_CHECKING: - from pandas._typing import Axis, AggFuncType + from collections.abc import Callable + + from pandas._typing import ( + AggFuncType, + Axis, + ) + def __pandas_udf__( jit_decorator: Callable, @@ -24,7 +36,6 @@ def __pandas_udf__( kwargs: dict[str, Any], by_row: Literal[False, "compat"], ): - if isinstance(obj, pd.DataFrame) and method == "apply": if result_type is not None: raise NotImplementedError( @@ -32,9 +43,7 @@ def __pandas_udf__( ) if raw: - raise NotImplementedError( - "engine='bodo' not supported when raw=True" - ) + raise NotImplementedError("engine='bodo' not supported when raw=True") if isinstance(func, str) and axis != 1: raise NotImplementedError( "engine='bodo' only supports axis=1 when func is the name of a " @@ -44,6 +53,7 @@ def __pandas_udf__( raise NotImplementedError( "engine='bodo' not supported when args or kwargs are specified" ) + @jit_decorator def jit_func(df, func, axis): return df.apply(func, axis=axis) @@ -51,7 +61,8 @@ def jit_func(df, func, axis): return jit_func(obj, func, axis) else: raise NotImplementedError( - f"engine='bodo' not supported for {obj.__class__.__name__}.{method}" + f"engine='bodo' not supported for {obj.__name__}.{method}" ) + bodo.jit.__pandas_udf__ = __pandas_udf__ diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 78dc9822ed558..6238d2dfc02dd 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -10383,6 +10383,7 @@ def apply( `_ .. [2] `Bodo documentation `/ + Examples -------- >>> df = pd.DataFrame([[4, 9]] * 3, columns=["A", "B"]) From 6a9ee5aad0f52e630660a5b1f4da6b5e4a5f6a0b Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Sun, 9 Mar 2025 17:24:45 +0700 Subject: [PATCH 4/9] Better execution engine API --- pandas/api/__init__.py | 2 + pandas/api/executors/__init__.py | 7 +++ pandas/core/apply.py | 104 +++++++++++++++++++++++++++++++ pandas/core/bodo_patched.py | 63 ++++++++++--------- pandas/core/frame.py | 100 +++++++++++++++-------------- 5 files changed, 196 insertions(+), 80 deletions(-) create mode 100644 pandas/api/executors/__init__.py diff --git a/pandas/api/__init__.py b/pandas/api/__init__.py index 8f659e3cd14c8..a016e67a41360 100644 --- a/pandas/api/__init__.py +++ b/pandas/api/__init__.py @@ -1,6 +1,7 @@ """public toolkit API""" from pandas.api import ( + executors, extensions, indexers, interchange, @@ -9,6 +10,7 @@ ) __all__ = [ + "executors", "extensions", "indexers", "interchange", diff --git a/pandas/api/executors/__init__.py b/pandas/api/executors/__init__.py new file mode 100644 index 0000000000000..04c94ee688332 --- /dev/null +++ b/pandas/api/executors/__init__.py @@ -0,0 +1,7 @@ +""" +Public API for function executor engines to be used with ``map`` and ``apply``. +""" + +from pandas.core.apply import BaseExecutionEngine + +__all__ = ["BaseExecutionEngine"] diff --git a/pandas/core/apply.py b/pandas/core/apply.py index f36fc82fb1a11..ad9980119a299 100644 --- a/pandas/core/apply.py +++ b/pandas/core/apply.py @@ -74,6 +74,110 @@ ResType = dict[int, Any] +class BaseExecutionEngine(abc.ABC): + """ + Base class for execution engines for map and apply methods. + + An execution engine receives all the parameters of a call to + ``apply`` or ``map``, such as the data container, the function, + etc. and takes care of running the execution. + + Supporting different engines allows functions to be JIT compiled, + run in parallel, and others. Besides the default executor which + simply runs the code with the Python interpreter and pandas. + """ + + @staticmethod + @abc.abstractmethod + def map( + data: Series | DataFrame | np.ndarray, + func: AggFuncType, + args: tuple, + kwargs: dict[str, Any], + decorator: Callable | None, + skip_na: bool, + ): + """ + Executor method to run functions elementwise. + + In general, pandas uses ``map`` for running functions elementwise, + but ``Series.apply`` with the default ``by_row='compat'`` will also + call this executor function. + + Parameters + ---------- + data : Series, DataFrame or NumPy ndarray + The object to use for the data. Some methods implement a ``raw`` + parameter which will convert the original pandas object to a + NumPy array, which will then be passed here to the executor. + func : function or NumPy ufunc + The function to execute. + args : tuple + Positional arguments to be passed to ``func``. + kwargs : dict + Keyword arguments to be passed to ``func``. + decorator : function, optional + For JIT compilers and other engines that need to decorate the + function ``func``, this is the decorator to use. While the + executor may already know which is the decorator to use, this + is useful as for a single executor the user can specify for a + example ``numba.jit`` or ``numba.njit(nogil=True)``, and this + decorator parameter will contain the exact decortor from the + executor the user wants to use. + skip_na : bool + Whether the function should be called for missing values or not. + This is specified by the pandas user as ``map(na_action=None)`` + or ``map(na_action='ignore')``. + """ + + @staticmethod + @abc.abstractmethod + def apply( + data: Series | DataFrame | np.ndarray, + func: AggFuncType, + args: tuple, + kwargs: dict[str, Any], + decorator: Callable, + axis: Axis, + ): + """ + Executor method to run functions by an axis. + + While we can see ``map`` as executing the function for each cell + in a ``DataFrame`` (or ``Series``), ``apply`` will execute the + function for each column (or row). + + Parameters + ---------- + data : Series, DataFrame or NumPy ndarray + The object to use for the data. Some methods implement a ``raw`` + parameter which will convert the original pandas object to a + NumPy array, which will then be passed here to the executor. + func : function or NumPy ufunc + The function to execute. + args : tuple + Positional arguments to be passed to ``func``. + kwargs : dict + Keyword arguments to be passed to ``func``. + decorator : function, optional + For JIT compilers and other engines that need to decorate the + function ``func``, this is the decorator to use. While the + executor may already know which is the decorator to use, this + is useful as for a single executor the user can specify for a + example ``numba.jit`` or ``numba.njit(nogil=True)``, and this + decorator parameter will contain the exact decortor from the + executor the user wants to use. + axis : {0 or 'index', 1 or 'columns'} + 0 or 'index' should execute the function passing each column as + parameter. 1 or 'columns' should execute the function passing + each row as parameter. The default executor engine passes rows + as pandas ``Series``. Other executor engines should probably + expect functions to be implemented this way for compatibility. + But passing rows as other data structures is technically possible + as far as the function ``func`` is implemented accordingly. + """ + + def frame_apply( obj: DataFrame, func: AggFuncType, diff --git a/pandas/core/bodo_patched.py b/pandas/core/bodo_patched.py index 434497bac6ba5..a61037b750be2 100644 --- a/pandas/core/bodo_patched.py +++ b/pandas/core/bodo_patched.py @@ -8,10 +8,10 @@ from typing import ( TYPE_CHECKING, Any, - Literal, ) import bodo +import numpy as np import pandas as pd @@ -24,45 +24,50 @@ ) -def __pandas_udf__( - jit_decorator: Callable, - obj: pd.Series | pd.DataFrame, - method: Literal["apply", "map"], - func: AggFuncType, - axis: Axis, - raw: bool, - result_type: Literal["expand", "reduce", "broadcast"] | None, - args: tuple, - kwargs: dict[str, Any], - by_row: Literal[False, "compat"], -): - if isinstance(obj, pd.DataFrame) and method == "apply": - if result_type is not None: +class BodoExecutionEngine(pd.api.executors.BaseExecutionEngine): + @staticmethod + def map( + data: pd.Series | pd.DataFrame | np.ndarray, + func: AggFuncType, + args: tuple, + kwargs: dict[str, Any], + decorator: Callable, + skip_na: bool, + ): + raise NotImplementedError("engine='bodo' not supported for map") + + @staticmethod + def apply( + data: pd.Series | pd.DataFrame | np.ndarray, + func: AggFuncType, + args: tuple, + kwargs: dict[str, Any], + decorator: Callable, + axis: Axis, + ): + if isinstance(data, pd.Series): + raise NotImplementedError("engine='bodo' not supported for Series.apply") + + if isinstance(data, np.ndarray): + raise NotImplementedError("engine='bodo' not supported when raw=True") + + if args or kwargs: raise NotImplementedError( - "engine='bodo' not supported when result_type is not None" + "engine='bodo' not supported when args or kwargs are specified" ) - if raw: - raise NotImplementedError("engine='bodo' not supported when raw=True") if isinstance(func, str) and axis != 1: raise NotImplementedError( "engine='bodo' only supports axis=1 when func is the name of a " "user-defined function" ) - if args or kwargs: - raise NotImplementedError( - "engine='bodo' not supported when args or kwargs are specified" - ) - @jit_decorator def jit_func(df, func, axis): return df.apply(func, axis=axis) - return jit_func(obj, func, axis) - else: - raise NotImplementedError( - f"engine='bodo' not supported for {obj.__name__}.{method}" - ) + jit_func = decorator(jit_func) + + return jit_func(data, func, axis) -bodo.jit.__pandas_udf__ = __pandas_udf__ +bodo.jit.__pandas_udf__ = BodoExecutionEngine diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 6238d2dfc02dd..4942acc889094 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -10254,9 +10254,8 @@ def apply( result_type: Literal["expand", "reduce", "broadcast"] | None = None, args=(), by_row: Literal[False, "compat"] = "compat", - engine: Literal["python", "numba"] = "python", + engine: Callable | None | Literal["python", "numba"] = None, engine_kwargs: dict[str, bool] | None = None, - jit: Callable | None = None, **kwargs, ): """ @@ -10317,28 +10316,24 @@ def apply( .. versionadded:: 2.1.0 - engine : {'python', 'numba'}, default 'python' - Choose between the python (default) engine or the numba engine in apply. + engine : decorator or {'python', 'numba'}, optional + Choose the execution engine to use. If not provided the function + will be executed by the regular Python interpreter. - The numba engine will attempt to JIT compile the passed function, - which may result in speedups for large DataFrames. - It also supports the following engine_kwargs : + Other options include JIT compilers such Numba and Bodo, which in some + cases can speed up the execution. To use an executor you can provide + the decorators ``numba.jit``, ``numba.njit`` or ``bodo.jit``. You can + also provide the decorator with parameters, like ``numba.jit(nogit=True)``. - - nopython (compile the function in nopython mode) - - nogil (release the GIL inside the JIT compiled function) - - parallel (try to apply the function in parallel over the DataFrame) + Not all functions can be executed with all execution engines. In general, + JIT compilers will require type stability in the function (no variable + should change data type during the execution). And not all pandas and + NumPy APIs are supported. Check the engine documentation [1]_ and [2]_ + for limitations. - Note: Due to limitations within numba/how pandas interfaces with numba, - you should only use this if raw=True - - Note: The numba compiler only supports a subset of - valid Python/numpy operations. + .. warning:: - Please read more about the `supported python features - `_ - and `supported numpy features - `_ - in numba to learn what you can or cannot use in the passed function. + String parameters will stop being supported in a future pandas version. .. versionadded:: 2.2.0 @@ -10347,14 +10342,6 @@ def apply( This is currently only used by the numba engine, see the documentation for the engine argument for more information. - jit : function, optional - Decorator to JIT compile the execution. The main available options are - ``numba.jit``, ``numba.njit`` or ``bodo.jit``. Parameters can be used in - the same way as the decorators, for example ``numba.jit(parallel=True)``. - - Refer to the the [1]_ and [2]_ documentation to learn about limitations - on what code can be JIT compiled. - **kwargs Additional keyword arguments to pass as keywords arguments to `func`. @@ -10460,41 +10447,52 @@ def apply( type during the execution). >>> import bodo - >>> df.apply(lambda x: x.A + x.B, axis=1, jit=bodo.jit(parallel=True)) + >>> df.apply(lambda x: x.A + x.B, axis=1, engine=bodo.jit(parallel=True)) Note that JIT compilation is only recommended for functions that take a significant amount of time to run. Fast functions are unlikely to run faster with JIT compilation. """ - if hasattr(jit, "__pandas_udf__"): - return jit.__pandas_udf__( - jit_decorator=jit, - obj=self, - method="apply", + if engine is None or isinstance(engine, str): + from pandas.core.apply import frame_apply + + if engine is None: + engine = "python" + + op = frame_apply( + self, func=func, - args=args, - kwargs=kwargs, axis=axis, raw=raw, result_type=result_type, by_row=by_row, + engine=engine, + engine_kwargs=engine_kwargs, + args=args, + kwargs=kwargs, ) + return op.apply().__finalize__(self, method="apply") + elif hasattr(engine, "__pandas_udf__"): + if result_type is not None: + raise NotImplementedError( + f"{result_type=} only implemented for the default engine" + ) - from pandas.core.apply import frame_apply - - op = frame_apply( - self, - func=func, - axis=axis, - raw=raw, - result_type=result_type, - by_row=by_row, - engine=engine, - engine_kwargs=engine_kwargs, - args=args, - kwargs=kwargs, - ) - return op.apply().__finalize__(self, method="apply") + data = self + if raw: + # This will upcast the whole DataFrame to the same type, + # and likely result in an object 2D array. + # We should probably pass a list of 1D arrays instead, at + # lest for ``axis=0`` + data = data.values + return engine.__pandas_udf__.apply( + data=data, + func=func, + args=args, + kwargs=kwargs, + decorator=engine, + axis=axis, + ) def map( self, func: PythonFuncType, na_action: Literal["ignore"] | None = None, **kwargs From 444de67307e9b7c28a44da901f094562f34bd06e Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Sun, 9 Mar 2025 17:59:09 +0700 Subject: [PATCH 5/9] Fixing test --- pandas/tests/api/test_api.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pandas/tests/api/test_api.py b/pandas/tests/api/test_api.py index 4a05259a98087..2ba90948be399 100644 --- a/pandas/tests/api/test_api.py +++ b/pandas/tests/api/test_api.py @@ -6,6 +6,7 @@ from pandas import api import pandas._testing as tm from pandas.api import ( + executors as api_executors, extensions as api_extensions, indexers as api_indexers, interchange as api_interchange, @@ -243,6 +244,7 @@ def test_depr(self): class TestApi(Base): allowed_api_dirs = [ + "executors", "types", "extensions", "indexers", @@ -338,6 +340,7 @@ class TestApi(Base): "ExtensionArray", "ExtensionScalarOpsMixin", ] + allowed_api_executors = ["BaseExecutionEngine"] def test_api(self): self.check(api, self.allowed_api_dirs) @@ -357,6 +360,9 @@ def test_api_indexers(self): def test_api_extensions(self): self.check(api_extensions, self.allowed_api_extensions) + def test_api_executors(self): + self.check(api_executors, self.allowed_api_executors) + class TestErrors(Base): def test_errors(self): From 7e1e855358a318e337c9675485b0f3826e7413fc Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Mon, 10 Mar 2025 14:00:42 +0700 Subject: [PATCH 6/9] Added tests, fixed some bugs and added a release note --- doc/source/whatsnew/v3.0.0.rst | 1 + pandas/core/apply.py | 4 +- pandas/core/frame.py | 40 ++++++++- pandas/tests/apply/test_frame_apply.py | 117 +++++++++++++++++++++---- 4 files changed, 144 insertions(+), 18 deletions(-) diff --git a/doc/source/whatsnew/v3.0.0.rst b/doc/source/whatsnew/v3.0.0.rst index c967b97cb2ef6..2675653a34b75 100644 --- a/doc/source/whatsnew/v3.0.0.rst +++ b/doc/source/whatsnew/v3.0.0.rst @@ -65,6 +65,7 @@ Other enhancements - :class:`Rolling` and :class:`Expanding` now support aggregations ``first`` and ``last`` (:issue:`33155`) - :func:`read_parquet` accepts ``to_pandas_kwargs`` which are forwarded to :meth:`pyarrow.Table.to_pandas` which enables passing additional keywords to customize the conversion to pandas, such as ``maps_as_pydicts`` to read the Parquet map data type as python dictionaries (:issue:`56842`) - :meth:`.DataFrameGroupBy.transform`, :meth:`.SeriesGroupBy.transform`, :meth:`.DataFrameGroupBy.agg`, :meth:`.SeriesGroupBy.agg`, :meth:`.SeriesGroupBy.apply`, :meth:`.DataFrameGroupBy.apply` now support ``kurt`` (:issue:`40139`) +- :meth:`DataFrame.apply` supports using third-party execution engines like the Bodo.ai JIT compiler (:issue:`60668`) - :meth:`DataFrameGroupBy.transform`, :meth:`SeriesGroupBy.transform`, :meth:`DataFrameGroupBy.agg`, :meth:`SeriesGroupBy.agg`, :meth:`RollingGroupby.apply`, :meth:`ExpandingGroupby.apply`, :meth:`Rolling.apply`, :meth:`Expanding.apply`, :meth:`DataFrame.apply` with ``engine="numba"`` now supports positional arguments passed as kwargs (:issue:`58995`) - :meth:`Rolling.agg`, :meth:`Expanding.agg` and :meth:`ExponentialMovingWindow.agg` now accept :class:`NamedAgg` aggregations through ``**kwargs`` (:issue:`28333`) - :meth:`Series.map` can now accept kwargs to pass on to func (:issue:`59814`) diff --git a/pandas/core/apply.py b/pandas/core/apply.py index ad9980119a299..fab1750243d85 100644 --- a/pandas/core/apply.py +++ b/pandas/core/apply.py @@ -120,7 +120,7 @@ def map( For JIT compilers and other engines that need to decorate the function ``func``, this is the decorator to use. While the executor may already know which is the decorator to use, this - is useful as for a single executor the user can specify for a + is useful as for a single executor the user can specify for example ``numba.jit`` or ``numba.njit(nogil=True)``, and this decorator parameter will contain the exact decortor from the executor the user wants to use. @@ -163,7 +163,7 @@ def apply( For JIT compilers and other engines that need to decorate the function ``func``, this is the decorator to use. While the executor may already know which is the decorator to use, this - is useful as for a single executor the user can specify for a + is useful as for a single executor the user can specify for example ``numba.jit`` or ``numba.njit(nogil=True)``, and this decorator parameter will contain the exact decortor from the executor the user wants to use. diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 4942acc889094..352ced38225e9 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -10459,6 +10459,9 @@ def apply( if engine is None: engine = "python" + if engine not in ["python", "numba"]: + raise ValueError(f"Unknown engine '{engine}'") + op = frame_apply( self, func=func, @@ -10478,6 +10481,31 @@ def apply( f"{result_type=} only implemented for the default engine" ) + agg_axis = self._get_agg_axis(axis) + + # one axis is empty + if not all(self.shape): + try: + if axis == 0: + r = func(Series([], dtype=np.float64), *args, **kwargs) + else: + r = func( + Series(index=self.columns, dtype=np.float64), + *args, + **kwargs, + ) + except Exception: + pass + else: + if not isinstance(r, Series): + if len(agg_axis): + r = func(Series([], dtype=np.float64), *args, **kwargs) + else: + r = np.nan + + return self._constructor_sliced(r, index=agg_axis) + return self.copy() + data = self if raw: # This will upcast the whole DataFrame to the same type, @@ -10485,7 +10513,7 @@ def apply( # We should probably pass a list of 1D arrays instead, at # lest for ``axis=0`` data = data.values - return engine.__pandas_udf__.apply( + result = engine.__pandas_udf__.apply( data=data, func=func, args=args, @@ -10493,6 +10521,16 @@ def apply( decorator=engine, axis=axis, ) + if raw: + if result.ndim == 2: + return self._constructor( + result, index=self.index, columns=self.columns + ) + else: + return self._constructor_sliced(result, index=agg_axis) + return result + else: + raise ValueError(f"Unknown engine {engine}") def map( self, func: PythonFuncType, na_action: Literal["ignore"] | None = None, **kwargs diff --git a/pandas/tests/apply/test_frame_apply.py b/pandas/tests/apply/test_frame_apply.py index b9e407adc3051..2d47cd851ad10 100644 --- a/pandas/tests/apply/test_frame_apply.py +++ b/pandas/tests/apply/test_frame_apply.py @@ -17,10 +17,63 @@ date_range, ) import pandas._testing as tm +from pandas.api.executors import BaseExecutionEngine from pandas.tests.frame.common import zip_frames from pandas.util.version import Version +class MockExecutionEngine(BaseExecutionEngine): + """ + Execution Engine to test if the execution engine interface receives and + uses all parameters provided by the user. + + Making this engine work as the default Python engine by calling it, no extra + functionality is implemented here. + + When testing, this will be called when this engine is provided, and then the + same pandas.map and pandas.apply function will be called, but without engine, + executing the default behavior from the python engine. + """ + + def map(data, func, args, kwargs, decorator, skip_na): + kwargs_to_pass = kwargs if isinstance(data, DataFrame) else {} + return data.map( + func, action_na="ignore" if skip_na else False, **kwargs_to_pass + ) + + def apply(data, func, args, kwargs, decorator, axis): + if isinstance(data, Series): + return data.apply(func, convert_dtype=True, args=args, by_row=False) + elif isinstance(data, DataFrame): + return data.apply( + func, + axis=axis, + raw=False, + result_type=None, + args=args, + by_row="compat", + **kwargs, + ) + else: + assert isinstance(data, np.ndarray) + + def wrap_function(func): + # https://github.com/numpy/numpy/issues/8352 + def wrapper(*args, **kwargs): + result = func(*args, **kwargs) + if isinstance(result, str): + result = np.array(result, dtype=object) + return result + + return wrapper + + return np.apply_along_axis(wrap_function(func), axis, data, *args, **kwargs) + + +class MockEngineDecorator: + __pandas_udf__ = MockExecutionEngine + + @pytest.fixture def int_frame_const_col(): """ @@ -35,7 +88,13 @@ def int_frame_const_col(): return df -@pytest.fixture(params=["python", pytest.param("numba", marks=pytest.mark.single_cpu)]) +@pytest.fixture( + params=[ + "python", + pytest.param("numba", marks=pytest.mark.single_cpu), + MockEngineDecorator, + ] +) def engine(request): if request.param == "numba": pytest.importorskip("numba") @@ -1079,12 +1138,21 @@ def test_result_type_broadcast(int_frame_const_col, request, engine): mark = pytest.mark.xfail(reason="numba engine doesn't support list return") request.node.add_marker(mark) df = int_frame_const_col - # broadcast result - result = df.apply( - lambda x: [1, 2, 3], axis=1, result_type="broadcast", engine=engine - ) - expected = df.copy() - tm.assert_frame_equal(result, expected) + if engine is MockEngineDecorator: + with pytest.raises( + NotImplementedError, + match="result_type='broadcast' only implemented for the default engine", + ): + df.apply( + lambda x: [1, 2, 3], axis=1, result_type="broadcast", engine=engine + ) + else: + # broadcast result + result = df.apply( + lambda x: [1, 2, 3], axis=1, result_type="broadcast", engine=engine + ) + expected = df.copy() + tm.assert_frame_equal(result, expected) def test_result_type_broadcast_series_func(int_frame_const_col, engine, request): @@ -1097,14 +1165,27 @@ def test_result_type_broadcast_series_func(int_frame_const_col, engine, request) request.node.add_marker(mark) df = int_frame_const_col columns = ["other", "col", "names"] - result = df.apply( - lambda x: Series([1, 2, 3], index=columns), - axis=1, - result_type="broadcast", - engine=engine, - ) - expected = df.copy() - tm.assert_frame_equal(result, expected) + + if engine is MockEngineDecorator: + with pytest.raises( + NotImplementedError, + match="result_type='broadcast' only implemented for the default engine", + ): + df.apply( + lambda x: Series([1, 2, 3], index=columns), + axis=1, + result_type="broadcast", + engine=engine, + ) + else: + result = df.apply( + lambda x: Series([1, 2, 3], index=columns), + axis=1, + result_type="broadcast", + engine=engine, + ) + expected = df.copy() + tm.assert_frame_equal(result, expected) def test_result_type_series_result(int_frame_const_col, engine, request): @@ -1791,3 +1872,9 @@ def test_agg_dist_like_and_nonunique_columns(): result = df.agg({"A": "count"}) expected = df["A"].count() tm.assert_series_equal(result, expected) + + +@pytest.mark.parametrize("engine_name", ["unknown", 25]) +def test_wrong_engine(engine_name): + with pytest.raises(ValueError, match="Unknown engine "): + DataFrame().apply(lambda x: x, engine=engine_name) From 58fb30da9dd81e90721c1e360628f0b127c48fa5 Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Mon, 10 Mar 2025 14:01:55 +0700 Subject: [PATCH 7/9] Removed temporary bodo decorator example --- pandas/core/bodo_patched.py | 73 ------------------------------------- 1 file changed, 73 deletions(-) delete mode 100644 pandas/core/bodo_patched.py diff --git a/pandas/core/bodo_patched.py b/pandas/core/bodo_patched.py deleted file mode 100644 index a61037b750be2..0000000000000 --- a/pandas/core/bodo_patched.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -This file is here as an example, this code will live in the Numba and -Bodo libraries. -""" - -from __future__ import annotations - -from typing import ( - TYPE_CHECKING, - Any, -) - -import bodo -import numpy as np - -import pandas as pd - -if TYPE_CHECKING: - from collections.abc import Callable - - from pandas._typing import ( - AggFuncType, - Axis, - ) - - -class BodoExecutionEngine(pd.api.executors.BaseExecutionEngine): - @staticmethod - def map( - data: pd.Series | pd.DataFrame | np.ndarray, - func: AggFuncType, - args: tuple, - kwargs: dict[str, Any], - decorator: Callable, - skip_na: bool, - ): - raise NotImplementedError("engine='bodo' not supported for map") - - @staticmethod - def apply( - data: pd.Series | pd.DataFrame | np.ndarray, - func: AggFuncType, - args: tuple, - kwargs: dict[str, Any], - decorator: Callable, - axis: Axis, - ): - if isinstance(data, pd.Series): - raise NotImplementedError("engine='bodo' not supported for Series.apply") - - if isinstance(data, np.ndarray): - raise NotImplementedError("engine='bodo' not supported when raw=True") - - if args or kwargs: - raise NotImplementedError( - "engine='bodo' not supported when args or kwargs are specified" - ) - - if isinstance(func, str) and axis != 1: - raise NotImplementedError( - "engine='bodo' only supports axis=1 when func is the name of a " - "user-defined function" - ) - - def jit_func(df, func, axis): - return df.apply(func, axis=axis) - - jit_func = decorator(jit_func) - - return jit_func(data, func, axis) - - -bodo.jit.__pandas_udf__ = BodoExecutionEngine From c239fc9ded7a4d3e9aa3cece5ae096478b786a2e Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Mon, 10 Mar 2025 14:55:16 +0700 Subject: [PATCH 8/9] make mypy happy --- pandas/core/frame.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 352ced38225e9..2fd2f01853c13 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -10481,10 +10481,11 @@ def apply( f"{result_type=} only implemented for the default engine" ) - agg_axis = self._get_agg_axis(axis) + agg_axis = self._get_agg_axis(self._get_axis_number(axis)) # one axis is empty if not all(self.shape): + func = cast(Callable, func) try: if axis == 0: r = func(Series([], dtype=np.float64), *args, **kwargs) @@ -10506,13 +10507,13 @@ def apply( return self._constructor_sliced(r, index=agg_axis) return self.copy() - data = self + data: DataFrame | np.ndarray = self if raw: # This will upcast the whole DataFrame to the same type, # and likely result in an object 2D array. # We should probably pass a list of 1D arrays instead, at # lest for ``axis=0`` - data = data.values + data = self.values result = engine.__pandas_udf__.apply( data=data, func=func, From 95671525f46130bde221fb518506e25acd59c387 Mon Sep 17 00:00:00 2001 From: Marc Garcia Date: Mon, 10 Mar 2025 23:43:58 +0700 Subject: [PATCH 9/9] Typos --- pandas/core/apply.py | 4 ++-- pandas/core/frame.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pandas/core/apply.py b/pandas/core/apply.py index fab1750243d85..da6124307e3f1 100644 --- a/pandas/core/apply.py +++ b/pandas/core/apply.py @@ -122,7 +122,7 @@ def map( executor may already know which is the decorator to use, this is useful as for a single executor the user can specify for example ``numba.jit`` or ``numba.njit(nogil=True)``, and this - decorator parameter will contain the exact decortor from the + decorator parameter will contain the exact decorator from the executor the user wants to use. skip_na : bool Whether the function should be called for missing values or not. @@ -165,7 +165,7 @@ def apply( executor may already know which is the decorator to use, this is useful as for a single executor the user can specify for example ``numba.jit`` or ``numba.njit(nogil=True)``, and this - decorator parameter will contain the exact decortor from the + decorator parameter will contain the exact decorator from the executor the user wants to use. axis : {0 or 'index', 1 or 'columns'} 0 or 'index' should execute the function passing each column as diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 2fd2f01853c13..083fb9358c462 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -10447,7 +10447,7 @@ def apply( type during the execution). >>> import bodo - >>> df.apply(lambda x: x.A + x.B, axis=1, engine=bodo.jit(parallel=True)) + >>> df.apply(lambda x: x.A + x.B, axis=1, engine=bodo.jit) Note that JIT compilation is only recommended for functions that take a significant amount of time to run. Fast functions are unlikely to run faster