[Enhancement](parquet)update runtime filter when read next parquet row group.#59053
[Enhancement](parquet)update runtime filter when read next parquet row group.#59053morningman merged 2 commits intoapache:masterfrom
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
b0d62d6 to
fcacd4c
Compare
Possible file(s) that should be tracked in LFS detected: 🚨The following file(s) exceeds the file size limit:
Consider using |
|
run buildall |
TPC-H: Total hot run time: 32192 ms |
TPC-DS: Total hot run time: 176509 ms |
ClickBench: Total hot run time: 27.14 s |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
|
PR approved by at least one committer and no changes requested. |
|
PR approved by anyone and no changes requested. |
There was a problem hiding this comment.
Pull request overview
This PR enhances Parquet row group filtering by fetching the latest join runtime filters when creating each row group reader, rather than only at the file level. This allows for more efficient data filtering as runtime filters become available during query execution.
Key Changes:
- Added runtime filter update mechanism that is called when creating new Parquet row group readers
- Introduced test coverage to verify that runtime filters are applied across multiple row groups
Reviewed changes
Copilot reviewed 9 out of 11 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| regression-test/suites/external_table_p0/hive/test_parquet_join_runtime_filter.groovy | Adds comprehensive test suite for verifying runtime filter application across Parquet row groups |
| docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run84.hql | Creates test tables for runtime filter validation |
| be/src/vec/exec/scan/scanner.cpp | Initializes total runtime filter count during scanner construction |
| be/src/vec/exec/scan/file_scanner.h | Updates method signature to track runtime filter changes |
| be/src/vec/exec/scan/file_scanner.cpp | Implements runtime filter update callback for Parquet readers |
| be/src/vec/exec/format/parquet/vparquet_reader.h | Adds callback mechanism and lazy read context update method |
| be/src/vec/exec/format/parquet/vparquet_reader.cpp | Implements runtime filter updates when creating row group readers |
| be/src/vec/exec/format/parquet/vparquet_group_reader.h | Extends LazyReadContext to include partition and missing column information |
| be/src/runtime_filter/runtime_filter_consumer_helper.h | Exposes runtime filter count accessor |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| _output_tuple_desc(_local_state->output_tuple_desc()), | ||
| _output_row_descriptor(_local_state->_parent->output_row_descriptor()), | ||
| _has_prepared(false) { | ||
| _total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums()); |
There was a problem hiding this comment.
The function name cast_set is ambiguous and doesn't clearly convey its purpose. Consider renaming to something more descriptive like set_int_value or using direct casting syntax like static_cast<int>.
| _total_rf_num = cast_set<int>(_local_state->_helper.runtime_filter_nums()); | |
| _total_rf_num = static_cast<int>(_local_state->_helper.runtime_filter_nums()); |
| new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns); | ||
| new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns); |
There was a problem hiding this comment.
Moving from _lazy_read_ctx and then assigning back to it on line 416 leaves the source in a moved-from state. Consider copying these values instead of moving, or restructure to avoid the circular dependency.
| new_lazy_read_ctx.fill_partition_columns = std::move(_lazy_read_ctx.fill_partition_columns); | |
| new_lazy_read_ctx.fill_missing_columns = std::move(_lazy_read_ctx.fill_missing_columns); | |
| new_lazy_read_ctx.fill_partition_columns = _lazy_read_ctx.fill_partition_columns; | |
| new_lazy_read_ctx.fill_missing_columns = _lazy_read_ctx.fill_missing_columns; |
| // when create new row group reader, call this function to get lasted runtime filter conjuncts. | ||
| std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = [](bool* changed, | ||
| VExprContextSPtrs&) { | ||
| *changed = false; | ||
| return Status::OK(); | ||
| }; |
There was a problem hiding this comment.
The default lambda implementation always sets changed to false and returns OK. Consider documenting why this is the default behavior or making it more explicit with a named static function.
| // when create new row group reader, call this function to get lasted runtime filter conjuncts. | |
| std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = [](bool* changed, | |
| VExprContextSPtrs&) { | |
| *changed = false; | |
| return Status::OK(); | |
| }; | |
| // when creating a new row group reader, call this function to get the latest runtime filter conjuncts. | |
| // The default implementation does nothing, sets 'changed' to false, and returns OK. | |
| // This is used when no late runtime filter is required. | |
| static Status default_late_rf_func(bool* changed, VExprContextSPtrs&) { | |
| *changed = false; | |
| return Status::OK(); | |
| } | |
| std::function<Status(bool*, VExprContextSPtrs&)> _call_late_rf_func = default_late_rf_func; |
71cefb9 to
44fa16d
Compare
|
run buildall |
TPC-H: Total hot run time: 32772 ms |
TPC-DS: Total hot run time: 176928 ms |
ClickBench: Total hot run time: 27.49 s |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
44fa16d to
2937fe0
Compare
|
run buildall |
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
|
PR approved by at least one committer and no changes requested. |
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
### What problem does this PR solve? Problem Summary: case from pr : #59053
…w group. (apache#59053) Problem Summary: This pull request achieves better filtering by fetching the latest join runtime filter when creating the Parquet row group reader. Previously, the join runtime filter was fetched at the Parquet file level.
### What problem does this PR solve? Problem Summary: case from pr : apache#59053
…t parquet row group. (#59053) (#59725) bp #59053 bp #59557 ### What problem does this PR solve? Problem Summary: This pull request achieves better filtering by fetching the latest join runtime filter when creating the Parquet row group reader. Previously, the join runtime filter was fetched at the Parquet file level.
### What problem does this PR solve? Problem Summary: case from pr : apache#59053
…ax filter (#60197) ### What problem does this PR solve? Problem Summary: 1. Fixed an issue where the Parquet reader's pushdown predicate row group min-max filter was ineffective due to a refactoring of the column predicate #59187. 2. Removed tablet_schema from RuntimePredicate when performing topn runtime filters, making RuntimePredicate not limited to olap scan node. 3. Removed the feature in #59053 where the Parquet reader could obtain runtime filters in real time, for the purpose of unifying the implementation of olap and file scan node logic in the future. 4. Temporarily disable the topn runtime filter for VARBINARY type, as the implementation of this type in column predicate is incomplete, affecting pr #58721.
…ax filter (#60197) ### What problem does this PR solve? Problem Summary: 1. Fixed an issue where the Parquet reader's pushdown predicate row group min-max filter was ineffective due to a refactoring of the column predicate #59187. 2. Removed tablet_schema from RuntimePredicate when performing topn runtime filters, making RuntimePredicate not limited to olap scan node. 3. Removed the feature in #59053 where the Parquet reader could obtain runtime filters in real time, for the purpose of unifying the implementation of olap and file scan node logic in the future. 4. Temporarily disable the topn runtime filter for VARBINARY type, as the implementation of this type in column predicate is incomplete, affecting pr #58721.
What problem does this PR solve?
Problem Summary:
This pull request achieves better filtering by fetching the latest join runtime filter when creating the Parquet row group reader. Previously, the join runtime filter was fetched at the Parquet file level.
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)