spark-commits mailing list archives

From pwend...@apache.org
Subject [31/50] git commit: Add support and test for null keys in ExternalAppendOnlyMap
Date Sat, 11 Jan 2014 00:25:48 GMT
Add support and test for null keys in ExternalAppendOnlyMap

Also add safeguard against use of destructively sorted AppendOnlyMap


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/53d8d366
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/53d8d366
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/53d8d366

Branch: refs/heads/master
Commit: 53d8d36684b16ae536a5e065e690bb21b9aadc49
Parents: 3ce22df
Author: Andrew Or <andrewor14@gmail.com>
Authored: Tue Dec 31 17:19:02 2013 -0800
Committer: Andrew Or <andrewor14@gmail.com>
Committed: Tue Dec 31 17:19:02 2013 -0800

----------------------------------------------------------------------
 .../spark/util/collection/AppendOnlyMap.scala   | 88 +++++++++++++-------
 .../util/collection/ExternalAppendOnlyMap.scala |  1 +
 .../util/collection/AppendOnlyMapSuite.scala    | 44 ++++++++++
 .../collection/ExternalAppendOnlyMapSuite.scala | 38 +++++++++
 4 files changed, 139 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
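The diff below does two things: AppendOnlyMap now accepts null keys in apply, update,
and changeValue, and the map is invalidated once destructiveSortedIterator has been
called. A minimal sketch of the resulting behavior (not part of this commit; the
package clause and object name are illustrative, and it assumes the map is visible
from that package, as it is for the test suites below):

  package org.apache.spark.util.collection

  import java.util.Comparator

  object NullKeySketch {
    def main(args: Array[String]): Unit = {
      val map = new AppendOnlyMap[String, String]()
      map("a") = "1"
      map(null) = "0"                  // null keys are now supported
      assert(map(null) == "0")

      // Comparators passed to destructiveSortedIterator should tolerate null keys
      val it = map.destructiveSortedIterator(new Comparator[(String, String)] {
        def compare(x: (String, String), y: (String, String)): Int = {
          val kx = if (x == null || x._1 == null) "" else x._1
          val ky = if (y == null || y._1 == null) "" else y._1
          kx.compareTo(ky)
        }
      })
      assert(it.next()._1 == null)     // the null-key entry is emitted first

      // After destructive sorting, the map itself is invalid
      try {
        map("a")
        assert(false, "expected IllegalStateException")
      } catch {
        case _: IllegalStateException => // expected: map was destructively sorted
      }
    }
  }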


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/53d8d366/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index a32416a..d2a9574 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -48,10 +48,14 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   private var haveNullValue = false
   private var nullValue: V = null.asInstanceOf[V]
 
+  // Triggered by destructiveSortedIterator; the underlying data array may no longer be used
+  private var destroyed = false
+
   private val LOAD_FACTOR = 0.7
 
   /** Get the value for a given key */
   def apply(key: K): V = {
+    checkValidityOrThrowException()
     val k = key.asInstanceOf[AnyRef]
     if (k.eq(null)) {
       return nullValue
@@ -75,6 +79,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
 
   /** Set the value for a key */
   def update(key: K, value: V): Unit = {
+    checkValidityOrThrowException()
     val k = key.asInstanceOf[AnyRef]
     if (k.eq(null)) {
       if (!haveNullValue) {
@@ -109,6 +114,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
    * for key, if any, or null otherwise. Returns the newly updated value.
    */
   def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
+    checkValidityOrThrowException()
     val k = key.asInstanceOf[AnyRef]
     if (k.eq(null)) {
       if (!haveNullValue) {
@@ -142,35 +148,38 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   }
 
   /** Iterator method from Iterable */
-  override def iterator: Iterator[(K, V)] = new Iterator[(K, V)] {
-    var pos = -1
+  override def iterator: Iterator[(K, V)] = {
+    checkValidityOrThrowException()
+    new Iterator[(K, V)] {
+      var pos = -1
 
-    /** Get the next value we should return from next(), or null if we're finished iterating */
-    def nextValue(): (K, V) = {
-      if (pos == -1) {    // Treat position -1 as looking at the null value
-        if (haveNullValue) {
-          return (null.asInstanceOf[K], nullValue)
+      /** Get the next value we should return from next(), or null if we're finished iterating */
+      def nextValue(): (K, V) = {
+        if (pos == -1) {    // Treat position -1 as looking at the null value
+          if (haveNullValue) {
+            return (null.asInstanceOf[K], nullValue)
+          }
+          pos += 1
         }
-        pos += 1
-      }
-      while (pos < capacity) {
-        if (!data(2 * pos).eq(null)) {
-          return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
+        while (pos < capacity) {
+          if (!data(2 * pos).eq(null)) {
+            return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
+          }
+          pos += 1
         }
-        pos += 1
+        null
       }
-      null
-    }
 
-    override def hasNext: Boolean = nextValue() != null
+      override def hasNext: Boolean = nextValue() != null
 
-    override def next(): (K, V) = {
-      val value = nextValue()
-      if (value == null) {
-        throw new NoSuchElementException("End of iterator")
+      override def next(): (K, V) = {
+        val value = nextValue()
+        if (value == null) {
+          throw new NoSuchElementException("End of iterator")
+        }
+        pos += 1
+        value
       }
-      pos += 1
-      value
     }
   }
 
@@ -238,12 +247,14 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     if (highBit == n) n else highBit << 1
   }
 
-  /** Return an iterator of the map in sorted order. This provides a way to sort the map
without
-    * using additional memory, at the expense of destroying the validity of the map.
-    */
+  /**
+   * Return an iterator of the map in sorted order. This provides a way to sort the map without
+   * using additional memory, at the expense of destroying the validity of the map.
+   */
   def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
-    var keyIndex, newIndex = 0
+    destroyed = true
     // Pack KV pairs into the front of the underlying array
+    var keyIndex, newIndex = 0
     while (keyIndex < capacity) {
       if (data(2 * keyIndex) != null) {
         data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
@@ -251,23 +262,36 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
       }
       keyIndex += 1
     }
-    assert(newIndex == curSize)
+    assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
+
     // Sort by the given ordering
     val rawOrdering = new Comparator[AnyRef] {
       def compare(x: AnyRef, y: AnyRef): Int = {
         cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
       }
     }
-    util.Arrays.sort(data, 0, curSize, rawOrdering)
+    util.Arrays.sort(data, 0, newIndex, rawOrdering)
 
     new Iterator[(K, V)] {
       var i = 0
-      def hasNext = i < curSize
+      var nullValueReady = haveNullValue
+      def hasNext: Boolean = (i < newIndex || nullValueReady)
       def next(): (K, V) = {
-        val item = data(i).asInstanceOf[(K, V)]
-        i += 1
-        item
+        if (nullValueReady) {
+          nullValueReady = false
+          (null.asInstanceOf[K], nullValue)
+        } else {
+          val item = data(i).asInstanceOf[(K, V)]
+          i += 1
+          item
+        }
       }
     }
   }
+
+  private def checkValidityOrThrowException(): Unit = {
+    if (destroyed) {
+      throw new IllegalStateException("Map state is invalid from destructive sorting!")
+    }
+  }
 }
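
A note on the reworked assertion in destructiveSortedIterator above: curSize counts
the null-key entry when haveNullValue is set, but that entry is never stored in the
data array, so packing yields newIndex pairs rather than curSize. The sort therefore
runs over [0, newIndex) instead of [0, curSize), and the returned iterator re-injects
the null-key entry ahead of the sorted pairs.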

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/53d8d366/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 317a6c1..492b4fc 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -112,6 +112,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
 
   private var currentMap = new SizeTrackingAppendOnlyMap[K, G]
   private val oldMaps = new ArrayBuffer[DiskKGIterator]
+
   private val memoryThresholdMB = {
     val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
     val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
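
For context on the surrounding hunk (the only change to this file is the added blank
line): the spill threshold is derived from the two system properties shown. A sketch
of the plausible combination; the hunk is cut off before the values are combined, so
the final line is an assumption, not part of this diff:

  val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
  val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
  val memoryThresholdMB = (bufferSize * bufferPercent).toLong  // 1024 * 0.8 = 819 MB by default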

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/53d8d366/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
index 7e7aa78..71b936b 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
 import scala.collection.mutable.HashSet
 
 import org.scalatest.FunSuite
+import java.util.Comparator
 
 class AppendOnlyMapSuite extends FunSuite {
   test("initialization") {
@@ -151,4 +152,47 @@ class AppendOnlyMapSuite extends FunSuite {
       assert(map("" + i) === "" + i)
     }
   }
+
+  test("destructive sort") {
+    val map = new AppendOnlyMap[String, String]()
+    for (i <- 1 to 100) {
+      map("" + i) = "" + i
+    }
+    map.update(null, "happy new year!")
+
+    try {
+      map.apply("1")
+      map.update("1", "2013")
+      map.changeValue("1", (hadValue, oldValue) => "2014")
+      map.iterator
+    } catch {
+      case e: IllegalStateException => fail()
+    }
+
+    val it = map.destructiveSortedIterator(new Comparator[(String, String)] {
+      def compare(kv1: (String, String), kv2: (String, String)): Int = {
+        val x = if (kv1 != null && kv1._1 != null) kv1._1.toInt else Int.MinValue
+        val y = if (kv2 != null && kv2._1 != null) kv2._1.toInt else Int.MinValue
+        x.compareTo(y)
+      }
+    })
+
+    // Should be sorted by key
+    assert(it.hasNext)
+    var previous = it.next()
+    assert(previous == (null, "happy new year!"))
+    previous = it.next()
+    assert(previous == ("1", "2014"))
+    while (it.hasNext) {
+      val kv = it.next()
+      assert(kv._1.toInt > previous._1.toInt)
+      previous = kv
+    }
+
+    // All subsequent calls to apply, update, changeValue and iterator should throw exception
+    intercept[IllegalStateException] { map.apply("1") }
+    intercept[IllegalStateException] { map.update("1", "2013") }
+    intercept[IllegalStateException] { map.changeValue("1", (hadValue, oldValue) => "2014") }
+    intercept[IllegalStateException] { map.iterator }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/53d8d366/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 3bc88ca..baf94b4 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -113,6 +113,44 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
     assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
   }
 
+  test("null keys and values") {
+    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+      mergeValue, mergeCombiners)
+    map.insert(1, 5)
+    map.insert(2, 6)
+    map.insert(3, 7)
+    assert(map.size === 3)
+    assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+      (1, Seq[Int](5)),
+      (2, Seq[Int](6)),
+      (3, Seq[Int](7))
+    ))
+
+    // Null keys
+    val nullInt = null.asInstanceOf[Int]
+    map.insert(nullInt, 8)
+    assert(map.size === 4)
+    assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+      (1, Seq[Int](5)),
+      (2, Seq[Int](6)),
+      (3, Seq[Int](7)),
+      (nullInt, Seq[Int](8))
+    ))
+
+    // Null values
+    map.insert(4, nullInt)
+    map.insert(nullInt, nullInt)
+    assert(map.size === 5)
+    val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
+    assert(result == Set[(Int, Set[Int])](
+      (1, Set[Int](5)),
+      (2, Set[Int](6)),
+      (3, Set[Int](7)),
+      (4, Set[Int](nullInt)),
+      (nullInt, Set[Int](nullInt, 8))
+    ))
+  }
+
   test("simple aggregator") {
     // reduceByKey
     val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))

