apache/hudi-rs
The native Rust implementation for Apache Hudi, with C++ & Python API bindings.


The hudi-rs project aims to standardize the core Apache Hudi APIs and broaden Hudi integration across the data ecosystem for a diverse range of users and projects.

| Source | Installation Command |
|---|---|
| PyPI.org | pip install hudi |
| Crates.io | cargo add hudi |

Usage Examples

Note

These examples expect that a Hudi table exists at /tmp/trips_table, created using the quick start guide.

For the full reader API reference (ReadOptions, filter expressions, behavioral guarantees), see docs/reader-spec.md.

Snapshot Query

A snapshot query reads the latest version of the data from the table. The table API also accepts column filters that drive partition and file pruning as well as row-level filtering.

Python

from hudi import HudiReadOptions, HudiTableBuilder
import pyarrow as pa

hudi_table = HudiTableBuilder.from_base_uri("/tmp/trips_table").build()
batches = hudi_table.read(
    HudiReadOptions(filters=[("city", "=", "san_francisco")])
)

# convert to PyArrow table
arrow_table = pa.Table.from_batches(batches)
result = arrow_table.select(["rider", "city", "ts", "fare"])
print(result)

Rust

use hudi::error::Result;
use hudi::table::ReadOptions;
use hudi::table::builder::TableBuilder as HudiTableBuilder;
use arrow::compute::concat_batches;

#[tokio::main]
async fn main() -> Result<()> {
    let hudi_table = HudiTableBuilder::from_base_uri("/tmp/trips_table").build().await?;
    let options = ReadOptions::new().with_filters([("city", "=", "san_francisco")])?;
    let batches = hudi_table.read(&options).await?;
    let batch = concat_batches(&batches[0].schema(), &batches)?;
    let columns = vec!["rider", "city", "ts", "fare"];
    for col_name in columns {
        let idx = batch.schema().index_of(col_name).unwrap();
        println!("{}: {}", col_name, batch.column(idx));
    }
    Ok(())
}

To run a read-optimized (RO) query on Merge-on-Read (MOR) tables, set hoodie.read.use.read_optimized.mode in ReadOptions.

Python

from hudi import HudiReadOptions

batches = hudi_table.read(
    HudiReadOptions(hudi_options={"hoodie.read.use.read_optimized.mode": "true"})
)

Rust

let options = ReadOptions::new()
    .with_hudi_option("hoodie.read.use.read_optimized.mode", "true");
let batches = hudi_table.read(&options).await?;

Time-Travel Query

A time-travel query reads the data at a specific timestamp from the table. The table API also accepts column filters that drive partition and file pruning as well as row-level filtering.

Python

batches = hudi_table.read(
    HudiReadOptions(filters=[("city", "=", "san_francisco")])
    .with_as_of_timestamp("20241231123456789")
)

Rust

let options = ReadOptions::new()
    .with_as_of_timestamp("20241231123456789")
    .with_filters([("city", "=", "san_francisco")])?;
let batches = hudi_table.read(&options).await?;
Supported timestamp formats

The supported formats for the timestamp argument are:

  • Hudi Timeline format (highest matching precedence): yyyyMMddHHmmssSSS or yyyyMMddHHmmss.
  • Unix epoch time in seconds, milliseconds, microseconds, or nanoseconds.
  • RFC 3339 / ISO 8601 with timezone offset, including:
    • yyyy-MM-dd'T'HH:mm:ss.SSS+00:00
    • yyyy-MM-dd'T'HH:mm:ss.SSSZ
    • yyyy-MM-dd'T'HH:mm:ss+00:00
    • yyyy-MM-dd'T'HH:mm:ssZ

Timestamp strings without a timezone offset (for example yyyy-MM-dd'T'HH:mm:ss) and date-only strings (for example yyyy-MM-dd) are not accepted.
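
For illustration, each of the following as_of_timestamp values would be accepted under the rules above (a sketch; the timestamp values are made up and denote roughly the same instant):

Rust

// Hudi Timeline format (highest matching precedence)
let options = ReadOptions::new().with_as_of_timestamp("20241231123456789");
// Unix epoch time in seconds
let options = ReadOptions::new().with_as_of_timestamp("1735648496");
// RFC 3339 / ISO 8601 with timezone offset
let options = ReadOptions::new().with_as_of_timestamp("2024-12-31T12:34:56.789+00:00");
let options = ReadOptions::new().with_as_of_timestamp("2024-12-31T12:34:56Z");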

Incremental Query

An incremental query reads the data that changed in the table within a given time range.

Python

from hudi import HudiQueryType

# read the records between t1 (exclusive) and t2 (inclusive)
batches = hudi_table.read(
    HudiReadOptions()
    .with_query_type(HudiQueryType.Incremental)
    .with_start_timestamp(t1)
    .with_end_timestamp(t2)
)

# read the records after t1 (end defaults to the latest commit)
batches = hudi_table.read(
    HudiReadOptions()
    .with_query_type(HudiQueryType.Incremental)
    .with_start_timestamp(t1)
)

# with column filters applied to the changed records
batches = hudi_table.read(
    HudiReadOptions(filters=[("city", "=", "san_francisco")])
    .with_query_type(HudiQueryType.Incremental)
    .with_start_timestamp(t1)
    .with_end_timestamp(t2)
)

Rust

use hudi::table::QueryType;

// read the records between t1 (exclusive) and t2 (inclusive)
let options = ReadOptions::new()
    .with_query_type(QueryType::Incremental)
    .with_start_timestamp(t1)
    .with_end_timestamp(t2);
let batches = hudi_table.read(&options).await?;

// read the records after t1 (end defaults to the latest commit)
let options = ReadOptions::new()
    .with_query_type(QueryType::Incremental)
    .with_start_timestamp(t1);
let batches = hudi_table.read(&options).await?;

// with column filters applied to the changed records
let options = ReadOptions::new()
    .with_query_type(QueryType::Incremental)
    .with_start_timestamp(t1)
    .with_end_timestamp(t2)
    .with_filters([("city", "=", "san_francisco")])?;
let batches = hudi_table.read(&options).await?;

Incremental queries support the same timestamp formats as time-travel queries.

Streaming Read

Streaming reads yield RecordBatches one at a time without loading the full result into memory. The same ReadOptions knobs apply, plus batch_size and projection.

Python

options = (
    HudiReadOptions(
        filters=[("city", "=", "san_francisco")],
        projection=["rider", "city", "ts", "fare"],
    )
    .with_batch_size(4096)
)
for batch in hudi_table.read_stream(options):
    print(batch.num_rows)

Rust

use futures::StreamExt;

let options = ReadOptions::new()
    .with_filters([("city", "=", "san_francisco")])?
    .with_projection(["rider", "city", "ts", "fare"])
    .with_batch_size(4096)?;
let mut stream = hudi_table.read_stream(&options).await?;
while let Some(batch) = stream.next().await {
    let batch = batch?;
    println!("{}", batch.num_rows());
}

File Group Reading (Experimental)

File group reading allows you to read data from a specific file slice. This is useful when integrating with query engines, where the plan provides file paths.

Python

from hudi import HudiFileGroupReader

reader = HudiFileGroupReader(
    "/table/base/path", {"hoodie.read.start.timestamp": "0"})

# Returns a PyArrow RecordBatch
record_batch = reader.read_file_slice_from_paths("relative/path.parquet", [])

Rust

use hudi::file_group::reader::FileGroupReader;
use hudi::table::ReadOptions;

// Inside an async context
let reader = FileGroupReader::new_with_options(
    "/table/base/path", [("hoodie.read.start.timestamp", "0")]).await?;

// Returns an Arrow RecordBatch
let record_batch = reader
    .read_file_slice_from_paths(
        "relative/path.parquet",
        Vec::<&str>::new(),
        &ReadOptions::new(),
    )
    .await?;

C++

#include "cxx.h"
#include "src/lib.rs.h"
#include "arrow/c/abi.h"

// Functions may throw rust::Error on failure
auto reader = new_file_group_reader_with_options(
    "/table/base/path", {"hoodie.read.start.timestamp=0"});

// Returns an ArrowArrayStream pointer
std::vector<std::string> log_file_paths{};
ArrowArrayStream* stream_ptr = reader->read_file_slice_from_paths("relative/path.parquet", log_file_paths);

Query Engine Integration

Hudi-rs provides APIs to support integration with query engines. The sections below highlight some commonly used APIs.

Table API

Create a Hudi table instance using its constructor or the TableBuilder API.

All read APIs accept a ReadOptions (Rust) / HudiReadOptions (Python) value. It stores three fields — filters, projection, and hudi_options — and exposes chainable with_* builders for the rest. The available knobs:

  • query_type (with_query_type) — Snapshot (default) or Incremental. Drives dispatch in read, read_stream, and get_file_slices.
  • filters — column filters as (field, op, value) tuples. The field can be any column (partition or data). Used for partition pruning, file-level stats pruning (snapshot only), and row-level filtering.
  • projection — columns to return. Streaming pushes the projection down to the parquet reader; eager reads project after merging.
  • batch_size (with_batch_size) — rows per batch (streaming only; eager reads return one batch per file slice).
  • as_of_timestamp (with_as_of_timestamp) — snapshot/time-travel timestamp (defaults to latest commit).
  • start_timestamp / end_timestamp (with_start_timestamp / with_end_timestamp) — incremental range (defaults to earliest…latest).
  • hudi_options — per-read Hudi configs (e.g. hoodie.read.use.read_optimized.mode). Read configs are not stored in the table; they flow exclusively through ReadOptions.
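
These knobs chain on a single options value. A minimal sketch combining several of them (the values are illustrative; batch_size only takes effect on the streaming path):

Rust

// Sketch: chain several ReadOptions knobs for one streaming snapshot read.
let options = ReadOptions::new()
    .with_as_of_timestamp("20241231123456789")
    .with_filters([("city", "=", "san_francisco")])?
    .with_projection(["rider", "city", "ts", "fare"])
    .with_batch_size(4096)?
    .with_hudi_option("hoodie.read.use.read_optimized.mode", "true");
let mut stream = hudi_table.read_stream(&options).await?;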
| Stage | API | Description |
|---|---|---|
| Query planning | get_file_slices(options) | Get the file slices the read targets, dispatched on options.query_type. To bucket for parallel reads, call hudi::util::collection::split_into_chunks on the result. |
| Query planning | compute_table_stats(options) | Estimated (num_rows, byte_size) for scan planning. Snapshot (default) uses the metadata table; incremental aggregates from changed file slices. Returns None when stats cannot be computed. |
| Query execution | create_file_group_reader_with_options(read_options, extra_storage_overrides) | Create a file group reader with the table's configs. Both args are optional. Timestamps are resolved automatically (e.g. AsOfTimestamp → EndTimestamp), so callers can pass the same options used for get_file_slices. |
| Query execution | read(options) / read_stream(options) | Record-read APIs. Dispatch on options.query_type. read_stream errors on Incremental for now. Per-slice streaming lives on FileGroupReader. |
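
A minimal planning sketch tying these together, assuming an async context and the table from the quick start guide (the split_into_chunks chunk count and the optional-argument shape of create_file_group_reader_with_options are assumptions, not verified signatures):

Rust

use hudi::util::collection::split_into_chunks;

// Query planning: list the target file slices and estimate the scan.
let options = ReadOptions::new().with_filters([("city", "=", "san_francisco")])?;
let file_slices = hudi_table.get_file_slices(&options).await?;
let stats = hudi_table.compute_table_stats(&options).await?; // None if unavailable

// Bucket the slices for parallel reads; 4 buckets here is illustrative.
let chunks = split_into_chunks(file_slices, 4);

// Query execution: reuse the same options for the file group reader
// (argument shape assumed).
let reader = hudi_table
    .create_file_group_reader_with_options(Some(&options), None)
    .await?;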

File Group API

Create a Hudi file group reader instance using its constructor or the Hudi table API create_file_group_reader_with_options().

| Stage | API | Description |
|---|---|---|
| Query execution | read_file_slice() | Read records from a given file slice; depending on the configs, reads from only the base file, or from the base file and log files, merging records with the configured strategy. |
| Query execution | read_file_slice_from_paths() | Read records from an explicit base file path and a list of log file paths. Pass an empty log path list to read just the base file. |
| Query execution | read_file_slice_stream() | Streaming version of read_file_slice(). Yields true streaming batches when the slice is base-file-only or read-optimized; for MOR slices with log files, falls back to a single merged batch. |
| Query execution | read_file_slice_from_paths_stream() | Streaming version of read_file_slice_from_paths(). |
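
The streaming variants are consumed like the earlier streaming read example. A sketch, assuming read_file_slice_from_paths_stream parallels the eager read_file_slice_from_paths signature shown above:

Rust

use futures::StreamExt;

// Sketch: stream record batches from one explicit file slice.
// The argument shape is assumed to mirror read_file_slice_from_paths.
let mut stream = reader
    .read_file_slice_from_paths_stream(
        "relative/path.parquet",
        Vec::<&str>::new(),
        &ReadOptions::new(),
    )
    .await?;
while let Some(batch) = stream.next().await {
    println!("{}", batch?.num_rows());
}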

Apache DataFusion

Enabling the hudi crate's datafusion feature provides a DataFusion extension to query Hudi tables.

For Rust, add the hudi crate with the datafusion feature to your application:

cargo new my_project --bin && cd my_project
cargo add tokio@1 datafusion@52
cargo add hudi --features datafusion

Then update src/main.rs with the Rust snippet below and run cargo run.

For Python, install hudi with the datafusion extra:

pip install hudi[datafusion]

Rust

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::{DataFrame, SessionContext};
use hudi::HudiDataSource;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let hudi = HudiDataSource::new_with_options(
        "/tmp/trips_table",
        [("hoodie.read.input.partitions", "5")]).await?;
    ctx.register_table("trips_table", Arc::new(hudi))?;
    let df: DataFrame = ctx.sql("SELECT * from trips_table where city = 'san_francisco'").await?;
    df.show().await?;
    Ok(())
}

Python

from datafusion import SessionContext
from hudi import HudiDataFusionDataSource

table = HudiDataFusionDataSource(
    "/tmp/trips_table", [("hoodie.read.input.partitions", "5")]
)
ctx = SessionContext()
ctx.register_table_provider("trips", table)
ctx.sql("SELECT max(fare), city from trips group by city order by 1 desc").show()

Other Integrations

Hudi is also integrated with other query engines and tools in the ecosystem.

Work with cloud storage

Ensure cloud storage credentials are set as environment variables, e.g. AWS_*, AZURE_*, or GOOGLE_*. The relevant storage environment variables are then picked up automatically, and a table base URI with a scheme such as s3://, az://, or gs:// is handled accordingly.

Alternatively, you can pass the storage configuration as options via Table APIs.

Python

from hudi import HudiTableBuilder

hudi_table = (
    HudiTableBuilder
    .from_base_uri("s3://bucket/trips_table")
    .with_option("aws_region", "us-west-2")
    .build()
)

Rust

use hudi::error::Result;
use hudi::table::builder::TableBuilder as HudiTableBuilder;

#[tokio::main]
async fn main() -> Result<()> {
    let hudi_table = HudiTableBuilder::from_base_uri("s3://bucket/trips_table")
        .with_option("aws_region", "us-west-2")
        .build().await?;
    Ok(())
}

Contributing

Check out the contributing guide for all the details about making contributions to the project.
