spark-commits mailing list archives

From r...@apache.org
Subject [13/20] git commit: Renamed countDistinct and countDistinctByKey methods to include Approx
Date Wed, 01 Jan 2014 01:48:43 GMT
Renamed countDistinct and countDistinctByKey methods to include Approx


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a7de8e9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a7de8e9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a7de8e9b

Branch: refs/heads/master
Commit: a7de8e9b1c9859f45db4a620dd62a62d472d8396
Parents: d50ccc5
Author: Hossein Falaki <falaki@gmail.com>
Authored: Mon Dec 30 19:28:03 2013 -0800
Committer: Hossein Falaki <falaki@gmail.com>
Committed: Mon Dec 30 19:28:03 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/PairRDDFunctions.scala     | 10 +++++-----
 core/src/main/scala/org/apache/spark/rdd/RDD.scala        |  2 +-
 .../org/apache/spark/rdd/PairRDDFunctionsSuite.scala      |  6 +++---
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala   | 10 +++++-----
 .../org/apache/spark/serializer/KryoSerializerSuite.scala |  2 +-
 5 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a7de8e9b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 4e4f860..1dc5f8d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -217,7 +217,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * more accurate counts but increase the memory footprint and vise versa. Uses the provided
    * Partitioner to partition the output RDD.
    */
-  def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
     val createHLL = (v: V) => {
       val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
       hll.value.offer(v)
@@ -242,8 +242,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * output RDD into numPartitions.
    *
    */
-  def countDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
-    countDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+    countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
   }
 
   /**
@@ -254,8 +254,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
    * level.
    */
-  def countDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
-    countDistinctByKey(relativeSD, defaultPartitioner(self))
+  def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+    countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
   }
 
   /**
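
For readers following the rename, a minimal usage sketch of the new per-key method (illustrative only; the local master, app name, and toy data below are assumptions and are not part of this commit):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // brings the PairRDDFunctions implicits into scope

    // Illustrative setup only -- a local context and toy data, not taken from the commit.
    val sc = new SparkContext("local[2]", "hll-example")

    // Key i carries exactly i distinct values, so the true per-key distinct count is i.
    val pairs = sc.parallelize((1 to 100).flatMap(i => (1 to i).map(j => (i, j))))

    // Renamed method: per-key approximate distinct count with a 5% relative standard deviation.
    val estimates = pairs.countApproxDistinctByKey(relativeSD = 0.05).collect()

Each per-key estimate should fall within roughly 5% of the true count, mirroring the assertions in PairRDDFunctionsSuite below.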

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a7de8e9b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 136fa45..74fab48 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -797,7 +797,7 @@ abstract class RDD[T: ClassTag](
    * more accurate counts but increase the memory footprint and vise versa. The default value of
    * relativeSD is 0.05.
    */
-  def countDistinct(relativeSD: Double = 0.05): Long = {
+  def countApproxDistinct(relativeSD: Double = 0.05): Long = {
 
     def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = {
       val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
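
Likewise, a minimal sketch of the renamed RDD-level method (illustrative only; it assumes the sc context from the sketch above). A smaller relativeSD buys a more accurate estimate at the cost of a larger HyperLogLog register footprint:

    // Illustrative only -- assumes an existing SparkContext named sc.
    val ids = sc.parallelize(for (i <- 1 to 100000) yield i % 100)

    // Renamed method: HyperLogLog-based estimate of the number of distinct elements.
    val looser  = ids.countApproxDistinct(relativeSD = 0.05)  // less memory, looser bound
    val tighter = ids.countApproxDistinct(relativeSD = 0.01)  // more memory, tighter bound
    // Both estimates should land close to the true distinct count of 100.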

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a7de8e9b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 6ad58b8..5da538a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -110,7 +110,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
   }
 
-  test("countDistinctByKey") {
+  test("countApproxDistinctByKey") {
     def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
 
     /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
@@ -124,7 +124,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     // Therefore, the expected count for key i would be i.
     val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
     val rdd1 = sc.parallelize(stacked)
-    val counted1 = rdd1.countDistinctByKey(relativeSD).collect()
+    val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
     counted1.foreach{
       case(k, count) => assert(error(count, k) < relativeSD)
     }
@@ -137,7 +137,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
       (1 to num).map(j => (num, j))
     }
     val rdd2 = sc.parallelize(randStacked)
-    val counted2 = rdd2.countDistinctByKey(relativeSD, 4).collect()
+    val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
     counted2.foreach{
       case(k, count) => assert(error(count, k) < relativeSD)
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a7de8e9b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 2f81b81..1383359 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,17 +63,17 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
-  test("countDistinct") {
+  test("countApproxDistinct") {
 
     def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
 
     val size = 100
     val uniformDistro = for (i <- 1 to 100000) yield i % size
     val simpleRdd = sc.makeRDD(uniformDistro)
-    assert(error(simpleRdd.countDistinct(0.2), size) < 0.2)
-    assert(error(simpleRdd.countDistinct(0.05), size) < 0.05)
-    assert(error(simpleRdd.countDistinct(0.01), size) < 0.01)
-    assert(error(simpleRdd.countDistinct(0.001), size) < 0.001)
+    assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
+    assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
+    assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
+    assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
   }
 
   test("SparkContext.union") {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a7de8e9b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 1852971..636e3ab 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -173,7 +173,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }
 
   test("kryo with SerializableHyperLogLog") {
-    assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countDistinct(0.01) === 3)
+    assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
   }
 
   test("kryo with reduce") {

