spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject spark git commit: [SPARK-5383][SQL] Support alias for udtfs
Date Tue, 03 Feb 2015 20:16:35 GMT
Repository: spark
Updated Branches:
  refs/heads/master ca7a6cdff -> 5adbb3948


[SPARK-5383][SQL] Support alias for udtfs

Add support for alias of udtfs, such as
```
select stack(2, key, value, key, value) as (a, b) from src limit 5;

select a, b from (select stack(2, key, value, key, value) as (a, b) from src) t limit 5

```

Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>
Author: Fei Wang <wangfei1@huawei.com>

Closes #4186 from scwf/multi-alias-names and squashes the following commits:

c35e922 [wangfei] fix conflicts
adc8311 [wangfei] minor format fix
2783aed [wangfei] convert it to a Generate instead of leaving it inside of a Project clause
a87668a [wangfei] minor improvement
b25d9b3 [wangfei] resolve conflicts
d38f041 [wangfei] style fix
8cfcebf [wangfei] minor improvement
12a239e [wangfei] fix test case
050177f [wangfei] added extendedCheckRules
3d69329 [wangfei] added CheckMultiAlias to analyzer
324150d [wangfei] added multi alias node
74f5a81 [Fei Wang] imports order fix
5bc3f59 [scwf] style fix
3daec28 [scwf] support alias for udfs with multi output columns


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5adbb394
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5adbb394
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5adbb394

Branch: refs/heads/master
Commit: 5adbb39482631998dbfe4a1da88f6e75b30fb5ac
Parents: ca7a6cd
Author: wangfei <wangfei1@huawei.com>
Authored: Tue Feb 3 12:16:31 2015 -0800
Committer: Michael Armbrust <michael@databricks.com>
Committed: Tue Feb 3 12:16:31 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  5 +--
 .../sql/catalyst/analysis/unresolved.scala      | 38 ++++++++++++++++++++
 .../catalyst/expressions/namedExpressions.scala |  2 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |  1 +
 .../org/apache/spark/sql/hive/HiveQl.scala      | 16 ++++++---
 .../org/apache/spark/sql/hive/hiveUdfs.scala    | 19 +++++++++-
 .../sql/hive/execution/HiveQuerySuite.scala     | 12 +++++++
 7 files changed, 85 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5adbb394/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index cefd70a..ae7f7b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -69,8 +69,9 @@ class Analyzer(catalog: Catalog,
       typeCoercionRules ++
       extendedRules : _*),
     Batch("Check Analysis", Once,
-      CheckResolution,
-      CheckAggregation),
+      CheckResolution ::
+      CheckAggregation ::
+      Nil: _*),
     Batch("AnalysisOperators", fixedPoint,
       EliminateAnalysisOperators)
   )

http://git-wip-us.apache.org/repos/asf/spark/blob/5adbb394/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 6606028..f35921e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -128,6 +128,44 @@ case class UnresolvedStar(table: Option[String]) extends Star {
   override def toString = table.map(_ + ".").getOrElse("") + "*"
 }
 
+/**
+ * Used to assign new names to Generator's output, such as hive udtf.
+ * For example the SQL expression "stack(2, key, value, key, value) as (a, b)" could be represented
+ * as follows:
+ *  MultiAlias(stack_function, Seq(a, b))
+
+ * @param child the computation being performed
+ * @param names the names to be associated with each output of computing [[child]].
+ */
+case class MultiAlias(child: Expression, names: Seq[String])
+  extends Attribute with trees.UnaryNode[Expression] {
+
+  override def name = throw new UnresolvedException(this, "name")
+
+  override def exprId = throw new UnresolvedException(this, "exprId")
+
+  override def dataType = throw new UnresolvedException(this, "dataType")
+
+  override def nullable = throw new UnresolvedException(this, "nullable")
+
+  override def qualifiers = throw new UnresolvedException(this, "qualifiers")
+
+  override lazy val resolved = false
+
+  override def newInstance = this
+
+  override def withNullability(newNullability: Boolean) = this
+
+  override def withQualifiers(newQualifiers: Seq[String]) = this
+
+  override def withName(newName: String) = this
+
+  override def eval(input: Row = null): EvaluatedType =
+    throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+
+  override def toString: String = s"$child AS $names"
+
+}
 
 /**
  * Represents all the resolved input attributes to a given relational operator. This is used

http://git-wip-us.apache.org/repos/asf/spark/blob/5adbb394/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index f388cd5..e6ab1fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -75,7 +75,7 @@ abstract class Attribute extends NamedExpression {
 /**
  * Used to assign a new name to a computation.
  * For example the SQL expression "1 + 1 AS a" could be represented as follows:
- *  Alias(Add(Literal(1), Literal(1), "a")()
+ *  Alias(Add(Literal(1), Literal(1)), "a")()
  *
  * Note that exprId and qualifiers are in a separate parameter list because
  * we only pattern match on child and name.

http://git-wip-us.apache.org/repos/asf/spark/blob/5adbb394/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 50f266a..922e61f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -323,6 +323,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         catalog.CreateTables ::
         catalog.PreInsertionCasts ::
         ExtractPythonUdfs ::
+        ResolveUdtfsAlias ::
         Nil
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5adbb394/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index ab305e1..74ca0d4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.sql.Date
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Context
@@ -968,14 +969,21 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
   }
 
   protected def selExprNodeToExpr(node: Node): Option[Expression] = node match {
-    case Token("TOK_SELEXPR",
-           e :: Nil) =>
+    case Token("TOK_SELEXPR", e :: Nil) =>
       Some(nodeToExpr(e))
 
-    case Token("TOK_SELEXPR",
-           e :: Token(alias, Nil) :: Nil) =>
+    case Token("TOK_SELEXPR", e :: Token(alias, Nil) :: Nil) =>
       Some(Alias(nodeToExpr(e), cleanIdentifier(alias))())
 
+    case Token("TOK_SELEXPR", e :: aliasChildren) =>
+      var aliasNames = ArrayBuffer[String]()
+      aliasChildren.foreach { _ match {
+        case Token(name, Nil) => aliasNames += cleanIdentifier(name)
+        case _ =>
+        }
+      }
+      Some(MultiAlias(nodeToExpr(e), aliasNames))
+
     /* Hints are ignored */
     case Token("TOK_HINTLIST", _) => None
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5adbb394/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 76d2140..34c21c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -33,8 +33,11 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, Project, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils.getContextOrSparkClassLoader
+import org.apache.spark.sql.catalyst.analysis.MultiAlias
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -321,6 +324,20 @@ private[hive] case class HiveGenericUdtf(
   override def toString = s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
 }
 
+/**
+ * Resolve Udtfs Alias.
+ */
+private[spark] object ResolveUdtfsAlias extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan) = plan transform {
+    case p @ Project(projectList, _)
+      if projectList.exists(_.isInstanceOf[MultiAlias]) && projectList.size != 1
=>
+      throw new TreeNodeException(p, "only single Generator supported for SELECT clause")
+
+    case Project(Seq(MultiAlias(udtf @ HiveGenericUdtf(_, _, _), names)), child) =>
+        Generate(udtf.copy(aliasNames = names), join = false, outer = false, None, child)
+  }
+}
+
 private[hive] case class HiveUdafFunction(
     funcWrapper: HiveFunctionWrapper,
     exprs: Seq[Expression],

http://git-wip-us.apache.org/repos/asf/spark/blob/5adbb394/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 4c53b10..8e84d27 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -583,6 +583,18 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     assert(sql("select key from src having key > 490").collect().size < 100)
   }
 
+  test("SPARK-5383 alias for udfs with multi output columns") {
+    assert(
+      sql("select stack(2, key, value, key, value) as (a, b) from src limit 5")
+        .collect()
+        .size == 5)
+
+    assert(
+      sql("select a, b from (select stack(2, key, value, key, value) as (a, b) from src)
t limit 5")
+        .collect()
+        .size == 5)
+  }
+
   test("SPARK-5367: resolve star expression in udf") {
     assert(sql("select concat(*) from src limit 5").collect().size == 5)
     assert(sql("select array(*) from src limit 5").collect().size == 5)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message