spark-commits mailing list archives

From gurwls...@apache.org
Subject spark git commit: [SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs
Date Sun, 17 Sep 2017 17:46:53 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 51e5a821d -> 42852bb17


[SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs

## What changes were proposed in this pull request?

Fixes a bug introduced in #16121

In `PairDeserializer`, convert each batch of keys and values to lists (if they do not
already have `__len__`) so that we can check that they are the same size. Normally they
are already lists, so this should not have a performance impact, but the conversion is
needed when repeated `zip`s are done.
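
For context (plain Python, not Spark internals): `zip()` in Python 3 returns a one-shot
iterator without `__len__`, which is why nested batches must be materialized before their
sizes can be compared. A minimal sketch of the failure mode:

```python
keys = zip([1, 2], [3, 4])        # zip object: no __len__
try:
    len(keys)                     # raises TypeError on an iterator
except TypeError:
    keys = list(keys)             # materialize so the size check can run
assert len(keys) == 2
```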

## How was this patch tested?

Additional unit test

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #19226 from aray/SPARK-21985.

(cherry picked from commit 6adf67dd14b0ece342bb91adf800df0a7101e038)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42852bb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42852bb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42852bb1

Branch: refs/heads/branch-2.2
Commit: 42852bb17121fb8067a4aea3e56d56f76a2e0d1d
Parents: 51e5a82
Author: Andrew Ray <ray.andrew@gmail.com>
Authored: Mon Sep 18 02:46:27 2017 +0900
Committer: hyukjinkwon <gurwls223@gmail.com>
Committed: Mon Sep 18 02:46:47 2017 +0900

----------------------------------------------------------------------
 python/pyspark/serializers.py |  6 +++++-
 python/pyspark/tests.py       | 12 ++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/42852bb1/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ea5e00e..9bd4e55 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -97,7 +97,7 @@ class Serializer(object):
 
     def _load_stream_without_unbatching(self, stream):
         """
-        Return an iterator of deserialized batches (lists) of objects from the input stream.
+        Return an iterator of deserialized batches (iterable) of objects from the input stream.
         if the serializer does not operate on batches the default implementation returns an
         iterator of single element lists.
         """
@@ -326,6 +326,10 @@ class PairDeserializer(Serializer):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            # For double-zipped RDDs, the batches can be iterators from other PairDeserializer,
+            # instead of lists. We need to convert them to lists if needed.
+            key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
+            val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch)
             if len(key_batch) != len(val_batch):
                 raise ValueError("Can not deserialize PairRDD with different number of items"
                                  " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))

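The guard above, extracted as a standalone sketch (the helper name is hypothetical and not
part of `pyspark.serializers`):

```python
def ensure_sized(batch):
    # Materialize a batch into a list only when it lacks __len__
    # (e.g. a zip iterator produced by a nested PairDeserializer).
    return batch if hasattr(batch, '__len__') else list(batch)

key_batch = ensure_sized(zip('abc', 'xyz'))  # zip object -> list of pairs
val_batch = ensure_sized(['p', 'q', 'r'])    # already a list, returned unchanged
assert len(key_batch) == len(val_batch) == 3
```
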
http://git-wip-us.apache.org/repos/asf/spark/blob/42852bb1/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 20a933e..9f47798 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -644,6 +644,18 @@ class RDDTests(ReusedPySparkTestCase):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )
 
+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize('abc', 2)
+        self.assertSetEqual(
+            set(rdd.zip(rdd).zip(rdd).collect()),
+            set([((x, x), x) for x in 'abc'])
+        )
+        self.assertSetEqual(
+            set(rdd.zip(rdd.zip(rdd)).collect()),
+            set([(x, (x, x)) for x in 'abc'])
+        )
+
     def test_deleting_input_files(self):
         # Regression test for SPARK-1025
         tempFile = tempfile.NamedTemporaryFile(delete=False)

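To try the regression interactively, a sketch assuming a running `pyspark` shell with an
active SparkContext bound to `sc`:

```python
rdd = sc.parallelize('abc', 2)
# Before this fix, the double zip failed inside PairDeserializer's size check;
# with it, the chained zip round-trips as expected:
print(sorted(rdd.zip(rdd).zip(rdd).collect()))
# [(('a', 'a'), 'a'), (('b', 'b'), 'b'), (('c', 'c'), 'c')]
```
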
