Conversation

@LiaCastaneda
Contributor

Which issue does this PR close?

Closes #
Related to https://github.com/apache/datafusion/issues/16841#issuecomment-3563643947

Rationale for this change

Aggregation accumulators that store Arrow arrays can over-account memory when array buffers are shared between multiple ScalarValues or Arrow arrays directly. This happens because ScalarValue::try_from_array() creates slices that reference the same underlying buffers, and each ScalarValue reports the full buffer size when calculating memory usage, so the same physical buffer gets counted multiple times, leading to over-accounting (this comment explains very well why we are seeing this).
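
A minimal sketch of the failure mode (standard arrow-rs and DataFusion APIs; the values are illustrative):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, ListArray};
    use arrow::datatypes::Int64Type;
    use datafusion_common::ScalarValue;

    fn main() {
        // Two rows of a ListArray share one child values buffer.
        let list: ArrayRef = Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            Some(vec![Some(4), Some(5)]),
        ]));

        // Each ScalarValue is a 1-row slice that still references the shared buffers.
        let row0 = ScalarValue::try_from_array(&list, 0).unwrap();
        let row1 = ScalarValue::try_from_array(&list, 1).unwrap();

        // Both report the full physical buffers, so summing them counts the
        // same allocation twice; this is the over-accounting described above.
        let over_accounted = row0.size() + row1.size();
        println!("{over_accounted}");
    }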

There have been several attempts to fix this before, which included compacting data so as not to keep the whole array alive, or using get_slice_memory_size instead of get_array_memory_size. However, we observed that both had downsides:

  • compact() is CPU-inefficient (it copies data, which can be expensive)
  • get_slice_memory_size() only accounts for logical memory, not the actual physical buffer capacity, so the amount returned is not accurate (see the sketch after this list)
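
A sketch of that difference (standard arrow-rs APIs):

    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, Int64Array};

    fn main() {
        let array: ArrayRef = Arc::new(Int64Array::from_iter_values(0..1_000));
        let slice = array.slice(0, 1);

        // Logical size: only the bytes this 1-row slice actually needs.
        let logical = slice.to_data().get_slice_memory_size().unwrap();
        // Physical size: every reachable buffer, shared capacity included.
        let physical = slice.get_array_memory_size();

        // The slice logically needs ~8 bytes but keeps the whole buffer
        // alive, so reporting `logical` under-states what is really held.
        assert!(logical < physical);
    }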

What changes are included in this PR?

This approach avoids double-counting memory by using Arrow's TrackingMemoryPool, which automatically deduplicates shared buffers when accounting them. This means we don't need to compact() or call get_slice_memory_size() just to solve the accounting problem. Note that compact() might still be useful when we want to release memory pressure.

  • Updated Accumulator::size() and GroupsAccumulator::size() signatures to accept Option<&dyn MemoryPool>:
    • When the pool is None, returns the total memory size including Arrow buffers, using either get_slice_memory_size or just ScalarValue::size() (same as before, so it's backward compatible)
    • When the pool is Some, returns the structural size only and claims buffers with the pool for deduplication tracking. Callers using the pool must add pool.used() to get the total memory (see the sketch after this list)
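
A sketch of the resulting calling convention (the import paths are my assumption; the pool and size() names are as described above):

    use arrow_buffer::pool::TrackingMemoryPool;
    use datafusion_expr::Accumulator;

    // One pool per stream; `acc` is any accumulator (construction elided).
    fn account(acc: &dyn Accumulator, pool: &TrackingMemoryPool) -> usize {
        // With a pool: structural size only; Arrow buffers are claimed by
        // the pool, which deduplicates shared buffers internally.
        let structural = acc.size(Some(pool));
        // The deduplicated buffer memory is then added exactly once.
        structural + pool.used()
    }

Passing None instead keeps the pre-PR behavior.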

Updated accumulators that use the pool parameter:

  • DistinctCountAccumulator
  • ArrayAggAccumulator
  • For OrderSensitiveArrayAggAccumulator and DistinctArrayAggAccumulator I removed the compacting, since it was introduced specifically to solve the over-accounting and is no longer needed.
  • FirstValueAccumulator / LastValueAccumulator
All other accumulator implementations were updated to match the new signature.

Are these changes tested?

Added a distinct_count_does_not_over_account_memory() test to verify memory pool deduplication for COUNT(DISTINCT) with array types. Also updated the existing accumulator tests to use the memory pool; they verify that the accounted memory is still less than when not using the memory pool (in some cases even less than when we compacted).

Are there any user-facing changes?

Yes, the size API for Accumulator and GroupsAccumulator changed from fn size(&self) -> usize; to fn size(&self, pool: Option<&dyn MemoryPool>) -> usize;

I'm not sure if this is the best API design... I'm open to suggestions. In any case, if None is passed, the behavior remains the same as before. Also, IIUC, this function is mainly used to keep the DataFusion memory pool within its bounds during aggregations.

@github-actions github-actions bot added logical-expr Logical plan and expressions core Core DataFusion crate substrait Changes to the substrait crate common Related to common crate proto Related to proto crate functions Changes to functions implementation ffi Changes to the ffi crate physical-plan Changes to the physical-plan crate spark labels Dec 26, 2025
@LiaCastaneda LiaCastaneda force-pushed the lia/use-arrow-pool-to-fix-memory-overaccounting-aggregations branch from 72d5f92 to 2378e6e on December 26, 2025 17:01
@LiaCastaneda LiaCastaneda force-pushed the lia/use-arrow-pool-to-fix-memory-overaccounting-aggregations branch from 2378e6e to 7d158c9 on December 26, 2025 17:07
@LiaCastaneda LiaCastaneda marked this pull request as ready for review December 30, 2025 10:01
@LiaCastaneda
Contributor Author

This is probably not the perfect solution, but maybe a starting point? We've seen over-accounting problems in TopK as well, so maybe the pool could also be integrated there? I'm open to suggestions :)

@gabotechs
Contributor

Nice! I'll review this one soon

@LiaCastaneda LiaCastaneda changed the title Use arrow pool to fix memory over accounting aggregations Use arrow pool to fix memory over accounting in aggregations Dec 30, 2025
Comment on lines +441 to 444
fn size(&self, _pool: Option<&dyn MemoryPool>) -> usize {
    size_of_val(self) - size_of_val(&self.max) + self.max.size()
}
}
Contributor Author


MinAccumulator and MaxAccumulator could also use the pool, since they hold a ScalarValue, which could hold an Arrow array with shared buffers. I didn't do it here to avoid adding more changes to this PR.

@github-actions github-actions bot added the documentation Improvements or additions to documentation label Dec 30, 2025
@LiaCastaneda LiaCastaneda force-pushed the lia/use-arrow-pool-to-fix-memory-overaccounting-aggregations branch from 1b5bcc0 to cb09c94 on December 30, 2025 13:28
Contributor

@gabotechs gabotechs left a comment


Glad to see progress on the memory accounting issues! This definitely improves the situation.

Before moving forward with it, I think it might be worth clarifying the long term direction of memory tracking in arrow-rs/datafusion. Here are a couple of thoughts about the current state of things:

  • There are two different MemoryPool traits with overlapping intentions: the one from DataFusion and the one from arrow-rs. My impression is that this is an undesirable state, and we might want to consolidate into one.
  • The current memory reservation mechanism is passive, meaning that it's up to developers to manually register/deregister whatever memory was used in the appropriate MemoryPool, rather than this happening automatically during the actual allocation. That opens the door to mistakes that lead to under/over-accounting.
  • The overall assumption that aggregation accumulators, execution plans, etc. have a "size" might be flawed. Ultimately, what occupies space in memory is the shared buffers in the underlying data, not the accumulators or the execution plans. In arrow-rs all buffers are shared, so no single struct can claim ownership of them or count them as part of its "size".

In the C++ Arrow MemoryPool implementation, it's the MemoryPool itself that performs the allocation, so having a MemoryPool there is not optional. Making the MemoryPool responsible for allocations, and for failing if a new allocation would overflow the maximum capacity, looks to me like a superior approach that leaves very little room for error.
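
To illustrate the shape I mean (purely illustrative Rust; none of these types exist in arrow-rs today):

    // An allocating pool in the spirit of C++ Arrow's MemoryPool: every
    // allocation goes through it, so accounting cannot be forgotten.
    #[derive(Debug)]
    struct AllocError;

    trait AllocatingPool {
        // Allocate `size` bytes, failing if the pool's limit would be exceeded.
        fn allocate(&self, size: usize) -> Result<*mut u8, AllocError>;
        // Return previously allocated memory to the pool.
        fn free(&self, ptr: *mut u8, size: usize);
        // Bytes currently handed out by this pool.
        fn bytes_allocated(&self) -> usize;
    }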

After reviewing C++ Arrow's memory management model, I get the impression that it is the most advanced and mature one, and none of the implementations in arrow-rs or DataFusion seem to mirror it. So I wonder whether, rather than building on top of the current pillars, we should instead be exploring ways of getting those pillars right from the beginning.


if let Some(pool) = pool {
    for arr in &self.values {
        claim_buffers_recursive(&arr.to_data(), pool);
    }
}
Contributor


I imagine that .size() here can be called an arbitrary number of times. What would happen with claim_buffers_recursive if it is called many times?

Contributor Author


It will call claim() multiple times on the same Buffer/Bytes, but that is safe because each call replaces the previous reservation (it does not add to it), so the pool will only track each buffer once regardless of how many times size() is called.
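
A hypothetical check of that property, reusing this PR's claim_buffers_recursive helper and arrow's TrackingMemoryPool:

    // Claiming the same buffers twice must not change the tracked total.
    fn assert_claim_is_idempotent(array: &dyn Array, pool: &TrackingMemoryPool) {
        claim_buffers_recursive(&array.to_data(), pool);
        let first = pool.used();
        // Re-claiming replaces the existing reservations instead of adding
        // to them, so repeated size() calls cannot inflate the pool.
        claim_buffers_recursive(&array.to_data(), pool);
        assert_eq!(pool.used(), first);
    }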

if let Some(array) = scalar.get_array_ref() {
    total += size_of::<Arc<dyn Array>>();
    if let Some(pool) = pool {
        claim_buffers_recursive(&array.to_data(), pool);
Contributor


I see that if a &dyn MemoryPool is passed, the array size does not count towards the total size; it is instead claimed in the &dyn MemoryPool.

Imagine this scenario:

  • The underlying array is huge
  • A dyn MemoryPool is passed, so the array size does not count towards total_size; it's just claimed in the Arrow buffer memory pool
  • In GroupedHashAggregateStream::update_memory_reservation, total_size is very small, as the array size did not count towards it
  • When calling reservation.try_resize() with the small total_size, the reservation succeeds

Isn't this a problematic scenario?

Contributor Author

@LiaCastaneda LiaCastaneda Dec 31, 2025


Yes, that's why after calling size() we add arrow_pool.used() to account for the buffer memory (I forgot to add it). I also considered calling arrow_pool.used() directly inside each accumulator's size() function so the caller doesn't have to remember to do it. However, that would still cause over-accounting in scenarios like update_memory_reservation(), where we sum size() across multiple accumulators that can share buffers as well:

        let total_size = self.group_values.size()
            + self.group_ordering.size()
            + self.current_group_indices.capacity() * size_of::<usize>()
            + self
                .accumulators
                .iter()
                .map(|x| x.size(Some(&self.arrow_pool)))
                .sum::<usize>(); // if each size() returned arrow_pool.used(), we would still be over-counting the pool

Contributor Author


So this is how we would use the pool properly (link) to calculate the total size without counting shared buffers multiple times:
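
Roughly (a sketch mirroring the snippet above, with the buffer memory added once at the end):

    // Each accumulator claims its buffers into the stream's single pool and
    // returns only its structural size...
    let structural: usize = self
        .accumulators
        .iter()
        .map(|acc| acc.size(Some(&self.arrow_pool)))
        .sum();

    // ...then the deduplicated buffer memory is counted exactly once.
    let total_size = structural + self.arrow_pool.used();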

Contributor


🤔 Ok, and that works because we have 1 TrackingArrowPool per GroupedHashAggregateStream 👍

Comment on lines 1596 to 1605
if let Some(array) = scalar.get_array_ref() {
    total += size_of::<Arc<dyn Array>>();
    if let Some(pool) = pool {
        claim_buffers_recursive(&array.to_data(), pool);
    } else {
        total += scalar.size() - size_of_val(scalar);
    }
} else {
    total += scalar.size() - size_of_val(scalar);
}
Contributor


This pattern seems to be repeated several times across the project. Maybe a helper could be useful?
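
For instance, something along these lines (a hypothetical shape, not code from this PR):

    // Hypothetical helper consolidating the repeated pattern above.
    fn scalar_heap_size(scalar: &ScalarValue, pool: Option<&dyn MemoryPool>) -> usize {
        match (scalar.get_array_ref(), pool) {
            // Buffers go to the pool (deduplicated); only the Arc shell
            // counts towards the returned structural size.
            (Some(array), Some(pool)) => {
                claim_buffers_recursive(&array.to_data(), pool);
                size_of::<Arc<dyn Array>>()
            }
            // Array-backed scalar but no pool: count buffers the old way.
            (Some(_), None) => {
                size_of::<Arc<dyn Array>>() + scalar.size() - size_of_val(scalar)
            }
            // No array inside: plain ScalarValue heap size.
            (None, _) => scalar.size() - size_of_val(scalar),
        }
    }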

Contributor Author


yep, good idea

@LiaCastaneda
Contributor Author

Yes, I completely agree with you. I created apache/arrow-rs#8938 to raise this issue on the Arrow side. In most other Arrow implementations, they carry around a context object that contains the memory pool, so newly created arrays are immediately accounted for in the pool. In that case, DataFusion wouldn't have to do anything other than use the Arrow memory pool instead of the DataFusion pool. However, I'm aware this would require a considerable amount of effort on the Arrow side, and I'm not sure what the community there thinks of this idea.
