spark-commits mailing list archives

From: r...@apache.org
Subject: spark git commit: [SPARK-9415][SQL] Throw AnalysisException when using MapType on Join and Aggregate
Date: Sat, 01 Aug 2015 05:26:32 GMT
Repository: spark
Updated Branches:
  refs/heads/master 14f263448 -> 3320b0ba2


[SPARK-9415][SQL] Throw AnalysisException when using MapType on Join and Aggregate

JIRA: https://issues.apache.org/jira/browse/SPARK-9415

Following up on #7787. We shouldn't use MapType as grouping keys or join keys either.
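
For illustration, here is a minimal user-facing sketch of the grouping-key check (a hypothetical
snippet, not part of this patch; it assumes an existing SQLContext named sqlContext):

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.functions.count

    // A DataFrame with a map-typed column "m".
    val df = sqlContext.createDataFrame(Seq((1, Map(1 -> "a")), (2, Map(2 -> "b"))))
      .toDF("id", "m")

    // Grouping by the MapType column now fails eagerly, at analysis time.
    try {
      df.groupBy("m").agg(count("*"))
    } catch {
      case e: AnalysisException =>
        // Expected to mention: map type expression ... cannot be used in grouping expression
        println(e.getMessage)
    }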

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #7819 from viirya/map_join_groupby and squashes the following commits:

005ee0c [Liang-Chi Hsieh] For comments.
7463398 [Liang-Chi Hsieh] MapType can't be used as join keys, grouping keys.
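
A similarly hedged sketch for the join-key side (same assumptions and imports as the snippet above):

    // Joining on map-typed columns is likewise rejected at analysis time.
    val left = sqlContext.createDataFrame(Seq((1, Map(1 -> "a")))).toDF("id", "m")
    val right = sqlContext.createDataFrame(Seq((2, Map(1 -> "b")))).toDF("id2", "m2")

    try {
      left.join(right, left("m") === right("m2"), "inner")
    } catch {
      case e: AnalysisException =>
        // Expected to mention: map type expression ... cannot be used in join conditions
        println(e.getMessage)
    }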


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3320b0ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3320b0ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3320b0ba

Branch: refs/heads/master
Commit: 3320b0ba262159c0c7209ce39b353c93c597077d
Parents: 14f2634
Author: Liang-Chi Hsieh <viirya@appier.com>
Authored: Fri Jul 31 22:26:30 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Fri Jul 31 22:26:30 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/analysis/CheckAnalysis.scala   | 16 +++--
 .../catalyst/analysis/AnalysisErrorSuite.scala  | 68 +++++++++++++++++++-
 .../spark/sql/DataFrameAggregateSuite.scala     | 10 ---
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  8 ---
 4 files changed, 77 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3320b0ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 0ebc3d1..364569d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -92,8 +92,11 @@ trait CheckAnalysis {
               case p: Predicate =>
                 p.asInstanceOf[Expression].children.foreach(checkValidJoinConditionExprs)
               case e if e.dataType.isInstanceOf[BinaryType] =>
-                failAnalysis(s"expression ${e.prettyString} in join condition " +
-                  s"'${condition.prettyString}' can't be binary type.")
+                failAnalysis(s"binary type expression ${e.prettyString} cannot be used " +
+                  "in join conditions")
+              case e if e.dataType.isInstanceOf[MapType] =>
+                failAnalysis(s"map type expression ${e.prettyString} cannot be used " +
+                  "in join conditions")
               case _ => // OK
             }
 
@@ -114,13 +117,16 @@ trait CheckAnalysis {
 
             def checkValidGroupingExprs(expr: Expression): Unit = expr.dataType match {
               case BinaryType =>
-                failAnalysis(s"grouping expression '${expr.prettyString}' in aggregate can " +
-                  s"not be binary type.")
+                failAnalysis(s"binary type expression ${expr.prettyString} cannot be used " +
+                  "in grouping expression")
+              case m: MapType =>
+                failAnalysis(s"map type expression ${expr.prettyString} cannot be used " +
+                  "in grouping expression")
               case _ => // OK
             }
 
             aggregateExprs.foreach(checkValidAggregateExpression)
-            aggregateExprs.foreach(checkValidGroupingExprs)
+            groupingExprs.foreach(checkValidGroupingExprs)
 
           case Sort(orders, _, _) =>
             orders.foreach { order =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3320b0ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 2588df9..aa19cdc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -181,7 +181,71 @@ class AnalysisErrorSuite extends SparkFunSuite with BeforeAndAfter {
     val error = intercept[AnalysisException] {
       SimpleAnalyzer.checkAnalysis(join)
     }
-    error.message.contains("Failure when resolving conflicting references in Join")
-    error.message.contains("Conflicting attributes")
+    assert(error.message.contains("Failure when resolving conflicting references in Join"))
+    assert(error.message.contains("Conflicting attributes"))
+  }
+
+  test("aggregation can't work on binary and map types") {
+    val plan =
+      Aggregate(
+        AttributeReference("a", BinaryType)(exprId = ExprId(2)) :: Nil,
+        Alias(Sum(AttributeReference("b", IntegerType)(exprId = ExprId(1))), "c")() :: Nil,
+        LocalRelation(
+          AttributeReference("a", BinaryType)(exprId = ExprId(2)),
+          AttributeReference("b", IntegerType)(exprId = ExprId(1))))
+
+    val error = intercept[AnalysisException] {
+      caseSensitiveAnalyze(plan)
+    }
+    assert(error.message.contains("binary type expression a cannot be used in grouping expression"))
+
+    val plan2 =
+      Aggregate(
+        AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)) :: Nil,
+        Alias(Sum(AttributeReference("b", IntegerType)(exprId = ExprId(1))), "c")() :: Nil,
+        LocalRelation(
+          AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
+          AttributeReference("b", IntegerType)(exprId = ExprId(1))))
+
+    val error2 = intercept[AnalysisException] {
+      caseSensitiveAnalyze(plan2)
+    }
+    assert(error2.message.contains("map type expression a cannot be used in grouping expression"))
+  }
+
+  test("Join can't work on binary and map types") {
+    val plan =
+      Join(
+        LocalRelation(
+          AttributeReference("a", BinaryType)(exprId = ExprId(2)),
+          AttributeReference("b", IntegerType)(exprId = ExprId(1))),
+        LocalRelation(
+          AttributeReference("c", BinaryType)(exprId = ExprId(4)),
+          AttributeReference("d", IntegerType)(exprId = ExprId(3))),
+        Inner,
+        Some(EqualTo(AttributeReference("a", BinaryType)(exprId = ExprId(2)),
+          AttributeReference("c", BinaryType)(exprId = ExprId(4)))))
+
+    val error = intercept[AnalysisException] {
+      caseSensitiveAnalyze(plan)
+    }
+    assert(error.message.contains("binary type expression a cannot be used in join conditions"))
+
+    val plan2 =
+      Join(
+        LocalRelation(
+          AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
+          AttributeReference("b", IntegerType)(exprId = ExprId(1))),
+        LocalRelation(
+          AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)),
+          AttributeReference("d", IntegerType)(exprId = ExprId(3))),
+        Inner,
+        Some(EqualTo(AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
+          AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)))))
+
+    val error2 = intercept[AnalysisException] {
+      caseSensitiveAnalyze(plan2)
+    }
+    assert(error2.message.contains("map type expression a cannot be used in join conditions"))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3320b0ba/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 228ece8..f9cff74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -190,14 +190,4 @@ class DataFrameAggregateSuite extends QueryTest {
       emptyTableData.agg(sumDistinct('a)),
       Row(null))
   }
-
-  test("aggregation can't work on binary type") {
-    val df = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("c").select($"c" cast BinaryType)
-    intercept[AnalysisException] {
-      df.groupBy("c").agg(count("*"))
-    }
-    intercept[AnalysisException] {
-      df.distinct
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3320b0ba/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 27c08f6..5bef1d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -490,12 +490,4 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
         Row(3, 2) :: Nil)
 
   }
-
-  test("Join can't work on binary type") {
-    val left = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("c").select($"c" cast BinaryType)
-    val right = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("d").select($"d" cast BinaryType)
-    intercept[AnalysisException] {
-      left.join(right, ($"left.N" === $"right.N"), "full")
-    }
-  }
 }



