ETL

Stream Postgres changes anywhere, in real time

ETL is a Rust framework for building change data capture (CDC) pipelines on Postgres. Stream inserts, updates, deletes, truncates, and schema events to feature-gated destination modules or your own custom destination. BigQuery is the recommended stable first production destination; DuckLake, ClickHouse, and Snowflake modules are available in etl-destinations, and the Iceberg module is deprecated for new deployments.

Using ETL with Supabase? The Supabase product documentation for database replication explains how replication works inside Supabase itself, including dashboard-level concepts that complement this library-focused guide.

  • Real-time: Changes stream as they happen, not in batches
  • Reliable: At-least-once delivery with automatic retries
  • Extensible: Implement one trait to add any destination
  • Fast: Parallel initial copy, configurable batching
  • Type-safe: Rust API with compile-time guarantees

How it works:

  1. Initial copy: ETL copies existing table data to your destination
  2. Streaming: ETL streams events (Insert, Update, Delete, Relation, and more) in real time
  3. Recovery: The store persists state so pipelines resume after restarts

See Architecture for details, and Schema Changes for DDL semantics and limitations.
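
Because delivery is at-least-once, a destination can receive the same change again after a retry, so writes are easier to reason about when they are idempotent. The following is a minimal sketch of that idea using only the standard library. The types `Change`, `Positioned`, and `IdempotentTable` are hypothetical and are not part of ETL's API; the sketch also assumes each change carries a monotonically increasing log position (LSN).

```rust
use std::collections::HashMap;

/// Hypothetical, simplified change event; ETL's real `Event` type is richer.
#[derive(Debug, Clone, PartialEq)]
enum Change {
    Upsert { key: i64, value: String },
    Delete { key: i64 },
}

/// A change tagged with the log position (LSN) it was read at (assumed monotonic).
#[derive(Debug, Clone)]
struct Positioned {
    lsn: u64,
    change: Change,
}

/// In-memory stand-in for a destination table that tolerates redelivery.
#[derive(Default)]
struct IdempotentTable {
    rows: HashMap<i64, String>,
    /// Highest LSN applied per key, kept even after a delete.
    applied: HashMap<i64, u64>,
}

impl IdempotentTable {
    /// Applies a change unless an equal or newer one was already applied
    /// for the same key. Returns `true` if the change was applied.
    fn apply(&mut self, event: Positioned) -> bool {
        let key = match &event.change {
            Change::Upsert { key, .. } | Change::Delete { key } => *key,
        };
        if self.applied.get(&key).is_some_and(|&seen| seen >= event.lsn) {
            return false; // duplicate or stale redelivery, safe to ignore
        }
        self.applied.insert(key, event.lsn);
        match event.change {
            Change::Upsert { key, value } => {
                self.rows.insert(key, value);
            }
            Change::Delete { key } => {
                self.rows.remove(&key);
            }
        }
        true
    }
}
```

Applying the same `Positioned` value twice changes the table only once, which is the property a destination needs when a batch is retried. A real destination would typically get the same effect from a merge or upsert keyed on the primary key rather than an in-memory map.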

Install ETL in your project:

[dependencies]
etl = { git = "https://github.com/supabase/etl" }
tokio = { version = "1.0", features = ["full"] }

Create a pipeline with a source config, store, and destination:

use etl::{
    config::{
        BatchConfig, InvalidatedSlotBehavior, MemoryBackpressureConfig, PgConnectionConfig,
        PipelineConfig, TableSyncCopyConfig, TcpKeepaliveConfig, TlsConfig,
    },
    destination::{Destination, DropTableForCopyResult, WriteEventsResult, WriteTableRowsResult},
    error::EtlResult,
    pipeline::Pipeline,
    store::MemoryStore,
    types::{Event, ReplicatedTableSchema, TableRow},
};

#[derive(Clone)]
struct NoopDestination;

impl Destination for NoopDestination {
    fn name() -> &'static str {
        "noop"
    }

    async fn drop_table_for_copy(
        &self,
        _replicated_table_schema: &ReplicatedTableSchema,
        async_result: DropTableForCopyResult<()>,
    ) -> EtlResult<()> {
        async_result.send(Ok(()));
        Ok(())
    }

    async fn write_table_rows(
        &self,
        _replicated_table_schema: &ReplicatedTableSchema,
        _table_rows: Vec<TableRow>,
        async_result: WriteTableRowsResult<()>,
    ) -> EtlResult<()> {
        async_result.send(Ok(()));
        Ok(())
    }

    async fn write_events(
        &self,
        _events: Vec<Event>,
        async_result: WriteEventsResult<()>,
    ) -> EtlResult<()> {
        async_result.send(Ok(()));
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pg_config = PgConnectionConfig {
        host: "localhost".to_string(),
        hostaddr: None,
        port: 5432,
        name: "mydb".to_string(),
        username: "postgres".to_string(),
        password: Some("password".to_string().into()),
        tls: TlsConfig { enabled: false, trusted_root_certs: String::new() },
        keepalive: TcpKeepaliveConfig::default(),
    };

    let config = PipelineConfig {
        id: 1,
        publication_name: "my_publication".to_string(),
        pg_connection: pg_config,
        store_pg_connection: None,
        batch: BatchConfig {
            max_fill_ms: 5000,
            memory_budget_ratio: 0.2,
            max_bytes: 8 * 1024 * 1024,
        },
        table_error_retry_delay_ms: 10_000,
        table_error_retry_max_attempts: 5,
        max_table_sync_workers: 4,
        max_copy_connections_per_table: PipelineConfig::DEFAULT_MAX_COPY_CONNECTIONS_PER_TABLE,
        memory_refresh_interval_ms: 100,
        memory_backpressure: Some(MemoryBackpressureConfig::default()),
        table_sync_copy: TableSyncCopyConfig::default(),
        invalidated_slot_behavior: InvalidatedSlotBehavior::default(),
    };

    let store = MemoryStore::new();
    let destination = NoopDestination;

    let mut pipeline = Pipeline::new(config, store, destination);
    pipeline.start().await?;
    pipeline.wait().await?;

    Ok(())
}

This snippet intentionally shows ETL used as a library with your own Destination implementation. Feature-gated destination modules live in etl-destinations: BigQuery, DuckLake, ClickHouse, Snowflake, and the deprecated Iceberg module. The shared pipeline, config, store, and event types should come from etl.
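
To build intuition for the `batch` settings in the example, here is a rough sketch of size-or-time batching: a batch is flushed when it reaches a byte limit or when it has been open for too long. It assumes, from the field names alone, that `max_bytes` and `max_fill_ms` act as those two limits; it does not model `memory_budget_ratio`, and `BatchLimits` and `Batcher` are hypothetical names, not ETL types.

```rust
use std::time::Duration;

/// Hypothetical mirror of two `BatchConfig` fields from the example.
struct BatchLimits {
    max_fill: Duration, // assumed to correspond to `max_fill_ms`
    max_bytes: usize,   // assumed to correspond to `max_bytes`
}

/// Accumulates items and reports when a flush is due.
struct Batcher<T> {
    limits: BatchLimits,
    items: Vec<T>,
    bytes: usize,
    /// Time at which the first item of the current batch arrived.
    opened_at: Option<Duration>,
}

impl<T> Batcher<T> {
    fn new(limits: BatchLimits) -> Self {
        Self { limits, items: Vec::new(), bytes: 0, opened_at: None }
    }

    /// Adds an item of `size` bytes at time `now` (elapsed since a fixed start).
    /// Returns the whole batch if the byte limit has been reached.
    fn push(&mut self, item: T, size: usize, now: Duration) -> Option<Vec<T>> {
        if self.opened_at.is_none() {
            self.opened_at = Some(now);
        }
        self.items.push(item);
        self.bytes += size;
        if self.bytes >= self.limits.max_bytes {
            Some(self.take())
        } else {
            None
        }
    }

    /// Called periodically; returns the batch if it has been open too long.
    fn poll(&mut self, now: Duration) -> Option<Vec<T>> {
        match self.opened_at {
            Some(opened) if now.saturating_sub(opened) >= self.limits.max_fill => {
                Some(self.take())
            }
            _ => None,
        }
    }

    /// Empties the batch and resets its counters.
    fn take(&mut self) -> Vec<T> {
        self.bytes = 0;
        self.opened_at = None;
        std::mem::take(&mut self.items)
    }
}
```

Time is passed in explicitly rather than read from a clock so the behavior is easy to test. Under this model, a larger byte limit trades latency for fewer, bigger writes, while the fill timeout bounds how long a quiet table waits before its changes are delivered.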

Pipeline::start() installs ETL's source-side schema helpers before replication begins. If you use PostgresStore as the runtime store, PostgresStore::new() separately prepares the Postgres-backed state tables.
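
The role of the store in recovery can be illustrated with a small model: each table's progress is persisted, and on startup that record decides whether to copy again or resume streaming. Everything below (`TablePhase`, `PhaseStore`, `InMemoryPhaseStore`, `plan_startup`) is hypothetical and only sketches the idea; ETL's actual store traits and state types are different and are described in Architecture.

```rust
use std::collections::HashMap;

/// Hypothetical per-table replication phase; ETL's real states differ.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TablePhase {
    Pending,
    Copying,
    Streaming { confirmed_lsn: u64 },
}

/// Hypothetical store interface: load and save the phase of one table.
trait PhaseStore {
    fn load(&self, table_id: u32) -> TablePhase;
    fn save(&mut self, table_id: u32, phase: TablePhase);
}

/// Keeps phases in process memory, so they do not survive a restart.
#[derive(Default)]
struct InMemoryPhaseStore {
    phases: HashMap<u32, TablePhase>,
}

impl PhaseStore for InMemoryPhaseStore {
    fn load(&self, table_id: u32) -> TablePhase {
        // Tables never seen before start in `Pending`.
        self.phases.get(&table_id).copied().unwrap_or(TablePhase::Pending)
    }

    fn save(&mut self, table_id: u32, phase: TablePhase) {
        self.phases.insert(table_id, phase);
    }
}

/// What a worker should do for a table when the pipeline starts.
#[derive(Debug, PartialEq)]
enum StartupAction {
    CopyFromScratch,
    ResumeStreamingFrom(u64),
}

/// Picks the startup action from the persisted phase.
fn plan_startup(store: &impl PhaseStore, table_id: u32) -> StartupAction {
    match store.load(table_id) {
        // This sketch does not track partial copies, so an interrupted
        // copy is simply started over.
        TablePhase::Pending | TablePhase::Copying => StartupAction::CopyFromScratch,
        TablePhase::Streaming { confirmed_lsn } => {
            StartupAction::ResumeStreamingFrom(confirmed_lsn)
        }
    }
}
```

In this model a durable implementation of `PhaseStore` is what lets a restarted process skip the initial copy for tables that had already reached streaming.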

| Section | What you'll find |
| --- | --- |
| Your First Pipeline | Step-by-step instructions to get things done |
| Postgres Replication Concepts | Deep dives into concepts and architecture |
| Examples | Runnable bigquery and ducklake example binaries |

Pull requests and issues welcome on GitHub.

New destinations: Open an issue first to gauge interest. Each built-in destination carries long-term maintenance cost, so we only accept those with significant community demand.