feat: Add Spark-compatible xxhash64 and murmur3 hash functions
#19627
Conversation
```
SELECT xxhash64('hello');
----
-4367754540140381902
```
```
scala> spark.sql("SELECT xxhash64('hello')").show()
+--------------------+
|     xxhash64(hello)|
+--------------------+
|-4367754540140381902|
+--------------------+
```
```
SELECT xxhash64(1);
----
-7001672635703045582
```
```
scala> spark.sql("SELECT xxhash64(cast(1 as long))").show()
+---------------------------+
|xxhash64(CAST(1 AS BIGINT))|
+---------------------------+
|       -7001672635703045582|
+---------------------------+
```
```
SELECT hash('hello');
----
-1008564952
```
```
scala> spark.sql("SELECT hash('hello')").show()
+-----------+
|hash(hello)|
+-----------+
|-1008564952|
+-----------+
```
@shehabgamin fyi
```rust
    }
}

fn hash_column_murmur3(col: &ArrayRef, hashes: &mut [u32]) -> Result<()> {
```
It looks like support for DataType::Dictionary may be missing. In the Sail codebase, we copied the logic from Comet, where the Dictionary type is handled. However, I’m not sure whether Comet’s implementation has changed since we copied it.
In Sail, the relevant logic can be found here:
- https://github.com/lakehq/sail/blob/540fb8350ab676dfd0c302fafb4176b11fb0ee84/crates/sail-function/src/scalar/hash/spark_murmur3_hash.rs#L68
- https://github.com/lakehq/sail/blob/540fb8350ab676dfd0c302fafb4176b11fb0ee84/crates/sail-function/src/scalar/hash/utils.rs#L12
Based on the attribution comments in those files, the corresponding Comet sources appear to come from the following commit:
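For illustration, here is a minimal sketch (hypothetical helper name, Int32 keys assumed for brevity; not necessarily Comet's or Sail's approach) of one way a dictionary arm could delegate to this PR's existing `hash_column_murmur3` logic. Because each row carries its own running seed, the distinct dictionary values cannot simply be hashed once and reused, so the sketch materializes the column first:
```rust
use arrow::array::{Array, ArrayRef, DictionaryArray};
use arrow::compute::take;
use arrow::datatypes::Int32Type;
use datafusion_common::Result;

/// Sketch: flatten the dictionary into a plain array, then reuse the
/// existing value-type hashing. Trades memory for simplicity.
fn hash_dictionary_murmur3(col: &ArrayRef, hashes: &mut [u32]) -> Result<()> {
    let dict = col
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .expect("Int32 dictionary keys assumed for this sketch");
    // `take` produces values[keys[i]] for every row and propagates null keys.
    let flattened: ArrayRef = take(dict.values().as_ref(), dict.keys(), None)?;
    // `hash_column_murmur3` is the helper from this PR.
    hash_column_murmur3(&flattened, hashes)
}
```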
Besides the dictionary type, it seems that FixedSizeBinary is not handled either.
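A hypothetical sketch of what that arm could look like, assuming a byte-slice hasher like Comet's `spark_compatible_murmur3_hash(bytes, seed)` (the helper name is Comet's; adjust to whatever this PR ends up using):
```rust
use arrow::array::{Array, ArrayRef, FixedSizeBinaryArray};

/// Sketch: hash each row's fixed-width byte slice with the row's
/// running hash as the seed, as Spark does for binary values.
fn hash_fixed_size_binary_murmur3(col: &ArrayRef, hashes: &mut [u32]) {
    let array = col
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .expect("FixedSizeBinary array");
    for (i, hash) in hashes.iter_mut().enumerate() {
        // Spark skips nulls, leaving the row's running hash unchanged.
        if !array.is_null(i) {
            *hash = spark_compatible_murmur3_hash(array.value(i), *hash);
        }
    }
}
```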
Thanks, I'll address this in the next few days. Moving to draft for now.
@andygrove Thanks for pinging and mentioning. The PR is in good shape, and I'm glad it's being contributed back upstream.
```rust
// Determine number of rows from the first array argument
let num_rows = args
    .args
    .iter()
    .find_map(|arg| match arg {
        ColumnarValue::Array(array) => Some(array.len()),
        ColumnarValue::Scalar(_) => None,
    })
    .unwrap_or(1);
```
Suggested change — replace the block above with:
```rust
let num_rows = args.number_rows;
```
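`ScalarFunctionArgs::number_rows` already carries the row count of the batch being evaluated, so the function doesn't need to infer it from the first array argument.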
```rust
// Convert all arguments to arrays
let arrays: Vec<ArrayRef> = args
    .args
    .iter()
    .map(|arg| match arg {
        ColumnarValue::Array(array) => Arc::clone(array),
        ColumnarValue::Scalar(scalar) => scalar
            .to_array_of_size(num_rows)
            .expect("Failed to convert scalar to array"),
    })
    .collect();
```
Suggested change — replace the block above with:
```rust
let arrays = ColumnarValue::values_to_arrays(&args.args)?;
```
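`ColumnarValue::values_to_arrays` also broadcasts any scalar arguments to the common row count and returns a proper error instead of panicking, so it replaces both this block and the manual `num_rows` handling above.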
```rust
#[inline]
fn spark_compatible_xxhash64<T: AsRef<[u8]>>(data: T, seed: u64) -> u64 {
    XxHash64::oneshot(seed, data.as_ref())
}
```
Suggested change:
```rust
#[inline]
fn spark_compatible_xxhash64<T: AsRef<[u8]>>(data: T, seed: i64) -> i64 {
    XxHash64::oneshot(seed as u64, data.as_ref()) as i64
}
```
I wonder if it's worth doing this to make it easier to compute the resulting i64 array, without needing to convert the u64 hash vec to an i64 vec (unless the compiler optimizes this away anyway 🤔).
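For what it's worth, a tiny std-only sketch of why the conversion should be cheap either way: `u64 as i64` is a bit-for-bit reinterpretation, and `collect` can reuse the allocation for same-sized element types via std's in-place collect specialization.
```rust
fn main() {
    // `u64 as i64` reinterprets the bits; the per-element map compiles
    // down to (at most) a copy of the buffer.
    let unsigned: Vec<u64> = vec![0x8000_0000_0000_0000, 42];
    let signed: Vec<i64> = unsigned.into_iter().map(|h| h as i64).collect();
    assert_eq!(signed, vec![i64::MIN, 42]); // same bits, reinterpreted
}
```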
```rust
for i in (0..data.len()).step_by(4) {
    let ints = data.as_ptr().add(i) as *const i32;
    let mut half_word = ints.read_unaligned();
    if cfg!(target_endian = "big") {
```
I remember a previous PR for this same functionality; I'll copy a previous comment: #17093 (comment)
I don't think big endian should be considered; here's a comment from arrow-rs noting that it doesn't target big-endian platforms: apache/arrow-rs#6917 (comment)
So for simplicity we could just remove this cfg?
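If the cfg goes away, this loop could also be written without `unsafe` at all; a sketch assuming this PR's `mix_k1` and its usual `mix_h1` companion:
```rust
// Sketch: `chunks_exact(4)` only yields complete 4-byte chunks, and
// `i32::from_le_bytes` pins the byte order on any target, so neither
// `unsafe` nor the big-endian cfg is needed.
fn hash_full_words(data: &[u8], mut h1: i32) -> i32 {
    for chunk in data.chunks_exact(4) {
        let half_word = i32::from_le_bytes(chunk.try_into().unwrap());
        h1 = mix_h1(h1, mix_k1(half_word));
    }
    h1
}
```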
```rust
// SAFETY: all operations are guaranteed to be safe
unsafe {
```
I'm a bit curious about these unsafe blocks; a safety comment like "all operations are guaranteed to be safe" isn't exactly reassuring 😅
The original comments were not copied over in full for some reason. I updated this.
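For future readers, a hypothetical illustration (names and bounds are made up, not this PR's code) of the shape a SAFETY comment can take for this kind of unaligned read:
```rust
fn read_words(data: &[u8]) -> Vec<i32> {
    let full_words = data.len() / 4;
    let mut words = Vec::with_capacity(full_words);
    for i in 0..full_words {
        // SAFETY: `i * 4 + 4 <= data.len()` holds by construction of
        // `full_words`, so the read stays in bounds; `read_unaligned`
        // is used because `data` carries no alignment guarantee.
        let w = unsafe { (data.as_ptr().add(i * 4) as *const i32).read_unaligned() };
        words.push(w);
    }
    words
}
```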
```rust
#[inline]
fn mix_k1(mut k1: i32) -> i32 {
    k1 = k1.mul_wrapping(0xcc9e2d51u32 as i32);
    k1 = k1.rotate_left(15);
    k1.mul_wrapping(0x1b873593u32 as i32)
}
```
Do we need to provide a link to where this source code was extracted from? I'm assuming it was ported from some other implementation?
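(For context: `0xcc9e2d51` and `0x1b873593` are the standard MurmurHash3 x86_32 constants from Austin Appleby's public-domain reference implementation, which is also the algorithm Spark implements as `Murmur3_x86_32`. A sketch of the usual companion mixing step, shown only to make the provenance recognizable; written here with std's wrapping ops rather than this PR's helpers:)
```rust
// Standard MurmurHash3 x86_32 h1 mix; 0xe6546b64 comes from the same
// reference implementation as the mix_k1 constants above.
#[inline]
fn mix_h1(mut h1: i32, k1: i32) -> i32 {
    h1 ^= k1;
    h1 = h1.rotate_left(13);
    h1.wrapping_mul(5).wrapping_add(0xe6546b64u32 as i32)
}
```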
Which issue does this PR close?
Rationale for this change
Donate some hash functions from Comet so that other projects can benefit from them.
The functions were initially implemented in Comet by @advancedxy.
What changes are included in this PR?
I used Claude Code to copy the code from Comet and add slt tests. I manually verified that the expected values match Spark for a few cases just to be sure that the code is correct.
Are these changes tested?
Yes, tests are part of the PR.
Are there any user-facing changes?
No