The native Rust implementation for Apache Hudi, with C++ & Python API bindings.
The Hudi-rs project aims to standardize the core Apache Hudi APIs and broaden Hudi integration across the data ecosystem for a diverse range of users and projects.
| Source | Installation Command |
|---|---|
| PyPI.org | `pip install hudi` |
| Crates.io | `cargo add hudi` |
**Note**: These examples expect that a Hudi table exists at `/tmp/trips_table`, created using the quick start guide.
For the full reader API reference (ReadOptions, filter expressions, behavioral guarantees), see docs/reader-spec.md.
A snapshot query reads the latest version of the data from the table. The table API also accepts column filters that drive partition + file pruning and row-level filtering.

```python
from hudi import HudiReadOptions, HudiTableBuilder
import pyarrow as pa
hudi_table = HudiTableBuilder.from_base_uri("/tmp/trips_table").build()
batches = hudi_table.read(
HudiReadOptions(filters=[("city", "=", "san_francisco")])
)
# convert to PyArrow table
arrow_table = pa.Table.from_batches(batches)
result = arrow_table.select(["rider", "city", "ts", "fare"])
print(result)
```

```rust
use hudi::error::Result;
use hudi::table::ReadOptions;
use hudi::table::builder::TableBuilder as HudiTableBuilder;
use arrow::compute::concat_batches;
#[tokio::main]
async fn main() -> Result<()> {
let hudi_table = HudiTableBuilder::from_base_uri("/tmp/trips_table").build().await?;
let options = ReadOptions::new().with_filters([("city", "=", "san_francisco")])?;
let batches = hudi_table.read(&options).await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let columns = vec!["rider", "city", "ts", "fare"];
for col_name in columns {
let idx = batch.schema().index_of(col_name).unwrap();
println!("{}: {}", col_name, batch.column(idx));
}
Ok(())
}
```

To run a read-optimized (RO) query on Merge-on-Read (MOR) tables, set `hoodie.read.use.read_optimized.mode` in `ReadOptions`.

```python
from hudi import HudiReadOptions
batches = hudi_table.read(
HudiReadOptions(hudi_options={"hoodie.read.use.read_optimized.mode": "true"})
)
```

```rust
let options = ReadOptions::new()
.with_hudi_option("hoodie.read.use.read_optimized.mode", "true");
let batches = hudi_table.read(&options).await?;
```

A time-travel query reads the data at a specific timestamp from the table. The table API also accepts column filters that drive partition + file pruning and row-level filtering.

```python
batches = hudi_table.read(
HudiReadOptions(filters=[("city", "=", "san_francisco")])
.with_as_of_timestamp("20241231123456789")
)
```

```rust
let options = ReadOptions::new()
.with_as_of_timestamp("20241231123456789")
.with_filters([("city", "=", "san_francisco")])?;
let batches = hudi_table.read(&options).await?;
```

**Supported timestamp formats**
The supported formats for the timestamp argument are:
- Hudi Timeline format (highest matching precedence): `yyyyMMddHHmmssSSS` or `yyyyMMddHHmmss`.
- Unix epoch time in seconds, milliseconds, microseconds, or nanoseconds.
- RFC 3339 / ISO 8601 with timezone offset, including:
  - `yyyy-MM-dd'T'HH:mm:ss.SSS+00:00`
  - `yyyy-MM-dd'T'HH:mm:ss.SSSZ`
  - `yyyy-MM-dd'T'HH:mm:ss+00:00`
  - `yyyy-MM-dd'T'HH:mm:ssZ`

Timestamp strings without a timezone offset (for example `yyyy-MM-dd'T'HH:mm:ss`) and date-only strings (for example `yyyy-MM-dd`) are not accepted.
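For illustration, the following sketch reuses the `hudi_table` and `ReadOptions` setup from the snapshot example above and expresses a time-travel read in each accepted timestamp style; the literal values are placeholders, not real commits.

```rust
// Inside an async context. Each option targets roughly the same instant,
// written in a different accepted format; the values are placeholders.
let ts_timeline = ReadOptions::new().with_as_of_timestamp("20241231123456789");            // Hudi Timeline format
let ts_epoch_ms = ReadOptions::new().with_as_of_timestamp("1735648496789");                // Unix epoch milliseconds
let ts_rfc3339 = ReadOptions::new().with_as_of_timestamp("2024-12-31T12:34:56.789+00:00"); // RFC 3339 with offset

let batches = hudi_table.read(&ts_rfc3339).await?;
```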
An incremental query reads the changed data from the table for a given time range.

```python
from hudi import HudiQueryType
# read the records between t1 (exclusive) and t2 (inclusive)
batches = hudi_table.read(
HudiReadOptions()
.with_query_type(HudiQueryType.Incremental)
.with_start_timestamp(t1)
.with_end_timestamp(t2)
)
# read the records after t1 (end defaults to the latest commit)
batches = hudi_table.read(
HudiReadOptions()
.with_query_type(HudiQueryType.Incremental)
.with_start_timestamp(t1)
)
# with column filters applied to the changed records
batches = hudi_table.read(
HudiReadOptions(filters=[("city", "=", "san_francisco")])
.with_query_type(HudiQueryType.Incremental)
.with_start_timestamp(t1)
.with_end_timestamp(t2)
)
```

```rust
use hudi::table::QueryType;
// read the records between t1 (exclusive) and t2 (inclusive)
let options = ReadOptions::new()
.with_query_type(QueryType::Incremental)
.with_start_timestamp(t1)
.with_end_timestamp(t2);
let batches = hudi_table.read(&options).await?;
// read the records after t1 (end defaults to the latest commit)
let options = ReadOptions::new()
.with_query_type(QueryType::Incremental)
.with_start_timestamp(t1);
let batches = hudi_table.read(&options).await?;
// with column filters applied to the changed records
let options = ReadOptions::new()
.with_query_type(QueryType::Incremental)
.with_start_timestamp(t1)
.with_end_timestamp(t2)
.with_filters([("city", "=", "san_francisco")])?;
let batches = hudi_table.read(&options).await?;
```

Incremental queries support the same timestamp formats as time-travel queries.
Streaming reads yield RecordBatches one at a time without loading the full result into memory.
The same `ReadOptions` knobs apply, plus `batch_size` and `projection`.

```python
options = (
HudiReadOptions(
filters=[("city", "=", "san_francisco")],
projection=["rider", "city", "ts", "fare"],
)
.with_batch_size(4096)
)
for batch in hudi_table.read_stream(options):
print(batch.num_rows)
```

```rust
use futures::StreamExt;
let options = ReadOptions::new()
.with_filters([("city", "=", "san_francisco")])?
.with_projection(["rider", "city", "ts", "fare"])
.with_batch_size(4096)?;
let mut stream = hudi_table.read_stream(&options).await?;
while let Some(batch) = stream.next().await {
let batch = batch?;
println!("{}", batch.num_rows());
}
```

File group reading allows you to read data from a specific file slice. This is useful when integrating with query engines, where the plan provides file paths.

```python
from hudi import HudiFileGroupReader
reader = HudiFileGroupReader(
"/table/base/path", {"hoodie.read.start.timestamp": "0"})
# Returns a PyArrow RecordBatch
record_batch = reader.read_file_slice_from_paths("relative/path.parquet", [])
```

```rust
use hudi::file_group::reader::FileGroupReader;
use hudi::table::ReadOptions;
// Inside an async context
let reader = FileGroupReader::new_with_options(
"/table/base/path", [("hoodie.read.start.timestamp", "0")]).await?;
// Returns an Arrow RecordBatch
let record_batch = reader
.read_file_slice_from_paths(
"relative/path.parquet",
Vec::<&str>::new(),
&ReadOptions::new(),
)
.await?;
```

```cpp
#include "cxx.h"
#include "src/lib.rs.h"
#include "arrow/c/abi.h"
// Functions may throw rust::Error on failure
auto reader = new_file_group_reader_with_options(
"/table/base/path", {"hoodie.read.start.timestamp=0"});
// Returns an ArrowArrayStream pointer
std::vector<std::string> log_file_paths{};
ArrowArrayStream* stream_ptr = reader->read_file_slice_from_paths("relative/path.parquet", log_file_paths);
```

Hudi-rs provides APIs to support integration with query engines. The sections below highlight some commonly used APIs.
Create a Hudi table instance using its constructor or the TableBuilder API.
All read APIs accept a ReadOptions (Rust) / HudiReadOptions (Python) value. It stores three fields — filters, projection, and hudi_options — and exposes chainable with_* builders for the rest. The available knobs:
- `query_type` (`with_query_type`) — `Snapshot` (default) or `Incremental`. Drives dispatch in `read`, `read_stream`, and `get_file_slices`.
- `filters` — column filters as `(field, op, value)` tuples. The field can be any column (partition or data). Used for partition pruning, file-level stats pruning (snapshot only), and row-level filtering.
- `projection` — columns to return. Streaming pushes the projection down to the parquet reader; eager reads project after merging.
- `batch_size` (`with_batch_size`) — rows per batch (streaming only; eager reads return one batch per file slice).
- `as_of_timestamp` (`with_as_of_timestamp`) — snapshot/time-travel timestamp (defaults to latest commit).
- `start_timestamp` / `end_timestamp` (`with_start_timestamp` / `with_end_timestamp`) — incremental range (defaults to earliest…latest).
- `hudi_options` — per-read Hudi configs (e.g. `hoodie.read.use.read_optimized.mode`). Read configs are not stored in the table; they flow exclusively through `ReadOptions`.
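As a rough illustration of how these knobs compose, here is a sketch that reuses the `/tmp/trips_table` table from the earlier examples; the timestamp and column values are placeholders.

```rust
use hudi::table::ReadOptions;
use hudi::table::builder::TableBuilder as HudiTableBuilder;

// Inside an async context. Chain several knobs on one ReadOptions value and
// run both the eager and the streaming read with it.
let hudi_table = HudiTableBuilder::from_base_uri("/tmp/trips_table").build().await?;
let options = ReadOptions::new()
    .with_filters([("city", "=", "san_francisco")])?  // partition/file pruning + row filtering
    .with_projection(["rider", "city", "ts", "fare"]) // columns to return
    .with_as_of_timestamp("20241231123456789")        // time-travel point
    .with_batch_size(4096)?;                          // affects the streaming read only
let batches = hudi_table.read(&options).await?;
let _stream = hudi_table.read_stream(&options).await?;
```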
| Stage | API | Description |
|---|---|---|
| Query planning | `get_file_slices(options)` | Get the file slices the read targets, dispatched on `options.query_type`. To bucket for parallel reads, call `hudi::util::collection::split_into_chunks` on the result. |
| | `compute_table_stats(options)` | Estimated `(num_rows, byte_size)` for scan planning. Snapshot (default) uses the metadata table; incremental aggregates from changed file slices. Returns `None` when stats cannot be computed. |
| Query execution | `create_file_group_reader_with_options(read_options, extra_storage_overrides)` | Create a file group reader with the table's configs. Both args are optional. Timestamps are resolved automatically (e.g. `AsOfTimestamp` → `EndTimestamp`), so callers can pass the same options used for `get_file_slices`. |
| | `read(options)` / `read_stream(options)` | Record-read APIs. Dispatch on `options.query_type`. `read_stream` errors on `Incremental` for now. Per-slice streaming lives on `FileGroupReader`. |
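A rough end-to-end sketch of how an engine integration might drive these planning APIs is below. The function names come from the table above, but the exact signatures, return types, and the arguments to `split_into_chunks` are assumptions here, not verified against the released API.

```rust
// Hypothetical planning sketch, inside an async context; signatures are assumed.
use hudi::table::ReadOptions;
use hudi::table::builder::TableBuilder as HudiTableBuilder;
use hudi::util::collection::split_into_chunks;

let hudi_table = HudiTableBuilder::from_base_uri("/tmp/trips_table").build().await?;
let options = ReadOptions::new().with_filters([("city", "=", "san_francisco")])?;

// List the file slices the read targets, honoring options.query_type.
let file_slices = hudi_table.get_file_slices(&options).await?;

// Optionally estimate the scan size for the engine's cost model.
if let Some((num_rows, byte_size)) = hudi_table.compute_table_stats(&options).await? {
    println!("estimated rows: {num_rows}, estimated bytes: {byte_size}");
}

// Bucket the slices for parallel reads; the chunk count (4) is arbitrary here.
let chunks = split_into_chunks(file_slices, 4);
// Each chunk would then be scanned with a file group reader obtained from
// create_file_group_reader_with_options(...), as described in the next section.
```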
Create a Hudi file group reader instance using its constructor or the Hudi table API create_file_group_reader_with_options().
| Stage | API | Description |
|---|---|---|
| Query execution | `read_file_slice()` | Read records from a given file slice; depending on the configs, read from only the base file, or from the base file and log files, merging records using the configured strategy. |
| | `read_file_slice_from_paths()` | Read records from an explicit base file path and a list of log file paths. Pass an empty log path list to read just the base file. |
| | `read_file_slice_stream()` | Streaming version of `read_file_slice()`. Yields true streaming batches when the slice is base-file-only or read-optimized; for MOR slices with log files, falls back to a single merged batch. |
| | `read_file_slice_from_paths_stream()` | Streaming version of `read_file_slice_from_paths()`. |
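A minimal sketch of the streaming per-slice read follows; it assumes `read_file_slice_from_paths_stream()` takes the same arguments as `read_file_slice_from_paths()` shown earlier and yields Arrow record batches, which has not been verified against the released API.

```rust
// Assumed to mirror read_file_slice_from_paths(), returning a batch stream.
use futures::StreamExt;
use hudi::file_group::reader::FileGroupReader;
use hudi::table::ReadOptions;

// Inside an async context.
let reader = FileGroupReader::new_with_options(
    "/table/base/path", [("hoodie.read.start.timestamp", "0")]).await?;
let mut stream = reader
    .read_file_slice_from_paths_stream(
        "relative/path.parquet",
        Vec::<&str>::new(),   // empty log list: base-file-only, so batches stream through
        &ReadOptions::new(),
    )
    .await?;
while let Some(batch) = stream.next().await {
    println!("{} rows", batch?.num_rows());
}
```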
Enabling the hudi crate's datafusion feature provides a DataFusion extension for querying Hudi tables.

Add the hudi crate with the datafusion feature to your application to query a Hudi table.

```sh
cargo new my_project --bin && cd my_project
cargo add tokio@1 datafusion@52
cargo add hudi --features datafusion
```

Update `src/main.rs` with the code snippet below, then `cargo run`.
Add the hudi Python package with the datafusion extra to your application to query a Hudi table.

```sh
pip install hudi[datafusion]
```

```rust
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::prelude::{DataFrame, SessionContext};
use hudi::HudiDataSource;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
let hudi = HudiDataSource::new_with_options(
"/tmp/trips_table",
[("hoodie.read.input.partitions", "5")]).await?;
ctx.register_table("trips_table", Arc::new(hudi))?;
let df: DataFrame = ctx.sql("SELECT * from trips_table where city = 'san_francisco'").await?;
df.show().await?;
Ok(())
}
```

```python
from datafusion import SessionContext
from hudi import HudiDataFusionDataSource
table = HudiDataFusionDataSource(
"/tmp/trips_table", [("hoodie.read.input.partitions", "5")]
)
ctx = SessionContext()
ctx.register_table_provider("trips", table)
ctx.sql("SELECT max(fare), city from trips group by city order by 1 desc").show()Hudi is also integrated with
Ensure cloud storage credentials are set properly as environment variables, e.g., `AWS_*`, `AZURE_*`, or `GOOGLE_*`. The relevant storage environment variables will be picked up automatically, and target table base URIs with schemes such as `s3://`, `az://`, or `gs://` will be processed accordingly.
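For example, a minimal sketch relying only on environment variables (it assumes standard credentials such as AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION are already exported, and that the bucket and table path exist):

```rust
use hudi::error::Result;
use hudi::table::ReadOptions;
use hudi::table::builder::TableBuilder as HudiTableBuilder;

#[tokio::main]
async fn main() -> Result<()> {
    // No storage options passed here; credentials and region are expected to
    // come from the environment (AWS_* variables in this example).
    let hudi_table = HudiTableBuilder::from_base_uri("s3://bucket/trips_table")
        .build()
        .await?;
    let batches = hudi_table.read(&ReadOptions::new()).await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```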
Alternatively, you can pass the storage configuration as options via Table APIs.

```python
from hudi import HudiTableBuilder
hudi_table = (
HudiTableBuilder
.from_base_uri("s3://bucket/trips_table")
.with_option("aws_region", "us-west-2")
.build()
)
```

```rust
use hudi::error::Result;
use hudi::table::builder::TableBuilder as HudiTableBuilder;
#[tokio::main]
async fn main() -> Result<()> {
let hudi_table = HudiTableBuilder::from_base_uri("s3://bucket/trips_table")
.with_option("aws_region", "us-west-2")
.build().await?;
Ok(())
}
```

Check out the contributing guide for all the details about making contributions to the project.