spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-21845][SQL][TEST-MAVEN] Make codegen fallback of expressions configurable
Date Tue, 05 Sep 2017 16:04:06 GMT
Repository: spark
Updated Branches:
  refs/heads/master 02a4386ae -> 2974406d1


[SPARK-21845][SQL][TEST-MAVEN] Make codegen fallback of expressions configurable

## What changes were proposed in this pull request?
We should make codegen fallback of expressions configurable. So far, it is always on, which
might hide compilation bugs in our codegen. Thus, we should also disable the codegen fallback
when running test cases.

## How was this patch tested?
Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19119 from gatorsmile/fallbackCodegen.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2974406d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2974406d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2974406d

Branch: refs/heads/master
Commit: 2974406d17a3831c1897b8d99261419592f8042f
Parents: 02a4386
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Tue Sep 5 09:04:03 2017 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Tue Sep 5 09:04:03 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/internal/SQLConf.scala   |  6 +++---
 .../org/apache/spark/sql/execution/SparkPlan.scala      | 11 ++++++-----
 .../spark/sql/execution/WholeStageCodegenExec.scala     |  2 +-
 .../org/apache/spark/sql/DataFrameFunctionsSuite.scala  |  2 +-
 .../scala/org/apache/spark/sql/DataFrameSuite.scala     | 12 +++++++++++-
 .../org/apache/spark/sql/test/SharedSQLContext.scala    |  2 ++
 .../scala/org/apache/spark/sql/hive/test/TestHive.scala |  1 +
 7 files changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2974406d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c407874..db5d65c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -559,9 +559,9 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
 
-  val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback")
+  val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
     .internal()
-    .doc("When true, whole stage codegen could be temporary disabled for the part of query that" +
+    .doc("When true, (whole stage) codegen could be temporary disabled for the part of query that" +
       " fail to compile generated code")
     .booleanConf
     .createWithDefault(true)
@@ -1051,7 +1051,7 @@ class SQLConf extends Serializable with Logging {
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
 
-  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
+  def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
 
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2974406d/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c7277c2..b263f10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -56,15 +56,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   protected def sparkContext = sqlContext.sparkContext
 
-  // sqlContext will be null when we are being deserialized on the slaves.  In this instance
-  // the value of subexpressionEliminationEnabled will be set by the deserializer after the
-  // constructor has run.
+  // sqlContext will be null when SparkPlan nodes are created without the active sessions.
+  // So far, this only happens in the test cases.
   val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
     sqlContext.conf.subexpressionEliminationEnabled
   } else {
     false
   }
 
+  // whether we should fallback when hitting compilation errors caused by codegen
+  private val codeGenFallBack = (sqlContext == null) || sqlContext.conf.codegenFallback
+
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
     SparkSession.setActiveSession(sqlContext.sparkSession)
@@ -370,8 +372,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     try {
       GeneratePredicate.generate(expression, inputSchema)
     } catch {
-      case e @ (_: JaninoRuntimeException | _: CompileException)
-          if sqlContext == null || sqlContext.conf.wholeStageFallback =>
+      case _ @ (_: JaninoRuntimeException | _: CompileException) if codeGenFallBack =>
         genInterpretedPredicate(expression, inputSchema)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2974406d/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index bacb709..a41a7ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -382,7 +382,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
     try {
       CodeGenerator.compile(cleanedSource)
     } catch {
-      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+      case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
         // We should already saw the error message
         logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()

http://git-wip-us.apache.org/repos/asf/spark/blob/2974406d/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 0681b9c..50e4759 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -422,7 +422,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         v
       }
       withSQLConf(
-        (SQLConf.WHOLESTAGE_FALLBACK.key, codegenFallback.toString),
+        (SQLConf.CODEGEN_FALLBACK.key, codegenFallback.toString),
         (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString)) {
         val df = spark.range(0, 4, 1, 4).withColumn("c", c)
         val rows = df.collect()

http://git-wip-us.apache.org/repos/asf/spark/blob/2974406d/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5eb34e5..1334164 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2011,7 +2011,17 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
     val filter = (0 until N)
       .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))
-    df.filter(filter).count
+
+    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") {
+      df.filter(filter).count()
+    }
+
+    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") {
+      val e = intercept[SparkException] {
+        df.filter(filter).count()
+      }.getMessage
+      assert(e.contains("grows beyond 64 KB"))
+    }
   }
 
   test("SPARK-20897: cached self-join should not fail") {

http://git-wip-us.apache.org/repos/asf/spark/blob/2974406d/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 1f073d5..cd8d070 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -34,6 +35,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua
     new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2974406d/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 10c9a2d..0f6a81b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -51,6 +51,7 @@ object TestHive
       "TestSQLContext",
       new SparkConf()
         .set("spark.sql.test", "")
+        .set(SQLConf.CODEGEN_FALLBACK.key, "false")
         .set("spark.sql.hive.metastore.barrierPrefixes",
           "org.apache.spark.sql.hive.execution.PairSerDe")
         .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message