spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-15230][SQL] distinct() does not handle column name with dot properly
Date Thu, 23 Jun 2016 03:06:43 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e0a43235d -> 5b4a9a4c3


[SPARK-15230][SQL] distinct() does not handle column name with dot properly

## What changes were proposed in this pull request?

When a table is created with a column name that contains a dot, distinct() fails to run. For example,
```scala
val rowRDD = sparkContext.parallelize(Seq(Row(1), Row(1), Row(2)))
val schema = StructType(Array(StructField("column.with.dot", IntegerType, nullable = false)))
val df = spark.createDataFrame(rowRDD, schema)
```
running the following works without a problem:
```scala
df.select(new Column("`column.with.dot`"))
```
but adding distinct() to the same query throws an exception:
```scala
df.select(new Column("`column.with.dot`")).distinct()
```

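The issue is that distinct() tries to resolve each column name, but the names in the
schema are not wrapped in backticks, so a name containing dots fails to resolve. The fix
is to stop passing the raw name to resolve() in dropDuplicates() and instead match it
against the analyzed plan's output attributes using the session's resolver.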
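
As a rough illustration of the name matching done in the Dataset.scala change in this commit, here is a minimal sketch that runs without Spark. `Attr`, `ResolveSketch`, `resolveByParts`, and `resolveWholeName` are hypothetical stand-ins, not Spark classes or methods, and `resolveByParts` only approximates (as an assumption) how an unquoted dotted name could be split into parts and then fail to match.

```scala
// Hypothetical stand-in for an output attribute of an analyzed plan (not a Spark class).
final case class Attr(name: String, exprId: Long)

object ResolveSketch {
  // A resolver decides whether two names refer to the same column.
  type Resolver = (String, String) => Boolean

  // Assumed case-insensitive matching, for illustration only.
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Simplified model of the failure: the unquoted name is split on dots,
  // and only the first part is looked up as a column name.
  def resolveByParts(output: Seq[Attr], name: String, resolver: Resolver): Option[Attr] = {
    val firstPart = name.split('.').head
    output.find(attr => resolver(attr.name, firstPart))
  }

  // Model of the approach in the diff: compare the whole name against each
  // output attribute, and report the candidates when nothing matches.
  def resolveWholeName(output: Seq[Attr], name: String, resolver: Resolver): Either[String, Attr] =
    output.find(attr => resolver(attr.name, name)).toRight(
      s"""Cannot resolve column name "$name" among (${output.map(_.name).mkString(", ")})""")
}
```

With an output of `Seq(Attr("column.with.dot", 1L))`, `resolveByParts` finds nothing because it looks for a column named `column`, while `resolveWholeName` returns the attribute. The sketch returns an `Either` rather than throwing so that the failure case is easy to inspect; the actual change throws an `AnalysisException`.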

## How was this patch tested?

Added a new test case.

Author: bomeng <bmeng@us.ibm.com>

Closes #13140 from bomeng/SPARK-15230.

(cherry picked from commit 925884a612dd88beaddf555c74d90856ab040ec7)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b4a9a4c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b4a9a4c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b4a9a4c

Branch: refs/heads/branch-2.0
Commit: 5b4a9a4c37822cd7528c6bb933da3454fd3bcd37
Parents: e0a4323
Author: bomeng <bmeng@us.ibm.com>
Authored: Thu Jun 23 11:06:19 2016 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Thu Jun 23 11:06:38 2016 +0800

----------------------------------------------------------------------
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala   | 8 +++++++-
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 5 +++++
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5b4a9a4c/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 02cc398..f1d33c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1812,7 +1812,13 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
-    val groupCols = colNames.map(resolve)
+    val resolver = sparkSession.sessionState.analyzer.resolver
+    val allColumns = queryExecution.analyzed.output
+    val groupCols = colNames.map { colName =>
+      allColumns.find(col => resolver(col.name, colName)).getOrElse(
+        throw new AnalysisException(
+          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})"""))
+    }
     val groupColExprIds = groupCols.map(_.exprId)
     val aggCols = logicalPlan.output.map { attr =>
       if (groupColExprIds.contains(attr.exprId)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/5b4a9a4c/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c8a0f71..1afee9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1536,4 +1536,9 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       Utils.deleteRecursively(baseDir)
     }
   }
+
+  test("SPARK-15230: distinct() does not handle column name with dot properly") {
+    val df = Seq(1, 1, 2).toDF("column.with.dot")
+    checkAnswer(df.distinct(), Row(1) :: Row(2) :: Nil)
+  }
 }


