tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [09/50] [abbrv] tez git commit: TEZ-714. OutputCommitters should not run in the main AM dispatcher thread (zjffdu)
Date Fri, 24 Apr 2015 00:26:03 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
new file mode 100644
index 0000000..0df8a4f
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -0,0 +1,1682 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.CallableEventType;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.impl.DAGImpl.OutputKey;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The commit test cases here differ from those in TestDAGImpl and
+ * TestVertexImpl in that the commits here run in a separate thread, so
+ * these tests need some special care.
+ *
+ * There are 2 kinds of commit tests:
+ * <ul>
+ * <li> tests named XXX_OnDAGSuccess run with TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS set to true
+ * <li> tests named XXX_OnVertexSuccess run with TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS set to false
+ * </ul>
+ */
+public class TestCommit {
+
+  private static final Log LOG = LogFactory.getLog(TestCommit.class);
+  private TezDAGID dagId;
+  private static Configuration conf = new Configuration();
+  private DrainDispatcher dispatcher;
+  private Credentials fsTokens;
+  private AppContext appContext;
+  private ACLManager aclManager;
+  private ApplicationAttemptId appAttemptId;
+  private DAGImpl dag;
+  private TaskEventDispatcher taskEventDispatcher;
+  private VertexEventDispatcher vertexEventDispatcher;
+  private DagEventDispatcher dagEventDispatcher;
+  private TaskAttemptListener taskAttemptListener;
+  private TaskHeartbeatHandler thh;
+  private Clock clock = new SystemClock();
+  private DAGFinishEventHandler dagFinishEventHandler;
+  private HistoryEventHandler historyEventHandler;
+  private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
+
+  private ExecutorService rawExecutor;
+  private ListeningExecutorService execService;
+
+  /** Forwards every {@link DAGEvent} it receives to the DAG under test. */
+  private class DagEventDispatcher implements EventHandler<DAGEvent> {
+    @Override
+    public void handle(DAGEvent event) {
+      dag.handle(event);
+    }
+  }
+
+  /**
+   * Routes each {@link VertexEvent} to the vertex of the DAG under test that
+   * is identified by the event's vertex id.
+   */
+  private class VertexEventDispatcher implements EventHandler<VertexEvent> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(VertexEvent event) {
+      // The looked-up vertex is treated as the handler for its own events.
+      EventHandler<VertexEvent> target =
+          (EventHandler<VertexEvent>) dag.getVertex(event.getVertexId());
+      target.handle(event);
+    }
+  }
+
+  /**
+   * Routes each {@link TaskEvent} to the task it addresses: the owning vertex
+   * is looked up from the task id, then the task is looked up in that vertex.
+   */
+  private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(TaskEvent event) {
+      TezTaskID taskId = event.getTaskID();
+      Task target = dag.getVertex(taskId.getVertexID()).getTask(taskId);
+      ((EventHandler<TaskEvent>) target).handle(event);
+    }
+  }
+
+  /**
+   * Handler for {@link TaskAttemptEvent}s that deliberately does nothing;
+   * task attempt events are dropped in these tests.
+   */
+  private class TaskAttemptEventDispatcher implements
+      EventHandler<TaskAttemptEvent> {
+    @Override
+    public void handle(TaskAttemptEvent event) {
+      // Ignore
+    }
+  }
+
+  /**
+   * Handler for {@link DAGAppMasterEventDAGFinished} events with an empty
+   * body; the DAG-finished notification is dropped in these tests.
+   */
+  private class DAGFinishEventHandler implements
+      EventHandler<DAGAppMasterEventDAGFinished> {
+    @Override
+    public void handle(DAGAppMasterEventDAGFinished event) {
+    }
+  }
+
+  /**
+   * Test {@link OutputCommitter} that counts how often each lifecycle method
+   * is invoked. Through its user payload (see
+   * {@link CountingOutputCommitterConfig}) it can be configured to block inside
+   * {@link #commitOutput()} until {@link #unblockCommit()} is called, and/or to
+   * throw a {@link RuntimeException} from {@link #commitOutput()}.
+   */
+  public static class CountingOutputCommitter extends OutputCommitter {
+
+    // Invocation counters, read directly by the tests. volatile gives
+    // visibility only; the ++ updates below are not atomic.
+    public volatile int initCounter = 0;
+    public volatile int setupCounter = 0;
+    public volatile int commitCounter = 0;
+    public volatile int abortCounter = 0;
+    // Both flags are written in initialize() and read in commitOutput(), which
+    // these tests expect to run on a separate commit thread, so both are
+    // volatile (previously only blockCommit was).
+    private volatile boolean throwError;
+    private volatile boolean blockCommit;
+
+    public CountingOutputCommitter(OutputCommitterContext context) {
+      super(context);
+      this.throwError = false;
+    }
+
+    /**
+     * Reads the optional {@link CountingOutputCommitterConfig} from the user
+     * payload, if one is present, and bumps {@link #initCounter}.
+     */
+    @Override
+    public void initialize() throws IOException {
+      if (getContext().getUserPayload() != null
+          && getContext().getUserPayload().hasPayload()) {
+        CountingOutputCommitterConfig conf = new CountingOutputCommitterConfig(
+            getContext().getUserPayload());
+        this.throwError = conf.throwError;
+        this.blockCommit = conf.blockCommit;
+      }
+      ++initCounter;
+    }
+
+    @Override
+    public void setupOutput() throws IOException {
+      ++setupCounter;
+    }
+
+    /**
+     * Bumps {@link #commitCounter}, then spins in 100 ms sleeps for as long as
+     * the commit is blocked, and finally throws if configured to fail.
+     *
+     * @throws IOException if the thread is interrupted while blocked
+     */
+    @Override
+    public void commitOutput() throws IOException {
+      ++commitCounter;
+      while (blockCommit) {
+        try {
+          Thread.sleep(100);
+          LOG.info("committing output:" + getContext().getOutputName());
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+      }
+      if (throwError) {
+        throw new RuntimeException("I can throwz exceptions in commit");
+      }
+    }
+
+    /** Releases a commit that is (or would be) blocked in {@link #commitOutput()}. */
+    public void unblockCommit() {
+      blockCommit = false;
+    }
+
+    @Override
+    public void abortOutput(VertexStatus.State finalState) throws IOException {
+      ++abortCounter;
+    }
+
+    /**
+     * Configuration for {@link CountingOutputCommitter}, serialized as two
+     * booleans: {@code throwError} followed by {@code blockCommit}.
+     */
+    public static class CountingOutputCommitterConfig implements Writable {
+
+      boolean throwError = false;
+      boolean blockCommit = false;
+
+      public CountingOutputCommitterConfig() {
+      }
+
+      public CountingOutputCommitterConfig(boolean throwError,
+          boolean blockCommit) {
+        this.throwError = throwError;
+        this.blockCommit = blockCommit;
+      }
+
+      /** Deserializes the configuration from the bytes of the given payload. */
+      public CountingOutputCommitterConfig(UserPayload payload)
+          throws IOException {
+        DataInputByteBuffer in = new DataInputByteBuffer();
+        in.reset(payload.getPayload());
+        this.readFields(in);
+      }
+
+      @Override
+      public void write(DataOutput out) throws IOException {
+        out.writeBoolean(throwError);
+        out.writeBoolean(blockCommit);
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        throwError = in.readBoolean();
+        blockCommit = in.readBoolean();
+      }
+
+      /** Serializes this configuration into a fresh byte array. */
+      public byte[] toUserPayload() throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutput out = new DataOutputStream(bos);
+        write(out);
+        return bos.toByteArray();
+      }
+    }
+  }
+
+  /**
+   * Builds the fixture for one test: a mocked {@link AppContext}, a
+   * {@link DAGImpl} created from the given plan, and a started
+   * {@link DrainDispatcher} with a handler registered for every event type
+   * used by these tests.
+   *
+   * @param dagPlan the plan the DAG under test is created from
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void setupDAG(DAGPlan dagPlan) {
+    conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+    appAttemptId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(100, 1), 1);
+    dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
+    Assert.assertNotNull(dagId);
+    dispatcher = new DrainDispatcher();
+    fsTokens = new Credentials();
+    appContext = mock(AppContext.class);
+    // Executor handed out by the mocked AppContext via getExecService().
+    rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("App Shared Pool - " + "#%d").build());
+    execService = MoreExecutors.listeningDecorator(rawExecutor);
+
+    doReturn(execService).when(appContext).getExecService();
+    historyEventHandler = mock(HistoryEventHandler.class);
+    aclManager = new ACLManager("amUser");
+    doReturn(conf).when(appContext).getAMConf();
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+    doReturn(appAttemptId.getApplicationId()).when(appContext)
+        .getApplicationID();
+    doReturn(dagId).when(appContext).getCurrentDAGID();
+    doReturn(historyEventHandler).when(appContext).getHistoryHandler();
+    doReturn(aclManager).when(appContext).getAMACLManager();
+    // NOTE(review): taskAttemptListener and thh are not assigned anywhere in
+    // the visible part of this file, so they are presumably null here - confirm.
+    dag = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(),
+        taskAttemptListener, fsTokens, clock, "user", thh, appContext);
+    doReturn(dag).when(appContext).getCurrentDAG();
+    doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
+
+    // Register one handler per event type, then start the dispatcher.
+    dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
+    taskEventDispatcher = new TaskEventDispatcher();
+    dispatcher.register(TaskEventType.class, taskEventDispatcher);
+    taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
+    dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
+    vertexEventDispatcher = new VertexEventDispatcher();
+    dispatcher.register(VertexEventType.class, vertexEventDispatcher);
+    dagEventDispatcher = new DagEventDispatcher();
+    dispatcher.register(DAGEventType.class, dagEventDispatcher);
+    dagFinishEventHandler = new DAGFinishEventHandler();
+    dispatcher.register(DAGAppMasterEventType.class, dagFinishEventHandler);
+    dispatcher.init(conf);
+    dispatcher.start();
+  }
+
+  /**
+   * Runs after every test: waits on and stops the dispatcher, then shuts down
+   * the executor service. Each step is skipped if the corresponding field was
+   * never set.
+   */
+  @After
+  public void teardown() {
+    if (dispatcher != null) {
+      dispatcher.await();
+      dispatcher.stop();
+    }
+    if (execService != null) {
+      execService.shutdownNow();
+    }
+  }
+
+  /**
+   * Polls every 100 ms until the given DAG reaches the expected state.
+   *
+   * @param dag the DAG to watch
+   * @param state the state to wait for
+   * @throws IllegalStateException if the waiting thread is interrupted; the
+   *         interrupt flag is restored before throwing
+   */
+  private void waitUntil(DAGImpl dag, DAGState state) {
+    while (dag.getState() != state) {
+      LOG.info("Wait for dag go to state:" + state);
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // Do not swallow the interrupt and keep polling: restore the flag and
+        // fail fast so an interrupted test thread actually stops.
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "Interrupted while waiting for dag to go to state:" + state, e);
+      }
+    }
+  }
+
+  /**
+   * Polls every 100 ms until the given vertex reaches the expected state.
+   *
+   * @param vertex the vertex to watch
+   * @param state the state to wait for
+   * @throws IllegalStateException if the waiting thread is interrupted; the
+   *         interrupt flag is restored before throwing
+   */
+  private void waitUntil(VertexImpl vertex, VertexState state) {
+    while (vertex.getState() != state) {
+      LOG.info("Wait for vertex " + vertex.getLogIdentifier() + " go to state:"
+          + state);
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // Do not swallow the interrupt and keep polling: restore the flag and
+        // fail fast so an interrupted test thread actually stops.
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException("Interrupted while waiting for vertex "
+            + vertex.getLogIdentifier() + " to go to state:" + state, e);
+      }
+    }
+  }
+
+  /**
+   * Polls every 200 ms until the vertex no longer tracks a commit future for
+   * the given output name.
+   *
+   * @param vertex the vertex whose {@code commitFutures} map is inspected
+   * @param outputName key of the commit to wait for
+   * @throws IllegalStateException if the waiting thread is interrupted; the
+   *         interrupt flag is restored before throwing
+   */
+  private void waitForCommitCompleted(VertexImpl vertex, String outputName) {
+    while (vertex.commitFutures.containsKey(outputName)) {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException e) {
+        // Do not swallow the interrupt and keep polling: restore the flag and
+        // fail fast so an interrupted test thread actually stops.
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "Interrupted while waiting for vertex commit " + outputName
+                + " to complete", e);
+      }
+      LOG.info("Wait for vertex commit " + outputName + " to complete");
+    }
+  }
+
+  /**
+   * Polls every 200 ms until the DAG no longer tracks a commit future for the
+   * given output key.
+   *
+   * @param dag the DAG whose {@code commitFutures} map is inspected
+   * @param outputKey key of the commit to wait for
+   * @throws IllegalStateException if the waiting thread is interrupted; the
+   *         interrupt flag is restored before throwing
+   */
+  private void waitForCommitCompleted(DAGImpl dag, OutputKey outputKey) {
+    while (dag.commitFutures.containsKey(outputKey)) {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException e) {
+        // Do not swallow the interrupt and keep polling: restore the flag and
+        // fail fast so an interrupted test thread actually stops.
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "Interrupted while waiting for dag commit " + outputKey
+                + " to complete", e);
+      }
+      LOG.info("Wait for dag commit " + outputKey + " to complete");
+    }
+  }
+
+  // v1->v3
+  // v2->v3
+  // vertex_group (v1, v2)
+  private DAGPlan createDAGPlan(boolean vertexGroupCommitSucceeded,
+      boolean v3CommitSucceeded) throws Exception {
+    LOG.info("Setting up group dag plan");
+    int dummyTaskCount = 1;
+    Resource dummyTaskResource = Resource.newInstance(1, 1);
+    org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create(
+        "vertex1", ProcessorDescriptor.create("Processor"), dummyTaskCount,
+        dummyTaskResource);
+    org.apache.tez.dag.api.Vertex v2 = org.apache.tez.dag.api.Vertex.create(
+        "vertex2", ProcessorDescriptor.create("Processor"), dummyTaskCount,
+        dummyTaskResource);
+    org.apache.tez.dag.api.Vertex v3 = org.apache.tez.dag.api.Vertex.create(
+        "vertex3", ProcessorDescriptor.create("Processor"), dummyTaskCount,
+        dummyTaskResource);
+
+    DAG dag = DAG.create("testDag");
+    String groupName1 = "uv12";
+    OutputCommitterDescriptor ocd1 = OutputCommitterDescriptor.create(
+        CountingOutputCommitter.class.getName()).setUserPayload(
+        UserPayload.create(ByteBuffer
+            .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig(
+                !vertexGroupCommitSucceeded, true).toUserPayload())));
+    OutputCommitterDescriptor ocd2 = OutputCommitterDescriptor.create(
+        CountingOutputCommitter.class.getName()).setUserPayload(
+        UserPayload.create(ByteBuffer
+            .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig(
+                !v3CommitSucceeded, true).toUserPayload())));
+
+    org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1,
+        v1, v2);
+    OutputDescriptor outDesc = OutputDescriptor.create("output.class");
+    uv12.addDataSink("v12Out", DataSinkDescriptor.create(outDesc, ocd1, null));
+    v3.addDataSink("v3Out", DataSinkDescriptor.create(outDesc, ocd2, null));
+
+    GroupInputEdge e1 = GroupInputEdge.create(uv12, v3, EdgeProperty.create(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("dummy output class"),
+        InputDescriptor.create("dummy input class")), InputDescriptor
+        .create("merge.class"));
+
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addEdge(e1);
+    return dag.createDag(conf, null, null, null, true);
+  }
+
+  private DAGPlan createDAGPlan_SingleVertexWith2Committer(
+      boolean commit1Succeed, boolean commit2Succeed) throws IOException {
+    return createDAGPlan_SingleVertexWith2Committer(commit1Succeed, commit2Succeed, false);
+  }
+
+  // used for route event error in VM
+  private DAGPlan createDAGPlan_SingleVertexWith2Committer
+    (boolean commit1Succeed, boolean commit2Succeed, boolean customVM) throws IOException {
+    LOG.info("Setting up group dag plan");
+    int dummyTaskCount = 1;
+    Resource dummyTaskResource = Resource.newInstance(1, 1);
+    org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create(
+        "vertex1", ProcessorDescriptor.create("Processor"), dummyTaskCount,
+        dummyTaskResource);
+    if (customVM) {
+      v1.setVertexManagerPlugin(
+          VertexManagerPluginDescriptor.create(
+              FailOnVMEventReceivedlVertexManager.class.getName()));
+    }
+    OutputCommitterDescriptor ocd1 = OutputCommitterDescriptor.create(
+        CountingOutputCommitter.class.getName()).setUserPayload(
+        UserPayload.create(ByteBuffer
+            .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig(
+                !commit1Succeed, true).toUserPayload())));
+    OutputCommitterDescriptor ocd2 = OutputCommitterDescriptor.create(
+        CountingOutputCommitter.class.getName()).setUserPayload(
+        UserPayload.create(ByteBuffer
+            .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig(
+                !commit2Succeed, true).toUserPayload())));
+
+    DAG dag = DAG.create("testDag");
+    dag.addVertex(v1);
+    OutputDescriptor outDesc = OutputDescriptor.create("output.class");
+    v1.addDataSink("v1Out_1", DataSinkDescriptor.create(outDesc, ocd1, null));
+    v1.addDataSink("v1Out_2", DataSinkDescriptor.create(outDesc, ocd2, null));
+    return dag.createDag(conf, null, null, null, true);
+  }
+
+  /** Sends DAG_INIT directly to the DAG and asserts that it ends up INITED. */
+  private void initDAG(DAGImpl dag) {
+    DAGEvent initEvent = new DAGEvent(dag.getID(), DAGEventType.DAG_INIT);
+    dag.handle(initEvent);
+    DAGState stateAfterInit = dag.getState();
+    Assert.assertEquals(DAGState.INITED, stateAfterInit);
+  }
+
+  /**
+   * Posts a start event for the DAG through the dispatcher, waits on the
+   * dispatcher, and asserts that the DAG is RUNNING afterwards.
+   */
+  @SuppressWarnings("unchecked")
+  private void startDAG(DAGImpl impl) {
+    DAGEventStartDag startEvent = new DAGEventStartDag(impl.getID(), null);
+    dispatcher.getEventHandler().handle(startEvent);
+    dispatcher.await();
+    DAGState stateAfterStart = impl.getState();
+    Assert.assertEquals(DAGState.RUNNING, stateAfterStart);
+  }
+
+  /**
+   * With TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS set to true, completing the
+   * only task of vertex1 is expected to move the vertex straight to SUCCEEDED
+   * with no commit futures, and with neither committer committed or aborted.
+   */
+  @Test(timeout = 5000)
+  public void testVertexSucceedWithoutCommit() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        true);
+    setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertNull(v1.getTerminationCause());
+    Assert.assertTrue(v1.commitFutures.isEmpty());
+    CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_1");
+    CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_2");
+    // initialized and set up once, but never committed or aborted
+    Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
+    Assert.assertEquals(0, v1OutputCommitter_1.commitCounter);
+    Assert.assertEquals(0, v1OutputCommitter_1.abortCounter);
+
+    Assert.assertEquals(1, v1OutputCommitter_2.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_2.setupCounter);
+    Assert.assertEquals(0, v1OutputCommitter_2.commitCounter);
+    Assert.assertEquals(0, v1OutputCommitter_2.abortCounter);
+  }
+
+  /**
+   * With TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS set to false, the vertex is
+   * expected to stay in COMMITTING until both blocked commits have been
+   * released, and then to reach SUCCEEDED with each committer committed once
+   * and never aborted.
+   */
+  @Test(timeout = 5000)
+  public void testVertexCommit_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.COMMITTING, v1.getState());
+    // release the first commit only
+    CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_1");
+    v1OutputCommitter_1.unblockCommit();
+    waitForCommitCompleted(v1, "v1Out_1");
+    // still in COMMITTING due to another pending commit
+    Assert.assertEquals(VertexState.COMMITTING, v1.getState());
+
+    // release the second commit
+    CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_2");
+    v1OutputCommitter_2.unblockCommit();
+    waitUntil(v1, VertexState.SUCCEEDED);
+    Assert.assertNull(v1.getTerminationCause());
+    Assert.assertTrue(v1.commitFutures.isEmpty());
+
+    Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.commitCounter);
+    Assert.assertEquals(0, v1OutputCommitter_1.abortCounter);
+
+    Assert.assertEquals(1, v1OutputCommitter_2.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_2.setupCounter);
+    Assert.assertEquals(1, v1OutputCommitter_2.commitCounter);
+    Assert.assertEquals(0, v1OutputCommitter_2.abortCounter);
+  }
+
+  // The first commit fails, which causes the second commit to be aborted.
+  // The vertex is expected to end up FAILED with COMMIT_FAILURE, and both
+  // committers are expected to have been aborted once.
+  @Test(timeout = 5000)
+  public void testVertexCommitFail1_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan_SingleVertexWith2Committer(false, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.COMMITTING, v1.getState());
+    // release the first commit, which is configured to throw
+    CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_1");
+    v1OutputCommitter_1.unblockCommit();
+    waitUntil(v1, VertexState.FAILED);
+
+    Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE,
+        v1.getTerminationCause());
+    Assert.assertTrue(v1.commitFutures.isEmpty());
+    CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_2");
+    Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.commitCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.abortCounter);
+
+    Assert.assertEquals(1, v1OutputCommitter_2.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_2.setupCounter);
+    // can't verify the commitCounter because v1OutputCommitter_2 may not be started
+    Assert.assertEquals(1, v1OutputCommitter_2.abortCounter);
+  }
+
+  // The first commit succeeds while the second fails. The vertex is expected
+  // to end up FAILED with COMMIT_FAILURE, and both committers are expected to
+  // have been committed once and aborted once.
+  @Test(timeout = 5000)
+  public void testVertexCommitFail2_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan_SingleVertexWith2Committer(true, false));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.COMMITTING, v1.getState());
+    // release the first commit and wait for it to finish
+    CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_1");
+    v1OutputCommitter_1.unblockCommit();
+    waitForCommitCompleted(v1, "v1Out_1");
+    Assert.assertEquals(VertexState.COMMITTING, v1.getState());
+    
+    // release the second commit, which is configured to throw
+    CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_2");
+    v1OutputCommitter_2.unblockCommit();
+    waitUntil(v1, VertexState.FAILED);
+
+    Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE,
+        v1.getTerminationCause());
+    Assert.assertTrue(v1.commitFutures.isEmpty());
+    Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.commitCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.abortCounter);
+
+    Assert.assertEquals(1, v1OutputCommitter_2.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_2.setupCounter);
+    Assert.assertEquals(1, v1OutputCommitter_2.commitCounter);
+    Assert.assertEquals(1, v1OutputCommitter_2.abortCounter);
+  }
+
+  /**
+   * Kills the DAG while vertex1 is in COMMITTING (both commits still blocked).
+   * The vertex and the DAG are both expected to end up KILLED with DAG_KILL,
+   * with no commit futures left and both committers aborted once.
+   */
+  @Test(timeout = 5000)
+  public void testVertexKilledWhileCommitting() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.COMMITTING, v1.getState());
+    // kill dag which will trigger the vertex killed event
+    dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.KILLED, v1.getState());
+    Assert.assertTrue(v1.commitFutures.isEmpty());
+    Assert.assertEquals(VertexTerminationCause.DAG_KILL,
+        v1.getTerminationCause());
+    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert
+        .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+
+    CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_1");
+    CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_2");
+    Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v1OutputCommitter_1.abortCounter);
+
+    Assert.assertEquals(1, v1OutputCommitter_2.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_2.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v1OutputCommitter_2.abortCounter);
+  }
+
+  /**
+   * Sends a task reschedule event to vertex1 while it is in COMMITTING (both
+   * commits still blocked). The vertex is expected to end up FAILED with
+   * VERTEX_RERUN_IN_COMMITTING, the DAG FAILED with VERTEX_FAILURE, and both
+   * committers aborted once.
+   */
+  @Test(timeout = 5000)
+  public void testVertexRescheduleWhileCommitting() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.COMMITTING, v1.getState());
+    // reschedule task
+    v1.handle(new VertexEventTaskReschedule(TezTaskID.getInstance(
+        v1.getVertexId(), 2)));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.FAILED, v1.getState());
+    Assert.assertEquals(VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING,
+        v1.getTerminationCause());
+    Assert.assertTrue(v1.commitFutures.isEmpty());
+    Assert.assertEquals(DAGState.FAILED, dag.getState());
+    Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
+        dag.getTerminationCause());
+
+    CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_1");
+    CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_2");
+    Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v1OutputCommitter_1.abortCounter);
+
+    Assert.assertEquals(1, v1OutputCommitter_2.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_2.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v1OutputCommitter_2.abortCounter);
+  }
+
+  /**
+   * Routes a VertexManagerEvent to vertex1 while it is in COMMITTING (both
+   * commits still blocked); the plan is built with the customVM flag set, so
+   * the vertex uses FailOnVMEventReceivedlVertexManager. The vertex is
+   * expected to end up FAILED with AM_USERCODE_FAILURE, the DAG FAILED with
+   * VERTEX_FAILURE, and both committers aborted once.
+   */
+  @Test(timeout = 5000)
+  public void testVertexRouteEventErrorWhileCommitting() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.COMMITTING, v1.getState());
+    // route a vertex manager event to the vertex
+    VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex1", ByteBuffer.wrap(new byte[0]));
+    TezEvent tezEvent = new TezEvent(vmEvent,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", 
+            null, null));
+    v1.handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+    waitUntil(dag, DAGState.FAILED);
+
+    Assert.assertEquals(VertexState.FAILED, v1.getState());
+    Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE,
+        v1.getTerminationCause());
+    Assert.assertTrue(v1.commitFutures.isEmpty());
+    Assert.assertEquals(DAGState.FAILED, dag.getState());
+    Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
+        dag.getTerminationCause());
+
+    CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_1");
+    CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_2");
+    Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v1OutputCommitter_1.abortCounter);
+
+    Assert.assertEquals(1, v1OutputCommitter_2.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_2.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v1OutputCommitter_2.abortCounter);
+  }
+
+  // A V_INTERNAL_ERROR event is delivered to vertex1 while it is COMMITTING.
+  // The test expects both the vertex and the DAG to end in ERROR with
+  // INTERNAL_ERROR as the termination cause. Abort of the committers is not
+  // verified yet (see TEZ-2250 below).
+  @Test(timeout = 5000)
+  public void testVertexInternalErrorWhileCommiting() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    // completing the only task moves the vertex into COMMITTING
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.COMMITTING, v1.getState());
+    // internal error
+    v1.handle(new VertexEvent(v1.getVertexId(),
+        VertexEventType.V_INTERNAL_ERROR));
+    // let the dispatcher drain its queued events before checking the DAG state
+    dispatcher.await();
+    Assert.assertEquals(VertexState.ERROR, v1.getState());
+    Assert.assertEquals(VertexTerminationCause.INTERNAL_ERROR,
+        v1.getTerminationCause());
+    Assert.assertEquals(DAGState.ERROR, dag.getState());
+    Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR,
+        dag.getTerminationCause());
+
+    CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_1");
+    CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v1Out_2");
+    Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    // TODO abort it when internal error happens TEZ-2250
+    // Assert.assertEquals(1, v1OutputCommitter_1.abortCounter);
+
+    Assert.assertEquals(1, v1OutputCommitter_2.initCounter);
+    Assert.assertEquals(1, v1OutputCommitter_2.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    // TODO abort it when internal error happens TEZ-2250
+    // Assert.assertEquals(1, v1OutputCommitter_2.abortCounter);
+  }
+
+  // Commit-on-DAG-success mode, both commits succeed. The DAG must stay in
+  // COMMITTING until both v12Out and v3Out have been unblocked, then reach
+  // SUCCEEDED with each committer committed once and never aborted.
+  @Test(timeout = 5000)
+  public void testDAGCommitSucceeded_OnDAGSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        true);
+    setupDAG(createDAGPlan(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+    // need to make vertices to go to SUCCEEDED
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    waitUntil(dag, DAGState.COMMITTING);
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    v12OutputCommitter.unblockCommit();
+    // still in COMMITTING due to another pending commit
+    waitUntil(dag, DAGState.COMMITTING);
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    v3OutputCommitter.unblockCommit();
+    waitUntil(dag, DAGState.SUCCEEDED);
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+    // a successful DAG has no termination cause
+    Assert.assertNull(dag.getTerminationCause());
+
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v12OutputCommitter.commitCounter);
+    Assert.assertEquals(0, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v3OutputCommitter.commitCounter);
+    Assert.assertEquals(0, v3OutputCommitter.abortCounter);
+  }
+
+  // Commit-on-DAG-success mode: the first commit (v12Out) succeeds and then the
+  // second commit (v3Out) fails. The DAG must end FAILED with COMMIT_FAILURE
+  // and both committers, including the one that already committed, are aborted.
+  @Test(timeout = 5000)
+  public void testDAGCommitFail1_OnDAGSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        true);
+    setupDAG(createDAGPlan(true, false));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+    // need to make vertices to go to SUCCEEDED
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    waitUntil(dag, DAGState.COMMITTING);
+
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    v12OutputCommitter.unblockCommit();
+    // make sure the v12Out commit has finished before v3Out is released
+    waitForCommitCompleted(dag, new OutputKey("v12Out", "uv12", true));
+    // still in COMMITTING due to another pending commit
+    Assert.assertEquals(DAGState.COMMITTING, dag.getState());
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    v3OutputCommitter.unblockCommit();
+    waitUntil(dag, DAGState.FAILED);
+
+    // the vertices themselves stay SUCCEEDED; only the DAG fails
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
+
+    Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
+        dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v12OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v3OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v3OutputCommitter.abortCounter);
+  }
+
+  // Commit-on-DAG-success mode: the first commit (v12Out) fails while v3Out is
+  // never unblocked. The DAG must end FAILED with COMMIT_FAILURE and both
+  // committers are aborted once.
+  @Test(timeout = 5000)
+  public void testDAGCommitFail2_OnDAGSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        true);
+    setupDAG(createDAGPlan(false, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+    // need to make vertices to go to SUCCEEDED
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    waitUntil(dag, DAGState.COMMITTING);
+
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    // only v12Out is released; v3Out stays blocked for the whole test
+    v12OutputCommitter.unblockCommit();
+    waitUntil(dag, DAGState.FAILED);
+
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
+
+    Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
+        dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v12OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v3OutputCommitter.abortCounter);
+  }
+
+  // Commit-on-vertex-success mode: the commit of v3Out completes first, then the
+  // commit of v12Out completes. The DAG must reach SUCCEEDED with each committer
+  // committed once and never aborted.
+  @Test(timeout = 5000)
+  public void testDAGCommitSucceeded1_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    // v1 and v2 succeed right away; v3 waits on its blocked commit, so the DAG
+    // is still RUNNING
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.COMMITTING, v3.getState());
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    v3OutputCommitter.unblockCommit();
+    waitUntil(v3, VertexState.SUCCEEDED);
+    // dag go to COMMITTING due to the pending vertex group commit of v1,v2
+    waitUntil(dag, DAGState.COMMITTING);
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    v12OutputCommitter.unblockCommit();
+    waitUntil(dag, DAGState.SUCCEEDED);
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v12OutputCommitter.commitCounter);
+    Assert.assertEquals(0, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v3OutputCommitter.commitCounter);
+    Assert.assertEquals(0, v3OutputCommitter.abortCounter);
+  }
+
+  // Commit-on-vertex-success mode: the commit of v12Out completes first, then the
+  // commit of v3Out. The DAG must stay RUNNING while v3 is still committing and
+  // reach SUCCEEDED once v3Out is unblocked.
+  @Test(timeout = 5000)
+  public void testDAGCommitSucceeded2_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.COMMITTING, v3.getState());
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    v12OutputCommitter.unblockCommit();
+    // ugly (wait for commit event sent out)
+    // NOTE(review): the fixed sleep is timing-dependent and uses a tenth of the
+    // 5s test timeout; sibling tests wait with
+    // waitForCommitCompleted(dag, new OutputKey("v12Out", "uv12", true)) instead --
+    // confirm that helper guarantees the commit event was dispatched before
+    // switching
+    Thread.sleep(500);
+    dispatcher.await();
+    // v3 has not committed yet, so the DAG cannot have left RUNNING
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    v3OutputCommitter.unblockCommit();
+    waitUntil(dag, DAGState.SUCCEEDED);
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v12OutputCommitter.commitCounter);
+    Assert.assertEquals(0, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v3OutputCommitter.commitCounter);
+    Assert.assertEquals(0, v3OutputCommitter.abortCounter);
+  }
+
+  // Commit-on-vertex-success mode: the commit of the vertex group (v1,v2) fails
+  // while the commit of v3 has not completed. The DAG must end FAILED with
+  // COMMIT_FAILURE, v3 is KILLED with OTHER_VERTEX_FAILURE, and both committers
+  // are aborted once.
+  @Test(timeout = 5000)
+  public void testDAGCommitFail1_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan(false, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.COMMITTING, v3.getState());
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    // release only the group commit; v3Out stays blocked for the whole test
+    v12OutputCommitter.unblockCommit();
+    waitUntil(dag, DAGState.FAILED);
+    // v3 is killed due to the commit failure of the vertex group (v1,v2)
+    Assert.assertEquals(VertexState.KILLED, v3.getState());
+    Assert.assertEquals(VertexTerminationCause.OTHER_VERTEX_FAILURE,
+        v3.getTerminationCause());
+    Assert.assertTrue(v3.commitFutures.isEmpty());
+    Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
+        dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v12OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v12OutputCommitter.abortCounter);
+
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v3OutputCommitter.abortCounter);
+  }
+
+  // Commit-on-vertex-success mode: the commit of vertex v3 fails while the commit
+  // of the vertex group (v1,v2) has not completed. v3 must end FAILED with
+  // COMMIT_FAILURE, the DAG FAILED with VERTEX_FAILURE, and both committers are
+  // aborted once.
+  @Test(timeout = 5000)
+  public void testDAGCommitFail2_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan(true, false));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.COMMITTING, v3.getState());
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    // release only v3Out; v12Out stays blocked for the whole test
+    v3OutputCommitter.unblockCommit();
+    waitUntil(dag, DAGState.FAILED);
+
+    Assert.assertEquals(VertexState.FAILED, v3.getState());
+    Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE,
+        v3.getTerminationCause());
+    Assert.assertTrue(v3.commitFutures.isEmpty());
+    Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
+        dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v3OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v3OutputCommitter.abortCounter);
+  }
+
+  // Commit-on-vertex-success mode: the commit of the vertex group (v1,v2)
+  // succeeds first and then the commit of vertex v3 fails. v3 must end FAILED
+  // with COMMIT_FAILURE, the DAG FAILED with VERTEX_FAILURE, and the already
+  // committed v12Out is aborted as well.
+  @Test (timeout = 5000)
+  public void testDAGCommitFail3_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan(true, false));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.COMMITTING, v3.getState());
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    v12OutputCommitter.unblockCommit();
+
+    // make sure the group commit has finished before v3Out is released
+    waitForCommitCompleted(dag, new OutputKey("v12Out", "uv12", true));
+
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    v3OutputCommitter.unblockCommit();
+    waitUntil(dag, DAGState.FAILED);
+
+    Assert.assertEquals(VertexState.FAILED, v3.getState());
+    Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE,
+        v3.getTerminationCause());
+    Assert.assertTrue(v3.commitFutures.isEmpty());
+    Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
+        dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v12OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v3OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v3OutputCommitter.abortCounter);
+  }
+
+  // Commit-on-vertex-success mode: the commit of vertex v3 succeeds first and then
+  // the commit of the vertex group (v1,v2) fails. The DAG must end FAILED with
+  // COMMIT_FAILURE and the already committed v3Out is aborted as well.
+  @Test(timeout = 5000)
+  public void testDAGCommitFail4_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan(false, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.COMMITTING, v3.getState());
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    v3OutputCommitter.unblockCommit();
+    // v3 must have fully committed and succeeded before the group commit is
+    // released
+    waitForCommitCompleted(dag, new OutputKey("v3Out", "vertex3", true));
+    waitUntil(v3, VertexState.SUCCEEDED);
+    Assert.assertTrue(v3.commitFutures.isEmpty());
+
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    v12OutputCommitter.unblockCommit();
+    waitUntil(dag, DAGState.FAILED);
+
+    Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
+        dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v12OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v3OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v3OutputCommitter.abortCounter);
+  }
+
+  // Commit-on-DAG-success mode: an INTERNAL_ERROR event is delivered to the DAG
+  // while it is COMMITTING. The DAG must end in ERROR with INTERNAL_ERROR as the
+  // termination cause. Abort of the committers is not verified yet (see
+  // TEZ-2250 below).
+  @Test (timeout = 5000)
+  public void testDAGInternalErrorWhileCommiting() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        true);
+    setupDAG(createDAGPlan(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+    // need to make vertices to go to SUCCEEDED
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    waitUntil(dag, DAGState.COMMITTING);
+    // neither commit is unblocked; the error arrives while both are pending
+    dag.handle(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR));
+    waitUntil(dag, DAGState.ERROR);
+
+    Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause());
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    // TODO abort it when internal error happens TEZ-2250
+    // NOTE(review): the disabled assertion expects 0 aborts, while the TODO and
+    // testVertexInternalErrorWhileCommiting imply 1 -- confirm the intended value
+    // before re-enabling
+    // Assert.assertEquals(0, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    // TODO abort it when internal error happens TEZ-2250
+    // NOTE(review): same question as above about the expected abort count
+    // Assert.assertEquals(0, v3OutputCommitter.abortCounter);
+  }
+
+  // Kill the DAG while it is in COMMITTING, in the case where
+  // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true.
+  // The DAG must end KILLED with DAG_KILL, the vertices stay SUCCEEDED, and both
+  // committers are aborted once.
+  @Test(timeout = 5000)
+  public void testDAGKilledWhileCommitting1_OnDAGSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        true);
+    setupDAG(createDAGPlan(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+    // need to make vertices to go to SUCCEEDED
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    waitUntil(dag, DAGState.COMMITTING);
+    // neither commit is unblocked; the kill arrives while both are pending
+    dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
+    waitUntil(dag, DAGState.KILLED);
+
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
+    Assert
+        .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v3OutputCommitter.abortCounter);
+  }
+
+  // Kill the DAG while it is in COMMITTING, in the case where
+  // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false.
+  // v3 commits successfully first, so the DAG is COMMITTING only because of the
+  // pending v12Out group commit when the kill arrives. Both committers must be
+  // aborted once.
+  @Test(timeout = 5000)
+  public void testDAGKilledWhileCommitting1_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+    // need to make vertices to go to SUCCEEDED
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.COMMITTING, v3.getState());
+    // dag is still in RUNNING because v3 has not completed
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    v3OutputCommitter.unblockCommit();
+    // dag go to COMMITTING due to the pending commit of v12Out
+    waitUntil(dag, DAGState.COMMITTING);
+    dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
+    waitUntil(dag, DAGState.KILLED);
+
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
+    // redundant with the waitUntil above; kept as an explicit final-state check
+    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert
+        .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v12OutputCommitter.abortCounter);
+
+    // v3Out did commit before the kill and is still aborted afterwards
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v3OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v3OutputCommitter.abortCounter);
+  }
+
+  // The DAG is killed while it is still RUNNING and vertex v3 is in COMMITTING
+  // (commit-on-vertex-success mode). v3 must end KILLED with DAG_KILL, the DAG
+  // KILLED with DAG_KILL, and both committers are aborted once.
+  @Test(timeout = 5000)
+  public void testDAGKilledWhileRunning_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlan(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+    // need to make vertices to go to SUCCEEDED
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.COMMITTING, v3.getState());
+    // dag is still in RUNNING because v3 has not completed
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+    // neither commit is unblocked before the kill is delivered
+    dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
+    waitUntil(dag, DAGState.KILLED);
+
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.KILLED, v3.getState());
+    Assert.assertEquals(VertexTerminationCause.DAG_KILL, v3.getTerminationCause());
+    Assert.assertTrue(v3.commitFutures.isEmpty());
+    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert
+        .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v3OutputCommitter.abortCounter);
+  }
+
+  // Commit-on-DAG-success mode: a DAGEventVertexReRunning for vertex1 arrives
+  // while the DAG is COMMITTING. The DAG must end FAILED with
+  // VERTEX_RERUN_IN_COMMITTING and both committers are aborted once.
+  @Test(timeout = 5000)
+  public void testDAGCommitVertexRerunWhileCommitting_OnDAGSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        true);
+    setupDAG(createDAGPlan(true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+    // need to make vertices to go to SUCCEEDED
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    waitUntil(dag, DAGState.COMMITTING);
+    // neither commit is unblocked; the re-run notification arrives while both
+    // are pending
+    dag.handle(new DAGEventVertexReRunning(v1.getVertexId()));
+    waitUntil(dag, DAGState.FAILED);
+
+    Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    // commit may not have started, so can't verify commitCounter
+    Assert.assertEquals(1, v3OutputCommitter.abortCounter);
+  }
+
+  /**
+   * Commit-on-DAG-success mode: once the DAG reaches COMMITTING, an INTERNAL_ERROR
+   * event is delivered to it. The DAG is then awaited in ERROR, its termination cause
+   * must be INTERNAL_ERROR, and both committers must show exactly one init, one setup
+   * and one abort.
+   */
+  @Test(timeout = 5000)
+  public void testDAGCommitInternalErrorWhileCommiting_OnDAGSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true);
+    setupDAG(createDAGPlan(true, true));
+    initDAG(dag);
+    startDAG(dag);
+
+    final VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
+    final VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
+    final VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
+
+    // report task 0 of every vertex as SUCCEEDED so that the vertices can succeed
+    vertex1.handle(
+        new VertexEventTaskCompleted(vertex1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
+    vertex2.handle(
+        new VertexEventTaskCompleted(vertex2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
+    vertex3.handle(
+        new VertexEventTaskCompleted(vertex3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
+    waitUntil(dag, DAGState.COMMITTING);
+
+    // an internal error hits the DAG while it is committing
+    dag.handle(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR));
+    waitUntil(dag, DAGState.ERROR);
+
+    Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause());
+
+    final CountingOutputCommitter groupCommitter =
+        (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
+    final CountingOutputCommitter vertex3Committer =
+        (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
+
+    // commitCounter is deliberately not checked: the commit may not have started
+    Assert.assertEquals(1, groupCommitter.initCounter);
+    Assert.assertEquals(1, groupCommitter.setupCounter);
+    Assert.assertEquals(1, groupCommitter.abortCounter);
+
+    // same reasoning for the vertex3 committer
+    Assert.assertEquals(1, vertex3Committer.initCounter);
+    Assert.assertEquals(1, vertex3Committer.setupCounter);
+    Assert.assertEquals(1, vertex3Committer.abortCounter);
+  }
+
+  /**
+   * Commit-on-DAG-success is disabled and the history handler is replaced by one that
+   * throws on the VERTEX_GROUP_COMMIT_FINISHED critical event. After the "v12Out"
+   * commit is unblocked, the DAG is awaited in FAILED with cause RECOVERY_FAILURE,
+   * vertex3 must be KILLED with cause OTHER_VERTEX_FAILURE, and no commit futures may
+   * remain on the DAG or on vertex3.
+   */
+  @Test(timeout = 5000)
+  public void testVertexGroupCommitFinishedEventFail() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+    setupDAG(createDAGPlan(true, true));
+
+    // history handler that fails when the vertex group commit finished event is logged
+    final MockHistoryEventHandler failingHistoryHandler =
+        new MockHistoryEventHandler(appContext);
+    failingHistoryHandler.failVertexGroupCommitFinishedEvent = true;
+    doReturn(failingHistoryHandler).when(appContext).getHistoryHandler();
+
+    initDAG(dag);
+    startDAG(dag);
+
+    final VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
+    final VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
+    final VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
+
+    // report task 0 of every vertex as SUCCEEDED so that the vertices can succeed
+    vertex1.handle(
+        new VertexEventTaskCompleted(vertex1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
+    vertex2.handle(
+        new VertexEventTaskCompleted(vertex2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
+    vertex3.handle(
+        new VertexEventTaskCompleted(vertex3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
+
+    final CountingOutputCommitter groupCommitter =
+        (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
+    final CountingOutputCommitter vertex3Committer =
+        (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
+
+    // release the "v12Out" commit, then wait for the DAG to fail
+    groupCommitter.unblockCommit();
+    waitUntil(dag, DAGState.FAILED);
+
+    Assert.assertEquals(DAGState.FAILED, dag.getState());
+    Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE, dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    Assert.assertEquals(VertexState.KILLED, vertex3.getState());
+    Assert.assertEquals(VertexTerminationCause.OTHER_VERTEX_FAILURE,
+        vertex3.getTerminationCause());
+    Assert.assertTrue(vertex3.commitFutures.isEmpty());
+
+    // the group commit was unblocked above, so here the commit count is checked as well
+    Assert.assertEquals(1, groupCommitter.initCounter);
+    Assert.assertEquals(1, groupCommitter.setupCounter);
+    Assert.assertEquals(1, groupCommitter.commitCounter);
+    Assert.assertEquals(1, groupCommitter.abortCounter);
+
+    // commitCounter is deliberately not checked: the commit may not have started
+    Assert.assertEquals(1, vertex3Committer.initCounter);
+    Assert.assertEquals(1, vertex3Committer.setupCounter);
+    Assert.assertEquals(1, vertex3Committer.abortCounter);
+  }
+
+  /**
+   * Commit-on-DAG-success mode with a history handler that throws on the
+   * DAG_COMMIT_STARTED critical event. The DAG is awaited in FAILED with cause
+   * RECOVERY_FAILURE, no commit futures may remain, and both committers must show one
+   * init, one setup, zero commits and one abort.
+   */
+  @Test(timeout = 5000)
+  public void testDAGCommitStartedEventFail() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        true);
+    setupDAG(createDAGPlan(true, true));
+    // history handler that fails when the DAG commit started event is logged
+    MockHistoryEventHandler mockHistoryEventHandler = new MockHistoryEventHandler(appContext);
+    doReturn(mockHistoryEventHandler).when(appContext).getHistoryHandler();
+    mockHistoryEventHandler.failDAGCommitStartedEvent = true;
+
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+    // need to make vertices to go to SUCCEEDED
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    waitUntil(dag, DAGState.FAILED);
+    Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE, dag.getTerminationCause());
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+    CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out");
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    Assert.assertEquals(1, v12OutputCommitter.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter.setupCounter);
+    // commit has not started
+    Assert.assertEquals(0, v12OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v12OutputCommitter.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    // commit has not started; check the v3 committer here (the v12 committer is
+    // already covered above)
+    Assert.assertEquals(0, v3OutputCommitter.commitCounter);
+    Assert.assertEquals(1, v3OutputCommitter.abortCounter);
+  }
+
+  /**
+   * Checks that commits are canceled on DAG_KILL even when they are still sitting in
+   * the thread pool. A ControlledThreadPoolExecutor is installed so that submitted
+   * commits are held back instead of being run; after the kill the DAG must be KILLED
+   * with cause DAG_KILL, no commit futures may remain, and neither committer may have
+   * committed while both must have aborted once.
+   */
+  @Test(timeout = 5000)
+  public void testCommitCanceled() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true);
+    setupDAG(createDAGPlan(true, true));
+
+    // swap in an executor that waits before running any submitted task
+    rawExecutor = new ControlledThreadPoolExecutor(1);
+    execService = MoreExecutors.listeningDecorator(rawExecutor);
+    doReturn(execService).when(appContext).getExecService();
+
+    initDAG(dag);
+    startDAG(dag);
+
+    final VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
+    final VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
+    final VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
+
+    // report task 0 of every vertex as SUCCEEDED so that the vertices can succeed
+    vertex1.handle(
+        new VertexEventTaskCompleted(vertex1.getTask(0).getTaskId(), TaskState.SUCCEEDED));
+    vertex2.handle(
+        new VertexEventTaskCompleted(vertex2.getTask(0).getTaskId(), TaskState.SUCCEEDED));
+    vertex3.handle(
+        new VertexEventTaskCompleted(vertex3.getTask(0).getTaskId(), TaskState.SUCCEEDED));
+    waitUntil(dag, DAGState.COMMITTING);
+
+    // two futures mean that both commits have been submitted to the thread pool
+    Assert.assertEquals(2, dag.commitFutures.size());
+
+    dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
+    waitUntil(dag, DAGState.KILLED);
+
+    Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+    // no futures left means that the commits have been canceled
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+
+    final CountingOutputCommitter groupCommitter =
+        (CountingOutputCommitter) vertex1.getOutputCommitter("v12Out");
+    final CountingOutputCommitter vertex3Committer =
+        (CountingOutputCommitter) vertex3.getOutputCommitter("v3Out");
+
+    // the commit never started because ControlledThreadPoolExecutor holds tasks back
+    Assert.assertEquals(1, groupCommitter.initCounter);
+    Assert.assertEquals(1, groupCommitter.setupCounter);
+    Assert.assertEquals(0, groupCommitter.commitCounter);
+    Assert.assertEquals(1, groupCommitter.abortCounter);
+
+    // likewise for the vertex3 committer
+    Assert.assertEquals(1, vertex3Committer.initCounter);
+    Assert.assertEquals(1, vertex3Committer.setupCounter);
+    Assert.assertEquals(0, vertex3Committer.commitCounter);
+    Assert.assertEquals(1, vertex3Committer.abortCounter);
+  }
+
+  /**
+   * Vertex manager that delegates to ImmediateStartVertexManager and then always
+   * throws a RuntimeException when a vertex manager event is received.
+   */
+  public static class FailOnVMEventReceivedlVertexManager extends ImmediateStartVertexManager {
+
+    public FailOnVMEventReceivedlVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    /** Runs the parent handling first, then fails unconditionally. */
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+      super.onVertexManagerEventReceived(vmEvent);
+      final String failureMessage = "fail vm";
+      throw new RuntimeException(failureMessage);
+    }
+  }
+
+  /**
+   * History handler whose critical-event path can be told, via two public flags, to
+   * throw an IOException for VERTEX_GROUP_COMMIT_FINISHED and/or DAG_COMMIT_STARTED
+   * events. Any other critical event is accepted without further action.
+   */
+  private static class MockHistoryEventHandler extends HistoryEventHandler {
+
+    /** When true, VERTEX_GROUP_COMMIT_FINISHED events make handleCriticalEvent throw. */
+    public boolean failVertexGroupCommitFinishedEvent = false;
+    /** When true, DAG_COMMIT_STARTED events make handleCriticalEvent throw. */
+    public boolean failDAGCommitStartedEvent = false;
+
+    public MockHistoryEventHandler(AppContext context) {
+      super(context);
+    }
+
+    @Override
+    public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
+      final HistoryEventType eventType = event.getHistoryEvent().getEventType();
+
+      final boolean groupCommitFinished =
+          eventType == HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED;
+      if (groupCommitFinished && failVertexGroupCommitFinishedEvent) {
+        throw new IOException("fail VertexGroupCommitFinishedEvent");
+      }
+
+      final boolean dagCommitStarted = eventType == HistoryEventType.DAG_COMMIT_STARTED;
+      if (dagCommitStarted && failDAGCommitStartedEvent) {
+        throw new IOException("fail DAGCommitStartedEvent");
+      }
+    }
+  }
+  
+  /**
+   * ThreadPoolExecutor whose worker threads hold every task in
+   * {@link #beforeExecute(Thread, Runnable)} until {@link #startFlag} becomes true.
+   * While the flag is false, submitted tasks are accepted but not run.
+   */
+  private static class ControlledThreadPoolExecutor extends ThreadPoolExecutor {
+
+    /**
+     * Gate polled by the pool's worker threads. Declared volatile so that a write made
+     * by another thread becomes visible to the polling workers.
+     */
+    public volatile boolean startFlag = false;
+
+    public ControlledThreadPoolExecutor(int poolSize) {
+      this(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<Runnable>());
+    }
+
+    public ControlledThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
+        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    }
+
+    /**
+     * Polls {@link #startFlag} every 100 ms and only lets the task proceed once it is
+     * true, or once the worker thread has been interrupted.
+     */
+    @Override
+    protected void beforeExecute(Thread t, Runnable r) {
+      while (!startFlag) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          // Interrupted, e.g. by shutdownNow(): keep the interrupt status and stop
+          // waiting so the worker can finish instead of polling forever.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+      super.beforeExecute(t, r);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 98c8492..7e944ef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -1364,7 +1364,7 @@ public class TestDAGImpl {
       }
     }
     
-    // error on vertex -> dag error -> successful vertex output not aborted
+    // error on vertex -> dag error
     Vertex errorVertex = mrrDag.getVertex("vertex3");
     dispatcher.getEventHandler().handle(new VertexEvent(
         errorVertex.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
@@ -1383,7 +1383,8 @@ public class TestDAGImpl {
           Assert.assertEquals(1, committer.initCounter);
           Assert.assertEquals(1, committer.setupCounter);
         } else {
-          Assert.assertEquals(0, committer.abortCounter);
+          // abort operation should have no side effect on the successful commit
+          Assert.assertEquals(1, committer.abortCounter);
           Assert.assertEquals(1, committer.commitCounter);
           Assert.assertEquals(1, committer.initCounter);
           Assert.assertEquals(1, committer.setupCounter);          

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 5ad320e..891da23 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -3341,8 +3341,7 @@ public class TestVertexImpl {
 
     Assert.assertEquals(1, committer.commitCounter);
 
-    // FIXME need to verify whether abort needs to be called if commit fails
-    Assert.assertEquals(0, committer.abortCounter);
+    Assert.assertEquals(1, committer.abortCounter);
     Assert.assertEquals(1, committer.initCounter);
     Assert.assertEquals(1, committer.setupCounter);
   }
@@ -3386,8 +3385,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, v.getTerminationCause());
     Assert.assertEquals(1, committer.commitCounter);
 
-    // FIXME need to verify whether abort needs to be called if commit fails
-    Assert.assertEquals(0, committer.abortCounter);
+    Assert.assertEquals(1, committer.abortCounter);
     Assert.assertEquals(1, committer.initCounter);
     Assert.assertEquals(1, committer.setupCounter);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 185f4a4..d1bc46c 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -82,6 +82,8 @@ public class ExampleDriver {
           "Filters lines by the specified word using broadcast edge");
       pgd.addClass("filterLinesByWordOneToOne", FilterLinesByWordOneToOne.class,
           "Filters lines by the specified word using OneToOne edge");
+      pgd.addClass("multiplecommitsExample", MultipleCommitsExample.class,
+          "Job with multiple commits in both vertex group and vertex");
       exitCode = pgd.run(argv);
     }
     catch(Throwable e){


Mime
View raw message