From "Markovitz, Dudu" <>
Subject RE: Querying Hive tables from Spark
Date Mon, 27 Jun 2016 14:50:33 GMT
Hi Mich

I could not figure out what point you are trying to make.
Could you please clarify?



From: Mich Talebzadeh []
Sent: Monday, June 27, 2016 12:20 PM
To: user @spark <>; user <>
Subject: Querying Hive tables from Spark


I have done some extensive tests with Spark querying Hive tables.

It appears to me that Spark does not rely on the statistics that Hive collects on, say,
ORC tables. It seems that Spark uses its own optimization to query Hive tables, irrespective
of what Hive has collected by way of statistics.

Case in point: I have a FACT table bucketed on 5 dimensional foreign keys, as below.

 CREATE TABLE IF NOT EXISTS oraclehadoop.sales2
  PROD_ID        bigint                       ,
  CUST_ID        bigint                       ,
  TIME_ID        timestamp                    ,
  CHANNEL_ID     bigint                       ,
  PROMO_ID       bigint                       ,
  QUANTITY_SOLD  decimal(10)                  ,
  AMOUNT_SOLD    decimal(10)
TBLPROPERTIES ( "orc.compress"="SNAPPY",

The table is sorted in the order prod_id, cust_id, time_id, channel_id and promo_id. It has
22 million rows.

A simple query like below:

val s = HiveContext.table("sales2")
s.filter($"prod_id" === 13 && $"cust_id" === 50833 && $"time_id" === "2000-12-26 00:00:00" && $"channel_id" === 2 && $"promo_id" === 999).explain
s.filter($"prod_id" === 13 && $"cust_id" === 50833 && $"time_id" === "2000-12-26 00:00:00" && $"channel_id" === 2 && $"promo_id" === 999).collect.foreach(println)

Shows the plan as

== Physical Plan ==
Filter (((((prod_id#10L = 13) && (cust_id#11L = 50833)) && (time_id#12 = 977788800000000)) && (channel_id#13L = 2)) && (promo_id#14L = 999))
+- HiveTableScan [prod_id#10L,cust_id#11L,time_id#12,channel_id#13L,promo_id#14L,quantity_sold#15,amount_sold#16], MetastoreRelation oraclehadoop, sales2, None
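
A note on the plan above: the literal 977788800000000 in the time_id predicate appears to be the string "2000-12-26 00:00:00" converted to microseconds since the Unix epoch. The following is a minimal Scala sketch that reproduces the value, assuming the session time zone is UTC (the post does not state the time zone, so this is an assumption):

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Parse the timestamp literal used in the filter.
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val ts = LocalDateTime.parse("2000-12-26 00:00:00", fmt)

// Assumption: the session time zone is UTC.
// Convert epoch seconds to microseconds.
val micros = ts.toEpochSecond(ZoneOffset.UTC) * 1000000L

println(micros) // 977788800000000
```

If that reading is right, Spark compares time_id against a numeric constant, whereas the Hive plan further down wraps the column in UDFToString and compares strings.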

Spark returns 24 rows pretty fast in 22 seconds.

Running the same query on Hive with Spark as the execution engine shows:

  Stage-0 is a root stage
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
          alias: sales2
          Filter Operator
            predicate: (((((prod_id = 13) and (cust_id = 50833)) and (UDFToString(time_id) = '2000-12-26 00:00:00')) and (channel_id = 2)) and (promo_id = 999)) (type: boolean)
            Select Operator
              expressions: 13 (type: bigint), 50833 (type: bigint), 2000-12-26 00:00:00.0 (type: timestamp), 2 (type: bigint), 999 (type: bigint), quantity_sold (type: decimal(10,0)), amount_sold (type: decimal(10,0))
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6

Hive on Spark returns the same 24 rows in 30 seconds.

OK, so the Hive query is simply slower with the Spark engine.

Assuming that the time taken is optimization time + query time, it appears that in
most cases the optimization time does not have much impact on the overall performance.
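
One way to check this split is to time planning and execution separately. The following is a minimal sketch rather than a definitive benchmark: `timed` is a hypothetical helper that is not part of Spark, and the usage shown in the comments assumes the DataFrame `s` from the query above and assumes that reading `queryExecution.executedPlan` plans the query without running it.

```scala
// Hypothetical helper (not from the original post): evaluates a block once and
// returns its result together with the elapsed wall-clock time in seconds.
def timed[A](body: => A): (A, Double) = {
  val start = System.nanoTime()
  val result = body
  (result, (System.nanoTime() - start) / 1e9)
}

// Assumed usage in spark-shell, with `s` as defined in the query above:
//   val q = s.filter($"prod_id" === 13 && $"cust_id" === 50833)
//   val (_, planSecs)   = timed(q.queryExecution.executedPlan) // planning only
//   val (rows, runSecs) = timed(q.collect())                   // mostly execution
//   println(s"plan: $planSecs s, run: $runSecs s")
```

A single run is noisy, so repeating the measurement a few times and comparing the two figures gives a fairer picture of how much the optimizer contributes to the total.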

Let me know your thoughts.


Dr Mich Talebzadeh


