spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-15677][SQL] Query with scalar sub-query in the SELECT list throws UnsupportedOperationException
Date Fri, 03 Jun 2016 19:04:41 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 da29762f1 -> 396be560d


[SPARK-15677][SQL] Query with scalar sub-query in the SELECT list throws UnsupportedOperationException

## What changes were proposed in this pull request?
Queries with scalar sub-query in the SELECT list run against a local, in-memory relation throw
UnsupportedOperationException exception.

Problem repro:
```SQL
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
scala> sql("select (select min(c1) from t2) from t1").show()

java.lang.UnsupportedOperationException: Cannot evaluate expression: scalar-subquery#62 []
  at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:215)
  at org.apache.spark.sql.catalyst.expressions.ScalarSubquery.eval(subquery.scala:62)
  at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:142)
  at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:45)
  at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:29)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$37.applyOrElse(Optimizer.scala:1473)
```
The problem is specific to local, in memory relations. It is caused by rule ConvertToLocalRelation,
which attempts to push down
a scalar-subquery expression to the local tables.

The solution prevents the rule to apply if Project references scalar subqueries.

## How was this patch tested?
Added regression tests to SubquerySuite.scala

Author: Ioana Delaney <ioanamdelaney@gmail.com>

Closes #13418 from ioana-delaney/scalarSubV2.

(cherry picked from commit 9e2eb13ca59fc7ac66c6accd49469f339700b23b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/396be560
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/396be560
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/396be560

Branch: refs/heads/branch-2.0
Commit: 396be560deda9ca3a00b57baa5757937969af4e6
Parents: da29762
Author: Ioana Delaney <ioanamdelaney@gmail.com>
Authored: Fri Jun 3 12:04:27 2016 -0700
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Fri Jun 3 12:04:37 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  7 ++++-
 .../org/apache/spark/sql/SubquerySuite.scala    | 27 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/396be560/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 93762ad..e455d42 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1472,10 +1472,15 @@ object DecimalAggregates extends Rule[LogicalPlan] {
  */
 object ConvertToLocalRelation extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Project(projectList, LocalRelation(output, data)) =>
+    case Project(projectList, LocalRelation(output, data))
+        if !projectList.exists(hasUnevaluableExpr) =>
       val projection = new InterpretedProjection(projectList, output)
       LocalRelation(projectList.map(_.toAttribute), data.map(projection))
   }
+
+  private def hasUnevaluableExpr(expr: Expression): Boolean = {
+    expr.find(e => e.isInstanceOf[Unevaluable] && !e.isInstanceOf[AttributeReference]).isDefined
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/396be560/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 4819692..a932125 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -123,6 +123,33 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     )
   }
 
+  test("SPARK-15677: Queries against local relations with scalar subquery in Select list")
{
+    withTempTable("t1", "t2") {
+      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
+      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
+
+      checkAnswer(
+        sql("SELECT (select 1 as col) from t1"),
+        Row(1) :: Row(1) :: Nil)
+
+      checkAnswer(
+        sql("SELECT (select max(c1) from t2) from t1"),
+        Row(2) :: Row(2) :: Nil)
+
+      checkAnswer(
+        sql("SELECT 1 + (select 1 as col) from t1"),
+        Row(2) :: Row(2) :: Nil)
+
+      checkAnswer(
+        sql("SELECT c1, (select max(c1) from t2) + c2 from t1"),
+        Row(1, 3) :: Row(2, 4) :: Nil)
+
+      checkAnswer(
+        sql("SELECT c1, (select max(c1) from t2 where t1.c2 = t2.c2) from t1"),
+        Row(1, 1) :: Row(2, 2) :: Nil)
+    }
+  }
+
   test("SPARK-14791: scalar subquery inside broadcast join") {
     val df = sql("select a, sum(b) as s from l group by a having a > (select avg(a) from
l)")
     val expected = Row(3, 2.0, 3, 3.0) :: Row(6, null, 6, null) :: Nil


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message