spark-commits mailing list archives

From dav...@apache.org
Subject spark git commit: [SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code
Date Sat, 11 Jun 2016 04:12:10 GMT
Repository: spark
Updated Branches:
  refs/heads/master 468da03e2 -> 7504bc73f


[SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code

## What changes were proposed in this pull request?

If a bug in whole-stage codegen prevents the generated code from compiling, we should
fall back to non-codegen execution so that the query can still run.

The batch mode of the new Parquet reader depends on codegen and cannot easily be switched to
non-batch mode, so we still use codegen for the batched scan (for Parquet). Because it supports
only primitive types and the number of columns is smaller than spark.sql.codegen.maxFields (100),
it should not fail.

This behavior is configurable via `spark.sql.codegen.fallback`, as sketched below.
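
For illustration, a minimal sketch of toggling the flag, assuming a `SparkSession` named `spark` (the session is hypothetical; only the config key comes from this patch):

```scala
// Hypothetical session named `spark`; only the config key is from this patch.
// With the default (true), a codegen compilation failure falls back to the
// interpreted (non-codegen) path instead of failing the query.
spark.conf.set("spark.sql.codegen.fallback", "true")

// Setting it to false surfaces codegen compilation errors directly,
// which is useful when debugging the generated code itself.
spark.conf.set("spark.sql.codegen.fallback", "false")
```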

## How was this patch tested?

Manually tested with a buggy operator; it worked well.

Author: Davies Liu <davies@databricks.com>

Closes #13501 from davies/codegen_fallback.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7504bc73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7504bc73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7504bc73

Branch: refs/heads/master
Commit: 7504bc73f20fe0e6546a019ed91c3fd3804287ba
Parents: 468da03
Author: Davies Liu <davies@databricks.com>
Authored: Fri Jun 10 21:12:06 2016 -0700
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Fri Jun 10 21:12:06 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/ExistingRDD.scala     |  5 ++++-
 .../spark/sql/execution/WholeStageCodegenExec.scala      | 11 ++++++++++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala    | 11 ++++++++++-
 3 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 9ab98fd1..ee72a70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -234,7 +234,10 @@ private[sql] case class BatchedDataSourceScanExec(
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException
+    // In the case of fallback, this batched scan should never fail because:
+    // 1) only primitive types are supported
+    // 2) the number of columns is smaller than spark.sql.codegen.maxFields
+    WholeStageCodegenExec(this).execute()
   }
 
   override def simpleString: String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index e0d8e35..ac4c3aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoi
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * An interface for those physical operators that support codegen.
@@ -339,12 +340,20 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
       new CodeAndComment(CodeFormatter.stripExtraNewLines(source), ctx.getPlaceHolderToComments()))
 
     logDebug(s"\n${CodeFormatter.format(cleanedSource)}")
-    CodeGenerator.compile(cleanedSource)
     (ctx, cleanedSource)
   }
 
   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
+    // try to compile, and fall back to interpreted execution if it fails
+    try {
+      CodeGenerator.compile(cleanedSource)
+    } catch {
+      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+        // We should have already seen the error message
+        logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
+        return child.execute()
+    }
     val references = ctx.references.toArray
 
     val durationMs = longMetric("pipelineTime")
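
For readers skimming the hunk above, the control flow is compile-then-execute, with a guarded fallback to the interpreted child plan. A minimal self-contained sketch of that pattern, where `compile`, `fallbackEnabled`, `compiledExecute`, and `interpretedExecute` are hypothetical stand-ins for `CodeGenerator.compile`, `sqlContext.conf.wholeStageFallback`, the codegen path, and `child.execute()`:

```scala
// A hedged sketch of the compile-or-fallback pattern, not Spark's actual API;
// every parameter is a hypothetical stand-in for the pieces in this patch.
def executeWithFallback[T](
    source: String,
    compile: String => Unit,
    fallbackEnabled: Boolean,
    compiledExecute: () => T,
    interpretedExecute: () => T): T = {
  try {
    compile(source) // fails fast if the generated code is invalid
    compiledExecute()
  } catch {
    case e: Exception if fallbackEnabled =>
      // the compiler has already logged the error; just note the fallback
      Console.err.println(s"whole-stage codegen disabled: ${e.getMessage}")
      interpretedExecute() // hand execution back to the interpreted path
  }
}
```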

http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 437e093..27b1fff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -433,7 +433,14 @@ object SQLConf {
     .doc("The maximum number of fields (including nested fields) that will be supported before"
+
       " deactivating whole-stage codegen.")
     .intConf
-    .createWithDefault(200)
+    .createWithDefault(100)
+
+  val WHOLESTAGE_FALLBACK = SQLConfigBuilder("spark.sql.codegen.fallback")
+    .internal()
+    .doc("When true, whole stage codegen could be temporary disabled for the part of query
that" +
+      " fail to compile generated code")
+    .booleanConf
+    .createWithDefault(true)
 
   val MAX_CASES_BRANCHES = SQLConfigBuilder("spark.sql.codegen.maxCaseBranches")
     .internal()
@@ -605,6 +612,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
 
+  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
+
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
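
For completeness, a hedged sketch of reading the new flag back at runtime through the public config API, assuming a `SparkSession` named `spark` (the `wholeStageFallback` accessor above is internal to Spark):

```scala
// Hypothetical session named `spark`; reads the flag added by this patch
// through the public runtime-config API, defaulting to "true" if unset.
val fallbackEnabled =
  spark.conf.get("spark.sql.codegen.fallback", "true").toBoolean
println(s"whole-stage codegen fallback enabled: $fallbackEnabled")
```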



