spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-9639] [STREAMING] Fix a potential NPE in Streaming JobScheduler
Date Thu, 06 Aug 2015 21:39:40 GMT
Repository: spark
Updated Branches:
  refs/heads/master 1723e3489 -> 346209097


[SPARK-9639] [STREAMING] Fix a potential NPE in Streaming JobScheduler

Because `JobScheduler.stop(false)` may set `eventLoop` to null when `JobHandler` is running,
then it's possible that when `post` is called, `eventLoop` happens to null.

This PR fixed this bug and also set threads in `jobExecutor` to `daemon`.

Author: zsxwing <zsxwing@gmail.com>

Closes #7960 from zsxwing/fix-npe and squashes the following commits:

b0864c4 [zsxwing] Fix a potential NPE in Streaming JobScheduler


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34620909
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34620909
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34620909

Branch: refs/heads/master
Commit: 346209097e88fe79015359e40b49c32cc0bdc439
Parents: 1723e34
Author: zsxwing <zsxwing@gmail.com>
Authored: Thu Aug 6 14:39:36 2015 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Thu Aug 6 14:39:36 2015 -0700

----------------------------------------------------------------------
 .../streaming/scheduler/JobScheduler.scala      | 32 ++++++++++++++------
 1 file changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/34620909/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 7e73556..6d4cdc4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.scheduler
 
-import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConversions._
 import scala.util.{Failure, Success}
@@ -25,7 +25,7 @@ import scala.util.{Failure, Success}
 import org.apache.spark.Logging
 import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
-import org.apache.spark.util.EventLoop
+import org.apache.spark.util.{EventLoop, ThreadUtils}
 
 
 private[scheduler] sealed trait JobSchedulerEvent
@@ -44,7 +44,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   // https://gist.github.com/AlainODea/1375759b8720a3f9f094
   private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
   private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
-  private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)
+  private val jobExecutor =
+    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
   private val jobGenerator = new JobGenerator(this)
   val clock = jobGenerator.clock
   val listenerBus = new StreamingListenerBus()
@@ -193,14 +194,25 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
       ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
       try {
-        eventLoop.post(JobStarted(job))
-        // Disable checks for existing output directories in jobs launched by the streaming
-        // scheduler, since we may need to write output to an existing directory during checkpoint
-        // recovery; see SPARK-4835 for more details.
-        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
-          job.run()
+        // We need to assign `eventLoop` to a temp variable. Otherwise, because
+        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running,
then
+        // it's possible that when `post` is called, `eventLoop` happens to null.
+        var _eventLoop = eventLoop
+        if (_eventLoop != null) {
+          _eventLoop.post(JobStarted(job))
+          // Disable checks for existing output directories in jobs launched by the streaming
+          // scheduler, since we may need to write output to an existing directory during
checkpoint
+          // recovery; see SPARK-4835 for more details.
+          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+            job.run()
+          }
+          _eventLoop = eventLoop
+          if (_eventLoop != null) {
+            _eventLoop.post(JobCompleted(job))
+          }
+        } else {
+          // JobScheduler has been stopped.
         }
-        eventLoop.post(JobCompleted(job))
       } finally {
         ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
         ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message