<?xml version="1.0" encoding="UTF-8"?><rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:cc="http://cyber.law.harvard.edu/rss/creativeCommonsRssModule.html">
    <channel>
        <title><![CDATA[Stories by Anton Borisov on Medium]]></title>
        <description><![CDATA[Stories by Anton Borisov on Medium]]></description>
        <link>https://medium.com/@borzoniusy?source=rss-a199772355b8------2</link>
        <image>
            <url>https://cdn-images-1.medium.com/fit/c/150/150/1*_HD97sxoXLQUatacKtg-9w.png</url>
            <title>Stories by Anton Borisov on Medium</title>
            <link>https://medium.com/@borzoniusy?source=rss-a199772355b8------2</link>
        </image>
        <generator>Medium</generator>
        <lastBuildDate>Fri, 05 Jun 2026 02:54:52 GMT</lastBuildDate>
        <atom:link href="https://medium.com/@borzoniusy/feed" rel="self" type="application/rss+xml"/>
        <webMaster><![CDATA[yourfriends@medium.com]]></webMaster>
        <atom:link href="http://medium.superfeedr.com" rel="hub"/>
        <item>
            <title><![CDATA[AutoMQ: Shared Storage Architecture Deep Dive]]></title>
            <link>https://medium.com/fresha-data-engineering/automq-shared-storage-architecture-deep-dive-043c5226847e?source=rss-a199772355b8------2</link>
            <guid isPermaLink="false">https://medium.com/p/043c5226847e</guid>
            <category><![CDATA[distributed-systems]]></category>
            <category><![CDATA[programming]]></category>
            <category><![CDATA[automq]]></category>
            <category><![CDATA[big-data]]></category>
            <category><![CDATA[kafka]]></category>
            <dc:creator><![CDATA[Anton Borisov]]></dc:creator>
            <pubDate>Wed, 18 Feb 2026 13:14:42 GMT</pubDate>
            <atom:updated>2026-02-18T13:14:42.910Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*DFlk9kEMnLrcKBjAOBqlOw.png" /></figure><p><strong><em>Disclaimer</em></strong><em>: I’ve previously written a broader overview of diskless Kafka proposals and the different ideas behind them, not tied specifically to AutoMQ; you can find it </em><a href="https://medium.com/fresha-data-engineering/the-good-the-bad-and-the-automq-5aa7a8748e71"><em>here</em></a><em>. This post goes deeper into AutoMQ specifics.</em></p><p>If you’ve run <a href="https://kafka.apache.org">Kafka</a> in production for long enough, you have learned the pattern: everything looks beautifully simple until <a href="https://aws.amazon.com/msk/">AWS MSK</a> runs monthly security patches and your cluster decides it’s time for a rebalance.</p><p>At small scale, replication feels like a safety blanket: a comforting guarantee that your data exists in multiple places and can survive the loss of any single machine. At cloud scale, that same replication quietly transforms into a tax you pay on every byte, every day, with no end in sight. You’re storing the same data three times because the standard is to set the replication factor (RF) to 3, you’re paying for cross-AZ replication traffic that exists purely to keep those copies in sync, you’re watching partitions go under-replicated after the slightest operational drama, and every time you try to scale out, “add brokers” translates into a data migration that can take hours to complete.</p><p>Kafka still works. It’s just that the operational shape of “works” becomes: works, but you’re always one rebalance away from learning something new about your storage and networking bill.</p><p>What if we look at it differently:</p><blockquote>Make brokers disposable by shifting durability guarantees to shared storage in the cloud.</blockquote><p>That’s the whole architectural inversion, and everything else follows as a consequence. 
In this post, I’ll take <a href="https://www.automq.com/">AutoMQ</a> as a reference and walk through its three design layers (ElasticLog, S3Stream, and the metadata control plane), then trace a single record through the entire write path, and finally show how the design pays off in zero-copy failover and no-migration scaling.</p><figure><img alt="Diagram comparing Kafka and AutoMQ broker architectures. On the left, two Kafka brokers each with their own local disk, connected by a dashed arrow representing replication. On the right, two AutoMQ brokers each with only a cache layer, both connected by arrows to a shared S3 object storage layer below." src="https://cdn-images-1.medium.com/max/1024/1*6rJ2XgKCTmT6__B-wcnSLg.png" /><figcaption>A mental model: Kafka’s shared-nothing model (left) couples each broker to its own disk and replicates data between them. AutoMQ’s shared-storage model (right) reduces brokers to compute + cache attached to a shared object storage layer; brokers become disposable because they no longer own the data.</figcaption></figure><h3>Where the Pain Comes From</h3><p>Classic Kafka stores partitions on local disks, and durability emerges from replication: leaders write locally, followers fetch those writes, and the in-sync replica set becomes your safety envelope. It’s an elegant design that has powered some of the world’s largest streaming deployments, but it also couples three concerns tightly together: durability (determined by the replication factor and the health of the in-sync replicas, aka ISRs), availability (dependent on leader election and how quickly followers can catch up), and scaling (which requires reassigning partitions, i.e. physically moving data).</p><p>When those three concerns are coupled, you start seeing the same script play out in different outfits. A broker dies and leader election fires within seconds, but the partitions it owned stay under-replicated until followers finish catching up: minutes, hours, maybe lifetimes. 
You add brokers to grow the cluster and the reassignment plan looks clean on paper, but behind the scenes it’s copying partition data across the network, competing with production traffic the whole time. Even the simplest capacity ask, “we need more headroom,” translates into a rebalance that has to physically move bytes across disks before anything actually improves.</p><p>AutoMQ makes one design choice that would be heresy in many Kafka shops: the replication factor is 1, and the durable copy lives in object storage.</p><p>One copy, period. <a href="https://aws.amazon.com/s3/storage-classes/">S3’s 99.999999999%</a> durability, eleven nines, handles the rest. And if you accept that single design decision, the dominoes fall in a very specific order: brokers become stateless because there are no local segment files to own, failover becomes a metadata-only operation because there are no segments to copy, and scaling becomes “add compute” because there’s no data to migrate. This isn’t “Kafka but cheaper.” It’s Kafka whose durability mechanism has moved from ISR-based replication to object storage, and that shift changes the operational character of the entire system.</p><h3>Quick Primer</h3><p>AutoMQ is Kafka-compatible at the protocol layer, meaning clients connect, produce, and consume exactly as they would with standard Kafka, but internally it swaps Kafka’s file-backed log for one backed by shared storage. Three pieces matter for understanding how this works: ElasticLog, which is a Kafka Log implementation where “segments” aren’t files but slices of streams; S3Stream, which is a storage engine that turns S3 into something log-like through a combination of write-ahead logging, caching, and batched uploads; and the metadata control plane, which maintains the mapping that ties each topic-partition to the underlying streams and manages their object lifecycle.</p><figure><img alt="Diagram showing AutoMQ’s three-layer internal architecture. 
A Kafka client connects via Kafka protocol to a broker containing three layers: ElasticLog (segments) at the top, S3Stream (WAL + cache) in the middle, and Metadata (ownership) at the bottom. The S3Stream layer connects via a storage arrow to S3 object storage on the right." src="https://cdn-images-1.medium.com/max/1024/1*hiKI_lTR7BOYrNHqYj5v3Q.png" /><figcaption>AutoMQ’s three layers inside a broker: clients connect via the standard Kafka protocol, and ElasticLog translates that into segment semantics over streams. S3Stream handles WAL-based durability and caching underneath. The metadata plane tracks partition-to-stream ownership, making the broker replaceable without moving data.</figcaption></figure><h3>Architecture at a Glance</h3><p>Think of AutoMQ as Kafka with the log peeled into three layers, each with a single, cleanly separated responsibility. ElasticLog sits at the top and exists to keep Kafka semantics intact; it’s what makes the system look and feel like Kafka to everything above it. S3Stream sits in the middle and makes S3 usable as a log-like storage backend, handling the messy realities of object storage economics and latency. And the metadata plane sits at the bottom, maintaining the ownership mappings and lifecycle state that allow brokers to be truly replaceable without any data movement.</p><h4>Layer 1: ElasticLog</h4><p>Kafka’s normal LogSegment is file-backed, consisting of .log, .index, .timeindex, and .txnindex files. AutoMQ introduces ElasticLogSegment, which presents itself as a standard Kafka segment to everything above it. 
It is backed not by files, but by stream slices:</p><pre>// core/.../streamaspect/ElasticLogSegment.java<br>public class ElasticLogSegment extends LogSegment {<br>    private final ElasticLogFileRecords log;       // Data -&gt; stream slice<br>    private final ElasticTimeIndex timeIndex;       // Timestamps -&gt; stream slice<br>    private final ElasticTransactionIndex txnIndex; // Transactions -&gt; stream slice<br><br>    public ElasticLogSegment(...) {<br>        super(null, null, null, null, ...);  // Nulls for all file-based params<br>        log = new ElasticLogFileRecords(<br>            sm.loadOrCreateSlice(&quot;log&quot; + suffix, meta.log()), ...);<br>    }<br>}</pre><p>Kafka wants to append records and advance the log end offset (LEO) synchronously, whereas S3 wants you to batch, pipeline, and confirm later. These two expectations are fundamentally at odds. AutoMQ resolves this tension by splitting the concerns: the LEO advances immediately in the Kafka-facing layer, while the confirmOffset advances separately after the underlying storage pipeline has actually made the bytes durable.</p><p>To prevent “async durability” from quietly turning into “infinite memory consumption,” the write path applies backpressure through a semaphore keyed by bytes, allowing a bounded amount of unconfirmed writes, currently capped at 1GB. When the system reaches saturation, producers block early rather than letting memory silently balloon until something crashes.</p><p>The read path uses a two-tier executor design. Every consumer fetch first runs on a fast fetch executor (4 threads by default) that attempts to serve entirely from cache. If the data isn’t cached, the request fails fast and gets resubmitted to a slow fetch executor (12 threads) that reads from S3. 
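</p><p>A rough sketch of that fast-then-slow handoff is shown here, assuming a cache that can answer "hit or miss" without blocking. The class TwoTierFetch, its run helper, and the offset-based cache rule are hypothetical stand-ins, not AutoMQ code; only the pool sizes (4 and 12) come from the defaults quoted above.</p>

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a fast/slow fetch split; not AutoMQ's actual classes.
final class TwoTierFetch {
    // Returns {cacheHits, slowReads} for the given fetch offsets.
    // Assumption for illustration: offsets at or above cacheStartOffset are "in cache".
    static int[] run(long cacheStartOffset, long[] fetchOffsets) {
        ExecutorService fast = Executors.newFixedThreadPool(4);  // cache-only attempts
        ExecutorService slow = Executors.newFixedThreadPool(12); // stands in for S3 reads
        AtomicInteger cacheHits = new AtomicInteger();
        AtomicInteger slowReads = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(fetchOffsets.length);
        try {
            for (long offset : fetchOffsets) {
                fast.execute(() -> {
                    if (offset >= cacheStartOffset) {
                        // Cache hit: answered on the small, low-latency pool.
                        cacheHits.incrementAndGet();
                        done.countDown();
                    } else {
                        // Cache miss: fail fast here and resubmit to the slow pool.
                        slow.execute(() -> {
                            slowReads.incrementAndGet();
                            done.countDown();
                        });
                    }
                });
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("fetches did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for fetches", e);
        } finally {
            fast.shutdown();
            slow.shutdown();
        }
        return new int[] {cacheHits.get(), slowReads.get()};
    }

    public static void main(String[] args) {
        int[] counts = run(100, new long[] {50, 150, 200, 10});
        System.out.println("cache hits=" + counts[0] + ", slow reads=" + counts[1]);
    }
}
```

<p>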
This keeps cache hits on a small, low-latency thread pool and prevents slow S3 reads from blocking fast ones.</p><p>When a segment rolls because it has reached its size or time limit, the old segment’s stream slices are sealed, permanently closed to new writes, with their metadata finalized and offset ranges made stable. This is how AutoMQ achieves an object-store-friendly log shape: you append to the active slice until the segment rolls, then seal it and move on, creating a clean boundary between mutable and immutable data.</p><p>At this point you’ve built “Kafka semantics over slices,” but you still need a storage engine that makes those slices efficient to write, read, and manage at scale.</p><h4>Layer 2: S3Stream</h4><p>If you hear “Kafka on S3” and imagine every append causing an S3 PUT, stop right there — that’s not an architecture, that’s a bill. S3Stream’s entire reason for existing is to make object storage behave like a log without drowning in request costs and tail latency, and it accomplishes this through a three-stage pipeline where writes land in a durable WAL first, hot reads come from an explicit cache layer, and objects are uploaded to S3 in carefully batched operations to keep the request count sane.</p><p>Writes first land in a Write-Ahead Log backed by S3. Rather than issuing a separate S3 PUT for every partition append, which would mean thousands of tiny requests per second, the WAL collects records from all partitions into a shared buffer and flushes them as a single S3 object every 250ms or 8MB, whichever comes first. That one object is a mix of records from different partitions, ordered by arrival time, not by topic. Once the PUT succeeds, the producer gets its ack and the data is durable. Each broker maintains its own WAL, there’s no shared WAL across the cluster.</p><p>These WAL objects aren’t the permanent home. 
In the background, an upload pipeline reads them, reorganizes records by stream, and writes proper partition-ordered objects for efficient reads. Then the WAL objects are trimmed. The WAL exists to make small writes cheap and durable; the upload pipeline exists to make reads fast.</p><p>The upload pipeline doesn’t just dump WAL contents into S3 one-to-one; it makes deliberate grouping decisions to keep object counts and request costs under control. Large streams that produce 16MB or more per upload cycle get their own dedicated objects, while smaller streams are bundled together into stream set objects, preventing the object store from filling up with millions of tiny files that would each cost a separate GET request to read back.</p><p>Each upload cycle follows a <strong>prepare -&gt; upload -&gt; commit</strong> protocol: an object ID is allocated from the controller first, the data is written to S3, and only then is the object registered in <a href="https://kafka.apache.org/documentation/#kraft">KRaft</a> metadata. This ordering ensures that metadata always points to objects that actually exist, and that recovery after a crash mid-upload is deterministic: the controller knows exactly which uploads completed and which were abandoned.</p><p>Standard Kafka benefits massively from the operating system’s page cache, which transparently keeps recently-written and recently-read data in memory. But if brokers are stateless and S3 is the durable store, there are no local files for the OS to cache, so you need an explicit hot path instead. S3Stream fills this role with a block-structured in-memory cache (200MB by default, with up to 64 blocks) that serves recent reads at memory speed, comparable to a page cache hit in standard Kafka. This creates a two-tier read path: the in-memory cache serves at sub-millisecond latency, and S3 GET operations handle everything else at 100–500ms. 
That layering is why the design can credibly claim that hot reads feel Kafka-like while being honest that cold reads are fundamentally different, no amount of caching eliminates that reality for data that’s fallen out of the hot tier.</p><p>At this point we have Kafka semantics through ElasticLog and durable, economical storage behavior through S3Stream. The remaining piece is the glue that makes brokers truly replaceable.</p><h4>Layer 3: Metadata Control Plane</h4><p>Shared storage solves the durability problem, but it creates a new requirement that didn’t exist in the shared-nothing world. Brokers must be able to answer instantly: “Which shared-storage streams represent this partition?”</p><p>AutoMQ maintains a bidirectional mapping in KRaft metadata that provides exactly this:</p><pre>// metadata/.../S3StreamsMetadataImage.java<br>private final TimelineHashMap&lt;TopicIdPartition, Set&lt;Long&gt;&gt; partition2streams;<br>private final TimelineHashMap&lt;Long, TopicIdPartition&gt; stream2partition;</pre><p>This mapping is the architectural keystone of the entire system. When a broker needs to serve a partition, it looks up the stream IDs in this map, and the streams already exist in S3, there’s nothing to copy, nothing to reconstruct. “Ownership” in AutoMQ is just a pointer, not a physical relationship with data on a local disk.</p><h3>Anatomy of a Write</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*OEKtwr0wgjFqs5Z9gfc0iQ.png" /></figure><p>Now that you know the three layers, let’s trace a single producer record through the entire system, from producer.send() to durable acknowledgment, and see where things can go wrong along the way.</p><h4>Step 1: ElasticLog Accepts the Append</h4><p>When a producer record batch arrives, ElasticLog.append() does two things in sequence. 
First, it tries to acquire a permit from the backpressure semaphore, sized to the byte count of the incoming batch:</p><pre>// ElasticLog.scala<br>val permit = records.sizeInBytes()<br>if (!APPEND_PERMIT_SEMAPHORE.tryAcquire(permit)) {<br>    while (!APPEND_PERMIT_SEMAPHORE.tryAcquire(permit, 1, TimeUnit.SECONDS)) {<br>        tryAppendStatistics()  // Log stats while the producer waits<br>    }<br>}</pre><p>If the semaphore is exhausted, meaning too many bytes are in flight between “appended” and “confirmed durable”, the producer thread blocks here, retrying every second. This is intentional: it’s better to slow down the producer than to let unconfirmed data silently consume all available memory.</p><p>Once the permit is acquired, the record is appended to the active ElasticLogSegment and the log end offset advances immediately. From this moment forward, consumers fetching with isolation.level=read_uncommitted can see the record, even though it isn&#39;t durable yet.</p><h4>Step 2: The Async Flush Enters S3Stream</h4><p>The segment’s asyncLogFlush() returns a CompletableFuture that propagates through the storage stack until it reaches the WAL. Along the way, two backpressure mechanisms guard against overload: if the LogCache is full, the request enters a backoffRecords queue and retries periodically until space opens up, and if the WAL itself is at physical capacity, it throws OverCapacityException, which triggers a forced upload of the current WAL contents to S3 to make room. Both mechanisms exist to prevent any single stage from silently overwhelming the next.</p><h4>Step 3: WAL Batches and Writes to S3</h4><p>The WAL doesn’t write every record individually; that would be financial suicide on S3. Instead, it batches records for up to 250ms or 8MB (defaults), whichever comes first, then issues a single S3 PUT for the entire batch as a WAL object.</p><p>When that PUT succeeds, the write is durable. 
Two things happen immediately: the record lands in LogCache inside the storage layer, making it available for hot reads, and the backpressure semaphore permit is released, reclaiming budget for new writes. Then, in ElasticLog’s completion callback, confirmOffset advances via a compare-and-set loop that ensures monotonic progress even when concurrent appends complete out of order:</p><pre>// ElasticLog.scala, confirmOffset advances atomically<br>while (true) {<br>    val offset = _confirmOffset.get()<br>    if (offset.messageOffset &lt; endOffset) {<br>        _confirmOffset.compareAndSet(offset,<br>            new LogOffsetMetadata(endOffset, activeSegment.baseOffset, activeSegment.size))<br>        notify = true<br>    } else {<br>        break()<br>    }<br>}</pre><p>Once confirmOffset advances, an appendAckQueue notification fires, processed by a single-threaded callback executor that sends the producer acknowledgment, single-threaded specifically to guarantee that acks are delivered in offset order.</p><h4>Step 4: Batched Upload to Permanent Objects</h4><p>The WAL is a staging area, not a long-term home. In the background, S3Storage runs an upload pipeline that drains WAL contents into permanent S3 objects through the same <strong>prepare -&gt; upload -&gt; commit</strong> protocol described earlier. The prepare phase allocates an object ID from the controller with a 60-minute TTL that ensures cleanup if the upload is abandoned. The upload phase groups records by stream, giving streams that produce 16MB or more per cycle their own dedicated stream objects while bundling smaller streams into stream set objects. And the commit phase registers the new objects in KRaft metadata and marks the WAL source data for trimming.</p><p>The pipeline is designed so that the next cycle’s prepare phase overlaps with the current cycle’s upload, ensuring there’s always forward progress. 
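</p><p>The grouping decision made in the upload phase can be illustrated with a minimal sketch. The class UploadGrouping, its methods, and the sample sizes are hypothetical and not taken from AutoMQ's source; only the 16MB threshold comes from the description above. As a simplifying assumption, all small streams in a cycle share a single stream set object.</p>

```java
// Hypothetical sketch of the upload-phase grouping rule; not AutoMQ's actual code.
final class UploadGrouping {
    // Streams producing at least this many bytes in one cycle get a dedicated object.
    static final long DEDICATED_THRESHOLD_BYTES = 16L * 1024 * 1024;

    // True when a stream's bytes for this cycle qualify for its own stream object.
    static boolean isDedicated(long bytesThisCycle) {
        return bytesThisCycle >= DEDICATED_THRESHOLD_BYTES;
    }

    // Counts the objects one cycle would write: one per large stream, plus one
    // shared stream set object if any small stream has data (simplifying assumption).
    static int objectsForCycle(long[] bytesPerStream) {
        int dedicated = 0;
        boolean anySmall = false;
        for (long bytes : bytesPerStream) {
            if (isDedicated(bytes)) {
                dedicated++;
            } else if (bytes > 0) {
                anySmall = true;
            }
        }
        return dedicated + (anySmall ? 1 : 0);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long[] cycle = {32 * mb, 2 * mb, 512 * 1024L, 16 * mb};
        System.out.println("objects written this cycle: " + objectsForCycle(cycle));
    }
}
```

<p>In this toy cycle, four streams collapse into three objects. The saving grows with the number of small streams, which is where per-object request costs would otherwise add up.</p><p>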
Once committed, the data has graduated from ephemeral WAL objects to permanent, compaction-eligible storage.</p><figure><img alt="Diagram showing the AutoMQ write path. A producer sends records through ElasticLog into an in-memory WAL buffer, which flushes batched records as an S3 Put Object to object storage. Step 1: S3 returns an acknowledgment back to the WAL buffer. Step 2: the WAL buffer places the data into LogCache below for hot reads." src="https://cdn-images-1.medium.com/max/1024/1*jEYr9UYsz2732tiJYsmhyQ.png" /><figcaption>The write path: Records flow through ElasticLog into the in-memory WAL buffer, which flushes batched S3 PUTs. Once S3 confirms the write (1), the buffer places the data into LogCache for hot reads (2); durability is confirmed before the producer ack fires.</figcaption></figure><h4>What Happens When Things Go Wrong</h4><p>The most straightforward failure is a WAL PUT that doesn’t succeed. When this happens, ElasticLogFileRecords marks the segment as FENCED, a terminal state in which all subsequent append attempts throw IOException:</p><pre>// ElasticLogFileRecords.java<br>cf.whenComplete((rst, e) -&gt; {<br>    if (e instanceof IOException) {<br>        status = ElasticResourceStatus.FENCED;<br>    }<br>});</pre><p>The records that weren’t confirmed are effectively lost to the producer, which will see a timeout and can retry. The partition will likely need to roll a new segment or be reassigned to recover.</p><p>If a broker dies between append and WAL commit, the situation is similar: the log end offset advanced but confirmOffset didn&#39;t, meaning those records were never acknowledged. Producers will time out and retry, and the records, which existed only in the dead broker&#39;s memory, are gone. 
This is actually the same semantics as standard Kafka when acks=1 and a broker dies before replication completes, with one important difference: in AutoMQ, every write goes through the WAL, so the window of vulnerability is bounded by the WAL batching interval (up to 250ms) rather than by replication lag, which can be significantly longer.</p><p>The most interesting failure case is when a broker dies after the WAL commit but before the upload to permanent objects. Here, the WAL objects are already sitting in S3, so the data is durable: the producer received its acknowledgment and that acknowledgment was honest. During failover, the new broker runs WAL recovery, reads those WAL objects from S3, uploads them as proper permanent objects, and resets the WAL. No acknowledged data is lost, and the recovery is deterministic because the WAL lives in S3, not on a dead machine’s local disk.</p><h3>The Payoff: Zero-Copy Failover</h3><p>Classic Kafka failover has two phases that operate on very different timescales. The first phase is leader election, which is often fast at roughly 10–25 seconds including session timeout and client recovery. The second phase is the aftercare (under-replication recovery, manual reassignment, data movement), which can stretch into hours depending on partition sizes and cluster load.</p><p>AutoMQ aims to keep the first phase comparable to standard Kafka, and <strong>delete the second phase entirely</strong>.</p><h4>How Failure Is Actually Detected</h4><p>Failover isn’t triggered the instant a broker disappears; there’s a deliberate detection sequence designed to avoid false positives. 
The FailoverControlManager runs a scheduled task every 10 seconds that evaluates whether a failover should proceed by checking three conditions through NodeRuntimeMetadata.shouldFailover():</p><pre>// NodeRuntimeMetadata.java<br>public boolean shouldFailover() {<br>    return isFenced() &amp;&amp; hasOpeningStreams<br>        &amp;&amp; System.currentTimeMillis() - epoch &gt; DONT_FAILOVER_AFTER_NEW_EPOCH_MS;<br>}</pre><p>All three conditions must hold simultaneously: the node must be fenced (meaning KRaft’s session timeout has expired, which takes about 9 seconds by default), the node must have opening streams (confirming it was actually serving partitions rather than sitting idle), and the node must have been down for longer than a grace period after its last restart (to avoid triggering failover if a broker is simply bouncing quickly). Only when all three conditions are met does the controller create a FailoverContext, assign a target broker via round-robin from the pool of healthy nodes, and record the failover decision in KRaft metadata.</p><h4>The Metadata Event</h4><p>Once the failover decision is recorded, the actual transfer of partition ownership is remarkably lightweight. The new broker begins serving immediately by attaching to the existing streams in shared storage, creating partition handles with zero data copying:</p><pre>// core/.../streamaspect/ElasticReplicaManager.scala<br>case HostedPartition.None =&gt;<br>    val partition = Partition(tp, time, this)     // Just a metadata wrapper<br>    allPartitions.put(tp, HostedPartition.Online(partition))<br>    // No segment copy. No index rebuild. Partition is immediately online.</pre><p>Three lines of code and the partition is online — no segment copying, no index file rebuilding, no waiting for replicas to catch up. 
The bytes are already durable in shared storage and always have been; the only thing that needed to change was which broker holds the pointer.</p><h3>What Happens to In-Flight Writes</h3><p>When the old broker dies, any records that were appended but not yet WAL-committed are gone, they were never acknowledged, so producers with acks=all will time out and retry, which is the expected contract.</p><p>The interesting case is records that were WAL-committed and therefore acknowledged to the producer, but not yet uploaded as permanent objects. Those WAL objects are already sitting in S3, and the FailoverContext carries the dead broker&#39;s WAL configuration so the new broker knows where to find them. During recovery, the new broker reads those WAL objects, promotes any records that haven&#39;t already been committed as permanent objects, and resets the WAL. No acknowledged data is lost, and the recovery is fully deterministic because every piece of state it needs is already in shared storage.</p><h3>Epoch Fencing: Preventing Zombie Writers</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/896/1*4zj9JT4oSA19Rr-8SOq7CQ.png" /><figcaption>Zombie writers being fenced :)</figcaption></figure><p>There’s a subtle race condition worth considering: what if the old broker isn’t actually dead, but was just slow, and the network partition that triggered its fencing resolves before it realizes it’s been replaced? If both brokers attempt to write to the same stream, you have a split-brain problem.</p><p>S3Stream handles this through epoch-based fencing. When the new broker opens streams with a higher epoch, any subsequent append from the old broker’s lower epoch receives an EXPIRED_STREAM_EPOCH error.</p><p>The old broker’s stream is marked FENCED and all future operations on it fail immediately. 
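</p><p>The epoch check can be modeled in a few lines. This is a minimal sketch under stated assumptions: the class EpochFencedStream, its open and append methods, and the exception type are hypothetical names for illustration and do not reflect S3Stream's real API; only the idea of rejecting a lower epoch with an expired-epoch error comes from the description above.</p>

```java
// Hypothetical model of epoch-based fencing; names are illustrative, not S3Stream's API.
final class EpochFencedStream {
    // Thrown when a caller presents an epoch that is not the stream's current one.
    static final class ExpiredStreamEpochException extends RuntimeException {
        ExpiredStreamEpochException(String message) {
            super(message);
        }
    }

    private long currentEpoch;
    private long nextOffset;

    EpochFencedStream(long initialEpoch) {
        this.currentEpoch = initialEpoch;
    }

    // A new owner must open the stream with a strictly higher epoch,
    // which implicitly fences every writer still holding an older one.
    synchronized void open(long newEpoch) {
        if (currentEpoch >= newEpoch) {
            throw new ExpiredStreamEpochException(
                "open rejected: epoch " + newEpoch + " is not newer than " + currentEpoch);
        }
        currentEpoch = newEpoch;
    }

    // Appends succeed only for the current epoch; returns the base offset assigned.
    synchronized long append(long writerEpoch, int recordCount) {
        if (writerEpoch != currentEpoch) {
            throw new ExpiredStreamEpochException(
                "append rejected: writer epoch " + writerEpoch + ", stream epoch " + currentEpoch);
        }
        long baseOffset = nextOffset;
        nextOffset += recordCount;
        return baseOffset;
    }

    public static void main(String[] args) {
        EpochFencedStream stream = new EpochFencedStream(1);
        System.out.println("old broker appended at offset " + stream.append(1, 10));
        stream.open(2); // failover: the new broker takes ownership
        System.out.println("new broker appended at offset " + stream.append(2, 5));
        try {
            stream.append(1, 1); // zombie writer still using the old epoch
        } catch (ExpiredStreamEpochException e) {
            System.out.println("zombie write rejected: " + e.getMessage());
        }
    }
}
```

<p>Because the epoch comparison and the offset assignment happen under the same lock in this sketch, a stale writer cannot slip an append in between the new owner's open and its first write.</p><p>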
There&#39;s no window for split-brain writes and the epoch mechanism makes ownership transfer atomic from the stream&#39;s perspective, regardless of whether the old broker knows it&#39;s been replaced.</p><h3>The Cold Cache Penalty</h3><p>There’s one thing that’s worth being honest about: the new broker’s first few seconds of read performance. When a partition comes online on a fresh broker, its LogCache is empty, which means every consumer fetch during this window hits S3 directly, 100–500ms per read instead of sub-millisecond.</p><p>This is a real, observable penalty, and the distinction matters: “partition is online” and “partition is performing at full speed” are not the same moment. The cache warms as new writes flow through and as consumers trigger reads, and within seconds to minutes depending on throughput the hot path is fully active again. But for latency-sensitive consumers, those first moments after failover will feel measurably different from steady state.</p><h3>Compaction</h3><p>AutoMQ compacts at multiple levels, from stream sets to large consolidated objects, and supports two strategies for doing so. Physical merges rewrite small objects into bigger sequential ones, targeting objects under 128MB. Composite merges create reference objects that point to the originals, trading some additional indirection for significantly cheaper write operations on larger data.</p><p>The goal isn’t just cost control, though that matters, it’s also about read behavior. 
Compaction is how you prevent “fetch a range of offsets” from degenerating into “issue 400 separate GET requests for tiny objects,” which would destroy both latency and your S3 bill.</p><h3>MetaStream: Replacing Kafka’s Local Checkpoint Files</h3><p>To understand what MetaStream does, you first need to understand what it replaces.</p><p>Standard Kafka keeps several checkpoint files on every broker’s local disk — all simple text files written atomically via temp-file-and-rename — covering recovery points, log start offsets, high watermarks, compaction progress, and leader epoch histories, plus serialized producer state snapshots that capture in-flight transactions at specific offsets. These files are written by periodic scheduled tasks rather than on every append, and they represent state that cannot be reconstructed from the message stream alone: messages don’t carry “I am a segment boundary” markers, epoch transitions aren’t embedded in payloads, and replaying producer state from offset zero on a 10TB partition could take hours.</p><p>Stateless brokers can’t have local files, so AutoMQ collapses all of this checkpoint state into four keys inside <strong>MetaStream</strong>, an append-only KV store that is itself persisted as an elastic stream in S3:</p><pre>// core/.../streamaspect/MetaStream.java<br>public static final String LOG_META_KEY = &quot;LOG&quot;;<br>public static final String PRODUCER_SNAPSHOTS_META_KEY = &quot;PRODUCER_SNAPSHOTS&quot;;<br>public static final String PARTITION_META_KEY = &quot;PARTITION&quot;;<br>public static final String LEADER_EPOCH_CHECKPOINT_KEY = &quot;LEADER_EPOCH_CHECKPOINT&quot;;</pre><p>Each key stores a specific piece of non-derivable state that a new broker needs to begin serving a partition without scanning the entire log. 
The LOG key is the segment inventory — a table of contents that maps each segment to its byte ranges within S3 streams, so the new broker knows where segments begin and end without scanning terabytes of data. The PARTITION key stores recovery checkpoints, most critically the recoverOffset that tells the new broker &quot;only replay messages from here forward,&quot; turning a potentially hours-long full-log scan into a seconds-long tail scan. The PRODUCER_SNAPSHOTS key captures serialized producer state at specific offsets, avoiding the need to replay every historical message through the producer state machine. And the LEADER_EPOCH_CHECKPOINT key records which leader owned which offset range, information that followers need for divergence detection after a leader change and that simply doesn&#39;t exist inside the messages themselves.</p><p>When a new broker takes over a partition, the MetaStream replay is the very first thing that happens. The broker opens the partition’s MetaStream from S3, iterates every entry to cache the latest value per key, then uses those four keys to reconstruct the full partition state: segment inventory, recovery checkpoints, producer snapshots, and epoch history. Finally, it replays only the messages after recoverOffset to rebuild producer state for the unvalidated tail.</p><p>The whole thing takes seconds. It’s reading a small metadata stream — typically a few hundred kilobytes after compaction — not scanning terabytes of partition data. And because MetaStream self-compacts when 60% of its entries become obsolete, it stays compact over the partition’s entire lifetime.</p><p>This is what makes brokers truly disposable in practice: the new broker doesn’t need yesterday’s local directory, doesn’t need any files from the dead broker, doesn’t need anything except access to shared storage. 
It rebuilds its entire world from the MetaStream and starts serving.</p><h3><strong>Tradeoffs</strong></h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*l1JCep30-LYiWJxopQdZyQ.png" /></figure><p>What you give up is real and shouldn’t be minimized. Cold reads are fundamentally slower because an S3 GET is not a page cache hit; that’s a 100–1000x difference in latency that no amount of wishful thinking can eliminate. You introduce cache tuning and compaction scheduling as first-class operational concerns that didn’t exist before. You trade the relative simplicity of file-system-backed storage for a multi-stage pipeline involving WAL batching, commit protocols, and cache coherence. And write latency increases from sub-millisecond to 5–50ms because the WAL batches writes for up to 250ms before issuing S3 PUTs.</p><p>What you stop suffering, though, is equally real. The 3x replication cost that was the non-negotiable baseline is gone. The assumption that “scaling implies migration”, along with the hours of data movement that came with it, is gone. The failover aftercare that triggered prolonged under-replication windows while you waited for data to rebalance across surviving brokers is gone.</p><p>If you’re optimizing for the cloud reality where durability is cheap in object storage but moving bytes around inside your cluster is expensive in both time and money, AutoMQ’s take starts to look appealing.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*QwIfut--RJEFLhPg1OmtEA.png" /></figure><h3>Closing Notes</h3><p>Kafka made brokers part of the durable identity of the system: each broker owned its partitions physically, and that ownership was load-bearing for both durability and availability.</p><p>AutoMQ removes that identity entirely. Data is permanent, brokers are not. 
And once you internalize that inversion, the entire design becomes consistent: ElasticLog keeps Kafka semantics intact without segment files, S3Stream makes object storage behave like a log without generating insane request costs, the metadata plane makes ownership transferable without copying a single byte, and MetaStream replaces local checkpoint files so that brokers can be truly, operationally stateless.</p><p>This is “bring compute to data,” applied to streaming logs. And if you’ve ever stared at a rebalance plan and thought <em>why am I moving all these bytes just to add capacity</em>, you already understand why this approach is attractive.</p><h3>Acknowledgments</h3><p>Thanks to the wonderful <a href="https://medium.com/@nicolazar">Nicoleta Lazar</a> for reviewing drafts of this article and improving its clarity.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=043c5226847e" width="1" height="1" alt=""><hr><p><a href="https://medium.com/fresha-data-engineering/automq-shared-storage-architecture-deep-dive-043c5226847e">AutoMQ: Shared Storage Architecture Deep Dive</a> was originally published in <a href="https://medium.com/fresha-data-engineering">fresha-data-engineering</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[What the Fuss with Fluss: Flink 2.2 Delta Force]]></title>
            <link>https://medium.com/fresha-data-engineering/what-the-fuss-with-fluss-flink-2-2-delta-force-f184bd17e223?source=rss-a199772355b8------2</link>
            <guid isPermaLink="false">https://medium.com/p/f184bd17e223</guid>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[big-data]]></category>
            <category><![CDATA[apache-flink]]></category>
            <category><![CDATA[data-science]]></category>
            <category><![CDATA[programming]]></category>
            <dc:creator><![CDATA[Anton Borisov]]></dc:creator>
            <pubDate>Fri, 09 Jan 2026 13:23:04 GMT</pubDate>
            <atom:updated>2026-01-12T21:56:11.373Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*cKPuBRlNd2zv8MPrqpggaQ.png" /></figure><p><a href="https://flink.apache.org/2025/12/04/apache-flink-2.2.0-advancing-real-time-data--ai-and-empowering-stream-processing-for-the-ai-era/">Apache Flink 2.2</a> delivers three major enhancements to <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin">Delta Join</a> that transform it from a promising optimisation into a production-ready solution for real-world streaming pipelines. If you’ve been waiting for Delta Join to handle CDC sources, support caching or allow filter pushdowns: the wait is over.</p><p>This article covers what’s new, why it matters, and how to put these capabilities into practice. If you’re unfamiliar with Delta Join fundamentals, my <a href="https://medium.com/fresha-data-engineering/what-the-fuss-with-fluss-flink-delta-force-1ab3d6be5c98">previous article on Flink 2.1’s Delta Join</a> provides the architectural foundation.</p><p>Flink 2.2 addresses the most critical gaps in Delta Join’s 2.1 implementation:</p><ol><li><strong>CDC Upsert Support</strong> (<a href="https://issues.apache.org/jira/browse/FLINK-38511">FLINK-38511</a>): Delta Join now accepts UPDATE_AFTER records, making it usable with MySQL CDC, PostgreSQL CDC, and <a href="https://debezium.io/">Debezium</a> connectors</li><li><strong>Built-in Caching</strong> (<a href="https://issues.apache.org/jira/browse/FLINK-38495">FLINK-38495</a>): An integrated LRU cache reduces external storage lookups by 80–90% for typical workloads</li><li><strong>Filter and Projection Support</strong> (<a href="https://issues.apache.org/jira/browse/FLINK-38556">FLINK-38556</a>): Deterministic filters and column projections can now be used by Delta Join, reducing data transfer and enabling new query patterns</li></ol><p>Together, these changes enable Delta Join for the most common streaming use case.</p><h3>Quick 
Recap</h3><p>The core problem Delta Join solves is state explosion. In a traditional streaming join, Flink keeps both sides of the join in state: every customer record, every order, everything needed to match late-arriving data. A 10-million-row customer table with 30-day retention easily consumes gigabytes of state per TaskManager. Checkpoints slow down, recovery takes forever, and your cloud bill climbs.</p><p>Delta Join flips this model. Instead of being stored in Flink’s state, the data lives in an external indexed store like <a href="https://fluss.apache.org/">Apache Fluss (Incubating)</a>. When a stream record arrives, Flink performs an async lookup against Fluss rather than querying local state. The result? State drops by 99%+ for large dimensions.</p><h3>CDC Upsert Support: The Game Changer</h3><p><a href="https://issues.apache.org/jira/browse/FLINK-38511"><strong>FLINK-38511</strong></a> makes Delta Join usable for real-world database replication. This single enhancement transforms Delta Join from a niche optimization for INSERT-only tables into a practical solution for the most common streaming use case: bidirectional enrichment from CDC sources.</p><p>CDC sources define four types of row operations:</p><ul><li>+I (INSERT): New records added to the table</li><li>-UB / +UA (UPDATE_BEFORE / UPDATE_AFTER): Modifications to existing records</li><li>-D (DELETE): Records removed from the table</li></ul><p>Flink 2.1’s Delta Join could only handle INSERT operations from an indexed lookup store like Fluss. Flink 2.2&#39;s code changes are deceptively simple but architecturally significant. 
In StreamingDeltaJoinOperator.java, the row kind validation evolved:</p><pre>// Flink 2.1: INSERT-only<br>Preconditions.checkArgument(<br>    RowKind.INSERT == element.getValue().getRowKind(),<br>    &quot;Delta join only supports INSERT records&quot;);<br><br>// Flink 2.2: INSERT + UPDATE_AFTER<br>Preconditions.checkArgument(<br>    RowKind.INSERT == element.getValue().getRowKind() ||<br>    RowKind.UPDATE_AFTER == element.getValue().getRowKind(),<br>    &quot;Currently, delta join only supports INSERT and UPDATE_AFTER records&quot;);</pre><p><strong>What Delta Join now supports</strong>:</p><ul><li>✅ INSERT (+I): New dimension records</li><li>✅ UPDATE_AFTER (+UA): Updated dimension values</li><li>❌ UPDATE_BEFORE (-UB): Automatically dropped</li><li>❌ DELETE (-D): Not supported (by fundamental design)</li></ul><h3>Why UPDATE_BEFORE is Automatically Dropped</h3><p>You might wonder: if Delta Join supports UPDATE_AFTER, why not UPDATE_BEFORE? The answer lies in the stateless design:</p><ol><li><strong>No Join State to Retract</strong>: Delta Join doesn’t maintain a table of join results in state. When an UPDATE arrives, there’s no previous join result to retract; the operator simply performs a fresh lookup.</li><li><strong>UPDATE_AFTER is Sufficient</strong>: When a dimension record updates from {id=1, name=&quot;Alice&quot;} to {id=1, name=&quot;Alice2&quot;}, the CDC stream produces -UB(id=1, name=&quot;Alice&quot;) followed by +UA(id=1, name=&quot;Alice2&quot;). The +UA implicitly replaces the old value in the dimension table&#39;s indexed storage.</li><li><strong>Downstream Idempotency</strong>: The enriched results flow to an idempotent sink (typically with a primary key), which naturally handles the update by overwriting the previous row.</li></ol><figure><img alt="Diagram showing a Flink Delta Join changelog path. A Source emits UB and UA events for a row update. The Flink Planner inserts a DropUpdateBefore node that removes UB and forwards UA only. 
Delta Join performs an async lookup and outputs the updated row. An Upsert Sink receives the joined result and overwrites the previous value for that key, demonstrating idempotent upsert behavior." src="https://cdn-images-1.medium.com/max/1024/1*tORE-1oVJuHdSIbC4ZI4UQ.png" /><figcaption>Delta Join changelog propagation path in Flink 2.2: the source emits update-before and update-after events, the planner drops update-before records, the Delta Join performs async lookups and emits only update-after joined rows, and the upsert sink overwrites the previous key value.</figcaption></figure><p>The StreamPhysicalDropUpdateBefore node is automatically inserted during query optimization. You never see it in your SQL, but it&#39;s there in the execution plan, silently removing unnecessary retractions.</p><h3>Why DELETE Still Isn’t Supported</h3><p>This isn’t a TODO item; it’s a <strong>fundamental architectural constraint</strong>.</p><p><strong>No State for Retraction</strong>: To properly handle DELETE, Delta Join would need to remember what it previously emitted so it can retract the correct join result. But maintaining this state defeats the entire purpose of Delta Join: we’d be back to a regular stateful join with all its memory overhead.</p><p>The code in DeltaJoinUtil.java makes this explicit:</p><pre>private static boolean onlyProduceInsertOrUpdateAfter(StreamPhysicalRel node) {<br>    ChangelogMode changelogMode = getChangelogMode(node);<br>    Set&lt;RowKind&gt; allKinds = changelogMode.getContainedKinds();<br>    return !allKinds.contains(RowKind.UPDATE_BEFORE)<br>        &amp;&amp; !allKinds.contains(RowKind.DELETE);<br>}</pre><p>This validation isn’t marked “TODO: support DELETE”; for now, it’s a design invariant.</p><p>The standard workaround is <strong>soft deletes</strong>. When a customer is “deleted,” CDC produces +UA(id=123, deleted=true), which Delta Join handles perfectly. 
Downstream analytics can filter on the deleted flag, and you maintain a complete audit trail.</p><p>CDC support makes Delta Join usable. But Flink 2.2 also makes it <strong>fast</strong>. Two complementary enhancements, built-in caching and filter/projection support, combine to deliver a speculative 2–5x performance improvement over the 2.1 baseline.</p><h3>Built-in Caching Layer (<a href="https://issues.apache.org/jira/browse/FLINK-38495">FLINK-38495</a>)</h3><p>Every external lookup has a cost: network round-trip, storage I/O, serialization overhead. In Flink 2.1, every stream record triggered a lookup, even if the same dimension key was looked up milliseconds earlier. For skewed workloads (hot customers, popular products), this meant repeatedly fetching the same data.</p><p><strong>Flink 2.2 introduces an integrated LRU cache</strong> that sits between the Delta Join operator and external storage. The implementation in DeltaJoinCache.java uses <a href="https://guava.dev/releases/21.0/api/docs/com/google/common/cache/Cache.html">Guava Cache</a> with a two-level structure:</p><pre>// Dual-sided caching (one for left lookups, one for right)<br>private final Cache&lt;RowData, LinkedHashMap&lt;RowData, Object&gt;&gt; leftCache;<br>private final Cache&lt;RowData, LinkedHashMap&lt;RowData, Object&gt;&gt; rightCache;</pre><p><strong>Why this structure?</strong></p><ul><li><strong>Outer Cache</strong>: Maps join key → inner map (O(1) lookup by join key)</li><li><strong>Inner LinkedHashMap</strong>: Maps upsert key → row data (handles multiple rows per join key)</li><li><strong>LRU Eviction</strong>: Automatically manages memory when the cache reaches its configured size</li></ul><p>The inner map is crucial for handling dimension updates. 
When customer_id=123 updates, the cache upserts the new value in-place for that upsert key, ensuring lookups always see the latest data until LRU eviction.</p><p><strong>Configuration</strong>:</p><pre>table.exec.delta-join.cache-enabled: true      # Default<br>table.exec.delta-join.left.cache-size: 10000   # Max cached join keys<br>table.exec.delta-join.right.cache-size: 10000</pre><p><strong>Metrics to Monitor</strong>:</p><pre>deltaJoin.leftCache.hitRate          # Target: &gt;80%<br>deltaJoin.leftCache.requestCount     # Total lookups attempted<br>deltaJoin.leftCache.hitCount         # Successful cache hits<br>deltaJoin.leftCache.keySize          # Distinct join keys cached<br>deltaJoin.leftCache.totalNonEmptyValues  # Total cached rows</pre><p>Watch the hit rate. If it’s below 70%, either increase cache size or investigate data skew (if lookups are uniformly distributed across millions of keys, even a large cache won’t help and that’s a sign Delta Join might not be the right fit).</p><h3><strong>Filter and Projection Support (FLINK-38556)</strong></h3><p>Projections genuinely reduce data transfer: Delta Join fetches only the columns you specify from Fluss. Filters work differently: they’re applied in Flink <em>after</em> the lookup returns data. The rows still get fetched from Fluss; they just get filtered before joining.</p><p><strong>Projection (the real win)</strong></p><pre>SELECT o.*, c.name, c.tier<br>FROM orders o<br>JOIN (<br>  SELECT id, name, tier FROM customers  -- 3 columns instead of 20<br>) c ON o.customer_id = c.id;</pre><p>The projectionOnTemporalTable field in DeltaJoinSpec specifies which columns to fetch. Fluss returns only the requested columns. For wide dimension tables, this cuts network I/O and cache memory by 50–80%.</p><p><strong>Filters (applied post-lookup)</strong></p><p>Flink 2.2 allows exactly one Calc node between the source scan and Delta Join. 
Filters extracted from this node are applied in Flink after the lookup via a generated FlatMapFunction in AsyncDeltaJoinRunner. Data still gets fetched from Fluss; filtering just reduces rows before joining.</p><p>For CDC sources, filters must reference only upsert key columns.</p><p><strong>Why this restriction?</strong> Delta Join can emit duplicates because CDC updates on either side trigger fresh lookups against the opposite table, and any in-flight async operations get replayed after checkpoint recovery. For upsert key columns (which determine record identity), values are stable, so filtering on them is safe. For non-upsert-key columns, values can change between duplicate emissions, leading to inconsistent results:</p><figure><img alt="Diagram showing two cases. Left: filter uses the upsert key (customer_id &gt; 50). A row update from US to UK still matches, so UB and UA changelog events are emitted. Right: filter uses a non-upsert key (country = ‘US’). After update, the new value no longer matches, so the events are not emitted. Demonstrates why filters must align with upsert keys to preserve update visibility." src="https://cdn-images-1.medium.com/max/1024/1*DiXmSIKJnJ34Nvdgt6_wxA.png" /><figcaption>Comparison of upsert-key filters vs non-upsert-key filters: when filtering on the upsert key, updates stay visible. When filtering on a non-upsert key, updates can disappear.</figcaption></figure><p>For INSERT-only sources, you can filter on any column: no updates means no inconsistency.</p><p>All filters must be deterministic. RAND(), NOW() and CURRENT_TIMESTAMP() are rejected because duplicate emissions would produce different results.</p><p>There’s a separate concept that can cause confusion: some connectors support FilterPushDownSpec, which actually pushes filters to the storage layer. This is orthogonal to FLINK-38556. 
When a connector like Fluss supports this capability, Delta Join validates that those pushed filters also follow the upsert key restriction for CDC sources.</p><p>Correctness and changelog requirements on upstream data sources go beyond Delta Join and open up a larger conversation about general join semantics. That’s outside our scope here, but <a href="https://issues.apache.org/jira/browse/FLINK-38579">FLINK-38579</a> provides a good entry point.</p><p>Caching reduces lookup frequency by 80–90%. Projection reduces data transfer by 50–80%. Filters reduce rows processed in Flink (but don’t reduce data fetched from Fluss unless the connector supports FilterPushDownSpec). Together, these make Delta Join significantly faster than in 2.1.</p><h3>Design Tradeoffs and Limitations</h3><p>Delta Join’s performance comes with constraints. Understanding these limitations, and why they exist, is critical for deciding when to use this optimization.</p><h4><strong>Indexing</strong></h4><p>Delta Join will not apply without proper indexing. The validation logic in DeltaJoinUtil.java checks that the join keys completely cover at least one index defined in Fluss, or a prefix of one.</p><p>Delta Join performs thousands of lookups per second against external storage. Without indexes, each lookup becomes a full table scan, turning an O(1) indexed lookup into an O(n) sequential scan. For a million-row dimension table, that’s the difference between 1ms and 1000ms per lookup.</p><h4>Only INNER JOIN</h4><p>This isn’t a limitation to be worked around; it’s an architectural invariant. 
LEFT, RIGHT, and FULL OUTER joins are fundamentally incompatible with Delta Join’s stateless design.</p><p>The problem with LEFT JOIN:</p><pre>T0: Order for customer_id=999 arrives<br>    → Async lookup → customer 999 not found in dimension table<br>    → LEFT JOIN semantics: emit (order_data, NULL, NULL, NULL)<br><br>T5: Customer 999 is added to dimension table<br>    → Should we now emit (order_data, customer_name, customer_tier, ...)?<br>    → But how do we retract the previous (order_data, NULL, NULL, NULL)?<br><br>Fundamental issue: No state tracking what was emitted previously</pre><p>Delta Join doesn’t maintain join results in state (avoiding state is the whole point!). For INNER JOIN, unmatched rows are simply skipped, so there is no NULL emission and no retraction is needed. For LEFT JOIN, unmatched rows <em>must</em> be emitted with NULL padding, and if that dimension record appears later, we’d need to retract the NULL emission and emit the matched result. This requires state.</p><p>From DeltaJoinUtil.java:</p><pre>public static boolean isJoinTypeSupported(FlinkJoinType flinkJoinType) {<br>    return FlinkJoinType.INNER == flinkJoinType;<br>}</pre><p>If you need LEFT JOIN semantics, use a regular stateful join; the state cost is unavoidable for correctness.</p><h4>Idempotency</h4><p>Perhaps the most critical constraint: downstream operators must handle duplicate records idempotently. 
I mentioned this earlier when explaining why UPDATE_BEFORE is dropped.</p><p>If the downstream can’t handle seeing (order_id=100, customer_name=&#39;Alice&#39;) twice, the pipeline breaks.</p><p><strong>Bad</strong>: Append-only sinks</p><pre>CREATE TABLE kafka_results (<br>  order_id INT,<br>  customer_name STRING<br>) WITH (&#39;connector&#39; = &#39;kafka&#39;, &#39;format&#39; = &#39;json&#39;);<br><br>-- Kafka consumers will see duplicate records!<br>INSERT INTO kafka_results<br>SELECT o.id, c.name FROM orders o<br>JOIN customers c ON o.customer_id = c.id;</pre><p><strong>Good</strong>: Upsert sinks</p><pre>CREATE TABLE kafka_results (<br>  order_id INT PRIMARY KEY NOT ENFORCED,<br>  customer_name STRING<br>) WITH (<br>  &#39;connector&#39; = &#39;upsert-kafka&#39;,<br>  &#39;key.format&#39; = &#39;json&#39;,<br>  &#39;value.format&#39; = &#39;json&#39;<br>);<br><br>-- Duplicates merged by primary key → idempotent<br>INSERT INTO kafka_results<br>SELECT o.id, c.name FROM orders o<br>JOIN customers c ON o.customer_id = c.id;</pre><p><strong>What qualifies as idempotent</strong>:</p><ul><li>Upsert sinks (Fluss KV table, Kafka with upsert mode, JDBC with primary key, lake tables, StarRocks PK tables)</li><li>Stateless transformations (map, flatMap without side effects)</li><li>Any other sink that deduplicates by key</li></ul><p>If your downstream processing is append-only or has side effects (incrementing counters, triggering alerts), Delta Join will cause correctness issues.</p><h4>Filter limitations</h4><p>We covered this in the filter section, but it’s worth emphasizing: for CDC sources, filters must reference only upsert key columns; otherwise Flink falls back to a regular stateful join.</p><h3>How to choose now?</h3><figure><img alt="Table comparing Regular Join, Lookup Join, and Delta Join 2.2 across multiple aspects. 
Rows include: State Size (both streams vs left-only vs cache), Changelog Support, Join Types, Dimension Updates, External Storage, Latency, Throughput, Cache, Filter/Projection, Use Case, and State TTL. Highlights that Delta Join uses async lookup, supports INSERT and UPDATE_AFTER, and targets large CDC dimensions with high throughput." src="https://cdn-images-1.medium.com/max/1024/1*iCzrHz4Xxza4J11WsOh9eg.png" /><figcaption>Comparison matrix</figcaption></figure><figure><img alt="Vertical decision diagram for when to use Delta Join. Questions include: “Dimension &lt;1M rows or no index?”, “Hard deletes required?”, “Outer join needed?”, “Downstream non-idempotent?”, and “Ultra-low latency required?”. All answers shown as “No” lead to “Delta Join”, with a note saying “reduced state, high throughput, large CDC dims, async lookup.”" src="https://cdn-images-1.medium.com/max/1024/1*NY6t1jO7NDe-7eFmKN_0Kg.png" /><figcaption>Decision checklist for using Delta Join. If the dimension is indexed and large, hard deletes are not required, no outer join is needed, downstream is idempotent and ultra-low latency is not mandatory, then Delta Join is a suitable option worth considering.</figcaption></figure><h3>Closing Thoughts</h3><p>Apache Flink 2.2 transforms Delta Join from a promising optimization into a viable solution for real-world streaming pipelines.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/952/1*MASa7MTRAIZnGrH-BuHeMQ.png" /><figcaption>Relaxed gotcha to start thinking about The Delta Join + Fluss</figcaption></figure><p>Delta Join isn’t universal. INNER JOIN only, no DELETE support, mandatory indexes and the downstream idempotency requirement aren’t limitations to be worked around. They’re fundamental architectural tradeoffs. 
The optimization trades flexibility (no outer joins, no hard deletes) for efficiency (99% state reduction, 10x throughput).</p><p>For organizations processing millions of streaming events against large reference data, Flink 2.2’s Delta Join can reduce state by 99%, cut infrastructure costs significantly, and improve throughput by an order of magnitude, all while handling real-world CDC patterns that were impossible in 2.1.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=f184bd17e223" width="1" height="1" alt=""><hr><p><a href="https://medium.com/fresha-data-engineering/what-the-fuss-with-fluss-flink-2-2-delta-force-f184bd17e223">What the Fuss with Fluss: Flink 2.2 Delta Force</a> was originally published in <a href="https://medium.com/fresha-data-engineering">fresha-data-engineering</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Adding Authentication to Elixir Native Kafka Client: Trust, but Verify]]></title>
            <link>https://medium.com/fresha-data-engineering/adding-authentication-to-elixir-native-kafka-client-trust-but-verify-f7d6d3876cff?source=rss-a199772355b8------2</link>
            <guid isPermaLink="false">https://medium.com/p/f7d6d3876cff</guid>
            <category><![CDATA[security]]></category>
            <category><![CDATA[elixir]]></category>
            <category><![CDATA[programming]]></category>
            <category><![CDATA[open-source]]></category>
            <category><![CDATA[kafka]]></category>
            <dc:creator><![CDATA[Anton Borisov]]></dc:creator>
            <pubDate>Mon, 15 Dec 2025 17:03:37 GMT</pubDate>
            <atom:updated>2025-12-15T17:11:18.173Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/818/1*vLZa2axiBxEyUdtv7wu44g.png" /></figure><p>There was an open <a href="https://github.com/kafkaex/kafka_ex/issues/242">issue</a> on the kafka_ex GitHub repository from 2017 asking for SASL authentication support. Eight years later, I closed it with my <a href="https://github.com/kafkaex/kafka_ex/pull/498">implementation</a>.</p><p>At Fresha, we run kafka_ex in production. It does the job. But some managed <a href="https://kafka.apache.org/">Kafka</a> providers require authentication, and that missing feature had been sitting there for years. I got curious: what would it take to add SASL to an existing Kafka client? How does the protocol actually work? How hard could it be?</p><p>I decided to find out. Eight years is a long time for a feature request to sit open, so there was bound to be something to learn in my favourite DIY way.</p><p>This is what I built: PLAIN and SCRAM mechanisms, the wire protocol details, and the design decisions that make it extensible. <a href="https://elixir-lang.org/">Elixir</a> might not be your language, but the code is straightforward and the concepts are universal.</p><blockquote>Note: Medium code blocks don’t seem to support Elixir highlighting, so I use Ruby highlighting as a workaround. I hope it won’t be confusing.</blockquote><p>Let’s start with what we’re actually implementing.</p><h3>What Is SASL?</h3><p><a href="https://datatracker.ietf.org/doc/html/rfc4422">SASL</a> (Simple Authentication and Security Layer) is a framework that separates authentication mechanisms from application protocols. 
Kafka adopted it to support multiple auth methods through one interface.</p><p>When a client connects to a SASL-enabled broker, authentication happens immediately before any produce or consume operations:</p><ol><li>Client opens TCP connection (usually with TLS)</li><li>Client queries broker’s API versions (optional but recommended)</li><li>Client sends SASL handshake, proposing a mechanism</li><li>Broker accepts or rejects</li><li>Client and broker exchange mechanism-specific messages</li><li>Authentication succeeds or fails</li><li>Normal Kafka operations begin</li></ol><figure><img alt="Sequence diagram of SASL auth between client (kafka_ex) and broker: TCP connect → optional TLS handshake → ApiVersions request/response → SaslHandshake request/response → loop of SaslAuthenticate request/response steps 1..n to complete authentication." src="https://cdn-images-1.medium.com/max/1024/1*7hmoi_E7K8Ff0LHHTmswew.png" /><figcaption>SASL flow in kafka_ex: TCP connect, optional TLS, ApiVersions negotiation, SASL handshake (pick mechanism), then one or more SaslAuthenticate request/response steps (e.g., SCRAM) before normal Kafka operations.</figcaption></figure><p>I implemented two mechanisms, with the architecture ready for more:</p><p><strong>PLAIN</strong> is the simplest: send username and password in a single message. It must run over TLS as sending credentials in cleartext is obviously a bad idea. Despite its simplicity, PLAIN is widely supported and often sufficient for internal services.</p><p><a href="https://en.wikipedia.org/wiki/Salted_Challenge_Response_Authentication_Mechanism"><strong>SCRAM</strong></a> (Salted Challenge Response Authentication Mechanism) is more sophisticated. Your password never crosses the wire. 
Instead, client and server exchange cryptographic proofs:</p><ol><li>Client sends a random nonce</li><li>Server responds with its own nonce, a salt and an iteration count</li><li>Client derives proofs from the password and sends them</li><li>Server validates and sends its own proof</li><li>Client validates server’s proof (mutual authentication)</li></ol><p>Capturing the exchange doesn’t help an attacker replay it, because the nonces are unique to each exchange. Offline brute-force against a captured exchange is possible, but PBKDF2 makes it expensive. Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512.</p><figure><img alt="Sequence diagram of SCRAM exchange between kafka_ex client and broker using SaslAuthenticate requests/responses: client-first message, server-first reply with salt/iter/nonce, client-final proof, server-final signature or error, then “AUTH OK” and normal Kafka API interactions." src="https://cdn-images-1.medium.com/max/1024/1*qT72cMxI2rO2jOZ8EbIlrA.png" /><figcaption>SCRAM over Kafka SaslAuthenticate: client-first → server-first (nonce, salt, iterations) → client-final (proof) → server-final (server signature or error). On success, SASL completes and the connection can use normal Kafka APIs.</figcaption></figure><p><a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876"><strong>OAUTHBEARER</strong></a> (<a href="https://github.com/kafkaex/kafka_ex/pull/507">open PR</a>) uses JWT tokens from an OAuth provider. You supply a token provider function, and the library calls it on each connection.</p><p><a href="https://github.com/aws/aws-msk-iam-auth"><strong>MSK IAM</strong></a> (planned) would use AWS credentials for Amazon MSK. Same pattern and same SASL framework, just a different credential source.</p><p>I wanted to add authentication without rewriting existing code. More importantly, I wanted adding <em>new</em> mechanisms to be trivial.</p><figure><img alt="Diagram showing authentication architecture. 
KafkaEx.Socket calls Auth.SASL orchestrator, which connects to SASL Codec for encoding and Mechanism module for auth logic. Socket I/O sends encoded frames to Kafka broker." src="https://cdn-images-1.medium.com/max/1024/1*MTOOqlXJbpinioqJm1EiTg.png" /><figcaption>Authentication flow: KafkaEx.Socket triggers the SASL orchestrator, which picks a mechanism module and passes it a send_fun closure. The mechanism builds payloads without knowing about the wire protocol; send_fun handles encoding via Codec and the I/O operations underneath.</figcaption></figure><p>Every SASL mechanism does two things: announce its name and perform authentication. In Elixir, we express this contract with a <a href="https://elixirschool.com/en/lessons/advanced/behaviours">behaviour</a>:</p><pre>defmodule KafkaEx.Auth.Mechanism do<br>  @type auth_opts :: KafkaEx.Auth.Config.t()<br>  @type send_fun :: (binary() -&gt; {:ok, binary() | nil} | {:error, term()})<br><br>  @callback mechanism_name(auth_opts()) :: String.t()<br>  @callback authenticate(auth_opts(), send_fun()) :: :ok | {:error, term()}<br>end</pre><p>The send_fun abstraction is key. Mechanism modules don&#39;t know about sockets or Kafka framing: they send bytes and receive responses. 
This makes testing straightforward and keeps responsibilities separate.</p><p>Adding a new mechanism means implementing these two functions and registering in a map:</p><pre>@mechanisms %{<br>  plain: KafkaEx.Auth.SASL.Plain,<br>  scram: KafkaEx.Auth.SASL.Scram<br>  # oauthbearer: KafkaEx.Auth.SASL.OAuthBearer  &lt;- open PR<br>  # msk_iam: KafkaEx.Auth.SASL.MskIam           &lt;- future<br>}</pre><p>Authentication config needs validation up front:</p><pre>defmodule KafkaEx.Auth.Config do<br>  @enforce_keys [:mechanism, :username, :password]<br><br>  defstruct mechanism: :plain,<br>            username: nil,<br>            password: nil,<br>            mechanism_opts: %{}<br>  <br>  @type t :: %__MODULE__{<br>            mechanism: :plain | :scram,<br>            username: String.t(),<br>            password: String.t(),<br>            mechanism_opts: map()<br>          }<br>end</pre><p>The mechanism_opts map handles mechanism-specific settings: algorithm choice for SCRAM, token provider for OAUTHBEARER. Validation happens at construction:</p><pre>defp validate_config(%{mechanism: :plain} = cfg) do<br>  unless cfg[:username] &amp;&amp; cfg[:password] do<br>    raise ArgumentError, &quot;PLAIN requires username and password&quot;<br>  end<br>  cfg<br>end<br><br>defp validate_config(%{mechanism: :scram} = cfg) do<br>  unless cfg[:username] &amp;&amp; cfg[:password] do<br>    raise ArgumentError, &quot;SCRAM requires username and password&quot;<br>  end<br>  cfg<br>end</pre><p>Kafka uses length-prefixed binary messages. 
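</p><p>A minimal sketch of that framing, assuming a hypothetical FrameSketch module that is not part of kafka_ex: each request and response travels with a 4-byte big-endian size in front of it.</p><pre>defmodule FrameSketch do<br>  # Prefix a request body with its size as a 32-bit big-endian integer<br>  def frame(body) when is_binary(body) do<br>    &lt;&lt;byte_size(body)::32, body::binary&gt;&gt;<br>  end<br><br>  # Split one complete frame off the front of a buffer; :more means keep reading<br>  def unframe(&lt;&lt;size::32, body::binary-size(size), rest::binary&gt;&gt;), do: {:ok, body, rest}<br>  def unframe(_incomplete), do: :more<br>end</pre><p>Here FrameSketch.frame(&quot;abc&quot;) yields &lt;&lt;0, 0, 0, 3, &quot;abc&quot;&gt;&gt;, and unframe/1 reverses it once the whole frame has arrived.</p><p>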
Authentication needs three request types: API Versions, SASL Handshake and SASL Authenticate.</p><p>Each request has a header with API key, version, correlation ID and client ID:</p><pre>defmodule KafkaEx.Auth.SASL.CodecBinary do<br>  @sasl_handshake_key 17<br>  @sasl_authenticate_key 36<br><br>def handshake_request(mechanism, corr, ver, client_id \\ &quot;kafka_ex&quot;) do<br>    &lt;&lt;<br>      @sasl_handshake_key::16,<br>      ver::16,<br>      corr::32,<br>      byte_size(client_id)::16,<br>      client_id::binary,<br>      byte_size(mechanism)::16,<br>      mechanism::binary<br>    &gt;&gt;<br>  end<br>  def authenticate_request(auth_bytes, corr, ver, client_id \\ &quot;kafka_ex&quot;) do<br>    &lt;&lt;<br>      @sasl_authenticate_key::16,<br>      ver::16,<br>      corr::32,<br>      byte_size(client_id)::16,<br>      client_id::binary,<br>      byte_size(auth_bytes)::32,<br>      auth_bytes::binary<br>    &gt;&gt;<br>  end<br>end</pre><p>Elixir’s binary syntax makes this readable: &lt;&lt;value::16&gt;&gt; means &quot;encode as 16-bit integer&quot;. Everything is explicitly sized, so no parsing ambiguity.</p><p>Responses need careful validation:</p><pre>def parse_handshake_response(&lt;&lt;corr::32, rest::binary&gt;&gt;, expected_corr, mechanism, ver) do<br>  if corr != expected_corr do<br>    {:error, :correlation_mismatch}<br>  else<br>    parse_handshake_body(rest, mechanism, ver)<br>  end<br>end<br><br>defp parse_handshake_body(&lt;&lt;0::16, _rest::binary&gt;&gt;, _mechanism, 0), do: :ok<br>defp parse_handshake_body(&lt;&lt;0::16, rest::binary&gt;&gt;, mechanism, 1) do<br>  mechanisms = parse_mechanism_list(rest)<br>  if mechanism in mechanisms, do: :ok, else: {:error, {:unsupported_mechanism, mechanism}}<br>end<br>defp parse_handshake_body(&lt;&lt;err::16-signed, _::binary&gt;&gt;, _, _) do<br>  {:error, {:handshake_failed, error_atom(err)}}<br>end</pre><p>Correlation mismatch means request/response pairing broke. 
Error codes translate to atoms that mean something when debugging.</p><h3>PLAIN</h3><p>PLAIN is almost trivial:</p><pre>defmodule KafkaEx.Auth.SASL.Plain do<br>  @behaviour KafkaEx.Auth.Mechanism<br><br>  alias KafkaEx.Auth.Config<br><br>  @impl true<br>  def mechanism_name(_), do: &quot;PLAIN&quot;<br><br>  @impl true<br>  def authenticate(%Config{username: user, password: pass}, send_fun) do<br>    # RFC 4616: [authzid] NUL authcid NUL passwd<br>    payload = &lt;&lt;0, user::binary, 0, pass::binary&gt;&gt;<br>    <br>    case send_fun.(payload) do<br>      {:ok, _} -&gt; :ok<br>      error -&gt; error<br>    end<br>  end<br>end</pre><p>One message, three fields separated by null bytes: authorization identity (empty), authentication identity (username), password. Send it, you’re done.</p><p>The simplicity is deceptive, though: PLAIN <em>requires</em> TLS. Without encryption, you’re sending credentials in cleartext. The version support module enforces this:</p><pre>def validate_config(%Config{mechanism: :plain}, socket) do<br>  if socket.ssl do<br>    :ok<br>  else<br>    {:error, :plain_requires_tls}<br>  end<br>end</pre><h3>SCRAM</h3><p>SCRAM is more involved. It’s a stateful protocol: each message depends on previous exchanges.</p><figure><img alt="Diagram showing SCRAM authentication in six steps: Client sends nonce, Server responds with salt and iterations, Client derives keys using PBKDF2 and HMAC, Client builds auth_msg and computes proof using XOR, Client sends proof, Server reverses XOR and validates by comparing hashes." src="https://cdn-images-1.medium.com/max/1024/1*Qs__-k0IJxLsjrS4MyKmEw.png" /><figcaption>SCRAM flow simplified. Client sends nonce, server responds with salt and iterations. Client derives keys from password (never sent), builds proof via XOR trick, sends it. Server reverses XOR to verify client knows the password. 
Some nonce details omitted, see <a href="https://datatracker.ietf.org/doc/html/rfc5802">RFC 5802</a> for full spec.</figcaption></figure><p>So I modeled the required state as a struct that accumulates values through the exchange:</p><pre>defmodule KafkaEx.Auth.ScramFlow.Internal do<br>  defstruct [<br>    :algorithm,<br>    :username,<br>    :password,<br>    :client_nonce,<br>    :client_first_bare,<br>    :server_first_raw,<br>    :server_nonce,<br>    :salt,<br>    :iterations,<br>    :auth_message,<br>    :server_signature<br>  ]<br>end</pre><p>The flow threads state through each step:</p><pre>def authenticate(username, password, algo, send_fun) do<br>  st0 = %Internal{<br>    algorithm: algo, <br>    username: username, <br>    password: password, <br>    client_nonce: nonce()<br>  }<br>  <br>  {client_first, st1} = Internal.client_first(st0)<br><br>  with {:ok, server_first} &lt;- send_fun.(client_first),<br>       {:ok, st2} &lt;- Internal.handle_server_first(st1, server_first),<br>       {client_final, st3} &lt;- Internal.client_final(st2),<br>       {:ok, server_final} &lt;- send_fun.(client_final) do<br>    Internal.verify_server_final(st3, server_final)<br>  end<br>end<br><br>defp nonce(len \\ 24), do: len |&gt; :crypto.strong_rand_bytes() |&gt; Base.encode64(padding: false)</pre><p>Pure functional composition. 
Each step succeeds with new state or fails with a reason.</p><p>We establish identity and randomness:</p><pre>def client_first(%__MODULE__{} = s) do<br>  gs2 = &quot;n,,&quot;  # No channel binding<br>  cfb = &quot;n=#{escape(s.username)},r=#{s.client_nonce}&quot;<br>  {gs2 &lt;&gt; cfb, %{s | client_first_bare: cfb}}<br>end<br><br>defp escape(str) do<br>  str<br>  |&gt; String.replace(&quot;=&quot;, &quot;=3D&quot;)<br>  |&gt; String.replace(&quot;,&quot;, &quot;=2C&quot;)<br>end</pre><p>Username escaping follows RFC 5802.</p><p>The server sends its nonce (appended to ours), a salt and iteration count:</p><pre>def handle_server_first(%__MODULE__{} = s, server_first) do<br>  with %{&quot;r&quot; =&gt; nonce, &quot;s&quot; =&gt; salt_b64, &quot;i&quot; =&gt; iter_str} &lt;- parse_kv(server_first),<br>       true &lt;- String.starts_with?(nonce, s.client_nonce) or {:error, :invalid_server_nonce} do<br>    {:ok, %{s | <br>      server_first_raw: server_first,<br>      server_nonce: nonce,<br>      salt: Base.decode64!(salt_b64),<br>      iterations: String.to_integer(iter_str)<br>    }}<br>  end<br>end</pre><p>Nonce validation is critical, the server’s nonce must start with ours. 
This prevents replay attacks.</p><p>Here’s where security happens:</p><pre>def client_final(%__MODULE__{} = s) do<br>  salted = pbkdf2(s.algorithm, s.password, s.salt, s.iterations)<br>  <br>  client_key = hmac(s.algorithm, salted, &quot;Client Key&quot;)<br>  server_key = hmac(s.algorithm, salted, &quot;Server Key&quot;)<br>  stored_key = hash(s.algorithm, client_key)<br>  <br>  cfwp = &quot;c=#{Base.encode64(&quot;n,,&quot;)},r=#{s.server_nonce}&quot;<br>  auth_message = &quot;#{s.client_first_bare},#{s.server_first_raw},#{cfwp}&quot;<br>  <br>  client_sig = hmac(s.algorithm, stored_key, auth_message)<br>  proof = :crypto.exor(client_key, client_sig)<br>  server_sig = hmac(s.algorithm, server_key, auth_message)<br>  <br>  final_message = &quot;#{cfwp},p=#{Base.encode64(proof)}&quot;<br>  <br>  {final_message, %{s | auth_message: auth_message, server_signature: server_sig}}<br>end</pre><p>We send a <em>proof</em> we know the password, not the password itself. The server verifies against stored data without seeing plaintext credentials.</p><p>SCRAM is mutual and we verify the server too:</p><pre>def verify_server_final(%__MODULE__{} = s, server_final) do<br>  case parse_kv(server_final) do<br>    %{&quot;v&quot; =&gt; sig_b64} -&gt;<br>      if Base.decode64!(sig_b64) == s.server_signature do<br>        :ok<br>      else<br>        {:error, :server_signature_mismatch}<br>      end<br>    <br>    %{&quot;e&quot; =&gt; error} -&gt;<br>      {:error, {:server_error, error}}<br>  end<br>end</pre><p>This catches man-in-the-middle attacks where someone impersonates the broker.</p><p>The public interface is clean:</p><pre>defmodule KafkaEx.Auth.SASL.Scram do<br>  @behaviour KafkaEx.Auth.Mechanism<br><br>@impl true<br>  def mechanism_name(%Config{mechanism_opts: %{algo: :sha512}}), do: &quot;SCRAM-SHA-512&quot;<br>  def mechanism_name(%Config{}), do: &quot;SCRAM-SHA-256&quot;<br>  @impl true<br>  def authenticate(%Config{username: u, password: p, mechanism_opts: opts}, 
send_fun) do<br>    algo = if opts[:algo] == :sha512, do: :sha512, else: :sha256<br>    ScramFlow.authenticate(u, p, algo, send_fun)<br>  end<br>end</pre><p>Same behaviour, same interface. The complexity stays isolated in ScramFlow.</p><p>The SASL module coordinates the pieces:</p><pre>defmodule KafkaEx.Auth.SASL do<br>  @mechanisms %{<br>    plain: KafkaEx.Auth.SASL.Plain,<br>    scram: KafkaEx.Auth.SASL.Scram<br>  }<br><br>  def authenticate(socket, %Config{} = creds) do<br>    with {:ok, mech_mod} &lt;- get_mechanism_module(creds),<br>         api_versions &lt;- fetch_api_versions_if_needed(socket),<br>         handshake_v &lt;- CodecBinary.pick_handshake_version(api_versions),<br>         auth_v &lt;- api_versions |&gt; CodecBinary.pick_authenticate_version() |&gt; min(1),<br>         :ok &lt;- perform_handshake(socket, mech_mod, handshake_v, creds),<br>         :ok &lt;- mech_mod.authenticate(creds, &amp;send_authenticate(socket, &amp;1, auth_v)) do<br>      Logger.debug(&quot;SASL authentication successful&quot;)<br>      :ok<br>    else<br>      {:error, reason} = err -&gt;<br>        Logger.error(&quot;SASL authentication failed: #{inspect(reason)}&quot;)<br>        err<br>    end<br>  end<br>end</pre><p>The with chain reads like a checklist: get module, fetch versions, handshake, authenticate.</p><h3>Testing Against Real Kafka</h3><p>Unit tests aren’t enough for authentication. 
You need real brokers.</p><p>Docker Compose with <a href="https://hub.docker.com/r/confluentinc/cp-kafka/">Confluent’s Kafka image</a>:</p><pre>services:<br>  kafka-1:<br>    image: confluentinc/cp-kafka:7.0.4<br>    ports:<br>      - &quot;9092:9092&quot;   # No auth<br>      - &quot;9192:9192&quot;   # SASL/PLAIN<br>      - &quot;9292:9292&quot;   # SASL/SCRAM<br>    environment:<br>      KAFKA_LISTENERS: &quot;INTERNAL://:29092,NOAUTH://:9092,PLAIN://:9192,SCRAM://:9292&quot;<br>      KAFKA_ADVERTISED_LISTENERS: &quot;INTERNAL://kafka-1:29092,NOAUTH://localhost:9092,PLAIN://localhost:9192,SCRAM://localhost:9292&quot;<br>    env_file: docker-compose-kafka.env</pre><p>The env file configures <a href="https://en.wikipedia.org/wiki/Java_Authentication_and_Authorization_Service">JAAS</a>:</p><pre>KAFKA_SASL_ENABLED_MECHANISMS=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512<br>KAFKA_LISTENER_NAME_plain_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required user_test=&quot;secret&quot;;<br>KAFKA_LISTENER_NAME_scram_SCRAM-SHA-256_SASL_JAAS_CONFIG=org.apache.kafka.common.security.scram.ScramLoginModule required;<br>KAFKA_LISTENER_NAME_scram_SCRAM-SHA-512_SASL_JAAS_CONFIG=org.apache.kafka.common.security.scram.ScramLoginModule required;</pre><p>SCRAM users need to be created in <a href="https://zookeeper.apache.org/">ZooKeeper</a> (this Docker setup uses ZooKeeper):</p><pre>docker exec kafka-1 kafka-configs --zookeeper zookeeper:32181 \<br>  --alter --add-config &#39;SCRAM-SHA-256=[password=secret],SCRAM-SHA-512=[password=secret]&#39; \<br>  --entity-type users --entity-name test</pre><p>Tests verify actual authentication:</p><pre>defmodule KafkaEx.Integration.SaslAuthenticationTest do<br>  use ExUnit.Case<br>  @moduletag :integration<br><br>  describe &quot;SASL/PLAIN authentication&quot; do<br>      @tag sasl: :plain<br>      test &quot;connects and produces with PLAIN&quot; do<br>        opts = [<br>          uris: [{&quot;localhost&quot;, 9192}],<br>      
    use_ssl: true,<br>          ssl_options: [verify: :verify_none],<br>          auth: KafkaEx.Auth.Config.new(%{<br>            mechanism: :plain,<br>            username: &quot;test&quot;,<br>            password: &quot;secret&quot;<br>          })<br>        ]<br>        {:ok, _pid} = KafkaEx.create_worker(:plain_worker, opts)<br>        assert :ok = KafkaEx.produce(&quot;test_topic&quot;, 0, &quot;test&quot;, worker_name: :plain_worker)<br>      end<br>    end<br>  describe &quot;SASL/SCRAM authentication&quot; do<br>    for algo &lt;- [:sha256, :sha512] do<br>      @tag sasl: :scram, algo: algo<br>      test &quot;connects with SCRAM-#{algo}&quot; do<br>        worker_name = :&quot;scram_#{unquote(algo)}_worker&quot;<br>        opts = [<br>          uris: [{&quot;localhost&quot;, 9292}],<br>          use_ssl: true,<br>          ssl_options: [verify: :verify_none],<br>          auth: KafkaEx.Auth.Config.new(%{<br>            mechanism: :scram,<br>            username: &quot;test&quot;,<br>            password: &quot;secret&quot;,<br>            mechanism_opts: %{algo: unquote(algo)}<br>          })<br>        ]<br>        {:ok, _pid} = KafkaEx.create_worker(worker_name, opts)<br>        assert :ok = KafkaEx.produce(&quot;test_topic&quot;, 0, &quot;test&quot;, worker_name: worker_name)<br>      end<br>    end<br>  end<br>end</pre><p>We don’t just test authentication succeeds, we verify Kafka operations work afterward.</p><p>Configuration is straightforward:</p><pre># PLAIN<br>config :kafka_ex,<br>  brokers: [{&quot;kafka.example.com&quot;, 9192}],<br>  use_ssl: true,<br>  sasl: %{<br>    mechanism: :plain,<br>    username: System.get_env(&quot;KAFKA_USERNAME&quot;),<br>    password: System.get_env(&quot;KAFKA_PASSWORD&quot;)<br>  }</pre><pre># SCRAM<br>config :kafka_ex,<br>  brokers: [{&quot;kafka.example.com&quot;, 9292}],<br>  use_ssl: true,<br>  sasl: %{<br>    mechanism: :scram,<br>    username: System.get_env(&quot;KAFKA_USERNAME&quot;),<br>    password: 
System.get_env(&quot;KAFKA_PASSWORD&quot;),<br>    mechanism_opts: %{algo: :sha256}<br>  }</pre><p>Then use kafka_ex normally, authentication is invisible.</p><h3>What’s Next</h3><p>The nice thing about this shape is that new mechanisms don’t sprawl through the client. OAUTHBEARER is basically the same contract with a different payload (a <a href="https://en.wikipedia.org/wiki/JSON_Web_Token">JWT</a> you fetch from a provider). MSK IAM would land the same way too: different credential source, same SASL seam.</p><p>Adding one is boring on purpose:</p><ul><li>implement mechanism_name/1 and authenticate/2</li><li>validate whatever it needs in Config</li><li>register the module in the mechanism map</li><li>wire up a broker in Docker so you can prove it works end-to-end</li></ul><p>Two lessons from doing this in a real codebase:</p><p>kafka_ex already has its own idioms. If you follow them, your changes look like they were always there and reviewers don’t have to “learn your framework” to approve auth.</p><p>And tests matter more than you want them to. The Docker setup took longer than writing PLAIN, but it’s what caught the real mistakes, the kind you never see in unit tests.</p><p>Also, fail loudly. Auth failures are not the place for “connection closed”. Returning tagged errors like {:error, :server_signature_mismatch} saves hours when something is misconfigured.</p><h3><strong>For Fresha and Beyond</strong></h3><p>We needed SASL to improve our Kafka setup. So I added it to kafka_ex in a way that doesn’t leak sockets into mechanisms and doesn’t turn auth into a special case: a small contract (Mechanism + send_fun), strict validation and tagged failures you can debug from logs.</p><p>PLAIN is deliberately simple (and deliberately TLS-only). SCRAM is the opposite: stateful, proof-based, mutual verification and still fits behind the same seam.</p><p>If you need SASL in kafka_ex, it’s there. 
If you’re implementing it elsewhere, the takeaway is simple:</p><blockquote><strong>Design the clean seam, test against a real broker and make insecure configurations impossible.</strong></blockquote><h4>Appreciation</h4><p>Thanks to Piotr Rybarczyk for the review, it helped to keep the scope tight and avoid unnecessary integration points.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=f7d6d3876cff" width="1" height="1" alt=""><hr><p><a href="https://medium.com/fresha-data-engineering/adding-authentication-to-elixir-native-kafka-client-trust-but-verify-f7d6d3876cff">Adding Authentication to Elixir Native Kafka Client: Trust, but Verify</a> was originally published in <a href="https://medium.com/fresha-data-engineering">fresha-data-engineering</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[StarRocks Incremental MV: A Bridge Over Shifting Ice]]></title>
            <link>https://medium.com/fresha-data-engineering/starrocks-incremental-mv-a-bridge-over-shifting-ice-759df57bc720?source=rss-a199772355b8------2</link>
            <guid isPermaLink="false">https://medium.com/p/759df57bc720</guid>
            <category><![CDATA[technology]]></category>
            <category><![CDATA[starrocks]]></category>
            <category><![CDATA[big-data]]></category>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[real-time-analytics]]></category>
            <dc:creator><![CDATA[Anton Borisov]]></dc:creator>
            <pubDate>Wed, 26 Nov 2025 17:16:39 GMT</pubDate>
            <atom:updated>2025-11-26T19:38:07.202Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*uR3ztWVXP8tJukbzr1FQOQ.png" /></figure><blockquote>Note: The <a href="https://materializedview.io/p/everything-to-know-incremental-view-maintenance">Incremental View Maintenance</a> framework Phase 1 discussed here has been merged into <a href="https://www.starrocks.io/">StarRocks</a> but is not yet released. This article explores the architecture and concepts based on the <a href="https://github.com/StarRocks/starrocks/issues/61789">merged PRs</a>, discussing what’s coming and why it matters conceptually, not what you can run in production today.</blockquote><p>Incremental view maintenance is a very simple idea: when base data changes, update the view without recomputing everything. Threading deltas through joins, aggregations, and state is what makes it one of the harder problems in database engineering.</p><p>I’ve written about some of the conceptual underpinnings before in <a href="https://medium.com/fresha-data-engineering/freeze-the-rivers-flow-evolution-of-streaming-computation-models-d4e5d0889205">my streaming computation article</a>, where I argued that if we can rely on determinism, we can use simpler models to track the flow of changes without recomputing everything or synchronizing state globally. The amount of work becomes proportional to the amount of change. Though that article was about streaming engines, the problem seems more general. I also explored why we need <a href="https://medium.com/fresha-data-engineering/iceberg-cdc-stream-a-little-dream-of-me-a7c9f9e6e11d">more efficient CDC behavior from Iceberg</a> and how snapshot boundaries and metadata overhead shape what’s possible for incremental pipelines.</p><p>Now it’s time to put the bigger picture together. Because while all of that was interesting, it felt speculative: talk about a future that wasn’t quite here. 
Visionary and poetical, perhaps, but not yet practical.</p><p>But let’s see how it can be made practical, interleaved with that vision.</p><p><a href="https://docs.starrocks.io/docs/using_starrocks/async_mv/Materialized_view/">Materialized views</a> in StarRocks are powerful. The query rewrite engine transparently routes queries through fresh MVs and the refresh scheduler handles maintenance automatically. But before this work, they suffered from a major drawback: refresh was partition-based. If 10 records touched 10 partitions, you’d recalculate 10 partitions. The granularity was too coarse, the cost unpredictable, sometimes dangerous.</p><p>IVM is basically the idea of making the amount of work proportional to the amount of changed input and threading it through the computation graph. <a href="https://www.feldera.com/">Feldera</a> is one of my favorite examples, as its DBSP model enforces a synchronous clock where time behaves like database transactions, with determinism guaranteeing that the same inputs always produce the same outputs. StarRocks borrows conceptually from this model.</p><p>The determinism requirement isn’t incidental. As Jack Vanlightly recently detailed in <a href="https://jack-vanlightly.com/blog/2025/11/24/demystifying-determinism-in-durable-execution">the context of durable execution</a>: deterministic control flow means recovery is replay from durable inputs, not restoration from checkpointed state. Streaming IVM relies on the same foundation conceptually: deterministic operators over versioned deltas.</p><p>I should also mention that <a href="https://docs.snowflake.com/en/user-guide/dynamic-tables-about">Snowflake’s Dynamic Tables</a> provide incremental maintenance too, using automatic refresh mechanisms that can process only changed data when the query supports it. 
But as it’s closed source, I won’t discuss it here and will leave it to interested readers to reason about the implementation details.</p><p>The first part of this effort is append-only <a href="https://iceberg.apache.org/">Iceberg</a>-based MVs that can be maintained incrementally instead of refreshing whole partitions. Combined with StarRocks’ <a href="https://docs.starrocks.io/docs/using_starrocks/async_mv/use_cases/query_rewrite_with_materialized_views/">MV rewrite</a>, this forms a powerful synergy: fast refreshes save compute costs with transparent fallback to full calculation if the MV isn’t fresh enough. The more efficient this maintenance becomes, the closer it can be pushed toward real-time recalculation.</p><p>But we need better Iceberg CDC. If every delta requires reading through a chain of manifests and metadata files, that overhead exists outside the engine: it’s a format-level limitation. That’s why <a href="https://lists.apache.org/thread/x7688x5w25klodlfqmtsv80r1obyqqrz">Iceberg V4’s proposals</a> matter: the Root Manifest concept would concentrate change detection at a single point, making delta discovery a metadata operation rather than a manifest-walking exercise. These foundational pieces in the format are the prerequisite for incremental computation that actually scales.</p><p>After reading the <a href="https://www.vldb.org/pvldb/vol16/p1601-budiu.pdf">DBSP paper</a>, I expressed the idea that this approach is so powerful that database engines should eventually support it natively. Now StarRocks is pushing in this direction, so for me it feels like a natural progression in some sense. 
And I’m very excited to explore it with you.</p><h3>Previous Model: PCT</h3><p>Before diving into the new architecture, it’s worth understanding what <a href="https://docs.starrocks.io/docs/sql-reference/sql-statements/materialized_view/REFRESH_MATERIALIZED_VIEW/">came before</a>.</p><p>StarRocks’ existing approach is called PCT, short for <strong>Partition Change Tracking</strong>. The MaterializedView.RefreshMode enum captures the options:</p><pre>public enum RefreshMode {<br>    AUTO,<br>    PCT,          // Partition-based refresh mode (partition-change-tracking)<br>    FULL,         // Full refresh mode<br>    INCREMENTAL;  // Incremental refresh mode - the new IVM approach<br>}</pre><p>PCT tracks changes at the partition level of base tables to enable efficient incremental processing. When a base table’s partition is updated, StarRocks identifies which MV partitions are affected and refreshes only those via INSERT OVERWRITE. This is smarter than full refresh, but the granularity is still partitions.</p><p>The shortcomings compound in practice. If your base table isn’t partitioned, you can’t create a partitioned MV, so there’s nothing to track incrementally. MV partitions must align strictly with base table partitions: if your base uses date_trunc partitioning, your MV must follow the same expression. And when MVs contain many group-by keys over large datasets, partition-level refresh can OOM because you&#39;re still reprocessing entire partition contents.</p><p>PCT was a reasonable solution within its constraints. But those constraints reflect a partition-centric worldview: put simply, changes happen to partitions, not to rows. The new IVM framework inverts this: changes are row-level deltas that propagate through the computation graph.</p><figure><img alt="Side-by-side comparison of PCT and IVM refresh strategies. Left (PCT): base table has four partitions, Part2 and Part4 each have +1 row change. 
MV refresh shows Skip for Part1 and Part3, Overwrite for Part2 and Part4 — entire partitions recomputed. Right (IVM): same base table with Part2 and Part4 changed. MV refresh shows single box with Prev data preserved, plus two small +1 row appends. IVM work is proportional to change, PCT work is proportional to partition size." src="https://cdn-images-1.medium.com/max/1024/1*1idBrLk2IC6eQTYB0_wS7A.png" /><figcaption>PCT vs IVM refresh granularity: when two rows touch two partitions, PCT overwrites both entire partitions. IVM appends only the two changed rows, preserving existing data untouched.</figcaption></figure><h3>Time-Varying Relations and Computational Algebra</h3><p>The conceptual heart of the new framework is the Time-Varying Relation (TVR). Looking at the implementation, TVR represents a logical relation that evolves over time, supporting versioned and temporal queries.</p><p>Every base table tracked by an MV maintains a version range: where it was, where it is now. The delta between these versions defines exactly what changed. TvrTableSnapshot captures a point-in-time state, while TvrTableDelta represents the difference between two snapshots.</p><figure><img alt="TVR tracking: MV last refresh (snap_2) vs base table now (snap_3) produces delta. Engine scans only new file c.parquet, skipping unchanged a and b." src="https://cdn-images-1.medium.com/max/1024/1*6wHjAgHPXAoi0Bty5OpfBA.png" /><figcaption>TVR version tracking conceptually: the MV records its last-refreshed snapshot (snap_2), compares against the base table’s current snapshot (snap_3), and computes a delta that identifies only the new files. The engine scans c.parquet, ignoring unchanged data.</figcaption></figure><p>The MVIVMBasedMVRefreshProcessor orchestrates this. 
When refresh triggers, it first collects change snapshots by comparing each base table&#39;s current version against what was recorded at the last refresh:</p><pre>public ProcessExecPlan getProcessExecPlan(TaskRunContext taskRunContext) throws Exception {<br>    // ...<br>    // collect change snapshots<br>    try (Timer ignored = Tracers.watchScope(&quot;MVRefreshCheckChangedVersionRanges&quot;)) {<br>        final Map&lt;BaseTableInfo, TvrVersionRange&gt; mvTvrVersionRangeMap =<br>                mv.getRefreshScheme().getAsyncRefreshContext().getBaseTableInfoTvrVersionRangeMap();<br>        for (BaseTableSnapshotInfo snapshotInfo : snapshotBaseTables.values()) {<br>            TvrVersionRange changedVersionRange =<br>                    getBaseTableChangedVersionRange(snapshotInfo, mvTvrVersionRangeMap, currentRefreshMode);<br>            logger.info(&quot;Base table: {}, changed version range: {}&quot;,<br>                    snapshotInfo.getBaseTableInfo().getTableName(), changedVersionRange);<br>            // collect changed version range<br>            TvrTableSnapshotInfo tvrTableSnapshotInfo = (TvrTableSnapshotInfo) snapshotInfo;<br><br>            tempMvTvrVersionRangeMap.put(snapshotInfo.getBaseTableInfo(), changedVersionRange);<br>            // update the snapshot info with the changed version range<br>            tvrTableSnapshotInfo.setTvrSnapshot(changedVersionRange);<br>        }<br>    }<br>    boolean isTaskRunSkipped = snapshotBaseTables.values().stream()<br>            .map(snapshotInfo -&gt; (TvrTableSnapshotInfo) snapshotInfo)<br>            .map(TvrTableSnapshotInfo::getTvrSnapshot)<br>            .allMatch(TvrVersionRange::isEmpty);<br>    if (isTaskRunSkipped) {<br>        logger.info(&quot;No base table has changed, skip the refresh for materialized view: {}&quot;,<br>                mv.getName());<br>        return new ProcessExecPlan(Constants.TaskRunState.SKIPPED, null, null);<br>    }<br>    // ... 
<br>}</pre><p>The getBaseTableChangedVersionRange method computes the delta by comparing the MV&#39;s recorded version against the table&#39;s current snapshot:</p><pre>TvrVersionRange beforeTvrVersionRange = mvTvrVersionRangeMap.get(baseTableInfo);<br>TvrVersion beforeVersion = beforeTvrVersionRange.to;<br>if (beforeVersion.equals(currentVersion)) {<br>    // no change, so we can skip the refresh<br>    logger.info(&quot;Base table {} has not changed, skip to refresh&quot;, baseTableInfo.getTableName());<br>    return TvrTableDelta.of(beforeVersion, currentVersion);<br>}<br>return TvrTableDelta.of(beforeVersion, currentVersion);</pre><p>The processor then validates that all deltas in the range are append-only — the current Phase 1 limitation:</p><pre>for (TvrTableDeltaTrait deltaTrait : tableDeltaTraits) {<br>    if (!deltaTrait.isAppendOnly()) {<br>        throw new SemanticException(&quot;TvrTableDeltaTrait is not append-only for base table: %s.%s&quot;,<br>                baseTableInfo.getDbName(), baseTableInfo.getTableName(), deltaTrait);<br>    }<br>}</pre><p>The Iceberg connector integration is where this gets concrete. 
The listTableDeltaTraits method walks the snapshot ancestry:</p><pre>final Iterable&lt;Snapshot&gt; snapshots = SnapshotUtil.ancestorsBetween(<br>        toSnapshotIdInclusive, fromSnapshotIdExclusive, nativeTable::snapshot);<br>for (Snapshot snapshot : snapshots) {<br>    long currentSnapshotId = snapshot.snapshotId();<br>    TvrTableDelta delta = TvrTableDelta.of(currentSnapshotId, lastSnapshotId);<br>    TvrDeltaStats stats = TvrDeltaStats.of(snapshot.addedRows());<br>    if (snapshot.operation() != null &amp;&amp; snapshot.operation().equals(DataOperations.APPEND)) {<br>        tvrDeltaTraits.add(TvrTableDeltaTrait.ofMonotonic(delta, stats));<br>    } else {<br>        tvrDeltaTraits.add(TvrTableDeltaTrait.ofRetractable(delta, stats));<br>    }<br>}</pre><p>This distinguishes between monotonic changes (append-only) and retractable changes (updates/deletes). For Phase 1, only monotonic changes are supported, and Iceberg’s IncrementalAppendScan API handles this cleanly:</p><pre>if (tvrVersionRange.start() != null &amp;&amp; tvrVersionRange.start().isPresent()) {<br>    IncrementalAppendScan incrementalAppendScan = nativeTbl.newIncrementalAppendScan();<br>    // Configure scan with version range<br>}</pre><p>Instead of scanning full partitions, the engine reads only the files added between two snapshots. Iceberg’s metadata already tracks this: every snapshot records which data files were added. The incremental scan produces exactly those files, nothing more.</p><h3>Aggregate Combinator Functions</h3><p>The hardest part of IVM isn’t reading deltas, it’s computing through them. Aggregations are the canonical challenge: if you have SUM(amount) over a billion rows and 10 new rows arrive, you shouldn&#39;t rescan the billion.</p><p>StarRocks solves this with aggregate combinator functions. 
The comments in the header file show the pattern:</p><pre>// agg_state_combine.h<br>struct AggStateCombineState<br>// This combinator is equivalent to calling `{agg_func}_union({agg_func}_state(arg_types))` in SQL,<br>// but with reduced function call overhead and memory allocation for better performance.<br>// eg:<br>// - SQL: sum_union(sum_state(col))<br>// - This combinator: sum_combine(col)<br>//<br>// DESC: intermediate_type {agg_func}_combine(arg types)<br>//  input type          : aggregate function&#39;s arg types<br>//  intermediate type   : aggregate function&#39;s intermediate_type<br>//  return type         : aggregate function&#39;s intermediate_type</pre><p>The key insight: aggregates have intermediate states that can be merged. For SUM, the intermediate state is just the running total. For COUNT, it&#39;s a counter. Merging these states is associative and commutative: you can combine partial results in any grouping and any order and get the same answer.</p><p>The combinator function suite includes:</p><ul><li>{agg}_state(args): converts raw values to intermediate state</li><li>{agg}_union(state): merges intermediate states</li><li>{agg}_merge(state): produces the final result from intermediate state</li><li>{agg}_combine(args): shorthand for _union(_state(args))</li></ul><p>For IVM, the existing MV result becomes the accumulated state. New deltas convert to intermediate state via _state, merge with existing state via _union, and the combined state replaces the old. Work is proportional to delta size, not total data size.</p><figure><img alt="Diagram of aggregate combinator functions in IVM. Existing MV State contains groups A state(500), B state(300), C state(200). New Delta has raw values for group A (50, 25) and group C(75). Arrow labeled sum_state converts delta to intermediate form. Both streams feed into sum_union producing Updated MV State: A becomes state(575), B unchanged at state(300), C becomes state(275). Only changed groups recomputed." 
src="https://cdn-images-1.medium.com/max/1024/1*cX65S7wqiCBJh3gqLr9y9g.png" /><figcaption>Aggregate combinator flow: new delta rows convert to intermediate state via sum_state, then merge with existing MV state via sum_union. Group A’s two new values (50+25=75) fold into state(500) to produce state(575). Group C’s single new value (75) folds into state(200) to produce state(275). Group B stays untouched: no delta, no work.</figcaption></figure><p>The AggStateCombineCombinator in Java bridges this to the optimizer:</p><pre>public static Optional&lt;AggStateCombineCombinator&gt; of(AggregateFunction aggFunc) {<br>    Type intermediateType = aggFunc.getIntermediateTypeOrReturnType().clone();<br>    FunctionName functionName = new FunctionName(<br>        AggStateUtils.aggStateCombineFunctionName(aggFunc.functionName()));<br>    // ... build combinator with proper type handling<br>}</pre><p>This is classic incremental view maintenance made practical. The idea that aggregates with associative merge operations can be updated incrementally rather than recomputed has been known for decades. StarRocks implements it through aggregate state algebra: combinator functions expose intermediate representations that can be merged, so new contributions fold into existing results without touching the original data.</p><p>It’s worth noting that the framework is architected for full differential computation: TvrTableDeltaTrait already distinguishes monotonic from retractable changes, and the combinator functions provide the algebraic foundation for mergeable state. Phase 1 implements the append-only path, but the abstractions are designed to extend toward indexed state and retraction propagation. What DBSP formalizes with <a href="https://www.feldera.com/blog/z-sets-representing-database-changes">Z-sets</a>, StarRocks is building toward through aggregate state algebra and TVR semantics.</p><h3>Rewriting Queries for Incremental Execution</h3><p>The optimizer transformations make this work end-to-end. 
The RuleSet adds TVR-specific rules:</p><pre>import com.starrocks.sql.optimizer.rule.tvr.TvrAggregateRule;<br>import com.starrocks.sql.optimizer.rule.tvr.TvrFilterRule;<br>import com.starrocks.sql.optimizer.rule.tvr.TvrJoinRule;<br>import com.starrocks.sql.optimizer.rule.tvr.TvrProjectRule;</pre><p>Each rule transforms standard operators to their incremental equivalents. TvrAggregateRule rewrites aggregations to use combinator functions. TvrJoinRule handles join semantics, but here&#39;s an important nuance: for append-only inputs, new rows from the left side must still join against the <em>full</em> right side snapshot (and vice versa) to produce correct results. The delta identifies what&#39;s new, but the join still needs the complete picture from the other side.</p><figure><img alt="Flowchart of IVM join processing. Top shows Delta Left Part with new rows L1, L2 and Full Right Part with complete snapshot R1, R2, R3. Both feed into Join operation with condition L.key = R.key. Arrow labeled “Only new matches” points to result box containing L1⋈R1 and L2⋈R3. Final arrow labeled “Insert into MV” shows new results appending to materialized view. Key insight: delta from one side joins against full snapshot from other side." src="https://cdn-images-1.medium.com/max/1024/1*uwJMGAbOipQV5mqUMyLMoA.png" /><figcaption>Incremental join semantics for Phase 1: new rows from the left delta (L1, L2) join against the full right snapshot (R1, R2, R3). Only the new matches (L1⋈R1, L2⋈R3) are appended to the MV — existing results stay untouched.</figcaption></figure><p>This is where systems like Feldera have an edge. Feldera maintains indexed arrangements: compact, time-indexed structures that support efficient point lookups. When a delta arrives, the join probes the index rather than scanning the full relation. StarRocks’ current approach is more straightforward: read the delta from one side, read the current snapshot from the other, join them. 
It’s correct and still far better than partition-level refresh, but there’s room for optimization as indexed state structures mature in the engine.</p><p>The optimizer entry point gates this on a session variable:</p><pre>if (context.getSessionVariable().isEnableIVMRefresh()) {<br>    tree = logicalTvrRuleRewrite(tree, rootTaskContext, requiredColumns);<br>}</pre><p>Once enabled, the refresh processor prepares context and triggers the rewrite:</p><pre>ctx.getSessionVariable().setEnableIVMRefresh(true);<br>ctx.getSessionVariable().setTvrTargetMvid(GsonUtils.GSON.toJson(mv.getMvId()));</pre><p>The MV definition query gets rewritten. Table scans become incremental scans bounded by version ranges. Aggregations become state merges. The resulting plan processes deltas and produces delta outputs that append to the existing MV.</p><h3>Iceberg as the Causal Foundation</h3><p>Why Iceberg specifically? Because its snapshot model provides exactly the causal ordering that IVM requires. It is not the only candidate, though: there is also a PR to do the same with Paimon.</p><p>Every Iceberg commit creates a snapshot with a unique ID and a parent pointer. Snapshots form a linear history: you can always ask “what changed between snapshot A and snapshot B” and get a deterministic answer. The SnapshotUtil.ancestorsBetween call walks this chain.</p><p>More critically, each snapshot records its operation type. The code checks:</p><pre>if (snapshot.operation() != null &amp;&amp; snapshot.operation().equals(DataOperations.APPEND)) {<br>    tvrDeltaTraits.add(TvrTableDeltaTrait.ofMonotonic(delta, stats));<br>} else {<br>    tvrDeltaTraits.add(TvrTableDeltaTrait.ofRetractable(delta, stats));<br>}</pre><p>Append operations are monotonic: data only grows, never changes. This is the tractable case for Phase 1. 
The MV state is a pure function of all appended data and new appends extend it without invalidating existing results.</p><p>Retractable changes (updates, deletes) require the full differential machinery, tracking which rows were removed, propagating negative contributions through aggregations, handling join semantics where old matches must be undone. This is Phase 2 territory, likely requiring <a href="https://github.com/apache/iceberg/pull/11130">Iceberg V3’s row lineage features</a> to produce proper CDC streams.</p><h3>Creating an Incremental MV</h3><p>The proposed syntax is straightforward:</p><pre>CREATE MATERIALIZED VIEW test_mv1 PARTITION BY dt <br>REFRESH DEFERRED MANUAL <br>properties<br>(<br>    &quot;refresh_mode&quot; = &quot;incremental&quot;<br>)<br>AS SELECT t1.dt, t1.col1, t2.col1, t1.col2, t2.col2<br>FROM iceberg_catalog.db.t1 <br>JOIN iceberg_catalog.db.t2 ON t1.dt = t2.dt;</pre><p>The &quot;refresh_mode&quot; = &quot;incremental&quot; property triggers IVMBasedMVRefreshProcessor instead of MVPCTBasedRefreshProcessor. 
On refresh, the processor:</p><ol><li>Collects current snapshots for all base tables</li><li>Compares against the last-refreshed snapshots stored in baseTableInfoTvrDeltaMap</li><li>Computes deltas for each base table that changed</li><li>Rewrites the MV query to process only those deltas</li><li>Executes the incremental plan, appending results to the MV</li><li>Updates the stored snapshot references</li></ol><p>The test cases demonstrate the flow:</p><pre>REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;<br>-- Initial population</pre><pre>insert into iceberg_table values (&#39;new_row&#39;, 100, &#39;2023-12-01&#39;);<br>-- Base table gets new data</pre><pre>REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;<br>-- Only the new row is processed</pre><p>You can switch modes dynamically:</p><pre>ALTER MATERIALIZED VIEW test_mv1 SET(&quot;refresh_mode&quot; = &quot;auto&quot;);</pre><p>AUTO mode lets StarRocks choose the best strategy based on delta characteristics, falling back to partition-based or full refresh when incremental doesn&#39;t make sense.</p><h3>Limitations and the Road Ahead</h3><p>This is Phase 1 of a larger effort, and it’s important to be clear about current boundaries.</p><p>The implementation supports:</p><ul><li>Append-only Iceberg tables (no updates or deletes)</li><li>Select, Project, Filter, Join, Aggregate, UnionAll operators</li><li>Basic aggregate functions with proper combinator support</li></ul><p>What’s explicitly not yet supported:</p><ul><li>Retractable changes (updates, deletes) require Iceberg V3 CDC capabilities and more work</li><li>Window functions with complex framing</li><li>Non-deterministic expressions</li><li>Indexed state for efficient join lookups (joins still read full snapshots from one side)</li></ul><p>The retractable change support is particularly interesting. Iceberg V3 introduces row lineage with durable row IDs and sequence numbers that survive compaction. 
Combined with <a href="https://medium.com/@vincent_daniel/deletion-vectors-and-puffin-files-merge-in-the-new-v3-iceberg-format-565188036d0c">deletion vectors</a>, this enables precise CDC: you can ask “what rows were added, changed, or removed” and get exact answers without scanning. The TvrTableDeltaTrait.ofRetractable path is already stubbed out, waiting for that foundation.</p><h3>Bridging the dots</h3><p>What first seemed visionary (better Iceberg CDC, progress-based computation models, determinism as a correctness foundation) is now gradually being woven into actual database engine code.</p><p>You could always theoretically build this with Flink, for example: read CDC streams, maintain state, output to a sink. But the operational cost is immense. Flink requires dedicated infrastructure, careful state backend configuration, checkpoint tuning and job-specific operator graphs. For each materialized view, you’d be building a mini streaming application.</p><p>StarRocks is integrating this directly into the database engine. You write SQL, declare a refresh mode, and the optimizer handles the rest. The MV rewrite system serves queries transparently. The scheduler handles refresh timing. State lives in the MV itself: no external systems, no extra operational overhead.</p><p>This is a significant conceptual shift worth discussing, even before it ships. We’ve spent years building external streaming infrastructure to handle what databases should have done natively. Deterministic computation over time-indexed deltas, as explored in recent papers and tools, is both simpler and more powerful than coordination-based checkpointing, and it’s finally being absorbed into some query engines.</p><p>The current implementation has clear boundaries: append-only tables, joins that still read full snapshots from one side. Feldera’s indexed arrangements are more sophisticated for stateful operations, and StarRocks is nowhere near that in the current phase. 
Iceberg V4’s Root Manifest proposal would make delta discovery and computations truly cheap, but that’s still in design. These are honest limitations that will take time to overcome.</p><p>But the architecture is right. TVR as the semantic model: relations that vary over time, with deltas as first-class citizens. Combinator functions that make differential aggregation algebraically clean. Optimizer rules that rewrite queries transparently. Iceberg snapshots as the causal foundation, with V3’s lineage and V4’s prospect of compact deltas pointing toward a future where CDC is a metadata operation.</p><p>What makes this exciting isn’t that it’s perfect: it’s that a production database engine is building the infrastructure for incremental computation natively. The amount of work is finally becoming proportional to the amount of change.</p><p>The bridge over shifting ice is being built right now.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/918/1*0wkzkl56r7CCMUtR_NBTjQ.png" /><figcaption>To put it simply :)</figcaption></figure><hr><p><a href="https://medium.com/fresha-data-engineering/starrocks-incremental-mv-a-bridge-over-shifting-ice-759df57bc720">StarRocks Incremental MV: A Bridge Over Shifting Ice</a> was originally published in <a href="https://medium.com/fresha-data-engineering">fresha-data-engineering</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Freeze the River’s Flow: Evolution of Streaming Computation Models]]></title>
            <link>https://medium.com/fresha-data-engineering/freeze-the-rivers-flow-evolution-of-streaming-computation-models-d4e5d0889205?source=rss-a199772355b8------2</link>
            <guid isPermaLink="false">https://medium.com/p/d4e5d0889205</guid>
            <category><![CDATA[data]]></category>
            <category><![CDATA[apache-flink]]></category>
            <category><![CDATA[feldera]]></category>
            <category><![CDATA[technology]]></category>
            <category><![CDATA[data-engineering]]></category>
            <dc:creator><![CDATA[Anton Borisov]]></dc:creator>
            <pubDate>Wed, 12 Nov 2025 15:51:54 GMT</pubDate>
            <atom:updated>2025-11-12T17:29:45.171Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*iY-eTwz99xDzK_M2U7FaIQ.png" /></figure><blockquote>This piece builds on <a href="https://medium.com/fresha-data-engineering/what-the-fuss-with-fluss-flink-delta-force-1ab3d6be5c98">my recent work</a> with Apache Fluss and Flink 2.1’s DeltaJoin feature, zooming out to examine the broader philosophical divide emerging in streaming computation.</blockquote><h3>Enough Time to Memorise</h3><p>Everyone wants to make stream processors <em>intelligent</em>. Agents that optimise jobs, copilots that rewrite SQL, pipelines that explain themselves in the language of prompts.<br>But the real frontier for streaming computations isn’t in reasoning about data, it’s in <strong>remembering it</strong>.</p><p>A stream never stops. That’s its beauty and its curse.<br>Data flows through a cluster of machines, each doing its part: transforming, joining, aggregating. And somewhere in the middle of all that motion, we expect the system to remember: to recover if a node dies and to start again without losing itself.<br>But how can something that never pauses learn to look back?</p><p>That question, “<em>how does a distributed system remember?</em>”, has haunted computer science for forty years. It began, surprisingly, not with Flink or Kafka or the cloud, but with two researchers sketching an idea in a <a href="https://lamport.azurewebsites.net/pubs/chandy.pdf">paper in 1985</a>. K. Mani Chandy and Leslie Lamport asked: could you record a system’s entire global state without ever stopping it?</p><p>Their answer was both elegant and impossible-sounding: yes, if you redefine what “at the same time” means.</p><p>In their world, processes don’t share a single clock. They exchange messages. 
Each message defines a relationship: <em>this happened before that.</em><br>So Chandy and Lamport proposed a simple trick: send a special message, a <strong>marker</strong>, through every communication channel. When a process first receives a marker, it snapshots its local state and forwards the marker on all its outgoing channels; messages that arrive on its other incoming channels before their markers do are recorded as in-flight channel state. Once every channel has carried a marker, you can stitch those local pictures together into a single, consistent global cut: a moment in distributed time that never truly existed, yet represents the system perfectly.</p><figure><img alt="A diagram showing three processes labeled Process 1, Process 2, and Process 3 connected in sequence. Process 1 sends a marker to Process 2, which sends another to Process 3. Each process records its local state and inbound messages when the marker is received. The snapshots are collected in a shaded area labeled “Consistent Cut at Time T,” representing a consistent global state across all processes." src="https://cdn-images-1.medium.com/max/1024/1*mY5uFbzAiLF-rSwtLAGBPA.png" /><figcaption>The original Chandy–Lamport snapshot algorithm (1985), conceptually: one process initiates a snapshot by sending a marker to the others. Each process, upon receiving the marker, records its own local state and any in-transit messages. Together, these local snapshots form a consistent global cut across the distributed system.</figcaption></figure><p>No halting. No central coordinator. Just a chorus of local snapshots harmonised by causality.</p><p>It was the first real solution to the paradox: how to make a running system remember itself without breaking the flow of time.</p><p>Every distributed database, stream processor and checkpoint mechanism today is, in some way, a descendant of that idea. 
When we talk about checkpoints in Flink, we are really talking about an automated, industrial-scale version of that thought experiment: a continuous act of collective memory.</p><p>And as we push further, toward disaggregated state, externalised logs and time-aware computation, that old illusion begins to flicker. The river still flows, but maybe it no longer needs to freeze for us to remember its shape.</p><h3>Streams Go Stateful</h3><p>When Chandy and Lamport wrote their paper, the idea of a “stream” was still abstract: a metaphor for distributed messages, not the backbone of real-time data infrastructure.<br>Decades later, <a href="https://flink.apache.org/">Apache Flink</a> turned that metaphor into machinery. What began as a theoretical idea became a living mechanism that could survive crashes, scale across thousands of nodes and still remember exactly where it had been.</p><p>At its heart, Flink’s approach is a direct descendant of the original algorithm.<br>What Chandy and Lamport called a <em>marker</em>, Flink turned into a <strong>barrier</strong>, an actual message that flows through the data stream.<br>Each source injects barriers at regular intervals, tagging a consistent moment in logical time. As those barriers travel downstream, every operator waits until it has received the same barrier from all its inputs, then takes a local snapshot of its internal state and forwards the barrier. Once every operator has reported completion for that barrier ID, the JobManager declares the <strong>checkpoint</strong> complete: a global, consistent image of the dataflow at that instant.</p><figure><img alt="A diagram showing Flink’s checkpoint flow. The Coordinator sits at the top, connected to two operators in sequence. Operator 1 forwards a barrier to Operator 2. When each operator receives the barrier, it writes a snapshot of its local state to storage. The Coordinator then collects these snapshot handles to complete the checkpoint." 
src="https://cdn-images-1.medium.com/max/1024/1*T7Ucu-7dm0ry8yuUvmFCmQ.png" /><figcaption>How Flink turns the Chandy–Lamport snapshot into a living mechanism: each operator takes a local snapshot when it receives a barrier, persists its state and forwards the barrier downstream. The coordinator later gathers all state handles to form one consistent global checkpoint.</figcaption></figure><p>The crux is that the job never stops: barriers weave through the same channels as the data, letting the system capture itself without halting. That’s how consistency emerges as a by-product of flow.</p><p>But turning that elegant idea into production engineering took a decade of iteration. Real streams are messy: they backpressure, spill to disk and occasionally drown their operators in terabytes of state.<br>To cope, Flink learned new tricks and reshaped the initial model.</p><h3>Learning to Cope with Scale</h3><p>As deployments grew from tens to thousands of tasks, the original checkpoint model started to bend. A single consistent cut meant propagating barriers across the graph, blocking input channels and serializing each operator’s <a href="https://rocksdb.org/">RocksDB</a> or heap state to remote storage. On multi-terabyte pipelines that could saturate I/O for minutes. The community didn’t abandon the model; it enriched it with a series of targeted optimizations.</p><h4><a href="https://diogodssantos.medium.com/async-barrier-snapshots-for-distributed-dataflows-in-5-hours-java-hands-on-inspired-by-apache-7fc4e1541768">Asynchronous snapshots</a></h4><p>Originally, the operator thread stalled while state was written. From <strong>1.3.0</strong>, Flink moved serialization to a background I/O thread: the operator marks the cut at the barrier and keeps processing while the async phase persists bytes. 
This eliminated full-graph stalls during snapshotting.</p><h4><a href="https://flink.apache.org/2018/01/30/managing-large-state-in-apache-flink-an-intro-to-incremental-checkpointing/">Incremental checkpoints</a></h4><p>With <strong>RocksDB</strong>, SST files are immutable. Flink began persisting only newly created SSTs plus a manifest of referenced files, so each checkpoint uploads only what changed and earlier files are reused. On large jobs with slowly changing state, this can cut checkpoint traffic by orders of magnitude.</p><h4>Unaligned checkpoints</h4><p>Barrier alignment became the next bottleneck: fast channels waited for slow ones. <strong>1.11</strong> introduced <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/">unaligned checkpoints</a>, which snapshot in-flight buffers as operator state. Alignment is no longer a prerequisite. Snapshots grow because they now include in-flight data, but checkpoint latency under backpressure drops dramatically.</p><h4>State changelog backend</h4><p><strong>1.15</strong> added a <a href="https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/ops/state/state_backends/#enabling-changelog">changelog layer</a> that continuously logs state mutations (insert/update/delete) to remote storage. Periodic checkpoints reference changelog segments; recovery restores the base snapshot and replays to the last acknowledged sequence number, turning fault tolerance from a bursty event into an ongoing process.</p><h4>Disaggregated state backend</h4><p>In <strong>2.0</strong>, Flink introduced <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855"><strong>disaggregated state</strong></a>: the authoritative copy of state lives on remote storage, and local disks act as an optional cache. Compute and storage scale independently, and checkpoints mostly write metadata (offsets/pointers) rather than duplicating data files. Recovery remaps operators to existing remote state fragments instead of moving large snapshots. 
<br>It was paired with another feature, an important step toward a much more interesting conceptual model of execution.</p><h4>Async execution model</h4><p>Flink 2.0 shifts from potentially <strong>blocking barrier alignment</strong> toward a form of <strong>continuous progress tracking</strong>.<br>The new <strong>asynchronous execution framework</strong> (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model">FLIP-425</a>) breaks a record’s life into futures and callbacks executed on the <strong>mailbox</strong> thread, preserving single-threaded semantics while overlapping computation and I/O.<br>A <strong>KeyAccountingUnit</strong> keeps per-key order, while an <strong>EpochManager</strong> tracks when all records in a watermark epoch finish, so time can advance without waiting for every operator.<br>Checkpoints drain only bounded in-flight work and persist metadata, which is why this model is essential for <strong>disaggregated state</strong>, where recovery remaps operators to remote fragments instead of replaying full snapshots.</p><p>Building on this, <strong>Key-Ordered Async Lookup Join</strong> (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-519:++Introduce+async+lookup+key+ordered+mode">FLIP-519</a>) enforces sequential processing per key while allowing parallelism across keys.<br>Together they partially replace coordinated pauses with incremental per-key progress, moving toward a runtime that doesn’t freeze for consistency, but <em>flows forward through it.</em></p><figure><img alt="A horizontal timeline labeled “Flink Optimizations.” Markers: Blocking Snapshots; Async &amp; Incremental Snapshots; Unaligned Checkpoints; Changelog Backend; Disaggregated State. 
Each marker has a short description explaining the benefit (async background serialization, incremental deltas, snapshotting in-flight buffers, state changelog with metadata commits, and remote-first state with metadata-only checkpoints)." src="https://cdn-images-1.medium.com/max/1024/1*WZb9M3-WYPXt8Lslwk2-gw.png" /><figcaption>Flink’s decade of checkpoint engineering: from stop-the-world snapshots to remote-first, metadata-driven recovery. Each milestone lowers a different cost of remembering: latency, I/O, alignment or coordination.</figcaption></figure><p>Each innovation reduced the cost of remembering: snapshots became faster, lighter and more continuous.<br>Yet the fundamental motion remained the same: barriers marking time, operators freezing briefly, memory being externalised just long enough to catch up with the flow.</p><h3>Lookup for my state</h3><p>Each optimisation pushed checkpointing toward the edges: async I/O moved it off the critical path, incremental snapshots reduced what needed copying, changelog backends made it continuous, disaggregated state turned it into metadata. But state itself still lived <em>inside</em> Flink’s execution graph, owned by operators, versioned by barriers, recovered through checkpoint restoration.</p><p>The next step wasn’t to make snapshots faster. It was to make them <strong>optional for part of the pipeline</strong>.</p><p>What if lookup state didn’t need checkpointing at all? What if one operator’s state could be another job’s durable source, say a shared table that multiple pipelines read without duplication, where recovery meant re-attaching rather than restoring?</p><p>That’s one of the ideas behind <a href="https://fluss.apache.org/"><strong>Fluss</strong></a>: externalizing lookup state into a versioned, shared log-structured table managed outside Flink’s lifecycle.</p><figure><img alt="Comparison of regular join vs DeltaJoin. 
On the left, two Kafka streams feed into a regular join operator that keeps left and right state internally, leading to large RocksDB state. On the right, DeltaJoin receives changelogs from Fluss, performs index lookups instead of maintaining state, and avoids holding left and right state inside the operator." src="https://cdn-images-1.medium.com/max/1024/1*Q2GlJzF7C0TDJ9t7kF1WTA.png" /><figcaption>Classical stream-stream lookup example: regular join vs DeltaJoin externalizing history into Fluss. Instead of hoarding state, each side emits a changelog and probes the index on demand.</figcaption></figure><p>Computational state, such as running aggregations and windowed buffers, changes with every record. It must be operator-owned because it’s computation-specific. But <strong>lookup state is referential</strong>: dimension tables queried by multiple jobs, enrichment data that doesn’t compute, just matches.</p><p>With <strong>Fluss</strong>, lookup state is externalized entirely. It’s not operator-owned RocksDB on remote storage: it’s a <strong>shared, versioned table</strong> managed by a separate system. When a job performs a lookup join, it opens a Fluss snapshot ID, maintains a local cache and issues remote reads if needed.</p><p>The lookup state <strong>never enters Flink’s checkpoint model</strong>. Multiple jobs share the same table. Recovery means re-opening the table, not remapping operator state.</p><p>Barriers still coordinate exactly-once semantics across the pipeline. 
But lookup state no longer participates in checkpoint coordination: it lives outside the river entirely.</p><h3>Time as Memory</h3><p>If Flink taught streams how to remember, a newer family of systems teaches them <strong>when</strong> to remember.<br>They stop synchronising work around barriers and start reasoning about <strong>progress</strong>: about what it means for time itself to move forward inside a distributed graph.</p><p>Where Flink builds global coordination on explicit messages (barriers), systems like <strong>Timely Dataflow</strong> and its descendants build <strong>implicit synchrony</strong> out of partial order and causality.</p><h4>Timely Dataflow: progress as protocol</h4><p>In <a href="https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/43546.pdf">Timely Dataflow</a>, the basic unit of control isn’t a checkpoint: it’s a <strong>timestamp</strong>.<br>Every record carries a logical time: a structured timestamp describing its epoch and any nested loop iterations.<br>Instead of global barriers, Timely’s runtime continuously exchanges small <strong>progress messages</strong> between operators.<br>Each message announces the creation or completion of events at a particular timestamp, effectively incrementing or decrementing a counter in a distributed ledger of time.</p><p>From these messages, every operator constructs a <strong>frontier</strong>: the minimal set of timestamps that may still produce output.<br>In formal terms, it’s an <em>antichain</em>: a set of times such that no element is less than another in the partial order.<br>When the frontier advances beyond time t, the operator knows that no future input ≤ t can arrive through any path in the dataflow.<br>At that instant, all computation for time t is <em>sealed</em>.</p><p>Operators can then compact state, aggregate results or emit finalized outputs, all without a coordinator.<br>Each dataflow shard progresses at its own pace, yet global consistency emerges 
naturally from the intersection of local frontiers.<br>There are no checkpoints, no distributed barriers, no backpressure from alignment.<br>The only coordination is informational: “I’m done with everything before this point.”</p><figure><img alt="A diagram showing two operators exchanging bidirectional “progress messages.” Each operator tracks a set of active timestamps (its local frontier). As progress messages arrive, the frontier advances, signifying that all work up to time t is done. Below, a table contrasts Flink’s barrier-based checkpointing with Timely’s progress-based frontiers." src="https://cdn-images-1.medium.com/max/1024/1*ps5zxocnHLfDkq-a8aaVFw.png" /><figcaption>Timely Dataflow conceptually: time itself becomes the coordinator. Operators exchange lightweight progress messages, constructing distributed frontiers that define which timestamps are complete. Once a frontier passes time t, all work for t is sealed: no checkpoints, no global barriers, only causality.</figcaption></figure><p>When operators are deterministic (<strong>pure functions of data and timestamp</strong>), recovery becomes a <strong>replay</strong> problem, not a restore problem.<br>Reinject the durable input streams and the same frontiers advance in the same order, producing identical results.<br>Flink externalizes state to guarantee safety; Timely, by contrast, internalizes causality to achieve it.</p><h4>Differential Dataflow: memory as difference</h4><p>Sitting atop Timely, <a href="https://www.cidrdb.org/cidr2013/Papers/CIDR13_Paper111.pdf"><strong>Differential Dataflow</strong></a> redefines what persistence means.<br>Instead of materializing entire states, it represents data as a time-indexed collection of <em>differences</em>: insertions, retractions and updates annotated with timestamps.<br>Each record is stored as a triple (data, time, diff), where diff expresses the weight or multiplicity of that change (e.g., +1 for an addition, –1 for a deletion).</p><p>Operators 
don’t rebuild results when inputs change; they simply propagate new differences through the graph.<br>Every operator maintains one or more <strong>arrangements</strong>, effectively compact, partially ordered indexes of these triples.<br>As frontiers advance, old differences whose effects are no longer visible are consolidated or discarded.<br>The result is a continuously compacting view of the world: new deltas flow in, old ones collapse into summarized state and no explicit snapshot is ever taken.</p><p>Given deterministic operators and a durable stream of (data, time, diff) events, recomputing the output is guaranteed to produce the same trace.<br>Persistence, when used, is purely an optimization: the system can checkpoint its arrangements to disk to avoid full replay, but correctness doesn’t depend on it.</p><p>This is the inversion of checkpointing logic.<br>Instead of freezing a full image and resuming from it, Differential’s traces are <strong>self-stabilizing</strong>: their compactness grows as time advances.</p><h4>Feldera: declarativity on top of determinism</h4><p><a href="https://www.feldera.com/"><strong>Feldera</strong></a> reimagines streaming computation through <a href="https://www.vldb.org/pvldb/vol16/p1601-budiu.pdf"><strong>DBSP</strong></a>: a synchronous model where time behaves like database transactions. Unlike Timely Dataflow’s timestamp-labeled events that can arrive out of order, DBSP enforces a single logical clock that partitions continuous time into discrete steps. Each step collects all input changes across every stream, computes incrementally, and emits one batch of output changes. 
There are no frontiers to track and no runtime out-of-order corrections to reconcile, just a strict alternation between input and output synchronized by causality.<br>Relations aren’t tables, but functions of logical time whose contents at step <em>t</em> depend only on inputs from steps 1…<em>t</em>.</p><p>This synchronous guarantee means correctness doesn’t depend on distributed snapshots, and determinism ensures the same state can always be replayed from inputs. When a node fails, recovery simply replays the exact sequence of input batches from the last checkpoint. Checkpoints still exist for speed, capturing intermediate operator state to avoid full replay, but determinism itself guarantees consistency.</p><p>The cost is architectural: DBSP trades Timely’s flexible concurrency for transactional simplicity, making every logical clock tick an atomic boundary where all changes settle before time advances.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*CjZbyx6J7Mth7lIzsT3n-g.png" /><figcaption>The evolution of streaming memory: Each inward spiral represents a decade of optimization: from Chandy-Lamport’s markers to Flink’s decade of optimising coordination along with Timely’s frontiers curling tight. But what is in the Eye of the Storm?</figcaption></figure><h3>Uncomfortable question</h3><p>Streaming computations have evolved into two distinct paradigms and it would be blind not to notice that Flink’s decade of dominance faces a genuine challenge from a fundamentally cleaner computational model.</p><p>The operator-coordinated approach, mastered by Flink, has perfected the art of distributed coordination. Every innovation (async snapshots, incremental writes, disaggregated state) has made checkpointing nearly invisible. This is remarkable engineering. 
Flink solved the impossible: making a distributed system remember itself without stopping, powering the world’s largest data infrastructures reliably, at scale.</p><p>But the progress-based model, pioneered by Timely and its descendants, asks a different question entirely. Instead of perfecting coordination, what if we eliminated it? No barriers, no synchronized snapshots, just operators tracking time frontiers, computing when causality permits. It’s not an optimization but a different computational philosophy.</p><p>Flink has invested deeply in its checkpoint protocol, embedded throughout the network stack, state backends and recovery logic. This isn’t a weakness, but the natural result of solving real production problems for a decade. It does mean, however, that adopting progress-based semantics would require a rebuild from scratch, not an evolution.</p><p>Progress-based systems started with different foundations. They achieve exactly-once guarantees <strong>for incremental computation</strong> without barrier coordination. They are younger, yes, but unburdened by legacy decisions and by Flink’s requirement to handle <strong>non-deterministic operations</strong>.</p><p>The parallel to CISC versus RISC is tempting: complex instruction sets, perfected over decades, eventually challenged by simpler primitives that achieved the same results more elegantly. CISC processors weren’t wrong: they were exactly what we needed until we discovered <strong>different domains demanded different trade-offs</strong>. CISC persisted where compatibility mattered and RISC dominated where efficiency was critical. Streaming may follow the same path: checkpoint-based for general streaming, progress-based for incremental computation.</p><p>Let’s be crystal clear here: streaming systems are <strong>specializing</strong>, not replacing each other. Checkpoint-coordinated models excel at <strong>arbitrary stateful computation </strong>in messy production environments. 
Progress-tracked models excel at <strong>incremental computation with structured time</strong>. Both will coexist because they solve fundamentally different problems. But it would be silly to neglect the conceptual shift and elegance offered by newer models for workloads where Flink’s coordination machinery feels like overkill.</p><blockquote>The river is choosing a new course and the question arises: did we spend decades making checkpoints invisible, only to discover we could have made them unnecessary for the most useful type of computations?</blockquote><p>The answer might reshape not just how we build stream processors, but how we think about distributed time itself.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=d4e5d0889205" width="1" height="1" alt=""><hr><p><a href="https://medium.com/fresha-data-engineering/freeze-the-rivers-flow-evolution-of-streaming-computation-models-d4e5d0889205">Freeze the River’s Flow: Evolution of Streaming Computation Models</a> was originally published in <a href="https://medium.com/fresha-data-engineering">fresha-data-engineering</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Iceberg CDC: Stream a Little Dream of Me]]></title>
            <link>https://medium.com/fresha-data-engineering/iceberg-cdc-stream-a-little-dream-of-me-a7c9f9e6e11d?source=rss-a199772355b8------2</link>
            <guid isPermaLink="false">https://medium.com/p/a7c9f9e6e11d</guid>
            <category><![CDATA[big-data]]></category>
            <category><![CDATA[apache-iceberg]]></category>
            <category><![CDATA[streaming]]></category>
            <category><![CDATA[technology]]></category>
            <category><![CDATA[data-lake]]></category>
            <dc:creator><![CDATA[Anton Borisov]]></dc:creator>
            <pubDate>Mon, 27 Oct 2025 15:05:41 GMT</pubDate>
            <atom:updated>2025-10-27T16:08:13.221Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*9kq00L18k7jhWLX4_s8z5Q.png" /><figcaption>Night freezes streams to a whisper: “I love you”</figcaption></figure><p>Another article with <strong>musical vibes. 😬</strong></p><p><a href="https://www.linkedin.com/in/ananthdurai/">Ananth Packkildurai</a> captured the mood perfectly in <em>“</em><a href="https://www.dataengineeringweekly.com/p/when-dimensions-change-too-fast-for"><em>When Dimensions Change Too Fast for Iceberg</em></a><em>”, </em>a track about what happens when real-time rhythm collides with immutable structure. <a href="https://iceberg.apache.org/">Iceberg</a> was written for steady tempos, not improvisation. This piece continues that melody: how v3 and v4 retune Iceberg’s metadata so streaming can finally play in time.</p><p>Streaming into Iceberg seems like it should be boring by now: a source emits <strong>inserts, updates, and deletes</strong>, then a sink writes them and queries see a <strong>fresh, consistent table.</strong> The friction starts the moment those updates are <strong>small, constant, and urgent.</strong></p><p>Iceberg’s superpower is <strong>immutability with time travel </strong>aka snapshots over append-only files. That same superpower turns every change into <strong>“add something new and teach readers to ignore something old.”</strong> Perfect for <strong>studio batch albums</strong>, twitchy with <strong>real-time improvisations.</strong></p><p>Most streaming writers don’t know where yesterday’s row physically lives. They take the easy contract with <strong>equality deletes</strong> that say <em>“drop any row where key = K.”</em> Throughput stays high, but <strong>readers inherit the tax</strong>: scan candidate files and re-apply predicates over and over. 
You can flip the trade-off with <strong>positional deletes</strong> or <a href="https://olake.io/blog/iceberg-delta-lake-delete-methods-comparison"><strong>deletion vectors</strong></a> and let readers skip exact row offsets, but then writers need a reliable way to find those offsets in real time. Without help, they can’t. That’s why <strong>upserts on Iceberg </strong>so often devolve into<strong> “cheap to write, expensive to read.”</strong></p><figure><img alt="Diagram comparing two delete methods in Iceberg-like tables. Left: the writer finds exact row positions and stores them in a positional deletes file; readers skip those rows, which depends on stable file layouts. Right: the writer records equality deletes such as “id = 1,” and readers filter via an anti-join. It shows why streaming writers rely on equality deletes — positional ones break after rewrites." src="https://cdn-images-1.medium.com/max/1024/1*6OEsKhksl0LL-ZVvP2dxlQ.png" /><figcaption><strong>Two paths for deleting data in Iceberg-like tables:</strong><br> <strong><em>Positional deletes</em></strong> remove rows by their <strong>physical location inside data files</strong>, while <strong><em>equality deletes</em></strong> filter rows <strong>by matching column values</strong>. <strong>Streaming</strong> sinks can only<strong> produce equality deletes</strong>, since positional deletes require <strong>stable file layouts</strong> and<strong> snapshot coordination</strong>.</figcaption></figure><p>There’s a second source of drag that’s quieter but just as important: <strong>identity and change detection.</strong> A clean change feed or a correct point-in-time join requires <strong>rows that remember who they are </strong>and a cheap way to say <strong>what changed between two moments.</strong> Iceberg <strong>V3</strong> moved the ball with <strong>row lineage</strong> and <strong>standardised delete semantics, </strong>so that updates stopped looking like arbitrary rewrites. 
But <strong>metadata still fanned out across many files</strong>, equality deletes still piled up, and anything spanning multiple tables remained awkward because each table ticks on its own clock.</p><p>The <a href="https://medium.com/data-engineering-with-dremio/the-state-of-apache-iceberg-v4-october-2025-edition-c186dc29b6f5"><strong>V4 proposals</strong></a> push on the part that hurts most in streaming: <strong>metadata.</strong> A single <strong>Root Manifest</strong> replaces sprawling lists, <strong>manifest-level delete vectors</strong> mark removals without rewriting children, and pruning lifts from <strong>rigid partition tuples</strong> into <strong>column and expression stats.</strong> “What changed” may be expressed at the <strong>top of the tree</strong> and discovered with <strong>tiny I/O.</strong> None of this bans equality deletes or conjures a built-in key index, it just makes <strong>small commits cheap</strong> and <strong>change planning explicit</strong>, which is exactly what streaming systems need.</p><p>The last mile isn’t in the table format at all, but <strong>orchestration.</strong> To make incremental joins reliable and upserts feel precise, you want a <strong>catalog that can hand out a global commit order</strong> and, where it’s truly needed, an optional way <strong>to map a hot key to its last position </strong>so writers can emit <strong>positional deletes on contact.</strong> With <strong>V3’s lineage</strong> and <strong>V4’s compact deltas</strong>, those services may shift from <strong>ad-hoc fixes to maintainable design</strong>.</p><p>The rest of this article unpacks how we got here, <strong>what V3 already fixed</strong>, <strong>what V4 plans to change in practice</strong>, <strong>what a catalog can add without becoming a database</strong>, and <strong>why CDC on Iceberg</strong> is finally drifting<strong> from hard to merely careful.</strong></p><h3>IceV3rg</h3><p><a 
href="https://iceberg.apache.org/spec/#version-3-extended-types-and-capabilities">Iceberg v3</a> made CDC <strong>possible without icy cold sweat </strong>by tightening two foundations: how we mark rows as gone, and how rows keep their identity as they move.</p><p>First, deletes got a shape that readers can exploit. In v2, a table could accumulate many small delete files per data file, forcing engines to juggle a pile of predicates. v3 formalized <strong>deletion vectors</strong> (bitmaps over row ordinals) and clarified how a reader targets them to the right files. When a DV is present, a scan can skip exact offsets instead of re-reading and filtering whole files. For batch merges that already know “the old row is in file F at position P,” this is gold, reads become surgical, and <strong>compaction can fold DVs back into clean base files.</strong> In streaming, that same precision is available <strong>if</strong> the writer can find positions, otherwise you still fall back to equality deletes because they’re the only contract a firehose can always satisfy. The main benefit is indirect, but conceptual: by localizing deletes, <strong>DVs shorten compaction cycles</strong> and <strong>narrow the gap</strong> between batch maintenance and real-time freshness.</p><figure><img alt="Diagram showing how a delete is encoded into a Roaring Bitmap, stored as an entry in a Puffin file, and linked to its Parquet data file. The manifest file references this Puffin entry, allowing readers to filter deleted rows during scans. The Puffin file acts as a compact sidecar storing deletion vectors, replacing traditional positional delete files." 
src="https://cdn-images-1.medium.com/max/1024/1*6HVtLIV-dqGniOwmrugl1A.png" /><figcaption><em>How Iceberg stores </em><strong><em>deletion vectors</em></strong><em>:</em> deletes are encoded as <strong>Roaring Bitmaps</strong>, written into<strong> Puffin file</strong>s, and <strong>linked through manifest entries</strong> so readers <strong>can filter</strong> Parquet rows <strong>without scanning separate delete files</strong>.</figcaption></figure><p>Second, v3 introduced <strong>row lineage</strong> so updates stop being lost in the churn of file-level rewrites. Each row gets a durable ID and a “last updated” sequence and when you rewrite a file for compaction, the IDs ride along unchanged, and when you truly change a row, its sequence ticks. That one idea removes two of CDC’s worst taxes: you <strong>no longer need an anti-join</strong> to deduplicate “final” versions, and you can tell an <strong>update from a rewrite</strong> without replaying history. It also lets a change feed show clean before/after pairs instead of ambiguous delete+insert shadows.</p><figure><img alt="Side-by-side diagram showing CDC before and after Iceberg row lineage. Left: two snapshots (S and T) are joined by business ID, then anti-joined or windowed to find CDC changes. Right: a single table with lineage filters rows by sequence range (S,T] and groups by row_id to produce CDC changes, showing a simpler flow." 
src="https://cdn-images-1.medium.com/max/1024/1*P17BN18IBDgW2Zskyyq4gA.png" /><figcaption>Before lineage, engines had to<strong> join two snapshot file lists by business ID and anti-join to filter duplicates</strong>.<br> After lineage, each<strong> row carries its own identity</strong> (_row_id) and <strong>sequence </strong>(_seq), allowing CDC queries to use <strong>simple filters over sequence ranges</strong> without additional joins.</figcaption></figure><p>Together, DVs and lineage let engines build snapshot-diff and change-log views that are correct and reasonably fast on a <strong>single table</strong>. You can ask “what changed between S and T,” get a small set of file tasks, and reconstruct updates without guessing. But v3 didn’t change two structural realities. Equality deletes remain the path of least resistance for streaming writers, so readers still inherit predicate cost unless you compact aggressively or have a way to issue positional deletes on the hot keys. And while a single table has a perfect, linear history, <strong>relations do not, </strong>so<strong> </strong>v3 doesn’t give you a global clock. <strong>Incremental joins</strong> across tables <strong>are still a balancing act</strong> performed by the catalog or the engine, not something the format solves for you.</p><p>So v3’s promise is clear: <strong>identity is stable, and deletes are expressible as precise masks</strong>. That’s enough to make CDC workable today with some tweaks. What it doesn’t do is make every<strong> micro-commit tiny</strong> or every “<strong>what changed?</strong>” <strong>query cheap to plan</strong>. That’s the part v4 aims squarely at.</p><h3>Iceberg V4</h3><p>V4 <a href="https://lists.apache.org/thread/x7688x5w25klodlfqmtsv80r1obyqqrz">plans to replace</a> the manifest-list fan-out with one “<strong>Root Manifest</strong>” per snapshot. 
That file directly records the <strong>delta</strong>: new/removed files, data-file deletion vectors (DVs), equality deletes, and <em>manifest</em> DVs (DVs that act on leaf manifests). The aim is to make both writing and <em>reading</em> metadata proportional to the size of the change, which is exactly what a CDC reader needs when polling for “what changed since X.”</p><p>Because the Root Manifest can carry <strong>MDVs that mark adds/removes inside leaf manifests</strong>, a reader can do change detection <em>at the root</em>: take the previous root you cached, read the new root, and compute added/removed entries by interpreting MDVs (either as a replacement DV or as a smaller “diff DV,” both patterns are discussed with storage/plan-time trade-offs). Practically, that means a CDC plan can be built from a tiny set of byte ranges rather than re-listing and re-parsing many manifests.</p><figure><img alt="Diagram showing Iceberg V4 snapshot structure. Snapshot S points to a Root Manifest that lists “Manifest M1 Added,” “Data D5 Added,” and “Manifest DV DV1 Added.” The Root Manifest links to the new data file D5, the manifest M1 containing entries D1–D3, and the manifest delete vector DV1 that applies deletions to M1. A CDC reader compares successive Root Manifests and uses DV1 to compute added and removed files efficiently." src="https://cdn-images-1.medium.com/max/1024/1*JRcpO7OAWbBoQLZNPrMuYQ.png" /><figcaption>In <strong>Iceberg V4</strong>, each snapshot will have a single <strong>Root Manifest</strong> listing everything changed in that commit: new <strong>manifests, new data files, and any manifest-level delete vectors</strong> (MDVs). The <strong>Root Manifest</strong> points to both the new <strong>data file D5</strong> and <strong>Manifest M1</strong>, while <strong>Manifest DV1</strong> records row-level removals <strong>inside M1</strong>. 
To detect deltas, a CDC reader just <strong>diffs the previous and current Root Manifests and applies MDVs</strong>, with no need to re-scan all manifests.</figcaption></figure><p>In V4, a manifest row is a generic <em>content entry</em> that can be <strong>a data file</strong>, <strong>a DV (data or manifest), an equality-delete file, or a leaf (data/delete) manifest</strong>. The schema is cleaned up and stats are standardized, yet equality deletes remain first-class (note the carried-over equality_ids). This is important: CDC doesn’t <em>ban</em> equality deletes, it just gains better metadata to read around them.</p><figure><img alt="Side-by-side comparison of Iceberg V3 and V4 manifest entries. Left box “Iceberg Old Way” shows a manifest entry with fields partition_spec_id: 7 and partition_tuple: {country=UK, dt=2025–10–27}. Right box “Iceberg V4 Proposed” removes those and instead lists column bounds: country.lower=”US”, country.upper=”US”, dt.lower=2025–10–27, dt.upper=2025–10–27, plus a derived stat bucket(user_id)=3. V4 expresses partition scope via bounds and stats rather than fixed tuples." src="https://cdn-images-1.medium.com/max/1024/1*Favpp66Jht237Ckb2BCEWQ.png" /><figcaption>The <strong>Iceberg V4</strong> proposal simplifies manifest metadata by <strong>removing the explicit partition tuple and spec ID</strong>. Instead, each entry <strong>records column-level lower and upper bounds</strong> (and derived stats for transforms like bucket or hour). If bounds are exact, <strong>readers can reconstruct the partition</strong>; otherwise, they treat the file as range-spanning. This<strong> shrinks metadata and lets mixed partitioned files share one manifest</strong>.</figcaption></figure><p>A big design thread in V4 is to <em>decouple manifests from partition specs</em>. 
The proposal shows multiple approaches, with the lead one removing the materialized partition tuple and relying on <strong>lower/upper bounds</strong> (including stats for derived expressions like bucket() or hours()). For equality-delete matching, readers reconstruct the needed partition values from stats, <em>but only if the bounds are exact </em>and fail if lower != upper for a partitioned expression. Writers, in turn, must produce complete (un-truncated) stats for identity-partitioned fields. This shrinks metadata and lets differently partitioned files live in the same manifest, but shifts some vigilance to writers/readers.</p><figure><img alt="A table comparing how Iceberg V4 readers infer partitions from column bounds. It has five columns: file name, lower and upper bounds for country and dt, and inferred partition. File f1 shows matching bounds (UK, 27 Oct) → inferred as (UK, 27 Oct); f2 has mixed date bounds (27–28 Oct) → marked as “unknown”; f3 has consistent values (DE, 28 Oct) → inferred as (DE, 28 Oct). Demonstrates how equality of bounds determines whether a partition can be reconstructed." src="https://cdn-images-1.medium.com/max/1024/1*_fDnZY289I-IPfJ23Lp1UQ.png" /><figcaption>A CDC reader can <strong>prune f1 and f3 precisely</strong> but must <strong>keep f2</strong> because it spans a range.</figcaption></figure><p>If you’d rather keep the tuple, there are alternative designs (<strong>union-tuple schema</strong> or a<strong> hybrid</strong> where identity partitions carry an explicit partition_value alongside bounds), each with storage or complexity trade-offs. The point: V4’s column-stats direction gives CDC planners enough information to prune and to match deletes without pinning manifests to a single spec.</p><p><strong>What this means for streaming CDC:</strong></p><ul><li><strong>Lighter polls:</strong> Engines can cache the last Root Manifest, fetch the next one, and compute added/removed files and DV deltas without re-walking the tree. 
That takes a meaningful chunk out of commit-to-visibility latency for CDC consumers.</li><li><strong>Better DV hygiene:</strong> The proposal details how follow-on deletes can either <em>replace</em> a prior DV or append a smaller “diff DV”: both patterns keep the root small and make change detection algebraic (subtract prior DV and apply delta). This is friendlier to incremental readers and aligns with CDC’s “tell me just the rows that flipped” planning.</li><li><strong>Equality deletes are still here on purpose.</strong> They’re retained in V4 (see equality_ids) because many streaming writers still rely on them for simple upserts. V4 doesn’t magically turn them into positional deletes, but it <em>does</em> make equality-heavy snapshots cheaper to <em>discover</em> and plan. If you want positional efficiency, you can pair V3/V4 <strong>row lineage</strong> (immutable row IDs/sequence) with an index or periodic compaction to produce DVs; V4’s metadata just lowers the overhead around those operations.</li></ul><p>How MDVs are stored (inline vs <a href="https://iceberg.apache.org/puffin-spec/">Puffin</a>), whether to declare optional data↔delete-manifest “affinity” to skip an extra reconciliation pass, and the exact rules for “change detection at the root” are explicitly called out as design levers. Their eventual choices will shape how much work a CDC client does per poll and how cache-friendly the path is.</p><p>Net-net: V4 doesn’t “solve CDC” by itself: deletes will still be equality-scoped, and writers may still choose <a href="https://estuary.dev/blog/apache-iceberg-cow-vs-mor/">COW vs MOR</a>. But it <strong>collapses metadata amplification</strong>, makes <strong>incremental planning a root-level operation</strong>, and <strong>loosens partition-schema coupling</strong> so the write path can evolve without hurting incremental readers. 
That’s exactly the kind of groundwork CDC needs.</p><h3>Catalogs with enough brains</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/908/1*B1bcPD2fCdaWfg85exZCNQ.png" /></figure><p>Iceberg plans to get you <em>row identity</em> (v3) and <em>cheap deltas</em> (v4).</p><p>The last mile is orchestration. Catalogs like <a href="https://docs.lakekeeper.io/">Lakekeeper </a>sit at the only place that sees every commit, across every table and branch. Two capabilities turn “possible” CDC into “pleasant” CDC: a <strong>global timeline</strong> and <strong>compact, byte-range deltas, </strong>plus an <em>optional</em> key locator for hot upserts. Here’s the shape of that without turning a catalog into a database.</p><p>A single table already has an ordered snapshot history, but multi-table relations don’t. Incremental joins, point-in-time pipelines, and multi-table change feeds all need one clock.</p><ul><li><strong>Per-branch sequence numbers.</strong> On each successful commit, the catalog stamps a strictly increasing <em>catalog sequence</em> for that branch (table name included in the record).</li><li><strong>Branch-aware reads.</strong> Readers can ask, “give me everything after seq=N on branch main,” then plan in snapshot order across tables. This is essential for correct <em>incremental joins</em> and <em>cross-table CDC</em>.</li></ul><pre>GET /v1/branches/{branch}/changes?after_seq=N<br>→ [<br>  {seq, table_id, snapshot_id, root_uri, mdv_refs[], op, ts},<br>  ...<br>]</pre><p>No re-listing of storage, no guessing, it’s the catalog that hands you the minimal set of things that changed.</p><p>V4 concentrates change detection in the <strong>Root Manifest</strong> and its <strong>manifest delete vectors</strong>. 
The best thing a catalog can do is expose exactly those byte ranges.</p><ul><li><strong>What to return:</strong> the new root’s URI (and byte range if the store supports it) plus any external MDV references.</li><li><strong>Why it matters:</strong> engines can diff the previous root they cached with the new one and apply MDVs to identify added/removed leaf entries — <em>without</em> walking leaf manifests or touching data files. That keeps commit-to-visibility latency low for CDC consumers.</li></ul><p>Readers thrive on stable IDs. Catalogs can make engines fast by standardizing cache keys.</p><ul><li><strong>Content-addressed roots/manifests.</strong> Use immutable IDs (e.g., content hash + ETag + length) for all returned artifacts so engines can deduplicate and persist safely.</li><li><strong>Coherent eviction.</strong> When a compaction job lands, the change feed is the single point that invalidates caches, everything else remains cold.</li></ul><figure><img alt="Diagram showing Engine → Catalog → Object Store data flow for ETag-based CDC. The engine compares its cached ETag with the one returned by the catalog. If unchanged, it skips work; if different, it downloads only the new root manifest from the object store, diffs it to find changes, and emits CDC events. ETags act as content hashes identifying each root manifest and enabling fast, cache-safe change detection." src="https://cdn-images-1.medium.com/max/1024/1*IidydD_HJULsSVvLSGettQ.png" /><figcaption>The catalog exposes each<strong> root manifest with its URI, ETag, and length</strong>. The engine compares the<strong> new ETag to the cached one</strong>: if identical, it stays idle, if<strong> changed, it means a new commit</strong>. 
The engine <strong>fetches the new root, diffs it with the previous one, and emits CDC</strong> events: byte-precise change detection.</figcaption></figure><h4>Optional concept: a PK locator for hot upserts</h4><p>Not required for correctness, but invaluable where upserts churn. It would also require changes beyond the catalog, so for now it sits closer to wishful thinking about the future.</p><ul><li><strong>What it is:</strong> A tiny <a href="https://en.wikipedia.org/wiki/Log-structured_merge-tree">LSM</a>/KV that tails <strong>Iceberg table metadata (snapshots/manifests/DVs)</strong> and your <strong>compactor’s lineage feed</strong> and maintains key → (file_uri, row_position, seq).</li><li><strong>How it’s used:</strong> streaming sinks look up a key and, if it is found, emit a <strong>positional delete</strong> <strong>(or DV)</strong> before appending the new row. Readers stay cheap even when keys flip often.</li><li><strong>Why optional:</strong> most tables don’t need it, and keeping it out of the core catalog avoids heavy SLAs and lock-in. Run it as a sidecar or plugin.</li></ul><p><strong>Lookup sketch</strong></p><pre>POST /v1/tables/{table}/pk-locator/lookup<br>→ {hit, file_uri, row_position, seq}</pre><p><strong>Correctness under rewrites:</strong> when compaction rewrites files or MDVs replace older ones, the mapping is patched accordingly. Writers always have a safe fallback: emit an equality delete if a lookup misses or races compaction.</p><figure><img alt="The catalog PK locator tails table metadata to maintain a small map of recent keys to their last known file and row position. Streaming sinks query it to emit positional deletes on contact, and compaction jobs patch it when files are rewritten. It stays lightweight: no separate store, no strict consistency, just advisory metadata for faster CDC." 
src="https://cdn-images-1.medium.com/max/1024/1*j4FZe5D_0yLnZvYe_h2q5A.png" /><figcaption>The catalog PK locator <strong>tails table metadata </strong>to maintain a small <strong>map of recent keys to their last known file and row position</strong>. Streaming sinks <strong>query it to emit positional deletes on contact</strong>, and compaction jobs <strong>patch it when files are rewritten</strong>. It stays lightweight: <strong>no separate store, no strict consistency</strong>, just advisory metadata for <strong>faster CDC</strong>.</figcaption></figure><ul><li><strong>Scope and privacy.</strong> A PK locator stores <em>pointers</em>, not values. If your compliance model is strict, keep it table-scoped or partition-scoped and encrypt.</li><li><strong>Degradation path.</strong> If the locator is down, writers fall back to equality deletes. Readers remain correct.</li></ul><p>This idea isn’t entirely hypothetical. Projects like <a href="https://github.com/Mooncake-Labs/moonlink"><strong>Moonlink</strong></a> already prototype an external indexing layer that tails Iceberg metadata on write path and maintains a live map of key → (file, row position). 
That lets writers emit <strong>positional deletes</strong> in real time instead of equality ones, shifting cost from readers to writers.<br> The catalog-side PK locator sketched here follows the same spirit, just a lighter, cooperative service rather than a separate database, using the best effort approach.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*eB_ovoxzo0hWCcSGQ3XCsw.png" /><figcaption>Tomorrow?</figcaption></figure><p><strong>What this buys you tomorrow</strong></p><ul><li>Engines poll a <strong>tiny delta</strong> (root + MDVs), plan from byte ranges, and touch only the files that changed.</li><li>Streaming writers remain simple, hot paths can be precise via a lookup.</li><li>Incremental joins become routine because commits across tables share a <strong>monotonic order</strong>.</li></ul><p>That’s the difference between “CDC possible” and “CDC pleasant.” The format gives you identity and compact deltas, the catalog supplies time and targeting, so just enough brains to keep the lake feeling live, without turning it into a database.</p><p><strong>Disclaimer:</strong></p><blockquote>Some of the mechanics described here are simplified to keep the focus on the bigger picture. 
The goal here isn’t to capture every edge case, but to sketch how these ideas connect and why they matter for streaming into Iceberg.</blockquote><figure><img alt="" src="https://cdn-images-1.medium.com/max/920/1*QR7sgM2c1L7RNSFMfG_mLA.png" /><figcaption>I love bad jokes, so also about tomorrow :)</figcaption></figure><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=a7c9f9e6e11d" width="1" height="1" alt=""><hr><p><a href="https://medium.com/fresha-data-engineering/iceberg-cdc-stream-a-little-dream-of-me-a7c9f9e6e11d">Iceberg CDC: Stream a Little Dream of Me</a> was originally published in <a href="https://medium.com/fresha-data-engineering">fresha-data-engineering</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[The Good, The Bad and The AutoMQ]]></title>
            <link>https://medium.com/fresha-data-engineering/the-good-the-bad-and-the-automq-5aa7a8748e71?source=rss-a199772355b8------2</link>
            <guid isPermaLink="false">https://medium.com/p/5aa7a8748e71</guid>
            <category><![CDATA[data]]></category>
            <category><![CDATA[kafka]]></category>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[distributed-systems]]></category>
            <category><![CDATA[technology]]></category>
            <dc:creator><![CDATA[Anton Borisov]]></dc:creator>
            <pubDate>Thu, 16 Oct 2025 05:56:53 GMT</pubDate>
            <atom:updated>2025-10-16T06:07:14.416Z</atom:updated>
            <content:encoded><![CDATA[<p>Diskless Kafka Western</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*x9n45XlfWcsYdt_HwTMm7A.png" /></figure><h3>The Wild West of Diskless Kafka</h3><p>Three gunslingers ride into the same town at dusk, each promising cheaper streams and quieter nights on call.<br>The town is <a href="https://kafka.apache.org/">Apache Kafka</a> and the fight is over where your bytes live and how much you pay to keep them moving.</p><figure><img alt="Diagram of classic Apache Kafka architecture showing producers writing to a leader broker, which replicates data to follower brokers, each maintaining its own local log segment on disk." src="https://cdn-images-1.medium.com/max/1024/1*x6acxNWWI9xbvqOyBiEPkQ.png" /><figcaption>Traditional Apache Kafka: each broker stores full local replicas of the partition log on its own disk.<br> Producers write to the <strong>leader</strong>, which replicates to <strong>followers</strong> across availability zones. Durability = replication.factor (typically 3). Reads come from <strong>page cache</strong>, not from shared storage.</figcaption></figure><p>For almost fifteen years, Kafka’s shared-nothing, disk-first design has delivered the goods: fast local writes, ISR replication for durability, and page-cache-powered reads. But the cloud changed the terrain. Object stores got faster (and cheaper), cross-AZ egress stayed pricey, and operators began asking a blunt question:</p><blockquote><em>Why are we triple-mirroring every hot byte when the object store already handles durability?</em></blockquote><p>The result is a new frontier: <strong>diskless Kafka</strong>, or more broadly, <strong>shared-storage Kafka</strong>. 
Depending on who you ask, it either trims replication costs without rocking the boat… or throws out the boat entirely and builds a railroad.</p><p>This story has three main characters:</p><ul><li><strong>The Good — </strong><a href="https://slack.com/"><strong>Slack</strong></a><strong>’s </strong><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1176%3A%2BTiered%2BStorage%2Bfor%2BActive%2BLog%2BSegment"><strong>KIP-1176</strong></a><strong> (Fast Tiering):</strong> extends <a href="https://aiven.io/blog/apache-kafka-tiered-storage-in-depth-how-writes-and-metadata-flow?utm_source=chatgpt.com">Tiered Storage</a> to include the <strong>active</strong> write-ahead log. Leaders push fresh bytes to a fast local object store (<a href="https://aws.amazon.com/s3/storage-classes/express-one-zone/">S3 Express One Zone</a> or EBS), and followers <strong>pull from the store</strong> instead of across AZs. Page cache stays king, acks = 1 remains instant, and Slack claims about 40 percent cost savings by cutting cross-AZ traffic. It’s pragmatic, <a href="https://aws.amazon.com/?nc2=h_home&amp;refid=649dc681-acce-453b-98f9-60db0009f901">AWS</a>-fluent, and close to merge-ready.</li><li><strong>The Bad —</strong><a href="https://aiven.io/"><strong> Aiven</strong></a><strong>’s </strong><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1150%3A+Diskless+Topics"><strong>KIP-1150</strong></a><strong> (Diskless Topics):</strong> “Bad” as in bold. Writes go straight to object storage, and brokers become compute only. The payoff is radical elasticity and up to 90 percent lower storage cost, but at the price of re-architecting replication, transactions, and caching. 
Aiven’s newer <em>Diskless 2.0</em> merges this path with Tiered Storage, offering a single zero-copy format and reversible topic migration.</li><li><strong>The </strong><a href="https://www.automq.com/"><strong>AutoMQ</strong></a><strong> —</strong><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1183%3A+Unified+Shared+Storage"><strong> KIP-1183</strong></a><strong> (Unified Shared Storage):</strong> instead of choosing sides, AutoMQ proposes an abstraction layer: AbstractLog, AbstractLogSegment, and an optional Stream API for S3, HDFS, or NFS. It refactors Kafka’s core so local and shared architectures can coexist under one contract. Visionary and vendor-neutral, though based heavily on an already existing implementation, and viable only if the community can stomach another layer of indirection.</li></ul><p>Each approach redraws Kafka’s durability contract in its own way.<br>The next sections follow their trails, first the good, then the bad, and finally the AutoMQ.<br> Saddle up.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/818/1*hZQiNngl-NdC4X3euyfhOw.png" /></figure><h3>The Good, Slack’s KIP-1176: Fast Tiering Done Right</h3><p>If Aiven’s diskless dream wants to rebuild the town from scratch, Slack’s proposal is the cautious sheriff: keep the rules, clean up the streets, and save everyone a fortune on bullets.</p><h4>The problem it rides in to solve</h4><p>Tiered Storage (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage">KIP-405</a>) already let Kafka move <strong>old, closed</strong> segments to object storage.</p><p>But the expensive part of any Kafka bill isn’t cold history, it’s the hot traffic: three brokers in three AZs endlessly mirroring every new byte.<br> Slack’s internal metrics showed that cross-AZ replication alone could eat <strong>50–60 % of cluster cost</strong>.</p><p>So KIP-1176 extends the tiered idea to the <strong>active log</strong>, the write-ahead stream still forming under 
producers’ hands.<br>Instead of sending replicas across zones, the leader <strong>uploads those fresh bytes to a fast object store</strong> such as <strong>S3 Express One Zone (S3E1Z)</strong> or <a href="https://aws.amazon.com/ebs/"><strong>EBS</strong></a>.<br>Followers don’t fetch from the leader anymore, they fetch from the same storage bucket.</p><h4>Architecture</h4><p>Think of it as inserting an <em>object-store hop</em> between the leader and its replicas:</p><pre>Producer → Leader (append to page cache)<br>        → Fast object store (S3E1Z/EBS)<br>        → Followers read from object store</pre><p>Kafka’s RemoteWalStorageManager handles these uploads and downloads in the background.<br>Every few milliseconds it batches 300-500 KB from multiple partitions, combines them into a single WAL object, and publishes metadata into __remote_wal_log_metadata.<br>Followers read that metadata, know which byte range to fetch, and stay in sync without ever touching another AZ.</p><p>Because the copy happens asynchronously, producers using <strong>acks = 1</strong> see <strong>identical latency</strong>.<br>For <strong>acks = all</strong>, the extra hop through the object store adds only a few milliseconds.<br>Under failure, the data-loss window roughly doubles (from ≈ 10 ms to ≈ 20 ms), which is still acceptable for most real-time workloads.</p><figure><img alt="The leader appends records locally and uploads batches to a fast-tier object store. Followers read those segments from the shared store, maintaining ISR for coordination. Consumers read from brokers as usual. The flow illustrates Slack’s KIP-1176 design — replicating active data through fast object storage rather than direct broker-to-broker traffic." src="https://cdn-images-1.medium.com/max/1024/1*iDFLgxaQII-p5cH72F6yfQ.png" /><figcaption><em>The leader </em><strong><em>writes locally</em></strong><em> and </em><strong><em>offloads active segments</em></strong><em> to a fast object store (e.g., S3 Express). 
Followers fetch these slices</em><strong><em> from storage instead of cross-AZ replication</em></strong><em>, cutting latency and data-transfer costs while </em><strong><em>keeping standard ISR</em></strong><em> semantics.</em></figcaption></figure><h4>Performance and cost</h4><p>Slack’s benchmark claims the same throughput and almost identical latency for the acks = 1 path, while reducing broker footprint and AZ traffic by about <strong>43 %</strong>.<br>The magic lies in using a “fast tier” store that’s internally replicated, so Kafka’s replication factor can safely drop from 3 to 2 or even 1 for certain topics.<br>Page cache remains the performance anchor, so Kafka’s read path doesn’t change.</p><p>Compared to the radical KIP-1150 “diskless” approach, KIP-1176 keeps brokers <em>stateful</em>: they still have local segments and page cache, but that state becomes <strong>cheap to maintain</strong>.</p><h4>Failure handling</h4><p>Slack’s authors were careful to model full-AZ outages.<br>If both the leader and its co-located S3E1Z bucket disappear, Kafka simply elects a new leader in another zone, and followers truncate or catch up exactly as in the classic model.<br>Durability and semantics remain unchanged, with only the transport layer shifted. In other words, this KIP doesn’t challenge Kafka’s architecture, it <strong>optimises its plumbing</strong>.</p><h4>Why it’s Good (pun intended)</h4><p>What makes 1176 “good” is its restraint. It reuses KIP-405 <a href="https://kafka.apache.org/36/javadoc/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.html">classes</a>, adding “WAL” variants (RemoteWalStorageManager, RLMWalTask, RemoteWalCombinedLogSegmentMetadata) rather than inventing a parallel subsystem. The design keeps compatibility with existing tiered storage plugins, Iceberg integrations, and Kafka’s consumer expectations.</p><p>Operators who already use Tiered Storage can enable the new path with a single topic-level 
flag:</p><pre>remote.wal.storage.enable=true</pre><p>Slack’s proposal shows that Kafka can <em>behave cloud-native</em> without becoming alien to itself.<br>It doesn’t remove disks, it just stops paying them to gossip across zones.<br>No new coordinators, no double formats, no fresh abstractions, just a smarter way to move bytes.</p><p>It’s easy to underestimate how radical that modesty is.<br>While others chase perfect statelessness, 1176 cuts the biggest operational bill and leaves everything else working exactly as before.<br>If the <a href="https://www.apache.org/">Apache community</a> wants a diskless-adjacent feature it can merge tomorrow, this is the one.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*fb1XVdO-ytkJVfCfocTeuQ.png" /></figure><h3>The Bad, Aiven’s KIP-1150: I Shot the Disk</h3><p>If Slack’s sheriff cleaned up the town, Aiven’s gunslinger came to burn the jail. He doesn’t want leaner replication, he wants brokers with <strong>no durable disks at all</strong>.</p><h4>The first shot, KIP-1150 “Diskless Kafka”</h4><p>The original Aiven proposal imagined Kafka without local storage.<br>Brokers would act purely as compute and all data would live in <strong>object storage</strong> such as S3. Durability would come from the object store’s own replication, so Kafka’s <strong>replication factor (RF)</strong> could drop to 1.</p><p>To coordinate writes and reads, KIP-1150 introduced a <strong>Batch Coordinator</strong> that managed ordering, deduplication, and offsets across producers.<br>Producers wrote straight to a shared WAL in the object store, consumers fetched through a distributed cache. It replaced leaders and followers with a flat, <strong>leaderless</strong> design: any broker could serve any partition because there was no local state to own.</p><figure><img alt="A simplified diagram of the Diskless 1.0 data flow. Producers send data to any Kafka broker, which accumulates messages and writes them to object storage. 
The Batch Coordinator records offset metadata and batch assignments in PostgreSQL. Consumers read via any broker: each fetch triggers a lookup to the Batch Coordinator, which returns object locations, and the broker fetches the corresponding object from storage." src="https://cdn-images-1.medium.com/max/1024/1*RCbJO2j8A_rHAtXpBvlcEA.png" /><figcaption><strong>Simplified Diskless 1.0</strong>:<strong> Stateless brokers, coordinator on the hot path</strong>.<br>Producers <strong>write directly to object storage</strong> through <strong>any broker</strong>. The Batch Coordinator<strong> assigns offsets and tracks batches, serving lookups</strong> for consumer fetches. All reads and writes flow through shared object storage, making brokers effectively <strong>stateless</strong> and <strong>durability fully delegated to the cloud layer</strong>.</figcaption></figure><p><a href="https://aiven.io/blog/guide-diskless-apache-kafka-kip-1150">That architecture</a> looked elegant on a whiteboard, yet catastrophic for anyone who’d run Kafka in anger. Core committers (<em>Jun Rao, Colin McCabe</em>) quickly pointed out what would break:</p><ul><li><strong>Semantics:</strong> transactions and idempotence assume a local leader’s log. 
You remove that and you rebuild consensus from scratch.</li><li><strong>Latency:</strong> even S3 Standard means 100–200 ms PUTs, far above Kafka’s &lt; 10 ms target.</li><li><strong>Complexity:</strong> the coordinator added a new distributed layer inside another distributed system.</li><li><strong>Migration:</strong> existing topics couldn’t be converted, they’d need to be recreated.</li></ul><p>KIP-1150 wasn’t a patch, but a new product.</p><h4>The Diskless 2.0 rewrite</h4><p>Aiven didn’t abandon the idea, they rebuilt it.<br>The September 2025 update, <a href="https://aiven.io/blog/diskless-unified-zero-copy-apache-kafka"><em>Diskless 2.0 Unified Zero-Copy Kafka</em></a>, folds Diskless into <strong>Tiered Storage (KIP-405)</strong> rather than replacing it.<br>The new principle: <strong>write once to shared WAL objects, reuse Tiered for reads.</strong></p><pre>Producer → Object-store WAL<br>         → Brokers rebuild per-partition local segments (Tiered format)<br>         → Consumers read Local → Tiered fallback</pre><p>Now Kafka still elects leaders and tracks ISR, but only for <strong>metadata</strong>, not data. The <strong>record bytes</strong> live in object storage and the <strong>metadata log</strong> remains replicated in Kafka for coordination. Followers assemble ephemeral local segments from the shared WAL and serve reads out of the page cache, leadership merely assigns which broker uploads those segments.</p><figure><img alt="A Diskless 2.0 diagram where producers write to any Kafka broker, which appends batches into a shared, object-backed write-ahead log and updates the Batch Coordinator with ordering metadata. One broker, marked as the upload leader, uploads rolled segments via Tiered Storage (RLM / RLMM / RSM) into the same bucket. Each broker keeps local per-partition segments as ephemeral caches to serve consumers. Consumers read from brokers: first locally, then via Tiered Storage." 
src="https://cdn-images-1.medium.com/max/1024/1*wJS-h4xOMK4aXYwWsVfDOw.png" /><figcaption><strong>Simplified Diskless 2.0: </strong><br> Producers append<strong> batches to a shared, object-backed write-ahead log</strong> (WAL). Brokers rebuild <strong>ephemeral per-partition segments on local disks</strong> and serve reads <strong>locally first</strong>, <strong>falling back to Tiered Storage</strong> when data rolls off cache. The <strong>Batch Coordinator</strong> only <strong>orders writes and keeps bounded metadata</strong>, while a <strong>rotating upload leader pushes rolled segments</strong> through the Tiered Storage plugin.</figcaption></figure><p>Key improvements over 1150:</p><ul><li><strong>Unified format</strong> with Tiered Storage → plugins (Iceberg, RLMM) just work.</li><li><strong>Bounded metadata</strong> → only ~two segments per partition tracked in the coordinator.</li><li><strong>Zero-copy migration</strong> → flip a topic type Classic ↔ Diskless without re-ingesting data.</li><li><strong>Stable latency</strong> → Batch Coordinator off the hot path.</li></ul><p>In essence, Diskless 2.0 restored leadership, ISR metadata, and transaction semantics while keeping data durability in the object store.<br>Kafka’s disks became <strong>caches</strong>, not <strong>sources of truth</strong>.</p><p>Still, the limits remain clear:</p><ul><li>Object storage is the only durable copy (RF = 1 for data).</li><li>Cold starts rebuild caches from WAL objects.</li><li>Fast, single-AZ stores (e.g. S3 Express) are assumed.</li><li>The code lives today in <a href="https://github.com/aiven/inkless"><strong>Aiven’s Inkless fork</strong></a>, not Apache Kafka core. 
Aiven confirmed they’ll upstream “once Tiered and Diskless formats converge.”</li></ul><h4>The verdict</h4><p>KIP-1150 began as an act of rebellion: a clean slate that broke every rule in Kafka’s book.<br>Diskless 2.0 rewrites that act as reform: it keeps the dream of stateless brokers but rebuilds on community foundations.<br>It’s no longer <em>“no disks”, but “no redundant disks.”</em></p><p>Technically audacious, politically feasible, but still a long ride from a merged Apache feature.<br>For now, the Diskless outlaw roams its own fork, testing how far Kafka’s principles can bend before they break.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/752/1*jZU3Z9HerdHSzI5KKN6eRA.png" /></figure><h3>The AutoMQ, KIP-1183: Storage Unification by Refactor</h3><p>AutoMQ doesn’t start at replication policy, it starts at the <strong>log engine boundary</strong>. <strong>KIP-1183</strong> proposes a new storage substrate inside Kafka so <em>both</em> disk-backed and shared-storage engines can plug into the same core. Concretely: introduce AbstractLog/AbstractLogSegment as first-class base types, keep today’s UnifiedLog as the file-backed implementation, and add a <strong>SharedLog</strong> implementation for object/file/DFS backends. Managers that currently depend on UnifiedLog (log manager, cleaner, recovery) would instead target the abstract interfaces.</p><p>Because object storage isn’t a filesystem, the KIP sketches an optional <strong>Stream API</strong> (append/fetch/trim over <strong>stream slices</strong>) and uses <strong>four streams per partition family</strong>: <strong>Data</strong>, <strong>Time</strong>, <strong>Txn</strong>, and <strong>Meta</strong>. The <strong>MetaStream</strong> holds producer snapshots and mapping info. When leadership moves, a new broker <strong>reloads metadata from the MetaStream</strong> to resume. 
Kafka’s controller (<a href="https://developer.confluent.io/learn/kraft/">KRaft</a>) would gain a tiny <strong>KV client</strong> and three controller RPCs to map partitions to their MetaStream IDs.</p><h4>What AutoMQ’s model really means</h4><ul><li><strong>Durability &amp; RF.</strong> Data durability lives fully <strong>in the shared store</strong>, so Kafka typically runs <a href="https://docs.confluent.io/kafka/design/replication.html"><strong>RF</strong></a><strong>=1</strong> at the Kafka layer. Failover therefore requires the next broker to <strong>rebuild state from storage</strong> (MetaStream + streams), which AutoMQ reports as roughly <strong>1–2 s</strong> in practice. Reviewers pressed on what this looks like with many idempotent producers/transactions.</li><li><strong>Leader semantics.</strong> AutoMQ stays <strong>leader-based</strong> (in contrast with leaderless 1150). However, <strong>brokers don’t exchange rich state,</strong> they <strong>reload it</strong> from MetaStream/KV on leadership change, which explains the slower failovers versus hot in-memory ISR hand-off.</li><li><strong>Scope.</strong> The KIP is intentionally <strong>about surfaces</strong>, not full implementation detail yet: it aims to “make storage pluggable” first, then let engines evolve behind the interface (Stream is <em>recommended</em>, but nominally optional).</li></ul><figure><img alt="Diagram showing a Kafka leader writing to shared storage through the Stream API. Followers use read/fence operations to open the same streams and take over on failover. Both leader and follower can keep small local caches. Consumers read through brokers, the shared storage layer provides durability and coordination." 
src="https://cdn-images-1.medium.com/max/934/1*ig1mYtXNHgUrXwxx8eEC0w.png" /><figcaption><strong>Simplified AutoMQ arch</strong>: Producers<strong> write to the partition leader,</strong> which <strong>appends to shared-storage streams</strong>.<br>New leaders <strong>reopen those streams in read mode</strong>,<strong> fence </strong>the old writer, and serve consumers <strong>directly from shared storage</strong>. Durability lives entirely in the <strong>Stream API</strong> layer, brokers are <strong>stateless</strong></figcaption></figure><h4>How it differs from Aiven (1150 → Diskless 2.0)</h4><p>Both AutoMQ and Aiven use a <strong>WAL-on-shared-storage</strong> idea, but they wire it into Kafka differently:</p><ul><li><strong>Engine vs pipeline.</strong> Aiven <strong>keeps Kafka’s current engine</strong> and <strong>unifies the pipeline</strong>: write once to shared <strong>WAL objects</strong>, then <strong>replicas re-materialize per-partition local segments</strong> in Tiered format for reads (“Tiered becomes the compactor”). AutoMQ <strong>changes the engine</strong>: a SharedLog built on Stream becomes a peer to UnifiedLog.</li><li><strong>State exchange.</strong> Diskless 2.0 <strong>re-introduces leaders and ISR (for metadata)</strong> and has replicas <strong>assemble caches</strong> from WAL so fetches are local and failover stays within normal ISR choreography. AutoMQ’s brokers <strong>don’t exchange metadata</strong> beyond KRaft KV/MetaStream, <strong>new leaders re-open streams and reload state</strong>, trading simplicity for <strong>~second-level</strong> recovery.</li><li><strong>Where RF applies.</strong> In Diskless 2.0, <strong>Kafka’s RF applies to the metadata log,</strong> <strong>data</strong> is single-sourced in the object store (and Tiered is reused for the read-optimized view). 
In AutoMQ’s design, Kafka runs <strong>RF=1</strong> and <strong>all</strong> durability comes from the shared backend, yet availability hinges on how fast a broker can <strong>reload MetaStream and reopen streams</strong>.</li></ul><h4>KRaft déjà vu</h4><p>The proposal adds <strong>new core APIs</strong> (AbstractLog, SharedLog, Stream, KVClient, plus controller RPCs). That’s powerful, but it’s also <strong>merge surface</strong>. Reviewers explicitly flag <strong>fragmentation risk</strong>: too thin and plugins reinvent core logic (fencing, compaction/merge, caches), too thick and Kafka’s core complexity balloons. It’s the classic trade-off, just as it was with KRaft: big architectural win, big consensus tax. Even AutoMQ acknowledges the risk and frames the refactor as a way to <em>reduce</em> long-term forking.</p><h4>Bullet summary</h4><ul><li><strong>What it is:</strong> a <strong>storage-engine refactor</strong> (AbstractLog + Stream + KV) so Kafka can run on shared storage without forking the codebase.</li><li><strong>How it works:</strong> <strong>WAL-like streams</strong> per partition family, brokers <strong>reload MetaStream</strong> on failover, <strong>RF=1</strong> in Kafka, durability in storage.</li><li><strong>How it differs from Aiven:</strong> Aiven unifies the <strong>pipeline</strong> and keeps ISR for <strong>metadata</strong>, while AutoMQ changes the <strong>engine</strong> and accepts <strong>slower failover</strong> in exchange for a cleaner “stateless” model.</li><li><strong>Biggest risk:</strong> <strong>API fragmentation</strong> and <strong>second-order complexity</strong> in plugins vs core, following the infamous <strong>KRaft lesson</strong>.</li></ul><h4>The reality: AutoMQ already runs the future</h4><p><strong>WarpStream</strong> was the first to market the idea of a diskless, object-store Kafka service, but <strong>AutoMQ were the first to make it work at visible production scale and to document it openly</strong>.<br>They built it, 
ran it in production, and <strong>shared their design and findings openly</strong>, giving everyone else, Aiven included, a real implementation to learn from and improve.</p><p>Their openness stands in sharp contrast to <strong>cloud vendors’ closed models</strong> like <a href="https://docs.aws.amazon.com/msk/latest/developerguide/msk-broker-types-express.html"><strong>MSK Express brokers</strong></a>, which are <strong>probably running diskless in some form</strong>, yet remain completely opaque.<br>With AutoMQ, we can trace how the system works, why certain choices were made, and where it can improve.<br>That transparency, <em>messy, evolving, but real</em>, is what keeps progress visible.</p><p>That’s why I have a soft spot for AutoMQ: they don’t just build the future, they open it up for everyone to build together, while continuing to push the frontier with new ideas and features.</p><h3>The Frontier Shifts</h3><p>Something irreversible has started.<br>Kafka is no longer a broker farm with disks and replication: it’s becoming a topology of storage, compute, and intent.<br>The question is not <em>if</em> it detaches from local state, but <em>how</em> that detachment will be expressed and standardized.</p><p>Slack and Aiven are defining the safe roads, pragmatic ways to make Kafka cheaper, lighter, and easier to operate.<br>AutoMQ has already crossed the river and is building on the other side.<br>Their model isn’t the destination for everyone, but it proves that Kafka can survive and even thrive when storage is no longer a substrate but a medium.</p><p>What happens next will blur the boundary between streaming and storage entirely.<br>Kafka may stop being a product you deploy and become a behavior you compose: elastic, stateless, and transparent in cost and durability.<br> The work done now by Slack, by Aiven, by AutoMQ is not about who arrives first, but about expanding what <em>Kafka</em> can mean.</p><h4>Acknowledgment</h4><p>Thanks to the teams at 
<strong>Slack</strong>, <strong>Aiven</strong> and <strong>AutoMQ</strong> for pushing the frontier forward.<br>You’re doing the hard work that keeps Kafka evolving and the rest of us curious.</p><blockquote>Beyond the snapshots and the segments, something larger is forming: a log without the wall.</blockquote><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*-Bc_-RAmlYReL-bO1bRd-w.png" /><figcaption>Pink Floyd, obviously. Like I’d settle for less. :P</figcaption></figure><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=5aa7a8748e71" width="1" height="1" alt=""><hr><p><a href="https://medium.com/fresha-data-engineering/the-good-the-bad-and-the-automq-5aa7a8748e71">The Good, The Bad and The AutoMQ</a> was originally published in <a href="https://medium.com/fresha-data-engineering">fresha-data-engineering</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Switching me Softly]]></title>
            <link>https://medium.com/fresha-data-engineering/switching-me-softly-cb404d02c28b?source=rss-a199772355b8------2</link>
            <guid isPermaLink="false">https://medium.com/p/cb404d02c28b</guid>
            <category><![CDATA[postgresql]]></category>
            <category><![CDATA[database]]></category>
            <category><![CDATA[aws]]></category>
            <category><![CDATA[big-data]]></category>
            <category><![CDATA[data-science]]></category>
            <dc:creator><![CDATA[Anton Borisov]]></dc:creator>
            <pubDate>Mon, 29 Sep 2025 12:38:35 GMT</pubDate>
            <atom:updated>2025-10-03T18:00:48.733Z</atom:updated>
            <content:encoded><![CDATA[<h4>Zero-downtime 12 -&gt; 17 PostgreSQL Upgrade at Fresha</h4><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*dvovOp-wYZfxunmYSYoiCQ.png" /><figcaption>Killing me softly with his bash song,<br> Zero-downtime all along…</figcaption></figure><blockquote>Artwork by Bojan Jevtić (<a href="https://bojan-jevtic.pixels.com/">https://bojan-jevtic.pixels.com</a>). Used with permission.</blockquote><h3>Introduction</h3><p>Upgrading <a href="https://www.postgresql.org/">PostgreSQL</a> in production is never just about running pg_upgrade.<br> At <a href="https://www.fresha.com/">Fresha</a> we run <strong>200+ PostgreSQL databases</strong>, and by the end of 2024 some of our most critical and heavily loaded ones were <strong>still stuck on Postgres 12</strong>, around <strong>20</strong> databases in total. PostgreSQL 12 was heading for end-of-life on <a href="https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Welcome.html">RDS</a> between <strong>November 2024 and February 2025</strong>, but the risk and pain of upgrades had been high enough that we had postponed them again and again.</p><p>This created a dangerous situation: business-critical data on a soon-to-be unsupported version, and no safe path forward. We had <a href="https://debezium.io/">Debezium</a> streaming change events into <a href="https://kafka.apache.org/">Kafka</a>, outbox connectors producing ordered domain events, and fleets of replicas serving reads. Dropping into “maintenance mode” for even a few minutes was unacceptable.</p><p>For years, upgrades had been treated like staring into car headlights: everyone froze. I wanted to break that cycle. By combining database internals, streaming knowledge, and a healthy dose of bash automation, I showed how zero-downtime upgrades could work in practice. 
Once we proved it on the hardest PG12 databases, we scaled the method into a repeatable solution for dozens of teams and hundreds of databases.</p><p>Let’s step back a little: the obvious path was always to “schedule maintenance windows” and take the hit. But that’s not realistic when your system is 24/7 and globally distributed. The real challenge wasn’t just moving the data, it was dealing with the entire ecosystem around it:</p><ul><li><strong>Debezium CDC connectors</strong> continuously streaming changes into Kafka.</li><li><a href="https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/"><strong>Outbox pattern</strong></a><strong> connectors</strong> generating ordered business events.</li><li><strong>Replication slots and </strong><a href="https://www.postgresql.org/docs/current/wal-intro.html"><strong>WAL</strong></a><strong> management</strong> that can’t just be dropped and recreated without data loss.</li><li><strong>Physical replicas</strong> serving production reads.</li><li><a href="https://www.pgbouncer.org/"><strong>PgBouncer</strong></a> pools handling thousands of concurrent client connections.</li></ul><p>Instacart’s <a href="https://www.instacart.com/company/how-its-made/zero-downtime-postgresql-cutovers">zero-downtime cutover post</a> describes an approach that is well known in the DBA world:</p><ol><li>Take a consistent base backup (RDS snapshot or pg_basebackup).</li><li>Keep a replication slot open on the primary so WAL keeps accumulating.</li><li>Align the replica’s replication origin with the backup’s last known LSN.</li><li>Stream the changes from the slot until the replica catches up.</li><li>Promote the replica and cut traffic over.</li></ol><figure><img alt="Diagram showing a PostgreSQL blue-green upgrade process. The “Blue” database takes a backup that produces a snapshot with a saved LSN. The snapshot is restored into the “Green” database. A logical replication slot accumulates WAL changes during restore. 
A subscription on the green side is then advanced with pg_replication_origin_advance to the saved LSN, allowing replication to continue without data loss." src="https://cdn-images-1.medium.com/max/1024/1*0R_WZgqpbcf1cxvrtx6UbQ.png" /><figcaption>Logical replication during PostgreSQL blue-green upgrade: WAL is accumulated while restoring the snapshot, then replication origin is advanced on the new cluster to catch up seamlessly.</figcaption></figure><p>That mechanism works, but it’s only the foundation. In practice it doesn’t address the messy parts we faced: Debezium connectors tied to those slots, outbox event ordering, sequence alignment, PgBouncer switchovers, and idempotency guarantees. Those were the hard problems we had to solve to make upgrades safe and repeatable at Fresha. And yes, we didn’t settle for a single major-version step. We did <strong>12 -&gt; 17</strong>.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/914/1*1pJYoq3Vah1mJfEMCh6hXg.png" /></figure><h3>Why Classic Approaches Don’t Work</h3><p>The classic “restore from snapshot, upgrade, and cut over” is conceptually simple. But in reality:</p><blockquote>Logical replication must continue through the upgrade process.</blockquote><blockquote>Every physical replica needs to remain consistent and available for read-only queries.</blockquote><blockquote>Kafka connectors tied to slots on the old cluster must transition cleanly, otherwise they either lose data or emit duplicates in unpredictable order.</blockquote><p>Before building our own method, we looked at the obvious options:</p><h4>RDS Blue/Green Deployments</h4><p>Amazon RDS offers <a href="https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/blue-green-deployments.html">Blue/Green upgrades</a> that spin up a parallel environment and promise minimal downtime. 
Sounds perfect until you realize it doesn’t integrate with the <a href="https://www.postgresql.org/docs/15/logical-replication.html"><strong>logical replication slots</strong></a> and <strong>Debezium connectors</strong> we rely on. Blue/Green can swap endpoints, but it doesn’t handle Kafka offsets, connector states, or downstream deduplication requirements. For us, that’s a non-starter.</p><h4>Direct pg_upgrade on Production</h4><p>In-place upgrades sound easy: just stop Postgres, run <a href="https://www.postgresql.org/docs/current/pgupgrade.html">pg_upgrade</a>, restart. But that implies <strong>minutes or hours of downtime</strong> on terabyte-scale databases. It also assumes you’re comfortable with no rollback option beyond restoring backups. We weren’t.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/922/1*4o04TMt33_FBvzVVInItbQ.png" /></figure><p>So we built something different:</p><ul><li>Start the <strong>target database</strong> as a logical replica restored from a snapshot.</li><li>Rebuild its <strong>physical replicas</strong> alongside, so the new cluster mirrors the old one.</li><li>Use <strong>PgBouncer</strong> as the traffic switch, with a scripted pause/resume to freeze connections during the switchover.</li><li>Design explicit <strong>Debezium handling modes</strong> for both CDC and outbox connectors.</li></ul><figure><img alt="Diagram of PostgreSQL blue-green switchover with connectors. The “Blue” and “Green” databases start in sync. Matching outbox and CDC warehouse replication slots are created on both sides. New Kafka Connect connectors are set up to use these slots with accumulated changes. PgBouncer handles the switchover, after which the green database becomes the active writer while staying in sync with blue." 
src="https://cdn-images-1.medium.com/max/896/1*r3Kq-6GZKLRpUNSVvuFTpw.png" /><figcaption>Synchronizing slots and connectors: duplicate replication slots, create new Kafka Connect connectors, then switch PgBouncer over to move traffic to the upgraded database.</figcaption></figure><p>The rest of this article walks through how we turned that into an automated, developer-friendly upgrade framework that anyone on our team could run in ~5 minutes per database.</p><h3>Winter Is Coming: Prewarm &amp; Analyze</h3><p>Catching up a replica is only half the story. Before you flip traffic, you need to make sure the new cluster can actually carry production.</p><p><strong>Consistency checks.</strong> On staging we ran PG 17 with full app tests: migrations, queries, extensions. Don’t skip this. If the app isn’t green here, it won’t be green in prod.</p><p><a href="https://www.postgresql.org/docs/current/pgprewarm.html"><strong>Prewarm</strong></a><strong> and </strong><a href="https://www.postgresql.org/docs/current/sql-analyze.html"><strong>analyze</strong></a><strong>.</strong> A fresh RDS restore is cold. First queries crawl while blocks are lazily pulled from object storage onto disk. We prewarmed critical tables and ran ANALYZE so stats were correct and the planner didn’t go wild on the first real query.</p><p><strong>Two-phase switchover.</strong></p><ol><li><strong>Read-only flip</strong>: move replicas first. All writes still hit Blue, but reads come from Green. This is the production smoke test — if queries fail here, you still have a rollback button.</li><li><strong>Full flip</strong>: once RO is solid, pause PgBouncer pools, wait for replication to be fully caught up, then point writes at Green. Under the hood, “full” just means “RO first, then RW.”</li></ol><figure><img alt="Diagram of blue-green database switchover with PgBouncer. The Blue master and its replica are on the left, the Green master and replica are on the right. 
PgBouncer sits in the middle, controlling connections. The diagram highlights two phases: a read-only switchover (redirecting traffic from Blue replica to Green replica) and a full switchover (redirecting traffic from Blue master to Green master). Red X marks show disabled old paths once switchover occurs." src="https://cdn-images-1.medium.com/max/1024/1*YDrZlzUhWBI7aqzxEfBg5A.png" /><figcaption>PgBouncer routes traffic during switchover: replicas can be switched read-only first, followed by a full master switchover.</figcaption></figure><p><strong>PgBouncer tricks.</strong> Running it on <a href="https://kubernetes.io/">Kubernetes</a> comes with its own set of quirks. One of the big ones: how do you safely reschedule pods without jolting the on-call awake at 3 a.m.? That rabbit hole probably deserves its own article.</p><p>In our case, the pain point was ConfigMaps. Redeploying them was just too slow. The fix? Pre-mount all configs on every pod, and when it’s time to switch over, skip the redeploy and just fire off a few <a href="https://www.pgbouncer.org/usage.html">admin</a> commands.</p><pre>SET conffile = &#39;/etc/pgbouncer/pgbouncer_new_rw.ini&#39;;<br>RELOAD;</pre><p>No pod restarts, no reconciliation delay.</p><h3>Mind the Gap (setval +100k)</h3><ul><li><strong>Advance early, outside the pause.</strong> Right before switchover we run the sync_sequences script, which iterates over all sequences from <a href="https://neon.com/postgresql/postgresql-tutorial/postgresql-sequences">pg_sequences</a> on <strong>Blue</strong>, reads each sequence’s <strong>last_value</strong>, and then <strong>bumps the same sequence on Green</strong> to last_value + sync_sequences_gap (default 100000). 
This keeps the flip window short.</li><li><strong>Exact mechanics.</strong> For each schemaname.sequencename:</li></ul><pre>-- On Green <br>SET transaction_read_only = OFF; <br>SELECT setval(&#39;&lt;schema.seq&gt;&#39;, &lt;blue.last_value&gt; + &lt;gap&gt;, true);</pre><p><strong>Why this is fine.</strong> We only need sequences to <strong>never collide</strong> after the flip. Pushing Green well ahead of Blue means new inserts won’t reuse a value, as long as Blue hands out fewer values than the gap between the sync and the flip. Sequences are monotonic anyway, so there is no need to “pull back” on rollback.</p><h3>Handling Debezium During Switchover</h3><p><strong>The problem:</strong> Debezium ties each connector to a <strong>logical replication slot</strong> on the source (Blue). When you bring up the target (Green) and want the same data to continue into the same topics, you may want to accept a short <strong>overlap window</strong> where both Blue and Green can emit the <em>same</em> changes.</p><h3>CDC to Warehouse (easy mode: dedup)</h3><p>For <strong>CDC -&gt; Debezium -&gt; </strong><a href="https://www.snowflake.com/en/"><strong>Snowflake</strong></a>, we already <strong>deduplicate</strong> downstream. So we intentionally create a small overlap:</p><figure><img alt="Diagram of WAL accumulation for change data capture (CDC) during a PostgreSQL blue-green upgrade. The Blue and Green databases are in sync. Each has a CDC warehouse slot that accumulates WAL changes. Red boxes illustrate “Accumulated WAL” on both sides, with an intentional “Overlap” zone in the middle. This overlap ensures no data loss during the switchover when replication switches from Blue to Green." 
src="https://cdn-images-1.medium.com/max/1024/1*MS-IzK0oXKEtGQlGfj01Ig.png" /><figcaption>Managing WAL overlap: both Blue and Green CDC slots accumulate changes, with a controlled overlap to ensure continuity.</figcaption></figure><ul><li>Keep Blue’s connector running.</li><li>Create Green’s connector <strong>with the same schema/topic config</strong>, starting from the <strong>fresh slot</strong> on Green.</li><li>Allow a short <strong>duplication window</strong> (configurable) so both produce the same tail of WAL.</li><li>Stop Blue’s connector.</li></ul><p>Result: Snowflake reconciles quickly and retains <strong>one copy</strong> of each row/version. This is reliable, fast, and zero-risk if dedup is in place.</p><h3>Outbox/Event Streams (stricter)</h3><p>Outbox topics care about <strong>ordering semantics</strong>. Our rule of thumb:</p><ul><li>During the flip there should be <strong>no “funny ordering”</strong>: the consumer sees either an event it has already processed or a <strong>brand-new</strong> event.</li></ul><p>We guarantee that by:</p><ul><li><strong>Pausing writes</strong> on Blue right before the RW flip (brief), giving Debezium time to flush the tail (controlled in config).</li><li>Only then starting the Green connector.</li><li>Keeping the <strong>topic keying</strong> and <strong>partitioning</strong> identical, so any duplicates are the <em>same key in the same partition</em>.</li></ul><figure><img alt="Diagram of outbox slot handover during PostgreSQL blue-green upgrade. The Blue and Green databases are shown in sync. An old connector writes to an outbox slot on Blue, accumulating WAL. Step 1: wait for the connector to flush all WAL. Step 2: a new connector begins reading from the outbox slot on Green, starting at the overlapped WAL range. This ensures continuity without missing or duplicating events." 
src="https://cdn-images-1.medium.com/max/1024/1*3A20gvIE7B2caCayKIcflw.png" /><figcaption>Outbox connector cutover: old connector flushes WAL, new connector starts reading from the overlapped slot on the upgraded database.</figcaption></figure><p>If your consumers are <strong>idempotent</strong> (exactly-once at the business level), this pattern is safe: duplicates are replays of the same message, never re-ordered across the cut.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*JdwzcIlc4a1w7F2BODZs0g.png" /></figure><p><strong>Slots &amp; offsets:</strong> You don’t “move” a slot, you <strong>create a new slot on Green</strong> and start the connector there. Old connector keeps its Blue slot until you stop it. Kafka <strong>offsets</strong> continue per-topic/partition and duplicates are identical payloads, not new keys.</p><p><strong>Overlap discipline:</strong> Keep the window short. Long overlaps create needless churn and noise in metrics.</p><p><strong>Sensible checks:</strong> Only flip RW after:</p><ul><li>Blue→Green replication lag under threshold.</li><li>Outbox writes paused and <strong>flush complete</strong> (we wait a bit to drain).</li><li>Green connector running and healthy.</li></ul><p><strong>Rollback:</strong> If anything looks off, stop the Green connector, resume Blue, unpause writes. Because we haven’t changed keys/ordering, consumers remain consistent.</p><h3>Orchestration mode: strict ordering, zero confusion</h3><p>When idempotency isn’t a sure bet for outbox consumers, we <strong>split the stream cleanly</strong>: old DB emits only “old” events, new DB emits only “new” events. No interleaving, no ambiguity.</p><p><strong>How we mark events</strong></p><ul><li>We add a boolean <strong>version column</strong> to outbox tables, e.g. 
use_pg17 BOOLEAN DEFAULT NULL (present on all partitions).</li><li>Right before the flip, we change the <strong>default</strong> on the <strong>target (Green)</strong> to TRUE (and keep it NULL/FALSE on <strong>source/Blue</strong>). New writes on Green automatically carry use_pg17=true.</li></ul><p><strong>How we route events</strong></p><ul><li>We create a <strong>new outbox connector</strong> on Green with a <a href="https://debezium.io/documentation/reference/stable/transformations/content-based-routing.html"><strong>JSR-223 Groovy filter</strong></a> that only passes rows where use_pg17 == true. The old connector keeps running on Blue but filters to “not true” (null/false). This cleanly divides the topic stream.<br>Green filter:<br> value.after.use_pg17 == true<br> Blue filter:<br> value.after.use_pg17 == null || value.after.use_pg17 == false</li><li>In the connector <a href="https://debezium.io/documentation/reference/stable/transformations/outbox-event-router.html">config</a> we set:</li></ul><pre>transforms=filter,outbox<br>transforms.filter.type=io.debezium.transforms.Filter<br>transforms.filter.language=jsr223.groovy<br>transforms.filter.condition=&lt;condition above&gt;<br>transforms.outbox.type=io.debezium.transforms.outbox.EventRouter<br>snapshot.mode=never</pre><p>This is applied to both the old and the new connector.</p><p><strong>Switchover choreography</strong></p><ol><li><strong>Prep:</strong> ensure use_pg17 exists on all outbox tables; the script checks this and fails fast if missing.</li><li><strong>Flip default on Green:</strong> set use_pg17 DEFAULT TRUE so any <strong>new</strong> events after the write flip carry the tag.</li><li><strong>Start Green outbox connector</strong> with the <strong>use_pg17==true</strong> filter; the old Blue connector still runs with the <strong>“not true”</strong> filter.</li><li><strong>Pause writes on Blue</strong>, let Debezium flush and the old connector process current events, switch over to Green, <strong>resume 
writes</strong>, and Green produces new (tagged) events, filtering out duplicates of old ones.</li><li><strong>Retire the Blue connector</strong> once the tail is drained; <strong>drop the filter</strong> on Green if desired (after Blue is off, everything is “new” anyway). Connector lifecycle and config mutation are automated via the <a href="https://developer.confluent.io/courses/kafka-connect/rest-api/">Connect REST API</a> in the script.</li></ol><figure><img alt="Diagram of PostgreSQL outbox orchestration in a blue-green upgrade. Blue and Green databases are in sync. Events from Blue have use_pg17 = NULL, while events from Green have use_pg17 = true. Step 1: filtering ensures only the right events are passed. Step 2: the old connector flushes WAL from the Blue outbox slot. Step 3: the new connector starts reading from the Green outbox slot at the overlap. Step 4: filtering removes duplicates so only the Green events remain. The result is continuous, dupl" src="https://cdn-images-1.medium.com/max/1024/1*FubwsdBgyFP67IAoVmIYVA.png" /><figcaption>Outbox orchestration: old and new connectors overlap, using a version flag to filter duplicate events during switchover.</figcaption></figure><p><strong>Why this works</strong></p><ul><li>Topics never receive mixed “old/new” events during the window. Consumers either see <strong>already-processed</strong> events (from Blue) or <strong>brand-new</strong> events (from Green). There’s no reordering across the boundary because <strong>we pause writes</strong>, drain, then resume.</li><li>Rollback is symmetric: flip the defaults back, adjust filters (the script handles forward/reverse conditions automatically).</li></ul><h3>The Greatest Teacher, Failure Is</h3><p>No matter how carefully you plan, switchover days have a way of surfacing the unknowns. 
We built in multiple safety nets to make sure “wrong” never meant “irreversible.”</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/920/1*KTRs9GQZIkeS4nCWCNofMQ.png" /></figure><p><strong>CDC connectors are the easy part.</strong><br> If a Debezium CDC connector stumbles, our downstream (Snowflake) is already deduplicating on keys, so we can always re-consume the stream. Worst case, we spin up a new connector with snapshot.mode=initial against a trimmed dataset, relying on table indices to cut over at a known safe point. Consumers will reconcile cleanly.</p><p><strong>Outbox connectors are trickier.</strong><br> Because they carry business events, we can’t just blindly replay snapshots. That’s why we leaned on idempotent consumers and, when possible, the Kafka Connect REST API to drop offsets. On older Connect versions, the fallback was heavier: create a fresh connector/slot pair, trim the table online to the events we care about, then re-bootstrap. You can use something like my <a href="https://medium.com/fresha-data-engineering/streamlining-data-cleanup-in-postgresql-a-ninja-approach-ae9df3507b6e">previous article</a> as a recipe.</p><p><strong>If the new database turns out to be unusable.</strong><br> Our orchestration doesn’t just go forward; it can also reverse. Once the switchover completes, we can run the same scripts in reverse: switch PgBouncer back, flip replication slots/publications, and re-attach connectors to the old cluster. In theory, you can loop this forward/backward multiple times until you’re satisfied.</p><p><strong>Dry-run mode as insurance.</strong><br> We even added a “test catchup” mode in switchover.sh: it pauses writes, lets logical replication and slots catch up, but never actually flips traffic to the new cluster. This gave us two superpowers:</p><ul><li>measure how long the pause would last under real workload,</li><li>test whether the application tolerated read-only windows.</li></ul><p>If it didn’t? 
We’d just resume writes on the source and treat it as a rehearsal, not a disaster.</p><p><strong>Takeaway</strong>: A zero-downtime upgrade isn’t just about the happy path. It’s about building escape hatches: reset CDC, replay outbox carefully, reverse the switchover if needed, and dry-run until you trust the process.</p><h3>Everything Everywhere All at Once, known as YAML</h3><p>One of the biggest lessons we learned after the first few upgrades was: <em>you don’t want hardcoded scripts</em>. Each database had its quirks: different connector sets, replica layouts, outbox usage, so we pushed everything into a YAML config and a single entrypoint.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/688/1*-L27Sax934AHRbq-YxUv5g.png" /><figcaption>Yeah, you can do modules with Bash :)</figcaption></figure><p>Our switchover.sh script is really just a thin wrapper around sub modules in lib/, reading from switchover-config.yaml. That config defines:</p><p><strong>Source and target clusters</strong></p><pre>source:<br>  internal_name: postgres_source<br>  host: localhost<br>  port: 5433<br>  replicas:<br>    - name: postgres_source_ro_1<br>    - name: postgres_source_ro_2<br>target:<br>  internal_name: postgres_target<br>  host: localhost<br>  port: 5434<br>  replicas:<br>    - name: postgres_target_ro_1<br>    - name: postgres_target_ro_2</pre><p>This lets us model not just primaries, but physical replicas, so PgBouncer can safely switch <em>read-only</em> traffic separately from <em>read-write</em>.</p><p><strong>Database-level details</strong> (slots, publications, subscriptions):</p><pre>database:<br>  name: testdb<br>  user: testuser<br>  password: testpass<br>  logical_slot: dms_slot<br>  logical_publication: dms_pub<br>  logical_subscription: dms_sub<br>  transaction_wait_time: 3        # allow txs to finish before going RO on Blue</pre><p><strong>PgBouncer pools and config files</strong> for pre-delivered switchover configs:</p><pre>pgbouncer:<br>  
switchover_rw_config_file: pgbouncer_new_rw.ini<br>  switchover_ro_config_file: pgbouncer_new_ro.ini</pre><p><strong>Kafka connectors</strong> and how they map to slots:</p><pre>kafka:<br>  connect_clusters:<br>    - name: cdc<br>      url: http://localhost:8083<br>    - name: outbox<br>      url: http://localhost:8084<br>debezium:<br>  outbox:<br>    version_column: use_pg17<br>    tables:<br>      - outbox_events_1<br>      - outbox_events_2</pre><p><strong>Replication tolerances</strong> (lag thresholds, sequence gaps, catchup timeout):</p><pre>replication:<br>  max_lag_bytes: 20000        # What to consider no lag.<br>  catchup_timeout: 10         # How long to pause to catchup<br>  sync_sequences_gap: 100000  # Some safe gap for sequences, <br># we don&#39;t want to create sequences on 1k tables during pause :)</pre><p>Why expose so many options? Because upgrades are never uniform:</p><ul><li>Sometimes you want <strong>duplication mode</strong> for Debezium (let duplicates flow, downstream deduplicates).</li><li>Sometimes you need <strong>orchestration mode</strong> (JSR filters + version columns to separate old vs new events).</li><li>Some teams only wanted to test <em>read-only</em> switchover, without touching writes.</li><li>On the biggest DBs, we needed to <em>pause transactions</em>, wait for catchup, then flip. On smaller ones, we could skip the wait entirely.</li></ul><p>Instead of maintaining a dozen scripts, we ended up with a single command:</p><pre>./run.sh switchover \<br>  --direction forward \<br>  --mode full \<br>  --debezium-mode orchestration \<br>  --transaction-mode wait \<br>  --test-catchup disabled \<br>  --config configs/switchover-config.yaml</pre><p>Every option maps directly to one of those operational decisions. 
This made the process reproducible, reversible, and explainable.</p><h3>Results</h3><p>Between January and February, we upgraded <strong>20+ PostgreSQL databases from PG12 to PG17</strong> without a single minute of downtime. Even the largest and most sensitive database, the one where we couldn’t assume idempotent consumers, was migrated safely using the orchestration trick with version columns and JSR filters.</p><p>And yes, after the final switchover, we had a cake. 🎂</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/920/1*dZHd148Yz2n5JaTdeHSCMg.png" /></figure><p>But more importantly, we ended up with a <strong>repeatable framework</strong>:</p><ul><li>The same scripts and configs could be applied across clusters.</li><li>Every option (duplication vs orchestration, test catchup vs full switchover) was codified and reusable.</li><li>Rollback and dry-run modes gave us confidence under pressure.</li></ul><figure><img alt="Full PostgreSQL blue-green switchover sequence diagram. On the left, Blue and Green databases start in sync. Steps include: prewarm and analyze, create replication slots in Green, move sequences, stop writes, allow transactions to finish, and pause the PgBouncer pool. Then PgBouncer changes hosts for read-only and read-write traffic, Debezium connectors are orchestrated, the PgBouncer pool is resumed, writes are enabled in Green, Blue becomes a replica, and that’s it." src="https://cdn-images-1.medium.com/max/1024/1*rbCOHPb30qFOBm9foGKvdg.png" /><figcaption>End-to-end switchover flow: from prewarming and replication slot setup, through pausing PgBouncer and Debezium orchestration, to resuming traffic on Green.</figcaption></figure><h3>Conclusion</h3><p>Zero-downtime PostgreSQL upgrades are not just possible; they’re now a reality for our team. 
With careful orchestration of CDC connectors, outbox event streams, replication slots, and PgBouncer traffic, we moved our production fleet forward to PG17 without disruption.</p><p>That said, the <strong>switchover is only half the story</strong>. The switchover itself is elegant, but the supporting work can still feel tedious:</p><ul><li>disabling migrations while replicas catch up</li><li>tweaking PgBouncer configs</li><li>changing <a href="https://developer.hashicorp.com/terraform">Terraform</a> definitions for connectors</li><li>creating PRs, waiting for approvals, and fixing the inevitable typo.</li></ul><p>We’re not stopping here. The scripts and configs are good enough today that <strong>teams across Fresha can already run upgrades on their own</strong> by following the documented process. The next step is to take the human glue out of it:</p><ul><li>templated configs</li><li>PR generation</li><li>orchestration with <a href="https://aws.amazon.com/step-functions/">Step Functions</a> on AWS</li><li>and finally a workflow where upgrading Postgres feels like a safe, boring routine.</li></ul><p>We’ve proven this works at Fresha scale, and now we’re investing in making it a one-button operation. When PG18 lands, we don’t want anyone on-call to break a sweat.</p><h3><strong>Credits</strong></h3><ul><li><strong>Anton Borisov</strong> — rule-breaker and architect of the impossible, brought the idea nobody thought would fly, built the first script, solved the hardest DB hurdle, and set the pace as the project’s boldest innovator. Basically the one person you shouldn’t let near production… unless you want results.</li><li><strong>Emiliano Mancuso</strong> — Head of Infrastructure, believed in the process when others said it was impossible. 
Also doubles as our premier rubber duck.</li><li><strong>Jan Zaremba</strong> — leader of Infrastructure, encyclopedic knowledge and steady hand that made every iteration of the implementation safer than the last.</li><li><strong>Robert Pyciarz</strong> — doomsday architect, mapped out every failure path and tucked Kubernetes into all the right corners.</li><li><strong>Paritosh Anand &amp; Paweł Michna</strong> — deadline wrestlers, ran upgrades at crunch time and wrote down every step so the rest of us could sleep.</li><li><strong>Blend Halilaj</strong> — snapshot tactician, turned the idea of blue-green RDS restores into a clean, repeatable workflow.</li></ul><p>And a special encore for <strong>Rehan Ullah, Sam Stewart, and (again) Paweł Michna</strong>, the crew who is turning all of this into a one-button upgrade right now.</p><hr><p><a href="https://medium.com/fresha-data-engineering/switching-me-softly-cb404d02c28b">Switching me Softly</a> was originally published in <a href="https://medium.com/fresha-data-engineering">fresha-data-engineering</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[What the Fuss with Fluss: Flink Delta Force]]></title>
            <link>https://medium.com/fresha-data-engineering/what-the-fuss-with-fluss-flink-delta-force-1ab3d6be5c98?source=rss-a199772355b8------2</link>
            <guid isPermaLink="false">https://medium.com/p/1ab3d6be5c98</guid>
            <category><![CDATA[data]]></category>
            <category><![CDATA[apache-flink]]></category>
            <category><![CDATA[data-science]]></category>
            <category><![CDATA[programming]]></category>
            <category><![CDATA[data-engineering]]></category>
            <dc:creator><![CDATA[Anton Borisov]]></dc:creator>
            <pubDate>Mon, 22 Sep 2025 15:29:37 GMT</pubDate>
            <atom:updated>2025-09-22T20:11:04.487Z</atom:updated>
            <content:encoded><![CDATA[<p>Released July 31, 2025, <a href="https://flink.apache.org/2025/07/31/apache-flink-2.1.0-ushers-in-a-new-era-of-unified-real-time-data--ai-with-comprehensive-upgrades/">Flink 2.1</a> dropped a bombshell on the streaming world: <a href="https://lists.apache.org/thread/pvylkyfc5yq3tks0vx7kpzt5k8gjy0x1"><strong>DeltaJoin</strong></a><strong> and </strong><a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/calcite/rel/rules/MultiJoin.html"><strong>MultiJoin</strong></a>. These aren’t your typical join operators, they’re surgical instruments designed to excise the “state tumor” that’s been metastasizing in production Flink clusters for years.</p><figure><img alt="Picture presenting otters as a mascot for Apache Fluss" src="https://cdn-images-1.medium.com/max/1024/1*deqJfBtBD15r4CVqwJgSiA.png" /><figcaption>After messing with Flink/Fluss you’ll get addicted to otters</figcaption></figure><p>Every <a href="https://flink.apache.org/">Flink</a> team knows the dirty secret: <strong>join state inflates faster than a London rental listing</strong>. You start with gigabytes, graduate to terabytes, then watch helplessly as checkpoints stretch from seconds to minutes to “let’s just restart the job.” <a href="https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/datastream/fault-tolerance/state/">Traditional streaming joins hoard</a> everything — every customer record, every order, every update, because they might need it someday. It’s digital hoarding at datacenter scale.</p><p>The numbers are sobering. For many teams, a modest e-commerce platform joining orders with customers accumulates <strong>tens of gigabytes of state per billion events</strong>. Add product catalogs, inventory, shipping — suddenly you’re managing terabytes of state across your topology. Checkpoints that started at 30 seconds stretch to 5 minutes, then 20. Recovery time follows the same trajectory. 
Your on-call rotation becomes a game of Russian roulette: whose turn is it when the next OOM brings down production?</p><p>Sure, you can fight back — <strong>state TTLs to expire old data, interval joins with time bounds, broadcasting small dimension tables, or even falling back to batch reprocessing</strong>. Some teams pre-partition data to reduce per-operator state or switch to temporal joins where possible. But these workarounds either limit functionality or add operational complexity. <strong>That’s why the industry is racing toward fundamental solutions</strong>: DeltaJoin externalizes state entirely, <a href="https://risingwave.com/">RisingWave</a> uses cloud-native shared storage, <a href="https://www.feldera.com/">Feldera</a> pre-materializes everything, so different philosophies, same recognition that traditional join state is broken.</p><p><strong>DeltaJoin flips the script: why store when you can fetch?</strong> Instead of maintaining partner history in operator state, it queries an external indexed store at emit time. <strong>Flink’s DeltaJoin with eventual consistency always probes the latest counterpart at emit time (no snapshot pinning)</strong>. 
The payoff is dramatic: minimal checkpoint overhead, lightning-fast recovery, and jobs that actually scale elastically instead of just claiming to.</p><blockquote>Streaming is being reinvented in front of us.</blockquote><blockquote>Flink/Fluss, RisingWave, and Feldera are not rivals so much as different philosophies of state and paying attention now is how we learn which ideas will define the next decade.</blockquote><p>I’ll focus mainly on Flink and <a href="https://fluss.apache.org/">Fluss</a>, but the story wouldn’t be complete without glancing at how RisingWave and Feldera tackle the same challenge from other angles.</p><h3>Enter Fluss: The Storage Layer That Gets It</h3><p>This lookup-based rebellion demands a <a href="https://jack-vanlightly.com/blog/2025/9/2/understanding-apache-fluss">cooperative storage layer</a>. Apache Fluss (Incubating) isn’t just another streaming store, it’s purpose-built for this exact pattern. While other stores make you choose between streaming updates and efficient lookups, <strong>Fluss delivers both through its prefix lookup capability on primary-key tables</strong>.</p><p>The magic is in the prefix. Most lookup stores demand exact keys, meaning you need the complete primary key or you’re out of luck. <strong>Fluss lets you probe with just a prefix</strong>. Got a composite key (customer_id, order_id, item_id) but only know the customer? Traditional stores would force a scan or secondary index if supported. Fluss says &quot;no problem&quot; as your prefix lookup hits a tight <a href="https://rocksdb.org/">RocksDB</a> range scan on a single tablet. 
<a href="https://paimon.apache.org/"><strong>Paimon&#39;s</strong></a><strong> PK-based lookup works great when your join keys match the primary key exactly; Fluss&#39;s prefix lookup makes composite-key joins practical when only a prefix is available,</strong> which is the common case in real-world enrichment pipelines.</p><figure><img alt="Comparison of Apache Paimon vs Fluss lookup behavior. On the left, Paimon can only join when the entire primary key matches; joining on a partial key like customer_id with a composite (customer_id, order_id) fails, requiring an aggregate workaround. On the right, Fluss supports prefix lookups, allowing joins on just customer_id with subranges resolved by order_id, enabling efficient composite-key joins." src="https://cdn-images-1.medium.com/max/1024/1*lKGfyOFxWWZki7cawQkx5Q.png" /><figcaption>Prefix lookups make the difference: Paimon requires full primary key matches (or workarounds like nested aggregates), while Fluss supports probing with just a prefix, making composite-key joins practical in real pipelines.</figcaption></figure><p>Under the hood, <strong>each Fluss table bucket maps to a </strong><a href="https://fluss.apache.org/blog/pk-key-tables-log-cache-streaming/"><strong>KV tablet backed by RocksDB, plus a child log tablet</strong></a><strong> for the changelog</strong>. When your lookup hits the tablet leader, it translates directly into RocksDB operations. The dual structure (a mutable KV store plus a changelog) gives you both point-in-time lookups and CDC streams. This isn’t accidental; it’s architected specifically for patterns like DeltaJoin.</p><figure><img alt="Comparison of regular join vs DeltaJoin. On the left, two Kafka streams feed into a regular join operator that keeps left and right state internally, leading to large RocksDB state. 
On the right, DeltaJoin receives changelogs from Fluss, performs index lookups instead of maintaining state, and avoids holding left and right state inside the operator." src="https://cdn-images-1.medium.com/max/1024/1*owVyBr08YoQHYpPsFY5uTA.png" /><figcaption>Regular joins bloat operator state by holding both sides in memory, while DeltaJoin externalizes history into Fluss. Instead of hoarding state, each side emits a changelog and probes the index on demand.</figcaption></figure><p>DeltaJoin builds on two critical innovations. <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin"><strong>FLIP-486</strong></a> provides the core operator StreamingDeltaJoinOperator with bilateral LRU caches and the planner intelligence to know when to use them. When a record arrives on either side, it checks its cache, then triggers an async probe on miss. Two AsyncDeltaJoinRunner instances handle the bilateral lookups, each maintaining its own cache to avoid constant external calls.</p><p><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode"><strong>FLIP-519</strong></a><strong> solves the harder problem: async ordering chaos</strong>. The KeyedAsyncWaitOperator ensures updates for the same key process sequentially while different keys run concurrently. Without this, your upserts would corrupt faster than a politician&#39;s promises. It&#39;s the difference between eventual consistency and eventual catastrophe.</p><figure><img alt="Diagram of the KeyedAsyncWaitOperator. Three updates for key A (A1, A2, A3) arrive at times T1, T2, and T3. Even though the async tasks overlap in time, the operator enforces output order as A1 → A2 → A3. This ensures same-key updates are emitted sequentially while maintaining concurrency across different keys." 
src="https://cdn-images-1.medium.com/max/1024/1*LPNNsUIPzDnyE9MF11EIZQ.png" /><figcaption>The KeyedAsyncWaitOperator serializes updates per key while letting different keys run concurrently, turning async chaos into ordered streams.</figcaption></figure><h3>Configuration: Where Dreams Meet Reality</h3><p>Your setup starts simple:</p><pre>SET &#39;table.optimizer.delta-join.strategy&#39; = &#39;AUTO&#39;;<br>SET &#39;table.exec.async-lookup.key-ordered-enabled&#39; = &#39;true&#39;;<br>SET &#39;table.exec.async-lookup.output-mode&#39; = &#39;ALLOW_UNORDERED&#39;;<br>-- DeltaJoin caches are configurable per side; tune per workload</pre><p>But <strong>the optimizer is picky</strong>. Between source and join, only stateless operators survive the rewrite: scans with pushdowns, key-preserving calcs, watermarks, exchanges. One stateful operator in the chain? Back to traditional joins and their state baggage. Cascaded joins are not supported yet.</p><p>Changelog streams face tighter scrutiny. <strong>Streams that carry </strong><strong>UPDATE_BEFORE records get rejected</strong>, because DeltaJoin can&#39;t reconstruct &quot;before&quot; values from point lookups. Delete operations follow strict type-based rules: INNER joins tolerate deletes from one side only; LEFT/RIGHT joins only from the outer side. The planner stays conservative, choosing correctness over optimization every time.</p><figure><img alt="Table listing DeltaJoin limitations. Topology: only stateless operators allowed between source and join. Multi-join: cascaded joins not supported, only a single DeltaJoin per plan. Changelog support: streams with UPDATE_BEFORE are rejected. Delete semantics: for INNER joins, deletes allowed only on one side; for LEFT/RIGHT joins, deletes allowed only from the outer side. Consistency: eventual consistency without snapshot isolation. 
Cache: bounded LRU caches per side, cache misses trigger async pr" src="https://cdn-images-1.medium.com/max/1024/1*4YQhlkT035xL9D--KXjWXA.png" /><figcaption><em>Current limitations of Flink’s DeltaJoin operator across topology, semantics, and consistency.</em></figcaption></figure><p><strong>Your Fluss table design determines performance</strong>:</p><pre>CREATE TABLE fluss_customers (<br>    customer_id BIGINT,<br>    region_id INT,<br>    order_count BIGINT,<br>    customer_data STRING,<br>    PRIMARY KEY (customer_id, region_id) NOT ENFORCED<br>) WITH (<br>    &#39;bucket.num&#39; = &#39;16&#39;,<br>    &#39;bucket.key&#39; = &#39;customer_id&#39;,  -- single-tablet path for lookups by customer<br>    &#39;table.log.ttl&#39; = &#39;7d&#39;          -- changelog retention (7 days)<br>);</pre><p><strong>PK tables default to hash bucketing by primary key (excluding partition keys) unless you set </strong><strong>bucket.key</strong>. This configuration is mission-critical: set bucket.key = &#39;customer_id&#39; and every probe hits a single tablet. Skip this step, and rows are bucketed by the full primary key (customer_id, region_id), so a probe by customer_id alone no longer maps to one tablet.</p><h3>The Architecture Wars: Different Philosophies, Same Problem</h3><p>Here’s where the streaming world splits into camps. <strong>RisingWave takes the “streaming database” approach: </strong>it’s PostgreSQL-wire-compatible and maintains all join state in shared storage with tiered caching. When you write a join in RisingWave, you’re essentially creating a materialized view that updates incrementally. The engine manages multi-version concurrency control (MVCC) internally, giving you <strong>snapshot consistency by default</strong>. 
Your joins always see a consistent view of both sides, but you pay for it with storage overhead and checkpoint coordination across actors.</p><p>RisingWave’s architecture is fascinating: <strong>compute nodes are stateless</strong>, with all state living in <a href="https://risingwave.com/blog/hummock-a-storage-engine-designed-for-stream-processing/">Hummock</a> shared storage (typically S3-compatible object stores). Hot data stays in multi-level caches: block cache, meta cache, and optional disk cache (NVMe/EBS). When a join processes a record, it might hit memory, local SSD, or remote storage. The consistency protocol ensures all actors see the same epoch, preventing temporal anomalies. But this coordination has a cost: <strong>checkpoint barriers must traverse all actors, with latency bounded by the slowest operator and storage I/O</strong>.</p><p>Their Disk Cache shows impressive results, <strong>up to 94% fewer remote reads and ~75% fewer S3 GETs in their tests</strong>, drastically reducing both latency and cloud costs. <a href="https://risingwave.com/blog/atome-risingwave-real-time-risk-management-case-study/"><strong>Atome</strong></a><strong>, </strong>a BNPL payments company<strong>, </strong>migrated parts of their Flink-based pipeline to<strong> RisingWave</strong> specifically for operational simplicity and consistency guarantees.</p><p><strong>Feldera goes even further with incremental everything</strong>. Built on Differential Dataflow principles (<a href="https://sigmodrecord.org/publications/sigmodRecord/2403/pdfs/20_dbsp-budiu.pdf">DBSP</a>), Feldera doesn’t just maintain state — it maintains the entire computation graph as incremental operators. Every join is a <a href="https://medium.com/@feldera/database-computations-on-z-sets-5a1c32fbc0d0"><strong>Z-set </strong></a><strong>transformation</strong> that tracks insertions and deletions as weighted updates. 
When you query a join in Feldera, you’re reading from a pre-computed, constantly updating index. <strong>Feldera r</strong><a href="https://www.feldera.com/blog/backfill-explained"><strong>ecently rounded this out with first-class backfill orchestration</strong></a>, <strong>labeled connectors and staged historical-then-realtime ingest — so bootstrapping big state is part of the product, not a runbook</strong>.</p><p>Feldera’s advantage comes from maintaining the entire computation graph as incremental state. When you join the result of Join A with Join B (multi-hop), Feldera doesn’t recompute Join A from scratch, it maintains Join A’s output as a differential index that updates incrementally. Each downstream join operates on pre-materialized, indexed results from upstream joins. For recursive queries like supply chain tracing or graph algorithms, Feldera’s <strong>pre-computation model</strong> can outperform lookup-based approaches on multi-hop or recursive queries, because each iteration reuses <strong>incrementally maintained indexes</strong> instead of issuing fresh lookups or recomputing upstream joins.</p><figure><img alt="Three-column comparison of streaming architectures. Left column: Flink + Fluss — minimal operator state, external KV and log storage. Middle column: RisingWave — stateless compute actors, shared storage with multi-tiered cache. Right column: Feldera — incremental indexes and pre-materialized arrangements. Each column is visually boxed with corresponding project logos." src="https://cdn-images-1.medium.com/max/1024/1*bPbknmFCxz6DqXmSWHJu1g.png" /><figcaption>Different philosophies of state management: Flink + Fluss use minimal operator state with external KV + log, RisingWave relies on stateless compute actors over shared storage, while Feldera pre-materializes results using incremental indexes.</figcaption></figure><p><strong>DeltaJoin + Fluss takes the opposite bet</strong>: externalize everything. 
No internal state management, no MVCC overhead, no incremental index maintenance. Just probes. The philosophical triangle is clear:</p><ul><li><strong>Flink+Fluss (DeltaJoin)</strong>: “probe on demand, minimal job state, eventual consistency”</li><li><strong>RisingWave</strong>: “MVCC + materialized views over shared storage for snapshot-consistent results”</li><li><strong>Feldera</strong>: “pre-materialize and index every intermediate; strongest for multi-hop/recursive”</li></ul><h3>The Performance Reality Check</h3><p><strong>RisingWave checkpoints can reach 100–200MB/s throughput</strong> with consistent latency, but checkpoint size grows linearly with state. A billion-row dimension table means gigabytes of checkpoint data, even with compression. Recovery requires rebuilding consistent state across all actors, think minutes for large deployments. However, the shared storage model enables <strong>parallel recovery</strong>: new actors can start processing immediately while warming their caches asynchronously.</p><p><strong>Feldera’s checkpoints are incremental diffs</strong>, often smaller than RisingWave’s, but the storage footprint is significant. That same billion-row table requires substantial storage to maintain efficient Z-set indexes, and that’s before considering join output cardinality. The payoff? <strong>Sub-millisecond query latency</strong> on pre-computed results. Feldera can process millions of updates per second on a single node for moderately complex queries, but each join multiplies storage requirements.</p><p><strong>DeltaJoin checkpoints are tiny compared to stream-stream joins, </strong>it’s just cache entries and in-flight requests. Recovery means warming caches, not rebuilding state. The billion-row table lives in Fluss, accessed on-demand. The trade-off: <strong>uncached lookups add low single-digit milliseconds of latency</strong>, and cache misses during traffic spikes can cascade into backpressure. 
With proper cache configuration and hot-key patterns, <strong>high cache hit rates keep most lookups fast</strong>.</p><p>The implementation uses <strong>pooled result handlers and bounded in-flight async probing</strong> to manage resources efficiently. Failed lookups retry with exponential backoff, while partial caching helps balance memory usage against lookup frequency.</p><figure><img alt="A table comparing three streaming systems. RisingWave has large checkpoints, slow recovery, moderate latency, and medium memory/storage use. Feldera has medium checkpoints, moderate recovery, fast latency, and high storage requirements for large differential indexes. DeltaJoin+Fluss has small checkpoints, fast recovery, low latency, and low memory usage limited to LRU caches." src="https://cdn-images-1.medium.com/max/1024/1*A1A_M3xGy2FqAnWRUPdmXA.png" /><figcaption>Performance trade-offs matrix comparing RisingWave, Feldera, and DeltaJoin+Fluss across checkpoint size, recovery time, latency, and memory/storage footprint.</figcaption></figure><h3>Real-World Patterns and Anti-Patterns</h3><p>Choose RisingWave when you need<strong> SQL compatibility and epoch-based snapshot consistency </strong>for analytics and dashboards. Financial dashboards, operational analytics, anywhere business users write SQL directly. The PostgreSQL compatibility means your BI tools just work. The MVCC consistency means your reports always balance. The trade-off is operational complexity: you’re running a distributed database, not just a stream processor.</p><p><strong>Choose Feldera for complex, multi-hop incremental queries</strong>. Graph analytics, recursive CTEs, anything where you’re joining join results with other join results. Say a logistics company tracking packages through their network, like packages join with trucks, trucks join with routes, routes join with weather, all joining with real-time GPS. In Feldera, this entire graph updates incrementally. 
In DeltaJoin, you’d need millions of lookups per second, each adding latency.</p><p><strong>Choose DeltaJoin + Fluss for high-volume enrichment with bounded dimensions</strong>. User profile lookups, feature assembly, entity resolution, essentially the cases where one side is orders of magnitude larger than the other. <a href="https://www.taobao.com/">Taobao</a> enriching millions of events per second with user profiles and advertiser configs sees <a href="https://fluss.apache.org/blog/taobao-practice/">dramatic state reduction</a>. <strong>Traditional join state would be massive. With DeltaJoin + Fluss? Just cache overhead</strong>, with dimension data living in Fluss’s optimized storage.</p><figure><img alt="A table compares three stream processing systems. RisingWave is suitable when SQL and snapshot consistency are required, such as dashboards, but should be avoided if you want to minimize database-like operational overhead. Feldera is good for multi-hop or recursive incremental queries but less suited when storage and I/O budgets are tight. DeltaJoin + Fluss works well for high-volume enrichment with bounded dimensions but struggles when dimensions change at very high frequency." src="https://cdn-images-1.medium.com/max/1024/1*b9Cm9B39cz1n7lzNgXMpTA.png" /><figcaption><em>Guidelines for choosing between RisingWave, Feldera, and DeltaJoin+Fluss: best-fit use cases and situations to avoid.</em></figcaption></figure><p>But know the anti-patterns. <strong>DeltaJoin fails catastrophically with high-cardinality, high-mutation dimensions</strong>. If your dimension table has millions of rows changing every second, your cache becomes useless, every join becomes a lookup, and your network becomes the bottleneck. Imagine real-time inventory tracking: every sale updates inventory, invalidating caches instantly. 
It’s a lot better to stick to traditional joins with aggressive TTLs in this case.</p><figure><img alt="Diagram showing DeltaJoin between Sales and Inventory streams with external indexed storage. Cache is constantly invalidated due to high churn, forcing every join to query external storage. Problems illustrated include near 100% cache miss rate, lookup storms from constant external queries, and network saturation from bandwidth exhaustion. Highlights why DeltaJoin is unsuitable for highly mutable dimension tables." src="https://cdn-images-1.medium.com/max/1024/1*lDz4fXxgvq70KsudOCtZ5Q.png" /><figcaption>Anti-pattern: using DeltaJoin with high-churn dimension tables causes cache misses, lookup storms, and network saturation.</figcaption></figure><h3>The Uncomfortable Consistency Question</h3><p>Here’s what the documentation confirms: <strong>DeltaJoin delivers eventual, not snapshot consistency by design</strong>. Each probe sees the latest row version at lookup time. Concurrent updates to both join sides create transient mismatches that eventually converge.</p><p>Consider this scenario: An order for customer C arrives at T1. DeltaJoin looks up customer C, gets version V1. At T2, customer C updates their address. At T3, another order arrives and gets version V2. <strong>Your output stream now has two orders for the same customer with different addresses, even though they were placed seconds apart</strong>. RisingWave and Feldera users might scoff — they get consistency guarantees built-in. But those guarantees aren’t free — they’re paid in checkpoint size, recovery time, and operational overhead.</p><figure><img alt="Diagram comparing DeltaJoin and Feldera consistency guarantees. In the DeltaJoin lane, Order O1 joins with customer version V1 at T1, and Order O2 joins with version V2 after the customer updates their address at T2 — resulting in inconsistent output for the same customer. 
In the Feldera lane, both orders are aligned to version V1 until the checkpoint, after which they converge to V2. Highlights the trade-off between eventual consistency (faster, lighter) and snapshot consistency (heavier checkp" src="https://cdn-images-1.medium.com/max/1024/1*BO80Lldx-m5wFRmlNkdAyw.png" /><figcaption>Eventual vs. snapshot consistency: DeltaJoin emits orders with different customer versions until checkpoint convergence, while systems like Feldera align versions at snapshot boundaries.</figcaption></figure><p>Your downstream must be built for this reality. <strong>Upsert sinks that handle duplicates gracefully</strong>. Idempotent operations that survive replays. Business logic that tolerates temporary inconsistencies. Monitoring that tracks convergence time, not just throughput. For many real-time analytics use cases, this trade-off is brilliant: you exchange perfect consistency for operational sanity. <strong>For financial reconciliation requiring ACID guarantees? Keep walking</strong>.</p><h3>The Future State</h3><figure><img alt="Futuristic and optimistic vision of Data Streaming Architecture" src="https://cdn-images-1.medium.com/max/1024/1*hjxdmacafXUgCwWH_siImg.png" /><figcaption>Data Future is bright and shiny with these new tools!</figcaption></figure><p>While Fluss 0.6 introduced prefix lookups, <strong>version 0.8 targets full Flink 2.1/DeltaJoin integration</strong> as a key milestone. The roadmap includes potential optimizations like automatic bucket key inference and adaptive cache sizing, though specifics remain subject to change.</p><p><strong>RisingWave is doubling down on cloud-native architecture</strong>: their Elastic Disk Cache shows significant S3 I/O reductions, and the team is investing heavily in streaming <a href="https://iceberg.apache.org/"><strong>Iceberg</strong></a>. 
<strong>Feldera continues pushing incremental computation boundaries</strong>, with their backfill orchestration making it easier to bootstrap complex stateful pipelines.</p><p>The streaming world isn’t converging on a single solution — it’s diverging into specialized tools for different problems. Traditional approaches like Flink’s disaggregated state still treat state as job-private, offloaded but isolated. <a href="https://materialize.com/">Materialize</a>, Feldera, and RisingWave maintain rich indexed state within the engine, offering stronger consistency and incremental processing with smart intermediate caches at higher operational cost and a whole different set of tradeoffs.</p><p><strong>DeltaJoin + Fluss isn’t trying to be RisingWave or Feldera</strong>. It’s solving a specific problem: making traditional Flink joins scale without the traditional state explosion. For teams already invested in Flink, facing the state management wall, this combination offers a pragmatic escape route without switching engines entirely.</p><p>The constraints are real: eventual consistency, strict topology requirements, and the need for cooperative storage. 
But for teams willing to embrace these trade-offs, <strong>the payoff is substantial: joins that actually scale, checkpoints measured in seconds not hours, and operators who can sleep through the night</strong>.</p><blockquote>Shoutout to the Flink, Fluss, Feldera, and RisingWave teams for quietly revolutionizing how we think about data in motion.</blockquote><blockquote>While the rest of tech argues about AI, these folks are solving the unglamorous problems that actually keep the world’s data flowing and making it look elegant in the process.</blockquote><hr><p><a href="https://medium.com/fresha-data-engineering/what-the-fuss-with-fluss-flink-delta-force-1ab3d6be5c98">What the Fuss with Fluss: Flink Delta Force</a> was originally published in <a href="https://medium.com/fresha-data-engineering">fresha-data-engineering</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[The SELECT FOR UPDATE Trap Everyone Falls Into]]></title>
            <link>https://medium.com/fresha-data-engineering/the-select-for-update-trap-everyone-falls-into-8643089f94c7?source=rss-a199772355b8------2</link>
            <guid isPermaLink="false">https://medium.com/p/8643089f94c7</guid>
            <category><![CDATA[sql]]></category>
            <category><![CDATA[postgresql]]></category>
            <category><![CDATA[data]]></category>
            <category><![CDATA[database]]></category>
            <category><![CDATA[data-engineering-101]]></category>
            <dc:creator><![CDATA[Anton Borisov]]></dc:creator>
            <pubDate>Mon, 08 Sep 2025 08:57:03 GMT</pubDate>
            <atom:updated>2025-09-08T08:57:03.741Z</atom:updated>
            <content:encoded><![CDATA[<h4>You Probably Are Too</h4><figure><img alt="" src="https://cdn-images-1.medium.com/max/926/1*KLeWhEHCaXePi8qZcCj6Bw.png" /></figure><p>I wasn’t sure if I should write this article. After all, <a href="https://www.postgresql.org/docs/current/explicit-locking.html">row-level locking</a> in <a href="https://www.postgresql.org/">PostgreSQL</a> isn’t exactly breaking news — these features have been around since 2013. But here’s the thing: I’ve been asked about SELECT FOR UPDATE performance issues three times this month alone. Different companies, different engineers, same fundamental misunderstanding.</p><p>I have a simple rule for deciding whether to write about something: Would this article have saved me hours of debugging 2–5 years ago? If yes, then it’s worth writing, even if it seems obvious to me now. The truth is, we don’t write technical articles because we just discovered something cool and can’t wait to share. We write them for that developer who’s currently staring at a deadlock graph at 2 AM, wondering why their perfectly reasonable looking code is causing production issues.</p><p>So if you’ve worked with PostgreSQL in any production environment with <a href="https://www.postgresql.org/docs/current/mvcc-intro.html">concurrent</a> transactions, you’ve likely reached for SELECT FOR UPDATE to prevent race conditions. It seems like the obvious choice — after all, if you&#39;re going to update a row, you should lock it first, right?</p><p>Here’s the uncomfortable truth: SELECT FOR UPDATE is probably the wrong tool for your use case, and it&#39;s silently destroying your application&#39;s concurrency. Let me show you why this seemingly logical choice is actually a historical artifact that can bring your database to its knees.</p><h3>The Race Condition We’re All Trying to Solve</h3><p>Let’s start with why developers reach for row-level locking in the first place. 
Picture this scenario: you’re building an e-commerce platform where multiple users might try to purchase the last item in stock simultaneously.</p><p>Without proper locking, here’s what happens at the default READ COMMITTED <a href="https://www.postgresql.org/docs/current/transaction-iso.html">isolation level</a>:</p><pre>-- Transaction A reads inventory<br>SELECT quantity FROM inventory WHERE product_id = 123;<br>-- Returns: quantity = 1<br><br>-- Meanwhile, Transaction B also reads the same row<br>SELECT quantity FROM inventory WHERE product_id = 123;<br>-- Also returns: quantity = 1<br><br>-- Both transactions think there&#39;s stock available<br>-- Transaction A updates<br>UPDATE inventory SET quantity = 0 WHERE product_id = 123;<br><br>-- Transaction B updates (overwrites A&#39;s change!)<br>UPDATE inventory SET quantity = 0 WHERE product_id = 123;</pre><figure><img alt="" src="https://cdn-images-1.medium.com/max/924/1*bqfI1a0T11IvFYZ00T__jg.png" /><figcaption>Yep, exactly so</figcaption></figure><p>Both customers think they got the item. Your warehouse is confused. Customer service is having a bad day. This is the classic “lost update” problem.</p><p>The traditional solution looks elegant:</p><pre>BEGIN;<br>SELECT quantity FROM inventory <br>WHERE product_id = 123 <br>FOR UPDATE;  -- Lock the row!<br><br>-- Now safely check and update<br>UPDATE inventory SET quantity = quantity - 1 <br>WHERE product_id = 123 AND quantity &gt; 0;<br>COMMIT;</pre><p>Problem solved, right? Not quite. This solution works, but it’s using a sledgehammer where a scalpel would do.</p><h3>The Four Horsemen of Row Locking</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/916/1*w0ICdA5guRnzymn76_w-Fg.png" /><figcaption>PostgreSQL row locks. Gloomy, gothic, yours…</figcaption></figure><p>Here’s where things get interesting. PostgreSQL doesn’t just have “locked” and “unlocked” states for rows. 
Since version 9.3, PostgreSQL actually has four distinct <a href="https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE">row-level lock modes</a>:</p><ol><li><strong>FOR KEY SHARE</strong>: The gentlest lock, used by foreign key checks</li><li><strong>FOR SHARE</strong>: Prevents modifications but allows other shared locks</li><li><strong>FOR NO KEY UPDATE</strong>: Blocks other writers to the row but still allows FOR KEY SHARE, so foreign key checks proceed</li><li><strong>FOR UPDATE</strong>: The nuclear option, blocking every other row-level lock (plain reads still work)</li></ol><p>These aren’t arbitrary distinctions. They form a carefully designed hierarchy that enables PostgreSQL to maintain referential integrity while maximizing concurrency. The breakthrough insight was recognizing that not all updates are created equal.</p><p>Consider this example relationship:</p><pre>CREATE TABLE accounts (<br>    account_id BIGINT PRIMARY KEY,<br>    balance DECIMAL(10,2),<br>    status VARCHAR(20)<br>);<br><br>CREATE TABLE transactions (<br>    transaction_id BIGINT PRIMARY KEY,<br>    account_id BIGINT REFERENCES accounts(account_id),<br>    amount DECIMAL(10,2)<br>);</pre><p>When you insert a transaction, PostgreSQL needs to ensure the referenced account won’t disappear. But should that prevent you from updating the account’s balance? Of course not! 
The foreign key only cares that the account continues to exist with the same ID.</p><h3>The Hidden Cost of FOR UPDATE</h3><p>Here’s the critical insight that most developers miss: <strong>SELECT FOR UPDATE doesn&#39;t match what </strong><strong>UPDATE actually does</strong>.</p><p>When you execute an UPDATE statement, PostgreSQL is smart about which lock it acquires:</p><ul><li>If you’re changing a column covered by a unique index that a foreign key could reference (such as the primary key): FOR UPDATE lock</li><li>If you’re changing any other column: FOR NO KEY UPDATE lock</li></ul><p>But when you manually write SELECT FOR UPDATE, you&#39;re always getting the strongest lock, even when you don&#39;t need it.</p><p>Let me demonstrate the catastrophic effect this can have:</p><pre>-- Session 1: Lock an account to update its balance<br>BEGIN;<br>SELECT * FROM accounts <br>WHERE account_id = 1001 <br>FOR UPDATE;<br><br>-- Session 2: Try to insert a transaction<br>-- This will BLOCK even though we&#39;re not changing the key!<br>INSERT INTO transactions (transaction_id, account_id, amount) <br>VALUES (5001, 1001, 100.00);<br>-- Waiting... waiting... waiting...</pre><p>Session 2 is blocked not because there’s a logical conflict, but because we used the wrong lock level. The correct approach:</p><pre>-- Session 1: Use the appropriate lock level<br>BEGIN;<br>SELECT * FROM accounts <br>WHERE account_id = 1001 <br>FOR NO KEY UPDATE;  -- This is what UPDATE would use!<br><br>-- Session 2: Insert proceeds without blocking<br>INSERT INTO transactions (transaction_id, account_id, amount) <br>VALUES (5001, 1001, 100.00);<br>-- Success! No waiting!</pre><h3>Real-World Impact: A Production Horror Story</h3><p>A while back I helped a friend at a fintech startup that was experiencing mysterious deadlocks and timeout errors during peak hours. 
Their payment processing system would grind to a halt whenever transaction volume spiked.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/910/1*qWJAFG78NGHQ-kkYtySg5w.png" /><figcaption>Financial = fintech support, good pun</figcaption></figure><p>The culprit? A seemingly innocent piece of code:</p><pre>-- Their original code<br>BEGIN;<br>-- Lock user account for payment processing<br>SELECT * FROM user_accounts <br>WHERE user_id = $1 <br>FOR UPDATE;<br><br>-- Process payment logic here<br>-- Update account balance<br>UPDATE user_accounts <br>SET balance = balance - $2,<br>    last_transaction = NOW()<br>WHERE user_id = $1;<br>COMMIT;</pre><p>Meanwhile, their audit system was trying to insert records:</p><pre>INSERT INTO audit_log (user_id, action, timestamp)<br>VALUES ($1, &#39;payment_initiated&#39;, NOW());</pre><p>The audit table had a foreign key to user_accounts, so each insert needed a FOR KEY SHARE lock on the referenced row, which conflicts with FOR UPDATE. Every payment would block audit logging, which would block other systems, creating a cascade of locks throughout their database.</p><p>The fix was embarrassingly simple:</p><pre>-- Fixed code<br>BEGIN;<br>SELECT * FROM user_accounts <br>WHERE user_id = $1 <br>FOR NO KEY UPDATE;  -- Changed this one line<br><br>-- Rest of the code unchanged<br>UPDATE user_accounts <br>SET balance = balance - $2,<br>    last_transaction = NOW()<br>WHERE user_id = $1;<br>COMMIT;</pre><p>Result: 70% reduction in lock wait times, complete elimination of deadlocks, and their system could handle 3x the transaction volume.</p><h3>The Decision Tree: Which Lock Do You Actually Need?</h3><p>Here’s a practical guide for choosing the right lock level:</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*_-Y1_n3b0x7itHBGoNks_A.png" /><figcaption>PostgreSQL row-lock decision tree: choose the weakest lock that still guarantees correctness — FOR UPDATE, FOR NO KEY UPDATE, FOR SHARE, or FOR KEY SHARE.</figcaption></figure><p>In practice, 90% of the time you want FOR NO KEY UPDATE.
The name is admittedly terrible (a historical artifact from the PostgreSQL 9.3 upgrade that maintained backward compatibility), but it&#39;s almost always the right choice.</p><h3>Practical Recommendations</h3><ol><li><strong>Default to </strong><strong>FOR NO KEY UPDATE</strong>: Unless you&#39;re specifically deleting rows or modifying primary keys, this is your go-to lock.</li><li><strong>Reserve </strong><strong>FOR UPDATE for deletions and key changes</strong>: The only time you truly need FOR UPDATE is when you&#39;re about to DELETE the row or modify a column that could be referenced by a foreign key.</li><li><a href="https://www.postgresql.org/docs/current/monitoring-locks.html"><strong>Monitor</strong></a><strong> lock waits</strong>: Use pg_stat_activity together with pg_blocking_pids() (or the pg_locks view) to identify lock contention as it happens:</li></ol><pre>SELECT blocked.pid,<br>       blocked.query AS blocked_query,<br>       blocking.query AS blocking_query<br>FROM pg_stat_activity AS blocked<br>JOIN pg_stat_activity AS blocking <br>  ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))<br>WHERE blocked.wait_event_type = &#39;Lock&#39;;</pre><p>4. <strong>Document your locking strategy</strong>: Make it clear in your code why you’re choosing a <a href="https://www.cybertec-postgresql.com/en/row-locks-in-postgresql/">specific lock level</a>. Your future self (and your teammates) will thank you.</p><h3>The Takeaway</h3><p>SELECT FOR UPDATE is a powerful tool that&#39;s almost always the wrong choice. It&#39;s like using a sledgehammer to hang a picture frame — it&#39;ll work, but you&#39;ll damage the wall and annoy the neighbors.</p><p>The next time you’re tempted to write SELECT FOR UPDATE, pause and ask yourself: &quot;Am I actually going to delete this row or change its primary key?&quot; If the answer is no, use FOR NO KEY UPDATE instead.
Your database will handle more concurrent operations, your users will experience fewer timeouts, and your ops team will sleep better at night.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*12MQZf1YQ14rbhGtpB_nmg.png" /></figure><p>Remember: in PostgreSQL, the lock level should match your actual intent. Don’t let a historical naming convention trick you into over-locking your data. Your application’s performance depends on it.</p><p><strong>UPDATE:</strong><br>I just discovered Laurenz Albe’s <a href="https://www.cybertec-postgresql.com/en/select-for-update-considered-harmful-postgresql/"><em>SELECT FOR UPDATE considered harmful (PostgreSQL)</em></a>, which independently reaches the same conclusion.</p><p>For a crisp rationale/history, read his; for a deeper, production-oriented guide — decision-tree, FK blocking examples, monitoring queries, and explain-driven tuning — stay here.</p><p>Both posts agree: prefer FOR NO KEY UPDATE unless you are deleting the row or changing FK-referenced keys.</p><hr><p><a href="https://medium.com/fresha-data-engineering/the-select-for-update-trap-everyone-falls-into-8643089f94c7">The SELECT FOR UPDATE Trap Everyone Falls Into</a> was originally published in <a href="https://medium.com/fresha-data-engineering">fresha-data-engineering</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
    </channel>
</rss>