spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-11904][PYSPARK] reduceByKeyAndWindow does not require checkpointing when invFunc is None
Date Thu, 17 Dec 2015 06:10:31 GMT
Repository: spark
Updated Branches:
  refs/heads/master 97678edea -> 437583f69


[SPARK-11904][PYSPARK] reduceByKeyAndWindow does not require checkpointing when invFunc is
None

when invFunc is None, `reduceByKeyAndWindow(func, None, winsize, slidesize)` is equivalent
to

     reduceByKey(func).window(winsize, slidesize).reduceByKey(winsize, slidesize)

and no checkpoint is necessary. The corresponding Scala code does exactly that, but Python
code always creates a windowed stream with obligatory checkpointing. The patch fixes this.

I do not know how to unit-test this.

Author: David Tolpin <david.tolpin@gmail.com>

Closes #9888 from dtolpin/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/437583f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/437583f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/437583f6

Branch: refs/heads/master
Commit: 437583f692e30b8dc03b339a34e92595d7b992ba
Parents: 97678ed
Author: David Tolpin <david.tolpin@gmail.com>
Authored: Wed Dec 16 22:10:24 2015 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Wed Dec 16 22:10:24 2015 -0800

----------------------------------------------------------------------
 python/pyspark/streaming/dstream.py | 45 ++++++++++++++++----------------
 1 file changed, 23 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/437583f6/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index f61137c..b994a53 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -542,31 +542,32 @@ class DStream(object):
 
         reduced = self.reduceByKey(func, numPartitions)
 
-        def reduceFunc(t, a, b):
-            b = b.reduceByKey(func, numPartitions)
-            r = a.union(b).reduceByKey(func, numPartitions) if a else b
-            if filterFunc:
-                r = r.filter(filterFunc)
-            return r
-
-        def invReduceFunc(t, a, b):
-            b = b.reduceByKey(func, numPartitions)
-            joined = a.leftOuterJoin(b, numPartitions)
-            return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
-                                    if kv[1] is not None else kv[0])
-
-        jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
         if invFunc:
+            def reduceFunc(t, a, b):
+                b = b.reduceByKey(func, numPartitions)
+                r = a.union(b).reduceByKey(func, numPartitions) if a else b
+                if filterFunc:
+                    r = r.filter(filterFunc)
+                return r
+
+            def invReduceFunc(t, a, b):
+                b = b.reduceByKey(func, numPartitions)
+                joined = a.leftOuterJoin(b, numPartitions)
+                return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
+                                        if kv[1] is not None else kv[0])
+
+            jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
             jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer)
+            if slideDuration is None:
+                slideDuration = self._slideDuration
+            dstream = self._sc._jvm.PythonReducedWindowedDStream(
+                reduced._jdstream.dstream(),
+                jreduceFunc, jinvReduceFunc,
+                self._ssc._jduration(windowDuration),
+                self._ssc._jduration(slideDuration))
+            return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
         else:
-            jinvReduceFunc = None
-        if slideDuration is None:
-            slideDuration = self._slideDuration
-        dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
-                                                             jreduceFunc, jinvReduceFunc,
-                                                             self._ssc._jduration(windowDuration),
-                                                             self._ssc._jduration(slideDuration))
-        return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
+            return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions)
 
     def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None):
         """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message