spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-17310][SQL] Add an option to disable record-level filter in Parquet-side
Date Tue, 14 Nov 2017 11:34:26 GMT
Repository: spark
Updated Branches:
  refs/heads/master d8741b2b0 -> 673c67046


[SPARK-17310][SQL] Add an option to disable record-level filter in Parquet-side

## What changes were proposed in this pull request?

There is a concern that Spark-side codegen row-by-row filtering might generally be faster than
Parquet's record-level filtering, because the Parquet path incurs type boxing and additional
function calls that Spark's generated code avoids.

So, this PR adds an option to enable or disable record-by-record filtering on the Parquet side.

It sets the default to `false` to take advantage of the improvement.

This was also discussed in https://github.com/apache/spark/pull/14671.
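
For illustration, here is a minimal sketch of how a user could opt back into Parquet's record-level filtering after this change. It assumes an existing `SparkSession` named `spark` and a hypothetical path `/tmp/parquet-data`; the option only takes effect when filter pushdown is enabled and the non-vectorized (parquet-mr) read path is used, as in the benchmark below.

``` scala
// Record-level filtering is off by default after this change; turn it back on explicitly.
// It only has an effect when 'spark.sql.parquet.filterPushdown' is also enabled, and only
// on the parquet-mr (non-vectorized) read path.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "true")

// The pushed-down predicate is now also evaluated record by record inside parquet-mr,
// in addition to the usual row-group-level pruning.
spark.read.parquet("/tmp/parquet-data").filter("a = 5").count()
```
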
## How was this patch tested?

Benchmarks were performed manually. I generated a billion (1,000,000,000) records and tested
equality comparisons concatenated with `OR`, with the number of filters in each combination
ranging from 5 to 30.

Spark-side filtering is indeed faster in this test case, and the gap widens as the filter
tree grows larger.

The details are as below:

**Code**

``` scala
test("Parquet-side filter vs Spark-side filter - record by record") {
  withTempPath { path =>
    val N = 1000 * 1000 * 1000
    val df = spark.range(N).toDF("a")
    df.write.parquet(path.getAbsolutePath)

    val benchmark = new Benchmark("Parquet-side vs Spark-side", N)
    Seq(5, 10, 20, 30).foreach { num =>
      val filterExpr = (0 to num).map(i => s"a = $i").mkString(" OR ")

      benchmark.addCase(s"Parquet-side filter - number of filters [$num]", 3) { _ =>
        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString,
          SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> true.toString) {

          // We should strip Spark-side filter to compare correctly.
          stripSparkFilter(
            spark.read.parquet(path.getAbsolutePath).filter(filterExpr)).count()
        }
      }

      benchmark.addCase(s"Spark-side filter - number of filters [$num]", 3) { _ =>
        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString,
          SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> false.toString) {

          spark.read.parquet(path.getAbsolutePath).filter(filterExpr).count()
        }
      }
    }

    benchmark.run()
  }
}
```

**Result**

```
Parquet-side vs Spark-side:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------
Parquet-side filter - number of filters [5]           4268 / 4367        234.3           4.3       0.8X
Spark-side filter - number of filters [5]             3709 / 3741        269.6           3.7       0.9X
Parquet-side filter - number of filters [10]          5673 / 5727        176.3           5.7       0.6X
Spark-side filter - number of filters [10]            3588 / 3632        278.7           3.6       0.9X
Parquet-side filter - number of filters [20]          8024 / 8440        124.6           8.0       0.4X
Spark-side filter - number of filters [20]            3912 / 3946        255.6           3.9       0.8X
Parquet-side filter - number of filters [30]         11936 / 12041        83.8          11.9       0.3X
Spark-side filter - number of filters [30]            3929 / 3978        254.5           3.9       0.8X
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15049 from HyukjinKwon/SPARK-17310.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/673c6704
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/673c6704
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/673c6704

Branch: refs/heads/master
Commit: 673c67046598d33b9ecf864024ca7a937c1998d6
Parents: d8741b2
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Tue Nov 14 12:34:21 2017 +0100
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Tue Nov 14 12:34:21 2017 +0100

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  9 ++++
 .../datasources/parquet/ParquetFileFormat.scala | 14 +++---
 .../parquet/ParquetFilterSuite.scala            | 51 +++++++++++++++++++-
 3 files changed, 65 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/673c6704/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 831ef62..0cb58fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -327,6 +327,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val PARQUET_RECORD_FILTER_ENABLED = buildConf("spark.sql.parquet.recordLevelFilter.enabled")
+    .doc("If true, enables Parquet's native record-level filtering using the pushed down
" +
+      "filters. This configuration only has an effect when 'spark.sql.parquet.filterPushdown'
" +
+      "is enabled.")
+    .booleanConf
+    .createWithDefault(false)
+
   val PARQUET_OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.parquet.output.committer.class")
     .doc("The output committer class used by Parquet. The specified class needs to be a "
+
       "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass
" +
@@ -1173,6 +1180,8 @@ class SQLConf extends Serializable with Logging {
 
   def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
 
+  def parquetRecordFilterEnabled: Boolean = getConf(PARQUET_RECORD_FILTER_ENABLED)
+
   def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
 
   def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)

http://git-wip-us.apache.org/repos/asf/spark/blob/673c6704/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index a48f8d5..044b1a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -335,6 +335,8 @@ class ParquetFileFormat
     val enableVectorizedReader: Boolean =
       sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
       resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableRecordFilter: Boolean =
+      sparkSession.sessionState.conf.parquetRecordFilterEnabled
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
 
@@ -374,13 +376,11 @@ class ParquetFileFormat
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns UnsafeRow
-        val reader = pushed match {
-          case Some(filter) =>
-            new ParquetRecordReader[UnsafeRow](
-              new ParquetReadSupport,
-              FilterCompat.get(filter, null))
-          case _ =>
-            new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
+        val reader = if (pushed.isDefined && enableRecordFilter) {
+          val parquetFilter = FilterCompat.get(pushed.get, null)
+          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport, parquetFilter)
+        } else {
+          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
         }
         reader.initialize(split, hadoopAttemptContext)
         reader

http://git-wip-us.apache.org/repos/asf/spark/blob/673c6704/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 90f6620..3380195 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -45,8 +45,29 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  *
  * 2. `Tuple1(Option(x))` is used together with `AnyVal` types like `Int` to ensure the inferred
  *    data type is nullable.
+ *
+ * NOTE:
+ *
+ * This file intentionally enables record-level filtering explicitly. If new test cases
+ * depend on this configuration, remember to set this configuration explicitly within
+ * the test.
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    // Note that there are many tests here that require record-level filtering set to be true.
+    spark.conf.set(SQLConf.PARQUET_RECORD_FILTER_ENABLED.key, "true")
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      spark.conf.unset(SQLConf.PARQUET_RECORD_FILTER_ENABLED.key)
+    } finally {
+      super.afterEach()
+    }
+  }
+
   private def checkFilterPredicate(
       df: DataFrame,
       predicate: Predicate,
@@ -369,7 +390,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
   test("Filter applied on merged Parquet schema with new column should work") {
     import testImplicits._
-    Seq("true", "false").map { vectorized =>
+    Seq("true", "false").foreach { vectorized =>
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
         SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
         SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
@@ -491,7 +512,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("Fiters should be pushed down for vectorized Parquet reader at row group level") {
+  test("Filters should be pushed down for vectorized Parquet reader at row group level")
{
     import testImplicits._
 
     withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
@@ -555,6 +576,32 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("Filters should be pushed down for Parquet readers at row group level") {
+    import testImplicits._
+
+    withSQLConf(
+      // Makes sure disabling 'spark.sql.parquet.recordLevelFilter.enabled' still enables
+      // row group level filtering.
+      SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> "false",
+      SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+      withTempPath { path =>
+        val data = (1 to 1024)
+        data.toDF("a").coalesce(1)
+          .write.option("parquet.block.size", 512)
+          .parquet(path.getAbsolutePath)
+        val df = spark.read.parquet(path.getAbsolutePath).filter("a == 500")
+        // Here, we strip the Spark side filter and check the actual results from Parquet.
+        val actual = stripSparkFilter(df).collect().length
+        // Since those are filtered at row group level, the result count should be less
+        // than the total length but should not be a single record.
+        // Note that, if record level filtering is enabled, it should be a single record.
+        // If no filter is pushed down to Parquet, it should be the total length of data.
+        assert(actual > 1 && actual < data.length)
+      }
+    }
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {

