Fix async_udf batch size behaviour #18819
);
}

let datas = ColumnarValue::values_to_arrays(&result_batches)?
ColumnarValue::values_to_arrays requires that each batch has the same length, but this is not necessarily the case for our batches here. If the ideal batch size doesn't evenly divide the total number of rows we'll get one batch at the end with a different length.
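To make the uneven final batch concrete, here is a minimal sketch using only the Rust standard library. `batch_lengths` is a hypothetical helper written for illustration and is not part of DataFusion or Arrow; it only mirrors the arithmetic of splitting rows into batches of a target size.

```rust
/// Returns the length of each batch when `total_rows` rows are split into
/// batches of at most `batch_size` rows.
/// Hypothetical helper for illustration only, not a DataFusion API.
fn batch_lengths(total_rows: usize, batch_size: usize) -> Vec<usize> {
    // `chunks` panics on a zero chunk size, so reject it up front.
    assert!(batch_size > 0, "batch_size must be positive");
    let rows: Vec<usize> = (0..total_rows).collect();
    // Every chunk is full except possibly the last one.
    rows.chunks(batch_size).map(|chunk| chunk.len()).collect()
}
```

With three rows and a batch size of two, the helper yields one batch of length 2 and one of length 1, which is the shape a same-length check would reject.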
Yeah, I agree with this assessment
Your solution works and looks good to me. I played around with it locally and I think using the concat kernel might be a little faster, for your consideration:
Here is what works well for me locally
diff --git a/datafusion/physical-expr/src/async_scalar_function.rs b/datafusion/physical-expr/src/async_scalar_function.rs
index 1a794f411b..afb01f7b5e 100644
--- a/datafusion/physical-expr/src/async_scalar_function.rs
+++ b/datafusion/physical-expr/src/async_scalar_function.rs
@@ -16,7 +16,8 @@
// under the License.
use crate::ScalarFunctionExpr;
-use arrow::array::{make_array, MutableArrayData, RecordBatch};
+use arrow::array::RecordBatch;
+use arrow::compute::concat;
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
@@ -192,23 +193,22 @@ impl AsyncFuncExpr {
);
}
+ // Collect all the arrays so they can be combined into a single array
let datas = result_batches
- .iter()
+ .into_iter()
.map(|cv| match cv {
- ColumnarValue::Array(arr) => Ok(arr.to_data()),
- ColumnarValue::Scalar(scalar) => {
- Ok(scalar.to_array_of_size(1)?.to_data())
- }
+ ColumnarValue::Array(arr) => Ok(arr),
+ ColumnarValue::Scalar(scalar) => Ok(scalar.to_array_of_size(1)?),
})
.collect::<Result<Vec<_>>>()?;
- let total_len = datas.iter().map(|d| d.len()).sum();
- let mut mutable = MutableArrayData::new(datas.iter().collect(), false, total_len);
- datas.iter().enumerate().for_each(|(i, data)| {
- mutable.extend(i, 0, data.len());
- });
- let array_ref = make_array(mutable.freeze());
- Ok(ColumnarValue::Array(array_ref))
+ // Get references to the arrays as dyn Array to call concat
+ let dyn_arrays = datas
+ .iter()
+ .map(|arr| arr as &dyn arrow::array::Array)
+ .collect::<Vec<_>>();
+ let result_array = concat(&dyn_arrays)?;
+ Ok(ColumnarValue::Array(result_array))
}
}
…ia10/datafusion into sb/fix-async-udf-batch-size-bug
Jefffrey left a comment
Thanks for this; looks like some CI failures to address and I think the datafusion-testing submodule was updated accidentally too?
/// Simulates calling an async external service
async fn call_external_service(arg1: ColumnarValue) -> Result<ColumnarValue> {
tokio::time::sleep(Duration::from_millis(10)).await;
We should remove this sleep; it's not necessary for testing
…ia10/datafusion into sb/fix-async-udf-batch-size-bug
Thanks @Jefffrey, I've removed the unnecessary sleep.
alamb left a comment
Thank you @shivbhatia10 -- this is a nice find and solution
.into_scalar_udf(),
);

let df = ctx
I verified this test fails without the code fix in this PR
---- user_defined::user_defined_async_scalar_functions::test_async_udf_with_non_modular_batch_size stdout ----
Error: Internal("Arguments has mixed length. Expected length: 2, found length: 1")
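The failure message is consistent with two result batches of lengths 2 and 1. The sketch below contrasts a same-length check with plain concatenation, using `Vec<i64>` as a stand-in for Arrow arrays. Both `require_same_length` and `concat_batches` are hypothetical names for illustration; the real code paths are `ColumnarValue::values_to_arrays` and the Arrow `concat` kernel, whose internals are not reproduced here.

```rust
/// Stand-in for a helper that insists every batch has the same length.
/// The error text imitates the message in the test output above.
fn require_same_length(batches: &[Vec<i64>]) -> Result<usize, String> {
    let mut expected: Option<usize> = None;
    for batch in batches {
        match expected {
            // The first batch fixes the expected length.
            None => expected = Some(batch.len()),
            // Any later batch with a different length is rejected.
            Some(len) if len != batch.len() => {
                return Err(format!(
                    "Arguments has mixed length. Expected length: {}, found length: {}",
                    len,
                    batch.len()
                ));
            }
            Some(_) => {}
        }
    }
    Ok(expected.unwrap_or(0))
}

/// Concatenation places no constraint on batch lengths.
fn concat_batches(batches: &[Vec<i64>]) -> Vec<i64> {
    batches.iter().flatten().copied().collect()
}
```

Given batches `[10, 20]` and `[30]`, the length check returns an error while concatenation produces a single three-element result, which is why appending the batches is the appropriate operation for this code path.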
FYI @goldmedal in case you would like to review as well

Looks great to me -- thank you @shivbhatia10 🙏
## Which issue does this PR close?

- Closes apache#18822.

## Rationale for this change

This PR fixes the bug outlined in the issue: we shouldn't use `ColumnarValue::values_to_arrays` on the batches collected in `async_scalar_function.rs`.

## What changes are included in this PR?

Added a test to cover this behaviour and fixed the issue in the async scalar function physical expression.

## Are these changes tested?

Yes, I added a new `user_defined_async_scalar_functions.rs` test file, similar to `user_defined_scalar_functions.rs`, which contains a test that covers this behaviour.

## Are there any user-facing changes?

Yes

---------

Co-authored-by: Shiv Bhatia <[email protected]>