<?xml version="1.0" encoding="UTF-8"?><rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:cc="http://cyber.law.harvard.edu/rss/creativeCommonsRssModule.html">
    <channel>
        <title><![CDATA[Stories by Max Krog on Medium]]></title>
        <description><![CDATA[Stories by Max Krog on Medium]]></description>
        <link>https://medium.com/@maxkrog?source=rss-88c88a14741------2</link>
        <image>
            <url>https://cdn-images-1.medium.com/fit/c/150/150/1*yRMOan2y3VSMx7W72LVbGw.png</url>
            <title>Stories by Max Krog on Medium</title>
            <link>https://medium.com/@maxkrog?source=rss-88c88a14741------2</link>
        </image>
        <generator>Medium</generator>
        <lastBuildDate>Wed, 03 Jun 2026 13:54:47 GMT</lastBuildDate>
        <atom:link href="https://medium.com/@maxkrog/feed" rel="self" type="application/rss+xml"/>
        <webMaster><![CDATA[yourfriends@medium.com]]></webMaster>
        <atom:link href="http://medium.superfeedr.com" rel="hub"/>
        <item>
            <title><![CDATA[GCP Serverless Design Pattern: Adhering to rate & concurrency limits with Cloud Tasks]]></title>
            <link>https://medium.com/data-science/gcp-serverless-design-pattern-adhering-to-rate-concurrency-limits-with-cloud-tasks-30aa756da763?source=rss-88c88a14741------2</link>
            <guid isPermaLink="false">https://medium.com/p/30aa756da763</guid>
            <category><![CDATA[serverless-architecture]]></category>
            <category><![CDATA[cloud-tasks]]></category>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[cloud-functions]]></category>
            <category><![CDATA[google-ads]]></category>
            <dc:creator><![CDATA[Max Krog]]></dc:creator>
            <pubDate>Fri, 11 Sep 2020 13:40:41 GMT</pubDate>
            <atom:updated>2021-02-08T14:41:45.579Z</atom:updated>
            <content:encoded><![CDATA[<p>Even though I consider myself knowledgeable about the multiple GCP products related to data engineering, I had not heard of a use case for Cloud Tasks before.</p><p>This post aims to shed some light on the use case for Cloud Tasks by bringing a specific problem to the table and discussing it from a PubSub vs Cloud Tasks perspective.</p><h3>The Challenge</h3><p>As part of a customer-data-segmentation project I encountered the challenge of sending user data to the <a href="https://developers.google.com/adwords/api/docs/reference/v201809/AdwordsUserListService?hl=de">Google Ads Remarketing Audience API</a>, which has the following restrictions:</p><ol><li>Every request can only contain around 50.000 user records. Performance grows exponentially slower with more records per request.</li><li>Only one (1) request can be processed per Google Ads Account at a time. Submitting another request simultaneously causes all other ongoing requests to error out.</li><li>Every request takes between one to five minutes to process.</li></ol><p><strong>Some context on the project:</strong></p><p>The data that needed to be sent to the API would be arriving in Cloud Storage from BigQuery in the form of a CSV file. Every expected CSV file would contain between 50.000 and 4.000.000 records.</p><p>The orchestration engine (Cloud Composer) was only responsible for running the business logic in BigQuery and saving the result to Cloud Storage. Cloud composer does not handle the outgoing data pipeline as the outgoing data pipeline should be reactive &amp; serverless.</p><p>The customer base that was to be segmented contained around 4 million customers and each customer would belong to at least one segment. The orchestration engine would run one business logic query — per segment to be pushed to Google Ads - in BigQuery, resulting in 7 different files arriving in Cloud Storage during the span of a few minutes.</p><h3>Partial Solution</h3><p>When a new file lands in Cloud Storage, an event can be submitted to PubSub or fed immediately to a Cloud Function. More information about this <a href="https://cloud.google.com/storage/docs/pubsub-notifications">here</a>.</p><p>By setting up a Cloud Function to be triggered from the Cloud Storage Bucket we can solve api restriction 1 (max 50k records per request), by splitting the file into smaller parts.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Ua85DWAQLABp1CdeUdtXBg.png" /></figure><p>But what now? Simply looping over the original file and pushing the partial chunks of records into Google Ads is a very fragile way to handle things. Just consider the following:</p><h4><strong>How long do we expect the function to run?</strong></h4><p>4.000.000/50.000=80 parts. Let’s say each part of 50.000 records takes 5 minutes to transfer. We’re talking almost 7 hours of continuous runtime. <a href="https://cloud.google.com/functions/docs/concepts/exec#timeout">This is way further than what Cloud Functions support.</a></p><h4><strong>What if we get another file landing in GCS when we’re already transferring one?</strong></h4><p>As stated in the previous section, our orchestration engine will produce one file per segment. That means that when we’ve started pushing the chunks of the first segment to Google Ads, another will appear and trigger Cloud Function execution in parallel. As per point two (2) of the API restrictions this would cause both requests to fail. Without some way of ensuring that only one concurrent dispatch can happen we would have to spread out the time between the segments arriving in Cloud Storage by quite a bit.</p><h3>The problem with Cloud PubSub (for this challenge)</h3><p>My initial approach was to follow the architecture outlined in this solution architected by Google: <a href="https://cloud.google.com/solutions/serverless-integration-solution-for-google-marketing-platform"><em>A serverless integration solution for GMP</em></a><em>. </em>This solution makes use of a combination of Initiator and Transport functions, as well as 3 PubSub Topics. To understand their proposed architecture, head to the <a href="https://cloud.google.com/solutions/serverless-integration-solution-for-google-marketing-platform#architectural_overview"><em>Architectural Overview</em></a> section of the solution.</p><p>I believe that the architecture used in that solution can be slimmed down a bit and explained like this:</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*BefR7EOXF-FMbb7KXSks3A.png" /></figure><ol><li>New files lands in cloud storage and triggers the cloud function<em> Initiator</em></li><li><em>Initiator</em> splits up the file into several smaller bits and publishes them to the PubSub topic <em>Operation Log</em>.</li><li>After finishing (2),<em> Initiator</em> sends a empty message to the <em>Operation Trigger</em> PubSub Topic</li><li><em>Operation Trigger</em> pushes the empty message via a subscription to <em>Operation Executer</em></li><li><em>Operation Executer</em> executes and starts with pulling a message from the <em>Operation Log. </em><strong>If no message remains in <em>Operation Log</em>, nothing more is done. The cycle ends here.</strong></li><li>If a message was retrieved from <em>Operation Log</em>, <em>Operation Executer </em>tries to push it to Google Ads.</li><li>If the previous step (6) was successful, <em>Operation Executer </em>acknowledges the previously pulled message from <em>Operation Log</em> and publishes an empty message<em> </em>to <em>Operation Trigger</em>.<strong> Continue from 4.</strong></li></ol><h4>Problems with this approach</h4><p>I believe there are several problems with this approach:</p><ul><li>If <em>operation executer </em>errors or fails, the whole cycle could potentially be broken. If the error is because of a Google Ads API-side error, you need to catch this in your code and send an empty message to <em>Operation Trigger </em>to continue the cycle. This is not inline with the Fail Fast philosophy (more on that below) and is more demanding to develop.</li><li>If the error above happens when there’s only one (1) message left in the operation log, that message is not available to be pulled from the <em>Operation Log </em>until the acknowledgement deadline has passed. The subsequent execution of <em>Operation Executer</em> would think there’s no more messages left and end the cycle.</li><li>To adhere to restriction 2 (concurrent requests) of the Google Ads API, there can only be one message flowing between <em>Operation Trigger</em> and <em>Operation Executer</em>. This would be violated when the second segments-file arrives in Cloud Storage.</li><li>I believe the architecture is complicated to debug and understand what’s going on.</li></ul><p>This solution would work better if we didn’t have the restrictions of the output API (Google Ads) that we do.</p><p>As this approach was not viable without some hard thinking, I decided to look for alternative approaches.</p><h3>Introducing Cloud Tasks</h3><p>Cloud Tasks is a distributed task queue. You define one or many queues to which you can send tasks. Queues are what they sound like. Tasks are things to be done, usually defined as ‘run this HTTP-request and wait until you get a 200/OK-response code back’, if not, try again in X amount of time.</p><p><strong>On a queue level you have the following (and more) </strong><a href="https://cloud.google.com/tasks/docs/configuring-queues#routing"><strong>settings</strong></a><strong>:</strong></p><ul><li><em>Max dispatches per second:</em> How quickly can this queue process new tasks?</li><li><em>Max concurrent dispatches:</em> How many tasks can be running/executing at the same time?</li><li><em>Max attempts:</em> How many attempts can be made on a task before it’s put in a “failed” state?</li></ul><p><strong>On a task level you have the following (and more) </strong><a href="https://cloud.google.com/tasks/docs/creating-http-target-tasks"><strong>settings</strong></a><strong>:</strong></p><ul><li>Task type: We’ll only be discussing the HTTP Target type in this post.</li><li><em>HTTP Task httpMethod</em>: The HTTP Method (GET/POST)</li><li><em>HTTP Task Request body</em>: The HTTP request body. Max 100kb.</li></ul><p>Please note that Cloud Tasks is not a message queue (like PubSub), it’s only interested task definition and supports a max 100kb request body to describe where to find any eventual data.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*itDLLIb9xPfFFad1A3rSqg.jpeg" /><figcaption>Real life queue. Image credit: Alexander Popov, Unsplash, <a href="https://unsplash.com/photos/Xbh_OGLRfUM">https://unsplash.com/photos/Xbh_OGLRfUM</a></figcaption></figure><h3>Full Solution with Cloud Tasks</h3><p>To integrate Cloud Tasks with our partial solution, we can break down the big transfer segment files arriving in Cloud Storage into bits and create a task for each bit. We’ll send all of these tasks to a Task Queue that we have configured with <em>Max concurrent dispatches</em> set to one (1), to avoid overrunning the Google Ads API.</p><p>As Cloud Tasks does not execute the task (they only call an HTTP-endpoint and wait for a 200/OK code back), we put the actual Google Ads API call inside a Cloud Function.</p><h4>Architectural Overview</h4><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*ksF0toIqJQ5RDPb6miE_lg.png" /><figcaption>Architecture diagram utilising Cloud Tasks to break up one large request into multiple smaller requests.</figcaption></figure><ol><li>BigQuery writes resulting output file to GCS.</li><li>Upon finalising writing to GCS, the Cloud Function <em>Task Creator</em> is triggered from GCS.</li><li><em>Task Creator</em><strong><em> </em></strong>reads in the file and splits it into several smaller parts, saving each part to another GCS bucket and creating a task for each part in a the <em>Segments queue</em> in Cloud Tasks. <em>Task Creator</em> gets the target API and other attributes from the filename of the triggering file in GCS, it sends this along with the path to the partial file in the Task Body.</li><li>The tasks in <em>Segments queue </em>are processed one by one, each invoking the Cloud Function <em>Task Handler</em>.</li><li>For every invocation of <em>Task Handler</em><strong><em> </em></strong>it decodes the Task Body and retrives the path to the partial file in GCS as well as the API Configuration. It then pushes the partial file to Google Ads and returns a 200/OK response code to Cloud Tasks when finished.</li></ol><h4>What does this architecture give us?</h4><p>Cloud Tasks acts as a buffer between our input and output, ensuring the output adheres to the rate limits of the API.</p><p>It also helps us in our development speed by supplying a layer for retries, enabling us to write integration code that <a href="https://en.wikipedia.org/wiki/Fail-fast">fails fast.</a> Should the <em>Task Handler</em> function fail, just let it fail and default to sending an http-error-code back to Cloud Tasks. Cloud tasks will retry the task again in due time. As long as the error was on Google Ads side, the task will eventually execute successfully.</p><h4>Scaling out</h4><p>It’s also easy to scale out, to support more accounts (let’s say for an advertiser active in several countries) in Google Ads we only need to create additional queues in Cloud Tasks. We add a bit of code to the <em>Task Creator </em>so it can choose a Task Queue Dynamically, as well as some code to <em>Task Handler </em>so it can get the API credentials/configs dynamically. Other than that, exactly the same functions can be used.</p><h3>Final words</h3><p>This use case was clearly not intended to be solved by PubSub, and I’m happy that it wasn’t, since it taught me a lot about the limitations with PubSub.</p><p>Cloud Tasks are clearly aimed at this domain of problems and it’s a really easy service to pick up. I strongly encourage everyone to play around with it, since it might just be one of the best (serverless) services for handling with poor performance APIs.</p><p>I’m planning to play a bit with combining Cloud Tasks and PubSub for this integration challenge. I have an idea for how they could work together.</p><p>Feel free to reach or leave a comment if you want to discuss this further or disagree with any of the points I’ve made. I’m very much here to learn 👨‍💻🤓</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=30aa756da763" width="1" height="1" alt=""><hr><p><a href="https://medium.com/data-science/gcp-serverless-design-pattern-adhering-to-rate-concurrency-limits-with-cloud-tasks-30aa756da763">GCP Serverless Design Pattern: Adhering to rate &amp; concurrency limits with Cloud Tasks</a> was originally published in <a href="https://medium.com/data-science">TDS Archive</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[My local environment setup for Data Engineering on GCP]]></title>
            <link>https://maxkrog.medium.com/my-local-environment-setup-for-data-engineering-on-gcp-5611e78aa28?source=rss-88c88a14741------2</link>
            <guid isPermaLink="false">https://medium.com/p/5611e78aa28</guid>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[gcp]]></category>
            <category><![CDATA[google-cloud-platform]]></category>
            <category><![CDATA[cloud-client-libraries]]></category>
            <category><![CDATA[personal]]></category>
            <dc:creator><![CDATA[Max Krog]]></dc:creator>
            <pubDate>Fri, 04 Sep 2020 15:42:53 GMT</pubDate>
            <atom:updated>2021-08-17T09:06:11.504Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*62FzviaGnSxVVydEIydYtw.png" /></figure><h3>My personal reference sheet for analytics engineering on GCP</h3><p>This guide is intended to be a handy reference for myself when i’m looking for a specific link or command, or setting up a new workstation. I’m expecting this guide to grow with more commands over time.</p><p>I’ll be covering the following areas in this guide:</p><ul><li>Homebrew</li><li>Pyenv</li><li>Virtual env</li><li>Google Cloud Python Client</li><li>Google Auth Library</li><li>Authentication from environment variables</li><li>Handy ~/.bashrc or ~/.zshrc lines</li></ul><h3>Homebrew, Pyenv &amp; Virtual env</h3><p>This holy trinity should be taught in all tutorials. If you want control and understanding of your local environment, the combination of homebrew, pyenv &amp; venv is the only way to go.</p><h4>Homebrew</h4><p>Homebrew is the de-facto package manager for mac/linux. Find more information about how to install it <a href="https://brew.sh/">here</a>.</p><p>Commands to know:</p><pre>brew doctor              #Performs a health-check on your install<br>brew install *name*      #For installing cli-based applications<br>brew cask install *name* #For installing gui-based applications</pre><h4>Pyenv</h4><p>Forgot about ‘installing python’. Install pyenv (with homebrew) and use it to install and select python versions. Pyenv basically intercepts the command ‘python’ in your terminal and makes it point to the specific python version you want. More information on pyenv can be found <a href="https://github.com/pyenv/pyenv">here</a>.</p><pre>brew install pyenv</pre><p>To make sure every terminal session has pyenv initiated you need to put an init-script in your .bash_profile or .zshrc. Information on how to do this can be found after point at point 3 of <strong>‘Basic GitHub Checkout’ </strong><a href="https://github.com/pyenv/pyenv#basic-github-checkout">here</a>.</p><p>When you’ve got pyenv working you can install your preferred python version like so:</p><pre>pyenv install 3.7.8   #Installs python 3.7.8 </pre><pre>pyenv global 3.7.8    #Sets python 3.7.8 to be your global version</pre><p>Type ‘python’ in your terminal and watch the magic of pyenv.</p><pre>python --version<br>&gt; Python 3.7.8</pre><h4>Venv</h4><p>Packages in Python are by default installed to a global packages folder. If you want to ensure your code performs the same in the cloud as on your local computer, this is not ideal.</p><p>Venv solves this problem by creating <em>virtual environments</em> that are project specific. Packages can be installed to this virtual environment instead of the global scope. By utilizing a requirements.txt file to keep track of packages you want installed for a specific project you can ensure consistency between the cloud and your local development environment.</p><p>The venv-module is bundled with python since 3.5.</p><p>To create a venv-config folder run the following in your terminal:</p><pre>python -m venv venv-config      #Creates the venv-config folder<br>source venv-config/bin/activate #Takes you inside the virtual env</pre><p>You are now inside the virtual environment. Feel free to install any packages you want. Preferably these are listed in a requirements.txt file and can be installed with this command:</p><pre>pip install -r requirements.txt</pre><p>When you want to leave the virtual environment, type:</p><pre>deactivate</pre><h3>GCP client libraries</h3><p>To be able to access googles services you need a client library that be ‘imported’ from and used as a module in your Python code. This comes in two forms (one for all discovery-based APIs and one for interacting with services on GCP). There’s a slight overlap from the first one to the second one, my advise is to use the second one when available</p><h4><a href="https://github.com/googleapis/google-cloud-python">Google Cloud Python Client</a></h4><p>Supports all GCP services. Please note that this is just a container repo, all specific clients have their own specific libraries. For example the Cloud Storage API Client can be found <a href="https://github.com/googleapis/python-storage">here</a></p><p>All client libraries can be pip installed (or put in requirements.txt) on the format:</p><pre>google-cloud-*service*</pre><p>And imported in your code (main.py) like so:</p><pre>from google.cloud import *service*</pre><p>Please note that there’s also a <a href="https://github.com/googleapis/google-api-python-client">Google API Python Client</a>. This library is intended to be used for the discovery-based APIs, that is to say Googles products outside of GCP. For example the Google Analytics API. I found this rather confusing at first.</p><h4><a href="https://github.com/googleapis/google-auth-library-python">Google Auth Library</a></h4><p>This library contains the authorization-layer depended upon by all the google-cloud-*service* libraries. For clarity i like putting this in the requirements.txt when i’m specifically using it:</p><pre>google-auth</pre><p>And this is how you go about creating a client with specific credentials:</p><pre>from google.oauth2 import service_account<br>from google.cloud import storage</pre><pre>credentials = service_account.Credentials.from_service_account_file(<br>    &#39;path_to_service_account_key.json&#39;,<br>    scopes=[&#39;https://www.googleapis.com/auth/devstorage.read_only&#39;]<br>)</pre><pre>storage_client = storage.Client(credentials=credentials)</pre><p>A full list of available scopes can be found <a href="https://developers.google.com/identity/protocols/oauth2/scopes">here.</a></p><h3>Authentication from environment variables~</h3><h4>~/.bashrc or ~/.zshrc</h4><p>Depending on your shell driver, one of these files is executed when you start a new shell session. By placing initializiation calls and the definition of handy environment variables here you…</p><h4><em>GOOGLE_APPLICATION_CREDENTIALS</em></h4><p>When deploying functions or apps to GCP, GCP automatically injects a service account to be used by all clients created in the code that It does this by setting the environment variable <em>GOOGLE_APPLICATION_CREDENTIALS</em> to point to a path with the service account json key.</p><p>In order to get the same feature to work when developing and testing code locally you need to set up the environment variable <em>GOOGLE_APPLICATION_CREDENTIALS</em> to have an absolute path to the service account json key that you want to use for local development.</p><pre>export GOOGLE_APPLICATION_CREDENTIALS= &quot;/path_to_key.json&quot;</pre><p>With this you can initialize clients in this way:</p><pre>from google.cloud import storage</pre><pre>#Client with credentials from environment variable.<br>storage_client = storage.Client()</pre><h4>Handy ~/.bashrc or ~/.zshrc lines</h4><p>Instead of memorizing venv-related commands, give them easy to remember aliases:</p><pre>alias venv=&quot;python -m venv venv&quot;<br>alias venva=&quot;source venv/bin/activate&quot;<br>alias pipi=&quot;pip install -r requirements.txt&quot;</pre><p>pyenv:</p><pre>export PYENV_ROOT=&quot;$HOME/.pyenv&quot;<br>export PATH=&quot;$PYENV_ROOT/bin:$PATH&quot;</pre><pre>eval &quot;$(pyenv init -)&quot;</pre><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=5611e78aa28" width="1" height="1" alt="">]]></content:encoded>
        </item>
    </channel>
</rss>