spark-commits mailing list archives

From dav...@apache.org
Subject spark git commit: [SPARK-19872] [PYTHON] Use the correct deserializer for RDD construction for coalesce/repartition
Date Wed, 15 Mar 2017 17:12:54 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 80ebca62c -> 062254635


[SPARK-19872] [PYTHON] Use the correct deserializer for RDD construction for coalesce/repartition

## What changes were proposed in this pull request?

This PR proposes to use the correct deserializer, `BatchedSerializer`, for RDD construction
in coalesce/repartition when shuffle is enabled. Currently, the new RDD is constructed with the
original deserializer (here `UTF8Deserializer`) as is, instead of the `BatchedSerializer` taken from the reserialized copy.
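The bookkeeping problem can be sketched with a toy model. This is a simplified, hypothetical illustration: `ToyRDD`, `coalesce_buggy` and `coalesce_fixed` are made-up names, not part of the PySpark API, and deserializers are represented only by their names. The point is that reserializing the data changes which deserializer must be used to read it back, so the returned RDD has to carry the copy's deserializer. That is what the patch does with `jrdd_deserializer`.

```python
# Hypothetical, simplified model of the deserializer bookkeeping in
# RDD.coalesce; these classes and functions are NOT the real PySpark API.
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyRDD:
    data: tuple        # stands in for the JVM-side RDD
    deserializer: str  # name of the deserializer Python uses to read `data`

    def reserialize(self, ser):
        # Rewriting the data with a new serializer also changes
        # which deserializer is needed to read it back.
        return ToyRDD(self.data, ser)


BATCHED = "BatchedSerializer(PickleSerializer())"


def coalesce_buggy(rdd, shuffle):
    if shuffle:
        copy = rdd.reserialize(BATCHED)
        # Bug: pickled data is labelled with the original deserializer.
        return ToyRDD(copy.data, rdd.deserializer)
    return ToyRDD(rdd.data, rdd.deserializer)


def coalesce_fixed(rdd, shuffle):
    if shuffle:
        copy = rdd.reserialize(BATCHED)
        jrdd_deserializer = copy.deserializer
        jrdd = copy.data
    else:
        jrdd_deserializer = rdd.deserializer
        jrdd = rdd.data
    return ToyRDD(jrdd, jrdd_deserializer)


text_rdd = ToyRDD(("a", "b"), "UTF8Deserializer(True)")
print(coalesce_buggy(text_rdd, shuffle=True).deserializer)  # UTF8Deserializer(True)
print(coalesce_fixed(text_rdd, shuffle=True).deserializer)  # BatchedSerializer(PickleSerializer())
```

Since `repartition(n)` is `coalesce(n, shuffle=True)`, the shuffle branch is the one exercised by the example below.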

With the file `text.txt` below:

```
a
b

d
e
f
g
h
i
j
k
l

```

- Before

```python
>>> sc.textFile('text.txt').repartition(1).collect()
```

```
UTF8Deserializer(True)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/rdd.py", line 811, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File ".../spark/python/pyspark/serializers.py", line 549, in load_stream
    yield self.loads(stream)
  File ".../spark/python/pyspark/serializers.py", line 544, in loads
    return s.decode("utf-8") if self.use_unicode else s
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte
```
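The byte `0x80` in the error is consistent with pickled data being read as text. The sketch below uses only the standard library. It assumes that the batched data is written with a pickle protocol of 2 or higher, since every such protocol begins with the PROTO opcode `0x80`. It shows that decoding those bytes as UTF-8 fails at position 0.

```python
import pickle

# Pickle protocols >= 2 start with the PROTO opcode, byte 0x80
# (assumption: the batched data uses such a protocol).
payload = pickle.dumps(["a", "b"], protocol=2)
print(payload[:1])  # b'\x80'

# Decoding pickled bytes as UTF-8 mirrors what a UTF-8 deserializer does.
try:
    payload.decode("utf-8")
    decode_error = None
except UnicodeDecodeError as exc:
    decode_error = exc

print(decode_error)
```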

- After

```python
>>> sc.textFile('text.txt').repartition(1).collect()
```

```
[u'a', u'b', u'', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l', u'']
```

## How was this patch tested?

Unit test in `python/pyspark/tests.py`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17282 from HyukjinKwon/SPARK-19872.

(cherry picked from commit 7387126f83dc0489eb1df734bfeba705709b7861)
Signed-off-by: Davies Liu <davies.liu@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06225463
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06225463
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06225463

Branch: refs/heads/branch-2.1
Commit: 062254635a98da0b08f69dc7e8907079cfdce035
Parents: 80ebca6
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Wed Mar 15 10:17:18 2017 -0700
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Wed Mar 15 10:17:29 2017 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py   | 4 +++-
 python/pyspark/tests.py | 6 ++++++
 2 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/06225463/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b384b2b..ccef30c 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2071,10 +2071,12 @@ class RDD(object):
             batchSize = min(10, self.ctx._batchSize or 1024)
             ser = BatchedSerializer(PickleSerializer(), batchSize)
             selfCopy = self._reserialize(ser)
+            jrdd_deserializer = selfCopy._jrdd_deserializer
             jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
         else:
+            jrdd_deserializer = self._jrdd_deserializer
             jrdd = self._jrdd.coalesce(numPartitions, shuffle)
-        return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+        return RDD(jrdd, self.ctx, jrdd_deserializer)
 
     def zip(self, other):
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/06225463/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 1df91ad..8d227ea 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -972,6 +972,12 @@ class RDDTests(ReusedPySparkTestCase):
         zeros = len([x for x in l if x == 0])
         self.assertTrue(zeros == 0)
 
+    def test_repartition_on_textfile(self):
+        path = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
+        rdd = self.sc.textFile(path)
+        result = rdd.repartition(1).collect()
+        self.assertEqual(u"Hello World!", result[0])
+
     def test_distinct(self):
         rdd = self.sc.parallelize((1, 2, 3)*10, 10)
         self.assertEqual(rdd.getNumPartitions(), 10)



