tech blog

AppNexus is today’s most powerful, open, and customizable ad tech platform. Advertising’s largest and most innovative companies build their businesses on AppNexus.

Parquet: Columnar Storage for Hadoop Data


Overview

At AppNexus, over 2MM log events are ingested into our data pipeline every second. Log records are sent from upstream systems as protobuf messages, and raw logs are compressed with Snappy when stored on HDFS. Even with compression, we still collect over 25TB of log data every day. On top of the logs, hundreds of MapReduce jobs process them and generate aggregated data. Collectively, we store petabytes of data in our primary Hadoop cluster.

Parquet is a columnar storage format in the Hadoop ecosystem. Compared to a traditional row-oriented format, it is much more efficient in storage and delivers better query performance. Parquet is widely used in the Hadoop world for analytics workloads and is supported by many query engines. Among them are engines built on top of Hadoop, such as Hive and Impala, and systems that go beyond MapReduce to improve performance, such as Spark and Presto.

Parquet stores binary data in a column-oriented way: the values of each column are stored next to each other, which enables better compression. It is especially good for queries that read only particular columns from a "wide" table (one with many columns), because only the needed columns are read and IO is minimized. Read this for more details on Parquet.
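The Python sketch below makes the layout difference concrete. It serializes the same rows in row order and in column order, then counts the bytes a query would scan under each layout. This is a toy model and not how Parquet actually encodes data. It assumes every value is a fixed-width 8-byte field. Real Parquet files add encodings and compression, so actual byte counts will differ.

```python
from typing import Dict, List, Sequence

# Assumption for illustration only: every value is a fixed-width 8-byte field.
FIELD_BYTES = 8


def row_layout(rows: Sequence[Dict[str, int]], columns: List[str]) -> List[int]:
    """Row-oriented: all fields of row 0, then all fields of row 1, and so on."""
    return [row[c] for row in rows for c in columns]


def column_layout(rows: Sequence[Dict[str, int]], columns: List[str]) -> Dict[str, List[int]]:
    """Column-oriented: the values of each column are stored next to each other."""
    return {c: [row[c] for row in rows] for c in columns}


def bytes_scanned_row(num_rows: int, num_columns: int) -> int:
    # A row store has to read every field of every row, including unused columns.
    return num_rows * num_columns * FIELD_BYTES


def bytes_scanned_columnar(num_rows: int, num_needed_columns: int) -> int:
    # A column store reads only the columns the query actually references.
    return num_rows * num_needed_columns * FIELD_BYTES


rows = [{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}]
print(row_layout(rows, ["a", "b", "c"]))     # [1, 2, 3, 4, 5, 6]
print(column_layout(rows, ["a", "b", "c"]))  # {'a': [1, 4], 'b': [2, 5], 'c': [3, 6]}
print(bytes_scanned_row(1_000, 50), bytes_scanned_columnar(1_000, 2))  # 400000 16000
```

In this model, reading 2 of 50 columns scans 1/25 of the bytes. In practice, the gain also depends on column widths and on how well each column compresses.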

Data is important, but storage costs money, not to mention the processing resources needed at this scale, including CPU and network IO. With limited hardware, we started looking at member COGS at the end of last year, to find a way to serve our ever-growing data needs. We think Parquet is the right choice because it addresses both needs: it is efficient in storage and performant in processing.

The benchmark here measures key performance metrics, including disk usage, resource usage, and query performance. We compared two storage formats:

  • Sequence Files with Protobuf compressed by Snappy
  • Parquet Files compressed by Snappy

Note: this benchmark is not intended to compare the performance of the query engines on the market. We chose Hive as a simple query engine and ran it with its default configuration, without any tuning, so that the results are reproducible.

Dataset

For testing, we picked one of our biggest tables, aggregated_impressions. Rows in this table represent impressions, callbacks, and auctions joined at the transaction level. It is currently the largest table in our Hadoop cluster, occupying 270TB of HDFS storage (810TB of raw storage with 3x replication), and it serves as the primary data source for most of the higher-level aggregated tables.

On HDFS, files for this table are stored in data blocks of 128MB (134217728 bytes).
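The storage figures above follow from simple HDFS arithmetic. The Python sketch below assumes the 128MB block size stated here and the replication factor of 3 mentioned for this table. It computes raw disk usage and the number of blocks a file spans. HDFS does not pad the last block, so a partial block consumes only the bytes it actually holds.

```python
import math

BLOCK_SIZE = 134_217_728  # 128MB HDFS block size used for aggregated_impressions
REPLICATION = 3           # replication factor stated for this cluster


def raw_storage(logical_size: float, replication: int = REPLICATION) -> float:
    """Raw disk consumed once every block is replicated `replication` times."""
    return logical_size * replication


def num_blocks(file_bytes: int, block_size: int = BLOCK_SIZE) -> int:
    """Number of HDFS blocks a file spans; the last block may be partial."""
    return math.ceil(file_bytes / block_size)


print(raw_storage(270))               # 810 (TB), matching the figure above
print(num_blocks(300 * 1024 * 1024))  # 3: two full 128MB blocks plus a 44MB tail
```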

Disk Storage

We compared the HDFS usage of a single day of this table in each storage format, broken down by hour. The result is shown in the following chart.

Query Performance

Query performance on Parquet tables depends largely on how many columns must be processed in the SELECT, WHERE, and GROUP BY clauses of the query. Suppose the table has n columns and the query needs m of them; we define the column ratio as t = m / n. The smaller t is, the bigger the performance improvement a query will see on a Parquet table compared with a regular row-oriented table.
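The column ratio can be computed directly from the set of columns a query references. In this Python sketch, the 100-column table width is hypothetical, because the real column count of aggregated_impressions is not given here. The sketch also assumes dh is a Hive partition column. Partition columns live in the directory path rather than in the data files, so dh is not counted toward m.

```python
from typing import Iterable


def column_ratio(query_columns: Iterable[str], table_columns: Iterable[str]) -> float:
    """t = m / n: m stored columns the query touches, n stored columns in the table."""
    stored = set(table_columns)
    if not stored:
        raise ValueError("the table must have at least one stored column")
    # Names that are not stored columns (e.g. partition keys) do not count toward m.
    m = len(set(query_columns) & stored)
    return m / len(stored)


# Hypothetical 100-column table containing the three columns Query 1 reads.
table = ["imp_type", "double_monetary_field1", "double_monetary_field2"]
table += [f"other_col_{i}" for i in range(97)]

query1_columns = ["imp_type", "double_monetary_field1", "double_monetary_field2", "dh"]
print(column_ratio(query1_columns, table))  # 0.03
```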

We picked three commonly used queries on aggregated_impressions and measured their execution time and resource consumption. The same set of queries was run against data stored in both formats, Parquet and SequenceFile with Protobuf.

  1. The first query performs a simple aggregation on aggregated_impressions over a single field. The only dimension used is imp_type, which has a low cardinality of 10. The aggregation computes metrics on double_monetary_field1 and double_monetary_field2.

  2. The second query is taken from one of our Hive jobs, agg_high_dimesion_data. It performs a more complex, high-cardinality aggregation across multiple dimensions and metrics.

  3. The third query is a common type of ad hoc query that looks for a few sparse records in the massive dataset. Such a query may sometimes return no rows at all.

For all three tests, the Hive queries ran with the same configuration:

  • Block size is 134217728 bytes
  • Hive max split size is 256000000 bytes
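The Python sketch below gives a rough way to reason about map counts under these settings. It makes a simplifying assumption: one map task processes at most about one max-split-size worth of input. Hive's actual split computation also depends on the input format, block locality, and minimum split settings. The input sizes in the example are hypothetical, not measurements from this benchmark.

```python
import math

MAX_SPLIT_SIZE = 256_000_000  # Hive max split size used in the tests


def estimated_map_tasks(input_bytes: int, max_split_size: int = MAX_SPLIT_SIZE) -> int:
    """Back-of-envelope estimate: one map task per max_split_size bytes of input.
    This ignores locality and input-format details, so treat it as a rough guide."""
    return math.ceil(input_bytes / max_split_size)


# Hypothetical input sizes, not measurements from this benchmark.
print(estimated_map_tasks(100 * 10**9))  # 391
print(estimated_map_tasks(50 * 10**9))   # 196
```

Under this model, the number of map tasks scales with the total size of the input files. Smaller files therefore translate into proportionally fewer maps.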

Comparison of MapReduce Metrics

Query 1

```sql
SELECT   imp_type,
         Sum(double_monetary_field1),
         Sum(double_monetary_field2)
FROM     aggregated_impressions
WHERE    dh = '${hour}'
GROUP BY imp_type;
```
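The Python sketch below shows why Query 1 benefits from a columnar layout. It runs the same aggregation over an in-memory, column-oriented table and touches only the four columns the query references, never reading any other column. It is a toy model, not how Hive executes the query. The hour values are hypothetical. dh is modeled as an ordinary column, although in Hive it is likely a partition column.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple


def query1(columns: Dict[str, List[Any]], hour: str) -> Dict[Any, Tuple[float, float]]:
    """Toy equivalent of Query 1 over a dict of column name -> values."""
    # Only these four columns are read; every other column is left untouched.
    imp_type = columns["imp_type"]
    field1 = columns["double_monetary_field1"]
    field2 = columns["double_monetary_field2"]
    dh = columns["dh"]

    sums: Dict[Any, List[float]] = defaultdict(lambda: [0.0, 0.0])
    for t, f1, f2, h in zip(imp_type, field1, field2, dh):
        if h == hour:         # WHERE dh = '${hour}'
            sums[t][0] += f1  # Sum(double_monetary_field1)
            sums[t][1] += f2  # Sum(double_monetary_field2)
    return {t: (s[0], s[1]) for t, s in sums.items()}  # GROUP BY imp_type


table = {
    "imp_type": [1, 2, 1, 3],
    "double_monetary_field1": [1.0, 2.0, 3.0, 4.0],
    "double_monetary_field2": [0.5, 0.5, 0.5, 0.5],
    "dh": ["h1", "h1", "h1", "h2"],
    "unused_wide_column": ["x", "y", "z", "w"],  # never read by query1
}
print(query1(table, "h1"))  # {1: (4.0, 1.0), 2: (2.0, 0.5)}
```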

Query 2

```sql
INSERT overwrite TABLE agg_high_dimesion_data partition
       (
              dy='2015',
              dm='2015-03',
              dd='2015-03-30',
              dh='${hour}'
       )
SELECT   date_trunc('hour',date_time) AS ymdh,
         bl.aid,
         bl.cgid,
         bl.cid,
         bl.crid,
         bl.pid,
         bl.vid,
         CASE
                  WHEN int_field1 < 20 THEN int_field1
                  ELSE 20
         END AS int_field1_cal,
         CASE
                  ...
                  -- A large CASE WHEN statement
         END                                          AS int_field2_cal,
         sum(bl.isI)                                  AS i_count,
         sum(bl.isC)                                  AS c_count,
         sum(pc_number)                               AS pc_count,
         sum(bl.pv_number)                            AS pv_count,
         to_numeric(sum(double_field)/1000, '%18.6f') AS double_field
FROM     (
                SELECT *
                FROM   aggregated_impressions
                WHERE  date_time >= '${hour}:00:00'
                AND    date_time <= '${hour}:59:59'
                AND    dh = '${hour}'
                ) bl
-- GROUP BY expressions must match the non-aggregated SELECT expressions
GROUP BY date_trunc('hour',date_time) ,
         bl.aid,
         bl.cgid,
         bl.cid,
         bl.crid,
         bl.pid,
         bl.vid,
         CASE
                  WHEN int_field1 < 20 THEN int_field1
                  ELSE 20
         END ,
         CASE
                  ...
                  -- A large CASE WHEN statement
         END;
```

Query 3

```sql
SELECT   auction_id_64,
         imp_type,
         date_time
FROM     aggregated_impressions
WHERE    aid=1234567
AND      id1=1111
AND      id2=4567
AND      dh = '${hour}'
ORDER BY date_time ASC LIMIT 25;
```

For all three tests, queries on the Parquet table needed only half as many map tasks as those on the Protobuf table. The plots below break down some of the key MapReduce job performance metrics.

  • Total CPU Time

  • Average Map Time

As the charts above show, Parquet really excels when a query targets sparse data or selects low-cardinality columns. Total CPU usage can be reduced by more than 98%, and total memory consumption is cut by at least half. In short, Parquet makes better use of both storage and computation resources. At the time of this writing, we have migrated a few of our biggest tables to Parquet.

Upstream log data comes in as Protobuf. After ingestion, we validate this data before loading it into HDFS, and again whenever jobs read Protobuf messages from HDFS. This validation framework, along with our entire MapReduce job API framework, is tied to the Protobuf data format. You may be wondering how we made this migration without revamping our entire pipeline framework. In our next blog post, we will shed more light on the implementation and migration details.
