spark-commits mailing list archives

From js...@apache.org
Subject spark git commit: [SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener
Date Thu, 19 Apr 2018 02:01:02 GMT
Repository: spark
Updated Branches:
  refs/heads/master 0c94e48bc -> 8bb0df2c6


[SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener

## What changes were proposed in this pull request?

The `StreamingListener` on the PySpark side seems to lack an `onStreamingStarted` method. This
patch adds it, along with a test for it.
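The new hook follows the same pattern as the existing ones: the base class defines it as a no-op, so listeners written before this change keep working, and a subclass overrides only the events it cares about. The sketch below illustrates that pattern with a hypothetical, self-contained stand-in for the base class and a hypothetical `dispatch` helper in place of the real event bus, so it runs without Spark. It is not PySpark's implementation. In real code you would subclass `pyspark.streaming.listener.StreamingListener` and register the listener with `ssc.addStreamingListener(...)` before calling `ssc.start()`.

```python
# Hypothetical stand-in mirroring the shape of PySpark's StreamingListener:
# every hook is a no-op, so subclasses override only the events they need.


class StreamingListener(object):
    def onStreamingStarted(self, streamingStarted):
        """Called when the streaming has been started."""
        pass

    def onBatchCompleted(self, batchCompleted):
        """Called when processing of a batch of jobs has completed."""
        pass


class LegacyListener(StreamingListener):
    """Written before onStreamingStarted existed; still works unchanged."""

    def __init__(self):
        self.completed = 0

    def onBatchCompleted(self, batchCompleted):
        self.completed += 1


class StartRecorder(StreamingListener):
    """Records every streaming-started event it receives."""

    def __init__(self):
        self.started = []

    def onStreamingStarted(self, streamingStarted):
        self.started.append(streamingStarted)


def dispatch(listeners, hook, event):
    # Hypothetical stand-in for the bus that delivers events to listeners:
    # call the named hook on each listener (inherited no-ops do nothing).
    for listener in listeners:
        getattr(listener, hook)(event)


if __name__ == "__main__":
    legacy, recorder = LegacyListener(), StartRecorder()
    listeners = [legacy, recorder]
    dispatch(listeners, "onStreamingStarted", "start-event")  # legacy ignores it
    dispatch(listeners, "onBatchCompleted", "batch-1")
    print(recorder.started, legacy.completed)  # prints: ['start-event'] 1
```

Keeping the default as `pass` is what makes adding a hook backward compatible: existing subclasses simply inherit the no-op.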

This patch also includes a trivial doc improvement for `createDirectStream`.
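The doc change clarifies that `fromOffsets` is a dictionary mapping `TopicAndPartition` to integer offsets. As a sketch of that shape only, the block below uses a hypothetical `namedtuple` stand-in for `pyspark.streaming.kafka.TopicAndPartition` so it runs without Spark; the topic name, offsets, and broker address are made up for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.streaming.kafka.TopicAndPartition so this
# sketch runs without Spark; the real class is likewise constructed as
# TopicAndPartition(topic, partition).
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])

# fromOffsets maps each topic/partition to the integer offset at which the
# stream starts reading (inclusive). Topic name and offsets are made up.
from_offsets = {
    TopicAndPartition("events", 0): 0,
    TopicAndPartition("events", 1): 1500,
}

# With PySpark's Kafka integration and a hypothetical broker address, the dict
# would be passed as:
#   KafkaUtils.createDirectStream(
#       ssc, ["events"], {"metadata.broker.list": "localhost:9092"},
#       fromOffsets=from_offsets)

for tp, offset in sorted(from_offsets.items()):
    print(tp.topic, tp.partition, offset)
```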

Original PR is #21057.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21098 from viirya/SPARK-24014.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bb0df2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bb0df2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bb0df2c

Branch: refs/heads/master
Commit: 8bb0df2c65355dfdcd28e362ff661c6c7ebc99c0
Parents: 0c94e48
Author: Liang-Chi Hsieh <viirya@gmail.com>
Authored: Thu Apr 19 10:00:57 2018 +0800
Committer: jerryshao <sshao@hortonworks.com>
Committed: Thu Apr 19 10:00:57 2018 +0800

----------------------------------------------------------------------
 python/pyspark/streaming/kafka.py    | 3 ++-
 python/pyspark/streaming/listener.py | 6 ++++++
 python/pyspark/streaming/tests.py    | 7 +++++++
 3 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index fdb9308..ed2e0e7 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -104,7 +104,8 @@ class KafkaUtils(object):
         :param topics:  list of topic_name to consume.
         :param kafkaParams: Additional params for Kafka.
         :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting
-                            point of the stream.
+                            point of the stream (a dictionary mapping `TopicAndPartition` to
+                            integers).
         :param keyDecoder:  A function used to decode key (default is utf8_decoder).
         :param valueDecoder:  A function used to decode value (default is utf8_decoder).
         :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess

http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/listener.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py
index b830797..d4ecc21 100644
--- a/python/pyspark/streaming/listener.py
+++ b/python/pyspark/streaming/listener.py
@@ -23,6 +23,12 @@ class StreamingListener(object):
     def __init__(self):
         pass
 
+    def onStreamingStarted(self, streamingStarted):
+        """
+        Called when the streaming has been started.
+        """
+        pass
+
     def onReceiverStarted(self, receiverStarted):
         """
         Called when a receiver has been started

http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 7dde7c0..1039409 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -507,6 +507,10 @@ class StreamingListenerTests(PySparkStreamingTestCase):
             self.batchInfosCompleted = []
             self.batchInfosStarted = []
             self.batchInfosSubmitted = []
+            self.streamingStartedTime = []
+
+        def onStreamingStarted(self, streamingStarted):
+            self.streamingStartedTime.append(streamingStarted.time)
 
         def onBatchSubmitted(self, batchSubmitted):
             self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
@@ -530,9 +534,12 @@ class StreamingListenerTests(PySparkStreamingTestCase):
         batchInfosSubmitted = batch_collector.batchInfosSubmitted
         batchInfosStarted = batch_collector.batchInfosStarted
         batchInfosCompleted = batch_collector.batchInfosCompleted
+        streamingStartedTime = batch_collector.streamingStartedTime
 
         self.wait_for(batchInfosCompleted, 4)
 
+        self.assertEqual(len(streamingStartedTime), 1)
+
         self.assertGreaterEqual(len(batchInfosSubmitted), 4)
         for info in batchInfosSubmitted:
             self.assertGreaterEqual(info.batchTime().milliseconds(), 0)

