spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-20854][SQL] Extend hint syntax to support expressions
Date Thu, 01 Jun 2017 22:56:36 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 4cba3b5a3 -> bb3d900b4


[SPARK-20854][SQL] Extend hint syntax to support expressions

SQL hint syntax:
* support expressions such as strings, numbers, etc., instead of only identifiers, as is
currently the case.
* support multiple hints, which was missing compared to the DataFrame syntax.

DataFrame API:
* support any parameters in DataFrame.hint instead of just strings

Tested with the existing tests, new tests in PlanParserSuite, and the new suite DataFrameHintSuite.

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #18086 from bogdanrdc/SPARK-20854.

(cherry picked from commit 2134196a9c0aca82bc3e203c09e776a8bd064d65)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb3d900b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb3d900b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb3d900b

Branch: refs/heads/branch-2.2
Commit: bb3d900b48d50d27188a52662d5eb95738265669
Parents: 4cba3b5
Author: Bogdan Raducanu <bogdan@databricks.com>
Authored: Thu Jun 1 15:50:40 2017 -0700
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Thu Jun 1 15:56:26 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   6 +-
 .../sql/catalyst/analysis/ResolveHints.scala    |   8 +-
 .../apache/spark/sql/catalyst/dsl/package.scala |   3 +
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  11 +-
 .../sql/catalyst/plans/logical/hints.scala      |   6 +-
 .../sql/catalyst/analysis/DSLHintSuite.scala    |  53 ++++++++++
 .../sql/catalyst/parser/PlanParserSuite.scala   | 100 ++++++++++++++++---
 .../scala/org/apache/spark/sql/Dataset.scala    |   2 +-
 .../apache/spark/sql/DataFrameHintSuite.scala   |  62 ++++++++++++
 9 files changed, 225 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb3d900b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 1ecb3d1..31b1c67 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -371,7 +371,7 @@ querySpecification
        (RECORDREADER recordReader=STRING)?
        fromClause?
        (WHERE where=booleanExpression)?)
-    | ((kind=SELECT hint? setQuantifier? namedExpressionSeq fromClause?
+    | ((kind=SELECT (hints+=hint)* setQuantifier? namedExpressionSeq fromClause?
        | fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?)
        lateralView*
        (WHERE where=booleanExpression)?
@@ -381,12 +381,12 @@ querySpecification
     ;
 
 hint
-    : '/*+' hintStatement '*/'
+    : '/*+' hintStatements+=hintStatement (','? hintStatements+=hintStatement)* '*/'
     ;
 
 hintStatement
     : hintName=identifier
-    | hintName=identifier '(' parameters+=identifier (',' parameters+=identifier)* ')'
+    | hintName=identifier '(' parameters+=primaryExpression (',' parameters+=primaryExpression)*
')'
     ;
 
 fromClause

http://git-wip-us.apache.org/repos/asf/spark/blob/bb3d900b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index 86c788a..62a3482 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
@@ -91,7 +92,12 @@ object ResolveHints {
           ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true)))
         } else {
           // Otherwise, find within the subtree query plans that should be broadcasted.
-          applyBroadcastHint(h.child, h.parameters.toSet)
+          applyBroadcastHint(h.child, h.parameters.map {
+            case tableName: String => tableName
+            case tableId: UnresolvedAttribute => tableId.name
+            case unsupported => throw new AnalysisException("Broadcast hint parameter
should be " +
+              s"an identifier or string but was $unsupported (${unsupported.getClass}")
+          }.toSet)
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bb3d900b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index ed423e7..beee93d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -381,6 +381,9 @@ package object dsl {
 
       def analyze: LogicalPlan =
         EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan))
+
+      def hint(name: String, parameters: Any*): LogicalPlan =
+        UnresolvedHint(name, parameters, logicalPlan)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bb3d900b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d4dccf1..10db749 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -406,7 +406,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with
Logging
         val withWindow = withDistinct.optionalMap(windows)(withWindows)
 
         // Hint
-        withWindow.optionalMap(hint)(withHints)
+        hints.asScala.foldRight(withWindow)(withHints)
     }
   }
 
@@ -532,13 +532,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with
Logging
   }
 
   /**
-   * Add a [[UnresolvedHint]] to a logical plan.
+   * Add [[UnresolvedHint]]s to a logical plan.
    */
   private def withHints(
       ctx: HintContext,
       query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
-    val stmt = ctx.hintStatement
-    UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
+    var plan = query
+    ctx.hintStatements.asScala.reverse.foreach { case stmt =>
+      plan = UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(expression),
plan)
+    }
+    plan
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bb3d900b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index 5fe6d2d..d16fae5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -23,9 +23,11 @@ import org.apache.spark.sql.internal.SQLConf
 /**
  * A general hint for the child that is not yet resolved. This node is generated by the parser
and
  * should be removed This node will be eliminated post analysis.
- * A pair of (name, parameters).
+ * @param name the name of the hint
+ * @param parameters the parameters of the hint
+ * @param child the [[LogicalPlan]] on which this hint applies
  */
-case class UnresolvedHint(name: String, parameters: Seq[String], child: LogicalPlan)
+case class UnresolvedHint(name: String, parameters: Seq[Any], child: LogicalPlan)
   extends UnaryNode {
 
   override lazy val resolved: Boolean = false

http://git-wip-us.apache.org/repos/asf/spark/blob/bb3d900b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DSLHintSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DSLHintSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DSLHintSuite.scala
new file mode 100644
index 0000000..48a3ca2
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DSLHintSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.analysis.AnalysisTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class DSLHintSuite extends AnalysisTest {
+  lazy val a = 'a.int
+  lazy val b = 'b.string
+  lazy val c = 'c.string
+  lazy val r1 = LocalRelation(a, b, c)
+
+  test("various hint parameters") {
+    comparePlans(
+      r1.hint("hint1"),
+      UnresolvedHint("hint1", Seq(), r1)
+    )
+
+    comparePlans(
+      r1.hint("hint1", 1, "a"),
+      UnresolvedHint("hint1", Seq(1, "a"), r1)
+    )
+
+    comparePlans(
+      r1.hint("hint1", 1, $"a"),
+      UnresolvedHint("hint1", Seq(1, $"a"), r1)
+    )
+
+    comparePlans(
+      r1.hint("hint1", Seq(1, 2, 3), Seq($"a", $"b", $"c")),
+      UnresolvedHint("hint1", Seq(Seq(1, 2, 3), Seq($"a", $"b", $"c")), r1)
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bb3d900b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index bed5ca9..954f6da 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.parser
 
 import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGenerator, UnresolvedInlineTable,
UnresolvedTableValuedFunction}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator,
UnresolvedInlineTable, UnresolvedTableValuedFunction}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -496,19 +496,13 @@ class PlanParserSuite extends PlanTest {
     val m = intercept[ParseException] {
       parsePlan("SELECT /*+ HINT() */ * FROM t")
     }.getMessage
-    assert(m.contains("no viable alternative at input"))
-
-    // Hive compatibility: No database.
-    val m2 = intercept[ParseException] {
-      parsePlan("SELECT /*+ MAPJOIN(default.t) */ * from default.t")
-    }.getMessage
-    assert(m2.contains("mismatched input '.' expecting {')', ','}"))
+    assert(m.contains("mismatched input"))
 
     // Disallow space as the delimiter.
     val m3 = intercept[ParseException] {
       parsePlan("SELECT /*+ INDEX(a b c) */ * from default.t")
     }.getMessage
-    assert(m3.contains("mismatched input 'b' expecting {')', ','}"))
+    assert(m3.contains("mismatched input 'b' expecting"))
 
     comparePlans(
       parsePlan("SELECT /*+ HINT */ * FROM t"),
@@ -516,27 +510,103 @@ class PlanParserSuite extends PlanTest {
 
     comparePlans(
       parsePlan("SELECT /*+ BROADCASTJOIN(u) */ * FROM t"),
-      UnresolvedHint("BROADCASTJOIN", Seq("u"), table("t").select(star())))
+      UnresolvedHint("BROADCASTJOIN", Seq($"u"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(u) */ * FROM t"),
-      UnresolvedHint("MAPJOIN", Seq("u"), table("t").select(star())))
+      UnresolvedHint("MAPJOIN", Seq($"u"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ STREAMTABLE(a,b,c) */ * FROM t"),
-      UnresolvedHint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star())))
+      UnresolvedHint("STREAMTABLE", Seq($"a", $"b", $"c"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ INDEX(t, emp_job_ix) */ * FROM t"),
-      UnresolvedHint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star())))
+      UnresolvedHint("INDEX", Seq($"t", $"emp_job_ix"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(`default.t`) */ * from `default.t`"),
-      UnresolvedHint("MAPJOIN", Seq("default.t"), table("default.t").select(star())))
+      UnresolvedHint("MAPJOIN", Seq(UnresolvedAttribute.quoted("default.t")),
+        table("default.t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order by a"),
-      UnresolvedHint("MAPJOIN", Seq("t"),
+      UnresolvedHint("MAPJOIN", Seq($"t"),
         table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc))
   }
+
+  test("SPARK-20854: select hint syntax with expressions") {
+    comparePlans(
+      parsePlan("SELECT /*+ HINT1(a, array(1, 2, 3)) */ * from t"),
+      UnresolvedHint("HINT1", Seq($"a",
+        UnresolvedFunction("array", Literal(1) :: Literal(2) :: Literal(3) :: Nil, false)),
+        table("t").select(star())
+      )
+    )
+
+    comparePlans(
+      parsePlan("SELECT /*+ HINT1(a, array(1, 2, 3)) */ * from t"),
+      UnresolvedHint("HINT1", Seq($"a",
+        UnresolvedFunction("array", Literal(1) :: Literal(2) :: Literal(3) :: Nil, false)),
+        table("t").select(star())
+      )
+    )
+
+    comparePlans(
+      parsePlan("SELECT /*+ HINT1(a, 5, 'a', b) */ * from t"),
+      UnresolvedHint("HINT1", Seq($"a", Literal(5), Literal("a"), $"b"),
+        table("t").select(star())
+      )
+    )
+
+    comparePlans(
+      parsePlan("SELECT /*+ HINT1('a', (b, c), (1, 2)) */ * from t"),
+      UnresolvedHint("HINT1",
+        Seq(Literal("a"),
+          CreateStruct($"b" :: $"c" :: Nil),
+          CreateStruct(Literal(1) :: Literal(2) :: Nil)),
+        table("t").select(star())
+      )
+    )
+  }
+
+  test("SPARK-20854: multiple hints") {
+    comparePlans(
+      parsePlan("SELECT /*+ HINT1(a, 1) hint2(b, 2) */ * from t"),
+      UnresolvedHint("HINT1", Seq($"a", Literal(1)),
+        UnresolvedHint("hint2", Seq($"b", Literal(2)),
+          table("t").select(star())
+        )
+      )
+    )
+
+    comparePlans(
+      parsePlan("SELECT /*+ HINT1(a, 1),hint2(b, 2) */ * from t"),
+      UnresolvedHint("HINT1", Seq($"a", Literal(1)),
+        UnresolvedHint("hint2", Seq($"b", Literal(2)),
+          table("t").select(star())
+        )
+      )
+    )
+
+    comparePlans(
+      parsePlan("SELECT /*+ HINT1(a, 1) */ /*+ hint2(b, 2) */ * from t"),
+      UnresolvedHint("HINT1", Seq($"a", Literal(1)),
+        UnresolvedHint("hint2", Seq($"b", Literal(2)),
+          table("t").select(star())
+        )
+      )
+    )
+
+    comparePlans(
+      parsePlan("SELECT /*+ HINT1(a, 1), hint2(b, 2) */ /*+ hint3(c, 3) */ * from t"),
+      UnresolvedHint("HINT1", Seq($"a", Literal(1)),
+        UnresolvedHint("hint2", Seq($"b", Literal(2)),
+          UnresolvedHint("hint3", Seq($"c", Literal(3)),
+            table("t").select(star())
+          )
+        )
+      )
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bb3d900b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 68d1a6b..f491e3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1086,7 +1086,7 @@ class Dataset[T] private[sql](
    * @since 2.2.0
    */
   @scala.annotation.varargs
-  def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
+  def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan {
     UnresolvedHint(name, parameters, logicalPlan)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bb3d900b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala
new file mode 100644
index 0000000..60f6f23
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameHintSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.test.SharedSQLContext
+
+class DataFrameHintSuite extends PlanTest with SharedSQLContext {
+  import testImplicits._
+  lazy val df = spark.range(10)
+
+  private def check(df: Dataset[_], expected: LogicalPlan) = {
+    comparePlans(
+      df.queryExecution.logical,
+      expected
+    )
+  }
+
+  test("various hint parameters") {
+    check(
+      df.hint("hint1"),
+      UnresolvedHint("hint1", Seq(),
+        df.logicalPlan
+      )
+    )
+
+    check(
+      df.hint("hint1", 1, "a"),
+      UnresolvedHint("hint1", Seq(1, "a"), df.logicalPlan)
+    )
+
+    check(
+      df.hint("hint1", 1, $"a"),
+      UnresolvedHint("hint1", Seq(1, $"a"),
+        df.logicalPlan
+      )
+    )
+
+    check(
+      df.hint("hint1", Seq(1, 2, 3), Seq($"a", $"b", $"c")),
+      UnresolvedHint("hint1", Seq(Seq(1, 2, 3), Seq($"a", $"b", $"c")),
+        df.logicalPlan
+      )
+    )
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message