<?xml version="1.0" encoding="UTF-8"?><rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:cc="http://cyber.law.harvard.edu/rss/creativeCommonsRssModule.html">
    <channel>
        <title><![CDATA[Stories by Scott Haines on Medium]]></title>
        <description><![CDATA[Stories by Scott Haines on Medium]]></description>
        <link>https://medium.com/@newfrontcreative?source=rss-3b4cab6af83e------2</link>
        <image>
            <url>https://cdn-images-1.medium.com/fit/c/150/150/1*yxJ4a0E87cbQoNwXoPV47w@2x.jpeg</url>
            <title>Stories by Scott Haines on Medium</title>
            <link>https://medium.com/@newfrontcreative?source=rss-3b4cab6af83e------2</link>
        </image>
        <generator>Medium</generator>
        <lastBuildDate>Sat, 16 May 2026 02:09:01 GMT</lastBuildDate>
        <atom:link href="https://medium.com/@newfrontcreative/feed" rel="self" type="application/rss+xml"/>
        <webMaster><![CDATA[yourfriends@medium.com]]></webMaster>
        <atom:link href="http://medium.superfeedr.com" rel="hub"/>
        <item>
            <title><![CDATA[Beyond the Data Abyss]]></title>
            <link>https://newfrontcreative.medium.com/beyond-the-data-abyss-6bf2d1e6e34a?source=rss-3b4cab6af83e------2</link>
            <guid isPermaLink="false">https://medium.com/p/6bf2d1e6e34a</guid>
            <category><![CDATA[life-lessons]]></category>
            <category><![CDATA[software-engineering]]></category>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[kafka]]></category>
            <category><![CDATA[enterprise-architecture]]></category>
            <dc:creator><![CDATA[Scott Haines]]></dc:creator>
            <pubDate>Thu, 17 Jul 2025 23:40:12 GMT</pubDate>
            <atom:updated>2025-08-08T19:23:52.725Z</atom:updated>
            <content:encoded><![CDATA[<h4>How We Learned to Fall in Love with our Streaming Data Again.</h4><figure><img alt="A colorful rendering of what Space could look like abound with reds and yellows, blues and blacks. Not the classic void of space but something more happy and inviting." src="https://cdn-images-1.medium.com/max/1024/1*voiikhPgCxFK5fP_XpfQXQ.jpeg" /><figcaption>Imaging life at the brink of creation. Your data is in a similar primordial state until you gain control and learn to harness it. <a href="https://www.midjourney.com/jobs/d34f8835-ed7f-44e8-9a41-ad6163574cf3?index=0">Image by Author</a> via MidJourney.</figcaption></figure><p>In part one of this series “<a href="https://newfrontcreative.medium.com/escaping-the-void-of-the-data-abyss-337770a39fbc">Escaping the Void of the Data Abyss</a>”, we put our heads together and learned about the countless horrors that will sneak up on you in the night (or over the years) with respect to streaming data at massive scale — through the lens of what didn’t work at Nike. We then learned about the <em>strategy taken at Nike</em> to right the sins of the past, and how <a href="https://buf.build/">Buf</a> came to our rescue and saved the day. Lastly, I left us with a cliff hanger at the end of the first entry saying <em>I’d talk about some of the problems (and solutions) that came about as we started to scale up our </em><strong><em>protobuf-first strategy</em></strong>. Alas, here we are. Ready. Set. Let’s Go.</p><h3>Aligning on a Protobuf-First Data Strategy</h3><p>This can be a hard sell —<em> at first</em>. Most people when given the choice between <em>a) object-flexibility, or mutability</em> and <em>b) strict type-safety, object-level governance, or message-level immutability</em>, will tend <em>to do what is easiest which falls in the mutability camp. </em>Mutability ultimately equals extra work since you’ll be paying the piper on ingest as things change, which in many cases means “after emission into a Kafka-like service”.</p><p>This is a problem since streaming JSON data <em>is still mutable</em> and on its own is missing an understanding of schema governance. This just means it has a comedic way of surfacing corrupt records at the worst times. This can be annoying for web services, but it is crippling for event streaming since there is no way to rewind and replay corrupt events — <em>simply put it will lead to data loss and is an outage!</em></p><blockquote>Most people when given the choice between <em>a) object-flexibility, or mutability</em> and <em>b) strict type-safety, object-level governance, or message-level immutability</em>, will tend <em>to do what is easiest which falls in the mutability camp</em></blockquote><p>So to recap. JSON is great for HTTP services. <strong>JSON is bad for reliable streaming services</strong>. Protobuf makes JSON services more manageable as well (through type-saftey and object-level invariants) — so a win win in both respects and an easier sell is simply to use protobuf as a backbone for traditional web services and as a go-to for event streaming. For us at Nike, we needed it for both.</p><h4><strong>Sidebar: The Streaming Data Problem</strong></h4><p>The problem I mentioned earlier is fairly ubiquitous across most enterprises — in a nutshell, non-binary serializable structured data + streaming causing problems. The trouble is that problems sneak up on us over time with slowly changing data (row or columnar) and this can eventually corrupt historically captured data — to the point of no longer being able to read records.</p><blockquote>There is likely a graph somewhere that shows the length of time a data product has been in existence (vs) the number of changes to the underlying data structure(s) as a function of increasingly invalid records within said dataset across the lifetime of that data product. <em>If not, then imagine a chart showing something getting slowly worse as a function of time</em>.</blockquote><p><em>Not breaking backwards compatibility is one reason why protovalidate is so important (</em><a href="https://newfrontcreative.medium.com/escaping-the-void-of-the-data-abyss-337770a39fbc"><em>see part 1 for more details</em></a><em>) and one of the easier ways of selling the protobuf-first strategy. It’s simply *impossible to force through a breaking change.</em></p><p>Back to aligning on the strategy.</p><h4><strong>Moving Past the Hard Sell</strong></h4><p>Going back to the hard sell (<em>which was “hey, we’re going to go all in on protobuf”</em>), and making it less of a hard sell takes some careful planning. Remember people are most often afraid of what they don’t know, and unless people really trust you, they’ll need to see “proof” (which we had through the clickstream project), and they’ll also be on the lookout for an “easy-button” unless they are convinced they can continue to deliver while undergoing a complete re-architecture.</p><p>One of the myths we found floating around the rumor-mill was that “Protobuf is too ridged to work with due to it’s immutability”. Luckily, we could easily squash that myth since while protobuf is immutable, <em>it is not immutable in the way we think</em> of sealed or final objects (in software), in fact, protobuf definitions can and will change over time. <strong>The big difference is that the specification accounts for how changes can be made while ensuring backwards compatibility with prior versions of “said message self”</strong>.</p><p>Knowing that we could achieve backwards-compatibility and gain compile-time guarantees, along with the time-saving benefits of code-generation proved to be the lightbulb moment most engineers were waiting for. Most of them had been dealing with broken data promises and their associated outages (botched deployments, 400s, 500s, broken data pipelines) for years and were happy to swim to shore. <strong>An easy sell for engineers is always “you’ll do less work”.</strong></p><blockquote><strong><em>Regarding message and service immutability<br></em></strong>A protobuf definition is immutable “at the point in time” that it is compiled. It is easier to think about “this point in time” in terms a version. We simply say “for a given version of a protobuf message, it is immutable” — <strong>but how do we account for a given version?</strong></blockquote><h4>Lessons Learned: Versioning Strategy for our Protobuf</h4><p><em>Meanwhile, back in Nike-land</em>. We ran into the problem of versioning our protobuf early-on in the project. What’s the best pattern? especially when we were compiling many <strong>“artifacts”</strong> for various platforms and languages? We asked ourselves:</p><ul><li>Do we cut a new release on every change?</li><li>What if changes occur in places we don’t care about?</li><li>How do we version and release specific sub-sets of messages?</li><li>What about releases? Do we wait to release a new “complete” snapshot using git-tags following our <a href="https://docs.github.com/en/repositories/releasing-projects-on-github/about-releases">standard release versioning</a>?</li><li>Are there a better way to share common messages so we don’t fall into old habits (like denormalizing our messages vs normalizing via composition)?</li></ul><p><em>As you can tell there was a lot on our minds.</em></p><p>Initially, we followed the data-domain pattern (a play on the domain-driven modeling approach) within a large monorepo. Since we were using a <a href="https://www.atlassian.com/git/tutorials/monorepos">monorepo</a>, we could create shared resources (using local references), and then our domain specific resources (our protobuf messages and connect services) could still benefit from sandboxing (within a given domain) — while still sharing the same parent directories. In essence, we could take <em>the one ring to rule them all approach</em> and hope for the best.</p><pre>proto/<br>  common/<br>    product/<br>      item.proto<br>      sku.proto<br>    membership/<br>      user.proto<br>      ...<br>    order/<br>      order.proto<br>      return.proto<br>    iso/<br>      country.proto<br>      language.proto<br>    ...<br>  domain_a/<br>  domain_b/<br>  ...<br>  domain_n/</pre><p><strong>The Problem</strong>: Monorepos tend to start off with the best of intentions, but depending on the size and scale of the enterprise, they can become unruly. When we crossed the threshold of around <strong>1000 protobuf types </strong>— types being message, enum, rpc’s—we realized we were headed for a deadend.</p><p>Additionally, the problem of the monorepo isn’t just tech related, how to scale the number of collaborators (around 20–30 people) who all have their own needs and deadlines is a chore in and of itself. <em>We needed a way to share versioned artifacts without requiring everything to live under the same roof.</em></p><p>We had to find an easier way. We needed decentralized modules. Really, we simply needed a solution that understood the human-scalability concerns.</p><p><strong>The Solution:</strong> <a href="https://buf.build/product/bsr">The Buf Schema Registry</a> (BSR).</p><blockquote>BSR is a single pane of glass for all things within the protobuf ecosystem. It proved to be a wonderful tool for coordination and collaboration as well. This means that engineers across our API services, data-domains, analytic domains could come together in a central place — while still operating with autonomy.</blockquote><h4>The Swiss-Army Knife that is the Buf Schema Registry</h4><p>To fix the issues of scaling out our protobuf-first strategy, across both humans and also across organizational domains, we (Nike) ended up solidified a longer-term partnership with Buf.</p><p>It was a no brainer to move forward because of the Buf Schema Registry (BSR), but around the same time that we were going through procurement, there was a new product offering just on the horizon — <a href="https://buf.build/product/bufstream">Bufstream</a> (<em>we’ll get to that at the end of this post</em>) and that was of great interest to our plan for protobuf-based domination.</p><p>BSR provided a way for us to create versioned <strong>remote modules</strong>, rather than continuing to maintain a centralized monorepo. Adoption of BSR helped solve the problem of too many cooks in the mono-kitchen almost overnight. Now teams could operate within organizations independently (similar to how you would utilize organizations within Github), and we could all still share common modules through a “common” organization within specific collections (repos) — <em>For example, if you think about common IETF standards like ISO codes, these can be shared as a collection of enums vs “strings” which means no more typos.</em></p><p>All of the tricky dependency management is handled by Buf automatically through the <a href="https://buf.build/product/cli">Buf Cli</a> with assistance from the <a href="https://buf.build/docs/configuration/v2/buf-yaml/#deps">buf.yaml</a> (updates your buf.lock). When it comes time to create “versioned” resources, this is done through “labels” (in a similar way to using git tags for releases) — bound to a specific git commit (at least how we’d recommend doing it).<em> See Figure 1–1 which showcases versioning in action.</em></p><figure><img alt="Shows a User Interface (UI) showcasing the documentation for Buf’s protovalidate. The tab on the web page is open to the “docs” tab, and the README file of the project is rendered beautifully." src="https://cdn-images-1.medium.com/max/1024/1*oMwKxT38rUaMti_gTezj4w.png" /><figcaption>Figure 1–1: A view of the “docs” tab within the public Buf Schema Registry. <a href="https://buf.build/explore">Explore Here</a>.</figcaption></figure><p>With the ability to version our protobuf modules in a standardized way including the beautiful addition of remote dependency tracking — all in a decentralized way—we were finally cooking with fire.</p><p><em>But we still needed to figure out the best pattern for releasing our artifacts</em>. Luckily, this was already a feature baked into the Buf Schema Registry.</p><h4>Light Bulb Moment: Server-Side Artifact Generation</h4><p>One of the more amazing features of the BSR is the ability to lazily generate artifacts. In Figure 1–2, you’ll see the SDK tab highlighted. The view provides you (the engineer) with simple directions to fetch the specific “versioned” resources you need.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*FsYFvXhkvCRllP0F34l_sw.png" /><figcaption>Figure 1–2: Utilizing Lazy SDK Generation hints via the SDK Tab.</figcaption></figure><p>This feature really cuts down on the level of effort for working with globally distrubuted teams and we could now do more work asynchronously. Teams no longer needed to reach out in slack to understand which version was safe to deploy for a given artifact. Gone were the useless meetings to coordinate releases across our API services. We now had enterprise-wide invariants — <em>we could finally trust-first, ask questions later!</em></p><h4>Lesson Learned: Git Actions Rule</h4><p>Utilizing protobuf and building grpc services (via <a href="https://connectrpc.com/">connect</a>) was a huge win. The icing on the cake was the power of BSR’s <a href="https://buf.build/docs/bsr/admin/instance/bot-users/">bot-user’s</a> and Buf’s official github action (<a href="https://github.com/marketplace/actions/buf-action">buf-a</a>ction).</p><blockquote><strong><em>What are Bot Users?</em></strong><br>If you are unfamiliar with bot-users, they are also referred to as “headless” users, or service principals. In general, this is a user that will never need to “see” the UI, and they are typically used for CI/CD or for other automations.</blockquote><p>We realized we could create a new bot user for each onboarded organization within our enterprise BSR. We could then utilized github actions (shown in Figure 1–3) to enforce “stage&gt;pr&gt;push” patterns in a unified way.</p><p>Our custom github workflow included a step would fetch the bot-user token and afterwards all commits to BSR would be signed by the bot-user of a given organization. While it can be nice to “test” things out locally and push experiments (as a human), in the long run we turned off “write” access for non-owner’s within the BSR to prevent garbage from littering the environment. This strategy may not be the correct way of working for your company, but for us the additional safe-guard meant we could <em>enforce our ways of working </em>and sleep well at night.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*uNDM5lLnwa2bRLkDBMYIDQ.png" /><figcaption>Figure 1–3. Buf’s official Github Action.</figcaption></figure><p>Now that we had a strategy for how we’d roll out protobuf across the company, we could start to build additional layers onto this solid foundation. This was a good time for us to step back and consider why things were going well.</p><h4>Project Retrospective</h4><p>We started small. Proved that our ideas would work and scale. Often it feels like you are moving at snail-speed getting everything required into place, but this is more of a time dilation. The momentum of a new project brings with it so many novel ideas but we had to remember to stick to milestones and finish what we started. Then we could add additional layers and complexity with the trust provided by our apriori actions and successes.</p><p>The other thing that we did was <a href="https://engineering.atspotify.com/2020/08/how-we-use-golden-paths-to-solve-fragmentation-in-our-software-ecosystem">pave a golden path</a> for all teams that would be following in our footsteps. This meant we had already paid down the system-wide complexity, figured out ways of working that evolved through real trial and error, but we did so with mission-critical focus and concentration.</p><h3>Semantic Streaming with Bufstream</h3><p>In part one of this post (and highlighted here in Figure 1–4), I talked about our ingestion architecture and how we built our gateway ingestion services to receive clickstream events, validate this event streams, and either a) emit error messages back to the client, or b) write to Kafka to be consumed within Databricks.</p><figure><img alt="Shows a data ingestion architectural diagram with client SDKs on the left. Event data is emitted via a Gateway API Service and each record is written to Kafka for consumption downstream via Databricks." src="https://cdn-images-1.medium.com/max/1024/1*OdPW23rE7HdnY293Skx6xg.png" /><figcaption>Figure 1–4. Our Original Ingestion Architecture</figcaption></figure><p>This architecture was our starting point. Given the goal of having a protobuf-first data strategy, we realized fairly quickly that we were going to run into additional points of contention even with our generic “protobuf-aware” pyspark applications. The overhead alone of running full-time streaming applications, or coordinating scheduled jobs using structured streaming and trigger(availableNow=True) meant that we’d need to bring best in class automation to the table (literally).</p><p>Luckily for us, we had the <strong>Bufstream ace-up-our-sleeve</strong>. While we are currently working to scale out this new architecture, the early results are looking really promising. For us, reducing the overhead required for ingesting from N Kafka topics and writing all records to N lakehouse tables means we’ll reduce our ingestion complexity by N. This is huge, even if it doesn’t seem that way.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*s8Tm2nFFUYnVnb4VgXW5hw.png" /><figcaption>Figure 1–4. Future Facing Ingestion Architecture</figcaption></figure><p>This is all made possible given <a href="https://buf.build/docs/bufstream/iceberg/reference/">Bufstream can do zero-copy writes</a> into the Iceberg Lakehouse format. Given the acquisition of <a href="https://www.tabular.io/">Tabular</a> by Databricks, Apache Iceberg was now a first-class citizen within Unity Catalog for our Databricks workspace(s), which means we wouldn’t need to do anything complex other than write our analytical data to a Bufstream topic. Now that <a href="https://buf.build/blog/buf-announces-support-for-databricks-managed-iceberg-tables">Buf provides support for Unity Catalog managed Iceberg tables</a>, this is a simple win — <em>take the win!</em> The hard work is now done behind the scenes for us, all within our own VPC.</p><p>I’ll be writing more in depth about Bufstream, so look out for future posts.</p><h3>The Future is Bright (Nike x Buf)</h3><p>Now that Nike has partnered with Buf, and after the process of integrating BSR, nailing down the golden paths for managing our versioned protobuf, figuring out the right set of github actions to simplify deployment of our connect rpc services, and beginning the journey towards simplifying our Kafka footprint using Bufstream — I can say the future is looking extremely bright.</p><p>The one last thing for this post is a quote from one of my close friends at Nike — he said “before using Buf’s tooling we (Nike) were looking at months to do basic changes to our existing APIs, and now that we’ve nailed down the right patterns, we are looking at DAYS to do the same work.”</p><blockquote>before using Buf’s tooling we (Nike) were looking at months to do basic changes to our existing APIs, and now that we’ve nailed down the right patterns, we are looking at DAYS to do the same work.</blockquote><p>Now, when asked if you prefer <em>a) object-flexibility, or mutability</em> and <em>b) strict type-safety, object-level governance, or message-level immutability — </em>the answer is clear. Choose protobuf and the message-level invariants that you will come to grow and trust.</p><p>If you missed Part 1 of this series. Check it out below.</p><p><a href="https://newfrontcreative.medium.com/escaping-the-void-of-the-data-abyss-337770a39fbc">Escaping the Void of the Data Abyss</a></p><p>* impossible — you can force a breaking change by turning off protovalidate, luckily with <a href="https://buf.build/docs/bsr/checks/">BSR you can enforce breaking-change detection</a> at the registry across the entire enterprise.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=6bf2d1e6e34a" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Escaping the Void of the Data Abyss]]></title>
            <link>https://newfrontcreative.medium.com/escaping-the-void-of-the-data-abyss-337770a39fbc?source=rss-3b4cab6af83e------2</link>
            <guid isPermaLink="false">https://medium.com/p/337770a39fbc</guid>
            <category><![CDATA[protobuf]]></category>
            <category><![CDATA[data-modeling]]></category>
            <category><![CDATA[distributed-systems]]></category>
            <category><![CDATA[grpc]]></category>
            <category><![CDATA[software-engineering]]></category>
            <dc:creator><![CDATA[Scott Haines]]></dc:creator>
            <pubDate>Mon, 30 Jun 2025 21:31:30 GMT</pubDate>
            <atom:updated>2025-08-26T21:33:36.251Z</atom:updated>
            <content:encoded><![CDATA[<h4>Leaning on the Guiding Lights of Structured Data and Protocol-Level Invariants to Avoid the Evil Clutches of Bad Data and Rewrite the Sins of the Past.</h4><figure><img alt="A monster in the clouds appears to be chasing a classic car against a background of storm clouds in red and gray" src="https://cdn-images-1.medium.com/max/1024/1*iHRVD1H4t-D1mjRZyFPhPg.png" /><figcaption>Does your Data Scare You? Image By Author via <a href="https://www.midjourney.com/jobs/cedeedb3-e66b-43b1-9ac6-ab68f22b27cb?index=0">Midjourney</a>.</figcaption></figure><p>If you’re anything like me you don’t frighten easily. I love horror movies. I truly appreciate the clever use of foreshadowing, especially when we all know what is clearly coming next. Being a member of this silent audience — sitting there on the edge of our seats—provides us with the excitement of premonition (from the safety of our seats) while the movie takes us right around <em>the corner</em>. <em>What comes next is literally why we’re here!</em></p><h3>Does Your Data Scare You?</h3><p>Jump scares and coming face to face with monsters isn’t what we crave though when it comes to our data. It just happens to be one of the realities of working at large enterprises. Comically bad data can make you feel just as helpless as <em>the damsel in distress</em>, and unfortunately leaves you paralyzed simply being dragged along for the ride just like the audience in the movie theater — <em>even though we already understand what is probably right around the corner and in this case it is broken data promises and sadness</em>.</p><p><strong>So why does our data scare us?</strong></p><p>It doesn’t have to. In fact, just like the unsung hero in most horror movies, it is up to us to flip the script and <em>change the outcome of the story</em>. What comes next is in your hands (dear audience) but as a lifeline, I’ll tell you a recent story from Nike in hopes that you too can escape the event horizon of the data abyss!</p><h3>Steering around Data Swamps and Preventing the Impending Datapocalypse</h3><p>I joined <a href="https://www.nike.com/">Nike</a> in 2022 and my mission was fairly “<em>simple”</em> — I was being hired to <strong>change the data culture</strong> and find creative ways to fix the monstrous problems we had with <em>“all things enterprise data and analytics”</em>. Simple. Right? Nope.</p><p>I had unwittingly just stumbled into a horror movie set in a scary world of hopes and *poorly executed data pipeline dreams — luckily this wasn’t my first time <a href="https://medium.com/97-things/preventing-the-data-lake-abyss-9a44e1f9f628">escaping the abyss</a>. Just like <a href="https://en.wikipedia.org/wiki/Van_Helsing_(film)">Van Helsing</a>, I too had tools at my disposal and prior success to help me on my way. What I needed first was to assemble a crew, and materialize a mission that could set off a series of events to rid the world of the data swamp monsters lurking in the data deep—once and for all.</p><p>Oh. But where to get started?</p><h4>Focusing on Mission Criticality and Solving the Hard Problems</h4><p>It’s easy to do work in the shadows. Alone. It is much harder to get work accomplished with the blinding light of <a href="https://en.wikipedia.org/wiki/Sauron">Sauron’s eye</a> watching your every move — all the while building trust (at a new company) and assembling a core team as an IC (<a href="https://www.linkedin.com/in/scotthaines/">distinguished software engineer</a>).</p><p>When you’re <em>working to disrupt the status quo</em> (highest probability of why we were here), there are people who want you to succeed — and then there is everyone else. Luckily for me, I had the support of my boss (head of ED&amp;AI) on my side and found a crew of folk who wanted to work hard and solve complex problems (across multiple organizations in a matrix company… scary!).</p><p>In order for us to <strong><em>solve our data problems</em></strong> (which I’ll go into next), we were going to need to roll back the clocks to <em>a time before the worst of the bad decisions</em> were made, and then rewrite the playbook—and as we also found out later; <em>the entire analytics and data serving stack</em>.</p><p>We were going to once again need to lean heavily on my best friend <a href="https://protobuf.dev/">Protobuf</a>, but this time we were going to pick up some new advanced tools along the way <em>(foreshadowing!)</em>.</p><blockquote>If you’ve ever heard me talk before, you’ve probably heard me <a href="https://www.slideshare.net/slideshow/the-happy-marriage-of-redis-and-protobuf-by-scott-haines-of-twilio-redis-day-seattle-2020/225949350">talk about my love of all things protobuf</a>. If not, check out the video from <a href="https://youtu.be/HBtScr7MQxU?si=oj6VM4XXsnLUx6A8">RedisConf 2020</a>.</blockquote><p>So about those data problems!</p><h3>The Hard Problem: Nike’s Clickstream</h3><p>After a few months at Nike something became apparent (seemingly bubbling to the surface on its own). There was something wrong with the clickstream. This wasn’t like a simple “oops”, this was so much more than “ooo-oo-oooops”. This was sinister and insidious like a good haunted house story.</p><p>Something was systemically wrong and oh, this is it (I thought to myself), this is why I’m here. I found my mission. I figured out where I can lean in and create meaningful change. But what was so wrong (?) you might be asking — cause you don’t know the story like I do.</p><figure><img alt="Creepy abandoned house in a barren field with an oak tree and face hiding in the background. There is some ominous dread here. Scary" src="https://cdn-images-1.medium.com/max/1024/1*6DeyshzKTuMHfdIva64HIQ.png" /><figcaption>Not the Data Lakehouse you were Expecting!. Image by Author via <a href="https://www.midjourney.com/jobs/a2f8930d-e4b9-4b53-a973-1c871873de80?index=0">Midjourney</a>.</figcaption></figure><p>Oftentimes what starts out with the best of intentions — essentially all of our clickstream events, our hand-tuned SDKs, and even our end-to-end data delivery strategy — was beginning to crack, crumble, and fall apart. There were even early warning signs of complete system-wide collapse (things had grown too big for the architecture) — much like the abandoned “maybe haunted house” in the picture above—things were not looking too good. We had to make a call. Do we Fix It (and can we)? Or do we abandon ship (and essentially burn things to the ground). This is a difficult question that deserves some truly objective reasoning.</p><h4>Retrofit or Rebuild?</h4><p>There are pros and cons when using JSON for data intensive applications — mostly cons though when it comes to reliable streaming of mission critical data. In this case, the mutability of JSON data was causing system-wide issues on the clickstream. So the question was still there though — do we retrofit or rebuild? <em>And can we fix the sins of the past?</em></p><blockquote>If you some time on your hands, you could dig further into a longer post on analytical stream processing. Goes into the why behind “why not JSON”.</blockquote><p><a href="https://medium.com/data-science/a-modest-introduction-to-analytical-stream-processing-db58b3694263">A Modest Introduction to Analytical Stream Processing</a></p><p>Initially, the idea was that we could just retrofit the system <em>since after all it was built with the best of intentions</em>. So the team and I dug in, and we uncovered some more ugly truths.</p><h4>The Sins of the Past</h4><p>Back in the day (sometime around 2014-ish) a consulting company (one of the big ones with gigantic market cap) sold Nike on the idea of using “TSV” (yep, tab-separated values) files to encapsulate individual events for the clickstream. This solution was created to solve the problem of long lead times for changes on the clickstream (events specifically). The main problem identified by “said consulting company” was the delta between modeling an event, and then instrumenting said event on our desktop and mobile sites (and applications) with proper testing — it was simply taking “longer than expected”.</p><p><em>So people nodded along, and “said consulting company” was paid to implement the solution, train as many teams as possible across Nike to use their complex solution, and then they left their solution in “our very capable hands”.</em></p><p>The solution was that any product manager (PM) could edit a (tsv) file, and then “it” would be parsed and compiled into <a href="https://json-schema.org/">JSON-SCHEMA</a>. This was the solution (plus a read-only UI) so that teams across Nike could then utilize the event schemas (after implementing their own interpretation of what each event “required”) for any given event. <em>Sounds okay, right?</em></p><p>In theory, this wasn’t a bad idea. In practice, there were problems that arose quickly — like for example when “a typo” had the amazing effect of removing a previously “generated” value from <a href="https://json-schema.org/understanding-json-schema/reference/enum">json-schema’s enum</a>. To paint a clearer picture, this little typo had the cascading power to break all downstream data consumers leading to broken ML models, dashboards and reports. In short, the butterfly effect in the data world. Cause there was no testing for breaking changes (<em>more foreshadowing!</em>) and zero object-level governance or semantic validation at play.</p><blockquote>When faced with problems, what is better than adding additional complexity?</blockquote><p>Due to symptoms of the <a href="https://thedecisionlab.com/biases/the-sunk-cost-fallacy">sunk cost fallacy</a> an even wilder problem was created on top of a bad solution. Rather than continuing to utilize unified events, communicate between teams, and test changes per release using our shiny new (fragile) system, and ultimately “share” reusable events across all experiences (nike.com, nike app ios/android, and others), the product managers decided that they could simply copy (or fork) events so they could “own” their own and not bother with a unified strategy. So where we once had 1 event, we now had between 9 and 36 versions of each event (<em>yes the math doesn’t make sense, but remember the horror theme?</em>).</p><h4>Love it or Hate It. Object-Level Governance is Incredibly Important</h4><p>Given the strain on the system, the years of events being forked (vs reused or composed), and the complexity in painting a unified analytical story across the various Nike experience event streams — we were quickly coming to a difficult realization. As you probably guessed, the <strong>retrofit was off the table</strong>.</p><p>After all, about 8 years had already gone by at this point in time and the cracks in the system were essentially the entire system. So we decided that the best path forward was to do a rebuild. But we were going to do it right this time. <em>Even if it would kill us</em>. <em>Mission Accepted</em>.</p><p>The fact that we had loose definitions of events — given the type-free nature of TSV based definitions — we effectively had <strong>zero object-level governance and could kiss semantic validation goodbye</strong>. This meant that there was no system in place to “modify” an event in a standard way. If you think about classic relational databases (OLTP) each table has a schema. If you want to modify the table’s schema, then the protocol-level invariants (promises) enforce “how” a change can occur and “how&gt;what&gt;when” the change will affect said given table.</p><blockquote>When it comes to streaming systems, like the classic event stream (clickstream), you really want to abide by similar rules for schema evolution (as with classic relational databases) even though it may feel <em>like you are moving a little slow (that will change)</em>. This includes rules about <em>type-saftey</em>, field-level position (or columnar position), and how nested-objects will be encoded — as well as how backwards and fowards compatibility will be achieved and even enforced.</blockquote><p>In order to provide event-level invariants, we’d need to lean on a type-safe message format that supports backwards and forwards compatibility — which again means leaning on <a href="https://protobuf.dev/">protocol buffers</a>.</p><h3>Fixing Nike’s Clickstream and Behavioral Analytics</h3><p>We had a mission. We had purpose. What we didn’t have yet was a clear plan of attack— we just knew we needed to <em>fix the sins of the past</em>.</p><h4>Establishing a Plan of Attack</h4><p>We had a kick off brainstorming meeting between the small core team of engineers — there were 3 of us, let’s call the other two Doug and Christian (to keep them anonymous), representing Data Ingestion &amp; Platform, Apps &amp; Experiences, as well as Data, Analytics, and AI. The goal of the session was to come up with a set of standards, system expectations, as well as ways of working that we could stake our reputation on. Given we’d be doing a lot of work asynchronously and on a limited timeline (3 months to prove things would work), ways of working were not optional — they were essential.</p><p><strong>The end result of our time, coffee, and some beers was the following:</strong></p><ol><li><strong><em>Standardize on Protobuf</em></strong>. Not simply for API services, but truly end to end across analytics and ML as well. <strong>End-to-End Protobuf</strong> was not a nice to have, it was <em>the way</em>. We’d need a way to ensure our protobuf was written in a standard way across the company, and need to write some test harnesses to ensure backwards compatibility (see zero breaking changes below). <em>JSON was the reason the initial system fell apart — given its mutability and lax standards across the company we wouldn’t fall prey to “loosely” structured data</em>.</li><li><strong><em>Lean on Code Generation &amp; Semantic Validation</em></strong>. Ridding ourselves of the long lead time for change in the system would mean we’d need the ability to cross-compile our event definitions so we’d have no excuse not to “reuse” and “compose” shared events in a much more governed way. In addition, we’d need a way to compile our definition of “correct” for each event to reduce the time taken from event ingestion to insight — we’d need some kind of compiled validation logic. <em>This would flip the script on the prior system where new events would take months to finally release across all experiences</em>.</li><li><strong><em>gRPC for our SDKs</em></strong>. Taking things one step further, we’d compile down our SDKs utilizing gRPC to prevent the issues we’d encountered with traditional REST for our clickstream. This way, we’d write our interfaces (using IDL for gRPC) and cross-compile for javascript, typescript, swift, kotlin, and go. <em>Reducing the time required to implement new events.</em></li><li><strong><em>Zero Breaking Changes</em></strong>. We’d guarantee that all events would always be backwards compatible within a major version (1.0.0 vs 2.0.0). We would only make a breaking change when it mattered. <em>This was in direct response to the lack of governance in the prior system. Trust is built when things just work release after release</em>.</li><li><strong><em>Automate Event Ingestion to Databricks</em></strong>. We were using databricks for our Lakehouse environment. One of the issues of the prior system was a large pain point when it came to scaling out the JSON-based event streams — due to zero semantic validation and no governance the streams would be corrupt more often than not. We’d fix that issue by automating the ingestion of our event streams, leaning on apriori semantic validation, to ensure that only trust-worth data would be appended to our tables and made available via <a href="https://www.databricks.com/product/unity-catalog">Unity Catalog</a>.</li></ol><p>So in short, we’d agreed to <strong>1)</strong> standardize on protobuf, <strong>2)</strong> implement code generation and semantic validation, <strong>3)</strong> utilize gRPC for our SDKs to simplify the exchange of events, <strong>4)</strong> provide compile-time guarantees and a mission of zero breaking changes, and lastly, <strong>5)</strong> we’d automate the last mile ingestion of event data by leaning on protocol level invariants, and high-trust built on semantic event validation at the ingestion edge. Now the hard work would begin.</p><h3>From Mission Briefing to Production</h3><p>During the early research phase we stumbled upon a company called <a href="https://buf.build/">Buf</a> — by accident really. We’d been digging into some newer changes enabling native protobuf support for Apache Spark and found a file called <a href="https://github.com/apache/spark/blob/v4.0.0/core/src/main/protobuf/buf.yaml">buf.yaml</a> while digging into the depths of <a href="https://spark.apache.org/docs/latest/spark-connect-overview.html">spark-connect</a> (spark’s gRPC client).</p><blockquote>Sometimes things in life are simply serendipitous and we couldn’t have hoped for a better find this early on in the project.</blockquote><h4>Simplified Standardization with Protobuf</h4><p>The <a href="https://buf.build/docs/configuration/v2/buf-yaml/"><strong>buf.yaml</strong></a> turned out to be a specification that worked along with the <a href="https://buf.build/docs/cli/">Buf CLI</a> — that checked off two of the known unknowns from our list. First, <em>how we’d simplify breaking change detection</em> — which we learned <a href="https://buf.build/docs/reference/cli/buf/breaking/">buf breaking</a> could be used for just that (this was clutch), and secondly, <em>how we’d provide capabilities to ensure our protobuf messages were written in a standard way</em> — this ended up being the secret behind <a href="https://buf.build/docs/reference/cli/buf/lint/">buf lint</a>.</p><pre>version: v1<br>breaking:<br>  use:<br>    - FILE<br>lint:<br>  use:<br>    - BASIC</pre><p><a href="https://buf.build/">Buf</a></p><p>So we’d <em>accidentally discovered amazing tooling</em> that we could use to drastically simplify how we’d manage our protobuf definitions over time. We’d also discovered a way to provide protobuf message-level semantic validation at the Nike edge using <a href="https://buf.build/bufbuild/protovalidate">protovalidate</a>.</p><h4>Semantic Validation with Protovalidate</h4><p>What is protovalidate you ask? It provides a critical missing component to the protobuf specification — runtime field level semantic validation. If you’ve used protobuf before, you are probably familiar with <strong>end-of-wire</strong> exceptions. These occur while deserializing a binary payload back into a concrete message and are more often than not caused by changing field types in a non-backwards compatible way. I bring this up since an end-of-wire exception points to exceptions due to unknown changes in the “shape” of the protobuf message — but with perfect type-saftey the message can still be invalid due to missing “fields”.</p><blockquote>If you want to understand Protobuf Best Practices. Read the <a href="https://protobuf.dev/best-practices/dos-donts/">protobuf do’s/dont’s</a>. For more on protovalidate, read on.</blockquote><p>If you think about API-level contracts, some fields are marked as “required” and other’s are marked as “optional”. This provides the end-user with an understanding of what fields they can rely on, vs which one’s may exist or not. Consistent values within the “required” fields (as well as the optional when they show up) is critical not just for APIs, but for event streams.</p><p>With protovalidate, we simply annotate our message (shown below), and we can compile-down the validation rules to then use for runtime semantic validation checks.</p><pre>message Order {<br>  // Each Order takes place at a point in Time<br>  google.protobuf.Timestamp order_created = 1 [(buf.validate.field).cel = {<br>    id: &quot;not_from_the_future&quot;,<br>    message: &quot;we are not ready to offer scheduled orders. Maybe in the future&quot;,<br>    // Ensure that the server&#39;s local time (utc) is used as a gating mechanism for sane timestamps<br>    expression: &quot;this &lt;= now&quot;<br>  }];<br>  // An Order can be purchased at a CoffeeCo Location, otherwise where is the coffee going to be made<br>  // It is true that the Store could be online, but that makes this reference more complicated than necessary<br>  coffeeco.v1.Store purchased_at = 2 [(buf.validate.field).required = true];<br>  // A Customer can Order from our Coffee Location<br>  coffeeco.v1.Customer customer = 3 [(buf.validate.field).required = true];<br>  // Each Order may have one or more items. We cannot have an Order without something to Purchase<br>  repeated coffeeco.v1.Product items = 4 [(buf.validate.field).required = true];<br>  // Each Order has a monetary value<br>  coffeeco.v1.Total total = 5 [(buf.validate.field).required = true];<br>}</pre><p>The Order above can now be validated at runtime.</p><pre>if err := s.Validator.Validate(req.Msg); err != nil {<br>  log.Println(&quot;validation failed:&quot;, err)<br>  response = err.Error()<br>} else {<br>  // do something with the valid protobuf object<br>} </pre><p>Finally, here is a longer example that showcases the full validation flow (at the go service level).</p><pre>func (s *CoffeeserviceServer) CoffeeOrder(ctx context.Context,<br> req *connect.Request[coffeeservicev1.CoffeeOrderRequest]) (*connect.Response[coffeeservicev1.CoffeeOrderResponse], error) {<br> log.Println(&quot;Request Headers: &quot;, req.Header())<br> var order = req.Msg.Order<br> <br> response := &quot;&quot;<br><br> if err := s.Validator.Validate(req.Msg); err != nil {<br>  log.Println(&quot;validation failed:&quot;, err)<br>  response = err.Error()<br> } else {<br>  data, err := proto.Marshal(order)<br>  if err != nil {<br>   log.Println(err)<br>  }<br>  err = s.Kafka.Produce(&amp;kafka.Message{<br>   TopicPartition: kafka.TopicPartition{<br>    Topic:     &amp;s.TopicName,<br>    Partition: kafka.PartitionAny,<br>   },<br>   Key:   []byte(order.Customer.Name),<br>   Value: data,<br>  }, nil)<br>  if err != nil {<br>   log.Println(&quot;Failed to Send Order : stream/coffeeco.v1.orders&quot;)<br>   response = fmt.Sprintf(&quot;We Failed to Send your Order, %s\n&quot;, order.Customer.Name)<br>  } else {<br>   log.Println(&quot;Order Published Successfully : stream/coffeeco.v1.orders&quot;)<br>   response = fmt.Sprintf(&quot;Thanks for the Order, %s\n&quot;, order.Customer.Name)<br>  }<br> }<br> log.Println(&quot;New Order: &quot;, order)<br> res := connect.NewResponse(&amp;coffeeservicev1.CoffeeOrderResponse{<br>  Response: response,<br> })<br> res.Header().Set(&quot;CoffeeService-Version&quot;, &quot;v1&quot;)<br> return res, nil<br>}</pre><p>The service above provides edge-level validation ensuring that any downstream consumer of “the data” can trust that the data being processed has been edge validated. While this may not seem like a “big deal”, in practice most data that ends up in the hands of data engineers is required to go through brutle “cleansing” steps in order to provide “non-corrupt” data for further downstream processing. <em>Why reprocess all of your data when you can instead just rely on it being “semantically valid”?</em>.</p><p>Around the same time we’d hit a wall with the time-to-first-byte cost of running gRPC across our desktop experiences (due to true page-loads on nike.com vs single-page style application architecture), and while all signs were pointing in the right direction elsewhere, this was crippling. We’d hit a potential pitfall that could break the project. <em>Or so we thought!</em></p><h4>Connect is the Missing Link for Enterprise gRPC</h4><p>During our experiments with protovalidate (originally protoc-gen-validate), we’d discovered the <a href="https://connectrpc.com/">Connect protocol</a> — another hidden treasure in the Buf treasure chest. When we thought things were going to simply fall apart, and when <em>back to square one was not on the menu, </em>we found a life line. Connect.</p><blockquote><strong>From the Docs</strong>: Connect is a family of libraries for building browser and gRPC-compatible APIs. If you’re tired of hand-written boilerplate and turned off by massive frameworks, Connect is for you</blockquote><p>Connect provides gRPC networking using the native networking clients for javascript (ecmascript) and typescript (as well as swift, kotlin, and go). This meant that all of our SDKs could lean on gRPC but with faster load times and more native integrations. This meant that we could spin up a connect session on the desktop (or mobile web) without sacrificing time to first byte, and our analytics events could be emitted using our new “unified” analytics stack.</p><p>Connect was the bridge and glue to “connect” the final set of dots, and we were well on our way to production, there was just one last thing we needed to get done. <em>We needed to make good on our promise to automate the data ingestion pipelines.</em></p><h3>Automating Data Ingestion</h3><p>At this point in our journey, things we’re smooth sailing and the crew was having a blast. We knew we were onto something great here, and had found an incredible company to partner with — Buf. There was one last thing missing from our initial brainstorming session, we’d need to prove we could automate the ingestion of our clickstream events into Databricks without a full-time dedicated team sitting around. We’d need to make sure the new clickstream could scale out and provide value in a way that most data engineers are familiar with — at the table level.</p><h4>Data Ingestion Architecture</h4><p>The ingestion architecture that we’d come up with wasn’t really novel. It was a take on <a href="https://www.youtube.com/watch?v=_PoftYRXMlQ&amp;list=PLBJeAtDBQjJMANOGMxUd30nSIhdw9bcNt&amp;index=14">some earlier work from my days at Twilio</a>, the big change was in the available tooling and in some of the changes made to Delta Lake.</p><p>I’d been riding the Delta Lake waves since around 2018, and it provided a lot of the same protocol-level invariants that we got from protobuf, just as a columnar-oriented format vs a row-based format. Given protobuf was natively supported in Apache Spark, and given that Delta Lake provided <a href="https://docs.delta.io/latest/delta-streaming.html">sophisticated streaming capabilities</a>, we could simply leverage the best of both worlds to provide end-to-end streaming.</p><figure><img alt="The Nike Analytics Ingestion Network. Utilizing Buf, BSR, and Connect to provide end to end consistency." src="https://cdn-images-1.medium.com/max/1024/1*goN46s8xt6u9_DGEW9zrOA.png" /><figcaption>End-to-End Data Ingestion: Circa 2024. Image by Author.</figcaption></figure><p>We were so proud of what we had achieved that a good friend of mine Ashok Singamaneni and I decided to present our work during the Data+AI Summit.</p><p><em>Given the length of this post, I’d suggest watching the following video if you want to dive deeper into the streaming architecture.</em></p><iframe src="https://cdn.embedly.com/widgets/media.html?src=https%3A%2F%2Fwww.youtube.com%2Fembed%2FBbDysZ8lF0Y%3Ffeature%3Doembed&amp;display_name=YouTube&amp;url=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3DBbDysZ8lF0Y&amp;image=https%3A%2F%2Fi.ytimg.com%2Fvi%2FBbDysZ8lF0Y%2Fhqdefault.jpg&amp;type=text%2Fhtml&amp;schema=youtube" width="854" height="480" frameborder="0" scrolling="no"><a href="https://medium.com/media/7763d61508a5293e7d78ec46da858de8/href">https://medium.com/media/7763d61508a5293e7d78ec46da858de8/href</a></iframe><h3>Being Brave and Standing up against the Status Quo</h3><p>It took us three months to get to production. It then took us another year to get everyone on board across the company to ensure the strategy and vision would continue as the number of people on the project grew. The biggest revalation here was that a core group of passionate engineers can accomplish a lot more than people expect, and this doesn’t just happen by burning the midnight oil. In fact, nothing ever happens for free.</p><p>The results were astonishing though. In a matter of months, the core team was able to deliver and we scaled through Black Friday and Cyber Week with a measly additional operating cost of around $20/day. This was down around 100x from the prior system and we couldn’t have done any of that without the folks from Buf. They were true silent partners offering advice and support, and helping us along our way. In all reality, if it wasn’t for the tooling provided by Buf (buf generate, buf breaking, buf lint, buf image and the connect protocol), <em>Nike would be still suffering under the weight of prior bad decisions and we’d never be able to escape the clutches of bad data decision making</em>.</p><p>Now that we were fully committed to moving forwards, it was time to solidify our partnership with the folks over at Buf and that will take us to <a href="https://newfrontcreative.medium.com/beyond-the-data-abyss-6bf2d1e6e34a"><strong>part two of this story</strong></a>, where we’ll talk about some of the problems (and solutions) that came about as we started to scale up our protobuf-first strategy.</p><p>Continue on to Part 2.</p><p><a href="https://newfrontcreative.medium.com/beyond-the-data-abyss-6bf2d1e6e34a">Beyond the Data Abyss</a></p><p>*: Not all data pipelines were terrible. There are always teams that are willing to work against the status quo. There was truthfully a lot of slop though and that happens overtime, and usually comes from a place of neglect and data hoarding.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=337770a39fbc" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[A Modest Introduction to Analytical Stream Processing]]></title>
            <link>https://medium.com/data-science/a-modest-introduction-to-analytical-stream-processing-db58b3694263?source=rss-3b4cab6af83e------2</link>
            <guid isPermaLink="false">https://medium.com/p/db58b3694263</guid>
            <category><![CDATA[deep-dives]]></category>
            <category><![CDATA[distributed-system-design]]></category>
            <category><![CDATA[stream-processing]]></category>
            <category><![CDATA[software-engineering]]></category>
            <category><![CDATA[data]]></category>
            <dc:creator><![CDATA[Scott Haines]]></dc:creator>
            <pubDate>Tue, 15 Aug 2023 14:06:57 GMT</pubDate>
            <atom:updated>2024-06-07T04:14:39.974Z</atom:updated>
            <content:encoded><![CDATA[<h4>Architectural Foundations for Building Reliable Distributed Systems.</h4><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*sFy1VJSPiOYaJIgkO92dgQ.png" /><figcaption>Distributed Streaming Data Networks are Unbounded and Growing at Incredible Rates. Image Created via <a href="https://www.midjourney.com/app/jobs/3cf0ebab-3ec4-487d-a0ea-24ef01387eae/">Author’s MidJourney</a></figcaption></figure><h3>Foundations of Stream Processing</h3><p>Foundations are the unshakable, unbreakable base upon which structures are placed. When it comes to building a successful data architecture, the data is the core central tenant of the entire system and the principal component of that foundation.</p><blockquote>Given the most common way data flows into our data platforms is through stream processing platforms like <a href="https://kafka.apache.org/">Apache Kafka</a> and <a href="https://pulsar.apache.org/">Apache Pulsar</a> this post covers this topic area.</blockquote><p>Therefore it becomes critical to ensure we (as software engineers) provide hygienic capabilities and frictionless guardrails to reduce the problem space related to data quality “after” data has entered into these fast-flowing data networks.</p><p>This means establishing api-level contracts surrounding our data’s <em>schema (types, and structure)</em>, field-level <em>availability (nullable, etc)</em>, and field-type <em>validity (expected ranges, etc) </em>become the critical underpinnings of our data foundation especially given the decentralized, distributed streaming nature of today’s modern data systems.</p><p>However, to get to the point where we can even begin to establish blind-faith — or high-trust data networks — we must first establish intelligent system-level design patterns.</p><h4>Building Reliable Streaming Data Systems</h4><p>As software and data engineers, building reliable data systems is literally our job, and this means data downtime should be measured like any other component of the business. You’ve probably heard of the terms <em>SLAs</em>, <em>SLOs</em> and <em>SLIs</em> at one point or another. In a nutshell, these acronyms are associated to the <strong>contracts</strong>, <strong>promises</strong>, and <strong>actual measures</strong> in which we grade our end-to-end systems.</p><p>As service owners, we will be held <em>accountable</em> for our own successes and failures, but with a little upfront effort, standard metadata, and common standards and best practices can ensure things are running smooth across the board.</p><p>Additionally, the same metadata can also provide valuable insights into the quality and trust of our data-in-flight, along its journey until it finds its terminal area to rest. The lineage tells a story all on its own.</p><h4>Adopting the Owners Mindset</h4><p>For example, <em>Service Level Agreements</em> (SLAs) between your team, or organization, and your customers (both internal and external) are used to create a binding contract with respect to the service you are providing. For data teams, this means identifying and capturing metrics (KPMs — key performance metrics) based on your <em>Service Level Objectives</em> (SLOs). The SLOs are the promises you intend to keep based on your SLAs, this can be anything from a promise of near perfect (99.999%) service uptime (API or JDBC), or something as simple as a promise of 90-day data retention for a particular dataset. Lastly, your <em>Service Level Indicators</em> (SLIs) are the proof that you are operating in accordance with the service level contracts and are typically presented in the form of operational analytics (dashboards) or reports.</p><p>Knowing where we want to go can help establish the plan to get there. This journey begins at the inset (or ingest point), and with the data. Specifically, with the formal structure and identity of each data point. Considering the observation that “more and more data is making its way into the data platform through stream processing platforms like Apache Kafka” it helps to have <em>compile time guarantees</em>, <em>backwards compatibility</em>, and <em>fast binary serialization</em> of the data being emitted into these data streams. Data accountability can be a challenge in and of itself. Let’s look at why.</p><h4>Managing Streaming Data Accountability</h4><p>Streaming systems operate 24 hours a day, 7 days a week, and 365 days of the year. This can complicate things if the right up front effort isn’t applied to the problem, and one of the problems that tends to rear its head from time to time is that of corrupt data, aka <em>data problems in flight</em>.</p><h3>Dealing with Data Problems in Flight</h3><p>There are two common ways to reduce data problems in flight. First, you can introduce gatekeepers at the edge of your data network that negotiate and validate data using traditional <em>Application Programming Interfaces</em> (APIs), or as a second option, you can create and compile helper libraries, or Software Development Kits (SDKs), to enforce the data protocols and enable distributed writers (data producers) into your streaming data infrastructure, you can even use both strategies in tandem.</p><h4>Data Gatekeepers</h4><p>The benefit of adding gateway APIs at the edge (in-front) of your data network is that you can enforce <em>authentication</em> (can this system access this API?), <em>authorization</em> (can this system publish data to a specific data stream?), and <em>validation</em> (is this data acceptable or valid?) at the point of data production. The diagram in Figure 1–1 below shows the flow of the data gateway.</p><figure><img alt="A Distributed Systems Architecture showing authentication and authorization layers at a Data Intake Gateway. Flowing from left to right, approved data is published to Apache Kafka for downstream processing" src="https://cdn-images-1.medium.com/max/1024/1*MNn45M-oi7lYKhXA2c119A.png" /><figcaption><strong>Figure 1–1</strong>: A Distributed Systems Architecture showing authentication and authorization layers at a Data Intake Gateway. Flowing from left to right, approved data is published to Apache Kafka for downstream processing. Image Credit by <a href="https://medium.com/u/3b4cab6af83e">Scott Haines</a></figcaption></figure><p>The <strong>data gateway service</strong> acts as the digital gatekeeper (bouncer) to your protected (internal) data network. With the main role of controlling , limiting, and even restricting unauthenticated access at the edge (see APIs/Services in figure 1–1 above), by authorizing which upstream services (or users) are allowed to publish data (commonly handled through the use of service <a href="https://docs.confluent.io/platform/current/kafka/authorization.html">ACLs</a>) coupled with a provided identity (think service identity and access <a href="https://spiffe.io/">IAM</a>, web identity and access <a href="https://jwt.io/">JWT</a>, and our old friend OAUTH).</p><p>The core responsibility of the gateway service is to validate inbound data before publishing potentially corrupt, or generally bad data. If the gateway is doing its job correctly, only “good” data will make its way along and into the data network which is the conduit of event and operational data to be digested via Stream Processing, in other words:</p><blockquote>“This means that the upstream system producing data can <em>fail fast</em> when producing data. This stops corrupt data from entering the streaming or stationary data pipelines at the edge of the data network and is a means of establishing a conversation with the producers regarding exactly why, and how things went wrong in a more automatic way via error codes and helpful messaging.”</blockquote><h4>Using Error Messages to Provide Self-Service Solutions</h4><p>The difference between a good and bad experience come down to how much effort is required to pivot from bad to good. We’ve all probably worked with, or on, or heard of, services that just fail with no rhyme or reason (null pointer exception throws random 500).</p><p>For establishing basic trust, a little bit goes a long way. For example, getting back a HTTP 400 from an API endpoint with the following message body (seen below)</p><pre>{<br>  &quot;error&quot;: {<br>    &quot;code&quot;: 400,<br>    &quot;message&quot;: &quot;The event data is missing the userId, and the timestamp is invalid (expected a string with ISO8601 formatting). Please view the docs at http://coffeeco.com/docs/apis/customer/order#required-fields to adjust the payload.&quot;  <br>  }<br>}</pre><p>provides a reason for the 400, and empowers engineers sending data to us (as the service owners) to fix a problem without setting up a meeting, blowing up the pager, or hitting up everyone on slack. When you can, remember that everyone is human, and we love closed loop systems!</p><h4>Pros and Cons of the API for Data</h4><p>This API approach has its pros and cons.</p><p>The pros are that most programming languages work out of box with HTTP (or HTTP/2) transport protocols — or with the addition of a tiny library — and JSON data is just about as universal of a data exchange format these days.</p><p>On the flip side (cons), one can argue that for any new data domain, there is yet another service to write and manage, and without some form of API automation, or adherence to an open specification like <a href="https://spec.openapis.org/oas/latest.html#format">OpenAPI</a>, each new API route (endpoint) ends up taking more time than necessary.</p><blockquote>In many cases, failure to provide updates to data ingestion APIs in a “timely” fashion, or compounding issues with scaling and/or api downtime, random failures, or just people not communicating provides the necessary rational for folks to bypass the “stupid” API, and instead attempt to directly publish event data to Kafka. While APIs can feel like they are getting in the way, there is a strong argument for keeping a common gatekeeper, especially after data quality problems like corrupt events, or accidentally mixed events, begin to destabilize the streaming dream.</blockquote><p>To flip this problem on its head (and remove it almost entirely), good documentation, change management (CI/CD), and general software development hygiene including actual unit and integration testing — enable fast feature and iteration cycles that don’t reduce trust.</p><blockquote>Ideally, the data itself (schema / format) could dictate the rules of their own data level contract by enabling field level validation (predicates), producing helpful error messages, and acting in its own self-interest. Hey, with a little route or data level metadata, and some creative thinking, the API could automatically generate self-defining routes and behavior.</blockquote><p>Lastly, gateway APIs can be seen as centralized troublemakers as each failure by an upstream system to emit valid data (eg. blocked by the gatekeeper) causes valuable information (event data, metrics) to be dropped on the floor. <em>The problem of blame here also tends to go both ways</em>, as a bad deployment of the gatekeeper can blind an upstream system that isn’t setup to handle retries in the event of gateway downtime (if even for a few seconds).</p><p>Putting aside all the pros and cons, using a gateway API to stop the propagation of corrupt data before it enters the data platform means that when a problem occurs (cause they always do), the surface area of the problem is reduced to a given service. This sure beat debugging a distributed network of data pipelines, services, and the myriad final data destinations and upstream systems to figure out that bad data is being directly published by “someone” at the company.</p><p>If we were to cut out the middle man (gateway service) then the capabilities to govern the transmission of “expected” data falls into the lap of “libraries” in the form of specialized SDKS.</p><h3>Software Development Kits (SDKs)</h3><p>SDKs are libraries (or micro-frameworks) that are imported into a codebase to streamline an action, activity, or otherwise complex operation. They are also known by another name, <em>clients</em>. Take the example from earlier about using good error messages and error codes. This process is necessary in order <em>to inform a client</em> that their prior action was invalid, however it can be advantageous to add appropriate guard rails directly into an SDK to reduce the surface area of any potential problems. For example, let’s say we have an API setup to track customer’s coffee related behavior through event tracking.</p><h4>Reducing User Error with SDK Guardrails</h4><p>A client SDK can theoretically include <em>all the tools necessary</em> to manage the interactions with the API server, including authentication, authorization, and as for validation, if the SDK does its job, the validation issues would go out the door. The following code snippet shows an example SDK that could be used to reliably track customer events.</p><pre>import com.coffeeco.data.sdks.client._<br>import com.coffeeco.data.sdks.client.protocol._<br><br>Customer.fromToken(token)<br>  .track(<br>    eventType=Events.Customer.Order,<br>    status=Status.Order.Initalized,<br>    data=Order.toByteArray<br>  )</pre><p>With some additional work (aka the client SDK), the problem of data validation or event corruption can just about go away entirely. Additional problems can be managed within the SDK itself like for example how to retry sending a request in the case of the server being offline. Rather than having all requests retry immediately, or in some loop that floods a gateway load balancer indefinitely, the SDK can take smarter actions like employing exponential backoff. See “The Thundering Herd Problem” for a dive into what goes wrong when things go, well, wrong!</p><p><strong>The Thundering Herd Problem</strong><br> <br> Let’s say we have a single gateway API server. You’ve written a fantastic API and many teams across the company are sending event data to this API. Things are going well until one day a new internal team starts to send invalid data to the server (and instead of respecting your http status codes, they treat all non-200 http codes as a reason to retry. But wait, they forgot to add any kind of retry heuristics like exponential backoff, so all requests just retry indefinitely — across an ever increasing retry queue). Mind you, before this new team came on board there was never a reason to run more than one instance of the API server, and there was never a need to use any sort of service level rate limiter either, because everything was running smoothly within the agreed upon SLAs.</p><figure><img alt="A happy cartoon whale. This is what happens when the “fail whale” is out of hot water and back into their natural habitat again." src="https://cdn-images-1.medium.com/max/1010/1*wdwkzRSSfwEGvbV9MZcLAQ.png" /><figcaption>The Not-So-Fail-Whale. What can happen when you restore problems and get back out of the hot water again. Image via <a href="https://www.midjourney.com/app/jobs/fd36ca2e-848f-4916-8125-2d0105da8fb4/">Midjourney via the Author.</a></figcaption></figure><p>Well, that was before today. Now your service is offline. Data is backing up, upstream services are filling their queues, and people are upset because their services are now starting to run into issues because of your single point of failure…<br> <br> These problems all stem from a form of resource starvation coined “The Thundering Herd Problem”. This problem occurs when many processes are awaiting an event, like system resources being available, or in this example, the API server coming back online. Now there is a scramble as all of the processes compete to attempt to gain resources, and in many cases the load on the single process (api server) is enough to take the service back offline again. Unfortunately, starting the cycle of resource starvation over again. This is of course unless you can calm the herd or distribute the load over a larger number of working processes which decreases the load across the network to the point where the resources have room to breathe again. <br> <br> While the initial example above is more of an unintentional <a href="https://www.cloudflare.com/learning/ddos/what-is-a-ddos-attack/">distributed denial of service attack</a> (DDoS), these kinds of problems can be solved at the client (with exponential backoff or self-throttling) and at the API edge via load balancing and rate limiting.</p><p>Ultimately, without the right set of eyes and ears, enabled by operational metrics, monitors and system level (<a href="https://cloud.google.com/blog/products/devops-sre/sre-fundamentals-slis-slas-and-slos">SLAs/SLIs/SLOs</a>) alerting, data can play the disappearing act, and this can be a challenge to resolve.</p><p>Whether you decide to add a <em>data gateway API</em> to the edge of your data network, employ a <em>custom SDK</em> for upstream consistency and accountability, or decide to take an alternative approach when it comes to dealing with getting data into your data platform it is still good to know what your options are. Regardless of the path in which data is emitted into your data streams this introduction to streaming data wouldn’t be complete without a proper discussion of data formats, protocols, and the topic of binary serializable data. Who knows we may just uncover a better approach to handling our data accountability problem!</p><h3>Selecting the Right Data Protocol for The Job</h3><p>When you think of structured data the first thing to come to mind might be JSON data. JSON data has structure, is a standard web-based data protocol, and if nothing else it is super easy to work with. These are all benefits in terms of getting started quickly, but over time, and without the appropriate safeguards in place, you could face problems when it comes to standardizing on JSON for your streaming systems.</p><h4>The Love / Hate Relationship with JSON</h4><p>The first problem is that JSON data is mutable. This means as a data structure it is flexible and therefore fragile. Data must be consistent to be accountable, and in the case of transferring data across a network (on-the-wire) the serialized format (binary representation) should be highly compactable. With JSON data, you must send the keys (for all fields) for each object represented across the payload. Inevitably this means that you’ll typically be sending a large amount of additional weight for each additional record (after the first) in a series of objects.</p><p>Luckily, this is not a new problem, and it just so happens that there are best practices for these kinds of things, and multiple schools of thought regarding what is the best strategy for optimally serializing data. This is not to say that JSON doesn’t have its merits. Just when it comes to laying a solid data foundation the more structure the better and the higher level of compaction the better as long as it doesn’t burn up a lot of CPU cycles.</p><h3>Serializable Structured Data</h3><p>When it comes to efficiently encoding and transferring binary data two serialization frameworks tend to always come up: <a href="https://avro.apache.org/">Apache Avro</a> and Google <a href="https://developers.google.com/protocol-buffers">Protocol Buffers</a> (protobuf). Both libraries provide CPU efficient techniques for serializing row-based data structures, and in addition to both technologies also provide their own <em>remote procedure call</em> (RPC) frameworks and capabilities. Let’s look at <em>avro</em>, then <em>protobuf</em>, and we will wrap up looking at <em>remote procedure calls</em>.</p><h4>Avro Message Format</h4><p>With <a href="https://avro.apache.org/"><em>avro</em></a>, you define declarative schemas for your structured data using the concept of records. These records are simply JSON formatted data definitions files (schemas) stored with the file type <em>avsc</em>. The following example shows a Coffee schema in the avro descriptor format.</p><pre>{<br>  &quot;namespace&quot;: &quot;com.coffeeco.data&quot;,<br>  &quot;type&quot;: &quot;record&quot;,<br>  &quot;name&quot;: &quot;Coffee&quot;,<br>  &quot;fields&quot;: [<br>    (&quot;name&quot;: &quot;id&quot;, &quot;type: &quot;string&quot;},<br>    {&quot;name&quot;: &quot;name&quot;, &quot;type&quot;: &quot;string&quot;},<br>    {&quot;name&quot;: &quot;boldness&quot;, &quot;type&quot;: &quot;int&quot;, &quot;doc&quot;: &quot;from light to bold. 1 to 10&quot;},<br>    {&quot;name&quot;: &quot;available&quot;, &quot;type&quot;: &quot;boolean&quot;}<br> ]<br>}</pre><p>Working with avro data can take two paths that diverge relating to how you want to work at runtime. You can take the compile time approach, or the figure things out on-demand at runtime. This enables a flexibility that can enhance an interactive data discovery session. For example, avro was originally created as an efficient data serialization protocol for storing large collections of data, as partitioned files, long-term within the Hadoop file system. Given data was typically read from one location, and written to another, within HDFS, avro could store the schema (used at write time) once per file.</p><h4>Avro Binary Format</h4><p>When you write a collection of avro records to disk the process encodes the schema of the avro data directly into the file itself (once). There is a similar process when it comes to Parquet file encoding, where the schema is compressed and written as a binary file footer. We saw this process firsthand, at the end of chapter 4, when we went through the process of adding StructField level documentation to our <em>StructType</em>. This schema was used to encode our DataFrame, and when we wrote to disk it preserved our inline documentation on the next read.</p><h4>Enabling Backwards Compatibility and Preventing Data Corruption</h4><p>In the case of reading multiple files, as a single collection, problems can arise in the case of schema changes between records. Avro encodes binary records as byte arrays and applies a schema to the data at the time of deserialization (conversation back from a byte array into an object).</p><p>This means you taking extra precaution to preserve backwards compatibility, otherwise you’ll find yourself running into issues with <em>ArrayIndexOutOfBounds</em> exceptions.</p><p>Broken schema promises can happen in other subtle ways too. For example, say you need to change an integer value to a long value for a specific field in your schema. Don’t. This will break backwards compatibility due to the increase in byte size from an int to a long. This is due to the use of the schema definition for defining the starting and ending position in the byte array for each field of a record. To maintain backwards compatibility, you’ll need to deprecate the use of the integer field moving forwards (while preserving it in your avro definition) and add (append) a new field to the schema to use moving forwards.</p><h4>Best Practices for Streaming Avro Data</h4><p>Moving from static avro files, with their useful embedded schemas, to an unbounded stream of well <em>binary data</em>, the main differentiator is that you need to <em>bring your own schema to the party</em>. This means that you’ll need to support backwards compatibility (in the case that you need to rewind and reprocess data before and after a schema change), as well as forward compatibility, in the case that you have existing readers already consuming from a stream.</p><p>The challenge here is support both forms of compatibility given that avro doesn’t have the ability to ignore unknown fields, which is a requirement for supporting forward compatibility. In order to support these challenges with avro, the folks at Confluence open-sourced their <a href="https://docs.confluent.io/platform/current/schema-registry/index.html">schema registry</a> (for use with Kafka) which enables schema versioning at the Kafka topic (data stream) level.</p><p>When supporting avro without a schema registry, you’ll have to ensure you’ve updated any active readers (spark applications or otherwise) to use the new version of the schema prior to updating the schema library version on your writers. The moment you flip the switch otherwise, you could find yourself at the start of an incident.</p><h3>Protobuf Message Format</h3><p>With protobuf, you define your structured data definitions using the concept of messages. Messages are written in a format that feels more like defining a struct in C. These message files are written into files with the proto filename extension. Protocol Buffers have the advantage of using <em>imports</em>. This means you can define common message types and enumerations, that can be used within a large project, or even imported into external projects enabling wide scale reuse. A simple example of creating the Coffee record (message type) using protobuf.</p><pre>syntax = &quot;proto3&quot;;<br>option java_package=&quot;com.coffeeco.protocol&quot;;<br>option java_outer_classname=&quot;Common&quot;;<br><br>message Coffee {<br>  string id       = 1;<br>  string name     = 2;<br>  uint32 boldness = 3;<br>  bool available  = 4;<br>}</pre><p>With protobuf you define your messages once, and then compile down for your programming language of choice. For example, we can generate code for Scala using the coffee.proto file using the standalone compiler from the <a href="https://scalapb.github.io/">ScalaPB</a> project (<em>created and maintained by </em><a href="https://www.linkedin.com/in/nadav-samet/"><em>Nadav Samet</em></a>), or utilize the brilliance of <a href="https://buf.build/">Buf</a>, which created an invaluable set of tools and utilities around protobuf and grpc.</p><h4>Code Generation</h4><p>Compiling protobuf enables simple code generation. The following example is taken from the /ch-09/data/protobuf directory. The directions in the chapter READMEj covers how to install ScalaPB and includes the steps to set the correct environment variables to execute the command.</p><pre>mkdir /Users/`whoami`/Desktop/coffee_protos<br>$SCALAPBC/bin/scalapbc -v3.11.1 \<br>  --scala_out=/Users/`whoami`/Desktop/coffee_protos \<br>  --proto_path=$SPARK_MDE_HOME/ch-09/data/protobuf/ \<br>  coffee.proto</pre><p>This process saves time in the long run by freeing you up from having to write additional code to serialize and deserialize your data objects (across language boundaries or within different code bases).</p><h4>Protobuf Binary Format</h4><p>The serialized (binary wire format) is <a href="https://developers.google.com/protocol-buffers/docs/encoding">encoded</a> using the concept of binary field level separators. These separators are used as markers that identify the data types encapsulated within a serialized protobuf message. In the example, coffee.proto, you probably noticed that there was an indexed marker next to each field type (string id = 1;), this is used to assist with encoding / decoding of messages on / off the wire. This means there is a little additional overhead compared to the avro binary, but if you read over the <a href="https://developers.google.com/protocol-buffers/docs/encoding">encoding specification</a>, you’ll see that other efficiencies more than make up for any additional bytes (such as bit packing, efficient handling of numeric data types, and special encoding of the first 15 indices for each message). With respect to using protobuf as your binary protocol of choice for streaming data the pros far outweigh the cons in the grand scheme of things. One of the ways in which it more than makes up for itself is with support for both backwards and forwards compatibility.</p><h4>Enabling Backwards Compatibility and Preventing Data Corruption</h4><p>There are similar rules to keep in mind when it comes to modifying your protobuf schemas like we discussed with avro. As a rule of thumb, you can change the name of a field, but you never change the type or change the position (index) unless you want to break backwards compatibility. These rules can be overlooked when it comes to supporting any kind of data in the long term and can be especially difficult as teams become more proficient with their use of protobuf. There is this need to rearrange, and optimize, that can come back to bite you if you are not careful. (See the Tip below called <em>Maintaining Data Quality Over Time</em> for more context).</p><h4>Best Practices for Streaming Protobuf Data</h4><p>Given protobuf supports both <em>*backwards</em> and <em>*forwards</em> compatibility, this means that you can deploy new writers without having to worry about updating your readers first, and the same is true of your readers, you can update them with newer versions of your protobuf definitions without worrying about a complex deploy of all your writers. Protobuf supports forward compatibility using the notion of unknown fields. This is an additional concept that doesn’t exist within the avro specification, and it is used to track the indices and associated bytes it was unable to parse due to the divergence between the local version of the protobuf and the version it is currently reading. The beneficial thing here is that you can also <em>opt-in</em>, at any point, to newer changes in the protobuf definitions.</p><p>For example, say you have two streaming applications (a) and (b). Application (a) is processing streaming data from an upstream Kafka topic (x), enhancing each record with additional information, and then writing it out to a new Kafka topic (y). Now, application (b) reads from (y) and does its thing. Say there is a newer version of the protobuf definition, and application (a) has yet to be updated to the newest version, while the upstream Kafka topic (x) and application (b) are already updated and expecting to use some new fields available from the upgrade. The amazing thing is that it is still possible to pass the unknown fields through application (a) and onto application (b) without even knowing they exist.</p><p>See <em>“Tips for maintaining good data quality over time”</em> for an additional deep dive.</p><p><strong>Tip: Maintaining Data Quality over Time</strong></p><p>When working with either <em>avro</em> or <em>protobuf</em>, you should treat the schemas no different than you would code you want to push to production. This means creating a project that can be committed to your companies <em>github</em> (or whatever version control system you are using), and it also means you <em>should</em> write unit tests for your schemas. Not only does this provides living examples of how to use each message type, but the important reason for testing your data formats is to ensure that changes to the schema don’t break backwards compatibility. The icing on the cake is that in order to unit test the schemas you’ll need to first compile the (.avsc or .proto) files and use the respective library code generation. This makes it easier to create releasable library code, and you can also use release versioning (version 1.0.0) to catalog each change to the schemas.</p><p>One simple method to enable this process is by serializing and storing a binary copy of each message, across all schema changes, as part of the project lifecycle. I have found success adding this step directly into the unit tests themselves, using the test suite to create, read and write these records directl into the project test resources directory. This way each binary version, across all schema changes, is available within the code base itself.</p><p>With a little extra upfront effort, you can save yourself a lot of pain in the grand scheme of things, and rest easy at night knowing your data is safe (at least on the producing and consuming side of the table)</p><h4>Using Buf Tooling and Protobuf in Spark</h4><p>Since writing this chapter back in 2021, <strong>Buf Build</strong> (<a href="https://buf.build/">https://buf.build/</a>) has materialized into the <em>all-things-protobuf</em> company. Their tooling is simple to use, free-and-open-source, and appeared at just the right time to power a few initatives in the Spark community. The <a href="https://spark.apache.org/">Apache Spark</a> project introduced full native support for <a href="https://github.com/apache/spark/tree/v3.4.1/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf">Protocol Buffers in Spark 3.4</a> in order to support the <a href="https://spark.apache.org/docs/latest/spark-connect-overview.html">spark-connect</a>, and are using Buf for compiling GRPC services and messages. Spark Connect is after all a GRPC native connector for embedding Spark applications outside of the JVM.</p><p>Traditional Apache Spark application must run as a driver application somewhere, and in the past this meant using <strong>pyspark</strong> or native spark, which in both cases still run on top of a JVM process.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/862/1*K9gkV0RTK4ylsVd-cvcsKw.png" /><figcaption><a href="https://github.com/apache/spark/tree/v3.4.1/connector/connect/common/src/main">Directory structure</a> via Spark Connect. Shows the protobuf definitions, along with buf.gen.yaml and buf.work.yaml which help with code generation.</figcaption></figure><p>At the end of the day, Buf Build enables peace of mind in the build process. In order to generate the code, one must run a simple command: buf generate . For simple linting and consistent formatting, buf lint &amp;&amp; buf format -w . The icing on the cake however is the breaking change detection. buf breaking --against .git#branch=origin/main is all it takes to ensure that new changes to your message definitions won’t negatively affect anything that is currently running in production. *In the future, I will do a write up on using <strong>buf </strong>for enterprise analytics, but for now, it is time to conclude this chapter.</p><p>So where were we. You now know that there are benefits to using avro or protobuf when it comes to your long-term data accountability strategy. By using these language agnostic, row-based, structured data formats you reduce the problem of long-term language lock-in, leaving the doors open to whatever the programing language is later down the line. Cause honestly it can be a thankless task to be supporting legacy libraries and code bases. Additionally, the serialized formats help to reduce the network bandwidth costs and congestion associated with sending and receiving large amounts of data. This helps as well to reduce the storage overhead costs for retaining your data long-term.</p><p>Lastly, let’s look at how these structured data protocols enable additional efficiencies when it comes to sending and receiving data across the network using remote procedure calls.</p><h3>Remote Procedure Calls</h3><p><em>RPC</em> frameworks, in a nutshell, enable <em>client</em> applications to transparently call <em>remote</em> (server-side) methods (procedures) via local function calls by passing serialized messages back and forth. The <em>client</em> and <em>server-side</em> implementations use the same <em>public interface</em> definition to define the functional <em>RPC</em> methods and services available. The Interface Definition Language (IDL) defines the protocol and message definitions and acts as a contract between the client and server-side. Let’s see this in action looking at the popular open-source RPC framework <a href="https://grpc.io/">gRPC</a>.</p><h3>gRPC</h3><p>First conceptualized and created at Google, <a href="https://grpc.io/">gRPC</a> which stands for “generic” remote procedure call, is a robust open-source framework being used for high performance services ranging from distributed database coordination, as seen with <a href="https://www.cockroachlabs.com/docs/stable/architecture/distribution-layer.html">CockroachDB</a>, to real-time analytics, as seen with <a href="https://docs.microsoft.com/en-us/azure/media-services/live-video-analytics-edge/grpc-extension-protocol">Microsofts Azure Video Analytics</a>.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*qz-0jiH6R4R3ytnZYGqy0g.png" /><figcaption><strong>Figure 1–2</strong>. RPC (in this example gRPC) works by passing serializing messages to and from a client and server. The client implements the same Interface Definition Language (IDL) interface and this acts as an API contract between the client and server. (photo credit: <a href="https://grpc.io/docs/what-is-grpc/introduction/)">https://grpc.io/docs/what-is-grpc/introduction/)</a></figcaption></figure><p>The diagram shown in Figure 9–3 shows an example of gRPC at work. The server-side code is written in C++ for speed, while clients written in both ruby and java can interoperate with the service using protobuf messages as their means of communicating.</p><p>Using protocol buffers for message definitions, serialization, as well as the declaration and definition of services, gRPC can simplify how you capture data and build services. For example, let’s say we wanted to continue the exercise of creating a tracking API for customer coffee orders. The API contract could be defined in a simple services file, and from there the server-side implementation and any number of client-side implementations could be built using the same service definition and message types.</p><h4>Defining a gRPC Service</h4><p>You can define a service interface, the request and response objects, as well as the message types that need to be passed between the client and server as easily as 1–2–3.</p><pre>syntax = &quot;proto3&quot;;<br><br>service CustomerService {<br>    rpc TrackOrder (Order) returns (Response) {}<br>    rpc TrackOrderStatus (OrderStatusTracker) returns (Response) {}<br>}<br><br>message Order {<br>    uint64 timestamp    = 1;<br>    string orderId      = 2;<br>    <br>    string userId       = 3;<br>    Status status       = 4;<br>}<br><br>enum Status {<br>  unknown_status = 0;<br>  initalized     = 1;<br>  started        = 2;<br>  progress       = 3;<br>  completed      = 4;<br>  failed         = 5;<br>  canceled       = 6;<br>}<br><br>message OrderStatusTracker {<br>  uint64 timestamp = 1;<br>  Status status    = 2;<br>  string orderId   = 3;<br>}<br><br>message Response {<br>    uint32 statusCode = 1;<br>    string message    = 2;<br>}</pre><p>With the addition of gRPC, it can be much easier to implement, and maintain both the server-side and client-side code used within your data infrastructure. Given that protobuf supports backwards and forwards compatibility, this means that older gRPC clients can still send valid messages to newer gRPC services without running into common problems and pain points (discussed earlier under “Data Problems in Flight”).</p><h4>gRPC speaks HTTP/2</h4><p>As a bonus, with respect to modern service stacks, gRPC is able to use HTTP/2 for its transport layer. This also means you can take advantage of modern data meshes (like <a href="https://www.envoyproxy.io/">Envoy</a>) for proxy support, routing and service level <a href="https://www.envoyproxy.io/docs/envoy/v1.17.2/api-v2/config/filter/http/ext_authz/v2/ext_authz.proto#envoy-api-file-envoy-config-filter-http-ext-authz-v2-ext-authz-proto">authentication</a>, all while also reduce the problems of TCP packet congestion seen with standard HTTP over TCP.</p><p>Mitigating data problems in flight and achieving success when it comes to data accountability starts with the data and fans outwards from that central point. Putting processes in place when it comes to how data can enter into your data network should be considered a prerequisite to check off before diving into the torrent of streaming data.</p><h3>Summary</h3><p>The goal of this post is to present the moving parts, concepts, and background information required to arm ourselves before blindly leaping from a more traditional (stationary) batch-based mindset to one that understandings the risks and rewards of working with real-time streaming data.</p><p>Harnessing data in real-time can lead to fast, actionable insights, and open the doors to state-of-the-art machine learning and artificial intelligence.</p><p>However, distributed data management can also become a data crisis if the right steps aren’t taken into consideration ahead of time. Remember that without a strong, solid data foundation, built on top of valid (trustworthy) data, that the road to real-time will not be a simple endeavor, but one has its fair share of bumps and detours along the way.</p><p>I hope you enjoyed the second half of Chapter 9. To read the first part of this series, head on over to <a href="https://medium.com/towards-data-science/a-gentle-introduction-to-stream-processing-f47912a2a2ea">A Gentle Introduction to Analytical Stream Processing</a>.</p><p><a href="https://towardsdatascience.com/a-gentle-introduction-to-stream-processing-f47912a2a2ea">A Gentle Introduction to Analytical Stream Processing</a></p><p>— — — — — — — — — — — — — — — — — — — — — — — —</p><p>If you want to find dig in even deeper, please check out my book, or support me with a high five.</p><p><a href="https://www.amazon.com/Modern-Engineering-Apache-Spark-Hands/dp/1484274512">Modern Data Engineering with Apache Spark: A Hands-On Guide for Building Mission-Critical Streaming Applications</a></p><p>If you have access to <a href="https://medium.com/u/fbfa235a954c">O’Reilly Media</a> then you can also read the book entirely for free (good for you, not so good for me), but please find the book for free somewhere if you have the opportunity, or get an ebook to save on shipping cost (or needing to find a place for a 600+ page book).</p><p><a href="https://learning.oreilly.com/library/view/modern-data-engineering/9781484274521/">Modern Data Engineering with Apache Spark: A Hands-On Guide for Building Mission-Critical Streaming Applications</a></p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=db58b3694263" width="1" height="1" alt=""><hr><p><a href="https://medium.com/data-science/a-modest-introduction-to-analytical-stream-processing-db58b3694263">A Modest Introduction to Analytical Stream Processing</a> was originally published in <a href="https://medium.com/data-science">TDS Archive</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[A Gentle Introduction to Analytical Stream Processing]]></title>
            <link>https://medium.com/data-science/a-gentle-introduction-to-stream-processing-f47912a2a2ea?source=rss-3b4cab6af83e------2</link>
            <guid isPermaLink="false">https://medium.com/p/f47912a2a2ea</guid>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[engineering]]></category>
            <category><![CDATA[deep-dives]]></category>
            <category><![CDATA[mental-models]]></category>
            <category><![CDATA[software-engineering]]></category>
            <dc:creator><![CDATA[Scott Haines]]></dc:creator>
            <pubDate>Fri, 31 Mar 2023 14:57:35 GMT</pubDate>
            <atom:updated>2024-02-09T05:47:04.143Z</atom:updated>
            <content:encoded><![CDATA[<h4>Building a Mental Model for Engineers and Anyone in Between</h4><figure><img alt="The image captures streams naturally coalescing into a waterfall. It is gentle before things begin to move faster, this is similar to data networks and streaming environments." src="https://cdn-images-1.medium.com/max/1024/1*2RtSIbeOitXpjJrk9srIjg.png" /><figcaption>Stream Processing can be handled gently and with care, or wildly, and almost out of control! You be the judge of what future you’d rather embrace. credit: <a href="https://unsplash.com/@psalms">@psalms</a> <a href="https://unsplash.com/photos/o3Ggpo3BvqM">original_photo</a></figcaption></figure><h3>Introduction</h3><p>In many cases, processing data in-stream, or as it becomes available, can help reduce an enormous data problem (due to the volume and scale of the flow of data) into a more manageable one. By processing a smaller set of data, <em>more often</em>, you effectively divide and conquer a data problem that may otherwise be cost and time prohibitive.</p><blockquote>How you transition from a batch mindset to a streaming mindset although can also be tricky, so let’s start small and build.</blockquote><h4>From Enormous Data back to Big Data</h4><p>Say you are tasked with building an analytics application that must process around <em>1 billion events</em> (1,000,000,000) a day. While this might feel far-fetched at first, due to the sheer size of the data, it often helps to step back and think about the intention of the application (what does it do?) and what you are processing (what does the data look like)? Asking yourself if the event data can be broken down (divided and partitioned) and processed in parallel as a streaming operation (aka in-stream), or must you process things in series, across multiple steps? In either case, if you modify the perspective of the application to look at bounded windows of time, then you now only need to create an application that can ingest, and processing, a mere <em>11.5 thousand (k) events a second</em> (or around 695k events a minute if the event stream is constant), which is an easier number to rationalize.</p><p>While these numbers may still seem out of reach, this is where distributed stream processing can really shine. Essentially, you are reducing the perspective, or scope, of the problem, to accomplish a goal over time, across a partitioned data set. While not all problems can be handled in-stream, a surprising number of problems do lend themselves to this processing pattern.</p><p><strong><em>Note</em></strong><em>: This chapter is part of my book </em><a href="https://www.amazon.com/Modern-Engineering-Apache-Spark-Hands/dp/1484274512"><em>“Modern Data Engineering with Apache Spark: A Hands-On Guide for Building Mission-Critical Streaming Applications”</em></a><em>. The book takes you on the journey building from simple scripting, to composing applications, and finally deploying and monitoring your mission critical Apache Spark applications.</em></p><h3>What you will learn this Chapter</h3><p>This chapter will act as a gentle introduction to stream processing making room for us to jump directly into building our own end to end Structured Streaming application in <a href="https://github.com/newfront/spark-moderndataengineering/tree/main/ch-10">chapter 10</a> without the need to backtrack and discuss a lot of the theory behind the decision-making process.</p><p>By the end of the chapter, you should understand the following (at a high level):</p><ol><li>How to Reduce Streaming Data Problems in Data Problems <em>over Time</em></li><li>The Trouble with Time, Timestamps, and Event Perspective</li><li>The Different Processing Modes for Shifting from a Batch to Streaming Mental Model</li></ol><h3>Stream Processing</h3><p>Streaming data is <em>not stationary</em>. In fact, you can think of it as being alive (if even for a short while). This is because streaming data is data that encapsulates the <em>now, it records events and actions as they occur in flight</em>. Let’s look at a practical, albeit theoretical, example that begins with a simple event stream of sensor data. Fix into your mind’s eye the last parking lot (or parking garage) you visited.</p><h3>Use Case: Real-Time Parking Availability</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*byrdBWeJ_RK4xm84dK82zg.png" /><figcaption>Parking is a Nightmare: The problem with most parking infrastructure, or a common pain point for the customer, is more often than not finding an available spot while still being able to get places on time. <a href="https://unsplash.com/photos/k1AFA4N8O0g">Photo via Unspash</a> and <a href="https://unsplash.com/@ryansearle">@ryansearle</a></figcaption></figure><p>Imagine you just found a parking spot all thanks to some helpful signs that pointed you to an open space. Now let’s say that this was all because of the data being emitted from a connected network of local parking sensors. Sensors which operate with the sole purpose of <em>being used to identify the number of available parking spaces</em> available at that precise moment in time.</p><p>This is a real-time data problem where the real-time accuracy is both measurable, as well as physically noticeable, by a user of the parking structure. Enabling these capabilities all began with the declaration of the system scenario.</p><blockquote><strong>Product Pitch</strong>: “We’d like to create a system that keeps track of the status of all available parking spaces, that identifies when a car parks, how long the car remains in a given spot, and lastly this process should be automated as much as possible”</blockquote><p>Optimizing a system like this can begin with <em>a simple sensor</em> located in each parking spot (associated with a sensor.id / spot.id reference). Each sensor would be responsible for emitting data in the form of <strong>an event</strong> with a spot identifier, timestamp, and simple bit (0 or 1), to denote if a spot is empty or occupied. This data can then be encoded into a compact message format, like the example from Listing 9–1, and be efficiently sent periodically from each parking spot.</p><p><strong>Listing 9–1</strong>. An example sensor event (encapsulated in the <a href="https://protobuf.dev/">Google Protocol Buffer</a> message format) is shown for clarity.</p><pre>message ParkingSensorStatus {<br>  uint32 sensor_id = 1;<br>  uint32 space_id = 2;<br>  uint64 timestamp = 3;<br>  bool available = 4;<br>}</pre><p>During the normal flow of traffic throughout the day, <strong>the state</strong> (availability of a parking spot) via the sensors would flip on or off (<strong>binary states</strong>) as cars arrive or leave each spot. This behavior is unpredictable due to the dynamic schedules of each individual drivers, but patterns always emerge at scale.</p><p>Using the real-time state provided by the collected sensor data , it is easily feasible to build real-time, real-life (IRL) “reporting” to update drivers on the active state of the parking structure: is the parking infrastructure full, or not, and if it isn’t full, that there are <em>now X total number of available spots</em> in the garage.</p><h4>What the Sensor Data Achieves</h4><p>This data can help to automate the <em>human decision-making process</em> for drivers and could even be made available online, through a simple web service, for real-time status tracking, since ultimately drivers just want to park already and not waste time! Additionally, this data can also be used to track when each sensor last checked in (refreshed) which can be used to diagnosis faulty sensors, and even track how often sensors go offline or fail.</p><p>Nowadays, more technologically advanced garages even go so far as to direct the driver (via directional signs and cues) to the available spots within the structure. This acts to both reduce inter-garage traffic and congestion, which in turn raises customer satisfaction, all by simply capturing a live stream of sensor data and processing it in near-real-time.</p><h4>Surge Pricing and Data Driven Decision Making</h4><p>Given the temporal (timestamp) information gathered from these streams of sensor events, a savvy garage operation could use prior trends to even decrease or increase the daily or hourly prices, based on the demand for parking spots, with respect to current availability (number of spots left) in real-time. By optimizing the pricing (within realistic limits) an operator could find the perfect threshold where the price per hour / price per day leads to a full garage more times than it doesn’t. In other words, <em>“at what price will most people park and spots don’t go unused?”</em>.</p><p>This is an example of an optimization problem that stems from the collection of real-time sensor data. It is becoming more common for organizations to look at how they reuse data to solve multiple problems at the same time. The <em>Internet of Things</em> (IOT) use cases are just one of the numerous possible streams of data you could be working with when writing streaming applications.</p><p>Earlier in the book we discussed “creating a system that could take information about Coffee store occupancy, which would inform folk what shop nearest to them has seating for a party of their size” at that point in the story we simply created a synthetic table that could be joined to showcase this example, but this is another problem that can be solved with sensors, or something as simple as a check-in system, that emits relevant event data to be passed reliably downstream via our friend the streaming data pipeline.</p><p>Both examples discussed here (parking infrastructure and coffee empire expansion) employ basic analytics (statistics) and can benefit from simple machine learning to uncovering new patterns of behavior that lead to more optimal operations. Before we get too far ahead of ourselves, let’s take a short break to dive deeper into the capabilities streaming data networks provide.</p><h3>Time Series Data and Event Streams</h3><p>Moving from a stationary data mindset, about a fixed view or moment in time, to one that interprets data as it flows over time, in terms of streams of unbounded data across many views and moments in time, is an exercise in perspective but also one that can be challenging to adopt at first. Often when you think about streaming systems, the notion of streams of continuous events bubble to the surface. This is one of the more common use cases and can be used as more of a gentle introduction to the concept of <em>streaming data</em>. Take for example the abstract time series shown in <strong>Figure 9–1</strong>.</p><figure><img alt="Time is unbounded. How we perceive time is bounded to a scope. This is represented as a contiguous line with views (windows) over time represented by w sub 1, and broken into finite moments represented by T sub 1 to T sub 4" src="https://cdn-images-1.medium.com/max/1024/1*umwtU07GaUb41dHBlL4iRw.png" /><figcaption><strong>Figure 9–1</strong>: Events occur at precise moments of time and can be collected and processed individually (t1-&gt;t4), or can be aggregated across windows of time (w1). Image Credit: Author (Scott Haines)</figcaption></figure><p>As you can see, data itself exists across various states depending on the perspective or vantage point applied by a given system (or application). Each event (T1-&gt;T4) individually understand only <em>what has occurred</em> within their narrow pane of reference, or to put that differently, events capture a limited (relative) perspective of <em>time</em>. When a series of events are processed together in a bounded collection (window), then you have a series of data points (events) that encapsulate either <em>fully realized ideas</em>, or <em>partially realized ideas</em>. When you zoom out and look at the entire timeline then you can paint a more accurate story of what happened from first event to last.</p><p>Let’s take this idea one step further.</p><h3>Do Events Stand Alone?</h3><p>Consider this simple truth. Your event data exists as a complete idea, or as partial ideas or thoughts. I have found that thinking of data as a story over time helps to give life to these bytes of data. Each data point is therefore responsible for helping to compose a complete story, <em>as a series of interwoven ideas and thoughts that assemble or materialize over time</em>.</p><p>Data composition is a useful lens through which to look as you work on adopting a distributed data view of things. I also find it lends itself well while building up and defining new distributed data models, as well as, while working on real world data networks (fabrics) at scale. Viewed as a composition, these events come together to tell <em>a specific story</em>, whose event-based breadcrumbs can inform of the order in which something came to be and is greatly enhanced with the timestamp of each occurrence. Events without time paint a flat view of how something occurred while the addition of time grants you the notion of momentum or speed, or a slowing down and stretching of the time between events or for a full series of data points. Understanding the behavior of the data flowing through the many pipelines and data channels is essential to data operations and requires reliable monitoring to keep data flowing at optimal speeds.</p><p>Let’s look at a use case where the dimension of time helps paint a better story of a real-world scenario.</p><h3>Use Case: Tracking Customer Satisfaction</h3><figure><img alt="A welcoming, simple and clean, independent coffee shop. A barista is seen making a drink behind the bar." src="https://cdn-images-1.medium.com/max/1024/0*dtoX_xE35-AO4X4u" /><figcaption>A quiet coffee shop pouring love with every cup. Photo by <a href="https://unsplash.com/@nputra?utm_source=medium&amp;utm_medium=referral">Nafinia Putra</a> on <a href="https://unsplash.com?utm_source=medium&amp;utm_medium=referral">Unsplash</a></figcaption></figure><blockquote>Put yourself in the shoes of a data engineer working with the data applications feature teams in a fake coffee empire named “CoffeeCo”, the conversation is about what data paints a good story of customer satisfaction over time (time series analysis).</blockquote><p>What if I told you <strong><em>two customers</em></strong> came into our coffee shop, ordered drinks and left the store with their drinks. You might ask me why I bothered to tell you that since that is what happens in coffee shops. What if I told you that the two coffee orders were made <strong><em>around the same</em> </strong>and that <em>the first customer in the story was in and out of the coffee shop in under five minutes</em>. What if I told you, it was <strong>a weekday</strong>, and this story took place <strong>during morning rush hour</strong>? What if I told you that the second customer, who happened to be next in line (or right after the first customer) and was in the coffee shop for thirty minutes? You might ask if the customer stayed to read the paper or maybe use the facilities. <em>Both are valid questions</em>.</p><p>If I told you that the second customer was waiting around because of an error that occurred <em>between step 3 and 4</em> of a four-step <strong>coffee pipeline, </strong>then we’d have a better understanding of how to streamline the customer experience in the future.</p><p>The four steps are:</p><p>1. <strong>Customer Orders: </strong>{customer.order:initialized}<br>2. <strong>Payment Made {</strong>customer.order:payment:processed}<br>3. <strong>Order Queued</strong>: {customer.order:queued}<br>4. <strong>Order Fulfilled</strong>: {customer.order:fulfilled}</p><p>Whether the error was in the automation, or because of a breakdown in the real-world system (printer jam, barista missed an order, or any other reason), the result here is that the customer needed to step in (human in the loop) and inform the operation (coffee pipeline) that <em>“it appears that someone forgot to make my drink”</em>.</p><p>At this point the discussion could turn towards how to handle the customers emotional response, which could swing widely across both positive and negative reactions: from happy to help (1), to mild frustration (4), all the way to outright anger (10) at the delay and breakdown of the coffee pipeline. But by walking through a hypothetical use case, we are all now more familiar with how the art of capturing good data can be leveraged for all kinds of things.</p><h4>The Event Time, Order of Events Captured, and the Delay Between Events All Tell a Story</h4><p>Without the knowledge of <em>how much time elapsed</em> from the first event (customer.order:initialized) until the terminal event (customer.order:fulfilled), or how long each step typically takes to accomplish, we’d have no way to score the experience or really understand what happened, essentially creating a blind spot to abnormal delays or faults in the system. It pays to know the statistics (average, median, and 99th percentiles) of the time a customer typically waits for a variable sized order, as these historic data points can be used via automation to step in to fix a problem preemptively when, for example, an order is taking longer than expected. It can literally mean the difference between an annoyed customer, and a lifetime customer.</p><p>This is one of the big reasons why companies solicit feedback from their customers — be it a thumbs up / thumbs down on an experience, rewarding application-based participation (spend your points on free goods and services), and to track real-time feedback like in the case of “your order is taking longer than expected, here is $2 off your next coffee. Just use the app to redeem”. This data, collected and captured through real-world interactions, encoded as events, and processed for your benefit, are worth it in the end if it positively affects the operations and reputation of the company. Just be sure to follow data privacy rules and regulations and ultimately don’t creep out your customers.</p><p>This little thought experiment was intended to shed light on the fact that the details captured within your event data (as well as the lineage of the data story over time) can be a game changer and furthermore that time is the dimension that gives these journeys momentum or speed. There is just one problem with time.</p><h3>The Trouble with Time</h3><p>While events occur at <em>precise moments in time</em> the trouble with time is that it is also subject to the problems of time and space (location). Einstein used his theory of relativity to explain this problem on a cosmic scale, but this is also a problem on a more localized scale as well. For example, I have family living in different parts of the United States. It can be difficult to <em>coordinate time</em> where everyone’s <em>schedule syncs up</em>. This happens for simple events like catching up with everyone over video (remotely) or meeting up in the real-world for reunions (locally). Even when everything is all coordinated, people have a habit of just <em>running a little bit late</em>.</p><p>Zooming out from the perspective of my family, or people in general, with respect to central coordination of events, you will start to see that the problem isn’t just an issue relating to synchronization across time zones (east / central or west coast), but if you look closer you can see that time, relative to our local / physical space, is subject to some amount of temporal drift or clock skew.</p><p>Take the modern digital clock. It runs as a process on your smart phone, watch or any number of many “smart” connected devices. What remains constant is that time stays noticeably in sync (even if the drift is on the order of milliseconds). Many people still have analog, non-digital, clocks. These devices run the full spectrum from incredibly accurate, in the case of high-end watches (“timepieces”) to cheap clocks that sometimes need to be reset every few days.</p><p>The bottom line here is that it is rare that two systems agree on the precise time in the same way that two or more people share similar trouble coordinating within both time and space. Therefore, a central reference (point of view) must be used to synchronize the time with respect to systems running across many time zones.</p><h4><strong>Correcting Time</strong></h4><p>Servers running in any modern cloud infrastructures utilize a process called <a href="https://en.wikipedia.org/wiki/Network_Time_Protocol">Network Time Protocol</a> (NTP) to correct the problem of time drift. The <em>ntp</em> process is charged with synchronizing the local server clock using a reliable central time server. This process corrects the local time to within a few milliseconds of the Universal Coordinated Time (UTC). This is an important concept to keep in mind since an application running within a large network, producing event data, will be responsible for creating timestamps, and these timestamps need to be precise in order for distributed events to line up. There is also the sneaky problem of daylight savings (gain or lose an hour ever 6 months) so coordinating data from systems across time zones as well as across local datetime semantics (globally) requires time to be viewed from this central, synchronized, perspective.</p><p>We’ve looked at time as it theoretically relates to event-based data but to round out the background we should also look at time as it relates to the <em>priority</em> in which data needs to be captured and processed within a system (streaming or otherwise).</p><h3>Priority Ordered Event Processing Patterns</h3><p>You may be familiar with this quote. Time is of the essence. This is a way of saying something is important and a top priority. The speed to resolution matters. This sense of priority can be used as an instrument, or defining metric, to make the case for <em>real-time</em>, <em>near-real-time</em>, <em>batch</em> or <em>eventual (on-demand) processing</em> when process critical data. These four processing patterns handles time in a different way by creating a narrow, or wide focus on the data problem at hand. The scope here is based on the speed in which a process must complete which in turn limits the complexity of the job as a factor of time. Think of these styles of processing as being deadline driven, there is only a certain amount of time in which to complete an action.</p><h3>Real-Time Processing</h3><p>The expectations of real-time systems are that end-to-end latency from the time an upstream system emits an event, until the time that event is processed and available to be used for analytics and insights, occurs in the milliseconds to low seconds. These events are emitted (written) directly to an event stream processing service, like Apache Kafka, which under normal circumstances enables listeners (consumers) to immediately use that event once it is written. There are many typical use cases for true real-time systems, including logistics (like the parking space example as well as finding a table at a coffee shop), and then processes that impact a business on a whole new level like fraud detection, active network intrusion detection or other bad actor detection where the longer the mean time to detection (average milliseconds / seconds to detection) can lead to devastating consequences both in terms of reputation, financially or both.</p><p>For other systems, it is more than acceptable to run in near real-time. Given that answering tough problems requires time, real-time decision making requires a performant, pre-computed or low-latency answer to the questions it will ask. This really is pure in-memory stream processing.</p><h3>Near Real-Time Processing</h3><p>Near real-time is what most people think of when they consider real-time. A similar pattern occurs here as you just read about under Real-Time, the only difference is that the expectations of end-to-end latency are relaxed to a high number of seconds to a handful of minutes. For most systems, there is no real reason to react immediately to every event as it arrives, so while time is still of the essence, the priority of the SLA for data availability is extended.</p><p>Operational dashboards and metric systems that are kept up to date (refreshing graphs and checking monitors every 30s — 5 minutes) are usually fast enough to catch problems and give a close representation of the world. For all other data systems, you have the notion of batch or on-demand.</p><h3>Batch Processing</h3><p>We covered batch processing and reoccurring scheduling in the last two chapters but for clarity having periodic jobs that push data from a reliable source of truth (data lake or database) into other connected systems has been, and continues, to be how much of the worlds data is processed.</p><p>The simple reason for this is cost. Which factors down to both the cost of operations and the human cost for maintaining large streaming systems.</p><p>Streaming systems demand full time access to a variable number of resources from CPUs and GPUs to Network IO and RAM, with an expectation that these resources won’t be scarce since delays (blockage) in stream processing can pile up quick. Batch on the other hand can be easier to maintain in the long run assuming the consumers of the data understand that there will always be a gap from the time data is first emitted upstream, until the data becomes available for use downstream.</p><p>The last consideration to keep in mind is on-demand processing (or just-in-time processing).</p><h3>On-Demand or Just-In-Time Processing</h3><p>Let’s face it. Some questions (aka queries) are asked so rarely, or in a way that is just not suitable to any predefined pattern.</p><p>For example, custom reporting jobs and exploratory data analysis are two styles of data access that lend themselves nicely to these paradigms. Most of the time, the backing data to answer these queries is loaded directly from the data lake, and then processed using shared compute resources, or isolated compute clusters. The data that is made available for these queries may be the by-product of other real-time or near-real-time systems, that were processed and stored for batch or historic analysis.</p><p>Using this pattern data, can be defrosted, and loaded on-demand by importing records from slower commodity object storage like Amazon S3 into memory, or across fast-access solid state drives (SSDs), or depending on the size, format, and layout of the data, can be queried directly from the cloud object store. This pattern can be easily delegated to Apache Spark using <em>SparkSQL</em>. This enables ad-hoc analysis via tools like Apache Zeppelin, or directly in-app through JDBC bindings using the <a href="https://spark.apache.org/docs/latest/sql-distributed-sql-engine.html">Apache Spark thrift-server</a> and the Apache Hive Metastore.</p><p>The differentiator between these four flavors of processing is <em>time</em>.</p><p>Circling back to the notion of views and perspective, each approach or pattern, has its <em>time and place</em>. Stream processing deals with events captured at specific <em>moments in time</em> and as we’ve discussed during the first half of this chapter, how we associate time and how we capture and measure a series of events (as data) all come together to paint a picture of what is happening now, or what has happened in the past. As we move through this gentle introduction to stream processing it is important to also talk about the foundations of stream processing. In this next section, we’ll walk through some of the common problems and solutions for dealing with continuous, unbounded streams of data. It would only make sense to therefore discuss data as a central pillar and expand outward from there.</p><p>I hope you enjoyed the first half of Chapter 9. If you’d like to go ahead and read part 2. It is linked below. 👇</p><p><a href="https://towardsdatascience.com/a-modest-introduction-to-analytical-stream-processing-db58b3694263">A Modest Introduction to Analytical Stream Processing</a></p><p>If you want to find out more, please check out my book!</p><p><a href="https://www.amazon.com/Modern-Engineering-Apache-Spark-Hands/dp/1484274512">Modern Data Engineering with Apache Spark: A Hands-On Guide for Building Mission-Critical Streaming Applications</a></p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=f47912a2a2ea" width="1" height="1" alt=""><hr><p><a href="https://medium.com/data-science/a-gentle-introduction-to-stream-processing-f47912a2a2ea">A Gentle Introduction to Analytical Stream Processing</a> was originally published in <a href="https://medium.com/data-science">TDS Archive</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Working with Spark SQL Time Functions]]></title>
            <link>https://newfrontcreative.medium.com/working-with-spark-sql-time-functions-aa85ed34295e?source=rss-3b4cab6af83e------2</link>
            <guid isPermaLink="false">https://medium.com/p/aa85ed34295e</guid>
            <category><![CDATA[data-preprocessing]]></category>
            <category><![CDATA[analytics]]></category>
            <category><![CDATA[spark-sql]]></category>
            <category><![CDATA[spark]]></category>
            <category><![CDATA[data-engineering]]></category>
            <dc:creator><![CDATA[Scott Haines]]></dc:creator>
            <pubDate>Tue, 07 Mar 2023 16:28:00 GMT</pubDate>
            <atom:updated>2023-03-31T15:00:09.032Z</atom:updated>
            <content:encoded><![CDATA[<h3>Working with Time Functions in Spark SQL</h3><p>A Hands-On Guide to Time with Apache Spark</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*1mQ1KbIJPteKYmvc10tD7Q.jpeg" /><figcaption>Time is tricky. Time is also very important for Batch and Streaming Analytics. Photo by <a href="https://unsplash.com/@jontyson?utm_source=medium&amp;utm_medium=referral">Jon Tyson</a> on <a href="https://unsplash.com/?utm_source=medium&amp;utm_medium=referral">Unsplash</a></figcaption></figure><blockquote>Over the next few weeks, and in preparation for the 1 year anniversary of my book “<a href="https://www.amazon.com/Modern-Engineering-Apache-Spark-Hands/dp/1484274512">Modern Data Engineering with Apache Spark: A Hands-on Guide for Building Mission Critical Streaming Applications</a>”, I will start to share more content from the book, and <a href="https://github.com/newfront/spark-moderndataengineering/tree/main/ch-12">source code</a>.</blockquote><h3>Working with Timestamps and Dates</h3><p>Working with time can be a difficult thing. You learned in <strong>chapter 9</strong> how clock drift and incorrect time zone information can cause problems (<em>the trouble with time)</em> and how issues can be generally resolved simply by standardizing on UTC and synchronizing clock drift on your servers using NTP. While standardizing can help future projects, you may find yourself needing to work with time in different ways, or even to correct and normalize timestamps.</p><h4>How to Use the Following Code Snippets</h4><p>You can install <a href="https://spark.apache.org/downloads.html">Spark Locally</a>, use <a href="https://community.cloud.databricks.com/login.html">Databricks Community Edition</a>, or use <a href="https://zeppelin.apache.org/docs/latest/quickstart/install.html#using-the-official-docker-image">Apache Zeppelin</a> docker run -p 8080:8080 --rm --name zeppelin apache/zeppelin:0.10.0</p><h3>Common Date and Timestamp functions</h3><p>Starting from your empty note (in Zeppelin or Databricks), copy the code block (listing 12–3) and run the paragraph. This will install the <a href="https://spark.apache.org/docs/latest/api/sql/index.html">Spark SQL Functions</a>, and then the SQL statement generates a row with columns representing the date and time information captured by Spark at runtime.</p><pre>%spark<br>import org.apache.spark.sql.functions._<br><br>spark.sql(&quot;&quot;&quot;<br>SELECT current_timestamp() as ts,<br>current_timezone() as tz,<br>current_date() as date,<br>TIMESTAMP &#39;yesterday&#39; as yesterday,<br>TIMESTAMP &#39;today&#39; as today,<br>TIMESTAMP &#39;tomorrow&#39; as tomorrow<br>&quot;&quot;&quot;).show(6,0,true)</pre><p><strong><em>Listing 12–3</em></strong><em>. Using Spark SQL to get a sense of time</em></p><p>The output of the resulting row will be a snapshot of standard system date and time information collected when the code is evaluated.</p><pre>-RECORD 0 - - - - - - - - - - - - - - - -<br>ts | 2021–09–12 00:54:03.691464<br>tz | Etc/UTC<br>date | 2021–09–12<br>yesterday | 2021–09–11 00:00:00<br>today | 2021–09–12 00:00:00<br>tomorrow | 2021–09–13 00:00:00</pre><p>The current_timestamp, current_timezone, current_date and the <em>TIMESTAMP</em> constants yesterday, today, and tomorrow are all higher order Spark SQL datetime functions. Next, we will recreate the same output use the Spark DSL functions directly.</p><h3>Applying Higher-Order Functions using withColumn</h3><p>Create a new paragraph. Inside the paragraph we will create a single 1x1 (row/column) <em>DataFrame</em> storing only a timestamp. Mimicking the <em>current_timestamp</em> expression, listing 12–4 shows how you can wrap a simple Java <em>Instant</em> to replicate the <em>current_timestamp</em> expression.</p><pre>%spark<br>val tsDf = Seq(Instant.now).toDF(&quot;ts&quot;)</pre><p><strong><em>Listing 12–4</em></strong><em>. Create a DataFrame with a single row and a single column storing a Timestamp</em></p><p>This simple technique (12–4) takes advantage of implicit conversions to encode a Scala <em>Seq[Instant]</em> as a Catalyst Row (DataFrame) with a <em>TimestampType</em> column. Using the <em>tsDf DataFrame</em> we can now add additional columns using the withColumn method on the DataFrame.</p><pre>DataFrame.withColumn(colName, col)</pre><p>The <em>withColumn</em> method is used to add a new column iteratively across all rows of a DataFrame. The col (Column) parameter is a powerful primitive that encapsulates a columnar SQL expression and can be used to enable User Defined Functions to your applications. It is important to keep in mind that withColumn can only reference data from within adjacent columns of a row. There are other techniques that can be used to process all rows or a subset of rows to generate derived aggregates using window functions.</p><p>The following code block (listing 12–5) adds a column literal containing the time zone as well as the derived date using the column operation to_date.</p><pre>%spark<br>import org.apache.spark.sql.functions._<br>import org.apache.spark.sql.types._<br>val dtInfoDf = tsDf<br>.withColumn(&quot;tz&quot;,<br>lit(spark.conf.get(&quot;spark.sql.session.timeZone&quot;))<br>)<br>.withColumn(&quot;date&quot;, to_date($&quot;ts&quot;))</pre><p><strong><em>Listing 12–5</em></strong><em>. Using withColumn to add a column literal, “literally this column is exactly what you see”, and a derived date column using to_date</em></p><p>The result of the transformation is a new DataFrame representing the combination of two new columns added to the source DataFrame. Remember however that because Spark operates lazily, no actual work will be executed until you arrive at a specific action. So, the operation from Listing 12–5 adds the promise to provide the new date column when the time comes to execute on the final series. The first column uses a special Column named lit. The <em>lit</em> column wraps and encodes the underlying data type as a typed Column using an implicit typed encoder (if available). The second column (date) is referred to as a derived column. When you use the <em>to_date</em> function and pass a source column ($”ts”), Spark will use the reference column (ts) to generate a DateType.</p><p>Printing the schema of <em>dtInfoDf</em> at this point, you would see the following.</p><pre>root<br>| - ts: timestamp (nullable = true)<br>| - tz: string (nullable = false)<br>| - date: date (nullable = true)</pre><p>Now, tackling the rest of the missing columns needed to recreate the DataFrame from Listing 12–3 leaves the columns <em>yesterday</em>, <em>today</em>, and <em>tomorrow</em>.</p><h3>Using Date Addition and Subtraction</h3><p>The datetime functions <em>date_sub</em>, <em>date_add</em>, as well as a simple <em>cast</em> expression can be combined to derive yesterday, today, and tomorrow as shown in listing 12–6.</p><pre>tsDf<br>…<br>.withColumn(&quot;yesterday&quot;,<br>date_sub($&quot;date&quot;, 1).cast(TimestampType))<br>.withColumn(&quot;today&quot;, $&quot;date&quot;.cast(TimestampType))<br>.withColumn(&quot;tomorrow&quot;,<br>date_add($&quot;date&quot;, 1).cast(TimestampType))</pre><p><strong><em>Listing 12–6</em></strong><em>. Using date subtraction, addition and data type casting to mimic the output from listing 12–3</em></p><p>You can now create timestamps, derive dates from timestamps, and add or subtract dates. Rounding out the tour of datetime functions, we will be looking at the year, month, dayofmonth, dayofweek and dayofyear calendar functions.</p><h3>Calendar Functions</h3><p>It is common to compare explicit periods (windows) of time using the calendar for analytics and insights. Trend analysis and timeseries forecasting are techniques that use statistics to measure the rate of change over time (deltas) between data points. Data sets can be partitioned (bucketed) and analyzed using Spark SQL to create aggregations broken down by seconds, minutes, hours, days, weeks, months, and even years using <em>fixed</em> or <em>relative</em> windows.</p><p><strong>Fixed and Relative Windows</strong><br> <br> A <em>fixed window</em> is defined by an explicit start and end time. For example, yesterday is a window defined by the 24-hour period beginning at 00:00:00 and ending at 23:59:59. Fixed windows are typically used to compute changes across two or more data sets commonly computed hour over hour, day over day, week over week and month over month. <br> <br> A <em>relative window</em> uses a non-fixed point in time to define one edge of a time-based boundary. This boundary can then be used to compute either the beginning or ending timestamp producing an arbitrary window to observe statistics. For instance, you can use relative time to isolate a dataset encapsulating the last thirty minutes rather than splitting an hour at a fixed 30-minute interval (06:00:00–06:29:59 | 06:30:00–06:59:59).</p><p>Using the following code (listing 12–7) as reference, we will add five additional columns to the <em>tsDf</em> DataFrame.</p><pre>%spark<br>tsDf<br>…<br>.withColumn(&quot;year&quot;, year($&quot;date&quot;))<br>.withColumn(&quot;month&quot;, month($&quot;date&quot;))<br>.withColumn(&quot;day&quot;, dayofmonth($&quot;date&quot;))<br>.withColumn(&quot;day_of_week&quot;, dayofweek($&quot;date&quot;))<br>.withColumn(&quot;day_of_year&quot;, dayofyear($&quot;date&quot;))</pre><p><strong><em>Listing 12–7</em></strong><em>. Deriving the year, month, day, the day within the current week, the day within the current year using a single DateTime column.</em></p><p>The final DataFrame represents the current date with columns expressing different observations generated from the initial timestamp column.</p><pre><br>-RECORD 0 — — — — — — — — — — — — — — — — <br>ts | 2021–09–12 01:04:39.04086<br>tz | Etc/UTC<br>date | 2021–09–12<br>yesterday | 2021–09–11 00:00:00<br>today | 2021–09–12 00:00:00<br>tomorrow | 2021–09–13 00:00:00<br>year | 2021<br>month | 9<br>day | 12<br>day_of_week | 1<br>day_of_year | 255</pre><p><em>Using the Time Functions in a Derived DataFrame</em></p><p>You now have a solid reference to go back to whenever you need a quick refresher on working with date and time, but what about the actual time zone?</p><h3>Time Zones and the Spark SQL Session</h3><p>Working with time-based data requires conversion between time zones for more reasons that just adherence to a common time zone like UTC. You can also use a time zone as a lens to view a data set from the perspective of observers of specific events. This use case comes up when producing Insights that are tied to specific geolocations and time zones.</p><h3>Configuring the Time Zone</h3><p>Spark defaults to using the local system time of its environment (your laptop or a remote server). Using the default system time can cause discrepancies when processing data. To ensure consistent behavior regardless of where the application is run, you can configure the default time zone using the config <strong>spark.sql.session.timeZone</strong>.</p><h3>Modifying the Spark Time Zone at Runtime</h3><p>The SparkSession handles timestamp conversions automatically for you globally. However, there may be times when you want to explicitly change the time zone used by a specific query. Create a new paragraph in Zeppelin and add the example code block (listing 12–8) which shows how to dynamically set the time zone.</p><pre>%spark<br>import java.time._<br>val ts = Seq(Instant.now).toDF(&quot;ts&quot;)<br>spark.conf.set(&quot;spark.sql.session.timeZone&quot;, &quot;UTC&quot;)<br>ts.show(truncate=false) // utc<br>spark.conf.set(&quot;spark.sql.session.timeZone&quot;, &quot;America/Los_Angeles&quot;)<br>ts.show(truncate=false) // pst</pre><p><strong><em>Listing 12–8</em></strong><em>. Observing time zone changes in Spark’s output of time based on the SparkSession time zone configuration</em></p><p>Running the paragraph, you will see first-hand how Spark’s observation of time changes in step with the value of the time zone configuration. The output captures the reference to the single immutable timestamp (ts) as observed through the lens of the UTC and PST time zones respectively.</p><pre>UTC |2021–09–12 03:02:32.434387<br>PST |2021–09–11 20:02:32.434387</pre><p>The ability to shift the observation of time using the Spark runtime config enables you to use a single source of immutable truth (UTC timestamps) for your backing data while simplifying how downstream applications compose queries for specific time zones without the headache of creating multiple copies of a dataset to handle the conversion between time zones for timestamps.</p><h3>Using Set Time Zone</h3><p>You can also use SET TIME ZONE (shown in listing 12–9) to directly set the time zone config using Spark SQL to switch between time zones dynamically.</p><pre>SET TIME ZONE &#39;America/Los_Angeles&#39;;<br>SELECT TIMESTAMP &#39;now&#39; as now_pst;</pre><pre>SET TIME ZONE &#39;UTC&#39;;<br>SELECT TIMESTAMP &#39;now&#39; as now_utc;</pre><p><strong><em>Listing 12–9</em></strong><em>: Set time zone for your Spark Session using SET TIME ZONE</em></p><p>You can now declaratively change the time zone in Spark using the SparkSession. This simple configuration enables each application to select how time should be observed while querying and displaying time-based data. Just remember to use a default time zone for your Spark applications to ensure reliable repeated results.</p><h3>Seasonality, Time Zones, and Insights</h3><p>Consider the following example. You are tasked with designing a system that automatically tracks and compares the relative changes in observed customer shopping behavior across goods being sold at CoffeeCo to produce insights on the purchase trends of customers. While testing that the system is working as expected, you stumble upon what at first appears to be an anomaly in sales of a particular item. Upon further investigation it turns out that the item is a <em>Pumpkin Spice Latte</em>, and the sales numbers went from zero average sales per day, to customers buying these pumpkin lattes at records levels, as compared to all other items in category, in only a few days. Is this an anomaly? What other data might be necessary?</p><p>Some goods are only available for a limited amount of time, or at a specific time of year. A Pumpkin spice latte is seasonal since it is considered a fall (season) beverage, but what is it about the seasons (holidays) that affect sales? What about emotions?</p><p>I grew up on the northeast coast of the United States, and for me, October always marked an observable change in the seasons, leaves would fall from the trees, and the colder temperatures meant sweaters and jackets and the peace and quiet reflection of snow in New England. With this change came anticipation of the winter holidays, which meant cider (mulling spices) and pumpkin pie, and as an adult pumpkin spice lattes bring back fond memories. Is this unique only to me? Is there perhaps a correlation to higher sales on the east coast of the United States? Are sales driven by temperature or other weather patterns like snowstorms or rain?</p><p>While you might not have any emotional connection to fall in New England and you might hate pumpkin spice lattes, it is important to think about what kind of data can be useful to generate insights from analytical observations. Bottom line, think like a data detective or partner with a great data analyst to ensure that the work that goes into data collection, aggregation, and analysis can produce novel insights that can be used to drive experiences that make happy customers.</p><h3>Timestamps and Dates Summary</h3><p>Thinking about the different angles, lenses, or views you can derive from a single timestamp provides you a path towards event-based insights. You can calculate information about the time of year (seasonality), whether a date falls on a weekend or weekday (specific to the geolocation of an event, or even the local time zone), you can check if the event occurred on a holiday, or even if the date is within proximity contextually to any other meaningful historic date. Associating what happened, and correlating data to real-world behaviors is a driving factor behind many successful analytics and insights initiatives. If you want any more proof, search online for “diapers beer correlations” to see how retailers derive insight from customer transactions to move commonly purchased items closer together to increase sales across seemingly irrelevant items. We will now look at some techniques for preparing data for analysis.</p><p>I hope you enjoyed the start of Chapter 12. If anyone is interested in a PySpark version of the chapter then let me know (in comments, or by clapping for the post). If anyone is interested in the writing process, you can check out:</p><p><a href="https://newfrontcreative.medium.com/problems-all-writers-face-c7563c395b69">Problems All Writers Face</a></p><p>Cheers. Scott Haines</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=aa85ed34295e" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Data+AI 2022: Spark Inception:]]></title>
            <link>https://levelup.gitconnected.com/data-ai-2020-spark-inception-8ef5b67b9acb?source=rss-3b4cab6af83e------2</link>
            <guid isPermaLink="false">https://medium.com/p/8ef5b67b9acb</guid>
            <category><![CDATA[data-notebooks]]></category>
            <category><![CDATA[apache-spark]]></category>
            <category><![CDATA[scala]]></category>
            <category><![CDATA[redis]]></category>
            <category><![CDATA[data-engineering]]></category>
            <dc:creator><![CDATA[Scott Haines]]></dc:creator>
            <pubDate>Wed, 29 Jun 2022 07:35:24 GMT</pubDate>
            <atom:updated>2022-07-29T17:44:34.564Z</atom:updated>
            <content:encoded><![CDATA[<h4>Harnessing the Spark REPL to power Streaming Notebook Environments. For Fun. For Profit?</h4><p>This blog post is essentially covering the gist of the presentation, and providing all useful links to the <a href="https://github.com/newfront/spark-inception">source code</a>, and <a href="https://github.com/newfront/spark-inception/blob/main/docs/howto.md">how-to docs</a>.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1002/1*hjX-Q98VMZYg9M2u4owpfA.png" /><figcaption>Student is solving a complex problem: Data problems can also feel like this. (<a href="https://www.istockphoto.com/photo/physics-teacher-writing-math-equations-on-a-blackboard-gm1220610073-357483632">photo credit</a>)</figcaption></figure><blockquote>A friend came to me today at 3:45pm and looked at the slides I was going to present in a matter of some odd minutes (4:45pm), and said, “what will keep me at your session, why should I care?” I (introverted engineer) was paying so much more attention to <em>how cool the code was, how easily it worked, and how neat it would be to share this new thing with the world that I missed the most critical component of a </em><strong><em>good presentation</em></strong><em>.</em></blockquote><p>He couldn’t have been more right on the nose though. <em>Thank you </em><a href="https://twitter.com/ItaiYaffe"><em>Itai Yaffee</em></a></p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*692xG32-LJuHXE_wvVcnMw.png" /></figure><h3>The Presentation</h3><iframe src="https://cdn.embedly.com/widgets/media.html?src=https%3A%2F%2Fwww.youtube.com%2Fembed%2FND1zXumt2zU%3Ffeature%3Doembed&amp;display_name=YouTube&amp;url=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3DND1zXumt2zU&amp;image=https%3A%2F%2Fi.ytimg.com%2Fvi%2FND1zXumt2zU%2Fhqdefault.jpg&amp;key=a19fcc184b9711e1b4764040d3dc5c07&amp;type=text%2Fhtml&amp;schema=youtube" width="854" height="480" frameborder="0" scrolling="no"><a href="https://medium.com/media/3395b116fbe88631a3d20de4106adea2/href">https://medium.com/media/3395b116fbe88631a3d20de4106adea2/href</a></iframe><p>The presentation was split into three pieces.</p><h4>The History: Where did Notebook Environments Come From?</h4><p>I started with some history on why the notebook environment exists. Historically, there is a concept called the “scientific process”. In a nutshell, you have an idea (hypothesis) and in order to prove (convert the idea into law), you must run a series of experiments (using the same input and output) and gain similar if not perfect results at the conclusion of each run.</p><h4>The Mental Model</h4><p>Next, we looked the notebook paradigm and broke it down into component parts to help building the shared nomenclature (words) for talking about notebook environments — namely that notebooks consist of one or more paragraphs (cells) that can be “run” or “played” to trigger the code to be compiled and run <em>interactively</em>.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*o5nplCF98b420-LNc59Stw.png" /><figcaption>This is an example of a <a href="https://docs.databricks.com/notebooks/index.html">DataBricks Notebook</a></figcaption></figure><p>The above example of the databricks notebook shows a mix between making simple SQL calls, and outputting the data in a well formatted table (rows/cols). Now that you understand a little more about the notebook environment, let’s go ahead and build our own.</p><h3>Building the Streaming Notebook Application</h3><p>Given we want to build a similar environment to that of the Databricks Notebook Environment, and yes we are using Apache Spark to get there, but with those little worries behind us, let’s look at the high level application architecture to give us a better handle on my vision, so you can follow along and adopt my mental model.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*FHwJCTAzaz0Sq7WFFtzBDA.png" /><figcaption>The Streaming Notebook Environment: (Left: Redis (Stream)) connects to our (Middle: Spark Structured Streaming) application over a DataStreamReader of type ‘redis’. Magic happens from here on out.</figcaption></figure><p>The DataStreamReader (redis) acts as our streaming API endpoint, and is tasked with delivering the goods (commands) to the application. Once the inbound stream has been processed (Dataset[NetworkCommand]), the application takes advantage of <strong>ds.writeStream.foreachBatch</strong> to generate arbitrarily stateful operations, in this case, the reading, evaluation, printing, and looping (<strong>REPL</strong>) for our dynamically generated Spark / Scala. All while running inside of a structured streaming application. (Think of this like Level 2 in the movie Inception).</p><p>Once the new code has been interpreted, we are essentially constructing a new Spark application <em>on-the-fly</em>. This process is similar to how a more traditional Notebook environment works, however, there isn’t a streaming message bus (maybe this was the first of its kind!).</p><blockquote>Each message (you’ll see how we make those next), have contextual headers (<strong>%spark</strong>) and (<strong>%sql</strong>) that help hint at how to interpret the code represented within the Notebook Paragraph:</blockquote><blockquote><strong>Example<br></strong>%spark<br>val df = spark.read.delta(“s3a://…”).createOrReplaceTempView(‘theview’)</blockquote><p>The last step in the process is just writing the output (after compiling new code or evaluating a sequence of actions) out to an external Redis table (hashset).</p><p>Now let’s dive into the code:</p><p>Open up the entire project at <a href="https://github.com/newfront/spark-inception">https://github.com/newfront/spark-inception</a>. Then let’s start at the edge with the <strong>SparkInceptionControllerApp</strong>.</p><pre>object SparkInceptionControllerApp extends SparkStructuredStreamingApplication[NetworkCommand, NetworkCommand] {<br>  val <em>logger</em>: Logger = Logger.<em>getLogger</em>(&quot;com.coffeeco.data.SparkInceptionControllerApp&quot;)<br><br>  <em>/**<br>   * Modify the base inputStream to<br>   */<br>  </em>override lazy val <em>inputStream</em>: DataStreamReader = {<br>    super.streamReader()<br>      .schema(Encoders.<em>product</em>[NetworkCommand].schema)<br>  }<br><br>  <em>/**<br>   * Use to control the general options on the output Stream<br>   * </em><strong><em>@param writer </em></strong><em>The DataStreamWriter reference<br>   * </em><strong><em>@param sparkSession </em></strong><em>The implicit SparkSession<br>   * </em><strong><em>@return </em></strong><em>The decorated DataStreamWriter<br>   */<br>  </em>override def outputStream(writer: DataStreamWriter[NetworkCommand])<br>                           (implicit sparkSession: SparkSession)<br>  : DataStreamWriter[NetworkCommand] = super.outputStream(writer)<br>    .trigger(Trigger.<em>ProcessingTime</em>(&quot;5 seconds&quot;))<br><br>  @transient implicit lazy val <em>sparkRemoteSession</em>: SparkRemoteSession[_] = <em>SparkRemoteSession</em>()<br>  <em>/**<br>   * Your application will take a DataStreamReader, do something with the inbound micro-batch data<br>   * and then ultimately the data will flow back out of the application, through a DataStreamWriter<br>   *<br>   * </em><strong><em>@return </em></strong><em>The StreamingQuery (which is the full source-&gt;transform-&gt;sink.start)<br>   */<br>  </em>override def <strong>runApp</strong>(): StreamingQuery = {<br>    <em>logger</em>.info(s&quot;run.app.called&quot;)<br>    import <em>sparkSession</em>.implicits._<br><br>    // the inception pipeline<br>    <em>outputStream</em>(<br>      <em>NetworkCommandProcessor</em>(<em>sparkSession</em>).process(<br>        <strong><em>inputStream</em>.load().as[NetworkCommand]</strong><br>      ).<strong>writeStream</strong><br>    ).<strong>foreachBatch</strong>((ds: Dataset[NetworkCommand], batchId: Long) =&gt; <strong><em>processBatch</em></strong>(ds, batchId)<br>    ).start()<br>  }<br><br>  <em>/**<br>   * For each micro-batch, collect the RPC command stream to the driver, process, and pass the results onto Redis<br>   * </em><strong><em>@param ds </em></strong><em>The RPC commands (via Redis)<br>   * </em><strong><em>@param batchId </em></strong><em>The batchId (can be used to skip reprocessing events if checkpoints are enabled)<br>   */<br>  </em>def <strong>processBatch</strong>(ds: <strong>Dataset[NetworkCommand]</strong>, batchId: Long): Unit = {<br>    import <em>sparkSession</em>.implicits._<br><br>    // Collect all of the Distributed Commands and bring down to the Driver<br>    val localResults = <strong>ds.collect().map</strong> { networkCommand =&gt;<br>      // this is running on the driver now (not the executors)<br>      val res = <strong><em>sparkRemoteSession</em>.processCommand</strong>(networkCommand)<br>      // wrap the execution details so we can write the results to redis<br>      <strong>NotebookExecutionDetails</strong>(<br>        networkCommand.notebookId,<br>        networkCommand.paragraphId,<br>        networkCommand.command,<br>        networkCommand.requestId,<br>        networkCommand.userId,<br>        res.commandStatus,<br>        res.consoleOutput<br>      )<br>    }.toSeq<br><br>    // generate a new dataframe and then write back to redis<br>    val forRedis = <strong><em>sparkSession</em>.createDataset[NotebookExecutionDetails](localResults)</strong><br>    forRedis<br>      .write<br>      .format(&quot;org.apache.spark.sql.redis&quot;)<br>      .options(<em>sparkSession</em>.sparkContext<br>        .getConf<br>        .getAllWithPrefix(appConfig.<em>SinkStreamOptions</em>).toMap[String, String])<br>      .mode(SaveMode.<em>Append</em>)<br>      .save()<br>  }<br>  run()<br>}</pre><p>The <em>SparkInceptionControllerApp</em> is responsible for reliably reading, transforming, and processing an unbounded stream of <a href="https://github.com/newfront/spark-inception/blob/main/src/main/scala/com/coffeeco/data/rpc/Command.scala#L42">NetworkConnect</a> objects. Let’s look at the <strong>processBatch</strong> function in more details next.</p><pre>def <strong>processBatch</strong>(ds: Dataset[NetworkCommand], batchId: Long): Unit = {<br>  import <em>sparkSession</em>.implicits._<br><br>  // Collect all of the Distributed Commands and bring down to the Driver<br>  val localResults = <strong>ds.collect().map</strong> { networkCommand =&gt;<br>    // this is running on the driver now (not the executors)<br>    val res = <strong><em>sparkRemoteSession</em>.processCommand(networkCommand)<br>    </strong>... (we will look at this more later)<br>}</pre><p>From the <em>processBatch</em> method, we collect the contents of our dataset back to the Spark driver application (so we can generate new dataframes and create new DAGs from our interpreted code). Let’s look at the <a href="https://github.com/newfront/spark-inception/blob/main/src/main/scala/com/coffeeco/data/controller/SparkRemoteSession.scala#L219">SparkRemoteSession</a> class for more details.</p><pre>package com.coffeeco.data.controller<br><br>import com.coffeeco.data.SparkInceptionControllerApp<br>import com.coffeeco.data.controller.SparkRemoteSession.<em>InitializationCommands<br></em>import com.coffeeco.data.rpc.Command.{<em>SparkCommand</em>, <em>SparkSQLCommand</em>}<br>import com.coffeeco.data.rpc.{Command, NetworkCommand, NetworkCommandResult, Status}<br>import com.coffeeco.data.traits.SparkApplication<br>import org.apache.commons.io.output.ByteArrayOutputStream<br>import org.apache.log4j.Logger<br>import org.apache.spark.repl.SparkILoop<br>import org.apache.spark.sql.DataFrame<br><br>import java.io.{File, PrintStream}<br>import java.net.URL<br>import java.nio.file.{Files, Paths}<br>import java.util.concurrent.atomic.AtomicBoolean<br>import scala.reflect.ClassTag<br>import scala.reflect.internal.util.ScalaClassLoader.URLClassLoader<br>import scala.tools.nsc.Settings<br>import scala.util.Properties.{javaVersion, javaVmName, <em>versionString</em>}<br>import scala.util.{Failure, Success, Try}<br><br>object SparkRemoteSession {<br>  val <em>logger</em>: Logger = Logger.<em>getLogger</em>(&quot;com.coffeeco.data.controller.SparkRemoteSession&quot;)<br>  /*<br>    The initialization command bootstraps the internal SparkILoop.<br>    See https://github.com/apache/spark/blob/v3.2.1/repl/src/main/scala-2.12/org/apache/spark/repl/SparkILoop.scala#L45<br>    for reference.<br>    - These commands are evaluated in order, creating and customizing your Spark Session.<br>    - If there are specific things you need to Ensure exist - then this technique helps you setup exactly that<br>    */<br>  val <em>InitializationCommands</em>: Seq[String] = Seq(<br>    &quot;import com.coffeeco.data.SparkInceptionControllerApp&quot;,<br>    &quot;&quot;&quot;<br>    println(&quot;SPARK INCEPTION REPL: Initialization&quot;)<br>    &quot;&quot;&quot;,<br>    &quot;&quot;&quot;<br>    @transient val spark = SparkInceptionControllerApp.sparkSession<br>    println(spark)<br>    &quot;&quot;&quot;,<br>    &quot;import org.apache.spark.SparkContext._&quot;,<br>    &quot;import spark.implicits._&quot;,<br>    &quot;import org.apache.spark.sql.functions._&quot;<br>  )<br><br>  @transient<br>  @volatile protected[data] var <em>sparkRemoteSession</em>: SparkRemoteSession[_] = _<br><br>  def apply(replInitCommands: Seq[String] = <em>InitializationCommands</em>): SparkRemoteSession[_] = {<br>    if (<em>sparkRemoteSession </em>== null) {<br>      synchronized {<br>        if (<em>sparkRemoteSession </em>== null) {<br>          <em>sparkRemoteSession </em>= new SparkRemoteSession(<br>            app = SparkInceptionControllerApp,<br>            replInitializationCommands = replInitCommands)<br>        }<br>      }<br>    }<br>    <em>sparkRemoteSession<br>  </em>}<br><br>}<br><br>class <strong>SparkRemoteSession</strong>[T &lt;: SparkApplication : ClassTag](app: T, replInitializationCommands: Seq[String]) {<br><br>  import SparkRemoteSession.<em>logger<br><br>  </em>import scala.tools.nsc.interpreter.JPrintWriter<br>  // application enables you to get to the SparkSession, SparkConf, etc<br>  // we use this to ensure we create a shared context since the SparkRemoteSession<br>  // exists (inside of) the SparkInceptionControllerApp, and runs in its own separate<br>  // runtime Context.<br><br>  // save the initial Console output stream<br>  val <strong><em>replOutputStream</em></strong><em> </em>= new ByteArrayOutputStream()<br>  private[this] final val <strong><em>initialConsoleOutputStream</em></strong>: PrintStream = System.<em>out</em>;<br>  // forwarding std.out PrintStream to the replOutputStream (^^)<br>  private[this] final val <strong><em>consolePrintStream</em></strong><em> </em>= new PrintStream(<em>replOutputStream</em>, true)<br>  private[this] final val <strong><em>outputStream</em></strong><em> </em>= new JPrintWriter(<em>replOutputStream</em>, true)<br>  lazy val <em>sparkILoop</em>: SparkILoop = {<br>    val sets: Settings = new Settings<br>    sets.processArguments(<br>      <em>List</em>(&quot;-Yrepl-class-based&quot;, &quot;-Yrepl-outdir&quot;, s&quot;<strong>$</strong>{<em>outputDir</em>.getAbsolutePath}&quot;),<br>      processAll = true<br>    )<br>    if (sets.<em>classpath</em>.isDefault) {<br>      <em>logger</em>.info(s&quot;sets.classpath=default update:java.class.path&quot;)<br>      sets.<em>classpath</em>.value = sys.<em>props</em>(&quot;java.class.path&quot;)<br>    }<br>    sets.<em>usejavacp</em>.value = true<br><br>    // adding additional user jars<br>    if (<em>extraJarsDir</em>.nonEmpty) {<br>      <em>logger</em>.info(s&quot;spark.repl.extra.jars.dir=<strong>$</strong><em>extraJarsDir</em>&quot;)<br>      val jarsDir = new File(<em>extraJarsDir</em>)<br>      val jars: Seq[URL] = jarsDir.listFiles().map {<br>        _.toURI.toURL<br>      }.toSeq<br>      val classPathValue = sets.<em>classpath</em>.value<br>      val updateClassPathValue = (jars ++ Seq(classPathValue)).mkString(File.<em>pathSeparator</em>)<br>      <em>logger</em>.info(s&quot;updated.classpath.value: <strong>$</strong>updateClassPathValue\n&quot;)<br>      sets.<em>classpath</em>.value = updateClassPathValue<br>      updateClassLoader(new URLClassLoader(jars, Thread.<em>currentThread</em>().getContextClassLoader))<br>    }<br>    // set the reference classes that can be used for T variables and use in the local Spark ILoop<br>    sets.embeddedDefaults(<em>_contextClassLoader</em>)<br><br>    <em>logger</em>.info(s&quot;generating the SparkILoop&quot;)<br><br>    lazy val <strong>sparkILoop</strong> = new <strong>SparkILoop</strong>(None, <strong><em>outputStream</em></strong>) {<br>      override val <em>initializationCommands</em>: Seq[String] = <em>InitializationCommands<br><br>      </em>/* replace the standard Spark welcome message */<br>      override def printWelcome(): Unit = {<br>        import org.apache.spark.<em>SPARK_VERSION<br>        </em>echo(<br>          &quot;&quot;&quot;<br>          spark.version %s<br>          &quot;&quot;&quot;.format(<em>SPARK_VERSION</em>))<br>        val welcomeMsg = &quot;Using Scala %s (%s, Java %s)&quot;.format(<br>          <em>versionString</em>, javaVmName, javaVersion)<br>        echo(welcomeMsg)<br>        echo(&quot;Type in expressions to have them evaluated.&quot;)<br>        echo(&quot;Type :help for more information.&quot;)<br>      }<br>    }<br><br>    sparkILoop.<em>settings </em>= sets<br>    // load the jar for this class<br>    sparkILoop.createInterpreter()<br>    sparkILoop.initializeSpark()<br>    sparkILoop.initializeSynchronous()<br><br>    sparkILoop<br>  }<br>  val <em>isInitialized</em>: AtomicBoolean = new AtomicBoolean(false)<br>  // enables setting of user_jars, if empty nothing will be loaded<br>  val <strong><em>extraJarsDir</em></strong>: String = app.<em>sparkSession</em>.<em>conf</em>.get(app.appConfig.<em>ReplExtraJarsDir</em>, &quot;&quot;)<br>  // storage location for dynamic compiled classes and for replaying the console history<br>  val <strong><em>replClassDirectory</em></strong>: String = app.<em>sparkSession</em>.<em>conf</em>.get(<br>    app.appConfig.<em>ReplClassDir</em>,<br>    sys.<em>props</em>.getOrElse(&quot;java.io.tmpdir&quot;, &quot;&quot;))<br>  // use this stream to capture console output (like when printing tables)<br><br>  /* where our dynamic classes will be written out to */<br>  val <em>outputDir</em>: File = {<br>    val f = Files.<em>createTempDirectory</em>(Paths.<em>get</em>(<em>replClassDirectory</em>), &quot;spark&quot;).toFile<br>    f.deleteOnExit() // works with normal exit<br>    f<br>  }<br>  private[this] val <em>scalaTypePrefixPattern </em>= &quot;^\\$ires\\w*\\:\\W[A-Za-z]*\\W\\=\\W*&quot;.r<br>  private[this] var <em>_contextClassLoader</em>: ClassLoader = Thread.<em>currentThread</em>().getContextClassLoader<br><br>  <em>/**<br>   * Takes the resulting output from the Repl Output Buffer, cleans the buffer, and returns the output<br>   *<br>   * </em><strong><em>@return<br>   </em></strong><em>*/<br>  </em>def <strong>readOutput</strong>(): Seq[String] = {<br>    synchronized {<br>      <strong>this.<em>replOutputStream</em>.flush</strong>()<br>      <strong>val result = this.<em>replOutputStream</em>.toString(&quot;utf-8&quot;)<br>        .split(&quot;\n&quot;)<br>        .toSeq<br>        .map(filterOutput)<br>        .filter(_.nonEmpty)<br>      this.<em>replOutputStream</em>.reset()<br>      result</strong><br>    }<br>  }<br><br>  def initialize(): Unit = {<br>    this.<em>isInitialized</em>.set(true)<br>    <strong><em>sparkILoop</em></strong>.isInitializeComplete<br>  }<br><br>  <em>/**<br>   * Cleans up the application. Should be part of the SparkApplication shutdown process<br>   * additionally you can also just use sys.addShutdownHook { sparkRemoteSession.close() }<br>   * just make sure you close up shop after stopping the outer Spark Application<br>   */<br>  </em>def close(): Unit = {<br><br>    // these are all autoflushing - lets force flush &amp; close up shop<br>    this.<em>replOutputStream</em>.flush()<br>    this.<em>consolePrintStream</em>.flush()<br>    this.<em>outputStream</em>.flush()<br>    // clean up after ourselves<br>    this.<em>replOutputStream</em>.close()<br>    this.<em>outputStream</em>.close()<br>    this.<em>consolePrintStream</em>.close()<br>    if (this.<em>isInitialized</em>.get()) {<br>      this.<em>sparkILoop</em>.closeInterpreter()<br>    }<br>  }<br>  // sparkILoop.replOutput (gives all the information from IMain)<br>  // sparkILoop.settings (cascade of all the things)<br><br>  <em>/**<br>   * THIS IS WHERE THE MAGIC HAPPENS:<br>   * Will trigger an action (evaluating the Remote NetworkCommand in the SparkILoop via the SparkRemoteSession)<br>   *<br>   * </em><strong><em>@param cmd </em></strong><em>The NetworkCommand being processed<br>   * </em><strong><em>@return </em></strong><em>The results of processing the command<br>   */<br>  </em>def <strong>processCommand</strong>(cmd: <strong>NetworkCommand</strong>): <strong>NetworkCommandResult</strong> = {<br>    initialize()<br>    val user = cmd.userId.getOrElse(&quot;nobody&quot;)<br>    val parsed = cmd.parse()<br>    if (parsed._1 != Command.<em>UnsupportedCommand </em>&amp;&amp; authCheck(user)) {<br>      <em>logger</em>.debug(s&quot;security.gate.passed&quot;)<br>      // evaluate the command</pre><pre>      val results = <strong>Console.<em>withOut</em>(<em>consolePrintStream</em>)</strong> {<br>        <strong>System.<em>setOut</em>(Console.<em>out</em>)</strong><br>        parsed._1 match {<br>          <strong>case <em>SparkCommand </em>=&gt; processSparkScala(parsed._2)</strong><br>          <strong>case <em>SparkSQLCommand </em>=&gt; processSparkSQL(parsed._2)</strong><br>          case _ =&gt; (Status.<em>Failure</em>, s&quot;<strong>$</strong>{cmd.command} is not supported&quot;)<br>        }<br>      }<br>      <strong>System.<em>setOut</em>(<em>initialConsoleOutputStream</em>)</strong><br><br>      <strong>NetworkCommandResult</strong>(<br>        requestId = cmd.requestId,<br>        commandStatus = results._1,<br>        consoleOutput = results._2<br>      )<br>    } else NetworkCommandResult(cmd.requestId, &quot;Failure&quot;, s&quot;<strong>$</strong>user is not authorized&quot;)<br>  }<br><br>  <em>/**<br>   * Using the SparkILoop, eval code, interact with live Spark directly<br>   *<br>   * </em><strong><em>@param cmd </em></strong><em>The scala block to evaluate<br>   * </em><strong><em>@return </em></strong><em>The results of running the Scala block<br>   */<br><br>  </em>def <strong>processSparkScala</strong>(cmd: String): (String, String) = {<br>    val result = <strong><em>sparkILoop</em>.interpret</strong>(cmd, synthetic = true)<br>    val consoleOutput = <strong>readOutput</strong>()<br>    (result.toString, consoleOutput.mkString(&quot;\n&quot;))<br>  }<br><br>  <em>/**<br>   * Using the Native app.sparkSession pointer run Spark SQL commands directly<br>   *<br>   * </em><strong><em>@param cmd </em></strong><em>The SQL command will fail or succeed, stack trace will be output in the case of a failure<br>   * </em><strong><em>@return </em></strong><em>The results of interpreting the Spark SQL Command<br>   */<br>  </em>def <strong>processSparkSQL</strong>(cmd: String): (String, String) = {<br>    // note: In the case where you want delete protection for tables<br>    // or want to add specific limits (like limit 10 for open queries)<br>    // then you can parse the cmd string and add magic<br><br>    <em>Try</em>(<strong>app.<em>sparkSession</em>.sql(cmd)</strong>) match {<br>      case Success(df: DataFrame) =&gt;<br>        (Status.<em>Success</em>, <strong>df.toJSON.collect()</strong>.toSeq.mkString(&quot;\n&quot;))<br>      case Failure(ex: Exception) =&gt;<br>        ex.printStackTrace(<em>consolePrintStream</em>)<br>        (Status.<em>Failure</em>, readOutput().mkString(&quot;\n&quot;))<br>      case Failure(thr: Throwable) =&gt;<br>        thr.printStackTrace(<em>consolePrintStream</em>)<br>        (Status.<em>Failure</em>, readOutput().mkString(&quot;\n&quot;))<br>      case _ =&gt;<br>        (Status.<em>Failure</em>, &quot;Something went wrong&quot;)<br>    }<br>  }<br><br>  <em>/**<br>   * Basic check - not comprehensive security<br>   *<br>   * </em><strong><em>@param user </em></strong><em>The name of the user of an inbound Command<br>   * </em><strong><em>@return </em></strong><em>True if we want to process the inbound command, false for auth block<br>   */<br>  </em>def authCheck(user: String): Boolean = {<br>    user == app.<em>sparkSession</em>.sparkContext.<em>sparkUser<br>  </em>}<br><br>  <em>/**<br>   * Use this method to filter the Notebook processing output. Eg. do you care about class definitions or types?<br>   * what about formatting the output - like df.show(10, true) will convert to a console printed table.<br>   * Do you want to see that $iresN Unit: ()? probably not<br>   *<br>   * </em><strong><em>@param str </em></strong><em>The string to evaluate<br>   * </em><strong><em>@return </em></strong><em>empty string or cleaned string<br>   */<br>  </em>def filterOutput(str: String): String = {<br><br>    val out = str match {<br>      case _ if str.startsWith(&quot;##&quot;) =&gt; &quot;&quot;<br>      case st if str.startsWith(&quot;$ires&quot;) =&gt;<br>        <em>scalaTypePrefixPattern</em>.replaceFirstIn(st, &quot;&quot;)<br>      case _ =&gt; str<br>    }<br>    out.trim<br>  }<br><br>  private def updateClassLoader(classLoader: ClassLoader): Unit = {<br>    <em>_contextClassLoader </em>= classLoader<br>  }<br><br>}</pre><p>Now moving back from the <strong>SparkILoop</strong> and the <strong>SparkRemoteSession</strong> work back to the <strong><em>SparkInceptionControllerApp.processBatch</em></strong> method to close the final loop.</p><pre>def processBatch(ds: Dataset[NetworkCommand], batchId: Long): Unit = {<br>  import <em>sparkSession</em>.implicits._<br><br>  // Collect all of the Distributed Commands and bring down to the Driver<br>  val localResults = ds.collect().map { networkCommand =&gt;<br>    // this is running on the driver now (not the executors)<br>    val res = <strong><em>sparkRemoteSession</em>.processCommand</strong>(networkCommand)<br>    // wrap the execution details so we can write the results to redis<br>    NotebookExecutionDetails(<br>      networkCommand.notebookId,<br>      networkCommand.paragraphId,<br>      networkCommand.command,<br>      networkCommand.requestId,<br>      networkCommand.userId,<br>      res.commandStatus,<br>      res.consoleOutput<br>    )<br>  }.toSeq<br><br>  // generate a new dataframe and then write back to redis<br>  val forRedis = <em>sparkSession</em>.createDataset[NotebookExecutionDetails](localResults)<br>  forRedis<br>    .write<br>    .format(&quot;org.apache.spark.sql.redis&quot;)<br>    .options(<em>sparkSession</em>.sparkContext<br>      .getConf<br>      .getAllWithPrefix(appConfig.<em>SinkStreamOptions</em>).toMap[String, String])<br>    .mode(SaveMode.<em>Append</em>)<br>    .save()<br>}</pre><p>That is it for the main classes in the Spark Inception Controller Application. You can read the notes and use the <a href="https://github.com/newfront/spark-inception/blob/main/README.md">README.md</a> in the source code for more learnings. Or ask questions in the comments.</p><h3>Running the End to End Demo</h3><h4>Data+AI Summit 2022: <a href="https://databricks.com/dataaisummit/session/spark-inception-exploiting-apache-spark-repl-build-streaming-notebooks">Spark Inception: Exploiting the Spark REPL to Build Streaming Notebooks</a></h4><p>In order to run the demo, you need to have docker installed and create a bridged network `docker network create mde`.</p><ol><li>Use the following `docker-compose.yml` to get `redis` up and running.</li></ol><pre>version: &#39;3&#39;<br><br>services:<br>  minio:<br>    image: minio/minio:${_MINIO_VERSION:-latest}<br>    volumes:<br>      - ${DATA_ENGINEERING_BASEDIR}/minio/data:/data<br>    expose:<br>      - &quot;9000&quot;<br>      - &quot;9001&quot;<br>    environment:<br>      MINIO_ROOT_USER: minio<br>      MINIO_ROOT_PASSWORD: minio_admin<br>    command: server --console-address &quot;:9001&quot; /data<br>    healthcheck:<br>      test: [&quot;CMD&quot;, &quot;curl&quot;, &quot;-f&quot;, &quot;http://localhost:9000/minio/health/live&quot;]<br>      interval: 30s<br>      timeout: 20s<br>      retries: 3<br>    ports:<br>      - 9000:9000<br>      - 9001:9001<br>    networks:<br>      - mde<br>    hostname: minio<br>    restart: always<br><br>  redis:<br>    image: redis:${_REDIS_VERSION:-latest}<br>    container_name: redis<br>    hostname: redis<br>    networks:<br>      - mde<br>    ports:<br>      - 6379:6379<br>    healthcheck:<br>      test: [&quot;CMD&quot;, &quot;redis-cli&quot;, &quot;ping&quot;]<br>      interval: 5s<br>      timeout: 30s<br>      retries: 50<br>    restart: always<br><br>networks:<br>  mde:<br>    external: true<br>    name: mde</pre><p>use docker compose to fire up the dependent services.</p><pre>docker compose -f /path/to/docker-compose.yml up -d`</pre><p>2. Once Redis is up and running. Open a new terminal window since it is time to spin up the <strong>spark-inception-controller</strong>. The following command will spin up the Spark Structured Streaming application and provides you with everything you need to be successful while evaluating the demo.</p><pre>docker run \<br>  -p 4040:4040 \<br>  --hostname spark-inception-controller \<br>  --network mde \<br>  -it newfrontdocker/spark-inception-controller:1.0.0 \<br>  /opt/spark/bin/spark-submit \<br>  --verbose \<br>  --master &quot;local[*]&quot; \<br>  --class &quot;com.coffeeco.data.SparkInceptionControllerApp&quot; \<br>  --deploy-mode &quot;client&quot; \<br>  --jars &quot;/opt/spark/app/jars/spark-inception-controller.jar&quot; \<br>  --conf &quot;spark.driver.extraClassPath=/opt/spark/app/jars/spark-inception-controller.jar&quot; \<br>  --driver-java-options &quot;-Dconfig.file=/opt/spark/app/conf/application-live.conf&quot; \<br>  /opt/spark/app/jars/spark-inception-controller.jar</pre><blockquote>The spark-inception-controller container has Apache Spark located at `/opt/spark` and the `spark-inception-controller` is located in `/opt/spark/app`. Next, you’ll spin up the Spark Structured Streaming application.</blockquote><p>Now you have a fully functioning Apache Spark application. The trouble is the application doesn’t do anything since it is essentially a blank canvas waiting for your input. Consider this no different than a Notebook environment, you need to `add some content` in order for something amazing to happen. So we will do just that.</p><h4>Interacting with the Spark Inception Controller</h4><p>We have two services running. The first is <strong><em>redis</em></strong>. Which introduced the Stream data type in Redis 5.0, which we will use as a lightweight streaming message bus, and the second service is our Spark Structured Streaming Application (which is configured to process up to 10 “commands” per micro-batch). Each “command” is the equivalent of a paragraph within a notebook (think Apache Zeppelin, Jupyter, or Databricks Notebooks).</p><p>The command RPC structure is as follows:</p><pre>NetworkCommand {<br> notebookId: String<br> paragraphId: String<br> command: String<br> requestId: String<br> userId: Option[String]<br>}</pre><p>The <strong>NetworkCommand</strong> is used to send either <strong>%spark</strong> or <strong>%sql</strong> commands to be processed in the Spark Inception Controller.</p><p>Let’s look at an example putting all the pieces together.</p><p>We will have <strong>three</strong> <strong>docker exec processes</strong> at play to trigger remote commands (redis#1) on our Spark Inception Controller (spark-inception-controller), and to receive data back (redis#2).</p><p>The results of processing our NotebookCommand:command, aka a paragraph, are fed into a Redis HashSet (used in Spark as a memory optimized columnar table) and the Status of the command “Success, Failure, or Error” is tracked in the <strong>NotebookExecutionDetails</strong> object.</p><pre>NotebookExecutionDetails {<br> notebookId: String,<br> paragraphId: String,<br> command: String,<br> requestId: String,<br> userId: Option[String],<br> commandStatus: String,<br> result: String<br>}</pre><p>This gives us all the pieces now to have fun and exploit Apache Spark.</p><h4>Run the End to End Demo</h4><ol><li>Open up a new Terminal (Console) tab for our Redis Command Line</li></ol><pre>docker exec -it redis redis-cli</pre><p>2. In another terminal window, we will monitor the commands and data being passed back and forth. This is our immediate feedback that a command was received, processed, and we can see what the result of running the command actually was!</p><pre>docker exec -it redis redis-cli monitor</pre><p>We will be sending the three following commands: (encoded as Redis X objects for the Redis Stream)</p><p><strong>Command #1</strong></p><pre>%sql<br>show tables</pre><p><strong>Command #2</strong></p><pre>%spark<br>case class Person(name: String, age: Int)<br>val people = Seq(Person(\”scott\”,37),Person(\”willow\”,12),Person(\”clover\”,6))<br>val df = spark.createDataFrame(people)<br>df.createOrReplaceTempView(\”people\”)</pre><p>Command #3</p><pre>%sql<br>select * from people</pre><p>Now you can execute the following command: <strong>NetworkCommand</strong> in the terminal window you opened up in Step 1.</p><p>The following command creates a new <strong>case class Person</strong>, generates a sequence of <strong>People</strong>, uses the SparkSession reference inside of the SparkILoop to generate a new DataFrame (on-the-fly), and then registers the data as a temp view named people.</p><p><strong>Run the First Remote Streaming Notebook Paragraph</strong></p><pre>xadd com:coffeeco:notebooks:v1:notebook1:rpc MAXLEN ~ 3000 * notebookId notebook1 paragraphId paragraph1 command “\n%spark\ncase class Person(name: String, age: Int)\nval people = Seq(Person(\”scott\”,37),Person(\”willow\”,12),Person(\”clover\”,6))\nval df = spark.createDataFrame(people)\ndf.createOrReplaceTempView(\”people\”)\n” requestId request1 userId “1000”</pre><p>You will see lots of things happening in the process running the Spark Inception Controller. If this is the first command you’ve sent to the application, then the singleton <em>SparkILoop</em> will be scaffolded reusing the same <em>SparkSession</em> as our structured streaming application, and otherwise you will see the input/output information for the application. The <em>SparkILoop</em> is generated inside of the <em>SparkRemoteSession</em> class, and is essentially a customized instance of the <strong>spark-shell</strong> running inside of our Structured Streaming Application. This enables us to <em>define new classes, construct tables, you name it</em> all while never having thought of it ahead of time.</p><p><strong>Let’s see the 3 items in our generated people table</strong></p><p>To view the contents of the <strong>people</strong> table we created using the first remote command just pop back into the <strong>redis-cli</strong> (redis#1) from before, and execute the following command.</p><pre>xadd com:coffeeco:notebooks:v1:notebook1:rpc MAXLEN ~ 3000 * notebookId notebook1 paragraphId paragraph2 command “\n%sql\nselect * from people” requestId request2 userId “1000”</pre><p>You should now see the results of the people table output as newline separated JSON in redis.</p><p>From the redis-cli. Run the following command.</p><pre>hget “com:coffeeco:notebooks:v1:notebook1:results:paragraph2” result</pre><p>Which will give you the table output that has been transformed to json data for simplicity: `(df.toJSON.collect.mkString(“\n”))` in the <strong>people</strong> view you created with the first command.</p><pre>“{\”name\”:\”scott\”,\”age\”:37}\n{\”name\”:\”willow\”,\”age\”:12}\n{\”name\”:\”clover\”,\”age\”:6}”</pre><p>That’s it. You now have a Streaming Notebook environment using Spark to power the dynamic generation of new Spark applications, or just to fiddle around with and come up with whacky new ideas!</p><p>I hope you find this session companion article useful to further support the Data+AI Summit talk material.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=8ef5b67b9acb" width="1" height="1" alt=""><hr><p><a href="https://levelup.gitconnected.com/data-ai-2020-spark-inception-8ef5b67b9acb">Data+AI 2022: Spark Inception:</a> was originally published in <a href="https://levelup.gitconnected.com">Level Up Coding</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Towards Analytics with Redis]]></title>
            <link>https://levelup.gitconnected.com/towards-analytics-with-redis-f2bccb6db77c?source=rss-3b4cab6af83e------2</link>
            <guid isPermaLink="false">https://medium.com/p/f2bccb6db77c</guid>
            <category><![CDATA[data]]></category>
            <category><![CDATA[data-structures]]></category>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[analytics]]></category>
            <category><![CDATA[redis]]></category>
            <dc:creator><![CDATA[Scott Haines]]></dc:creator>
            <pubDate>Fri, 25 Feb 2022 07:11:14 GMT</pubDate>
            <atom:updated>2022-02-27T18:58:25.264Z</atom:updated>
            <content:encoded><![CDATA[<h4>Using Hashes and Binary Data Structures</h4><figure><img alt="Shows Redis Logo and SQL with a question mark? Asking the question if the Redis Data Structure Store can be used for analytics using SQL-Like Capabilities" src="https://cdn-images-1.medium.com/max/1024/1*1DgrG1ysbUABvMSXMrlbwA.jpeg" /><figcaption>Image by Author. Redis Logo via <a href="https://redis.io/">https://redis.io/</a>. Copyright Redis LTD.</figcaption></figure><p>You are probably wondering why anyone would do any kind of analytical processing directly in Redis. Why not just stick to traditional OLAP-style SQL?</p><p>I’m glad you asked. Redis has been around for a while now and is a proven, high-performance, low-maintenance database and excellent multipurpose cache. Redis is special because it is actually a <strong>Data Structure</strong> store with superior random access to complex data representations: from <a href="https://redis.io/commands#bitmap"><strong>Bitmaps</strong></a> and <a href="https://redis.io/commands#hash"><strong>Hash</strong></a> sets to the real-time <a href="https://redis.io/commands#stream"><strong>Stream</strong></a> type, which is essentially a <a href="https://redis.io/commands#sorted-set">Sorted Set</a> of rows sorted by insert time order where each row is represented by a Hash set like entry.</p><p>Getting back on track, Redis supports access patterns that aren’t available with respect to conventional analytical processing databases which means you can flex your creative muscle to solve problems that would otherwise require you to support multiple alternative databases, streaming platforms (Kafka/Pulsar), and API services to achieve the *same results.</p><p>Next, we’ll look at how to use the Bitmap, Hash, and Stream data types to solve actual problems.</p><h4>Get Redis up and Running on Docker</h4><p>You can follow along by spinning up a single node Redis cluster using Docker using the following command from your terminal (or favorite shell program).(*Requires <a href="https://www.docker.com/products/docker-desktop">Docker to be installed</a> on your machine).</p><pre>docker network create --driver bridge analytics<br>docker run --network analytics -p 6379:6379 --name redis6 -e ALLOW_EMPTY_PASSWORD=yes -dP redis redis-server</pre><p>The command (you just ran) will download the <a href="https://hub.docker.com/_/redis">latest official stable Redis image</a> (6.2.6 at the time of writing) unless you have redis:latest downloaded to your docker image cache (you can check using: docker images redis), and then it will start up the container (run) using a bridge to your host (laptop / computer) network named <em>analytics </em>as a detached -d process. The detached process just means that the stdout / stderr container logs are not piped through to your terminal, you can reattach to the container later if you need.</p><p>Next, we’ll just do a simple sanity check that Redis is running. In the same terminal window, you can use the docker exec command to access and execute the <strong>redis-cli</strong> (aka the redis shell) program.</p><pre>docker exec -it redis6 redis-cli</pre><p>You will see the <em>127.0.0.1:6379&gt;</em> prompt which means you are ready to rock and roll my friend!</p><blockquote><strong>Note on Container Behavior: </strong><em>We created a named container called</em> <strong><em>redis6</em></strong> in the docker run <em>command earlier. This lets us easily access the container using the docker exec command using the name you supplied (redis6) vs the random name given by the Docker runtime. The side effect is that you will have to stop and remove this container to make container config modifications. For example, if you try to run the command (from above) again you’ll see the following error message:</em></blockquote><blockquote>docker: Error response from daemon: Conflict. The container name “/redis6” is already in use by container “{long container uuid}”. You have to remove (or rename) that container to be able to reuse that name.</blockquote><blockquote><em>You can fix this little problem by simply executing the following: </em><em>docker stop redis6 &amp;&amp; docker rm redis6.</em></blockquote><blockquote><strong><em>Last little gotcha with Data Persistence</em></strong><em>. Because the Redis database (stored as an </em>rdb<em> file) is created and stored internally to the container (redis6), this more importantly means that when you execute the stop &amp; </em>remove<em> command that you’ll also be deleting any data stored in the database.</em></blockquote><blockquote><em>This is because </em>containers<em> are architected to be immutable (non-alterable) for the lifecycle of a specific tag and sha256 pair, while the container can be changed using environment variables and volumeMounts to temporarily augment the container. The actual container image has zero memory regarding your modifications. This guarantees you’ll have a reliable and consistent runtime environment with respect to the state of the container following a strict no surprises rule in terms of the file system, os, etc.</em></blockquote><blockquote><em>You can solve the data persistence problem, lack of data persistence, using </em><a href="https://docs.docker.com/storage/volumes/"><em>volumes</em></a><em> — which are persistent disks (or memory) slices attached to a particular path in the container. Volumes are commonly used to mount secrets and other configs to the common base container image.</em></blockquote><p><strong>Creating and using a Persistent Volume</strong><em><br></em>Now to modify things before we get started. You can go ahead and stop and remove the redis6 container if you created it earlier, then move ahead with the following command to create the redis6 volume.</p><pre>docker volume create redis6</pre><p>Now that you have a persistent volume created you can mount the volume to your Redis container and then you’ll have a way to get back to the data (unless you loose your disk).</p><blockquote><strong>Note</strong>: the total disk space allocated for the Docker runtime on your Host machine will be used for this volume. You can view all docker volumes using docker volume ls and specific information for the redis6 volume using docker volume inspect redis6.</blockquote><h4>Starting Redis back up with a Persistent Volume and Consistent Redis Image</h4><p>Like I covered in the note regarding container behavior above, the first quick start command created a transient Redis container that will forget any changes made to it during the runtime of the container. Luckily, using a persistent volume means your data will exist between stop/remove cycles so you can change container configurations, etc with minimal concern.</p><pre>docker run \<br> — name redis6 \<br> — network analytics \<br> -v redis6:/data \<br> -p 6379:6379 \<br> -e ALLOW_EMPTY_PASSWORD=yes \<br> -dP \ redis:6.2.6@sha256:eaa2697033320e8b58b920f5f4aa0c56e8fa5eacf9094168504c7eeabbac8468 redis-server</pre><p>Now let’s move on and learn to store daily and weekly data using Redis Hashes.</p><h4>Data Structure Pattern: Storing Daily and Weekly Hourly Totals</h4><p>Glad you’re still reading along.</p><p><strong>Data Structure</strong>: <a href="https://redis.io/topics/data-types#hashes">Hash</a><br><strong>Key Pattern</strong>: {key-prefix}:{version}:{region}:{dataset}:{identifier_tag}:{date-pattern}<br><strong>Example Key</strong>:acme:v1:us:carts:completed:2022:w:1</p><p>In the above example key we are using the <em>numeric week of the year</em>. This way no one needs to bother with date math, and we can let Redis efficiently compress the shorter key in the global Redis keyspace. The value of date-pattern is {w:1} for week 1 of 2022. <em>*The date pattern can also be simplified to your own use cases. Just make sure you try some things to see what works or follow other best practices like using yyyy:MM:dd style string patterns.</em></p><blockquote>Objective: Store the hour of day across 7 days in the week for the hourly totals</blockquote><p>The Redis hash set associated with our key (<strong>acme:v1:us:carts:completed:w:1</strong>) stores a collection of day of week (0–6) and hour of day (0–23) pointers, for example [0–6]:[0–23] stores the numeric value associated with the [total completed carts] for the exclusive hour of the specific day of the week.</p><p>So how do we set this value?</p><h4>Writing the Hourly Cart Data using HSET</h4><p>Using the Redis cli you can set the total number of completed cart transactions for Sunday @ 00:00:00–00:59:59 into the hash: The signature for adding one or more key value pairs to the common Hash is HSET key field value [field value...]</p><pre>127.0.0.1:6379&gt; HSET acme:v1:us:carts:completed:w:1 0:0 100000<br>~~~</pre><blockquote>Tip: Now you wouldn’t expect to have someone manually entering all of this data, rather you would write this analytical data as a by product of a pipeline job or streaming aggregation into Redis for the fast access from other data systems within your data ecosystem, to accomplish the following: a.) to cache the data for more complex use cases, or b.) to protect more expensive calls to your source of truth data sources.</blockquote><h4>Fetching the Cart Data using HGETALL</h4><p>To collect all keys and completed cart totals (168 total possible keys if we do <em>0–6:0–23</em> for all hours of the week records) available in the hash map, or to collect a subset or partial week of data, you can use <strong><em>HGETALL</em></strong> to get all tuples in the <strong><em>Hash</em></strong> associated with the <em>Key</em>.</p><pre>127.0.0.1:6379&gt; HGETALL acme:v1:us:carts:completed:w:1<br> 1) &quot;0:0&quot;<br> 2) &quot;100000&quot;<br> 3) &quot;0:1&quot;<br> 4) &quot;120000&quot;<br> 5) &quot;0:2&quot;<br> 6) &quot;150000&quot;<br> 7) &quot;0:3&quot;<br> 8&quot;250000&quot;<br> 9) &quot;0:4&quot;<br>10) &quot;290000&quot;<br>11) &quot;0:5&quot;<br>12) &quot;450000&quot;<br>13) &quot;0:6&quot;<br>14) &quot;450000&quot;<br>15) &quot;0:7&quot;<br>16) &quot;450000&quot;<br>17) &quot;0:8&quot;<br>18) &quot;450000&quot;<br>19) &quot;0:9&quot;<br>20) &quot;1200000&quot;<br>21) &quot;0:10&quot;<br>22) &quot;1800000&quot;<br>23) &quot;0:11&quot;<br>24) &quot;2300000&quot;<br>25) &quot;0:12&quot;<br>26) &quot;2900000&quot;<br>27) &quot;0:13&quot;<br>28) &quot;3600000&quot;<br>29) &quot;0:14&quot;<br>30) &quot;2600000&quot;<br>31) &quot;0:15&quot;<br>32) &quot;2200000&quot;<br>33) &quot;0:16&quot;<br>34) &quot;1200000&quot;<br>35) &quot;0:17&quot;<br>36) &quot;900000&quot;<br>37) &quot;0:18&quot;<br>38) &quot;1000000&quot;<br>39) &quot;0:19&quot;<br>40) &quot;1200000&quot;<br>41) &quot;0:20&quot;<br>42) &quot;1600000&quot;<br>43) &quot;0:21&quot;<br>44) &quot;1300000&quot;<br>45) &quot;0:22&quot;<br>46) &quot;600000&quot;<br>47) &quot;0:23&quot;<br>48) &quot;650000&quot;<br>...</pre><p>The underlying metrics (cart totals) shown in the output above are all efficiently stored internally as unsigned integers, unless you add negative numbers <em>(which you wouldn’t want to since 0 sales is the least amount unless you start thinking about returns — but that would complicate things at this point in the explanation)</em>. Aside from being able to fetch your analytic data for use in dashboards or other APIs for internal or external customers, you can also fetch individual tuples by signature (day_of_week:hour_of_day) as a basis of comparison to compose higher order analytics (like the abandonment rate, or percentage of completed vs started carts per hour), by simply comparing other simple values stored in simple Redis data structures.</p><p>If I generate a full day [0:0…0:23] for Sunday and take a step back to look at the overhead in Redis. I can use the debug object feature to fetch the <em>encoding</em> and <em>serializedlength</em> properties of the data structure represented by the key from earlier with a minor change in the key pattern (d represents a day, while w represented a week — this pattern lets you use days of the year or weeks of the year) without changing the shared common prefix of the key (<em>acme:v1:us:carts:{…}</em>).</p><pre>127.0.0.1:6379&gt; debug object acme:v1:us:carts:completed:d:1<br>Value at:0x7ff7c4866670 refcount:1 encoding:ziplist serializedlength:217 lru:8641926 lru_seconds_idle:30</pre><p>The output of calling debug object informs us that storing one full day of unsigned integers takes up just 217 bytes per day. Furthermore, due to the size of the Hash Set (24 string/uint32 tuples) Redis encodes the data using <a href="https://redis.com/ebook/part-2-core-concepts/01chapter-9-reducing-memory-use/9-1-short-structures/9-1-1-the-ziplist-representation/">ziplist</a> encoding for up to a default of 512 entries.</p><p>Now, back to using the Hash data structure.</p><h4>Fetch the hourly total for Sunday at 12pm UTC</h4><p>In order to fetch an entry, we just need to know the position. Sunday is 0 (while Saturday would be 6), and the hour offset is 11 for the 12th hour. You can adjust the pointers to begin with 1 for sanities sake as well.</p><pre>127.0.0.1:6379&gt; HGET acme:v1:us:carts:completed:w:1 0:12<br>&quot;2300000&quot;</pre><blockquote>Tip: Creating a RedisKey Builder library or Specific High Level SDK can assist with the complexities introduced by using Redis like I am showing here.</blockquote><pre>val total = Carts({version:1,region:us,path:acme})<br>  .weekOf(&quot;2022-02-21&quot;)<br>  .completed()<br>  .getOrElse(...)</pre><p>Next. We will look at storing boolean data using Bitfields.</p><h4>Data Structure Pattern: Store Daily User Activity</h4><p>This next pattern will show you how to efficiently store daily user activity using Bitmaps in Redis.</p><blockquote><strong>Note:</strong> Many of us have written or at least read about how to build a registration system. Integer based identifiers vs UUIDs can be a hot topic. Given the maximum integer value is ~2.1 billion, let’s just assume for the point of this example that you have a distributed registration system ensuring that each customer (User) in your system gets their own auto incrementing id, and your scale is less than 2.14 billion. If we use the integer to represent the User, then we get a much smaller key which can help to reduce overhead in the key space, otherwise UUID v4 is fine but will come with additional overhead.</blockquote><p><strong>Data Structure</strong>: Binary Strings (Byte Array with Bit manipulation)<br><strong>Key Pattern</strong>: {key-prefix}:{version}:{region}:{dataset}:{identifier_tag}:{date-pattern:yyyy:MM}<br><strong>Example Key</strong>:acme:v1:us:users:active:0:2022-02</p><p>The example key above corresponds to the user with the integer id of 0. If using UUIDs, then you might see a key like acme:v1:us:users:active:917dfa61–3c56–4e59-a595–37bb20c40a1d:2022–02 which has some additional memory overhead due to the length of the key.</p><h4>Setting Daily Activity Bits</h4><p>With the red tape out of the way (see the note just above). Let’s dive in with User zero. We will set the active/inactive stats for the user using SETBIT.</p><p>We will start out here with setting the first 3 days of the month. [0–2], active on the 1st and the 3rd of the month of February 2022. We will add the first record, check the memory overhead, and then add the other two days.</p><pre>127.0.0.1:6379&gt; SETBIT acme:v1:us:users:active:2022–02 0 1</pre><p>Check the memory overhead here.</p><pre>127.0.0.1:6379&gt; memory usage acme:v1:us:users:active:2022–02<br>(integer) 76</pre><p>Now add two additional days.</p><pre>127.0.0.1:6379&gt; SETBIT acme:v1:us:users:active:2022–02 1 0<br>127.0.0.1:6379&gt; SETBIT acme:v1:us:users:active:2022–02 2 1</pre><p>This process is probably getting tedious now. Given we can add more values to the Bitmap using the <a href="https://redis.io/commands/bitfield">BITFIELD</a> command. Let’s set the rest of the days in the first 7 days of February using the Bitfield command.</p><pre>BITFIELD acme:v1:us:users:active:2022–02 SET u1 3 1 SET u1 4 1 SET u1 5 0 SET u1 6 0</pre><blockquote>The BITFIELD command is <em>variadic, </em>meaning it can take zero or more additional SET {encoding} {offset} {value} pairs. Here we set the days 3–6 for the user. Interestingly enough because we are dealing with bits here, we don’t change the memory overhead required to store this information due to the &lt; 8 values set on the initial byte array. 1 byte is 8 bits and we need some buffer space too. We can confirm this by calling debug object acme:v1:us:users:active:2022–02. You’ll see the serializedlength is still 2 bytes.</blockquote><blockquote>127.0.0.1:6379&gt; debug object acme:v1:us:users:active:2022–02<br>Value at:0xf55d8330 refcount:1 encoding:raw serializedlength:2 lru:1578061 lru_seconds_idle:46</blockquote><p>Given the default value for each bit is 0, we don’t actually need to do anything when it comes to inactive days. We just need to eventually allocate a large enough binary string to hold the offsets for future days. The Redis bitmap is just access methods on a binary encoded string.</p><p>Let’s assume our User in this use case (above) goes away on vacation for 7 days, and then starts intermittently coming back to whatever awesome service you provide. We wouldn’t need to do anything until adding a value the next time the User is active. Let’s look at the support for “Sparse” fields next.</p><h4>Sparse Field Support</h4><p>This year February has 28 days. Our bit offset index is 0–27 values to ultimately account for. We’ve filled in active / inactibive bits for the first 7 days [0–6]. We skip the following 7 days [7–14] since the user is on vacation. They hit the service again when coming back on the 16th of the month. <em>*Yes it is just as confusing to write this as it is to read, but we don’t want to waste bits!</em></p><p>The following sets the active user bit for the 16th, 19th, 24th and the 28th of the month.</p><pre>BITFIELD acme:v1:us:users:active:2022–02 SET u1 15 1 SET u1 18 1 SET u1 23 1 SET u1 27 1</pre><p>Now we can see how the memory was allocated behind the Redis scenes enabling the backing byte array to grow dynamically on account for our user being active on specific days of the week within the 28 days encapsulating the month of February.</p><pre>127.0.0.1:6379&gt; memory usage acme:v1:us:users:active:2022–02<br>(integer) 84</pre><pre>127.0.0.1:6379&gt; debug object acme:v1:us:users:active:2022–02<br>Value at:0xf55d8330 refcount:1 encoding:raw serializedlength:5 lru:1579962 lru_seconds_idle:53</pre><h4>Finding the Total Active Days of the Month</h4><p>Now what is interesting here is that we can figure out if a User has been active or inactive (for the entire month) because there would be no record for the User in the case where no key was created. Check. Interestingly enough, we can also figure out how many days (in total) a user has been active by checking the <a href="https://redis.io/commands/bitcount"><strong>BITCOUNT</strong></a> of the bitmap.</p><pre>127.0.0.1:6379&gt; bitcount acme:v1:us:users:active:2022–02<br>(integer) 7</pre><p>So we know that the user has been active for 7 days. The bitcount operation is O(N) but given the small size it acts like O(1). I don’t know many alternatives to constant time analytics. Consider how much more efficient this is (yes, it is more difficult and not super simple to follow, however…) the alternatives to this approach require querying User activity in what may be a computationally expensive query over activity data stored in your Data Warehouse, or god forbid, your Data Lake!</p><h4>Figuring out if a User was active on a specific Day of the month</h4><p>This final example just shows how to use the BITFIELD command to get a specific value by offset, in our case, the day-1 index. We will check two days, we explicitly set the value of the 24th of the month, but never added a record for the 22nd.</p><pre>127.0.0.1:6379&gt; BITFIELD acme:v1:us:users:active:2022–02 GET u1 21<br>1) (integer) 0</pre><pre>127.0.0.1:6379&gt; BITFIELD acme:v1:us:users:active:2022–02 GET u1 23<br>1) (integer) 1</pre><p>We get back an array of integer response. This is because we have the option of getting multiple values returned from our Bitmap (just like the SET functionality was variadic, so is the GET).</p><pre>127.0.0.1:6379&gt; BITFIELD acme:v1:us:users:active:2022–02 GET u1 21 GET u1 23<br>1) (integer) 0<br>2) (integer) 1</pre><h4>Redis Streams</h4><p>I will cover how to use the Redis Streams data type in a book I have coming out in April 2022 called Modern Data Engineering with Apache Spark. It covers more advanced use cases than this blog post can cover, with hands-on material you can take with you no matter where your career takes you.</p><p><a href="https://www.amazon.com/Modern-Engineering-Apache-Spark-Hands/dp/1484274512/">Modern Data Engineering with Apache Spark: A Hands-On Guide for Building Mission-Critical Streaming Applications</a></p><h4>Conclusion</h4><p>I hope you found this introduction to using Redis on Docker for analytics exciting or at least interesting. Redis is a fascinating swiss-army knife in your data engineering and engineering toolbox. It is also just lovable. I was lucky enough to “have to” use Redis back in 2012 and ten years later I still love what I am working with.</p><blockquote>*Yes. I am purposely skipping the conversation about limitations of Redis clustering, cost of memory vs flash vs ssd vs hdd, economies of scale on humongous data stores with multiple millions of keys, pros/cons of OLAP databases like <a href="https://druid.apache.org/">Druid</a> / <a href="https://pinot.apache.org/">Pinot</a> / <a href="https://clickhouse.com/">ClickHouse</a>, and substantiating my above argument regarding simplifying the data stack with Redis with actual hard facts. Data decision making and selection of technology is multi faceted and this post is intended to introduce you to a nuanced approach to how to serve data.</blockquote><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=f2bccb6db77c" width="1" height="1" alt=""><hr><p><a href="https://levelup.gitconnected.com/towards-analytics-with-redis-f2bccb6db77c">Towards Analytics with Redis</a> was originally published in <a href="https://levelup.gitconnected.com">Level Up Coding</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Problems All Writers Face]]></title>
            <link>https://newfrontcreative.medium.com/problems-all-writers-face-c7563c395b69?source=rss-3b4cab6af83e------2</link>
            <guid isPermaLink="false">https://medium.com/p/c7563c395b69</guid>
            <category><![CDATA[publishing]]></category>
            <category><![CDATA[writing-life]]></category>
            <category><![CDATA[lessons-learned]]></category>
            <category><![CDATA[personal-development]]></category>
            <category><![CDATA[data-engineering]]></category>
            <dc:creator><![CDATA[Scott Haines]]></dc:creator>
            <pubDate>Wed, 26 May 2021 15:30:30 GMT</pubDate>
            <atom:updated>2023-06-01T05:56:26.155Z</atom:updated>
            <content:encoded><![CDATA[<h4>Learning to break mental blocks and unstuck yourself</h4><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Uy743Kdn33-qZ_I0tXOrjQ@2x.jpeg" /></figure><p>What inpires you? Photo via <a href="https://medium.com/u/2053395ac335">Unsplash</a></p><h3>In Pursuit of Writing ✍️</h3><p>I’ve been writing off and on now for the better half of a decade. Mostly technical writing here on medium now a days, but over the course of the past few years, I’ve had a few articles published, some even showed up in printed magazines, and I was offered an opportunity in 2019 to take a risk and work on my first book.</p><p><a href="https://learning.oreilly.com/library/view/the-rise-of/9781492073697/">The Rise of Operational Analytics</a></p><h3>In it for the long haul</h3><p>I’m working on a new book now and it has me writing many nights and every weekend. This will be the longest thing I’ve ever written weighing in at around 400 pages (or more as there seems to be an endless amount of new content that fits the bill).</p><h4>Understanding My Weaknesses</h4><p>This project has taught me many things about myself, for instance I realized early on that I am terrible at estimating the actual time it takes to write each draft of a chapter. See, I have a problem context switching from left brain to right brain activities and the book I am writing is a hands-on book on Modern Data Engineering.</p><p>This means that each chapter has some flourish, setting up the chapter, connecting the dots from earlier chapters, laying a concrete foundation for more complex and advanced topics. But moving from the setup and the theory, to easily digestible “Fully functioning” code is always like pulling the ebrake at 90 miles an hour. I’m in for a bumpy and dangerous transition. Writing tutorials and trainings takes a surprising amount of time. When I used to present and do live trainings it would take around a month to put together the content and I would workshop the material at conferences and meetups sometimes for over a year like with the example below.</p><iframe src="https://cdn.embedly.com/widgets/media.html?src=https%3A%2F%2Fwww.youtube.com%2Fembed%2FhoNa4meDX_c%3Ffeature%3Doembed&amp;display_name=YouTube&amp;url=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3DhoNa4meDX_c&amp;image=https%3A%2F%2Fi.ytimg.com%2Fvi%2FhoNa4meDX_c%2Fhqdefault.jpg&amp;key=a19fcc184b9711e1b4764040d3dc5c07&amp;type=text%2Fhtml&amp;schema=youtube" width="854" height="480" frameborder="0" scrolling="no"><a href="https://medium.com/media/9f89b3a714814bc012d246d3bce75259/href">https://medium.com/media/9f89b3a714814bc012d246d3bce75259/href</a></iframe><p>I also have ADHD and while that can help me focus, it also means I have to try harder to keep a train of thought, lest things fall apart due to simple distractions. I found that it helped to sit down to write, put on my headphones and just start writing off my outline. I wouldn’t stop to edit, reread or tweak anything until the idea was out.</p><blockquote>Just get something on the page, since you can always go back and edit it later but interrupting the flow of words is usually a costly mistake.</blockquote><h4>Better Together</h4><p>On the side of wins, I’ve had the pleasure of having conversations with other authors and editors during the past year and it’s been a great opportunity to learn about the publishing industry and what it means to be a writer in the competitive field of technology and computer science. I also became a technical reviewer for other embattled authors and it has been a nice 360 degree immersion into all sides and the many angles viewed and visited from first page to publication.</p><p>Through this process I’ve also had the opportunity to really reflect on my time spent teaching and giving tech talks and learned a lot about myself through the course of this journey. Part of me thinks this process has helped me grow as an author, part of me hates myself for even going down this path.</p><blockquote>What is life but a balancing act between idle time and deadlines</blockquote><h4>The Struggle</h4><p>The things I’ve struggled most with during the process have been finding inspiration, removing distractions from my writing environment, and learning to live with the healthy fear that no matter how many edits, and read throughs, that I’ll always find one or more things that I can tweak. A slight improvement here or there (I’m doing a meta edit as we speak). There is no perfect draft of a chapter.</p><p>I also learned that a good development editor is really key to help grind down and polish what feels horribly unfinished, slightly rushed and offer up simple suggestions that yield fantastic enhancements that would have never happened without the give and take.</p><p>Also communication is key, sharing my doubts, stress, writers block, new ideas and avenues of writers drift. Essentially, I brought traditional software feature creep into my writing as things were moving faster (from a technology point of view) and out pacing my ability to write and deliver new chapters that didn’t feel dated the day they were delivered.</p><p>Ultimately, I learned to share early and often, and looked forwards to the feedback, notes and conversations.</p><p>Let’s walk through what I’ve distilled as the problems all authors will face.</p><h4>Inspiration</h4><p>Most people get inspired by listening to music, going out, going on or even just planning a new and exciting adventure somewhere. Simply put we escape our normal lives and leave our emotional baggage at the door. Getting out to do something new and adventurous can naturally produce seratonin and dopamine, which trigger the brain to experience happiness, motivation and more importantly creativity. All which have been harder with the pandemic raging, so finding inspiration in the little things has been key.</p><p>This simple act can help to break up a day that could have otherwise be just a grind. If you don’t have the luxury of being able to just get up and take off then you can look for inspiration by getting up and leaving the room your writing in and just wait for inspiration to return. I will sometimes just wake up in the morning and I feel the words assembling into sentences and I have to write cause clearly it’s a words day.</p><blockquote>It feels like a words day</blockquote><h4>Words Days</h4><p>Word days, for me at least, don’t just happen all the time. I would love for them to just be everyday but as a mental process that I can’t control there is no way of arguing with it. Honestly, I think the words are really just the end result of the mind just chewing on a new idea and giving you no deadline, so it is always refreshingly serendipitous.</p><p>Unfortunately, writing has its ups and downs.</p><h3>Distraction</h3><p>There is no better problem left unsolved as there is with distraction. Since with enough distraction you’ll never remember where you began. Jokes aside, this is the part where I bring up my ADHD for the second time. It is easy for me to get distracted. I learned to fight it and also cater to it. I might find myself running some errand and suddenly I inspiration hits in unexpected ways, and this random serendipity seemed to be come alongside crushing fear and anxiety, running up to the deadline on a deliverable. I’d much rather be focused than distracted, but sometimes distraction is what the mind needs to complete a thought without “You” interrupting it.</p><h4>In the End</h4><p>I’m not done with the book yet. I’m close though and I’m doing the best I can to balance my career, family, friends and getting this book out without giving up cycles of rest and downtime or sacrificing my personal life in pursuit of publication. No one tells you that books are emotionally and physically demanding, and they yearn to be completed. But sometimes, it is good to just sit back, relax and write something else to help the words flow across a different medium.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=c7563c395b69" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Analytical Hashing Techniques]]></title>
            <link>https://medium.com/data-science/analytical-hashing-techniques-5a0393f3fc1c?source=rss-3b4cab6af83e------2</link>
            <guid isPermaLink="false">https://medium.com/p/5a0393f3fc1c</guid>
            <category><![CDATA[computer-science]]></category>
            <category><![CDATA[cryptography]]></category>
            <category><![CDATA[apache-spark]]></category>
            <category><![CDATA[analytics]]></category>
            <category><![CDATA[data-engineering]]></category>
            <dc:creator><![CDATA[Scott Haines]]></dc:creator>
            <pubDate>Thu, 11 Mar 2021 01:52:22 GMT</pubDate>
            <atom:updated>2021-03-11T07:46:47.976Z</atom:updated>
            <content:encoded><![CDATA[<h4>Spark SQL Functions to Simplify your Life</h4><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*UJOY9x38Az5YC6fM-yWP9g.png" /><figcaption>Photo Credit: <a href="https://unsplash.com/@swimstaralex">https://unsplash.com/@swimstaralex</a></figcaption></figure><p>Anyone working in the field of analytics and machine learning will eventually need to generate strong composite grouping keys, and idempotent identifiers, for the data they are working with. These cryptographically strong identifiers help to reduce the amount of effort required to do complex bucketing, deduplication, and a slew of other important tasks.</p><p>We will look at two ways of generating hashes:</p><ol><li>Using Base64 Encoding and String Concatenation</li><li>Using Murmur Hashing &amp; Base64 Encoding</li></ol><h4>Spark SQL Functions</h4><p>The <a href="https://spark.apache.org/docs/latest/api/sql/index.html">core spark sql functions library</a> is a prebuilt library with over 300 common SQL functions. However, looking at the functions index and simply listing things isn’t as memorable as running the code itself. If you have the <strong>spark-shell</strong>, then you can follow along and learn some analytical hashing techniques.</p><h4>Spin up Spark</h4><pre>$SPARK_HOME/bin/spark-shell</pre><figure><img alt="Image contents shows the Apache Spark Shell environment running Spark 3.1.1 on Java 11 and using Scala 2.12" src="https://cdn-images-1.medium.com/max/964/1*LvjeiFkfZc8Pz9BdJqfing.png" /><figcaption>Above: Reference to my Shell Environment</figcaption></figure><p>With the spark-shell up and running, you can follow the next step just by running <strong>:paste</strong> in the shell to paste multiline. <em>*(:paste, then paste the code, and then cmd+D to process the code)</em></p><h4>Import the Libraries and Implicits</h4><pre>import org.apache.spark.sql._<br>import org.apache.spark.sql.functions._<br>import org.apache.spark.sql.types._<br>import spark.implicits._</pre><h4>Create a DataFrame</h4><pre>val schema = new StructType()<br>  .add(StructField(&quot;name&quot;, StringType, true))<br>  .add(StructField(&quot;emotion&quot;, StringType, true))<br>  .add(StructField(&quot;uuid&quot;, IntegerType, true))</pre><pre>val df = spark<br>  .createDataFrame(<br>    spark.sparkContext.parallelize(<br>      Seq(<br>        Row(&quot;happy&quot;,&quot;smile&quot;, 1),Row(&quot;angry&quot;, &quot;frown&quot;, 2))<br>      ),<br>      schema<br>    )</pre><p>At this point you should have a very simple DataFrame that you can now apply the Spark SQL Functions to. The contents of which are shown using <strong>df.show()</strong>.</p><pre>scala&gt; df.show()</pre><pre>+-----+-------+----+<br>| name|emotion|uuid|<br>+-----+-------+----+<br>|happy|  smile|   1|<br>|angry|  frown|   2|<br>+-----+-------+----+</pre><p>Now we have a simple data frame. Next we can add a base64 encoder column to the DataFrame simply by using the <strong>withColumn</strong> function and passing in the Spark SQL Functions we want to use.</p><h4>Hashing Strings</h4><p>Base64 Encoded String Values</p><pre>val hashed = df.withColumn(<br>  &quot;hash&quot;, base64(<br>    concat_ws(&quot;-&quot;, $&quot;name&quot;, $&quot;emotion&quot;)<br>  )<br>)</pre><p>The results of this transformation yield us a new column that is the result of base64 encoding the concatenated string values from the columns <strong>name</strong> and <strong>emotion</strong>. This is broken down as the following flow.</p><pre>df.withColumn(&quot;concat&quot;,<br>  concat_ws(&quot;-&quot;,$&quot;name&quot;,$&quot;emotion&quot;))<br>  .select(&quot;concat&quot;)<br>  .show</pre><pre>+-----------+<br>|     concat|<br>+-----------+<br>|happy-smile|<br>|angry-frown|<br>+-----------+</pre><p>The end result of the full columnar expression is as follows.</p><pre>scala&gt; hashed.show()<br>+-----+-------+----+----------------+<br>| name|emotion|uuid|            hash|<br>+-----+-------+----+----------------+<br>|happy|  smile|   1|aGFwcHktc21pbGU=|<br>|angry|  frown|   2|YW5ncnktZnJvd24=|<br>+-----+-------+----+----------------+</pre><p>Nice. Right.</p><p>Next. We can look at a stronger technique for hashing. This uses the Murmur3 Hashing algorithm, and explicit binary transformations before feeding into the base64 encoder.</p><h4>Murmur Hashing and Binary Encoding</h4><p>There are many ways to generate a hash, and the application of hashing can be used from bucketing, to graph traversal. When you want to create strong hash codes you can rely on different hashing techniques from <a href="https://en.wikipedia.org/wiki/Cyclic_redundancy_check">Cyclic Redundancy Checks (CRC)</a>, to the efficient <a href="https://en.wikipedia.org/wiki/MurmurHash#MurmurHash3">Murmur Hash (v3)</a>. We will use what we can get for free in Spark which is the Murmur3.</p><pre>hashed.withColumn(&quot;binhash&quot;,<br>  base64(bin(hash($&quot;name&quot;,$&quot;emotion&quot;)))<br>)<br>.select(&quot;uuid&quot;, &quot;hash&quot;, &quot;binhash&quot;)<br>.show(false)</pre><p>Which will return the following rows (comparing the two hashing methods) based on the same input data.</p><pre>+----+----------------+--------------------------------------------+<br>|uuid|hash            |binhash                                     |<br>+----+----------------+--------------------------------------------+<br>|1   |aGFwcHktc21pbGU=|MTAxMTEwMDAxMTAwMDAwMTAwMDAwMDEwMTExMDAxMA==|<br>|2   |YW5ncnktZnJvd24=|MTEwMTAwMDEwMTExMTExMDEwMDAwMDExMDAxMTAxMA==|<br>+----+----------------+--------------------------------------------+</pre><h4>Looking at the Spark Code Generation</h4><p>If you are curious to see how Spark works behind the scenes there is a great new feature of the <strong>explain</strong> function that will enable you to view the code that Spark generates (and optimizes) for your transformations. To view this, all you need to do is the following.</p><pre>hashed.withColumn(&quot;binhash&quot;,<br>  base64(bin(hash($&quot;name&quot;,$&quot;emotion&quot;)))<br>)<br>.select(&quot;binhash&quot;)<br>.explain(&quot;codegen&quot;)</pre><p>This will output the java code and explain more about your computation.</p><figure><img alt="Output a lot of Java code that has been optimized for Spark Catalyst" src="https://cdn-images-1.medium.com/max/1024/1*3RHPyaDwgZTu0zbhN-qugA.png" /><figcaption>Above : Looking at the Spark Code Generation</figcaption></figure><p>This code is part of <a href="https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html">Spark’s Catalyst Optimizer</a> and luckily there is a high probability you will never have to work at this lower level and can likely just go on with your life. But if you are writing custom data source readers and writers then this will likely be something you will want to deep dive into. If nothing else you can learn more about the underlying mechanics, in the example use case from above, the codegen details the use of the murmur hash library being used. This is a nice tool for debugging and for those who just want to learn in a 360 degree model.</p><h4>Summary</h4><p>You now have two more techniques that you can use in order to create strong composite keys or to use as a spring board for creating idempotent keys. I just thought it would be fun to share these techniques since they come in handy and reuse the core libraries that ship along side Spark. Happy Trails.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=5a0393f3fc1c" width="1" height="1" alt=""><hr><p><a href="https://medium.com/data-science/analytical-hashing-techniques-5a0393f3fc1c">Analytical Hashing Techniques</a> was originally published in <a href="https://medium.com/data-science">TDS Archive</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Spark on Kubernetes]]></title>
            <link>https://levelup.gitconnected.com/spark-on-kubernetes-3d822969f85b?source=rss-3b4cab6af83e------2</link>
            <guid isPermaLink="false">https://medium.com/p/3d822969f85b</guid>
            <category><![CDATA[docker]]></category>
            <category><![CDATA[apache-spark]]></category>
            <category><![CDATA[devops]]></category>
            <category><![CDATA[kubernetes]]></category>
            <category><![CDATA[engineering]]></category>
            <dc:creator><![CDATA[Scott Haines]]></dc:creator>
            <pubDate>Tue, 29 Sep 2020 06:42:56 GMT</pubDate>
            <atom:updated>2022-01-11T07:40:35.044Z</atom:updated>
            <cc:license>http://creativecommons.org/licenses/by/4.0/</cc:license>
            <content:encoded><![CDATA[<h4>A Hands-On Introduction: Getting Up and Running.</h4><figure><img alt="An Orchestra Playing a song. Violins and people fading into the background" src="https://cdn-images-1.medium.com/max/1024/1*2tjWcbrgNiubNZlyKdc3sw.jpeg" /><figcaption>Photo by <a href="https://unsplash.com/@larisabirta?utm_source=unsplash&amp;utm_medium=referral&amp;utm_content=creditCopyText">Larisa Birta</a> on <a href="https://unsplash.com/s/photos/orchestra?utm_source=unsplash&amp;utm_medium=referral&amp;utm_content=creditCopyText">Unsplash</a></figcaption></figure><p>This tutorial is aimed at engineers who want to understand how to get up and running on <em>Spark on Kubernetes</em>. It is my hope that you will be able to use the skills developed across this series in order to become proficient at building and deploying Spark applications using the Kubernetes scheduler.</p><p>Given this is the first tutorial in the series, it is also naturally the most simplistic. The idea here is to introduce the concepts and components we will be using across the series, including <a href="https://kubernetes.io/">Kubernetes</a> (K8s), <a href="https://www.docker.com/">Docker</a> and <a href="http://spark.apache.org/">Spark 3.0.1</a>.</p><h4>Pre-Requisites</h4><p>A basic understanding of Apache Spark.</p><p><a href="https://databricks.com/spark/about">Apache Spark™ - What is Spark</a></p><h4>What You Will Learn</h4><p>This first tutorial will cover the following.</p><ol><li>Getting up and Running on Docker Desktop</li><li>Configuring your Docker Environment</li><li>Using the bundled Kubernetes from Docker Desktop</li><li>Installing Kubernetes Dashboard</li><li>Basic Kubernetes Commands</li><li>Local Spark Installation</li><li>How to Build a Custom Spark Docker Image</li><li>How to Push the Custom Spark Image to DockerHub</li><li>Launching your first Spark Application inside of your Local Kubernetes Cluster</li><li>Using the Kubernetes Dashboard to see the Application running</li></ol><p>The next two sections are intended to cover the basics of the technology being used, if you are in a hurry, then you can skip to <strong>Getting up and Running on Docker Desktop </strong>and come back later to learn more about what Kubernetes is, and a little about the <a href="https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/user-guide.md">Spark Kubernetes Operator</a> or Spark’s official Kubernetes scheduler.</p><h3>What is Kubernetes?</h3><figure><img alt="Left: Boxes describing containerized applications with one or more replicas. Middle: Ship Steering Wheel. Right: Success" src="https://cdn-images-1.medium.com/max/1024/1*6bIZkX0MBbBSBh9iS3xHTw.png" /><figcaption><strong>Figure 1–1</strong>: Deploy, auto-scale and heal containerized applications orchestrated by Kubernetes: Image Credit via <a href="https://kubernetes.io/docs/concepts/overview/what-is-kubernetes/">Kubernetes</a> from What is Kubernetes?</figcaption></figure><p><a href="https://kubernetes.io/">Kubernetes</a> in a nutshell is an open-source infrastructure management framework that allows you to provision logical slices of your clustered compute infrastructure in order to deploy, scale and manage containerized applications of varying sizes.</p><p>At first glance parts of this mental model feels familiar. If you’ve been on the DevOps side of things for a while — like I have you have — you’ve probably seen many similar solutions in spirit. From the humble shell script, across different service bootstrapping libraries like <a href="https://capistranorb.com/">Capistrano</a> (remember Ruby), <a href="https://community.chef.io/">Chef</a> (…more ruby), <a href="https://www.ansible.com/">Ansible</a> (python) across in-house, pre-cloud, and cloud based deployments we have all seen things that work well in one place, and fail horribly in another.</p><blockquote>Remember the last time you or someone you knew changed a configuration, restarted a process and forgot the change was made? Only to see the same issue occur again in the next deployment :)</blockquote><h4>Automation</h4><p>We are in the age of computerized automation, and being able to automate the healing process, trigger replacements or autonomously scaling up a fleet of production machines — all based on “infrastructure as code” can be extremely beneficial. What works locally is a mere blueprint for replicating the exact environment so you can rely on durable, dependable deployments across all your environments — this is the gist of it.</p><p><strong>Infrastructure-as-code</strong> enables configurations of services, shared secrets (locations and not the actual secret…cause we don’t check passwords into github!), disk and ram allocations to be checked into your github and released alongside typical CI/CD pipelines. These configurations are Idempotent, meaning that unless there is a real change (config hash changes), then pushing the same configuration to the cluster twice won’t change the services running inside your cluster. At first glance this may seem trivial but Kubernetes is built off of years of Google sized deployments and their best practices come bundled across the stack.</p><p>For your containerized applications, Spark included, everything is wrapped inside of a versioned container that must be released first into your container registry, or to a public registry.</p><p>Kubernetes supports multiple container formats to prevent lock in, but most people you talk to will tell you about what they are doing with Docker.</p><p>See the <a href="https://kubernetes.io/docs/concepts/containers/#container-runtime">Container Runtime</a> information for more details from the official Kubernetes documentation or take a look through the book linked below.</p><h4><strong>Pods</strong></h4><p>Pods wrap your application configuration, machine layout (file system paths, installed OS, packages, etc), shared resources like cpus (cgroups), ram allocations, application configurations, system settings, and even locally accessible shared files via Volumes (python files, other scripts) along with any additional role based access controls.</p><p>The name comes from a pod, which in nature is a protected resource like a “pod of whales” or like the “pea-in-a-pod”. See <a href="https://kubernetes.io/docs/concepts/workloads/pods/">What is a Pod</a> for more details.</p><p>In the case of Spark applications, this application runtime logic is the standard <em>spark.conf.* </em>settings, as well as any shared volumes (disk), application dependencies (jars/python/r etc) and really anything you want fine grained access to when running your spark application.</p><h4>Recommended Reading</h4><p>I wanted to cover the basic lingo but for full end-to-end coverage of Kubernetes I’d recommend the defacto-standard book (below), or just going through the <a href="https://kubernetes.io/docs/home/">documentation</a>.</p><p><a href="https://learning.oreilly.com/library/view/kubernetes-up-and/9781492046523/">Kubernetes: Up and Running, 2nd Edition</a></p><p>Given this post isn’t a primer on Kubernetes, I will cut myself off so we can get into the meat of the tutorial. In the next section we will take a look at Spark on Kubernetes, get an idea of what it is, and then get into setting up the locally environment and running our first deployment.</p><h3>Spark on Kubernetes</h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/761/1*tkWi16Clt_6RjLJNmyLVkg.png" /><figcaption><strong>Figure 1–2</strong>: Spark Driver Running inside a Pod. Image via <a href="https://spark.apache.org/docs/latest/running-on-kubernetes.html">Spark Documentation</a></figcaption></figure><h4>The Kubernetes Scheduler</h4><p>The <a href="https://spark.apache.org/docs/latest/running-on-kubernetes.html">Spark Kubernetes Scheduler</a> allows you to deploy your Apache Spark application inside a containerized package, alongside your application configuration, custom environment variables, shared secrets, and shared disk access via Volume mounts, as a what is know as the Driver Pod. This can be seen as an improvement over deployments on Spark Standalone for the reasons below.</p><h4>From Spark Standalone to Spark Kubernetes</h4><p>If you have been working with Spark in standalone mode this can be a bit different in terms of the overall mental model, here is how it breaks down. With <a href="http://spark.apache.org/docs/latest/spark-standalone.html">Spark Standalone</a> deploy mode, you had to deploy a stand alone Master (cluster coordinator), and for High Availability (HA) a second Master running in Standby mode. <em>These machines don’t have to be beefy machines. 2 cores and 4gb ram with decent network did the trick</em>.</p><p>This did however require <a href="https://zookeeper.apache.org/">Zookeeper</a> to manage the cluster state, eg. What applications were running, what allocations each application had assigned — aka where did my CPU and RAM go? — and lastly what executor instances where running where. For many teams wanting to run Spark there was already a good amount of DevOps overhead just to get the environment ready to run the first Spark app.</p><p>You also had to configure and setup your Spark Worker instances in a first step deployment in order to have a cluster to submit work to.</p><h4>Standalone Spark Versioning</h4><p>There was also the tricky cluster version lock in. If you had a cluster running Spark 2.3, well then you would need to deploy and migrate your applications to a new cluster for Spark 2.4 and beyond. There were of course corner cases where minor releases still functioned appropriately, but in general it always felt better to migrate the applications vs hoping for the best.</p><h4>New Mental Model</h4><p>Now if we think abstractly, Kubernetes is equivalent conceptually to a Spark Cluster. Although this isn’t necessarily true, hear me out. Kubernetes is a general purpose infrastructure management framework that can schedule deployments, autoscale applications, and mitigate common infrastructure pitfalls like hosts going down — with self-healing. There is a rich ecosystem that enables you to easily monitor the behavior of the cluster, via the Kubernetes Dashboard, and you are essentially carving out space in an elastically scalable and highly distributed ecosystem very much like the shared compute resources in a traditional Spark Cluster. <em>Sure, maybe I am reaching.</em></p><h4>BYO-Cluster</h4><p>Now given you already have a Kubernetes cluster provisioned, the really big deal and what I see as the greatest value add is the following. Each Spark application driver essentially allocates an ad-hoc Spark cluster, as assigned through Executor Pods, based off of an immutable Docker image, and idempotent Pod config.</p><blockquote>Each Spark application driver essentially allocates an ad-hoc Spark cluster, as assigned through Executor Pods, based off of an immutable Docker image, and idempotent Pod config</blockquote><p>If you’ve been in the habit of running Spark Standalone and co-located Spark Applications across these shared compute resources, then you also know the growing pains of having to migrate older applications to new versions of Spark to reduce the tech debt accumulated over the years.</p><p>Now this is a eureka! moment, at least for me, given that my team at <a href="https://www.twilio.com/">Twilio</a> has written and is running over 60 different Spark applications across different versions of Spark and also scala.</p><h4>Next Steps</h4><p>Okay. There was a lot to say there and I didn’t really even scratch the surface. Let’s move on to actually doing something hands-on. In the next section we will install and get <a href="https://www.docker.com/products/docker-desktop">Docker Desktop</a> up and running.</p><h3><strong>Getting up and Running on Docker Desktop</strong></h3><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*lafvZDmkB62xcYqGrAHGDQ.png" /><figcaption>Docker Desktop. Image via <a href="https://www.docker.com/products/docker-desktop">Docker</a></figcaption></figure><p>First things first. Let’s get <a href="https://www.docker.com/products/docker-desktop">Docker Desktop</a> installed so we are all on the same even playing field.</p><h4>Installing Docker Desktop</h4><p>This part is fairly easy given the only step is to download the installer for your platform of choice.</p><ol><li>Install <a href="https://www.docker.com/products/docker-desktop">Docker Desktop</a>.</li><li>Create a <a href="https://hub.docker.com/">Docker Hub</a> account. (* only if you want to build and upload custom Spark Images.)</li></ol><p>I tend to use 4 of my CPUs and 10gb RAM for . Depending on your machine size you can increase the resources, if you are running <em>mainly</em> on Docker/K8s vs running mainly on your core system OS. <em>Just remember to leave some RAM for the core system</em>. I tend to leave 1 core, 2gb RAM left on the machine for background operations.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*YpUWzoBDlF1sRwfcA6Y1vw.png" /><figcaption>My laptop is 6-core i9, 64gb ram, 2TB SSD — 16in MacBook Pro. This Setting tends to work well for Demos.</figcaption></figure><h4><strong>Enabling Kubernetes</strong></h4><p>Your Single-Node Cluster is just a click away. Seriously, this was a life saver since it reduced the cognitive burden of getting started with K8s.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*pN7aunP-uYYFfCdk9lJDcQ.png" /><figcaption>Really. Just click Enable Kubernetes and you’re ready to Rock and Roll</figcaption></figure><h4><strong>Verifying that Kubernetes is Running</strong></h4><p>If all has gone well. Which it probably has. Then you can open up your favorite terminal program (Terminal, iTerm, etc) and you should have the K8s command line interface at your disposal.</p><pre>kubectl cluster-info</pre><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*CzMMIG6SilozFa_b8HadbQ.png" /><figcaption>You should see a similar dialog if things are working well.</figcaption></figure><h4><strong>Installing the K8s Dashboard</strong></h4><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*JASF8D2oTqN0X4fpsdJuEw.png" /><figcaption>The <a href="https://github.com/kubernetes/dashboard#install">Kubernetes Dashboard</a>: Makes it easy to run things yourself</figcaption></figure><p>The most recent set of installation steps can be found on <a href="https://github.com/kubernetes/dashboard#install">Github</a>. Things were much easier than I had first anticipated and hopefully things are as easy for you too. Below is the component you will be installing. Think of this Dashboard as your guide and assistant for common K8s tasks. The integrated Dashboard allows you to click buttons vs remembering the Commands to do common things like stopping deployments, executing commands on containers and so much more.</p><h4>Apply the Dashboard</h4><p>Use the command line tool to apply the Dashboard config. If you want to download the <a href="https://yaml.org/">YAML</a> and see what it actually is doing please do so vs blindly applying this link!</p><pre>kubectl apply -f <a href="https://raw.githubusercontent.com/kubernetes/dashboard/v2.0.4/aio/deploy/recommended.yaml">https://raw.githubusercontent.com/kubernetes/dashboard/v2.0.4/aio/deploy/recommended.yaml</a></pre><p>Then just start the Kubernetes Proxy.</p><p>This proxy will enable you to use localhost:8001 instead of your cluster DNS which can make it easy to find your way back to any UI running on your cluster.</p><pre>kubectl proxy</pre><h4>Opening up the Dashboard</h4><p>Dashboard: <a href="http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/#/login">http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/#/login</a></p><p>If you try to load the Kubernetes Dashboard you will get Yelled at with <strong>No Token</strong>. This is due to security-by-design. You have done nothing wrong. You may be asking yourself <em>“But wait. I just clicked a button to </em><strong><em>Enable Kubernetes</em></strong><em>. How do I fix it?”</em> since I don’t know where things are even configured. That is okay. You are not alone.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*KgS0itV77r0XfTupgKzQDw.png" /><figcaption>Oh Snap. What do we do now? Image via K8s Dashboard UI</figcaption></figure><h4><strong>Accessing the Default Token</strong></h4><p>When your K8s environment is bootstrapped from Docker Desktop the default system token is generated. You can use the following command to get the token to login to the Dashboard.</p><pre>kubectl -n kube-system describe secret default</pre><p>Using the command above will yield the following. Just copy the <strong>token</strong> string and login.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*PEhpvivgZFFB8EgPqzcPsg.png" /><figcaption>Copy the Token: You’ll paste it into the Auth Dialog</figcaption></figure><p>Now go back, paste in the token, and log into the dashboard.</p><h4><strong>Success</strong>.</h4><p>You should now have your Kubernetes Dashboard Up and Running. Really take a moment to pat yourself on the shoulder. We are learning new things and that is really what this is all about.</p><h3>Installing Spark 3.x</h3><p>Installing Spark is fairly simple to do. Just go to the Downloads page below and select pre-built (tgz) or for the brave get the full source code to build yourself. I’ll wait it will take a few minutes while you download everything.</p><p><a href="http://spark.apache.org/downloads.html">Downloads | Apache Spark</a></p><p>Hopefully you now have the tarball (tgz) downloaded.</p><h4><strong>Create an Install Location</strong></h4><p>Please note. For this tutorial I had spark-3.0.1-bin-hadoop3.2 downloaded.</p><pre>mkdir ~/install &amp;&amp; cd ~/install<br>mv ~/Downloads/spark-3.0.1*3.2.tgz .</pre><h4><strong>Extract the Tarball and Cleanup</strong></h4><pre>tar -xvzf spark-3.0.1-bin-hadoop3.2.tgz<br>rm -rf spark-3.0.1-bin-hadoop3.2.tgz</pre><h4><strong>Update your BashRC or ZshRC with Spark Home</strong></h4><p>You will need to have a simple place to reference where you’ve installed Spark. It is standard to just use the <strong>SPARK_HOME</strong>. I am using zsh, if you are using bash then vim ~/.bashrc but you can follow the same commands.</p><p>Open and Edit your ZshRC</p><pre>vim ~/.zshrc</pre><p>And add the following</p><pre>export SPARK_HOME=/Users/`whoami`/install/spark-3.0.1-bin-hadoop3.2</pre><pre>alias spark-shell=”$SPARK_HOME/bin/spark-shell”</pre><p>Now ensure these settings stick with the current open Terminal window. Any new window you open will already inherit from your *rc.</p><pre>source ~/.zshrc</pre><p>At this point you have everything installed and ready.</p><h4><strong>Where we are in the process?</strong></h4><p>At this point in the tutorial we’ve done a lot of downloading, installing, configuring but we’ve also gotten our local environment setup and ready for Spark application development.</p><ol><li>We’ve configured the docker settings to be 4 core by 10gb RAM.</li><li>We’ve Enabled Kubernetes so we can use the simple single node cluster.</li><li>We’ve Installed the Kubernetes Desktop package. This will let us see our application as it is running.</li><li>We’ve downloaded and installed Spark locally on our machine</li></ol><p>Now we are ready to get started running Spark on Kubernetes.</p><h3><strong>Setting up Spark to Run on K8s</strong></h3><p>We want to be able to build and run local Docker images inside of our Kubernetes cluster. This is essential to running Spark on Kubernetes, and also useful if you want to create a base image for Spark to reuse within your organization. It is a critical skill to be able to reuse base images, and efficiently extend these basic shared images — like for example PySpark with custom runtime installation.</p><h4>Building the Base Images</h4><p>First let’s locate the Official Spark Dockerfile. You should be able to find it in the directory of your local Spark installation.</p><pre>ls -l $SPARK_HOME/kubernetes/dockerfiles/spark</pre><p>Here is what the base Dockerfile looks like.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*q6J4qUJdadw2pQnU5bw_TQ.png" /><figcaption>The Spark Dockerfile. Via <a href="https://github.com/apache/spark/tree/branch-3.0/resource-managers/kubernetes/docker/src/main/dockerfiles/spark">Spark on GitHub</a></figcaption></figure><h4><strong>Modifications</strong></h4><p>I wanted to build Spark with Java 14. Spark 3.x finally enables you to move away from Java 8, so you can take advantage of using modern Java now. I took a look on <em>docker hub</em> and found the <a href="https://hub.docker.com/_/openjdk">OpenJDK</a> resources. I ended up using <strong>14-slim </strong>vs the standard <strong>8-jre-slim </strong>from the default Dockerfile.</p><p><strong>Note</strong>: Spark runs as <em>spark_uid</em>=185 by default. If you are building an image you may want to use your own unprivileged user.</p><h4>Containers on a Diet</h4><p>OpenJdk 14-slim is built off of Alpine linux distribution which is very minimal build. Slim cause it is on a diet. Most of the default command-line tools and packages are left out to reduce file size. However it is worth pointing out that in the case of Docker and Kubernetes, the smaller the container the faster things will come online due to the lighter bandwidth overhead. This also means that images in cache don’t bloat the cache, or cause other images to be evicted from cache.</p><p>Using this tactic you can also create minimal Spark applications by only shipping the jars that you need that are not also provided by the Spark Runtime. It is worth taking the time to inspect the <strong>jars </strong>that are packaged with Spark.</p><pre>ls -l $SPARK_HOME/jars/</pre><p>and using <strong>provided scope</strong> in your maven or sbt.</p><h4><strong>Building and Pushing Custom Spark Images</strong></h4><p>If you want to download the Images I built then feel free. I have them uploaded to Docker Hub.</p><figure><img alt="Image shows a reference to the DockerHub location of the newfrontdocker/spark Docker container" src="https://cdn-images-1.medium.com/max/1024/1*_EOuGJZNbak5ky3meN5yjQ.png" /><figcaption><strong>Spark</strong>: Java 14 slim via <a href="https://hub.docker.com/repository/docker/newfrontdocker/spark">DockerHub</a></figcaption></figure><figure><img alt="This image shows the DockerHub location of the newfrontdocker/spark-py Docker container" src="https://cdn-images-1.medium.com/max/1024/1*OXEu9lk1t2RlJVuuZP5Wcg.png" /><figcaption><strong>PySpark</strong>: Additional Python Dependencies vis <a href="https://hub.docker.com/repository/docker/newfrontdocker/spark-py">DockerHub</a></figcaption></figure><p>Otherwise. Let’s go through the process of creating the Spark Image.</p><h4><strong>Create the Docker Images</strong></h4><p>At the time of writing I build Spark 3.0.1 on the Slim Java 14 open-jdk. I also opted into Python3 support. The following command uses the Spark <strong>docker-image-tool.sh</strong> script. If you want to try this out for yourself, then just replace <strong>newfrontdocker</strong> with your dockerhub username.</p><pre>$SPARK_HOME/bin/docker-image-tool.sh -r newfrontdocker -t v3.0.1-j14 -p kubernetes/dockerfiles/spark/bindings/python/Dockerfile -b java_image_tag=14-slim build</pre><p>This will run for a little while. If things worked out correctly you should see the following.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/788/1*i747jXG8ggl8Wu0AuZll8A.png" /><figcaption><strong>Command-Line</strong>: Successful Output for the Java/Scala Spark Docker</figcaption></figure><figure><img alt="" src="https://cdn-images-1.medium.com/max/962/1*Md6M7HqqiRK5x2w_HUf3vw.png" /><figcaption><strong>Command-Line</strong>:<strong> </strong>Successful Output for PySpark Build. Notice it skips the java image tag — this isn’t an error!</figcaption></figure><p>You should now have two new images located in your local docker image cache. You can check them with the following docker command.</p><pre>docker images </pre><figure><img alt="shows the output of running docker images command. Listing two images built" src="https://cdn-images-1.medium.com/max/1024/1*dObQR5L-3B_il3Yerh_irg.png" /></figure><p>Having the images locally is good for testing that things work the way you expect. The end game however is usually to push the build up to your Docker Repository. We will do that now.</p><h4><strong>Pushing your Spark Image to DockerHub</strong></h4><p>Given that all we did was build the spark &amp; pyspark images — we still need to push to DockerHub.</p><p><strong>Tip</strong>: Given that your Kubernetes cluster can be running locally or remotely it makes sense to push the final Spark build when you are ready to start working on things outside of your local machine.</p><p>If you want to remove the images you created, perhaps you didn’t like the potential repository name. Then you can use the following command to do just that.</p><pre>docker image rm {containerId}</pre><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*74soyz6TpbZNlc96ULdFIg.png" /><figcaption><strong>Example: </strong>removing an image I didn’t like.</figcaption></figure><h4>Pushing the Images</h4><p>If you are happy with things feel free to ship the build to your container registry.</p><p>The first step is to make sure you are logged into docker. Here are the two ways to do it.</p><h4><strong>Use the credentials from Docker Desktop</strong></h4><pre>docker login docker.io</pre><h4><strong>Fresh Login</strong></h4><pre>docker login <a href="http://docker.io">docker.io</a> — username newfrontdocker</pre><p>Either should end up with</p><pre>&gt; Login Succeeded</pre><h4><strong>Push the Build</strong></h4><p><strong>Spark Python Docker Image</strong>:</p><pre>docker push newfrontdocker/spark-py:v3.0.1-j14</pre><p><strong>Spark Docker Image</strong>:</p><pre>docker push newfrontdocker/spark:v3.0.1-j14</pre><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*69D86eGfmbIQLiIXrF9bIQ.png" /><figcaption><strong>Command-Line</strong>: Pushing to Docker</figcaption></figure><p>Well at this point in the tutorial we are just at the main mission. In the next section we will finally be running a Spark application on Kubernetes. If you have stuck through up until this point thanks.</p><h3><strong>Spark on Kubernetes: Hello World</strong></h3><p>We are now preparing to run our first Spark Job on Kubernetes. This is a big deal. This is why we are here!</p><ol><li>We need to find the cluster master.</li></ol><pre>kubectl cluster-info</pre><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*CzMMIG6SilozFa_b8HadbQ.png" /><figcaption><strong>Remember This</strong>: Now we are actually using the values from the Cluster Info</figcaption></figure><p>What we care about in our case is the <strong>Kubernetes Master</strong> since we need to reference that in our <em>Spark Submit</em>.</p><pre><em>$SPARK_HOME/bin/spark-submit \<br>  --master k8s://https://kubernetes.docker.internal:6443 \<br>  --deploy-mode cluster \<br>  --name spark-pi \<br>  --class org.apache.spark.examples.SparkPi \<br>  --conf spark.executor.instances=5 \<br>  --conf spark.kubernetes.container.image=newfrontdocker/spark:v3.0.1-j14 \<br>  local:///opt/spark/examples/jars/spark-examples_2.12–3.0.1.jar</em></pre><blockquote>It is worth pointing out that the <strong>local:///opt/</strong> is actually referencing the JAR Path inside of the Docker Container. This can be useful when you want to access local configuration files, spark-environment variables, logging profiles, and so much more from within your deployment.</blockquote><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Tx9fK03HIwsfgCoRonqzPA.png" /><figcaption><strong>Spark Submit Output</strong>: You should see something similar when running your Spark application</figcaption></figure><h4><strong>Spark in a Pod</strong></h4><p>If you open up the <strong><em>Kubernetes Dashboard</em></strong> while your Spark Application is running you will see a similar view. It is worth noting that since we configured spark.executor.instances=5 in our Spark Submit that you will see 5 pods running for your application. This is equivalent to having 5 executor instances in traditional Spark Standalone world.</p><figure><img alt="Kubernetes Dashboard: View of the Spark Executor Pods Running — there is a circular pie chart above a tabular list of pods" src="https://cdn-images-1.medium.com/max/1024/1*5F1Bd319NCtvvUFPLDaO3w.png" /><figcaption><strong>Kubernetes Dashboard</strong>: View of the Spark Executor Pods Running</figcaption></figure><h3>That’s a Wrap</h3><p>We really covered a lot of ground during this first tutorial. I really enjoy this part of the learning cycle. It is when the creative juices start flowing and new ideas seems to come alive, it is one of the brains ways of rewarding the learning process and I hope you were able to stick around up until this point. Over the course of this tutorial we covered a lot of ground fairly quickly. So here is a rewind.</p><h4><strong>Docker Desktop</strong></h4><p>We installed and got Docker Desktop up and Running locally on our computers. High Fives all around. This included provisioning some space on our laptops in order to power the Docker environment. I went with a 4 core by 10gb ram slice off the old laptop. If you have more cores and more ram, Kubernetes will gladly take that from you to power things to run quicker with more parallelism.</p><h4><strong>Local Kubernetes</strong></h4><p>We took things further with the click of a checkbox and enabled Kubernetes (single-node) cluster support in our local Docker. This was the start of our kubernetes journey together.</p><h4><strong>Kubernetes Dashboard</strong></h4><p>We fetched and applied the configuration and information for running the Kubernetes Dashboard so we could get some Eyes on the cluster. This is a much better view of the cluster than just having the <strong>Running</strong> green light on the Docker Desktop Dashboard.</p><ul><li>We explored some of the `kubectl` commands to find our tokens and actually login into the dashboard</li><li>We saw how to get some really important cluster information by using the `kubectl cluster-info` command</li></ul><h4><strong>Installing Apache Spark</strong></h4><p>We installed Apache Spark locally in order to gain access to the core Dockerfile that ships alongside the project. This is an important part of the process since when we create a new Docker image we need to be able to ship the Spark source locally as well.</p><p>If we were just looking to do some basic things and get to know Spark with the spark-shell then we could have used that out of the box to just play around with the Spark Runtime. $SPARK_HOME/bin/spark-shell and $SPARK_HOME/bin/pyspark respectively give you access to the local shell.</p><h4><strong>Building the Spark Docker Image</strong></h4><p>We were able to build a local version of Spark bringing our own JDK to the table openjdk:14-slim — cause diets are nice when shipping containers. This allowed us to <strong>choose</strong> how we want to build these images without bringing in the collective kitchen sink. The ability to test locally, list local images, remove images selectively and lastly pushing an image to a public repository is all in your toolbox now.</p><h4><strong>Running Spark Locally on Kubernetes</strong></h4><p>We finished up this whirlwind deep dive by running one of the example Spark applications. This was done as a <strong>HelloWorld</strong> for testing the cluster and now we can use the work completed here in order to move on to bigger and better things.</p><h4>Next Steps</h4><p>In my next tutorial I am going to show you how to use these two base Spark Images in order to simplify Running Spark Scala applications as well as PySpark applications that require custom python installations. We will look more into how Kubernetes configurations work in order to <strong><em>enable service and role based access</em></strong> to the processes running within our <em>driver</em> and <em>executor</em> pods.</p><p>Thanks for taking the time to dive into Spark on Kubernetes.</p><h4><strong>Notes and Stumbling Blocks</strong></h4><p>As an aside. Going off of the Spark on Kubernetes Docs provided the answers for getting started. The hard part was actually getting the Images to build and upload to DockerHub. I am embarrassed to say I mashed my keyboard for over 2 hours trying to understand the -r &lt;repo&gt; was meant to be just my username — eg. newfrontdocker. Shame on me for overthinking things.</p><p>DockerHub’s registry actually helped me debug the issue since I couldn’t create folders within a Repository. Ahh. Looking at the docker-image-tool.sh from the Spark Kubernetes directory of the source was helpful to see how the artifacts were being composited. That helped me figure out that I should create two repositories spark and spark-py.</p><h4><strong>Links and Thanks</strong></h4><p>There were a ton of resources available online that provided help as I was journeying through this process as well. I have broken down the links by content.</p><h4>Docker</h4><p><a href="https://docs.docker.com/docker-hub/">https://docs.docker.com/docker-hub/</a></p><p><a href="https://hub.docker.com/_/openjdk?tab=tags&amp;page=1&amp;name=14-slim">https://hub.docker.com/_/openjdk?tab=tags&amp;page=1&amp;name=14-slim</a></p><p><a href="https://docs.docker.com/engine/reference/commandline/login/">https://docs.docker.com/engine/reference/commandline/login/</a></p><p><a href="https://www.docker.com/blog/docker-hub-new-personal-access-tokens/">https://www.docker.com/blog/docker-hub-new-personal-access-tokens/</a></p><p><a href="https://docs.docker.com/engine/reference/commandline/push/">https://docs.docker.com/engine/reference/commandline/push/</a></p><p><a href="https://docs.docker.com/docker-hub/repos/">https://docs.docker.com/docker-hub/repos/</a></p><p><a href="https://stackoverflow.com/questions/54410909/what-is-the-correct-upload-url-for-a-dockerhub-repo">https://stackoverflow.com/questions/54410909/what-is-the-correct-upload-url-for-a-dockerhub-repo</a></p><h4>Spark on K8s</h4><p><a href="https://spark.apache.org/docs/latest/running-on-kubernetes.html">https://spark.apache.org/docs/latest/running-on-kubernetes.html</a></p><p><a href="https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template">https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template</a></p><p><a href="https://www.slideshare.net/databricks/running-apache-spark-jobs-using-kubernetes">https://www.slideshare.net/databricks/running-apache-spark-jobs-using-kubernetes</a></p><p><a href="https://github.com/marcelonyc/igz_sparkk8s">https://github.com/marcelonyc/igz_sparkk8s</a></p><p><a href="https://collabnix.com/kubernetes-dashboard-on-docker-desktop-for-windows-2-0-0-3-in-2-minutes/">https://collabnix.com/kubernetes-dashboard-on-docker-desktop-for-windows-2-0-0-3-in-2-minutes/</a></p><p><a href="https://www.oreilly.com/content/how-to-run-a-custom-version-of-spark-on-hosted-kubernetes/">https://www.oreilly.com/content/how-to-run-a-custom-version-of-spark-on-hosted-kubernetes/</a></p><h4><strong>Docker Desktop Alternative</strong></h4><p>Lastly. If you just want to run <em>Kubernetes</em> then you can also use <a href="https://kubernetes.io/docs/setup/learning-environment/minikube/"><em>MiniKube</em></a>.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=3d822969f85b" width="1" height="1" alt=""><hr><p><a href="https://levelup.gitconnected.com/spark-on-kubernetes-3d822969f85b">Spark on Kubernetes</a> was originally published in <a href="https://levelup.gitconnected.com">Level Up Coding</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
    </channel>
</rss>