spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: [SPARK-2441][SQL] Add more efficient distinct operator.
Date Sat, 12 Jul 2014 19:08:38 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 354ce4d30 -> 37e49433a


[SPARK-2441][SQL] Add more efficient distinct operator.

Author: Michael Armbrust <michael@databricks.com>

Closes #1366 from marmbrus/partialDistinct and squashes the following commits:

12a31ab [Michael Armbrust] Add more efficient distinct operator.

(cherry picked from commit 7e26b57615f6c1d3f9058f9c19c05ec91f017f4c)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37e49433
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37e49433
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37e49433

Branch: refs/heads/branch-1.0
Commit: 37e49433a89535cb1defa67e3551b02ced1d445c
Parents: 354ce4d
Author: Michael Armbrust <michael@databricks.com>
Authored: Sat Jul 12 12:07:27 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Sat Jul 12 12:08:35 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkStrategies.scala   |  4 +--
 .../spark/sql/execution/basicOperators.scala    | 33 +++++++++++++++++++-
 2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/37e49433/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index bd8ae4c..b6eb180 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -210,8 +210,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Distinct(child) =>
-        execution.Aggregate(
-          partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil
+        execution.Distinct(partial = false,
+          execution.Distinct(partial = true, planLater(child))) :: Nil
       case logical.Sort(sortExprs, child) =>
         // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
         execution.Sort(sortExprs, global = true, planLater(child)):: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/37e49433/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 1c5592b..b60509e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair
 
 /**
@@ -248,4 +248,35 @@ object ExistingRdd {
 case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
   override def execute() = rdd
 }
+/**
+ * :: DeveloperApi ::
+ * Computes the set of distinct input rows using a HashSet.
+ * @param partial when true the distinct operation is performed partially, per partition, without
+ *                shuffling the data.
+ * @param child the input query plan.
+ */
+@DeveloperApi
+case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
+  override def output = child.output
+
+  override def requiredChildDistribution =
+    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
+
+  override def execute() = {
+    child.execute().mapPartitions { iter =>
+      val hashSet = new scala.collection.mutable.HashSet[Row]()
+
+      var currentRow: Row = null
+      while (iter.hasNext) {
+        currentRow = iter.next()
+        if (!hashSet.contains(currentRow)) {
+          hashSet.add(currentRow.copy())
+        }
+      }
+
+      hashSet.iterator
+    }
+  }
+}
+
 


Mime
View raw message