spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [19/20] git commit: Made the code more compact and readable
Date Wed, 01 Jan 2014 01:48:49 GMT
Made the code more compact and readable


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bee445c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bee445c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bee445c9

Branch: refs/heads/master
Commit: bee445c927586136673f39259f23642a5a6e8efe
Parents: acb0323
Author: Hossein Falaki <falaki@gmail.com>
Authored: Tue Dec 31 16:58:18 2013 -0800
Committer: Hossein Falaki <falaki@gmail.com>
Committed: Tue Dec 31 16:58:18 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/PairRDDFunctions.scala | 12 ++----------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala    | 14 +-------------
 .../apache/spark/util/SerializableHyperLogLog.scala   |  5 +++++
 3 files changed, 8 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bee445c9/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 088b298..04a8d05 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -218,19 +218,11 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * Partitioner to partition the output RDD.
    */
   def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
-    val createHLL = (v: V) => {
-      val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
-      hll.value.offer(v)
-      hll
-    }
-    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => {
-      hll.value.offer(v)
-      hll
-    }
+    val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
+    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
     val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
 
     combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
-
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bee445c9/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 161fd06..4960e6e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -798,20 +798,8 @@ abstract class RDD[T: ClassTag](
    * relativeSD is 0.05.
    */
   def countApproxDistinct(relativeSD: Double = 0.05): Long = {
-
-    def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = {
-      val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
-      while (iter.hasNext) {
-        val v = iter.next()
-        hllCounter.value.offer(v)
-      }
-      Iterator(hllCounter)
-    }
-    def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2)
-
     val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
-    mapPartitions(hllCountPartition).aggregate(zeroCounter)(mergeCounters, mergeCounters)
-      .value.cardinality()
+    aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bee445c9/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
index 9cfd414..8b4e7c1 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
@@ -30,6 +30,11 @@ class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
 
   def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
 
+  def add[T](elem: T) = {
+    this.value.offer(elem)
+    this
+  }
+
   def readExternal(in: ObjectInput) {
     val byteLength = in.readInt()
     val bytes = new Array[Byte](byteLength)


Mime
View raw message