spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-10649] [STREAMING] Prevent inheriting job group and irrelevant job description in streaming jobs
Date Mon, 21 Sep 2015 23:47:56 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7c4f852bf -> 72869883f


[SPARK-10649] [STREAMING] Prevent inheriting job group and irrelevant job description in streaming jobs

The job group and job description information is passed through thread-local properties
and gets inherited by child threads. In the case of Spark Streaming, the streaming jobs inherit
these properties from the thread that called streamingContext.start(). This may not make sense.

1. Job group: This is mainly used for cancelling a group of jobs together. It does not make
sense to cancel streaming jobs like this, as the effect will be unpredictable. And it's not
a valid use case anyway: to cancel a streaming context, call streamingContext.stop().

2. Job description: This is used to attach human-readable descriptions to jobs so that they show
up in the UI. The job description of the thread that calls streamingContext.start() is not useful
for the streaming jobs: it does not make sense for all of the streaming jobs to share the
same description, and the description may or may not be related to streaming.

The solution in this PR is meant for the Spark master branch, where local properties are inherited
by cloning the properties. The job group and job description in the thread that starts the
streaming scheduler are explicitly removed, so that none of the subsequent child threads
inherit them. Also, the start is performed in a new child thread, so that setting the job
group and description for streaming does not change those properties in the thread that called
streamingContext.start().

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8781 from tdas/SPARK-10649.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72869883
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72869883
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72869883

Branch: refs/heads/master
Commit: 72869883f12b6e0a4e5aad79c0ac2cfdb4d83f09
Parents: 7c4f852
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Mon Sep 21 16:47:52 2015 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Mon Sep 21 16:47:52 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/ThreadUtils.scala     | 59 ++++++++++++++++++++
 .../apache/spark/util/ThreadUtilsSuite.scala    | 24 +++++++-
 .../spark/streaming/StreamingContext.scala      | 15 ++++-
 .../spark/streaming/StreamingContextSuite.scala | 32 +++++++++++
 4 files changed, 126 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/72869883/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index ca5624a..22e291a 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -21,6 +21,7 @@ package org.apache.spark.util
 import java.util.concurrent._
 
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 
@@ -86,4 +87,62 @@ private[spark] object ThreadUtils {
     val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
     Executors.newSingleThreadScheduledExecutor(threadFactory)
   }
+
+  /**
+   * Run a piece of code in a new thread and return the result. Exception in the new thread
is
+   * thrown in the caller thread with an adjusted stack trace that removes references to
this
+   * method for clarity. The exception stack traces will be like the following
+   *
+   * SomeException: exception-message
+   *   at CallerClass.body-method (sourcefile.scala)
+   *   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
+   *   at CallerClass.caller-method (sourcefile.scala)
+   *   ...
+   */
+  def runInNewThread[T](
+      threadName: String,
+      isDaemon: Boolean = true)(body: => T): T = {
+    @volatile var exception: Option[Throwable] = None
+    @volatile var result: T = null.asInstanceOf[T]
+
+    val thread = new Thread(threadName) {
+      override def run(): Unit = {
+        try {
+          result = body
+        } catch {
+          case NonFatal(e) =>
+            exception = Some(e)
+        }
+      }
+    }
+    thread.setDaemon(isDaemon)
+    thread.start()
+    thread.join()
+
+    exception match {
+      case Some(realException) =>
+        // Remove the part of the stack that shows method calls into this helper method
+        // This means drop everything from the top until the stack element
+        // ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
+        val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
+
+        // Remove the part of the new thread stack that shows methods call from this helper
method
+        val extraStackTrace = realException.getStackTrace.takeWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName))
+
+        // Combine the two stack traces, with a place holder just specifying that there
+        // was a helper method used, without any further details of the helper
+        val placeHolderStackElem = new StackTraceElement(
+          s"... run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")}
..",
+          " ", "", -1)
+        val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
+
+        // Update the stack trace and rethrow the exception in the caller thread
+        realException.setStackTrace(finalStackTrace)
+        throw realException
+      case None =>
+        result
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/72869883/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 8c51e6b..620e4de 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -20,8 +20,9 @@ package org.apache.spark.util
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 
@@ -66,4 +67,25 @@ class ThreadUtilsSuite extends SparkFunSuite {
     val futureThreadName = Await.result(f, 10.seconds)
     assert(futureThreadName === callerThreadName)
   }
+
+  test("runInNewThread") {
+    import ThreadUtils._
+    assert(runInNewThread("thread-name") { Thread.currentThread().getName } === "thread-name")
+    assert(runInNewThread("thread-name") { Thread.currentThread().isDaemon } === true)
+    assert(
+      runInNewThread("thread-name", isDaemon = false) { Thread.currentThread().isDaemon }
=== false
+    )
+    val uniqueExceptionMessage = "test" + Random.nextInt()
+    val exception = intercept[IllegalArgumentException] {
+      runInNewThread("thread-name") { throw new IllegalArgumentException(uniqueExceptionMessage)
}
+    }
+    assert(exception.asInstanceOf[IllegalArgumentException].getMessage === uniqueExceptionMessage)
+    assert(exception.getStackTrace.mkString("\n").contains(
+      "... run in separate thread using org.apache.spark.util.ThreadUtils ...") === true,
+      "stack trace does not contain expected place holder"
+    )
+    assert(exception.getStackTrace.mkString("\n").contains("ThreadUtils.scala") === false,
+      "stack trace contains unexpected references to ThreadUtils"
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/72869883/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b496d1f..6720ba4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -588,12 +588,20 @@ class StreamingContext private[streaming] (
     state match {
       case INITIALIZED =>
         startSite.set(DStream.getCreationSite())
-        sparkContext.setCallSite(startSite.get)
         StreamingContext.ACTIVATION_LOCK.synchronized {
           StreamingContext.assertNoOtherContextIsActive()
           try {
             validate()
-            scheduler.start()
+
+            // Start the streaming scheduler in a new thread, so that thread local properties
+            // like call sites and job groups can be reset without affecting those of the
+            // current thread.
+            ThreadUtils.runInNewThread("streaming-start") {
+              sparkContext.setCallSite(startSite.get)
+              sparkContext.clearJobGroup()
+              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+              scheduler.start()
+            }
             state = StreamingContextState.ACTIVE
           } catch {
             case NonFatal(e) =>
@@ -618,6 +626,7 @@ class StreamingContext private[streaming] (
     }
   }
 
+
   /**
    * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.

http://git-wip-us.apache.org/repos/asf/spark/blob/72869883/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d26894e..3b9d0d1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -180,6 +180,38 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     assert(ssc.scheduler.isStarted === false)
   }
 
+  test("start should set job group and description of streaming jobs correctly") {
+    ssc = new StreamingContext(conf, batchDuration)
+    ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
+    val sc = ssc.sc
+
+    @volatile var jobGroupFound: String = ""
+    @volatile var jobDescFound: String = ""
+    @volatile var jobInterruptFound: String = ""
+    @volatile var allFound: Boolean = false
+
+    addInputStream(ssc).foreachRDD { rdd =>
+      jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+      jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+      allFound = true
+    }
+    ssc.start()
+
+    eventually(timeout(10 seconds), interval(10 milliseconds)) {
+      assert(allFound === true)
+    }
+
+    // Verify streaming jobs have expected thread-local properties
+    assert(jobGroupFound === null)
+    assert(jobDescFound === null)
+    assert(jobInterruptFound === "false")
+
+    // Verify current thread's thread-local properties have not changed
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming")
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming")
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true")
+  }
 
   test("start multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message