spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-12789][SQL] Support Order By Ordinal in SQL
Date Mon, 21 Mar 2016 10:08:53 GMT
Repository: spark
Updated Branches:
  refs/heads/master c35c60fa9 -> 2c5b18fb0


[SPARK-12789][SQL] Support Order By Ordinal in SQL

#### What changes were proposed in this pull request?
This PR is to support order by position in SQL, e.g.
```SQL
select c1, c2, c3 from tbl order by 1 desc, 3
```
should be equivalent to
```SQL
select c1, c2, c3 from tbl order by c1 desc, c3 asc
```

This is controlled by config option `spark.sql.orderByOrdinal`.
- When true, the ordinal numbers are treated as the position in the select list.
- When false, the ordinal numbers in the order/sort by clause are ignored.

- Only integer literals are converted (not foldable expressions). If foldable expressions are
found, they are ignored.
- This also works with select *.

**Question**: Do we still need to sort by columns that contain zero references? In this case,
it will have no impact on the sorting results. IMO, we should not allow users to do it. rxin
cloud-fan marmbrus yhuai hvanhovell
-- Update: In these cases, they are ignored.

**Note**: This PR is taken from https://github.com/apache/spark/pull/10731. When merging this
PR, please give the credit to zhichao-li

Also cc all the people who are involved in the previous discussion: adrian-wang chenghao-intel
tejasapatil

#### How was this patch tested?
Added a few test cases covering both positive and negative scenarios.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #11815 from gatorsmile/orderByPosition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c5b18fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c5b18fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c5b18fb

Branch: refs/heads/master
Commit: 2c5b18fb0fdeabd378dd97e91f72d1eac4e21cc7
Parents: c35c60f
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Mon Mar 21 18:08:41 2016 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Mon Mar 21 18:08:41 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/CatalystConf.scala       | 10 ++++-
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 31 +++++++++++++-
 .../spark/sql/catalyst/planning/patterns.scala  | 13 ++++++
 .../sql/catalyst/analysis/AnalysisTest.scala    |  4 +-
 .../analysis/DecimalPrecisionSuite.scala        |  2 +-
 .../optimizer/BooleanSimplificationSuite.scala  |  5 ++-
 .../optimizer/EliminateSortsSuite.scala         | 13 ++++--
 .../org/apache/spark/sql/internal/SQLConf.scala |  7 ++++
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 43 ++++++++++++++++++++
 9 files changed, 117 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c5b18fb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 3588413..e10ab97 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.analysis._
 private[spark] trait CatalystConf {
   def caseSensitiveAnalysis: Boolean
 
+  def orderByOrdinal: Boolean
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determin
if two
    * identifiers are equal.
@@ -43,8 +45,14 @@ object EmptyConf extends CatalystConf {
   override def caseSensitiveAnalysis: Boolean = {
     throw new UnsupportedOperationException
   }
+  override def orderByOrdinal: Boolean = {
+    throw new UnsupportedOperationException
+  }
 }
 
 /** A CatalystConf that can be used for local testing. */
-case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf {
+case class SimpleCatalystConf(
+    caseSensitiveAnalysis: Boolean,
+    orderByOrdinal: Boolean = true)
+  extends CatalystConf {
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5b18fb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e4e934a..333a54e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatal
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.planning.IntegerIndex
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -40,7 +41,10 @@ import org.apache.spark.sql.types._
  * references.
  */
 object SimpleAnalyzer
-  extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true))
+  extends Analyzer(
+    EmptyCatalog,
+    EmptyFunctionRegistry,
+    new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -618,13 +622,36 @@ class Analyzer(
    * clause.  This rule detects such queries and adds the required attributes to the original
    * projection, so that they will be available during sorting. Another projection is added
to
    * remove these attributes after sorting.
+   *
+   * This rule also resolves the position number in sort references. This support is introduced
+   * in Spark 2.0. Before Spark 2.0, the integers in Order By has no effect on output sorting.
+   * - When the sort references are not integer but foldable expressions, ignore them.
+   * - When spark.sql.orderByOrdinal is set to false, ignore the position numbers too.
    */
   object ResolveSortReferences extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case s: Sort if !s.child.resolved => s
+      // Replace the index with the related attribute for ORDER BY
+      // which is a 1-base position of the projection list.
+      case s @ Sort(orders, global, child)
+          if conf.orderByOrdinal && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty)
=>
+        val newOrders = orders map {
+          case s @ SortOrder(IntegerIndex(index), direction) =>
+            if (index > 0 && index <= child.output.size) {
+              SortOrder(child.output(index - 1), direction)
+            } else {
+              throw new UnresolvedException(s,
+                s"Order/sort By position: $index does not exist " +
+                s"The Select List is indexed from 1 to ${child.output.size}")
+            }
+          case o => o
+        }
+        Sort(newOrders, global, child)
+
       // Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
       case sa @ Sort(_, _, child: Aggregate) => sa
 
-      case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
+      case s @ Sort(order, _, child) if !s.resolved =>
         try {
           val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
           val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5b18fb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 62d54df..ef74504 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.IntegerType
 
 /**
  * A pattern that matches any number of project or filter operations on top of another relational
@@ -202,3 +203,15 @@ object Unions {
     }
   }
 }
+
+/**
+ * Extractor for retrieving Int value.
+ */
+object IntegerIndex {
+  def unapply(a: Any): Option[Int] = a match {
+    case Literal(a: Int, IntegerType) => Some(a)
+    // When resolving ordinal in Sort, negative values are extracted for issuing error messages.
+    case UnaryMinus(IntegerLiteral(v)) => Some(-v)
+    case _ => None
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5b18fb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index ef825e6..39166c4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
 trait AnalysisTest extends PlanTest {
 
   val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = {
-    val caseSensitiveConf = new SimpleCatalystConf(true)
-    val caseInsensitiveConf = new SimpleCatalystConf(false)
+    val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
+    val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
 
     val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
     val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5b18fb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index b2613e4..9aa685e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
 
 
 class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
-  val conf = new SimpleCatalystConf(true)
+  val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
   val catalog = new SimpleCatalog(conf)
   val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5b18fb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index da43751..47b79fe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -110,7 +110,10 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper
{
   }
 
   private val caseInsensitiveAnalyzer =
-    new Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(false))
+    new Analyzer(
+      EmptyCatalog,
+      EmptyFunctionRegistry,
+      new SimpleCatalystConf(caseSensitiveAnalysis = false))
 
   test("(a && b) || (a && c) => a && (b || c) when case insensitive")
{
     val plan = caseInsensitiveAnalyzer.execute(

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5b18fb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 27c15e8..a4c8d1c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -25,6 +27,9 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 
 class EliminateSortsSuite extends PlanTest {
+  val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
+  val catalog = new SimpleCatalog(conf)
+  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
@@ -48,8 +53,8 @@ class EliminateSortsSuite extends PlanTest {
     val x = testRelation
 
     val query = x.orderBy(SortOrder(3, Ascending), SortOrder(-1, Ascending))
-    val optimized = Optimize.execute(query.analyze)
-    val correctAnswer = x.analyze
+    val optimized = Optimize.execute(analyzer.execute(query))
+    val correctAnswer = analyzer.execute(x)
 
     comparePlans(optimized, correctAnswer)
   }
@@ -58,8 +63,8 @@ class EliminateSortsSuite extends PlanTest {
     val x = testRelation
 
     val query = x.orderBy(SortOrder(3, Ascending), 'a.asc)
-    val optimized = Optimize.execute(query.analyze)
-    val correctAnswer = x.orderBy('a.asc).analyze
+    val optimized = Optimize.execute(analyzer.execute(query))
+    val correctAnswer = analyzer.execute(x.orderBy('a.asc))
 
     comparePlans(optimized, correctAnswer)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5b18fb/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9e0878a..3d1d5b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -435,6 +435,11 @@ object SQLConf {
     defaultValue = Some(true),
     doc = "When false, we will treat bucketed table as normal table.")
 
+  val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal",
+    defaultValue = Some(true),
+    doc = "When true, the ordinal numbers are treated as the position in the select list.
" +
+          "When false, the ordinal numbers in order/sort By clause are ignored.")
+
   // The output committer class used by HadoopFsRelation. The specified class needs to be
a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
   //
@@ -634,6 +639,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with
Loggin
 
   def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS)
 
+  override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5b18fb/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6716982..b765fd8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,6 +21,8 @@ import java.math.MathContext
 import java.sql.Timestamp
 
 import org.apache.spark.AccumulatorSuite
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin}
 import org.apache.spark.sql.functions._
@@ -2156,6 +2158,47 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("order by ordinal number") {
+    checkAnswer(
+      sql("SELECT * FROM testData2 ORDER BY 1 DESC"),
+      sql("SELECT * FROM testData2 ORDER BY a DESC"))
+    // If the position is not an integer, ignore it.
+    checkAnswer(
+      sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"),
+      sql("SELECT * FROM testData2 ORDER BY b ASC"))
+
+    checkAnswer(
+      sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
+      sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC"))
+    checkAnswer(
+      sql("SELECT * FROM testData2 SORT BY 1 DESC, 2"),
+      sql("SELECT * FROM testData2 SORT BY a DESC, b ASC"))
+    checkAnswer(
+      sql("SELECT * FROM testData2 ORDER BY 1 ASC, b ASC"),
+      Seq(Row(1, 1), Row(1, 2), Row(2, 1), Row(2, 2), Row(3, 1), Row(3, 2)))
+  }
+
+  test("order by ordinal number - negative cases") {
+    intercept[UnresolvedException[SortOrder]] {
+      sql("SELECT * FROM testData2 ORDER BY 0")
+    }
+    intercept[UnresolvedException[SortOrder]] {
+      sql("SELECT * FROM testData2 ORDER BY -1 DESC, b ASC")
+    }
+    intercept[UnresolvedException[SortOrder]] {
+      sql("SELECT * FROM testData2 ORDER BY 3 DESC, b ASC")
+    }
+  }
+
+  test("order by ordinal number with conf spark.sql.orderByOrdinal=false") {
+    withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") {
+      // If spark.sql.orderByOrdinal=false, ignore the position number.
+      checkAnswer(
+        sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
+        sql("SELECT * FROM testData2 ORDER BY b ASC"))
+    }
+  }
+
   test("natural join") {
     val df1 = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("k", "v1")
     val df2 = Seq(("one", 1), ("two", 22), ("one", 5)).toDF("k", "v2")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message