spark-commits mailing list archives

From joshro...@apache.org
Subject spark git commit: [SPARK-17484] Prevent invalid block locations from being reported after put() exceptions
Date Thu, 15 Sep 2016 18:54:52 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0169c2edc -> 9c23f4408


[SPARK-17484] Prevent invalid block locations from being reported after put() exceptions

## What changes were proposed in this pull request?

If a BlockManager `put()` call failed after the BlockManagerMaster was notified of a block's
availability, then incomplete cleanup logic in a `finally` block would never send a second
block status update to inform the master of the block's unavailability. This, in turn, leads
to fetch failures and, before #15037 was fixed, could cause complete job failures.
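
As a rough illustration, here is a minimal, self-contained sketch of that failure mode. The names (`masterLocations`, `localBlocks`, `buggyPut`) are toy stand-ins for this sketch, not Spark's actual classes or APIs: the master is told about the block, the put then throws, and the cleanup path only removes local state, so the stale location is never retracted.

```scala
import scala.collection.mutable

// Toy sketch of the failure mode: `masterLocations` stands in for the
// BlockManagerMaster's view of where blocks live, `localBlocks` for the
// executor's local registry. Illustrative names only, not Spark APIs.
object StalePutSketch {
  val masterLocations = mutable.Set.empty[String]
  val localBlocks = mutable.Set.empty[String]

  def buggyPut(blockId: String): Unit = {
    try {
      localBlocks += blockId
      masterLocations += blockId // master is told the block is available...
      throw new RuntimeException("put failed after reporting the block")
    } finally {
      // ...but the cleanup only removes local state and never retracts
      // the location that was already reported to the master.
      localBlocks -= blockId
    }
  }

  def main(args: Array[String]): Unit = {
    try buggyPut("rdd_0_0") catch { case _: RuntimeException => () }
    println(masterLocations.contains("rdd_0_0")) // true: stale location remains
    println(localBlocks.contains("rdd_0_0"))     // false: block is not actually there
  }
}
```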

This patch addresses this issue via multiple small changes:

- The `finally` block now calls `removeBlockInternal` when cleaning up from a failed `put()`.
In addition to removing the `BlockInfo` entry (which was _all_ that the old cleanup logic
did), this code redundantly tries to remove the block from the memory and disk stores (as
an added layer of defense against bugs lower down in the stack) and optionally notifies the
master of the block's removal (which now happens during exception-triggered cleanup).
- When a BlockManager receives a request for a block that it does not have, it will now notify
the master to update its block locations. This ensures that bad metadata pointing to non-existent
blocks will eventually be fixed. Note that I could have implemented this logic in the block
manager client (rather than in the remote server), but that would introduce the problem of
distinguishing between transient and permanent failures; on the server, however, we know definitively
that the block isn't present.
- Catch `NonFatal` instead of `Exception` to avoid swallowing `InterruptedException`s thrown
from synchronous block replication calls (see the sketch after this list).
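
To illustrate that last bullet, here is a hedged sketch (with a made-up helper name, `doUpload`, standing in for a synchronous replication call; this is not Spark's replication code): catching `Exception` would also match `InterruptedException` and swallow the interrupt, whereas `scala.util.control.NonFatal` deliberately excludes it, so interrupts propagate to the caller while ordinary failures are still handled.

```scala
import scala.util.control.NonFatal

object ReplicationCatchSketch {
  // Sketch only: `doUpload` stands in for a synchronous replication call.
  // `case e: Exception` would also match InterruptedException and swallow the
  // interrupt; NonFatal excludes InterruptedException (and fatal errors), so
  // an interrupt propagates to the caller while ordinary failures are handled.
  def replicateOnce(blockId: String, doUpload: () => Unit): Boolean = {
    try {
      doUpload()
      true
    } catch {
      case NonFatal(e) =>
        println(s"Failed to replicate $blockId: $e") // logged and retried in the real code
        false
    }
  }
}
```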

This patch depends upon the refactorings in #15036, so that other patch will also have to
be backported when backporting this fix.

For more background on this issue, including example logs from a real production failure,
see [SPARK-17484](https://issues.apache.org/jira/browse/SPARK-17484).

## How was this patch tested?

Two new regression tests in BlockManagerSuite.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15085 from JoshRosen/SPARK-17484.

(cherry picked from commit 1202075c95eabba0ffebc170077df798f271a139)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c23f440
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c23f440
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c23f440

Branch: refs/heads/branch-2.0
Commit: 9c23f4408d337f4af31ebfbcc78767df67d36aed
Parents: 0169c2e
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Thu Sep 15 11:54:17 2016 -0700
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Thu Sep 15 11:54:39 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 37 +++++++++++++++-----
 .../spark/storage/BlockManagerSuite.scala       | 34 ++++++++++++++++++
 2 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9c23f440/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 48db97a..37dfbd6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -279,7 +279,12 @@ private[spark] class BlockManager(
     } else {
       getLocalBytes(blockId) match {
        case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, blockId, buffer)
-        case None => throw new BlockNotFoundException(blockId.toString)
+        case None =>
+          // If this block manager receives a request for a block that it doesn't have then it's
+          // likely that the master has outdated block statuses for this block. Therefore, we send
+          // an RPC so that this block is marked as being unavailable from this block manager.
+          reportBlockStatus(blockId, BlockStatus.empty)
+          throw new BlockNotFoundException(blockId.toString)
       }
     }
   }
@@ -856,22 +861,38 @@ private[spark] class BlockManager(
     }
 
     val startTimeMs = System.currentTimeMillis
-    var blockWasSuccessfullyStored: Boolean = false
+    var exceptionWasThrown: Boolean = true
     val result: Option[T] = try {
       val res = putBody(putBlockInfo)
-      blockWasSuccessfullyStored = res.isEmpty
-      res
-    } finally {
-      if (blockWasSuccessfullyStored) {
+      exceptionWasThrown = false
+      if (res.isEmpty) {
+        // the block was successfully stored
         if (keepReadLock) {
           blockInfoManager.downgradeLock(blockId)
         } else {
           blockInfoManager.unlock(blockId)
         }
       } else {
-        blockInfoManager.removeBlock(blockId)
+        removeBlockInternal(blockId, tellMaster = false)
         logWarning(s"Putting block $blockId failed")
       }
+      res
+    } finally {
+      // This cleanup is performed in a finally block rather than a `catch` to avoid having to
+      // catch and properly re-throw InterruptedException.
+      if (exceptionWasThrown) {
+        logWarning(s"Putting block $blockId failed due to an exception")
+        // If an exception was thrown then it's possible that the code in `putBody` has already
+        // notified the master about the availability of this block, so we need to send an update
+        // to remove this block location.
+        removeBlockInternal(blockId, tellMaster = tellMaster)
+        // The `putBody` code may have also added a new block status to TaskMetrics, so we need
+        // to cancel that out by overwriting it with an empty block status. We only do this if
+        // the finally block was entered via an exception because doing this unconditionally would
+        // cause us to send empty block statuses for every block that failed to be cached due to
+        // a memory shortage (which is an expected failure, unlike an uncaught exception).
+        addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
+      }
     }
     if (level.replication > 1) {
       logDebug("Putting block %s with replication took %s"
@@ -1170,7 +1191,7 @@ private[spark] class BlockManager(
               done = true  // specified number of peers have been replicated to
             }
           } catch {
-            case e: Exception =>
+            case NonFatal(e) =>
               logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
               failures += 1
               replicationFailed = true

http://git-wip-us.apache.org/repos/asf/spark/blob/9c23f440/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 2f594b8..6194d23 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -859,6 +859,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, 0)
     memoryManager.setMemoryStore(store.memoryStore)
+    store.initialize("app-id")
 
     // The put should fail since a1 is not serializable.
     class UnserializableClass
@@ -1204,6 +1205,39 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     verify(mockBlockManagerMaster, times(2)).getLocations("item")
   }
 
+  test("SPARK-17484: block status is properly updated following an exception in put()") {
+    val mockBlockTransferService = new MockBlockTransferService(maxFailures = 10) {
+      override def uploadBlock(
+          hostname: String,
+          port: Int, execId: String,
+          blockId: BlockId,
+          blockData: ManagedBuffer,
+          level: StorageLevel,
+          classTag: ClassTag[_]): Future[Unit] = {
+        throw new InterruptedException("Intentional interrupt")
+      }
+    }
+    store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
+    store2 = makeBlockManager(8000, "executor2", transferService = Option(mockBlockTransferService))
+    intercept[InterruptedException] {
+      store.putSingle("item", "value", StorageLevel.MEMORY_ONLY_2, tellMaster = true)
+    }
+    assert(store.getLocalBytes("item").isEmpty)
+    assert(master.getLocations("item").isEmpty)
+    assert(store2.getRemoteBytes("item").isEmpty)
+  }
+
+  test("SPARK-17484: master block locations are updated following an invalid remote block
fetch") {
+    store = makeBlockManager(8000, "executor1")
+    store2 = makeBlockManager(8000, "executor2")
+    store.putSingle("item", "value", StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(master.getLocations("item").nonEmpty)
+    store.removeBlock("item", tellMaster = false)
+    assert(master.getLocations("item").nonEmpty)
+    assert(store2.getRemoteBytes("item").isEmpty)
+    assert(master.getLocations("item").isEmpty)
+  }
+
   class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
     var numCalls = 0
 

