spark-issues mailing list archives

From "Xiao Li (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (SPARK-23032) Add a per-query codegenStageId to WholeStageCodegenExec
Date Fri, 26 Jan 2018 00:19:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-23032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li resolved SPARK-23032.
-----------------------------
       Resolution: Fixed
    Fix Version/s: 2.3.0

> Add a per-query codegenStageId to WholeStageCodegenExec
> -------------------------------------------------------
>
>                 Key: SPARK-23032
>                 URL: https://issues.apache.org/jira/browse/SPARK-23032
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Kris Mok
>            Priority: Major
>             Fix For: 2.3.0
>
>
> Proposing to add a per-query ID to the codegen stages, as represented by {{WholeStageCodegenExec}} operators. This ID will be used in:
> * the explain output of the physical plan, and
> * the generated class name.
> Specifically, this ID will be stable within a query, counting up from 1 in depth-first post-order for all the {{WholeStageCodegenExec}} operators inserted into a plan.
> The ID value 0 is reserved for "free-floating" {{WholeStageCodegenExec}} objects, which may have been created for one-off purposes, e.g. for fallback handling of codegen stages that failed to codegen the whole stage and wish to codegen a subset of the child operators.
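> As a rough illustration of the numbering rule, here is a toy Scala model, not Spark's code: `Plan`, `Stage` (a hypothetical stand-in for {{WholeStageCodegenExec}}), `Op`, `assignIds` and `stages` are invented names, and `example` only mirrors the shape of the plan in the explain output below. Each stage is numbered after every stage beneath it, which is what depth-first post-order means here; a `Stage` that never goes through `assignIds` keeps the reserved ID 0.

```scala
object CodegenStageIds {
  sealed trait Plan { def children: Seq[Plan] }

  // Hypothetical stand-in for WholeStageCodegenExec; id 0 = not numbered ("free-floating").
  final case class Stage(child: Plan, id: Int = 0) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }

  // Any other physical operator (Sort, Project, Exchange, ...).
  final case class Op(name: String, children: Seq[Plan] = Nil) extends Plan

  private def op(name: String, kids: Plan*): Op = Op(name, kids)

  /** Returns a copy of `plan` whose stages are numbered from 1 in depth-first post-order. */
  def assignIds(plan: Plan): Plan = {
    var next = 0 // per-query counter: starts fresh on every call
    def go(p: Plan): Plan = p match {
      case Stage(child, _) =>
        val numberedChild = go(child) // number every stage below this one first...
        next += 1                     // ...then this stage (post-order)
        Stage(numberedChild, next)
      case Op(name, kids) => Op(name, kids.map(go))
    }
    go(plan)
  }

  /** (id, root operator name) for every stage, listed in post-order. */
  def stages(plan: Plan): List[(Int, String)] = plan match {
    case Stage(child, id) =>
      val root = child match {
        case Op(name, _) => name
        case _: Stage    => "Stage"
      }
      stages(child) :+ (id -> root)
    case Op(_, kids) => kids.toList.flatMap(stages)
  }

  // Mirrors the example plan: each Exchange ends the stage above it, and
  // SortMergeJoin's Sort inputs are split into stages of their own.
  val example: Plan = {
    val projectX = Stage(op("Project", op("Range(0, 10)")))                 // becomes *(1)
    val projectZ = Stage(op("Project", op("Sort", op("Exchange", projectX)))) // becomes *(2)
    val sortZ    = Stage(op("Sort", op("Exchange", projectZ)))               // becomes *(3)
    val rangeId  = Stage(op("Range(0, 5)"))                                  // becomes *(4)
    val sortId   = Stage(op("Sort", op("Exchange", rangeId)))                // becomes *(5)
    Stage(op("SortMergeJoin", sortZ, sortId))                                // becomes *(6)
  }

  def main(args: Array[String]): Unit =
    stages(assignIds(example)).foreach { case (id, root) => println(s"*($id) $root") }
}
```

> Running `main` should print the stage roots from `*(1) Project` to `*(6) SortMergeJoin`, in the same order as the IDs in the explain output below. Because the counter restarts on every `assignIds` call, numbering the same plan twice yields identical IDs.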
> Example: for the following query:
> {code:none}
> scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)
> scala> val df1 = spark.range(10).select('id as 'x, 'id + 1 as 'y).orderBy('x).select('x + 1 as 'z, 'y)
> df1: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint]
> scala> val df2 = spark.range(5)
> df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
> scala> val query = df1.join(df2, 'z === 'id)
> query: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint ... 1 more field]
> {code}
> The explain output before the change is:
> {code:none}
> scala> query.explain
> == Physical Plan ==
> *SortMergeJoin [z#9L], [id#13L], Inner
> :- *Sort [z#9L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(z#9L, 200)
> :     +- *Project [(x#3L + 1) AS z#9L, y#4L]
> :        +- *Sort [x#3L ASC NULLS FIRST], true, 0
> :           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
> :              +- *Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
> :                 +- *Range (0, 10, step=1, splits=8)
> +- *Sort [id#13L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#13L, 200)
>       +- *Range (0, 5, step=1, splits=8)
> {code}
> Note how codegen'd operators are annotated with a {{"*"}} prefix. After this change, it'll be:
> {code:none}
> scala> query.explain
> == Physical Plan ==
> *(6) SortMergeJoin [z#9L], [id#13L], Inner
> :- *(3) Sort [z#9L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(z#9L, 200)
> :     +- *(2) Project [(x#3L + 1) AS z#9L, y#4L]
> :        +- *(2) Sort [x#3L ASC NULLS FIRST], true, 0
> :           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
> :              +- *(1) Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
> :                 +- *(1) Range (0, 10, step=1, splits=8)
> +- *(5) Sort [id#13L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#13L, 200)
>       +- *(4) Range (0, 5, step=1, splits=8)
> {code}
> Note that the annotated prefix becomes {{"*(id) "}}.
> It'll also show up in the name of the generated class, as a suffix in the format of
> {code:none}
> GeneratedClass$GeneratedIteratorForCodegenStage<id>
> {code}
> For example, note how {{GeneratedClass$GeneratedIteratorForCodegenStage3}} and {{GeneratedClass$GeneratedIteratorForCodegenStage6}} in the following stack trace correspond to the IDs shown in the explain output above:
> {code:none}
> "Executor task launch worker for task 424@12957" daemon prio=5 tid=0x58 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
> 	  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
> 	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter$(generated.java:32)
> 	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:41)
> 	  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$9$$anon$1.hasNext(WholeStageCodegenExec.scala:494)
> 	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.findNextInnerJoinRows$(generated.java:42)
> 	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(generated.java:101)
> 	  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$2.hasNext(WholeStageCodegenExec.scala:513)
> 	  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
> 	  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
> 	  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
> 	  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
> 	  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	  at org.apache.spark.scheduler.Task.run(Task.scala:109)
> 	  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> 	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	  at java.lang.Thread.run(Thread.java:748)
> {code}
> Rationale:
> Right now, the codegen from Spark SQL lacks the means to differentiate between a couple of things:
> 1. It's hard to tell which physical operators are in the same WholeStageCodegen stage. Note that this "stage" is a separate notion from Spark's RDD execution stages; this one exists only to delineate codegen units.
> There can be adjacent physical operators that are both codegen'd but are in separate codegen stages. Some of this is due to hacky implementation details, such as the case with SortMergeJoin and its Sort inputs: they're hard-coded to be split into separate stages, although both are codegen'd.
> When printing out the explain output of the physical plan, you'd see the codegen'd physical operators annotated with a preceding star ('*'), but would have no way to tell whether they're in the same stage.
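> With the {{"*(id) "}} prefix, grouping operators by stage becomes a simple text operation. The sketch below is a hypothetical helper, assuming explain output in exactly the format shown in the example above (a {{"*(n) "}} prefix followed by the operator name); `ExplainStages` and `byStage` are invented names, not Spark APIs.

```scala
object ExplainStages {
  // "*(id) Name": capture the stage ID and the operator name that follows it.
  private val StagePrefix = """\*\((\d+)\) (\w+)""".r

  /** Groups the codegen'd operators in explain output by codegen stage ID, top to bottom. */
  def byStage(explain: String): Map[Int, List[String]] =
    explain.split("\n").toList
      .flatMap(line => StagePrefix.findFirstMatchIn(line).map(m => m.group(1).toInt -> m.group(2)))
      .groupBy(_._1)
      .map { case (id, ops) => id -> ops.map(_._2) }

  // The "after" explain output from this issue; Exchange lines carry no prefix and are skipped.
  val example: String =
    """*(6) SortMergeJoin [z#9L], [id#13L], Inner
      |:- *(3) Sort [z#9L ASC NULLS FIRST], false, 0
      |:  +- Exchange hashpartitioning(z#9L, 200)
      |:     +- *(2) Project [(x#3L + 1) AS z#9L, y#4L]
      |:        +- *(2) Sort [x#3L ASC NULLS FIRST], true, 0
      |:           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
      |:              +- *(1) Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
      |:                 +- *(1) Range (0, 10, step=1, splits=8)
      |+- *(5) Sort [id#13L ASC NULLS FIRST], false, 0
      |   +- Exchange hashpartitioning(id#13L, 200)
      |      +- *(4) Range (0, 5, step=1, splits=8)""".stripMargin

  def main(args: Array[String]): Unit =
    byStage(example).toList.sortBy(_._1).foreach { case (id, ops) => println(s"$id: ${ops.mkString(", ")}") }
}
```

> For this plan, `byStage(example)(2)` is `List("Project", "Sort")`: the two adjacent operators between the Exchanges share stage 2, whereas the Sort directly above the upper Exchange sits alone in stage 3, which a bare {{"*"}} could not show.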
> 2. Performance/error diagnosis
> The generated code has class/method names that are hard to differentiate between queries, or even between codegen stages within the same query. If we use a Java-level profiler to collect profiles, or if we encounter a Java-level exception with a stack trace in it, it's really hard to tell which part of the query it came from.
> By introducing a per-query codegen stage ID, we'd at least be able to tell in which codegen stage (and, in turn, which group of physical operators) a profile tick was recorded or an exception occurred.
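> Because the stage ID is embedded in the generated class name, stack frames can be mapped back to stages mechanically. A minimal sketch, assuming the {{GeneratedIteratorForCodegenStage<id>}} naming seen in the stack trace above; `StackTraceStages` is a hypothetical helper, while `Throwable.getStackTrace` and `StackTraceElement.getClassName` are standard JDK methods.

```scala
object StackTraceStages {
  // Suffix added by this change, e.g. "...GeneratedClass$GeneratedIteratorForCodegenStage3".
  private val StageClass = """GeneratedIteratorForCodegenStage(\d+)""".r

  /** Distinct codegen stage IDs in an exception's stack, innermost frame first. */
  def stageIds(t: Throwable): List[Int] =
    t.getStackTrace.toList
      .flatMap(frame => StageClass.findFirstMatchIn(frame.getClassName).map(_.group(1).toInt))
      .distinct

  /** The same, for a stack trace already captured as text (e.g. from a thread dump). */
  def stageIds(trace: String): List[Int] =
    StageClass.findAllMatchIn(trace).map(_.group(1).toInt).toList.distinct
}
```

> Applied to the stack trace in this issue, either overload would return `List(3, 6)`: the Sort input of the SortMergeJoin (stage 3) and the join itself (stage 6).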
> This proposal uses a per-query ID because it's stable within a query, so multiple runs of the same query will see the same resulting IDs. This both makes the output easier for users to understand and plays well with the codegen cache in Spark SQL, which uses the generated source code as the key.
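> The cache argument can be made concrete with a simplified model. This sketch assumes a cache keyed only by source text and a generated source that differs only in its class name (real generated code differs in much more); `CodegenCacheSketch`, `source` and `SourceCache` are hypothetical names. It counts how many compilations two runs of a six-stage query need under per-query versus global IDs.

```scala
import scala.collection.mutable

object CodegenCacheSketch {
  // Hypothetical stand-in for a stage's generated source; only the class name varies.
  def source(stageId: Int): String =
    s"final class GeneratedIteratorForCodegenStage$stageId extends BufferedRowIterator { ... }"

  // Toy cache keyed by source text: "compiles" (here, just counts) each distinct source once.
  final class SourceCache {
    private val compiled = mutable.Map.empty[String, Int]
    var compilations = 0
    def compile(src: String): Int =
      compiled.getOrElseUpdate(src, { compilations += 1; compilations })
  }

  /** Compilations needed to run a query with `stages` codegen stages `runs` times. */
  def compilationsFor(stages: Int, runs: Int, perQueryIds: Boolean): Int = {
    val cache = new SourceCache
    var globalId = 0 // never reset: a per-session / global counter
    for (_ <- 1 to runs) {
      var queryId = 0 // reset for every run: the per-query counter
      for (_ <- 1 to stages) {
        val id =
          if (perQueryIds) { queryId += 1; queryId }
          else { globalId += 1; globalId }
        cache.compile(source(id))
      }
    }
    cache.compilations
  }

  def main(args: Array[String]): Unit = {
    println(compilationsFor(stages = 6, runs = 2, perQueryIds = true))  // 6: second run hits the cache
    println(compilationsFor(stages = 6, runs = 2, perQueryIds = false)) // 12: every run recompiles
  }
}
```

> Per-query IDs reproduce identical source on the second run, so it is served entirely from the cache; a global counter yields new class names, and therefore new cache keys, on every run.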
> The downside to using per-query IDs, as opposed to a per-session or globally incrementing ID, is of course that we can't tell different query runs apart with this ID alone. But for now, I believe this is a good enough tradeoff.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


