25 changes: 25 additions & 0 deletions doc/source/data/inspecting-data.rst
@@ -129,6 +129,31 @@ of the returned batch, set ``batch_format``.
0 5.1 3.5 ... 0.2 0
1 4.9 3.0 ... 0.2 0

.. tab-item:: Polars

    .. testcode::

        import ray
        import polars as pl

        ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

        batch = ds.take_batch(batch_size=2, batch_format="polars")
        print(batch)

    .. testoutput::
        :options: +MOCK

        shape: (2, 5)
        ┌─────────────────┬────────────────┬─────────────────┬────────────────┬────────┐
        │ sepal length... │ sepal width... │ petal length... │ petal width... │ target │
        │ ---             │ ---            │ ---             │ ---            │ ---    │
        │ f64             │ f64            │ f64             │ f64            │ i64    │
        ╞═════════════════╪════════════════╪═════════════════╪════════════════╪════════╡
        │ 5.1             │ 3.5            │ 1.4             │ 0.2            │ 0      │
        │ 4.9             │ 3.0            │ 1.4             │ 0.2            │ 0      │
        └─────────────────┴────────────────┴─────────────────┴────────────────┴────────┘

For more information on working with batches, see
:ref:`Transforming batches <transforming_batches>` and
:ref:`Iterating over batches <iterating-over-batches>`.
25 changes: 25 additions & 0 deletions doc/source/data/iterating-over-data.rst
@@ -94,6 +94,31 @@ formats by calling one of the following methods:
sepal length (cm) sepal width (cm) petal length (cm) petal width (cm) target
0 5.1 3.5 1.4 0.2 0
1 4.9 3.0 1.4 0.2 0

.. tab-item:: Polars

    .. testcode::

        import ray
        import polars as pl

        ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

        for batch in ds.iter_batches(batch_size=2, batch_format="polars"):
            print(batch)

    .. testoutput::
        :options: +MOCK

        shape: (2, 5)
        ┌─────────────────┬────────────────┬─────────────────┬────────────────┬────────┐
        │ sepal length... │ sepal width... │ petal length... │ petal width... │ target │
        │ ---             │ ---            │ ---             │ ---            │ ---    │
        │ f64             │ f64            │ f64             │ f64            │ i64    │
        ╞═════════════════╪════════════════╪═════════════════╪════════════════╪════════╡
        │ 5.1             │ 3.5            │ 1.4             │ 0.2            │ 0      │
        │ 4.9             │ 3.0            │ 1.4             │ 0.2            │ 0      │
        └─────────────────┴────────────────┴─────────────────┴────────────────┴────────┘
...
sepal length (cm) sepal width (cm) petal length (cm) petal width (cm) target
0 6.2 3.4 5.4 2.3 2
19 changes: 17 additions & 2 deletions doc/source/data/transforming-data.rst
@@ -142,7 +142,7 @@ batches is more performant than transforming rows.
Configuring batch format
~~~~~~~~~~~~~~~~~~~~~~~~

Ray Data represents batches as dicts of NumPy ndarrays or pandas DataFrames. By
Ray Data represents batches as dicts of NumPy ndarrays, pandas DataFrames, or Polars DataFrames. By
default, Ray Data represents batches as dicts of NumPy ndarrays. To configure the batch type,
specify ``batch_format`` in :meth:`~ray.data.Dataset.map_batches`. You can return any of these
formats from your function, but ``batch_format`` should match the input of your function.
@@ -181,9 +181,24 @@ format from your function, but ``batch_format`` should match the input of your function.
.map_batches(drop_nas, batch_format="pandas")
)

.. tab-item:: Polars

    .. testcode::

        import polars as pl
        import ray

        def drop_nas(batch: pl.DataFrame) -> pl.DataFrame:
            return batch.drop_nulls()

        ds = (
            ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
            .map_batches(drop_nas, batch_format="polars")
        )

The user-defined function you pass to :meth:`~ray.data.Dataset.map_batches` is more flexible. Because you can represent batches
in multiple ways (see :ref:`Configuring batch format <configure_batch_format>`), the function should be of type
``Callable[DataBatch, DataBatch]``, where ``DataBatch = Union[pd.DataFrame, Dict[str, np.ndarray]]``. In
``Callable[DataBatch, DataBatch]``, where ``DataBatch = Union[pd.DataFrame, pl.DataFrame, Dict[str, np.ndarray]]``. In
other words, your function should take as input and output a batch of data that you can represent as a
pandas DataFrame, a Polars DataFrame, or a dictionary with string keys and NumPy ndarray values. For example, your function might look like:
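Under the new Polars format, a minimal sketch of such a function might look like the following (a hedged illustration using the iris column names from the examples above; the file's own example is elided from this hunk):

    import polars as pl

    def add_sepal_area(batch: pl.DataFrame) -> pl.DataFrame:
        # Input and output are both eager Polars DataFrames.
        return batch.with_columns(
            (pl.col("sepal length (cm)") * pl.col("sepal width (cm)")).alias("sepal area")
        )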

34 changes: 34 additions & 0 deletions python/ray/data/_internal/arrow_block.py
@@ -49,6 +49,7 @@

if TYPE_CHECKING:
    import pandas
    import polars

    from ray.data._internal.planner.exchange.sort_task_spec import SortKey

@@ -305,6 +306,39 @@ def to_numpy(
    def to_arrow(self) -> "pyarrow.Table":
        return self._table

    def to_polars(self) -> "polars.DataFrame":
        """Convert this Arrow block into a Polars DataFrame.

        Converts a PyArrow Table to a Polars DataFrame. See
        https://docs.pola.rs/ for Polars documentation.

        Note: combine_chunks(copy=False) below avoids copying chunk data where
        it can; the subsequent Arrow-to-Polars conversion may still copy,
        depending on dtypes and chunk layout.

        Returns:
            A Polars DataFrame containing the data.

        Raises:
            ImportError: If Polars is not installed.
        """
        try:
            import polars as pl
        except ImportError:
            raise ImportError(
                "Polars is not installed. Install with `pip install polars`. "
                "See https://docs.pola.rs/ for more information."
            )

        # Combine chunks for better performance and compatibility
        # Polars works better with contiguous arrays
        from ray.data._internal.arrow_ops import transform_pyarrow

        combined_table = transform_pyarrow.combine_chunks(self._table, copy=False)
Copilot AI commented on Nov 21, 2025:

The comment states 'This conversion creates a copy of the data', but the code uses copy=False in combine_chunks(). This is inconsistent and may confuse developers. Either update the comment to clarify that combine_chunks doesn't copy but the subsequent Polars conversion does, or explain the complete copy behavior more accurately.

        # Convert to Polars DataFrame using from_arrow()
        # See https://docs.pola.rs/api/dataframe/#polars.DataFrame.from_arrow
        return pl.from_arrow(combined_table)

    def num_rows(self) -> int:
        # Arrow may represent an empty table via an N > 0 row, 0-column table, e.g. when
        # slicing an empty table, so we return 0 if num_columns == 0.
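A hedged standalone sketch of the combine-then-convert path that to_polars takes, with made-up table contents (pyarrow's own Table.combine_chunks stands in for Ray's internal transform_pyarrow.combine_chunks helper):

    import polars as pl
    import pyarrow as pa

    # Two chunks mimic a chunked Arrow block.
    table = pa.concat_tables([pa.table({"a": [1, 2]}), pa.table({"a": [3]})])

    combined = table.combine_chunks()  # consolidate chunks into contiguous arrays
    df = pl.from_arrow(combined)       # Arrow Table -> Polars DataFrame
    assert df.shape == (3, 1)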
50 changes: 50 additions & 0 deletions python/ray/data/_internal/pandas_block.py
@@ -38,6 +38,7 @@

if TYPE_CHECKING:
    import pandas
    import polars
    import pyarrow

    from ray.data._internal.planner.exchange.sort_task_spec import SortKey
@@ -479,6 +480,55 @@ def to_arrow(self) -> "pyarrow.Table":

        return arrow_table

    def to_polars(self) -> "polars.DataFrame":
        """Convert this Pandas block into a Polars DataFrame.

        Converts a Pandas DataFrame to a Polars DataFrame. See
        https://docs.pola.rs/ for Polars documentation.

        Note: This conversion creates a copy of the data, due to differences
        in memory layout between Pandas and Polars; zero-copy conversion
        from Pandas to Polars is not possible.

        Returns:
            A Polars DataFrame containing the data.

        Raises:
            ImportError: If Polars is not installed.
            ValueError: If the Pandas DataFrame has duplicate column names or
                invalid column names.
        """
        try:
            import polars as pl
        except ImportError:
            raise ImportError(
                "Polars is not installed. Install with `pip install polars`. "
                "See https://docs.pola.rs/ for more information."
            )

        # Validate column names before conversion
        # Polars doesn't allow duplicate column names
        if len(self._table.columns) != len(set(self._table.columns)):
            duplicates = [
                col
                for col in self._table.columns
                if list(self._table.columns).count(col) > 1
            ]
            raise ValueError(
                f"Pandas DataFrame has duplicate column names: {duplicates}. "
                "Rename duplicate columns before converting to Polars."
            )
Comment on lines +510 to +518
Contributor commented (medium):

This check for duplicate columns is a bit inefficient. A more idiomatic and performant way to check for and find duplicates in a pandas Index is to use self._table.columns.is_unique and self._table.columns.duplicated().

        if not self._table.columns.is_unique:
            duplicates = self._table.columns[self._table.columns.duplicated()].unique().tolist()
            raise ValueError(
                f"Pandas DataFrame has duplicate column names: {duplicates}. "
                "Rename duplicate columns before converting to Polars."
            )

        # Validate column names are strings
        for col in self._table.columns:
            if not isinstance(col, str):
                raise ValueError(
                    f"Pandas DataFrame has non-string column name: {col} (type: {type(col)}). "
                    "All column names must be strings for Polars conversion."
                )

        # Convert to Polars DataFrame using from_pandas()
        # See https://docs.pola.rs/api/dataframe/#polars.DataFrame.from_pandas
        return pl.from_pandas(self._table)
Copilot AI commented on Nov 21, 2025:

[nitpick] The docstring states 'This conversion creates a copy of the data', but doesn't explain why zero-copy is impossible. Consider adding a brief explanation (e.g., 'due to differences in memory layout between Pandas and Polars') to help developers understand the limitation.

    def num_rows(self) -> int:
        return self._table.shape[0]

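A hedged sketch of the duplicate-column failure mode that the validation above guards against (data and replacement names are made up; the rename mirrors the error message's advice):

    import pandas as pd
    import polars as pl

    pdf = pd.DataFrame([[1, 2]], columns=["x", "x"])  # duplicate column names
    assert not pdf.columns.is_unique                  # the condition being validated

    pdf = pdf.set_axis(["x", "x_right"], axis=1)      # rename before converting
    assert pl.from_pandas(pdf).columns == ["x", "x_right"]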
50 changes: 49 additions & 1 deletion python/ray/data/_internal/planner/plan_udf_map_op.py
@@ -420,6 +420,29 @@ def _try_wrap_udf_exception(e: Exception, item: Any = None):


def _validate_batch_output(batch: Block) -> None:
    """Validate that a batch output from a UDF is a supported type.

    See https://docs.pola.rs/ for Polars documentation.
    """
    # Check for Polars DataFrame
    # Polars is an optional dependency, so we check for it here
    try:
        import polars as pl

        if isinstance(batch, pl.DataFrame):
            # Polars DataFrames are valid - DataFrame is always eager
            # LazyFrame is a separate class, so if we get here it's already a DataFrame
            return
        elif isinstance(batch, pl.LazyFrame):
            raise ValueError(
                "The `fn` you passed to `map_batches` returned a Polars LazyFrame. "
                "LazyFrames must be collected before returning. Use `.collect()` to "
                "materialize the LazyFrame into a DataFrame. "
                "See https://docs.pola.rs/api/lazyframe/#collect for details."
            )
    except ImportError:
        pass

    if not isinstance(
        batch,
        (
@@ -434,7 +457,7 @@ def _validate_batch_output(batch: Block) -> None:
        raise ValueError(
            "The `fn` you passed to `map_batches` returned a value of type "
            f"{type(batch)}. This isn't allowed -- `map_batches` expects "
            "`fn` to return a `pandas.DataFrame`, `pyarrow.Table`, "
            "`fn` to return a `pandas.DataFrame`, `polars.DataFrame`, `pyarrow.Table`, "
            "`numpy.ndarray`, `list`, or `dict[str, numpy.ndarray]`."
        )
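A hedged usage sketch of the rule this validation enforces, reusing the iris dataset from the docs above (add_petal_area is hypothetical):

    import polars as pl
    import ray

    def add_petal_area(batch: pl.DataFrame) -> pl.DataFrame:
        lazy = batch.lazy().with_columns(
            (pl.col("petal length (cm)") * pl.col("petal width (cm)")).alias("petal area")
        )
        # Returning `lazy` itself would trigger the ValueError above.
        return lazy.collect()

    ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    ds = ds.map_batches(add_petal_area, batch_format="polars")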

@@ -510,8 +533,33 @@ def transform_fn(
                else:
                    raise e from None
            else:
                # Validate all yielded batches (for generators, validate each item)
                for out_batch in res:
                    _validate_batch_output(out_batch)
                    # Additional validation: ensure Polars DataFrames are eager
                    # See https://docs.pola.rs/ for Polars documentation
                    try:
                        import polars as pl

                        if isinstance(out_batch, pl.LazyFrame):
                            raise ValueError(
                                "Generator yielded a Polars LazyFrame. "
                                "All yielded frames must be materialized. "
                                "Call .collect() on LazyFrames before yielding. "
                                "See https://docs.pola.rs/api/lazyframe/#collect for details."
                            )
                        elif isinstance(out_batch, pl.DataFrame):
                            # DataFrame is always eager, but verify it's valid
                            try:
                                # Access schema to ensure DataFrame is valid
                                _ = out_batch.schema
                            except Exception as e:
                                raise ValueError(
                                    f"Polars DataFrame is in invalid state: {e}. "
                                    "Ensure the DataFrame is properly constructed."
                                ) from e
Comment on lines +544 to +560
Contributor commented (medium):

The check for pl.LazyFrame here is redundant. The _validate_batch_output function, called on line 538, already checks if the output is a pl.LazyFrame and raises an appropriate ValueError. You can remove the isinstance(out_batch, pl.LazyFrame) block and convert the elif to an if to avoid duplicated logic.

Suggested change:

    -                        if isinstance(out_batch, pl.LazyFrame):
    -                            raise ValueError(
    -                                "Generator yielded a Polars LazyFrame. "
    -                                "All yielded frames must be materialized. "
    -                                "Call .collect() on LazyFrames before yielding. "
    -                                "See https://docs.pola.rs/api/lazyframe/#collect for details."
    -                            )
    -                        elif isinstance(out_batch, pl.DataFrame):
    +                        if isinstance(out_batch, pl.DataFrame):
                    except ImportError:
                        pass
                    yield out_batch

    return transform_fn
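And a hedged sketch of a generator UDF that passes the per-item validation above (split_halves is hypothetical; every yielded frame is already eager):

    from typing import Iterator

    import polars as pl
    import ray

    def split_halves(batch: pl.DataFrame) -> Iterator[pl.DataFrame]:
        half = batch.height // 2
        yield batch.slice(0, half)  # eager DataFrame: passes validation
        yield batch.slice(half)     # yielding `batch.lazy()` here would raise

    ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    ds = ds.map_batches(split_halves, batch_format="polars")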