spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject spark git commit: [SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable
Date Mon, 16 Nov 2015 22:21:33 GMT
Repository: spark
Updated Branches:
  refs/heads/master b1a966262 -> 985b38dd2


[SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply

Author: Zee Chen <zeechen@us.ibm.com>

Closes #9679 from zeocio/spark-11390.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/985b38dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/985b38dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/985b38dd

Branch: refs/heads/master
Commit: 985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181
Parents: b1a9662
Author: Zee Chen <zeechen@us.ibm.com>
Authored: Mon Nov 16 14:21:28 2015 -0800
Committer: Michael Armbrust <michael@databricks.com>
Committed: Mon Nov 16 14:21:28 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/ExistingRDD.scala  |  6 ++++--
 .../execution/datasources/DataSourceStrategy.scala    |  6 ++++--
 .../org/apache/spark/sql/execution/PlannerSuite.scala | 14 ++++++++++++++
 3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 8b41d3d..62620ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -106,7 +106,9 @@ private[sql] object PhysicalRDD {
   def createFromDataSource(
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
-      relation: BaseRelation): PhysicalRDD = {
-    PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
+      relation: BaseRelation,
+      extraInformation: String = ""): PhysicalRDD = {
+    PhysicalRDD(output, rdd, relation.toString + extraInformation,
+      relation.isInstanceOf[HadoopFsRelation])
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9bbbfa7..544d5ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+    val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
         filterSet.subsetOf(projectSet)) {
@@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       execution.Project(
         projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index be53ec3..dfec139 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -160,6 +160,20 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-11390 explain should print PushedFilters of PhysicalRDD") {
+    withTempPath { file =>
+      val path = file.getCanonicalPath
+      testData.write.parquet(path)
+      val df = sqlContext.read.parquet(path)
+      sqlContext.registerDataFrameAsTable(df, "testPushed")
+
+      withTempTable("testPushed") {
+        val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan
+        assert(exp.toString.contains("PushedFilter: [EqualTo(key,15)]"))
+      }
+    }
+  }
+
   test("efficient limit -> project -> sort") {
     {
       val query =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message