From: JoshRosen
To: reviews@spark.apache.org
Reply-To: reviews@spark.apache.org
Subject: [GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...
Date: Tue, 21 Oct 2014 01:57:37 +0000 (UTC)

Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19126189

--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---

```diff
@@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-
-      // First try getLocalBytes because there is a chance that previous attempts to fetch the
+      logDebug(s"Reading piece $pieceId of $broadcastId")
+      // First try getLocalBytes because there is a chance that previous attempts to fetch the
       // broadcast blocks have already fetched some of the blocks. In that case, some blocks
       // would be available locally (on this executor).
-      var blockOpt = bm.getLocalBytes(pieceId)
-      if (!blockOpt.isDefined) {
-        blockOpt = bm.getRemoteBytes(pieceId)
-        blockOpt match {
-          case Some(block) =>
-            // If we found the block from remote executors/driver's BlockManager, put the block
-            // in this executor's BlockManager.
-            SparkEnv.get.blockManager.putBytes(
-              pieceId,
-              block,
-              StorageLevel.MEMORY_AND_DISK_SER,
-              tellMaster = true)
-
-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
-        }
+      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
+        bm.getRemoteBytes(pieceId).map { block =>
```

--- End diff --

I pushed a new commit that simplifies this code. I think the problem was the use of nested `getOrElse` calls. I replaced them with a series of `def`s that show how to get the bytes locally and remotely, followed by a non-nested `orElse` chain.
I think this is a lot cleaner now, since the core logic is a one-liner:

```scala
getLocal.orElse(getRemote).getOrElse(
  throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
```
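For readers outside the PR, the "series of `def`s plus a non-nested `orElse` chain" pattern described above can be sketched in a self-contained way. The `Map`-backed stores and the names `fetch`, `localStore`, and `remoteStore` here are illustrative stand-ins for `BlockManager.getLocalBytes` / `getRemoteBytes`, not the actual Spark code; only the control flow mirrors the refactor:

```scala
// Sketch of the "defs + orElse" refactor: each lookup is a def, so it stays
// lazy, and getRemote only runs when getLocal misses. The mutable local map
// stands in for the executor-local BlockManager cache.
object OrElseChainSketch {
  private val localStore =
    scala.collection.mutable.Map("piece0" -> Array[Byte](1, 2))
  private val remoteStore = Map("piece1" -> Array[Byte](3, 4))

  def fetch(pieceId: String): Array[Byte] = {
    def getLocal: Option[Array[Byte]] = localStore.get(pieceId)
    def getRemote: Option[Array[Byte]] = remoteStore.get(pieceId).map { block =>
      // Cache the remotely fetched block locally, as the real code does
      // via BlockManager.putBytes.
      localStore(pieceId) = block
      block
    }
    getLocal.orElse(getRemote).getOrElse(
      throw new RuntimeException(s"Failed to get $pieceId"))
  }
}
```

In this sketch, after the first fetch of `piece1` the block is served from the local store on subsequent calls, which is the behavior the original nested `getOrElse` version was expressing less directly.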