
[C++][Parquet] Slow column reading from multi-column parquet files #38149

@marcin-krystianc

Description

Describe the bug, including details regarding any error messages, version, and platform.

Hi,

This is related to #38087, but it covers a different problem.
As in the previous issue, our use case reads a small number of columns (e.g. 100) from a Parquet file that contains many more columns (e.g. 20k).

The problem is that the more columns there are in the file, the more time it takes to read a particular column (repro code: https://github.com/marcin-krystianc/arrow_issue_2023-10-06).

In the graphs below (produced with https://github.com/marcin-krystianc/arrow_issue_2023-10-06/blob/master/plot_results.py), we can clearly see that when we read 100 columns from a parquet file (the orange line), the more columns there are in the file, the longer it takes to read a single column.
However, when we read the entire file (all columns), the time to read a single column depends far less on the number of columns in the file. There is still some correlation, but it is much weaker.
[Screenshots: per-column read time plotted against the number of columns in the file]

Both Python and C++ exhibit the same problem, which is not surprising since Python delegates the Parquet file reading to C++ anyway.

According to my analysis, there is a simple explanation for the reported problem: when we create a FileReader, it reads and parses the entire metadata section of the file. Since the metadata section contains information about all columns, most of that metadata reading and parsing is wasted work when we read only a tiny fraction of the columns from the file.

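To illustrate this, here is a minimal Python sketch (not part of the original repro; it assumes the /tmp/test_wide.parquet file produced by the benchmark below already exists) that separates the cost of opening the file and parsing the footer metadata from the cost of actually reading 100 columns:

import time
import pyarrow.parquet as pq

path = "/tmp/test_wide.parquet"  # wide file written by the benchmark below

# Opening a ParquetFile reads and parses the entire Thrift footer,
# i.e. the metadata describing every column in the file.
t = time.time()
pf = pq.ParquetFile(path)
t_open = time.time() - t

# Reading 100 columns afterwards only touches the data of those columns.
t = time.time()
res = pf.read(columns=[f"c{i}" for i in range(100)], use_threads=False)
t_read = time.time() - t

print(f"open + parse metadata: {t_open * 1e6:.0f} us, read 100 columns: {t_read * 1e6:.0f} us")

On a wide file, the first timing should grow with the total number of columns, while the second should mostly reflect the 100 columns that are actually read.
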
Python code:
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

import time
import polars as pl
import csv
import gc

t_write = []
t_read_100_pre_buffer = []

path = "/tmp/test_wide.parquet"

columns_list = [
                100, 200, 300, 400, 500, 600, 700, 800, 900,              
                1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
                10_000, 20_000, 30_000, 40_000, 50_000, 
                ]

chunks_list = [1000, 10_000]
rows_list = [5000]
with open('results_python.csv', 'w', encoding='UTF8', newline='') as f:

    writer = csv.writer(f)
    # write the header
    writer.writerow(['columns','rows','chunk_size','writing(μs)','reading_all(μs)','reading_100(μs)'])

    for chunk_size in chunks_list:
        for rows in rows_list:
            for columns in columns_list:
      
                table = pl.DataFrame(
                    data=np.random.randn(rows, columns),
                    schema=[f"c{i}" for i in range(columns)]).to_arrow()

                t = time.time()
                pq.write_table(table, path, row_group_size=chunk_size, use_dictionary=False, write_statistics=False)
                t_writing = time.time() - t
                t_write.append(t_writing)

                del table
                gc.collect()

                t_read = []
                t_read_100 = []

                for i in range(0, 3):

                    t = time.time()
                    res = pq.read_table(path, use_threads=False)
                    t_read.append(time.time() - t)
        
                    del res 
                    gc.collect()

                    t = time.time()               
                    res_100 = pq.read_table(path, columns=[f"c{i}" for i in range(100)], use_threads=False)
                    t_read_100.append(time.time() - t)    
                
                    del res_100
                    gc.collect()

                t_reading = min(t_read)
                t_reading_100 = min(t_read_100)

                data = [columns, rows, chunk_size, t_writing * 1_000_000, t_reading * 1_000_000, t_reading_100 * 1_000_000]
                writer.writerow(data)
                print(str(data))
C++ code:
#include "arrow/api.h"
#include "arrow/io/api.h"
#include "arrow/result.h"
#include "arrow/util/type_fwd.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"

#include <iostream>
#include <list>
#include <chrono>
#include <random>
#include <vector>
#include <fstream>
#include <iomanip>
#include <numeric>   // for std::iota
#include <algorithm> // for std::min_element

using arrow::Status;

namespace
{
  const char *FILE_NAME = "/tmp/my_cpp.parquet";

  std::shared_ptr<arrow::Table> GetTable(size_t nColumns, size_t nRows)
  {
    std::random_device dev;
    std::mt19937 rng(dev());
    std::uniform_real_distribution<> rand_gen(0.0, 1.0);

    std::vector<std::shared_ptr<arrow::Array>> arrays;
    std::vector<std::shared_ptr<arrow::Field>> fields;

    // For simplicity, we'll create float64 columns. You can expand this to handle other types.
    for (int i = 0; i < nColumns; i++)
    {
      arrow::DoubleBuilder builder;
      for (auto j = 0; j < nRows; j++)
      {
        if (!builder.Append(rand_gen(rng)).ok())
          throw std::runtime_error("builder.Append");
      }

      std::shared_ptr<arrow::Array> array;
      if (!builder.Finish(&array).ok())
        throw std::runtime_error("builder.Finish");

      arrays.push_back(array);
      fields.push_back(arrow::field("c_" + std::to_string(i), arrow::float64(), false));
    }

    auto table = arrow::Table::Make(arrow::schema(fields), arrays);
    return table;
  }

  Status WriteTableToParquet(size_t nColumns, size_t nRows, const std::string &filename, std::chrono::microseconds *dt, int64_t chunkSize)
  {
    auto table = GetTable(nColumns, nRows);
    auto begin = std::chrono::steady_clock::now();
    auto result = arrow::io::FileOutputStream::Open(filename);
    auto outfile = result.ValueOrDie();
    parquet::WriterProperties::Builder builder;
    auto properties = builder
                          .max_row_group_length(chunkSize)
                          ->disable_dictionary()
                          ->build();
    PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, chunkSize, properties));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
  }

  Status ReadEntireTable(const std::string &filename, std::chrono::microseconds *dt)
  {
    auto begin = std::chrono::steady_clock::now();

    std::shared_ptr<arrow::io::ReadableFile> infile;
    ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
    std::shared_ptr<arrow::Table> parquet_table;

    // Read the table.
    PARQUET_THROW_NOT_OK(reader->ReadTable(&parquet_table));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
  }

  Status ReadColumnsAsTable(const std::string &filename, std::vector<int> indices, std::chrono::microseconds *dt)
  {
    auto begin = std::chrono::steady_clock::now();

    std::shared_ptr<arrow::io::ReadableFile> infile;
    ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(filename));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));

    // Read the table.
    std::shared_ptr<arrow::Table> parquet_table;
    PARQUET_THROW_NOT_OK(reader->ReadTable(indices, &parquet_table));
    auto end = std::chrono::steady_clock::now();
    *dt = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
    return Status::OK();
  }

  Status RunMain(int argc, char **argv)
  {
    std::ofstream csvFile;
    csvFile.open("results_cpp.csv", std::ios_base::out); // overwrite any existing results file
    csvFile << "columns, rows, chunk_size, writing(μs), reading_all(μs), reading_100(μs)" << std::endl;

    std::list<int> nColumns = {
        100, 200, 300, 400, 500, 600, 700, 800, 900,
        1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
        10000, 20000, 30000, 40000, 50000};

    std::list<int64_t> chunk_sizes = {1000, 100000};
    std::list<int> rows_list = {5000};

    std::vector<int> indices(100);
    std::iota(indices.begin(), indices.end(), 0);

    for (auto chunk_size : chunk_sizes)
    {
      for (int nRow : rows_list)
      {
        for (int nColumn : nColumns)
        {
          std::chrono::microseconds writing_dt;
          ARROW_RETURN_NOT_OK(WriteTableToParquet(nColumn, nRow, FILE_NAME, &writing_dt, chunk_size));

          const int repeats = 3;
          std::vector<std::chrono::microseconds> reading_all_dts(repeats);
          std::vector<std::chrono::microseconds> reading_100_dts(repeats);
          for (int i = 0; i < repeats; i++)
          {
            ARROW_RETURN_NOT_OK(ReadEntireTable(FILE_NAME, &reading_all_dts[i]));
            ARROW_RETURN_NOT_OK(ReadColumnsAsTable(FILE_NAME, indices, &reading_100_dts[i]));
          }

          auto reading_all_dt = *std::min_element(reading_all_dts.begin(), reading_all_dts.end());
          auto reading_100_dt = *std::min_element(reading_100_dts.begin(), reading_100_dts.end());

          std::cerr << "(" << nColumn << ", " << nRow << ")"
                    << ", chunk_size=" << chunk_size
                    << ", writing_dt=" << writing_dt.count() / nColumn
                    << ", reading_all_dt=" << reading_all_dt.count() / nColumn
                    << ", reading_100_dt=" << reading_100_dt.count() / 100
                    << std::endl;

          csvFile << nColumn << ","
                  << nRow << ","
                  << chunk_size << ","
                  << writing_dt.count() << ","
                  << reading_all_dt.count() << ","
                  << reading_100_dt.count()
                  << std::endl;
        }
      }
    }

    return Status::OK();
  }
}

int main(int argc, char **argv)
{
  Status st = RunMain(argc, argv);
  if (!st.ok())
  {
    std::cerr << st << std::endl;
    return 1;
  }
  return 0;
}
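
For completeness, a hedged Python-side sketch (not part of the original repro) of how the cost described above can be paid only once per file: pyarrow's ParquetFile accepts an already-parsed FileMetaData object, so repeated reads of the same file do not have to re-parse the footer each time.

import pyarrow.parquet as pq

path = "/tmp/test_wide.parquet"  # wide file written by the benchmarks above

# Parse the footer metadata once...
md = pq.read_metadata(path)

# ...and reuse it for subsequent opens, so the footer is not parsed again.
pf = pq.ParquetFile(path, metadata=md)
res = pf.read(columns=[f"c{i}" for i in range(100)], use_threads=False)

This only helps when the same process reads the file repeatedly; it does not change the fact that the first open still parses the metadata of all columns.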

Component(s)

C++, Parquet
