spark-commits mailing list archives

From pwend...@apache.org
Subject spark git commit: [SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map
Date Wed, 18 Feb 2015 01:40:08 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 cb905841b -> 07a401a7b


[SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map

This patch addresses a race condition in DAGScheduler by properly synchronizing accesses to
its `cacheLocs` map.

This map is accessed by the `getCacheLocs()` and `clearCacheLocs()` methods, which can be
called from separate threads, since DAGScheduler's `getPreferredLocs()` method is called by
SparkContext and indirectly calls `getCacheLocs()`.  If the DAGScheduler event processing
thread clears this map while a user thread is submitting a job and computing preferred
locations, the user thread can throw "NoSuchElementException: key not found" errors.
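
To make the failure mode concrete, here is a minimal, self-contained sketch of the
check-then-act race (illustrative only: the object name, the `getCacheLocsUnsafe` helper,
the String-based locations, and the workload are invented, not Spark code; assumes
Scala 2.12+ for the lambda-to-Runnable conversion):

    import scala.collection.mutable.HashMap

    object CacheLocsRaceSketch {
      // Simplified stand-in for DAGScheduler's cacheLocs map (RDD id -> per-partition locations).
      private val cacheLocs = new HashMap[Int, Array[Seq[String]]]

      // Mirrors the pre-fix getCacheLocs: an unsynchronized check-then-act sequence.
      def getCacheLocsUnsafe(rddId: Int): Array[Seq[String]] = {
        if (!cacheLocs.contains(rddId)) {
          cacheLocs(rddId) = Array(Seq("host1"))
        }
        // If another thread calls cacheLocs.clear() between the insert above and the
        // lookup below, this throws NoSuchElementException: key not found.
        cacheLocs(rddId)
      }

      def main(args: Array[String]): Unit = {
        val userThread  = new Thread(() => (1 to 1000000).foreach(i => getCacheLocsUnsafe(i % 10)))
        val eventThread = new Thread(() => (1 to 1000000).foreach(_ => cacheLocs.clear()))
        userThread.start(); eventThread.start()
        userThread.join(); eventThread.join()
      }
    }

Run repeatedly and the user thread will eventually die with the "key not found" error
described above (concurrent mutation of an unsynchronized HashMap can also corrupt the
map itself).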

Most accesses to DAGScheduler's internal state do not need synchronization because that state
is only accessed from the event processing loop's thread.  An alternative fix would be to
refactor this code so that SparkContext sends the DAGScheduler a message to get the list of
preferred locations.  However, that would involve more extensive changes and would be
significantly harder to backport to maintenance branches, since some of the related code has
undergone significant refactoring (e.g. the introduction of EventLoop).  Since `cacheLocs` is
the only state that is accessed in this way, adding simple synchronization seems like a better
short-term fix.
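
As a hedged illustration of that short-term fix (stand-in types again; the real code keys
the map by RDD id and stores Array[Seq[TaskLocation]] values, as the diff below shows):

    import scala.collection.mutable.HashMap

    class CacheLocsSketch {
      private val cacheLocs = new HashMap[Int, Array[Seq[String]]]

      // Every access synchronizes on the map object itself, and getOrElseUpdate
      // makes the lookup-or-insert atomic while the monitor is held.
      def getCacheLocs(rddId: Int, numPartitions: Int): Array[Seq[String]] =
        cacheLocs.synchronized {
          cacheLocs.getOrElseUpdate(rddId, Array.fill(numPartitions)(Nil))
        }

      def clearCacheLocs(): Unit = cacheLocs.synchronized {
        cacheLocs.clear()
      }
    }

Because both methods hold the same monitor, a clear() can no longer interleave with the
contains/insert/read sequence, which is why the patch replaces that sequence with a single
getOrElseUpdate call under the lock.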

See #3345 for additional context.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #4660 from JoshRosen/SPARK-4454 and squashes the following commits:

12d64ba [Josh Rosen] Properly synchronize accesses to DAGScheduler cacheLocs map.

(cherry picked from commit d46d6246d225ff3af09ebae1a09d4de2430c502d)
Signed-off-by: Patrick Wendell <patrick@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07a401a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07a401a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07a401a7

Branch: refs/heads/branch-1.3
Commit: 07a401a7beea864092ec8f8c451e05cba5a19bbb
Parents: cb90584
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Tue Feb 17 17:39:58 2015 -0800
Committer: Patrick Wendell <patrick@databricks.com>
Committed: Tue Feb 17 17:40:04 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 34 ++++++++++++++------
 1 file changed, 24 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/07a401a7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7903557..9c355d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -98,7 +98,13 @@ class DAGScheduler(
 
   private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
-  // Contains the locations that each RDD's partitions are cached on
+  /**
+   * Contains the locations that each RDD's partitions are cached on.  This map's keys are RDD ids
+   * and its values are arrays indexed by partition numbers. Each array value is the set of
+   * locations where that RDD partition is cached.
+   *
+   * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
+   */
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
 
   // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
@@ -183,18 +189,17 @@ class DAGScheduler(
     eventProcessLoop.post(TaskSetFailed(taskSet, reason))
   }
 
-  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
-    if (!cacheLocs.contains(rdd.id)) {
+  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized {
+    cacheLocs.getOrElseUpdate(rdd.id, {
       val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
       val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
-      cacheLocs(rdd.id) = blockIds.map { id =>
+      blockIds.map { id =>
         locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
       }
-    }
-    cacheLocs(rdd.id)
+    })
   }
 
-  private def clearCacheLocs() {
+  private def clearCacheLocs(): Unit = cacheLocs.synchronized {
     cacheLocs.clear()
   }
 
@@ -1276,17 +1281,26 @@ class DAGScheduler(
   }
 
   /**
-   * Synchronized method that might be called from other threads.
+   * Gets the locality information associated with a partition of a particular RDD.
+   *
+   * This method is thread-safe and is called from both DAGScheduler and SparkContext.
+   *
    * @param rdd whose partitions are to be looked at
    * @param partition to lookup locality information for
    * @return list of machines that are preferred by the partition
    */
   private[spark]
-  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
+  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
     getPreferredLocsInternal(rdd, partition, new HashSet)
   }
 
-  /** Recursive implementation for getPreferredLocs. */
+  /**
+   * Recursive implementation for getPreferredLocs.
+   *
+   * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
+   * methods (getCacheLocs()); please be careful when modifying this method, because any new
+   * DAGScheduler state accessed by it may require additional synchronization.
+   */
   private def getPreferredLocsInternal(
       rdd: RDD[_],
       partition: Int,



