spark-dev mailing list archives

From Qiuzhuang Lian <qiuzhuang.l...@gmail.com>
Subject Fwd: question on removeRdd method in BlockManagerMasterActor.scala
Date Thu, 27 Feb 2014 09:32:01 GMT
Sorry, I should have sent this to the new Spark dev address instead.


Hi,

I have a question about the removeRdd method in BlockManagerMasterActor.scala,
specifically about how it asks the slave actors to remove an RDD.

In this piece of code,

    Future.sequence(blockManagerInfo.values.map { bm =>
      bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
    }.toSeq)

it asks every block manager in blockManagerInfo to remove the RDD. Shouldn't we
filter blockManagerInfo so that we only message the BlockManagerInfo entries
that actually contain blocks of that RDD?

I made the following change to see whether this makes sense:

E:\projects\amplab\spark>git diff
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/sc
old mode 100644
new mode 100755
index a999d76..fccc5a9
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -128,9 +128,15 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
     // Find all blocks for the given RDD, remove the block from both blockLocations and
     // the blockManagerInfo that is tracking the blocks.
     val blocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
+    val bmInfos = new mutable.HashSet[BlockManagerMasterActor.BlockManagerInfo]
     blocks.foreach { blockId =>
       val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
-      bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
+      bms.foreach{ bm =>
+        blockManagerInfo.get(bm).foreach{ bmInfo =>
+          bmInfos += bmInfo
+          bmInfo.removeBlock(blockId)
+        }
+      }
       blockLocations.remove(blockId)
     }

@@ -138,7 +144,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
     // The dispatcher is used as an implicit argument into the Future sequence construction.
     import context.dispatcher
     val removeMsg = RemoveRdd(rddId)
-    Future.sequence(blockManagerInfo.values.map { bm =>
+    Future.sequence(bmInfos.map { bm =>
       bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
     }.toSeq)
   }
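
To show the idea in isolation from Spark, here is a minimal, self-contained sketch of the same filtering step. The names ManagerId, RddBlock, and managersHolding are hypothetical stand-ins (not Spark classes), and the plain mutable map only plays the role of blockLocations; the actor messaging itself is left out.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for BlockManagerId and RDDBlockId; not Spark's own types.
final case class ManagerId(host: String)
final case class RddBlock(rddId: Int, split: Int)

object RemoveRddSketch {
  // Collect only the managers that hold at least one block of `rddId`,
  // dropping those blocks from the location map along the way.
  def managersHolding(
      blockLocations: mutable.Map[RddBlock, mutable.Set[ManagerId]],
      rddId: Int): Set[ManagerId] = {
    // Snapshot the matching keys first so the map is not mutated while iterating it.
    val blocks = blockLocations.keys.filter(_.rddId == rddId).toList
    val holders = mutable.Set.empty[ManagerId]
    blocks.foreach { block =>
      holders ++= blockLocations(block)
      blockLocations.remove(block)
    }
    holders.toSet
  }

  def main(args: Array[String]): Unit = {
    val locations = mutable.Map(
      RddBlock(1, 0) -> mutable.Set(ManagerId("a"), ManagerId("b")),
      RddBlock(1, 1) -> mutable.Set(ManagerId("b")),
      RddBlock(2, 0) -> mutable.Set(ManagerId("c")))
    // Only these managers would need to receive the remove message for RDD 1.
    println(managersHolding(locations, 1))
  }
}
```

In this sketch, manager "c" holds no block of RDD 1, so it is never contacted; that is the saving the patch above is after.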


Thanks,
Qiuzhuang
