tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce. (rbalamohan)
Date Mon, 13 Jun 2016 00:02:17 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 8bcf6302a -> f478befcc


TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce. (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f478befc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f478befc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f478befc

Branch: refs/heads/branch-0.7
Commit: f478befcc4ac7bae3af0516ce13e3e2144be7afd
Parents: 8bcf630
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Mon Jun 13 05:30:21 2016 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Mon Jun 13 05:30:21 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 118 ++++++++++++-------
 2 files changed, 78 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f478befc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 79d813d..8d70d97 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.7.2 Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+
+  TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
   TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
   TEZ-3278. Hide Swimlane from Tez UI
   TEZ-909.  Provide support for application tags 

http://git-wip-us.apache.org/repos/asf/tez/blob/f478befc/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 2df1a3d..0d6bc68 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1564,17 +1564,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       } finally {
         writeLock.unlock();
       }
-      
-      readLock.lock();
-      try {
-        for (ScheduleTaskRequest task : tasksToSchedule) {
-          TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex());
-          TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
-          eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
-              getTaskLocationHint(taskId)));
-        }
-      } finally {
-        readLock.unlock();
+
+      /**
+       * read lock is not needed here. For e.g after starting task
+       * scheduling on the vertex, it would not change numTasks. Rest of
+       * the methods creating remote task specs have their
+       * own locking mechanisms. Ref: TEZ-3297
+       */
+      for (ScheduleTaskRequest task : tasksToSchedule) {
+        TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex());
+        TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
+        eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
+            getTaskLocationHint(taskId)));
       }
     } catch (AMUserCodeException e) {
       String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
@@ -4671,17 +4672,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @Override
   public void setInputVertices(Map<Vertex, Edge> inVertices) {
-    this.sourceVertices = inVertices;
-    for (Vertex vertex : sourceVertices.keySet()) {
-      addIO(vertex.getName());
+    writeLock.lock();
+    try {
+      this.sourceVertices = inVertices;
+      for (Vertex vertex : sourceVertices.keySet()) {
+        addIO(vertex.getName());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
   @Override
   public void setOutputVertices(Map<Vertex, Edge> outVertices) {
-    this.targetVertices = outVertices;
-    for (Vertex vertex : targetVertices.keySet()) {
-      addIO(vertex.getName());
+    writeLock.lock();
+    try {
+      this.targetVertices = outVertices;
+      for (Vertex vertex : targetVertices.keySet()) {
+        addIO(vertex.getName());
+      }
+    } finally {
+      writeLock.unlock();;
     }
   }
 
@@ -4881,9 +4892,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @Override
   public List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException {
+    // For locking strategy, please refer to getOutputSpecList()
     readLock.lock();
+    List<InputSpec> inputSpecList = null;
     try {
-      List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
+      inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
           + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
       if (rootInputDescriptors != null) {
         for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
@@ -4893,44 +4906,65 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                   rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex)));
         }
       }
-      for(Vertex vertex : getInputVertices().keySet()) {
-        /**
-         * It is possible that setParallelism is in the middle of processing in target vertex with
-         * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
-         * get consistent view.
-         * Refer TEZ-2251
-         */
-        InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
-        // TODO DAGAM This should be based on the edge type.
-        inputSpecList.add(inputSpec);
-      }
-      return inputSpecList;
     } finally {
       readLock.unlock();
     }
+
+    for(Vertex vertex : getInputVertices().keySet()) {
+      /**
+       * It is possible that setParallelism is in the middle of processing in target vertex with
+       * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
+       * get consistent view.
+       * Refer TEZ-2251
+       */
+      InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
+      // TODO DAGAM This should be based on the edge type.
+      inputSpecList.add(inputSpec);
+    }
+    return inputSpecList;
   }
 
   @Override
   public List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException {
+    /**
+     * Ref: TEZ-3297
+     * Locking entire method could introduce a nested lock and
+     * could lead to deadlock in corner cases. Example of deadlock with nested lock here:
+     * 1. In thread#1, Downstream vertex is in the middle of processing setParallelism and gets
+     * writeLock.
+     * 2. In thread#2, currentVertex acquires read lock
+     * 3. In thread#3, central dispatcher tries to process an event for current vertex,
+     * so tries to acquire write lock.
+     *
+     * In further processing,
+     * 4. In thread#1, it tries to acquire readLock on current vertex for setting edges. But
+     * this would be blocked as #3 already requested for write lock
+     * 5. In thread#2, getting readLock on downstream vertex would be blocked as writeLock
+     * is held by thread#1.
+     * 6. thread#3 is anyways blocked due to thread#2's read lock on current vertex.
+     */
+
+    List<OutputSpec> outputSpecList = null;
     readLock.lock();
     try {
-      List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
+      outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
           + this.additionalOutputSpecs.size());
       outputSpecList.addAll(additionalOutputSpecs);
-      for(Vertex vertex : targetVertices.keySet()) {
-        /**
-         * It is possible that setParallelism (which could change numTasks) is in the middle of
-         * processing in target vertex with its write lock. So we need to get outputspec by
-         * acquiring read lock in target vertex to get consistent view.
-         * Refer TEZ-2251
-         */
-        OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
-        outputSpecList.add(outputSpec);
-      }
-      return outputSpecList;
     } finally {
       readLock.unlock();
     }
+
+    for(Vertex vertex : targetVertices.keySet()) {
+      /**
+       * It is possible that setParallelism (which could change numTasks) is in the middle of
+       * processing in target vertex with its write lock. So we need to get outputspec by
+       * acquiring read lock in target vertex to get consistent view.
+       * Refer TEZ-2251
+       */
+      OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
+      outputSpecList.add(outputSpec);
+    }
+    return outputSpecList;
   }
 
   private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws


Mime
View raw message