spark-reviews mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
Date Fri, 15 Nov 2019 19:37:50 GMT
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346979781
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##########
 @@ -420,12 +447,88 @@ final class ShuffleBlockFetcherIterator(
     }
   }
 
+  private[this] def fetchHostLocalBlock(
+      blockId: BlockId,
+      mapIndex: Int,
+      localDirs: Array[String],
+      blockManagerId: BlockManagerId): Boolean = {
+    try {
+      val buf = blockManager.getHostLocalShuffleData(blockId, localDirs)
+      buf.retain()
+      results.put(SuccessFetchResult(blockId, mapIndex, blockManagerId, buf.size(), buf,
+        isNetworkReqDone = false))
+      true
+    } catch {
+      case e: Exception =>
+        // If we see an exception, stop immediately.
+        logError(s"Error occurred while fetching local blocks", e)
+        results.put(FailureFetchResult(blockId, mapIndex, blockManagerId, e))
+        false
+    }
+  }
+
+  /**
+   * Fetch the host-local blocks while we are fetching remote blocks. This is ok because
 +   * `ManagedBuffer`'s memory is allocated lazily when we create the input stream, so all we
 +   * track in-memory are the ManagedBuffer references themselves.
+   */
 +  private[this] def fetchHostLocalBlocks(hostLocalDirManager: HostLocalDirManager): Unit = {
+    val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs()
+    val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) =
+      hostLocalBlocksByExecutor
+        .map { case (hostLocalBmId, bmInfos) =>
+          (hostLocalBmId, bmInfos, cachedDirsByExec.get(hostLocalBmId.executorId))
+        }.partition(_._3.isDefined)
+    val bmId = blockManager.blockManagerId
+    val immutableHostLocalBlocksWithoutDirs =
+      hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) =>
+        hostLocalBmId -> bmInfos
+      }.toMap
+    if (immutableHostLocalBlocksWithoutDirs.nonEmpty) {
 +      logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " +
 +        s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
+      val hostLocalDirsCompletable = new CompletableFuture[util.Map[String, Array[String]]]()
+      hostLocalDirManager.getHostLocalDirs(
+        immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray,
+        hostLocalDirsCompletable)
+      hostLocalDirsCompletable.whenComplete((dirs, throwable) => {
+        if (dirs != null ) {
 +          immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) =>
+            blockInfos.takeWhile { case (blockId, _, mapIndex) =>
+              fetchHostLocalBlock(
+                blockId,
+                mapIndex,
+                dirs.get(hostLocalBmId.executorId),
+                hostLocalBmId)
+            }
+          }
+        } else {
+          logError(s"Error occurred while fetching host local blocks", throwable)
+          val (hostLocalBmId, blockInfoSeq) = immutableHostLocalBlocksWithoutDirs.head
+          val (blockId, _, mapIndex) = blockInfoSeq.head
+          results.put(FailureFetchResult(blockId, mapIndex, hostLocalBmId, throwable))
 
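For context on the pattern the new code above relies on: `getHostLocalDirs` is handed a `CompletableFuture`, and the callback registered via `whenComplete` later receives either the resolved executor-to-dirs map or the failure. Below is a minimal, self-contained sketch of that pattern, not Spark code; the object name, executor id, and directory paths are made up for illustration, and only `CompletableFuture` with its `whenComplete`/`complete` calls are real APIs.

```scala
import java.util.concurrent.CompletableFuture
import java.util.{HashMap => JHashMap, Map => JMap}

object WhenCompleteSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the future that the dir-lookup RPC would complete with
    // executorId -> local dirs; the real RPC plumbing is omitted here.
    val dirsFuture = new CompletableFuture[JMap[String, Array[String]]]()

    // Exactly one of (dirs, throwable) is non-null when the callback fires.
    dirsFuture.whenComplete((dirs, throwable) => {
      if (dirs != null) {
        dirs.forEach((execId, localDirs) =>
          println(s"executor $execId has ${localDirs.length} local dirs"))
      } else {
        println(s"dir lookup failed: $throwable")
      }
    })

    // Simulate the RPC response arriving later.
    val response = new JHashMap[String, Array[String]]()
    response.put("exec-1", Array("/tmp/blockmgr-a", "/tmp/blockmgr-b"))
    dirsFuture.complete(response)
  }
}
```

The callback typically runs on whichever thread completes the future, so anything it touches needs to be safe to use from another thread; a thread-safe queue like the iterator's `results` fits that constraint.
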
 Review comment:
   If I'm following the code correctly, you're requiring that when this feature is on, all reads from the same host are made from disk, right?
   
   Wouldn't it be better to allow remote requests if you detect that you're missing this information, and replace pending remote requests with local ones when this RPC finishes? Not sure how easy that would be, though.
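
   To make that suggestion concrete, here is a hypothetical, heavily simplified sketch of the fallback; none of these names (`Block`, `pendingRemote`, `dirsRpc`, `enqueue`) are Spark APIs, and the real iterator would look quite different. Every block is queued as a remote request up front, and when the local-dirs RPC completes, the not-yet-fetched blocks whose executor dirs are now known are moved over to local disk reads.
   
   ```scala
   import java.util.concurrent.CompletableFuture
   import scala.collection.mutable
   
   object FallbackSketch {
     case class Block(blockId: String, executorId: String)
   
     private val pendingRemote = mutable.Set[Block]()    // would go over the network
     private val pendingLocal  = mutable.Buffer[Block]() // will be read from local disk
   
     def enqueue(
         blocks: Seq[Block],
         dirsRpc: CompletableFuture[Map[String, Array[String]]]): Unit = {
       // 1. Optimistically schedule everything as remote, so fetching never
       //    blocks on the directory lookup.
       pendingRemote.synchronized { pendingRemote ++= blocks }
   
       // 2. When the RPC finishes, re-route whatever has not been fetched yet.
       dirsRpc.whenComplete((dirs, error) => {
         if (dirs != null) {
           pendingRemote.synchronized {
             val nowLocal = pendingRemote.filter(b => dirs.contains(b.executorId))
             pendingRemote --= nowLocal
             pendingLocal ++= nowLocal
           }
         }
         // On failure (error != null), just leave everything queued as remote.
       })
     }
   }
   ```
   
   In the real iterator this would also have to coordinate with requests already in flight, which is presumably the "not sure how easy" part.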

