Comparing Hive with Spark SQL

This tutorial is adapted from Web Age course Hadoop Programming on the Cloudera Platform.

In this tutorial, you will work through two functionally equivalent examples, one written in Hive (v. 1.1) and the other using the PySpark API for the Spark SQL module (v. 1.6), to see the differences in command syntax between these popular Big Data processing systems. Both examples were prepared on CDH 5.13.0, which is the platform you will use in this exercise.

We will work against the same input file in both examples, running functionally equivalent query statements against that file.

Both the Hive and PySpark shells support the Unix command-line shortcuts that let you move quickly within a single command line:

Ctrl-a Moves the cursor to the start of the line.

Ctrl-e Moves the cursor to the end of the line.

Ctrl-k Deletes (kills) the line contents to the right of the cursor.

Ctrl-u Deletes the line contents to the left of the cursor.

Part 1 – Connect to the Environment

  • Download and install Cloudera’s Quickstart VM. Follow this link for instructions.

Part 2 – Setting up the Working Environment

All the steps in this tutorial will be performed in the /home/cloudera/Works directory. Create this directory if it does not exist.

1. In a new terminal window, type in the following command:

cd ~/Works

Before we begin, we need to make sure that Hive-related services are up and running.

2. Enter the following commands one after another:

sudo /etc/init.d/hive-metastore status  
sudo /etc/init.d/hive-server2 status 

If you see their status as not running, start the stopped service(s) using the following commands:

sudo /etc/init.d/hive-metastore start 
sudo /etc/init.d/hive-server2 start 

3. Enter the following command:

hive --version

You should see the following output:

Hive 1.1.0-cdh5.13.0
Subversion file:///data/jenkins/workspace/generic-package-rhel64-6-0/topdir/BUILD/hive-1.1.0-cdh5.13.0 -r Unknown
Compiled by jenkins on Wed Oct 4 11:06:55 PDT 2017
From source with checksum 4c9678e964cc1d15a0190a0a1867a837

4. Enter the following command to download the input file:

wget 'http://bit.ly/36fGR32'  -O files.csv

The file is in the CSV format with a header row; it has the following comma-delimited fields:

1. File name
2. File size 
3. Month of creation 
4. Day of creation

The first four lines of the file (the header row plus three data rows) are:

FNAME,FSIZE,MONTH,DAY
a2p,112200,Feb,21
abrt-action-analyze-backtrace,13896,Feb,22
abrt-action-analyze-c,12312,Feb,22
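
To make the layout concrete, here is a minimal standalone Python sketch (not part of the original lab) that parses the sample lines shown above with the standard csv module. The variable names are illustrative assumptions; the header and field order come from the file itself.

```python
import csv
import io

# The header and the first three data rows of files.csv, as listed above.
sample = """FNAME,FSIZE,MONTH,DAY
a2p,112200,Feb,21
abrt-action-analyze-backtrace,13896,Feb,22
abrt-action-analyze-c,12312,Feb,22
"""

# DictReader consumes the header row and uses it as the keys of each record.
rows = list(csv.DictReader(io.StringIO(sample)))

# Every CSV value is read as text, so numeric fields are converted explicitly.
for row in rows:
    row["FSIZE"] = int(row["FSIZE"])
    row["DAY"] = int(row["DAY"])

print(rows[0])
```

The explicit integer conversion here plays the same role as declaring the size and day columns as INT when the table is defined later in Hive.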

5. Enter the following commands to put the file on HDFS:

hadoop fs -mkdir hive_demo
hadoop fs  -put files.csv hive_demo/

The file is now in the hive_demo directory on HDFS – that’s where we are going to load it from when working with both Hive and Spark.

Part 3 – The Hive Example / Demo

We will start the interactive Hive shell (REPL) with the hive tool rather than the now-recommended beeline tool. Beeline is a bit more ceremonial, and here we forgo the advantages of its client-server architecture in favor of the simplicity of hive.

1. Enter the following command to start the Hive REPL:

hive 

Note: You can start the Beeline shell in embedded mode by running this command:

beeline -u jdbc:hive2://

To connect to a remote Hive Server (which is the primary feature of Beeline), use this command:

beeline -n hive -u jdbc:hive2://<hive server IP>:<port>

For example:

beeline -n hive -u jdbc:hive2://quickstart:10000

Now, let’s create a table using the Hive Data Definition Language (DDL).

2. Enter the following command:

CREATE EXTERNAL TABLE xfiles
(fileName STRING,fileSize INT,month STRING,day INT)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '/user/cloudera/hive_demo/'
    tblproperties ("skip.header.line.count"="1");

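Note that we have created an external Hive table: Hive records only the table metadata, and the data stays in the HDFS directory named in the LOCATION clause.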

3. Enter the following command to print the basic table schema info:

describe xfiles;

You should see the following output:

filename            	string              	                    
filesize            	int                 	                    
month               	string              	                    
day                 	int 

4. Enter the following command to print the extended table metadata:

desc formatted xfiles;

You should see the following output:

# col_name            	data_type           	comment             
	 	 
filename            	string              	                    
filesize            	int                 	                    
month               	string              	                    
day                 	int                 	                    
	 	 
# Detailed Table Information	 	 
Database:           	default             	 
Owner:              	cloudera            	 
CreateTime:         	Mon Feb 24 07:35:26 PST 2020	 
LastAccessTime:     	UNKNOWN             	 
Protect Mode:       	None                	 
Retention:          	0                   	 
Location:           	hdfs://quickstart.cloudera:8020/user/cloudera/hive_demo 
Table Type:         	EXTERNAL_TABLE      	 
Table Parameters:	 	 
	EXTERNAL            	TRUE                
	skip.header.line.count	1                   
	transient_lastDdlTime	1582558526          
	 	 
# Storage Information	 	 
SerDe Library:      	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe	 
InputFormat:        	org.apache.hadoop.mapred.TextInputFormat	 
OutputFormat:       	org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat	 
Compressed:         	No                  	 
Num Buckets:        	-1                  	 
Bucket Columns:     	[]                  	 
Sort Columns:       	[]                  	 
Storage Desc Params:	 	 
	field.delim         	,                   
	serialization.format	,   

5. Enter the following command to fetch the first ten rows of the table:

select * from xfiles limit 10;

6. Enter the following basic SQL command:

select month, count(*) from xfiles group by month;

You should see the following output:

Apr	305
Aug	93
Dec	103
Feb	279
Jan	5
Jul	63
Jun	129
Mar	2
May	5
Nov	221
Oct	9
Sep	66
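
Conceptually, the GROUP BY query counts the rows for each distinct month value. A plain-Python sketch of the same aggregation, run on the three sample rows listed in Part 2 rather than on the full file, might look like this (it is standalone Python, not Hive or Spark code, and the variable names are illustrative):

```python
from collections import Counter

# (fileName, fileSize, month, day) tuples taken from the sample rows in Part 2.
sample_rows = [
    ("a2p", 112200, "Feb", 21),
    ("abrt-action-analyze-backtrace", 13896, "Feb", 22),
    ("abrt-action-analyze-c", 12312, "Feb", 22),
]

# Count rows per month, as "select month, count(*) ... group by month" does.
files_per_month = Counter(month for _, _, month, _ in sample_rows)

# The Hive output above is listed alphabetically by month, so sort to match.
for month, count in sorted(files_per_month.items()):
    print(month, count)
```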

7. Enter the following more advanced SQL command:

select month, count(*) as fc from xfiles group by month having fc > 100;

You should see the following output:

Apr	305
Dec	103
Feb	279
Jun	129
Nov	221
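
The HAVING clause filters the groups after aggregation, whereas WHERE filters rows before it. The standalone Python sketch below applies the same threshold to the per-month counts printed in step 6; the dictionary is copied from that output, and the names are illustrative.

```python
# Per-month counts copied from the output of step 6.
month_counts = {
    "Apr": 305, "Aug": 93, "Dec": 103, "Feb": 279, "Jan": 5, "Jul": 63,
    "Jun": 129, "Mar": 2, "May": 5, "Nov": 221, "Oct": 9, "Sep": 66,
}

# Keep only the groups whose count exceeds 100, as "having fc > 100" does.
busy_months = {m: c for m, c in month_counts.items() if c > 100}

for month in sorted(busy_months):
    print(month, busy_months[month])
```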

8. Enter the following Hive DDL and DML commands (we are creating and populating a table in PARQUET columnar store format):

CREATE TABLE xfiles_prq LIKE xfiles STORED AS PARQUET;
INSERT OVERWRITE TABLE xfiles_prq SELECT * FROM xfiles;

9. Enter the following command:

desc formatted xfiles_prq;

In the output of the above command, find the location of the xfiles_prq Hive-managed table; it should be:

hdfs://quickstart.cloudera:8020/user/hive/warehouse/xfiles_prq

It is a directory that contains a file in the PARQUET format:

/user/hive/warehouse/xfiles_prq/000000_0

Part 4 – The Spark Example / Demo

1. Start a new terminal and navigate to ~/Works

2. Start a new PySpark session by running the pyspark command.

Notice that PySpark v.1.6 uses Python v.2.6.6.

3. Enter the following command:

xfiles = sqlContext.read.load('/user/hive/warehouse/xfiles_prq/')

We are reading the file in the PARQUET format, which is the default for the load method.

4. Enter the following command:

type(xfiles)

You should see the following output:

<class 'pyspark.sql.dataframe.DataFrame'>

5. Enter the following command to print the schema of xfiles (it is roughly equivalent to Hive’s describe xfiles command):

xfiles.printSchema()

You should see the following output. Notice that Spark accurately recreated the schema of the file. Where do you think it got the clues? (Hint: Parquet files store their schema alongside the data.)

root
 |-- filename: string (nullable = true)
 |-- filesize: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)

Now we will repeat all the queries from the Hive part of the lab.

In PySpark you have two primary options for querying structured data:

  • using the sqlContext’s sql method, or

  • using the Spark DataFrame API

We will illustrate both.

To use the sqlContext’s sql method, you need to first register the DataFrame as a temp table that is associated with the active sqlContext; this temp table’s lifetime is tied to that of your current Spark session.

6. Enter the following command:

xfiles.registerTempTable('xfiles_tmp')

7. Enter the following command to fetch the first ten rows of the table:

sqlContext.sql('select * from xfiles_tmp').show(10)

You should see the following output (abridged for space below):

+--------------------+--------+-----+---+
|            filename|filesize|month|day|
+--------------------+--------+-----+---+
|                 a2p|  112200|  Feb| 21|
|abrt-action-analy...|   13896|  Feb| 22|
|abrt-action-analy...|   12312|  Feb| 22|
|abrt-action-analy...|    6676|  Feb| 22|
|abrt-action-analy...|   10720|  Feb| 22|
|abrt-action-analy...|   11016|  Feb| 22|
. . . 

8. Enter the following command:

xfiles.select('*').show(10)

9. Enter the following command:

sqlContext.sql('select month, count(*) from xfiles_tmp group by month').show(10)

You should see the following output:

+-----+---+                                                                     
|month|_c1|
+-----+---+
|  Jul| 63|
|  Jun|129|
|  Apr|305|
|  Feb|279|
|  Oct|  9|
|  Nov|221|
|  Mar|  2|
|  May|  5|
|  Dec|103|
|  Aug| 93|
+-----+---+

10. Enter the following command:

xfiles.groupBy('month').count().show(10)

You should see output similar to the above:

+-----+-----+
|month|count|
+-----+-----+
|  Jul|   63|
|  Jun|  129|
|  Apr|  305|
|  Feb|  279|
|  Oct|    9|
|  Nov|  221|
|  Mar|    2|
|  May|    5|
|  Dec|  103|
|  Aug|   93|
+-----+-----+

11. Enter the following command:

sqlContext.sql('select month, count(*) as fc from xfiles_tmp group by month having fc > 100').show()

You should see the following output:

+-----+---+
|month| fc|
+-----+---+
|  Jun|129|
|  Apr|305|
|  Feb|279|
|  Nov|221|
|  Dec|103|
+-----+---+

12. Enter the following command:

xfiles.groupBy('month').count()\
.rdd.filter(lambda r: r['count'] > 100)\
.toDF().show()

You should see the output matching the one generated by the previous command.

Alternatively, you can use this more verbose but more Pythonic type of command, which collects the grouped rows to the driver and filters them there:

map(lambda r: (r['month'], r['count']), filter(lambda r: r['count'] > 100, xfiles.groupBy('month').count().collect()))

The output will be a bit more difficult to read, though:

[(u'Jun', 129), (u'Apr', 305), (u'Feb', 279), (u'Nov', 221), (u'Dec', 103)] 
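
One caveat: the map/filter form returns a list only under Python 2, which this PySpark 1.6 shell uses. Under Python 3, map returns a lazy iterator, while a list comprehension behaves the same on both. The sketch below is standalone and uses plain dictionaries as stand-ins for the Row objects that collect() returns (an assumption made so that it runs without Spark); the values are the ten rows displayed by the groupBy output above.

```python
# Stand-ins for the Row objects returned by collect(); values are the ten
# rows displayed by xfiles.groupBy('month').count().show(10) above.
collected = [
    {"month": "Jul", "count": 63}, {"month": "Jun", "count": 129},
    {"month": "Apr", "count": 305}, {"month": "Feb", "count": 279},
    {"month": "Oct", "count": 9}, {"month": "Nov", "count": 221},
    {"month": "Mar", "count": 2}, {"month": "May", "count": 5},
    {"month": "Dec", "count": 103}, {"month": "Aug", "count": 93},
]

# Filter and project in one pass; yields a list on Python 2 and Python 3.
busy = [(r["month"], r["count"]) for r in collected if r["count"] > 100]
print(busy)
```

In the PySpark shell, the same comprehension should work on the real result of collect(), since Row objects support the same r['month'] style of access used in the commands above.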

Part 5 – Creating a Spark DataFrame from Scratch

You can create a Spark DataFrame from raw data held in memory and have Spark infer the schema from the data itself. Inference is driven by the Python type of each value, so the dates in this example, which are supplied as plain strings, end up in a string column.

1. Enter the following commands to create a list of tuples (records) representing our data (we simulate data coming from different sources here):

txnids = [1,2,3]
items = ['A', 'B', 'C']
dates = ['2020-02-03', '2020-02-10', '2020-02-17']
prices = [123.99, 3.50, 45.67]
records = [(t,d,i,p) for t,d,i,p in zip(txnids, dates, items, prices)]
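
Because zip already yields tuples in the requested field order, the comprehension can be shortened. A small standalone sketch with the same data (records_short is an illustrative name, not part of the lab):

```python
txnids = [1, 2, 3]
items = ['A', 'B', 'C']
dates = ['2020-02-03', '2020-02-10', '2020-02-17']
prices = [123.99, 3.50, 45.67]

# The comprehension from the step above.
records = [(t, d, i, p) for t, d, i, p in zip(txnids, dates, items, prices)]

# zip already produces (txnid, date, item, price) tuples; list() materializes them.
records_short = list(zip(txnids, dates, items, prices))

print(records_short[0])
```

The explicit comprehension remains useful when the fields need to be reordered or transformed while the records are built.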

2. Enter the following command to create a list of column names:

col_names = ['txnid', 'Date', 'Item', 'Price']

3. Enter the following command to create a DataFrame:

df = sqlContext.createDataFrame(records, col_names)

If you print the DataFrame’s schema and its data, you will see these outputs:

root
 |-- txnid: long (nullable = true)
 |-- Date: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Price: double (nullable = true)

+-----+----------+----+------+
|txnid|      Date|Item| Price|
+-----+----------+----+------+
|    1|2020-02-03|   A|123.99|
|    2|2020-02-10|   B|   3.5|
|    3|2020-02-17|   C| 45.67|
+-----+----------+----+------+
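
The inferred column types follow from the Python type of each value in the records. The standalone sketch below only illustrates that correspondence for this example: the lookup table is a hand-written assumption that reproduces the schema printed above, not Spark's actual inference code.

```python
# Hand-written mapping that mirrors the schema printed above (illustrative only).
python_to_spark = {int: "long", str: "string", float: "double"}

col_names = ['txnid', 'Date', 'Item', 'Price']
first_record = (1, '2020-02-03', 'A', 123.99)

# Look up the type name for each value of the first record.
inferred = [(name, python_to_spark[type(value)])
            for name, value in zip(col_names, first_record)]

for name, type_name in inferred:
    print(" |-- %s: %s" % (name, type_name))
```

Note that the Date column comes out as a string simply because the values were supplied as strings.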

You can apply date-time related transformations using the functions from the pyspark.sql.functions module.

4. Enter the following commands:

from pyspark.sql.functions import date_format
df2 = df.select(df.txnid, df.Item, date_format('Date', 'MM-dd-yyyy').alias('usdate'))

This command is roughly comparable to the SQL INSERT INTO … SELECT … FROM data transfer idiom: it builds a new DataFrame, df2, from selected columns of df. It also applies the date_format() function to reformat the Date column.

The df2 DataFrame has the following data:

+-----+----+----------+
|txnid|Item|    usdate|
+-----+----+----------+
|    1|   A|02-03-2020|
|    2|   B|02-10-2020|
|    3|   C|02-17-2020|
+-----+----+----------+
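
For comparison, the same reformatting can be done on the raw strings in plain Python with the standard datetime module. This standalone sketch is not Spark code; note that the Python format codes (%m-%d-%Y) differ from the Java-style pattern (MM-dd-yyyy) passed to date_format().

```python
from datetime import datetime

dates = ['2020-02-03', '2020-02-10', '2020-02-17']

# Parse each year-month-day string, then render it as month-day-year.
us_dates = [datetime.strptime(d, '%Y-%m-%d').strftime('%m-%d-%Y') for d in dates]
print(us_dates)
```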

 
