diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs
index 0319363bf5d8..0a2bc500423a 100644
--- a/arrow-array/src/array/byte_view_array.rs
+++ b/arrow-array/src/array/byte_view_array.rs
@@ -512,18 +512,71 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
             };
         }
 
-        // 3) Allocate exactly capacity for all non-inline data
-        let mut data_buf = Vec::with_capacity(total_large);
+        struct GcCopyGroup {
+            total_buffer_bytes: usize,
+            total_len: usize,
+        }
+
+        let mut groups = vec![];
+        let one_group = [GcCopyGroup {
+            total_buffer_bytes: total_large,
+            total_len: len,
+        }];
+        let gc_copy_groups = if total_large > i32::MAX as usize {
+            // Slow path: need to split into multiple copy groups
+            let mut current_length = 0;
+            let mut current_elements = 0;
+
+            for view in self.views() {
+                let len = *view as u32;
+                if len > MAX_INLINE_VIEW_LEN {
+                    if current_length + len > i32::MAX as u32 {
+                        // Start a new group
+                        groups.push(GcCopyGroup {
+                            total_buffer_bytes: current_length as usize,
+                            total_len: current_elements,
+                        });
+                        current_length = 0;
+                        current_elements = 0;
+                    }
+                    current_length += len;
+                    current_elements += 1;
+                }
+            }
+            if current_elements != 0 {
+                groups.push(GcCopyGroup {
+                    total_buffer_bytes: current_length as usize,
+                    total_len: current_elements,
+                });
+            }
+            &groups
+        } else {
+            one_group.as_slice()
+        };
+        debug_assert!(gc_copy_groups.len() <= i32::MAX as usize);
+
+        // 3) Copy the buffers group by group
+        let mut views_buf = Vec::with_capacity(len);
+        let mut data_blocks = Vec::with_capacity(gc_copy_groups.len());
+
+        let mut current_view_idx = 0;
+
+        for (group_idx, gc_copy_group) in gc_copy_groups.iter().enumerate() {
+            let mut data_buf = Vec::with_capacity(gc_copy_group.total_buffer_bytes);
 
-        // 4) Iterate over views and process each inline/non-inline view
-        let views_buf: Vec<u128> = (0..len)
-            .map(|i| unsafe { self.copy_view_to_buffer(i, &mut data_buf) })
-            .collect();
+            // Directly push views to avoid an intermediate Vec allocation
+            for view_idx in current_view_idx..current_view_idx + gc_copy_group.total_len {
+                let view =
+                    unsafe { self.copy_view_to_buffer(view_idx, group_idx as i32, &mut data_buf) };
+                views_buf.push(view);
+            }
+
+            data_blocks.push(Buffer::from_vec(data_buf));
+            current_view_idx += gc_copy_group.total_len;
+        }
 
-        // 5) Wrap up buffers
-        let data_block = Buffer::from_vec(data_buf);
+        // 4) Wrap up buffers
         let views_scalar = ScalarBuffer::from(views_buf);
-        let data_blocks = vec![data_block];
 
         // SAFETY: views_scalar, data_blocks, and nulls are correctly aligned and sized
         unsafe { GenericByteViewArray::new_unchecked(views_scalar, data_blocks, nulls) }
@@ -538,10 +591,15 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
     /// inside one of `self.buffers`.
     /// - `data_buf` must be ready to have additional bytes appended.
     /// - After this call, the returned view will have its
-    ///   `buffer_index` reset to `0` and its `offset` updated so that it points
+    ///   `buffer_index` reset to `buffer_idx` and its `offset` updated so that it points
    ///   into the bytes just appended at the end of `data_buf`.
     #[inline(always)]
-    unsafe fn copy_view_to_buffer(&self, i: usize, data_buf: &mut Vec<u8>) -> u128 {
+    unsafe fn copy_view_to_buffer(
+        &self,
+        i: usize,
+        buffer_idx: i32,
+        data_buf: &mut Vec<u8>,
+    ) -> u128 {
         // SAFETY: `i < self.len()` ensures this is in‑bounds.
         let raw_view = unsafe { *self.views().get_unchecked(i) };
         let mut bv = ByteView::from(raw_view);
@@ -561,7 +619,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
         let new_offset = data_buf.len() as u32;
         data_buf.extend_from_slice(slice);
 
-        bv.buffer_index = 0;
+        bv.buffer_index = buffer_idx as u32;
         bv.offset = new_offset;
         bv.into()
     }
@@ -1404,6 +1462,56 @@ mod tests {
         }
     }
 
+    #[test]
+    #[cfg_attr(miri, ignore)] // Takes too long
+    fn test_gc_huge_array() {
+        // Construct multiple 128 MiB BinaryView entries so total > 4 GiB
+        let block_len: usize = 128 * 1024 * 1024; // 128 MiB per view
+        let num_views: usize = 36;
+
+        // Create two 128 MiB data blocks with simple byte patterns
+        let buffer = Buffer::from_vec(vec![0xAB; block_len]);
+        let buffer2 = Buffer::from_vec(vec![0xFF; block_len]);
+
+        // Append each block and add many views pointing into it
+        let mut builder = BinaryViewBuilder::new();
+        let block_id = builder.append_block(buffer);
+        for _ in 0..num_views / 2 {
+            builder
+                .try_append_view(block_id, 0, block_len as u32)
+                .expect("append view into 128MiB block");
+        }
+        let block_id2 = builder.append_block(buffer2);
+        for _ in 0..num_views / 2 {
+            builder
+                .try_append_view(block_id2, 0, block_len as u32)
+                .expect("append view into 128MiB block");
+        }
+
+        let array = builder.finish();
+        let total = array.total_buffer_bytes_used();
+        assert!(
+            total > u32::MAX as usize,
+            "Expected total non-inline bytes to exceed 4 GiB, got {}",
+            total
+        );
+
+        // Run gc and verify correctness
+        let gced = array.gc();
+        assert_eq!(gced.len(), num_views, "Length mismatch after gc");
+        assert_eq!(gced.null_count(), 0, "Null count mismatch after gc");
+        assert_ne!(
+            gced.data_buffers().len(),
+            1,
+            "gc with huge buffer should not consolidate data into a single buffer"
+        );
+
+        // Element-wise equality check across the entire array
+        array.iter().zip(gced.iter()).for_each(|(orig, got)| {
+            assert_eq!(orig, got, "Value mismatch after gc on huge array");
+        });
+    }
+
     #[test]
     fn test_eq() {
         let test_data = [
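
Reviewer note: the crux of the patch is that `gc` can no longer assume all non-inline data fits in one output buffer once the total exceeds `i32::MAX` bytes, so it first plans "copy groups" and emits one data block per group. Below is a minimal, self-contained sketch of that grouping invariant; the names (`CopyGroup`, `plan_copy_groups`) are hypothetical, not part of the patch, and the accumulator is widened to `u64` in the sketch purely to keep the arithmetic obviously overflow-free:

```rust
const MAX_INLINE_VIEW_LEN: u32 = 12; // longer views live in data buffers

#[derive(Debug)]
struct CopyGroup {
    total_buffer_bytes: usize,
    total_len: usize,
}

/// Greedily partition non-inline view lengths so that no group holds
/// more than `i32::MAX` bytes, mirroring the `GcCopyGroup` planning above.
fn plan_copy_groups(view_lens: &[u32]) -> Vec<CopyGroup> {
    let mut groups = Vec::new();
    let (mut bytes, mut elements) = (0u64, 0usize);
    for &len in view_lens {
        if len > MAX_INLINE_VIEW_LEN {
            if bytes + len as u64 > i32::MAX as u64 {
                // Current group is full: seal it and start a new one
                groups.push(CopyGroup {
                    total_buffer_bytes: bytes as usize,
                    total_len: elements,
                });
                bytes = 0;
                elements = 0;
            }
            bytes += len as u64;
            elements += 1;
        }
    }
    if elements != 0 {
        groups.push(CopyGroup {
            total_buffer_bytes: bytes as usize,
            total_len: elements,
        });
    }
    groups
}

fn main() {
    // Three 1.5 GiB values cannot share one <= i32::MAX-byte buffer,
    // so each lands in its own group:
    let big: u32 = 3 * (1 << 29); // 1.5 GiB
    assert_eq!(plan_copy_groups(&[big, big, big]).len(), 3);
    // Small (and inline, <= 12 byte) values collapse into one group:
    assert_eq!(plan_copy_groups(&[100, 5, 200]).len(), 1);
}
```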
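For the `copy_view_to_buffer` change itself: a non-inline view is a 16-byte value packing `length`, a 4-byte `prefix`, a `buffer_index`, and an `offset` as little-endian `u32`s (this is the `ByteView` layout the patch manipulates, with `length` in the low 32 bits, which is also why `*view as u32` above yields the length). The patch keeps `length` and `prefix` intact but now stamps the copy group's index instead of a hard-coded `0`. A rough model of that re-pointing step, with hypothetical helper names:

```rust
/// Pack the four little-endian u32 fields of a non-inline view into a u128.
fn pack_view(length: u32, prefix: u32, buffer_index: u32, offset: u32) -> u128 {
    length as u128
        | (prefix as u128) << 32
        | (buffer_index as u128) << 64
        | (offset as u128) << 96
}

/// What `copy_view_to_buffer` effectively does to a non-inline view:
/// keep `length` and `prefix`, rewrite `buffer_index` and `offset` to
/// point at the bytes just appended to the group's fresh data buffer.
fn repoint(view: u128, new_buffer_index: u32, new_offset: u32) -> u128 {
    (view & u64::MAX as u128) // keep the low 64 bits: length + prefix
        | (new_buffer_index as u128) << 64
        | (new_offset as u128) << 96
}

fn main() {
    let view = pack_view(100, u32::from_le_bytes(*b"abcd"), 7, 4096);
    let moved = repoint(view, 2, 0); // e.g. the third copy group, offset 0
    assert_eq!(moved as u32, 100); // length unchanged
    assert_eq!((moved >> 64) as u32, 2); // buffer_index is now the group index
    assert_eq!((moved >> 96) as u32, 0); // offset into the new data buffer
}
```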