From: sarutak
To: reviews@spark.apache.org
Reply-To: reviews@spark.apache.org
Subject: [GitHub] spark pull request: [SPARK-2670] FetchFailedException should be th...
Message-Id: <20140731055318.284E79BB0DB@tyr.zones.apache.org>
Date: Thu, 31 Jul 2014 05:53:18 +0000 (UTC)

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1578#discussion_r15627125

    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -199,15 +199,22 @@ object BlockFetcherIterator {
             // Get the local blocks while remote blocks are being fetched. Note that it's okay to do
             // these all at once because they will just memory-map some files, so they won't consume
             // any memory that might exceed our maxBytesInFlight
    -        for (id <- localBlocksToFetch) {
    -          getLocalFromDisk(id, serializer) match {
    -            case Some(iter) => {
    -              // Pass 0 as size since it's not in flight
    -              results.put(new FetchResult(id, 0, () => iter))
    -              logDebug("Got local block " + id)
    -            }
    -            case None => {
    -              throw new BlockException(id, "Could not get block " + id + " from local machine")
    +        var fetchIndex = 0
    +        try {
    +          for (id <- localBlocksToFetch) {
    +
    +            // getLocalFromDisk never return None but throws BlockException
    +            val iter = getLocalFromDisk(id, serializer).get
    +            // Pass 0 as size since it's not in flight
    +            results.put(new FetchResult(id, 0, () => iter))
    +            fetchIndex += 1
    +            logDebug("Got local block " + id)
    +          }
    +        } catch {
    +          case e: Exception => {
    +            logError(s"Error occurred while fetching local blocks", e)
    +            for (id <- localBlocksToFetch.drop(fetchIndex)) {
    +              results.put(new FetchResult(id, -1, null))
    --- End diff --

    Thank you for your comment, @mateiz .

    > I wouldn't do drop and such on a ConcurrentQueue, since it might drop stuff other threads were adding. Just do a results.put on the failed block and don't worry about dropping other ones. You can actually move the try/catch into the for loop and add a "return" at the bottom of the catch after adding this failing FetchResult.

    But if it returns from getLocalBlocks immediately, the rest of the FetchResults are never put into results, and then we wait forever on results.take() in the next method, right? results is an instance of LinkedBlockingQueue, and take is a blocking method.

---

If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA.

---
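The deadlock concern above can be illustrated with a minimal, self-contained Java sketch (not Spark code; the class name QueueDemo and the block IDs are made up for illustration). If the producer side bails out early and enqueues fewer results than consumers expect, a subsequent take() blocks forever; using poll with a timeout in the sketch makes the missing result visible without hanging:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueDemo {
    // Simulates a consumer expecting two results when only one was enqueued.
    // Returns "first,second" where second is null if nothing else ever arrives.
    static String demo() throws InterruptedException {
        LinkedBlockingQueue<String> results = new LinkedBlockingQueue<>();

        // Suppose two local blocks were expected, but the fetch loop
        // returned early after enqueueing only one result.
        results.put("block-0");

        String first = results.take();  // returns immediately: "block-0"

        // A plain results.take() here would block forever, since no second
        // result is ever put. poll(timeout) demonstrates this safely by
        // giving up after 100 ms and returning null.
        String second = results.poll(100, TimeUnit.MILLISECONDS);

        return first + "," + second;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());  // prints "block-0,null"
    }
}
```

This is why the comment argues that a sentinel FetchResult must be enqueued for every block a consumer will wait on, not just the one that failed.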