spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions
Date Mon, 29 Dec 2014 21:50:37 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 76046664d -> e81c86967


SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions

takeOrdered should skip the reduce step when the mapped RDD has no partitions. This prevents
the following exception:

4. run query
SELECT * FROM testTable WHERE market = 'market2' ORDER BY End_Time DESC LIMIT 100;
Error trace
java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:863)
at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1136)

Author: Yash Datta <Yash.Datta@guavus.com>

Closes #3830 from saucam/fix_takeorder and squashes the following commits:

5974d10 [Yash Datta] SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions

(cherry picked from commit 9bc0df6804f241aff24520d9c6ec54d9b11f5785)
Signed-off-by: Reynold Xin <rxin@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e81c8696
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e81c8696
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e81c8696

Branch: refs/heads/branch-1.2
Commit: e81c869677b566dfcabedca89a40aeea7dc16fa9
Parents: 7604666
Author: Yash Datta <Yash.Datta@guavus.com>
Authored: Mon Dec 29 13:49:45 2014 -0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Mon Dec 29 13:50:34 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e81c8696/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ff6d946..c26425d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1132,15 +1132,20 @@ abstract class RDD[T: ClassTag](
     if (num == 0) {
       Array.empty
     } else {
-      mapPartitions { items =>
+      val mapRDDs = mapPartitions { items =>
         // Priority keeps the largest elements, so let's reverse the ordering.
         val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
         queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
         Iterator.single(queue)
-      }.reduce { (queue1, queue2) =>
-        queue1 ++= queue2
-        queue1
-      }.toArray.sorted(ord)
+      }
+      if (mapRDDs.partitions.size == 0) {
+        Array.empty
+      } else {
+        mapRDDs.reduce { (queue1, queue2) =>
+          queue1 ++= queue2
+          queue1
+        }.toArray.sorted(ord)
+      }
     }
   }
 

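The failure and the guard can be reproduced without a cluster. The following is a minimal sketch in plain Scala, not Spark code: an RDD is modelled as a sequence of partitions (Seq[Seq[T]]), and the names TakeOrderedSketch, topPerPartition, takeOrderedUnguarded and takeOrderedGuarded are hypothetical, introduced only for illustration. It relies on the fact that reduce over an empty Scala collection throws UnsupportedOperationException, which mirrors the "empty collection" error in the trace above. Because the per-partition step emits exactly one result per partition, reduce only sees an empty input when there are no partitions at all.

```scala
// Plain-Scala model of the change (assumption: an "RDD" is a Seq of partitions).
object TakeOrderedSketch {

  // Hypothetical stand-in for the mapPartitions step: one result per partition,
  // holding that partition's num smallest elements.
  def topPerPartition[T](items: Seq[T], num: Int)(implicit ord: Ordering[T]): Seq[T] =
    items.sorted(ord).take(num)

  // Merge two per-partition results, keeping only the num smallest elements.
  private def merge[T](a: Seq[T], b: Seq[T], num: Int)(implicit ord: Ordering[T]): Seq[T] =
    (a ++ b).sorted(ord).take(num)

  // Shape of the code before the change: reduce runs unconditionally,
  // so zero partitions means reducing an empty collection, which throws.
  def takeOrderedUnguarded[T](partitions: Seq[Seq[T]], num: Int)(implicit ord: Ordering[T]): Seq[T] =
    if (num == 0) Seq.empty
    else partitions.map(topPerPartition(_, num)).reduce((a, b) => merge(a, b, num))

  // Shape of the code after the change: skip reduce when nothing was mapped.
  def takeOrderedGuarded[T](partitions: Seq[Seq[T]], num: Int)(implicit ord: Ordering[T]): Seq[T] =
    if (num == 0) Seq.empty
    else {
      val mapped = partitions.map(topPerPartition(_, num))
      if (mapped.isEmpty) Seq.empty
      else mapped.reduce((a, b) => merge(a, b, num))
    }

  def main(args: Array[String]): Unit = {
    val noPartitions = Seq.empty[Seq[Int]]

    // Unguarded version fails on zero partitions.
    println(scala.util.Try(takeOrderedUnguarded(noPartitions, 100)).isFailure)

    // Guarded version returns an empty result instead.
    println(takeOrderedGuarded(noPartitions, 100).isEmpty)

    // Non-empty input is unaffected by the guard.
    println(takeOrderedGuarded(Seq(Seq(5, 1, 9), Seq(3, 7)), 3))
  }
}
```

The guard checks the number of mapped results rather than catching the exception, which matches the approach in the diff: the empty case is an expected outcome of a filter that matches nothing, not an error.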

