spark-commits mailing list archives

From gurwls...@apache.org
Subject spark git commit: [SPARK-17916][SPARK-25241][SQL][FOLLOW-UP] Fix empty string being parsed as null when nullValue is set.
Date Tue, 11 Sep 2018 12:47:40 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 fb4965a41 -> b7efca7ec


[SPARK-17916][SPARK-25241][SQL][FOLLOW-UP] Fix empty string being parsed as null when nullValue is set.

## What changes were proposed in this pull request?

In this PR, I propose a new CSV option, `emptyValue`, and an update to the SQL Migration Guide describing how to revert to the previous behavior, in which empty strings were not written at all. Since Spark 2.4, empty strings are saved as `""` to distinguish them from saved `null`s.
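
For illustration, a minimal Scala sketch of the option, mirroring the round trip exercised in `CSVSuite` (the `df` and path here are hypothetical, not part of this patch):

```scala
// On write, empty string columns are emitted as the marker "-" instead of
// the default quoted "".
df.write.option("emptyValue", "-").csv("/tmp/csv-out")

// On read, empty quoted fields ("") are returned as the configured string
// instead of the default empty string.
val loaded = spark.read
  .schema(df.schema)
  .option("emptyValue", "-")
  .csv("/tmp/csv-out")
```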

Closes #22234
Closes #22367

## How was this patch tested?

It was tested by `CSVSuite` and by the new tests added in PR #22234.

Closes #22389 from MaxGekk/csv-empty-value-master.

Lead-authored-by: Mario Molina <mmolimar@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
(cherry picked from commit c9cb393dc414ae98093c1541d09fa3c8663ce276)
Signed-off-by: hyukjinkwon <gurwls223@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7efca7e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7efca7e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7efca7e

Branch: refs/heads/branch-2.4
Commit: b7efca7ece484ee85091b1b50bbc84ad779f9bfe
Parents: fb4965a
Author: Mario Molina <mmolimar@gmail.com>
Authored: Tue Sep 11 20:47:14 2018 +0800
Committer: hyukjinkwon <gurwls223@apache.org>
Committed: Tue Sep 11 20:47:30 2018 +0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |   1 +
 python/pyspark/sql/readwriter.py                |  12 +-
 python/pyspark/sql/streaming.py                 |   7 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |   1 +
 .../org/apache/spark/sql/DataFrameWriter.scala  |   1 +
 .../execution/datasources/csv/CSVOptions.scala  |  19 +++-
 .../spark/sql/streaming/DataStreamReader.scala  |   1 +
 .../resources/test-data/cars-empty-value.csv    |   4 +
 .../execution/datasources/csv/CSVSuite.scala    | 111 +++++++++++++++++++
 9 files changed, 149 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b7efca7e/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 3749094..9da7d64 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1897,6 +1897,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
   - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`.
   - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation.
+  - Since Spark 2.4, empty strings are saved as quoted empty strings `""`. In version 2.3 and earlier, empty strings are equal to `null` values and are not reflected by any characters in saved CSV files. For example, the row of `"a", null, "", 1` was written as `a,,,1`. Since Spark 2.4, the same row is saved as `a,,"",1`. To restore the previous behavior, set the CSV option `emptyValue` to an empty (not quoted) string.
 
 ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above
 
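
As a sketch of the revert described in the migration note above (the `df` and paths are hypothetical): setting `emptyValue` to an empty, unquoted string makes the writer emit nothing for empty strings again.

```scala
// Spark 2.4 default: the row `"a", null, "", 1` is saved as a,,"",1
df.write.csv("/tmp/csv-24-behavior")

// Pre-2.4 behavior restored: the same row is saved as a,,,1
df.write.option("emptyValue", "").csv("/tmp/csv-23-behavior")
```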

http://git-wip-us.apache.org/repos/asf/spark/blob/b7efca7e/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 49f4e6b..3ca5d54 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -349,7 +349,7 @@ class DataFrameReader(OptionUtils):
             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
-            samplingRatio=None, enforceSchema=None):
+            samplingRatio=None, enforceSchema=None, emptyValue=None):
         """Loads a CSV file and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -444,6 +444,8 @@ class DataFrameReader(OptionUtils):
                                           different, ``\0`` otherwise.
         :param samplingRatio: defines fraction of rows used for schema inferring.
                               If None is set, it uses the default value, ``1.0``.
+        :param emptyValue: sets the string representation of an empty value. If None is set, it uses
+                           the default value, empty string.
 
         >>> df = spark.read.csv('python/test_support/sql/ages.csv')
         >>> df.dtypes
@@ -463,7 +465,7 @@ class DataFrameReader(OptionUtils):
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
-            enforceSchema=enforceSchema)
+            enforceSchema=enforceSchema, emptyValue=emptyValue)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -859,7 +861,7 @@ class DataFrameWriter(OptionUtils):
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
             header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
             timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
-            charToEscapeQuoteEscaping=None, encoding=None):
+            charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -911,6 +913,8 @@ class DataFrameWriter(OptionUtils):
                                           different, ``\0`` otherwise..
         :param encoding: sets the encoding (charset) of saved csv files. If None is set,
                          the default UTF-8 charset will be used.
+        :param emptyValue: sets the string representation of an empty value. If None is set, it uses
+                           the default value, ``""``.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
@@ -921,7 +925,7 @@ class DataFrameWriter(OptionUtils):
                        ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
                        ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
                        charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
-                       encoding=encoding)
+                       encoding=encoding, emptyValue=emptyValue)
         self._jwrite.csv(path)
 
     @since(1.5)

http://git-wip-us.apache.org/repos/asf/spark/blob/b7efca7e/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index ee13778..522900b 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -564,7 +564,7 @@ class DataStreamReader(OptionUtils):
             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
-            enforceSchema=None):
+            enforceSchema=None, emptyValue=None):
         """Loads a CSV file stream and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -658,6 +658,8 @@ class DataStreamReader(OptionUtils):
                                          the quote character. If None is set, the default value is
                                          escape character when escape and quote characters are
                                          different, ``\0`` otherwise..
+        :param emptyValue: sets the string representation of an empty value. If None is set, it uses
+                           the default value, empty string.
 
         >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
         >>> csv_sdf.isStreaming
@@ -674,7 +676,8 @@ class DataStreamReader(OptionUtils):
             maxCharsPerColumn=maxCharsPerColumn,
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
-            charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema)
+            charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
+            emptyValue=emptyValue)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))
         else:

http://git-wip-us.apache.org/repos/asf/spark/blob/b7efca7e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0cfcc45..e6c2cba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -571,6 +571,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * whitespaces from values being read should be skipped.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value. Since
    * 2.0.1, this applies to all supported types including the string type.</li>
+   * <li>`emptyValue` (default empty string): sets the string representation of an empty value.</li>
    * <li>`nanValue` (default `NaN`): sets the string representation of a non-number" value.</li>
    * <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
    * value.</li>

http://git-wip-us.apache.org/repos/asf/spark/blob/b7efca7e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index eca2d5b..dfb8c47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -635,6 +635,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * enclosed in quotes. Default is to only escape values containing a quote character.</li>
    * <li>`header` (default `false`): writes the names of columns as the first line.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
+   * <li>`emptyValue` (default `""`): sets the string representation of an empty value.</li>
    * <li>`encoding` (by default it is not set): specifies encoding (charset) of saved csv
    * files. If it is not set, the UTF-8 charset will be used.</li>
    * <li>`compression` (default `null`): compression codec to use when saving to file. This can be

http://git-wip-us.apache.org/repos/asf/spark/blob/b7efca7e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index fab8d62..492a21b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -162,6 +162,21 @@ class CSVOptions(
    */
   val enforceSchema = getBool("enforceSchema", default = true)
 
+
+  /**
+   * String representation of an empty value in read and in write.
+   */
+  val emptyValue = parameters.get("emptyValue")
+  /**
+   * The string that is returned when the CSV reader doesn't have any characters for an input
+   * value, or reads an empty quoted string `""`. The default value is an empty string.
+   */
+  val emptyValueInRead = emptyValue.getOrElse("")
+  /**
+   * The value that is written instead of an empty string. The default value is `""`.
+   */
+  val emptyValueInWrite = emptyValue.getOrElse("\"\"")
+
   def asWriterSettings: CsvWriterSettings = {
     val writerSettings = new CsvWriterSettings()
     val format = writerSettings.getFormat
@@ -173,7 +188,7 @@ class CSVOptions(
     writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
     writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
     writerSettings.setNullValue(nullValue)
-    writerSettings.setEmptyValue("\"\"")
+    writerSettings.setEmptyValue(emptyValueInWrite)
     writerSettings.setSkipEmptyLines(true)
     writerSettings.setQuoteAllFields(quoteAll)
     writerSettings.setQuoteEscapingEnabled(escapeQuotes)
@@ -194,7 +209,7 @@ class CSVOptions(
     settings.setInputBufferSize(inputBufferSize)
     settings.setMaxColumns(maxColumns)
     settings.setNullValue(nullValue)
-    settings.setEmptyValue("")
+    settings.setEmptyValue(emptyValueInRead)
     settings.setMaxCharsPerColumn(maxCharsPerColumn)
     settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
     settings

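
The asymmetric defaults above can be checked in isolation; a minimal standalone Scala sketch of the `Option.getOrElse` resolution (not part of the patch):

```scala
// Unset option: read and write defaults diverge.
val unset: Option[String] = None
assert(unset.getOrElse("") == "")          // reader default: empty string
assert(unset.getOrElse("\"\"") == "\"\"")  // writer default: quoted ""

// User-set option: the same string is used on both paths.
val set: Option[String] = Some("-")
assert(set.getOrElse("") == "-")
assert(set.getOrElse("\"\"") == "-")
```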
http://git-wip-us.apache.org/repos/asf/spark/blob/b7efca7e/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 39e9e1a..2a4db4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -327,6 +327,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * whitespaces from values being read should be skipped.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value. Since
    * 2.0.1, this applies to all supported types including the string type.</li>
+   * <li>`emptyValue` (default empty string): sets the string representation of an empty value.</li>
    * <li>`nanValue` (default `NaN`): sets the string representation of a non-number" value.</li>
    * <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
    * value.</li>
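
As the added doc line notes, the streaming reader exposes the same option; a minimal sketch (the schema and directory here are hypothetical):

```scala
import org.apache.spark.sql.types._

val schema = new StructType().add("id", IntegerType).add("name", StringType)
val csvStream = spark.readStream
  .schema(schema)
  .option("emptyValue", "")  // default: empty quoted fields come back as empty strings
  .csv("/path/to/watched/dir")
```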

http://git-wip-us.apache.org/repos/asf/spark/blob/b7efca7e/sql/core/src/test/resources/test-data/cars-empty-value.csv
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/test-data/cars-empty-value.csv b/sql/core/src/test/resources/test-data/cars-empty-value.csv
new file mode 100644
index 0000000..0f20a2f
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/cars-empty-value.csv
@@ -0,0 +1,4 @@
+year,make,model,comment,blank
+"2012","Tesla","S","",""
+1997,Ford,E350,"Go get one now they are going fast",
+2015,Chevy,Volt,,""

http://git-wip-us.apache.org/repos/asf/spark/blob/b7efca7e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 5a1d667..2b39a0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -50,6 +50,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   private val carsAltFile = "test-data/cars-alternative.csv"
   private val carsUnbalancedQuotesFile = "test-data/cars-unbalanced-quotes.csv"
   private val carsNullFile = "test-data/cars-null.csv"
+  private val carsEmptyValueFile = "test-data/cars-empty-value.csv"
   private val carsBlankColName = "test-data/cars-blank-column-name.csv"
   private val emptyFile = "test-data/empty.csv"
   private val commentsFile = "test-data/comments.csv"
@@ -668,6 +669,70 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
   }
 
+  test("empty fields with user defined empty values") {
+
+    // year,make,model,comment,blank
+    val dataSchema = StructType(List(
+      StructField("year", IntegerType, nullable = true),
+      StructField("make", StringType, nullable = false),
+      StructField("model", StringType, nullable = false),
+      StructField("comment", StringType, nullable = true),
+      StructField("blank", StringType, nullable = true)))
+    val cars = spark.read
+      .format("csv")
+      .schema(dataSchema)
+      .option("header", "true")
+      .option("emptyValue", "empty")
+      .load(testFile(carsEmptyValueFile))
+
+    verifyCars(cars, withHeader = true, checkValues = false)
+    val results = cars.collect()
+    assert(results(0).toSeq === Array(2012, "Tesla", "S", "empty", "empty"))
+    assert(results(1).toSeq ===
+      Array(1997, "Ford", "E350", "Go get one now they are going fast", null))
+    assert(results(2).toSeq === Array(2015, "Chevy", "Volt", null, "empty"))
+  }
+
+  test("save csv with empty fields with user defined empty values") {
+    withTempDir { dir =>
+      val csvDir = new File(dir, "csv").getCanonicalPath
+
+      // year,make,model,comment,blank
+      val dataSchema = StructType(List(
+        StructField("year", IntegerType, nullable = true),
+        StructField("make", StringType, nullable = false),
+        StructField("model", StringType, nullable = false),
+        StructField("comment", StringType, nullable = true),
+        StructField("blank", StringType, nullable = true)))
+      val cars = spark.read
+        .format("csv")
+        .schema(dataSchema)
+        .option("header", "true")
+        .option("nullValue", "NULL")
+        .load(testFile(carsEmptyValueFile))
+
+      cars.coalesce(1).write
+        .format("csv")
+        .option("header", "true")
+        .option("emptyValue", "empty")
+        .option("nullValue", null)
+        .save(csvDir)
+
+      val carsCopy = spark.read
+        .format("csv")
+        .schema(dataSchema)
+        .option("header", "true")
+        .load(csvDir)
+
+      verifyCars(carsCopy, withHeader = true, checkValues = false)
+      val results = carsCopy.collect()
+      assert(results(0).toSeq === Array(2012, "Tesla", "S", "empty", "empty"))
+      assert(results(1).toSeq ===
+        Array(1997, "Ford", "E350", "Go get one now they are going fast", null))
+      assert(results(2).toSeq === Array(2015, "Chevy", "Volt", null, "empty"))
+    }
+  }
+
   test("save csv with compression codec option") {
     withTempDir { dir =>
       val csvDir = new File(dir, "csv").getCanonicalPath
@@ -1375,6 +1440,52 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }
   }
 
+  test("SPARK-25241: An empty string should not be coerced to null when emptyValue is passed.")
{
+    val litNull: String = null
+    val df = Seq(
+      (1, "John Doe"),
+      (2, ""),
+      (3, "-"),
+      (4, litNull)
+    ).toDF("id", "name")
+
+    // Checks for the new behavior where an empty string is not coerced to null when `emptyValue`
+    // is set to anything but an empty string literal.
+    withTempPath { path =>
+      df.write
+        .option("emptyValue", "-")
+        .csv(path.getAbsolutePath)
+      val computed = spark.read
+        .option("emptyValue", "-")
+        .schema(df.schema)
+        .csv(path.getAbsolutePath)
+      val expected = Seq(
+        (1, "John Doe"),
+        (2, "-"),
+        (3, "-"),
+        (4, "-")
+      ).toDF("id", "name")
+
+      checkAnswer(computed, expected)
+    }
+    // Keeps the old behavior where an empty string is coerced to null when emptyValue is not passed.
+    withTempPath { path =>
+      df.write
+        .csv(path.getAbsolutePath)
+      val computed = spark.read
+        .schema(df.schema)
+        .csv(path.getAbsolutePath)
+      val expected = Seq(
+        (1, "John Doe"),
+        (2, litNull),
+        (3, "-"),
+        (4, litNull)
+      ).toDF("id", "name")
+
+      checkAnswer(computed, expected)
+    }
+  }
+
   test("SPARK-24329: skip lines with comments, and one or multiple whitespaces") {
     val schema = new StructType().add("colA", StringType)
     val ds = spark

