spark-commits mailing list archives

From joshro...@apache.org
Subject spark git commit: [SPARK-6886] [PySpark] fix big closure with shuffle
Date Wed, 15 Apr 2015 19:58:44 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 ec0e817ee -> 1e4b065b7


[SPARK-6886] [PySpark] fix big closure with shuffle

Currently, the created broadcast object has the same life cycle as the RDD in Python. For multistage
jobs, a PythonRDD is created in the JVM while the RDD in Python may be GCed; the broadcast is then
destroyed in the JVM before the PythonRDD is finished with it.

This PR changes the code to use the PythonRDD to track the life cycle of the broadcast object. It
also refactors getNumPartitions() to avoid unnecessary creation of a PythonRDD, which can be
heavy.
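
For reference, a minimal PySpark sketch of the failure mode (not part of the commit; the
local master and the list size are illustrative assumptions):

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "big-closure-repro")

    # Capture a large list in the closure so the pickled command exceeds
    # the 1 MB threshold and is shipped to executors as a broadcast.
    data = [float(i) for i in range(200000)]
    rdd = sc.parallelize(range(1), 1).map(lambda x: (len(data), 1))

    # groupByKey() forces a shuffle, so the job runs in multiple stages;
    # before this fix, the intermediate Python RDD (and with it the
    # broadcast) could be GCed between stages, destroying the broadcast
    # in the JVM while the PythonRDD still needed it.
    print(rdd.groupByKey().count())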

cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #5496 from davies/big_closure and squashes the following commits:

9a0ea4c [Davies Liu] fix big closure with shuffle

(cherry picked from commit f11288d5272bc18585b8cad4ee3bd59eade7c296)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e4b065b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e4b065b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e4b065b

Branch: refs/heads/branch-1.3
Commit: 1e4b065b72bc90db6a9135d1262d277c5dcaa081
Parents: ec0e817
Author: Davies Liu <davies@databricks.com>
Authored: Wed Apr 15 12:58:02 2015 -0700
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Wed Apr 15 12:58:22 2015 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py   | 15 +++++----------
 python/pyspark/tests.py |  6 ++----
 2 files changed, 7 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e4b065b/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index eb8c6b4..d80366a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1191,7 +1191,7 @@ class RDD(object):
         [91, 92, 93]
         """
         items = []
-        totalParts = self._jrdd.partitions().size()
+        totalParts = self.getNumPartitions()
         partsScanned = 0
 
         while len(items) < num and partsScanned < totalParts:
@@ -1254,7 +1254,7 @@ class RDD(object):
         >>> sc.parallelize([1]).isEmpty()
         False
         """
-        return self._jrdd.partitions().size() == 0 or len(self.take(1)) == 0
+        return self.getNumPartitions() == 0 or len(self.take(1)) == 0
 
     def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
         """
@@ -2205,11 +2205,9 @@ def _prepare_for_python_RDD(sc, command, obj=None):
     ser = CloudPickleSerializer()
     pickled_command = ser.dumps(command)
     if len(pickled_command) > (1 << 20):  # 1M
+        # The broadcast will have same life cycle as created PythonRDD
         broadcast = sc.broadcast(pickled_command)
         pickled_command = ser.dumps(broadcast)
-        # tracking the life cycle by obj
-        if obj is not None:
-            obj._broadcast = broadcast
     broadcast_vars = ListConverter().convert(
         [x._jbroadcast for x in sc._pickled_broadcast_vars],
         sc._gateway._gateway_client)
@@ -2264,12 +2262,9 @@ class PipelinedRDD(RDD):
         self._jrdd_deserializer = self.ctx.serializer
         self._bypass_serializer = False
         self.partitioner = prev.partitioner if self.preservesPartitioning else None
-        self._broadcast = None
 
-    def __del__(self):
-        if self._broadcast:
-            self._broadcast.unpersist()
-            self._broadcast = None
+    def getNumPartitions(self):
+        return self._prev_jrdd.partitions().size()
 
     @property
     def _jrdd(self):

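The hunk above keeps the 1 MB cutoff in _prepare_for_python_RDD but drops the
obj._broadcast bookkeeping, so the JVM-side PythonRDD now owns the broadcast. A
standalone sketch of the size check, using the separate cloudpickle package as a
stand-in for PySpark's bundled CloudPickleSerializer (an assumption; install it
with pip install cloudpickle):

    import cloudpickle

    def would_broadcast(func):
        # Same threshold as the diff: a command that pickles to more
        # than 1 MB is shipped via a broadcast instead of inline.
        return len(cloudpickle.dumps(func)) > (1 << 20)

    def make_big_closure():
        big = list(range(300000))  # well over 1 MB once pickled
        return lambda x: x + big[0]

    print(would_broadcast(lambda x: x + 1))     # False: tiny closure
    print(would_broadcast(make_big_closure()))  # True: captures the big list
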
http://git-wip-us.apache.org/repos/asf/spark/blob/1e4b065b/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index c10f857..e1ea578 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -521,10 +521,8 @@ class RDDTests(ReusedPySparkTestCase):
         data = [float(i) for i in xrange(N)]
         rdd = self.sc.parallelize(range(1), 1).map(lambda x: len(data))
         self.assertEquals(N, rdd.first())
-        self.assertTrue(rdd._broadcast is not None)
-        rdd = self.sc.parallelize(range(1), 1).map(lambda x: 1)
-        self.assertEqual(1, rdd.first())
-        self.assertTrue(rdd._broadcast is None)
+        # regression test for SPARK-6886
+        self.assertEqual(1, rdd.map(lambda x: (x, 1)).groupByKey().count())
 
     def test_zip_with_different_serializers(self):
         a = self.sc.parallelize(range(5))

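The getNumPartitions() refactor mentioned in the commit message is the
PipelinedRDD hunk in rdd.py above: the partition count is now read from the
parent JVM RDD (_prev_jrdd) instead of going through the _jrdd property, which
would build the PythonRDD and pickle the closure just to count partitions. A
hedged usage sketch, assuming a live SparkContext sc:

    # Asking a pipelined RDD for its partition count stays cheap: the
    # count comes from the parent RDD, so no PythonRDD is created and
    # no closure is serialized.
    mapped = sc.parallelize(range(100), 4).map(lambda x: x * 2)
    print(mapped.getNumPartitions())  # 4
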

