spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: Revert "[SPARK-24244][SQL] Passing only required columns to the CSV parser"
Date Wed, 23 May 2018 18:52:11 GMT
Repository: spark
Updated Branches:
  refs/heads/master df125062c -> 5a5a868dc


Revert "[SPARK-24244][SQL] Passing only required columns to the CSV parser"

This reverts commit 8086acc2f676a04ce6255a621ffae871bd09ceea.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a5a868d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a5a868d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a5a868d

Branch: refs/heads/master
Commit: 5a5a868dc410ad8c97851d7f3f0ea1c9fc1db90c
Parents: df12506
Author: Xiao Li <gatorsmile@gmail.com>
Authored: Wed May 23 11:51:13 2018 -0700
Committer: Xiao Li <gatorsmile@gmail.com>
Committed: Wed May 23 11:51:13 2018 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  1 -
 .../org/apache/spark/sql/internal/SQLConf.scala |  7 ----
 .../execution/datasources/csv/CSVOptions.scala  |  3 --
 .../datasources/csv/UnivocityParser.scala       | 26 +++++-------
 .../datasources/csv/CSVBenchmarks.scala         | 42 -------------------
 .../execution/datasources/csv/CSVSuite.scala    | 43 ++++----------------
 6 files changed, 18 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a5a868d/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index fc26562..f1ed316 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1825,7 +1825,6 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behaivor to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
-  - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
 
 ## Upgrading From Spark SQL 2.2 to 2.3
 
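The migration note removed above describes when a CSV row counts as malformed. The Scala sketch below is a hypothetical, simplified model of that rule, not Spark's parser: `ColumnPruningDemo` and `parseRow` are illustrative names, and "malformed" is reduced to "a checked column has no token". With pruning, only the requested `id` column must be present, so the row "1234" survives DROPMALFORMED. Without pruning, the whole "id,name" schema must be present, so the row is dropped.

```scala
// Hypothetical, simplified model of the DROPMALFORMED rule in the removed note.
// Not Spark code: a row is "malformed" here only when a checked column is missing.
object ColumnPruningDemo {
  // Columns declared by the file header "id,name".
  val dataSchema: Vector[String] = Vector("id", "name")

  // Returns the values of the required columns, or None when the row is treated
  // as malformed (DROPMALFORMED would then drop it).
  def parseRow(line: String, required: Vector[String], columnPruning: Boolean): Option[Vector[String]] = {
    val tokens = line.split(",", -1).toVector
    val indexes = required.map(c => dataSchema.indexOf(c))
    require(indexes.forall(_ >= 0), "unknown column requested")
    val complete =
      if (columnPruning) indexes.forall(i => i < tokens.length) // only requested columns must exist
      else tokens.length == dataSchema.length                    // every schema column must exist
    if (complete) Some(indexes.map(i => tokens(i))) else None
  }

  def main(args: Array[String]): Unit = {
    val rows = List("1234")
    val required = Vector("id")
    println(rows.flatMap(r => parseRow(r, required, columnPruning = true)))  // List(Vector(1234))
    println(rows.flatMap(r => parseRow(r, required, columnPruning = false))) // List()
  }
}
```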

http://git-wip-us.apache.org/repos/asf/spark/blob/5a5a868d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fb98df5..15ba10f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1307,13 +1307,6 @@ object SQLConf {
   object Replaced {
     val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces"
   }
-
-  val CSV_PARSER_COLUMN_PRUNING = buildConf("spark.sql.csv.parser.columnPruning.enabled")
-    .internal()
-    .doc("If it is set to true, column names of the requested schema are passed to CSV parser. " +
-      "Other column values can be ignored during parsing even if they are malformed.")
-    .booleanConf
-    .createWithDefault(true)
 }
 
 /**
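The removed `spark.sql.csv.parser.columnPruning.enabled` entry was an internal boolean with a default of `true`, so before this revert pruning applied unless a session disabled it, for example with `spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")`. A minimal sketch of that default-true lookup, assuming the session configuration is modelled as a plain `Map[String, String]`; `ColumnPruningFlag` and `isEnabled` are hypothetical names, not part of SQLConf's API.

```scala
// Hypothetical model of how the removed flag resolved: an unset key falls back
// to the default (true), so column pruning was on unless explicitly disabled.
object ColumnPruningFlag {
  val Key = "spark.sql.csv.parser.columnPruning.enabled"
  val Default = true

  // Parses "true"/"false" (case-insensitive); any other value throws.
  def isEnabled(sessionConf: Map[String, String]): Boolean =
    sessionConf.get(Key).map(_.trim.toBoolean).getOrElse(Default)
}
```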

http://git-wip-us.apache.org/repos/asf/spark/blob/5a5a868d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index dd41aee..1066d15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -25,7 +25,6 @@ import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
 
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
@@ -81,8 +80,6 @@ class CSVOptions(
     }
   }
 
-  private[csv] val columnPruning = SQLConf.get.getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
-
   val delimiter = CSVUtils.toChar(
     parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
   val parseMode: ParseMode =

http://git-wip-us.apache.org/repos/asf/spark/blob/5a5a868d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 4f00cc5..99557a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -34,10 +34,10 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 class UnivocityParser(
-    dataSchema: StructType,
+    schema: StructType,
     requiredSchema: StructType,
     val options: CSVOptions) extends Logging {
-  require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
+  require(requiredSchema.toSet.subsetOf(schema.toSet),
     "requiredSchema should be the subset of schema.")
 
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
@@ -45,17 +45,9 @@ class UnivocityParser(
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any
 
-  private val tokenizer = {
-    val parserSetting = options.asParserSettings
-    if (options.columnPruning && requiredSchema.length < dataSchema.length) {
-      val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f)))
-      parserSetting.selectIndexes(tokenIndexArr: _*)
-    }
-    new CsvParser(parserSetting)
-  }
-  private val schema = if (options.columnPruning) requiredSchema else dataSchema
+  private val tokenizer = new CsvParser(options.asParserSettings)
 
-  private val row = new GenericInternalRow(schema.length)
+  private val row = new GenericInternalRow(requiredSchema.length)
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
@@ -81,8 +73,11 @@ class UnivocityParser(
   // Each input token is placed in each output row's position by mapping these. In this case,
   //
   //   output row - ["A", 2]
-  private val valueConverters: Array[ValueConverter] = {
+  private val valueConverters: Array[ValueConverter] =
     schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+
+  private val tokenIndexArr: Array[Int] = {
+    requiredSchema.map(f => schema.indexOf(f)).toArray
   }
 
   /**
@@ -215,8 +210,9 @@ class UnivocityParser(
     } else {
       try {
         var i = 0
-        while (i < schema.length) {
-          row(i) = valueConverters(i).apply(tokens(i))
+        while (i < requiredSchema.length) {
+          val from = tokenIndexArr(i)
+          row(i) = valueConverters(from).apply(tokens(from))
           i += 1
         }
         row
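The UnivocityParser hunks swap two ways of producing the required columns. Before the revert, the tokenizer was configured with `selectIndexes`, so it returned only the required tokens and the converters were built from `requiredSchema`. After the revert, every column is tokenized and `tokenIndexArr` maps each output position back to its source token and converter. The sketch below models both strategies in plain Scala; the `Field` type and the method names are hypothetical stand-ins, not Spark classes, and "selection by the tokenizer" is modelled by picking the tokens out up front.

```scala
// Simplified, hypothetical model of the two projection strategies in this diff.
object ProjectionDemo {
  type ValueConverter = String => Any

  final case class Field(name: String, convert: ValueConverter)

  // Full schema of the file and a reordered subset requested by a query.
  val schema: Vector[Field] = Vector(
    Field("a", s => s.toInt),
    Field("b", s => s),
    Field("c", s => s.toDouble))
  val requiredSchema: Vector[Field] = Vector(schema(2), schema(0))

  // Restored strategy: map each output position to its index in the full schema.
  val tokenIndexArr: Array[Int] = requiredSchema.map(f => schema.indexOf(f)).toArray

  // Tokens cover every column; pick and convert the required ones by index.
  def convertAllTokens(tokens: Array[String]): Array[Any] = {
    val row = new Array[Any](requiredSchema.length)
    var i = 0
    while (i < requiredSchema.length) {
      val from = tokenIndexArr(i)
      row(i) = schema(from).convert(tokens(from))
      i += 1
    }
    row
  }

  // Reverted strategy: tokens arrive already projected, so token i lines up
  // with requiredSchema(i) and its converter.
  def convertSelectedTokens(tokens: Array[String]): Array[Any] = {
    val selected = tokenIndexArr.map(i => tokens(i))
    requiredSchema.indices.map(i => requiredSchema(i).convert(selected(i))).toArray
  }
}
```

For a well-formed line such as `Array("1", "x", "2.5")` both methods yield `2.5, 1`; in this model the difference only shows on short rows, where the restored path still reads from the full token array.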

http://git-wip-us.apache.org/repos/asf/spark/blob/5a5a868d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
index ec788df..d442ba7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
@@ -74,49 +74,7 @@ object CSVBenchmarks {
     }
   }
 
-  def multiColumnsBenchmark(rowsNum: Int): Unit = {
-    val colsNum = 1000
-    val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum)
-
-    withTempPath { path =>
-      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
-      val schema = StructType(fields)
-      val values = (0 until colsNum).map(i => i.toString).mkString(",")
-      val columnNames = schema.fieldNames
-
-      spark.range(rowsNum)
-        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
-        .write.option("header", true)
-        .csv(path.getAbsolutePath)
-
-      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
-
-      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
-        ds.select("*").filter((row: Row) => true).count()
-      }
-      val cols100 = columnNames.take(100).map(Column(_))
-      benchmark.addCase(s"Select 100 columns", 3) { _ =>
-        ds.select(cols100: _*).filter((row: Row) => true).count()
-      }
-      benchmark.addCase(s"Select one column", 3) { _ =>
-        ds.select($"col1").filter((row: Row) => true).count()
-      }
-
-      /*
-      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
-
-      Wide rows with 1000 columns:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-      --------------------------------------------------------------------------------------------
-      Select 1000 columns                     76910 / 78065          0.0       76909.8       1.0X
-      Select 100 columns                      28625 / 32884          0.0       28625.1       2.7X
-      Select one column                       22498 / 22669          0.0       22497.8       3.4X
-      */
-      benchmark.run()
-    }
-  }
-
   def main(args: Array[String]): Unit = {
     quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3)
-    multiColumnsBenchmark(rowsNum = 1000 * 1000)
   }
 }
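The removed benchmark comment reports best times for `rowsNum = 1000 * 1000`. Assuming the Relative column is the first case's best time divided by each case's best time, and Per Row(ns) is the best time spread over `rowsNum` rows, the printed figures can be re-derived as below. `BenchmarkColumns` and its methods are hypothetical names. The table prints best times rounded to whole milliseconds, so the recomputed per-row values differ in the last digit (76910.0 versus 76909.8 ns, for instance).

```scala
// Re-derives the "Per Row(ns)" and "Relative" columns of the removed benchmark
// comment from its (rounded) best times. Formulas are assumptions, see above.
object BenchmarkColumns {
  val rowsNum: Int = 1000 * 1000

  // Best times in milliseconds, copied from the benchmark output in the diff.
  val bestMs: Seq[(String, Double)] = Seq(
    "Select 1000 columns" -> 76910.0,
    "Select 100 columns" -> 28625.0,
    "Select one column" -> 22498.0)

  // Milliseconds per run converted to nanoseconds per row.
  def perRowNs(ms: Double): Double = ms * 1e6 / rowsNum

  // Speed-up of a case relative to the first (baseline) case.
  def relative(baselineMs: Double, ms: Double): Double = baselineMs / ms

  def main(args: Array[String]): Unit = {
    val baseline = bestMs.head._2
    for ((name, ms) <- bestMs)
      println(f"$name%-20s ${perRowNs(ms)}%9.1f ns/row ${relative(baseline, ms)}%5.1fX")
  }
}
```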

http://git-wip-us.apache.org/repos/asf/spark/blob/5a5a868d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 5f9f799..07e6c74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -260,16 +260,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   }
 
   test("test for DROPMALFORMED parsing mode") {
-    withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false") {
-      Seq(false, true).foreach { multiLine =>
-        val cars = spark.read
-          .format("csv")
-          .option("multiLine", multiLine)
-          .options(Map("header" -> "true", "mode" -> "dropmalformed"))
-          .load(testFile(carsFile))
+    Seq(false, true).foreach { multiLine =>
+      val cars = spark.read
+        .format("csv")
+        .option("multiLine", multiLine)
+        .options(Map("header" -> "true", "mode" -> "dropmalformed"))
+        .load(testFile(carsFile))
 
-        assert(cars.select("year").collect().size === 2)
-      }
+      assert(cars.select("year").collect().size === 2)
     }
   }
 
@@ -1370,31 +1368,4 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       checkAnswer(computed, expected)
     }
   }
-
-  test("SPARK-24244: Select a subset of all columns") {
-    withTempPath { path =>
-      import collection.JavaConverters._
-      val schema = new StructType()
-        .add("f1", IntegerType).add("f2", IntegerType).add("f3", IntegerType)
-        .add("f4", IntegerType).add("f5", IntegerType).add("f6", IntegerType)
-        .add("f7", IntegerType).add("f8", IntegerType).add("f9", IntegerType)
-        .add("f10", IntegerType).add("f11", IntegerType).add("f12", IntegerType)
-        .add("f13", IntegerType).add("f14", IntegerType).add("f15", IntegerType)
-
-      val odf = spark.createDataFrame(List(
-        Row(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
-        Row(-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15)
-      ).asJava, schema)
-      odf.write.csv(path.getCanonicalPath)
-      val idf = spark.read
-        .schema(schema)
-        .csv(path.getCanonicalPath)
-        .select('f15, 'f10, 'f5)
-
-      checkAnswer(
-        idf,
-        List(Row(15, 10, 5), Row(-15, -10, -5))
-      )
-    }
-  }
 }
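The deleted SPARK-24244 test wrote two 15-column rows and read back `f15`, `f10` and `f5`, in that order, expecting `(15, 10, 5)` and `(-15, -10, -5)`. What it guards is that a projection which reorders columns still picks each value from its original position. A plain-Scala model of that expectation, with no Spark involved and hypothetical helper names:

```scala
// Plain-Scala model of the removed SPARK-24244 check: selecting a reordered
// subset of a 15-column CSV row returns values in the requested order.
object SubsetSelectionDemo {
  val fieldNames: Vector[String] = (1 to 15).map(i => s"f$i").toVector

  // The two rows the test wrote, rendered as CSV lines.
  val lines: List[String] = List(
    (1 to 15).mkString(","),
    (1 to 15).map(i => -i).mkString(","))

  // Looks up each requested column's position in the full schema.
  def select(line: String, columns: Seq[String]): Seq[Int] = {
    val tokens = line.split(",", -1)
    columns.map { c =>
      val idx = fieldNames.indexOf(c)
      require(idx >= 0, s"unknown column $c")
      tokens(idx).toInt
    }
  }

  def main(args: Array[String]): Unit =
    lines.foreach(l => println(select(l, Seq("f15", "f10", "f5"))))
    // List(15, 10, 5)
    // List(-15, -10, -5)
}
```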



