Enable parsing columns from file path for Broker Load (#1582) #1635
imay merged 14 commits into apache:master
Conversation
gensrc/thrift/PlanNodes.thrift
Outdated
    // total size of the file
    8: optional i64 file_size
    // columns parsed from file path
    9: optional list<string> columns_from_path
It's better to record the slot offset for columns_from_path, or to name it num_of_columns_from_file. Because we may add other columns_from_xxx later, we should make the layout definite.
You should also add a comment that columns_from_path comes after the columns read from the file.
be/src/exec/broker_scanner.cpp
Outdated
        str_slot->len = value.size;
    }

inline void BrokerScanner::fill_slots_of_columns_from_path(int start, const std::vector<SlotDescriptor*>& src_slot_descs, Tuple* tuple) {
Why not put this function in BaseScanner, to avoid writing it twice?
Also, if you have num_columns_from_file, this function doesn't need the start param.
be/src/exec/parquet_reader.cpp
Outdated
    } else {
        time_t timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_group) * 24 * 60 * 60);
        tm* local;
        local = localtime(&timestamp);
Use localtime_r, which is thread-safe.
      _parquet_column_ids.clear();
-     for (auto slot_desc : tuple_slot_descs) {
+     for (int i = 0; i < _num_of_columns_from_file; i++) {
+         auto slot_desc = tuple_slot_descs.at(i);
Suggested change:
- auto slot_desc = tuple_slot_descs.at(i);
+ auto slot_desc = tuple_slot_descs[i];
    }
    RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof));
    // range of current file
    const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
Suggested change:
- const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
+ const TBrokerRangeDesc& range = _ranges[_next_range - 1];
    }
}
// columnsFromPath
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_59) {
I think there is no need to persist columnsFromPath, because Load already persists the whole load statement and will regenerate the DataDescriptor on restart.
if (dataDescription.getColumnNames() != null) {
    assignColumnNames.addAll(dataDescription.getColumnNames());
}
if (dataDescription.getColumnsFromPath() != null) {
If the user specifies columnsFromPath without a columnList, I think we should return an error message to the user in the LoadStatement analyze function.
Here, we should check dataDescription.getColumnsFromPath() inside the if (dataDescription.getColumnNames() != null) block.
    fail();
}

path = "/path/to/dir/k2==v2=//k1=v1//xxx.csv";
I think you should add a directory test case like '/path/to/dir/k2==v2=//k1=v1/', which should return false.
be/src/exec/base_scanner.cpp
Outdated
}

void BaseScanner::fill_slots_of_columns_from_path(int start, const std::vector<std::string>& columns_from_path) {
    if (start <= 0) {
I think this check is useless.

I think this check is useless.
But we should skip the case of StreamLoadTask.
This patch makes it possible to parse columns from the file path, as in Spark's Partition Discovery, e.g. extracting column k1 from the file path /path/to/dir/k1=1/xxx.csv. Currently, we do not support parsing encoded/compressed column values in the file path.
The partition columns are parsed in BrokerScanNode.java, and the parsing result for each file path is saved as a property of TBrokerRangeDesc, so the broker reader on the BE side can read the value of the specified partition column.
(I'm sorry to create a new PR for this issue; I'm not familiar with git rebase.)