spark-reviews mailing list archives

From JoshRosen <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...
Date Tue, 21 Oct 2014 01:57:37 GMT
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19126189
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     
         for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
           val pieceId = BroadcastBlockId(id, "piece" + pid)
    -
    -      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
    +      logDebug(s"Reading piece $pieceId of $broadcastId")
    +      // First try getLocalBytes because there is a chance that previous attempts to fetch the
           // broadcast blocks have already fetched some of the blocks. In that case, some blocks
           // would be available locally (on this executor).
    -      var blockOpt = bm.getLocalBytes(pieceId)
    -      if (!blockOpt.isDefined) {
    -        blockOpt = bm.getRemoteBytes(pieceId)
    -        blockOpt match {
    -          case Some(block) =>
    -            // If we found the block from remote executors/driver's BlockManager, put
the block
    -            // in this executor's BlockManager.
    -            SparkEnv.get.blockManager.putBytes(
    -              pieceId,
    -              block,
    -              StorageLevel.MEMORY_AND_DISK_SER,
    -              tellMaster = true)
    -
    -          case None =>
    -            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
    -        }
    +      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
    +        bm.getRemoteBytes(pieceId).map { block =>
    --- End diff --
    
    I pushed a new commit that simplifies this code. I think the problem was the use of nested
`getOrElse` calls. I replaced them with a series of `def`s that show how to get the bytes
locally and remotely, followed by a non-nested `orElse` chain. I think this is a lot cleaner
now, since the core logic is a one-liner:
    
    ```scala
    getLocal.orElse(getRemote).getOrElse(
      throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
    ```
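
    The shape of that refactoring can be sketched in isolation. This is only an
illustration of the `orElse` chain described above: the helper names `getLocal`
and `getRemote` mirror the comment, but their bodies are stand-ins (a simulated
cache miss and a simulated remote hit), not Spark's actual `BlockManager` calls.

    ```scala
    // Sketch of the def-based fetch structure; bodies are stand-ins, not Spark APIs.
    object FetchSketch {
      type Block = Array[Byte]

      val pieceId = "piece0"
      val broadcastId = "broadcast_0"

      // Simulate a local-cache miss so the remote path is exercised.
      def getLocal: Option[Block] = None

      // Simulate a successful remote fetch.
      def getRemote: Option[Block] = Some(Array[Byte](1, 2, 3))

      // The non-nested orElse chain: try locally, then remotely,
      // and fail loudly if neither source has the piece.
      def fetchBlock(): Block =
        getLocal.orElse(getRemote).getOrElse(
          throw new RuntimeException(s"Failed to get $pieceId of $broadcastId"))
    }
    ```

    Because each source is a `def` returning an `Option`, the lookups stay lazy:
`getRemote` is only evaluated when `getLocal` comes back `None`.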



