spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ali Tootoonchian <...@levyx.com>
Subject Cache Shuffle Based Operation Before Sort
Date Mon, 25 Apr 2016 19:50:27 GMT
Caching shuffle RDD before the sort process improves system performance. SQL
planner can be intelligent to cache join, aggregate or sort data frame
before executing next sort process.

For any sort process two job is created by spark, first one is responsible
for producing range boundary for shuffle partition and second one complete
sort process by creating a new shuffle RDD.

When an input of sort process is output of other shuffle process then reduce
part of shuffle RDD is re-evaluated and the intermediate  shuffle data is
read twice. If input shuffle RDD (exchange based data frame) is saved, sort
process can be completed faster. Remember that Spark saves RDD in parquet
format which usually compressed and its size is smaller than original data.

Let’s look at an example,
The following query is modified version of q3 of TPCH test bench.
tpchQuery = 
       """        
        |select *
        |from
        | customer,
        | orders,
        | lineitem
        |where
        | c_mktsegment = 'MACHINERY'
        | and c_custkey = o_custkey
        | and l_orderkey = o_orderkey
        | and o_orderdate < '1995-03-15'
        | and l_shipdate > '1995-03-15'
        |order by
        | o_orderdate
       """.stripMargin

The query can be executed in one step using current Spark SQL planner. The
other approach for execute this query is two steps. 
    Compute and cache output of join process
    Execute order by command
Following command show how second approach can be implemented

tpchQuery =
      """
        |select *
        |from
        | customer,
        | orders,
        | lineitem
        |where
        | c_mktsegment = 'MACHINERY'
        | and c_custkey = o_custkey
        | and l_orderkey = o_orderkey
        | and o_orderdate < '1995-03-15'
        | and l_shipdate > '1995-03-15'
      """.stripMargin
val joinDf = sqlContext.sql(tpchQuery).cache
val queryRes = joinDf.sort("o_orderdate")

Let’s look at details of execution for 10 and 100 scale factor input


By comparing stage 4, 9, 10 and 15, 20, 21 of two approaches, you can find
out that amount of data is read during sort process can be reduced by factor
2.



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Cache-Shuffle-Based-Operation-Before-Sort-tp17331.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Mime
View raw message