spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: [SPARK-1912] Lazily initialize buffers for local shuffle blocks.
Date Fri, 29 Aug 2014 02:00:44 GMT
Repository: spark
Updated Branches:
  refs/heads/master 3c517a812 -> 665e71d14


[SPARK-1912] Lazily initialize buffers for local shuffle blocks.

This is a simplified fix for SPARK-1912.

Author: Reynold Xin <rxin@apache.org>

Closes #2179 from rxin/SPARK-1912 and squashes the following commits:

b2f0e9e [Reynold Xin] Fix unit tests.
a8eddfe [Reynold Xin] [SPARK-1912] Lazily initialize buffers for local shuffle blocks.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/665e71d1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/665e71d1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/665e71d1

Branch: refs/heads/master
Commit: 665e71d14debb8a7fc1547c614867a8c3b1f806a
Parents: 3c517a8
Author: Reynold Xin <rxin@apache.org>
Authored: Thu Aug 28 19:00:40 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Thu Aug 28 19:00:40 2014 -0700

----------------------------------------------------------------------
 .../spark/storage/BlockFetcherIterator.scala    |  5 +---
 .../org/apache/spark/storage/BlockManager.scala | 22 ++-------------
 .../storage/BlockFetcherIteratorSuite.scala     | 28 ++++++++++++--------
 3 files changed, 20 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/665e71d1/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 4ab8ec8..d07e6a1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -196,11 +196,8 @@ object BlockFetcherIterator {
       // any memory that might exceed our maxBytesInFlight
       for (id <- localBlocksToFetch) {
         try {
-          // getLocalFromDisk never return None but throws BlockException
-          val iter = getLocalFromDisk(id, serializer).get
-          // Pass 0 as size since it's not in flight
           readMetrics.localBlocksFetched += 1
-          results.put(new FetchResult(id, 0, () => iter))
+          results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, serializer).get))
           logDebug("Got local block " + id)
         } catch {
           case e: Exception => {

http://git-wip-us.apache.org/repos/asf/spark/blob/665e71d1/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1eb622c..cfe5b6c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1039,26 +1039,8 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-
-    def getIterator: Iterator[Any] = {
-      val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-      serializer.newInstance().deserializeStream(stream).asIterator
-    }
-
-    if (blockId.isShuffle) {
-      /* Reducer may need to read many local shuffle blocks and will wrap them into Iterators
-       * at the beginning. The wrapping will cost some memory (compression instance
-       * initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
-       * wrapping lazily to save memory. */
-      class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
-        lazy val proxy = f
-        override def hasNext: Boolean = proxy.hasNext
-        override def next(): Any = proxy.next()
-      }
-      new LazyProxyIterator(getIterator)
-    } else {
-      getIterator
-    }
+    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+    serializer.newInstance().deserializeStream(stream).asIterator
   }
 
   def stop(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/665e71d1/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
index 1591284..fbfcb51 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
@@ -76,20 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
 
     iterator.initialize()
 
-    // 3rd getLocalFromDisk invocation should be failed
-    verify(blockManager, times(3)).getLocalFromDisk(any(), any())
+    // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
+    verify(blockManager, times(0)).getLocalFromDisk(any(), any())
 
     assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
     // the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully
-    assert(iterator.next._2.isDefined, "1st element should be defined but is not actually
defined") 
+    assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually
defined")
+    verify(blockManager, times(1)).getLocalFromDisk(any(), any())
+
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
-    assert(iterator.next._2.isDefined, "2nd element should be defined but is not actually defined")
+    assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined")
+    verify(blockManager, times(2)).getLocalFromDisk(any(), any())
+
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
     // 3rd fetch should be failed
-    assert(!iterator.next._2.isDefined, "3rd element should not be defined but is actually defined")
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
-    // Don't call next() after fetching non-defined element even if thare are rest of elements in the iterator.
-    // Otherwise, BasicBlockFetcherIterator hangs up.
+    intercept[Exception] {
+      iterator.next()
+    }
+    verify(blockManager, times(3)).getLocalFromDisk(any(), any())
   }
 
 
@@ -127,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
 
     iterator.initialize()
 
-    // getLocalFromDis should be invoked for all of 5 blocks
-    verify(blockManager, times(5)).getLocalFromDisk(any(), any())
+    // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
+    verify(blockManager, times(0)).getLocalFromDisk(any(), any())
 
     assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
     assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined")
@@ -139,7 +143,9 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
     assert(iterator.next._2.isDefined, "All elements should be defined but 4th element is not actually defined")
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements")
-    assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is
not actually defined") 
+    assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
+
+    verify(blockManager, times(5)).getLocalFromDisk(any(), any())
   }
 
   test("block fetch from remote fails using BasicBlockFetcherIterator") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message