branch-4.0: [opt](multi-catalog) Optimize file split size. #60637
yiguolei merged 1 commit into apache:branch-4.0 from
Conversation
### What problem does this PR solve?

### Release note

This PR introduces a **dynamic and progressive file split size adjustment mechanism** to improve scan parallelism and resource utilization for external table scans, while avoiding excessive small splits or inefficiently large initial splits.

#### 1. Split Size Adjustment Strategy

##### 1.1 Non-Batch Split Mode

In non-batch split mode, a **two-phase split size selection strategy** is applied based on the total size of all input files:

* The total size of all splits is calculated in advance.
* If the total size **exceeds `maxInitialSplitNum * maxInitialSplitSize`**:
  * `split_size = maxSplitSize` (default **64MB**)
* Otherwise:
  * `split_size = maxInitialSplitSize` (default **32MB**)

This strategy reduces the number of splits for small datasets while improving parallelism for large-scale scans.

---

##### 1.2 Batch Split Mode

In batch split mode, a **progressive split size adjustment strategy** is introduced:

* As the total file size increases,
* when the number of files gradually **exceeds `maxInitialSplitNum`**,
* the `split_size` is **smoothly increased from `maxInitialSplitSize` (32MB) toward `maxSplitSize` (64MB)**.

This approach avoids generating too many small splits at the early stage while gradually increasing scan parallelism as the workload grows, resulting in more stable scheduling and execution behavior.

---

##### 1.3 User-Specified Split Size (Backward Compatibility)

This PR **preserves the session variable `file_split_size`** for user-defined split size configuration:

* If `file_split_size` is explicitly set by the user:
  * The user-defined value takes precedence.
  * The dynamic split size adjustment logic is bypassed.
* This ensures full backward compatibility with existing configurations and tuning practices.

---

#### 2. Support Status by Data Source

| Data Source | Non-Batch Split Mode | Batch Split Mode | Notes |
| ----------- | -------------------- | ---------------- | ----- |
| Hive | ✅ Supported | ✅ Supported | Uses Doris internal HDFS FileSplitter |
| Iceberg | ✅ Supported | ❌ Not supported | File splitting is currently delegated to Iceberg APIs |
| Paimon | ✅ Supported | ❌ Not supported | Only non-batch split mode is implemented |

---

#### 3. New Hive HDFS FileSplitter Logic

For Hive HDFS files, this PR introduces an enhanced file splitting strategy:

1. **Splits never span multiple HDFS blocks**
   * Prevents cross-block reads and avoids unnecessary IO overhead.
2. **Tail split optimization**
   * If the remaining file size is smaller than `split_size * 2`,
   * the remaining part is **evenly divided** into splits,
   * preventing the creation of very small tail splits and improving overall scan efficiency.

---

#### Summary

* Introduces dynamic and progressive split size adjustment
* Supports both batch and non-batch split modes
* Preserves user-defined split size configuration for backward compatibility
* Optimizes Hive HDFS file splitting to reduce small tail splits and cross-block IO

A minimal, hedged code sketch of this selection logic follows this description.
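To make the two-phase and progressive behavior concrete, here is a small illustrative sketch of the selection logic described above. The class and method names (`SplitSizeSketch`, `chooseSplitSize`, `nextSplitSize`) are assumptions that do not appear in the PR, the real logic lives in `FileSplitter` and the scan nodes, and the batch-mode growth is simplified to a step function rather than the smooth increase the PR implements.

```java
// Illustrative sketch only; names and the simplified step function are assumptions,
// not the exact implementation in FileSplitter.
public final class SplitSizeSketch {

    // Non-batch mode: pick one split size up front from the total input size.
    static long chooseSplitSize(long totalFileSize, long maxInitialSplitSize,
                                long maxSplitSize, int maxInitialSplitNum,
                                long userFileSplitSize) {
        if (userFileSplitSize > 0) {
            // Session variable file_split_size wins; dynamic sizing is bypassed.
            return userFileSplitSize;
        }
        // Division-based comparison, roughly equivalent to
        // totalFileSize > maxInitialSplitNum * maxInitialSplitSize, without long overflow.
        boolean exceedsInitialBudget = maxInitialSplitNum > 0
                && totalFileSize / maxInitialSplitNum > maxInitialSplitSize;
        return exceedsInitialBudget ? maxSplitSize : maxInitialSplitSize; // 64MB vs 32MB defaults
    }

    // Batch mode (simplified): the first maxInitialSplitNum splits use the small size,
    // later splits use the large size. The PR grows the size gradually rather than in one step.
    static long nextSplitSize(int splitsProducedSoFar, long maxInitialSplitSize,
                              long maxSplitSize, int maxInitialSplitNum) {
        return splitsProducedSoFar < maxInitialSplitNum ? maxInitialSplitSize : maxSplitSize;
    }
}
```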
run buildall
Pull request overview
This PR cherry-picks upstream work to optimize external table scan parallelism by introducing dynamic file split sizing, adding new session variables to control the behavior, and updating external scan nodes and related tests to use the new splitter logic.
Changes:
- Add session variables to control max initial split size, max split size, and the initial split-count threshold for dynamic sizing.
- Refactor file splitting into a stateful `FileSplitter` and integrate it into Hive/Paimon/Iceberg/TVF scan paths.
- Update/introduce unit + regression tests to reflect new split counts and backend assignment behavior.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 15 comments.
| File | Description |
|---|---|
| regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy | Updates expected explain output split counts after new split sizing behavior. |
| fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java | Updates expected assignment totals impacted by scheduling changes. |
| fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java | Adjusts unit test to account for new FileSplitter initialization and sizing variables. |
| fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java | Adds new unit tests for split sizing and block-based splitting behavior. |
| fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | Adds new session variables controlling dynamic split sizing parameters. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java | Uses new dynamic target split-size selection for TVF scans. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java | Integrates FileSplitter and target split-size selection into Paimon native splitting. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java | Adds dynamic split sizing logic for Iceberg planning (batch vs non-batch behavior). |
| fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java | Integrates FileSplitter and per-scan target split-size selection (batch/non-batch). |
| fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java | Changes isBatchMode() to no longer declare throws UserException. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java | Refactors file splitting into an instance-based splitter with block-aware splitting and initial-split tracking. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java | Simplifies explain-path batch-mode handling after isBatchMode() signature change. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java | Adds fileSplitter instance initialization and removes old getRealFileSplitSize() logic. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java | Alters split assignment ordering and redistribution conditions. |
```java
@Test
public void testNonSplittableFlagDecrementsCounter() throws Exception {
    LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz");
    BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)};
    FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2);
    List<Split> splits = fileSplitter.splitFile(
            loc,
            0L,
            locations,
            10 * MB,
            0L,
            false,
            Collections.emptyList(),
            FileSplit.FileSplitCreator.DEFAULT);
    Assert.assertEquals(1, splits.size());
}
```
testNonSplittableFlagDecrementsCounter doesn't assert the counter decrement or any observable behavior related to the name; it currently only asserts splits.size() == 1, which is already covered by other tests. Either assert getRemainingInitialSplitNum() changes as intended or rename the test to match what it verifies.
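For illustration, a hedged sketch of the assertion the comment asks for; it assumes the `getRemainingInitialSplitNum()` accessor mentioned above exists on `FileSplitter` and that a non-splittable file still consumes one slot of the initial-split budget.

```java
FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2);
long before = fileSplitter.getRemainingInitialSplitNum();
List<Split> splits = fileSplitter.splitFile(
        loc, 0L, locations, 10 * MB, 0L,
        false, // non-splittable
        Collections.emptyList(), FileSplit.FileSplitCreator.DEFAULT);
Assert.assertEquals(1, splits.size());
// Assert the observable behavior the test name promises: the counter decreased by one.
Assert.assertEquals(before - 1, fileSplitter.getRemainingInitialSplitNum());
```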
```java
public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException {
    ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();

    Collections.shuffle(splits, new Random(FIXED_SHUFFLE_SEED));
    List<Split> remainingSplits;
```
computeScanRangeAssignment's contract/Javadoc says splits should be in partition/path order to maximize OS page cache reuse, but the new Collections.shuffle(...) explicitly destroys that ordering. It also mutates the caller-provided splits list in place, which can be surprising if the caller reuses it later. If randomized ordering is still required, shuffle a copy (or gate it behind a config) and update the Javadoc accordingly; otherwise remove the shuffle to preserve the intended scheduling behavior.
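A possible shape of the fix sketched from the comment above, assuming the randomized ordering is still wanted; `FIXED_SHUFFLE_SEED` comes from the quoted snippet, the rest is illustrative.

```java
// Shuffle a defensive copy so the caller-provided list keeps its partition/path order.
List<Split> shuffledSplits = new ArrayList<>(splits);
Collections.shuffle(shuffledSplits, new Random(FIXED_SHUFFLE_SEED));
// Use shuffledSplits for the assignment below; `splits` is left untouched for the caller.
```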
```java
// Materialize planFiles() into a list to avoid iterating the CloseableIterable twice.
// RISK: It will cost memory if the table is large.
List<FileScanTask> fileScanTaskList = new ArrayList<>();
try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) {
    for (FileScanTask task : scanTasksIter) {
        fileScanTaskList.add(task);
    }
} catch (Exception e) {
    throw new RuntimeException("Failed to materialize file scan tasks", e);
}

targetSplitSize = determineTargetFileSplitSize(fileScanTaskList);
return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTaskList), targetSplitSize);
```
Materializing scan.planFiles() into a List can cause significant memory pressure for large Iceberg tables (the code comment already calls this out). A safer approach is to compute the threshold by iterating one CloseableIterable and then call scan.planFiles() again for the actual TableScanUtil.splitFiles(...) pass (trading some extra planning work for bounded memory), or to use a cheaper heuristic that doesn't require buffering all tasks.
Suggested change:

```diff
-// Materialize planFiles() into a list to avoid iterating the CloseableIterable twice.
-// RISK: It will cost memory if the table is large.
-List<FileScanTask> fileScanTaskList = new ArrayList<>();
-try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) {
-    for (FileScanTask task : scanTasksIter) {
-        fileScanTaskList.add(task);
-    }
-} catch (Exception e) {
-    throw new RuntimeException("Failed to materialize file scan tasks", e);
-}
-targetSplitSize = determineTargetFileSplitSize(fileScanTaskList);
-return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTaskList), targetSplitSize);
+// Iterate planFiles() once to determine the target split size, then plan again for splitting.
+try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) {
+    targetSplitSize = determineTargetFileSplitSize(scanTasksIter);
+} catch (Exception e) {
+    throw new RuntimeException("Failed to determine target file split size", e);
+}
+return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
```
```java
// Ensure fileSplitter is initialized on the spy as doInitialize() is not called in this unit test
FileSplitter fileSplitter = new FileSplitter(maxInitialSplitSize, maxSplitSize,
        0);
try {
    java.lang.reflect.Field field = FileQueryScanNode.class.getDeclaredField("fileSplitter");
    field.setAccessible(true);
    field.set(spyPaimonScanNode, fileSplitter);
} catch (NoSuchFieldException | IllegalAccessException e) {
    throw new RuntimeException("Failed to inject FileSplitter into PaimonScanNode test", e);
}

// Note: The original PaimonSource is sufficient for this test
// No need to mock catalog properties since doInitialize() is not called in this test
// Mock SessionVariable behavior
Mockito.when(sv.isForceJniScanner()).thenReturn(false);
Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE");
Mockito.when(sv.getMaxInitialSplitSize()).thenReturn(maxInitialSplitSize);
Mockito.when(sv.getMaxSplitSize()).thenReturn(maxSplitSize);
```
This test injects fileSplitter via reflection, which is brittle (field renames/visibility changes will silently break the test) and bypasses the normal initialization path. Prefer calling the real initialization (doInitialize()), exposing a small test-only setter, or constructing the scan node in a way that initializes fileSplitter without reflection.
Suggested change:

```diff
-// Ensure fileSplitter is initialized on the spy as doInitialize() is not called in this unit test
-FileSplitter fileSplitter = new FileSplitter(maxInitialSplitSize, maxSplitSize,
-        0);
-try {
-    java.lang.reflect.Field field = FileQueryScanNode.class.getDeclaredField("fileSplitter");
-    field.setAccessible(true);
-    field.set(spyPaimonScanNode, fileSplitter);
-} catch (NoSuchFieldException | IllegalAccessException e) {
-    throw new RuntimeException("Failed to inject FileSplitter into PaimonScanNode test", e);
-}
-// Note: The original PaimonSource is sufficient for this test
-// No need to mock catalog properties since doInitialize() is not called in this test
-// Mock SessionVariable behavior
-Mockito.when(sv.isForceJniScanner()).thenReturn(false);
-Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE");
-Mockito.when(sv.getMaxInitialSplitSize()).thenReturn(maxInitialSplitSize);
-Mockito.when(sv.getMaxSplitSize()).thenReturn(maxSplitSize);
+// Mock SessionVariable behavior used during initialization and splitting
+Mockito.when(sv.isForceJniScanner()).thenReturn(false);
+Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE");
+Mockito.when(sv.getMaxInitialSplitSize()).thenReturn(maxInitialSplitSize);
+Mockito.when(sv.getMaxSplitSize()).thenReturn(maxSplitSize);
+// Initialize the scan node through its normal initialization path
+spyPaimonScanNode.doInitialize();
```
```java
public void setMaxSplitSize(long maxSplitSize) {
    this.maxSplitSize = maxSplitSize;
}

public int maxInitialSplitNum() {
    return maxInitialSplitNum;
}

public void setMaxInitialSplits(int maxInitialSplitNum) {
    this.maxInitialSplitNum = maxInitialSplitNum;
}
```
The accessor/mutator API here is internally inconsistent: maxInitialSplitNum() doesn't follow the getXxx naming used by the other getters, and setMaxInitialSplits()/setMaxSplitSize()/setMaxInitialSplitSize() would leave remainingInitialSplitNum and currentMaxSplitSize out of sync with the new configuration if they were ever called. Consider removing these setters and making the config fields final (preferred), or ensure setters also reset the derived state/counters.
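A hedged sketch of the preferred option from the comment (final configuration fields, no setters); the class layout is assumed and trimmed to the fields relevant here.

```java
public class FileSplitter {
    private final long maxInitialSplitSize;
    private final long maxSplitSize;
    private final int maxInitialSplitNum;
    private int remainingInitialSplitNum;  // derived state, initialized once
    private long currentMaxSplitSize;      // derived state, initialized once

    public FileSplitter(long maxInitialSplitSize, long maxSplitSize, int maxInitialSplitNum) {
        this.maxInitialSplitSize = maxInitialSplitSize;
        this.maxSplitSize = maxSplitSize;
        this.maxInitialSplitNum = maxInitialSplitNum;
        this.remainingInitialSplitNum = maxInitialSplitNum;
        this.currentMaxSplitSize = maxInitialSplitSize;
    }

    public int getMaxInitialSplitNum() {
        return maxInitialSplitNum;
    }

    // No setters: reconfiguring means constructing a new FileSplitter, so the derived
    // counters can never drift out of sync with the configuration.
}
```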
```java
while (splitStart < start + length) {
    updateCurrentMaxSplitSize();
    long splitBytes;
    long remainingBlockBytes = blocks.get(currentBlockIdx).getEnd() - splitStart;
    if (remainingBlockBytes <= currentMaxSplitSize) {
        splitBytes = remainingBlockBytes;
```
FileSplitter keeps currentMaxSplitSize as a mutable instance field that is updated on each split. In batch mode, HiveScanNode.startSplit() generates splits concurrently across partitions, so splitFile() can be invoked by multiple threads on the same fileSplitter instance; this makes currentMaxSplitSize a data race and can produce incorrect split sizing. Make the chosen max split size a local variable (e.g., have updateCurrentMaxSplitSize() return the size) or otherwise synchronize/avoid sharing mutable state across threads.
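One way the suggested fix could look, as a hedged sketch: compute the per-split max size locally (backed by an atomic counter) instead of mutating a shared `currentMaxSplitSize` field. All names besides those quoted above are illustrative.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SplitSizePolicy {
    private final long maxInitialSplitSize;
    private final long maxSplitSize;
    private final AtomicInteger remainingInitialSplits;

    SplitSizePolicy(long maxInitialSplitSize, long maxSplitSize, int maxInitialSplitNum) {
        this.maxInitialSplitSize = maxInitialSplitSize;
        this.maxSplitSize = maxSplitSize;
        this.remainingInitialSplits = new AtomicInteger(maxInitialSplitNum);
    }

    // Returns the size for the next split instead of updating shared mutable state,
    // so concurrent splitFile() calls on the same splitter do not race.
    long nextMaxSplitSize() {
        return remainingInitialSplits.getAndDecrement() > 0 ? maxInitialSplitSize : maxSplitSize;
    }
}
```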
```java
name = MAX_FILE_SPLIT_SIZE,
description = {"对于每个 table scan,最大文件分片大小。"
        + "初始化使用 MAX_INITIAL_FILE_SPLIT_SIZE,一旦超过了 MAX_INITIAL_FILE_SPLIT_NUM,则使用 MAX_FILE_SPLIT_SIZE。",
        "For each table scan, the maximum initial file split size. "
```
The English description for MAX_FILE_SPLIT_SIZE says "the maximum initial file split size", which is misleading (this variable represents the max split size after the initial phase). Please correct the English description to match the variable name/Chinese description.
| "For each table scan, the maximum initial file split size. " | |
| "For each table scan, the maximum file split size. " |
```java
totalFileSize += rawFile.fileSize();
if (totalFileSize
        >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
    result = sessionVariable.getMaxSplitSize();
    break;
```
totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum() can overflow long for large session settings, which may break the split-size selection logic. Consider a safe multiply (e.g., Math.multiplyExact with overflow handling) or a division-based comparison.
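No suggested change is attached to this Paimon branch, so here is a hedged sketch of an overflow-safe check; the helper name is illustrative, and the comparison uses division instead of the multiplication quoted above.

```java
// Roughly equivalent to totalFileSize >= maxSplitSize * maxInitialSplitNum, without long overflow.
static boolean exceedsInitialBudget(long totalFileSize, long maxSplitSize, long maxInitialSplitNum) {
    if (maxSplitSize <= 0 || maxInitialSplitNum <= 0) {
        return false; // degenerate configuration: never switch to the large split size
    }
    return totalFileSize / maxInitialSplitNum >= maxSplitSize;
}
```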
```java
    return TableScanUtil.splitFiles(scan.planFiles(),
            sessionVariable.getFileSplitSize());
}
if (isBatchMode()) {
    // Currently iceberg batch split mode will use max split size.
    // TODO: dynamic split size in batch split mode need to customize iceberg splitter.
    return TableScanUtil.splitFiles(scan.planFiles(), sessionVariable.getMaxSplitSize());
```
In the file_split_size > 0 and isBatchMode() branches, targetSplitSize is not set before creating splits, so createIcebergSplit() will call split.setTargetSplitSize(0). That makes split-weight computation degenerate (division by zero clamps to standard weight), which can harm scan range assignment. Set targetSplitSize to the actual split size used in these branches (user value or maxSplitSize) before returning.
Suggested change:

```diff
-    return TableScanUtil.splitFiles(scan.planFiles(),
-            sessionVariable.getFileSplitSize());
-}
-if (isBatchMode()) {
-    // Currently iceberg batch split mode will use max split size.
-    // TODO: dynamic split size in batch split mode need to customize iceberg splitter.
-    return TableScanUtil.splitFiles(scan.planFiles(), sessionVariable.getMaxSplitSize());
+    targetSplitSize = sessionVariable.getFileSplitSize();
+    return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+}
+if (isBatchMode()) {
+    // Currently iceberg batch split mode will use max split size.
+    // TODO: dynamic split size in batch split mode need to customize iceberg splitter.
+    targetSplitSize = sessionVariable.getMaxSplitSize();
+    return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
```
```java
for (FileScanTask task : tasks) {
    accumulatedTotalFileSize += ScanTaskUtil.contentSizeInBytes(task.file());
    if (accumulatedTotalFileSize
            >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
        result = sessionVariable.getMaxSplitSize();
```
accumulatedTotalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum() can overflow long with large session variable values, potentially selecting the wrong split size. Please use a safe multiply / overflow check or rewrite the comparison to avoid multiplication overflow.
Suggested change:

```diff
-for (FileScanTask task : tasks) {
-    accumulatedTotalFileSize += ScanTaskUtil.contentSizeInBytes(task.file());
-    if (accumulatedTotalFileSize
-            >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-        result = sessionVariable.getMaxSplitSize();
+long maxSplitSize = sessionVariable.getMaxSplitSize();
+long maxInitialSplitNum = sessionVariable.getMaxInitialSplitNum();
+// Compute a safe threshold equivalent to maxSplitSize * maxInitialSplitNum,
+// guarding against long overflow. If the product would overflow, cap it at Long.MAX_VALUE.
+long threshold;
+if (maxSplitSize <= 0 || maxInitialSplitNum <= 0) {
+    threshold = Long.MAX_VALUE;
+} else if (maxSplitSize > Long.MAX_VALUE / maxInitialSplitNum) {
+    threshold = Long.MAX_VALUE;
+} else {
+    threshold = maxSplitSize * maxInitialSplitNum;
+}
+for (FileScanTask task : tasks) {
+    long size = ScanTaskUtil.contentSizeInBytes(task.file());
+    if (size > 0 && Long.MAX_VALUE - accumulatedTotalFileSize < size) {
+        accumulatedTotalFileSize = Long.MAX_VALUE;
+    } else {
+        accumulatedTotalFileSize += size;
+    }
+    if (accumulatedTotalFileSize >= threshold) {
+        result = maxSplitSize;
```
FE UT Coverage Report: Increment line coverage
PR approved by at least one committer and no changes requested.

PR approved by anyone and no changes requested.
What problem does this PR solve?
Problem Summary:
Release note
Cherry-pick #58858
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)