spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
Subject spark git commit: [SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job (branch-1.6 backport)
Date Thu, 22 Sep 2016 18:05:50 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 ce0a222f5 -> 94524cef4

[SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job (branch-1.6

This patch is a branch-1.6 backport of #15037:

## What changes were proposed in this pull request?

In Spark's `RDD.getOrCompute` we first try to read a local copy of a cached RDD block, then
a remote copy, and only fall back to recomputing the block if no cached copy (local or remote)
can be read. This logic works correctly in the case where no remote copies of the block exist,
but if there _are_ remote copies and reads of those copies fail (due to network issues or
internal Spark bugs) then the BlockManager will throw a `BlockFetchException` that will fail
the task (and which could possibly fail the whole job if the read failures keep occurring).

In the cases of TorrentBroadcast and task result fetching we really do want to fail the entire
job in case no remote blocks can be fetched, but this logic is inappropriate for reads of
cached RDD blocks because those can/should be recomputed in case cached blocks are unavailable.

Therefore, I think that the `BlockManager.getRemoteBytes()` method should never throw on remote
fetch errors and, instead, should handle failures by returning `None`.

## How was this patch tested?

Block manager changes should be covered by modified tests in `BlockManagerSuite`: the old
tests expected exceptions to be thrown on failed remote reads, while the modified tests now
expect `None` to be returned from the `getRemote*` method.

I also manually inspected all usages of `BlockManager.getRemoteValues()`, `getRemoteBytes()`,
and `get()` to verify that they correctly pattern-match on the result and handle `None`. Note
that these `None` branches are already exercised because the old `getRemoteBytes` returned
`None` when no remote locations for the block could be found (which could occur if an executor
died and its block manager de-registered with the master).

Author: Josh Rosen <>

Closes #15186 from JoshRosen/SPARK-17485-branch-1.6-backport.


Branch: refs/heads/branch-1.6
Commit: 94524cef4cf367a0e73ebe0e919cc21f25f1043f
Parents: ce0a222
Author: Josh Rosen <>
Authored: Thu Sep 22 11:05:35 2016 -0700
Committer: Josh Rosen <>
Committed: Thu Sep 22 11:05:35 2016 -0700

 .../spark/storage/BlockFetchException.scala     | 24 --------------------
 .../org/apache/spark/storage/BlockManager.scala |  3 ++-
 .../spark/storage/BlockManagerSuite.scala       |  7 +++---
 3 files changed, 5 insertions(+), 29 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
deleted file mode 100644
index f6e46ae..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.spark.SparkException
-case class BlockFetchException(messages: String, throwable: Throwable)
-  extends SparkException(messages, throwable)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 339ee144..1fc6f39 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -602,8 +602,9 @@ private[spark] class BlockManager(
           numFetchFailures += 1
           if (numFetchFailures == locations.size) {
             // An exception is thrown while fetching this block from all locations
-            throw new BlockFetchException(s"Failed to fetch block from" +
+            logWarning(s"Failed to fetch block from" +
               s" ${locations.size} locations. Most recent failure cause:", e)
+            return None
           } else {
             // This location failed, so we retry fetch from a different one by returning
null here
             logWarning(s"Failed to fetch remote block $blockId " +
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 47e8545..fc76b91 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -509,10 +509,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       assert(list1Get.isDefined, "list1Get expected to be fetched")
       store3 = null
-      // exception throw because there is no locations
-      intercept[BlockFetchException] {
-        list1Get = store.getRemoteBytes("list1")
-      }
+      // Fetch should fail because there are no locations, but no exception should be thrown
+      list1Get = store.getRemoteBytes("list1")
+      assert(list1Get.isEmpty, "list1Get expected to fail")
     } finally {
       origTimeoutOpt match {
         case Some(t) => conf.set("", t)

To unsubscribe, e-mail:
For additional commands, e-mail:

View raw message