spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propagation issue in BlockManager
Date Tue, 18 Feb 2020 12:43:17 GMT
Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication
failure propagation issue in BlockManager
URL: https://github.com/apache/spark/pull/27539#discussion_r380647954
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
 ##########
 @@ -255,6 +257,43 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     }
   }
 
+  Seq(false, true).foreach { stream =>
+    test(s"test block replication failures when block is received " +
+      s"by remote block manager but putBlock fails (stream = $stream)") {
+      // Retry replication logic for 1 failure
+      conf.set(STORAGE_MAX_REPLICATION_FAILURE, 1)
+      // Custom block replication policy which prioritizes BlockManagers as per hostnames
+      conf.set(STORAGE_REPLICATION_POLICY, classOf[SortOnHostNameBlockReplicationPolicy].getName)
+      // To use upload block stream flow, set maxRemoteBlockSizeFetchToMem to 0
+      val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
+      conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
+
+      // Create 2 normal block manager
+      val store1 = makeBlockManager(10000, "host-1")
+      val store3 = makeBlockManager(10000, "host-3")
+
+      // create 1 faulty block manager by injecting faulty memory manager
+      val memManager = UnifiedMemoryManager(conf, numCores = 1)
+      val mockedMemoryManager = spy(memManager)
+      doAnswer(_ => false).when(mockedMemoryManager).acquireStorageMemory(any(), any(),
any())
+      val store2 = makeBlockManager(10000, "host-2", Some(mockedMemoryManager))
+
+      assert(master.getPeers(store1.blockManagerId).toSet ===
+        Set(store2.blockManagerId, store3.blockManagerId))
+
+      val blockId = "blockId"
+      val message = new Array[Byte](1000)
+
+      // Replication will be tried by store1 in this order: store2, store3
+      // store2 is faulty block manager, so it won't be able to put block
+      // Then store1 will try to replicate block on store3
+      store1.putSingle(blockId, message, StorageLevel.MEMORY_ONLY_SER_2)
+
+      val blockLocations = master.getLocations(blockId).toSet
+      assert(blockLocations === Set(store1.blockManagerId, store3.blockManagerId))
 
 Review comment:
  I see. Makes sense.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message