What is Apache Parquet?
Storage Matters!
Although we are in the era of cool streaming platforms and real-time machine learning applications, often we just want to churn through a significant amount of data to train our models or query for patterns. For that, we need data storage that is performant, efficient, and reliable.
Let’s define our goals for efficiency a little. What are the goals for Data Lake Storage?
- Good usability: easy to back up, minimal learning curve, easy integration with existing tools. We want to plug it into our stack, and we want our users to figure it out without bothering us.
- Resource efficiency: we want it to be light on disk, network, and compute, because resources cost real dollars.
When we are dealing with Big Data, those little costs add up both in terms of money and compute time. Let’s look at I/O time as an example:
I/O costs really hurt, and we are not only talking about disk I/O: in a distributed environment, data has to come off disk and then travel across the network to the compute cluster, so there is both disk and network I/O to consider. Roughly, it is nanoseconds to memory, microseconds to SSD, milliseconds to disk, and tens of milliseconds across the network.
We compare three options for storage: Files, Compressed Files, Databases.
Files are easy to use: there is no learning curve, no administration, and they integrate well with whatever your stack is. But plain files have severe drawbacks. They are big and slow, they carry a high disk and I/O cost, and query performance is terrible (Spark has to pull the whole file just to read one column and filter it). Can we do better with databases?
Though databases are great tools, data lake storage is the wrong job for them. It’s not practical to scale a database to handle multiple terabytes.
Can we combine the ease of use of files with the query time of a database?
Below, you can see a comparison between Parquet and CSV query time.
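As a rough illustration of that difference, here is a minimal PySpark sketch that times the same single-column aggregate against a CSV copy and a Parquet copy of the same data (the paths and the "country" column are hypothetical):

```python
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-vs-parquet").getOrCreate()

# Hypothetical copies of the same events table in both formats.
csv_df = spark.read.csv("/data/events.csv", header=True, inferSchema=True)
parquet_df = spark.read.parquet("/data/events.parquet")

def time_query(df):
    start = time.time()
    # The query touches a single column; Parquet can skip the rest, CSV cannot.
    df.groupBy("country").count().collect()
    return time.time() - start

print("CSV:    ", time_query(csv_df))
print("Parquet:", time_query(parquet_df))
```

The CSV read has to parse every row and every column, while the Parquet read only touches the one column it needs.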
Parquet Format
Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model, or programming language. Many data systems support this format because of its performance advantages.
One cool feature of Parquet is that it can handle arbitrarily nested data. Apache Parquet is a binary encoding, like Apache Thrift, so it is not human-readable; that makes it very different from textual formats such as JSON, XML, and CSV.
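As a small illustration (a hedged PySpark sketch; the document shape and output path are made up), a nested struct plus a repeated field round-trips through Parquet without being flattened:

```python
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

# A Dremel-style document: a nested struct ("name") plus a repeated field ("links").
docs = [
    Row(docid=10, name=Row(language="en", url="http://A"), links=[20, 40, 60]),
    Row(docid=20, name=Row(language="fr", url="http://B"), links=[80]),
]
df = spark.createDataFrame(docs)
df.printSchema()                                        # struct<...> and array<bigint> columns
df.write.mode("overwrite").parquet("/tmp/nested_docs")  # hypothetical output path
```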
You can see Parquet's high-level schema above: for each column we have a Column Name, an Optional/Required/Repeated marker, a Data Type, Encoding Info, a Repetition Value, and a Definition Value.
Optional and Required indicate whether a column can be null or not, and Repeated means exactly what it says. If the Data Type is binary, the Encoding Info describes how the data is encoded; we will discuss encodings a little later.
We also have Repetition and Definition values. This is where Parquet borrows heavily from Google's Dremel, a system designed for fast queries over arbitrarily nested data.
You can think of the R-value as telling you whether a column is repeated and at what level of nesting that repetition happens. Here you can see that "links" is a repeated column, and we have to dig down to reach that repetition. The D-value, or Definition level, tells us how far we have to dig into the nesting to know whether that individual piece of data is null.
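Here is a minimal pyarrow sketch of how these markers map onto a schema; the field names echo the Dremel-style document above, and the path is hypothetical. A non-nullable field becomes required, a nullable field optional, and a list field repeated:

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Non-nullable -> required, nullable -> optional, list -> repeated.
schema = pa.schema([
    pa.field("docid", pa.int64(), nullable=False),   # required
    pa.field("url", pa.string(), nullable=True),     # optional
    pa.field("links", pa.list_(pa.int64())),         # repeated
])

table = pa.table(
    {"docid": [10, 20], "url": ["http://A", None], "links": [[20, 40], [80]]},
    schema=schema,
)
pq.write_table(table, "/tmp/dremel_like.parquet")    # hypothetical path
print(pq.read_schema("/tmp/dremel_like.parquet"))
```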
How Parquet Works
Below you can see Parquet's high-level structure in the file system.
_SUCCESS is a success marker written by Hadoop. _common_metadata and _metadata are schema summary files written by Spark, and they are turned off by default. The meat and potatoes are the compressed part files.
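A quick way to see that layout is to write a DataFrame and list the output directory (a hedged sketch; the output path is arbitrary):

```python
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

out = "/tmp/parquet_layout_demo"                 # arbitrary output path
spark.range(1_000_000).write.mode("overwrite").parquet(out)

# Typical listing: _SUCCESS plus one compressed part-*.snappy.parquet file per task.
for name in sorted(os.listdir(out)):
    print(name)
```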
If we write our table in a row-oriented way, we get a String, then a different String, then an Int: all of these types are mixed together, which is tough on our cache, because the data is not going to be well aligned when it is read into the L1/L2 cache. Also, if values are null, we cause branch mispredictions, which is unfriendly to our processor.
It is much better to write it in a column-oriented way: all of column one, then all of column two, and so on. We are not storing nulls, everything is roughly the same size, and it is much friendlier to our hardware.
Another advantage of columnar storage is that data within a column shares similar properties, which means we can pull out every encoding trick to squeeze it down.
So let’s take an example and look at dictionary encoding:
Here we have only two distinct record labels, so we can build a tiny dictionary and shrink 84 bytes of strings to 10 bits of dictionary indices, which pack into 2 bytes. There are many more encoding schemes, such as Plain, Dictionary Encoding, Run-Length Encoding / Bit-Packing Hybrid, Delta Encoding, Delta-Length Byte Array, and Delta Strings (incremental encoding).
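The exact strings from the example are not shown here, so the back-of-the-envelope below uses stand-in labels, but the mechanics are the same: two distinct values need only one bit per entry, and the packed bits round up to a couple of bytes.

```python
# Stand-in labels (the slide's exact strings are not shown, so byte counts differ),
# but the mechanics of dictionary encoding are the same.
labels = ["clicked", "ignored"] * 5           # 10 values, only 2 distinct
plain_size = sum(len(s) for s in labels)      # raw string bytes
dictionary = sorted(set(labels))              # 2 entries -> 1 bit per value
bits_needed = len(labels) * 1                 # 10 bits of indices
packed_bytes = (bits_needed + 7) // 8         # rounds up to 2 bytes
print(plain_size, "bytes plain ->", bits_needed, "bits ->", packed_bytes, "bytes packed")
```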
Slicing and Dicing with Compressed Files
Within each Parquet file there is a tree of structures. We start at the root node, the File Metadata: this is where the schema, Thrift headers, offsets, and so on are stored. The next element is the Row Group. Spark is not going to save all rows (e.g. 1 million) at once; a row group is a subset of all the rows we want to save.
To identify the beginning and end of a Parquet file, a magic number (4 special bytes, "PAR1") is used as a marker. Following the first magic number there are several Row Groups and then the Footer. FileMetaData lives in the Footer because the metadata is written after the data. Row Groups hold the actual data.
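You can verify the magic number on any Parquet file with a few lines of Python (the path below points at the file written in the earlier sketch; any Parquet file will do):

```python
# Parquet files begin and end with the 4-byte magic "PAR1".
with open("/tmp/dremel_like.parquet", "rb") as f:   # file from the earlier sketch
    head = f.read(4)
    f.seek(-4, 2)                                   # 4 bytes before the end
    tail = f.read(4)
print(head, tail)                                   # b'PAR1' b'PAR1'
```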
Some important concepts are listed below.
- Block (HDFS block): This means a block in HDFS and the meaning is unchanged for describing this file format. The file format is designed to work well on top of HDFS.
- File: An HDFS file that must include the metadata for the file. It does not need to actually contain the data.
- Row group: A logical horizontal partitioning of the data into rows. There is no physical structure that is guaranteed for a row group. A row group consists of a column chunk for each column in the dataset.
- Column chunk: A chunk of the data for a particular column. These live in a particular row group and are guaranteed to be contiguous in the file.
- Page: Column chunks are divided up into pages. A page is conceptually an indivisible unit (in terms of compression and encoding). There can be multiple page types that are interleaved in a column chunk.
- Hierarchically, a file consists of one or more row groups. A row group contains exactly one column chunk per column. Column chunks contain one or more pages.
Within each Row Group, each column is treated individually and cut into chunks. Two important details here: if an individual value is null, we do not store it, and if a whole column is null, we do not store that either. As an example, you can see that column 2 has two chunks but column 1 has just one chunk.
Inside each column chunk we have a shared Page Header, which brings us to the main event: pages. Pages are the indivisible atomic unit of Parquet. Each page holds a little bit of metadata, some R-values and D-values, and the Encoded Data.
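pyarrow exposes this hierarchy through the footer metadata; the sketch below (reusing the file from the earlier sketch, so the path is an assumption) walks from row groups down to column-chunk encodings, page offsets, and statistics:

```python
import pyarrow.parquet as pq

pf = pq.ParquetFile("/tmp/dremel_like.parquet")    # file from the earlier sketch
meta = pf.metadata
print("row groups:", meta.num_row_groups, "columns:", meta.num_columns)

# First column chunk of the first row group.
col = meta.row_group(0).column(0)
print("encodings:  ", col.encodings)               # e.g. dictionary / RLE / plain
print("compression:", col.compression)
print("page offset:", col.data_page_offset)
print("stats:      ", col.statistics)              # min/max later used for filter pushdown
```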
If you want to know more about the format spec, please visit: https://github.com/apache/parquet-format
Get Efficiency with Spark
How does Spark work with Parquet to achieve that efficiency? What makes those queries so fast?
Well, the first thing is that, just like with any columnar storage, we only pull the columns we need. The next thing is that we can write with partitioning; this is not unique to Parquet, but the API provides a nice way to do it.
Commonly, we will partition time-series data by year, month, and day, as in the sketch below.
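A hedged PySpark sketch of that layout, assuming an events table with a timestamp column "ts" (the paths and column name are made up):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Assumed events table with a timestamp column "ts".
events = (spark.read.parquet("/data/events.parquet")
          .withColumn("year", F.year("ts"))
          .withColumn("month", F.month("ts"))
          .withColumn("day", F.dayofmonth("ts")))

# Produces one directory per day, e.g. .../year=2020/month=5/day=17/part-*.parquet
(events.write
       .partitionBy("year", "month", "day")
       .mode("overwrite")
       .parquet("/data/events_partitioned"))
```

Each year=/month=/day= directory becomes a partition that Spark can skip entirely when a query filters on those columns.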
Another thing is that Spark takes our queries and, if they have filters, pushes them down into the data-scanning process.
For WHERE clauses, HAVING clauses, and so on in Spark SQL, the data-loading layer tests the condition before pulling a column chunk into Spark memory.
We push the filtering down to the column chunks: because column chunks store statistics such as minimums and maximums, the filter condition can be evaluated against them to decide whether the chunk needs to be pulled at all. This is a huge I/O saving for queries.
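Continuing the partitioned-events example (the "amount" and "user_id" columns are assumptions), you can see both mechanisms in the physical plan:

```python
# Reading back the partitioned data: the year/month filter prunes whole directories,
# and the value filter is pushed down to Parquet column-chunk statistics.
df = spark.read.parquet("/data/events_partitioned")
q = (df.filter((df.year == 2020) & (df.month == 5) & (df.amount > 100))   # "amount" is assumed
       .select("user_id"))                                                # "user_id" is assumed

q.explain()   # the Parquet scan should list PartitionFilters and PushedFilters
```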
To summarize, with Parquet you can read just the partitions you need, just the columns you need, and just the column chunks that match your filter conditions.