spark-commits mailing list archives

From ma...@apache.org
Subject git commit: SPARK-3211 .take() is OOM-prone with empty partitions
Date Sat, 06 Sep 2014 01:52:10 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7ff8c45d7 -> ba5bcadde


SPARK-3211 .take() is OOM-prone with empty partitions

Instead of jumping straight from 1 partition to all partitions, grow the number
of partitions to try exponentially, quadrupling it after each attempt that
finds no rows.
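
To illustrate the difference, here is a minimal sketch of how many partitions
each successive job would scan when every partition scanned so far is empty.
It is a simplified model, not Spark code: `scan_schedule`, `total_parts` and
`rule` are hypothetical names, and the factor of 4 is taken from the diff in
this commit (see the squashed commit "Quadruple instead of double").

```python
def scan_schedule(total_parts, rule="quadruple"):
    """Sizes of successive scan batches when no partition yields a row.

    Hypothetical helper for illustration; not part of Spark.
    """
    schedule = []
    parts_scanned = 0
    while parts_scanned < total_parts:
        if parts_scanned == 0:
            num_parts_to_try = 1  # the first job always scans one partition
        elif rule == "quadruple":
            num_parts_to_try = parts_scanned * 4  # rule added by this commit
        else:
            num_parts_to_try = total_parts - 1  # rule removed by this commit
        # Never schedule more partitions than remain.
        batch = min(num_parts_to_try, total_parts - parts_scanned)
        schedule.append(batch)
        parts_scanned += batch
    return schedule


print(scan_schedule(1000))              # [1, 4, 20, 100, 500, 375]
print(scan_schedule(1000, rule="all"))  # [1, 999]
```

Under the removed rule, a single empty first partition makes the second job
cover all 999 remaining partitions at once. Under the new rule the batch sizes
grow geometrically, so the scan can stop early once rows are found.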

Fix proposed by Paul Nepywoda

Author: Andrew Ash <andrew@andrewash.com>

Closes #2117 from ash211/SPARK-3211 and squashes the following commits:

8b2299a [Andrew Ash] Quadruple instead of double for a minor speedup
e5f7e4d [Andrew Ash] Update comment to better reflect what we're doing
09a27f7 [Andrew Ash] Update PySpark to be less OOM-prone as well
3a156b8 [Andrew Ash] SPARK-3211 .take() is OOM-prone with empty partitions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba5bcadd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba5bcadd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba5bcadd

Branch: refs/heads/master
Commit: ba5bcaddecd54811d45c5fc79a013b3857d4c633
Parents: 7ff8c45
Author: Andrew Ash <andrew@andrewash.com>
Authored: Fri Sep 5 18:52:05 2014 -0700
Committer: Matei Zaharia <matei@databricks.com>
Committed: Fri Sep 5 18:52:05 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 7 +++----
 python/pyspark/rdd.py                              | 8 ++++----
 2 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba5bcadd/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index af9e31b..1cf55e8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1064,11 +1064,10 @@ abstract class RDD[T: ClassTag](
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1
       if (partsScanned > 0) {
-        // If we didn't find any rows after the first iteration, just try all partitions next.
-        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-        // by 50%.
+        // If we didn't find any rows after the previous iteration, quadruple and retry.  Otherwise,
+        // interpolate the number of partitions we need to try, but overestimate it by 50%.
         if (buf.size == 0) {
-          numPartsToTry = totalParts - 1
+          numPartsToTry = partsScanned * 4
         } else {
           numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba5bcadd/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index dff6fc2..04f1352 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1089,11 +1089,11 @@ class RDD(object):
             # we actually cap it at totalParts in runJob.
             numPartsToTry = 1
             if partsScanned > 0:
-                # If we didn't find any rows after the first iteration, just
-                # try all partitions next. Otherwise, interpolate the number
-                # of partitions we need to try, but overestimate it by 50%.
+                # If we didn't find any rows after the previous iteration,
+                # quadruple and retry.  Otherwise, interpolate the number of
+                # partitions we need to try, but overestimate it by 50%.
                 if len(items) == 0:
-                    numPartsToTry = totalParts - 1
+                    numPartsToTry = partsScanned * 4
                 else:
                     numPartsToTry = int(1.5 * num * partsScanned / len(items))
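
For readers who want to trace the patched logic end to end, the following is a
rough, self-contained model of the loop around this hunk. Only the
`numPartsToTry` computation comes from the diff above. The surrounding loop,
the use of plain Python lists in place of RDD partitions, and the names
`take_sketch`, `partitions` and `jobs` are assumptions made for illustration
and may not match the real `RDD.take` in detail.

```python
def take_sketch(partitions, num):
    """Model of take(num) over a list of lists standing in for partitions.

    Returns the taken items and the number of partitions scanned per job.
    Hypothetical helper; the real method runs Spark jobs instead.
    """
    items = []
    jobs = []
    total_parts = len(partitions)
    parts_scanned = 0
    while len(items) < num and parts_scanned < total_parts:
        num_parts_to_try = 1
        if parts_scanned > 0:
            if len(items) == 0:
                # No rows found so far: quadruple and retry.
                num_parts_to_try = parts_scanned * 4
            else:
                # Interpolate from the rows found so far, overestimating by 50%.
                num_parts_to_try = int(1.5 * num * parts_scanned / len(items))
        left = num - len(items)
        # Cap the batch at the total number of partitions.
        end = min(parts_scanned + num_parts_to_try, total_parts)
        jobs.append(end - parts_scanned)
        for part in partitions[parts_scanned:end]:
            items.extend(part[:left])  # each partition returns at most `left` rows
        parts_scanned = end
    return items[:num], jobs


# Five empty partitions, one with data, then ten more empty ones.
parts = [[], [], [], [], [], [1, 2, 3]] + [[] for _ in range(10)]
print(take_sketch(parts, 2))  # ([1, 2], [1, 4, 11])
```

In this example the first two jobs scan 1 and then 4 empty partitions, and the
third job would scan 20 but is capped at the 11 partitions that remain.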
 



