fix: prevent duplicate offset assignment with etcd lease-based partition ownership#123
Merged
novatechflow merged 1 commit intoKafScale:mainfrom Mar 1, 2026
Conversation
d5f8206 to
4f77b6d
Compare
novatechflow
approved these changes
Mar 1, 2026
Collaborator
novatechflow
left a comment
There was a problem hiding this comment.
Thank you @klaudworks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
fixes #122 #120, prepares for #115.
Multiple brokers can receive produce requests for the same partition concurrently. Each broker reads the starting offset from etcd once, increments in-memory, and writes it back as a simple PUT at flush time. When two brokers do this for the same partition, one silently overwrites the other's messages in S3.
How it works
Each partition is now exclusively owned by one broker at a time, enforced through etcd.
Broker side: Before writing to a partition, the broker claims ownership by creating an etcd key (
/kafscale/partition-leases/{topic}/{partition}) using an atomic compare-and-swap — only succeeds if no other broker has claimed it. The key is attached to an etcd lease with a 10-second TTL. The broker sends periodic heartbeats to keep the lease alive. If the broker crashes and stops heartbeating, etcd expires the lease after 10 seconds and deletes the key, allowing another broker to take over. On graceful shutdown, the broker revokes the lease immediately so failover is instant. All partition keys share a single etcd lease, so there is only one heartbeat stream per broker regardless of how many partitions it owns.Proxy side: The proxy watches the etcd lease keys and maintains an in-memory routing table mapping each partition to its owning broker. Produce requests are routed directly to the right broker. If the routing table is stale and a broker rejects a request with
NOT_LEADER_OR_FOLLOWER, the proxy retries on a different broker (up to 3 attempts). Requests spanning partitions on different brokers are split, forwarded concurrently, and responses are merged back into a single response for the client.Changes
Broker: etcd lease-based partition ownership. New
PartitionLeaseManageracquires and manages partition leases.handleProducechecks ownership before writing. Leases are released immediately on graceful shutdown.Proxy: partition-aware produce routing. New
PartitionRouterwatches lease keys for the routing table. New produce routing path splits, forwards, and merges requests by owning broker. Per-client connection pool avoids re-dialing the same broker on every request.Protocol: produce request/response codec. When a produce request targets partitions on different brokers, the proxy needs to split it into smaller requests (one per broker) and merge the individual responses back into a single response for the client. This required adding serialization and deserialization for produce requests and responses. Round-trip tested across protocol versions 3-10.
Test infrastructure. Extracted embedded etcd setup into
internal/testutilfor reuse acrossetcd_store_test.go,partition_lease_test.go, andpartition_router_test.go.Tests
I could cover the multi-broker behavior well despite relying only on the in-memory etcd.
Follow-up
We should use the same establishes lease mechanism to route consumers to the correct broker for fetches and consumer group coordination. This would fix consumer group coordination #121 and at the same time guarantee cache hits for consumers because they read from the broker that serves the partition and has the relevant segments in cache already. This would spare us all the complex performance optimizations I implemented for #115 and didn't publish yet.