Integrations and tools for DPL

Parse.ly’s Data Pipeline uses two core AWS services, S3 and Kinesis, as described in Getting Access.

The following code examples demonstrate how to access this data using common open source programming tools.

Quick start with awscli and S3

AWS maintains a command-line client called awscli with a fully featured S3 command-line interface. AWS provides full documentation about this client. With a configured Python interpreter and the pip installer tool, install the client:

$ pip install awscli

Get help about using the client for S3 access:

$ aws s3 help

After receiving access to Parse.ly’s Data Pipeline, Parse.ly Support sends AWS credentials for the S3 bucket through a self-expiring email. The credentials arrive as three lines. The first line is the bucket name, which always starts with the prefix parsely-dw. The next line is the Access Key ID, which is always the shorter of the two keys. The final line is the Secret Access Key. For example:

parsely-dw-mashable
AKIAIOSFODNN7EXAMPLE
wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

Configure these credentials with the CLI:

$ aws configure --profile=parsely
AWS Access Key ID [****************CORQ]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [****************TYt+]: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
Default region name [us-east-1]: us-east-1
Default output format [None]: json

The json output format is recommended since that is commonly expected by other tools.

This writes a $HOME/.aws/credentials file containing these credentials under the parsely header.
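To confirm from Python that the profile was written, a minimal sketch using only the standard library follows. The helper name has_profile is hypothetical. It assumes the usual layout of the credentials file: one INI section per profile holding the aws_access_key_id and aws_secret_access_key keys.

```python
import configparser
from pathlib import Path


def has_profile(profile, path=Path.home() / ".aws" / "credentials"):
    """Return True if the credentials file has both keys under [profile]."""
    parser = configparser.ConfigParser()
    parser.read(path)  # a missing file is silently ignored
    if not parser.has_section(profile):
        return False
    section = parser[profile]
    return "aws_access_key_id" in section and "aws_secret_access_key" in section
```

Calling has_profile("parsely") after running aws configure should return True.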

Once configured, list the contents of the bucket:

$ aws --profile=parsely s3 ls s3://parsely-dw-XXXXXXXX

Replace XXXXXXXX with the site or network bucket name. The following examples use mashable.com so the bucket name is parsely-dw-mashable.

A freshly configured S3 bucket typically has a single prefix named events/, producing output like this:

$ aws --profile=parsely s3 ls s3://parsely-dw-mashable
        PRE events/

Download files from S3:

$ aws --profile=parsely s3 sync s3://parsely-dw-mashable/events/2016/06/ ./06

This downloads all data from the month of June 2016 to a single folder, ./06, on the local machine.

The page on S3 access describes caveats such as file generation frequency, typical file sizes, and file naming conventions.
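Once files are on disk, they can be read with the Python standard library alone. The sketch below assumes each downloaded file is gzip-compressed with one JSON event per line, as described later on this page. The function names iter_events and count_actions are hypothetical.

```python
import gzip
import json
from collections import Counter
from pathlib import Path


def iter_events(folder):
    """Yield one dict per JSON line from every .gz file under folder."""
    for path in sorted(Path(folder).rglob("*.gz")):
        with gzip.open(path, "rt", encoding="utf-8") as lines:
            for line in lines:
                if line.strip():  # skip blank lines
                    yield json.loads(line)


def count_actions(folder):
    """Count events by their "action" field, e.g. "pageview"."""
    return Counter(event.get("action") for event in iter_events(folder))
```

For the download above, count_actions("./06") would tally the events of June 2016 by action.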

On transferring large amounts of data

S3 allows storage and transfer of terabytes of data with ease. But there are good and bad ways of working with S3. For example, AWS users benefit greatly from downloading S3 data to an EC2 instance in the us-east-1 region. This is because data transfer within Amazon’s network can happen at gigabit speeds. Further, although awscli is convenient, it isn’t particularly fast. Tools like Spark speed up S3 loading via parallelization.
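Short of running Spark, a thread pool already parallelizes downloads on a single machine. The following is a rough sketch, not a tuned implementation. The names make_s3_fetch and fetch_all, the worker count, and the local file naming are all assumptions chosen for illustration. The only boto3 calls used are Session(profile_name=...), client("s3"), and download_file.

```python
from concurrent.futures import ThreadPoolExecutor


def make_s3_fetch(bucket, dest_dir, profile="parsely"):
    """Return a function that downloads one key from bucket into dest_dir."""
    import os
    import boto3  # imported lazily so fetch_all below works without it

    s3 = boto3.Session(profile_name=profile).client("s3")

    def fetch(key):
        # flatten the key into a single file name (an arbitrary choice)
        local_path = os.path.join(dest_dir, key.replace("/", "_"))
        s3.download_file(bucket, key, local_path)
        return local_path

    return fetch


def fetch_all(keys, fetch, max_workers=8):
    """Run fetch(key) for every key on a thread pool; results keep key order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetch, keys))
```

Passing the fetch function in as an argument keeps the pooling logic independent of S3, so it can be exercised with a stand-in function before pointing it at a real bucket.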

Code integration overview

The open source ecosystem around data analytics can be overwhelming, with a “paradox of choice”. This section summarizes the most popular and production-ready tools and how they satisfy various use cases.

Summary of preferred tool options

| Tool | Access mechanism | Languages | Simple vs Advanced | Fault tolerant? | Data latency |
|---|---|---|---|---|---|
| aws s3 | S3 | Shell | Simple | No | 15m-60m |
| aws kinesis | Kinesis | Shell | Simple | No | 0s-5s |
| boto3.s3 | S3 | Python | Simple | No | 15m-60m |
| Pandas or R | S3 | Python, R | Simple | No | 15m-60m |
| boto3.kinesis | Kinesis | Python | Advanced | No | 0s-5s |
| Spark (Bulk) | S3 | Python, Scala | Advanced | Yes | 15m-60m |
| Spark (Stream) | Kinesis | Python, Scala | Advanced | Yes | 0s-5s |
| Storm | Kinesis | Python, Java | Advanced | Yes | 0s-5s |
| Redshift | S3 => COPY command | SQL | Advanced | Yes | 30m-60m |
| BigQuery (Bulk) | S3 => Google Storage Xfer | SQL | Advanced | Yes | 30m-60m |
| BigQuery (Stream) | Kinesis => Insert API | SQL | Advanced | Yes | 5s-10s |

Guide to common use cases

Synchronize data to nodes or an S3 bucket. Use aws s3 from the command line. Use boto3 with an S3 bucket from Python. Other languages have libraries similar to boto3.

Download data locally for in-memory analysis using Pandas, Spark, R, or similar tools. Use aws s3 from the command line. Pandas and Spark have built-in support for S3 URIs (e.g. s3://parsely-dw-mashable) via their file loaders. R has a package called aws.s3 for accessing S3 buckets.

Build real-time alerting or real-time analytics in the simplest way possible. The boto3 library can be easily connected to a Kinesis stream. A single process can consume all shards of a Kinesis stream and respond to events as they come in.

Use Amazon EMR or Databricks Cloud to bulk process gigabytes or terabytes of raw analytics data for historical analyses, machine learning models, or similar tasks. This requires Spark in batch mode via Scala or Python (pyspark). Alternatively, use traditional Hadoop or tools built on it, such as Apache Pig.

Build real-time alerting or real-time analytics, but support scale and fault tolerance that goes beyond a single data consumer node. For cases where micro-batching suffices without one-at-a-time processing, use Spark Streaming with its built-in Kinesis connector. For one-at-a-time processing, use Apache Storm.

Synchronize an S3 bucket with an Amazon Redshift instance every few hours. Redshift knows how to bulk load data from S3 via its COPY command. Use sqlalchemy with Python and the Redshift dialect to execute this command regularly.

Synchronize an S3 bucket with a Google BigQuery instance every few hours. Google provides a command-line tool that works with both S3 and Google Cloud Storage (GCS), which is called gsutil. The bq command-line tool allows you to modify and create BigQuery Datasets (tables) with their JSON schema language.

Stream real-time events into a Google BigQuery instance. BigQuery Datasets (tables) can be managed using bq, and the Python google-api-python-client performs streaming inserts using tabledata().insertAll(...). Wiring a process from Kinesis to this API enables streaming events in BigQuery. Spark Streaming is another option for this use case.

Basic integrations

Python code for S3 with boto3

The boto3 Python library simplifies programmatic S3 data access. The following code snippet prints three files from S3 programmatically, filtering on a specific hour of data.

from pprint import pprint
import boto3
BUCKET = "parsely-dw-mashable"
# S3 resource; uses the default credential chain
# (set AWS_PROFILE=parsely to use the profile configured above)
s3 = boto3.resource('s3')
# S3 bucket
bucket = s3.Bucket(BUCKET)
# all events in hour 2016-06-01T00:00Z
prefix = "events/2016/06/01/00"
# pretty-print the first 3 objects; limit(3) avoids listing the whole prefix
files = bucket.objects.filter(Prefix=prefix).limit(3)
pprint(list(files))

This will produce output like this:

[
  s3.ObjectSummary(bucket_name='parsely-dw-mashable', key=u'events/2016/06/01/00/parsely-dw-mashable-001.gz'),
  s3.ObjectSummary(bucket_name='parsely-dw-mashable', key=u'events/2016/06/01/00/parsely-dw-mashable-002.gz'),
  s3.ObjectSummary(bucket_name='parsely-dw-mashable', key=u'events/2016/06/01/00/parsely-dw-mashable-003.gz')
]

The full documentation has more details.
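Since the prefix in the snippet above encodes a single hour, listing a longer time range means generating one prefix per hour. A small sketch, assuming the events/YYYY/MM/DD/HH layout shown above and UTC timestamps; the function names are hypothetical.

```python
from datetime import datetime, timedelta


def hour_prefix(moment):
    """Return the events/YYYY/MM/DD/HH prefix for the hour containing moment."""
    return moment.strftime("events/%Y/%m/%d/%H")


def hour_prefixes(start, end):
    """Yield one prefix per hour from start (inclusive) to end (exclusive)."""
    current = start.replace(minute=0, second=0, microsecond=0)
    while current < end:
        yield hour_prefix(current)
        current += timedelta(hours=1)
```

Each generated prefix can be passed as the Prefix argument of bucket.objects.filter.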

Getting started with Kinesis and awscli

Kinesis provides real-time streaming data by offering two primary primitives:

  • Streams: 24-hour logs of real-time data.
  • Shards: partitions of real-time data to allow for scale-out consumers.

To consume a stream, first get a list of shards, then acquire a “shard iterator”, and finally request the shard iterator’s most recent records.

To acquire a shard iterator at the command line, use awscli as follows:

$ SHARD_ITERATOR=$(aws kinesis get-shard-iterator \
    --shard-id shardId-000000000000 \
    --shard-iterator-type LATEST \
    --query 'ShardIterator' \
    --stream-name 'parsely-dw-mashable')

This produces a shard iterator that looks like this:

$ echo $SHARD_ITERATOR
"AAAAAAAAAAGryM+pw4kCuLiEUfOIJsf...shnLTfHtriA=="

The shard iterator is now stored in the $SHARD_ITERATOR shell variable. It expires after a few minutes if unused.

Use this with aws kinesis get-records to fetch data. The following example filters the data to only show the PartitionKey, since the actual data itself is gzip-compressed JSON and thus not readable from the shell.

$ aws kinesis get-records --shard-iterator $SHARD_ITERATOR | grep 'PartitionKey'
"PartitionKey": "http://mashable.com/2016/05/03/game-thrones-heir/",
"PartitionKey": "http://mashable.com/tech/?utm_cid=mash-prod-nav-ch",
"PartitionKey": "http://mashable.com/2016/05/03/game-thrones-heir/",
...

This is streaming real-time records from the command line.
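To read the record payloads rather than only the partition keys, the Data field has to be decoded. The AWS CLI prints Data base64-encoded, and the text above describes the payload as gzip-compressed JSON. The sketch below checks for the gzip magic bytes instead of assuming compression, so it handles either case. The function name decode_record_data is hypothetical.

```python
import base64
import gzip
import json

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream


def decode_record_data(data_b64):
    """Decode the base64 "Data" field of one record from get-records output."""
    raw = base64.b64decode(data_b64)
    if raw[:2] == GZIP_MAGIC:  # only gunzip when the payload is gzip
        raw = gzip.decompress(raw)
    return json.loads(raw.decode("utf-8"))
```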

Python code for Kinesis with boto3

This boto3 code snippet connects to a Kinesis Stream (set by STREAM) and finds all its shards, dumping one JSON record from each.

This performs programmatically what the previous command-line shell example demonstrated. The processing can be fully customized. Also, boto3 automatically decompresses data records, though the JSON messages require manual parsing.

import json
import boto3
STREAM = "parsely-dw-mashable"
# kinesis client
kinesis = boto3.client('kinesis')
# gets all shard iterators in a stream,
# and fetches most recent data
def get_kinesis_shards(stream):
  """Return list of shard iterators, one for each shard of stream."""
  # describe_stream returns the stream description, which lists its shards
  descriptor = kinesis.describe_stream(StreamName=stream)
  shards = descriptor["StreamDescription"]["Shards"]
  shard_ids = [shard["ShardId"]
        for shard in shards]
  shard_iters = [kinesis.get_shard_iterator(
          StreamName=stream,
          ShardId=shard_id,
          ShardIteratorType="LATEST")
        for shard_id in shard_ids]
  return shard_iters
# acquire all iterators
shard_iters = get_kinesis_shards(STREAM)
# essentially tail -n1 for the Kinesis stream on each shard
for shard in shard_iters:
  records = kinesis.get_records(
    ShardIterator=shard["ShardIterator"],
    Limit=1)["Records"]
  for record in records:
    datum = json.loads(record["Data"])
    print(json.dumps(datum, indent=4, sort_keys=True))

This will produce output like this:

{
  "action": "pageview",
  "apikey": "mashable.com",
  "ts_action": "2016-06-17 01:21:24",
  "ua": "Mozilla/5.0 (iPad; CPU OS 9_3_2 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13F69 Safari/601.1",
  "url": "http://mashable.com/2016/06/16/game-of-thrones-season-6-finale-predicitons/#kEZPFgyuygqJ",
  "visitor_site_id": "ecbcb5ea-5403-4e6f-8648-3fbe71900746"
  // ... other fields elided ...
}

Wiring this code to any real-time streaming process—such as streaming writes to Google BigQuery—is a straightforward next step.
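A process that keeps consuming has to follow the NextShardIterator value returned with each get_records response, because each iterator is only good for one call. The loop below is a minimal sketch for a single shard. The function name tail_shard, the poll count, and the pause are assumptions, and client stands for a boto3 Kinesis client like the one created above. Like the snippet above, it parses each record's Data as JSON directly.

```python
import json
import time


def tail_shard(client, shard_iterator, handle, max_polls=10, pause=1.0):
    """Poll one shard, calling handle(event) for each JSON record."""
    handled = 0
    for _ in range(max_polls):
        if shard_iterator is None:  # no next iterator: the shard was closed
            break
        response = client.get_records(ShardIterator=shard_iterator, Limit=100)
        for record in response["Records"]:
            handle(json.loads(record["Data"]))
            handled += 1
        # continue from where this call stopped
        shard_iterator = response.get("NextShardIterator")
        time.sleep(pause)  # stay under the per-shard read rate limits
    return handled
```

For example, tail_shard(kinesis, shard_iters[0]["ShardIterator"], print) would print events from the first shard for roughly ten seconds.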

Other Kinesis clients available

Beyond boto3, Kinesis has a number of other clients for other programming languages, and even a “high-level” client that depends upon Amazon DynamoDB for fault tolerance and high availability. See the AWS documentation for more information.

Advanced integrations

Moving beyond single-node and in-memory analytics, these advanced integrations demonstrate how to use cluster computing technologies and build a highly available, fault-tolerant ETL for cloud-hosted SQL analytics engines.

Warning on advanced topic

Loading data into Spark, Redshift, and BigQuery is an advanced topic, requiring knowledge of distributed computing and how the Amazon and Google public clouds work. Parse.ly has made loading raw data into these tools as simple as possible, but feel free to skip this section if not needed.

Using Spark with S3

Spark retrieves AWS credentials from the operating system environment. Create a script in the current working directory called env_parsely containing:

export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE;
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY;

Replace these values with the provisioned AWS access and secret keys.

Run source env_parsely before Spark commands that require S3 access. Configure automatic sourcing of this script by all Spark commands by including it in spark-env.sh. For a Spark installation rooted at /opt/spark, here is how that would look:

cp /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh
cat env_parsely >>/opt/spark/conf/spark-env.sh

Then, /opt/spark/bin/pyspark would open a shell where it would be possible to do the following:

>>> lines = sc.textFile("s3://parsely-dw-mashable/events/2016/06/01/00")
>>> lines.take(1)
['{"apikey": "mashable.com", "url": "http://mashable.com/", ...}']

Using Spark Streaming with Kinesis

See Spark’s official Kinesis integration documentation.

Using Storm with Kinesis

See the AWS Labs Kinesis Storm Spout repository.

SQL Engines and ETLs

An ETL, or extract-transform-load process, takes data from Parse.ly’s Data Pipeline and enriches it before loading it into a “final resting place” where teams can run useful ad hoc queries against it.

Parse.ly designed its raw data schema to simplify this process, especially with modern cloud analytics SQL engines like Amazon Redshift and Google BigQuery.

Real-time vs delayed

When building an ETL for a SQL engine, determine the importance of real-time data.

Real-Time Analytics. Data that can be queried almost as fast as it arrives requires specific approaches, and the options are limited. It needs Parse.ly’s real-time stream and streaming writes of that data, and the ETL must not add significant latency. The best option is wiring Kinesis to BigQuery’s “streaming insert” API.

Near-Real-Time Analytics. With 15-minute to 60-minute delays acceptable, use either the Kinesis stream (with batching/buffering) or the S3 bucket (waiting for new objects to arrive).

24-Hour Delayed Analytics. With a 24-hour delay acceptable, use daily bulk load jobs against the S3 bucket. This is perhaps the simplest and most reliable option. Both Redshift and BigQuery can bulk load gigabytes of data at rest, and the load can run as a cron job or similar rather than requiring an always-on pipeline consumer. This option also takes the best advantage of systems like Amazon EMR and its spot instance cluster computing model.
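For the near-real-time case, the batching and buffering of Kinesis events can be sketched as a small in-memory buffer that flushes when it grows too large or too old. The class name EventBuffer, its thresholds, and the flush callback are assumptions for illustration. A production consumer would also need to handle failures of the flush itself.

```python
import time


class EventBuffer:
    """Collect events and flush them in batches by size or by age."""

    def __init__(self, flush, max_events=500, max_age_seconds=60.0,
                 clock=time.monotonic):
        self.flush = flush
        self.max_events = max_events
        self.max_age_seconds = max_age_seconds
        self.clock = clock
        self.events = []
        self.oldest = None  # clock reading when the current batch started

    def add(self, event):
        if not self.events:
            self.oldest = self.clock()
        self.events.append(event)
        too_big = len(self.events) >= self.max_events
        too_old = self.clock() - self.oldest >= self.max_age_seconds
        if too_big or too_old:
            self.drain()

    def drain(self):
        """Flush whatever is buffered, e.g. on shutdown."""
        if self.events:
            batch, self.events = self.events, []
            self.flush(batch)
```

The age check only runs when an event arrives, so a quiet stream can hold a batch longer than max_age_seconds; calling drain() on a timer would close that gap.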

Redshift vs BigQuery

Redshift and BigQuery are the two most popular SQL engines available on the public cloud market, and they are run by Amazon and Google, respectively.

JSON format makes the raw data easy to read, and Amazon Redshift and Google BigQuery each have native support for parsing lines of gzip-compressed JSON data, like the kind Parse.ly offers. Indeed, the main way Parse.ly does end-to-end testing on its raw data formats is to ensure they properly bulk and stream load into these cloud SQL data stores.

However, these engines also have various data integration options, as will be discussed in the data loading tutorials below.

Loading data into Redshift

In the case of Redshift, bulk data loads from S3 are a “one-liner”. Simply use the Redshift COPY command like this after creating a schema/table named parsely.rawdata:

COPY parsely.rawdata
FROM 's3://parsely-dw-mashable/events/2016/06/01/12'
CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXX;aws_secret_access_key=XXXXXXXXXXXXXXXX'
REGION AS 'us-east-1'
FORMAT AS json 'auto'
DATEFORMAT 'auto';

Customize the S3 bucket name, date prefix, and access credentials as discussed in the Getting Access page. Monitor the bulk data load in the Redshift AWS panel.
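When the load runs on a schedule, the COPY statement can be generated per hourly prefix. The following sketch only builds the SQL text shown above; executing it is left to whichever database client connects to the cluster. The function name build_copy_sql and its quote check are assumptions, not part of any Parse.ly or Redshift library.

```python
def build_copy_sql(bucket, hour_prefix, access_key, secret_key,
                   table="parsely.rawdata", region="us-east-1"):
    """Return the COPY statement for one hourly prefix of the bucket."""
    # values are spliced into SQL text, so reject characters that could
    # end a quoted string or start a second statement
    for value in (bucket, hour_prefix, access_key, secret_key, table, region):
        if "'" in value or ";" in value:
            raise ValueError("unexpected quote or semicolon in an argument")
    return (
        f"COPY {table}\n"
        f"FROM 's3://{bucket}/{hour_prefix}'\n"
        f"CREDENTIALS 'aws_access_key_id={access_key};"
        f"aws_secret_access_key={secret_key}'\n"
        f"REGION AS '{region}'\n"
        "FORMAT AS json 'auto'\n"
        "DATEFORMAT 'auto';"
    )
```

Reading the two keys from environment variables rather than hard-coding them keeps the credentials out of source control.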

To test proper data loading, run a global pageview count:

SELECT COUNT(action) as views
FROM parsely.rawdata
WHERE action = 'pageview';

See parsely_raw_data and its example Redshift schema for more information.

Loading data into BigQuery

BigQuery offers two options: bulk load or streaming inserts.

To bulk load data, the easiest option is to transfer data from S3 to Google Cloud Storage (GCS) using Google’s Storage Transfer service. This approach uses a gs:// bucket in sync with an s3:// bucket, enabling bulk data loading using Google’s bq command-line tool.

Perform one-off S3 to GCS copies using gsutil cp.

$ gsutil -m cp -r \
    s3://parsely-dw-mashable/** \
    gs://parsely-dw-mashable/

Once data is in Google Storage, use bq to bulk load it quickly. For example:

$ bq load \
    --max_bad_records=1000 \
    --source_format=NEWLINE_DELIMITED_JSON \
    parsely.rawdata \
    gs://parsely-dw-mashable/events/2016/06/01/12 \
    bq.json

Here, bq.json is a JSON file containing the BigQuery schema definition.

To test proper data loading, run a global pageview count:

SELECT COUNT(action) as views
FROM parsely.rawdata
WHERE action = 'pageview';

To stream data in, use the Python Google BigQuery client library and run tabledata().insertAll(...) function calls as data arrives on the Amazon Kinesis Stream. Example Python code for this is available in the Parse.ly GitHub repository.
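As a rough illustration of the streaming path, the request body for insertAll is a dictionary with a rows list, where each row carries the event under json and an optional insertId that BigQuery uses for best-effort de-duplication of retried rows. The helper below is a sketch. Its name and the choice to derive insertId from a hash of the event are assumptions; the Parse.ly example code may do this differently.

```python
import hashlib
import json


def insert_all_body(events):
    """Build the request body for tabledata().insertAll from event dicts."""
    rows = []
    for event in events:
        # a stable serialization, so the same event always hashes the same
        canonical = json.dumps(event, sort_keys=True).encode("utf-8")
        rows.append({
            # insertId lets BigQuery de-duplicate retried rows (best effort)
            "insertId": hashlib.sha1(canonical).hexdigest(),
            "json": event,
        })
    return {"rows": rows}
```

With google-api-python-client, the result would be passed as the body argument of tabledata().insertAll(projectId=..., datasetId=..., tableId=..., body=...).execute(), with the three identifiers set for the target table. Because the insertId here is a content hash, two genuinely identical events sent close together may be treated as duplicates; a source-provided unique ID would avoid that.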

See parsely_raw_data and its example BigQuery schema for more information.

Other example code in parsely_raw_data

Example streaming (Kinesis) and bulk (S3) integration code in Python is available, along with a representation of a single Parse.ly raw event in Python.

Getting help with integration

  • For existing Parse.ly customers, sites are already instrumented for Parse.ly’s Data Pipeline. No additional integration steps are required to start leveraging raw data. Contact Parse.ly Support to discuss secure access key ID and secret access key provisioning. The service can then be used freely, with options to instrument custom events and custom data.
  • For organizations not yet Parse.ly customers, start with the basic integration. Contact Parse.ly for a demo to review necessary integration steps.

Last updated: December 30, 2025