spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-14131][SQL] Add a workaround for HADOOP-10622 to fix DataFrameReaderWriterSuite
Date Fri, 25 Mar 2016 20:28:36 GMT
Repository: spark
Updated Branches:
  refs/heads/master afd0debe0 -> b554b3c46


[SPARK-14131][SQL] Add a workaround for HADOOP-10622 to fix DataFrameReaderWriterSuite

## What changes were proposed in this pull request?

There is a potential dead-lock in Hadoop Shell.runCommand before 2.5.0 ([HADOOP-10622](https://issues.apache.org/jira/browse/HADOOP-10622)).
If we interrupt some thread running Shell.runCommand, we may hit this issue.

This PR adds some protection to prevent the microBatchThread from being interrupted while it may
be running Shell.runCommand. There are two places that call Shell.runCommand now:

- offsetLog.add
- FileStreamSource.getOffset

Both of them create a file using the HDFS API and call Shell.runCommand to set the file permission.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11940 from zsxwing/workaround-for-HADOOP-10622.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b554b3c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b554b3c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b554b3c4

Branch: refs/heads/master
Commit: b554b3c46b0019a6caf0f9a975b460dc2570c3b2
Parents: afd0deb
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Fri Mar 25 13:28:26 2016 -0700
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Fri Mar 25 13:28:26 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   | 85 ++++++++++++++++++--
 1 file changed, 80 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b554b3c4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5abd7ec..60e00d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
@@ -101,6 +102,65 @@ class StreamExecution(
   private val offsetLog =
     new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets"))
 
+  /** A monitor to protect "uninterruptible" and "interrupted" */
+  private val uninterruptibleLock = new Object
+
+  /**
+   * Indicates if "microBatchThread" are in the uninterruptible status. If so, interrupting
+   * "microBatchThread" will be deferred until "microBatchThread" enters into the interruptible
+   * status.
+   */
+  @GuardedBy("uninterruptibleLock")
+  private var uninterruptible = false
+
+  /**
+   * Indicates if we should interrupt "microBatchThread" when we are leaving the uninterruptible
+   * zone.
+   */
+  @GuardedBy("uninterruptibleLock")
+  private var shouldInterruptThread = false
+
+  /**
+   * Interrupt "microBatchThread" if possible. If "microBatchThread" is in the uninterruptible
+   * status, "microBatchThread" won't be interrupted until it enters into the interruptible status.
+   */
+  private def interruptMicroBatchThreadSafely(): Unit = {
+    uninterruptibleLock.synchronized {
+      if (uninterruptible) {
+        shouldInterruptThread = true
+      } else {
+        microBatchThread.interrupt()
+      }
+    }
+  }
+
+  /**
+   * Run `f` uninterruptibly in "microBatchThread". "microBatchThread" won't be interrupted before
+   * returning from `f`.
+   */
+  private def runUninterruptiblyInMicroBatchThread[T](f: => T): T = {
+    assert(Thread.currentThread() == microBatchThread)
+    uninterruptibleLock.synchronized {
+      uninterruptible = true
+      // Clear the interrupted status if it's set.
+      if (Thread.interrupted()) {
+        shouldInterruptThread = true
+      }
+    }
+    try {
+      f
+    } finally {
+      uninterruptibleLock.synchronized {
+        uninterruptible = false
+        if (shouldInterruptThread) {
+          // Recover the interrupted status
+          microBatchThread.interrupt()
+          shouldInterruptThread = false
+        }
+      }
+    }
+  }
+
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
@@ -227,14 +287,29 @@ class StreamExecution(
     // Update committed offsets.
     committedOffsets ++= availableOffsets
 
+    // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
+    // If we interrupt some thread running Shell.runCommand, we may hit this issue.
+    // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand"
+    // to set the file permission, we should not interrupt "microBatchThread" when running this
+    // method. See SPARK-14131.
+    //
     // Check to see what new data is available.
-    val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+    val newData = runUninterruptiblyInMicroBatchThread {
+      uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+    }
     availableOffsets ++= newData
 
     if (dataAvailable) {
-      assert(
-        offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
-        s"Concurrent update to the log.  Multiple streaming jobs detected for $currentBatchId")
+      // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
+      // If we interrupt some thread running Shell.runCommand, we may hit this issue.
+      // As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set
+      // the file permission, we should not interrupt "microBatchThread" when running this method.
+      // See SPARK-14131.
+      runUninterruptiblyInMicroBatchThread {
+        assert(
+          offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+          s"Concurrent update to the log.  Multiple streaming jobs detected for $currentBatchId")
+      }
       currentBatchId += 1
       logInfo(s"Committed offsets for batch $currentBatchId.")
       true
@@ -320,7 +395,7 @@ class StreamExecution(
     // intentionally
     state = TERMINATED
     if (microBatchThread.isAlive) {
-      microBatchThread.interrupt()
+      interruptMicroBatchThreadSafely()
       microBatchThread.join()
     }
     logInfo(s"Query $name was stopped")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message