spark-commits mailing list archives

From ma...@apache.org
Subject spark git commit: [SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps
Date Sun, 16 Aug 2015 07:35:03 GMT
Repository: spark
Updated Branches:
  refs/heads/master 5f9ce738f -> cf016075a


[SPARK-10008] Ensure shuffle locality doesn't take precedence over narrow deps

The shuffle locality patch made the DAGScheduler aware of shuffle data,
but for RDDs that have both narrow and shuffle dependencies, it can
cause the scheduler to place tasks based on the shuffle dependency
instead of the narrow one. This case is common in iterative join-based
algorithms like PageRank and ALS, where one RDD is hash-partitioned and
one isn't.
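For illustration, here is a minimal, hypothetical sketch of that scenario (not part of the patch; an existing SparkContext named sc and the variable names are assumed). `partitioned` is hash-partitioned and cached, so the join reaches it through a narrow dependency, while `other` has no partitioner and reaches the join through a shuffle dependency. With this fix, tasks for the join should be placed according to the narrow dependency on `partitioned` rather than the map outputs of `other`.

    import org.apache.spark.HashPartitioner

    // `partitioned` gets an explicit partitioner and is cached, so its blocks
    // give the downstream join concrete preferred locations via a narrow dep.
    val partitioned = sc.parallelize(0 until 1000000)
      .map(i => (i, i))
      .partitionBy(new HashPartitioner(100))
      .cache()
    partitioned.count()  // materialize the cache so its blocks have locations

    // `other` has no partitioner, so the join pulls it in through a shuffle.
    val other = sc.parallelize(0 until 1000000).map(i => (i, i.toString))

    // The resulting join has a narrow (one-to-one) dependency on `partitioned`
    // and a shuffle dependency on `other`; after this fix the scheduler places
    // its tasks where `partitioned`'s cached blocks live.
    val joined = partitioned.join(other)
    joined.count()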

Author: Matei Zaharia <matei@databricks.com>

Closes #8220 from mateiz/shuffle-loc-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf016075
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf016075
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf016075

Branch: refs/heads/master
Commit: cf016075a006034c24c5b758edb279f3e151d25d
Parents: 5f9ce73
Author: Matei Zaharia <matei@databricks.com>
Authored: Sun Aug 16 00:34:58 2015 -0700
Committer: Matei Zaharia <matei@databricks.com>
Committed: Sun Aug 16 00:34:58 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 37 +++++++++++---------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 26 ++++++++++++--
 2 files changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cf016075/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f1c63d0..dadf83a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1383,33 +1383,36 @@ class DAGScheduler(
       return rddPrefs.map(TaskLocation(_))
     }
 
+    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
+    // that has any placement preferences. Ideally we would choose based on transfer sizes,
+    // but this will do for now.
     rdd.dependencies.foreach {
       case n: NarrowDependency[_] =>
-        // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
-        // that has any placement preferences. Ideally we would choose based on transfer sizes,
-        // but this will do for now.
         for (inPart <- n.getParents(partition)) {
           val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
           if (locs != Nil) {
             return locs
           }
         }
-      case s: ShuffleDependency[_, _, _] =>
-        // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
-        // of data as preferred locations
-        if (shuffleLocalityEnabled &&
-            rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
-            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
-          // Get the preferred map output locations for this reducer
-          val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
-            partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
-          if (topLocsForReducer.nonEmpty) {
-            return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
-          }
-        }
-
       case _ =>
     }
+
+    // If the RDD has shuffle dependencies and shuffle locality is enabled, pick locations that
+    // have at least REDUCER_PREF_LOCS_FRACTION of data as preferred locations
+    if (shuffleLocalityEnabled && rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+      rdd.dependencies.foreach {
+        case s: ShuffleDependency[_, _, _] =>
+          if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) {
+            // Get the preferred map output locations for this reducer
+            val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+              partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION)
+            if (topLocsForReducer.nonEmpty) {
+              return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
+            }
+          }
+        case _ =>
+      }
+    }
     Nil
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cf016075/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b0ca49c..a063596 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -926,7 +926,7 @@ class DAGSchedulerSuite
     assertLocations(reduceTaskSet, Seq(Seq("hostA")))
     complete(reduceTaskSet, Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("reduce task locality preferences should only include machines with largest map outputs")
{
@@ -950,7 +950,29 @@ class DAGSchedulerSuite
     assertLocations(reduceTaskSet, Seq(hosts))
     complete(reduceTaskSet, Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
+  }
+
+  test("stages with both narrow and shuffle dependencies use narrow ones for locality") {
+    // Create an RDD that has both a shuffle dependency and a narrow dependency (e.g. for a join)
+    val rdd1 = new MyRDD(sc, 1, Nil)
+    val rdd2 = new MyRDD(sc, 1, Nil, locations = Seq(Seq("hostB")))
+    val shuffleDep = new ShuffleDependency(rdd1, null)
+    val narrowDep = new OneToOneDependency(rdd2)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep, narrowDep))
+    submit(reduceRdd, Array(0))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
+      HashSet(makeBlockManagerId("hostA")))
+
+    // Reducer should run where RDD 2 has preferences, even though it also has a shuffle dep
+    val reduceTaskSet = taskSets(1)
+    assertLocations(reduceTaskSet, Seq(Seq("hostB")))
+    complete(reduceTaskSet, Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
   }
 
   test("Spark exceptions should include call site in stack trace") {

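As a reading aid for the diff above, the control flow it establishes can be summarized by the following self-contained toy; the three lookup functions are hypothetical stand-ins for the real DAGScheduler lookups, not Spark APIs. The RDD's own preferences win, then any narrow-dependency preferences, and shuffle-based preferences are consulted only as a fallback.

    // Toy model of the post-patch ordering in getPreferredLocsInternal.
    // The lookup functions below are hypothetical stand-ins, not Spark APIs.
    object LocalityOrderingSketch {
      def rddPrefs(partition: Int): Seq[String]        = Nil           // e.g. input splits
      def narrowDepPrefs(partition: Int): Seq[String]  = Seq("hostB")  // e.g. cached parent blocks
      def shuffleDepPrefs(partition: Int): Seq[String] = Seq("hostA")  // e.g. largest map outputs

      def preferredLocations(partition: Int): Seq[String] = {
        val own = rddPrefs(partition)
        if (own.nonEmpty) return own
        val narrow = narrowDepPrefs(partition)  // checked before shuffle deps after SPARK-10008
        if (narrow.nonEmpty) return narrow
        shuffleDepPrefs(partition)              // shuffle locality is now only a fallback
      }

      def main(args: Array[String]): Unit = {
        println(preferredLocations(0))          // List(hostB): the narrow dependency wins
      }
    }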
