fix(python/sedonadb): Ensure that Python UDFs executing with >1 batch do not cause deadlock #558
Conversation
Pull request overview
This PR resolves a deadlock issue that occurred when executing Python UDFs with multiple record batches. The fix modifies the batch collection strategy to use a single block_on() call instead of repeatedly acquiring the GIL for each batch, while preserving streaming output for cases where it's needed.
Changes:
- Modified batch collection to gather all batches in a single async operation for `.to_pandas()` and `.execute()`
- Updated the signal-checking interval from 1 second to 2 seconds and added `py.run(cr"pass", None, None)` calls
- Added regression tests to verify that multi-batch operations complete without hanging
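The single-collection strategy in the first bullet can be sketched in plain Python, using `asyncio.run()` as a stand-in for tokio's `block_on()`. All names here (`batch_stream`, `to_batches`) are illustrative, not the actual sedonadb API:

```python
import asyncio

# Stand-in for DataFusion's SendableRecordBatchStream.
async def batch_stream(n):
    for i in range(n):
        await asyncio.sleep(0)  # simulate awaiting the next batch
        yield f"batch-{i}"

async def _drain(stream):
    # Collect every batch inside one async task.
    return [batch async for batch in stream]

def to_batches(n):
    # One asyncio.run() call -- the analogue of a single block_on() --
    # drains the whole stream, instead of re-entering the runtime
    # (and re-acquiring the GIL) once per batch.
    return asyncio.run(_drain(batch_stream(n)))

print(to_batches(3))  # ['batch-0', 'batch-1', 'batch-2']
```

The point of the sketch is the shape of the call: a single entry into the async runtime that returns everything, rather than a loop that blocks once per batch.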
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| python/sedonadb/src/dataframe.rs | Introduces to_batches() method and Batches struct for single-pass collection; refactors execute() to collect in one async operation |
| python/sedonadb/python/sedonadb/dataframe.py | Updates to_arrow_table() to use new to_batches() method |
| python/sedonadb/src/runtime.rs | Increases signal check interval and adds py.run(cr"pass") calls before signal checks |
| python/sedonadb/tests/test_udf.py | Adds regression tests for multi-batch UDF execution and collection; corrects expected exception types |
| python/sedonadb/tests/test_dataframe.py | Updates error message assertion to match new wording |
| python/sedonadb/tests/functions/test_wkb.py | Adds GEOS version check for EWKB tests |
zhangfengcdt
left a comment
minor comments, otherwise, lgtm
python/sedonadb/src/dataframe.rs
Outdated
```rust
#[pymethods]
impl Batches {
    fn __len__(&self) -> usize {
        self.count
```
Is this the number of rows or the number of batches? It looks like `__len__` is expected to be the latter.
That's a great point...I removed it since it's not used for anything yet.
```python
        return pa.table(self, schema=pa.schema(schema))
    # Collects all batches into an object that exposes __arrow_c_stream__()
    batches = self._impl.to_batches(schema)
    return pa.table(batches)
```
I assume we do not need to pass schema to pa.table anymore.
Yes, passing it via to_batches() ensures it makes it to Rust!
I had intended to post a reprex to GeoPandas regarding crashes when threading, but I got caught by this issue: the way we collected things into Python caused many attempts to acquire the GIL (or perhaps a lock related to tokio), which interfered with UDF execution.
Briefly, before this PR, the Python bindings always collected via a special `RecordBatchReader` that called `block_on()` + `allow_threads()`, waiting for the next batch in the output `SendableRecordBatchStream`. To ensure cancellation requests worked, we acquired the GIL every 1 second to check for signals. This constant `block_on()` + GIL acquisition caused a deadlock when Python UDFs were also trying to acquire the GIL (or perhaps a tokio lock of some kind).

After this PR, we use one `block_on()` + `allow_threads()` and collect all the batches at once when we know this is what needs to happen anyway. Streaming output still works; it is just now only invoked when required. The workaround here is not a full solution, but it covers the most common case, where a user wants to collect the entire result (e.g., `.to_pandas()`). This is simpler to orchestrate.
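The contention argument above can be made concrete with a counting stand-in for the GIL. This is a minimal sketch, not the sedonadb implementation; `CountingLock` and both collect functions are hypothetical names:

```python
import threading

class CountingLock:
    """Stand-in for the GIL that records how often it is taken."""
    def __init__(self):
        self._lock = threading.Lock()
        self.acquisitions = 0
    def __enter__(self):
        self._lock.acquire()
        self.acquisitions += 1
        return self
    def __exit__(self, *exc):
        self._lock.release()
        return False

def collect_per_batch(stream, gil):
    # Before the fix: re-enter Python (take the "GIL") once per batch,
    # giving a UDF on another thread many chances to contend with us.
    out = []
    for batch in stream:
        with gil:
            out.append(batch)
    return out

def collect_all(stream, gil):
    # After the fix: take it once and drain the whole stream.
    with gil:
        return list(stream)

batches = [f"batch-{i}" for i in range(100)]
before, after = CountingLock(), CountingLock()
collect_per_batch(batches, before)
collect_all(batches, after)
print(before.acquisitions, after.acquisitions)  # 100 vs 1
```

Each acquisition in the per-batch path is a window in which a Python UDF running concurrently can be blocked (or, in the real bindings, deadlock against a tokio lock); collecting everything under one acquisition removes all but one of those windows.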