PoC: Blocked state management for hash aggregation #22712
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).
The goal of this PoC is to demonstrate that the refactor is necessary and to experiment with blocked aggregation state management. The next step would be to create a refactor-only PR.
    |group_index, value| {
        debug_assert!(group_index < len);
        let block_idx = group_index / block_size;
        let value_idx = group_index % block_size;
I think you want to avoid % and / (two integer divisions!) by enforcing a power-of-two block size.
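A minimal sketch of that suggestion, assuming the block size is validated once at construction. `BlockLayout` and `locate` are hypothetical names for illustration and are not part of this PR; the point is only that a shift and a mask can replace the division and the remainder when the block size is a power of two.

```rust
/// Hypothetical helper (not from the PR): maps a flat group index to a
/// (block index, offset within block) pair without integer division.
struct BlockLayout {
    /// log2(block_size)
    shift: u32,
    /// block_size - 1
    mask: usize,
}

impl BlockLayout {
    fn new(block_size: usize) -> Self {
        // Shift/mask indexing is only valid for power-of-two block sizes.
        assert!(
            block_size.is_power_of_two(),
            "block_size must be a power of two"
        );
        Self {
            shift: block_size.trailing_zeros(),
            mask: block_size - 1,
        }
    }

    #[inline]
    fn locate(&self, group_index: usize) -> (usize, usize) {
        // Equivalent to (group_index / block_size, group_index % block_size).
        (group_index >> self.shift, group_index & self.mask)
    }
}
```

With `BlockLayout::new(4096)`, `locate(4096)` yields block 1, offset 0, which matches the `/` and `%` form in the diff above.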
        sums.push(values.value(row));
        nulls.append_non_null();
    } else {
        counts.push(0);
this can use collect rather than push
    self.len = 0;

    for chunk in values.chunks(self.block_size) {
        let mut block =
This seems to do an unnecessary zero allocation
I think the overaccounting issue is also worth mentioning, as it leads to performance degradation in downstream operators due to excessive spilling.
    self.release_map();

    let emit_len = self.len.min(self.block_size);
    let block = self.blocks.remove(0);
I think this will have a negative performance impact similar to the drain + collect identified in #19906
    let null_idx = self.take_null_for_emit(n);
    let output = self.values_range(0, n);
    let remaining = self.values_range(n, self.len - n);
Because values_range always allocates, output and remaining allocate together an additional n elements. See #22165 where I reduce the allocation overhead for partial aggregation.
    data_type: DataType,
    map: HashTable<(usize, u64)>,
    null_group: Option<usize>,
    blocks: Vec<Box<[T::Native]>>,
Because of the performance overhead of removing the first elements from a Vec, I would consider other approaches, maybe VecDeque
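To illustrate the `VecDeque` idea, here is a minimal sketch. `BlockQueue` and its methods are hypothetical names and not the PR's types; it assumes blocks are only ever emitted from the front. `VecDeque::pop_front` runs in constant time, whereas `Vec::remove(0)` shifts every remaining element.

```rust
use std::collections::VecDeque;

/// Hypothetical container (not from the PR): holds fixed-size blocks and
/// emits them from the front.
struct BlockQueue<T> {
    blocks: VecDeque<Box<[T]>>,
}

impl<T> BlockQueue<T> {
    fn new() -> Self {
        Self {
            blocks: VecDeque::new(),
        }
    }

    /// Appends a finished block at the back.
    fn push_block(&mut self, block: Box<[T]>) {
        self.blocks.push_back(block);
    }

    /// Takes the oldest block in O(1); `Vec::remove(0)` would instead
    /// move all remaining block pointers one slot to the left.
    fn emit_first_block(&mut self) -> Option<Box<[T]>> {
        self.blocks.pop_front()
    }

    fn num_blocks(&self) -> usize {
        self.blocks.len()
    }
}
```

The trade-off is that `VecDeque` storage is a ring buffer, so code that needs one contiguous slice of blocks would have to handle two halves (for example via `as_slices`).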
    let emit_len = self.len.min(self.block_size);
    let block = self.blocks.remove(0);
    let mut values = block.into_vec();
    values.truncate(emit_len);
truncating leads to a memory accounting inaccuracy, should shrink_to_fit be considered here? It leads to a reallocation, so I'm not sure if it's the right thing; however, it's worth mentioning the overaccounting in the last block's case, when len is smaller than Vec's capacity (and RecordBatch::get_array_memory_size reports the capacity, not the length)
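A small standalone sketch of the capacity behaviour described here, using only the standard library. `emit_truncated` and `emit_shrunk` are hypothetical helpers and not the PR's code. `Vec::truncate` leaves the allocated capacity untouched, while `shrink_to_fit` asks the allocator to release the excess, which may reallocate.

```rust
/// Hypothetical helper: mirrors the `into_vec` + `truncate` pattern above.
/// The returned Vec keeps the capacity of the whole block.
fn emit_truncated(block: Box<[i64]>, emit_len: usize) -> Vec<i64> {
    let mut values = block.into_vec();
    values.truncate(emit_len);
    values
}

/// Hypothetical alternative: additionally shrinks the allocation so that
/// capacity-based accounting is closer to the emitted length.
fn emit_shrunk(block: Box<[i64]>, emit_len: usize) -> Vec<i64> {
    let mut values = block.into_vec();
    values.truncate(emit_len);
    // May reallocate; the resulting capacity is at least `len`, and the
    // allocator is allowed to leave a little slack.
    values.shrink_to_fit();
    values
}
```

For a block of 8 elements and `emit_len` of 3, the truncated vector reports a length of 3 and a capacity of 8, so anything that accounts by capacity would count the full block.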
    /// that do not advertise blocked emit support may return an internal error.
    ///
    /// Callers should only use this once no further updates will arrive for the
    /// current groups.
When the partial hash aggregation hits the OOM condition, it triggers an early emit, draining all the existing groups and then continuing with processing input batches.
Is this use case supported with the EmitTo::Block approach?
Yes, this is intended.
I will correct it in the later split PR.
…#22729)

## Which issue does this PR close?

- part of apache#22710
- 1/N of apache#22712

## Rationale for this change

See issues. This PR splits out the partial and final aggregate streams from `GroupsHashAggregateStream`.

To fully migrate hash aggregation, we have to

- Port this optimization back: apache#11627
- Support spilling

I think they should be left to follow-up PRs.

Todo in this PR:

- [x] Add a temporary configuration `enable_migration_aggregate` to turn off this path

Since it would be a regression if the above features are not added, the configuration also helps prevent potential regressions from the migration of other aggregate streams.

## What changes are included in this PR?

Split out the streams from `GroupsHashAggregateStream`:

1. Partial stage of hash aggregation
2. Final stage of hash aggregation

## Are these changes tested?

## Are there any user-facing changes?
Which issue does this PR close?
Another attempt for #7065
Rationale for this change
This PR is motivated by two related but distinct concerns:
It aims to show how to refactor the existing code first and then apply the optimization, which can make the implementation easier.
Refactoring Strategy
I created an issue to analyze the root cause of the existing code complexity and how to solve it by incrementally splitting the logic:
Original Issue for Blocked State Management
These issues explain the motivation and background well:
I think the main motivation is memory efficiency. Performance (~10% faster for high-cardinality cases in this PoC) is only a nice by-product.
Suppose we have buffered 1GB of state in the partial aggregation stage. If the internal states are stored in a contiguous Vec, they cannot be freed until repartitioning is done, approximately when the final-stage aggregation finishes. That means peak memory usage can become all partial states + all final states; in the worst case, this can reach 2GB.
Ideally, we should be able to stay closer to 1GB by managing memory with fixed-size blocks. Once final aggregation starts consuming partial state, the corresponding partial blocks can be freed incrementally.
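The arithmetic can be sketched with a toy model. This is an illustration under stated assumptions, not a measurement: sizes are abstract units (read them as MB), the final state is assumed to grow by exactly the size of each consumed partial block (the worst case, where no groups merge), and `peak_contiguous` and `peak_blocked` are hypothetical names.

```rust
use std::collections::VecDeque;

/// Toy model: a contiguous partial state stays alive until the final
/// state is fully built, so the worst-case peak is partial + final.
fn peak_contiguous(partial_size: usize) -> usize {
    partial_size + partial_size
}

/// Toy model: the partial state is split into blocks and each block is
/// freed right after the final stage has consumed it.
fn peak_blocked(partial_size: usize, block_size: usize) -> usize {
    assert!(block_size > 0, "block_size must be non-zero");

    // Split the partial state into block sizes; the last one may be short.
    let mut partial: VecDeque<usize> = VecDeque::new();
    let mut left = partial_size;
    while left > 0 {
        let block = left.min(block_size);
        partial.push_back(block);
        left -= block;
    }

    let mut final_size = 0;
    let mut peak = partial_size;
    while let Some(block) = partial.pop_front() {
        let remaining: usize = partial.iter().sum();
        // Assumption: the final state grows by the full block size.
        final_size += block;
        // The block being consumed is still alive while the final state grows.
        peak = peak.max(remaining + block + final_size);
        // `block` is released here, before the next one is consumed.
    }
    peak
}
```

Under these assumptions, 1024 units of partial state peak at 2048 when contiguous, and at 1088 with 64-unit blocks (the total plus one block in flight).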
Benchmark result
Summary: med/high cardinality is faster; low cardinality can be slower but acceptable?; high cardinality is slower due to a missing fast path, see below.
datafusion.execution.skip_partial_aggregation_probe_ratio_threshold, once implemented it's also likely to get faster, according to Q4 high cardinality's current number.
Memory usage for Q4
microbench.sql
Implementation plan
This PR is just a PoC; it can be split into smaller patches for review.
What changes are included in this PR?
Refresher for related internal data structures
The simplified mental model for hash aggregation is

HashTable: group_key -> group_state. In reality, group values and group states are all stored as contiguous vectors for efficiency.
Key Changes
Split out the partial and final aggregation logic
See #22710 for the idea. Two execution paths are split out to run the microbenchmark queries above:
RawPartialHashAggregateStream
PartialFinalHashAggregateStream
They're only responsible for repartition-based two-stage hash aggregation.
Support blocked memory management for states
This PoC only targets making the following workload work with blocked memory management
So, in order to support blocked state management:
impl<T> GroupValues for GroupValuesPrimitiveBlock<T>
impl<T, F> GroupsAccumulator for AvgGroupsAccumulator<T, F>
The idea is to replace the internal contiguous vector with fixed-size blocks (see the figure above).
They're implemented as new structs just to keep the PoC simple; it's possible to replace the existing implementation with this blocked approach.
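As a rough illustration of the blocked layout, here is a minimal sketch of a growable container backed by fixed-size blocks. `BlockedVec` is a hypothetical type for illustration and is not the PoC's `GroupValuesPrimitiveBlock`; it assumes values are only appended. Growth allocates one new block and never copies existing values, unlike a contiguous `Vec` that reallocates when it runs out of capacity.

```rust
/// Hypothetical append-only container (not from the PR): stores values in
/// fixed-size blocks instead of one contiguous allocation.
struct BlockedVec<T> {
    blocks: Vec<Vec<T>>,
    block_size: usize,
    len: usize,
}

impl<T> BlockedVec<T> {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be non-zero");
        Self {
            blocks: Vec::new(),
            block_size,
            len: 0,
        }
    }

    /// Appends a value, allocating a fresh block when the last one is full.
    fn push(&mut self, value: T) {
        if self.len % self.block_size == 0 {
            // Either no block exists yet or the last block is full.
            self.blocks.push(Vec::with_capacity(self.block_size));
        }
        self.blocks
            .last_mut()
            .expect("a block was ensured above")
            .push(value);
        self.len += 1;
    }

    /// Looks up a value by its flat index.
    fn get(&self, index: usize) -> Option<&T> {
        self.blocks
            .get(index / self.block_size)?
            .get(index % self.block_size)
    }

    fn len(&self) -> usize {
        self.len
    }

    fn num_blocks(&self) -> usize {
        self.blocks.len()
    }
}
```

Pushing 10 values with a block size of 4 produces three blocks, the last holding two values; each block is its own allocation and could be released independently.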
Are these changes tested?
Are there any user-facing changes?