spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-9949] [SQL] Fix TakeOrderedAndProject's output.
Date Sat, 15 Aug 2015 00:35:20 GMT
Repository: spark
Updated Branches:
  refs/heads/master 18a761ef7 -> 932b24fd1


[SPARK-9949] [SQL] Fix TakeOrderedAndProject's output.

https://issues.apache.org/jira/browse/SPARK-9949

Author: Yin Huai <yhuai@databricks.com>

Closes #8179 from yhuai/SPARK-9949.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/932b24fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/932b24fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/932b24fd

Branch: refs/heads/master
Commit: 932b24fd144232fb08184f0bd0a46369ecba164e
Parents: 18a761e
Author: Yin Huai <yhuai@databricks.com>
Authored: Fri Aug 14 17:35:17 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Fri Aug 14 17:35:17 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/basicOperators.scala    | 12 +++++++++++-
 .../spark/sql/execution/PlannerSuite.scala      | 20 +++++++++++++++++---
 2 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/932b24fd/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 247c900..77b9806 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -237,7 +237,10 @@ case class TakeOrderedAndProject(
     projectList: Option[Seq[NamedExpression]],
     child: SparkPlan) extends UnaryNode {
 
-  override def output: Seq[Attribute] = child.output
+  override def output: Seq[Attribute] = {
+    val projectOutput = projectList.map(_.map(_.toAttribute))
+    projectOutput.getOrElse(child.output)
+  }
 
   override def outputPartitioning: Partitioning = SinglePartition
 
@@ -263,6 +266,13 @@ case class TakeOrderedAndProject(
   protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1)
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
+
+  override def simpleString: String = {
+    val orderByString = sortOrder.mkString("[", ",", "]")
+    val outputString = output.mkString("[", ",", "]")
+
+    s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)"
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/932b24fd/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 937a108..fad93b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -162,9 +162,23 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("efficient limit -> project -> sort") {
-    val query = testData.sort('key).select('value).limit(2).logicalPlan
-    val planned = ctx.planner.TakeOrderedAndProject(query)
-    assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
+    {
+      val query =
+        testData.select('key, 'value).sort('key).limit(2).logicalPlan
+      val planned = ctx.planner.TakeOrderedAndProject(query)
+      assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
+      assert(planned.head.output === testData.select('key, 'value).logicalPlan.output)
+    }
+
+    {
+      // We need to make sure TakeOrderedAndProject's output is correct when we push a project
+      // into it.
+      val query =
+        testData.select('key, 'value).sort('key).select('value, 'key).limit(2).logicalPlan
+      val planned = ctx.planner.TakeOrderedAndProject(query)
+      assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
+      assert(planned.head.output === testData.select('value, 'key).logicalPlan.output)
+    }
   }
 
   test("PartitioningCollection") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message