spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-13792][SQL] Limit logging of bad records in CSV data source
Date Tue, 21 Jun 2016 04:46:14 GMT
Repository: spark
Updated Branches:
  refs/heads/master 217db56ba -> c775bf09e


[SPARK-13792][SQL] Limit logging of bad records in CSV data source

## What changes were proposed in this pull request?
This pull request adds a new option (maxMalformedLogPerPartition) to the CSV reader to limit the
maximum number of log messages Spark generates per partition for malformed records.

The error log looks something like
```
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been found on this
partition. Malformed records from now on will not be logged.
```

Closes #12173

## How was this patch tested?
Manually tested.

Author: Reynold Xin <rxin@databricks.com>

Closes #13795 from rxin/SPARK-13792.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c775bf09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c775bf09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c775bf09

Branch: refs/heads/master
Commit: c775bf09e0c3540f76de3f15d3fd35112a4912c1
Parents: 217db56
Author: Reynold Xin <rxin@databricks.com>
Authored: Mon Jun 20 21:46:12 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Mon Jun 20 21:46:12 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  4 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  2 +
 .../datasources/csv/CSVFileFormat.scala         |  9 ++++-
 .../execution/datasources/csv/CSVOptions.scala  |  2 +
 .../execution/datasources/csv/CSVRelation.scala | 42 +++++++++++++-------
 5 files changed, 44 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 72fd184..89506ca 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -392,6 +392,10 @@ class DataFrameReader(ReaderUtils):
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any
given
                                   value being read. If None is set, it uses the default value,
                                   ``1000000``.
+        :param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark
will
+                                            log for each partition. Malformed records beyond
this
+                                            number will be ignored. If None is set, it
+                                            uses the default value, ``10``.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None
is
                      set, it uses the default value, ``PERMISSIVE``.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 841503b..35ba9c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -382,6 +382,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends
Logging {
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters
allowed
    * for any given value being read.</li>
+   * <li>`maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed
rows
+   * Spark will log for each partition. Malformed records beyond this number will be ignored.</li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    *    during parsing.</li>
    * <ul>

http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index be52de8..12e19f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -120,7 +120,14 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister
{
 
       val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers)
       val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions)
-      tokenizedIterator.flatMap(parser(_).toSeq)
+      var numMalformedRecords = 0
+      tokenizedIterator.flatMap { recordTokens =>
+        val row = parser(recordTokens, numMalformedRecords)
+        if (row.isEmpty) {
+          numMalformedRecords += 1
+        }
+        row
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 9f4ce83..581eda7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -113,6 +113,8 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String,
Str
 
   val escapeQuotes = getBool("escapeQuotes", true)
 
+  val maxMalformedLogPerPartition = getInt("maxMalformedLogPerPartition", 10)
+
   val inputBufferSize = 128
 
   val isCommentSet = this.comment != '\u0000'

http://git-wip-us.apache.org/repos/asf/spark/blob/c775bf09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index d72c8b9..083ac33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -50,10 +50,19 @@ object CSVRelation extends Logging {
     }
   }
 
+  /**
+   * Returns a function that parses a single CSV record (in the form of an array of strings
in which
+   * each element represents a column) and turns it into either one resulting row or no row
(if the
+   * record is malformed).
+   *
+   * The 2nd argument in the returned function represents the total number of malformed rows
+   * observed so far.
+   */
+  // This is pretty convoluted and we should probably rewrite the entire CSV parsing soon.
   def csvParser(
       schema: StructType,
       requiredColumns: Array[String],
-      params: CSVOptions): Array[String] => Option[InternalRow] = {
+      params: CSVOptions): (Array[String], Int) => Option[InternalRow] = {
     val schemaFields = schema.fields
     val requiredFields = StructType(requiredColumns.map(schema(_))).fields
     val safeRequiredFields = if (params.dropMalformed) {
@@ -72,9 +81,16 @@ object CSVRelation extends Logging {
     val requiredSize = requiredFields.length
     val row = new GenericMutableRow(requiredSize)
 
-    (tokens: Array[String]) => {
+    (tokens: Array[String], numMalformedRows) => {
       if (params.dropMalformed && schemaFields.length != tokens.length) {
-        logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+        if (numMalformedRows < params.maxMalformedLogPerPartition) {
+          logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+        }
+        if (numMalformedRows == params.maxMalformedLogPerPartition - 1) {
+          logWarning(
+            s"More than ${params.maxMalformedLogPerPartition} malformed records have been
" +
+            "found on this partition. Malformed records from now on will not be logged.")
+        }
         None
       } else if (params.failFast && schemaFields.length != tokens.length) {
         throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
@@ -109,23 +125,21 @@ object CSVRelation extends Logging {
           Some(row)
         } catch {
           case NonFatal(e) if params.dropMalformed =>
-            logWarning("Parse exception. " +
-              s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+            if (numMalformedRows < params.maxMalformedLogPerPartition) {
+              logWarning("Parse exception. " +
+                s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+            }
+            if (numMalformedRows == params.maxMalformedLogPerPartition - 1) {
+              logWarning(
+                s"More than ${params.maxMalformedLogPerPartition} malformed records have
been " +
+                "found on this partition. Malformed records from now on will not be logged.")
+            }
             None
         }
       }
     }
   }
 
-  def parseCsv(
-      tokenizedRDD: RDD[Array[String]],
-      schema: StructType,
-      requiredColumns: Array[String],
-      options: CSVOptions): RDD[InternalRow] = {
-    val parser = csvParser(schema, requiredColumns, options)
-    tokenizedRDD.flatMap(parser(_).toSeq)
-  }
-
   // Skips the header line of each file if the `header` option is set to true.
   def dropHeaderLine(
       file: PartitionedFile, lines: Iterator[String], csvOptions: CSVOptions): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message