spark-issues mailing list archives

From "Artur Sukhenko (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-19578) Poor pyspark performance
Date Wed, 03 May 2017 10:09:04 GMT

     [ https://issues.apache.org/jira/browse/SPARK-19578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Artur Sukhenko updated SPARK-19578:
-----------------------------------
    Attachment:     (was: pyspark_incorrect_inputsize.png)

> Poor pyspark performance
> ------------------------
>
>                 Key: SPARK-19578
>                 URL: https://issues.apache.org/jira/browse/SPARK-19578
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Web UI
>    Affects Versions: 1.6.1, 1.6.2, 2.0.1
>         Environment: Spark 1.6.2 Hortonworks
> Spark 2.0.1 MapR
> Spark 1.6.1 MapR
>            Reporter: Artur Sukhenko
>         Attachments: reproduce_log, spark_shell_correct_inputsize.png
>
>
> A simple job in pyspark takes 14 minutes to complete.
> The text file used to reproduce contains many millions of lines of the single word "yes"
> (which might be a contributing cause of the poor performance).
> {code}
> a = sc.textFile("/tmp/yes.txt")
> a.count()
> {code}
> The same code took 33 seconds in spark-shell.
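> For comparison, counting through the DataFrame API keeps the scan and the count inside the JVM, avoiding the per-record round trip through the external Python worker that RDD count() incurs in pyspark. A minimal workaround sketch (assuming the text reader available since Spark 1.6, on the same /tmp/yes.txt used below):
> {code}
> # Sketch only: read the file as a DataFrame of lines and count it in the JVM,
> # instead of serializing every record to the Python worker as rdd.count() does.
> df = sqlContext.read.text("/tmp/yes.txt")   # Spark 2.x: spark.read.text(...)
> print(df.count())
> {code}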
> Reproduce steps:
> Run this to generate a big file (press Ctrl+C after 5-6 seconds):
> {code}
> [spark@c6401 ~]$ yes > /tmp/yes.txt
> [spark@c6401 ~]$ ll /tmp/
> -rw-r--r--  1 spark     hadoop  516079616 Feb 13 11:10 yes.txt
> [spark@c6401 ~]$ hadoop fs -copyFromLocal /tmp/yes.txt /tmp/
> [spark@c6401 ~]$ pyspark
> {code}
> {code}
> Python 2.6.6 (r266:84292, Feb 22 2013, 00:00:18) 
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
> 17/02/13 11:12:36 INFO SparkContext: Running Spark version 1.6.2
> {code}
> >>> a = sc.textFile("/tmp/yes.txt")
> {code}
> 17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 341.1 KB, free 341.1 KB)
> 17/02/13 11:12:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 28.3 KB, free 369.4 KB)
> 17/02/13 11:12:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43389 (size: 28.3 KB, free: 517.4 MB)
> 17/02/13 11:12:58 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
> {code}
> >>> a.count()
> {code}
> 17/02/13 11:13:03 INFO FileInputFormat: Total input paths to process : 1
> 17/02/13 11:13:03 INFO SparkContext: Starting job: count at <stdin>:1
> 17/02/13 11:13:03 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 4 output partitions
> 17/02/13 11:13:03 INFO DAGScheduler: Final stage: ResultStage 0 (count at <stdin>:1)
> 17/02/13 11:13:03 INFO DAGScheduler: Parents of final stage: List()
> 17/02/13 11:13:03 INFO DAGScheduler: Missing parents: List()
> 17/02/13 11:13:03 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at count at <stdin>:1), which has no missing parents
> 17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.7 KB, free 375.1 KB)
> 17/02/13 11:13:03 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.5 KB, free 378.6 KB)
> 17/02/13 11:13:03 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:43389 (size: 3.5 KB, free: 517.4 MB)
> 17/02/13 11:13:03 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1008
> 17/02/13 11:13:03 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (PythonRDD[2] at count at <stdin>:1)
> 17/02/13 11:13:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
> 17/02/13 11:13:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2149 bytes)
> 17/02/13 11:13:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 17/02/13 11:13:03 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
> 17/02/13 11:13:03 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
> 17/02/13 11:13:03 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
> 17/02/13 11:13:03 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
> 17/02/13 11:13:03 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
> 17/02/13 11:13:03 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
> 17/02/13 11:16:37 INFO PythonRunner: Times: total = 213335, boot = 317, init = 445, finish = 212573
> 17/02/13 11:16:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2182 bytes result sent to driver
> 17/02/13 11:16:37 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2149 bytes)
> 17/02/13 11:16:37 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 17/02/13 11:16:37 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
> 17/02/13 11:16:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 213605 ms on localhost (1/4)
> 17/02/13 11:20:05 INFO PythonRunner: Times: total = 208227, boot = -81, init = 122, finish = 208186
> 17/02/13 11:20:05 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2182 bytes result sent to driver
> 17/02/13 11:20:05 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,ANY, 2149 bytes)
> 17/02/13 11:20:05 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
> 17/02/13 11:20:05 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
> 17/02/13 11:20:05 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 208302 ms on localhost (2/4)
> 17/02/13 11:23:37 INFO PythonRunner: Times: total = 212021, boot = -27, init = 45, finish = 212003
> 17/02/13 11:23:37 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2182 bytes result sent to driver
> 17/02/13 11:23:37 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, partition 3,ANY, 2149 bytes)
> 17/02/13 11:23:37 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
> 17/02/13 11:23:37 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
> 17/02/13 11:23:37 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 212072 ms on localhost (3/4)
> 17/02/13 11:26:35 INFO PythonRunner: Times: total = 177879, boot = -4, init = 9, finish = 177874
> 17/02/13 11:26:35 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2182 bytes result sent to driver
> 17/02/13 11:26:35 INFO DAGScheduler: ResultStage 0 (count at <stdin>:1) finished in 811.885 s
> 17/02/13 11:26:35 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 177937 ms on localhost (4/4)
> 17/02/13 11:26:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 17/02/13 11:26:35 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took 812.147354 s
> {code}
> 258039808
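> The PythonRunner lines account for essentially all of the runtime; summing the four per-task totals reproduces the job time:
> {code}
> >>> sum([213335, 208227, 212021, 177879]) / 1000.0  # per-task PythonRunner totals, ms -> s
> 811.462
> {code}
> So nearly all of the 812.147 s job is spent in the Python worker phase of the four tasks.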
> [spark@c6401 ~]$ spark-shell 
> scala> var a = sc.textFile("/tmp/yes.txt")
> {code}
> 17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 341.1 KB, free 341.1 KB)
> 17/02/13 11:32:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 28.3 KB, free 369.4 KB)
> 17/02/13 11:32:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:44733 (size: 28.3 KB, free: 517.4 MB)
> 17/02/13 11:32:26 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
> a: org.apache.spark.rdd.RDD[String] = /tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21
> {code}
> scala> a.count()
> {code}
> 17/02/13 11:32:45 INFO FileInputFormat: Total input paths to process : 1
> 17/02/13 11:32:46 INFO SparkContext: Starting job: count at <console>:24
> 17/02/13 11:32:46 INFO DAGScheduler: Got job 0 (count at <console>:24) with 4 output partitions
> 17/02/13 11:32:46 INFO DAGScheduler: Final stage: ResultStage 0 (count at <console>:24)
> 17/02/13 11:32:46 INFO DAGScheduler: Parents of final stage: List()
> 17/02/13 11:32:46 INFO DAGScheduler: Missing parents: List()
> 17/02/13 11:32:46 INFO DAGScheduler: Submitting ResultStage 0 (/tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21), which has no missing parents
> 17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.0 KB, free 372.4 KB)
> 17/02/13 11:32:46 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1801.0 B, free 374.1 KB)
> 17/02/13 11:32:46 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:44733 (size: 1801.0 B, free: 517.4 MB)
> 17/02/13 11:32:46 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1008
> 17/02/13 11:32:46 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (/tmp/yes.txt MapPartitionsRDD[1] at textFile at <console>:21)
> 17/02/13 11:32:46 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
> 17/02/13 11:32:46 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2149 bytes)
> 17/02/13 11:32:46 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 17/02/13 11:32:46 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:0+134217728
> 17/02/13 11:32:46 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
> 17/02/13 11:32:46 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
> 17/02/13 11:32:46 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
> 17/02/13 11:32:46 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
> 17/02/13 11:32:46 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
> 17/02/13 11:32:55 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2082 bytes result sent to driver
> 17/02/13 11:32:55 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2149 bytes)
> 17/02/13 11:32:55 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 17/02/13 11:32:55 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:134217728+134217728
> 17/02/13 11:32:55 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 8857 ms on localhost (1/4)
> 17/02/13 11:33:02 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2137 bytes result sent to driver
> 17/02/13 11:33:02 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,ANY, 2149 bytes)
> 17/02/13 11:33:02 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
> 17/02/13 11:33:02 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:268435456+134217728
> 17/02/13 11:33:02 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 7928 ms on localhost (2/4)
> 17/02/13 11:33:12 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2137 bytes result sent to driver
> 17/02/13 11:33:12 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, partition 3,ANY, 2149 bytes)
> 17/02/13 11:33:12 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
> 17/02/13 11:33:12 INFO HadoopRDD: Input split: hdfs://c6401.ambari.apache.org:8020/tmp/yes.txt:402653184+113426432
> 17/02/13 11:33:12 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 9517 ms on localhost (3/4)
> 17/02/13 11:33:18 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 2137 bytes result sent to driver
> 17/02/13 11:33:18 INFO DAGScheduler: ResultStage 0 (count at <console>:24) finished in 32.724 s
> 17/02/13 11:33:18 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 6443 ms on localhost (4/4)
> 17/02/13 11:33:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 17/02/13 11:33:18 INFO DAGScheduler: Job 0 finished: count at <console>:24, took 32.929721 s
> {code}
> res0: Long = 258039808
> Also, the Input Size metric in the Spark UI is wrong when running pyspark: it shows 64.0 KB (hadoop), whereas spark-shell shows the correct value, 128.1 MB (hadoop).
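> For reference, the four input splits in both logs add up exactly to the local file size shown above (three full 128 MB HDFS blocks plus a remainder), so both shells read identical input and the runtime gap is not an I/O difference:
> {code}
> >>> splits = [134217728, 134217728, 134217728, 113426432]  # from the HadoopRDD "Input split" lines
> >>> sum(splits)
> 516079616
> {code}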
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

