spark-commits mailing list archives

From js...@apache.org
Subject spark git commit: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch
Date Tue, 29 May 2018 12:11:19 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 fec43fe1b -> 49a6c2b91


[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

When blocks are allocated to a batch and the WAL write fails, the blocks are nevertheless
removed from the received block queue. This causes data loss, because the next
allocation will not find those blocks in the queue.

In this PR, blocks are removed from the received queue only if the WAL write succeeded.

Added a unit test covering the WAL write failure path.
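
To make the failure mode concrete, a minimal sketch of the before/after queue
handling (a plain mutable.Queue[Int] stands in for a received block queue, and a
boolean stands in for the writeToLog result):

    import scala.collection.mutable

    val queue = mutable.Queue(1, 2, 3)       // stand-in for a received block queue

    // Before the fix: dequeueAll drains the queue *before* the WAL write is
    // attempted, so a failed write leaves the blocks neither in the queue nor
    // in any allocated batch.
    // val snapshot = queue.dequeueAll(x => true)  // queue is empty from here on

    // After the fix: snapshot the queue first, clear it only on success.
    val snapshot = queue.clone()
    val walWriteSucceeded = false            // stand-in for writeToLog(...)
    if (walWriteSucceeded) {
      queue.clear()                          // safe: the allocation is now durable
    }
    assert(queue.nonEmpty)                   // on failure the blocks stay queued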

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #21430 from gaborgsomogyi/SPARK-23991.

Change-Id: I5ead84f0233f0c95e6d9f2854ac2ff6906f6b341
(cherry picked from commit aca65c63cb12073eb193fe08998994c60acb8b58)
Signed-off-by: jerryshao <sshao@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49a6c2b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49a6c2b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49a6c2b9

Branch: refs/heads/branch-2.3
Commit: 49a6c2b915f37682781efba708a103e709c54cf7
Parents: fec43fe
Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Authored: Tue May 29 20:10:59 2018 +0800
Committer: jerryshao <sshao@hortonworks.com>
Committed: Tue May 29 20:11:14 2018 +0800

----------------------------------------------------------------------
 .../scheduler/ReceivedBlockTracker.scala        |  3 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   | 47 +++++++++++++++++++-
 2 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/49a6c2b9/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index dacff69..cf43245 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -112,10 +112,11 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
       val streamIdToBlocks = streamIds.map { streamId =>
-          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+        (streamId, getReceivedBlockQueue(streamId).clone())
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+        streamIds.foreach(getReceivedBlockQueue(_).clear())
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {
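
For readability, the allocation body as it reads with this hunk applied (assembled
from the diff above; surrounding members such as streamIds, getReceivedBlockQueue,
writeToLog and timeToAllocatedBlocks are taken from context, and the else branch
body is elided because the hunk does not show it):

    def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
      if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
        // Snapshot each stream's queue instead of draining it up front.
        val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).clone())
        }.toMap
        val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
        if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
          // Drop blocks from the received queues only once the WAL write succeeded.
          streamIds.foreach(getReceivedBlockQueue(_).clear())
          timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
          lastAllocatedBatchTime = batchTime
        } else {
          // WAL write failed: the queues are left untouched, so the next
          // allocation can pick these blocks up again (failure handling elided).
        }
      }
    }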

http://git-wip-us.apache.org/repos/asf/spark/blob/49a6c2b9/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 4fa236b..fd7e00b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -26,10 +26,12 @@ import scala.language.{implicitConversions, postfixOps}
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doThrow, reset, spy}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
@@ -115,6 +117,47 @@ class ReceivedBlockTrackerSuite
     tracker2.stop()
   }
 
+  test("block allocation to batch should not loose blocks from received queue") {
+    val tracker1 = spy(createTracker())
+    tracker1.isWriteAheadLogEnabled should be (true)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+    // Add blocks
+    val blockInfos = generateBlockInfos()
+    blockInfos.map(tracker1.addBlock)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+    // Try to allocate the blocks to a batch and verify that it fails.
+    // The blocks should stay in the received queue when the WAL write fails.
+    doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+      .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+    val errMsg = intercept[RuntimeException] {
+      tracker1.allocateBlocksToBatch(1)
+    }
+    assert(errMsg.getMessage === "Not able to write BatchAllocationEvent")
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+    tracker1.getBlocksOfBatch(1) shouldEqual Map.empty
+    tracker1.getBlocksOfBatchAndStream(1, streamId) shouldEqual Seq.empty
+
+    // Allocate the blocks to a batch and verify that all of them have been allocated
+    reset(tracker1)
+    tracker1.allocateBlocksToBatch(2)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    tracker1.hasUnallocatedReceivedBlocks should be (false)
+    tracker1.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
+    tracker1.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos
+
+    tracker1.stop()
+
+    // Recover from WAL to see the correctness
+    val tracker2 = createTracker(recoverFromWriteAheadLog = true)
+    tracker2.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    tracker2.hasUnallocatedReceivedBlocks should be (false)
+    tracker2.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
+    tracker2.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos
+    tracker2.stop()
+  }
+
   test("recovery and cleanup with write ahead logs") {
     val manualClock = new ManualClock
     // Set the time increment level to twice the rotation interval so that every increment creates
@@ -312,7 +355,7 @@ class ReceivedBlockTrackerSuite
       recoverFromWriteAheadLog: Boolean = false,
       clock: Clock = new SystemClock): ReceivedBlockTracker = {
     val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
-    val tracker = new ReceivedBlockTracker(
+    var tracker = new ReceivedBlockTracker(
       conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
     allReceivedBlockTrackers += tracker
     tracker
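
The failure injection in the new test uses Mockito's spy/doThrow pattern. A
standalone sketch of that pattern (with a hypothetical Tracker class, not Spark's
ReceivedBlockTracker) looks like this:

    import org.mockito.Matchers.any
    import org.mockito.Mockito.{doThrow, reset, spy}

    // Hypothetical stand-in for ReceivedBlockTracker.
    class Tracker {
      def writeToLog(event: String): Boolean = true
      def allocate(event: String): Unit = {
        if (writeToLog(event)) println(s"allocated $event")
      }
    }

    object SpyExample extends App {
      val tracker = spy(new Tracker)

      // Stub only writeToLog; every other call on the spy keeps its real behavior.
      doThrow(new RuntimeException("Not able to write"))
        .when(tracker).writeToLog(any(classOf[String]))

      try tracker.allocate("batch-1")      // throws from the stubbed writeToLog
      catch { case e: RuntimeException => println(e.getMessage) }

      reset(tracker)                       // drop the stubbing, restore real behavior
      tracker.allocate("batch-2")          // prints "allocated batch-2"
    }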

