spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-22462][SQL] Make rdd-based actions in Dataset trackable in SQL UI
Date Sat, 11 Nov 2017 11:34:34 GMT
Repository: spark
Updated Branches:
  refs/heads/master 223d83ee9 -> 154351e6d


[SPARK-22462][SQL] Make rdd-based actions in Dataset trackable in SQL UI

## What changes were proposed in this pull request?

For the few Dataset actions such as `foreach`, currently no SQL metrics are visible in the
SQL tab of SparkUI. It is because it binds wrongly to Dataset's `QueryExecution`. As the actions
directly evaluate on the RDD which has individual `QueryExecution`, to show correct SQL metrics
on UI, we should bind to RDD's `QueryExecution`.

## How was this patch tested?

Manually test. Screenshot is attached in the PR.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19689 from viirya/SPARK-22462.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/154351e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/154351e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/154351e6

Branch: refs/heads/master
Commit: 154351e6dbd24c4254094477e3f7defcba979b1a
Parents: 223d83e
Author: Liang-Chi Hsieh <viirya@gmail.com>
Authored: Sat Nov 11 12:34:30 2017 +0100
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Sat Nov 11 12:34:30 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala    | 27 +++++++++++++++++---
 1 file changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/154351e6/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5eb2aff..1620ab3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2594,7 +2594,7 @@ class Dataset[T] private[sql](
    * @group action
    * @since 1.6.0
    */
-  def foreach(f: T => Unit): Unit = withNewExecutionId {
+  def foreach(f: T => Unit): Unit = withNewRDDExecutionId {
     rdd.foreach(f)
   }
 
@@ -2613,7 +2613,7 @@ class Dataset[T] private[sql](
    * @group action
    * @since 1.6.0
    */
-  def foreachPartition(f: Iterator[T] => Unit): Unit = withNewExecutionId {
+  def foreachPartition(f: Iterator[T] => Unit): Unit = withNewRDDExecutionId {
     rdd.foreachPartition(f)
   }
 
@@ -2851,6 +2851,12 @@ class Dataset[T] private[sql](
    */
   def unpersist(): this.type = unpersist(blocking = false)
 
+  // Represents the `QueryExecution` used to produce the content of the Dataset as an `RDD`.
+  @transient private lazy val rddQueryExecution: QueryExecution = {
+    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+    sparkSession.sessionState.executePlan(deserialized)
+  }
+
   /**
    * Represents the content of the Dataset as an `RDD` of `T`.
    *
@@ -2859,8 +2865,7 @@ class Dataset[T] private[sql](
    */
   lazy val rdd: RDD[T] = {
     val objectType = exprEnc.deserializer.dataType
-    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
-    sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
+    rddQueryExecution.toRdd.mapPartitions { rows =>
       rows.map(_.get(0, objectType).asInstanceOf[T])
     }
   }
@@ -3114,6 +3119,20 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * Wrap an action of the Dataset's RDD to track all Spark jobs in the body so that we can
connect
+   * them with an execution. Before performing the action, the metrics of the executed plan
will be
+   * reset.
+   */
+  private def withNewRDDExecutionId[U](body: => U): U = {
+    SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {
+      rddQueryExecution.executedPlan.foreach { plan =>
+        plan.resetMetrics()
+      }
+      body
+    }
+  }
+
+  /**
    * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
    * user-registered callback functions.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message