ETL
Stream Postgres changes anywhere, in real-time
ETL is a Rust framework for building change data capture (CDC) pipelines on Postgres. Stream inserts, updates, deletes, truncates, and schema events to feature-gated destination modules or your own custom destination. BigQuery is the recommended stable first production destination; DuckLake, ClickHouse, and Snowflake modules are available in etl-destinations, and the Iceberg module is deprecated for new deployments.
Using ETL with Supabase? The Supabase product documentation for database replication explains how replication works inside Supabase itself, including dashboard-level concepts that complement this library-focused guide.
Start Here
Why ETL?
- Real-time: Changes stream as they happen, not in batches
- Reliable: At-least-once delivery with automatic retries
- Extensible: Implement one trait to add any destination
- Fast: Parallel initial copy, configurable batching
- Type-safe: Rust API with compile-time guarantees
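At-least-once delivery means a destination can receive the same change more than once, for example when a batch is retried after a failure. The following is a minimal sketch of one common way to tolerate replays: remember the log position of the last change applied per key and skip anything at or below it. The `Change` and `IdempotentTable` types are hypothetical and are not part of ETL's API; the sketch also assumes log positions increase monotonically per key.

```rust
use std::collections::HashMap;

/// Hypothetical change record for illustration; not an ETL type.
#[derive(Debug, Clone, PartialEq)]
enum Change {
    Upsert { key: u64, value: String },
    Delete { key: u64 },
}

/// Toy destination table that tracks the last applied log position per key.
#[derive(Default)]
struct IdempotentTable {
    rows: HashMap<u64, String>,
    last_applied: HashMap<u64, u64>,
}

impl IdempotentTable {
    /// Applies `change` observed at log position `lsn`.
    /// Returns `false` when the change is a replay and was skipped.
    fn apply(&mut self, lsn: u64, change: Change) -> bool {
        let key = match &change {
            Change::Upsert { key, .. } | Change::Delete { key } => *key,
        };
        // A position at or below the last applied one has already been written.
        if self.last_applied.get(&key).is_some_and(|&seen| lsn <= seen) {
            return false;
        }
        match change {
            Change::Upsert { key, value } => {
                self.rows.insert(key, value);
            }
            Change::Delete { key } => {
                self.rows.remove(&key);
            }
        }
        self.last_applied.insert(key, lsn);
        true
    }
}
```

With this shape, delivering the same change twice leaves the table in the same state as delivering it once, which is the property a destination needs under at-least-once semantics.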
How It Works
- Initial copy: ETL copies existing table data to your destination
- Streaming: ETL streams events (Insert, Update, Delete, Relation, and more) in real-time
- Recovery: The store persists state so pipelines resume after restarts
See Architecture for details, and Schema Changes for DDL semantics and limitations.
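The three steps above can be pictured as a small per-table state machine whose state is saved in the store. This is a simplified sketch only: `Phase`, `Action`, and `ToyStore` are hypothetical names, and ETL's real state model and store interface may differ. The toy store here lives in memory, so it illustrates the resume decision rather than actual durability.

```rust
use std::collections::HashMap;

/// Hypothetical per-table replication phase; not ETL's real state model.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Phase {
    /// The initial copy has started but not finished.
    Copying,
    /// The copy is done and changes up to `confirmed_lsn` have been written.
    Streaming { confirmed_lsn: u64 },
}

/// What a pipeline should do for a table when it starts.
#[derive(Debug, PartialEq)]
enum Action {
    CopyTable,
    StreamFrom(u64),
}

/// Toy stand-in for a state store, keyed by a table id.
#[derive(Default)]
struct ToyStore {
    phases: HashMap<u32, Phase>,
}

impl ToyStore {
    /// Records the latest phase for a table.
    fn save(&mut self, table_id: u32, phase: Phase) {
        self.phases.insert(table_id, phase);
    }

    /// Decides how to resume a table from its saved phase.
    fn resume(&self, table_id: u32) -> Action {
        match self.phases.get(&table_id) {
            // Unknown tables and interrupted copies start the copy again.
            None | Some(Phase::Copying) => Action::CopyTable,
            // Finished copies continue streaming from the saved position.
            Some(Phase::Streaming { confirmed_lsn }) => Action::StreamFrom(*confirmed_lsn),
        }
    }
}
```

In this sketch an interrupted copy is simply restarted, while a table that already reached streaming picks up from its last confirmed position instead of copying again.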
Quick Example
Install ETL in your project:
```toml
[dependencies]
etl = { git = "https://github.com/supabase/etl" }
tokio = { version = "1.0", features = ["full"] }
```

Create a pipeline with a source config, store, and destination:
```rust
use etl::{
    config::{
        BatchConfig, InvalidatedSlotBehavior, MemoryBackpressureConfig, PgConnectionConfig,
        PipelineConfig, TableSyncCopyConfig, TcpKeepaliveConfig, TlsConfig,
    },
    destination::{Destination, DropTableForCopyResult, WriteEventsResult, WriteTableRowsResult},
    error::EtlResult,
    pipeline::Pipeline,
    store::MemoryStore,
    types::{Event, ReplicatedTableSchema, TableRow},
};

#[derive(Clone)]
struct NoopDestination;

impl Destination for NoopDestination {
    fn name() -> &'static str {
        "noop"
    }

    async fn drop_table_for_copy(
        &self,
        _replicated_table_schema: &ReplicatedTableSchema,
        async_result: DropTableForCopyResult<()>,
    ) -> EtlResult<()> {
        async_result.send(Ok(()));
        Ok(())
    }

    async fn write_table_rows(
        &self,
        _replicated_table_schema: &ReplicatedTableSchema,
        _table_rows: Vec<TableRow>,
        async_result: WriteTableRowsResult<()>,
    ) -> EtlResult<()> {
        async_result.send(Ok(()));
        Ok(())
    }

    async fn write_events(
        &self,
        _events: Vec<Event>,
        async_result: WriteEventsResult<()>,
    ) -> EtlResult<()> {
        async_result.send(Ok(()));
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pg_config = PgConnectionConfig {
        host: "localhost".to_string(),
        hostaddr: None,
        port: 5432,
        name: "mydb".to_string(),
        username: "postgres".to_string(),
        password: Some("password".to_string().into()),
        tls: TlsConfig { enabled: false, trusted_root_certs: String::new() },
        keepalive: TcpKeepaliveConfig::default(),
    };

    let config = PipelineConfig {
        id: 1,
        publication_name: "my_publication".to_string(),
        pg_connection: pg_config,
        store_pg_connection: None,
        batch: BatchConfig {
            max_fill_ms: 5000,
            memory_budget_ratio: 0.2,
            max_bytes: 8 * 1024 * 1024,
        },
        table_error_retry_delay_ms: 10_000,
        table_error_retry_max_attempts: 5,
        max_table_sync_workers: 4,
        max_copy_connections_per_table: PipelineConfig::DEFAULT_MAX_COPY_CONNECTIONS_PER_TABLE,
        memory_refresh_interval_ms: 100,
        memory_backpressure: Some(MemoryBackpressureConfig::default()),
        table_sync_copy: TableSyncCopyConfig::default(),
        invalidated_slot_behavior: InvalidatedSlotBehavior::default(),
    };

    let store = MemoryStore::new();
    let destination = NoopDestination;

    let mut pipeline = Pipeline::new(config, store, destination);
    pipeline.start().await?;
    pipeline.wait().await?;

    Ok(())
}
```

This snippet intentionally shows ETL used as a library with your own Destination implementation.
Feature-gated destination modules live in etl-destinations: BigQuery,
DuckLake, ClickHouse, Snowflake, and the deprecated Iceberg module. The shared
pipeline, config, store, and event types should come from etl.
Pipeline::start() installs ETL's source-side schema helpers before
replication begins. If you use PostgresStore as the runtime store,
PostgresStore::new() separately prepares the Postgres-backed state tables.
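The `BatchConfig` in the example sets `max_fill_ms` and `max_bytes`. Reading those names as "flush when the batch has waited this long" and "flush when the batch holds this many bytes" is an assumption based on the field names, not a statement of ETL's internals. Under that assumption, the general flush-on-size-or-age technique can be sketched as follows; `Batcher` is a hypothetical type, and `memory_budget_ratio` is not modelled.

```rust
use std::time::{Duration, Instant};

/// Hypothetical batcher illustrating size-or-age flushing; not ETL's implementation.
struct Batcher {
    max_bytes: usize,
    max_fill: Duration,
    buffered: Vec<Vec<u8>>,
    buffered_bytes: usize,
    /// Arrival time of the oldest item still in the buffer.
    first_item_at: Option<Instant>,
}

impl Batcher {
    fn new(max_bytes: usize, max_fill: Duration) -> Self {
        Self {
            max_bytes,
            max_fill,
            buffered: Vec::new(),
            buffered_bytes: 0,
            first_item_at: None,
        }
    }

    /// Buffers `item` and returns the whole batch if a limit is now reached.
    fn push(&mut self, item: Vec<u8>, now: Instant) -> Option<Vec<Vec<u8>>> {
        if self.first_item_at.is_none() {
            self.first_item_at = Some(now);
        }
        self.buffered_bytes += item.len();
        self.buffered.push(item);
        self.flush_if_due(now)
    }

    /// Returns the batch when it is large enough or old enough, otherwise `None`.
    fn flush_if_due(&mut self, now: Instant) -> Option<Vec<Vec<u8>>> {
        // An empty buffer has no start time, so there is nothing to flush.
        let started = self.first_item_at?;
        let full = self.buffered_bytes >= self.max_bytes;
        let stale = now.duration_since(started) >= self.max_fill;
        if !(full || stale) {
            return None;
        }
        self.buffered_bytes = 0;
        self.first_item_at = None;
        Some(std::mem::take(&mut self.buffered))
    }
}
```

Passing `now` in explicitly keeps the sketch deterministic. A caller would invoke `flush_if_due` on a timer as well as on every `push`, so that a quiet stream still delivers its last partial batch.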
Documentation
| Section | What you'll find |
|---|---|
| Your First Pipeline | Step-by-step instructions to get things done |
| Postgres Replication Concepts | Deep dives into concepts and architecture |
| Examples | Runnable bigquery and ducklake example binaries |
Contributing
Pull requests and issues are welcome on GitHub.
New destinations: Open an issue first to gauge interest. Each built-in destination carries long-term maintenance cost, so we only accept those with significant community demand.