hive-user mailing list archives

From "Markovitz, Dudu" <dmarkov...@paypal.com>
Subject RE: Using Spark on Hive with Hive also using Spark as its execution engine
Date Tue, 12 Jul 2016 09:35:01 GMT
The principles are very clear, and if our use-case were a complex one, composed of many stages,
I would expect performance benefits from the Spark engine.
Since our use-case is a simple one and most of the work here is just reading the files, I
don’t see how we can explain the performance differences unless the data was already cached
in the Spark test.
Clearly, we’re missing something.

Dudu

From: Mich Talebzadeh [mailto:mich.talebzadeh@gmail.com]
Sent: Tuesday, July 12, 2016 12:16 PM
To: user <user@hive.apache.org>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Using Spark on Hive with Hive also using Spark as its execution engine

That is only a plan, not what the execution engine is actually doing.

As I stated before, Spark uses a DAG plus in-memory computing, whereas MR writes to disk serially.

The key here is the execution, or rather the execution engine.

In general

Standard MapReduce, as I understand it, reads the data from HDFS, applies the map-reduce algorithm and
writes the results back to HDFS. If there are many iterations of map-reduce, there will be many intermediate
writes to HDFS, all of them serial writes to disk. Each map-reduce step is completely independent
of the other steps, and the execution engine has no global knowledge of which map-reduce
steps will follow. For many iterative algorithms this is inefficient, as the data between
each map-reduce pair gets written to and read from the file system.

The equivalent of parallelism in Big Data is deploying what is known as a Directed Acyclic Graph
(DAG, https://en.wikipedia.org/wiki/Directed_acyclic_graph) of operations. In a nutshell,
a DAG gives the engine a fuller picture for global optimisation: it can exploit parallelism,
pipeline consecutive map steps into one, and avoid writing intermediate data to HDFS. In
short, this prevents writing data back and forth after every reduce step, which for me is a
significant improvement over classic MapReduce.
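
As a rough illustration of the point above, here is a minimal Python sketch of a toy model, not of how Hive, MapReduce or Spark are actually implemented. The function names `run_materialized` and `run_pipelined` and the three sample stages are made up for this example. The first function stores the full output of every stage before the next one starts (standing in for an HDFS write per step); the second chains the stages lazily and stores only the final result.

```python
from typing import Callable, Iterable, List, Tuple

# A stage takes a stream of rows and returns a stream of rows (hypothetical type).
Stage = Callable[[Iterable[int]], Iterable[int]]


def run_materialized(data: List[int], stages: List[Stage]) -> Tuple[List[int], int]:
    """MR-style toy model: every stage's output is fully stored before the next runs."""
    writes = 0
    current = list(data)
    for stage in stages:
        current = list(stage(current))  # stands in for a write to HDFS
        writes += 1
    return current, writes


def run_pipelined(data: List[int], stages: List[Stage]) -> Tuple[List[int], int]:
    """DAG-style toy model: stages are chained lazily, only the final result is stored."""
    stream: Iterable[int] = iter(data)
    for stage in stages:
        stream = stage(stream)  # builds the chain, nothing is copied yet
    return list(stream), 1  # the single write is the final output


# Three illustrative stages: double, filter, increment.
stages: List[Stage] = [
    lambda rows: (r * 2 for r in rows),
    lambda rows: (r for r in rows if r % 3 == 0),
    lambda rows: (r + 1 for r in rows),
]

rows = list(range(1, 7))
materialized_result, materialized_writes = run_materialized(rows, stages)
pipelined_result, pipelined_writes = run_pipelined(rows, stages)
print(materialized_result, materialized_writes)
print(pipelined_result, pipelined_writes)
```

Both variants produce the same rows; the only difference in this model is how many times intermediate results are stored, which grows with the number of stages in the first case and stays at one in the second.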

Now Tez is basically MR with a DAG. With Spark you get a DAG plus in-memory computing. Think of it
as a comparison between a classic RDBMS like Oracle and an in-memory database (IMDB) like
Oracle TimesTen.

The outcome is that Hive using Spark as its execution engine is pretty impressive. You have the
advantage of the Hive CBO (cost-based optimizer) plus in-memory computing. If you use Spark for
all of this (say Spark SQL) without Hive, Spark uses its own optimizer, Catalyst, which does not
have a CBO yet, plus in-memory computing.

As usual, your mileage may vary.

HTH



Dr Mich Talebzadeh



LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction
of data or any other property which may arise from relying on this email's technical content
is explicitly disclaimed. The author will in no case be liable for any monetary damages arising
from such loss, damage or destruction.



On 12 July 2016 at 09:33, Markovitz, Dudu <dmarkovitz@paypal.com> wrote:
I don’t see how this explains the time differences.

Dudu

From: Mich Talebzadeh [mailto:mich.talebzadeh@gmail.com]
Sent: Tuesday, July 12, 2016 10:56 AM
To: user <user@hive.apache.org>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Using Spark on Hive with Hive also using Spark as its execution engine

This is the whole idea. Spark uses a DAG plus in-memory (IM) computing; MR is classic.


This is for Hive on Spark

hive> explain select max(id) from dummy_parquet;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1
STAGE PLANS:
  Stage: Stage-1
    Spark
      Edges:
        Reducer 2 <- Map 1 (GROUP, 1)
      DagName: hduser_20160712083219_632c2749-7387-478f-972d-9eaadd9932c6:1
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: dummy_parquet
                  Statistics: Num rows: 100000000 Data size: 700000000 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: id (type: int)
                    outputColumnNames: id
                    Statistics: Num rows: 100000000 Data size: 700000000 Basic stats: COMPLETE Column stats: NONE
                    Group By Operator
                      aggregations: max(id)
                      mode: hash
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                      Reduce Output Operator
                        sort order:
                        Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                        value expressions: _col0 (type: int)
        Reducer 2
            Reduce Operator Tree:
              Group By Operator
                aggregations: max(VALUE._col0)
                mode: mergepartial
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
Time taken: 2.801 seconds, Fetched: 50 row(s)

And this is with setting the execution engine to MR

hive> set hive.execution.engine=mr;
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider
using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.

hive> explain select max(id) from dummy_parquet;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: dummy_parquet
            Statistics: Num rows: 100000000 Data size: 700000000 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: id (type: int)
              outputColumnNames: id
              Statistics: Num rows: 100000000 Data size: 700000000 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: max(id)
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  sort order:
                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col0 (type: int)
      Reduce Operator Tree:
        Group By Operator
          aggregations: max(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
Time taken: 0.1 seconds, Fetched: 44 row(s)


HTH






On 12 July 2016 at 08:16, Markovitz, Dudu <dmarkovitz@paypal.com> wrote:
This is a simple task –
Read the files, find the local max value and combine the results (find the global max value).
How do you explain the differences in the results? Spark reads the files and finds a local
max 10X (+) faster than MR?
Can you please attach the execution plan?
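
The task described above (a local max per file, then a global max over the partial results) can be sketched in a few lines of Python. This is only an illustration of the two-phase aggregation pattern; the partition contents and the function names `local_max` and `global_max` are invented for the example and say nothing about how either engine implements it.

```python
from typing import Iterable, List


def local_max(partition: Iterable[int]) -> int:
    """Map side: scan one file/split and keep only its largest value."""
    return max(partition)


def global_max(partials: Iterable[int]) -> int:
    """Reduce side: merge the per-split partial results into one value."""
    return max(partials)


# Hypothetical splits of an `id` column; real splits would be file blocks.
partitions: List[List[int]] = [[3, 17, 5], [42, 8], [23, 4, 16]]

partials = [local_max(p) for p in partitions]  # one value per split
result = global_max(partials)
print(partials, result)
```

Only one value per split crosses from the first phase to the second, which is why nearly all the work in this query is the scan itself.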

Thanks

Dudu



From: Mich Talebzadeh [mailto:mich.talebzadeh@gmail.com]
Sent: Monday, July 11, 2016 11:55 PM
To: user <user@hive.apache.org>; user @spark <user@spark.apache.org>
Subject: Re: Using Spark on Hive with Hive also using Spark as its execution engine

In my test I did a like-for-like comparison, keeping the setup the same, namely:


  1.  Table was a parquet table of 100 Million rows
  2.  The same set up was used for both Hive on Spark and Hive on MR
  3.  Spark was very impressive compared to MR on this particular test.

Just to check for any issues, I created an ORC table in the image of the Parquet table
(insert/select from Parquet to ORC), with stats updated for columns etc.

These were the results of the same run, this time using the ORC table:

hive> select max(id) from oraclehadoop.dummy;

Starting Spark Job = b886b869-5500-4ef7-aab9-ae6fb4dad22b
Query Hive on Spark job[1] stages:
2
3
Status: Running (Hive on Spark job[1])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-07-11 21:35:45,020 Stage-2_0: 0(+8)/23     Stage-3_0: 0/1
2016-07-11 21:35:48,033 Stage-2_0: 0(+8)/23     Stage-3_0: 0/1
2016-07-11 21:35:51,046 Stage-2_0: 1(+8)/23     Stage-3_0: 0/1
2016-07-11 21:35:52,050 Stage-2_0: 3(+8)/23     Stage-3_0: 0/1
2016-07-11 21:35:53,055 Stage-2_0: 8(+4)/23     Stage-3_0: 0/1
2016-07-11 21:35:54,060 Stage-2_0: 11(+1)/23    Stage-3_0: 0/1
2016-07-11 21:35:55,065 Stage-2_0: 12(+0)/23    Stage-3_0: 0/1
2016-07-11 21:35:56,071 Stage-2_0: 12(+8)/23    Stage-3_0: 0/1
2016-07-11 21:35:57,076 Stage-2_0: 13(+8)/23    Stage-3_0: 0/1
2016-07-11 21:35:58,081 Stage-2_0: 20(+3)/23    Stage-3_0: 0/1
2016-07-11 21:35:59,085 Stage-2_0: 23/23 Finished       Stage-3_0: 0(+1)/1
2016-07-11 21:36:00,089 Stage-2_0: 23/23 Finished       Stage-3_0: 1/1 Finished
Status: Finished successfully in 16.08 seconds
OK
100000000
Time taken: 17.775 seconds, Fetched: 1 row(s)

Repeat with MR engine

hive> set hive.execution.engine=mr;
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider
using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.

hive> select max(id) from oraclehadoop.dummy;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions.
Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = hduser_20160711213100_8dc2afae-8644-4097-ba33-c7bd3c304bf8
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1468226887011_0008, Tracking URL = http://rhes564:8088/proxy/application_1468226887011_0008/
Kill Command = /home/hduser/hadoop-2.6.0/bin/hadoop job  -kill job_1468226887011_0008
Hadoop job information for Stage-1: number of mappers: 23; number of reducers: 1
2016-07-11 21:37:00,061 Stage-1 map = 0%,  reduce = 0%
2016-07-11 21:37:06,440 Stage-1 map = 4%,  reduce = 0%, Cumulative CPU 16.48 sec
2016-07-11 21:37:14,751 Stage-1 map = 9%,  reduce = 0%, Cumulative CPU 40.63 sec
2016-07-11 21:37:22,048 Stage-1 map = 13%,  reduce = 0%, Cumulative CPU 58.88 sec
2016-07-11 21:37:30,412 Stage-1 map = 17%,  reduce = 0%, Cumulative CPU 80.72 sec
2016-07-11 21:37:37,707 Stage-1 map = 22%,  reduce = 0%, Cumulative CPU 103.43 sec
2016-07-11 21:37:45,999 Stage-1 map = 26%,  reduce = 0%, Cumulative CPU 125.93 sec
2016-07-11 21:37:54,300 Stage-1 map = 30%,  reduce = 0%, Cumulative CPU 147.17 sec
2016-07-11 21:38:01,538 Stage-1 map = 35%,  reduce = 0%, Cumulative CPU 166.56 sec
2016-07-11 21:38:08,807 Stage-1 map = 39%,  reduce = 0%, Cumulative CPU 189.29 sec
2016-07-11 21:38:17,115 Stage-1 map = 43%,  reduce = 0%, Cumulative CPU 211.03 sec
2016-07-11 21:38:24,363 Stage-1 map = 48%,  reduce = 0%, Cumulative CPU 235.68 sec
2016-07-11 21:38:32,638 Stage-1 map = 52%,  reduce = 0%, Cumulative CPU 258.27 sec
2016-07-11 21:38:40,916 Stage-1 map = 57%,  reduce = 0%, Cumulative CPU 278.44 sec
2016-07-11 21:38:49,206 Stage-1 map = 61%,  reduce = 0%, Cumulative CPU 300.35 sec
2016-07-11 21:38:58,524 Stage-1 map = 65%,  reduce = 0%, Cumulative CPU 322.89 sec
2016-07-11 21:39:07,889 Stage-1 map = 70%,  reduce = 0%, Cumulative CPU 344.8 sec
2016-07-11 21:39:16,151 Stage-1 map = 74%,  reduce = 0%, Cumulative CPU 367.77 sec
2016-07-11 21:39:25,456 Stage-1 map = 78%,  reduce = 0%, Cumulative CPU 391.82 sec
2016-07-11 21:39:33,725 Stage-1 map = 83%,  reduce = 0%, Cumulative CPU 415.48 sec
2016-07-11 21:39:43,037 Stage-1 map = 87%,  reduce = 0%, Cumulative CPU 436.09 sec
2016-07-11 21:39:51,292 Stage-1 map = 91%,  reduce = 0%, Cumulative CPU 459.4 sec
2016-07-11 21:39:59,563 Stage-1 map = 96%,  reduce = 0%, Cumulative CPU 477.92 sec
2016-07-11 21:40:05,760 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 491.72 sec
2016-07-11 21:40:10,921 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 499.37 sec
MapReduce Total cumulative CPU time: 8 minutes 19 seconds 370 msec
Ended Job = job_1468226887011_0008
MapReduce Jobs Launched:
Stage-Stage-1: Map: 23  Reduce: 1   Cumulative CPU: 499.37 sec   HDFS Read: 403754774 HDFS Write: 10 SUCCESS
Total MapReduce CPU Time Spent: 8 minutes 19 seconds 370 msec
OK
100000000
Time taken: 202.333 seconds, Fetched: 1 row(s)

So in summary

Table             MR (sec)               Spark (sec)
Parquet           239.532                14.38
ORC               202.333                17.77
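
For reference, the speedup implied by these timings can be worked out as below. This is plain arithmetic on the four elapsed times in the table; the dictionary layout and the `speedup` helper are just one illustrative way to write it.

```python
# Elapsed times in seconds, copied from the summary table: (MR, Spark).
timings = {
    "Parquet": (239.532, 14.38),
    "ORC": (202.333, 17.77),
}


def speedup(mr_seconds: float, spark_seconds: float) -> float:
    """How many times faster the Spark run was than the MR run."""
    return mr_seconds / spark_seconds


speedups = {table: round(speedup(mr, spark), 1) for table, (mr, spark) in timings.items()}
for table, factor in speedups.items():
    print(f"{table}: Spark was about {factor}x faster than MR")
```

These are single runs on one setup, so the ratios are indicative rather than a benchmark.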

Still, I would use Spark if I had a choice, and I agree that on VLT (very large tables) the
limitation in available memory may be the overriding factor in using Spark.

HTH






On 11 July 2016 at 19:25, Gopal Vijayaraghavan <gopalv@apache.org> wrote:

> Status: Finished successfully in 14.12 seconds
> OK
> 100000000
> Time taken: 14.38 seconds, Fetched: 1 row(s)

That might be an improvement over MR, but that still feels far too slow.


Parquet numbers are in general bad in Hive, but that's because the Parquet
reader gets no actual love from the devs. The community, if it wants to
keep using Parquet heavily, needs a Hive dev to go over to Parquet-mr and
cut a significant number of memory copies out of the reader.

The Spark 2.0 build for instance, has a custom Parquet reader for SparkSQL
which does this. SPARK-12854 does for Spark+Parquet what Hive 2.0 does for
ORC (actually, it looks more like hive's VectorizedRowBatch than
Tungsten's flat layouts).

But that reader cannot be used in Hive-on-Spark, because it is not a
public reader impl.


Not to pick an arbitrary dataset, my workhorse example is a TPC-H lineitem
at 10 GB scale on a single 16-core box.

hive(tpch_flat_orc_10)> select max(l_discount) from lineitem;
Query ID = gopal_20160711175917_f96371aa-2721-49c8-99a0-f7c4a1eacfda
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1466700718395_0256)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 ..........      llap     SUCCEEDED     13         13        0        0       0       0
Reducer 2 ......      llap     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 0.71 s

----------------------------------------------------------------------------------------------
Status: DAG finished successfully in 0.71 seconds

Query Execution Summary
----------------------------------------------------------------------------------------------
OPERATION                            DURATION
----------------------------------------------------------------------------------------------
Compile Query                           0.21s
Prepare Plan                            0.13s
Submit Plan                             0.34s
Start DAG                               0.23s
Run DAG                                 0.71s
----------------------------------------------------------------------------------------------

Task Execution Summary
----------------------------------------------------------------------------------------------
  VERTICES   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
----------------------------------------------------------------------------------------------
     Map 1         604.00             0            0     59,957,438              13
 Reducer 2         105.00             0            0             13               0
----------------------------------------------------------------------------------------------

LLAP IO Summary
----------------------------------------------------------------------------------------------
  VERTICES ROWGROUPS  META_HIT  META_MISS  DATA_HIT  DATA_MISS  ALLOCATION      USED  TOTAL_IO
----------------------------------------------------------------------------------------------
     Map 1      6036         0        146        0B    68.86MB    491.00MB  479.89MB     7.94s
----------------------------------------------------------------------------------------------

OK
0.1
Time taken: 1.669 seconds, Fetched: 1 row(s)
hive(tpch_flat_orc_10)>


This is running against a single 16 core box & I would assume it would
take <1.4s to read twice as much (13 tasks is barely touching the load
factors).

It would probably be a bit faster if the cache had hits, but in general
14s to read 100M rows is nearly an order of magnitude off where Hive 2.2.0 is.
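
To put rough numbers on that gap, here is a back-of-the-envelope scan rate for the two runs quoted in this message, sketched in Python. It uses only figures from the output above (100,000,000 rows in 14.12 s for the Hive-on-Spark job, and 59,957,438 input records in the 0.71 s "Run DAG" time for LLAP). The two runs used different tables, file formats and hardware, so this is an indication only; the `rows_per_second` helper is made up for the example.

```python
def rows_per_second(rows: int, seconds: float) -> float:
    """Average scan rate over the whole job."""
    return rows / seconds


# Hive on Spark, Parquet table: "Finished successfully in 14.12 seconds".
hive_on_spark = rows_per_second(100_000_000, 14.12)

# Hive LLAP, ORC lineitem: Map 1 INPUT_RECORDS over the "Run DAG" duration.
llap = rows_per_second(59_957_438, 0.71)

print(f"Hive on Spark: {hive_on_spark / 1e6:.1f}M rows/s")
print(f"LLAP:          {llap / 1e6:.1f}M rows/s")
print(f"Ratio:         {llap / hive_on_spark:.1f}x")
```

Using the end-to-end "Time taken" values (14.38 s and 1.669 s) instead of the job run times gives a smaller ratio, since fixed compile and submit overhead weighs more heavily on the short query.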

Cheers,
Gopal










