spark-commits mailing list archives

From hvanhov...@apache.org
Subject spark git commit: [SPARK-23223][SQL] Make stacking dataset transforms more performant
Date Mon, 29 Jan 2018 17:00:57 GMT
Repository: spark
Updated Branches:
  refs/heads/master fbce2ed0f -> 2d903cf9d


[SPARK-23223][SQL] Make stacking dataset transforms more performant

## What changes were proposed in this pull request?
It is a common pattern to apply multiple transforms to a `Dataset` (using `Dataset.withColumn`,
for example). This is currently quite expensive because we run `CheckAnalysis` on the full
plan and create an encoder for each intermediate `Dataset`.
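
As a minimal sketch of the pattern in question (assuming a local `SparkSession`; the column names are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

object StackedTransforms extends App {
  val spark = SparkSession.builder().master("local[*]").appName("stacking").getOrCreate()
  import spark.implicits._

  // Before this change, every withColumn below re-ran CheckAnalysis on the
  // full plan and bound a fresh encoder for the intermediate Dataset.
  val df = spark.range(10).toDF("id")
    .withColumn("a", $"id" + 1)
    .withColumn("b", $"a" * 2)
    .withColumn("c", $"a" + $"b")
  df.show()
}
```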

This PR extends the usage of the `AnalysisBarrier` to include `CheckAnalysis`. By doing this
we hide the already analyzed plan from `CheckAnalysis`, because the barrier is a `LeafNode`.
The `AnalysisBarrier` itself is eliminated in the `FinishAnalysis` phase of the optimizer.
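
For intuition, here is a simplified sketch of the barrier idea (the real `AnalysisBarrier` lives in `org.apache.spark.sql.catalyst.plans.logical` and carries more overrides; `ExampleBarrier` is a trimmed-down stand-in):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

// Because the barrier extends LeafNode, bottom-up traversals such as the
// foreachUp used by CheckAnalysis stop at it and never re-visit the already
// analyzed child plan it wraps, unless a rule matches the barrier explicitly
// (as the new case in the CheckAnalysis hunk below does for unresolved plans).
case class ExampleBarrier(child: LogicalPlan) extends LeafNode {
  override def output: Seq[Attribute] = child.output
}
```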

We also make binding the `Dataset` encoder lazy. The bound encoder is only needed when we
materialize the dataset.
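
As a tiny standalone demo of the mechanism (plain Scala; the names are invented and this is not Spark API), a `lazy val` defers the bind until first use:

```scala
// The lazy val body runs only when the value is first needed, and at most
// once, so intermediate "datasets" that are never materialized never pay
// for binding the deserializer.
object LazyBindDemo extends App {
  def expensiveBind(): String = { println("binding encoder..."); "bound" }

  class DatasetLike { lazy val deserializer: String = expensiveBind() }

  val intermediate = new DatasetLike  // nothing printed: never materialized
  val materialized = new DatasetLike
  println(materialized.deserializer)  // prints "binding encoder..." then "bound"
}
```

Note that the typed `Dataset.apply` in the diff below still forces `deserializer` eagerly, so a mismatch between the encoder and the underlying schema keeps failing fast when a typed `Dataset` is created.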

## How was this patch tested?
Existing tests should cover this.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #20402 from hvanhovell/SPARK-23223.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d903cf9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d903cf9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d903cf9

Branch: refs/heads/master
Commit: 2d903cf9d3a827e54217dfc9f1e4be99d8204387
Parents: fbce2ed
Author: Herman van Hovell <hvanhovell@databricks.com>
Authored: Mon Jan 29 09:00:54 2018 -0800
Committer: Herman van Hovell <hvanhovell@databricks.com>
Committed: Mon Jan 29 09:00:54 2018 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala      | 16 ++++++++++++++--
 .../spark/sql/catalyst/analysis/CheckAnalysis.scala |  1 +
 .../spark/sql/catalyst/analysis/AnalysisTest.scala  |  3 +--
 .../main/scala/org/apache/spark/sql/Dataset.scala   |  8 ++++++--
 .../apache/spark/sql/execution/QueryExecution.scala | 16 ++--------------
 .../org/apache/spark/sql/hive/test/TestHive.scala   |  2 +-
 6 files changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2d903cf9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2b14c82..91cb036 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -98,6 +98,19 @@ class Analyzer(
     this(catalog, conf, conf.optimizerMaxIterations)
   }
 
+  def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
+    val analyzed = execute(plan)
+    try {
+      checkAnalysis(analyzed)
+      EliminateBarriers(analyzed)
+    } catch {
+      case e: AnalysisException =>
+        val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
+        ae.setStackTrace(e.getStackTrace)
+        throw ae
+    }
+  }
+
   override def execute(plan: LogicalPlan): LogicalPlan = {
     AnalysisContext.reset()
     try {
@@ -178,8 +191,7 @@ class Analyzer(
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
-      CleanupAliases,
-      EliminateBarriers)
+      CleanupAliases)
   )
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2d903cf9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index ef91d79..90bda2a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -356,6 +356,7 @@ trait CheckAnalysis extends PredicateHelper {
     }
     extendedCheckRules.foreach(_(plan))
     plan.foreachUp {
+      case AnalysisBarrier(child) if !child.resolved => checkAnalysis(child)
       case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
       case _ =>
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2d903cf9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 549a435..3d7c918 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -54,8 +54,7 @@ trait AnalysisTest extends PlanTest {
       expectedPlan: LogicalPlan,
       caseSensitive: Boolean = true): Unit = {
     val analyzer = getAnalyzer(caseSensitive)
-    val actualPlan = analyzer.execute(inputPlan)
-    analyzer.checkAnalysis(actualPlan)
+    val actualPlan = analyzer.executeAndCheck(inputPlan)
     comparePlans(actualPlan, expectedPlan)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2d903cf9/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index edb6644..cc5b647 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -62,7 +62,11 @@ import org.apache.spark.util.Utils
 
 private[sql] object Dataset {
  def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = {
-    new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
+    val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
+    // Eagerly bind the encoder so we verify that the encoder matches the underlying
+    // schema. The user will get an error if this is not the case.
+    dataset.deserializer
+    dataset
   }
 
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
@@ -204,7 +208,7 @@ class Dataset[T] private[sql](
 
  // The deserializer expression which can be used to build a projection and turn rows to objects
   // of type T, after collecting rows to the driver side.
-  private val deserializer =
+  private lazy val deserializer =
     exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer).deserializer
 
   private implicit def classTag = exprEnc.clsTag

http://git-wip-us.apache.org/repos/asf/spark/blob/2d903cf9/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 8bfe3ef..7cae24b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -44,19 +44,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   // TODO: Move the planner an optimizer into here from SessionState.
   protected def planner = sparkSession.sessionState.planner
 
-  def assertAnalyzed(): Unit = {
-    // Analyzer is invoked outside the try block to avoid calling it again from within the
-    // catch block below.
-    analyzed
-    try {
-      sparkSession.sessionState.analyzer.checkAnalysis(analyzed)
-    } catch {
-      case e: AnalysisException =>
-        val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
-        ae.setStackTrace(e.getStackTrace)
-        throw ae
-    }
-  }
+  def assertAnalyzed(): Unit = analyzed
 
   def assertSupported(): Unit = {
     if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
@@ -66,7 +54,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
 
   lazy val analyzed: LogicalPlan = {
     SparkSession.setActiveSession(sparkSession)
-    sparkSession.sessionState.analyzer.execute(logical)
+    sparkSession.sessionState.analyzer.executeAndCheck(logical)
   }
 
   lazy val withCachedData: LogicalPlan = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2d903cf9/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 7287e20..59708e7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -575,7 +575,7 @@ private[hive] class TestHiveQueryExecution(
     logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
     referencedTestTables.foreach(sparkSession.loadTestTable)
     // Proceed with analysis.
-    sparkSession.sessionState.analyzer.execute(logical)
+    sparkSession.sessionState.analyzer.executeAndCheck(logical)
   }
 }
 


