ENH: Allow JIT compilation with an internal API #61032

Open
wants to merge 10 commits into main
2 changes: 2 additions & 0 deletions pandas/api/__init__.py
@@ -1,6 +1,7 @@
"""public toolkit API"""

from pandas.api import (
executors,
extensions,
indexers,
interchange,
@@ -9,6 +10,7 @@
)

__all__ = [
"executors",
"extensions",
"indexers",
"interchange",
7 changes: 7 additions & 0 deletions pandas/api/executors/__init__.py
@@ -0,0 +1,7 @@
"""
Public API for function executor engines to be used with ``map`` and ``apply``.
"""

from pandas.core.apply import BaseExecutionEngine

__all__ = ["BaseExecutionEngine"]
104 changes: 104 additions & 0 deletions pandas/core/apply.py
@@ -74,6 +74,110 @@
ResType = dict[int, Any]


class BaseExecutionEngine(abc.ABC):
"""
Base class for execution engines for map and apply methods.

An execution engine receives all the parameters of a call to
``apply`` or ``map``, such as the data container, the function,
etc., and takes care of running the execution.

Supporting different engines allows functions to be JIT compiled,
run in parallel, and so on, besides the default executor, which
simply runs the code with the Python interpreter and pandas.
"""

@staticmethod
@abc.abstractmethod
def map(
data: Series | DataFrame | np.ndarray,
func: AggFuncType,
args: tuple,
kwargs: dict[str, Any],
decorator: Callable | None,
skip_na: bool,
):
"""
Executor method to run functions elementwise.

In general, pandas uses ``map`` for running functions elementwise,
but ``Series.apply`` with the default ``by_row='compat'`` will also
call this executor function.

Parameters
----------
data : Series, DataFrame or NumPy ndarray
The object to use for the data. Some methods implement a ``raw``
parameter which will convert the original pandas object to a
NumPy array, which will then be passed here to the executor.
func : function or NumPy ufunc
The function to execute.
args : tuple
Positional arguments to be passed to ``func``.
kwargs : dict
Keyword arguments to be passed to ``func``.
decorator : function, optional
For JIT compilers and other engines that need to decorate the
function ``func``, this is the decorator to use. While the
executor may already know which decorator to use, this is
useful because, for a single executor, the user can specify for
example ``numba.jit`` or ``numba.njit(nogil=True)``, and this
parameter will contain the exact decorator the user wants the
executor to use.
skip_na : bool
Whether the function should be called for missing values or not.
This is specified by the pandas user as ``map(na_action=None)``
or ``map(na_action='ignore')``.
"""

@staticmethod
@abc.abstractmethod
def apply(
data: Series | DataFrame | np.ndarray,
func: AggFuncType,
args: tuple,
kwargs: dict[str, Any],
decorator: Callable,
axis: Axis,
):
"""
Executor method to run functions by an axis.

While we can see ``map`` as executing the function for each cell
in a ``DataFrame`` (or ``Series``), ``apply`` will execute the
function for each column (or row).

Parameters
----------
data : Series, DataFrame or NumPy ndarray
The object to use for the data. Some methods implement a ``raw``
parameter which will convert the original pandas object to a
NumPy array, which will then be passed here to the executor.
func : function or NumPy ufunc
The function to execute.
args : tuple
Positional arguments to be passed to ``func``.
kwargs : dict
Keyword arguments to be passed to ``func``.
decorator : function, optional
For JIT compilers and other engines that need to decorate the
function ``func``, this is the decorator to use. While the
executor may already know which decorator to use, this is
useful because, for a single executor, the user can specify for
example ``numba.jit`` or ``numba.njit(nogil=True)``, and this
parameter will contain the exact decorator the user wants the
executor to use.
axis : {0 or 'index', 1 or 'columns'}
0 or 'index' should execute the function passing each column as
a parameter. 1 or 'columns' should execute the function passing
each row as a parameter. The default executor engine passes rows
as pandas ``Series``. Other executor engines should probably
expect functions to be implemented this way for compatibility,
but passing rows as other data structures is technically possible
as long as the function ``func`` is implemented accordingly.
"""


def frame_apply(
obj: DataFrame,
func: AggFuncType,
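
To make the contract above concrete, here is a minimal sketch (not part of this PR) of what a concrete engine could look like. The class name and the plain pandas/NumPy fallback are illustrative assumptions, and the ``decorator`` argument is simply ignored; a real JIT engine would apply it to ``func`` before executing:

```python
import numpy as np
import pandas as pd

from pandas.api.executors import BaseExecutionEngine


class PythonExecutionEngine(BaseExecutionEngine):
    """Toy engine that runs ``func`` with plain pandas/NumPy and ignores ``decorator``."""

    @staticmethod
    def map(data, func, args, kwargs, decorator, skip_na):
        def wrapped(value):
            return func(value, *args, **kwargs)

        if isinstance(data, np.ndarray):
            # raw path: iterate elementwise and preserve the original shape
            return np.array([wrapped(v) for v in data.ravel()]).reshape(data.shape)
        na_action = "ignore" if skip_na else None
        return data.map(wrapped, na_action=na_action)

    @staticmethod
    def apply(data, func, args, kwargs, decorator, axis):
        axis = 1 if axis in (1, "columns") else 0
        if isinstance(data, np.ndarray):
            # raw path: each row/column is passed to ``func`` as a NumPy slice
            return np.apply_along_axis(func, axis, data, *args, **kwargs)
        if isinstance(data, pd.DataFrame):
            return data.apply(func, axis=axis, args=args, **kwargs)
        # Series.apply has no ``axis`` argument
        return data.apply(func, args=args, **kwargs)
```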
73 changes: 73 additions & 0 deletions pandas/core/bodo_patched.py
@@ -0,0 +1,73 @@
"""
This file is here as an example; this code will live in the Numba and
Bodo libraries.
"""

from __future__ import annotations

from typing import (
TYPE_CHECKING,
Any,
)

import bodo
import numpy as np

import pandas as pd

if TYPE_CHECKING:
from collections.abc import Callable

from pandas._typing import (
AggFuncType,
Axis,
)


class BodoExecutionEngine(pd.api.executors.BaseExecutionEngine):
@staticmethod
def map(
data: pd.Series | pd.DataFrame | np.ndarray,
func: AggFuncType,
args: tuple,
kwargs: dict[str, Any],
decorator: Callable,
skip_na: bool,
):
raise NotImplementedError("engine='bodo' not supported for map")

@staticmethod
def apply(
data: pd.Series | pd.DataFrame | np.ndarray,
func: AggFuncType,
args: tuple,
kwargs: dict[str, Any],
decorator: Callable,
axis: Axis,
):
if isinstance(data, pd.Series):
raise NotImplementedError("engine='bodo' not supported for Series.apply")

if isinstance(data, np.ndarray):
raise NotImplementedError("engine='bodo' not supported when raw=True")

if args or kwargs:
raise NotImplementedError(
"engine='bodo' not supported when args or kwargs are specified"
)

if isinstance(func, str) and axis != 1:
raise NotImplementedError(
"engine='bodo' only supports axis=1 when func is the name of a "
"user-defined function"
)

def jit_func(df, func, axis):
return df.apply(func, axis=axis)

jit_func = decorator(jit_func)

return jit_func(data, func, axis)


bodo.jit.__pandas_udf__ = BodoExecutionEngine
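
Assuming Bodo ships the registration above, users opt into the engine by passing the decorator (optionally configured with parameters) directly to ``DataFrame.apply``, matching the example added to the ``apply`` docstring below:

```python
import bodo
import pandas as pd

df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

# ``bodo.jit`` carries ``__pandas_udf__``, so pandas hands the call to
# ``BodoExecutionEngine.apply`` instead of the default Python engine.
result = df.apply(lambda row: row.A + row.B, axis=1, engine=bodo.jit(parallel=True))
```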
116 changes: 80 additions & 36 deletions pandas/core/frame.py
@@ -10254,7 +10254,7 @@ def apply(
result_type: Literal["expand", "reduce", "broadcast"] | None = None,
args=(),
by_row: Literal[False, "compat"] = "compat",
engine: Literal["python", "numba"] = "python",
engine: Callable | None | Literal["python", "numba"] = None,
engine_kwargs: dict[str, bool] | None = None,
**kwargs,
):
@@ -10316,35 +10316,32 @@

.. versionadded:: 2.1.0

engine : {'python', 'numba'}, default 'python'
Choose between the python (default) engine or the numba engine in apply.
engine : decorator or {'python', 'numba'}, optional
Choose the execution engine to use. If not provided, the function
will be executed by the regular Python interpreter.

The numba engine will attempt to JIT compile the passed function,
which may result in speedups for large DataFrames.
It also supports the following engine_kwargs :
Other options include JIT compilers such as Numba and Bodo, which in some
cases can speed up the execution. To use an executor you can provide
a decorator such as ``numba.jit``, ``numba.njit`` or ``bodo.jit``. You can
also provide the decorator with parameters, like ``numba.jit(nogil=True)``.

- nopython (compile the function in nopython mode)
- nogil (release the GIL inside the JIT compiled function)
- parallel (try to apply the function in parallel over the DataFrame)
Not all functions can be executed with all execution engines. In general,
JIT compilers require type stability in the function (no variable
should change data type during execution), and not all pandas and
NumPy APIs are supported. Check the engine documentation [1]_ and [2]_
for limitations.

Note: Due to limitations within numba/how pandas interfaces with numba,
you should only use this if raw=True

Note: The numba compiler only supports a subset of
valid Python/numpy operations.
.. warning::

Please read more about the `supported python features
<https://numba.pydata.org/numba-doc/dev/reference/pysupported.html>`_
and `supported numpy features
<https://numba.pydata.org/numba-doc/dev/reference/numpysupported.html>`_
in numba to learn what you can or cannot use in the passed function.
String parameters will stop being supported in a future pandas version.

.. versionadded:: 2.2.0

engine_kwargs : dict
Pass keyword arguments to the engine.
This is currently only used by the numba engine,
see the documentation for the engine argument for more information.

**kwargs
Additional keyword arguments to pass as keywords arguments to
`func`.
@@ -10367,6 +10364,13 @@
behavior or errors and are not supported. See :ref:`gotchas.udf-mutation`
for more details.

References
----------
.. [1] `Numba documentation
<https://numba.readthedocs.io/en/stable/index.html>`_
.. [2] `Bodo documentation
<https://docs.bodo.ai/latest/>`_

Examples
--------
>>> df = pd.DataFrame([[4, 9]] * 3, columns=["A", "B"])
@@ -10435,22 +10439,60 @@
0 1 2
1 1 2
2 1 2

Advanced users can speed up their code by using a Just-in-time (JIT) compiler
with ``apply``. The main JIT compilers available for pandas are Numba and Bodo.
In general, JIT compilation is only possible when the function passed to
``apply`` has type stability (variables in the function do not change their
type during execution).

>>> import bodo
>>> df.apply(lambda x: x.A + x.B, axis=1, engine=bodo.jit(parallel=True))

Note that JIT compilation is only recommended for functions that take a
significant amount of time to run. Fast functions are unlikely to run faster
with JIT compilation.
"""
from pandas.core.apply import frame_apply
if engine is None or isinstance(engine, str):
from pandas.core.apply import frame_apply

op = frame_apply(
self,
func=func,
axis=axis,
raw=raw,
result_type=result_type,
by_row=by_row,
engine=engine,
engine_kwargs=engine_kwargs,
args=args,
kwargs=kwargs,
)
return op.apply().__finalize__(self, method="apply")
if engine is None:
engine = "python"

op = frame_apply(
self,
func=func,
axis=axis,
raw=raw,
result_type=result_type,
by_row=by_row,
engine=engine,
engine_kwargs=engine_kwargs,
args=args,
kwargs=kwargs,
)
return op.apply().__finalize__(self, method="apply")
elif hasattr(engine, "__pandas_udf__"):
if result_type is not None:
raise NotImplementedError(
f"{result_type=} only implemented for the default engine"
)

data = self
if raw:
# This will upcast the whole DataFrame to the same type,
# and likely result in a 2D object array.
# We should probably pass a list of 1D arrays instead, at
# least for ``axis=0``.
data = data.values
return engine.__pandas_udf__.apply(
data=data,
func=func,
args=args,
kwargs=kwargs,
decorator=engine,
axis=axis,
)

def map(
self, func: PythonFuncType, na_action: Literal["ignore"] | None = None, **kwargs
@@ -10567,9 +10609,11 @@ def _append(

index = Index(
[other.name],
name=self.index.names
if isinstance(self.index, MultiIndex)
else self.index.name,
name=(
self.index.names
if isinstance(self.index, MultiIndex)
else self.index.name
),
)
row_df = other.to_frame().T
# infer_objects is needed for
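
Putting the new dispatch together: anything with a ``__pandas_udf__`` attribute can be passed as ``engine``. A hedged end-to-end sketch, reusing the hypothetical ``PythonExecutionEngine`` from the earlier example (the ``plain`` decorator is made up for illustration):

```python
import pandas as pd


def plain(func):
    # A do-nothing "decorator"; a real engine would JIT-compile ``func`` here.
    return func


# Registering the engine class makes the decorator usable as ``engine=...``.
plain.__pandas_udf__ = PythonExecutionEngine

df = pd.DataFrame([[4, 9]] * 3, columns=["A", "B"])

# Dispatches to PythonExecutionEngine.apply with decorator=plain and axis=1.
result = df.apply(lambda row: row["A"] + row["B"], axis=1, engine=plain)
print(result)
```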