spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [6/8] git commit: Changed StreamingContext.waitForStop to awaitTermination.
Date Mon, 13 Jan 2014 04:04:42 GMT
Changed StreamingContext.waitForStop to awaitTermination.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c7fabb74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c7fabb74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c7fabb74

Branch: refs/heads/master
Commit: c7fabb745b26b42bb4a4609edcb43019fcbd68aa
Parents: 7883b8f
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Sun Jan 12 17:21:13 2014 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Sun Jan 12 17:21:13 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/streaming/StreamingContext.scala   |  6 +++---
 .../streaming/api/java/JavaStreamingContext.scala   |  6 +++---
 .../spark/streaming/StreamingContextSuite.scala     | 16 ++++++++--------
 .../org/apache/spark/streaming/TestSuiteBase.scala  |  2 +-
 4 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c7fabb74/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 7b2a7d5..ee83ae9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -434,16 +434,16 @@ class StreamingContext private[streaming] (
    * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.
    */
-  def waitForStop() {
+  def awaitTermination() {
     waiter.waitForStopOrError()
   }
 
   /**
    * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.
-   * @param timeout time to wait
+   * @param timeout time to wait in milliseconds
    */
-  def waitForStop(timeout: Long) {
+  def awaitTermination(timeout: Long) {
     waiter.waitForStopOrError(timeout)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c7fabb74/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index ea7f7da..b4c46f5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -486,14 +486,14 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.
    */
-  def waitForStop() = ssc.waitForStop()
+  def awaitTermination() = ssc.awaitTermination()
 
   /**
    * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.
-   * @param timeout time to wait
+   * @param timeout time to wait in milliseconds
    */
-  def waitForStop(timeout: Long) = ssc.waitForStop(timeout)
+  def awaitTermination(timeout: Long) = ssc.awaitTermination(timeout)
 
   /**
    * Stop the execution of the streams. Will stop the associated JavaSparkContext as well.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c7fabb74/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 10c18a7..a477d20 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -146,7 +146,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
     ssc = new StreamingContext(sc, batchDuration)
   }
 
-  test("waitForStop") {
+  test("awaitTermination") {
     ssc = new StreamingContext(master, appName, batchDuration)
     val inputStream = addInputStream(ssc)
     inputStream.map(x => x).register
@@ -158,13 +158,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
 
     // test whether waitForStop() exits after give amount of time
     failAfter(1000 millis) {
-      ssc.waitForStop(500)
+      ssc.awaitTermination(500)
     }
 
     // test whether waitForStop() does not exit if not time is given
     val exception = intercept[Exception] {
       failAfter(1000 millis) {
-        ssc.waitForStop()
+        ssc.awaitTermination()
         throw new Exception("Did not wait for stop")
       }
     }
@@ -178,11 +178,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
           ssc.stop()
         }
       }.start()
-      ssc.waitForStop()
+      ssc.awaitTermination()
     }
   }
 
-  test("waitForStop with error in task") {
+  test("awaitTermination with error in task") {
     ssc = new StreamingContext(master, appName, batchDuration)
     val inputStream = addInputStream(ssc)
     inputStream.map(x => { throw new TestException("error in map task"); x})
@@ -190,19 +190,19 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
 
     val exception = intercept[Exception] {
       ssc.start()
-      ssc.waitForStop(5000)
+      ssc.awaitTermination(5000)
     }
     assert(exception.getMessage.contains("map task"), "Expected exception not thrown")
   }
 
-  test("waitForStop with error in job generation") {
+  test("awaitTermination with error in job generation") {
     ssc = new StreamingContext(master, appName, batchDuration)
     val inputStream = addInputStream(ssc)
 
     inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register
     val exception = intercept[TestException] {
       ssc.start()
-      ssc.waitForStop(5000)
+      ssc.awaitTermination(5000)
     }
     assert(exception.getMessage.contains("transform"), "Expected exception not thrown")
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c7fabb74/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index a8ff444..63a07cf 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -273,7 +273,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
       val startTime = System.currentTimeMillis()
       while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
         logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
-        ssc.waitForStop(50)
+        ssc.awaitTermination(50)
       }
       val timeTaken = System.currentTimeMillis() - startTime
       logInfo("Output generated in " + timeTaken + " milliseconds")


Mime
View raw message