spark-commits mailing list archives

From hvanhov...@apache.org
Subject spark git commit: [SPARK-18973][SQL] Remove SortPartitions and RedistributeData
Date Thu, 22 Dec 2016 18:35:39 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 ec0d6e21e -> f6853b3e5


[SPARK-18973][SQL] Remove SortPartitions and RedistributeData

## What changes were proposed in this pull request?
The SortPartitions and RedistributeData logical operators are not actually used anywhere and can be removed.
Note that we already have a Sort operator (with its global flag set to false) that subsumes SortPartitions.
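
As context, here is a minimal sketch (not part of this commit; names are illustrative) showing that the Dataset API already expresses the partition-local sort that SortPartitions used to represent: `sortWithinPartitions` builds a `Sort` with `global = false`, while `orderBy` builds one with `global = true`.

```scala
import org.apache.spark.sql.SparkSession

object PartitionLocalSortSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("partition-local-sort-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((3, "c"), (1, "a"), (2, "b")).toDF("id", "value")

    // Builds Sort(order, global = false, child): each partition is sorted
    // independently, with no shuffle for a total ordering.
    df.sortWithinPartitions($"id").explain(true)

    // Builds Sort(order, global = true, child): a total ordering across partitions.
    df.orderBy($"id").explain(true)

    spark.stop()
  }
}
```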

## How was this patch tested?
Updated existing test cases to reflect the removal.

Author: Reynold Xin <rxin@databricks.com>

Closes #16381 from rxin/SPARK-18973.

(cherry picked from commit 2615100055860faa5f74d3711d4d15ebae6aba25)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6853b3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6853b3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6853b3e

Branch: refs/heads/branch-2.1
Commit: f6853b3e5a068c1bc972eae2370d8bd94026d682
Parents: ec0d6e2
Author: Reynold Xin <rxin@databricks.com>
Authored: Thu Dec 22 19:35:09 2016 +0100
Committer: Herman van Hovell <hvanhovell@databricks.com>
Committed: Thu Dec 22 19:35:32 2016 +0100

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  2 +-
 .../analysis/UnsupportedOperationChecker.scala  |  2 +-
 .../sql/catalyst/optimizer/Optimizer.scala      |  2 +-
 .../sql/catalyst/optimizer/expressions.scala    |  2 +-
 .../plans/logical/basicLogicalOperators.scala   | 22 +++++++++
 .../catalyst/plans/logical/partitioning.scala   | 49 --------------------
 .../analysis/UnsupportedOperationsSuite.scala   |  1 -
 .../spark/sql/execution/SparkStrategies.scala   |  4 --
 8 files changed, 26 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6853b3e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9ca9901..f17c372 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1057,7 +1057,7 @@ class Analyzer(
         case p: Sort =>
           failOnOuterReference(p)
           p
-        case p: RedistributeData =>
+        case p: RepartitionByExpression =>
           failOnOuterReference(p)
           p
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f6853b3e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 60d9881..053c8eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -166,7 +166,7 @@ object UnsupportedOperationChecker {
         case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
           throwError("Limits are not supported on streaming DataFrames/Datasets")
 
-        case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
+        case Sort(_, _, _) if !containsCompleteData(subPlan) =>
           throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it
is on" +
             "aggregated DataFrame/Dataset in Complete output mode")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f6853b3e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 75d9997..dfd66aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -796,7 +796,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     case _: Distinct => true
     case _: Generate => true
     case _: Pivot => true
-    case _: RedistributeData => true
+    case _: RepartitionByExpression => true
     case _: Repartition => true
     case _: ScriptTransformation => true
     case _: Sort => true

http://git-wip-us.apache.org/repos/asf/spark/blob/f6853b3e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 6958398..949ccdc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -489,7 +489,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
     case _: AppendColumns => true
     case _: AppendColumnsWithObject => true
     case _: BroadcastHint => true
-    case _: RedistributeData => true
+    case _: RepartitionByExpression => true
     case _: Repartition => true
     case _: Sort => true
     case _: TypedFilter => true

http://git-wip-us.apache.org/repos/asf/spark/blob/f6853b3e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b4358c2..f51ed22 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -780,6 +780,28 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
 }
 
 /**
+ * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
+ * information about the number of partitions during execution. Used when a specific ordering or
+ * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
+ * `coalesce` and `repartition`.
+ * If `numPartitions` is not specified, the number of partitions will be the number set by
+ * `spark.sql.shuffle.partitions`.
+ */
+case class RepartitionByExpression(
+    partitionExpressions: Seq[Expression],
+    child: LogicalPlan,
+    numPartitions: Option[Int] = None) extends UnaryNode {
+
+  numPartitions match {
+    case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.")
+    case None => // Ok
+  }
+
+  override def maxRows: Option[Long] = child.maxRows
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
  * A relation with one row. This is used in "SELECT ..." without a from clause.
  */
 case object OneRowRelation extends LeafNode {
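
As a usage note (a sketch, not part of the diff; names are illustrative): the Dataset API builds RepartitionByExpression through `repartition` with column arguments. Without an explicit count, `numPartitions` stays `None` and the shuffle falls back to `spark.sql.shuffle.partitions`.

```scala
import org.apache.spark.sql.SparkSession

object RepartitionByExpressionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("repartition-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")

    // RepartitionByExpression(Seq(id), child, numPartitions = None):
    // the partition count comes from spark.sql.shuffle.partitions (200 by default).
    println(df.repartition($"id").rdd.getNumPartitions)

    // RepartitionByExpression(Seq(id), child, numPartitions = Some(4)).
    println(df.repartition(4, $"id").rdd.getNumPartitions) // prints 4

    spark.stop()
  }
}
```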

http://git-wip-us.apache.org/repos/asf/spark/blob/f6853b3e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
deleted file mode 100644
index 28cbce8..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical
-
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder}
-
-/**
- * Performs a physical redistribution of the data.  Used when the consumer of the query
- * result have expectations about the distribution and ordering of partitioned input data.
- */
-abstract class RedistributeData extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
-
-case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
-  extends RedistributeData
-
-/**
- * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
- * information about the number of partitions during execution. Used when a specific ordering or
- * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
- * `coalesce` and `repartition`.
- * If `numPartitions` is not specified, the number of partitions will be the number set by
- * `spark.sql.shuffle.partitions`.
- */
-case class RepartitionByExpression(
-    partitionExpressions: Seq[Expression],
-    child: LogicalPlan,
-    numPartitions: Option[Int] = None) extends RedistributeData {
-  numPartitions match {
-    case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.")
-    case None => // Ok
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6853b3e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 94a008f..d2c0f8c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -213,7 +213,6 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
 
 
   // Other unary operations
-  testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
   testUnaryOperatorInStreamingPlan(
     "sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling")
   testUnaryOperatorInStreamingPlan(

http://git-wip-us.apache.org/repos/asf/spark/blob/f6853b3e/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b0bbcfc..ba82ec1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -376,10 +376,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         } else {
           execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
         }
-      case logical.SortPartitions(sortExprs, child) =>
-        // This sort only sorts tuples within a partition. Its requiredDistribution will be
-        // an UnspecifiedDistribution.
-        execution.SortExec(sortExprs, global = false, child = planLater(child)) :: Nil
       case logical.Sort(sortExprs, global, child) =>
         execution.SortExec(sortExprs, global, planLater(child)) :: Nil
       case logical.Project(projectList, child) =>


