spark-commits mailing list archives

From ma...@apache.org
Subject git commit: SPARK-2685. Update ExternalAppendOnlyMap to avoid buffer.remove()
Date Tue, 05 Aug 2014 06:27:57 GMT
Repository: spark
Updated Branches:
  refs/heads/master 05bf4e4af -> 066765d60


SPARK-2685. Update ExternalAppendOnlyMap to avoid buffer.remove()

Replaces buffer.remove() with an O(1) operation that does not have to shift
the whole tail of the array over to fill the gap left by the removed element.
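For illustration only (not part of the commit): a minimal standalone sketch of
the constant-time removal idea, mirroring the removeFromBuffer helper added in
the diff below. The name swapRemove is hypothetical; it only applies when the
order of elements in the buffer does not matter.

    import scala.collection.mutable.ArrayBuffer

    // Remove buffer(index) in O(1): move the last element into the vacated
    // slot and shrink the buffer by one, instead of ArrayBuffer.remove(index),
    // which shifts every element after the index one slot to the left.
    def swapRemove[T](buffer: ArrayBuffer[T], index: Int): T = {
      val elem = buffer(index)
      buffer(index) = buffer(buffer.size - 1)  // also correct when index is the last slot
      buffer.reduceToSize(buffer.size - 1)
      elem
    }

    val buf = ArrayBuffer(1, 2, 3, 4)
    swapRemove(buf, 1)  // returns 2; buf is now ArrayBuffer(1, 4, 3)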

Author: Matei Zaharia <matei@databricks.com>

Closes #1773 from mateiz/SPARK-2685 and squashes the following commits:

1ea028a [Matei Zaharia] Update comments in StreamBuffer and EAOM, and reuse ArrayBuffers
eb1abfd [Matei Zaharia] Update ExternalAppendOnlyMap to avoid buffer.remove()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/066765d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/066765d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/066765d6

Branch: refs/heads/master
Commit: 066765d60d21b6b9943862b788e4a4bd07396e6c
Parents: 05bf4e4
Author: Matei Zaharia <matei@databricks.com>
Authored: Mon Aug 4 23:27:53 2014 -0700
Committer: Matei Zaharia <matei@databricks.com>
Committed: Mon Aug 4 23:27:53 2014 -0700

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 50 ++++++++++++++------
 1 file changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/066765d6/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 5d10a1f..1f7d2dc 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -286,30 +286,32 @@ class ExternalAppendOnlyMap[K, V, C](
     private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
-      val kcPairs = getMorePairs(it)
+      val kcPairs = new ArrayBuffer[(K, C)]
+      readNextHashCode(it, kcPairs)
       if (kcPairs.length > 0) {
         mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
       }
     }
 
     /**
-     * Fetch from the given iterator until a key of different hash is retrieved.
+     * Fill a buffer with the next set of keys with the same hash code from a given iterator. We
+     * read streams one hash code at a time to ensure we don't miss elements when they are merged.
+     *
+     * Assumes the given iterator is in sorted order of hash code.
      *
-     * In the event of key hash collisions, this ensures no pairs are hidden from being merged.
-     * Assume the given iterator is in sorted order.
+     * @param it iterator to read from
+     * @param buf buffer to write the results into
      */
-    private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
-      val kcPairs = new ArrayBuffer[(K, C)]
+    private def readNextHashCode(it: BufferedIterator[(K, C)], buf: ArrayBuffer[(K, C)]): Unit = {
       if (it.hasNext) {
         var kc = it.next()
-        kcPairs += kc
+        buf += kc
         val minHash = hashKey(kc)
         while (it.hasNext && it.head._1.hashCode() == minHash) {
           kc = it.next()
-          kcPairs += kc
+          buf += kc
         }
       }
-      kcPairs
     }
 
     /**
@@ -321,7 +323,9 @@ class ExternalAppendOnlyMap[K, V, C](
       while (i < buffer.pairs.length) {
         val pair = buffer.pairs(i)
         if (pair._1 == key) {
-          buffer.pairs.remove(i)
+          // Note that there's at most one pair in the buffer with a given key, since we always
+          // merge stuff in a map before spilling, so it's safe to return after the first we find
+          removeFromBuffer(buffer.pairs, i)
           return mergeCombiners(baseCombiner, pair._2)
         }
         i += 1
@@ -330,6 +334,19 @@ class ExternalAppendOnlyMap[K, V, C](
     }
 
     /**
+     * Remove the index'th element from an ArrayBuffer in constant time, swapping another element
+     * into its place. This is more efficient than the ArrayBuffer.remove method because it does
+     * not have to shift all the elements in the array over. It works for our array buffers because
+     * we don't care about the order of elements inside, we just want to search them for a key.
+     */
+    private def removeFromBuffer[T](buffer: ArrayBuffer[T], index: Int): T = {
+      val elem = buffer(index)
+      buffer(index) = buffer(buffer.size - 1)  // This also works if index == buffer.size - 1
+      buffer.reduceToSize(buffer.size - 1)
+      elem
+    }
+
+    /**
      * Return true if there exists an input stream that still has unvisited pairs.
      */
     override def hasNext: Boolean = mergeHeap.length > 0
@@ -346,7 +363,7 @@ class ExternalAppendOnlyMap[K, V, C](
       val minBuffer = mergeHeap.dequeue()
       val minPairs = minBuffer.pairs
       val minHash = minBuffer.minKeyHash
-      val minPair = minPairs.remove(0)
+      val minPair = removeFromBuffer(minPairs, 0)
       val minKey = minPair._1
       var minCombiner = minPair._2
       assert(hashKey(minPair) == minHash)
@@ -363,7 +380,7 @@ class ExternalAppendOnlyMap[K, V, C](
       // Repopulate each visited stream buffer and add it back to the queue if it is non-empty
       mergedBuffers.foreach { buffer =>
         if (buffer.isEmpty) {
-          buffer.pairs ++= getMorePairs(buffer.iterator)
+          readNextHashCode(buffer.iterator, buffer.pairs)
         }
         if (!buffer.isEmpty) {
           mergeHeap.enqueue(buffer)
@@ -375,10 +392,13 @@ class ExternalAppendOnlyMap[K, V, C](
 
     /**
      * A buffer for streaming from a map iterator (in-memory or on-disk) sorted by key hash.
-     * Each buffer maintains the lowest-ordered keys in the corresponding iterator. Due to
-     * hash collisions, it is possible for multiple keys to be "tied" for being the lowest.
+     * Each buffer maintains all of the key-value pairs with what is currently the lowest hash
+     * code among keys in the stream. There may be multiple keys if there are hash collisions.
+     * Note that because when we spill data out, we only spill one value for each key, there is
+     * at most one element for each key.
      *
-     * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
+     * StreamBuffers are ordered by the minimum key hash currently available in their stream so
+     * that we can put them into a heap and sort that.
      */
     private class StreamBuffer(
         val iterator: BufferedIterator[(K, C)],

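For context, the one-hash-code-at-a-time reading pattern described in the
readNextHashCode comment above can be sketched standalone as follows; readGroup
and the sample data are illustrative assumptions, not Spark code. Given an
iterator sorted by key hash code, it drains every pair sharing the current
minimum hash, so keys that collide on hash are never split across reads.

    import scala.collection.mutable.ArrayBuffer

    // Pull all pairs whose key shares the minimum hash code at the head of a
    // hash-sorted iterator. Reading whole hash-code groups at once ensures
    // pairs that collide on hash are merged together rather than missed.
    def readGroup[K, V](it: BufferedIterator[(K, V)], buf: ArrayBuffer[(K, V)]): Unit = {
      if (it.hasNext) {
        var kv = it.next()
        buf += kv
        val minHash = kv._1.hashCode()
        while (it.hasNext && it.head._1.hashCode() == minHash) {
          kv = it.next()
          buf += kv
        }
      }
    }

    val stream = Iterator(("a", 1), ("b", 2), ("c", 3)).buffered  // already hash-sorted
    val group = new ArrayBuffer[(String, Int)]
    readGroup(stream, group)  // group now holds only the pairs with the lowest key hash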
