spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [2/7] git commit: Fix some review comments
Date Thu, 10 Oct 2013 20:55:52 GMT
Fix some review comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0e40cfab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0e40cfab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0e40cfab

Branch: refs/heads/master
Commit: 0e40cfabf867469f988979decd9981adc03c90b3
Parents: b535db7
Author: Matei Zaharia <matei@eecs.berkeley.edu>
Authored: Wed Aug 14 11:45:21 2013 -0700
Committer: Matei Zaharia <matei@eecs.berkeley.edu>
Committed: Tue Oct 8 23:16:16 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     | 19 ++++++--------
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  2 --
 .../main/scala/spark/util/AppendOnlyMap.scala   | 27 +++++---------------
 .../scala/spark/util/AppendOnlyMapSuite.scala   | 23 +++++++++++++----
 4 files changed, 33 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0e40cfab/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index fa1419d..84e15fc 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,18 +17,15 @@
 
 package org.apache.spark
 
-import java.util.{HashMap => JHashMap}
+import org.apache.spark.util.AppendOnlyMap
 
-import scala.collection.JavaConversions._
-
-import spark.util.AppendOnlyMap
-
-/** A set of functions used to aggregate data.
-  *
-  * @param createCombiner function to create the initial value of the aggregation.
-  * @param mergeValue function to merge a new value into the aggregation result.
-  * @param mergeCombiners function to merge outputs from multiple mergeValue function.
-  */
+/**
+ * A set of functions used to aggregate data.
+ *
+ * @param createCombiner function to create the initial value of the aggregation.
+ * @param mergeValue function to merge a new value into the aggregation result.
+ * @param mergeCombiners function to merge outputs from multiple mergeValue function.
+ */
 case class Aggregator[K, V, C] (
     createCombiner: V => C,
     mergeValue: (C, V) => C,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0e40cfab/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index f6dd8a6..f41a023 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -18,9 +18,7 @@
 package org.apache.spark.rdd
 
 import java.io.{ObjectOutputStream, IOException}
-import java.util.{HashMap => JHashMap}
 
-import scala.collection.JavaConversions
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{Partition, Partitioner, SparkEnv, TaskContext}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0e40cfab/core/src/main/scala/spark/util/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/util/AppendOnlyMap.scala b/core/src/main/scala/spark/util/AppendOnlyMap.scala
index 416b93e..a7a8625 100644
--- a/core/src/main/scala/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/spark/util/AppendOnlyMap.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package spark.util
+package org.apache.spark.util
 
 /**
  * A simple open hash table optimized for the append-only use case, where keys
@@ -29,14 +29,10 @@ package spark.util
  */
 private[spark]
class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
-  if (!isPowerOf2(initialCapacity)) {
-    throw new IllegalArgumentException("Initial capacity must be power of 2")
-  }
-  if (initialCapacity >= (1 << 30)) {
-    throw new IllegalArgumentException("Can't make capacity bigger than 2^29 elements")
-  }
+  require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+  require(initialCapacity >= 1, "Invalid initial capacity")
 
-  private var capacity = initialCapacity
+  private var capacity = nextPowerOf2(initialCapacity)
   private var curSize = 0
 
  // Holds keys and values in the same array for memory locality; specifically, the order of
@@ -225,17 +221,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     capacity = newCapacity
   }
 
-  private def isPowerOf2(num: Int): Boolean = {
-    var n = num
-    while (n > 0) {
-      if (n == 1) {
-        return true
-      } else if (n % 2 == 1) {
-        return false
-      } else {
-        n /= 2
-      }
-    }
-    return false
+  private def nextPowerOf2(n: Int): Int = {
+    val highBit = Integer.highestOneBit(n)
+    if (highBit == n) n else highBit << 1
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0e40cfab/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala b/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala
index d1e3678..7177919 100644
--- a/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package spark.util
+package org.apache.spark.util
 
 import scala.collection.mutable.HashSet
 
@@ -25,16 +25,18 @@ class AppendOnlyMapSuite extends FunSuite {
   test("initialization") {
     val goodMap1 = new AppendOnlyMap[Int, Int](1)
     assert(goodMap1.size === 0)
-    val goodMap2 = new AppendOnlyMap[Int, Int](256)
+    val goodMap2 = new AppendOnlyMap[Int, Int](255)
     assert(goodMap2.size === 0)
+    val goodMap3 = new AppendOnlyMap[Int, Int](256)
+    assert(goodMap3.size === 0)
     intercept[IllegalArgumentException] {
-      new AppendOnlyMap[Int, Int](255)     // Invalid map size: not power of 2
+      new AppendOnlyMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
     }
     intercept[IllegalArgumentException] {
-      new AppendOnlyMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
+      new AppendOnlyMap[Int, Int](-1)
     }
     intercept[IllegalArgumentException] {
-      new AppendOnlyMap[Int, Int](-1)      // Invalid map size: not power of 2
+      new AppendOnlyMap[Int, Int](0)
     }
   }
 
@@ -138,4 +140,15 @@ class AppendOnlyMapSuite extends FunSuite {
     })
     assert(map.size === 401)
   }
+
+  test("inserting in capacity-1 map") {
+    val map = new AppendOnlyMap[String, String](1)
+    for (i <- 1 to 100) {
+      map("" + i) = "" + i
+    }
+    assert(map.size === 100)
+    for (i <- 1 to 100) {
+      assert(map("" + i) === "" + i)
+    }
+  }
 }


Mime
View raw message