spark-commits mailing list archives

From: andrewo...@apache.org
Subject: [1/2] spark git commit: [SPARK-12817] Add BlockManager.getOrElseUpdate and remove CacheManager
Date: Wed, 02 Mar 2016 18:26:50 GMT
Repository: spark
Updated Branches:
  refs/heads/master 8f8d8a231 -> d6969ffc0
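
This commit replaces the old CacheManager get-or-compute path with a
BlockManager.getOrElseUpdate method and changes the locking contract of the
put operations: a successful putSingle/putIterator now releases the writer's
block lock itself instead of leaving it held by the caller. That is why the
test suite below drops its *AndReleaseLock wrappers and the streaming block
handlers drop their releaseLock calls. As a rough sketch of the new
get-or-compute semantics (not the actual method body from this commit;
putIteratorOrReturnValues is a hypothetical stand-in for the real put path,
and get is assumed to return Option[BlockResult]):

    def getOrElseUpdate(
        blockId: BlockId,
        level: StorageLevel,
        makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
      get(blockId) match {
        case Some(block) =>
          // Cache hit, locally or on a remote executor: reuse the cached block.
          Left(block)
        case None =>
          // Cache miss: compute the values, then try to cache them. The
          // hypothetical put hands back an iterator over the values it could
          // not store, so the caller can still consume the data.
          putIteratorOrReturnValues(blockId, makeIterator(), level) match {
            case None =>
              // Stored successfully; read our own write. After this commit the
              // put itself released the write lock, so nothing to release here.
              Left(get(blockId).getOrElse(
                throw new SparkException(s"block $blockId missing after successful put")))
            case Some(leftover) =>
              // Too large to cache: stream the computed values to the caller.
              Right(leftover)
          }
      }
    }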


http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e4ab9ee..89b4270 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -172,9 +172,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
@@ -205,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
     store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
     assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
     assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
@@ -218,9 +218,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
-    store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory and memory size
     val memStatus = master.getMemoryStatus.head._2
@@ -265,9 +265,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
     // Putting a1, a2 and a3 in memory.
-    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = false)
 
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
@@ -283,8 +283,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       master.getLocations("nonrddblock") should have size (1)
     }
 
-    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = true)
     store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
     master.getLocations(rdd(0, 0)) should have size 0
@@ -308,10 +308,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // insert broadcast blocks in both the stores
     Seq(driverStore, executorStore).foreach { case s =>
-      s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
-      s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
-      s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
-      s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
     }
 
     // verify whether the blocks exist in both the stores
@@ -366,7 +366,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
 
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -384,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
 
     assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -407,13 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       master.removeExecutor(store.blockManagerId.executorId)
       val t1 = new Thread {
         override def run() {
-          store.putIteratorAndReleaseLock(
+          store.putIterator(
             "a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
         }
       }
       val t2 = new Thread {
         override def run() {
-          store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+          store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
         }
       }
       val t3 = new Thread {
@@ -441,11 +441,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
     val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
     val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
@@ -486,9 +486,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store2 = makeBlockManager(8000, "executor2")
     store3 = makeBlockManager(8000, "executor3")
     val list1 = List(new Array[Byte](4000))
-    store2.putIteratorAndReleaseLock(
+    store2.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store3.putIteratorAndReleaseLock(
+    store3.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
     store2.stop()
@@ -515,15 +515,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingleAndReleaseLock("a1", a1, storageLevel)
-    store.putSingleAndReleaseLock("a2", a2, storageLevel)
-    store.putSingleAndReleaseLock("a3", a3, storageLevel)
+    store.putSingle("a1", a1, storageLevel)
+    store.putSingle("a2", a2, storageLevel)
+    store.putSingle("a3", a3, storageLevel)
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     // At this point a2 was gotten last, so LRU will getSingle rid of a3
-    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    store.putSingle("a1", a1, storageLevel)
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
@@ -534,9 +534,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
     // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
     // from the same RDD
     assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
@@ -550,9 +550,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("in-memory LRU for partitions of multiple RDDs") {
     store = makeBlockManager(12000)
-    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // At this point rdd_1_1 should've replaced rdd_0_1
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
@@ -560,8 +560,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Do a get() on rdd_0_2 so that it is the most recently used item
     assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
     // Put in more partitions from RDD 0; they should replace rdd_1_1
-    store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
     // when we try to add rdd_0_4.
     assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -576,9 +576,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY)
-    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+    store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+    store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
+    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store")
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store")
@@ -607,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingleAndReleaseLock("a1", a1, storageLevel)
-    store.putSingleAndReleaseLock("a2", a2, storageLevel)
-    store.putSingleAndReleaseLock("a3", a3, storageLevel)
+    store.putSingle("a1", a1, storageLevel)
+    store.putSingle("a2", a2, storageLevel)
+    store.putSingle("a3", a3, storageLevel)
     assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
     assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
     assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
@@ -624,15 +624,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
     val a4 = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
     // At this point LRU should not kick in because a3 is only on disk
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
     // Now let's add in a4, which uses both disk and memory; a1 should drop out
-    store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
     assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
@@ -644,11 +644,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
@@ -658,7 +658,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
@@ -674,11 +674,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
     // First store list1 and list2, both in memory, and list3, on disk only
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val listForSizeEstimate = new ArrayBuffer[Any]
     listForSizeEstimate ++= list1.iterator
@@ -697,7 +697,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
     assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
     assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
@@ -722,9 +722,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("overly large block") {
     store = makeBlockManager(5000)
-    store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+    store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
-    store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
     assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
   }
@@ -733,7 +733,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     try {
       conf.set("spark.shuffle.compress", "true")
       store = makeBlockManager(20000, "exec1")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
@@ -742,7 +742,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.shuffle.compress", "false")
       store = makeBlockManager(20000, "exec2")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
         "shuffle_0_0_0 was compressed")
@@ -751,7 +751,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "true")
       store = makeBlockManager(20000, "exec3")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
         "broadcast_0 was not compressed")
@@ -760,7 +760,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "false")
       store = makeBlockManager(20000, "exec4")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
       store.stop()
@@ -768,21 +768,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.rdd.compress", "true")
       store = makeBlockManager(20000, "exec5")
-      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "false")
       store = makeBlockManager(20000, "exec6")
-      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
       store = makeBlockManager(20000, "exec7")
-      store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+      store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
       store.stop()
       store = null
@@ -810,7 +810,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     class UnserializableClass
     val a1 = new UnserializableClass
     intercept[java.io.NotSerializableException] {
-      store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
+      store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
     }
 
     // Make sure get a1 doesn't hang and returns None.
@@ -882,7 +882,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list1)
     val updatedBlocks1 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks1.size === 1)
@@ -891,7 +891,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list2)
     val updatedBlocks2 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     }
     assert(updatedBlocks2.size === 1)
@@ -900,7 +900,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list1 is kicked out of memory while list3 is added
     val updatedBlocks3 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks3.size === 2)
@@ -915,7 +915,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
     val updatedBlocks4 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks4.size === 2)
@@ -931,7 +931,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // No updated blocks - list5 is too big to fit in store and nothing is kicked out
     val updatedBlocks5 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks5.size === 0)
@@ -956,11 +956,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](2000))
 
     // Tell master. By LRU, only list2 and list3 remains.
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
@@ -975,11 +975,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
 
     // This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // getLocations should return nothing because the master is not informed
@@ -1001,11 +1001,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](100))
 
     // insert some blocks
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
@@ -1015,11 +1015,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       === 1)
 
     // insert some more blocks
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
@@ -1030,7 +1030,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
     blockIds.foreach { blockId =>
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
@@ -1042,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
     store = makeBlockManager(12000)
-    store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
     assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
     // According to the same-RDD rule, rdd_1_0 should be replaced here.
-    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // rdd_1_0 should have been replaced, even it's not least recently used.
     assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
     assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
@@ -1126,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     memoryStore.releasePendingUnrollMemoryForThisTask()
 
     // Unroll with not enough space. This should succeed after kicking out someBlock1.
-    store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
-    store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
     verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
@@ -1138,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
     // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
     // In the mean time, however, we kicked out someBlock2 before giving up.
-    store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
     verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
@@ -1170,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // would not know how to drop them from memory later.
     memoryStore.remove("b1")
     memoryStore.remove("b2")
-    store.putIteratorAndReleaseLock("b1", smallIterator, memOnly)
-    store.putIteratorAndReleaseLock("b2", smallIterator, memOnly)
+    store.putIterator("b1", smallIterator, memOnly)
+    store.putIterator("b2", smallIterator, memOnly)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
@@ -1182,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.contains("b3"))
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     memoryStore.remove("b3")
-    store.putIteratorAndReleaseLock("b3", smallIterator, memOnly)
+    store.putIterator("b3", smallIterator, memOnly)
 
     // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
     val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
@@ -1209,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk)
-    store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk)
+    store.putIterator("b1", smallIterator, memAndDisk)
+    store.putIterator("b2", smallIterator, memAndDisk)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     // Memory store should contain b2 and b3, while disk store should contain only b1
@@ -1223,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(!diskStore.contains("b2"))
     assert(!diskStore.contains("b3"))
     memoryStore.remove("b3")
-    store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll huge block with not enough space. This should fail and drop the new block to disk
@@ -1310,12 +1310,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(12000)
     val arr = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
-    store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a1", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a2", arr, StorageLevel.MEMORY_ONLY_SER)
     assert(store.getSingle("a1").isDefined, "a1 was not in store")
     assert(store.getSingle("a2").isDefined, "a2 was not in store")
     // This put should fail because both a1 and a2 should be read-locked:
-    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
     assert(store.getSingle("a3").isEmpty, "a3 was in store")
     assert(store.getSingle("a1").isDefined, "a1 was not in store")
     assert(store.getSingle("a2").isDefined, "a2 was not in store")
@@ -1324,7 +1324,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.releaseLock("a2")
     // Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
     // block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
-    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
     assert(store.getSingle("a2").isEmpty, "a2 was in store")
     assert(store.getSingle("a1").isDefined, "a1 was not in store")
     assert(store.getSingle("a3").isDefined, "a3 was not in store")
@@ -1335,41 +1335,6 @@ private object BlockManagerSuite {
 
   private implicit class BlockManagerTestUtils(store: BlockManager) {
 
-    def putSingleAndReleaseLock(
-        block: BlockId,
-        value: Any,
-        storageLevel: StorageLevel,
-        tellMaster: Boolean): Unit = {
-      if (store.putSingle(block, value, storageLevel, tellMaster)) {
-        store.releaseLock(block)
-      }
-    }
-
-    def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = {
-      if (store.putSingle(block, value, storageLevel)) {
-        store.releaseLock(block)
-      }
-    }
-
-    def putIteratorAndReleaseLock(
-        blockId: BlockId,
-        values: Iterator[Any],
-        level: StorageLevel): Unit = {
-      if (store.putIterator(blockId, values, level)) {
-        store.releaseLock(blockId)
-      }
-    }
-
-    def putIteratorAndReleaseLock(
-        blockId: BlockId,
-        values: Iterator[Any],
-        level: StorageLevel,
-        tellMaster: Boolean): Unit = {
-      if (store.putIterator(blockId, values, level, tellMaster)) {
-        store.releaseLock(blockId)
-      }
-    }
-
     def dropFromMemoryIfExists(
         blockId: BlockId,
         data: () => Either[Array[Any], ByteBuffer]): Unit = {
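
The BlockManagerTestUtils wrappers removed just above explain all of the
mechanical churn in this file: before this commit a successful put left the
calling thread holding the block's write lock, so every test put had to
release it explicitly. A minimal before/after contrast ("a1" and value are
placeholders):

    // Before SPARK-12817: a successful put left the write lock held, so the
    // test helper released it.
    if (store.putSingle("a1", value, StorageLevel.MEMORY_ONLY)) {
      store.releaseLock("a1")
    }

    // After: putSingle releases the write lock itself on success, so tests
    // simply call it directly.
    store.putSingle("a1", value, StorageLevel.MEMORY_ONLY)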

http://git-wip-us.apache.org/repos/asf/spark/blob/d6969ffc/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 3d9c085..e22e320 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -91,8 +91,6 @@ private[streaming] class BlockManagerBasedBlockHandler(
     if (!putSucceeded) {
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
-    } else {
-      blockManager.releaseLock(blockId)
     }
     BlockManagerBasedStoreResult(blockId, numRecords)
   }
@@ -191,8 +189,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
       if (!putSucceeded) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
-      } else {
-        blockManager.releaseLock(blockId)
       }
     }
 


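The streaming change is the same contract applied at a call site: in both
block handlers only the failure branch survives, because a successful put no
longer leaves the handler holding the block's write lock. The resulting flow
(a sketch assembled from the context lines above; putSucceeded is computed by
handler-specific store calls outside these hunks):

    if (!putSucceeded) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
    // No else branch anymore: there is no lock left to release after a
    // successful put.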

