spark-commits mailing list archives

From t...@apache.org
Subject spark git commit: [SPARK-10072] [STREAMING] BlockGenerator can deadlock when the queue of generated blocks fills up to capacity
Date Wed, 19 Aug 2015 02:26:57 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 0a1385e31 -> 08c5962a2


[SPARK-10072] [STREAMING] BlockGenerator can deadlock when the queue of generated blocks fills up to capacity

Generated blocks are inserted into an ArrayBlockingQueue, and another thread pulls blocks from
the ArrayBlockingQueue and pushes them into the BlockManager. If that queue fills up to capacity
(the default is 10 blocks), then the insertion into the queue (done in the function
updateCurrentBuffer) blocks inside a synchronized block. However, the thread that pulls blocks
from the queue uses the same lock to check the current state (active or stopped) while pulling
from the queue. Since the block-generating thread blocks on the full queue while holding the
lock, the thread that is supposed to drain the queue can never acquire it. Ergo, deadlock.

Solution: Moved the blocking call to ArrayBlockingQueue.put() outside the synchronized block to prevent the deadlock.
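The fixed locking pattern can be sketched in plain Java (same JVM ArrayBlockingQueue API as the Scala source). This is a hypothetical, simplified reproduction, not the Spark code: the class and method names (Main, rollBuffer, addRecord) are made up for illustration. The key point it shows is that the buffer swap happens under the lock, while the potentially blocking put() happens after the lock is released, so the consumer can always acquire the lock to check state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Main {
    private final Object lock = new Object();
    // Small capacity so put() would actually block under load.
    private final ArrayBlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(2);
    private List<Integer> buffer = new ArrayList<>();
    private volatile boolean active = true;

    void addRecord(int r) {
        synchronized (lock) { buffer.add(r); }
    }

    // Analogous to the fixed updateCurrentBuffer: swap the buffer while
    // holding the lock, but call the blocking put() after releasing it.
    void rollBuffer() throws InterruptedException {
        List<Integer> block = null;
        synchronized (lock) {
            if (!buffer.isEmpty()) {
                block = buffer;
                buffer = new ArrayList<>();
            }
        }
        if (block != null) {
            queue.put(block); // may block when the queue is full; lock is NOT held here
        }
    }

    boolean isActive() {
        synchronized (lock) { return active; } // consumer takes the same lock, as in the bug report
    }

    public static void main(String[] args) throws Exception {
        Main demo = new Main();
        Thread consumer = new Thread(() -> {
            int drained = 0;
            try {
                // Poll with a timeout so the state check is re-evaluated periodically.
                while (demo.isActive() || !demo.queue.isEmpty()) {
                    List<Integer> b = demo.queue.poll(10, TimeUnit.MILLISECONDS);
                    if (b != null) drained += b.size();
                }
            } catch (InterruptedException ignored) { }
            System.out.println("drained=" + drained);
        });
        consumer.start();
        for (int i = 0; i < 100; i++) {
            demo.addRecord(i);
            demo.rollBuffer(); // never deadlocks: put() is outside the synchronized block
        }
        demo.active = false;
        consumer.join();
    }
}
```

With the pre-fix arrangement, the put() would sit inside the synchronized block; once the queue filled, the producer would block while holding the lock, and the consumer's isActive() check would wait on that lock forever.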

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8257 from tdas/SPARK-10072.

(cherry picked from commit 1aeae05bb20f01ab7ccaa62fe905a63e020074b5)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08c5962a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08c5962a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08c5962a

Branch: refs/heads/branch-1.5
Commit: 08c5962a251555e7d34460135ab6c32cce584704
Parents: 0a1385e
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Tue Aug 18 19:26:38 2015 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Aug 18 19:26:51 2015 -0700

----------------------------------------------------------------------
 .../streaming/receiver/BlockGenerator.scala     | 29 +++++++++++++-------
 1 file changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/08c5962a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 300e820..421d60a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -227,16 +227,21 @@ private[streaming] class BlockGenerator(
   def isStopped(): Boolean = state == StoppedAll
 
   /** Change the buffer to which single records are added to. */
-  private def updateCurrentBuffer(time: Long): Unit = synchronized {
+  private def updateCurrentBuffer(time: Long): Unit = {
     try {
-      val newBlockBuffer = currentBuffer
-      currentBuffer = new ArrayBuffer[Any]
-      if (newBlockBuffer.size > 0) {
-        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
-        val newBlock = new Block(blockId, newBlockBuffer)
-        listener.onGenerateBlock(blockId)
+      var newBlock: Block = null
+      synchronized {
+        if (currentBuffer.nonEmpty) {
+          val newBlockBuffer = currentBuffer
+          currentBuffer = new ArrayBuffer[Any]
+          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
+          listener.onGenerateBlock(blockId)
+          newBlock = new Block(blockId, newBlockBuffer)
+        }
+      }
+
+      if (newBlock != null) {
         blocksForPushing.put(newBlock)  // put is blocking when queue is full
-        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
       }
     } catch {
       case ie: InterruptedException =>
@@ -250,9 +255,13 @@ private[streaming] class BlockGenerator(
   private def keepPushingBlocks() {
     logInfo("Started block pushing thread")
 
-    def isGeneratingBlocks = synchronized { state == Active || state == StoppedAddingData }
+    def areBlocksBeingGenerated: Boolean = synchronized {
+      state != StoppedGeneratingBlocks
+    }
+
     try {
-      while (isGeneratingBlocks) {
+      // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
+      while (areBlocksBeingGenerated) {
         Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
           case Some(block) => pushBlock(block)
           case None =>


