spark-commits mailing list archives

From joshro...@apache.org
Subject git commit: [SPARK-3952] [Streaming] [PySpark] add Python examples in Streaming Programming Guide
Date Sun, 19 Oct 2014 02:14:59 GMT
Repository: spark
Updated Branches:
  refs/heads/master f406a8391 -> 05db2da7d


[SPARK-3952] [Streaming] [PySpark] add Python examples in Streaming Programming Guide

Having Python examples in Streaming Programming Guide.

Also add RecoverableNetworkWordCount example.

Author: Davies Liu <davies.liu@gmail.com>
Author: Davies Liu <davies@databricks.com>

Closes #2808 from davies/pyguide and squashes the following commits:

8d4bec4 [Davies Liu] update readme
26a7e37 [Davies Liu] fix format
3821c4d [Davies Liu] address comments, add missing file
7e4bb8a [Davies Liu] add Python examples in Streaming Programming Guide


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05db2da7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05db2da7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05db2da7

Branch: refs/heads/master
Commit: 05db2da7dc256822cdb602c4821cbb9fb84dac98
Parents: f406a83
Author: Davies Liu <davies.liu@gmail.com>
Authored: Sat Oct 18 19:14:48 2014 -0700
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Sat Oct 18 19:14:48 2014 -0700

----------------------------------------------------------------------
 docs/README.md                                  |   3 +-
 docs/streaming-programming-guide.md             | 304 ++++++++++++++++++-
 .../streaming/recoverable_network_wordcount.py  |  80 +++++
 python/docs/pyspark.streaming.rst               |  10 +
 python/pyspark/streaming/dstream.py             |   8 +-
 5 files changed, 391 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/05db2da7/docs/README.md
----------------------------------------------------------------------
diff --git a/docs/README.md b/docs/README.md
index 0facecd..d2d58e4 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -25,8 +25,7 @@ installing via the Ruby Gem dependency manager. Since the exact HTML output
 varies between versions of Jekyll and its dependencies, we list specific versions here
 in some cases:
 
-    $ sudo gem install jekyll -v 1.4.3
-    $ sudo gem uninstall kramdown -v 1.4.1
+    $ sudo gem install jekyll
     $ sudo gem install jekyll-redirect-from
 
 Execute `jekyll` from the `docs/` directory. Compiling the site with Jekyll will create a directory

http://git-wip-us.apache.org/repos/asf/spark/blob/05db2da7/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 738309c..8bbba88 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -213,6 +213,67 @@ The complete code can be found in the Spark Streaming example
 <br>
 
 </div>
+<div data-lang="python"  markdown="1" >
+First, we import StreamingContext, which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads and a batch interval of 1 second.
+
+{% highlight python %}
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+# Create a local StreamingContext with two working threads and a batch interval of 1 second
+sc = SparkContext("local[2]", "NetworkWordCount")
+ssc = StreamingContext(sc, 1)
+{% endhighlight %}
+
+Using this context, we can create a DStream that represents streaming data from a TCP
+source, specified as a hostname (e.g. `localhost`) and port (e.g. `9999`).
+
+{% highlight python %}
+# Create a DStream that will connect to hostname:port, like localhost:9999
+lines = ssc.socketTextStream("localhost", 9999)
+{% endhighlight %}
+
+This `lines` DStream represents the stream of data that will be received from the data
+server. Each record in this DStream is a line of text. Next, we want to split the lines by
+space into words.
+
+{% highlight python %}
+# Split each line into words
+words = lines.flatMap(lambda line: line.split(" "))
+{% endhighlight %}
+
+`flatMap` is a one-to-many DStream operation that creates a new DStream by
+generating multiple new records from each record in the source DStream. In this case,
+each line will be split into multiple words and the stream of words is represented as the
+`words` DStream.  Next, we want to count these words.
+
+{% highlight python %}
+# Count each word in each batch
+pairs = words.map(lambda word: (word, 1))
+wordCounts = pairs.reduceByKey(lambda x, y: x + y)
+
+# Print the first ten elements of each RDD generated in this DStream to the console
+wordCounts.pprint()
+{% endhighlight %}
+
+The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word,
+1)` pairs, which is then reduced to get the frequency of words in each batch of data.
+Finally, `wordCounts.pprint()` will print a few of the counts generated every second.
+
+Note that when these lines are executed, Spark Streaming only sets up the computation it
+will perform when it is started, and no real processing has started yet. To start the processing
+after all the transformations have been set up, we finally call
+
+{% highlight python %}
+ssc.start()             # Start the computation
+ssc.awaitTermination()  # Wait for the computation to terminate
+{% endhighlight %}
+
+The complete code can be found in the Spark Streaming example
+[NetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/network_wordcount.py).
+<br>
+
+</div>
 </div>
 
 If you have already [downloaded](index.html#downloading) and [built](index.html#building) Spark,
@@ -236,6 +297,11 @@ $ ./bin/run-example streaming.NetworkWordCount localhost 9999
 $ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight bash %}
+$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
+{% endhighlight %}
+</div>
 </div>
 
 
@@ -259,8 +325,11 @@ hello world
     </td>
     <td width="2%"></td>
     <td>
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
 {% highlight bash %}
-# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount
+# TERMINAL 2: RUNNING NetworkWordCount
 
 $ ./bin/run-example streaming.NetworkWordCount localhost 9999
 ...
@@ -271,6 +340,37 @@ Time: 1357008430000 ms
 (world,1)
 ...
 {% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+# TERMINAL 2: RUNNING JavaNetworkWordCount
+
+$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
+...
+-------------------------------------------
+Time: 1357008430000 ms
+-------------------------------------------
+(hello,1)
+(world,1)
+...
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight bash %}
+# TERMINAL 2: RUNNING network_wordcount.py
+
+$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
+...
+-------------------------------------------
+Time: 2014-10-14 15:25:21
+-------------------------------------------
+(hello,1)
+(world,1)
+...
+{% endhighlight %}
+</div>
+</div>    
     </td>
 </table>
 
@@ -398,9 +498,34 @@ JavaSparkContext sc = ...   //existing JavaSparkContext
 JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
 {% endhighlight %} 
 </div>
+<div data-lang="python" markdown="1">
+
+A [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) object can be created from a [SparkContext](api/python/pyspark.html#pyspark.SparkContext) object.
+
+{% highlight python %}
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+sc = SparkContext(master, appName)
+ssc = StreamingContext(sc, 1)
+{% endhighlight %}
+
+The `appName` parameter is a name for your application to show on the cluster UI.
+`master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls),
+or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster,
+you will not want to hardcode `master` in the program,
+but rather [launch the application with `spark-submit`](submitting-applications.html) and
+receive it there. However, for local testing and unit tests, you can pass "local[\*]" to run Spark Streaming
+in-process (detects the number of cores in the local system).
+
+The batch interval must be set based on the latency requirements of your application
+and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
+section for more details.
+</div>
 </div>
 
 After a context is defined, you have to do the following steps.
+
 1. Define the input sources.
 1. Set up the streaming computations.
 1. Start the receiving and processing of data using `streamingContext.start()`.
@@ -483,6 +608,9 @@ methods for creating DStreams from files and Akka actors as input sources.
     <div data-lang="java" markdown="1">
 		streamingContext.fileStream<keyClass, valueClass, inputFormatClass>(dataDirectory);
     </div>
+    <div data-lang="python" markdown="1">
+		streamingContext.textFileStream(dataDirectory)
+    </div>
     </div>
 
 	Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that
@@ -685,12 +813,29 @@ JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFu
 {% endhighlight %}
 
 </div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+def updateFunction(newValues, runningCount):
+    if runningCount is None:
+        runningCount = 0
+    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count
+{% endhighlight %}
+
+This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
+1)` pairs in the [earlier example](#a-quick-example)).
+
+{% highlight python %}
+runningCounts = pairs.updateStateByKey(updateFunction)
+{% endhighlight %}
+
+</div>
 </div>
 
 The update function will be called for each word, with `newValues` having a sequence of 1's (from
 the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
 Scala code, take a look at the example
-[StatefulNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala).
+[stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py).
 
 #### Transform Operation
 {:.no_toc}
@@ -733,6 +878,15 @@ JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
 {% endhighlight %}
 
 </div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
+
+# join data stream with spam information to do data cleaning
+cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
+{% endhighlight %}
+</div>
 </div>
 
 In fact, you can also use [machine learning](mllib-guide.html) and
@@ -794,6 +948,14 @@ JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow
 {% endhighlight %}
 
 </div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+# Reduce last 30 seconds of data, every 10 seconds
+windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
+{% endhighlight %}
+
+</div>
 </div>
 
 Some of the common window operations are as follows. All of these operations take the
@@ -860,6 +1022,7 @@ see [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
 and [PairDStreamFunctions](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions).
 For the Java API, see [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html)
 and [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html).
+For the Python API, see [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream).
 
 ***
 
@@ -872,9 +1035,12 @@ Currently, the following output operations are defined:
 <table class="table">
 <tr><th style="width:30%">Output Operation</th><th>Meaning</th></tr>
 <tr>
-  <td> <b>print</b>() </td>
+  <td> <b>print</b>()</td>
   <td> Prints first ten elements of every batch of data in a DStream on the driver.

-  This is useful for development and debugging. </td>
+  This is useful for development and debugging. 
+  <br/>
+  <b>PS</b>: called <b>pprint</b>() in Python.
+  </td>
 </tr>
 <tr>
   <td> <b>saveAsObjectFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
@@ -915,17 +1081,41 @@ For this purpose, a developer may inadvertantly try creating a connection object
 the Spark driver, but try to use it in a Spark worker to save records in the RDDs.
 For example (in Scala),
 
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
         dstream.foreachRDD(rdd => {
             val connection = createNewConnection()  // executed at the driver
             rdd.foreach(record => {
                 connection.send(record) // executed at the worker
             })
         })
+{% endhighlight %}
+
+</div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+def sendRecord(rdd):
+    connection = createNewConnection()  # executed at the driver
+    rdd.foreach(lambda record: connection.send(record))
+    connection.close()
+        
+dstream.foreachRDD(sendRecord)
+{% endhighlight %}
+
+</div>
+</div>
 
-	This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.
+  This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.
 
 - However, this can lead to another common mistake - creating a new connection for every record. For example,
 
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
         dstream.foreachRDD(rdd => {
             rdd.foreach(record => {
                 val connection = createNewConnection()
@@ -933,9 +1123,28 @@ For example (in Scala),
                 connection.close()
             })
         })
+{% endhighlight %}
 
-	Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use `rdd.foreachPartition` - create a single connection object and send all the records in a RDD partition using that connection.
+</div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+def sendRecord(record):
+    connection = createNewConnection()
+    connection.send(record)
+    connection.close()
+        
+dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
+{% endhighlight %}
 
+</div>
+</div>
+
+  Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use `rdd.foreachPartition` - create a single connection object and send all the records in a RDD partition using that connection.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
         dstream.foreachRDD(rdd => {
             rdd.foreachPartition(partitionOfRecords => {
                 val connection = createNewConnection()
@@ -943,13 +1152,31 @@ For example (in Scala),
                 connection.close()
             })
         })
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+def sendPartition(iter):
+    connection = createNewConnection()
+    for record in iter:
+        connection.send(record)
+    connection.close()
+    
+dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
+{% endhighlight %}
+</div>
+</div>    
 
-    This amortizes the connection creation overheads over many records.
+  This amortizes the connection creation overheads over many records.
 
 - Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches.
 	One can maintain a static pool of connection objects that can be reused as
     RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
-
+    
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
         dstream.foreachRDD(rdd => {
             rdd.foreachPartition(partitionOfRecords => {
                 // ConnectionPool is a static, lazily initialized pool of connections
@@ -958,8 +1185,25 @@ For example (in Scala),
                 ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
             })
         })
+{% endhighlight %}
+</div>
 
-    Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.
+<div data-lang="python" markdown="1">
+{% highlight python %}
+def sendPartition(iter):
+    # ConnectionPool is a static, lazily initialized pool of connections
+    connection = ConnectionPool.getConnection()
+    for record in iter:
+        connection.send(record)
+    # return to the pool for future reuse
+    ConnectionPool.returnConnection(connection)
+    
+dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
+{% endhighlight %}
+</div>
+</div> 
+
+Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.
 
 
 ##### Other points to remember:
@@ -1376,6 +1620,44 @@ You can also explicitly create a `JavaStreamingContext` from the checkpoint data
 the computation by using `new JavaStreamingContext(checkpointDirectory)`.
 
 </div>
+<div data-lang="python" markdown="1">
+
+This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
+
+{% highlight python %}
+# Function to create and setup a new StreamingContext
+def functionToCreateContext():
+    sc = SparkContext(...)   # new context
+    ssc = StreamingContext(...)
+    lines = ssc.socketTextStream(...) # create DStreams
+    ...
+    ssc.checkpoint(checkpointDirectory)   # set checkpoint directory
+    return ssc
+
+# Get StreamingContext from checkpoint data or create a new one
+context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
+
+# Do additional setup on context that needs to be done,
+# irrespective of whether it is being started or restarted
+context. ...
+
+# Start the context
+context.start()
+context.awaitTermination()
+{% endhighlight %}
+
+If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
+If the directory does not exist (i.e., running for the first time),
+then the function `functionToCreateContext` will be called to create a new
+context and set up the DStreams. See the Python example
+[recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
+This example appends the word counts of network data into a file.
+
+You can also explicitly create a `StreamingContext` from the checkpoint data and start the
+ computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`.
+
+</div>
+
 </div>
 
 **Note**: If Spark Streaming and/or the Spark Streaming program is recompiled,
@@ -1572,7 +1854,11 @@ package and renamed for better clarity.
     [TwitterUtils](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html),
     [ZeroMQUtils](api/java/index.html?org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and
     [MQTTUtils](api/java/index.html?org/apache/spark/streaming/mqtt/MQTTUtils.html)
+  - Python docs
+    * [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext)
+    * [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)
 
 * More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming)
   and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)
+  and [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming)
 * [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and [video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming.
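
The `updateFunction` added to the guide in this diff can be exercised without a cluster. The following is a minimal sketch, assuming a hand-written driver loop (`apply_batch`, a hypothetical helper that is not part of PySpark) stands in for `updateStateByKey`: it groups the `(word, 1)` pairs of one batch by key and calls the update function once per key, including keys that have state but received no new values in that batch.

```python
def update_function(new_values, running_count):
    # Same logic as the guide's updateFunction: start from 0 on first sight.
    if running_count is None:
        running_count = 0
    return sum(new_values, running_count)


def apply_batch(state, batch_pairs):
    """Hypothetical stand-in for updateStateByKey applied to a single batch."""
    grouped = {}
    for word, count in batch_pairs:
        grouped.setdefault(word, []).append(count)
    new_state = {}
    # Every key with old state or new values gets one call to the update function.
    for word in set(state) | set(grouped):
        new_state[word] = update_function(grouped.get(word, []), state.get(word))
    return new_state


batches = [
    [("hello", 1), ("world", 1)],
    [("hello", 1)],
    [],  # an empty batch leaves the running counts unchanged
]

state = {}
for batch in batches:
    state = apply_batch(state, batch)
```

After the three batches, `state` holds a running count per word rather than a per-batch count, which is the difference between `updateStateByKey` and the plain `reduceByKey` used in the quick example.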

http://git-wip-us.apache.org/repos/asf/spark/blob/05db2da7/examples/src/main/python/streaming/recoverable_network_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py
new file mode 100644
index 0000000..fc6827c
--- /dev/null
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in text encoded with UTF8 received from the network every second.
+
+ Usage: recoverable_network_wordcount.py <hostname> <port> <checkpoint-directory> <output-file>
+   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
+   data. <checkpoint-directory> directory in an HDFS-compatible file system in which to store
+   checkpoint data
+   <output-file> file to which the word counts will be appended
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+
+ and then run the example
+    `$ bin/spark-submit examples/src/main/python/streaming/recoverable_network_wordcount.py \
+        localhost 9999 ~/checkpoint/ ~/out`
+
+ If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
+ a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
+ checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
+ the checkpoint data.
+"""
+
+import os
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+
+def createContext(host, port, outputPath):
+    # If you do not see this printed, that means the StreamingContext has been loaded
+    # from the new checkpoint
+    print "Creating new context"
+    if os.path.exists(outputPath):
+        os.remove(outputPath)
+    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
+    ssc = StreamingContext(sc, 1)
+
+    # Create a socket stream on target ip:port and count the
+    # words in input stream of \n delimited text (eg. generated by 'nc')
+    lines = ssc.socketTextStream(host, port)
+    words = lines.flatMap(lambda line: line.split(" "))
+    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
+
+    def echo(time, rdd):
+        counts = "Counts at time %s %s" % (time, rdd.collect())
+        print counts
+        print "Appending to " + os.path.abspath(outputPath)
+        with open(outputPath, 'a') as f:
+            f.write(counts + "\n")
+
+    wordCounts.foreachRDD(echo)
+    return ssc
+
+if __name__ == "__main__":
+    if len(sys.argv) != 5:
+        print >> sys.stderr, "Usage: recoverable_network_wordcount.py <hostname> <port> "\
+                             "<checkpoint-directory> <output-file>"
+        exit(-1)
+    host, port, checkpoint, output = sys.argv[1:]
+    ssc = StreamingContext.getOrCreate(checkpoint,
+                                       lambda: createContext(host, int(port), output))
+    ssc.start()
+    ssc.awaitTermination()
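
The first-run versus restart behaviour described in the example's docstring can be illustrated without Spark. The sketch below is a plain-Python analogy only: `get_or_create`, `create_state`, and the JSON checkpoint file are hypothetical stand-ins, and nothing here reflects how `StreamingContext.getOrCreate` is implemented internally. It shows only the control flow, namely that the setup function runs when no checkpoint exists and is skipped once one does.

```python
import json
import os
import tempfile


def get_or_create(checkpoint_path, create_func):
    """Return (state, created): load saved state if present, else build it once.

    Hypothetical analogy for the recovery pattern, not PySpark code.
    """
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            return json.load(f), False
    state = create_func()
    with open(checkpoint_path, "w") as f:
        json.dump(state, f)
    return state, True


calls = []


def create_state():
    # Plays the role of createContext(); the append mirrors "Creating new context".
    calls.append("created")
    return {"batch_interval": 1}


workdir = tempfile.mkdtemp()
path = os.path.join(workdir, "checkpoint.json")

first, first_created = get_or_create(path, create_state)    # no checkpoint yet
second, second_created = get_or_create(path, create_state)  # checkpoint now exists
```

The second call never invokes `create_state`, which corresponds to the comment in `createContext` above: if "Creating new context" is not printed, the context was restored from the checkpoint.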

http://git-wip-us.apache.org/repos/asf/spark/blob/05db2da7/python/docs/pyspark.streaming.rst
----------------------------------------------------------------------
diff --git a/python/docs/pyspark.streaming.rst b/python/docs/pyspark.streaming.rst
new file mode 100644
index 0000000..5024d69
--- /dev/null
+++ b/python/docs/pyspark.streaming.rst
@@ -0,0 +1,10 @@
+pyspark.streaming module
+========================
+
+Module contents
+---------------
+
+.. automodule:: pyspark.streaming
+    :members:
+    :undoc-members:
+    :show-inheritance:

http://git-wip-us.apache.org/repos/asf/spark/blob/05db2da7/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 5ae5cf0..0826ddc 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -441,9 +441,11 @@ class DStream(object):
 
         if `invReduceFunc` is not None, the reduction is done incrementally
         using the old window's reduced value :
-         1. reduce the new values that entered the window (e.g., adding new counts)
-         2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
-         This is more efficient than `invReduceFunc` is None.
+
+        1. reduce the new values that entered the window (e.g., adding new counts)
+
+        2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+        This is more efficient than when `invReduceFunc` is None.
 
         @param reduceFunc:     associative reduce function
         @param invReduceFunc:  inverse reduce function of `reduceFunc`
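
The two-step incremental reduction described in this docstring can be checked with a small pure-Python simulation. This is a sketch under stated assumptions: each list element stands for the already-reduced value of one batch for a single key, window and slide lengths are counted in batches, the slide is no longer than the window, and the helper names `window_full` and `window_incremental` are hypothetical (they are not part of `DStream`).

```python
from functools import reduce


def window_full(batches, end, window, reduce_func):
    """Recompute the window ending at batch index `end` from scratch."""
    return reduce(reduce_func, batches[end - window + 1:end + 1])


def window_incremental(prev, batches, end, window, slide, reduce_func, inv_reduce_func):
    """Derive the window ending at `end` from the previous window's value `prev`."""
    entered = batches[end - slide + 1:end + 1]
    left = batches[end - slide - window + 1:end - window + 1]
    value = reduce(reduce_func, entered, prev)   # 1. reduce the values that entered
    return reduce(inv_reduce_func, left, value)  # 2. inverse reduce the values that left


def add(x, y):
    return x + y


def subtract(x, y):
    return x - y


batches = [3, 1, 4, 1, 5, 9, 2, 6]
window, slide = 3, 2

current = window_full(batches, window - 1, window, add)  # first window has no predecessor
results = [current]
for end in range(window - 1 + slide, len(batches), slide):
    current = window_incremental(current, batches, end, window, slide, add, subtract)
    # The incremental value must match a full recomputation of the same window.
    assert current == window_full(batches, end, window, add)
    results.append(current)
```

Each incremental step touches only the `slide` batches that entered and the `slide` batches that left, whereas the full recomputation touches all `window` batches, which is where the efficiency gain comes from when the window is much longer than the slide.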

