spark-issues mailing list archives

From "Hao Ren (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-8102) Big performance difference when joining 3 tables in different order
Date Mon, 08 Jun 2015 13:41:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14577106#comment-14577106 ]

Hao Ren edited comment on SPARK-8102 at 6/8/15 1:40 PM:
--------------------------------------------------------

Here are the physical plans for the two queries:

Query 1:
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
 ShuffledHashJoin [refCategoryID#3,regionCode#9], [category#17,region#18], BuildRight
  Exchange (HashPartitioning [refCategoryID#3,regionCode#9], 12)
   Project [regionName#10,categoryName#0,refCategoryID#3,regionCode#9]
    CartesianProduct
     Project [categoryName#0,refCategoryID#3]
      PhysicalRDD [categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3], MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
     Project [regionName#10,regionCode#9]
      PhysicalRDD [cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11], MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
  Exchange (HashPartitioning [category#17,region#18], 12)
   Project [timestamp_sec#13L AS period#20L,category#17,region#18,action#15,list_id#16L]
    PhysicalRDD [syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19], MapPartitionsRDD[16] at map at SQLContext.scala:394
{code}

Query 2:
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
 ShuffledHashJoin [region#18], [regionCode#9], BuildRight
  Exchange (HashPartitioning [region#18], 12)
   Project [categoryName#0,list_id#16L,period#20L,action#15,region#18]
    ShuffledHashJoin [refCategoryID#3], [category#17], BuildRight
     Exchange (HashPartitioning [refCategoryID#3], 12)
      Project [categoryName#0,refCategoryID#3]
       PhysicalRDD [categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3], MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
     Exchange (HashPartitioning [category#17], 12)
      Project [timestamp_sec#13L AS period#20L,category#17,region#18,action#15,list_id#16L]
       PhysicalRDD [syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19], MapPartitionsRDD[16] at map at SQLContext.scala:394
  Exchange (HashPartitioning [regionCode#9], 12)
   Project [regionName#10,regionCode#9]
    PhysicalRDD [cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11], MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
{code}

The two plans are different.

Query 1's physical plan contains a `CartesianProduct`, which might be why it is slower than
the second one. Since Spark is based on Hive 0.13.+, which supports implicit join notation
like the two queries above, it would be better if Spark could choose the best execution
plan itself. Users have no idea why performance improves just by permuting two tables.
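
The effect of table order can be illustrated with a small Python sketch. It is not Spark's planner: it assumes, purely for illustration, that tables are joined strictly left to right in FROM order with no reordering, which is consistent with the plans shown in this comment. The function name `plan_left_deep` and the step labels are hypothetical.

```python
# Join predicates from the WHERE clause, as unordered pairs of table aliases:
#   c.refCategoryID = g.category  -> {c, g}
#   z.regionCode    = g.region    -> {z, g}
PREDICATES = [frozenset({"c", "g"}), frozenset({"z", "g"})]


def plan_left_deep(from_order, predicates=PREDICATES):
    """Label each join step when tables are joined strictly in FROM order.

    ASSUMPTION: the planner builds a left-deep tree in FROM order and never
    reorders tables. A step with no predicate linking the new table to the
    tables already joined degenerates into a Cartesian product.
    """
    joined = {from_order[0]}
    steps = []
    for table in from_order[1:]:
        # A predicate is usable if it mentions the new table and its other
        # side is already part of the joined set.
        linked = any(
            table in pred and (pred - {table}) <= joined for pred in predicates
        )
        steps.append((table, "EquiJoin" if linked else "CartesianProduct"))
        joined.add(table)
    return steps


if __name__ == "__main__":
    orders = {
        "Query 1": ["c", "z", "g"],  # t_category, t_zipcode, click_meter_site_grouped
        "Query 2": ["c", "g", "z"],
        "Query 3": ["z", "g", "c"],
    }
    for name, order in orders.items():
        print(name, plan_left_deep(order))
```

Under that assumption, only the order `c, z, g` yields a `CartesianProduct` step, because no predicate links `t_category` and `t_zipcode` directly.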

Out of curiosity, I tested a third query. Like the second one, its physical plan uses two
`ShuffledHashJoin`s and no `CartesianProduct`, yet it takes much longer (277 s):

Query 3:
{code}
SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
FROM t_zipcode z, click_meter_site_grouped g, t_category c
WHERE c.refCategoryID = g.category AND z.regionCode = g.region
{code}
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
 ShuffledHashJoin [category#17], [refCategoryID#3], BuildRight
  Exchange (HashPartitioning [category#17], 12)
   Project [regionName#10,list_id#16L,period#20L,category#17,action#15]
    ShuffledHashJoin [regionCode#9], [region#18], BuildRight
     Exchange (HashPartitioning [regionCode#9], 12)
      Project [regionName#10,regionCode#9]
       PhysicalRDD [cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11], MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
     Exchange (HashPartitioning [region#18], 12)
      Project [timestamp_sec#13L AS period#20L,category#17,region#18,action#15,list_id#16L]
       PhysicalRDD [syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19], MapPartitionsRDD[16] at map at SQLContext.scala:394
  Exchange (HashPartitioning [refCategoryID#3], 12)
   Project [categoryName#0,refCategoryID#3]
    PhysicalRDD [categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3], MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
{code}
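
The issue description quoted below reports that *t_zipcode* is read many times for snippet 1. One possible explanation, offered as an assumption rather than a confirmed diagnosis, is that a nested-loop Cartesian product re-reads its right input once per left row (as far as can be told, Spark's `CartesianRDD` iterates this way). The toy Python model below counts how often the right input is opened. The names `cartesian` and `CountingSource` and the sample rows are hypothetical, and the row counts are not taken from the real tables.

```python
def cartesian(left_rows, open_right):
    """Nested-loop Cartesian product.

    `open_right` is a zero-argument callable that returns a fresh iterator
    over the right input. It is called once per left row, so the right
    input is re-read from scratch each time.
    """
    for x in left_rows:
        for y in open_right():
            yield (x, y)


class CountingSource:
    """Stand-in for a file-backed table that counts how often it is opened."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.opens = 0

    def __call__(self):
        self.opens += 1
        return iter(self.rows)


if __name__ == "__main__":
    # HYPOTHETICAL data: 4 left rows standing in for t_category,
    # 3 right rows standing in for t_zipcode.
    categories = ["cat0", "cat1", "cat2", "cat3"]
    zipcodes = CountingSource(["zip0", "zip1", "zip2"])
    pairs = list(cartesian(categories, zipcodes))
    print("right input opened", zipcodes.opens, "times for", len(pairs), "pairs")
```

In this model the right input is opened once per left row, so a left table with N rows would trigger N reads of the right table. Whether this accounts for the repeated `Input split` lines in the log is not verified here.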



> Big performance difference when joining 3 tables in different order
> -------------------------------------------------------------------
>
>                 Key: SPARK-8102
>                 URL: https://issues.apache.org/jira/browse/SPARK-8102
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.3.1
>         Environment: spark in local mode
>            Reporter: Hao Ren
>
> Given 3 tables loaded from CSV files:
> (table name => size)
> *click_meter_site_grouped* => 10 687 455 bytes
> *t_zipcode* => 2 738 954 bytes
> *t_category* => 2 182 bytes
> When joining the 3 tables, I notice a large performance difference if they are joined
> in a different order.
> Here are the SQL queries to compare:
> {code}
> -- snippet 1
> SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
> FROM t_category c, t_zipcode z, click_meter_site_grouped g
> WHERE c.refCategoryID = g.category AND z.regionCode = g.region
> {code}
> {code}
> -- snippet 2
> SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
> FROM t_category c, click_meter_site_grouped g, t_zipcode z
> WHERE c.refCategoryID = g.category AND z.regionCode = g.region
> {code}
> As you can see, the largest table, *click_meter_site_grouped*, is the last table in the FROM
> clause of the first snippet and in the middle of the table list in the second one.
> Snippet 2 runs three times faster than Snippet 1.
> (8 seconds VS 24 seconds)
> Since the data is just a sample of a larger data set, running this on the original data
> set would likely cause a performance problem.
> After checking the log, we found something strange in snippet 1's log:
> 15/06/04 15:32:03 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:04 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:04 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:06 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:06 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:06 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:08 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:08 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:08 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:10 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:10 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:10 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:12 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:12 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:12 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:14 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:14 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:14 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:15 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:15 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:15 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:17 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:17 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:17 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:19 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:19 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:19 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:20 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:20 INFO HadoopRDD: Input split: file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> It seems that *t_zipcode* is loaded 56 times! For snippet 2, everything is fine:
> all three tables are loaded only once.
> Spark SQL can automatically broadcast a table in a join when its size is below
> *autoBroadcastJoinThreshold*. I am not sure whether the repeated loading is caused by auto broadcast.


