tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [16/35] tez git commit: TEZ-2269. Fix for DAGAppMaster becoming unresponsive. Contributed by Rajesh Balamohan and Siddharth Seth.
Date Tue, 07 Apr 2015 20:12:34 GMT
TEZ-2269. Fix for DAGAppMaster becoming unresponsive. Contributed by Rajesh Balamohan and
Siddharth Seth.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f2d560cb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f2d560cb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f2d560cb

Branch: refs/heads/TEZ-2003
Commit: f2d560cbeb85066d241cedb8023b6f7e61b36d86
Parents: b87a36f
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Apr 7 00:09:24 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Apr 7 00:09:24 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 69 +++++++++++++-------
 2 files changed, 45 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f2d560cb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26a9d75..11b843d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
 
 
 ALL CHANGES:
+  TEZ-2269. DAGAppMaster becomes unresponsive (post TEZ-2149).
   TEZ-2243. documentation should explicitly specify protobuf 2.5.0.
   TEZ-2232. Allow setParallelism to be called multiple times before tasks get
   scheduled

http://git-wip-us.apache.org/repos/asf/tez/blob/f2d560cb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index e685f1b..3b282d6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -33,13 +33,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -144,13 +148,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   // TODO Recovery
   //private final List<AMInfo> amInfos;
+  private final Lock dagStatusLock = new ReentrantLock();
+  private final Condition dagCompletionCondition = dagStatusLock.newCondition();
+  private final AtomicBoolean isFinalState = new AtomicBoolean(false);
   private final Lock readLock;
   private final Lock writeLock;
   private final String dagName;
   private final TaskAttemptListener taskAttemptListener;
   private final TaskHeartbeatHandler taskHeartbeatHandler;
   private final Object tasksSyncHandle = new Object();
-  private final Condition dagCompleteCondition;
 
   private volatile boolean committedOrAborted = false;
   private volatile boolean allOutputsCommitted = false;
@@ -193,6 +199,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   private TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
 
+  private static final DagStateChangedCallback STATE_CHANGED_CALLBACK = new DagStateChangedCallback();
+
   private static final DiagnosticsUpdateTransition
       DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final InternalErrorTransition
@@ -366,7 +374,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           // create the topology tables
           .installTopology();
 
-  private final StateMachine<DAGState, DAGEventType, DAGEvent> stateMachine;
+  private final StateMachineTez<DAGState, DAGEventType, DAGEvent, DAGImpl> stateMachine;
 
   //changing fields while the job is running
   @VisibleForTesting
@@ -446,7 +454,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();
-    this.dagCompleteCondition = writeLock.newCondition();
 
     this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan(jobPlan
         .getLocalResourceList());
@@ -472,10 +479,38 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(dagConf);
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
-    stateMachine = stateMachineFactory.make(this);
+    stateMachine = new StateMachineTez<DAGState, DAGEventType, DAGEvent, DAGImpl>(
+        stateMachineFactory.make(this), this);
+    augmentStateMachine();
     this.entityUpdateTracker = new StateChangeNotifier(this);
   }
 
+  private void augmentStateMachine() {
+    stateMachine
+        .registerStateEnteredCallback(DAGState.SUCCEEDED,
+            STATE_CHANGED_CALLBACK)
+        .registerStateEnteredCallback(DAGState.FAILED,
+            STATE_CHANGED_CALLBACK)
+        .registerStateEnteredCallback(DAGState.KILLED,
+            STATE_CHANGED_CALLBACK)
+        .registerStateEnteredCallback(DAGState.ERROR,
+            STATE_CHANGED_CALLBACK);
+  }
+
+  private static class DagStateChangedCallback
+      implements OnStateChangedCallback<DAGState, DAGImpl> {
+    @Override
+    public void onStateChanged(DAGImpl dag, DAGState dagState) {
+      dag.isFinalState.set(true);
+      dag.dagStatusLock.lock();
+      try {
+        dag.dagCompletionCondition.signal();
+      } finally {
+        dag.dagStatusLock.unlock();
+      }
+    }
+  }
+
   protected StateMachine<DAGState, DAGEventType, DAGEvent> getStateMachine() {
     return stateMachine;
   }
@@ -749,22 +784,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       // Return only on SUCCESS
       timeoutNanos = Long.MAX_VALUE;
     }
-    if (isComplete()) {
+    if (timeoutMillis == 0 || isComplete()) {
       return getDAGStatus(statusOptions);
     }
     while (true) {
       long nanosLeft;
-      writeLock.lock();
+      dagStatusLock.lock();
       try {
         // Check within the lock to ensure we don't end up waiting after the notify has happened
-        if (isComplete()) {
+        if (isFinalState.get()) {
           break;
         }
-        nanosLeft = dagCompleteCondition.awaitNanos(timeoutNanos);
+        nanosLeft = dagCompletionCondition.awaitNanos(timeoutNanos);
       } catch (InterruptedException e) {
         throw new TezException("Interrupted while waiting for dag to complete", e);
       } finally {
-        writeLock.unlock();
+        dagStatusLock.unlock();
       }
       if (nanosLeft <= 0) {
         // Time expired.
@@ -1219,25 +1254,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     LOG.info("DAG: " + getID() + " finished with state: " + finalState);
 
-    // Signal dag completion.
-    // The state will move to the final state after the Transition which invoked this method completes.
-    // However, it is OK to send the signal from here itself.
-    // This happens within a writeLock. The dagCompletionCondition check attempts to check for
-    // dagCompletion within the associated lock - so it will block till the full transition
-    // completes and the state updates.
-    notifyDagFinished();
     return finalState;
   }
 
-  private void notifyDagFinished() {
-    writeLock.lock();
-    try {
-      dagCompleteCondition.signal();
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
   @Override
   public String getUserName() {
     return userName;


Mime
View raw message