
branch-4.0: [opt](multi-catalog) Optimize file split size. #60637

Merged

yiguolei merged 1 commit into apache:branch-4.0 from kaka11chen:cherry-pick-58858_4.0 on Feb 12, 2026
Conversation

@kaka11chen
Contributor

What problem does this PR solve?

Problem Summary:

Release note

Cherry-pick #58858

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

### What problem does this PR solve?

### Release note

This PR introduces a **dynamic and progressive file split size
adjustment mechanism** to improve scan parallelism and resource
utilization for external table scans, while avoiding excessive small
splits or inefficiently large initial splits.

#### 1. Split Size Adjustment Strategy

##### 1.1 Non-Batch Split Mode

In non-batch split mode, a **two-phase split size selection strategy**
is applied based on the total size of all input files:

* The total size of all input files is calculated in advance.
* If the total size **exceeds `maxInitialSplitNum * maxInitialSplitSize`**:
  * `split_size = maxSplitSize` (default **64MB**)
* Otherwise:
  * `split_size = maxInitialSplitSize` (default **32MB**)

This strategy preserves scan parallelism for small datasets by keeping the
initial splits small, while the larger split size caps the number of splits
generated for large-scale scans.
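
A minimal sketch of this selection, assuming the defaults above; the helper name and the list-of-sizes input are illustrative, not the actual Doris API:

```java
// Hypothetical helper mirroring the two-phase rule described above.
// All sizes are in bytes; maxInitialSplitNum is passed as long to keep
// the threshold multiplication in 64-bit arithmetic.
static long chooseNonBatchSplitSize(java.util.List<Long> fileSizes,
        long maxInitialSplitSize,   // default 32MB
        long maxSplitSize,          // default 64MB
        long maxInitialSplitNum) {
    long totalSize = 0;
    for (long size : fileSizes) {
        totalSize += size;
    }
    // Small total -> keep the smaller initial size (more splits, more
    // parallelism); large total -> the bigger size caps the split count.
    return totalSize > maxInitialSplitNum * maxInitialSplitSize
            ? maxSplitSize
            : maxInitialSplitSize;
}
```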

---

##### 1.2 Batch Split Mode

In batch split mode, a **progressive split size adjustment strategy** is
introduced:

* Splits are initially generated at `maxInitialSplitSize` (32MB).
* As the total scanned file size grows and the number of generated splits
**exceeds `maxInitialSplitNum`**,
* the `split_size` is **smoothly increased toward `maxSplitSize` (64MB)**.

This approach provides enough small splits for good parallelism early in a
scan, while avoiding an excessive number of splits as the workload grows,
resulting in more stable scheduling and execution behavior.
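
A simplified sketch of this behavior, assuming the splitter tracks a budget of initial splits; the class and field names are illustrative, and the real `FileSplitter` may ramp the size more gradually:

```java
// Illustrative stand-in for the stateful splitter, not the actual API.
class ProgressiveSplitSizer {
    private final long maxInitialSplitSize; // e.g. 32MB
    private final long maxSplitSize;        // e.g. 64MB
    private int remainingInitialSplitNum;   // starts at maxInitialSplitNum

    ProgressiveSplitSizer(long maxInitialSplitSize, long maxSplitSize, int maxInitialSplitNum) {
        this.maxInitialSplitSize = maxInitialSplitSize;
        this.maxSplitSize = maxSplitSize;
        this.remainingInitialSplitNum = maxInitialSplitNum;
    }

    // Called once per emitted split: small splits while the initial
    // budget lasts, larger splits afterwards.
    long nextSplitSize() {
        if (remainingInitialSplitNum > 0) {
            remainingInitialSplitNum--;
            return maxInitialSplitSize;
        }
        return maxSplitSize;
    }
}
```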

---

##### 1.3 User-Specified Split Size (Backward Compatibility)

This PR **preserves the session variable `file_split_size`** for
user-defined split size configuration:

* If `file_split_size` is explicitly set by the user:

  * The user-defined value takes precedence.
  * The dynamic split size adjustment logic is bypassed.
* This ensures full backward compatibility with existing configurations
and tuning practices.
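
In sketch form (the `> 0` convention for "explicitly set" is an assumption, based on the Iceberg branch quoted in the review below, which checks `file_split_size > 0`):

```java
// Hypothetical precedence helper: an explicit user value wins over the
// dynamically chosen size. Assumes <= 0 means "not set".
static long effectiveSplitSize(long userFileSplitSize, long dynamicSplitSize) {
    return userFileSplitSize > 0 ? userFileSplitSize : dynamicSplitSize;
}
```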

---

#### 2. Support Status by Data Source

| Data Source | Non-Batch Split Mode | Batch Split Mode | Notes |
| ----------- | -------------------- | ---------------- | ----- |
| Hive | ✅ Supported | ✅ Supported | Uses Doris internal HDFS FileSplitter |
| Iceberg | ✅ Supported | ❌ Not supported | File splitting is currently delegated to Iceberg APIs |
| Paimon | ✅ Supported | ❌ Not supported | Only non-batch split mode is implemented |

---

#### 3. New Hive HDFS FileSplitter Logic

For Hive HDFS files, this PR introduces an enhanced file splitting
strategy:

1. **Splits never span multiple HDFS blocks**

   * Prevents cross-block reads and avoids unnecessary IO overhead.

2. **Tail split optimization**

   * If the remaining file size is smaller than `split_size * 2`,
   * the remaining part is **evenly divided** into splits,
   * preventing the creation of very small tail splits and improving
     overall scan efficiency.
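
A minimal sketch of the tail rule, assuming splitting within a single block (block clamping omitted); the helper and its `{start, length}` output format are illustrative:

```java
// Hypothetical tail handling: once less than 2 * splitSize remains, emit
// two near-equal halves instead of one full split plus a tiny remainder.
static java.util.List<long[]> splitRange(long offset, long remaining, long splitSize) {
    java.util.List<long[]> splits = new java.util.ArrayList<>(); // {start, length}
    while (remaining > 0) {
        if (remaining < 2 * splitSize) {
            long half = (remaining + 1) / 2; // round up so nothing is dropped
            splits.add(new long[]{offset, half});
            if (remaining > half) {
                splits.add(new long[]{offset + half, remaining - half});
            }
            break;
        }
        splits.add(new long[]{offset, splitSize});
        offset += splitSize;
        remaining -= splitSize;
    }
    return splits;
}
```

For example, a 150MB remainder with a 64MB `split_size` yields 64MB + 43MB + 43MB rather than 64MB + 64MB + 22MB.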

---

#### Summary

* Introduces dynamic and progressive split size adjustment
* Supports both batch and non-batch split modes
* Preserves user-defined split size configuration for backward
compatibility
* Optimizes Hive HDFS file splitting to reduce small tail splits and
cross-block IO
Copilot AI review requested due to automatic review settings February 10, 2026 06:29
@kaka11chen kaka11chen requested a review from yiguolei as a code owner February 10, 2026 06:29
@Thearas
Contributor

Thearas commented Feb 10, 2026

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@kaka11chen
Contributor Author

run buildall


Copilot AI left a comment


Pull request overview

This PR cherry-picks upstream work to optimize external table scan parallelism by introducing dynamic file split sizing, adding new session variables to control the behavior, and updating external scan nodes and related tests to use the new splitter logic.

Changes:

  • Add session variables to control max initial split size, max split size, and the initial split-count threshold for dynamic sizing.
  • Refactor file splitting into a stateful FileSplitter and integrate it into Hive/Paimon/Iceberg/TVF scan paths.
  • Update/introduce unit + regression tests to reflect new split counts and backend assignment behavior.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 15 comments.

Summary per file:

| File | Description |
| ---- | ----------- |
| regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy | Updates expected explain output split counts after new split sizing behavior. |
| fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java | Updates expected assignment totals impacted by scheduling changes. |
| fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java | Adjusts unit test to account for new FileSplitter initialization and sizing variables. |
| fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java | Adds new unit tests for split sizing and block-based splitting behavior. |
| fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | Adds new session variables controlling dynamic split sizing parameters. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java | Uses new dynamic target split-size selection for TVF scans. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java | Integrates FileSplitter and target split-size selection into Paimon native splitting. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java | Adds dynamic split sizing logic for Iceberg planning (batch vs non-batch behavior). |
| fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java | Integrates FileSplitter and per-scan target split-size selection (batch/non-batch). |
| fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java | Changes isBatchMode() to no longer declare throws UserException. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java | Refactors file splitting into an instance-based splitter with block-aware splitting and initial-split tracking. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java | Simplifies explain-path batch-mode handling after isBatchMode() signature change. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java | Adds fileSplitter instance initialization and removes old getRealFileSplitSize() logic. |
| fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java | Alters split assignment ordering and redistribution conditions. |


Comment on lines +164 to +179
```java
@Test
public void testNonSplittableFlagDecrementsCounter() throws Exception {
    LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz");
    BlockLocation[] locations = new BlockLocation[]{
            new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)};
    FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2);
    List<Split> splits = fileSplitter.splitFile(
            loc,
            0L,
            locations,
            10 * MB,
            0L,
            false,
            Collections.emptyList(),
            FileSplit.FileSplitCreator.DEFAULT);
    Assert.assertEquals(1, splits.size());
}
```

Copilot AI Feb 10, 2026


testNonSplittableFlagDecrementsCounter doesn't assert the counter decrement or any observable behavior related to the name; it currently only asserts splits.size() == 1, which is already covered by other tests. Either assert getRemainingInitialSplitNum() changes as intended or rename the test to match what it verifies.
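
For instance, assuming the budget starts at the constructor's value of 2 and each emitted split consumes one, the decrement could be asserted directly (hypothetical, using the accessor named in this comment):

```java
Assert.assertEquals(1, fileSplitter.getRemainingInitialSplitNum());
```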

Comment on lines 224 to 228
```java
public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException {
    ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();

    Collections.shuffle(splits, new Random(FIXED_SHUFFLE_SEED));
    List<Split> remainingSplits;
```

Copilot AI Feb 10, 2026


computeScanRangeAssignment's contract/Javadoc says splits should be in partition/path order to maximize OS page cache reuse, but the new Collections.shuffle(...) explicitly destroys that ordering. It also mutates the caller-provided splits list in place, which can be surprising if the caller reuses it later. If randomized ordering is still required, shuffle a copy (or gate it behind a config) and update the Javadoc accordingly; otherwise remove the shuffle to preserve the intended scheduling behavior.
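
A defensive-copy variant, for illustration (reusing the seed constant from the snippet above):

```java
// Shuffle a copy so the caller's partition/path ordering is preserved.
List<Split> shuffled = new ArrayList<>(splits);
Collections.shuffle(shuffled, new Random(FIXED_SHUFFLE_SEED));
```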

Comment on lines +461 to +473
```java
// Materialize planFiles() into a list to avoid iterating the CloseableIterable twice.
// RISK: It will cost memory if the table is large.
List<FileScanTask> fileScanTaskList = new ArrayList<>();
try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) {
    for (FileScanTask task : scanTasksIter) {
        fileScanTaskList.add(task);
    }
} catch (Exception e) {
    throw new RuntimeException("Failed to materialize file scan tasks", e);
}

targetSplitSize = determineTargetFileSplitSize(fileScanTaskList);
return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTaskList), targetSplitSize);
```

Copilot AI Feb 10, 2026


Materializing scan.planFiles() into a List can cause significant memory pressure for large Iceberg tables (the code comment already calls this out). A safer approach is to compute the threshold by iterating one CloseableIterable and then call scan.planFiles() again for the actual TableScanUtil.splitFiles(...) pass (trading some extra planning work for bounded memory), or to use a cheaper heuristic that doesn't require buffering all tasks.

Suggested change

```diff
-// Materialize planFiles() into a list to avoid iterating the CloseableIterable twice.
-// RISK: It will cost memory if the table is large.
-List<FileScanTask> fileScanTaskList = new ArrayList<>();
-try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) {
-    for (FileScanTask task : scanTasksIter) {
-        fileScanTaskList.add(task);
-    }
-} catch (Exception e) {
-    throw new RuntimeException("Failed to materialize file scan tasks", e);
-}
-targetSplitSize = determineTargetFileSplitSize(fileScanTaskList);
-return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTaskList), targetSplitSize);
+// Iterate planFiles() once to determine the target split size, then plan again for splitting.
+try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) {
+    targetSplitSize = determineTargetFileSplitSize(scanTasksIter);
+} catch (Exception e) {
+    throw new RuntimeException("Failed to determine target file split size", e);
+}
+return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
```

Comment on lines +99 to 117
```java
// Ensure fileSplitter is initialized on the spy as doInitialize() is not called in this unit test
FileSplitter fileSplitter = new FileSplitter(maxInitialSplitSize, maxSplitSize, 0);
try {
    java.lang.reflect.Field field = FileQueryScanNode.class.getDeclaredField("fileSplitter");
    field.setAccessible(true);
    field.set(spyPaimonScanNode, fileSplitter);
} catch (NoSuchFieldException | IllegalAccessException e) {
    throw new RuntimeException("Failed to inject FileSplitter into PaimonScanNode test", e);
}

// Note: The original PaimonSource is sufficient for this test
// No need to mock catalog properties since doInitialize() is not called in this test
// Mock SessionVariable behavior
Mockito.when(sv.isForceJniScanner()).thenReturn(false);
Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE");
Mockito.when(sv.getMaxInitialSplitSize()).thenReturn(maxInitialSplitSize);
Mockito.when(sv.getMaxSplitSize()).thenReturn(maxSplitSize);
```

Copilot AI Feb 10, 2026


This test injects fileSplitter via reflection, which is brittle (field renames/visibility changes will silently break the test) and bypasses the normal initialization path. Prefer calling the real initialization (doInitialize()), exposing a small test-only setter, or constructing the scan node in a way that initializes fileSplitter without reflection.

Suggested change

```diff
-// Ensure fileSplitter is initialized on the spy as doInitialize() is not called in this unit test
-FileSplitter fileSplitter = new FileSplitter(maxInitialSplitSize, maxSplitSize, 0);
-try {
-    java.lang.reflect.Field field = FileQueryScanNode.class.getDeclaredField("fileSplitter");
-    field.setAccessible(true);
-    field.set(spyPaimonScanNode, fileSplitter);
-} catch (NoSuchFieldException | IllegalAccessException e) {
-    throw new RuntimeException("Failed to inject FileSplitter into PaimonScanNode test", e);
-}
-// Note: The original PaimonSource is sufficient for this test
-// No need to mock catalog properties since doInitialize() is not called in this test
-// Mock SessionVariable behavior
-Mockito.when(sv.isForceJniScanner()).thenReturn(false);
-Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE");
-Mockito.when(sv.getMaxInitialSplitSize()).thenReturn(maxInitialSplitSize);
-Mockito.when(sv.getMaxSplitSize()).thenReturn(maxSplitSize);
+// Mock SessionVariable behavior used during initialization and splitting
+Mockito.when(sv.isForceJniScanner()).thenReturn(false);
+Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE");
+Mockito.when(sv.getMaxInitialSplitSize()).thenReturn(maxInitialSplitSize);
+Mockito.when(sv.getMaxSplitSize()).thenReturn(maxSplitSize);
+// Initialize the scan node through its normal initialization path
+spyPaimonScanNode.doInitialize();
```
Comment on lines +70 to +80
```java
public void setMaxSplitSize(long maxSplitSize) {
    this.maxSplitSize = maxSplitSize;
}

public int maxInitialSplitNum() {
    return maxInitialSplitNum;
}

public void setMaxInitialSplits(int maxInitialSplitNum) {
    this.maxInitialSplitNum = maxInitialSplitNum;
}
```

Copilot AI Feb 10, 2026


The accessor/mutator API here is internally inconsistent: maxInitialSplitNum() doesn't follow the getXxx naming used by the other getters, and setMaxInitialSplits()/setMaxSplitSize()/setMaxInitialSplitSize() would leave remainingInitialSplitNum and currentMaxSplitSize out of sync with the new configuration if they were ever called. Consider removing these setters and making the config fields final (preferred), or ensure setters also reset the derived state/counters.
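
The preferred shape from this comment, sketched with the field names from the snippet above (hypothetical):

```java
// Immutable configuration: no setters, so derived state such as
// remainingInitialSplitNum and currentMaxSplitSize cannot drift.
private final long maxInitialSplitSize;
private final long maxSplitSize;
private final int maxInitialSplitNum;

public long getMaxSplitSize() {
    return maxSplitSize;
}

public int getMaxInitialSplitNum() {
    return maxInitialSplitNum;
}
```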

Comment on lines +172 to +177
```java
while (splitStart < start + length) {
    updateCurrentMaxSplitSize();
    long splitBytes;
    long remainingBlockBytes = blocks.get(currentBlockIdx).getEnd() - splitStart;
    if (remainingBlockBytes <= currentMaxSplitSize) {
        splitBytes = remainingBlockBytes;
```

Copilot AI Feb 10, 2026


FileSplitter keeps currentMaxSplitSize as a mutable instance field that is updated on each split. In batch mode, HiveScanNode.startSplit() generates splits concurrently across partitions, so splitFile() can be invoked by multiple threads on the same fileSplitter instance; this makes currentMaxSplitSize a data race and can produce incorrect split sizing. Make the chosen max split size a local variable (e.g., have updateCurrentMaxSplitSize() return the size) or otherwise synchronize/avoid sharing mutable state across threads.
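
One race-free shape, as the comment suggests: the chosen size becomes a return value and the only shared state is an atomic counter. A sketch, with illustrative names:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical thread-safe variant of the size selection.
class ConcurrentSplitSizeChooser {
    private final long maxInitialSplitSize;
    private final long maxSplitSize;
    private final AtomicInteger remainingInitialSplitNum;

    ConcurrentSplitSizeChooser(long maxInitialSplitSize, long maxSplitSize, int maxInitialSplitNum) {
        this.maxInitialSplitSize = maxInitialSplitSize;
        this.maxSplitSize = maxSplitSize;
        this.remainingInitialSplitNum = new AtomicInteger(maxInitialSplitNum);
    }

    long nextMaxSplitSize() {
        // Atomic decrement: at most maxInitialSplitNum callers observe the
        // smaller initial size, even under concurrent splitFile() calls.
        return remainingInitialSplitNum.getAndDecrement() > 0
                ? maxInitialSplitSize : maxSplitSize;
    }
}
```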

```java
name = MAX_FILE_SPLIT_SIZE,
description = {"对于每个 table scan,最大文件分片大小。"
        + "初始化使用 MAX_INITIAL_FILE_SPLIT_SIZE,一旦超过了 MAX_INITIAL_FILE_SPLIT_NUM,则使用 MAX_FILE_SPLIT_SIZE。",
        "For each table scan, the maximum initial file split size. "
```

(The Chinese description reads: "For each table scan, the maximum file split size. MAX_INITIAL_FILE_SPLIT_SIZE is used initially; once MAX_INITIAL_FILE_SPLIT_NUM is exceeded, MAX_FILE_SPLIT_SIZE is used.")

Copilot AI Feb 10, 2026


The English description for MAX_FILE_SPLIT_SIZE says "the maximum initial file split size", which is misleading (this variable represents the max split size after the initial phase). Please correct the English description to match the variable name/Chinese description.

Suggested change

```diff
-"For each table scan, the maximum initial file split size. "
+"For each table scan, the maximum file split size. "
```

Comment on lines +445 to +449
```java
totalFileSize += rawFile.fileSize();
if (totalFileSize
        >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
    result = sessionVariable.getMaxSplitSize();
    break;
```

Copilot AI Feb 10, 2026


totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum() can overflow long for large session settings, which may break the split-size selection logic. Consider a safe multiply (e.g., Math.multiplyExact with overflow handling) or a division-based comparison.
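
A division-based form of the check, for illustration: for positive values, `total / num >= max` is equivalent to `total >= max * num` and cannot overflow:

```java
// Overflow-safe equivalent of: totalFileSize >= maxSplitSize * maxInitialSplitNum
long maxSplitSize = sessionVariable.getMaxSplitSize();
long maxInitialSplitNum = sessionVariable.getMaxInitialSplitNum();
if (maxInitialSplitNum > 0 && totalFileSize / maxInitialSplitNum >= maxSplitSize) {
    result = maxSplitSize;
}
```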

Comment on lines +451 to +457
```java
    return TableScanUtil.splitFiles(scan.planFiles(),
            sessionVariable.getFileSplitSize());
}
if (isBatchMode()) {
    // Currently iceberg batch split mode will use max split size.
    // TODO: dynamic split size in batch split mode need to customize iceberg splitter.
    return TableScanUtil.splitFiles(scan.planFiles(), sessionVariable.getMaxSplitSize());
```

Copilot AI Feb 10, 2026


In the file_split_size > 0 and isBatchMode() branches, targetSplitSize is not set before creating splits, so createIcebergSplit() will call split.setTargetSplitSize(0). That makes split-weight computation degenerate (division by zero clamps to standard weight), which can harm scan range assignment. Set targetSplitSize to the actual split size used in these branches (user value or maxSplitSize) before returning.

Suggested change

```diff
-    return TableScanUtil.splitFiles(scan.planFiles(),
-            sessionVariable.getFileSplitSize());
-}
-if (isBatchMode()) {
-    // Currently iceberg batch split mode will use max split size.
-    // TODO: dynamic split size in batch split mode need to customize iceberg splitter.
-    return TableScanUtil.splitFiles(scan.planFiles(), sessionVariable.getMaxSplitSize());
+    targetSplitSize = sessionVariable.getFileSplitSize();
+    return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+}
+if (isBatchMode()) {
+    // Currently iceberg batch split mode will use max split size.
+    // TODO: dynamic split size in batch split mode need to customize iceberg splitter.
+    targetSplitSize = sessionVariable.getMaxSplitSize();
+    return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
```
Comment on lines +479 to +483
```java
for (FileScanTask task : tasks) {
    accumulatedTotalFileSize += ScanTaskUtil.contentSizeInBytes(task.file());
    if (accumulatedTotalFileSize
            >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
        result = sessionVariable.getMaxSplitSize();
```

Copilot AI Feb 10, 2026


accumulatedTotalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum() can overflow long with large session variable values, potentially selecting the wrong split size. Please use a safe multiply / overflow check or rewrite the comparison to avoid multiplication overflow.

Suggested change

```diff
-for (FileScanTask task : tasks) {
-    accumulatedTotalFileSize += ScanTaskUtil.contentSizeInBytes(task.file());
-    if (accumulatedTotalFileSize
-            >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-        result = sessionVariable.getMaxSplitSize();
+long maxSplitSize = sessionVariable.getMaxSplitSize();
+long maxInitialSplitNum = sessionVariable.getMaxInitialSplitNum();
+// Compute a safe threshold equivalent to maxSplitSize * maxInitialSplitNum,
+// guarding against long overflow. If the product would overflow, cap it at Long.MAX_VALUE.
+long threshold;
+if (maxSplitSize <= 0 || maxInitialSplitNum <= 0) {
+    threshold = Long.MAX_VALUE;
+} else if (maxSplitSize > Long.MAX_VALUE / maxInitialSplitNum) {
+    threshold = Long.MAX_VALUE;
+} else {
+    threshold = maxSplitSize * maxInitialSplitNum;
+}
+for (FileScanTask task : tasks) {
+    long size = ScanTaskUtil.contentSizeInBytes(task.file());
+    if (size > 0 && Long.MAX_VALUE - accumulatedTotalFileSize < size) {
+        accumulatedTotalFileSize = Long.MAX_VALUE;
+    } else {
+        accumulatedTotalFileSize += size;
+    }
+    if (accumulatedTotalFileSize >= threshold) {
+        result = maxSplitSize;
```
@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 32.92% (80/243) 🎉
Increment coverage report
Complete coverage report

@github-actions github-actions bot added the approved Indicates a PR has been approved by one committer. label Feb 10, 2026
@github-actions
Contributor

PR approved by at least one committer and no changes requested.

@github-actions
Contributor

PR approved by anyone and no changes requested.

@morningman morningman changed the title [opt](multi-catalog) Optimize file split size. branch-4.0: [opt](multi-catalog) Optimize file split size. Feb 11, 2026
@yiguolei yiguolei merged commit 962c85b into apache:branch-4.0 Feb 12, 2026
35 of 40 checks passed