spark-commits mailing list archives

From: yh...@apache.org
Subject: spark git commit: [SPARK-15443][SQL] Fix 'explain' for streaming Dataset
Date: Thu, 23 Jun 2016 23:05:04 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 6cb24de99 -> 05677bb5a


[SPARK-15443][SQL] Fix 'explain' for streaming Dataset

## What changes were proposed in this pull request?

- Fix the `explain` command for streaming Dataset/DataFrame; for example (a usage sketch follows the plan output below):
```
== Parsed Logical Plan ==
'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- 'MapElements <function1>, obj#6: java.lang.String
   +- 'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, StringType).toString, StructField(value,StringType,true))), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]

== Analyzed Logical Plan ==
value: string
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]

== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- *MapElements <function1>, obj#6: java.lang.String
   +- *DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- *Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]
```
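
The output above would come from a streaming query along these lines (a minimal sketch, not taken verbatim from this commit: the source path and the lambdas are illustrative, and a `SparkSession` named `spark` is assumed to be in scope as in the shell):
```scala
import org.apache.spark.sql.Row
import spark.implicits._  // provides the Encoder[String] needed by map

// Streaming DataFrame over a text directory (placeholder path).
val df = spark.readStream
  .format("text")
  .load("/Users/zsx/stream")

// A typed filter + map over Rows yields the Filter / DeserializeToObject / MapElements /
// SerializeFromObject operators shown above; explain(true) prints all four plan sections.
df.filter((_: Row) => true)
  .map(_.getString(0))
  .explain(extended = true)
```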

- Add `StreamingQuery.explain` to display the last execution plan; for example (a usage sketch follows the plan output below):
```
== Parsed Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text

== Analyzed Logical Plan ==
value: string
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text

== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- *MapElements <function1>, obj#6: java.lang.String
   +- *DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- *Filter <function1>.apply
         +- *Scan text [value#12] Format: org.apache.spark.sql.execution.datasources.text.TextFileFormat@1836ab91, InputPaths: file:/Users/zsx/stream/a.txt, file:/Users/zsx/stream/b.txt, file:/Users/zsx/stream/c.txt, PushedFilters: [], ReadSchema: struct<value:string>
```
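
The new `StreamingQuery.explain` could then be exercised along these lines once the query is running (again a hedged sketch: the query name and the memory sink are illustrative choices, and `df` is the streaming DataFrame from the sketch above):
```scala
// Start the streaming query; the in-memory sink is used here only to have somewhere to run.
val query = df.map(_.getString(0))
  .writeStream
  .queryName("explain_demo")     // illustrative name
  .format("memory")
  .start()

// Once at least one batch has run (i.e. some files exist under the source path),
// the last execution plan is available; before that, explain prints "N/A".
query.processAllAvailable()
query.explain()                  // physical plan of the last execution only
query.explain(extended = true)   // parsed/analyzed/optimized/physical plans
```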

## How was this patch tested?

The added unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13815 from zsxwing/sdf-explain.

(cherry picked from commit 0e4bdebece892edb126fa443f67c846e44e7367e)
Signed-off-by: Yin Huai <yhuai@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05677bb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05677bb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05677bb5

Branch: refs/heads/branch-2.0
Commit: 05677bb5a1fed91711a0e1be466dbc86d15bbf8e
Parents: 6cb24de
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Thu Jun 23 16:04:16 2016 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Thu Jun 23 16:04:58 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkStrategies.scala   | 18 +++++++++-
 .../spark/sql/execution/command/commands.scala  | 11 +++++-
 .../streaming/IncrementalExecution.scala        |  1 +
 .../execution/streaming/StreamExecution.scala   | 20 +++++++++++
 .../execution/streaming/StreamingRelation.scala | 14 ++++++++
 .../spark/sql/streaming/StreamingQuery.scala    | 15 ++++++++
 .../sql/streaming/FileStreamSourceSuite.scala   | 36 ++++++++++++++++++++
 .../spark/sql/streaming/StreamSuite.scala       | 29 ++++++++++++++++
 8 files changed, 142 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/05677bb5/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 8e2f2ed..b619d4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
-import org.apache.spark.sql.execution.streaming.MemoryPlan
+import org.apache.spark.sql.execution.streaming.{MemoryPlan, StreamingExecutionRelation, StreamingRelation, StreamingRelationExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQuery
 
@@ -307,6 +307,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
+  /**
+   * This strategy is just for explaining `Dataset/DataFrame` created by `spark.readStream`.
+   * It won't affect the execution, because `StreamingRelation` will be replaced with
+   * `StreamingExecutionRelation` in `StreamingQueryManager` and `StreamingExecutionRelation` will
+   * be replaced with the real relation using the `Source` in `StreamExecution`.
+   */
+  object StreamingRelationStrategy extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case s: StreamingRelation =>
+        StreamingRelationExec(s.sourceName, s.output) :: Nil
+      case s: StreamingExecutionRelation =>
+        StreamingRelationExec(s.toString, s.output) :: Nil
+      case _ => Nil
+    }
+  }
+
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
     def numPartitions: Int = self.numPartitions

http://git-wip-us.apache.org/repos/asf/spark/blob/05677bb5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 38bb6e4..7eaad81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.streaming.IncrementalExecution
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
 
 /**
@@ -98,7 +100,14 @@ case class ExplainCommand(
 
   // Run through the optimizer to generate the physical plan.
   override def run(sparkSession: SparkSession): Seq[Row] = try {
-    val queryExecution = sparkSession.sessionState.executePlan(logicalPlan)
+    val queryExecution =
+      if (logicalPlan.isStreaming) {
+        // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
+        // output mode does not matter since there is no `Sink`.
+        new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0)
+      } else {
+        sparkSession.sessionState.executePlan(logicalPlan)
+      }
     val outputString =
       if (codegen) {
         codegenString(queryExecution.executedPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/05677bb5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index bc0e443..0ce0055 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -37,6 +37,7 @@ class IncrementalExecution private[sql](
 
   // TODO: make this always part of planning.
   val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy +:
+    sparkSession.sessionState.planner.StreamingRelationStrategy +:
     sparkSession.sessionState.experimentalMethods.extraStrategies
 
   // Modified planner with stateful operations.

http://git-wip-us.apache.org/repos/asf/spark/blob/05677bb5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 1428b97..f1af79e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -473,6 +474,25 @@ class StreamExecution(
     }
   }
 
+  /** Expose for tests */
+  def explainInternal(extended: Boolean): String = {
+    if (lastExecution == null) {
+      "N/A"
+    } else {
+      val explain = ExplainCommand(lastExecution.logical, extended = extended)
+      sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect()
+        .map(_.getString(0)).mkString("\n")
+    }
+  }
+
+  override def explain(extended: Boolean): Unit = {
+    // scalastyle:off println
+    println(explainInternal(extended))
+    // scalastyle:on println
+  }
+
+  override def explain(): Unit = explain(extended = false)
+
   override def toString: String = {
     s"Streaming Query - $name [state = $state]"
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/05677bb5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 4d65d2f..e8b0009 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
 
 object StreamingRelation {
@@ -50,6 +53,17 @@ case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) ex
   override def toString: String = source.toString
 }
 
+/**
+ * A dummy physical plan for [[StreamingRelation]] to support
+ * [[org.apache.spark.sql.Dataset.explain]]
+ */
+case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) extends LeafExecNode {
+  override def toString: String = sourceName
+  override protected def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException("StreamingRelationExec cannot be executed")
+  }
+}
+
 object StreamingExecutionRelation {
   def apply(source: Source): StreamingExecutionRelation = {
     StreamingExecutionRelation(source, source.schema.toAttributes)

http://git-wip-us.apache.org/repos/asf/spark/blob/05677bb5/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index dc81a5b..19d1ecf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -107,6 +107,7 @@ trait StreamingQuery {
    * method may block forever. Additionally, this method is only guaranteed to block until data that
    * has been synchronously appended data to a [[org.apache.spark.sql.execution.streaming.Source]]
    * prior to invocation. (i.e. `getOffset` must immediately reflect the addition).
+   * @since 2.0.0
    */
   def processAllAvailable(): Unit
 
@@ -116,4 +117,18 @@ trait StreamingQuery {
    * @since 2.0.0
    */
   def stop(): Unit
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   * @since 2.0.0
+   */
+  def explain(): Unit
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   *
+   * @param extended whether to do extended explain or not
+   * @since 2.0.0
+   */
+  def explain(extended: Boolean): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/05677bb5/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6971f93..0eade71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -592,6 +592,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       )
     }
   }
+
+  test("explain") {
+    withTempDirs { case (src, tmp) =>
+      src.mkdirs()
+
+      val df = spark.readStream.format("text").load(src.getCanonicalPath).map(_ + "-x")
+      // Test `explain` not throwing errors
+      df.explain()
+
+      val q = df.writeStream.queryName("file_explain").format("memory").start()
+        .asInstanceOf[StreamExecution]
+      try {
+        assert("N/A" === q.explainInternal(false))
+        assert("N/A" === q.explainInternal(true))
+
+        val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+        val finalFile = new File(src, tempFile.getName)
+        require(stringToFile(tempFile, "foo").renameTo(finalFile))
+
+        q.processAllAvailable()
+
+        val explainWithoutExtended = q.explainInternal(false)
+        // `extended = false` only displays the physical plan.
+        assert("Relation.*text".r.findAllMatchIn(explainWithoutExtended).size === 0)
+        assert("TextFileFormat".r.findAllMatchIn(explainWithoutExtended).size === 1)
+
+        val explainWithExtended = q.explainInternal(true)
+        // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+        // plan.
+        assert("Relation.*text".r.findAllMatchIn(explainWithExtended).size === 3)
+        assert("TextFileFormat".r.findAllMatchIn(explainWithExtended).size === 1)
+      } finally {
+        q.stop()
+      }
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {

http://git-wip-us.apache.org/repos/asf/spark/blob/05677bb5/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b8e40e7..c4a894b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -242,6 +242,35 @@ class StreamSuite extends StreamTest {
     val o2 = OutputMode.Complete
     assert(o2 === InternalOutputModes.Complete)
   }
+
+  test("explain") {
+    val inputData = MemoryStream[String]
+    val df = inputData.toDS().map(_ + "foo")
+    // Test `explain` not throwing errors
+    df.explain()
+    val q = df.writeStream.queryName("memory_explain").format("memory").start()
+      .asInstanceOf[StreamExecution]
+    try {
+      assert("N/A" === q.explainInternal(false))
+      assert("N/A" === q.explainInternal(true))
+
+      inputData.addData("abc")
+      q.processAllAvailable()
+
+      val explainWithoutExtended = q.explainInternal(false)
+      // `extended = false` only displays the physical plan.
+      assert("LocalRelation".r.findAllMatchIn(explainWithoutExtended).size === 0)
+      assert("LocalTableScan".r.findAllMatchIn(explainWithoutExtended).size === 1)
+
+      val explainWithExtended = q.explainInternal(true)
+      // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+      // plan.
+      assert("LocalRelation".r.findAllMatchIn(explainWithExtended).size === 3)
+      assert("LocalTableScan".r.findAllMatchIn(explainWithExtended).size === 1)
+    } finally {
+      q.stop()
+    }
+  }
 }
 
 /**


