
Fixed blocks in aggregation state for better memory management #19649

@alchemist51

Describe the issue

Currently, in the GroupedHashAggregateStream struct, the group accumulator state is stored in a single Vec. This has the following associated problems:

  1. No memory is freed until the GroupByHash has output every batch
  2. Any upstream operator that holds a reference to one of the emitted record batches keeps the entire underlying buffer alive in memory

Thanks to @alamb for explaining this in detail here

Solution

Instead of storing the state as a single large chunk and slicing it on output, we could store it as multiple chunks. We have already discussed fixed sizing for the group accumulators in a different context, where we were trying to avoid the double-copy cost that Vec::resize() incurs when growing the group accumulator state. The same idea could be applied to this issue and can help reduce DataFusion's overall memory usage.

In our experiment, we took PR #15591 by @Rachelint and made it work under constrained memory for primitive Arrow types. Since it only supports primitive types, we used the WatchID field, which has very high cardinality: in the ClickBench data, WatchID has ~99M distinct values. Here is the query we tried:

SELECT "WatchID", MIN("ResolutionWidth"), MAX("ResolutionWidth"), SUM("IsRefresh") FROM hits GROUP BY "WatchID" ORDER BY "WatchID" DESC LIMIT 10;

When we run this query with a 16GB memory pool, it fails with the resource exhaustion message below:

Resources exhausted: Failed to allocate additional 2684354560(2.5GB) bytes for 
TopK[0] with 13421914214(12.5GB) bytes already allocated for this reservation - 
1878841178(1.74GB) bytes remain available for the total pool
[TOPK-INSERT] ▼ Receiving batch from aggregation: rows=8192, current_heap=0/10, memory=1000 bytes
[TOPK-MEMORY] BEFORE insert: total=140994 bytes, batches_count=0, batches_size=0 bytes, entry_uses=10, reservation=0 bytes
[TOPK-STORE] INSERT batch_id=0, uses=10, batch_size=2684354560 bytes, batch_rows=8192, total_batches=0->1, total_size=0->2684354560 bytes

However, with the blocked approach enabled for this query, not only were we able to run it within 16GB, it also worked with a 4GB memory pool:

 ./target/debug/datafusion-cli -m 4g
+---------------------+---------------------------+---------------------------+---------------------+
| WatchID             | min(hits.ResolutionWidth) | max(hits.ResolutionWidth) | sum(hits.IsRefresh) |
+---------------------+---------------------------+---------------------------+---------------------+
| 9223372033328793741 | 1368                      | 1368                      | 0                   |
| 9223371941779979288 | 1479                      | 1479                      | 0                   |
| 9223371906781104763 | 1638                      | 1638                      | 0                   |
| 9223371803397398692 | 1990                      | 1990                      | 0                   |
| 9223371799215233959 | 1638                      | 1638                      | 0                   |
| 9223371785975219972 | 0                         | 0                         | 0                   |
| 9223371776706839366 | 1368                      | 1368                      | 0                   |
| 9223371740707848038 | 1750                      | 1750                      | 0                   |
| 9223371715190479830 | 1368                      | 1368                      | 0                   |
| 9223371620124912624 | 1828                      | 1828                      | 0                   |
+---------------------+---------------------------+---------------------------+---------------------+

This is currently supported only for a few GroupedAccumulators and primitive data types, but the early results show promise that this can help make DataFusion more memory friendly, as can also be seen from the TopK logs (batch_size drops from 2684354560 bytes above to 1310720 bytes here):

[TOPK-INSERT] ▼ Receiving batch from aggregation: rows=8192, current_heap=0/10, memory=1000 bytes
[TOPK-MEMORY] BEFORE insert: total=140994 bytes, batches_count=0, batches_size=0 bytes, entry_uses=10, reservation=0 bytes
[TOPK-STORE] INSERT batch_id=0, uses=10, batch_size=1310720 bytes, batch_rows=8192, total_batches=0->1, total_size=0->1310720 bytes

As next steps, I’m thinking of filing the following issue tickets to make this work for resource-intensive queries:

  • Introduce the blocked approach on latest main → PR Intermediate result blocked approach to aggregation memory management #15591 is stale; we will need to revive it and make it work against the latest code, focusing on the memory optimization the change brings. PR #15591 also doesn’t cover the following cases:
    • Support the blocked approach with spills → we will often be running in memory-constrained environments and will need spill support for the blocked approach.
    • Support multiple group-by columns → the current POC code only handles single-column group-by queries; we need to implement it for multi-column group-by cases.
    • Support non-primitive types → currently only primitive Arrow types are supported; we will need to implement it for types such as BinaryView and structs.
    • Support all accumulators → currently only a few grouped accumulators are supported (AVG, SUM, MIN, MAX). We need to extend it to all varieties of aggregates.

cc: @bharath-techie
