perf(frequency): parallel tree-reduce of partial FTables (~1.3x speedup) #3728
Merged
Replace serial `merge_all(recv.iter())` with `rayon::reduce` over the collected partial FTables. New helpers `merge_ftables` and `merge_weighted_ftables` merge two partial tables with per-column parallelism (each column's `Frequencies` / HashMap is independent).

Why: samply profiling on a 1M-row x 41-col CSV showed only ~3.7 of 16 cores busy on average. Workers all finished their chunks, then parked on the threadpool's condvar while one main thread folded N partial HashMaps. Tree-reduce turns the O(N) serial fold into O(log N) rounds that the rayon pool runs in parallel, and the per-column merge inside each round adds a second axis of parallelism.

Measured (hyperfine, 10 runs, M4 Max 16 cores, 1M rows x 41 cols):
  baseline (master): 924.0 ms +/- 7.9, user 3187 ms (~3.5x cores)
  new (parallel-merge): 716.7 ms +/- 22.4, user 3782 ms (~5.3x cores)
  -> 1.29x faster

Output is byte-identical to master (`Frequencies::merge` is the `Commute` trait's commutative + associative merge, so any reduce order yields the same result). All 160 frequency tests pass, including the existing `prop_frequency_seq_vs_parallel` proptest.
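The order-independence claim can be illustrated with a minimal std-only sketch. `ToyCounts` and `toy_merge` are hypothetical stand-ins for one column's frequency table and its merge; they are not the `qsv-stats` types, and the sketch only assumes that merging means adding counts per key.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for one column's frequency table (not the qsv-stats type).
type ToyCounts = HashMap<&'static str, u64>;

/// Merge `right` into `left` by adding counts per key.
/// Integer addition is commutative and associative, so this merge is too.
fn toy_merge(mut left: ToyCounts, right: ToyCounts) -> ToyCounts {
    for (value, count) in right {
        *left.entry(value).or_insert(0) += count;
    }
    left
}

/// Build a table from (value, count) pairs.
fn toy_table(pairs: &[(&'static str, u64)]) -> ToyCounts {
    pairs.iter().copied().collect()
}
```

Merging three such tables as `(a + b) + c`, `a + (b + c)`, or `(c + a) + b` produces equal maps, which is the property a reduce with an unspecified grouping order relies on.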
Pull request overview
This PR improves qsv frequency performance by replacing the serial merge of per-chunk partial frequency tables with a Rayon-powered parallel tree-reduce, including parallel per-column merges during each reduce step.
Changes:
- Replaced serial folding of partial unweighted `FTables` with a Rayon `reduce` tree-reduce.
- Replaced serial folding of partial `WeightedFTables` with a Rayon `reduce` tree-reduce.
- Added pairwise merge helpers to merge two partial tables with per-column parallelism.
…for streaming reduce

- Promote `debug_assert_eq!` -> `assert_eq!` in `merge_ftables` and `merge_weighted_ftables`. Without this the subsequent `zip(...)` would silently truncate to the shorter Vec on a length mismatch and drop columns in release builds. Cost is one cmp per merge call (~16 in default path).
- Switch both reduce call sites to `recv.into_iter().par_bridge().reduce(...)`. The previous `recv.iter().collect::<Vec<_>>()` materialized all `nchunks` partial tables before reducing, a real concern when memory-aware chunking picks small chunk sizes (nchunks can grow into the hundreds). `par_bridge` lets the reducer consume chunks as they arrive, so peak memory is bounded by rayon's working set rather than `nchunks`.

160 frequency tests still pass.
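The truncation hazard behind the `assert_eq!` promotion can be reproduced with the standard library alone. The sketch below uses plain per-column totals instead of real frequency tables; `merge_unchecked` and `merge_checked` are hypothetical names for illustration, not the helpers in `src/cmd/frequency.rs`.

```rust
/// Add `right` into `left` element-wise with no length check.
/// `zip` stops at the shorter input, so extra columns are dropped silently.
fn merge_unchecked(left: &mut [u64], right: Vec<u64>) {
    for (l, r) in left.iter_mut().zip(right) {
        *l += r;
    }
}

/// Same merge, but a length mismatch panics in every build profile.
/// `debug_assert_eq!` would compile to nothing in a default release build.
fn merge_checked(left: &mut [u64], right: Vec<u64>) {
    assert_eq!(
        left.len(),
        right.len(),
        "partial tables disagree on column count"
    );
    merge_unchecked(left, right);
}
```

Calling `merge_unchecked` with two columns on the left and three on the right leaves two merged columns and no error, while `merge_checked` panics on the same input.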
Summary
Replace the serial `merge_all(recv.iter())` of partial frequency tables with a `rayon::reduce` tree-reduce over the collected partials. Two new helpers (`merge_ftables`, `merge_weighted_ftables`) merge two partial tables with per-column parallelism, since each column's `Frequencies` / HashMap is independent.
Why
Samply profiling on a 1M-row × 41-column CSV showed only ~3.7 of 16 cores busy on average for `qsv frequency`. Workers all finished their chunks at roughly the same wall time, then parked on the threadpool's condvar while one main thread serially folded N partial HashMaps via `merge_all`. With nchunks == njobs == 16 and a `bounded(nchunks)` channel that never back-pressures, the merge dominated the long tail.

Tree-reduce turns the O(N) serial fold into O(log N) rounds that the rayon pool runs in parallel, and the per-column merge inside each round adds a second axis of parallelism.
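The two axes of parallelism described above can be sketched as follows. This is a std-only illustration that swaps rayon for `std::thread::scope` so it runs without dependencies; `PartialTable`, `ColumnCounts`, `merge_tables`, and `tree_reduce` are hypothetical names, not the PR's actual code, and spawning one thread per column is far cruder than a work-stealing pool.

```rust
use std::collections::HashMap;
use std::thread;

// Hypothetical stand-ins: one count map per column, one Vec of maps per chunk.
type ColumnCounts = HashMap<String, u64>;
type PartialTable = Vec<ColumnCounts>;

/// Merge `right` into `left`, one scoped thread per column
/// (columns are independent, so they can be merged concurrently).
fn merge_tables(mut left: PartialTable, right: PartialTable) -> PartialTable {
    assert_eq!(left.len(), right.len(), "column count mismatch");
    thread::scope(|s| {
        for (l, r) in left.iter_mut().zip(right) {
            s.spawn(move || {
                for (value, count) in r {
                    *l.entry(value).or_insert(0) += count;
                }
            });
        }
    });
    left
}

/// Pairwise reduction: each round merges adjacent pairs in parallel, so
/// N partials take about log2(N) rounds instead of N - 1 serial merges.
fn tree_reduce(mut partials: Vec<PartialTable>) -> Option<PartialTable> {
    while partials.len() > 1 {
        let mut next = Vec::with_capacity((partials.len() + 1) / 2);
        let mut iter = partials.into_iter();
        thread::scope(|s| {
            let mut handles = Vec::new();
            let mut leftover = None;
            while let Some(a) = iter.next() {
                match iter.next() {
                    Some(b) => handles.push(s.spawn(move || merge_tables(a, b))),
                    None => leftover = Some(a), // odd one out waits for the next round
                }
            }
            for handle in handles {
                next.push(handle.join().expect("merge thread panicked"));
            }
            next.extend(leftover);
        });
        partials = next;
    }
    partials.pop()
}
```

With 16 partials this runs four rounds (8, 4, 2, then 1 merge) rather than 15 sequential merges on a single thread.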
Measured
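hyperfine, 10 runs, warmup 3, M4 Max 16 cores, 1M-row × 41-col NYC 311 CSV:

| Build | Wall time | User CPU |
| --- | --- | --- |
| baseline (master) | 924.0 ms ± 7.9 | 3187 ms (~3.5× cores) |
| new (parallel-merge) | 716.7 ms ± 22.4 | 3782 ms (~5.3× cores) |

~1.29× wall-time speedup; user-CPU rose because workers now contribute during the merge phase.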
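As a worked check, the headline ratios follow from the hyperfine figures reported in the commit message (924.0 ms and 716.7 ms wall time, 3187 ms and 3782 ms user CPU). The "effective cores" figure is taken here to mean user CPU time divided by wall time, which is an assumption about how the commit message derived it.

$$
\text{speedup} = \frac{924.0\ \text{ms}}{716.7\ \text{ms}} \approx 1.29
$$

$$
\text{cores}_{\text{baseline}} = \frac{3187}{924.0} \approx 3.45, \qquad \text{cores}_{\text{new}} = \frac{3782}{716.7} \approx 5.28
$$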
Correctness
- Output is byte-identical to master (`diff -q` clean on the benchmark CSV).
- `Frequencies::merge` comes from `qsv-stats`'s `Commute` trait and is commutative and associative, so any reduce order yields the same result.
- All 160 `cargo test frequency -F all_features` tests pass, including the existing `prop_frequency_seq_vs_parallel` proptest that directly compares parallel vs sequential output.
- `src/cmd/frequency.rs` touched (+57 / -21).
Test plan
- `cargo build --locked --bin qsv -F all_features`: clean
- `cargo clippy --locked --bin qsv -F all_features`: clean
- `cargo test --locked frequency -F all_features`: 160/160 pass
- `cargo test --locked stats -F all_features`: 706/706 pass (no regression)

🤖 Generated with Claude Code