spark-issues mailing list archives

From "JESSE CHEN (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-14318) TPCDS query 14 causes Spark SQL to hang
Date Thu, 31 Mar 2016 23:34:25 GMT

    [ https://issues.apache.org/jira/browse/SPARK-14318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15220864#comment-15220864
] 

JESSE CHEN commented on SPARK-14318:
------------------------------------

Q14 is as follows:
{noformat}
with  cross_items as
 (select i_item_sk ss_item_sk
 from item
 JOIN
 (select brand_id, class_id, category_id from
 (select iss.i_brand_id brand_id
     ,iss.i_class_id class_id
     ,iss.i_category_id category_id
 from store_sales
     ,item iss
     ,date_dim d1
 where ss_item_sk = iss.i_item_sk
   and ss_sold_date_sk = d1.d_date_sk
   and d1.d_year between 1999 AND 1999 + 2) x1
 JOIN
 (select ics.i_brand_id
     ,ics.i_class_id
     ,ics.i_category_id
 from catalog_sales
     ,item ics
     ,date_dim d2
 where cs_item_sk = ics.i_item_sk
   and cs_sold_date_sk = d2.d_date_sk
   and d2.d_year between 1999 AND 1999 + 2) x2
   ON x1.brand_id = x2.i_brand_id and
      x1.class_id = x2.i_class_id and
      x1.category_id = x2.i_category_id
 JOIN
 (select iws.i_brand_id
     ,iws.i_class_id
     ,iws.i_category_id
 from web_sales
     ,item iws
     ,date_dim d3
 where ws_item_sk = iws.i_item_sk
   and ws_sold_date_sk = d3.d_date_sk
   and d3.d_year between 1999 AND 1999 + 2) x3
   ON x1.brand_id = x3.i_brand_id and
      x1.class_id = x3.i_class_id and
      x1.category_id = x3.i_category_id
 ) x4
 where i_brand_id = x4.brand_id
      and i_class_id = x4.class_id
      and i_category_id = x4.category_id
),
 avg_sales as
 (select avg(quantity*list_price) average_sales
  from (select ss_quantity quantity
             ,ss_list_price list_price
       from store_sales
           ,date_dim
       where ss_sold_date_sk = d_date_sk
         and d_year between 1999 and 1999 + 2
       union all
       select cs_quantity quantity
             ,cs_list_price list_price
       from catalog_sales
           ,date_dim
       where cs_sold_date_sk = d_date_sk
         and d_year between 1999 and 1999 + 2
       union all
       select ws_quantity quantity
             ,ws_list_price list_price
       from web_sales
           ,date_dim
       where ws_sold_date_sk = d_date_sk
         and d_year between 1999 and 1999 + 2) x)
  select  * from
 (select 'store' channel, i_brand_id,i_class_id,i_category_id
        ,sum(ss1.ss_quantity*ss1.ss_list_price) sales, count(*) number_sales
 from store_sales ss1
     JOIN item ON ss1.ss_item_sk = i_item_sk
     JOIN date_dim dd1 ON ss1.ss_sold_date_sk = dd1.d_date_sk
     JOIN cross_items ON ss1.ss_item_sk = cross_items.ss_item_sk
     JOIN avg_sales
     JOIN date_dim dd2 ON dd1.d_week_seq = dd2.d_week_seq
                     where dd2.d_year = 1999 + 1
                       and dd2.d_moy = 12
                       and dd2.d_dom = 11
 group by average_sales,i_brand_id,i_class_id,i_category_id
 having sum(ss1.ss_quantity*ss1.ss_list_price) > avg_sales.average_sales) this_year,
 (select 'store' channel, i_brand_id,i_class_id
        ,i_category_id, sum(ss1.ss_quantity*ss1.ss_list_price) sales, count(*) number_sales
 from store_sales ss1
     JOIN item ON ss1.ss_item_sk = i_item_sk
     JOIN date_dim dd1 ON ss1.ss_sold_date_sk = dd1.d_date_sk
     JOIN cross_items ON ss1.ss_item_sk = cross_items.ss_item_sk
     JOIN avg_sales
     JOIN date_dim dd2 ON dd1.d_week_seq = dd2.d_week_seq
                     where dd2.d_year = 1999
                       and dd2.d_moy = 12
                       and dd2.d_dom = 11
 group by average_sales, i_brand_id,i_class_id,i_category_id
 having sum(ss1.ss_quantity*ss1.ss_list_price) > avg_sales.average_sales) last_year
 where this_year.i_brand_id= last_year.i_brand_id
   and this_year.i_class_id = last_year.i_class_id
   and this_year.i_category_id = last_year.i_category_id
 order by this_year.channel, this_year.i_brand_id, this_year.i_class_id, this_year.i_category_id
   limit 100
{noformat}
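One structural note on the query above: the `JOIN avg_sales` in both subqueries carries no ON clause, so it is effectively a cartesian product with the single-row `avg_sales` CTE. A minimal sqlite3 sketch (standard SQL semantics, not Spark's engine; table contents are made up for illustration) shows that a JOIN written without a join condition multiplies row counts:

```python
# Sketch: a JOIN with no ON clause degenerates into a cartesian product.
# The tables and values here are hypothetical, chosen only to mirror the
# "store_sales JOIN avg_sales" shape in Q14.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE store_sales (ss_item_sk INTEGER);
CREATE TABLE avg_sales   (average_sales REAL);
INSERT INTO store_sales VALUES (1), (2), (3);
INSERT INTO avg_sales   VALUES (10.0), (20.0);
""")
n = conn.execute(
    "SELECT count(*) FROM store_sales JOIN avg_sales").fetchone()[0]
print(n)  # 3 x 2 = 6: every store_sales row pairs with every avg_sales row
```

In Q14 the `avg_sales` side is a single aggregate row, so the cross join is logically harmless, but it still forces the planner to pick a join strategy for a condition-free join.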



> TPCDS query 14 causes Spark SQL to hang
> ---------------------------------------
>
>                 Key: SPARK-14318
>                 URL: https://issues.apache.org/jira/browse/SPARK-14318
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 2.0.0
>            Reporter: JESSE CHEN
>              Labels: hangs
>
> TPCDS Q14 parses successfully and a plan is created successfully. Spark tries to
> run it (I used only a 1GB text file), but it "hangs": tasks are extremely slow to
> process, and all CPUs are pegged at 100% by the executor JVMs.
> It is very easy to reproduce:
> 1. Use the spark-sql CLI to run TPCDS query 14 against a 1GB text-file database
> (assuming you know how to generate the CSV data). My command is:
> {noformat}
> /TestAutomation/downloads/spark-master/bin/spark-sql --driver-memory 10g --verbose \
>   --master yarn-client --packages com.databricks:spark-csv_2.10:1.3.0 \
>   --executor-memory 8g --num-executors 4 --executor-cores 4 \
>   --conf spark.sql.join.preferSortMergeJoin=true --database hadoopds1g \
>   -f $f > q14.out
> {noformat}
> The Spark console output:
> {noformat}
> 16/03/31 15:45:37 INFO scheduler.TaskSetManager: Starting task 26.0 in stage 17.0 (TID 65, bigaperf138.svl.ibm.com, partition 26,RACK_LOCAL, 4515 bytes)
> 16/03/31 15:45:37 INFO cluster.YarnClientSchedulerBackend: Launching task 65 on executor id: 4 hostname: bigaperf138.svl.ibm.com.
> 16/03/31 15:45:37 INFO scheduler.TaskSetManager: Finished task 23.0 in stage 17.0 (TID 62) in 829687 ms on bigaperf138.svl.ibm.com (15/200)
> 16/03/31 15:45:52 INFO scheduler.TaskSetManager: Starting task 27.0 in stage 17.0 (TID 66, bigaperf138.svl.ibm.com, partition 27,RACK_LOCAL, 4515 bytes)
> 16/03/31 15:45:52 INFO cluster.YarnClientSchedulerBackend: Launching task 66 on executor id: 4 hostname: bigaperf138.svl.ibm.com.
> 16/03/31 15:45:52 INFO scheduler.TaskSetManager: Finished task 26.0 in stage 17.0 (TID 65) in 15505 ms on bigaperf138.svl.ibm.com (16/200)
> 16/03/31 15:46:17 INFO scheduler.TaskSetManager: Starting task 28.0 in stage 17.0 (TID 67, bigaperf138.svl.ibm.com, partition 28,RACK_LOCAL, 4515 bytes)
> 16/03/31 15:46:17 INFO cluster.YarnClientSchedulerBackend: Launching task 67 on executor id: 4 hostname: bigaperf138.svl.ibm.com.
> 16/03/31 15:46:17 INFO scheduler.TaskSetManager: Finished task 27.0 in stage 17.0 (TID 66) in 24929 ms on bigaperf138.svl.ibm.com (17/200)
> 16/03/31 15:51:53 INFO scheduler.TaskSetManager: Starting task 29.0 in stage 17.0 (TID 68, bigaperf137.svl.ibm.com, partition 29,NODE_LOCAL, 4515 bytes)
> 16/03/31 15:51:53 INFO cluster.YarnClientSchedulerBackend: Launching task 68 on executor id: 2 hostname: bigaperf137.svl.ibm.com.
> 16/03/31 15:51:53 INFO scheduler.TaskSetManager: Finished task 10.0 in stage 17.0 (TID 47) in 3775585 ms on bigaperf137.svl.ibm.com (18/200)
> {noformat}
> Notice that the gaps between consecutive task starts are unusually long, up to 2 to 5 minutes.
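As a quick sanity check on that claim, the gaps can be computed directly from the "Starting task" timestamps quoted in the log above (the timestamp strings are the ones in the log; no other data is assumed):

```python
# Compute the intervals between consecutive "Starting task" log lines above.
from datetime import datetime

starts = ["15:45:37", "15:45:52", "15:46:17", "15:51:53"]
times = [datetime.strptime(t, "%H:%M:%S") for t in starts]
gaps = [(b - a).total_seconds() for a, b in zip(times, times[1:])]
print(gaps)  # [15.0, 25.0, 336.0] - the last gap is over 5 minutes
```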
> According to the Linux 'perf' tool, the top two CPU consumers are:
> {noformat}
>     86.48%        java  [unknown]
>     12.41%        libjvm.so
> {noformat}
> Using Java hotspot profiling tools, the top 5 hotspot methods are:
> {noformat}
> org.apache.spark.storage.DiskBlockObjectWriter.updateBytesWritten()	46.845276	9,654,179 ms (46.8%)	9,654,179 ms	9,654,179 ms	9,654,179 ms
> org.apache.spark.unsafe.Platform.copyMemory()	18.631157	3,848,442 ms (18.6%)	3,848,442 ms	3,848,442 ms	3,848,442 ms
> org.apache.spark.util.collection.CompactBuffer.$plus$eq()	6.8570185	1,418,411 ms (6.9%)	1,418,411 ms	1,517,960 ms	1,517,960 ms
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeValue()	4.6126328	955,495 ms (4.6%)	955,495 ms	2,153,910 ms	2,153,910 ms
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write()	4.581077	949,930 ms (4.6%)	949,930 ms	19,967,510 ms	19,967,510 ms
> {noformat}
> So as you can see, the test had been running for 1.5 hours, with 46% of CPU time
> spent in the org.apache.spark.storage.DiskBlockObjectWriter.updateBytesWritten() method.
> The stacks for top two are:
> {noformat}
> Marshalling
>   java/io/DataOutputStream.writeInt() line 197
> org.apache.spark.sql
>   org/apache/spark/sql/execution/UnsafeRowSerializerInstance$$anon$2.writeValue() line 60
> org.apache.spark.storage
>   org/apache/spark/storage/DiskBlockObjectWriter.write() line 185
> org.apache.spark.shuffle
>   org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.write() line 150
> org.apache.spark.scheduler
>   org/apache/spark/scheduler/ShuffleMapTask.runTask() line 78
>   org/apache/spark/scheduler/ShuffleMapTask.runTask() line 46
>   org/apache/spark/scheduler/Task.run() line 82
> org.apache.spark.executor
>   org/apache/spark/executor/Executor$TaskRunner.run() line 231
> Dispatching Overhead, Standard Library Worker Dispatching
>   java/util/concurrent/ThreadPoolExecutor.runWorker() line 1142
>   java/util/concurrent/ThreadPoolExecutor$Worker.run() line 617
>   java/lang/Thread.run() line 745
> {noformat}
> and 
> {noformat}
> org.apache.spark.unsafe
>   org/apache/spark/unsafe/Platform.copyMemory() line 172
> org.apache.spark.sql
>   org/apache/spark/sql/catalyst/expressions/UnsafeRow.copy() line 504
> Class Loading
>   org/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator.processNext()
> org.apache.spark.sql
>   org/apache/spark/sql/execution/BufferedRowIterator.hasNext() line 41
>   org/apache/spark/sql/execution/WholeStageCodegen$$anonfun$doExecute$2$$anon$2.hasNext() line 375
> scala.collection
>   scala/collection/Iterator$$anon$11.hasNext() line 369
> org.apache.spark.shuffle
>   org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.write() line 147
> org.apache.spark.scheduler
>   org/apache/spark/scheduler/ShuffleMapTask.runTask() line 78
>   org/apache/spark/scheduler/ShuffleMapTask.runTask() line 46
>   org/apache/spark/scheduler/Task.run() line 82
> org.apache.spark.executor
>   org/apache/spark/executor/Executor$TaskRunner.run() line 231
> Dispatching Overhead, Standard Library Worker Dispatching
>   java/util/concurrent/ThreadPoolExecutor.runWorker() line 1142
>   java/util/concurrent/ThreadPoolExecutor$Worker.run() line 617
>   java/lang/Thread.run() line 745
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

