Exploring Hive/Impala partitions

October 5, 2016

Table partitioning is a common practice in the RDBMS world, and all major databases support it.
Essentially, it splits the table data into several physical parts based on a function, range, or value of a column (or set of columns), while keeping all the data under the same logical unit: the table.
This speeds up queries because the query engine does not have to scan all the data, only the relevant partitions.
Hive also supports partitions, but its syntax and implementation are a little different from most relational databases.
Since Impala and Hive share the same metadata, Impala queries can benefit from partitioning just as Hive does.
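One practical corollary (my addition, not from the original walkthrough): Impala caches table metadata, so partitions added through Hive are not visible to Impala until its cache is refreshed. Using the zip_part table created later in this post:

```sql
-- Run in impala-shell after Hive has added data or partitions:
refresh zip_part;             -- reload file and partition metadata for one table

-- For a table newly created in Hive, a full metadata reload is needed:
invalidate metadata zip_part;
```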

For this demo I used the zip code dataset from the US IRS (I used the 2014 data file). The dataset has wide rows and I do not need most of that information for this demo, so I discarded everything but the first seven columns.

The data structure can be seen in the CREATE TABLE statements below.

The original file contains only about 27,000 lines, which is a little small for a benchmark, so I ran this command several times to grow it:

 cat 14zpallagi.csv >> data.csv

First I will create a regular, unpartitioned table:

create table zip (
statefips string,
state string,
zipcode int,
agi_stub int,
n1 int,
mars1 int,
mars2 int,
mars4 int) row format delimited fields terminated by ',' stored as textfile;

 

And then a partitioned one (partition on state code):

create table zip_part (
statefips string,
zipcode int,
agi_stub int,
n1 int,
mars1 int,
mars2 int,
mars4 int) partitioned by (state string) 
row format delimited fields terminated by ',' stored as textfile;

 

But wait a second: where is the state column?! Anyone who has used an RDBMS and created partitions will notice that the state column is missing. But this is no typo. If you think about it, stating the partition key column in the column list and then again in the partition clause is redundant. Hive/Impala avoids this redundancy and states the column name only once: it infers from the partition clause that there is an additional column, “state”. The cost is that (at least in my opinion) the code is less readable than the RDBMS equivalent.
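Even though state is not in the column list, Hive exposes it as a fully queryable column. A quick way to confirm this (the exact describe output varies by Hive version):

```sql
-- 'state' shows up under "# Partition Information" in the description:
describe zip_part;

-- ...and it can be used like any other column in queries:
select state, count(*) from zip_part group by state;
```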

Now we will load the data from the file to the hive table:

load data local inpath '/var/lib/hive/data.csv' into table zip;
Loading data to table default.zip
Table default.zip stats: [numFiles=1, totalSize=773604]
OK
Time taken: 1.577 seconds
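For a partitioned table, LOAD DATA also needs a target partition. As a sketch (the file name data_al.csv is hypothetical), a pre-filtered file containing only one state's rows, and without the state column itself, could be loaded straight into its partition. Note that Hive does not verify that the rows actually belong to that state:

```sql
-- Hypothetical example: load a file that holds only AL rows
-- (and no state column) directly into the AL partition.
load data local inpath '/var/lib/hive/data_al.csv'
into table zip_part partition (state='AL');
```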

 

This one is straightforward. Now we will take the records from the zip table and insert them into the zip_part table.
The process is a bit different than with a non-partitioned table. First of all, dynamic partitioning is off by default: Hive does not automatically redirect rows into the appropriate partition unless you explicitly allow it. If you want automatic partitioning, you have to set these two parameters in Hive:

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

 

Second, we have to specify the “partition” clause along with the partition key. And third, the partition key column must come last in the select statement, as you can see below:

insert into zip_part partition(state) select statefips,zipcode,agi_stub,n1,mars1,mars2,mars4,state from zip;

The output of the job shows that Hive automatically created partitions as needed, according to the data:

Query ID = hive_20160929144444_58e2fff1-d0a8-4817-ab82-94e8b870b1b3
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1475136353099_0017, Tracking URL = http://cloudera6.lan:8088/proxy/application_1475136353099_0017/
Kill Command = /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/hadoop/bin/hadoop job -kill job_1475136353099_0017
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 0
2016-09-29 14:44:25,829 Stage-1 map = 0%, reduce = 0%
2016-09-29 14:44:35,052 Stage-1 map = 2%, reduce = 0%, Cumulative CPU 8.29 sec
2016-09-29 14:44:37,286 Stage-1 map = 50%, reduce = 0%, Cumulative CPU 8.29 sec
2016-09-29 14:44:43,279 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 14.58 sec
MapReduce Total cumulative CPU time: 14 seconds 580 msec
Ended Job = job_1475136353099_0017
Stage-4 is filtered out by condition resolver.
Stage-3 is selected by condition resolver.
Stage-5 is filtered out by condition resolver.
Launching Job 3 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1475136353099_0018, Tracking URL = http://cloudera6.lan:8088/proxy/application_1475136353099_0018/
Kill Command = /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/hadoop/bin/hadoop job -kill job_1475136353099_0018
Hadoop job information for Stage-3: number of mappers: 11; number of reducers: 0
2016-09-29 14:45:02,507 Stage-3 map = 0%, reduce = 0%
2016-09-29 14:45:23,959 Stage-3 map = 9%, reduce = 0%, Cumulative CPU 11.02 sec
2016-09-29 14:45:25,104 Stage-3 map = 18%, reduce = 0%, Cumulative CPU 11.95 sec
2016-09-29 14:45:28,486 Stage-3 map = 23%, reduce = 0%, Cumulative CPU 20.4 sec
2016-09-29 14:45:29,690 Stage-3 map = 27%, reduce = 0%, Cumulative CPU 26.34 sec
2016-09-29 14:45:31,890 Stage-3 map = 36%, reduce = 0%, Cumulative CPU 28.09 sec
2016-09-29 14:45:43,401 Stage-3 map = 45%, reduce = 0%, Cumulative CPU 38.22 sec
2016-09-29 14:45:44,565 Stage-3 map = 55%, reduce = 0%, Cumulative CPU 38.89 sec
2016-09-29 14:45:50,106 Stage-3 map = 64%, reduce = 0%, Cumulative CPU 45.51 sec
2016-09-29 14:45:52,341 Stage-3 map = 68%, reduce = 0%, Cumulative CPU 50.99 sec
2016-09-29 14:45:53,509 Stage-3 map = 73%, reduce = 0%, Cumulative CPU 51.56 sec
2016-09-29 14:45:59,230 Stage-3 map = 82%, reduce = 0%, Cumulative CPU 56.4 sec
2016-09-29 14:46:01,501 Stage-3 map = 91%, reduce = 0%, Cumulative CPU 60.3 sec
2016-09-29 14:46:02,656 Stage-3 map = 100%, reduce = 0%, Cumulative CPU 62.39 sec
MapReduce Total cumulative CPU time: 1 minutes 2 seconds 390 msec
Ended Job = job_1475136353099_0018
Loading data to table default.zip_part partition (state=null)
 Time taken for load dynamic partitions : 3390
 Loading partition {state=STATE}
 Loading partition {state=FL}
 Loading partition {state=AZ}
 Loading partition {state=AK}
 Loading partition {state=CO}
 Loading partition {state=CA}
 Loading partition {state=AL}
 Loading partition {state=DE}
 Loading partition {state=CT}
 Loading partition {state=DC}
 Loading partition {state=AR}
 Time taken for adding to write entity : 20
Partition default.zip_part{state=AK} stats: [numFiles=1, numRows=18468, totalSize=391476, rawDataSize=373008]
Partition default.zip_part{state=AL} stats: [numFiles=1, numRows=197334, totalSize=3993078, rawDataSize=3795744]
Partition default.zip_part{state=AR} stats: [numFiles=1, numRows=167466, totalSize=3246093, rawDataSize=3078627]
Partition default.zip_part{state=AZ} stats: [numFiles=1, numRows=98838, totalSize=2151009, rawDataSize=2052171]
Partition default.zip_part{state=CA} stats: [numFiles=1, numRows=507813, totalSize=11185566, rawDataSize=10677753]
Partition default.zip_part{state=CO} stats: [numFiles=1, numRows=135432, totalSize=2831760, rawDataSize=2696328]
Partition default.zip_part{state=CT} stats: [numFiles=1, numRows=89946, totalSize=1864299, rawDataSize=1774353]
Partition default.zip_part{state=DC} stats: [numFiles=1, numRows=8208, totalSize=195396, rawDataSize=187188]
Partition default.zip_part{state=DE} stats: [numFiles=1, numRows=19494, totalSize=443175, rawDataSize=423681]
Partition default.zip_part{state=FL} stats: [numFiles=1, numRows=299022, totalSize=6905778, rawDataSize=6606756]
Partition default.zip_part{state=STATE} stats: [numFiles=1, numRows=57, totalSize=1425, rawDataSize=1368]
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Cumulative CPU: 14.58 sec HDFS Read: 66542471 HDFS Write: 50612035 SUCCESS
Stage-Stage-3: Map: 11 Cumulative CPU: 62.39 sec HDFS Read: 33271739 HDFS Write: 33209055 SUCCESS
Total MapReduce CPU Time Spent: 1 minutes 16 seconds 970 msec
OK
Time taken: 126.114 seconds
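Dynamic partitioning is not the only way to fill zip_part. With static partitioning you name the partition value explicitly, one insert per partition, and no special Hive settings are needed. A sketch for a single state:

```sql
-- Static partitioning: the partition value is fixed in the statement,
-- so the state column is NOT part of the select list.
insert into zip_part partition (state='AL')
select statefips, zipcode, agi_stub, n1, mars1, mars2, mars4
from zip
where state = 'AL';
```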

 

Now we have two tables with the same data: zip, which is not partitioned, and zip_part, which is partitioned by state. Each table contains 1,542,078 rows. (Note the state=STATE partition in the log above: its 57 rows appear to be the CSV header line, repeated once for each time the file was concatenated.)
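You can list the partitions that were created with SHOW PARTITIONS:

```sql
-- Each line of the output is one partition, e.g. state=AK, state=AL, ...
show partitions zip_part;
```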

Let’s run a simple query on both of them.

select avg(mars4) from zip where state='AL';

Running it on the zip table took 44.3 seconds, and MapReduce CPU time was almost 8 seconds:

Starting Job = job_1475136353099_0023, Tracking URL = http://cloudera6.lan:8088/proxy/application_1475136353099_0023/
Kill Command = /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/hadoop/bin/hadoop job -kill job_1475136353099_0023
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2016-09-29 15:03:17,509 Stage-1 map = 0%, reduce = 0%
2016-09-29 15:03:23,568 Stage-1 map = 50%, reduce = 0%, Cumulative CPU 5.31 sec
2016-09-29 15:03:26,106 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 7.98 sec
MapReduce Total cumulative CPU time: 7 seconds 980 msec
Ended Job = job_1475136353099_0023
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 Cumulative CPU: 7.98 sec HDFS Read: 110651556 HDFS Write: 761941 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 980 msec
OK
437.58809936452917
Time taken: 44.363 seconds, Fetched: 1 row(s)

 

Running the same query on zip_part took only 28.28 seconds, with MapReduce CPU time of 6 seconds. Note also the HDFS Read figure below: about 8 MB, versus about 110 MB for the unpartitioned table.

Starting Job = job_1475136353099_0025, Tracking URL = http://cloudera6.lan:8088/proxy/application_1475136353099_0025/
Kill Command = /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/hadoop/bin/hadoop job -kill job_1475136353099_0025
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2016-09-29 15:23:35,599 Stage-1 map = 0%, reduce = 0%
2016-09-29 15:23:41,195 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 5.59 sec
2016-09-29 15:23:42,324 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 6.01 sec
MapReduce Total cumulative CPU time: 6 seconds 10 msec
Ended Job = job_1475136353099_0025
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 6.01 sec HDFS Read: 7998221 HDFS Write: 500712 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 10 msec
OK
437.58809936452917
Time taken: 28.285 seconds, Fetched: 1 row(s)

 

We can also examine the explain plan for each table. Unfortunately, unlike in most RDBMSs, it does not explicitly show that a partition scan is used; we can infer it only from the number of phases and the number of rows scanned.
Here is part of the explain plan for the zip table query:

hive> explain select avg (mars4) from zip where state='AL';
OK
STAGE DEPENDENCIES:
 Stage-1 is a root stage
 Stage-0 depends on stages: Stage-1

STAGE PLANS:
 Stage: Stage-1
 Map Reduce
 Map Operator Tree:
 TableScan
 alias: zip
 Statistics: Num rows: 7349238 Data size: 44095428 Basic stats: COMPLETE Column stats: NONE
 Filter Operator
 predicate: (state = 'AL') (type: boolean)
 Statistics: Num rows: 3674619 Data size: 22047714 Basic stats: COMPLETE Column stats: NONE
 Select Operator
 expressions: mars4 (type: int)
 outputColumnNames: mars4
 Statistics: Num rows: 3674619 Data size: 22047714 Basic stats: COMPLETE Column stats: NONE
 Group By Operator
 aggregations: avg(mars4)
 mode: hash
 outputColumnNames: _col0
 Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
 Reduce Output Operator
 sort order:
 Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
 value expressions: _col0 (type: struct<count:bigint,sum:double,input:int>)

 

You can see that the starting population is 7,349,238 rows and that there are three steps: filter the rows where state='AL', select the mars4 column, and run the average aggregation on it. Now here is the explain plan of the same query on the partitioned table:

 Map Operator Tree:
 TableScan
 alias: zip_part
 Statistics: Num rows: 197334 Data size: 3795744 Basic stats: COMPLETE Column stats: NONE
 Select Operator
 expressions: mars4 (type: int)
 outputColumnNames: mars4
 Statistics: Num rows: 197334 Data size: 3795744 Basic stats: COMPLETE Column stats: NONE
 Group By Operator
 aggregations: avg(mars4)
 mode: hash
 outputColumnNames: _col0
 Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
 Reduce Output Operator
 sort order:
 Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
 value expressions: _col0 (type: struct<count:bigint,sum:double,input:int>)
 Reduce Operator Tree:
 Group By Operator
 aggregations: avg(VALUE._col0)
 mode: mergepartial
 outputColumnNames: _col0

Here you can see that the filter-by-state phase is omitted (since we only scan the AL partition) and the starting population is only 197,334 rows, matching the row count of the AL partition shown in the load statistics above.

There are several things to note:

  • There is a parameter that determines how many partitions can be created on each node:
    hive.exec.max.dynamic.partitions.pernode

    The default value is 100, and you should increase it with a set statement if you want to partition a table by a column that has more than 100 distinct values.

  • All partitioning rules, such as when you should partition your data and which columns are good candidates for the partition key, are the same as with relational databases.
  • There is a close relative of partitioning called bucketing; I haven’t tested it yet.
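Putting the limit parameters together, a typical session setup for a dynamic-partition insert that creates many partitions might look like this (the values shown are illustrative):

```sql
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
-- Per-node cap on partitions created by one statement (default 100):
set hive.exec.max.dynamic.partitions.pernode=1000;
-- There is also a global cap across the whole job:
set hive.exec.max.dynamic.partitions=10000;
```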

 

In the next part I will cover how Hive physically implements partitioning.
