spark-commits mailing list archives

From dav...@apache.org
Subject spark git commit: [SPARK-15639] [SPARK-16321] [SQL] Push down filter at RowGroups level for parquet reader
Date Wed, 10 Aug 2016 17:03:58 GMT
Repository: spark
Updated Branches:
  refs/heads/master 11a6844be -> 19af298bb


[SPARK-15639] [SPARK-16321] [SQL] Push down filter at RowGroups level for parquet reader

## What changes were proposed in this pull request?

The base class `SpecificParquetRecordReaderBase`, used by the vectorized Parquet reader, tries
to get pushed-down filters from the given configuration. These pushed-down filters are used
for RowGroups-level filtering. However, we never set the filters in the configuration, so they
are not actually pushed down and no RowGroups-level filtering happens. This patch fixes that by
setting the pushed-down filters in the configuration passed to the reader.
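
To illustrate what RowGroups-level filtering means here, the following is a minimal sketch, not the
Parquet implementation. It assumes each row group carries min/max statistics for a column and drops
any row group whose statistics prove that no row can satisfy `_1 < 100`. The names `RowGroupStats`,
`mightMatchLessThan` and `keepCandidateRowGroups` are hypothetical and exist only for this illustration.

```scala
// Hypothetical model of per-row-group column statistics (not a Parquet class).
final case class RowGroupStats(min: Int, max: Int, rowCount: Long)

object RowGroupPruningSketch {
  // A row group can contain a value < bound only if its minimum is < bound.
  def mightMatchLessThan(stats: RowGroupStats, bound: Int): Boolean =
    stats.min < bound

  // Keep only the row groups that cannot be ruled out by their statistics.
  def keepCandidateRowGroups(groups: Seq[RowGroupStats], bound: Int): Seq[RowGroupStats] =
    groups.filter(mightMatchLessThan(_, bound))

  def main(args: Array[String]): Unit = {
    // Every value of column _1 is 101, as in the benchmark in this message.
    val groups = Seq.fill(4)(RowGroupStats(min = 101, max = 101, rowCount = 256L))
    val kept = keepCandidateRowGroups(groups, bound = 100)
    println(s"row groups to read: ${kept.size}, rows to read: ${kept.map(_.rowCount).sum}")
  }
}
```

In this sketch every row group is ruled out, which is the situation the benchmark in this message is
built to exercise: all values of `_1` are 101 and the filter is `_1 < 100`.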

The benchmark that excludes the time of writing Parquet file:

    test("Benchmark for Parquet") {
      val N = 500 << 12
      withParquetTable((0 until N).map(i => (101, i)), "t") {
        val benchmark = new Benchmark("Parquet reader", N)
        benchmark.addCase("reading Parquet file", 10) { iter =>
          sql("SELECT _1 FROM t where t._1 < 100").collect()
        }
        benchmark.run()
      }
    }

By default, `withParquetTable` runs tests for both the vectorized and non-vectorized readers.
I only let it run the vectorized reader.

With the Parquet block size set to 1024 so that the file has multiple row groups, the benchmark results are:

Before this patch:

The retrieved row groups: 8063

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           825 / 1233          2.5         402.6       1.0X

After this patch:

The retrieved row groups: 0

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           306 /  503          6.7         149.6       1.0X
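
Both tables report `Relative` as 1.0X, presumably because each run contains a single case, so the
speedup has to be read from the times. As a rough calculation from the times reported above, with
`speedup` being a name used only in this illustration:

```scala
object PushdownSpeedupSketch {
  // Ratio of the slower (before) time to the faster (after) time.
  def speedup(beforeMs: Double, afterMs: Double): Double = beforeMs / afterMs

  def main(args: Array[String]): Unit = {
    // Best and average times (ms) copied from the two benchmark tables above.
    val best = speedup(825.0, 306.0)
    val avg = speedup(1233.0, 503.0)
    println(f"best: $best%.2fx, avg: $avg%.2fx")
  }
}
```

That works out to roughly 2.7x on the best time and roughly 2.5x on the average time for this
particular benchmark.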

Next, I ran the same benchmark code for the non-pushdown case, with the filter pushdown
configuration disabled. This time the Parquet block size is the default value.

Before this patch:

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           136 /  238         15.0          66.5       1.0X

After this patch:

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           124 /  193         16.5          60.7       1.0X

For the non-pushdown case, the results suggest that this patch doesn't affect the normal code path.

I've manually printed `totalRowCount` in `SpecificParquetRecordReaderBase` to see whether this
patch actually filters the row groups. When running the above benchmark:

After this patch:
    `totalRowCount = 0`

Before this patch:
    `totalRowCount = 1024000`

## How was this patch tested?
Existing tests should pass.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #13701 from viirya/vectorized-reader-push-down-filter2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19af298b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19af298b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19af298b

Branch: refs/heads/master
Commit: 19af298bb6d264adcf02f6f84c8dc1542b408507
Parents: 11a6844
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Authored: Wed Aug 10 10:03:55 2016 -0700
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Wed Aug 10 10:03:55 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/TaskMetrics.scala |   9 +
 .../org/apache/spark/util/AccumulatorV2.scala   |  12 ++
 .../SpecificParquetRecordReaderBase.java        |  18 ++
 .../datasources/parquet/ParquetFileFormat.scala |  86 +---------
 .../parquet/ParquetFilterSuite.scala            | 165 +++++++++++--------
 5 files changed, 143 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/19af298b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 5bb505b..dd149a9 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -225,6 +225,15 @@ class TaskMetrics private[spark] () extends Serializable {
   }
 
   private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
+
+  /**
+   * Looks for a registered accumulator by accumulator name.
+   */
+  private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
+    accumulators.find { acc =>
+      acc.name.isDefined && acc.name.get == name
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/19af298b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index a9167ce..d130a37 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -23,6 +23,8 @@ import java.util.ArrayList
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext}
 import org.apache.spark.scheduler.AccumulableInfo
 
@@ -257,6 +259,16 @@ private[spark] object AccumulatorContext {
     originals.clear()
   }
 
+  /**
+   * Looks for a registered accumulator by accumulator name.
+   */
+  private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
+    originals.values().asScala.find { ref =>
+      val acc = ref.get
+      acc != null && acc.name.isDefined && acc.name.get == name
+    }.map(_.get)
+  }
+
   // Identifier for distinguishing SQL metrics from other accumulators
   private[spark] val SQL_ACCUM_IDENTIFIER = "sql"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/19af298b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index dfe6967..06cd9ea 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import scala.Option;
+
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
@@ -59,8 +61,12 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.util.AccumulatorV2;
+import org.apache.spark.util.LongAccumulator;
 
 /**
  * Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -145,6 +151,18 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();
     }
+
+    // For test purpose.
+    // If the predefined accumulator exists, the row group number to read will be updated
+    // to the accumulator. So we can check if the row groups are filtered or not in test case.
+    TaskContext taskContext = TaskContext$.MODULE$.get();
+    if (taskContext != null) {
+      Option<AccumulatorV2<?, ?>> accu = (Option<AccumulatorV2<?, ?>>) taskContext.taskMetrics()
+        .lookForAccumulatorByName("numRowGroups");
+      if (accu.isDefined()) {
+        ((LongAccumulator)accu.get()).add((long)blocks.size());
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/19af298b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 612a295..7794f31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -357,6 +358,11 @@ class ParquetFileFormat
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
 
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+      }
       val parquetReader = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader()
         vectorizedReader.initialize(split, hadoopAttemptContext)
@@ -563,87 +569,7 @@ private[parquet] class ParquetOutputWriter(
   override def close(): Unit = recordWriter.close(context)
 }
 
-
 object ParquetFileFormat extends Logging {
-  /**
-   * If parquet's block size (row group size) setting is larger than the min split size,
-   * we use parquet's block size setting as the min split size. Otherwise, we will create
-   * tasks processing nothing (because a split does not cover the starting point of a
-   * parquet block). See https://issues.apache.org/jira/browse/SPARK-10143 for more information.
-   */
-  private def overrideMinSplitSize(parquetBlockSize: Long, conf: Configuration): Unit = {
-    val minSplitSize =
-      math.max(
-        conf.getLong("mapred.min.split.size", 0L),
-        conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L))
-    if (parquetBlockSize > minSplitSize) {
-      val message =
-        s"Parquet's block size (row group size) is larger than " +
-          s"mapred.min.split.size/mapreduce.input.fileinputformat.split.minsize. Setting " +
-          s"mapred.min.split.size and mapreduce.input.fileinputformat.split.minsize to " +
-          s"$parquetBlockSize."
-      logDebug(message)
-      conf.set("mapred.min.split.size", parquetBlockSize.toString)
-      conf.set("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize.toString)
-    }
-  }
-
-  /** This closure sets various Parquet configurations at both driver side and executor side. */
-  private[parquet] def initializeLocalJobFunc(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      dataSchema: StructType,
-      parquetBlockSize: Long,
-      useMetadataCache: Boolean,
-      parquetFilterPushDown: Boolean,
-      assumeBinaryIsString: Boolean,
-      assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
-    val conf = job.getConfiguration
-    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
-
-    // Try to push down filters when filter push-down is enabled.
-    if (parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-    }
-
-    conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      ParquetSchemaConverter.checkFieldNames(requestedSchema).json
-    })
-
-    conf.set(
-      ParquetWriteSupport.SPARK_ROW_SCHEMA,
-      ParquetSchemaConverter.checkFieldNames(dataSchema).json)
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
-
-    // Sets flags for `CatalystSchemaConverter`
-    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
-    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
-
-    overrideMinSplitSize(parquetBlockSize, conf)
-  }
-
-  /** This closure sets input paths at the driver side. */
-  private[parquet] def initializeDriverSideJobFunc(
-      inputFiles: Array[FileStatus],
-      parquetBlockSize: Long)(job: Job): Unit = {
-    // We side the input paths at the driver side.
-    logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
-    if (inputFiles.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
-    }
-
-    overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
-  }
-
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/19af298b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d846b27..4246b54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.util.{AccumulatorContext, LongAccumulator}
 
 /**
  * A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -368,73 +369,75 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
   test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
     import testImplicits._
-
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-      withTempPath { dir =>
-        val pathOne = s"${dir.getCanonicalPath}/table1"
-        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
-        val pathTwo = s"${dir.getCanonicalPath}/table2"
-        (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
-
-        // If the "c = 1" filter gets pushed down, this query will throw an exception which
-        // Parquet emits. This is a Parquet issue (PARQUET-389).
-        val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
-        checkAnswer(
-          df,
-          Row(1, "1", null))
-
-        // The fields "a" and "c" only exist in one Parquet file.
-        assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-        assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-        val pathThree = s"${dir.getCanonicalPath}/table3"
-        df.write.parquet(pathThree)
-
-        // We will remove the temporary metadata when writing Parquet file.
-        val schema = spark.read.parquet(pathThree).schema
-        assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-        val pathFour = s"${dir.getCanonicalPath}/table4"
-        val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
-        dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
-
-        val pathFive = s"${dir.getCanonicalPath}/table5"
-        val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
-        dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
-
-        // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
-        // Parquet emits.
-        val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
-          .selectExpr("s")
-        checkAnswer(dfStruct3, Row(Row(null, 1)))
-
-        // The fields "s.a" and "s.c" only exist in one Parquet file.
-        val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
-        assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-        assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-        val pathSix = s"${dir.getCanonicalPath}/table6"
-        dfStruct3.write.parquet(pathSix)
-
-        // We will remove the temporary metadata when writing Parquet file.
-        val forPathSix = spark.read.parquet(pathSix).schema
-        assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-        // sanity test: make sure optional metadata field is not wrongly set.
-        val pathSeven = s"${dir.getCanonicalPath}/table7"
-        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
-        val pathEight = s"${dir.getCanonicalPath}/table8"
-        (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
-
-        val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
-        checkAnswer(
-          df2,
-          Row(1, "1"))
-
-        // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
-        assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
-        assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
+    Seq("true", "false").map { vectorized =>
+      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+        SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+        withTempPath { dir =>
+          val pathOne = s"${dir.getCanonicalPath}/table1"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+          val pathTwo = s"${dir.getCanonicalPath}/table2"
+          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+          // If the "c = 1" filter gets pushed down, this query will throw an exception which
+          // Parquet emits. This is a Parquet issue (PARQUET-389).
+          val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
+          checkAnswer(
+            df,
+            Row(1, "1", null))
+
+          // The fields "a" and "c" only exist in one Parquet file.
+          assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+          assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+          val pathThree = s"${dir.getCanonicalPath}/table3"
+          df.write.parquet(pathThree)
+
+          // We will remove the temporary metadata when writing Parquet file.
+          val schema = spark.read.parquet(pathThree).schema
+          assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+          val pathFour = s"${dir.getCanonicalPath}/table4"
+          val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
+          dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
+
+          val pathFive = s"${dir.getCanonicalPath}/table5"
+          val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
+          dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
+
+          // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
+          // Parquet emits.
+          val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
+            .selectExpr("s")
+          checkAnswer(dfStruct3, Row(Row(null, 1)))
+
+          // The fields "s.a" and "s.c" only exist in one Parquet file.
+          val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
+          assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+          assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+          val pathSix = s"${dir.getCanonicalPath}/table6"
+          dfStruct3.write.parquet(pathSix)
+
+          // We will remove the temporary metadata when writing Parquet file.
+          val forPathSix = spark.read.parquet(pathSix).schema
+          assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+          // sanity test: make sure optional metadata field is not wrongly set.
+          val pathSeven = s"${dir.getCanonicalPath}/table7"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
+          val pathEight = s"${dir.getCanonicalPath}/table8"
+          (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
+
+          val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
+          checkAnswer(
+            df2,
+            Row(1, "1"))
+
+          // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
+          assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
+          assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
+        }
       }
     }
   }
@@ -527,4 +530,32 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       assert(df.filter("_1 IS NOT NULL").count() === 4)
     }
   }
+
+  test("Fiters should be pushed down for vectorized Parquet reader at row group level") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
+        SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/table"
+        (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
+
+        Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
+          withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) {
+            val accu = new LongAccumulator
+            accu.register(sparkContext, Some("numRowGroups"))
+
+            val df = spark.read.parquet(path).filter("a < 100")
+            df.foreachPartition(_.foreach(v => accu.add(0)))
+            df.collect
+
+            val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups")
+            assert(numRowGroups.isDefined)
+            assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value))
+            AccumulatorContext.remove(accu.id)
+          }
+        }
+      }
+    }
+  }
 }

