spark-commits mailing list archives

From ma...@apache.org
Subject git commit: SPARK-1162 Added top in python.
Date Wed, 12 Mar 2014 22:58:00 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-0.9 70491642f -> 51a77e977


SPARK-1162 Added top in python.

Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #93 from ScrapCodes/SPARK-1162/pyspark-top-takeOrdered and squashes the following commits:

ece1fa4 [Prashant Sharma] Added top in python.

(cherry picked from commit b8afe3052086547879ebf28d6e36207e0d370710)
Signed-off-by: Matei Zaharia <matei@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51a77e97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51a77e97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51a77e97

Branch: refs/heads/branch-0.9
Commit: 51a77e9779b64e464d07580de12ac3e1fe77e41a
Parents: 7049164
Author: Prashant Sharma <prashant.s@imaginea.com>
Authored: Wed Mar 12 15:57:44 2014 -0700
Committer: Matei Zaharia <matei@databricks.com>
Committed: Wed Mar 12 15:57:54 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/51a77e97/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 60d4cb2..678b005 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -28,6 +28,7 @@ from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile
 from threading import Thread
 import warnings
+from heapq import heappush, heappop, heappushpop
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, pack_long
@@ -616,6 +617,30 @@ class RDD(object):
                 m1[k] += v
             return m1
         return self.mapPartitions(countPartition).reduce(mergeMaps)
+    
+    def top(self, num):
+        """
+        Get the top N elements from an RDD.
+
+        Note: It returns the list sorted in ascending order.
+        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
+        [12]
+        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().top(2)
+        [5, 6]
+        """
+        def topIterator(iterator):
+            q = []
+            for k in iterator:
+                if len(q) < num:
+                    heappush(q, k)
+                else:
+                    heappushpop(q, k)
+            yield q
+
+        def merge(a, b):
+            return next(topIterator(a + b))
+
+        return sorted(self.mapPartitions(topIterator).reduce(merge))
 
     def take(self, num):
         """
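
The added top() method keeps a bounded min-heap per partition and then merges the per-partition heaps pairwise. The sketch below reproduces that logic in plain Python, without Spark, so the behaviour can be checked locally. The names top_of_partition and local_top and the list-of-lists stand-in for partitions are hypothetical and are not part of the commit. The sketch uses only heappush and heappushpop, the same two heapq functions that the added method calls.

```python
from functools import reduce
from heapq import heappush, heappushpop


def top_of_partition(iterator, num):
    """Return a min-heap holding the `num` largest items seen in `iterator`."""
    q = []
    for k in iterator:
        if len(q) < num:
            # Heap is not full yet: just add the item.
            heappush(q, k)
        else:
            # Heap is full: push the item, then drop the smallest one.
            heappushpop(q, k)
    return q


def merge(a, b, num):
    """Combine two per-partition heaps and keep only the `num` largest items."""
    return top_of_partition(a + b, num)


def local_top(partitions, num):
    """Hypothetical stand-in for mapPartitions(...).reduce(...) on plain lists.

    Assumes `partitions` is a non-empty list of lists.
    """
    heaps = [top_of_partition(iter(p), num) for p in partitions]
    return sorted(reduce(lambda a, b: merge(a, b, num), heaps))


if __name__ == "__main__":
    print(local_top([[10, 4], [2, 12, 3]], 1))   # [12]
    print(local_top([[2, 3], [4, 5, 6]], 2))     # [5, 6]
```

Because each heap never holds more than num items, the amount of data passed between partitions during the merge stays proportional to num rather than to the partition size.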

