spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-7530] [STREAMING] Added StreamingContext.getState() to expose the current state of the context
Date Tue, 12 May 2015 01:54:12 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 f18881598 -> c16b47f9e


[SPARK-7530] [STREAMING] Added StreamingContext.getState() to expose the current state of
the context

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6058 from tdas/SPARK-7530 and squashes the following commits:

80ee0e6 [Tathagata Das] STARTED --> ACTIVE
3da6547 [Tathagata Das] Added synchronized
dd88444 [Tathagata Das] Added more docs
e1a8505 [Tathagata Das] Fixed comment length
89f9980 [Tathagata Das] Change to Java enum and added Java test
7c57351 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
dd4e702 [Tathagata Das] Addressed comments.
3d56106 [Tathagata Das] Added Mima excludes
2b86ba1 [Tathagata Das] Added scala docs.
1722433 [Tathagata Das] Fixed style
976b094 [Tathagata Das] Added license
0585130 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
e0f0a05 [Tathagata Das] Added getState and exposed StreamingContextState

(cherry picked from commit f9c7580adadce75a94bd2854cf4f743d8cbd1d23)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c16b47f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c16b47f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c16b47f9

Branch: refs/heads/branch-1.4
Commit: c16b47f9ed2753384bff6fe0fc303ae4f3df8eb8
Parents: f188815
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Mon May 11 18:53:50 2015 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Mon May 11 18:54:06 2015 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |  4 ++
 .../spark/streaming/StreamingContext.scala      | 75 ++++++++++++--------
 .../spark/streaming/StreamingContextState.java  | 45 ++++++++++++
 .../api/java/JavaStreamingContext.scala         | 22 ++++++
 .../apache/spark/streaming/JavaAPISuite.java    | 14 ++++
 .../apache/spark/streaming/JavaTestUtils.scala  |  1 +
 .../spark/streaming/StreamingContextSuite.scala | 19 ++++-
 7 files changed, 147 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c16b47f9/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index cfe387f..ad3d842 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -106,6 +106,10 @@ object MimaExcludes {
               "org.apache.spark.sql.parquet.ParquetTestData$"),
             ProblemFilters.exclude[MissingClassProblem](
               "org.apache.spark.sql.parquet.TestGroupWriteSupport")
+          ) ++ Seq(
+            // SPARK-7530 Added StreamingContext.getState()
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.streaming.StreamingContext.state_=")
           )
 
         case v if v.startsWith("1.3") =>

http://git-wip-us.apache.org/repos/asf/spark/blob/c16b47f9/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 5abe136..2c5834d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -32,10 +32,11 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark._
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.input.FixedLengthBinaryInputFormat
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
@@ -195,14 +196,7 @@ class StreamingContext private[streaming] (
   assert(env.metricsSystem != null)
   env.metricsSystem.registerSource(streamingSource)
 
-  /** Enumeration to identify current state of the StreamingContext */
-  private[streaming] object StreamingContextState extends Enumeration {
-    type CheckpointState = Value
-    val Initialized, Started, Stopped = Value
-  }
-
-  import StreamingContextState._
-  private[streaming] var state = Initialized
+  private var state: StreamingContextState = INITIALIZED
 
   private val startSite = new AtomicReference[CallSite](null)
 
@@ -517,17 +511,34 @@ class StreamingContext private[streaming] (
   }
 
   /**
+   * :: DeveloperApi ::
+   *
+   * Return the current state of the context. The context can be in three possible states -
+   * - StreamingContextState.INITIALIZED - The context has been created, but not been started yet.
+   *   Input DStreams, transformations and output operations can be created on the context.
+   * - StreamingContextState.ACTIVE - The context has been started, and not been stopped.
+   *   Input DStreams, transformations and output operations cannot be created on the context.
+   * - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
+   */
+  @DeveloperApi
+  def getState(): StreamingContextState = synchronized {
+    state
+  }
+
+  /**
    * Start the execution of the streams.
    *
    * @throws SparkException if the context has already been started or stopped.
    */
   def start(): Unit = synchronized {
     import StreamingContext._
-    if (state == Started) {
-      throw new SparkException("StreamingContext has already been started")
-    }
-    if (state == Stopped) {
-      throw new SparkException("StreamingContext has already been stopped")
+    state match {
+      case INITIALIZED =>
+        // good to start
+      case ACTIVE =>
+        throw new SparkException("StreamingContext has already been started")
+      case STOPPED =>
+        throw new SparkException("StreamingContext has already been stopped")
     }
     validate()
     startSite.set(DStream.getCreationSite())
@@ -536,7 +547,7 @@ class StreamingContext private[streaming] (
       assertNoOtherContextIsActive()
       scheduler.start()
       uiTab.foreach(_.attach())
-      state = Started
+      state = StreamingContextState.ACTIVE
       setActiveContext(this)
     }
   }
@@ -598,22 +609,26 @@ class StreamingContext private[streaming] (
    *                       received data to be completed
    */
   def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
-    state match {
-      case Initialized => logWarning("StreamingContext has not been started yet")
-      case Stopped => logWarning("StreamingContext has already been stopped")
-      case Started =>
-        scheduler.stop(stopGracefully)
-        logInfo("StreamingContext stopped successfully")
-        waiter.notifyStop()
+    try {
+      state match {
+        case INITIALIZED =>
+          logWarning("StreamingContext has not been started yet")
+        case STOPPED =>
+          logWarning("StreamingContext has already been stopped")
+        case ACTIVE =>
+          scheduler.stop(stopGracefully)
+          uiTab.foreach(_.detach())
+          StreamingContext.setActiveContext(null)
+          waiter.notifyStop()
+          logInfo("StreamingContext stopped successfully")
+      }
+      // Even if we have already stopped, we still need to attempt to stop the SparkContext because
+      // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
+      if (stopSparkContext) sc.stop()
+    } finally {
+      // The state should always be Stopped after calling `stop()`, even if we haven't started yet
+      state = STOPPED
     }
-    // Even if the streaming context has not been started, we still need to stop the SparkContext.
-    // Even if we have already stopped, we still need to attempt to stop the SparkContext because
-    // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
-    if (stopSparkContext) sc.stop()
-    uiTab.foreach(_.detach())
-    // The state should always be Stopped after calling `stop()`, even if we haven't started yet:
-    state = Stopped
-    StreamingContext.setActiveContext(null)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c16b47f9/streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java
new file mode 100644
index 0000000..d7b6393
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Represents the state of a StreamingContext.
+ */
+@DeveloperApi
+public enum StreamingContextState {
+  /**
+   * The context has been created, but not been started yet.
+   * Input DStreams, transformations and output operations can be created on the context.
+   */
+  INITIALIZED,
+
+  /**
+   * The context has been started, and not been stopped.
+   * Input DStreams, transformations and output operations cannot be created on the context.
+   */
+  ACTIVE,
+
+  /**
+   * The context has been stopped and cannot be used any more.
+   */
+  STOPPED
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c16b47f9/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 572d7d8..d8fbed2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -579,6 +579,28 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
   }
 
   /**
+   * :: DeveloperApi ::
+   *
+   * Return the current state of the context. The context can be in three possible states -
+   * <ul>
+   *   <li>
+   *   StreamingContextState.INITIALIZED - The context has been created, but not been started yet.
+   *   Input DStreams, transformations and output operations can be created on the context.
+   *   </li>
+   *   <li>
+   *   StreamingContextState.ACTIVE - The context has been started, and not been stopped.
+   *   Input DStreams, transformations and output operations cannot be created on the context.
+   *   </li>
+   *   <li>
+   *   StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
+   *   </li>
+   * </ul>
+   */
+  def getState(): StreamingContextState = {
+    ssc.getState()
+  }
+
+  /**
    * Start the execution of the streams.
    */
   def start(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c16b47f9/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index b1adf88..2e00b98 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -72,6 +72,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
   @SuppressWarnings("unchecked")
   @Test
+  public void testContextState() {
+    List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
+    Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaTestUtils.attachTestOutputStream(stream);
+    Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+    ssc.start();
+    Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE);
+    ssc.stop();
+    Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
   public void testCount() {
     List<List<Integer>> inputData = Arrays.asList(
         Arrays.asList(1,2,3,4),

http://git-wip-us.apache.org/repos/asf/spark/blob/c16b47f9/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index c0ea049..bb80bff 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -70,6 +70,7 @@ trait JavaTestBase extends TestSuiteBase {
       ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
     implicit val cm: ClassTag[V] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+    ssc.getState()
     val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
     val out = new ArrayList[JList[V]]()
     res.map(entry => out.append(new ArrayList[V](entry)))

http://git-wip-us.apache.org/repos/asf/spark/blob/c16b47f9/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 11c7fd8..b8247db 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -109,15 +109,21 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
   }
 
+  test("state matching") {
+    import StreamingContextState._
+    assert(INITIALIZED === INITIALIZED)
+    assert(INITIALIZED != ACTIVE)
+  }
+
   test("start and stop state check") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
 
-    assert(ssc.state === ssc.StreamingContextState.Initialized)
+    assert(ssc.getState() === StreamingContextState.INITIALIZED)
     ssc.start()
-    assert(ssc.state === ssc.StreamingContextState.Started)
+    assert(ssc.getState() === StreamingContextState.ACTIVE)
     ssc.stop()
-    assert(ssc.state === ssc.StreamingContextState.Stopped)
+    assert(ssc.getState() === StreamingContextState.STOPPED)
 
     // Make sure that the SparkContext is also stopped by default
     intercept[Exception] {
@@ -129,9 +135,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.start()
+    assert(ssc.getState() === StreamingContextState.ACTIVE)
     intercept[SparkException] {
       ssc.start()
     }
+    assert(ssc.getState() === StreamingContextState.ACTIVE)
   }
 
   test("stop multiple times") {
@@ -139,13 +147,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop()
+    assert(ssc.getState() === StreamingContextState.STOPPED)
     ssc.stop()
+    assert(ssc.getState() === StreamingContextState.STOPPED)
   }
 
   test("stop before start") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.stop()  // stop before start should not throw exception
+    assert(ssc.getState() === StreamingContextState.STOPPED)
   }
 
   test("start after stop") {
@@ -156,6 +167,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     intercept[SparkException] {
       ssc.start() // start after stop should throw exception
     }
+    assert(ssc.getState() === StreamingContextState.STOPPED)
   }
 
   test("stop only streaming context") {
@@ -167,6 +179,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop(stopSparkContext = false)
+    assert(ssc.getState() === StreamingContextState.STOPPED)
     assert(sc.makeRDD(1 to 100).collect().size === 100)
     sc.stop()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message