spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-4759] Fix driver hanging from coalescing partitions
Date Wed, 10 Dec 2014 22:28:08 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 273f2c891 -> 396de67fc


[SPARK-4759] Fix driver hanging from coalescing partitions

The driver sometimes hangs when we coalesce RDD partitions. See the JIRA for more details and
a reproduction.

This is because our use of the empty string as the default preferred location in `CoalescedRDDPartition`
causes the `TaskSetManager` to schedule the corresponding task on host `""` (the empty string).
The intended semantics, however, are that the partition has no preferred location, and the
TSM should schedule the corresponding task accordingly.
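
For illustration, a minimal, self-contained sketch (hypothetical names, not the actual
Spark classes) contrasting the empty-string sentinel with `Option[String]`:

    // Hypothetical stand-in for illustration only, not the real Spark code.
    object PreferredLocationSketch extends App {
      // Old approach: "" means "no preference", but it leaks out as a host name.
      def prefsWithSentinel(loc: String): Seq[String] = List(loc)

      // New approach: None means "no preference" and yields an empty Seq,
      // which a scheduler can read as "no locality constraint".
      def prefsWithOption(loc: Option[String]): Seq[String] = loc.toSeq

      println(prefsWithSentinel(""))           // List("") -- task targets host ""
      println(prefsWithOption(None))           // List()   -- run anywhere
      println(prefsWithOption(Some("host1")))  // List(host1)
    }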

Author: Andrew Or <andrew@databricks.com>

Closes #3633 from andrewor14/coalesce-preferred-loc and squashes the following commits:

e520d6b [Andrew Or] Oops
3ebf8bd [Andrew Or] A few comments
f370a4e [Andrew Or] Fix tests
2f7dfb6 [Andrew Or] Avoid using empty string as default preferred location

(cherry picked from commit 4f93d0cabe5d1fc7c0fd0a33d992fd85df1fecb4)
Signed-off-by: Andrew Or <andrew@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/396de67f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/396de67f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/396de67f

Branch: refs/heads/branch-1.1
Commit: 396de67fc648a40a53668ba85061489b61f5c2ae
Parents: 273f2c8
Author: Andrew Or <andrew@databricks.com>
Authored: Wed Dec 10 14:27:53 2014 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Wed Dec 10 14:28:04 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/rdd/CoalescedRDD.scala     | 36 ++++++++++++--------
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |  2 +-
 2 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/396de67f/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 9fab1d7..b073eba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -35,11 +35,10 @@ import org.apache.spark.util.Utils
  * @param preferredLocation the preferred location for this partition
  */
 private[spark] case class CoalescedRDDPartition(
-                                  index: Int,
-                                  @transient rdd: RDD[_],
-                                  parentsIndices: Array[Int],
-                                  @transient preferredLocation: String = ""
-                                  ) extends Partition {
+    index: Int,
+    @transient rdd: RDD[_],
+    parentsIndices: Array[Int],
+    @transient preferredLocation: Option[String] = None) extends Partition {
   var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
 
   @throws(classOf[IOException])
@@ -55,9 +54,10 @@ private[spark] case class CoalescedRDDPartition(
    * @return locality of this coalesced partition between 0 and 1
    */
   def localFraction: Double = {
-    val loc = parents.count(p =>
-      rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
-
+    val loc = parents.count { p =>
+      val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
+      preferredLocation.exists(parentPreferredLocations.contains)
+    }
     if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
   }
 }
@@ -73,9 +73,9 @@ private[spark] case class CoalescedRDDPartition(
  * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
  */
 private[spark] class CoalescedRDD[T: ClassTag](
-                                      @transient var prev: RDD[T],
-                                      maxPartitions: Int,
-                                      balanceSlack: Double = 0.10)
+    @transient var prev: RDD[T],
+    maxPartitions: Int,
+    balanceSlack: Double = 0.10)
   extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies
 
   override def getPartitions: Array[Partition] = {
@@ -113,7 +113,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
    * @return the machine most preferred by split
    */
   override def getPreferredLocations(partition: Partition): Seq[String] = {
-    List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+    partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
   }
 }
 
@@ -147,7 +147,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
  *
  */
 
-private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
 
   def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
   def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
@@ -341,8 +341,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
   }
 }
 
-private[spark] case class PartitionGroup(prefLoc: String = "") {
+private case class PartitionGroup(prefLoc: Option[String] = None) {
   var arr = mutable.ArrayBuffer[Partition]()
-
   def size = arr.size
 }
+
+private object PartitionGroup {
+  def apply(prefLoc: String): PartitionGroup = {
+    require(prefLoc != "", "Preferred location must not be empty")
+    PartitionGroup(Some(prefLoc))
+  }
+}
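
As an aside, the companion-object pattern added above can be sketched standalone (a
simplified stand-in, not the Spark class): the overloaded `apply(String)` guards against
the old empty-string sentinel sneaking back in.

    import scala.collection.mutable

    case class PartitionGroup(prefLoc: Option[String] = None) {
      var arr = mutable.ArrayBuffer[Int]()  // simplified: partition indices
      def size = arr.size
    }

    object PartitionGroup {
      // Callers with a known host pass a plain String; the require guard
      // rejects "" so "no preference" must be expressed as None.
      def apply(prefLoc: String): PartitionGroup = {
        require(prefLoc != "", "Preferred location must not be empty")
        PartitionGroup(Some(prefLoc))
      }
    }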

http://git-wip-us.apache.org/repos/asf/spark/blob/396de67f/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 96b1165..6ac4b6a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -296,7 +296,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("coalesced RDDs with locality") {
     val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
     val coal3 = data3.coalesce(3)
-    val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+    val list3 = coal3.partitions.flatMap(_.asInstanceOf[CoalescedRDDPartition].preferredLocation)
     assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")
 
     // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5

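To observe the fix from user code, a hedged sketch (assuming a local SparkContext; this
demo is not part of the commit): partitions with no preferred location now report an
empty Seq instead of Seq(""), so the TaskSetManager no longer waits on a host literally
named "".

    import org.apache.spark.{SparkConf, SparkContext}

    object CoalescePrefLocDemo extends App {
      val sc = new SparkContext(
        new SparkConf().setMaster("local[2]").setAppName("coalesce-demo"))
      val rdd = sc.parallelize(1 to 100, 10).coalesce(2)
      rdd.partitions.foreach { p =>
        // Expected after this patch: an empty list, not List("")
        println(rdd.preferredLocations(p))
      }
      sc.stop()
    }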
