spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [SPARK-8309] [CORE] Support for more than 12M items in OpenHashMap
Date Wed, 17 Jun 2015 08:42:46 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 877deb046 -> a5f602efc


[SPARK-8309] [CORE] Support for more than 12M items in OpenHashMap

The problem occurs because the position mask `0xEFFFFFF` is incorrect: its 25th bit (bit 24,
counting from zero) is zero, so once the capacity grows beyond 2^24, `OpenHashMap` computes an
incorrect index for the value in the `_values` array.

I've also added a size check in `rehash()`, so that it fails instead of reporting invalid
item indices.

Author: Vyacheslav Baranov <slavik.baranov@gmail.com>

Closes #6763 from SlavikBaranov/SPARK-8309 and squashes the following commits:

8557445 [Vyacheslav Baranov] Resolved review comments
4d5b954 [Vyacheslav Baranov] Resolved review comments
eaf1e68 [Vyacheslav Baranov] Fixed failing test
f9284fd [Vyacheslav Baranov] Resolved review comments
3920656 [Vyacheslav Baranov] SPARK-8309: Support for more than 12M items in OpenHashMap

(cherry picked from commit c13da20a55b80b8632d547240d2c8f97539969a1)
Signed-off-by: Sean Owen <sowen@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5f602ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5f602ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5f602ef

Branch: refs/heads/branch-1.4
Commit: a5f602efcffea3da03f0cf828045b4e1b862fde8
Parents: 877deb0
Author: Vyacheslav Baranov <slavik.baranov@gmail.com>
Authored: Wed Jun 17 09:42:29 2015 +0100
Committer: Sean Owen <sowen@cloudera.com>
Committed: Wed Jun 17 09:42:41 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/util/collection/OpenHashSet.scala  | 10 +++++++---
 .../apache/spark/util/collection/OpenHashMapSuite.scala | 12 +++++++++++-
 .../util/collection/PrimitiveKeyOpenHashMapSuite.scala  |  2 +-
 3 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a5f602ef/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 1501111..60deb91 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -43,7 +43,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
     loadFactor: Double)
   extends Serializable {
 
-  require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+  require(initialCapacity <= OpenHashSet.MAX_CAPACITY,
+    s"Can't make capacity bigger than ${OpenHashSet.MAX_CAPACITY} elements")
   require(initialCapacity >= 1, "Invalid initial capacity")
   require(loadFactor < 1.0, "Load factor must be less than 1.0")
   require(loadFactor > 0.0, "Load factor must be greater than 0.0")
@@ -213,6 +214,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
    */
   private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit)
{
     val newCapacity = _capacity * 2
+    require(newCapacity > 0 && newCapacity <= OpenHashSet.MAX_CAPACITY,
+      s"Can't contain more than ${(loadFactor * OpenHashSet.MAX_CAPACITY).toInt} elements")
     allocateFunc(newCapacity)
     val newBitset = new BitSet(newCapacity)
     val newData = new Array[T](newCapacity)
@@ -266,9 +269,10 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
 private[spark]
 object OpenHashSet {
 
+  val MAX_CAPACITY = 1 << 30
   val INVALID_POS = -1
-  val NONEXISTENCE_MASK = 0x80000000
-  val POSITION_MASK = 0xEFFFFFF
+  val NONEXISTENCE_MASK = 1 << 31
+  val POSITION_MASK = (1 << 31) - 1
 
   /**
    * A set of specialized hash function implementation to avoid boxing hash code computation

http://git-wip-us.apache.org/repos/asf/spark/blob/a5f602ef/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
index 94e0117..3066e99 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
@@ -44,7 +44,7 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
     val goodMap3 = new OpenHashMap[String, String](256)
     assert(goodMap3.size === 0)
     intercept[IllegalArgumentException] {
-      new OpenHashMap[String, Int](1 << 30) // Invalid map size: bigger than 2^29
+      new OpenHashMap[String, Int](1 << 30 + 1) // Invalid map size: bigger than 2^30
     }
     intercept[IllegalArgumentException] {
       new OpenHashMap[String, Int](-1)
@@ -186,4 +186,14 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
     map(null) = 0
     assert(map.contains(null))
   }
+
+  test("support for more than 12M items") {
+    val cnt = 12000000 // 12M
+    val map = new OpenHashMap[Int, Int](cnt)
+    for (i <- 0 until cnt) {
+      map(i) = 1
+    }
+    val numInvalidValues = map.iterator.count(_._2 == 0)
+    assertResult(0)(numInvalidValues)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a5f602ef/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala
index 462bc2f..508e737 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala
@@ -44,7 +44,7 @@ class PrimitiveKeyOpenHashMapSuite extends SparkFunSuite with Matchers {
     val goodMap3 = new PrimitiveKeyOpenHashMap[Int, Int](256)
     assert(goodMap3.size === 0)
     intercept[IllegalArgumentException] {
-      new PrimitiveKeyOpenHashMap[Int, Int](1 << 30) // Invalid map size: bigger than
2^29
+      new PrimitiveKeyOpenHashMap[Int, Int](1 << 30 + 1) // Invalid map size: bigger
than 2^30
     }
     intercept[IllegalArgumentException] {
       new PrimitiveKeyOpenHashMap[Int, Int](-1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message