<?xml version="1.0" encoding="UTF-8"?><rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:cc="http://cyber.law.harvard.edu/rss/creativeCommonsRssModule.html">
    <channel>
        <title><![CDATA[Stories by Giannis Polyzos on Medium]]></title>
        <description><![CDATA[Stories by Giannis Polyzos on Medium]]></description>
        <link>https://medium.com/@ipolyzos_?source=rss-1427b7a7e5e5------2</link>
        <image>
            <url>https://cdn-images-1.medium.com/fit/c/150/150/1*yFOYLKnlKrMwyOgMNc_pVQ.jpeg</url>
            <title>Stories by Giannis Polyzos on Medium</title>
            <link>https://medium.com/@ipolyzos_?source=rss-1427b7a7e5e5------2</link>
        </image>
        <generator>Medium</generator>
        <lastBuildDate>Thu, 04 Jun 2026 08:32:41 GMT</lastBuildDate>
        <atom:link href="https://medium.com/@ipolyzos_/feed" rel="self" type="application/rss+xml"/>
        <webMaster><![CDATA[yourfriends@medium.com]]></webMaster>
        <atom:link href="http://medium.superfeedr.com" rel="hub"/>
        <item>
            <title><![CDATA[Streams, Tables, and The Illusion of Duality]]></title>
            <link>https://medium.com/@ipolyzos_/streams-tables-and-the-illusion-of-duality-3aa973dbaad6?source=rss-1427b7a7e5e5------2</link>
            <guid isPermaLink="false">https://medium.com/p/3aa973dbaad6</guid>
            <dc:creator><![CDATA[Giannis Polyzos]]></dc:creator>
            <pubDate>Tue, 19 Aug 2025 06:28:10 GMT</pubDate>
            <atom:updated>2025-08-19T07:31:58.911Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*pmVe6jgfqSME_HfaDTwuLA.png" /></figure><h4>What Is Stream/Table Duality?</h4><p>At its core:</p><ul><li>A <strong>stream</strong> is a never-ending log of changes 📜.</li><li>A <strong>table</strong> is the <em>materialized view</em> of those changes, the current state 🗄️.</li></ul><p>👉 Every table can be expressed as a stream of updates.<br>👉 Every stream of updates can be materialized into a table.</p><p>For example:</p><ul><li>A database emits INSERT, UPDATE, and DELETE events → <strong>stream</strong>.</li><li>Apply those events in order, and you reconstruct the <strong>table</strong>.</li><li>Conversely, capture every change to a table → you get the <strong>changelog stream</strong>.</li></ul><p>It’s bidirectional. That’s the duality. 💡</p><h4>⚙️ Core Properties of Duality</h4><p>To make this magic work, you need:</p><ol><li><strong>Changelog semantics: </strong>the stream must carry not just new rows, but also updates and deletes.</li><li><strong>Primary keys: </strong>so the system knows <em>which row</em> to update.</li><li><strong>Time as a first-class citizen</strong>: streams provide ordering, tables represent “state at time T.”</li><li><strong>Materialization</strong>: a table is a materialized view of a stream.</li><li><strong>Consistency: </strong>replaying the same stream always yields the same table.</li></ol><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*3QTjksdLcu-HRe6tG1wZhA.png" /></figure><h4>🛑 Where It Breaks Down</h4><p>Here’s where things get interesting. A lot of work is going into integrating Apache Kafka with Apache Iceberg.</p><p>Some position this as a <strong>Stream/Table duality. 
</strong>Let’s break things down:</p><h4>🧊 Apache Iceberg</h4><p>Iceberg is a great open table format for managing <strong>append-only data</strong>.</p><ul><li>But ❌ it does <strong>not enforce primary keys</strong>.</li><li>❌ It doesn’t natively support row-level updates or deletes in streaming mode.</li><li>That means you can’t get true stream/table duality; you only get <em>snapshots of appended data</em>.</li></ul><h4>📦 Apache Kafka</h4><ul><li>Kafka is an <em>event log</em>, not a changelog.</li><li>By default, it just stores raw events; it doesn’t have the notion of updates or deletes.</li><li>So ❌ Kafka alone is <em>not</em> a changelog stream.</li></ul><blockquote>But…</blockquote><h4>✅ With Debezium or Flink CDC</h4><ul><li>You capture <strong>real database changes</strong> (with keys + operation types).</li><li>Kafka topics then carry true changelog streams.</li><li>From there, Flink can materialize those streams into tables.</li></ul><blockquote>Kafka is an <em>event log</em>, not a changelog.</blockquote><p>This is particularly important to highlight because this is where things get complex downstream.</p><p>Stream processing engines like <strong>Apache Flink </strong>build<strong> </strong>on the<strong> concept of the changelog</strong>.</p><p>Since Kafka doesn’t generate a changelog, <strong>Flink requires adding an expensive operator</strong> (ChangelogNormalize) as an intermediate step to normalize data from Kafka, <strong>resulting in heavy states and redundant storage</strong>.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*mnuNpl-JPiZnyEKcp1DIgw.png" /></figure><p>Moreover, this changelog is not reused, which means that if you consume the same Kafka topic to perform different operations in different jobs, <strong>each job requires creating and storing that changelog in its state</strong>.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*p317Z6RW5z_3OooLEMSi7w.png" /></figure><h4>🤔 So, Do Streams and 
Tables Need to Be in the Same System?</h4><p>This is something I have been debating with myself.</p><p>Many systems have the concept of Stream/Table duality at their core, like <strong>Postgres</strong>, <strong>MySQL</strong>, and <strong>Apache Beam</strong>, as well as systems I work with heavily, like <strong>Apache Flink</strong>, <a href="https://paimon.apache.org/"><strong>Apache Paimon</strong></a>, and these days <a href="https://fluss.apache.org/"><strong>Apache Fluss</strong></a>. So I guess it’s only natural for me to advocate in favor of <strong>“it needs to be in the same system”.</strong></p><p>Otherwise, I would argue it’s <strong>an integration between systems that can’t support both natively</strong>… However, since there is no formal definition, the answer should be <strong>not necessarily</strong>.</p><p>Systems like <strong>Apache Flink, Kafka Streams, and others </strong>expose both APIs in one place, making duality seamless. 
But you can also have <strong>separate systems, I guess.</strong></p><p>As long as the <strong>ordering, keys, and changelog semantics</strong> are preserved, duality holds.</p><p>Based on what was discussed above, though, I doubt it’s fair to consider an integration between <strong>Apache Kafka &amp; Apache Iceberg </strong>as a <strong>Stream/Table duality.</strong></p><h3>🚀 The Takeaway</h3><ul><li><strong>Streams = the story</strong> (all the events that ever happened).</li><li><strong>Tables = the snapshot</strong> (what’s true right now).</li><li>Together, they form a powerful duality that underpins modern real-time analytics, stream processing, and data lakehouses.</li></ul><p>But beware 👀:</p><ul><li>Kafka ≠ changelog (unless CDC is involved).</li><li>Iceberg ≠ table duality (because no primary keys, only appends).</li></ul><p>There is more to this, touching on different approaches and trends in the industry these days, but for now, that’s what I wanted to focus on.</p><p><strong>PS: </strong>If you are looking for a solution that builds on all the above principles and gives you extra benefits, like the ability to actually query streams, built-in caches, and more, <a href="https://fluss.apache.org/">Apache Fluss</a> might be of interest.</p><p>Keep on streaming 🌊 🤘</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=3aa973dbaad6" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[The Importance of Unified Batch and Streaming in Modern Data Processing.]]></title>
            <link>https://medium.com/@ipolyzos_/the-importance-of-unified-batch-and-streaming-in-modern-data-processing-09e40331f7a7?source=rss-1427b7a7e5e5------2</link>
            <guid isPermaLink="false">https://medium.com/p/09e40331f7a7</guid>
            <dc:creator><![CDATA[Giannis Polyzos]]></dc:creator>
            <pubDate>Mon, 07 Oct 2024 07:10:54 GMT</pubDate>
            <atom:updated>2024-10-12T21:05:19.315Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*vEosZdsDKxqcDbHtWolmnQ.png" /></figure><p>In the past decade, data-driven companies have evolved from processing periodic batches of data to needing real-time insights from continuous streams. However, the journey hasn’t been seamless.</p><p>Historically, companies have been forced to use <strong>two technology stacks</strong> to handle <strong>batch</strong> and <strong>streaming</strong> workloads separately. This separation has resulted in various challenges, from increased operational complexity to delayed insights.</p><p>Today, we’re seeing a shift toward the <strong>unification of batch and streaming</strong> processing into a single stack. Unified data platforms offer the promise of managing both workloads seamlessly, reducing complexity and enabling real-time analytics at scale.</p><p><em>The following is a great presentation from LinkedIn, exploring batch and streaming unification.</em></p><iframe src="https://cdn.embedly.com/widgets/media.html?src=https%3A%2F%2Fwww.youtube.com%2Fembed%2FImXFxB5hXs0%3Fstart%3D46%26feature%3Doembed%26start%3D46&amp;display_name=YouTube&amp;url=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3DImXFxB5hXs0&amp;image=https%3A%2F%2Fi.ytimg.com%2Fvi%2FImXFxB5hXs0%2Fhqdefault.jpg&amp;key=a19fcc184b9711e1b4764040d3dc5c07&amp;type=text%2Fhtml&amp;schema=youtube" width="854" height="480" frameborder="0" scrolling="no"><a href="https://medium.com/media/2f2ded95e42d4d693012030108922bb3/href">https://medium.com/media/2f2ded95e42d4d693012030108922bb3/href</a></iframe><p>In this article, we’ll explore why unifying batch and streaming is critical, what properties a modern unified data platform must have, and how it affects various layers of the data ecosystem — from ingestion and compute to storage and development.</p><h4>Separate Stacks for Batch and Streaming</h4><p>Traditionally, organizations have employed two distinct architectures to 
process data, each tailored to specific needs:</p><p><strong>Batch Processing</strong>:</p><ul><li>Typically used for large-scale, <strong>periodic processing</strong> of data (e.g., nightly ETL jobs).</li><li>Systems like Apache Hadoop and later <strong>Apache Spark</strong> have been popular for running heavy batch workloads.</li><li><strong>Latency</strong> is generally higher, ranging from hours to days, depending on how frequently the batch jobs are triggered.</li></ul><p><strong>Stream Processing</strong>:</p><ul><li>Designed for <strong>real-time</strong> or near-real-time processing, continuously consuming data from event streams (e.g., Apache Kafka).</li><li>Technologies like <a href="https://nightlies.apache.org/flink/flink-docs-master/"><strong>Apache Flink</strong></a> are widely used for low-latency, real-time analytics.</li><li><strong>Latency</strong> is typically in milliseconds or seconds, providing up-to-the-moment insights.</li></ul><p>The problem is that companies end up managing <strong>two separate infrastructures</strong> for ingesting, storing, and processing data. <strong>Data duplication</strong> often occurs, where the same data must be ingested and stored differently for batch and streaming pipelines, while <strong>operational overhead</strong> <strong>increases</strong> as teams maintain two technology stacks with different tools, frameworks, and performance trade-offs.</p><h4>The Importance of Unifying Batch and Streaming!</h4><p>With the growing demand for real-time data insights alongside historical batch processing, many organizations now recognize the need for <strong>unified data processing platforms</strong> that can handle both batch and streaming workloads efficiently.</p><p>Key reasons why unifying these two paradigms is essential include:</p><p><strong>Operational Efficiency</strong>: By converging batch and streaming, companies can reduce the complexity of managing separate infrastructures. 
This means fewer moving parts, less data duplication, and simplified operational management.</p><p><strong>Real-Time and Historical Data Processing</strong>: Organizations can <strong>access and</strong> <strong>process both real-time and historical data</strong> without needing to switch between systems. This provides a complete picture, combining real-time analytics with historical trends for better decision-making.</p><p><strong>Consistency Across Workloads</strong>: A unified platform ensures <strong>consistent processing models</strong> across both batch and streaming. This eliminates the discrepancies that arise from using different technologies with different data semantics, ensuring that data transformations applied to streams are also applied to historical data.</p><p><strong>Scalability and Flexibility</strong>: Unified architectures are inherently more <strong>scalable</strong> because they are designed to handle varying workloads — whether it’s processing a large batch job or handling high-throughput streams of data. Organizations gain flexibility to adjust resource allocation dynamically based on current workloads.</p><p><strong>Cost-Effectiveness</strong>: Maintaining two distinct systems leads to redundancy in terms of resources and infrastructure costs. Unifying batch and streaming on a single platform can significantly reduce infrastructure costs while optimizing resource usage.</p><h4>Key Properties of a Unified Data Stack</h4><p>To support both batch and streaming workloads efficiently, a <strong>unified data stack</strong> must have specific characteristics across all layers of the data pipeline, from ingestion to storage and compute.</p><p>Next, let’s see the necessary properties of a unified platform and how it addresses both batch and streaming needs.</p><h4>Unified Data Ingestion Layer</h4><p>The ingestion layer collects data from different sources — whether it’s event-driven data from Kafka, relational databases, or log files. 
In a unified architecture, this layer must:</p><ul><li><strong>Support both batch and streaming data sources</strong>: Whether the data arrives as a continuous stream (e.g., via Kafka or Pulsar) or as periodic batches (e.g., from flat files or databases), the ingestion framework should handle both seamlessly.</li><li><strong>Seamless CDC Integration</strong>: Change Data Capture (CDC) technology should be supported natively to keep the data lake synchronized with source systems in both real-time and batch scenarios.</li><li><strong>Latency Tolerance</strong>: The ingestion framework should accommodate low-latency streaming while also supporting high-throughput batch ingestion, with the ability to switch between or combine these modes as needed. We refer to this functionality as <strong>snapshot and incremental data reading</strong>.</li></ul><h4>Unified Compute Layer</h4><p>The compute layer is where data is processed, transformed, and enriched. A unified compute framework must:</p><ul><li><strong>Handle both real-time and batch processing</strong>: The compute engine must be capable of processing data in real-time streams and large-scale batch workloads using the same APIs and data models.</li><li><strong>Stateful Processing</strong>: Real-time processing often requires maintaining state (e.g., tracking user sessions or windowed aggregations). The compute engine must manage the state efficiently in both streaming and batch contexts.</li><li><strong>Event-Time Semantics</strong>: In streaming applications, the ability to process data based on the event time (when the event occurred) is crucial. 
Unified compute engines should maintain accurate event-time processing for both batch and streaming data.</li><li><strong>Exactly-Once Semantics</strong>: To ensure data consistency, the compute engine should support exactly-once processing guarantees, which are critical for both real-time and batch pipelines.</li></ul><p>A compute engine such as Apache Flink also offers capabilities to support batch processing, like a hybrid shuffle mode, adaptive and speculative execution, operator fusion codegen, and more.</p><h4>Unified Storage Layer</h4><p>The storage layer in a unified architecture needs to support data retention and retrieval in a way that accommodates both real-time and historical querying:</p><ul><li><strong>Transactional Support</strong>: Data storage systems should provide ACID transactions to ensure that both batch and streaming updates are consistent and atomic, even when different workloads write to the same data store.</li><li><strong>Efficient Querying</strong>: The storage layer should support low-latency querying for real-time data while also being optimized for large-scale batch processing.</li><li><strong>Schema Evolution and Flexibility</strong>: Since both batch and streaming data may undergo schema changes, the storage system must support schema evolution without breaking existing queries or applications.</li><li><strong>Time-Travel and Versioning</strong>: To enable access to historical snapshots alongside real-time data, the storage layer should support <strong>time travel</strong> — the ability to query past versions of the data — while also maintaining efficient storage for real-time updates.</li><li><strong>Streaming reads/writes: </strong>To enable real-time analytics, in addition to batch reads/writes, the storage needs to be able to support streaming reads/writes. 
In addition, it also needs to support upserts efficiently as they are crucial for streaming data processing.</li></ul><h4>Unified Development and API Layer</h4><p>Another important aspect of a unified batch and streaming platform is the development experience. Engineers should be able to use the same development tools, languages, and APIs for both streaming and batch workloads:</p><ul><li><strong>Single API for Batch and Streaming</strong>: A unified platform should offer a single API that allows developers to write code once and run it in both batch and streaming contexts without changes.</li><li><strong>Unified Data Models</strong>: The data model for batch and streaming should be consistent, allowing data transformations, aggregations, and joins to work seamlessly across both paradigms.</li><li><strong>Extensibility and Integration</strong>: The platform should integrate easily with the broader data ecosystem, supporting plugins for various sources, sinks, and connectors (e.g., JDBC, Kafka, S3).</li></ul><h4>The Holy Grail Of Unified Batch and Stream Processing</h4><p><a href="https://nightlies.apache.org/flink/flink-cdc-docs-master/"><strong>Apache Flink CDC</strong></a>, <a href="https://flink.apache.org/"><strong>Apache Flink</strong></a>, and <a href="https://paimon.apache.org/"><strong>Apache Paimon</strong></a> are a powerful trio that enables organizations to unify batch and streaming architectures.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*GGTsq2YPyg3_9DrfBLQiCQ.png" /></figure><p>Apache Flink provides a unified API for both batch and stream processing, which means users can run the same code whether the job is batch or streaming.</p><p>Apache Flink CDC is a framework that provides snapshot and incremental data ingestion. 
It also provides a seamless experience for schema synchronization, full database synchronization, and new table discovery.</p><p>Apache Paimon is a modern lake storage layer that supports streaming reads/writes and batch reads/writes. It also provides ACID transactions, time travel, and schema evolution, making it ideal for unified architectures that handle both batch and streaming.</p><p>Moreover, Flink 1.20 introduced the <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/overview/"><strong>Materialized Table</strong></a> for simplifying the development of batch and streaming applications. The engine can intelligently decide if it’s a batch, incremental, or stream processing job.</p><h4>Conclusion</h4><p>As data needs continue to evolve, the separation between batch and streaming is becoming an unnecessary burden. Companies need both real-time insights and large-scale historical analysis in their data pipelines. By unifying batch and streaming into a single architecture, organizations can dramatically simplify their operations, reduce costs, and unlock new capabilities in real-time and historical analytics.</p><p><strong>Apache Flink CDC, Apache Flink,</strong> and <strong>Apache Paimon </strong>combined with the <strong>Materialized Table </strong>help shape this unified future by providing the necessary infrastructure and capabilities to support both types of workloads.</p><p>Whether you’re processing massive batch ETL jobs or handling high-frequency event streams, a unified stack can offer a scalable, consistent, and efficient way to manage your data.</p><p>By consolidating batch and streaming into a cohesive stack, companies can gain the agility needed to adapt to modern data demands, providing real-time insights and large-scale historical analysis with minimal complexity.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=09e40331f7a7" width="1" height="1" 
alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[The majesty of Apache Flink and Paimon]]></title>
            <link>https://medium.com/@ipolyzos_/the-majesty-of-apache-flink-and-paimon-d36e73571fc9?source=rss-1427b7a7e5e5------2</link>
            <guid isPermaLink="false">https://medium.com/p/d36e73571fc9</guid>
            <category><![CDATA[flink]]></category>
            <category><![CDATA[data-lakehouse]]></category>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[data-lake]]></category>
            <category><![CDATA[analytics]]></category>
            <dc:creator><![CDATA[Giannis Polyzos]]></dc:creator>
            <pubDate>Sun, 07 Jul 2024 08:46:26 GMT</pubDate>
            <atom:updated>2024-07-17T08:56:19.317Z</atom:updated>
            <content:encoded><![CDATA[<h3>The majesty of Apache Flink and Paimon.</h3><p>Why Apache Paimon is the go-to lakehouse storage for Apache Flink.</p><p>Roughly one year ago I discovered <a href="https://paimon.apache.org/">Apache Paimon</a> — Flink Table Store at the time.</p><p>As a stream processing practitioner and enthusiast — focusing on Apache Flink — I was always intrigued by what’s next and what innovations can help users unlock more use cases and do “more with less”.</p><p>At the time I was looking to explore what would be the most appropriate table format to adopt along with Apache Flink. I had done a lot of work with Delta Lake along with Apache Spark back in the day, but for Flink, it was mainly between Apache Iceberg and Hudi.</p><p>Long story short, Apache Paimon came to be and I have never looked back since, although it took me a while to grasp the full potential of the project.</p><p>In this blog post, I aim to <strong><em>share my thoughts</em></strong> on why I think Apache Paimon is the best lake storage for Apache Flink with the hope of helping engineers and decision-makers better understand <strong><em>the WHY</em></strong>.</p><h4>Setting the State ✅</h4><p>If you look into the official <a href="https://paimon.apache.org/">Apache Paimon</a> documentation, you will see that it is a lake format, along with all the rich functionality the project provides.</p><p>But to fully grasp its true value as an analytical solution, we need to “zoom out” and try to see the bigger picture with <strong><em>Paimon as part of an ecosystem along with Apache Flink and Flink CDC</em></strong><em>.</em></p><blockquote><strong>A different approach to Paimon and as I like to say: </strong>Apache Paimon is not yet another table format, but it brings streaming data and analysis on the lake, by leveraging Lakehouse primitives. 
You can think of it as a<strong> gen2 lake format</strong> that extends Apache Flink’s capabilities on the lake.</blockquote><p>A “naive way” to think about it compared to other table formats and engines is the following:</p><blockquote>A traditional data warehouse versus a more real-time one like ClickHouse that allows connecting upstream and downstream tables, updating in real time as new data arrives.</blockquote><p>Streaming data and analysis have been perceived as complex and expensive. At the same time, there is no good “low-cost” solution for use cases that can tolerate more than a few seconds of latency (&gt;30 seconds to a few minutes).</p><p>This means users <strong>can’t easily make cost/latency trade-offs</strong> or experiment with streaming data easily before investing more into it.</p><blockquote><strong>People might wonder, why yet another lake format for Apache Flink?</strong> Delta Lake, Apache Iceberg, and Apache Hudi are the three table formats used along with Apache Spark and other query engines. Apache Flink, though, as a streaming-first engine, has different requirements and can expand to more use cases compared to traditional batch-oriented query engines. 
This alone creates a need for lake storage that can unlock the full power of such an engine.</blockquote><p>These are the key points I hope to address in this blog post, along with how Apache Flink and Paimon can provide more for you.</p><p>But first, let’s start with some fun facts 😄.</p><p><a href="https://paimon.apache.org/">Apache Paimon</a> takes its name from a character in <a href="https://genshin.hoyoverse.com/en/">Genshin Impact</a> and the following illustration summarizes the relationship between Apache Flink and Paimon 😊</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*am027VZ8ZmoAILPHFLLb4w.png" /></figure><blockquote><strong>Note:</strong> Apache Paimon’s logo is a sailfish, which is widely considered the fastest marine animal.</blockquote><h4><strong>Apache Flink Heritage</strong></h4><p>Born under the <a href="https://flink.apache.org/">Apache Flink</a> umbrella, Paimon is a first-class citizen with Apache Flink as the compute engine; it leverages Flink’s sophisticated batch and stream capabilities to enable streaming data and analysis on the Lakehouse, while <strong><em>keeping latencies </em></strong>and<strong><em> resources low</em></strong>.</p><p>It uses <strong><em>LSM (Log-Structured Merge Trees) </em></strong>as the underlying data structure and uses the concept of the bucket for data separation. 
This allows a single table abstraction to achieve <strong><em>Message Queue functionality</em></strong>, <strong><em>Range Scan Queries,</em></strong> and <strong><em>Key Value Lookups</em></strong>.</p><blockquote>It also provides partitioning that allows operating on slices of the data.</blockquote><p>One SQL via Flink, for Ad-Hoc (OLAP), Batch, and Streaming queries.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*8KaFriN0slejqPwaIH3VFA.png" /></figure><p>In the context of Apache Flink, the term Unified denotes a single engine for both Batch and Streaming data.</p><p>We can think of <strong><em>Batch as a specialized case of Stream</em></strong> and similarly<strong><em> OLAP as a specialized case of Batch</em></strong>.</p><p>Similarly, Apache Paimon inherits the term Unified lakehouse storage, i.e. a single storage layer abstraction to handle batch, OLAP, and streaming data.</p><h4><strong>LSM (Log-Structured Merge Tree)</strong></h4><p>LSM is the go-to data structure for high-speed data ingestion, used by many popular data systems like RocksDB, Cassandra, ScyllaDB, ClickHouse, and more. Paimon<strong><em> innovatively combines the LSM structure with the lake format, bringing real-time streaming updates into the lakehouse architecture. 
</em></strong>It supports columnar formats like Parquet and ORC, as well as row-based formats like Avro.</p><p>When dealing with streaming data, we typically see two types of streams:</p><ol><li><strong><em>Changelog Streams:</em></strong> for dealing with upserts and CDC data, by specifying some <strong><em>primary key field</em></strong>.</li><li><strong><em>Append-Only Streams: </em></strong>for dealing with append-only data and keeping complete log data.</li></ol><p>Apache Paimon provides different table modes to account for different scenarios:</p><ul><li><strong>Primary Key Table: </strong>to handle updates and CDC data on the Lakehouse.</li><li><strong>Append Table: </strong>for high-speed ingestion, when strict ordering guarantees are not a requirement; this table mode is not bound by the parallelism.</li><li><strong>Bucketed Append Table: </strong>for when strict ordering guarantees are a requirement. By leveraging the concept of buckets, <strong><em>each bucket can act similarly to a partition of a Kafka topic</em></strong>, which <strong><em>allows implementing message queue functionality</em></strong> on the Lakehouse.</li></ul><h4><strong>Strong Integration with Apache Flink CDC</strong></h4><p>As a “best buddy to Apache Flink”, Paimon integrates seamlessly with <a href="https://medium.com/@ipolyzos_/a-glimpse-into-flink-cdc-3-0-a985fb5895a5">Apache Flink CDC</a> and fully automates the CDC Lakehouse ingestion.</p><p>It can handle all the upstream <strong><em>snapshot and incremental data reading</em></strong> and <strong><em>table schema changes</em></strong>, and perform a <strong><em>full database synchronization.</em></strong></p><p>Vendors like <a href="https://www.ververica.com/"><em>Ververica</em></a> also provide more functionality, like <strong><em>new table discovery </em></strong>and<strong><em> source merge optimizations.</em></strong></p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*t6UbdQKuKRjyCNJCNDsqAw.png" 
/></figure><p>Currently, it supports operational data stores like <strong><em>PostgreSQL</em></strong>, <strong><em>MySQL</em></strong>, <strong><em>MongoDB</em></strong>, and <strong><em>Amazon RDS</em></strong>.</p><p>At the same time, it also supports streaming storage layers, like <strong><em>Apache Kafka</em></strong>, <strong><em>Apache Pulsar,</em></strong> and <strong><em>Redpanda</em></strong>. For such systems, it supports several <a href="https://paimon.apache.org/docs/master/flink/cdc-ingestion/kafka-cdc/#supported-formats">CDC formats</a> and it can extract all the schema changes from the message payload.</p><h4><strong>Ingestion with Tags</strong></h4><p>When performing streaming CDC ingestion, data is synchronized in real time, but for <strong><em>historical analysis scenarios, it might be required to create daily views for analysis and time travel</em></strong>.</p><p>This is important for querying the state of your tables at different points in time or identifying what changed between different snapshots. This helps with use cases where you want to perform <strong><em>data backfills</em></strong>, <strong><em>identify data corruptions</em></strong>, or <strong><em>restore to previous versions</em></strong>.</p><p>For example, you might want to analyze business sales across different quarters, identify data that has been corrupted or deleted, or reproduce business reports.</p><p>While doing CDC ingestion, users can leverage <a href="https://paimon.apache.org/docs/master/maintenance/manage-tags/#manage-tags">Tags</a> to achieve the above.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*hGRrbs3Hp9FouczsXLW1Xg.png" /></figure><p>The important part of this is the <strong><em>file reuse mechanism</em></strong>, made possible by the LSM structure, which helps avoid redundant storage costs.</p><p>Typically with other data lake/lakehouse solutions, when a tag is created, it needs <strong><em>to maintain a full copy of the data</em></strong> for that tag. 
For example, in scenarios that require keeping data around for 100 days, 100 copies of that data will need to be stored. <strong><em>Apache Paimon can leverage file reuse since it uses LSM</em></strong>.</p><p><strong><em>What is file reuse?</em></strong><em> </em>The LSM structure is as follows:</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*GcraZngp7gAB0O814hSFJw.png" /></figure><p>The minor compaction of the LSM means that incremental data will only merge the files in the first few levels (levels 0 and 1, for example).</p><p>This means that the lower levels (which also hold most of the data) will not participate in the compaction process (unless there is too much incremental data). These large files <strong><em>can be reused between multiple tags</em></strong> and <strong><em>will not cause redundant file storage amplification.</em></strong></p><p>In the above scenario, where data needs to be stored for 100 days, Paimon’s file reuse will result in only 1 or 2 copies of the data instead of 100.</p><p>File reuse happens automatically via Paimon’s snapshot management; the end user doesn’t have to consider it.</p><h4><strong>Cost-efficient Streaming ETL</strong></h4><p>Apache Paimon provides a rich variety of <a href="https://paimon.apache.org/docs/master/primary-key-table/merge-engine/">merge engines</a> that control how data updates are handled. This is important when dealing with data mutations coming from changelog streams.</p><p>All states that need to be accumulated and stored, i.e. 
from computing aggregations, can now leverage object storage instead of Flink’s state.</p><p>This reduces local disk storage and managed memory cost requirements.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*JOaFH5pXSiOj4HHhM5FRhA.png" /></figure><p>Supported merge engines include:</p><ul><li><strong><em>Deduplication and First Row:</em> </strong>remove duplicates, keeping either the latest or the first entry respectively.</li><li><strong><em>Aggregation: </em></strong>a rich variety of aggregation functions (with more being added continuously). The end user only needs to specify the aggregation function and Paimon will handle the rest.</li><li><strong><em>Partial-Update:</em></strong> a more efficient way of dealing with table widening on the primary key. It allows for <strong><em>replacing expensive streaming joins</em></strong> and can be used along with the aggregation engine. The partial-update engine also provides built-in functionality via <a href="https://paimon.apache.org/docs/master/primary-key-table/merge-engine/#sequence-group"><strong><em>sequence groups</em></strong></a>, which allow the user to control how updates are handled, for example when <strong><em>records arrive late</em></strong>.</li></ul><h4><strong>Compaction Computing Costs</strong></h4><p>Since Paimon uses LSM and compaction runs automatically, there might be concerns about computing costs. Running the compaction process alongside the job might suggest that more resources will be needed, and that a scheduled compaction job that runs periodically would be a better fit.</p><p>The design goal of Paimon is to be low cost, and thus the compaction process is lightweight. For scenarios with a lot of incremental data, the writer might experience backpressure as it needs to wait for the compaction process to finish. 
In such scenarios, compaction can be configured to <a href="https://paimon.apache.org/docs/master/primary-key-table/compaction/#asynchronous-compaction"><strong><em>run completely asynchronously</em></strong></a>.</p><p>At the same time, users can always run <a href="https://paimon.apache.org/docs/master/maintenance/dedicated-compaction/">dedicated compaction</a> jobs if required, which also helps to separate compaction resources.</p><blockquote>A dedicated compaction job is also recommended for scenarios where multiple jobs are writing to the same table concurrently.</blockquote><h4><strong>Changelog Producers</strong></h4><p>Apache Paimon has the concept of <a href="https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/">changelog-producers</a>.</p><p>Database systems generate and keep track of the changes made to their data. Similarly, changelog producers generate a complete changelog, i.e. they track and record the different operations that can take place (<em>+I</em>, <em>-U</em>, <em>+U</em>, <em>-D</em>), which is important for downstream consumers to always “see” correct results.</p><p>Changelog producers also make it possible to disable the Normalize operator (set `scan.remove-normalize` to true), an expensive operator that Flink uses for result correctness.</p><h4><strong>Consumer Mechanism for Streaming Reads</strong></h4><p>To <strong>connect upstream</strong> and <strong>downstream tables</strong> via streaming, the consumers need to leverage the changelog and overall provide strong streaming capabilities.</p><p>Table formats provide version management mechanisms that are relatively heavy. They generate snapshots that not only manage incremental files but also manage files during the compaction process.</p><p>In production, snapshots are continuously generated, but because too many snapshots will result in a large number of files and redundant data storage, a cleanup mechanism is needed. 
However, with other table formats, consumers are not aware of this cleanup.</p><p>If the snapshots expire and your consumers are reading “older” data, you are likely to run into <strong>FileNotFoundExceptions</strong>.</p><p>To address this, <strong><em>Paimon has a consumer mechanism and leverages consumer-ids</em></strong>. The progress of the consumers is recorded in the file system.</p><p>Snapshot expiration will not delete snapshots that a consumer still needs, which keeps stream reading safe and provides read-progress tracking similar to Kafka’s group-id.</p><h4>Separation of Changelog Lifecycle Management</h4><p>The consumer mechanism safeguards against snapshot expiration and the unpleasant scenario where your data gets deleted while it is still being read.</p><p>Still, older snapshots need to be kept around for longer, resulting in redundant storage amplification.</p><p>Paimon addresses the problem of keeping too many files around by <strong><em>separating the management of Snapshot and Changelog files</em></strong>.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*29zGyTbOYbttwXMuy49U7w.png" /></figure><p>When a snapshot needs to be deleted, its changelog files get stored separately for management, allowing the users to keep around <strong><em>only the relevant data</em></strong>.</p><p>For example, you can store the changelog files for 24 hours or a few days, to ensure your downstream consumers have the data available for reading. 
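</p><p>As a rough sketch, such retention periods might be configured as table options in Flink SQL. The table name <em>orders</em> is hypothetical and the values are illustrative; the sketch also assumes that the <em>snapshot.time-retained</em> and <em>changelog.time-retained</em> options are available in the Paimon version you run, so check the configuration reference for your release before relying on it.</p><pre>-- Assumption: a hypothetical Paimon table named orders; values are illustrative only.<br>ALTER TABLE orders SET (<br>    -- expire snapshots quickly to limit redundant storage<br>    &#39;snapshot.time-retained&#39; = &#39;1 h&#39;,<br>    -- keep the separately managed changelog files longer for downstream stream readers<br>    &#39;changelog.time-retained&#39; = &#39;24 h&#39;<br>);</pre><p>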
On the other hand, you can set the snapshots to expire after one or two hours.</p><p>This functionality provides an experience <strong><em>consistent with message queues</em></strong>, which you can also implement with Apache Paimon, but at much lower costs.</p><h4><strong>Ecosystem</strong></h4><p>When it comes to building a Lakehouse or a Streaming Lakehouse, the ecosystem is an important part.</p><p>The goal is to provide openness and enable organizations to have a true “single source of truth” for their data, so that any query engine can be integrated on top.</p><p>You always have the option to leverage Flink SQL for your batch and OLAP queries.</p><p><strong>Side Note:</strong></p><p>Although Apache Flink has been historically known for its stream processing capabilities, over the last releases (see 1.18+) <strong><em>it has reached industry standards like Apache Spark in terms of batch processing</em></strong>.</p><p>It has added features like:</p><p>✅ <strong>Operator Fusion Codegen:</strong> to optimize the code generated by the SQL planner.<br>✅ <strong>Adaptive local hash aggregate:</strong> to dynamically decide whether to use local aggregations.<br>✅ <strong>Runtime filters and dynamic data pruning:</strong> to optimize data processing efficiency.<br>✅ <strong>Adaptive Execution Plan (AQE):</strong> to perform automatic concurrency inference and dynamic load balancing.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*f9HkaUsM9aDlWMR9T5qmrw.jpeg" /></figure><p>Of course, <strong><em>there is no “one size fits all” solution, and other query engines might have richer ecosystems</em></strong>, for example Apache Spark in terms of data science and ML.</p><p>Apache Paimon supports all the usual suspects with good connector implementations, like Apache Spark, Trino, Presto, StarRocks, Apache Doris, and Apache Hive.</p><p>The <strong>Trino</strong> and <strong>StarRocks</strong> integrations allow for <strong><em>blazing-fast 
OLAP queries</em></strong> directly on the Lakehouse, especially when used in combination with <a href="https://medium.com/@ipolyzos_/apache-paimon-introducing-deletion-vectors-584666ee90de">Deletion Vectors</a>.</p><p>Openness and integration is an important piece.</p><p>Although Apache Paimon supports all the usual suspects on the open-source side, there is always the need for integration with commercial systems. These systems can include — Databricks (there is already a strong integration with Apache Spark), Starburst, Snowflake, and Amazon Athena.</p><p>At the same time, there are two kinds of users currently observed:</p><ol><li>Those that have already implemented a Lakehouse and introduced Apache Paimon along with Apache Iceberg tables, to unlock more use cases.</li><li>Those getting started invest in Apache Flink and require integrations with commercial offerings like Snowflake and Amazon Athena.</li></ol><p>To address the above — and as Apache Paimon shares a similar file layout to Apache Iceberg due to its simplicity — in the next release Apache Paimon will introduce an Apache Iceberg snapshot so that users can integrate with more engines.</p><p><strong>Key Takeaways</strong></p><p>Some key takeaways here include that when it comes to Apache Paimon:</p><ul><li>It has a strong integration with Apache Flink and Flink CDC, providing a seamless analytical solution.</li><li>It allows efficient handling of changelog streams — data mutations — on the lake; required for analyzing streaming data.</li><li>It provides a single table abstraction for batch, OLAP, and streaming data.</li><li>It allows streaming data and real-time analysis on the lake and connecting upstream and downstream tables, by providing lots of safeguards for streaming reads.</li><li>It aims to keep resources and latencies low to enable users to make better trade-offs between costs and latencies</li></ul><p>To conclude, the current status quo includes real-time streaming via Apache Kafka and Flink 
and then offloading data, typically append-only, to a Lakehouse, where more batch-oriented query engines take over.</p><p>If you want to do more, though, such as handling data mutations while leaving the processing to Flink’s excellent capabilities, creating real-time views ready to be queried by other engines, or replacing expensive streaming workloads that can afford some extra latency, then Apache Flink along with Paimon might be a good alternative for you.</p><p>Make sure to keep an eye on the project, give it a try and if you like it, don’t forget to give it some ❤️ via ⭐ on <a href="https://github.com/apache/flink-cdc">GitHub</a>.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=d36e73571fc9" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[A Glimpse into Flink CDC 3.0]]></title>
            <link>https://medium.com/@ipolyzos_/a-glimpse-into-flink-cdc-3-0-a985fb5895a5?source=rss-1427b7a7e5e5------2</link>
            <guid isPermaLink="false">https://medium.com/p/a985fb5895a5</guid>
            <dc:creator><![CDATA[Giannis Polyzos]]></dc:creator>
            <pubDate>Tue, 21 May 2024 06:14:53 GMT</pubDate>
            <atom:updated>2024-05-21T06:23:27.509Z</atom:updated>
            <content:encoded><![CDATA[<h3>The Next Generation of Streaming Data Integration</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*-yJPcDAeytde8IAYvO58Fg.png" /></figure><p>Apache Flink CDC <a href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/">3.1.0</a> was released last week, marking the first official release after the project graduated to the Apache Incubator.</p><p><strong><em>Disclaimer: This article is written with Leonard Xu, PMC member of Apache Flink and Flink CDC project lead.</em></strong></p><p>This blog post aims to provide some insight into the goals of Flink CDC 3.0. Flink CDC 2.0 set the bar high as a CDC solution and has been adopted in many large-scale production environments to address problems users would typically come across with alternative solutions.</p><p>It also aims to provide some insight into the project itself, in the hope of helping engineers and architects better understand why they should consider this solution regardless of scale.</p><h3>Introduction</h3><p>Flink CDC is a real-time data integration framework based on the Change Data Capture (CDC) technology of database changelogs.</p><p>It provides multiple advanced features, such as <strong><em>full and incremental data synchronization</em></strong>, <strong><em>lock-free reading</em></strong>, <strong><em>parallel reading</em></strong>, <strong><em>automatic synchronization of schema changes</em></strong>, and a distributed architecture, on top of <a href="https://flink.apache.org/">Apache Flink</a>’s excellent processing capability and robust ecosystem.</p><blockquote><strong>Full and incremental data synchronization</strong> refers to the process of reading all the historical data within the database and then automatically switching to reading the incremental data.</blockquote><h4>Background</h4><p><strong><em>Flink CDC 2.0</em></strong> received a lot of traction from many users. 
However, although it provided many benefits compared to existing CDC solutions, resulting in a strong adoption, users experienced the following pain points:</p><ul><li><strong>User experience:</strong> <a href="https://www.alibabacloud.com/blog/what-is-change-data-capture-cdc_601052">Flink CDC</a> provides only source connectors and does not support end-to-end data integration, making it difficult to create jobs via SQL syntax or the DataStream API.</li><li><strong>Frequent maintenance:</strong> Frequent table creation and deletion operations are necessary due to the frequent changes of schemas in source databases.</li><li><strong>Scalability:</strong> Large amounts of resources are required to synchronize data from thousands of tables and ingest tens of thousands of tables into data lakes or data warehouses. In addition, scaling cannot be automatically performed to handle different resource requirements for the full synchronization and incremental synchronization stages.</li></ul><p>To tackle the above challenges <a href="https://nightlies.apache.org/flink/flink-cdc-docs-master/">Flink CDC 3.0</a> was introduced.</p><p>Flink CDC was donated to the <a href="https://www.ververica.com/blog/ververica-donates-flink-cdc-empowering-real-time-data-integration-for-the-community">Apache Foundation</a>, aspiring to become a complete streaming integration framework, based on the following design principles:</p><ul><li><strong>End-to-end experience:</strong> As an end-to-end data integration framework, Flink CDC 3.0 provides high-level abstractions for setting up data movement pipelines easily.</li><li><strong>Schema Synchronization: </strong>It can automatically synchronize schema changes from upstream to downstream system, allowing users to also add tables to existing jobs at any time.</li><li><strong>Elasticity:</strong> Idle resources can be automatically reclaimed, and a single sink instance can write to multiple tables simultaneously.</li><li><strong>Large data 
volume:</strong> Users&#39; legacy databases can be large, commonly containing over 100 TB of data.</li><li><strong>Real-time processing of incremental data: </strong>The business value of incremental data is higher than that of historical data but decreases over time, which leads to high requirements for data freshness; new incoming events need to be processed as soon as possible.</li><li><strong>Data ordering:</strong> Support for global preservation of data ordering to ensure the consistency of processed data.</li></ul><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*UshmdBE0op97_Llo0uU8GA.png" /></figure><h3>Design of Flink CDC 3.0</h3><h4><strong>Architecture</strong></h4><p>The architecture of Flink CDC 3.0 is divided into four layers.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*p0EyK9zhFMKc2CB-7eg52w.png" /></figure><p>The four layers are:</p><ul><li><strong>Flink CDC API:</strong> YAML-formatted API operations are provided to help end users configure data synchronization pipelines. Users can call the API operations in Flink CDC CLI.</li><li><strong>Flink CDC Connect:</strong> Source and sink connectors are provided to interact with external systems. Flink CDC 3.0 encapsulates the source connectors of Apache Flink and Flink CDC to read data from and write data to external systems.</li><li><strong>Flink CDC Composer:</strong> This layer translates data synchronization tasks into Flink DataStream jobs.</li><li><strong>Flink CDC Runtime:</strong> Custom Flink operators are provided for different data synchronization scenarios to implement advanced features, such as schema changes, routing, and transformations.</li></ul><h3>User-Friendly API Design</h3><p>Flink CDC 3.0 is tailored for seamless streaming data integration scenarios. 
Users do not need to worry about the implementation details of the framework.</p><p>They can easily create data synchronization pipelines with a YAML file that configures data sources, sinks, and intermediate transformations or routes.</p><p>The following figure shows a sample YAML file for synchronizing data from a MySQL database to Apache Kafka or Paimon.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*e_KCzWeI8fLvl_7qyjG77g.png" /><figcaption>Ingestion Pipeline from MySQL to Kafka or Paimon</figcaption></figure><h3>Pipeline Connector API</h3><p>To facilitate the integration of external systems into data synchronization pipelines, Flink CDC 3.0 introduced the <strong><em>Pipeline Connector API</em></strong>.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*DMsxH6TWkxxz5g2nc7RCXw.png" /></figure><ul><li><strong>DataSource:</strong> collects change events from external systems and passes them to downstream operators. It is composed of EventSourceProvider and MetadataAccessor. EventSourceProvider builds Flink sources, whereas MetadataAccessor accesses metadata.</li><li><strong>DataSink:</strong> applies schema changes received from upstream operators and writes the changed data to external systems. It is composed of EventSinkProvider and MetadataApplier. EventSinkProvider builds Flink sinks, whereas MetadataApplier applies metadata changes (such as table schema changes) to the destination system.</li></ul><p>To ensure compatibility with the Flink ecosystem, the design of DataSource and DataSink follows the same logic as Apache Flink. 
Developers can easily integrate external systems with Flink CDC 3.0 by using Flink connectors.</p><h3>Core Features of Flink CDC 3.0</h3><p>To achieve high performance in scenarios such as schema changes, full database synchronization, and table merging, Flink CDC 3.0 integrates the capabilities of Apache Flink and provides multiple custom Flink operators to support various synchronization modes.</p><h3>Schema Evolution</h3><p>Schema evolution is a common but challenging feature of data synchronization frameworks. Flink CDC 3.0 <strong><em>introduces a SchemaRegistry</em></strong> to map jobs in topology and uses <strong><em>a SchemaOperator to manage schema changes</em></strong> in job topologies.</p><p>Here’s how Flink CDC 3.0 handles schema changes:</p><ul><li>When a schema change is detected in a data source, SchemaRegistry issues a pause request to SchemaOperator. After receiving the request, SchemaOperator pauses the streaming ingestion and flushes the data to maintain schema consistency.</li><li>Once the schema change is synchronized to the external system, SchemaRegistry issues a resume request to SchemaOperator. After receiving the request, SchemaOperator resumes with the streaming ingestion.</li></ul><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*1_yVSnPF5OJ7JW7zYO1xXw.png" /></figure><h3>Full Database Synchronization</h3><p>Users can specify a multi-table or full database synchronization task by configuring the DataSource in the configuration file of Flink CDC 3.0.</p><p>The schema evolution feature enables automatic synchronization for the entire database. When new tables are detected, SchemaRegistry automatically creates replicas in the destination system.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*RwImoEyOTvtqtfS7FoqmZg.png" /></figure><h3>Table Merging</h3><p>Another common use case of Flink CDC 3.0 is merging multiple source tables into a single sink table. 
Flink CDC 3.0 employs a Route mechanism to implement table merging and synchronization. Users can define routing rules in the configuration file of Flink CDC 3.0 by using regular expressions to specify the source tables and the sink table.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Yyp0QeZwpejpHTObZAAumA.png" /></figure><h3>High-performance Data Structure</h3><p>To reduce serialization overhead during data transmission, Flink CDC 3.0 adopts a high-performance data structure.</p><ul><li><strong>Schemaless deserialization:</strong> Schemaless deserialization decouples schema information from changed data. Before sending changed data, DataSource sends the schema description, which is tracked by the framework. This way, schema information does not need to be bound to each changed record, and the serialization cost for wide tables is significantly reduced.</li><li><strong>Binary storage format:</strong> Data is stored in a binary format during synchronization. Deserialization is performed only when the detailed data of a field is read (such as when the table is partitioned by the primary key) to reduce serialization costs.</li></ul><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Ib8SGGuEwdkKcUNsGETQoA.png" /></figure><p>In addition to fundamental data synchronization capabilities, Flink CDC 3.0 provides multiple advanced features, such as automatic synchronization of schema changes, full database synchronization, and table merging and synchronization, <strong><em>to cater to complex data integration scenarios</em></strong>.</p><p>The automatic synchronization of schema changes frees users from manual intervention when schema changes occur in a data source, greatly reducing operational costs.</p><p>Moreover, only a few operations are needed to configure a multi-table or multi-database synchronization task, facilitating users’ development.</p><h4>Conclusion</h4><p>Apache Flink CDC 3.0 sets a new direction and the future 
looks quite promising.</p><p>It already supports a rich variety of connectors, but as of version 3.1.0 the streaming data integration framework supports the following out of the box: <strong>MySQL, Apache Doris, StarRocks, Apache Kafka and Apache Paimon.</strong></p><p>Apache Flink CDC 3.0 is part of the unified stack.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/700/0*qkD1KMruA2DrNT0V" /></figure><p>It also powers <a href="https://www.ververica.com/?_gl=1*16gwgmg*_up*MQ..&amp;gclid=Cj0KCQjw6auyBhDzARIsALIo6v-0tHxCNqPv28VKXWy1nrR67PDqSJTeFwPN5AXSv5OUiYmS-XLM5EUaAjGSEALw_wcB">Ververica’s</a> streaming data movement framework.</p><blockquote><strong>Unified Ingestion refers to the process</strong> of being able to read all the historical data within the database (batch reads) and then, without locking the database, automatically switch to reading the incremental data (streaming reads).</blockquote><blockquote>At the same time, the framework needs to be able to ensure data consistency, downscale resources and make sure it doesn&#39;t put pressure on the source system.</blockquote><p>Make sure to keep an eye on the project, give it a try and if you like it, don’t forget to give it some ❤️ via ⭐ on <a href="https://github.com/apache/flink-cdc">GitHub</a>.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=a985fb5895a5" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Apache Paimon: Introducing Deletion Vectors]]></title>
            <link>https://medium.com/@ipolyzos_/apache-paimon-introducing-deletion-vectors-584666ee90de?source=rss-1427b7a7e5e5------2</link>
            <guid isPermaLink="false">https://medium.com/p/584666ee90de</guid>
            <category><![CDATA[apache-paimon]]></category>
            <category><![CDATA[olap]]></category>
            <category><![CDATA[data-lakehouse]]></category>
            <category><![CDATA[apache-spark]]></category>
            <category><![CDATA[apache-flink]]></category>
            <dc:creator><![CDATA[Giannis Polyzos]]></dc:creator>
            <pubDate>Mon, 13 May 2024 06:36:44 GMT</pubDate>
            <atom:updated>2024-05-13T10:24:11.387Z</atom:updated>
            <content:encoded><![CDATA[<h4>Near real-time updates and extremely fast queries</h4><p><a href="https://paimon.apache.org/">Apache Paimon</a> is now a Top-Level Project under the <a href="https://www.ververica.com/blog/ververica-celebrates-as-apache-paimon-graduates-to-top-level-project?utm_campaign=Apache%20Paimon&amp;utm_content=290224673&amp;utm_medium=social&amp;utm_source=linkedin&amp;hss_channel=lcp-5385380">Apache Software Foundation</a>.</p><p>It started as a project under the Apache Flink umbrella (originally called Flink Table Store) and quickly moved from an early-stage umbrella project to an Apache incubator project. In less than 12 months, it has been deployed in many production environments and has grown a strong community.</p><p><strong><em>Apache Paimon uses LSM trees (Log-structured merge trees)</em></strong><em>, the go-to data structure for high-speed data ingestion, used by many popular data systems like RocksDB, Cassandra, ScyllaDB, ClickHouse, etc. </em><strong><em>It innovatively combines the LSM structure with the lake format, bringing real-time streaming updates into the lake architecture.</em></strong></p><p>Apache Flink is a <strong>unified compute</strong> engine and Apache Paimon provides <strong>unified lakehouse storage. 
</strong>When integrated with <a href="https://nightlies.apache.org/flink/flink-cdc-docs-master/">Flink CDC</a>, which provides <strong>unified data ingestion</strong>, we get an end-to-end unified batch &amp; streaming stack for real-time data analytics.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/939/0*dNJqcOoHc86PRkRL" /></figure><p>More information on Apache Flink’s roadmap and the vision for the project (along with Flink CDC and Apache Paimon) can be found <a href="https://flink.apache.org/what-is-flink/roadmap/#batch--streaming-unification-and-mixing">here</a>.</p><p><strong>Unified Storage</strong> aims to provide a<strong> single table abstraction</strong> for batch, stream, and OLAP queries.</p><h3>Background</h3><p>Paimon 0.8 was <a href="https://paimon.apache.org/releases/release-0.8/">released</a> last week and marks the first official release after the project graduation.</p><p>This new release brings many important features and improvements, along with significant documentation improvements.</p><p>In this blog post, we will explore a new feature called deletion vectors.</p><p>We will explore why they are needed, and how they can help provide an even better balance between streaming writes and fast queries.</p><p><strong><em>Disclaimer: </em></strong><em>This article contains contributions from </em><a href="https://github.com/JingsongLi"><em>Jingsong Li</em></a><em>, PMC chair of Apache Paimon.</em></p><h4>Business Use Case</h4><p>Let’s take a business example: we have an <strong><em>orders table </em></strong>in our database and want to ingest it into the data lake.</p><pre>CREATE TABLE orders (<br>    order_id          BIGINT,<br>    order_name        STRING,<br>    order_user_id     BIGINT,<br>    order_shop_id     BIGINT,<br>    order_product_id  BIGINT,<br>    order_fee         DECIMAL(20, 2),<br>    order_create_time TIMESTAMP(3),<br>    order_update_time TIMESTAMP(3),<br>    order_state       INT,<br>    PRIMARY KEY (order_id) NOT 
ENFORCED<br>)</pre><p>Once the data is in the lake, you can run scheduled batch ETL jobs and analytical queries on it. The general structure is as follows:</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*RpZWAv2BD09uBf4l-40Kcg.png" /></figure><p>Generally speaking, <strong>Batch ETL</strong> doesn’t have high requirements for read performance and can be completed in minutes. <strong>Analytical queries</strong>, on the other hand, need to return within seconds; we need to provide analysts with a good user experience 😄</p><p>Next, let’s take a look at how Paimon’s underlying design satisfies the above architecture.</p><h4><strong>Primary Key Table</strong></h4><p>This table requires a primary key to be set. It handles updates automatically in real time and can also be queried in real time.</p><p>The basic file structure of Paimon is as follows:</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*oiISrgeZPLjzozckrG9Vww.png" /></figure><p>The table or partition contains multiple buckets, and each bucket is a separate LSM tree structure containing multiple files.</p><p><strong><em>The writing process of LSM is as follows: </em></strong>With every checkpoint, batches of data get stored on disk. 
L0 files are flushed to disk, and compaction is triggered automatically to merge the data and handle small files.</p><p>Because of the LSM, Paimon doesn’t require you to choose between MoR and CoW by default, but it can mimic either behavior:</p><ul><li><strong><em>MoR (Merge On Read):</em></strong> The data merging process is semi-asynchronous by default; when there are too many L0 files, i.e. lots of incremental data, it might backpressure the writer. To avoid this, you can set the compaction to be <a href="https://paimon.apache.org/docs/master/maintenance/write-performance/#asynchronous-compaction">fully asynchronous</a> (no backpressure during writing).</li><li><strong><em>CoW (Copy On Write): </em></strong>The merge can also be made synchronous; that is, every write triggers a full compaction that merges all the files.</li></ul><p><strong>Merge-On-Read</strong></p><p>The MoR mode requires merging all files at read time. Because files are ordered, a multi-way merge is performed, which requires comparing primary keys.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*rkJXA9eyhYAUIjD85of-XA.png" /></figure><p>There is a problem here.</p><p>A single LSM tree can only be read by a single thread, so the parallelism of reads is limited. If the amount of data within a bucket is too large, it can result in poor reading performance.</p><p><strong><em>The recommended bucket size is between 200MB and 1GB. 
This keeps query latency under 10 seconds.</em></strong></p><p>On the other hand, if the bucket is too small, more small files will be read and written, which puts pressure on the file system.</p><p>In addition, due to the merging process, <strong><em>filter-based data skipping</em></strong> can’t be performed on non-primary-key columns; otherwise, the newest version of a record could be filtered out before the merge, and a stale older version would be returned instead.</p><p>This mode provides the best writing performance, as data does not need to be forcibly merged. However, when reading the LSM, there are performance issues due to multi-way merging:</p><ol><li>A single LSM tree is read by a single thread, so concurrency is limited.</li><li>Filters on non-primary-key columns cannot be pushed down.</li><li>Multi-way merging has a performance cost of its own.</li></ol><p><strong>Copy-On-Write</strong></p><p>An intuitive idea is to merge the data directly during writing. We can set <strong><em>full-compaction.delta-commits </em></strong>to<strong><em> 1 </em></strong>to force a full compaction with every new snapshot. (This is the most aggressive setting; you can experiment with larger intervals.)</p><pre>ALTER TABLE orders SET (&#39;full-compaction.delta-commits&#39; = &#39;1&#39;);</pre><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*0R_qRgoEfjOAIwsbMtJjaQ.png" /></figure><p>When reading the data now, there is no need to merge multiple files, so we can get really fast queries. 
However, since we trigger <strong><em>full compaction</em></strong> with every write, we incur serious write amplification.</p><p>So MoR and CoW are two extremes: either really fast writes or really fast queries.</p><p>Although <strong><em>Paimon’s MoR (due to the LSM) is sufficient in most cases</em></strong>, such as Batch ETL, it falls short in scenarios that <strong><em>require high-performance query analysis</em></strong>.</p><p><strong><em>Is there a mode that achieves a better trade-off between reads and writes?</em></strong></p><p><strong>Deletion Vectors</strong></p><p>Paimon 0.8 introduces <a href="https://cwiki.apache.org/confluence/display/PAIMON/PIP-16%3A+Introduce+deletion+vectors+for+primary+key+table">Deletion Vectors</a>.</p><p>The Deletion Vectors mode is designed to balance read and write efficiency.</p><p>Deletion vectors are generated during writing and record which rows in a file have been deleted. Deleted rows can then be filtered out directly when reading. The effect is equivalent to having already completed the merge, so read performance is not affected.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*HJtZZ8dctuYQzE4KlMUc8A.png" /></figure><p>A simple example is as follows:</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*AJMYt8_iwgIEN3oTeM6FCg.png" /></figure><p>Deleted rows are marked directly in the deletion file. An upsert is handled as a delete followed by an insert.</p><p>Let’s look at how this mode affects reads and writes:</p><ol><li><strong><em>The reading performance is good: </em></strong>during reading, each data file is read together with its deletion vector, avoiding additional merge costs between different files. 
Furthermore, read concurrency is no longer limited, and non-primary-key columns can also be used for filter push-down.</li><li><strong><em>Writing performance: </em></strong>additional overhead (looking up the LSM tree and generating the corresponding deletion file) is introduced during writing. The writer needs to look up existing data with the same primary key, mark it as deleted, and update the deletion vectors of the older files.</li></ol><p>Generally speaking, in this mode, we can get a <strong><em>huge improvement in read performance without losing too much write performance</em></strong>.</p><p><strong><em>Dealing with deletion files is also easy with Paimon, </em></strong>as it uses an LSM tree, and point lookups are one of the most common uses of an LSM tree.</p><p>This means we can quickly find the files that contain the rows to delete, along with the positions of those rows, using Paimon LSM’s point lookup capability.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*7XMBFKDKTx_hgKuCxuZnXQ.png" /></figure><p>When data is written, the writer looks it up in the LSM tree and produces the corresponding deletion file. The deleted data can be directly filtered out when reading.</p><p>For those familiar with Paimon, this uses the same underlying mechanism as the <a href="https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#lookup"><strong><em>lookup changelog-producer</em></strong></a>. Each bucket will produce a corresponding deletion file.</p><p>The structure of the file is as follows:</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*nxDv3II_3Cff-xJsl6j1UA.png" /></figure><p>The deletion vector of each data file is stored as a bitmap. Keeping one deletion file per bucket minimizes the problem of <strong><em>too many small files caused by Deletion files</em></strong>.</p><p>Each Bitmap uses a RoaringBitmap structure. 
<strong>Apache Iceberg</strong> and <strong>Delta Lake</strong> already use this approach for query acceleration during batch deletion. RoaringBitmap is a compressed bitmap that can significantly reduce storage space amplification.</p><h3><strong>Testing the Effect of Deletion Vectors</strong></h3><h4><strong>Test Environment</strong></h4><p>We run Apache Flink 1.17 with Paimon 0.8, writing to Amazon S3. Deletion Vectors (DV) are disabled by default, so <strong><em>deletion-vectors.enabled</em></strong> needs to be set to <strong><em>true</em>.</strong></p><p>We then query the data using Spark 3.3.1 and Trino 422. <strong><em>The latest Paimon-Trino version has been optimized for ORC reading.</em></strong></p><h4>Data size</h4><p>The table schema is the <strong><em>orders table</em></strong> defined at the beginning of this blog post. We generate <strong><em>500 million records </em></strong>using the <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/datagen/">Flink Datagen</a> connector with the primary key ranging from 1 to 1 billion, using <strong><em>bucket = 8</em></strong>.</p><p>After the writing is completed, <strong><em>a single bucket</em></strong> will contain about<strong><em> 40+ files</em></strong>, with a size of <strong><em>~5 GB (yes, more than recommended, to make the effect visible)</em></strong>.</p><h4>Write performance</h4><ul><li><strong><em>Without DV enabled:</em></strong> 455 seconds, ~135,000 records per second for a single writer</li><li><strong><em>With DV enabled:</em></strong> 937 seconds, ~66,000 records per second for a single writer</li></ul><p>Writing is about twice as slow, but <strong><em>we will continue to optimize it in subsequent versions</em></strong>.</p><h3>Query performance</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*x6r54hVoRkxECexeANqdbw.png" /></figure><p>As you can see, there is no difference between Trino and Spark, because they 
both share the same reader implementation when files need to be merged.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*2s0yUqzBbbK_2gQZn5HJhQ.png" /></figure><p>When DV is turned on, Spark’s query performance is greatly improved, while Trino’s improvement is even greater.</p><p><strong>Why?</strong> Because once merging is no longer needed, Trino can read the files with its own ORC reader, which gives native read performance.</p><h4><strong>Conclusion</strong></h4><p>Apache Paimon’s Deletion Vectors mode provides a good balance between reads and writes. We sacrifice some write performance, but in return, we get much faster read queries.</p><p>In the future, we can expect optimized Deletion Vector support in <a href="https://www.starrocks.io/">StarRocks</a>. We should probably expect strong performance there, as StarRocks can provide the best OLAP for Paimon.</p><p>Apache Paimon <strong><em>keeps enhancing the Streaming Lakehouse experience. </em></strong><em>As </em><strong><em>a unified lakehouse storage</em></strong>, it supports batch, OLAP, and streaming scenarios at minute-level latencies.</p><p>Make sure to keep an eye on the project <a href="https://cwiki.apache.org/confluence/display/PAIMON/PIP-16%3A+Introduce+deletion+vectors+for+primary+key+table">PIPs</a> and if you like it, don&#39;t forget to give it some ❤️ via ⭐ on <a href="https://github.com/apache/paimon">GitHub</a>.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=584666ee90de" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Understanding Watermarks in Apache Flink]]></title>
            <link>https://medium.com/@ipolyzos_/understanding-watermarks-in-apache-flink-c8793a50fbb8?source=rss-1427b7a7e5e5------2</link>
            <guid isPermaLink="false">https://medium.com/p/c8793a50fbb8</guid>
            <category><![CDATA[streaming]]></category>
            <category><![CDATA[kafka]]></category>
            <category><![CDATA[big-data]]></category>
            <category><![CDATA[apache-flink]]></category>
            <category><![CDATA[flink]]></category>
            <dc:creator><![CDATA[Giannis Polyzos]]></dc:creator>
            <pubDate>Tue, 07 Mar 2023 07:29:00 GMT</pubDate>
            <atom:updated>2023-03-07T11:57:43.358Z</atom:updated>
            <content:encoded><![CDATA[<p>Stories in the Land of Streams</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*cHthp8CNCDGpCkDatbHhSw.png" /></figure><blockquote>In the realm of time</blockquote><blockquote>… one traveler once asked, <strong>what is the use of Watermarks?</strong></blockquote><h3>1. Introduction</h3><p>Consider the following event stream</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*AxkDQBBE4rWj2Fy8dE0k3Q.png" /></figure><p>and assume we want to perform different operations like sorting the events or performing some aggregates. We perform these operations in specified time intervals, i.e. from <em>t1 to t2</em>, <em>t2 to t3,</em> and so on.</p><p>As depicted in the picture, we can observe events arriving <em>late </em>and<em> out of order. </em>So we ask ourselves:</p><ul><li>When do I consider my results <strong>complete</strong>?</li><li>How long should I wait for late and out-of-order events?</li></ul><p>A watermark helps us address these questions as it provides a way to keep track of the progress of time.</p><p>In a stream of events, if we see a watermark with <strong><em>time t</em></strong>, we know that the stream is complete up to that point in time <strong>t</strong>, or at least we consider it complete.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/752/1*NiP6vtcrcRJuYRNK9ICA-Q.png" /></figure><h3><em>2. How do watermarks work?</em></h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*cL-coIYpAjvP7RaoUDGTLw.png" /></figure><p>All the events flowing through Flink pipelines and being processed are considered <strong>StreamElements</strong>. 
These StreamElements can be either <strong>StreamRecords</strong> (i.e every event that is being processed) or a <strong>Watermark</strong>.</p><p>A watermark is nothing more than a special record injected into the stream that carries a timestamp <strong>(t)</strong>.</p><p>It flows through the operators and tells each operator that <strong>no elements with a timestamp older or equal to the watermark timestamp (t) should arrive.</strong></p><p>This way the operator knows that all the results up to this point in time, can be considered complete and is ready to emit those results.</p><h3>3. <strong>How are Watermarks generated </strong>in Apache Flink<strong>?</strong></h3><p>When working with event-time you need a way to actually tell Flink how to extract the timestamp from the incoming events and generate Watermarks.</p><p>Flink allows you to achieve this by using a <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#introduction-to-watermark-strategies">WatermarkStrategy</a>. 
A WatermarkStrategy informs Flink how to extract an event’s timestamp and assign watermarks.</p><p>The following snippet uses a <em>WatermarkStrategy</em> to extract the <em>eventTime</em> from a <em>ClickEvent</em>.</p><pre>WatermarkStrategy<br>  .forBoundedOutOfOrderness&lt;ClickEvent&gt;(Duration.ofSeconds(5))<br>  .withTimestampAssigner(<br>      SerializableTimestampAssigner { event: ClickEvent, _: Long -&gt;<br>          event.eventTime<br>      } as SerializableTimestampAssigner&lt;ClickEvent&gt;?<br>)</pre><p>The same can be achieved in the Flink SQL API using the following snippet:</p><pre>CREATE TABLE events (<br>  userid STRING,<br>  eventTime_ltz AS TO_TIMESTAMP_LTZ(eventTime, 3),<br>  ....<br>  userSession STRING,<br>    WATERMARK FOR eventTime_ltz AS eventTime_ltz - INTERVAL &#39;5&#39; SECONDS<br>) WITH (...)</pre><p>Notice the <strong><em>BoundedOutOfOrderness</em></strong> in the first snippet and <strong><em>eventTime_ltz - INTERVAL &#39;5&#39; SECONDS</em></strong><em> </em>in the second one.</p><p>A watermark is basically a heuristic and Flink provides a few <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/built_in/#fixed-amount-of-lateness">built-in </a>Watermark Generators with <em>BoundedOutOfOrderness </em>being one of them.</p><p>With it, the user sets a <em>threshold </em>for how long it is acceptable to wait for late events, i.e. the maximum delay we are willing to tolerate.</p><p>Let’s assume that in our event stream example we can afford to wait up to 5 seconds for out-of-order events, but no more. 
Within those<em> 5 seconds,</em> element <em>4 </em>can arrive and be included in the computation, but <em>element 9 </em>might be too late (i.e. arrive more than 5 seconds later) and be excluded.</p><blockquote>Different applications have different needs and there is a trade-off between completeness and latency.</blockquote><blockquote>We can add some extra delay for late arriving events, <strong>but how much delay is enough?</strong></blockquote><p>Latency-sensitive applications may not be able to afford to wait, so:</p><ul><li>they may provide incomplete results</li><li>or after providing initial (incomplete) early results, provide updated results as late data arrives.</li></ul><p>Less timely applications can afford to wait longer for out-of-order events.</p><h3>4. How are watermarks propagated in Apache Flink?</h3><p>Each parallel instance of a source operates independently based on the events it processes. Assume the following graph processing two input Kafka topics with two partitions.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/982/1*aQ6wWTFIcNlx-y4Bc0o23A.png" /></figure><p>The current watermark for a task with multiple inputs is the minimum watermark from all of its inputs.</p><p>Look at the <strong>Window(1)</strong> operator for example. 
It receives<em> 29 </em>and <em>14 </em>as watermark inputs and sets the current watermark to <em>14</em>, the minimum of the two.</p><p>When the operator receives the next watermark input it updates its local version of the current watermark and passes it onward, <strong>only after the watermark is processed by the task</strong> —<strong> and along with that the event-time clock also advances.</strong></p><p><strong>Note:</strong></p><ul><li>Operators that do not internally buffer elements can always forward the watermark that they receive.</li><li>Operators that buffer elements, such as window operators, must forward a watermark <strong>after</strong> the emission of elements that are triggered by the arriving watermark.</li></ul><h3>5. The curse of Idle Sources</h3><p>Up to this point, we know that Watermarks help make progress and update the event-time clock.</p><p>But what happens when a source becomes idle, i.e. produces no data?</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/688/1*k6376OGZYq203R3YguCXwg.png" /></figure><p>With no events arriving, an idle source can cause the entire pipeline to stall, as it has no basis for advancing the current watermark. This is frequently the root cause when a window or join operation emits no results downstream.</p><p>Some potential workarounds to the idle source problem can be:</p><ul><li>Set the watermark for a mostly idle stream to <em>Watermark.MAX_WATERMARK</em>, e.g. for streams that rarely change over time.</li><li>Use <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources"><em>withIdleness</em></a>, which marks streams as idle after some duration with no events. 
Idle streams do not hold back watermarks from active streams.</li><li>Do a <em>rebalance() </em>on the stream in order to mix idle and non-idle streams<em>— </em>this can cause expensive network shuffles though.</li><li>Use <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_">Watermark Alignment</a> — currently still in beta</li></ul><blockquote>What if all sources become idle?</blockquote><p>No events mean no watermarks and thus timers won’t fire — windows and joins won’t send results downstream.</p><p>If the idleness is not temporary you can emit keep-alive events from your source(s) — i.e implement watermarking that detects idleness and manually advances the watermark.</p><blockquote>What is a good rule of thumb for setting Watermarks with some extra delay?</blockquote><p>This is a common question for people starting with Apache Flink and in order to be able to make the right choice, try and answer the following:</p><ul><li>Is my application latency-sensitive and needs to emit results as soon as possible?</li><li>Can I afford to wait an extra time period — at the cost of adding unnecessary extra delay?</li><li>How much delay is good enough to ensure a good trade-off between latency and results completeness?</li></ul><h3>6. Conclusion</h3><p>In this article, we took a closer look at watermarks and how they work inside Apache Flink.</p><p>Watermarks are one of the most important properties of stream processing and at the same time a source of confusion and the culprit for many unexpected behaviors.</p><p>Understanding how they work and testing them throughout your application’s lifecycle can save you a few headaches.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=c8793a50fbb8" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Apache Flink SQL: A Gentle Introduction]]></title>
            <link>https://medium.com/@ipolyzos_/streaming-sql-with-apache-flink-a-gentle-introduction-8a3af4fa3194?source=rss-1427b7a7e5e5------2</link>
            <guid isPermaLink="false">https://medium.com/p/8a3af4fa3194</guid>
            <category><![CDATA[flink]]></category>
            <category><![CDATA[streaming]]></category>
            <category><![CDATA[flink-sql]]></category>
            <category><![CDATA[kafka]]></category>
            <dc:creator><![CDATA[Giannis Polyzos]]></dc:creator>
            <pubDate>Mon, 06 Feb 2023 00:00:27 GMT</pubDate>
            <atom:updated>2023-03-08T08:31:27.821Z</atom:updated>
            <content:encoded><![CDATA[<p>Flink SQL is a powerful high-level API for running queries on streaming (and batch) datasets. In this article we will see:</p><ol><li>Why it’s powerful and how it helps democratize Stream Processing and Analytics</li><li>Understand basic concepts around Streaming and Flink SQL</li><li>Setup Kafka and Flink Clusters and get started with Flink SQL</li><li>Understand different kinds of Processing Operators and Functions</li><li>Different ways of running Flink SQL Queries</li></ol><h3>1. Streaming (and Batch) SQL</h3><h3>1.1 Unified Batch and Streaming</h3><p>When we hear about SQL (referenced as batch SQL here) we think of the following tabular format you typically find in RDBMS, on which we operate and run computations — from simple projections (like SELECT and Filter) to Aggregations to Windowed Functions.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*cEzWph8Gaux1f1jY.png" /></figure><p>Batch SQL Queries operate on static data, i.e. on data stored on disk, already available and the results are considered complete.</p><blockquote>How can Tables relate with Streams?</blockquote><p>Let’s think of a data stream now</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*SkuIUTlnm3TlOXdZ.png" /></figure><p>A stream is basically an <em>unbounded</em> dataset of incoming events, i.e. it has no end. In the heart of a stream is the <em>append-only log</em>, i.e. 
each incoming event can be considered as a <em>row</em> that gets appended at the end of the log — similar to a database table.</p><p>In practice, if we follow this mental model we can think of a stream as a collection of snapshots of bounded datasets.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*QziJHnYAq9QS-cqC.png" /></figure><p>This is what enables Unified Batch and Streaming Architecture and allows the use of a single API — like Flink SQL — to handle both Batch and Streaming data; no underlying code changes are needed.</p><h3>1.2 Streaming SQL Semantics</h3><p>The rules are as follows:</p><ol><li>The Input tables are constantly changing and possibly unbounded</li></ol><ul><li><strong><em>Append Only Streams:</em> </strong>Keeps all the history in the stream. Every new event is an insert operation in the append-only log</li><li><strong><em>Changelog Streams:</em></strong> Keeps the most recent value (for some key).</li></ul><p>2. Query results are never final, continuously updated, and potentially unbounded</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*ke9H_PaoO2muirrU.png" /></figure><p>On the left side, we have our append-log (or a collection of bounded datasets as we discussed above) and we run a <em>Streaming SQL Query</em> on that table.</p><p>As we keep ingesting new events, they get appended as new rows to the log. 
These events yield changes, so the output table is continuously updated.</p><p>This is called a <em>Dynamic Table</em>.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*t_UQJjVdbZ7sdfYV.png" /></figure><p>Flink uses <em>catalogs</em> that hold metadata for databases, tables, functions and views.</p><p>A catalog can be non-persistent (in-memory catalog) or persistent, backed by an external system, like the PostgresCatalog, the PulsarCatalog and the HiveCatalog.</p><p>For in-memory catalogs, all metadata is available only for the lifetime of the session.</p><p>In contrast, catalogs like PostgresCatalog let users connect the two systems, and Flink then automatically references existing metadata by mapping it to its own corresponding metadata.</p><p>For example, Flink can map Postgres tables to its own tables automatically, so users don’t have to manually rewrite DDLs in Flink SQL.</p><p>Within the catalogs, you create databases and tables in these databases.</p><p>When creating a table, its full identifier has the form <em>catalog.database.table</em>; when a catalog and/or database is not specified, the default ones are used.</p><h3>3. 
Environment Setup</h3><p>As a warmup exercise let’s start the Flink SQL CLI to run a few commands, but first, we need to have a Flink and a Kafka Cluster up and running.</p><p>Make sure you have <a href="https://docs.docker.com/compose/">docker-compose</a> installed on your machine, as we will use the following docker-compose.yaml file to set up the required clusters.</p><pre>version: &quot;3.7&quot;<br>services:<br>  zookeeper:<br>    image: bitnami/zookeeper:3.8.0<br>    ports:<br>      - &quot;2181:2181&quot;<br>    environment:<br>      ALLOW_ANONYMOUS_LOGIN: &quot;yes&quot;<br>  kafka:<br>    image: bitnami/kafka:3.3.1<br>    ports:<br>      - &quot;9092:9092&quot;<br>    environment:<br>      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181<br>      ALLOW_PLAINTEXT_LISTENER: &quot;yes&quot;<br>      KAFKA_ADVERTISED_PORT: 9092<br>      KAFKA_ADVERTISED_HOST_NAME: kafka<br>      KAFKA_LISTENERS: &gt;-<br>        INTERNAL://:29092,EXTERNAL://:9092<br>      KAFKA_ADVERTISED_LISTENERS: &gt;-<br>        INTERNAL://kafka:29092,EXTERNAL://localhost:9092<br>      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: &gt;-<br>        INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT<br>      KAFKA_INTER_BROKER_LISTENER_NAME: &quot;INTERNAL&quot;<br>    depends_on:<br>      - zookeeper<br>  jobmanager:<br>    build: .<br>    container_name: jobmanager<br>    ports:<br>      - &quot;8081:8081&quot;<br>    command: jobmanager<br>    environment:<br>      - |<br>        FLINK_PROPERTIES=<br>        jobmanager.rpc.address: jobmanager<br>  taskmanager:<br>    build: .<br>    depends_on:<br>      - jobmanager<br>    command: taskmanager<br>    deploy:<br>      replicas: 1<br>    environment:<br>      - |<br>        FLINK_PROPERTIES=<br>        jobmanager.rpc.address: jobmanager<br>        taskmanager.numberOfTaskSlots: 1</pre><h3>3.1 The Flink SQL Client</h3><p>Run docker-compose up, wait for a few seconds and your clusters should be up and running.</p><p>Let’s start the Flink SQL CLI by running docker exec 
-it jobmanager ./bin/sql-client.sh and then execute the following commands as a warmup with the SQL client:</p><pre>Flink SQL&gt; SHOW CATALOGS;<br>+-----------------+<br>|    catalog name |<br>+-----------------+<br>| default_catalog |<br>+-----------------+<br>1 row in set<br><br>// Create a new Database<br>Flink SQL&gt; CREATE DATABASE mydbl;<br><br>Flink SQL&gt; SHOW DATABASES;<br>+------------------+<br>|    database name |<br>+------------------+<br>| default_database |<br>|            mydbl |<br>+------------------+<br>2 rows in set<br><br>Flink SQL&gt; use mydbl;<br>[INFO] Execute statement succeed.<br><br>// currently we have no tables<br>Flink SQL&gt; SHOW TABLES;<br>Empty set<br><br>// a truncated output of some available functions.<br>Flink SQL&gt; SHOW FUNCTIONS;<br>+-------------------------------+<br>|                 function name |<br>+-------------------------------+<br>|             AGG_DECIMAL_MINUS |<br>|              AGG_DECIMAL_PLUS |<br>|                ARRAY_CONTAINS |<br>|                      COALESCE |<br>|             CURRENT_WATERMARK |<br>|                      GREATEST |<br>|                        IFNULL |<br>|                       IS_JSON |<br>|                    JSON_ARRAY |<br>|  JSON_ARRAYAGG_ABSENT_ON_NULL |<br>|    JSON_ARRAYAGG_NULL_ON_NULL |<br>|                   JSON_EXISTS |<br>|                   JSON_OBJECT |<br>| JSON_OBJECTAGG_ABSENT_ON_NULL |<br>|   JSON_OBJECTAGG_NULL_ON_NULL |<br>|                    JSON_QUERY |<br>|                   JSON_STRING |<br>|                    JSON_VALUE |<br>|                         LEAST |<br>|              SOURCE_WATERMARK |</pre><p>It’s time now to get into some interesting stuff, but before that let’s also create the Kafka topics we will be using for our examples.</p><pre>docker exec -it depths-of-flink-kafka-1 kafka-topics.sh --create \<br>  --topic sensor.info \<br>  --partitions 1 \<br>  --config cleanup.policy=compact \<br>  --bootstrap-server localhost:9092<br><br>docker 
exec -it depths-of-flink-kafka-1 kafka-topics.sh --create \<br>  --topic sensor.readings \<br>  --partitions 3 \<br>  --bootstrap-server localhost:9092<br>  <br>docker exec -it depths-of-flink-kafka-1 kafka-topics.sh --bootstrap-server localhost:9092 --describe<br><br>------- Output -------<br>Topic: sensor.info TopicId: zFY47WiRS721XIUik2nRBg PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact<br> Topic: sensor.info Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001<br> <br>Topic: sensor.readings TopicId: HGvGHOeKQQCxG3cly2R7Lw PartitionCount: 3 ReplicationFactor: 1 Configs:<br> Topic: sensor.readings Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001<br> Topic: sensor.readings Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001<br> Topic: sensor.readings Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001</pre><h3>3.2 Create Tables</h3><p>Let’s go back to our Flink SQL cli and the first thing we need is some tables to work with.</p><p>We will be using the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/">Kafka Flink SQL connector</a> to read sensor information and sensor readings from the two Kafka topics we already created. 
The following block shows how to create a table from a Kafka topic.</p><p><em>Note:</em> We will be using the default catalogs as well as the default database.</p><pre>CREATE TABLE readings (<br>  sensorId STRING,<br>  reading DOUBLE,<br>  eventTime_ltz AS TO_TIMESTAMP_LTZ(`timestamp`, 3),<br>  `ts` TIMESTAMP(3) METADATA FROM &#39;timestamp&#39;,<br>  `timestamp` BIGINT,<br>    WATERMARK FOR eventTime_ltz AS eventTime_ltz - INTERVAL &#39;30&#39; SECONDS<br>) WITH (<br>  &#39;connector&#39; = &#39;kafka&#39;,<br>  &#39;topic&#39; = &#39;sensor.readings&#39;,<br>  &#39;properties.bootstrap.servers&#39; = &#39;kafka:29092&#39;,<br>  &#39;properties.group.id&#39; = &#39;group.sensor.readings&#39;,<br>  &#39;format&#39; = &#39;json&#39;,<br>  &#39;scan.startup.mode&#39; = &#39;earliest-offset&#39;,<br>  &#39;json.timestamp-format.standard&#39; = &#39;ISO-8601&#39;,<br>  &#39;json.fail-on-missing-field&#39; = &#39;false&#39;,<br>  &#39;json.ignore-parse-errors&#39; = &#39;true&#39;<br>);<br>  <br>  <br>  Flink SQL&gt; DESCRIBE readings;<br>+---------------+----------------------------+------+-----+-------------------------------------+----------------------------------------+<br>|          name |                       type | null | key |                              extras |                              watermark |<br>+---------------+----------------------------+------+-----+-------------------------------------+----------------------------------------+<br>|      sensorId |                     STRING | TRUE |     |                                     |                                        |<br>|       reading |                     DOUBLE | TRUE |     |                                     |                                        |<br>| eventTime_ltz | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE |     | AS TO_TIMESTAMP_LTZ(`timestamp`, 3) | `eventTime_ltz` - INTERVAL &#39;30&#39; SECOND |<br>|            ts |               TIMESTAMP(3) | TRUE |     |           METADATA FROM 
&#39;timestamp&#39; |                                        |<br>|     timestamp |                     BIGINT | TRUE |     |                                     |                                        |<br>+---------------+----------------------------+------+-----+-------------------------------------+----------------------------------------+<br>5 rows in set</pre><p>The <strong><em>CREATE TABLE</em></strong> syntax consists of column definitions, watermarks and connector properties (more details <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-table">here</a>).</p><p>We can observe the following column types in Flink SQL:</p><ol><li><em>Physical (or regular) columns</em></li><li><em>Metadata columns:</em> like the ts column in our statement that is basically Kafka metadata for accessing the timestamp from a Kafka Record.</li><li><em>Computed columns:</em> virtual columns like the <em>eventTime_ltz</em> in our statement, which is a formatted timestamp derived from our timestamp BIGINT column. 
Virtual Columns can reference other columns, perform simple computations (like 5 * reading) or use built-in functions.</li></ol><p><em>Note:</em> Specifying time attributes (here <em>eventTime_ltz</em>) and watermarks is what allows us to operate properly on event time and also set constraints on temporal operators, which we will see shortly.</p><p>Let’s also create a table for our sensor information topic.</p><pre>CREATE TABLE sensors (<br>  sensorId STRING,<br>  latitude STRING,<br>  longitude STRING,<br>  sensorType STRING,<br>  generation STRING,<br>  deployed BIGINT<br>) WITH (<br>  &#39;connector&#39; = &#39;kafka&#39;,<br>  &#39;topic&#39; = &#39;sensor.info&#39;,<br>  &#39;properties.bootstrap.servers&#39; = &#39;kafka:29092&#39;,<br>  &#39;properties.group.id&#39; = &#39;group.sensor.info&#39;,<br>  &#39;format&#39; = &#39;json&#39;,<br>  &#39;scan.startup.mode&#39; = &#39;earliest-offset&#39;,<br>  &#39;json.timestamp-format.standard&#39; = &#39;ISO-8601&#39;,<br>  &#39;json.fail-on-missing-field&#39; = &#39;false&#39;,<br>  &#39;json.ignore-parse-errors&#39; = &#39;true&#39;<br>);<br>  <br>Flink SQL&gt; DESCRIBE sensors;<br>+------------+--------+------+-----+--------+-----------+<br>|       name |   type | null | key | extras | watermark |<br>+------------+--------+------+-----+--------+-----------+<br>|   sensorId | STRING | TRUE |     |        |           |<br>|   latitude | STRING | TRUE |     |        |           |<br>|  longitude | STRING | TRUE |     |        |           |<br>| sensorType | STRING | TRUE |     |        |           |<br>| generation | STRING | TRUE |     |        |           |<br>|   deployed | BIGINT | TRUE |     |        |           |<br>+------------+--------+------+-----+--------+-----------+</pre><p>You might wonder why there is no time attribute or watermark specified on this topic.</p><p>The sensor information is basically used to hold <em>state</em> — i.e. it’s a <em>changelog stream</em> backed by a compacted Kafka topic 
(compared to an <em>append-only stream</em>) because we are only interested in keeping the latest sensor information to perform event enrichment later (when we will discuss joins.)</p><p>At this point, we have two tables created within our default database.</p><pre>Flink SQL&gt; SHOW TABLES;<br>&gt;<br>+------------+<br>| table name |<br>+------------+<br>|   readings |<br>|    sensors |<br>+------------+<br>2 rows in set</pre><p>Next up we need some data in our topics (and tables) to work with. I will use the following Producer code that can be found <a href="https://github.com/polyzos/select-star-from-streams/blob/main/src/main/kotlin/io/ipolyzos/producers/SensorProducer.kt">here</a> to generate information for <strong><em>10 sensors</em></strong> and <strong><em>10.000 readings</em></strong>. You can modify the code to ingest more data if you want.</p><p><em>Note:</em> I skipped adding the implementation here since our focus is on Flink SQL.</p><h3>3.3 Run our First Query</h3><p>Let’s run the following query and see an output similar to the following with information for our 10 sensors.</p><pre>SELECT * FROM sensors;<br><br>------- Sample Output -------<br><br>+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+----------------------+<br>| op |                       sensorId |                       latitude |                      longitude |                     sensorType |                     generation |             deployed |<br>+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+----------------------+<br>| +I |                              1 |                      83.964156 |                      47.567865 |                      PROXIMITY |                              0 |        1610920880369 |<br>| +I |                    
          2 |                      70.211600 |                      87.285699 |                      PROXIMITY |                              0 |        1669725385766 |<br>| +I |                              3 |                     -78.843922 |                     -159.70556 |                    TEMPERATURE |                              2 |        1645179854537 |<br>| +I |                              4 |                      77.304485 |                      102.32052 |                      PROXIMITY |                              2 |        1596841078647 |<br>| +I |                              5 |                     -11.876575 |                      57.576944 |                      PROXIMITY |                              3 |        1667359403631 |<br>| +I |                              6 |                      59.134005 |                     -159.71549 |                      PROXIMITY |                              0 |        1604501300175 |<br>| +I |                              7 |                     -16.478654 |                      141.49999 |                    TEMPERATURE |                              1 |        1614461145113 |<br>| +I |                              8 |                     -64.380075 |                      164.37186 |                      PROXIMITY |                              2 |        1673640554153 |<br>| +I |                              9 |                     -33.693995 |                     -2.4277239 |                    TEMPERATURE |                              3 |        1645551899832 |<br>| +I |                             10 |                     -88.115880 |                      11.500759 |                      PROXIMITY |                              2 |        1623336384463 |</pre><h3>4. 
Operators</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*2qrbSYGf8bSrr4r5.png" /></figure><h3>4.1 Stateless Operators</h3><p>Stateless Operators are the simplest and include common operations like Projections and Filters that require no state.</p><p><strong><em>Query:</em> Only Sensor Readings &gt; 40</strong></p><pre>SELECT  sensorId, reading, eventTime_ltz<br>FROM    readings<br>WHERE   reading &gt; 40<br><br>------- Sample Output -------<br><br>+----+--------------------------------+--------------------------------+-------------------------+<br>| op |                       sensorId |                        reading |           eventTime_ltz |<br>+----+--------------------------------+--------------------------------+-------------------------+<br>| +I |                              1 |                          40.18 | 2023-01-30 20:17:45.297 |<br>| +I |                              1 |                          41.87 | 2023-01-30 20:17:45.334 |<br>| +I |                              1 |                          41.72 | 2023-01-30 20:17:45.577 |<br>| +I |                              8 |                          40.91 | 2023-01-30 20:17:45.825 |<br>| +I |                              5 |                          40.94 | 2023-01-30 20:17:46.030 |<br>| +I |                              7 |                          40.73 | 2023-01-30 20:17:46.164 |<br>| +I |                              5 |                          40.13 | 2023-01-30 20:17:46.468 |<br>| +I |                              5 |                          40.22 | 2023-01-30 20:17:46.495 |<br>| +I |                              7 |                          40.02 | 2023-01-30 20:17:46.890 |<br>| +I |                              7 |                          40.92 | 2023-01-30 20:17:46.971 |</pre><h3>4.2 Materializing Operators</h3><p>Materializing Operators perform computations that are not constrained by temporal conditions and thus never complete — the input/output records are 
constantly updated or deleted.</p><p>Consider a <em>GROUP BY sensorId operation</em>. The query needs to maintain the <em>state</em> for every sensorId, in order to update the results accordingly each time a new event for a sensor arrives.</p><p>This means the state is kept around forever and constantly growing with every new sensor-generated event.</p><p><strong><em>Query:</em> Total readings per Sensor</strong></p><pre>SELECT   sensorId, COUNT(reading) as totalReadings<br>FROM     readings<br>GROUP BY sensorId<br><br>------- Sample Output -------<br><br>+----+--------------------------------+----------------------+<br>| op |                       sensorId |        totalReadings |<br>+----+--------------------------------+----------------------+<br>| +I |                              4 |                    1 |<br>| -D |                              4 |                    1 |<br>| +I |                              6 |                    1 |<br>| -D |                              6 |                    1 |<br>| +I |                              4 |                    2 |<br>| -D |                              4 |                    2 |<br>| +I |                              6 |                    2 |<br>| -D |                              6 |                    2 |<br>| +I |                              4 |                    6 |<br>| -D |                              4 |                    6 |<br>| +I |                              6 |                    3 |<br>| -D |                              6 |                    3 |<br>| +I |                              4 |                    7 |<br>| -D |                              4 |                    7 |<br>| +I |                              6 |                    7 |<br>| -D |                              6 |                    7 |<br>| +I |                              4 |                    8 |<br>| -D |                              4 |                    8 |</pre><p><em>Notice the op column</em> — when we have 
an update for a given <em>key</em>, the previous row is deleted (<em>-D</em>) and a row with the new value is inserted (<em>+I</em>). For example:</p><pre>| +I |                              4 |                    1 |<br>| -D |                              4 |                    1 |<br>| +I |                              4 |                    2 |</pre><p>Now consider a query that joins the <em>sensor information</em> and <em>sensor readings</em> tables.</p><p><strong><em>Query:</em> Enrich Sensor readings with Sensor Information (Regular Join)</strong></p><pre>SELECT <br>  sensors.sensorId, <br>  reading, <br>  eventTime_ltz,<br>  latitude,<br>  longitude,<br>  sensorType<br>FROM readings<br>  JOIN sensors ON readings.sensorId = sensors.sensorId<br>  <br>------- Sample Output -------<br><br>+----+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+<br>| op |                       sensorId |                        reading |           eventTime_ltz |                       latitude |                      longitude |                     sensorType |<br>+----+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+<br>| +I |                              1 |                          40.18 | 2023-01-30 20:17:45.297 |                      83.964156 |                      47.567865 |                      PROXIMITY |<br>| +I |                              1 |                          38.95 | 2023-01-30 20:17:45.301 |                      83.964156 |                      47.567865 |                      PROXIMITY |<br>| +I |                              1 |                          41.87 | 2023-01-30 20:17:45.334 |                      83.964156 |                      47.567865 |                      PROXIMITY |<br>| +I |                          
    1 |                          39.92 | 2023-01-30 20:17:45.375 |                      83.964156 |                      47.567865 |                      PROXIMITY |<br>| +I |                              1 |                          39.28 | 2023-01-30 20:17:45.408 |                      83.964156 |                      47.567865 |                      PROXIMITY |<br>| +I |                              7 |                          39.99 | 2023-01-30 20:17:45.443 |                     -16.478654 |                      141.49999 |                    TEMPERATURE |<br>| +I |                              1 |                          38.27 | 2023-01-30 20:17:45.551 |                      83.964156 |                      47.567865 |                      PROXIMITY |<br>| +I |                              7 |                          38.46 | 2023-01-30 20:17:45.553 |                     -16.478654 |                      141.49999 |                    TEMPERATURE |<br>| +I |                              1 |                          41.72 | 2023-01-30 20:17:45.577 |                      83.964156 |                      47.567865 |                      PROXIMITY |<br>| +I |                              7 |                          40.73 | 2023-01-30 20:17:46.164 |                     -16.478654 |                      141.49999 |                    TEMPERATURE |</pre><p><strong><em>Note: </em></strong><em>For data enrichment between tables backed by Kafka topics, there is another approach that can leverage the </em><strong><em>upsert-kafka connector </em></strong><em>and use Temporal Joins, but I wanted to keep it simple in this blog post, as there is more theory involved.</em></p><p>Both tables are kept in state, which means the state will keep growing for both sides of the join, so it’s important to expire state by using a TTL.</p><p>You can achieve this using <a 
href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/config/#table-exec-state-ttl">table.exec.state.ttl</a>. Keep in mind though that there is a tradeoff between accuracy and state size, as expiring state too early might result in incomplete results.</p><p><em>If you need to keep a really large state around you will need to configure Flink to use RocksDB as a state backend.</em></p><p>Unlike Materializing Operators, Temporal Operators (that we will see next) will automatically expire state that is no longer useful by using time constraints, although you may need to set an idle state retention interval.</p><p>For example, a time window that is considered <em>complete</em> doesn’t need to be kept around in the state.</p><h3>4.3 Temporal Operators</h3><p>Temporal Operators are constrained by time. Records and computations are associated with a temporal condition, e.g., a time window of 30 seconds, and only accept new records; previously added records cannot be updated or deleted.</p><p>As we previously mentioned, they hold records and/or results in state, but only until they are no longer required.</p><p><strong><em>Query:</em> Find the 1 minute average value for each sensor</strong></p><pre>SELECT<br>  sensorId,<br>  window_start,<br>  window_end,<br>  COUNT(reading) AS totalReadings,<br>  LISTAGG(CAST(reading AS STRING)) AS readingsList,<br>  ROUND(AVG(reading),1) as averageReading<br>FROM TABLE(TUMBLE(TABLE readings, DESCRIPTOR(eventTime_ltz), INTERVAL &#39;1&#39; MINUTE))<br>GROUP BY sensorId, window_start, window_end<br><br>------- Sample Output -------<br><br>+----+--------------------------------+-------------------------+-------------------------+----------------------+--------------------------------+--------------------------------+<br>| op |                       sensorId |            window_start |              window_end |        totalReadings |                   readingsList |                 averageReading 
|<br>+----+--------------------------------+-------------------------+-------------------------+----------------------+--------------------------------+--------------------------------+<br>| +I |                              4 | 2023-01-30 16:44:00.000 | 2023-01-30 16:45:00.000 |                   17 | 40.59,40.17,39.98,39.66,40.... |                           40.1 |<br>| +I |                              1 | 2023-01-30 16:44:00.000 | 2023-01-30 16:45:00.000 |                   26 | 40.23,38.84,36.6,39.31,39.9... |                           39.6 |<br>| +I |                              4 | 2023-01-30 16:45:00.000 | 2023-01-30 16:46:00.000 |                  440 | 40.31,43.09,40.19,40.35,39.... |                           39.9 |<br>| +I |                              1 | 2023-01-30 16:45:00.000 | 2023-01-30 16:46:00.000 |                  469 | 41.03,40.12,40.7,38.88,40.8... |                           40.0 |<br>| +I |                              1 | 2023-01-30 16:46:00.000 | 2023-01-30 16:47:00.000 |                  469 | 39.49,39.42,40.09,40.66,38.... |                           39.9 |<br>| +I |                              4 | 2023-01-30 16:46:00.000 | 2023-01-30 16:47:00.000 |                  447 | 40.44,40.98,39.79,39.21,40.... |                           40.0 |<br>| +I |                              4 | 2023-01-30 16:47:00.000 | 2023-01-30 16:48:00.000 |                  459 | 36.82,40.19,39.66,39.83,42.... |                           40.0 |<br>| +I |                              1 | 2023-01-30 16:47:00.000 | 2023-01-30 16:48:00.000 |                  494 | 40.45,39.37,41.69,40.41,39.... |                           40.1 |<br>| +I |                              1 | 2023-01-30 16:48:00.000 | 2023-01-30 16:49:00.000 |                  494 | 40.35,39.02,41.26,37.56,41.... |                           40.0 |<br>| +I |                              4 | 2023-01-30 16:48:00.000 | 2023-01-30 16:49:00.000 |                  447 | 41.15,39.46,38.72,37.01,39.... 
|                           40.0 |<br>| +I |                              2 | 2023-01-30 16:44:00.000 | 2023-01-30 16:45:00.000 |                   20 | 39.12,41.12,41.68,38.75,39.... |                           40.3 |<br>| +I |     </pre><p><strong><em>Query:</em> Find reading statistics (max, min, average and stddev) for all readings per sensorId over the previous minute.</strong></p><pre>SELECT <br>  eventTime_ltz,<br>  sensorId, <br>  reading,<br>  ROUND(AVG(reading) OVER minuteInterval, 1) AS minuteAvgTemp,<br>  MIN(reading) OVER minuteInterval AS minuteMinTemp,<br>  MAX(reading) OVER minuteInterval AS minuteMaxTemp,<br>  ROUND(STDDEV(reading) OVER minuteInterval, 5) AS minuteStdevTemp<br>FROM readings <br>WINDOW minuteInterval AS (<br>  PARTITION BY sensorId<br>  ORDER BY eventTime_ltz<br>  RANGE BETWEEN INTERVAL &#39;1&#39; MINUTE PRECEDING AND CURRENT ROW <br>);</pre><h3>5. (Temporary) Views</h3><p>As we mentioned, Flink SQL is quite rich and provides a lot of functions, so covering everything in this article is impossible. One more useful feature I want to mention is <em>Temporary Views</em>.</p><p>Similar to database views, a temporary view gives a name to a query. A view is not physically materialized; instead, its query is run every time the view is referenced in a query. 
Temporary Views are very useful to structure and decompose more complicated queries or reuse queries within other queries.</p><p>Once more let us better illustrate this with an example.</p><p>Following our previous query that calculates statistics, we can use a Temporary View to wrap that query and reuse the calculated statistics to filter readings; for example, find <em>readings &gt; average + 2 * standard deviation</em>.</p><p>That’s a simple example that can be used to build more sophisticated outlier detection logic.</p><pre>--- Create a Temporary View -- CREATE [TEMPORARY] VIEW <br>CREATE VIEW readings_stats AS <br>SELECT <br>  eventTime_ltz,<br>  sensorId, <br>  reading,<br>  ROUND(AVG(reading) OVER minuteInterval, 1) AS minuteAvgTemp,<br>  MIN(reading) OVER minuteInterval AS minuteMinTemp,<br>  MAX(reading) OVER minuteInterval AS minuteMaxTemp,<br>  ROUND(STDDEV(reading) OVER minuteInterval, 5) AS minuteStdevTemp<br>FROM readings <br>WINDOW minuteInterval AS (<br>  PARTITION BY sensorId<br>  ORDER BY eventTime_ltz<br>  RANGE BETWEEN INTERVAL &#39;1&#39; MINUTE PRECEDING AND CURRENT ROW <br>);<br><br>--- Run a filter query on the results to get the readings we want<br>SELECT <br>  sensorId,<br>  reading,<br>  ROUND(minuteAvgTemp + 2 * minuteStdevTemp, 2) as threshold <br>FROM readings_stats<br>WHERE reading &gt; minuteAvgTemp + 2 * minuteStdevTemp<br><br>------- Sample Output -------<br>+----+--------------------------------+--------------------------------+--------------------------------+<br>| op |                       sensorId |                        reading |                      threshold |<br>+----+--------------------------------+--------------------------------+--------------------------------+<br>| +I |                              5 |                           41.6 |                          41.42 |<br>| +I |                              3 |                           42.1 |                          41.69 |<br>| +I |           
                   5 |                           41.6 |                          41.52 |<br>| +I |                              3 |                           42.6 |                          42.52 |<br>| +I |                              5 |                           41.7 |                          41.69 |<br>| +I |                              3 |                           41.5 |                          41.38 |<br>| +I |                              7 |                           41.1 |                          41.07 |<br>| +I |                              4 |                           42.5 |                          41.97 |<br>| +I |                              1 |                           41.3 |                           41.2 |<br>| +I |                              1 |                           41.8 |                          41.74 |</pre><h3>6. The TableEnvironment and SQL Queries</h3><p>Up to this point, we have been using the Flink SQL CLI to submit SQL queries. For production cases though, or if you are running in environments like Kubernetes (for example, using the <a href="https://github.com/apache/flink-kubernetes-operator">Flink Operator</a>), you might need other ways to achieve this.</p><p><em>Note 1:</em> Flink <em>1.16</em> introduced <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql-gateway/overview/"><em>Flink SQL Gateway</em></a> that you can use to submit queries.</p><p>Next, we will see how we can use the TableEnvironment to run such queries through code.</p><p><em>Note 2:</em> If you are running on Kubernetes using the Flink Operator, you might also want to check the examples <a href="https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example">here</a>.</p><p><em>Note 3:</em> Seeing the sample code below might seem weird as I’m using Kotlin. 
The code should look essentially the same whether you use Java, Kotlin or Scala. I’m using Kotlin these days, and because Java 17 is unfortunately not yet supported by Flink, I wanted to leverage Kotlin’s multiline strings to write my queries.</p><h3>6.1 Running SQL Queries with Code</h3><pre>import org.apache.flink.api.common.restartstrategy.RestartStrategies<br>import org.apache.flink.api.common.time.Time<br>import org.apache.flink.configuration.Configuration<br>import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend<br>import org.apache.flink.streaming.api.environment.CheckpointConfig<br>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment<br>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment<br><br>class EnrichmentStream {<br>    private val checkpointsDir  = &quot;file://${System.getProperty(&quot;user.dir&quot;)}/checkpoints/&quot;<br>    private val rocksDBStateDir = &quot;file://${System.getProperty(&quot;user.dir&quot;)}/state/rocksdb/&quot;<br><br>    companion object {<br>        @JvmStatic<br>        fun main(args: Array&lt;String&gt;) {<br>            EnrichmentStream().runStream()<br>        }<br>    }<br><br>    fun runStream() {<br>        val environment = StreamExecutionEnvironment<br>            .createLocalEnvironmentWithWebUI(Configuration())<br><br>        environment.parallelism = 3<br><br>        // Checkpoint Configurations<br>        environment.enableCheckpointing(5000)<br>        environment.checkpointConfig.minPauseBetweenCheckpoints = 100<br>        environment.checkpointConfig.setCheckpointStorage(checkpointsDir)<br><br>        val stateBackend = EmbeddedRocksDBStateBackend()<br>        stateBackend.setDbStoragePath(rocksDBStateDir)<br>        environment.stateBackend = stateBackend<br><br>        environment.checkpointConfig.externalizedCheckpointCleanup =<br>            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION<br><br>        // Configure Restart Strategy<br>        environment.restartStrategy = RestartStrategies.fixedDelayRestart(5, Time.seconds(5))<br><br>        val tableEnvironment = StreamTableEnvironment.create(environment)<br><br>        // Run some SQL queries to check the existing Catalogs, Databases and Tables<br>        tableEnvironment<br>            .executeSql(&quot;SHOW CATALOGS&quot;)<br>            .print()<br><br>        tableEnvironment<br>            .executeSql(&quot;SHOW DATABASES&quot;)<br>      
      .print()<br><br>        tableEnvironment<br>            .executeSql(&quot;SHOW TABLES&quot;)<br>            .print()<br><br>        tableEnvironment<br>            .executeSql(Queries.CREATE_SENSORS_TABLE)<br>            .print()<br><br>        tableEnvironment<br>            .executeSql(Queries.CREATE_READINGS_TABLE)<br>            .print()<br><br>        tableEnvironment<br>            .executeSql(&quot;SHOW TABLES&quot;)<br>            .print()<br><br>        tableEnvironment<br>            .executeSql(Queries.JOIN_SENSOR_READINGS_WITH_INFO_QUERY)<br>            .print()<br>    }<br>}</pre><p>The TableEnvironment is the entrypoint for Table API and SQL integration and is responsible for:</p><ul><li>Registering a Table in the internal catalog</li><li>Registering catalogs</li><li>Loading pluggable modules</li><li>Executing SQL queries</li><li>Registering a user-defined (scalar, table, or aggregation) function</li><li>Converting between DataStream and Table (in case of StreamTableEnvironment)</li></ul><p>The code snippet illustrated above runs the <em>Join Operations</em> we saw before (you can find the queries <a href="https://github.com/polyzos/depths-of-flink/blob/main/src/main/kotlin/io/ipolyzos/compute/Queries.kt">here</a>) and the output should be similar.</p><h3>6.2 A Short Discussion: Checkpoints and State</h3><p>As extras you might notice two things:</p><ol><li>Checkpointing is enabled.</li><li>RocksDB is enabled as the state backend.</li></ol><p>We are not going into detail about these concepts here as it’s a story for another day. 
For those interested, if you run the code above you should see two output directories, checkpoints and state.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/0*W6ZERq2UL32JhgDl.png" /></figure><p>Since we are consuming from (at most) 3 Kafka partitions, I’m using a parallelism of 3, as you might have noticed in the code, so the stateful operator that runs the join will store state for each of the three tasks.</p><p>You can use a sample code file I have added <a href="https://github.com/polyzos/depths-of-flink/blob/main/src/main/kotlin/io/ipolyzos/inspect/RocksDBInspect.kt">here</a> to see what gets written in the state.</p><pre>Processing state of operator: job_848920cbd5178c2a525827b244d1e530_op_StreamingJoinOperator_8b481b930a189b6b1762a9d95a61ada1__2_3__uuid_97fb2e4b-e6f4-413e-98a5-0c4ad24ed20e}<br> Column Family &#39;right-records&#39; has 3 entries.<br> Column Family &#39;left-records&#39; has 670 entries.<br>Processing state of operator: job_848920cbd5178c2a525827b244d1e530_op_StreamingJoinOperator_8b481b930a189b6b1762a9d95a61ada1__3_3__uuid_46df8b7d-d015-4cc3-ad53-b86b7ecb0f1e}<br> Column Family &#39;right-records&#39; has 4 entries.<br> Column Family &#39;left-records&#39; has 879 entries.<br>Processing state of operator: job_848920cbd5178c2a525827b244d1e530_op_StreamingJoinOperator_8b481b930a189b6b1762a9d95a61ada1__1_3__uuid_6166550d-6e03-489f-94ab-4004ea7ec50e}<br> Column Family &#39;right-records&#39; has 3 entries.<br> Column Family &#39;left-records&#39; has 521 entries.</pre><p>You can see that we have two <a href="https://github.com/EighteenZi/rocksdb_wiki/blob/master/Column-Families.md">column families</a>: one for the left side of the join (sensor readings) and one for the right (sensor information). 
Notice, for example, the right-records: since we have <em>10 sensor ids</em>, these keys are distributed among the three tasks (3 + 4 + 3 entries).</p><p>Also note that, as discussed previously, since there are no time constraints the state will grow indefinitely (unless table.exec.state.ttl is configured).</p><p>I hope I sparked some interest in those curious enough to dive deeper themselves. In this article though, we will conclude with this high-level overview.</p><h3>7. Wrap Up</h3><p>Flink is a powerful Stateful Stream Processing engine, enabling Unified Batch and Streaming architectures.</p><p>Flink SQL is a high-level API that uses the well-known SQL syntax, making it easy for everyone, such as scientists or non-JVM (e.g., Python) engineers, to leverage the power of Stream Processing with Apache Flink.</p><p>Flink SQL is extremely rich and supports a wide variety of built-in operators and functions.</p><p>Unless there are some really sophisticated use cases (that cannot be expressed in SQL and need low-level DataStream API access), Flink SQL is the best candidate for Stream Processing.</p><p><em>Originally published at </em><a href="https://blog.rockthejvm.com/flink-sql-introduction/"><em>https://blog.rockthejvm.com</em></a><em> on February 6, 2023.</em></p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[In the Land of Streams — Kafka Part 4: My Cluster is Lost!! — Embracing Failure]]></title>
            <link>https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-4-my-cluster-is-lost-embracing-failure-916cec7f06f0?source=rss-1427b7a7e5e5------2</link>
            <guid isPermaLink="false">https://medium.com/p/916cec7f06f0</guid>
            <category><![CDATA[streaming]]></category>
            <category><![CDATA[cloud]]></category>
            <category><![CDATA[kafka]]></category>
            <category><![CDATA[data]]></category>
            <category><![CDATA[kotlin]]></category>
            <dc:creator><![CDATA[Giannis Polyzos]]></dc:creator>
            <pubDate>Tue, 13 Dec 2022 14:24:05 GMT</pubDate>
            <atom:updated>2022-12-14T06:23:20.430Z</atom:updated>
            <content:encoded><![CDATA[<h3>In the Land of Streams — Kafka Part 4: My Cluster is Lost!! — Embracing Failure</h3><h4>A Kafka Streaming Ledger</h4><figure><img alt="" src="https://cdn-images-1.medium.com/max/800/1*EEITZWkhLusPO4oWK1oQFA.png" /><figcaption><a href="https://www.vecteezy.com/free-vector/squirrel-cartoon">https://www.vecteezy.com/free-vector/squirrel-cartoon</a></figcaption></figure><p><strong>The Blob post series consists of the following parts:<br></strong>- <strong>Part 1:</strong> <a href="https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-1-a-producers-message-ba92518c856e">A Producer’s Message</a> <br>- <strong>Part 2:</strong> <a href="https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-2-the-rise-of-the-consumers-2b750d3e6bf6">The Rise of the Consumers </a><br>- <strong>Part 3:</strong> <a href="https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-3-offsets-and-how-to-handle-them-3417016cfb05">Offsets and how to handle them</a><br>- <strong>Part 4:</strong> My Cluster is Lost!! — Embracing Failure (this blog post)</p><p>In the previous parts of the series, we have looked at how things work mainly from the application’s perspective.</p><p>In this final part, we will focus a little bit on the infrastructure part.</p><p>Typically enterprises use Kafka as the backbone of their whole data platform. This means it accommodates a wide range of business-critical workloads and sooner or later no matter how well-prepared you are, things are doomed to fail. This is why it is important to embrace failure when designing (<a href="https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing">yes fallacies are real</a>) your overall architecture and think in terms of backups and disaster recovery.</p><p>In the previous parts, I mentioned (and used) <a href="https://aiven.io/kafka">Aiven</a> for Kafka. 
Aiven is a complete future-proof data platform and what I really like is how easy it is to set up disaster recovery solutions whether it’s on one or multiple cloud providers.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/907/1*AcMviD8GiNJAWkLv3_oCug.png" /></figure><p>Let’s assume you want to implement a Multi-Cloud Disaster Recovery Solution with Aiven for Kafka. There are two things to highlight here:</p><p>1. As Aiven offers<strong><em> no cloud vendor lock-in </em></strong>it allows to easily deploy clusters across different clouds.</p><p>2. For data replication to design Disaster Recovery solutions it uses <a href="https://docs.aiven.io/docs/products/kafka/kafka-mirrormaker.html">MirrorMaker2</a>.</p><p>The two main disaster recovery patterns are:</p><ol><li><strong>Active / Passive Pattern: <br></strong>uses a secondary cluster that acts as a backup</li><li><strong>Active / Active Pattern:</strong><br>uses two clusters replicating data between them</li></ol><p>Some data syncing/replication patterns can also account for disaster (since data is replicated as well)</p><ol><li><strong>Fan-out Pattern:</strong><br>data is replicated from one cluster to multiple clusters. 
<br><strong>Use Case: </strong>replicate data to different clouds and/or regions</li><li><strong>Aggregation Pattern:</strong><br>many edge clusters send data to a centralized cluster that acts as the aggregator<br><strong>Use Case: </strong>aggregate data to a centralized location; for example for data warehouse or operational</li><li><strong>Full-Mesh Pattern:</strong><br>many clusters send data to each other<br><strong>Use Case: </strong>different clusters operate in different regions/countries and data needs to become available as a whole in every region/country for operational requirements for example.</li></ol><p>The most common pattern (at least based on my own experience) is the Active/Passive — i.e having one cluster active that syncs its data to a secondary one that will become active in case the currently active one becomes unavailable (failover).</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*E-t9SAxZZjndFUP4zoM19g.png" /><figcaption>Data Replication between multi cloud clusters</figcaption></figure><p>An important thing to note for the Aiven services is that the overall communication between the different clouds takes place using an I<a href="https://en.wikipedia.org/wiki/IPsec">PSec Tunnel</a> to provide secure networking.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*bLdQopR6GrR-8phpvfr7vA.png" /><figcaption>Data syncing between the active and the failover clusters.</figcaption></figure><p>MirrorMaker 2 leverages the Connect framework to replicate data between Kafka clusters. 
You can deploy MirrorMaker 2 on either AWS or GCP.</p><p>MM2 includes several new features, like:</p><ul><li>topic data and consumer group replication</li><li>topic configuration and ACL replication</li><li>cross-cluster offset synchronization</li><li>preserved partitioning</li></ul><p>In the happy path, you have the producing and consuming applications operating on the Active cluster (on AWS in this case), and then, <strong>boom</strong>, the unhappy path reveals itself.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*sCFSrzGMfeMoHTOhRaNCCg.png" /><figcaption>AWS cluster becomes unavailable</figcaption></figure><p>The whole cluster goes down and the applications become unavailable.</p><p>When working with distributed systems and planning for disaster, one crucial question you and your team need to answer is: <strong>what does disaster mean for us?</strong></p><p>For example, your cluster might become unavailable due to a transient networking issue, but after a few minutes everything is operational again. Is this kind of scenario acceptable for your business? Can you tolerate this, or does your application need to switch as soon as the cluster seems lost? Do you need to switch all your applications immediately?</p><p>Answering these kinds of questions will put you in a better position when designing your solution.</p><p>The last missing piece is how failover will actually look in practice. <br>One approach to that problem is to wrap your application logic in some retry logic. 
<br>There are many good libraries out there that provide fault tolerance and resiliency, like <a href="https://github.com/resilience4j/resilience4j">Resilience4j</a> for Java, <a href="https://arrow-kt.io/docs/fx/">Arrow Fx</a> for Kotlin, and <a href="https://zio.dev/">ZIO</a> for Scala (notice how all of them are functional libraries).</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*gd-Djf4ncIkuJ1gwT_iLdg.png" /></figure><p>As depicted in the snippet above, the goal here is this: once the applications lose connectivity to the cluster, catch that, have the retry logic kick in, and eventually, if needed, fall back to the healthy cluster.</p><p><strong>Note: </strong>You might already be running hundreds of apps in production or have many different teams each creating their own apps. Adding resiliency to your app logic may be hard in terms of both implementation and coordination. Alternatives include adding a load balancer to handle the traffic or using service mesh technologies.</p><h4>Wrapping Up</h4><p>Failure is something that will happen sooner or later and you need to account for it. When designing disaster recovery solutions you might need to ask:</p><ol><li>What should the definition of “my cluster is lost” be within our business context?</li><li>How fast do my apps need to switch to a healthy cluster and be operational?</li><li>What is the best approach to implementing an application failover?</li></ol><p>This is the end of the <strong>In the Land of Streams with Kafka</strong> series.</p><p>Stay around for more StreamingLedger stories 👋😁</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[In the Land of Streams — Kafka Part 3: Offsets and How to Handle Them]]></title>
            <link>https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-3-offsets-and-how-to-handle-them-3417016cfb05?source=rss-1427b7a7e5e5------2</link>
            <guid isPermaLink="false">https://medium.com/p/3417016cfb05</guid>
            <category><![CDATA[streaming]]></category>
            <category><![CDATA[cloud]]></category>
            <category><![CDATA[kafka]]></category>
            <category><![CDATA[data]]></category>
            <category><![CDATA[kotlin]]></category>
            <dc:creator><![CDATA[Giannis Polyzos]]></dc:creator>
            <pubDate>Tue, 13 Dec 2022 14:23:31 GMT</pubDate>
            <atom:updated>2022-12-13T14:44:18.455Z</atom:updated>
            <content:encoded><![CDATA[<h3>In the Land of Streams — Kafka Part 3: Offsets and How to Handle Them</h3><h4>A Kafka Streaming Ledger</h4><figure><img alt="" src="https://cdn-images-1.medium.com/max/800/1*EEITZWkhLusPO4oWK1oQFA.png" /><figcaption><a href="https://www.vecteezy.com/free-vector/squirrel-cartoon">https://www.vecteezy.com/free-vector/squirrel-cartoon</a></figcaption></figure><p><strong>The blog post series consists of the following parts:<br></strong>- <strong>Part 1:</strong> <a href="https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-1-a-producers-message-ba92518c856e">A Producer’s Message</a><br>- <strong>Part 2:</strong> <a href="https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-2-the-rise-of-the-consumers-2b750d3e6bf6">The Rise of the Consumers</a><br>- <strong>Part 3:</strong> Offsets and how to handle them (this blog post)<br>- <strong>Part 4:</strong> <a href="https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-4-my-cluster-is-lost-embracing-failure-916cec7f06f0">My Cluster is Lost!! 
— Embracing Failure</a></p><p>This part aims to cover the following:</p><ol><li>What’s the role of offsets in Kafka</li><li>What are the caveats when working with offsets</li><li>Different approaches for handling offsets</li></ol><blockquote>It’s a cycle — the message lifecycle</blockquote><p>Up to this point, we have seen the whole message lifecycle in Kafka: PPC (Produce, Persist, Consume).</p><p>One really important thing, though, especially when you need to trust that your system processes each message exactly once, is committing offsets.</p><p>Fetching messages from Kafka, processing them, and marking them as processed while actually providing such guarantees has a few pitfalls and is not provided out of the box.</p><p>This is what we will see next, i.e., what do I need to take into account to get the best possible exactly-once processing guarantees out of my applications?</p><p><strong>Committing Offsets Scenarios</strong></p><p>We will take a look at a few scenarios for committing offsets and what caveats each approach might have.</p><p><strong>Scenario 1: Committing Offsets Automatically</strong></p><p>This is the default behavior, with <em>enable.auto.commit</em> set to true. The caveat here is that the message is consumed and the offsets are committed periodically (every <em>auto.commit.interval.ms</em>, 5 seconds by default), BUT this doesn’t mean the message has been successfully processed. If processing fails for some reason, the offset might already have been committed, and as far as Kafka is concerned that message has been processed successfully.</p><p><strong>Scenario 2: Committing Offsets Manually</strong></p><p>Setting <em>enable.auto.commit</em> to false takes Kafka consumers out of the <strong><em>“autopilot mode”</em></strong> and it’s up to the application to commit the offsets. This can be achieved by using the <strong><em>commitSync()</em></strong> or <strong><em>commitAsync()</em></strong> methods of the consumer API. 
<br>When committing offsets manually, we can do so either once the whole batch returned from the <em>poll()</em> method has finished processing, in which case all the offsets up to the highest one are committed, or after each individual message has finished its processing, for even stronger guarantees.</p><p><strong>Commit/Message</strong></p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*vCV4IPrgTxAmGT-bJzLu2A.png" /><figcaption>Committing offsets per message</figcaption></figure><p><strong>Commit/Batch</strong></p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*vePl2RZUbOofPkXH-Akorg.png" /><figcaption>Committing offsets per batch</figcaption></figure><p>This gives us control over how message offsets are committed, and we can trust that the actual processing has finished before the offset is committed. <br>For those who want to account for (or at least try to) every <em>unhappy path</em>, there is also the possibility that the commit itself fails. In this case, the message will be reprocessed.</p><p><strong>Scenario 3: Idempotency with External Storage</strong></p><p>You can use an external data store (for example, <a href="https://aiven.io/cassandra">Cassandra</a>) and keep track of the offsets there.</p><p>Wrapping both the processing of a message and the commit of its offset in something like a transaction guarantees that either both succeed or both fail, and thus idempotency is ensured.</p><p>One thing to note here is that offsets are now stored in an external datastore. 
When a new consumer starts or a rebalance takes place, you need to make sure your consumer fetches the offsets from the external datastore.</p><p>One way to achieve this is to add a <strong>ConsumerRebalanceListener</strong>: store (commit) the offsets to the external datastore when <strong><em>onPartitionsRevoked</em></strong> is called, and retrieve them from it when <strong><em>onPartitionsAssigned</em></strong> is called.</p><h3>Wrapping Up</h3><p>In this post, we saw the importance of offsets. The key takeaways here:</p><ol><li>Consuming a message is different from actually processing it successfully</li><li>Auto-committing offsets can have a negative impact on your application guarantees</li></ol><p>We also reviewed a few different ways you might approach committing offsets back to Kafka and the caveats you might encounter with each approach.</p><p><strong>Check Next:</strong><a href="https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-4-my-cluster-is-lost-embracing-failure-916cec7f06f0"> Part 4 My Cluster is Lost!! — Embracing Failure</a></p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[In the Land of Streams — Kafka Part 2: The rise of the Consumers]]></title>
            <link>https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-2-the-rise-of-the-consumers-2b750d3e6bf6?source=rss-1427b7a7e5e5------2</link>
            <guid isPermaLink="false">https://medium.com/p/2b750d3e6bf6</guid>
            <category><![CDATA[streaming]]></category>
            <category><![CDATA[data]]></category>
            <category><![CDATA[kafka]]></category>
            <category><![CDATA[kotlin]]></category>
            <category><![CDATA[cloud]]></category>
            <dc:creator><![CDATA[Giannis Polyzos]]></dc:creator>
            <pubDate>Tue, 13 Dec 2022 14:22:54 GMT</pubDate>
            <atom:updated>2022-12-13T14:26:57.505Z</atom:updated>
            <content:encoded><![CDATA[<h3>In the Land of Streams — Kafka Part 2: The rise of the Consumers</h3><h4>A Kafka Streaming Ledger</h4><figure><img alt="" src="https://cdn-images-1.medium.com/max/800/1*EEITZWkhLusPO4oWK1oQFA.png" /><figcaption><a href="https://www.vecteezy.com/free-vector/squirrel-cartoon">https://www.vecteezy.com/free-vector/squirrel-cartoon</a></figcaption></figure><p><strong>The blog post series consists of the following parts:<br></strong>- <strong>Part 1:</strong> <a href="https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-1-a-producers-message-ba92518c856e">A Producer’s Message</a><br>- <strong>Part 2:</strong> The Rise of the Consumers (this blog post)<br>- <strong>Part 3:</strong> <a href="https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-3-offsets-and-how-to-handle-them-3417016cfb05">Offsets and how to handle them</a><br>- <strong>Part 4:</strong> <a href="https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-4-my-cluster-is-lost-embracing-failure-916cec7f06f0">My Cluster is Lost!! 
— Embracing Failure</a></p><p>In the previous post, we discussed how the producing side works when we send messages. With data now stored inside the topic, let’s zoom in on the consuming side.</p><p>This part aims to cover the following:</p><ol><li>How the consuming side works</li><li>How scaling consumer groups works</li><li>How scaling with the parallel consumer works</li><li>Tuning to avoid slow consumers</li></ol><blockquote>Switching to the other side of the wall</blockquote><p>You can find the relevant code samples on GitHub <a href="https://github.com/polyzos/kafka-streaming-ledger/tree/main/src/main/kotlin/io/ipolyzos/consumers">here</a>.</p><p>A typical Kafka consumer loop should look similar to the following snippet:</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*4KV-x8wakIVCW46zz-gKTw.png" /><figcaption>Consumer poll() Loop</figcaption></figure><p>We trigger the <em>poll()</em> method on the consumer, simulate a small amount of work, and finally show the records it processed.</p><p><strong>Note: </strong>The show() method on the records comes from a helper extension function for printing the records in a nice and structured way:</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*YkCB33YkN6iUUVdayqFbNQ.png" /></figure><p>So let’s try to better understand what happens here. The following diagram provides a more detailed explanation.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*T7BggUx-SEDnhB7y6Xj4LQ.png" /></figure><p>Kafka uses a pull-based model for data fetching. At the <em>“heart of the consumer”</em> sits the poll loop. 
The poll loop is important for two reasons:</p><ol><li>It is responsible for fetching data (providing <strong><em>ConsumerRecords</em></strong>) for the consumer to process, and</li><li>It keeps the consumer alive in its group: calling <em>poll()</em> regularly (together with the heartbeats the client sends) lets the consumer group know which consumers are available and whether a rebalance needs to take place.</li></ol><p>The consuming applications maintain TCP connections with the brokers and send fetch requests to fetch data. The fetched data is buffered and returned by the <em>poll()</em> method. When data is returned from the <em>poll()</em> method, the actual processing takes place, and once it’s finished more data is requested, and so on.</p><p>What’s important to note here (and we will dive deeper into it in the next part) is committing message offsets. This is Kafka’s way of knowing that a message has been fetched and processed successfully. By default, offsets are committed automatically at regular intervals.</p><p>How much data is fetched, and when more data is requested, is dictated by configuration options like <em>fetch.min.bytes, max.partition.fetch.bytes, fetch.max.bytes,</em> and <em>fetch.max.wait.ms</em>. You might think that the defaults are fine for you, but it’s important to test them and think through your use case carefully.</p><p>To make this clearer, let’s assume that you fetch <em>500</em> records from the <em>poll()</em> loop to process, but the processing for some reason takes too long for each message. 
<em>max.poll.interval.ms</em> dictates the maximum time allowed between two calls to the poll method. If this threshold is exceeded, the consumer is considered <strong><em>lost</em></strong> and a rebalance is triggered, although our application was just slow at processing.</p><p>So decreasing the number of records the <em>poll()</em> loop returns (<em>max.poll.records</em>) and/or better tuning configurations like <em>heartbeat.interval.ms</em> and <em>session.timeout.ms</em>, which are used for consumer group coordination, might be reasonable in this case.</p><h4>Running the Consumer</h4><p>At this point, I will start one consuming instance on my <strong><em>ecommerce.events</em></strong> topic. Remember from part 1 that this topic consists of 5 partitions. I will run it against my <a href="https://aiven.io/kafka"><strong><em>Aiven for Kafka</em></strong></a> cluster, using the default consumer configuration options, and my goal is to see how long it takes for a consumer to read <em>10,000 messages</em> from the topic, assuming a <em>20ms processing time</em> per message. You can find the code <a href="https://github.com/polyzos/kafka-streaming-ledger/blob/main/src/main/kotlin/io/ipolyzos/consumers/SingleThreadConsumer.kt">here</a>.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*YOyEPTwNG2yjQohYNI3zgA.png" /></figure><p>We can see that it takes a single consumer around 4 minutes for this kind of processing (10,000 × 20 ms is already 200 seconds of pure processing time). So how can we do better?</p><p><strong>Scaling the Consuming Side</strong></p><blockquote>Consumer Groups and the Parallel Consumer Pattern</blockquote><p>Consumer groups are Kafka’s way of sharing the work between different consumers, and they also determine the level of parallelism. 
The highest level of parallelism you can achieve with Kafka is one consumer per partition of a topic.</p><p><strong>Scenario 1: #Partitions &gt; #Consumers</strong></p><p>In this scenario, the available partitions are distributed as evenly as possible among the available consumers of the group, and each consumer has ownership of its partitions.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/730/1*RK5XnCGrVh6pt7izERga7g.png" /><figcaption>Partitions are shared among the available consumers</figcaption></figure><p><strong>Scenario 2: #Partitions = #Consumers</strong></p><p>When the number of partitions equals the number of available consumers, each consumer reads from exactly one partition. In this scenario, we also reach the maximum parallelism we can achieve on a particular topic.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/675/1*thD8pcWU1EjmO8gzPYaJuA.png" /><figcaption>1:1 consumer-partition mapping</figcaption></figure><p><strong>Scenario 3: #Partitions &lt; #Consumers</strong></p><p>This scenario is similar to the previous one, only now at least one consumer is running but stays idle. On the one hand, this means we waste resources, but on the other hand we can use this consumer as a <strong>failover</strong> in case another one in the group goes down.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/640/1*NQJmDiZtLHdD_pLeu-a-1w.png" /><figcaption>#Consumers &gt; #Partitions: the extra consumers are idle</figcaption></figure><p>When a consumer goes down or, similarly, a new one joins the group, Kafka has to trigger a rebalance. This means that partitions need to be revoked and reassigned to the available consumers in the group.</p><p>Let’s run our previous example again, <em>consuming 10k messages</em>, but this time with 5 consumers in our consumer group. 
I will be creating 5 consuming instances from within a single JVM (using Kotlin <a href="https://kotlinlang.org/docs/coroutines-overview.html"><em>coroutines</em></a>), but you can easily adjust the code (found <a href="https://github.com/polyzos/kafka-streaming-ledger/blob/main/src/main/kotlin/io/ipolyzos/consumers/PerPartitionConsumer.kt">here</a>) and just start multiple JVMs.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Y2l_LD6qDCfw1fIUNeVOqw.png" /></figure><p>As expected, we can see that the consumption time dropped to less than a minute.</p><p>But if Kafka’s maximum level of parallelism is one consumer per partition, does this mean we have hit the scaling limit? Let’s see how to tackle this next.</p><h4>What about the parallel consumer pattern?</h4><p>Up to this point, we might have two questions in mind:</p><ol><li>If #partitions = #consumers in the consumer group, how can I scale even further if needed? It’s not always easy to calculate the number of partitions beforehand, and/or I might need to account for sudden spikes.</li><li>How can I minimize rebalancing time?</li></ol><p>One solution to this can be the parallel consumer pattern. 
You can have consumers in your group consuming from one or more partitions of the topic, but then they hand off the actual processing to other threads.</p><p>One such implementation can be found <a href="https://github.com/confluentinc/parallel-consumer">here</a>.</p><p>It provides three ordering guarantees: <strong><em>Unordered</em></strong>, <strong><em>Key</em></strong> and <strong><em>Partition</em></strong>.</p><ol><li><strong>Unordered:</strong> provides no ordering guarantees.</li><li><strong>Key:</strong> guarantees ordering per key, BUT with the caveat that the keyspace needs to be quite large, otherwise you might not observe much performance improvement.</li><li><strong>Partition:</strong> only one message is processed per partition at any time.</li></ol><p>Along with that, it also provides different ways of committing offsets. This is a pretty nice library you might want to look at.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/917/1*zliIMq6Dvh2Ar1BsFQz8eg.png" /><figcaption>The Parallel Consumer Pattern</figcaption></figure><p>Going back to our example once more to answer the question: how can we break the scaling limit? We will be using the parallel consumer pattern; you can find the code <a href="https://github.com/polyzos/kafka-streaming-ledger/blob/main/src/main/kotlin/io/ipolyzos/consumers/ParallelScalingConsumer.kt">here</a>.<br>Using <em>one</em> <em>parallel consumer instance</em> on our <em>5-partition</em> topic, specifying <em>Key</em> ordering, and using a parallelism of <em>100 threads</em></p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Y4NDf8asLvm4_EX2RG-hoQ.png" /><figcaption>1 parallel consuming instance</figcaption></figure><p>brings the consuming and processing time of <strong><em>10k messages</em></strong> down to as little as <strong><em>6 seconds</em></strong>.<br><em>Notice in the screenshot how different batches are processed on different threads on the same consumer instance.</em></p><p>and if we use <em>5</em> <em>parallel consumer instances</em></p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*lxnyO3AMUamiiwE7xnS2Jg.png" /><figcaption>5 parallel consuming instances</figcaption></figure><p>we get that down to <strong>3 seconds.</strong><br><em>Notice in the screenshot how different batches are processed on different threads on different consumer instances.</em></p><h3>Wrapping Up</h3><p>In this part, we saw how the consuming side of Kafka works. As takeaways when creating consuming applications:</p><ul><li>We need to take into account the number of partitions each topic has.</li><li>We need to think about our processing requirements and try to account for slow consumers.</li><li>We can scale both with consumer groups and with the parallel consumer pattern.</li><li>Message ordering, the size of the keyspace, and partition guarantees need to be taken into account to see which approach works best (or a combination of both).</li></ul><p><strong>Check Next: </strong><a href="https://medium.com/@ipolyzos_/in-the-land-of-streams-kafka-part-3-offsets-and-how-to-handle-them-3417016cfb05">Part 3 Offsets and how to handle them.</a></p>]]></content:encoded>
        </item>
    </channel>
</rss>