-
Couldn't load subscription status.
- Fork 1k
Fix: ViewType gc on huge batch would produce bad output #8694
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0482a20
7034580
464db03
142284c
3f7c50d
2110c46
aa511d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -512,18 +512,71 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> { | |
| }; | ||
| } | ||
|
|
||
| // 3) Allocate exactly capacity for all non-inline data | ||
| let mut data_buf = Vec::with_capacity(total_large); | ||
| struct GcCopyGroup { | ||
| total_buffer_bytes: usize, | ||
| total_len: usize, | ||
| } | ||
|
|
||
| let mut groups = vec![]; | ||
| let one_group = [GcCopyGroup { | ||
| total_buffer_bytes: total_large, | ||
| total_len: len, | ||
| }]; | ||
| let gc_copy_groups = if total_large > i32::MAX as usize { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we can add such as cold flag, since it's rare for the case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sounds ok to me ( But it's not in a loop so I think the improvement will not too much) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, got it. |
||
| // Slow-path: need to split into multiple copy groups | ||
| let mut current_length = 0; | ||
| let mut current_elements = 0; | ||
|
|
||
| for view in self.views() { | ||
| let len = *view as u32; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This part is so slow, but it's right, I can make it faster(by handling the numbers via grouping or batching) if required There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure how you would make this much faster - I think the code needs to find the locations to split in any event There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even if the buffer size is greater than i32::MAX, it's possible that a single buffer is much smaller than i32::MAX, so this can find batch-by-batch, rather than just adding small buffer one-by-one? |
||
| if len > MAX_INLINE_VIEW_LEN { | ||
| if current_length + len > i32::MAX as u32 { | ||
| // Start a new group | ||
| groups.push(GcCopyGroup { | ||
| total_buffer_bytes: current_length as usize, | ||
| total_len: current_elements, | ||
| }); | ||
| current_length = 0; | ||
| current_elements = 0; | ||
| } | ||
| current_length += len; | ||
| current_elements += 1; | ||
| } | ||
| } | ||
| if current_elements != 0 { | ||
| groups.push(GcCopyGroup { | ||
| total_buffer_bytes: current_length as usize, | ||
| total_len: current_elements, | ||
| }); | ||
| } | ||
| &groups | ||
| } else { | ||
| one_group.as_slice() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've add a let one_group = [GcCopyGroup {
total_buffer_bytes: total_large,
total_len: len,
}]; |
||
| }; | ||
| debug_assert!(gc_copy_groups.len() <= i32::MAX as usize); | ||
|
|
||
| // 3) Copy the buffers group by group | ||
| let mut views_buf = Vec::with_capacity(len); | ||
| let mut data_blocks = Vec::with_capacity(gc_copy_groups.len()); | ||
|
|
||
| let mut current_view_idx = 0; | ||
|
|
||
| for (group_idx, gc_copy_group) in gc_copy_groups.iter().enumerate() { | ||
| let mut data_buf = Vec::with_capacity(gc_copy_group.total_buffer_bytes); | ||
|
|
||
| // 4) Iterate over views and process each inline/non-inline view | ||
| let views_buf: Vec<u128> = (0..len) | ||
| .map(|i| unsafe { self.copy_view_to_buffer(i, &mut data_buf) }) | ||
| .collect(); | ||
| // Directly push views to avoid intermediate Vec allocation | ||
| for view_idx in current_view_idx..current_view_idx + gc_copy_group.total_len { | ||
| let view = | ||
| unsafe { self.copy_view_to_buffer(view_idx, group_idx as i32, &mut data_buf) }; | ||
| views_buf.push(view); | ||
| } | ||
|
|
||
| data_blocks.push(Buffer::from_vec(data_buf)); | ||
| current_view_idx += gc_copy_group.total_len; | ||
| } | ||
|
|
||
| // 5) Wrap up buffers | ||
| let data_block = Buffer::from_vec(data_buf); | ||
| // 4) Wrap up buffers | ||
| let views_scalar = ScalarBuffer::from(views_buf); | ||
| let data_blocks = vec![data_block]; | ||
|
|
||
| // SAFETY: views_scalar, data_blocks, and nulls are correctly aligned and sized | ||
| unsafe { GenericByteViewArray::new_unchecked(views_scalar, data_blocks, nulls) } | ||
|
|
@@ -538,10 +591,15 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> { | |
| /// inside one of `self.buffers`. | ||
| /// - `data_buf` must be ready to have additional bytes appended. | ||
| /// - After this call, the returned view will have its | ||
| /// `buffer_index` reset to `0` and its `offset` updated so that it points | ||
| /// `buffer_index` reset to `buffer_idx` and its `offset` updated so that it points | ||
| /// into the bytes just appended at the end of `data_buf`. | ||
| #[inline(always)] | ||
| unsafe fn copy_view_to_buffer(&self, i: usize, data_buf: &mut Vec<u8>) -> u128 { | ||
| unsafe fn copy_view_to_buffer( | ||
| &self, | ||
| i: usize, | ||
| buffer_idx: i32, | ||
| data_buf: &mut Vec<u8>, | ||
| ) -> u128 { | ||
| // SAFETY: `i < self.len()` ensures this is in‑bounds. | ||
| let raw_view = unsafe { *self.views().get_unchecked(i) }; | ||
| let mut bv = ByteView::from(raw_view); | ||
|
|
@@ -561,7 +619,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> { | |
| let new_offset = data_buf.len() as u32; | ||
| data_buf.extend_from_slice(slice); | ||
|
|
||
| bv.buffer_index = 0; | ||
| bv.buffer_index = buffer_idx as u32; | ||
| bv.offset = new_offset; | ||
| bv.into() | ||
| } | ||
|
|
@@ -1404,6 +1462,56 @@ mod tests { | |
| } | ||
| } | ||
|
|
||
| #[test] | ||
| #[cfg_attr(miri, ignore)] // Takes too long | ||
| fn test_gc_huge_array() { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test requires about 5GiB memory, it's huge, I don't know would it affect the testing on some machines There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previous code would meet bug only when buffer greater than 4GiB, the current code can be tested when > 2GiB. Personally I think leave 2GiB for test is ok but 4GiB is also ok to me, decide on reviewer's idea. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I ran this test on my mac M3 and it takes 1.5 seconds so I think it is ok I also verified that without the code in this PR, the test fails like: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have removed the single buffer limitation :-( |
||
| // Construct multiple 128 MiB BinaryView entries so total > 4 GiB | ||
| let block_len: usize = 128 * 1024 * 1024; // 128 MiB per view | ||
| let num_views: usize = 36; | ||
|
|
||
| // Create a single 128 MiB data block with a simple byte pattern | ||
| let buffer = Buffer::from_vec(vec![0xAB; block_len]); | ||
| let buffer2 = Buffer::from_vec(vec![0xFF; block_len]); | ||
|
|
||
| // Append this block and then add many views pointing to it | ||
| let mut builder = BinaryViewBuilder::new(); | ||
| let block_id = builder.append_block(buffer); | ||
| for _ in 0..num_views / 2 { | ||
| builder | ||
| .try_append_view(block_id, 0, block_len as u32) | ||
| .expect("append view into 128MiB block"); | ||
| } | ||
| let block_id2 = builder.append_block(buffer2); | ||
| for _ in 0..num_views / 2 { | ||
| builder | ||
| .try_append_view(block_id2, 0, block_len as u32) | ||
| .expect("append view into 128MiB block"); | ||
| } | ||
|
|
||
| let array = builder.finish(); | ||
| let total = array.total_buffer_bytes_used(); | ||
| assert!( | ||
| total > u32::MAX as usize, | ||
| "Expected total non-inline bytes to exceed 4 GiB, got {}", | ||
| total | ||
| ); | ||
|
|
||
| // Run gc and verify correctness | ||
| let gced = array.gc(); | ||
| assert_eq!(gced.len(), num_views, "Length mismatch after gc"); | ||
| assert_eq!(gced.null_count(), 0, "Null count mismatch after gc"); | ||
| assert_ne!( | ||
| gced.data_buffers().len(), | ||
| 1, | ||
| "gc with huge buffer should not consolidate data into a single buffer" | ||
| ); | ||
|
|
||
| // Element-wise equality check across the entire array | ||
| array.iter().zip(gced.iter()).for_each(|(orig, got)| { | ||
| assert_eq!(orig, got, "Value mismatch after gc on huge array"); | ||
| }); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_eq() { | ||
| let test_data = [ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found this somewhat confusing at first as it is just deferring the creation of the view buffers.
I think the code would be clearer (and faster) if you simply created the new buffers directly (with a branch for when the total length was too large)
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would making the fast path slower 🤔, a single copy-group is just as simple as previous code. Maybe I should just remove the allocation here ( https://github.com/apache/arrow-rs/pull/8694/files#r2470787503 )?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the struct will affect some performance? We can compare the benchmark.