Pipeline Optimization - LimitOptimizer#730
Conversation
d313559 to
11aa9c0
Compare
Flow PHP - BenchmarksResults of the benchmarks from this PR are compared with the results from 1.x branch. Extractors+-----------------------+-------------------+------+-----+------------------+------------------+------------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+-----------------------+-------------------+------+-----+------------------+------------------+------------------+
| AvroExtractorBench | bench_extract_10k | 1 | 3 | 34.889mb +0.95% | 440.442ms +1.75% | ±0.81% +245.55% |
| CSVExtractorBench | bench_extract_10k | 1 | 3 | 4.773mb +0.35% | 347.942ms +0.72% | ±0.31% +95.15% |
| JsonExtractorBench | bench_extract_10k | 1 | 3 | 4.921mb +0.33% | 694.870ms -0.76% | ±0.15% -50.43% |
| ParquetExtractorBench | bench_extract_10k | 1 | 3 | 233.480mb +0.01% | 993.406ms -2.37% | ±0.36% +1121.74% |
| TextExtractorBench | bench_extract_10k | 1 | 3 | 4.767mb +0.35% | 23.161ms +1.06% | ±0.31% -88.59% |
| XmlExtractorBench | bench_extract_10k | 1 | 3 | 4.767mb +0.35% | 555.245ms -0.48% | ±0.15% -48.03% |
+-----------------------+-------------------+------+-----+------------------+------------------+------------------+
Transformers+-----------------------------+--------------------------+------+-----+-----------------+-----------------+---------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+-----------------------------+--------------------------+------+-----+-----------------+-----------------+---------------+
| RenameEntryTransformerBench | bench_transform_10k_rows | 1 | 3 | 87.034mb +0.02% | 66.939ms -2.76% | ±0.85% +7.18% |
+-----------------------------+--------------------------+------+-----+-----------------+-----------------+---------------+
Loaders+--------------------+----------------+------+-----+------------------+------------------+-----------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+--------------------+----------------+------+-----+------------------+------------------+-----------------+
| AvroLoaderBench | bench_load_10k | 1 | 3 | 93.274mb +0.02% | 695.975ms -1.25% | ±2.51% +133.35% |
| CSVLoaderBench | bench_load_10k | 1 | 3 | 46.050mb +0.04% | 69.665ms +0.70% | ±0.34% -70.21% |
| JsonLoaderBench | bench_load_10k | 1 | 3 | 88.552mb +0.02% | 76.827ms -1.88% | ±0.30% -78.24% |
| ParquetLoaderBench | bench_load_10k | 1 | 3 | 283.992mb +0.01% | 1.542s +1.03% | ±0.61% +140.28% |
| TextLoaderBench | bench_load_10k | 1 | 3 | 16.544mb +0.10% | 37.464ms +1.12% | ±1.69% +238.31% |
+--------------------+----------------+------+-----+------------------+------------------+-----------------+
Building Blocks+-------------------------+----------------------------+------+-----+-----------------+------------------+------------------+
| benchmark | subject | revs | its | mem_peak | mode | rstdev |
+-------------------------+----------------------------+------+-----+-----------------+------------------+------------------+
| RowsBench | bench_chunk_10_on_10k | 2 | 3 | 60.658mb +0.00% | 4.136ms +1.70% | ±2.42% +194.22% |
| RowsBench | bench_diff_left_1k_on_10k | 2 | 3 | 80.450mb +0.00% | 180.197ms +0.73% | ±1.01% -34.97% |
| RowsBench | bench_diff_right_1k_on_10k | 2 | 3 | 58.976mb +0.00% | 17.874ms -0.39% | ±0.22% -71.13% |
| RowsBench | bench_drop_1k_on_10k | 2 | 3 | 59.797mb +0.00% | 2.871ms -2.14% | ±1.27% +824.56% |
| RowsBench | bench_drop_right_1k_on_10k | 2 | 3 | 59.797mb +0.00% | 3.003ms +2.09% | ±2.16% +540.12% |
| RowsBench | bench_entries_on_10k | 2 | 3 | 59.010mb +0.00% | 3.975ms -6.86% | ±2.45% -30.82% |
| RowsBench | bench_filter_on_10k | 2 | 3 | 59.539mb +0.00% | 23.573ms -0.44% | ±0.93% -48.02% |
| RowsBench | bench_find_on_10k | 2 | 3 | 59.539mb +0.00% | 23.304ms -1.34% | ±0.50% -37.69% |
| RowsBench | bench_find_one_on_10k | 10 | 3 | 57.610mb +0.00% | 2.300μs -4.17% | ±3.55% +4.35% |
| RowsBench | bench_first_on_10k | 10 | 3 | 57.610mb +0.00% | 0.500μs 0.00% | ±0.00% 0.00% |
| RowsBench | bench_flat_map_on_1k | 2 | 3 | 65.843mb +0.00% | 13.634ms +0.79% | ±0.96% -47.03% |
| RowsBench | bench_map_on_10k | 2 | 3 | 91.363mb +0.00% | 60.981ms -3.16% | ±0.28% -54.04% |
| RowsBench | bench_merge_1k_on_10k | 2 | 3 | 60.060mb +0.00% | 3.272ms +4.51% | ±3.24% +834.83% |
| RowsBench | bench_partition_by_on_10k | 2 | 3 | 62.330mb +0.00% | 47.788ms -4.41% | ±0.19% -82.97% |
| RowsBench | bench_remove_on_10k | 2 | 3 | 62.160mb +0.00% | 7.675ms -1.75% | ±0.70% -43.97% |
| RowsBench | bench_sort_asc_on_1k | 2 | 3 | 57.610mb +0.00% | 50.515ms +0.43% | ±0.80% -32.85% |
| RowsBench | bench_sort_by_on_1k | 2 | 3 | 57.610mb +0.00% | 50.659ms +1.01% | ±0.28% +336.47% |
| RowsBench | bench_sort_desc_on_1k | 2 | 3 | 57.610mb +0.00% | 50.661ms +0.24% | ±0.08% -83.89% |
| RowsBench | bench_sort_entries_on_1k | 2 | 3 | 59.884mb +0.00% | 9.309ms -0.27% | ±0.62% -17.15% |
| RowsBench | bench_sort_on_1k | 2 | 3 | 57.609mb +0.00% | 37.583ms +0.48% | ±1.88% +2554.21% |
| RowsBench | bench_take_1k_on_10k | 10 | 3 | 57.610mb +0.00% | 21.000μs -1.30% | ±0.39% -56.36% |
| RowsBench | bench_take_right_1k_on_10k | 10 | 3 | 57.610mb +0.00% | 26.230μs +0.14% | ±0.89% +396.21% |
| RowsBench | bench_unique_on_1k | 2 | 3 | 80.451mb +0.00% | 178.852ms -1.06% | ±1.11% +33.45% |
| NativeEntryFactoryBench | bench_entry_factory | 1 | 3 | 91.718mb +0.01% | 146.632ms -0.97% | ±0.16% -72.76% |
| NativeEntryFactoryBench | bench_entry_factory | 1 | 3 | 47.599mb -0.02% | 74.443ms -0.66% | ±0.78% -19.34% |
| NativeEntryFactoryBench | bench_entry_factory | 1 | 3 | 12.391mb -0.04% | 17.717ms +0.73% | ±0.95% +237.42% |
+-------------------------+----------------------------+------+-----+-----------------+------------------+------------------+
|
| public function source(Extractor $extractor) : self; | ||
| public function setSource(Extractor $extractor) : self; | ||
|
|
||
| public function source() : Extractor; |
There was a problem hiding this comment.
I had to expose the pipeline data source in order to be able to access it at the Optimizer level.
| use Flow\ETL\Pipeline\Pipes; | ||
|
|
||
| final class LogicalPlan | ||
| final class ExecutionPlan |
There was a problem hiding this comment.
Technically speaking, looking at the current implementation, it's more a Physical than a Logical plan, but for now, lets call it an Execution Plan which IMO it's a most self-descriptive name for what this class is really responsible for.
|
Technically speaking, it should be possible to move the current Execution Plan processing logic into Optimizer, however, for now I would like to keep both mechanisms. There is a very good chance that Execution Plan processor will evolve into |
Change Log
Added
Fixed
Changed
Removed
Deprecated
Security
Closes: #715
Description
I was first trying to extend ExecutionPlan processors, however achieving what was needed without access to the Pipeline itself was extremely complicated, I ended up adding Locks to Transformers in order to avoid removing them from the pipeline in certain situations.
After a few attempts, I started once again from scratch, this time focusing on optimizing the pipeline not before execution but when the element is supposed to be added to the pipeline.
That's how I ended up creating a new mechanism in the ETL called
Optimizer.The role of the optimizer is to analyze the existing pipeline/extractor and decide what's the most optimal way of adding a given element to the pipeline.
Currently, there is only one Optimization that can be applied,
LimitOptimizationbut we can easily expand this mechanism in order to:It's best to show it on example:
Without Optimizations total time of reading 10 rows from 1mln rows parquet file was something around 11 sec, after optimization is applied, that time drops below 1sec.