spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-13260][SQL] count(*) does not work with CSV data source
Date Fri, 12 Feb 2016 19:55:01 GMT
Repository: spark
Updated Branches:
  refs/heads/master c4d5ad80c -> ac7d6af1c


[SPARK-13260][SQL] count(*) does not work with CSV data source

https://issues.apache.org/jira/browse/SPARK-13260
This is a quick fix for `count(*)`.

When `requiredColumns` is empty, the CSV relation currently returns `sqlContext.sparkContext.emptyRDD[Row]`,
so `count(*)` always comes back as 0.

Just like the JSON data source, this PR lets the CSV data source count the rows without parsing
each set of tokens.
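
For context, a minimal sketch of the idea (hypothetical helper name and a pre-tokenized RDD are
assumed; the actual change below simply drops the empty-RDD short-circuit in
`CSVRelation.buildScan` so the existing flatMap emits zero-column rows):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row

    // Sketch only: when no columns are requested, keep one empty Row per input record
    // instead of returning an empty RDD, so count(*) still sees every line.
    def scanSketch(tokenizedRDD: RDD[Array[String]], requiredColumns: Array[String]): RDD[Row] = {
      if (requiredColumns.isEmpty) {
        // Previously: sqlContext.sparkContext.emptyRDD[Row], which made count(*) return 0.
        tokenizedRDD.map(_ => Row.empty)
      } else {
        // Simplified stand-in: the real code resolves the required fields by index and casts them.
        tokenizedRDD.map(tokens => Row.fromSeq(tokens.toSeq.take(requiredColumns.length)))
      }
    }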

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #11169 from HyukjinKwon/SPARK-13260.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac7d6af1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac7d6af1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac7d6af1

Branch: refs/heads/master
Commit: ac7d6af1cafc6b159d1df6cf349bb0c7ffca01cd
Parents: c4d5ad8
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Fri Feb 12 11:54:58 2016 -0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Fri Feb 12 11:54:58 2016 -0800

----------------------------------------------------------------------
 .../execution/datasources/csv/CSVRelation.scala | 84 ++++++++++----------
 .../execution/datasources/csv/CSVSuite.scala    |  2 +-
 2 files changed, 41 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac7d6af1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index dc449fe..f8e3a1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -197,52 +197,48 @@ object CSVRelation extends Logging {
     } else {
       requiredFields
     }
-    if (requiredColumns.isEmpty) {
-      sqlContext.sparkContext.emptyRDD[Row]
-    } else {
-      val safeRequiredIndices = new Array[Int](safeRequiredFields.length)
-      schemaFields.zipWithIndex.filter {
-        case (field, _) => safeRequiredFields.contains(field)
-      }.foreach {
-        case (field, index) => safeRequiredIndices(safeRequiredFields.indexOf(field)) = index
-      }
-      val rowArray = new Array[Any](safeRequiredIndices.length)
-      val requiredSize = requiredFields.length
-      tokenizedRDD.flatMap { tokens =>
-        if (params.dropMalformed && schemaFields.length != tokens.size) {
-          logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
-          None
-        } else if (params.failFast && schemaFields.length != tokens.size) {
-          throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
-            s"${tokens.mkString(params.delimiter.toString)}")
+    val safeRequiredIndices = new Array[Int](safeRequiredFields.length)
+    schemaFields.zipWithIndex.filter {
+      case (field, _) => safeRequiredFields.contains(field)
+    }.foreach {
+      case (field, index) => safeRequiredIndices(safeRequiredFields.indexOf(field)) = index
+    }
+    val rowArray = new Array[Any](safeRequiredIndices.length)
+    val requiredSize = requiredFields.length
+    tokenizedRDD.flatMap { tokens =>
+      if (params.dropMalformed && schemaFields.length != tokens.size) {
+        logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+        None
+      } else if (params.failFast && schemaFields.length != tokens.size) {
+        throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
+          s"${tokens.mkString(params.delimiter.toString)}")
+      } else {
+        val indexSafeTokens = if (params.permissive && schemaFields.length > tokens.size) {
+          tokens ++ new Array[String](schemaFields.length - tokens.size)
+        } else if (params.permissive && schemaFields.length < tokens.size) {
+          tokens.take(schemaFields.length)
         } else {
-          val indexSafeTokens = if (params.permissive && schemaFields.length > tokens.size) {
-            tokens ++ new Array[String](schemaFields.length - tokens.size)
-          } else if (params.permissive && schemaFields.length < tokens.size) {
-            tokens.take(schemaFields.length)
-          } else {
-            tokens
-          }
-          try {
-            var index: Int = 0
-            var subIndex: Int = 0
-            while (subIndex < safeRequiredIndices.length) {
-              index = safeRequiredIndices(subIndex)
-              val field = schemaFields(index)
-              rowArray(subIndex) = CSVTypeCast.castTo(
-                indexSafeTokens(index),
-                field.dataType,
-                field.nullable,
-                params.nullValue)
-              subIndex = subIndex + 1
-            }
-            Some(Row.fromSeq(rowArray.take(requiredSize)))
-          } catch {
-            case NonFatal(e) if params.dropMalformed =>
-              logWarning("Parse exception. " +
-                s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
-              None
+          tokens
+        }
+        try {
+          var index: Int = 0
+          var subIndex: Int = 0
+          while (subIndex < safeRequiredIndices.length) {
+            index = safeRequiredIndices(subIndex)
+            val field = schemaFields(index)
+            rowArray(subIndex) = CSVTypeCast.castTo(
+              indexSafeTokens(index),
+              field.dataType,
+              field.nullable,
+              params.nullValue)
+            subIndex = subIndex + 1
           }
+          Some(Row.fromSeq(rowArray.take(requiredSize)))
+        } catch {
+          case NonFatal(e) if params.dropMalformed =>
+            logWarning("Parse exception. " +
+              s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+            None
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ac7d6af1/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index fa4f137..9d1f456 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -56,7 +56,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     val numRows = if (withHeader) numCars else numCars + 1
     // schema
     assert(df.schema.fieldNames.length === numColumns)
-    assert(df.collect().length === numRows)
+    assert(df.count === numRows)
 
     if (checkHeader) {
       if (withHeader) {

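For completeness, a hedged usage sketch of the behaviour this patch fixes (file path, header
option and expected output are assumptions; `sqlContext` as provided by the test suite's
SharedSQLContext, using the 1.6-era DataFrame API):

    // Assumed sample file; the csv source is the built-in one touched by this patch.
    val cars = sqlContext.read
      .format("csv")
      .option("header", "true")
      .load("cars.csv")
    cars.registerTempTable("cars")
    // Before this patch, pruning away every column returned an empty RDD, so this was always 0.
    val total = sqlContext.sql("SELECT count(*) FROM cars").collect().head.getLong(0)
    println(total)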


