hive-user mailing list archives

From Mich Talebzadeh <mich.talebza...@cloudtechnologypartners.co.uk>
Subject Re: Using Spark functional programming rather than SQL, Spark on Hive tables
Date Wed, 24 Feb 2016 16:20:22 GMT
 

Hi,

TOOLS 

SPARK 1.5.2, HADOOP 2.6, HIVE 2.0, SPARK-SHELL, HIVE DATABASE 

OBJECTIVES: TIMING DIFFERENCES BETWEEN RUNNING SPARK USING SQL AND
RUNNING SPARK USING FUNCTIONAL PROGRAMMING (FP) CALLS ON HIVE TABLES

UNDERLYING TABLES: THREE TABLES IN HIVE DATABASE USING ORC FORMAT 

The main differences in timings come from running the queries and
fetching the data. If you look at the transformation part, that is

val rs = s.join(t, "time_id").join(c, "channel_id")
          .groupBy("calendar_month_desc", "channel_desc")
          .agg(sum("amount_sold").as("TotalSales"))

it takes 1 second. On the other hand, using SQL the first query takes
19 seconds, compared to just under 4 minutes using functional
programming.

The second query using SQL takes 28 seconds. Using FP it takes around 4
minutes.
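
The 1 second for the rs definition is explained by lazy evaluation:
join, groupBy and agg are transformations that only build a query plan,
and the scans, shuffles and aggregation run when an action fires. A
minimal sketch:

// The rs definition above is a lazy transformation: it returns in about
// a second because Spark only constructs the plan.
// The work happens when an action is invoked:
rs.take(5).foreach(println)   // this is where the minutes are spent
rs.count()                    // each action re-runs the job unless rs is cached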

These are my assumptions:

 	* Running SQL, the full query is executed in Hive, which means that
Hive can take advantage of ORC optimization, storage indexes etc.? (A
sketch of the related Spark-side setting follows this list.)
 	* Running FP requires that the data is fetched from the underlying
tables in Hive and brought back to the Spark cluster (standalone here),
and the joins etc. are done there.
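
On the first assumption, the Spark side also has an ORC
predicate-pushdown switch that is off by default in this release; a
minimal sketch, assuming the tables are read through Spark's own ORC
reader:

// spark.sql.orc.filterPushdown defaults to false in Spark 1.5; turning
// it on should let Spark's ORC reader use the ORC min/max statistics to
// skip stripes.
HiveContext.setConf("spark.sql.orc.filterPushdown", "true")

// Re-run the FP pipeline afterwards and compare the timings.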

The next steps for me would be to:

 	* Look at the query plans in Spark (see the sketch below)
 	* Run the same code on Hive alone and compare the results
Any other suggestions are welcome. 

STANDARD SQL CODE 

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Each timestamp below is produced by running a small Hive query.
// Note: the 'ss.ss' pattern repeats the seconds field (hence values like
// 09:00:50.50); it does not give fractional seconds.
println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

HiveContext.sql("use oraclehadoop")

println("\ncreating data set at "); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

// The whole join and aggregation is expressed as a single SQL statement
val rs = HiveContext.sql(
"""
SELECT t.calendar_month_desc
     , c.channel_desc
     , SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")

println("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
FROM tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)

println("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
ORDER BY SALES DESC LIMIT 5
""").collect.foreach(println)

println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

sys.exit
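
A side note on the timing method: each checkpoint above is itself a
Hive query, which adds a few seconds of its own. A lighter-weight
sketch (logTime is a hypothetical helper, not part of the original
run):

// Hypothetical helper: prints a label plus the wall-clock time locally,
// without running a query against Hive.
def logTime(label: String): Unit =
  println(label + " at " + new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS").format(new java.util.Date))

logTime("Started")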

RESULTS 

Started at [24/02/2016 09:00:50.50]
res1: org.apache.spark.sql.DataFrame = [result: string] 

creating data set at [24/02/2016 09:00:53.53]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)]

First query at [24/02/2016 09:00:54.54]
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193] 

second query at [24/02/2016 09:01:13.13]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760] 

Finished at [24/02/2016 09:01:31.31]

CODE USING FUNCTIONAL PROGRAMMING 

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

HiveContext.sql("use oraclehadoop")

// Project only the columns needed for the joins and the aggregation.
// NB: this reads the "sales" table, whereas the SQL version above reads
// "smallsales", which would also explain the differing first-query rows.
val s = HiveContext.table("sales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID", "CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID", "CALENDAR_MONTH_DESC")

println("\ncreating data set at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

// Lazy transformation: only the plan is built here, no job runs yet
val rs = s.join(t, "time_id").join(c, "channel_id")
          .groupBy("calendar_month_desc", "channel_desc")
          .agg(sum("amount_sold").as("TotalSales"))

println("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

// take(5) is the action that triggers the job; foreach(println) returns
// Unit, hence "rs1: Unit = ()" in the results below
val rs1 = rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)

println("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

// The orderBy("SALES") is redundant: the sort(desc("SALES")) that
// follows replaces it
val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)

println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

sys.exit

RESULTS 

Started at [24/02/2016 08:52:27.27]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
CALENDAR_MONTH_DESC: string] 

creating data set at [24/02/2016 08:52:30.30]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query at [24/02/2016 08:52:31.31]
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = () 

second query at [24/02/2016 08:56:17.17]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = () 

Finished at [24/02/2016 09:00:14.14]
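
Worth noting before the reply below: rs is recomputed from the base
tables for every action, which is why both FP queries cost around four
minutes each. A minimal sketch of caching the aggregate, assuming it
fits in cluster memory:

// Persist the aggregated DataFrame so later actions reuse it instead of
// re-reading and re-joining the Hive tables.
rs.cache()

// The first action materialises the cache; the second should then be fast.
rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)
rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).sort(desc("SALES")).take(5).foreach(println)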

On 24/02/2016 06:27, Sabarish Sasidharan wrote: 

> When using SQL, your full query, including the joins, was executed in
> Hive (or the RDBMS) and only the results were brought into the Spark
> cluster. In the FP case, the data for the 3 tables is first pulled
> into the Spark cluster and then the join is executed.
> 
> Thus the time difference. 
> 
> It's not immediately obvious why the results are different. 
> 
> Regards
> Sab
 