spark-commits mailing list archives

From t...@apache.org
Subject spark git commit: [SPARK-10122] [PYSPARK] [STREAMING] Fix getOffsetRanges bug in PySpark-Streaming transform function
Date Fri, 21 Aug 2015 20:15:49 GMT
Repository: spark
Updated Branches:
  refs/heads/master 3c462f5d8 -> d89cc38b3


[SPARK-10122] [PYSPARK] [STREAMING] Fix getOffsetRanges bug in PySpark-Streaming transform function

Details of the bug and an explanation can be found in [SPARK-10122](https://issues.apache.org/jira/browse/SPARK-10122).

tdas, please help review.

Author: jerryshao <sshao@hortonworks.com>

Closes #8347 from jerryshao/SPARK-10122 and squashes the following commits:

4039b16 [jerryshao] Fix getOffsetRanges in transform() bug


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d89cc38b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d89cc38b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d89cc38b

Branch: refs/heads/master
Commit: d89cc38b33815e7b99fb3389b5038a543527065d
Parents: 3c462f5
Author: jerryshao <sshao@hortonworks.com>
Authored: Fri Aug 21 13:10:11 2015 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Fri Aug 21 13:15:35 2015 -0700

----------------------------------------------------------------------
 python/pyspark/streaming/dstream.py | 5 ++++-
 python/pyspark/streaming/tests.py   | 4 +++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d89cc38b/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 8dcb964..698336c 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -610,7 +610,10 @@ class TransformedDStream(DStream):
         self.is_checkpointed = False
         self._jdstream_val = None
 
-        if (isinstance(prev, TransformedDStream) and
+        # Use type() so that only exact TransformedDStream objects (not subclasses) have their
+        # functions folded and their DStreams compacted.
+        # This avoids a bug in KafkaTransformedDStream when calling offsetRanges().
+        if (type(prev) is TransformedDStream and
                 not prev.is_cached and not prev.is_checkpointed):
             prev_func = prev.func
             self.func = lambda t, rdd: func(t, prev_func(t, rdd))
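
To illustrate why the exact type check matters, here is a minimal, PySpark-free toy model of the folding logic, written under stated assumptions. Every class and function in it (`RDD`, `KafkaRDD`, `SourceDStream`, `record_offsets`, `take_values`, the `exact` parameter, the `wrap` method) is a hypothetical stand-in, not PySpark's API, and the real constructor also checks `is_cached` and `is_checkpointed`, which the sketch omits. The Kafka subclass hands its function an RDD that has `offsetRanges()`. With the old `isinstance()` check, a plain `TransformedDStream` placed after it (for example from `.map()`) absorbs the Kafka function and skips the Kafka stream, so that function receives a plain RDD and the `offsetRanges()` call fails.

```python
# Toy model of the folding check in TransformedDStream.__init__.
# Every class and function here is a hypothetical stand-in, not PySpark's API.


class RDD(object):
    """Stand-in for a plain RDD: just holds a list of records."""

    def __init__(self, data):
        self.data = data


class KafkaRDD(RDD):
    """Stand-in for a Kafka RDD, which additionally exposes offsetRanges()."""

    def offsetRanges(self):
        return ["topic-0:0-%d" % len(self.data)]


class SourceDStream(object):
    """Source stream that yields a plain RDD for batch t."""

    def __init__(self, batches):
        self.batches = batches

    def compute(self, t):
        return RDD(self.batches[t])


class TransformedDStream(object):
    def __init__(self, prev, func, exact=True):
        # exact=True models the fixed check, exact=False the old isinstance() check.
        if exact:
            foldable = type(prev) is TransformedDStream
        else:
            foldable = isinstance(prev, TransformedDStream)
        if foldable:
            prev_func = prev.func
            # Compose both functions and skip the intermediate stream entirely.
            self.func = lambda t, rdd: func(t, prev_func(t, rdd))
            self.prev = prev.prev
        else:
            self.func = func
            self.prev = prev

    def wrap(self, rdd):
        # A plain transformed stream hands its function a plain RDD.
        return RDD(rdd.data)

    def compute(self, t):
        return self.func(t, self.wrap(self.prev.compute(t)))


class KafkaTransformedDStream(TransformedDStream):
    def wrap(self, rdd):
        # The Kafka subclass hands its function a KafkaRDD instead.
        return KafkaRDD(rdd.data)


offset_ranges = []


def record_offsets(t, rdd):
    # Like transformWithOffsetRanges in the test: it needs rdd.offsetRanges().
    offset_ranges.extend(rdd.offsetRanges())
    return rdd


def take_values(t, rdd):
    # Like .map(lambda kv: kv[1]).
    return RDD([kv[1] for kv in rdd.data])


def run(exact):
    source = SourceDStream([[("k1", "a"), ("k2", "b")]])
    kafka = KafkaTransformedDStream(source, record_offsets)
    mapped = TransformedDStream(kafka, take_values, exact=exact)
    return mapped.compute(0).data


print(run(exact=True))  # ['a', 'b']
try:
    run(exact=False)
except AttributeError as err:
    # The folded pipeline hands record_offsets a plain RDD without offsetRanges().
    print("old isinstance() check fails:", err)
```

With the exact check, `mapped.prev` stays the Kafka stream, so each stage keeps its own RDD wrapper, while two plain `TransformedDStream`s in a row still fold and keep the original optimization.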

http://git-wip-us.apache.org/repos/asf/spark/blob/d89cc38b/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 6108c84..214d5be 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -850,7 +850,9 @@ class KafkaStreamTests(PySparkStreamingTestCase):
                 offsetRanges.append(o)
             return rdd
 
-        stream.transform(transformWithOffsetRanges).foreachRDD(lambda rdd: rdd.count())
+        # Test that mixing KafkaTransformedDStream and TransformedDStream works:
+        # only plain TransformedDStreams should be folded together.
+        stream.transform(transformWithOffsetRanges).map(lambda kv: kv[1]).count().pprint()
         self.ssc.start()
         self.wait_for(offsetRanges, 1)
 



