-
Notifications
You must be signed in to change notification settings - Fork 7k
[Data] Add Polars batch format support to map_batches #58896
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
- Add 'polars' to VALID_BATCH_FORMATS - Implement to_polars() methods in ArrowBlockAccessor and PandasBlockAccessor - Add batch_to_block_from_polars() for converting Polars DataFrames to blocks - Update _validate_batch_output() to accept Polars DataFrames - Add comprehensive validation for LazyFrame, Series, and invalid types - Update documentation with Polars examples and performance notes - Add tests for Polars format in map_batches, iter_batches, and take_batch - Include URLs to Polars documentation in docstrings - Document that Polars format always creates copies (no zero-copy support) Signed-off-by: soffer-anyscale <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds support for Polars DataFrames as a batch format in Ray Data's map_batches() API, enabling users to leverage Polars' performance-optimized DataFrame library alongside existing formats (numpy, pandas, pyarrow).
Key Changes:
- Added
"polars"as a valid batch format option in Ray Data APIs - Implemented conversion methods (
to_polars()) in block accessors to transform blocks to Polars DataFrames - Added validation logic for Polars-specific types (LazyFrame, Series) with helpful error messages
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
python/ray/data/tests/test_map_batches.py |
Added tests for Polars batch format in map_batches() including timestamp handling |
python/ray/data/tests/test_consumption.py |
Added assertions to verify Polars DataFrame type in batch format tests |
python/ray/data/tests/block_batching/test_util.py |
Extended batch format test parameterization to include Polars |
python/ray/data/dataset.py |
Updated docstrings to document Polars format option and memory behavior |
python/ray/data/block.py |
Added Polars support to batch format constants, types, and conversion logic |
python/ray/data/_internal/planner/plan_udf_map_op.py |
Added validation for Polars LazyFrame and DataFrame in UDF outputs |
python/ray/data/_internal/pandas_block.py |
Implemented to_polars() method for converting Pandas blocks to Polars DataFrames |
python/ray/data/_internal/arrow_block.py |
Implemented to_polars() method for converting Arrow blocks to Polars DataFrames |
doc/source/data/transforming-data.rst |
Added Polars examples and updated documentation on batch formats |
doc/source/data/iterating-over-data.rst |
Added Polars example for iterating over batches |
doc/source/data/inspecting-data.rst |
Added Polars example for take_batch() |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # Polars works better with contiguous arrays | ||
| from ray.data._internal.arrow_ops import transform_pyarrow | ||
|
|
||
| combined_table = transform_pyarrow.combine_chunks(self._table, copy=False) |
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment states 'This conversion creates a copy of the data', but the code uses copy=False in combine_chunks(). This is inconsistent and may confuse developers. Either update the comment to clarify that combine_chunks doesn't copy but the subsequent Polars conversion does, or explain the complete copy behavior more accurately.
| """Create a block from a Polars DataFrame. | ||
| Converts a Polars DataFrame to an Arrow Table or Pandas DataFrame block. | ||
| See https://docs.pola.rs/ for Polars documentation. |
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected URL typo from 'pola.rs' to 'pola.rs' - should be 'docs.pola.rs' or 'pola-rs.github.io/polars'.
|
|
||
| # Convert to Polars DataFrame using from_pandas() | ||
| # See https://docs.pola.rs/api/dataframe/#polars.DataFrame.from_pandas | ||
| return pl.from_pandas(self._table) |
Copilot
AI
Nov 21, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The docstring states 'This conversion creates a copy of the data', but doesn't explain why zero-copy is impossible. Consider adding a brief explanation (e.g., 'due to differences in memory layout between Pandas and Polars') to help developers understand the limitation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request adds support for batch_format="polars" to ray.data.map_batches(), which is a great enhancement for users of the Polars library. The changes are comprehensive, covering documentation, new conversion methods, validation logic, and tests. My review identified a few areas for improvement, including some inefficient or redundant code, and a bug in a deprecated function that was updated. Overall, this is a solid contribution that will improve the Ray Data ecosystem.
| """ | ||
| # Check that batch_format | ||
| accepted_batch_formats = ["pandas", "pyarrow", "numpy"] | ||
| accepted_batch_formats = ["pandas", "pyarrow", "numpy", "polars"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding "polars" to accepted_batch_formats for the deprecated add_column method will cause a runtime error. The add_column implementation does not handle the "polars" batch format and will fall through to the else block, which assumes a NumPy batch format, leading to an AssertionError.
Since add_column is deprecated, it's probably best to not add Polars support to it. Please remove "polars" from accepted_batch_formats to avoid this bug.
| accepted_batch_formats = ["pandas", "pyarrow", "numpy", "polars"] | |
| accepted_batch_formats = ["pandas", "pyarrow", "numpy"] |
| if len(self._table.columns) != len(set(self._table.columns)): | ||
| duplicates = [ | ||
| col for col in self._table.columns | ||
| if list(self._table.columns).count(col) > 1 | ||
| ] | ||
| raise ValueError( | ||
| f"Pandas DataFrame has duplicate column names: {duplicates}. " | ||
| "Rename duplicate columns before converting to Polars." | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check for duplicate columns is a bit inefficient. A more idiomatic and performant way to check for and find duplicates in a pandas Index is to use self._table.columns.is_unique and self._table.columns.duplicated().
if not self._table.columns.is_unique:
duplicates = self._table.columns[self._table.columns.duplicated()].unique().tolist()
raise ValueError(
f"Pandas DataFrame has duplicate column names: {duplicates}. "
"Rename duplicate columns before converting to Polars."
)| if isinstance(out_batch, pl.LazyFrame): | ||
| raise ValueError( | ||
| "Generator yielded a Polars LazyFrame. " | ||
| "All yielded frames must be materialized. " | ||
| "Call .collect() on LazyFrames before yielding. " | ||
| "See https://docs.pola.rs/api/lazyframe/#collect for details." | ||
| ) | ||
| elif isinstance(out_batch, pl.DataFrame): | ||
| # DataFrame is always eager, but verify it's valid | ||
| try: | ||
| # Access schema to ensure DataFrame is valid | ||
| _ = out_batch.schema | ||
| except Exception as e: | ||
| raise ValueError( | ||
| f"Polars DataFrame is in invalid state: {e}. " | ||
| "Ensure the DataFrame is properly constructed." | ||
| ) from e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check for pl.LazyFrame here is redundant. The _validate_batch_output function, called on line 538, already checks if the output is a pl.LazyFrame and raises an appropriate ValueError. You can remove the isinstance(out_batch, pl.LazyFrame) block and convert the elif to an if to avoid duplicated logic.
| if isinstance(out_batch, pl.LazyFrame): | |
| raise ValueError( | |
| "Generator yielded a Polars LazyFrame. " | |
| "All yielded frames must be materialized. " | |
| "Call .collect() on LazyFrames before yielding. " | |
| "See https://docs.pola.rs/api/lazyframe/#collect for details." | |
| ) | |
| elif isinstance(out_batch, pl.DataFrame): | |
| # DataFrame is always eager, but verify it's valid | |
| try: | |
| # Access schema to ensure DataFrame is valid | |
| _ = out_batch.schema | |
| except Exception as e: | |
| raise ValueError( | |
| f"Polars DataFrame is in invalid state: {e}. " | |
| "Ensure the DataFrame is properly constructed." | |
| ) from e | |
| if isinstance(out_batch, pl.DataFrame): | |
| # DataFrame is always eager, but verify it's valid | |
| try: | |
| # Access schema to ensure DataFrame is valid | |
| _ = out_batch.schema | |
| except Exception as e: | |
| raise ValueError( | |
| f"Polars DataFrame is in invalid state: {e}. " | |
| "Ensure the DataFrame is properly constructed." | |
| ) from e |
| if len(batch.columns) != len(set(batch.columns)): | ||
| duplicates = [ | ||
| col for col in batch.columns if batch.columns.count(col) > 1 | ||
| ] | ||
| raise ValueError( | ||
| f"Polars DataFrame has duplicate column names: {duplicates}. " | ||
| "Rename duplicate columns before converting." | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Description
Adds support for
batch_format="polars"toray.data.map_batches(), allowing users to work with Polars DataFrames as batch format alongside existing numpy, pandas, and pyarrow formats.Polars is a fast DataFrame library written in Rust with a Python API that provides significant performance improvements over pandas for many operations. As Polars adoption grows in the open-source data processing ecosystem, users increasingly want to use it in their Ray Data pipelines. This change enables seamless integration of Polars into Ray Data workflows, allowing users to leverage Polars' optimized query engine and expressive API when processing batches.
Related issues
N/A
Additional information
Why Polars Support?
Changes
Usage Example
Performance Notes
Polars format conversions always create copies of data (zero-copy is not possible). This may result in 2-3x memory usage compared to Arrow format due to input and output conversions. For large datasets, consider using
batch_format="pyarrow"for better memory efficiency.