spark-issues mailing list archives

From "tomzhu (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22593) submitMissingTask in DagScheduler will call partitions function many times which may be time consuming
Date Tue, 28 Nov 2017 09:02:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16268408#comment-16268408
] 

tomzhu commented on SPARK-22593:
--------------------------------

Yes, you are right; it only wastes a little time, which is insignificant in most cases, where the dominant cost is the job execution on the cluster. I haven't tested it yet. Anyway, it's not a bug, nor really a question or an issue; I filed this because it may be better to avoid the repeated calls.
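
The idea can be sketched with a toy stand-in (this is not Spark code; `FakeRdd`, `callCount`, and `HoistDemo` are hypothetical names for illustration): hoisting the `partitions` lookup out of the per-task loop turns N method calls into one, even though the underlying array is already cached after the first call.

```scala
// Hypothetical stand-in for an RDD whose partitions method has a small
// per-call cost even after the result is cached. FakeRdd and callCount
// are illustrative, not Spark classes.
class FakeRdd(numPartitions: Int) {
  var callCount = 0
  private var cached: Array[Int] = _

  def partitions: Array[Int] = {
    callCount += 1 // the per-call overhead this issue is about
    if (cached == null) cached = Array.tabulate(numPartitions)(identity)
    cached
  }
}

object HoistDemo {
  def main(args: Array[String]): Unit = {
    val rdd = new FakeRdd(3)
    val partitionIds = 0 until 3

    // Current pattern: the partitions method is invoked once per task.
    partitionIds.foreach { id => val part = rdd.partitions(id); () }
    println(s"per-task lookups: ${rdd.callCount}") // 3 calls

    // Suggested pattern: hoist the lookup out of the loop.
    val parts = rdd.partitions
    partitionIds.foreach { id => val part = parts(id); () }
    println(s"after hoisting:   ${rdd.callCount}") // only 1 more call
  }
}
```

With 3 partitions the first loop makes 3 calls, while the hoisted version adds only one, regardless of the partition count.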

> submitMissingTask in DagScheduler will call partitions function many times which may be time consuming
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22593
>                 URL: https://issues.apache.org/jira/browse/SPARK-22593
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: tomzhu
>            Priority: Minor
>
> When the DAGScheduler calls submitMissingTasks, it creates the tasks and calls stage.rdd.partitions; this call is made many times, which may be time-consuming. The code is:
> {quote}
>     val tasks: Seq[Task[_]] = try {
>       val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
>       stage match {
>         case stage: ShuffleMapStage =>
>           stage.pendingPartitions.clear()
>           partitionsToCompute.map { id =>
>             val locs = taskIdToLocations(id)
>             val part = stage.rdd.partitions(id)
>             stage.pendingPartitions += id
>             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
>               taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
>               Option(sc.applicationId), sc.applicationAttemptId)
>           }
>         case stage: ResultStage =>
>           partitionsToCompute.map { id =>
>             val p: Int = stage.partitions(id)
>             val part = stage.rdd.partitions(p)  // this lookup is a little time-consuming
>             val locs = taskIdToLocations(id)
>             new ResultTask(stage.id, stage.latestInfo.attemptId,
>               taskBinary, part, locs, id, properties, serializedTaskMetrics,
>               Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
>           }
>       }
>     } 
> {quote}
> For example, for a ParallelCollectionRDD with 3 slices (partitions), creating the tasks calls stage.rdd.partitions three times; since stage.rdd.partitions calls getPartitions, getPartitions will also be invoked three times, which is a little time-consuming. The stage.rdd.partitions code:
> {quote}  
> final def partitions: Array[Partition] = {
>     checkpointRDD.map(_.partitions).getOrElse {
>       if (partitions_ == null) {
>         partitions_ = getPartitions
>         partitions_.zipWithIndex.foreach { case (partition, index) =>
>           require(partition.index == index,
>             s"partitions($index).partition == ${partition.index}, but it should equal
$index")
>         }
>       }
>       partitions_
>     }
>   }
> {quote}
> it would be better to avoid this.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

