spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-20718][SQL] FileSourceScanExec with different filter orders should be the same after canonicalization
Date Fri, 12 May 2017 05:42:56 GMT
Repository: spark
Updated Branches:
  refs/heads/master 2b36eb696 -> c8da53560


[SPARK-20718][SQL] FileSourceScanExec with different filter orders should be the same after
canonicalization

## What changes were proposed in this pull request?

Since `constraints` in `QueryPlan` is a set, the order of filters can differ. Usually this
is ok because of canonicalization. However, in `FileSourceScanExec`, its data filters and
partition filters are sequences, and their orders are not canonicalized. So `def sameResult`
returns different results for different orders of data/partition filters. This leads to, e.g.
different decision for `ReuseExchange`, and thus results in unstable performance.

## How was this patch tested?

Added a new test for `FileSourceScanExec.sameResult`.

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #17959 from wzhfy/canonicalizeFileSourceScanExec.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8da5356
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8da5356
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8da5356

Branch: refs/heads/master
Commit: c8da5356000c8e4ff9141e4a2892ebe0b9641d63
Parents: 2b36eb6
Author: wangzhenhua <wangzhenhua@huawei.com>
Authored: Fri May 12 13:42:48 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Fri May 12 13:42:48 2017 +0800

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      | 16 +++++--
 .../spark/sql/execution/SameResultSuite.scala   | 49 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c8da5356/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 866fa98..251098c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+trait DataSourceScanExec extends LeafExecNode with CodegenSupport with PredicateHelper {
   val relation: BaseRelation
   val metastoreTableIdentifier: Option[TableIdentifier]
 
@@ -519,8 +519,18 @@ case class FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),
       requiredSchema,
-      partitionFilters.map(QueryPlan.normalizeExprId(_, output)),
-      dataFilters.map(QueryPlan.normalizeExprId(_, output)),
+      canonicalizeFilters(partitionFilters, output),
+      canonicalizeFilters(dataFilters, output),
       None)
   }
+
+  private def canonicalizeFilters(filters: Seq[Expression], output: Seq[Attribute])
+    : Seq[Expression] = {
+    if (filters.nonEmpty) {
+      val normalizedFilters = QueryPlan.normalizeExprId(filters.reduce(And), output)
+      splitConjunctivePredicates(normalizedFilters)
+    } else {
+      Nil
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c8da5356/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
new file mode 100644
index 0000000..25e4ca0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Tests for the sameResult function for [[SparkPlan]]s.
+ */
+class SameResultSuite extends QueryTest with SharedSQLContext {
+
+  test("FileSourceScanExec: different orders of data filters and partition filters") {
+    withTempPath { path =>
+      val tmpDir = path.getCanonicalPath
+      spark.range(10)
+        .selectExpr("id as a", "id + 1 as b", "id + 2 as c", "id + 3 as d")
+        .write
+        .partitionBy("a", "b")
+        .parquet(tmpDir)
+      val df = spark.read.parquet(tmpDir)
+      // partition filters: a > 1 AND b < 9
+      // data filters: c > 1 AND d < 9
+      val plan1 = getFileSourceScanExec(df.where("a > 1 AND b < 9 AND c > 1 AND d < 9"))
+      val plan2 = getFileSourceScanExec(df.where("b < 9 AND a > 1 AND d < 9 AND c > 1"))
+      assert(plan1.sameResult(plan2))
+    }
+  }
+
+  private def getFileSourceScanExec(df: DataFrame): FileSourceScanExec = {
+    df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+      .asInstanceOf[FileSourceScanExec]
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message