tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject [3/4] tez git commit: TEZ-714. OutputCommitters should not run in the main AM dispatcher thread (zjffdu)
Date Thu, 16 Apr 2015 04:40:19 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 82f7a33..b4b8062 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -31,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -94,6 +96,7 @@ import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.RootInputInitializerManager;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Task;
@@ -102,6 +105,7 @@ import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
@@ -117,6 +121,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventCommitCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
 import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
 import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
@@ -179,6 +184,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.tez.state.OnStateChangedCallback;
 import org.apache.tez.state.StateMachineTez;
@@ -255,6 +263,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private static final SourceTaskAttemptCompletedEventTransition
       SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new SourceTaskAttemptCompletedEventTransition();
+  private static final CommitCompletedTransition
+      COMMIT_COMPLETED_TRANSITION =
+          new CommitCompletedTransition();
   private static final VertexStateChangedCallback STATE_CHANGED_CALLBACK =
       new VertexStateChangedCallback();
 
@@ -268,6 +279,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   
   LegacySpeculator speculator;
 
+  @VisibleForTesting
+  Map<String, ListenableFuture<Void>> commitFutures = new ConcurrentHashMap<String, ListenableFuture<Void>>();
+
   protected static final
     StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
        stateMachineFactory
@@ -445,6 +459,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition
               (VertexState.RUNNING,
               EnumSet.of(VertexState.RUNNING,
+                  VertexState.COMMITTING,
                   VertexState.SUCCEEDED, VertexState.TERMINATING, VertexState.FAILED,
                   VertexState.ERROR),
               VertexEventType.V_TASK_COMPLETED,
@@ -473,6 +488,38 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexEventType.V_ROUTE_EVENT,
               ROUTE_EVENT_TRANSITION)
 
+          // Transitions from COMMITTING state.
+          .addTransition(
+              VertexState.COMMITTING,
+              EnumSet.of(VertexState.COMMITTING, VertexState.TERMINATING,
+                  VertexState.SUCCEEDED, VertexState.FAILED),
+              VertexEventType.V_COMMIT_COMPLETED,
+              COMMIT_COMPLETED_TRANSITION)
+          .addTransition(
+              VertexState.COMMITTING,
+              VertexState.TERMINATING,
+              VertexEventType.V_TERMINATE,
+              new VertexKilledWhileCommittingTransition()) 
+          .addTransition(
+              VertexState.COMMITTING,
+              VertexState.ERROR,
+              VertexEventType.V_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          .addTransition(
+              VertexState.COMMITTING,
+              EnumSet.of(VertexState.COMMITTING, VertexState.TERMINATING),
+              VertexEventType.V_ROUTE_EVENT,
+              ROUTE_EVENT_TRANSITION)
+          .addTransition(
+              VertexState.COMMITTING,
+              VertexState.TERMINATING,
+              VertexEventType.V_TASK_RESCHEDULED,
+              new TaskRescheduledWhileCommittingTransition())
+          .addTransition(VertexState.COMMITTING,
+              EnumSet.of(VertexState.TERMINATING),
+              VertexEventType.V_MANAGER_USER_CODE_ERROR,
+              new VertexManagerUserCodeErrorTransition())
+   
           // Transitions from TERMINATING state.
           .addTransition
               (VertexState.TERMINATING,
@@ -483,6 +530,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexState.TERMINATING,
               VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(
+              VertexState.TERMINATING,
+              EnumSet.of(VertexState.TERMINATING, VertexState.FAILED, VertexState.KILLED, VertexState.ERROR),
+              VertexEventType.V_COMMIT_COMPLETED,
+              COMMIT_COMPLETED_TRANSITION)
           // Ignore-able events
           .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
               EnumSet.of(VertexEventType.V_TERMINATE,
@@ -694,6 +746,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private AtomicBoolean committed = new AtomicBoolean(false);
   private AtomicBoolean aborted = new AtomicBoolean(false);
+  private AtomicBoolean commitCanceled = new AtomicBoolean(false);
   private boolean commitVertexOutputs = false;
 
   private Map<String, VertexGroupInfo> dagVertexGroups;
@@ -1750,18 +1803,77 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
-  static VertexState checkVertexForCompletion(final VertexImpl vertex) {
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Checking for vertex completion for "
-          + vertex.logIdentifier
-          + ", numTasks=" + vertex.numTasks
-          + ", failedTaskCount=" + vertex.failedTaskCount
-          + ", killedTaskCount=" + vertex.killedTaskCount
-          + ", successfulTaskCount=" + vertex.succeededTaskCount
-          + ", completedTaskCount=" + vertex.completedTaskCount
-          + ", terminationCause=" + vertex.terminationCause);
+  private static VertexState commitOrFinish(final VertexImpl vertex) {
+    // Submit one asynchronous commit per non-shared output. Don't commit shared outputs
+    if (vertex.outputCommitters != null
+        && !vertex.outputCommitters.isEmpty()) {
+      boolean firstCommit = true;
+      for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
+        final OutputCommitter committer = entry.getValue();
+        final String outputName = entry.getKey();
+        if (vertex.sharedOutputs.contains(outputName)) {
+          // don't commit shared committers. Will be committed by the DAG
+          continue;
+        }
+        if (firstCommit) {
+          LOG.info("Invoking committer commit for vertex, vertexId="
+              + vertex.logIdentifier);
+          // Log commit start event on first actual commit
+          try {
+            vertex.appContext.getHistoryHandler().handleCriticalEvent(
+                new DAGHistoryEvent(vertex.getDAGId(),
+                    new VertexCommitStartedEvent(vertex.vertexId,
+                        vertex.clock.getTime())));
+          } catch (IOException e) {
+            LOG.error("Failed to persist commit start event to recovery, vertex="
+                + vertex.logIdentifier, e);
+            vertex.trySetTerminationCause(VertexTerminationCause.RECOVERY_ERROR);
+            return vertex.finished(VertexState.FAILED);
+          }
+          // Clear the flag here; the former else-branch never ran, so the flag stayed true.
+          firstCommit = false;
+        }
+        VertexCommitCallback commitCallback = new VertexCommitCallback(vertex, outputName);
+        CallableEvent commitCallableEvent = new CallableEvent(commitCallback) {
+          @Override
+          public Void call() throws Exception {
+            vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                  LOG.info("Invoking committer commit for output=" + outputName
+                      + ", vertexId=" + vertex.logIdentifier);
+                  committer.commitOutput();
+                return null;
+              }
+            });
+            return null;
+          }
+        };
+        ListenableFuture<Void> commitFuture =
+            vertex.getAppContext().getExecService().submit(commitCallableEvent);
+        Futures.addCallback(commitFuture, commitCallableEvent.getCallback());
+        vertex.commitFutures.put(outputName, commitFuture); // tracked as an in-flight commit
+      }
+    }
+    if (vertex.commitFutures.isEmpty()) {
+      return vertex.finished(VertexState.SUCCEEDED); // no commit was submitted
+    } else {
+      return VertexState.COMMITTING; // at least one commit is in flight
     }
+  }
+
+  // triggered by task_complete
+  static VertexState checkTasksForCompletion(final VertexImpl vertex) {
+
+    LOG.info("Checking tasks for vertex completion for "
+        + vertex.logIdentifier
+        + ", numTasks=" + vertex.numTasks
+        + ", failedTaskCount=" + vertex.failedTaskCount
+        + ", killedTaskCount=" + vertex.killedTaskCount
+        + ", successfulTaskCount=" + vertex.succeededTaskCount
+        + ", completedTaskCount=" + vertex.completedTaskCount
+        + ", commitInProgress=" + vertex.commitFutures.size()
+        + ", terminationCause=" + vertex.terminationCause);
 
     //check for vertex failure first
     if (vertex.completedTaskCount > vertex.tasks.size()) {
@@ -1781,161 +1893,171 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       
       //Only succeed if tasks complete successfully and no terminationCause is registered.
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
-        LOG.info("Vertex succeeded: " + vertex.logIdentifier);
-        try {
-          if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
-            // commit only once. Dont commit shared outputs
-            LOG.info("Invoking committer commit for vertex, vertexId="
-                + vertex.logIdentifier);
-            if (vertex.outputCommitters != null
-                && !vertex.outputCommitters.isEmpty()) {
-              boolean firstCommit = true;
-              for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
-                final OutputCommitter committer = entry.getValue();
-                final String outputName = entry.getKey();
-                if (vertex.sharedOutputs.contains(outputName)) {
-                  // dont commit shared committers. Will be committed by the DAG
-                  continue;
-                }
-                if (firstCommit) {
-                  // Log commit start event on first actual commit
-                  try {
-                    vertex.appContext.getHistoryHandler().handleCriticalEvent(
-                        new DAGHistoryEvent(vertex.getDAGId(),
-                            new VertexCommitStartedEvent(vertex.vertexId,
-                                vertex.clock.getTime())));
-                  } catch (IOException e) {
-                    LOG.error("Failed to persist commit start event to recovery, vertex="
-                        + vertex.logIdentifier, e);
-                    vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
-                    return vertex.finished(VertexState.FAILED);
-                  }
-                } else {
-                  firstCommit = false;
-                }
-                vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
-                  @Override
-                  public Void run() throws Exception {
-                      LOG.info("Invoking committer commit for output=" + outputName
-                          + ", vertexId=" + vertex.logIdentifier);
-                      committer.commitOutput();
-                    return null;
-                  }
-                });
-              }
-            }
-          }
-        } catch (Exception e) {
-          LOG.error("Failed to do commit on vertex, vertexId="
-              + vertex.logIdentifier, e);
-          vertex.trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE);
-          return vertex.finished(VertexState.FAILED);
-        }
-        return vertex.finished(VertexState.SUCCEEDED);
-      }
-      else if(vertex.terminationCause == VertexTerminationCause.DAG_KILL ){
-        vertex.setFinishTime();
-        String diagnosticMsg = "Vertex killed due to user-initiated job kill. "
-            + "failedTasks:"
-            + vertex.failedTaskCount;
-        LOG.info(diagnosticMsg);
-        vertex.addDiagnostic(diagnosticMsg);
-        vertex.abortVertex(VertexStatus.State.KILLED);
-        return vertex.finished(VertexState.KILLED);
-      }
-      else if(vertex.terminationCause == VertexTerminationCause.OTHER_VERTEX_FAILURE ){
-        vertex.setFinishTime();
-        String diagnosticMsg = "Vertex killed as other vertex failed. "
-            + "failedTasks:"
-            + vertex.failedTaskCount;
-        LOG.info(diagnosticMsg);
-        vertex.addDiagnostic(diagnosticMsg);
-        vertex.abortVertex(VertexStatus.State.KILLED);
-        return vertex.finished(VertexState.KILLED);
-      }
-      else if(vertex.terminationCause == VertexTerminationCause.OWN_TASK_FAILURE ){
-        if(vertex.failedTaskCount == 0){
-          LOG.error("task failure accounting error.  terminationCause=TASK_FAILURE but vertex.failedTaskCount == 0");
+        LOG.info("All tasks are succeeded, vertex:" + vertex.logIdentifier);
+        if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
+          // start commit if there're commits or just finish if no commits
+          return commitOrFinish(vertex);
+        } else {
+          // just finish because no vertex committing needed
+          return vertex.finished(VertexState.SUCCEEDED);
         }
-        vertex.setFinishTime();
-        String diagnosticMsg = "Vertex failed as one or more tasks failed. "
-            + "failedTasks:"
-            + vertex.failedTaskCount;
-        LOG.info(diagnosticMsg);
-        vertex.addDiagnostic(diagnosticMsg);
-        vertex.abortVertex(VertexStatus.State.FAILED);
-        return vertex.finished(VertexState.FAILED);
-      }
-      else if (vertex.terminationCause == VertexTerminationCause.INTERNAL_ERROR) {
-        vertex.setFinishTime();
-        String diagnosticMsg = "Vertex failed/killed due to internal error. "
-            + "failedTasks:"
-            + vertex.failedTaskCount
-            + " killedTasks:"
-            + vertex.killedTaskCount;
-        LOG.info(diagnosticMsg);
-        vertex.abortVertex(State.FAILED);
-        return vertex.finished(VertexState.FAILED);
-      }
-      else if (vertex.terminationCause == VertexTerminationCause.AM_USERCODE_FAILURE) {
-        vertex.setFinishTime();
-        String diagnosticMsg = "Vertex failed/killed due to VertexManagerPlugin/EdgeManagerPlugin failed. "
-            + "failedTasks:"
-            + vertex.failedTaskCount
-            + " killedTasks:"
-            + vertex.killedTaskCount;
-        LOG.info(diagnosticMsg);
-        vertex.abortVertex(State.FAILED);
-        return vertex.finished(VertexState.FAILED);
-      }
-      else if (vertex.terminationCause == VertexTerminationCause.ROOT_INPUT_INIT_FAILURE) {
-        vertex.setFinishTime();
-        String diagnosticMsg = "Vertex failed/killed due to ROOT_INPUT_INIT_FAILURE failed. "
-            + "failedTasks:"
-            + vertex.failedTaskCount
-            + " killedTasks:"
-            + vertex.killedTaskCount;
-        LOG.info(diagnosticMsg);
-        vertex.abortVertex(State.FAILED);
-        return vertex.finished(VertexState.FAILED);
-      }
-      else if (vertex.terminationCause == VertexTerminationCause.COMMIT_FAILURE) {
-        vertex.setFinishTime();
-        String diagnosticMsg = "Vertex failed/killed due to COMMIT_FAILURE failed. "
-            + "failedTasks:"
-            + vertex.failedTaskCount
-            + " killedTasks:"
-            + vertex.killedTaskCount;
-        LOG.info(diagnosticMsg);
-        vertex.abortVertex(State.FAILED);
-        return vertex.finished(VertexState.FAILED);
-      }
-      else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT) {
-        vertex.setFinishTime();
-        String diagnosticMsg = "Vertex failed/killed due to invalid rerun failed. "
-            + "failedTasks:"
-            + vertex.failedTaskCount
-            + " killedTasks:"
-            + vertex.killedTaskCount;
-        LOG.info(diagnosticMsg);
-        vertex.abortVertex(State.FAILED);
-        return vertex.finished(VertexState.FAILED);
-      }
-      else {
-        //should never occur
-        throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex:" + vertex.logIdentifier
-            + ", failedTaskCount=" + vertex.failedTaskCount
-            + ", killedTaskCount=" + vertex.killedTaskCount
-            + ", successfulTaskCount=" + vertex.succeededTaskCount
-            + ", completedTaskCount=" + vertex.completedTaskCount
-            + ", terminationCause=" + vertex.terminationCause);
       }
+      return finishWithTerminationCause(vertex);
     }
 
     //return the current state, Vertex not finished yet
     return vertex.getInternalState();
   }
 
+  // Triggered by commit_complete: keeps waiting while commitFutures is non-empty, else finishes the vertex.
+  static VertexState checkCommitsForCompletion(final VertexImpl vertex) {
+    LOG.info("Checking commits for vertex completion for "
+        + vertex.logIdentifier
+        + ", numTasks=" + vertex.numTasks
+        + ", failedTaskCount=" + vertex.failedTaskCount
+        + ", killedTaskCount=" + vertex.killedTaskCount
+        + ", successfulTaskCount=" + vertex.succeededTaskCount
+        + ", completedTaskCount=" + vertex.completedTaskCount
+        + ", commitInProgress=" + vertex.commitFutures.size()
+        + ", terminationCause=" + vertex.terminationCause);
+
+    // A null terminationCause means the commit succeeded; otherwise terminationCause would have been set.
+    if (vertex.terminationCause == null) {
+      Preconditions.checkState(vertex.getState() == VertexState.COMMITTING,
+          "Vertex should be in COMMITTING state, but in " + vertex.getState()
+          + ", vertex:" + vertex.getLogIdentifier());
+      if (vertex.commitFutures.isEmpty()) {
+        // no commits left: move from COMMITTING to SUCCEEDED
+        return vertex.finished(VertexState.SUCCEEDED);
+      } else {
+        return VertexState.COMMITTING;
+      }
+    } else {
+      if (!vertex.commitFutures.isEmpty()) {
+        // some commits are still pending; stay in TERMINATING until they complete
+        return VertexState.TERMINATING;
+      } else {
+        // no commits are pending any more; finish according to the termination cause
+        return finishWithTerminationCause(vertex);
+      }
+    }
+  }
+
+  // Finishes the vertex as KILLED or FAILED per its terminationCause; throws on an unhandled cause. TODO TEZ-2248
+  private static VertexState finishWithTerminationCause(VertexImpl vertex) {
+    if(vertex.terminationCause == VertexTerminationCause.DAG_KILL ){
+      vertex.setFinishTime();
+      String diagnosticMsg = "Vertex killed due to user-initiated job kill. "
+          + "failedTasks:"
+          + vertex.failedTaskCount;
+      LOG.info(diagnosticMsg);
+      vertex.addDiagnostic(diagnosticMsg);
+      return vertex.finished(VertexState.KILLED);
+    }
+    else if(vertex.terminationCause == VertexTerminationCause.OTHER_VERTEX_FAILURE ){
+      vertex.setFinishTime();
+      String diagnosticMsg = "Vertex killed as other vertex failed. "
+          + "failedTasks:"
+          + vertex.failedTaskCount;
+      LOG.info(diagnosticMsg);
+      vertex.addDiagnostic(diagnosticMsg);
+      return vertex.finished(VertexState.KILLED);
+    }
+    else if(vertex.terminationCause == VertexTerminationCause.OWN_TASK_FAILURE ){
+      if(vertex.failedTaskCount == 0){
+        LOG.error("task failure accounting error.  terminationCause=TASK_FAILURE but vertex.failedTaskCount == 0");
+      }
+      vertex.setFinishTime();
+      String diagnosticMsg = "Vertex failed as one or more tasks failed. "
+          + "failedTasks:"
+          + vertex.failedTaskCount;
+      LOG.info(diagnosticMsg);
+      vertex.addDiagnostic(diagnosticMsg);
+      return vertex.finished(VertexState.FAILED);
+    }
+    else if (vertex.terminationCause == VertexTerminationCause.INTERNAL_ERROR) {
+      vertex.setFinishTime();
+      String diagnosticMsg = "Vertex failed/killed due to internal error. "
+          + "failedTasks:"
+          + vertex.failedTaskCount
+          + " killedTasks:"
+          + vertex.killedTaskCount;
+      LOG.info(diagnosticMsg);
+      return vertex.finished(VertexState.FAILED);
+    }
+    else if (vertex.terminationCause == VertexTerminationCause.AM_USERCODE_FAILURE) {
+      vertex.setFinishTime();
+      String diagnosticMsg = "Vertex failed/killed due to VertexManagerPlugin/EdgeManagerPlugin failed. "
+          + "failedTasks:"
+          + vertex.failedTaskCount
+          + " killedTasks:"
+          + vertex.killedTaskCount;
+      LOG.info(diagnosticMsg);
+      return vertex.finished(VertexState.FAILED);
+    }
+    else if (vertex.terminationCause == VertexTerminationCause.ROOT_INPUT_INIT_FAILURE) {
+      vertex.setFinishTime();
+      String diagnosticMsg = "Vertex failed/killed due to ROOT_INPUT_INIT_FAILURE failed. "
+          + "failedTasks:"
+          + vertex.failedTaskCount
+          + " killedTasks:"
+          + vertex.killedTaskCount;
+      LOG.info(diagnosticMsg);
+      return vertex.finished(VertexState.FAILED);
+    }
+    else if (vertex.terminationCause == VertexTerminationCause.COMMIT_FAILURE) {
+      vertex.setFinishTime();
+      String diagnosticMsg = "Vertex failed/killed due to COMMIT_FAILURE failed. "
+          + "failedTasks:"
+          + vertex.failedTaskCount
+          + " killedTasks:"
+          + vertex.killedTaskCount;
+      LOG.info(diagnosticMsg);
+      return vertex.finished(VertexState.FAILED);
+    }
+    else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT) {
+      vertex.setFinishTime();
+      String diagnosticMsg = "Vertex failed/killed due to vertex-rerun after commit. "
+          + "failedTasks:"
+          + vertex.failedTaskCount
+          + " killedTasks:"
+          + vertex.killedTaskCount;
+      LOG.info(diagnosticMsg);
+      return vertex.finished(VertexState.FAILED);
+    }
+    else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING) {
+      vertex.setFinishTime();
+      String diagnosticMsg = "Vertex failed/killed due to vertex-rerun in commiting. "
+          + "failedTasks:"
+          + vertex.failedTaskCount
+          + " killedTasks:"
+          + vertex.killedTaskCount;
+      LOG.info(diagnosticMsg);
+      return vertex.finished(VertexState.FAILED);
+    }
+    else if (vertex.terminationCause == VertexTerminationCause.RECOVERY_ERROR) {
+      vertex.setFinishTime();
+      String diagnosticMsg = "Vertex failed/killed due to recovery error. "
+          + "failedTasks:"
+          + vertex.failedTaskCount
+          + " killedTasks:"
+          + vertex.killedTaskCount;
+      LOG.info(diagnosticMsg);
+      return vertex.finished(VertexState.FAILED);
+    }
+    else {
+      // should never occur: terminationCause is null or not one of the causes handled above
+      throw new TezUncheckedException("All tasks & commits complete, but cannot determine final state of vertex:"
+          + vertex.logIdentifier
+          + ", failedTaskCount=" + vertex.failedTaskCount
+          + ", killedTaskCount=" + vertex.killedTaskCount
+          + ", successfulTaskCount=" + vertex.succeededTaskCount
+          + ", completedTaskCount=" + vertex.completedTaskCount
+          + ", commitInProgress=" + vertex.commitFutures.size()
+          + ", terminationCause=" + vertex.terminationCause);
+    }
+  }
+
   /**
    * Set the terminationCause and send a kill-message to all tasks.
    * The task-kill messages are only sent once.
@@ -1974,6 +2096,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         if (!StringUtils.isEmpty(diag)) {
           addDiagnostic(diag);
         }
+        abortVertex(VertexStatus.State.valueOf(finalState.name()));
         eventHandler.handle(new DAGEvent(getDAGId(),
             DAGEventType.INTERNAL_ERROR));
         try {
@@ -1988,6 +2111,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         if (!StringUtils.isEmpty(diag)) {
           addDiagnostic(diag);
         }
+        abortVertex(VertexStatus.State.valueOf(finalState.name()));
         eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
             finalState, terminationCause));
         try {
@@ -2074,7 +2198,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       addDiagnostic("Vertex init failed : "
           + ExceptionUtils.getStackTrace(e));
       trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
-      abortVertex(VertexStatus.State.FAILED);
       finished(VertexState.FAILED);
       return false;
     }
@@ -2289,7 +2412,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           + ", numTasks=" + numTasks);
       trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
       if (event != null) {
-        abortVertex(VertexStatus.State.FAILED);
         return finished(VertexState.FAILED);
       } else {
         return VertexState.FAILED;
@@ -3441,13 +3563,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return VertexState.RUNNING;
   }
 
-  private void abortVertex(final VertexStatus.State finalState) {
+  void abortVertex(final VertexStatus.State finalState) {
     if (this.aborted.getAndSet(true)) {
       LOG.info("Ignoring multiple aborts for vertex: " + logIdentifier);
       return;
     }
-    LOG.info("Invoking committer abort for vertex, vertexId=" + logIdentifier);
+
     if (outputCommitters != null) {
+      LOG.info("Invoking committer abort for vertex, vertexId=" + logIdentifier);
       try {
         dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
           @Override
@@ -3552,7 +3675,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     public void transition(VertexImpl vertex, VertexEvent event) {
       VertexEventTermination vet = (VertexEventTermination) event;
       vertex.trySetTerminationCause(vet.getTerminationCause());
-      vertex.abortVertex(VertexStatus.State.KILLED);
       vertex.addDiagnostic("Vertex received Kill in INITED state.");
       vertex.finished(VertexState.KILLED);
     }
@@ -3592,6 +3714,24 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  private static class VertexKilledWhileCommittingTransition
+    implements SingleArcTransition<VertexImpl, VertexEvent> {
+
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      // Kill received while COMMITTING: log and record a diagnostic, set the
+      // termination cause carried by the event, then call cancelCommits().
+      VertexEventTermination vet = (VertexEventTermination) event;
+      VertexTerminationCause trigger = vet.getTerminationCause();
+      String msg = "Vertex received Kill while in COMMITTING state, terminationCause="
+          + trigger +", vertex=" + vertex.logIdentifier;
+      LOG.info(msg);
+      vertex.addDiagnostic(msg);
+      vertex.trySetTerminationCause(trigger);
+      vertex.cancelCommits();
+    }
+  }
+
   private static class VertexManagerUserCodeErrorTransition implements
     MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
     @Override
@@ -3608,10 +3748,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.terminationCause = VertexTerminationCause.AM_USERCODE_FAILURE;
         vertex.recoveredState = VertexState.FAILED;
         return VertexState.RECOVERING;
-      } else if (vertex.getState() == VertexState.RUNNING) {
+      } else if (vertex.getState() == VertexState.RUNNING || vertex.getState() == VertexState.COMMITTING) {
         vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
         vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE,
             TaskTerminationCause.AM_USERCODE_FAILURE);
+        vertex.cancelCommits();
         return VertexState.TERMINATING;
       } else {
         vertex.finished(VertexState.FAILED,
@@ -3707,7 +3848,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         taskKilled(vertex, task);
       }
 
-      VertexState state = VertexImpl.checkVertexForCompletion(vertex);
+      VertexState state = VertexImpl.checkTasksForCompletion(vertex);
       if(state == VertexState.RUNNING && forceTransitionToKillWait){
         return VertexState.TERMINATING;
       }
@@ -3752,7 +3893,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      return VertexImpl.checkVertexForCompletion(vertex);
+      return VertexImpl.checkTasksForCompletion(vertex);
     }
   }
 
@@ -3762,26 +3903,38 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
       VertexEventTaskCompleted vEvent = (VertexEventTaskCompleted) event;
       VertexState finalState;
-      VertexStatus.State finalStatus;
       String diagnosticMsg;
       if (vEvent.getState() == TaskState.FAILED) {
         finalState = VertexState.FAILED;
-        finalStatus = VertexStatus.State.FAILED;
         diagnosticMsg = "Vertex " + vertex.logIdentifier +" failed as task " + vEvent.getTaskID() +
           " failed after vertex succeeded.";
       } else {
         finalState = VertexState.ERROR;
-        finalStatus = VertexStatus.State.ERROR;
         diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() +
             " completed with state " + vEvent.getState() + " after vertex succeeded.";
       }
       LOG.info(diagnosticMsg);
-      vertex.abortVertex(finalStatus);
       vertex.finished(finalState, VertexTerminationCause.OWN_TASK_FAILURE, diagnosticMsg);
       return finalState;
     }
   }
 
+  private static class TaskRescheduledWhileCommittingTransition implements 
+    SingleArcTransition<VertexImpl, VertexEvent> {
+
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      // terminate any running tasks
+      String diagnosticMsg = vertex.getLogIdentifier() + " failed due to in-committing rescheduling of "
+          + ((VertexEventTaskReschedule)event).getTaskID();
+      LOG.info(diagnosticMsg);
+      vertex.addDiagnostic(diagnosticMsg);
+      vertex.tryEnactKill(VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING,
+          TaskTerminationCause.TASK_RESCHEDULE_IN_COMMITTING);
+      vertex.cancelCommits();
+    }
+  }
+
   private static class TaskRescheduledAfterVertexSuccessTransition implements
     MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
@@ -3806,12 +3959,47 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       LOG.info(diagnosticMsg);
       vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE,
           TaskTerminationCause.OWN_TASK_FAILURE);
-      vertex.abortVertex(VertexStatus.State.FAILED);
       vertex.finished(VertexState.FAILED, VertexTerminationCause.OWN_TASK_FAILURE, diagnosticMsg);
       return VertexState.FAILED;
     }
   }
 
+  private void commitCompleted(VertexEventCommitCompleted commitCompletedEvent) {
+    Preconditions.checkState(commitFutures.remove(commitCompletedEvent.getOutputName()) != null,
+        "Unknown commit:" + commitCompletedEvent.getOutputName() + ", vertex=" + logIdentifier);
+    if (commitCompletedEvent.isSucceeded()) {
+      LOG.info("Commit succeeded for output:" + commitCompletedEvent.getOutputName()
+          + ", vertexId=" + logIdentifier);
+    } else {  // record the failure as a diagnostic and cancel the remaining commits
+      String diag = "Commit failed for output:" + commitCompletedEvent.getOutputName()
+          + ", vertexId=" + logIdentifier + ", "
+          + ExceptionUtils.getStackTrace(commitCompletedEvent.getException());
+      LOG.info(diag);
+      addDiagnostic(diag);
+      trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE);
+      cancelCommits();
+    }
+  }
+
+  private static class CommitCompletedTransition implements
+    MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      vertex.commitCompleted((VertexEventCommitCompleted)event);
+      return checkCommitsForCompletion(vertex);
+    }
+  }
+
+  private void cancelCommits() {
+    if (!this.commitCanceled.getAndSet(true)) {
+      for (Map.Entry<String, ListenableFuture<Void>> entry : commitFutures.entrySet()) {
+        LOG.info("Canceling commit of output:" + entry.getKey() + ", vertexId=" + logIdentifier);
+        entry.getValue().cancel(true);
+      }
+    }
+  }
+
   private void addDiagnostic(String diag) {
     diagnostics.add(diag);
   }
@@ -3853,9 +4041,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       } catch (AMUserCodeException e) {
         String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
         LOG.error(msg, e);
-        if (vertex.getState() == VertexState.RUNNING) {
+        if (vertex.getState() == VertexState.RUNNING || vertex.getState() == VertexState.COMMITTING) {
           vertex.addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()));
           vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
+          vertex.cancelCommits();
           return VertexState.TERMINATING;
         } else {
           vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE,
@@ -4073,6 +4262,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           vertex.getDAGId(), "Invalid event " + event.getType()
           + " on Vertex " + vertex.getLogIdentifier()));
       vertex.setFinishTime();
+      vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
+      vertex.cancelCommits();
       vertex.finished(VertexState.ERROR);
     }
   }
@@ -4133,6 +4324,30 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  private static class VertexCommitCallback implements FutureCallback<Void>{
+
+    private final String outputName;  // set once in the constructor
+    private final VertexImpl vertex;  // set once in the constructor
+
+    public VertexCommitCallback(VertexImpl vertex, String outputName) {
+      this.vertex = vertex;
+      this.outputName = outputName;
+    }
+
+    @Override
+    public void onSuccess(Void result) {  // report success as a VertexEventCommitCompleted
+      vertex.getEventHandler().handle(
+          new VertexEventCommitCompleted(vertex.vertexId, outputName, true, null));
+    }
+
+    @Override
+    public void onFailure(Throwable t) {  // report failure, passing the cause along
+      vertex.getEventHandler().handle(
+          new VertexEventCommitCompleted(vertex.vertexId, outputName, false, t));
+    }
+
+  }
+
   @Override
   public void setInputVertices(Map<Vertex, Edge> inVertices) {
     this.sourceVertices = inVertices;

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 8be60c5..4d01117 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -28,6 +28,9 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -49,21 +52,26 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
 import org.apache.tez.dag.app.MockDAGAppMaster.StatisticsDelegate;
 import org.apache.tez.dag.app.dag.DAGState;
-import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
@@ -73,6 +81,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.VertexStatistics;
+import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.IOStatistics;
 import org.apache.tez.runtime.api.impl.InputSpec;
@@ -85,8 +95,10 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
 
 public class TestMockDAGAppMaster {
+  private static final Log LOG = LogFactory.getLog(TestMockDAGAppMaster.class);
   static Configuration defaultConf;
   static FileSystem localFs;
   
@@ -172,7 +184,7 @@ public class TestMockDAGAppMaster {
     Assert.assertEquals(TaskAttemptState.KILLED, killedTa.getState());
     tezClient.stop();
   }
-  
+
   @Test (timeout = 5000)
   public void testBasicEvents() throws Exception {
     TezConfiguration tezconf = new TezConfiguration(defaultConf);
@@ -647,4 +659,224 @@ public class TestMockDAGAppMaster {
       mockApp.waitForServiceToStop(Integer.MAX_VALUE);
     }
   }
+
+
+  private OutputCommitterDescriptor createOutputCommitterDesc(boolean failOnCommit) {
+    OutputCommitterDescriptor outputCommitterDesc =
+        OutputCommitterDescriptor.create(FailingOutputCommitter.class.getName());
+    UserPayload payload = UserPayload.create(
+        ByteBuffer.wrap(new FailingOutputCommitter.FailingOutputCommitterConfig(failOnCommit).toUserPayload()));
+    outputCommitterDesc.setUserPayload(payload);
+    return outputCommitterDesc;
+  }
+
+  private DAG createDAG(String dagName, boolean uv12CommitFail, boolean v3CommitFail) {
+    DAG dag = DAG.create(dagName);
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Proc"), 1);
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Proc"), 1);
+    Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Proc"), 1);
+    VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2);
+    DataSinkDescriptor uv12DataSink = DataSinkDescriptor.create(
+        OutputDescriptor.create("dummy output"), createOutputCommitterDesc(uv12CommitFail), null);
+    uv12.addDataSink("uv12Out", uv12DataSink);
+    DataSinkDescriptor v3DataSink = DataSinkDescriptor.create(
+        OutputDescriptor.create("dummy output"), createOutputCommitterDesc(v3CommitFail), null);
+    v3.addDataSink("v3Out", v3DataSink);
+
+    GroupInputEdge e1 = GroupInputEdge.create(uv12, v3, EdgeProperty.create(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("dummy output class"),
+        InputDescriptor.create("dummy input class")), InputDescriptor
+        .create("merge.class"));
+    dag.addVertex(v1)
+      .addVertex(v2)
+      .addVertex(v3)
+      .addEdge(e1);
+    return dag;
+  }
+
+  @Test (timeout = 60000)
+  public void testCommitOutputOnDAGSuccess() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
+    tezClient.start();
+
+    // both committers succeed
+    DAG dag1 = createDAG("testDAGBothCommitsSucceed", false, false);
+    DAGClient dagClient = tezClient.submitDAG(dag1);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    
+    // vertex group committer fails (uv12)
+    DAG dag2 = createDAG("testDAGVertexGroupCommitFail", true, false);
+    dagClient = tezClient.submitDAG(dag2);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
+    LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
+    Assert.assertTrue(StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",")
+        .contains("fail output committer:uv12Out"));
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v3", null).getState());
+
+    // vertex commit fail (v3)
+    DAG dag3 = createDAG("testDAGVertexCommitFail", false, true);
+    dagClient = tezClient.submitDAG(dag3);
+    dagClient.waitForCompletion();
+    LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
+    Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
+    Assert.assertTrue(StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",")
+        .contains("fail output committer:v3Out"));
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v3", null).getState());
+
+    // both committers fail
+    DAG dag4 = createDAG("testDAGBothCommitsFail", true, true);
+    dagClient = tezClient.submitDAG(dag4);
+    dagClient.waitForCompletion();
+    LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
+    Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
+    String diag = StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",");
+    Assert.assertTrue(diag.contains("fail output committer:uv12Out") ||
+        diag.contains("fail output committer:v3Out"));
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v3", null).getState());
+
+    tezClient.stop();
+  }
+
+  @Test (timeout = 60000)
+  public void testCommitOutputOnVertexSuccess() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    tezconf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
+    tezClient.start();
+
+    // both committers succeed
+    DAG dag1 = createDAG("testDAGBothCommitsSucceed", false, false);
+    DAGClient dagClient = tezClient.submitDAG(dag1);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    
+    // vertex group committer fails (uv12)
+    DAG dag2 = createDAG("testDAGVertexGroupCommitFail", true, false);
+    dagClient = tezClient.submitDAG(dag2);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
+    LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
+    Assert.assertTrue(StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",")
+        .contains("fail output committer:uv12Out"));
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
+    VertexStatus.State v3State = dagClient.getVertexStatus("v3", null).getState();
+    // v3 either succeeded (its commit completed before the uv12 commit failed)
+    // or was killed (the uv12 commit failed while v3 was still running/committing)
+    if (v3State.equals(VertexStatus.State.SUCCEEDED)) {
+      LOG.info("v3 is succeeded");
+    } else {
+      Assert.assertEquals(VertexStatus.State.KILLED, v3State);
+    }
+
+    // vertex commit fail (v3)
+    DAG dag3 = createDAG("testDAGVertexCommitFail", false, true);
+    dagClient = tezClient.submitDAG(dag3);
+    dagClient.waitForCompletion();
+    LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
+    Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
+    Assert.assertTrue(StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",")
+        .contains("Commit failed"));
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
+    Assert.assertEquals(VertexStatus.State.FAILED, dagClient.getVertexStatus("v3", null).getState());
+    Assert.assertTrue(StringUtils.join(dagClient.getVertexStatus("v3", null).getDiagnostics(),",")
+        .contains("fail output committer:v3Out"));
+    
+    // both committers fail
+    DAG dag4 = createDAG("testDAGBothCommitsFail", true, true);
+    dagClient = tezClient.submitDAG(dag4);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
+    LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
+    Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
+    String diag = StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",");
+    Assert.assertTrue(diag.contains("fail output committer:uv12Out") ||
+        diag.contains("fail output committer:v3Out"));
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
+    Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
+    v3State = dagClient.getVertexStatus("v3", null).getState();
+    // v3 either failed (the v3 commit failed before the uv12 commit did)
+    // or was killed (the uv12 commit failed before the v3 commit)
+    if (v3State.equals(VertexStatus.State.FAILED)) {
+      LOG.info("v3 is failed");
+      Assert.assertTrue(StringUtils.join(dagClient.getVertexStatus("v3", null).getDiagnostics(),",")
+          .contains("fail output committer:v3Out"));
+    } else {
+      Assert.assertEquals(VertexStatus.State.KILLED, v3State);
+    }
+
+    tezClient.stop();
+  }
+  
+  public static class FailingOutputCommitter extends OutputCommitter {
+
+    boolean failOnCommit = false;
+
+    public FailingOutputCommitter(OutputCommitterContext committerContext) {
+      super(committerContext);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+      FailingOutputCommitterConfig config = new
+          FailingOutputCommitterConfig();
+      config.fromUserPayload(
+          getContext().getUserPayload().deepCopyAsArray());
+      failOnCommit = config.failOnCommit;
+    }
+
+    @Override
+    public void setupOutput() throws Exception {
+
+    }
+
+    @Override
+    public void commitOutput() throws Exception {
+      if (failOnCommit) {
+        throw new Exception("fail output committer:" + getContext().getOutputName());
+      }
+    }
+
+    @Override
+    public void abortOutput(State finalState) throws Exception {
+
+    }
+
+    public static class FailingOutputCommitterConfig {
+      boolean failOnCommit;
+
+      public FailingOutputCommitterConfig() {
+        this(false);
+      }
+
+      public FailingOutputCommitterConfig(boolean failOnCommit) {
+        this.failOnCommit = failOnCommit;
+      }
+
+      public byte[] toUserPayload() {
+        return Ints.toByteArray((failOnCommit ? 1 : 0));
+      }
+
+      public void fromUserPayload(byte[] userPayload) {
+        int failInt = Ints.fromByteArray(userPayload);
+        if (failInt == 0) {
+          failOnCommit = false;
+        } else {
+          failOnCommit = true;
+        }
+      }
+    }
+  }
 }


Mime
View raw message