spark-commits mailing list archives

From gurwls...@apache.org
Subject [spark] branch branch-3.0 updated: [SPARK-32142][SQL][TESTS] Keep the original tests and codes to avoid potential conflicts in dev
Date Wed, 01 Jul 2020 05:21:20 GMT
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new edc3b37  [SPARK-32142][SQL][TESTS] Keep the original tests and codes to avoid potential conflicts in dev
edc3b37 is described below

commit edc3b37087f084fbabec8f53c45d99dc3370a129
Author: HyukjinKwon <gurwls223@apache.org>
AuthorDate: Wed Jul 1 14:15:02 2020 +0900

    [SPARK-32142][SQL][TESTS] Keep the original tests and codes to avoid potential conflicts in dev
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to partially revert the test and some code changes made at https://github.com/apache/spark/pull/27728 without touching any behaviours.
    
    Most of the test changes are brought back to how they were before #27728, by combining `withNestedDataFrame` and `withParquetDataFrame` into a single `withNestedParquetDataFrame` helper.
    
    Basically, it addresses the comment at https://github.com/apache/spark/pull/27728#discussion_r397655390 and my own comment in another PR at https://github.com/apache/spark/pull/28761#discussion_r446761037.
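    
    For example (abridged from the boolean test in the diff below), tests now call the combined helper directly and bind the returned DataFrame as the implicit that `checkFilterPredicate` expects:
    
        test("filter pushdown - boolean") {
          val data = (true :: false :: Nil).map(b => Tuple1.apply(Option(b)))
          // The combined helper writes `data` as Parquet (both non-nested and nested
          // variants), reads it back, and hands each resulting DataFrame to the body.
          withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
            implicit val df: DataFrame = inputDF
    
            val booleanAttr = df(colName).expr
            assert(df(colName).expr.dataType === BooleanType)
    
            checkFilterPredicate(booleanAttr === true, classOf[Eq[_]], resultFun(true))
          }
        }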
    
    ### Why are the changes needed?
    
    For maintenance purposes and to avoid potential conflicts during backports, and also in case other code matches these patterns.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, dev-only.
    
    ### How was this patch tested?
    
    Manually tested.
    
    Closes #28955 from HyukjinKwon/SPARK-25556-followup.
    
    Authored-by: HyukjinKwon <gurwls223@apache.org>
    Signed-off-by: HyukjinKwon <gurwls223@apache.org>
    (cherry picked from commit 8194d9ef788278ec23902da851f2a3c95f5f71bf)
    Signed-off-by: HyukjinKwon <gurwls223@apache.org>
---
 .../datasources/parquet/ParquetFilterSuite.scala   | 789 +++++++++++----------
 .../datasources/parquet/ParquetIOSuite.scala       |  20 +-
 .../datasources/parquet/ParquetTest.scala          |  14 +-
 3 files changed, 411 insertions(+), 412 deletions(-)
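
For orientation, the restored `withParquetDataFrame` in `ParquetTest.scala` again takes a `Seq` of products directly instead of a prepared `DataFrame`. Abridged from the `ParquetIOSuite` hunks below, a typical call looks like:

    test("raw binary") {
      val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
      // `withParquetDataFrame` writes `data` to a temporary Parquet file, reads it
      // back, and passes the resulting DataFrame to the body.
      withParquetDataFrame(data) { df =>
        assertResult(data.map(_._1.mkString(",")).sorted) {
          df.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
        }
      }
    }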

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d20a07f..8b922aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -22,6 +22,9 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.time.{LocalDate, LocalDateTime, ZoneId}
 
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
 import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
@@ -106,10 +109,18 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }
 
   /**
-   * Takes single level `inputDF` dataframe to generate multi-level nested
-   * dataframes as new test data.
+   * Takes a sequence of products `data` to generate multi-level nested
+   * dataframes as new test data. It tests both non-nested and nested dataframes
+   * which are written and read back with Parquet datasource.
+   *
+   * This is different from [[ParquetTest.withParquetDataFrame]] which does not
+   * test nested cases.
    */
-  private def withNestedDataFrame(inputDF: DataFrame)
+  private def withNestedParquetDataFrame[T <: Product: ClassTag: TypeTag](data: Seq[T])
+      (runTest: (DataFrame, String, Any => Any) => Unit): Unit =
+    withNestedParquetDataFrame(spark.createDataFrame(data))(runTest)
+
+  private def withNestedParquetDataFrame(inputDF: DataFrame)
       (runTest: (DataFrame, String, Any => Any) => Unit): Unit = {
     assert(inputDF.schema.fields.length == 1)
     assert(!inputDF.schema.fields.head.dataType.isInstanceOf[StructType])
@@ -138,8 +149,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
         "`a.b`.`c.d`", // one level nesting with column names containing `dots`
         (x: Any) => Row(x)
       )
-    ).foreach { case (df, colName, resultFun) =>
-      runTest(df, colName, resultFun)
+    ).foreach { case (newDF, colName, resultFun) =>
+      withTempPath { file =>
+        newDF.write.format(dataSourceName).save(file.getCanonicalPath)
+        readParquetFile(file.getCanonicalPath) { df => runTest(df, colName, resultFun) }
+      }
     }
   }
 
@@ -155,7 +169,9 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
     import testImplicits._
     val df = data.map(i => Tuple1(Timestamp.valueOf(i))).toDF()
-    withNestedDataFrame(df) { case (inputDF, colName, fun) =>
+    withNestedParquetDataFrame(df) { case (parquetDF, colName, fun) =>
+      implicit val df: DataFrame = parquetDF
+
       def resultFun(tsStr: String): Any = {
         val parsed = if (java8Api) {
           LocalDateTime.parse(tsStr.replace(" ", "T"))
@@ -166,36 +182,35 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
         }
         fun(parsed)
       }
-      withParquetDataFrame(inputDF) { implicit df =>
-        val tsAttr = df(colName).expr
-        assert(df(colName).expr.dataType === TimestampType)
-
-        checkFilterPredicate(tsAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-        checkFilterPredicate(tsAttr.isNotNull, classOf[NotEq[_]],
-          data.map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(tsAttr === ts1.ts, classOf[Eq[_]], resultFun(ts1))
-        checkFilterPredicate(tsAttr <=> ts1.ts, classOf[Eq[_]], resultFun(ts1))
-        checkFilterPredicate(tsAttr =!= ts1.ts, classOf[NotEq[_]],
-          Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(tsAttr < ts2.ts, classOf[Lt[_]], resultFun(ts1))
-        checkFilterPredicate(tsAttr > ts1.ts, classOf[Gt[_]],
-          Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
-        checkFilterPredicate(tsAttr <= ts1.ts, classOf[LtEq[_]], resultFun(ts1))
-        checkFilterPredicate(tsAttr >= ts4.ts, classOf[GtEq[_]], resultFun(ts4))
-
-        checkFilterPredicate(Literal(ts1.ts) === tsAttr, classOf[Eq[_]], resultFun(ts1))
-        checkFilterPredicate(Literal(ts1.ts) <=> tsAttr, classOf[Eq[_]], resultFun(ts1))
-        checkFilterPredicate(Literal(ts2.ts) > tsAttr, classOf[Lt[_]], resultFun(ts1))
-        checkFilterPredicate(Literal(ts3.ts) < tsAttr, classOf[Gt[_]], resultFun(ts4))
-        checkFilterPredicate(Literal(ts1.ts) >= tsAttr, classOf[LtEq[_]], resultFun(ts1))
-        checkFilterPredicate(Literal(ts4.ts) <= tsAttr, classOf[GtEq[_]], resultFun(ts4))
-
-        checkFilterPredicate(!(tsAttr < ts4.ts), classOf[GtEq[_]], resultFun(ts4))
-        checkFilterPredicate(tsAttr < ts2.ts || tsAttr > ts3.ts, classOf[Operators.Or],
-          Seq(Row(resultFun(ts1)), Row(resultFun(ts4))))
-      }
+
+      val tsAttr = df(colName).expr
+      assert(df(colName).expr.dataType === TimestampType)
+
+      checkFilterPredicate(tsAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(tsAttr.isNotNull, classOf[NotEq[_]],
+        data.map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(tsAttr === ts1.ts, classOf[Eq[_]], resultFun(ts1))
+      checkFilterPredicate(tsAttr <=> ts1.ts, classOf[Eq[_]], resultFun(ts1))
+      checkFilterPredicate(tsAttr =!= ts1.ts, classOf[NotEq[_]],
+        Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(tsAttr < ts2.ts, classOf[Lt[_]], resultFun(ts1))
+      checkFilterPredicate(tsAttr > ts1.ts, classOf[Gt[_]],
+        Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
+      checkFilterPredicate(tsAttr <= ts1.ts, classOf[LtEq[_]], resultFun(ts1))
+      checkFilterPredicate(tsAttr >= ts4.ts, classOf[GtEq[_]], resultFun(ts4))
+
+      checkFilterPredicate(Literal(ts1.ts) === tsAttr, classOf[Eq[_]], resultFun(ts1))
+      checkFilterPredicate(Literal(ts1.ts) <=> tsAttr, classOf[Eq[_]], resultFun(ts1))
+      checkFilterPredicate(Literal(ts2.ts) > tsAttr, classOf[Lt[_]], resultFun(ts1))
+      checkFilterPredicate(Literal(ts3.ts) < tsAttr, classOf[Gt[_]], resultFun(ts4))
+      checkFilterPredicate(Literal(ts1.ts) >= tsAttr, classOf[LtEq[_]], resultFun(ts1))
+      checkFilterPredicate(Literal(ts4.ts) <= tsAttr, classOf[GtEq[_]], resultFun(ts4))
+
+      checkFilterPredicate(!(tsAttr < ts4.ts), classOf[GtEq[_]], resultFun(ts4))
+      checkFilterPredicate(tsAttr < ts2.ts || tsAttr > ts3.ts, classOf[Operators.Or],
+        Seq(Row(resultFun(ts1)), Row(resultFun(ts4))))
     }
   }
 
@@ -226,272 +241,264 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   test("filter pushdown - boolean") {
     val data = (true :: false :: Nil).map(b => Tuple1.apply(Option(b)))
-    import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
-      withParquetDataFrame(inputDF) { implicit df =>
-        val booleanAttr = df(colName).expr
-        assert(df(colName).expr.dataType === BooleanType)
-
-        checkFilterPredicate(booleanAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-        checkFilterPredicate(booleanAttr.isNotNull, classOf[NotEq[_]],
-          Seq(Row(resultFun(true)), Row(resultFun(false))))
-
-        checkFilterPredicate(booleanAttr === true, classOf[Eq[_]], resultFun(true))
-        checkFilterPredicate(booleanAttr <=> true, classOf[Eq[_]], resultFun(true))
-        checkFilterPredicate(booleanAttr =!= true, classOf[NotEq[_]], resultFun(false))
-      }
+    withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+      implicit val df: DataFrame = inputDF
+
+      val booleanAttr = df(colName).expr
+      assert(df(colName).expr.dataType === BooleanType)
+
+      checkFilterPredicate(booleanAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(booleanAttr.isNotNull, classOf[NotEq[_]],
+        Seq(Row(resultFun(true)), Row(resultFun(false))))
+
+      checkFilterPredicate(booleanAttr === true, classOf[Eq[_]], resultFun(true))
+      checkFilterPredicate(booleanAttr <=> true, classOf[Eq[_]], resultFun(true))
+      checkFilterPredicate(booleanAttr =!= true, classOf[NotEq[_]], resultFun(false))
     }
   }
 
   test("filter pushdown - tinyint") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toByte)))
-    import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
-      withParquetDataFrame(inputDF) { implicit df =>
-        val tinyIntAttr = df(colName).expr
-        assert(df(colName).expr.dataType === ByteType)
-
-        checkFilterPredicate(tinyIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-        checkFilterPredicate(tinyIntAttr.isNotNull, classOf[NotEq[_]],
-          (1 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(tinyIntAttr === 1.toByte, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(tinyIntAttr <=> 1.toByte, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(tinyIntAttr =!= 1.toByte, classOf[NotEq[_]],
-          (2 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(tinyIntAttr < 2.toByte, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(tinyIntAttr > 3.toByte, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(tinyIntAttr <= 1.toByte, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(tinyIntAttr >= 4.toByte, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(Literal(1.toByte) === tinyIntAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(1.toByte) <=> tinyIntAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(2.toByte) > tinyIntAttr, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(Literal(3.toByte) < tinyIntAttr, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(Literal(1.toByte) >= tinyIntAttr, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(Literal(4.toByte) <= tinyIntAttr, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(!(tinyIntAttr < 4.toByte), classOf[GtEq[_]], resultFun(4))
-        checkFilterPredicate(tinyIntAttr < 2.toByte || tinyIntAttr > 3.toByte,
-          classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4))))
-      }
+    withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+      implicit val df: DataFrame = inputDF
+
+      val tinyIntAttr = df(colName).expr
+      assert(df(colName).expr.dataType === ByteType)
+
+      checkFilterPredicate(tinyIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(tinyIntAttr.isNotNull, classOf[NotEq[_]],
+        (1 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(tinyIntAttr === 1.toByte, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(tinyIntAttr <=> 1.toByte, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(tinyIntAttr =!= 1.toByte, classOf[NotEq[_]],
+        (2 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(tinyIntAttr < 2.toByte, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(tinyIntAttr > 3.toByte, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(tinyIntAttr <= 1.toByte, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(tinyIntAttr >= 4.toByte, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(Literal(1.toByte) === tinyIntAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(1.toByte) <=> tinyIntAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(2.toByte) > tinyIntAttr, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(Literal(3.toByte) < tinyIntAttr, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(Literal(1.toByte) >= tinyIntAttr, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(Literal(4.toByte) <= tinyIntAttr, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(!(tinyIntAttr < 4.toByte), classOf[GtEq[_]], resultFun(4))
+      checkFilterPredicate(tinyIntAttr < 2.toByte || tinyIntAttr > 3.toByte,
+        classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4))))
     }
   }
 
   test("filter pushdown - smallint") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toShort)))
-    import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
-      withParquetDataFrame(inputDF) { implicit df =>
-        val smallIntAttr = df(colName).expr
-        assert(df(colName).expr.dataType === ShortType)
-
-        checkFilterPredicate(smallIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-        checkFilterPredicate(smallIntAttr.isNotNull, classOf[NotEq[_]],
-          (1 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(smallIntAttr === 1.toShort, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(smallIntAttr <=> 1.toShort, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(smallIntAttr =!= 1.toShort, classOf[NotEq[_]],
-          (2 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(smallIntAttr < 2.toShort, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(smallIntAttr > 3.toShort, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(smallIntAttr <= 1.toShort, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(smallIntAttr >= 4.toShort, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(Literal(1.toShort) === smallIntAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(1.toShort) <=> smallIntAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(2.toShort) > smallIntAttr, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(Literal(3.toShort) < smallIntAttr, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(Literal(1.toShort) >= smallIntAttr, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(Literal(4.toShort) <= smallIntAttr, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(!(smallIntAttr < 4.toShort), classOf[GtEq[_]], resultFun(4))
-        checkFilterPredicate(smallIntAttr < 2.toShort || smallIntAttr > 3.toShort,
-          classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4))))
-      }
+    withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+      implicit val df: DataFrame = inputDF
+
+      val smallIntAttr = df(colName).expr
+      assert(df(colName).expr.dataType === ShortType)
+
+      checkFilterPredicate(smallIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(smallIntAttr.isNotNull, classOf[NotEq[_]],
+        (1 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(smallIntAttr === 1.toShort, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(smallIntAttr <=> 1.toShort, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(smallIntAttr =!= 1.toShort, classOf[NotEq[_]],
+        (2 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(smallIntAttr < 2.toShort, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(smallIntAttr > 3.toShort, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(smallIntAttr <= 1.toShort, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(smallIntAttr >= 4.toShort, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(Literal(1.toShort) === smallIntAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(1.toShort) <=> smallIntAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(2.toShort) > smallIntAttr, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(Literal(3.toShort) < smallIntAttr, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(Literal(1.toShort) >= smallIntAttr, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(Literal(4.toShort) <= smallIntAttr, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(!(smallIntAttr < 4.toShort), classOf[GtEq[_]], resultFun(4))
+      checkFilterPredicate(smallIntAttr < 2.toShort || smallIntAttr > 3.toShort,
+        classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4))))
     }
   }
 
   test("filter pushdown - integer") {
     val data = (1 to 4).map(i => Tuple1(Option(i)))
-    import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
-      withParquetDataFrame(inputDF) { implicit df =>
-        val intAttr = df(colName).expr
-        assert(df(colName).expr.dataType === IntegerType)
-
-        checkFilterPredicate(intAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-        checkFilterPredicate(intAttr.isNotNull, classOf[NotEq[_]],
-          (1 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(intAttr === 1, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(intAttr <=> 1, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(intAttr =!= 1, classOf[NotEq[_]],
-          (2 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(intAttr < 2, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(intAttr > 3, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(intAttr <= 1, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(intAttr >= 4, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(Literal(1) === intAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(1) <=> intAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(2) > intAttr, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(Literal(3) < intAttr, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(Literal(1) >= intAttr, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(Literal(4) <= intAttr, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(!(intAttr < 4), classOf[GtEq[_]], resultFun(4))
-        checkFilterPredicate(intAttr < 2 || intAttr > 3, classOf[Operators.Or],
-          Seq(Row(resultFun(1)), Row(resultFun(4))))
-      }
+    withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+      implicit val df: DataFrame = inputDF
+
+      val intAttr = df(colName).expr
+      assert(df(colName).expr.dataType === IntegerType)
+
+      checkFilterPredicate(intAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(intAttr.isNotNull, classOf[NotEq[_]],
+        (1 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(intAttr === 1, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(intAttr <=> 1, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(intAttr =!= 1, classOf[NotEq[_]],
+        (2 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(intAttr < 2, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(intAttr > 3, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(intAttr <= 1, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(intAttr >= 4, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(Literal(1) === intAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(1) <=> intAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(2) > intAttr, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(Literal(3) < intAttr, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(Literal(1) >= intAttr, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(Literal(4) <= intAttr, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(!(intAttr < 4), classOf[GtEq[_]], resultFun(4))
+      checkFilterPredicate(intAttr < 2 || intAttr > 3, classOf[Operators.Or],
+        Seq(Row(resultFun(1)), Row(resultFun(4))))
     }
   }
 
   test("filter pushdown - long") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toLong)))
-    import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
-      withParquetDataFrame(inputDF) { implicit df =>
-        val longAttr = df(colName).expr
-        assert(df(colName).expr.dataType === LongType)
-
-        checkFilterPredicate(longAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-        checkFilterPredicate(longAttr.isNotNull, classOf[NotEq[_]],
-          (1 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(longAttr === 1, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(longAttr <=> 1, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(longAttr =!= 1, classOf[NotEq[_]],
-          (2 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(longAttr < 2, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(longAttr > 3, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(longAttr <= 1, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(longAttr >= 4, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(Literal(1) === longAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(1) <=> longAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(2) > longAttr, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(Literal(3) < longAttr, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(Literal(1) >= longAttr, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(Literal(4) <= longAttr, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(!(longAttr < 4), classOf[GtEq[_]], resultFun(4))
-        checkFilterPredicate(longAttr < 2 || longAttr > 3, classOf[Operators.Or],
-          Seq(Row(resultFun(1)), Row(resultFun(4))))
-      }
+    withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+      implicit val df: DataFrame = inputDF
+
+      val longAttr = df(colName).expr
+      assert(df(colName).expr.dataType === LongType)
+
+      checkFilterPredicate(longAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(longAttr.isNotNull, classOf[NotEq[_]],
+        (1 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(longAttr === 1, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(longAttr <=> 1, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(longAttr =!= 1, classOf[NotEq[_]],
+        (2 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(longAttr < 2, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(longAttr > 3, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(longAttr <= 1, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(longAttr >= 4, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(Literal(1) === longAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(1) <=> longAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(2) > longAttr, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(Literal(3) < longAttr, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(Literal(1) >= longAttr, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(Literal(4) <= longAttr, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(!(longAttr < 4), classOf[GtEq[_]], resultFun(4))
+      checkFilterPredicate(longAttr < 2 || longAttr > 3, classOf[Operators.Or],
+        Seq(Row(resultFun(1)), Row(resultFun(4))))
     }
   }
 
   test("filter pushdown - float") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toFloat)))
-    import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
-      withParquetDataFrame(inputDF) { implicit df =>
-        val floatAttr = df(colName).expr
-        assert(df(colName).expr.dataType === FloatType)
-
-        checkFilterPredicate(floatAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-        checkFilterPredicate(floatAttr.isNotNull, classOf[NotEq[_]],
-          (1 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(floatAttr === 1, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(floatAttr <=> 1, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(floatAttr =!= 1, classOf[NotEq[_]],
-          (2 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(floatAttr < 2, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(floatAttr > 3, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(floatAttr <= 1, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(floatAttr >= 4, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(Literal(1) === floatAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(1) <=> floatAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(2) > floatAttr, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(Literal(3) < floatAttr, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(Literal(1) >= floatAttr, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(Literal(4) <= floatAttr, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(!(floatAttr < 4), classOf[GtEq[_]], resultFun(4))
-        checkFilterPredicate(floatAttr < 2 || floatAttr > 3, classOf[Operators.Or],
-          Seq(Row(resultFun(1)), Row(resultFun(4))))
-      }
+    withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+      implicit val df: DataFrame = inputDF
+
+      val floatAttr = df(colName).expr
+      assert(df(colName).expr.dataType === FloatType)
+
+      checkFilterPredicate(floatAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(floatAttr.isNotNull, classOf[NotEq[_]],
+        (1 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(floatAttr === 1, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(floatAttr <=> 1, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(floatAttr =!= 1, classOf[NotEq[_]],
+        (2 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(floatAttr < 2, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(floatAttr > 3, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(floatAttr <= 1, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(floatAttr >= 4, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(Literal(1) === floatAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(1) <=> floatAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(2) > floatAttr, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(Literal(3) < floatAttr, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(Literal(1) >= floatAttr, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(Literal(4) <= floatAttr, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(!(floatAttr < 4), classOf[GtEq[_]], resultFun(4))
+      checkFilterPredicate(floatAttr < 2 || floatAttr > 3, classOf[Operators.Or],
+        Seq(Row(resultFun(1)), Row(resultFun(4))))
     }
   }
 
   test("filter pushdown - double") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toDouble)))
-    import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
-      withParquetDataFrame(inputDF) { implicit df =>
-        val doubleAttr = df(colName).expr
-        assert(df(colName).expr.dataType === DoubleType)
-
-        checkFilterPredicate(doubleAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-        checkFilterPredicate(doubleAttr.isNotNull, classOf[NotEq[_]],
-          (1 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(doubleAttr === 1, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(doubleAttr <=> 1, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(doubleAttr =!= 1, classOf[NotEq[_]],
-          (2 to 4).map(i => Row.apply(resultFun(i))))
-
-        checkFilterPredicate(doubleAttr < 2, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(doubleAttr > 3, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(doubleAttr <= 1, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(doubleAttr >= 4, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(Literal(1) === doubleAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(1) <=> doubleAttr, classOf[Eq[_]], resultFun(1))
-        checkFilterPredicate(Literal(2) > doubleAttr, classOf[Lt[_]], resultFun(1))
-        checkFilterPredicate(Literal(3) < doubleAttr, classOf[Gt[_]], resultFun(4))
-        checkFilterPredicate(Literal(1) >= doubleAttr, classOf[LtEq[_]], resultFun(1))
-        checkFilterPredicate(Literal(4) <= doubleAttr, classOf[GtEq[_]], resultFun(4))
-
-        checkFilterPredicate(!(doubleAttr < 4), classOf[GtEq[_]], resultFun(4))
-        checkFilterPredicate(doubleAttr < 2 || doubleAttr > 3, classOf[Operators.Or],
-          Seq(Row(resultFun(1)), Row(resultFun(4))))
-      }
+    withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+      implicit val df: DataFrame = inputDF
+
+      val doubleAttr = df(colName).expr
+      assert(df(colName).expr.dataType === DoubleType)
+
+      checkFilterPredicate(doubleAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(doubleAttr.isNotNull, classOf[NotEq[_]],
+        (1 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(doubleAttr === 1, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(doubleAttr <=> 1, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(doubleAttr =!= 1, classOf[NotEq[_]],
+        (2 to 4).map(i => Row.apply(resultFun(i))))
+
+      checkFilterPredicate(doubleAttr < 2, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(doubleAttr > 3, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(doubleAttr <= 1, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(doubleAttr >= 4, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(Literal(1) === doubleAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(1) <=> doubleAttr, classOf[Eq[_]], resultFun(1))
+      checkFilterPredicate(Literal(2) > doubleAttr, classOf[Lt[_]], resultFun(1))
+      checkFilterPredicate(Literal(3) < doubleAttr, classOf[Gt[_]], resultFun(4))
+      checkFilterPredicate(Literal(1) >= doubleAttr, classOf[LtEq[_]], resultFun(1))
+      checkFilterPredicate(Literal(4) <= doubleAttr, classOf[GtEq[_]], resultFun(4))
+
+      checkFilterPredicate(!(doubleAttr < 4), classOf[GtEq[_]], resultFun(4))
+      checkFilterPredicate(doubleAttr < 2 || doubleAttr > 3, classOf[Operators.Or],
+        Seq(Row(resultFun(1)), Row(resultFun(4))))
     }
   }
 
   test("filter pushdown - string") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toString)))
-    import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
-      withParquetDataFrame(inputDF) { implicit df =>
-        val stringAttr = df(colName).expr
-        assert(df(colName).expr.dataType === StringType)
-
-        checkFilterPredicate(stringAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-        checkFilterPredicate(stringAttr.isNotNull, classOf[NotEq[_]],
-          (1 to 4).map(i => Row.apply(resultFun(i.toString))))
-
-        checkFilterPredicate(stringAttr === "1", classOf[Eq[_]], resultFun("1"))
-        checkFilterPredicate(stringAttr <=> "1", classOf[Eq[_]], resultFun("1"))
-        checkFilterPredicate(stringAttr =!= "1", classOf[NotEq[_]],
-          (2 to 4).map(i => Row.apply(resultFun(i.toString))))
-
-        checkFilterPredicate(stringAttr < "2", classOf[Lt[_]], resultFun("1"))
-        checkFilterPredicate(stringAttr > "3", classOf[Gt[_]], resultFun("4"))
-        checkFilterPredicate(stringAttr <= "1", classOf[LtEq[_]], resultFun("1"))
-        checkFilterPredicate(stringAttr >= "4", classOf[GtEq[_]], resultFun("4"))
-
-        checkFilterPredicate(Literal("1") === stringAttr, classOf[Eq[_]], resultFun("1"))
-        checkFilterPredicate(Literal("1") <=> stringAttr, classOf[Eq[_]], resultFun("1"))
-        checkFilterPredicate(Literal("2") > stringAttr, classOf[Lt[_]], resultFun("1"))
-        checkFilterPredicate(Literal("3") < stringAttr, classOf[Gt[_]], resultFun("4"))
-        checkFilterPredicate(Literal("1") >= stringAttr, classOf[LtEq[_]], resultFun("1"))
-        checkFilterPredicate(Literal("4") <= stringAttr, classOf[GtEq[_]], resultFun("4"))
-
-        checkFilterPredicate(!(stringAttr < "4"), classOf[GtEq[_]], resultFun("4"))
-        checkFilterPredicate(stringAttr < "2" || stringAttr > "3", classOf[Operators.Or],
-          Seq(Row(resultFun("1")), Row(resultFun("4"))))
-      }
+    withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+      implicit val df: DataFrame = inputDF
+
+      val stringAttr = df(colName).expr
+      assert(df(colName).expr.dataType === StringType)
+
+      checkFilterPredicate(stringAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(stringAttr.isNotNull, classOf[NotEq[_]],
+        (1 to 4).map(i => Row.apply(resultFun(i.toString))))
+
+      checkFilterPredicate(stringAttr === "1", classOf[Eq[_]], resultFun("1"))
+      checkFilterPredicate(stringAttr <=> "1", classOf[Eq[_]], resultFun("1"))
+      checkFilterPredicate(stringAttr =!= "1", classOf[NotEq[_]],
+        (2 to 4).map(i => Row.apply(resultFun(i.toString))))
+
+      checkFilterPredicate(stringAttr < "2", classOf[Lt[_]], resultFun("1"))
+      checkFilterPredicate(stringAttr > "3", classOf[Gt[_]], resultFun("4"))
+      checkFilterPredicate(stringAttr <= "1", classOf[LtEq[_]], resultFun("1"))
+      checkFilterPredicate(stringAttr >= "4", classOf[GtEq[_]], resultFun("4"))
+
+      checkFilterPredicate(Literal("1") === stringAttr, classOf[Eq[_]], resultFun("1"))
+      checkFilterPredicate(Literal("1") <=> stringAttr, classOf[Eq[_]], resultFun("1"))
+      checkFilterPredicate(Literal("2") > stringAttr, classOf[Lt[_]], resultFun("1"))
+      checkFilterPredicate(Literal("3") < stringAttr, classOf[Gt[_]], resultFun("4"))
+      checkFilterPredicate(Literal("1") >= stringAttr, classOf[LtEq[_]], resultFun("1"))
+      checkFilterPredicate(Literal("4") <= stringAttr, classOf[GtEq[_]], resultFun("4"))
+
+      checkFilterPredicate(!(stringAttr < "4"), classOf[GtEq[_]], resultFun("4"))
+      checkFilterPredicate(stringAttr < "2" || stringAttr > "3", classOf[Operators.Or],
+        Seq(Row(resultFun("1")), Row(resultFun("4"))))
     }
   }
 
@@ -501,38 +508,37 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
 
     val data = (1 to 4).map(i => Tuple1(Option(i.b)))
-    import testImplicits._
-    withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
-      withParquetDataFrame(inputDF) { implicit df =>
-        val binaryAttr: Expression = df(colName).expr
-        assert(df(colName).expr.dataType === BinaryType)
-
-        checkFilterPredicate(binaryAttr === 1.b, classOf[Eq[_]], resultFun(1.b))
-        checkFilterPredicate(binaryAttr <=> 1.b, classOf[Eq[_]], resultFun(1.b))
-
-        checkFilterPredicate(binaryAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-        checkFilterPredicate(binaryAttr.isNotNull, classOf[NotEq[_]],
-          (1 to 4).map(i => Row.apply(resultFun(i.b))))
-
-        checkFilterPredicate(binaryAttr =!= 1.b, classOf[NotEq[_]],
-          (2 to 4).map(i => Row.apply(resultFun(i.b))))
-
-        checkFilterPredicate(binaryAttr < 2.b, classOf[Lt[_]], resultFun(1.b))
-        checkFilterPredicate(binaryAttr > 3.b, classOf[Gt[_]], resultFun(4.b))
-        checkFilterPredicate(binaryAttr <= 1.b, classOf[LtEq[_]], resultFun(1.b))
-        checkFilterPredicate(binaryAttr >= 4.b, classOf[GtEq[_]], resultFun(4.b))
-
-        checkFilterPredicate(Literal(1.b) === binaryAttr, classOf[Eq[_]], resultFun(1.b))
-        checkFilterPredicate(Literal(1.b) <=> binaryAttr, classOf[Eq[_]], resultFun(1.b))
-        checkFilterPredicate(Literal(2.b) > binaryAttr, classOf[Lt[_]], resultFun(1.b))
-        checkFilterPredicate(Literal(3.b) < binaryAttr, classOf[Gt[_]], resultFun(4.b))
-        checkFilterPredicate(Literal(1.b) >= binaryAttr, classOf[LtEq[_]], resultFun(1.b))
-        checkFilterPredicate(Literal(4.b) <= binaryAttr, classOf[GtEq[_]], resultFun(4.b))
-
-        checkFilterPredicate(!(binaryAttr < 4.b), classOf[GtEq[_]], resultFun(4.b))
-        checkFilterPredicate(binaryAttr < 2.b || binaryAttr > 3.b, classOf[Operators.Or],
-          Seq(Row(resultFun(1.b)), Row(resultFun(4.b))))
-      }
+    withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+      implicit val df: DataFrame = inputDF
+
+      val binaryAttr: Expression = df(colName).expr
+      assert(df(colName).expr.dataType === BinaryType)
+
+      checkFilterPredicate(binaryAttr === 1.b, classOf[Eq[_]], resultFun(1.b))
+      checkFilterPredicate(binaryAttr <=> 1.b, classOf[Eq[_]], resultFun(1.b))
+
+      checkFilterPredicate(binaryAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(binaryAttr.isNotNull, classOf[NotEq[_]],
+        (1 to 4).map(i => Row.apply(resultFun(i.b))))
+
+      checkFilterPredicate(binaryAttr =!= 1.b, classOf[NotEq[_]],
+        (2 to 4).map(i => Row.apply(resultFun(i.b))))
+
+      checkFilterPredicate(binaryAttr < 2.b, classOf[Lt[_]], resultFun(1.b))
+      checkFilterPredicate(binaryAttr > 3.b, classOf[Gt[_]], resultFun(4.b))
+      checkFilterPredicate(binaryAttr <= 1.b, classOf[LtEq[_]], resultFun(1.b))
+      checkFilterPredicate(binaryAttr >= 4.b, classOf[GtEq[_]], resultFun(4.b))
+
+      checkFilterPredicate(Literal(1.b) === binaryAttr, classOf[Eq[_]], resultFun(1.b))
+      checkFilterPredicate(Literal(1.b) <=> binaryAttr, classOf[Eq[_]], resultFun(1.b))
+      checkFilterPredicate(Literal(2.b) > binaryAttr, classOf[Lt[_]], resultFun(1.b))
+      checkFilterPredicate(Literal(3.b) < binaryAttr, classOf[Gt[_]], resultFun(4.b))
+      checkFilterPredicate(Literal(1.b) >= binaryAttr, classOf[LtEq[_]], resultFun(1.b))
+      checkFilterPredicate(Literal(4.b) <= binaryAttr, classOf[GtEq[_]], resultFun(4.b))
+
+      checkFilterPredicate(!(binaryAttr < 4.b), classOf[GtEq[_]], resultFun(4.b))
+      checkFilterPredicate(binaryAttr < 2.b || binaryAttr > 3.b, classOf[Operators.Or],
+        Seq(Row(resultFun(1.b)), Row(resultFun(4.b))))
     }
   }
 
@@ -546,56 +552,57 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
     Seq(false, true).foreach { java8Api =>
       withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
-        val df = data.map(i => Tuple1(Date.valueOf(i))).toDF()
-        withNestedDataFrame(df) { case (inputDF, colName, fun) =>
+        val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
+        withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
+          implicit val df: DataFrame = inputDF
+
           def resultFun(dateStr: String): Any = {
             val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
             fun(parsed)
           }
-          withParquetDataFrame(inputDF) { implicit df =>
-            val dateAttr: Expression = df(colName).expr
-            assert(df(colName).expr.dataType === DateType)
-
-            checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-            checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
-              data.map(i => Row.apply(resultFun(i))))
-
-            checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
-              resultFun("2018-03-18"))
-            checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
-              resultFun("2018-03-18"))
-            checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]],
-              Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
-
-            checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
-              resultFun("2018-03-18"))
-            checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
-              resultFun("2018-03-21"))
-            checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
-              resultFun("2018-03-18"))
-            checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
-              resultFun("2018-03-21"))
-
-            checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]],
-              resultFun("2018-03-18"))
-            checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]],
-              resultFun("2018-03-18"))
-            checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
-              resultFun("2018-03-18"))
-            checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
-              resultFun("2018-03-21"))
-            checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]],
-              resultFun("2018-03-18"))
-            checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
-              resultFun("2018-03-21"))
-
-            checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
-              resultFun("2018-03-21"))
-            checkFilterPredicate(
-              dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
-              classOf[Operators.Or],
-              Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
-          }
+
+          val dateAttr: Expression = df(colName).expr
+          assert(df(colName).expr.dataType === DateType)
+
+          checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+          checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
+            data.map(i => Row.apply(resultFun(i))))
+
+          checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
+            resultFun("2018-03-18"))
+          checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
+            resultFun("2018-03-18"))
+          checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]],
+            Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
+
+          checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
+            resultFun("2018-03-18"))
+          checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
+            resultFun("2018-03-21"))
+          checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
+            resultFun("2018-03-18"))
+          checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
+            resultFun("2018-03-21"))
+
+          checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]],
+            resultFun("2018-03-18"))
+          checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]],
+            resultFun("2018-03-18"))
+          checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
+            resultFun("2018-03-18"))
+          checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
+            resultFun("2018-03-21"))
+          checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]],
+            resultFun("2018-03-18"))
+          checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
+            resultFun("2018-03-21"))
+
+          checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
+            resultFun("2018-03-21"))
+          checkFilterPredicate(
+            dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
+            classOf[Operators.Or],
+            Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
         }
       }
     }
@@ -603,7 +610,9 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   test("filter pushdown - timestamp") {
     Seq(true, false).foreach { java8Api =>
-      withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+      withSQLConf(
+        SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+        SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
         // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
         val millisData = Seq(
           "1000-06-14 08:28:53.123",
@@ -630,11 +639,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
         withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
           ParquetOutputTimestampType.INT96.toString) {
           import testImplicits._
-          withParquetDataFrame(
-            millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF()) { implicit df =>
-            val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
-            assertResult(None) {
-              createParquetFilters(schema).createFilter(sources.IsNull("_1"))
+          withTempPath { file =>
+            millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF
+              .write.format(dataSourceName).save(file.getCanonicalPath)
+            readParquetFile(file.getCanonicalPath) { df =>
+              val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
+              assertResult(None) {
+                createParquetFilters(schema).createFilter(sources.IsNull("_1"))
+              }
             }
           }
         }
@@ -653,36 +665,36 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
         val rdd =
           spark.sparkContext.parallelize((1 to 4).map(i => Row(new java.math.BigDecimal(i))))
         val dataFrame = spark.createDataFrame(rdd, StructType.fromDDL(s"a decimal($precision, 2)"))
-        withNestedDataFrame(dataFrame) { case (inputDF, colName, resultFun) =>
-          withParquetDataFrame(inputDF) { implicit df =>
-            val decimalAttr: Expression = df(colName).expr
-            assert(df(colName).expr.dataType === DecimalType(precision, 2))
-
-            checkFilterPredicate(decimalAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-            checkFilterPredicate(decimalAttr.isNotNull, classOf[NotEq[_]],
-              (1 to 4).map(i => Row.apply(resultFun(i))))
-
-            checkFilterPredicate(decimalAttr === 1, classOf[Eq[_]], resultFun(1))
-            checkFilterPredicate(decimalAttr <=> 1, classOf[Eq[_]], resultFun(1))
-            checkFilterPredicate(decimalAttr =!= 1, classOf[NotEq[_]],
-              (2 to 4).map(i => Row.apply(resultFun(i))))
-
-            checkFilterPredicate(decimalAttr < 2, classOf[Lt[_]], resultFun(1))
-            checkFilterPredicate(decimalAttr > 3, classOf[Gt[_]], resultFun(4))
-            checkFilterPredicate(decimalAttr <= 1, classOf[LtEq[_]], resultFun(1))
-            checkFilterPredicate(decimalAttr >= 4, classOf[GtEq[_]], resultFun(4))
-
-            checkFilterPredicate(Literal(1) === decimalAttr, classOf[Eq[_]], resultFun(1))
-            checkFilterPredicate(Literal(1) <=> decimalAttr, classOf[Eq[_]], resultFun(1))
-            checkFilterPredicate(Literal(2) > decimalAttr, classOf[Lt[_]], resultFun(1))
-            checkFilterPredicate(Literal(3) < decimalAttr, classOf[Gt[_]], resultFun(4))
-            checkFilterPredicate(Literal(1) >= decimalAttr, classOf[LtEq[_]], resultFun(1))
-            checkFilterPredicate(Literal(4) <= decimalAttr, classOf[GtEq[_]], resultFun(4))
-
-            checkFilterPredicate(!(decimalAttr < 4), classOf[GtEq[_]], resultFun(4))
-            checkFilterPredicate(decimalAttr < 2 || decimalAttr > 3, classOf[Operators.Or],
-              Seq(Row(resultFun(1)), Row(resultFun(4))))
-          }
+        withNestedParquetDataFrame(dataFrame) { case (inputDF, colName, resultFun) =>
+          implicit val df: DataFrame = inputDF
+
+          val decimalAttr: Expression = df(colName).expr
+          assert(df(colName).expr.dataType === DecimalType(precision, 2))
+
+          checkFilterPredicate(decimalAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+          checkFilterPredicate(decimalAttr.isNotNull, classOf[NotEq[_]],
+            (1 to 4).map(i => Row.apply(resultFun(i))))
+
+          checkFilterPredicate(decimalAttr === 1, classOf[Eq[_]], resultFun(1))
+          checkFilterPredicate(decimalAttr <=> 1, classOf[Eq[_]], resultFun(1))
+          checkFilterPredicate(decimalAttr =!= 1, classOf[NotEq[_]],
+            (2 to 4).map(i => Row.apply(resultFun(i))))
+
+          checkFilterPredicate(decimalAttr < 2, classOf[Lt[_]], resultFun(1))
+          checkFilterPredicate(decimalAttr > 3, classOf[Gt[_]], resultFun(4))
+          checkFilterPredicate(decimalAttr <= 1, classOf[LtEq[_]], resultFun(1))
+          checkFilterPredicate(decimalAttr >= 4, classOf[GtEq[_]], resultFun(4))
+
+          checkFilterPredicate(Literal(1) === decimalAttr, classOf[Eq[_]], resultFun(1))
+          checkFilterPredicate(Literal(1) <=> decimalAttr, classOf[Eq[_]], resultFun(1))
+          checkFilterPredicate(Literal(2) > decimalAttr, classOf[Lt[_]], resultFun(1))
+          checkFilterPredicate(Literal(3) < decimalAttr, classOf[Gt[_]], resultFun(4))
+          checkFilterPredicate(Literal(1) >= decimalAttr, classOf[LtEq[_]], resultFun(1))
+          checkFilterPredicate(Literal(4) <= decimalAttr, classOf[GtEq[_]], resultFun(4))
+
+          checkFilterPredicate(!(decimalAttr < 4), classOf[GtEq[_]], resultFun(4))
+          checkFilterPredicate(decimalAttr < 2 || decimalAttr > 3, classOf[Operators.Or],
+            Seq(Row(resultFun(1)), Row(resultFun(4))))
         }
       }
     }
@@ -1195,8 +1207,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }
 
   test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {
-    import testImplicits._
-    withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i))).toDF()) { implicit df =>
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
       // Here the schema becomes as below:
       //
       // root
@@ -1336,10 +1347,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }
 
   test("filter pushdown - StringStartsWith") {
-    withParquetDataFrame {
-      import testImplicits._
-      (1 to 4).map(i => Tuple1(i + "str" + i)).toDF()
-    } { implicit df =>
+    withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df =>
       checkFilterPredicate(
         '_1.startsWith("").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],
@@ -1385,10 +1393,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
 
     // SPARK-28371: make sure filter is null-safe.
-    withParquetDataFrame {
-      import testImplicits._
-      Seq(Tuple1[String](null)).toDF()
-    } { implicit df =>
+    withParquetDataFrame(Seq(Tuple1[String](null))) { implicit df =>
       checkFilterPredicate(
         '_1.startsWith("blah").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],
@@ -1607,7 +1612,7 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
       expected: Seq[Row]): Unit = {
     val output = predicate.collect { case a: Attribute => a }.distinct
 
-    Seq(("parquet", true), ("", false)).map { case (pushdownDsList, nestedPredicatePushdown) =>
+    Seq(("parquet", true), ("", false)).foreach { case (pushdownDsList, nestedPredicatePushdown) =>
       withSQLConf(
         SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
         SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 79c3297..2dc8a06 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -85,7 +85,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
    * Writes `data` to a Parquet file, reads it back and check file contents.
    */
   protected def checkParquetFile[T <: Product : ClassTag: TypeTag](data: Seq[T]): Unit = {
-    withParquetDataFrame(data.toDF())(r => checkAnswer(r, data.map(Row.fromTuple)))
+    withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
   }
 
   test("basic data types (without binary)") {
@@ -97,7 +97,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
 
   test("raw binary") {
     val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
-    withParquetDataFrame(data.toDF()) { df =>
+    withParquetDataFrame(data) { df =>
       assertResult(data.map(_._1.mkString(",")).sorted) {
         df.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
       }
@@ -200,7 +200,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
 
   testStandardAndLegacyModes("struct") {
     val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
-    withParquetDataFrame(data.toDF()) { df =>
+    withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(struct) =>
         Row(Row(struct.productIterator.toSeq: _*))
@@ -217,7 +217,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         )
       )
     }
-    withParquetDataFrame(data.toDF()) { df =>
+    withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(array) =>
         Row(array.map(struct => Row(struct.productIterator.toSeq: _*)))
@@ -236,7 +236,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         )
       )
     }
-    withParquetDataFrame(data.toDF()) { df =>
+    withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(array) =>
         Row(array.map { case Tuple1(Tuple1(str)) => Row(Row(str))})
@@ -246,7 +246,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
 
   testStandardAndLegacyModes("nested struct with array of array as field") {
     val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
-    withParquetDataFrame(data.toDF()) { df =>
+    withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(struct) =>
         Row(Row(struct.productIterator.toSeq: _*))
@@ -263,7 +263,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         )
       )
     }
-    withParquetDataFrame(data.toDF()) { df =>
+    withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(m) =>
         Row(m.map { case (k, v) => Row(k.productIterator.toSeq: _*) -> v })
@@ -280,7 +280,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         )
       )
     }
-    withParquetDataFrame(data.toDF()) { df =>
+    withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(m) =>
         Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
@@ -296,7 +296,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       null.asInstanceOf[java.lang.Float],
       null.asInstanceOf[java.lang.Double])
 
-    withParquetDataFrame((allNulls :: Nil).toDF()) { df =>
+    withParquetDataFrame(allNulls :: Nil) { df =>
       val rows = df.collect()
       assert(rows.length === 1)
       assert(rows.head === Row(Seq.fill(5)(null): _*))
@@ -309,7 +309,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       None.asInstanceOf[Option[Long]],
       None.asInstanceOf[Option[String]])
 
-    withParquetDataFrame((allNones :: Nil).toDF()) { df =>
+    withParquetDataFrame(allNones :: Nil) { df =>
       val rows = df.collect()
       assert(rows.length === 1)
       assert(rows.head === Row(Seq.fill(3)(null): _*))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 105f025..db8ee72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -63,18 +63,12 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
       (f: String => Unit): Unit = withDataSourceFile(data)(f)
 
   /**
-   * Writes `df` dataframe to a Parquet file and reads it back as a [[DataFrame]],
+   * Writes `data` to a Parquet file and reads it back as a [[DataFrame]],
    * which is then passed to `f`. The Parquet file will be deleted after `f` returns.
    */
-  protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true)
-      (f: DataFrame => Unit): Unit = {
-    withTempPath { file =>
-      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
-        df.write.format(dataSourceName).save(file.getCanonicalPath)
-      }
-      readFile(file.getCanonicalPath, testVectorized)(f)
-    }
-  }
+  protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T], testVectorized: Boolean = true)
+      (f: DataFrame => Unit): Unit = withDataSourceDataFrame(data, testVectorized)(f)
 
   /**
    * Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

