tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2251. Race condition in VertexImpl & Edge causes DAG to hang (rbalamohan)
Date Fri, 03 Apr 2015 00:45:20 GMT
Repository: tez
Updated Branches:
  refs/heads/master aec3f6b66 -> 212de07db


TEZ-2251. Race condition in VertexImpl & Edge causes DAG to hang (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/212de07d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/212de07d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/212de07d

Branch: refs/heads/master
Commit: 212de07db49d0a0be923dc685e453f111488b99d
Parents: aec3f6b
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Fri Apr 3 06:14:48 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Fri Apr 3 06:14:48 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 51 ++++++++++++++++----
 2 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/212de07d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f092bd0..e844f60 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2176. Move all logging to slf4j. (commons-logging jar no longer part of Tez tar)
 
 ALL CHANGES:
+  TEZ-2251. Race condition in VertexImpl & Edge causes DAG to hang.
   TEZ-2264. Remove unused taskUmbilical reference in TezTaskRunner, register as running late.
   TEZ-2149. Optimizations for the timed version of DAGClient.getStatus.
   TEZ-2213. For the ordered case, enabling pipelined shuffle should automatically disable final merge.

http://git-wip-us.apache.org/repos/asf/tez/blob/212de07d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b254c8a..7f124b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1352,7 +1352,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     + parallelism + " for vertex: " + logIdentifier);
     setVertexLocationHint(vertexLocationHint);
     writeLock.lock();
-    
+
     try {
       if (parallelismSet == true) {
         String msg = "Parallelism can only be set dynamically once per vertex: " + logIdentifier;

@@ -4186,12 +4186,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                 rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex)));
       }
     }
-    for (Entry<Vertex, Edge> entry : this.getInputVertices().entrySet()) {
-      InputSpec inputSpec = entry.getValue().getDestinationSpec(taskIndex);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("For vertex : " + this.getLogIdentifier()
-            + ", Using InputSpec : " + inputSpec);
-      }
+    for(Vertex vertex : getInputVertices().keySet()) {
+      /**
+       * It is possible that setParallelism is in the middle of processing in target vertex with
+       * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
+       * get consistent view.
+       * Refer TEZ-2251
+       */
+      InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
       // TODO DAGAM This should be based on the edge type.
       inputSpecList.add(inputSpec);
     }
@@ -4204,13 +4206,44 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
         + this.additionalOutputSpecs.size());
     outputSpecList.addAll(additionalOutputSpecs);
-    for (Entry<Vertex, Edge> entry : this.getOutputVertices().entrySet()) {
-      OutputSpec outputSpec = entry.getValue().getSourceSpec(taskIndex);
+    for(Vertex vertex : targetVertices.keySet()) {
+      /**
+       * It is possible that setParallelism (which could change numTasks) is in the middle of
+       * processing in target vertex with its write lock. So we need to get outputspec by
+       * acquiring read lock in target vertex to get consistent view.
+       * Refer TEZ-2251
+       */
+      OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
       outputSpecList.add(outputSpec);
     }
     return outputSpecList;
   }
 
+  private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws
+      AMUserCodeException {
+    readLock.lock();
+    try {
+      Edge edge = sourceVertices.get(vertex);
+      Preconditions.checkState(edge != null, getLogIdentifier());
+      return edge.getSourceSpec(taskIndex);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private InputSpec getDestinationSpecFor(VertexImpl vertex, int taskIndex) throws
+      AMUserCodeException {
+    readLock.lock();
+    try {
+      Edge edge = targetVertices.get(vertex);
+      Preconditions.checkState(edge != null, getLogIdentifier());
+      return edge.getDestinationSpec(taskIndex);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+
   //TODO Eventually remove synchronization.
   @Override
   public synchronized List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {


Mime
View raw message