spark-commits mailing list archives

From joshro...@apache.org
Subject spark git commit: [SPARK-5224] [PySpark] improve performance of parallelize list/ndarray
Date Thu, 15 Jan 2015 19:41:12 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 3813547a1 -> b3fe6df67


[SPARK-5224] [PySpark] improve performance of parallelize list/ndarray

After the default batchSize was changed to 0 (batching based on object size), parallelize()
still used BatchedSerializer with batchSize=1. This PR makes parallelize() use batchSize=1024
by default.
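
A minimal sketch of the effect (not the PySpark source itself): when the configured batch size is 0, meaning "auto", the batch size used by parallelize() now falls back to 1024 instead of 1.

    def effective_batch_size(num_items, num_slices, configured_batch_size):
        # configured_batch_size == 0 means "auto"; fall back to 1024 instead of 1
        return max(1, min(num_items // num_slices, configured_batch_size or 1024))

    print(effective_batch_size(1000000, 8, 0))  # 1024 (was 1 before this change)
    print(effective_batch_size(100, 8, 0))      # 12; small data is still spread evenly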

Also, BatchedSerializer did not work well with list and numpy.ndarray; this PR improves
BatchedSerializer by using __len__ and __getslice__ to slice batches directly.
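
A minimal sketch of the slicing-based batching (written with range and standard slicing rather than the Python 2 xrange/__getslice__ used in the actual patch): objects that know their length and support slicing, such as lists and numpy arrays, are cut into batches directly instead of being rebuilt item by item.

    import numpy as np

    def batched(seq, batch_size=1024):
        # Slice the sequence into fixed-size batches; the last one may be shorter.
        n = len(seq)
        for i in range(0, n, batch_size):
            yield seq[i:i + batch_size]

    print(list(batched(np.arange(10), 4)))
    # [array([0, 1, 2, 3]), array([4, 5, 6, 7]), array([8, 9])]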

Here is the benchmark for parallelizing 1 million ints with a list or an ndarray:

                  | before | after | improvement
    ------------- | ------ | ----- | -----------
    list          | 11.7 s | 0.8 s | 14x
    numpy.ndarray | 32 s   | 0.7 s | 40x
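
A rough sketch of how such a benchmark could be reproduced; the author's exact timing harness is not part of this commit, so the setup below (local SparkContext, count() as the action) is an assumption.

    import time
    import numpy as np
    from pyspark import SparkContext

    sc = SparkContext("local[*]", "parallelize-benchmark")

    def time_parallelize(data):
        # Time parallelize() plus a cheap action that forces serialization.
        start = time.time()
        sc.parallelize(data).count()
        return time.time() - start

    n = 1000 * 1000
    print("list:          %.1f s" % time_parallelize(list(range(n))))
    print("numpy.ndarray: %.1f s" % time_parallelize(np.arange(n)))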

Author: Davies Liu <davies@databricks.com>

Closes #4024 from davies/opt_numpy and squashes the following commits:

7618c7c [Davies Liu] improve performance of parallelize list/ndarray

(cherry picked from commit 3c8650c12ad7a97852e7bd76153210493fd83e92)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3fe6df6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3fe6df6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3fe6df6

Branch: refs/heads/branch-1.2
Commit: b3fe6df67fe4c2f71d8424a50aac7e56f9032606
Parents: 3813547
Author: Davies Liu <davies@databricks.com>
Authored: Thu Jan 15 11:40:41 2015 -0800
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Thu Jan 15 11:41:05 2015 -0800

----------------------------------------------------------------------
 python/pyspark/context.py     | 2 +-
 python/pyspark/serializers.py | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b3fe6df6/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index ed7351d..3935413 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -319,7 +319,7 @@ class SparkContext(object):
         # Make sure we distribute data evenly if it's smaller than self.batchSize
         if "__len__" not in dir(c):
             c = list(c)    # Make it a list so we can compute its length
-        batchSize = max(1, min(len(c) // numSlices, self._batchSize))
+        batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024))
         serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
         serializer.dump_stream(c, tempFile)
         tempFile.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/b3fe6df6/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index bd08c9a..b8bda83 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -181,6 +181,10 @@ class BatchedSerializer(Serializer):
     def _batched(self, iterator):
         if self.batchSize == self.UNLIMITED_BATCH_SIZE:
             yield list(iterator)
+        elif hasattr(iterator, "__len__") and hasattr(iterator, "__getslice__"):
+            n = len(iterator)
+            for i in xrange(0, n, self.batchSize):
+                yield iterator[i: i + self.batchSize]
         else:
             items = []
             count = 0


