tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: zjf...@apache.org
Subject: [4/4] tez git commit: TEZ-714. OutputCommitters should not run in the main AM dispatcher thread (zjffdu)
Date: Thu, 16 Apr 2015 04:40:20 GMT
TEZ-714. OutputCommitters should not run in the main AM dispatcher thread (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d932579b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d932579b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d932579b

Branch: refs/heads/master
Commit: d932579b002f14b81836eeed75f4bf92d4ed7fbf
Parents: 5de0fb1
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Thu Apr 16 06:39:34 2015 +0200
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Thu Apr 16 06:39:34 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 .../dag/api/client/DAGClientTimelineImpl.java   |    2 +
 .../apache/tez/dag/api/client/DAGStatus.java    |    1 +
 .../apache/tez/dag/api/client/VertexStatus.java |    4 +-
 tez-api/src/main/proto/DAGApiRecords.proto      |    2 +
 .../tez/dag/api/client/DAGStatusBuilder.java    |    2 +
 .../tez/dag/api/client/VertexStatusBuilder.java |    2 +
 .../org/apache/tez/dag/app/dag/DAGState.java    |    3 +-
 .../tez/dag/app/dag/DAGTerminationCause.java    |    2 +
 .../tez/dag/app/dag/TaskTerminationCause.java   |    2 +
 .../org/apache/tez/dag/app/dag/VertexState.java |    1 +
 .../tez/dag/app/dag/VertexTerminationCause.java |    8 +-
 .../app/dag/event/DAGEventCommitCompleted.java  |   48 +
 .../tez/dag/app/dag/event/DAGEventType.java     |    4 +-
 .../dag/event/VertexEventCommitCompleted.java   |   47 +
 .../tez/dag/app/dag/event/VertexEventType.java  |    5 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  757 +++++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  559 ++++--
 .../tez/dag/app/TestMockDAGAppMaster.java       |  236 ++-
 .../apache/tez/dag/app/dag/impl/TestCommit.java | 1682 ++++++++++++++++++
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |    5 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |    6 +-
 .../tez/mapreduce/examples/ExampleDriver.java   |    2 +
 .../examples/MultipleCommitsExample.java        |  270 +++
 .../java/org/apache/tez/test/TestTezJobs.java   |   78 +
 25 files changed, 3294 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4cd4557..11c7322 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-714. OutputCommitters should not run in the main AM dispatcher thread
   TEZ-2323. Fix TestOrderedWordcount to use MR memory configs.
   TEZ-1482. Fix memory issues for Local Mode running concurrent tasks
   TEZ-2033. Update TestOrderedWordCount to add processor configs as history text

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index 4a5a4e2..cc000df 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -479,6 +479,7 @@ public class DAGClientTimelineImpl extends DAGClient {
         put("KILLED", DAGStatusStateProto.DAG_KILLED);
         put("ERROR", DAGStatusStateProto.DAG_ERROR);
         put("TERMINATING", DAGStatusStateProto.DAG_TERMINATING);
+        put("COMMITTING", DAGStatusStateProto.DAG_COMMITTING);
   }});
 
   private static final Map<String, VertexStatusStateProto> vertexStateProtoMap =
@@ -493,6 +494,7 @@ public class DAGClientTimelineImpl extends DAGClient {
         put("KILLED", VertexStatusStateProto.VERTEX_KILLED);
         put("ERROR", VertexStatusStateProto.VERTEX_ERROR);
         put("TERMINATING", VertexStatusStateProto.VERTEX_TERMINATING);
+        put("COMMITTING", VertexStatusStateProto.VERTEX_COMMITTING);
       }});
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index 7e48334..f530858 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -74,6 +74,7 @@ public class DAGStatus {
     case DAG_INITING:
     case DAG_TERMINATING:
     case DAG_RUNNING:
+    case DAG_COMMITTING:
       return DAGStatus.State.RUNNING;
     case DAG_SUCCEEDED:
       return DAGStatus.State.SUCCEEDED;

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
index e22d6c0..61f093d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
@@ -19,7 +19,6 @@
 package org.apache.tez.dag.api.client;
 
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -43,6 +42,7 @@ public class VertexStatus {
     INITIALIZING,
     INITED,
     RUNNING,
+    COMMITTING,
     SUCCEEDED,
     FAILED,
     KILLED,
@@ -74,6 +74,8 @@ public class VertexStatus {
         return VertexStatus.State.INITED;
       case VERTEX_RUNNING:
         return VertexStatus.State.RUNNING;
+      case VERTEX_COMMITTING:
+        return VertexStatus.State.COMMITTING;
       case VERTEX_SUCCEEDED:
         return VertexStatus.State.SUCCEEDED;
       case VERTEX_FAILED:

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 9ac08b2..959d4e6 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -199,6 +199,7 @@ enum VertexStatusStateProto {
   VERTEX_KILLED = 7;
   VERTEX_ERROR = 8;
   VERTEX_TERMINATING = 9;
+  VERTEX_COMMITTING = 10;
 }
 
 message VertexStatusProto {
@@ -217,6 +218,7 @@ enum DAGStatusStateProto {
   DAG_FAILED = 5;
   DAG_ERROR = 6;
   DAG_TERMINATING = 7;
+  DAG_COMMITTING = 8;
 }
 
 message StringProgressPairProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
index b0a2c63..0002d8b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
@@ -72,6 +72,8 @@ public class DAGStatusBuilder extends DAGStatus {
       return DAGStatusStateProto.DAG_INITING;
     case RUNNING:
       return DAGStatusStateProto.DAG_RUNNING;
+    case COMMITTING:
+      return DAGStatusStateProto.DAG_COMMITTING;
     case SUCCEEDED:
       return DAGStatusStateProto.DAG_SUCCEEDED;
     case FAILED:

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
index 1ea4e3d..ada3490 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
@@ -71,6 +71,8 @@ public class VertexStatusBuilder extends VertexStatus {
         return VertexStatusStateProto.VERTEX_INITED;
       case RUNNING:
         return VertexStatusStateProto.VERTEX_RUNNING;
+      case COMMITTING:
+        return VertexStatusStateProto.VERTEX_COMMITTING;
       case SUCCEEDED:
         return VertexStatusStateProto.VERTEX_SUCCEEDED;
       case FAILED:

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
index b83e9a5..1acc08d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
@@ -25,5 +25,6 @@ public enum DAGState {
   FAILED,
   KILLED,
   ERROR, 
-  TERMINATING
+  TERMINATING,
+  COMMITTING,
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index 5ae96a1..d8ba95d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -42,6 +42,8 @@ public enum DAGTerminationCause {
   /** In some cases, vertex could not rerun, e.g. its output been committed as a shared output of vertex group */
   VERTEX_RERUN_AFTER_COMMIT,
 
+  VERTEX_RERUN_IN_COMMITTING,
+
   /** DAG failed while trying to write recovery events */
   RECOVERY_FAILURE,
 

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
index eb0a2af..e2ae8da 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
@@ -39,4 +39,6 @@ public enum TaskTerminationCause {
    * throw Exception
    */
   AM_USERCODE_FAILURE,
+  
+  TASK_RESCHEDULE_IN_COMMITTING,
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
index 7130c7a..1b7ac0f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
@@ -28,4 +28,5 @@ public enum VertexState {
   ERROR,
   TERMINATING,
   RECOVERING,
+  COMMITTING,
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 2eeae3c..ebece97 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -46,11 +46,17 @@ public enum VertexTerminationCause {
   /** In some cases, vertex could not rerun, e.g. its output been committed as a shared output of vertex group */
   VERTEX_RERUN_AFTER_COMMIT,
 
+  /** Rerun vertex while it is in committing, it would cause conflict. */
+  VERTEX_RERUN_IN_COMMITTING,
+
   /** This vertex failed as it had invalid number tasks. */
   INVALID_NUM_OF_TASKS, 
 
   /** This vertex failed during init. */
   INIT_FAILURE,
   
-  INTERNAL_ERROR
+  INTERNAL_ERROR,
+  
+  /** error when writing recovery log */ 
+  RECOVERY_ERROR,
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCommitCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCommitCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCommitCompleted.java
new file mode 100644
index 0000000..2cfd60b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCommitCompleted.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.impl.DAGImpl.OutputKey;
+import org.apache.tez.dag.records.TezDAGID;
+
+public class DAGEventCommitCompleted extends DAGEvent{ // DAG_COMMIT_COMPLETED event carrying the outcome of one output commit
+
+  private OutputKey outputKey; // identifies the output whose commit finished
+  private boolean isSucceeded; // whether that commit succeeded
+  private Throwable failException; // commit failure cause; presumably null on success (TODO confirm with callers)
+
+  public DAGEventCommitCompleted(TezDAGID dagId, OutputKey outputKey,
+      boolean isSucceeded, Throwable failException) {
+    super(dagId, DAGEventType.DAG_COMMIT_COMPLETED); // the event type is fixed for this class
+    this.outputKey = outputKey;
+    this.isSucceeded = isSucceeded;
+    this.failException = failException;
+  }
+
+  public OutputKey getOutputKey() {
+    return outputKey;
+  }
+
+  public boolean isSucceeded() {
+    return this.isSucceeded;
+  }
+
+  public Throwable getException() { // returns failException exactly as given to the constructor (no null check)
+    return failException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
index 146ff4d..ea6a3cc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
@@ -44,5 +44,7 @@ public enum DAGEventType {
 
   // Event to trigger recovery
   // Producer:AM
-  DAG_RECOVER
+  DAG_RECOVER,
+
+  DAG_COMMIT_COMPLETED,
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventCommitCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventCommitCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventCommitCompleted.java
new file mode 100644
index 0000000..77272f2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventCommitCompleted.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezVertexID;
+
+public class VertexEventCommitCompleted extends VertexEvent { // V_COMMIT_COMPLETED event carrying the outcome of one vertex output commit
+
+  private String outputName; // name of the output whose commit finished
+  private boolean isSucceeded; // whether that commit succeeded
+  private Throwable failException; // commit failure cause; presumably null on success (TODO confirm with callers)
+
+  public VertexEventCommitCompleted(TezVertexID vertexId, String outputName,
+      boolean isSucceeded, Throwable failException) {
+    super(vertexId, VertexEventType.V_COMMIT_COMPLETED); // the event type is fixed for this class
+    this.outputName = outputName;
+    this.isSucceeded = isSucceeded;
+    this.failException = failException;
+  }
+
+  public String getOutputName() {
+    return outputName;
+  }
+
+  public boolean isSucceeded() {
+    return isSucceeded;
+  }
+
+  public Throwable getException() { // returns failException exactly as given to the constructor (no null check)
+    return failException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index aa202a4..6ea945b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -62,6 +62,9 @@ public enum VertexEventType {
   V_SOURCE_VERTEX_RECOVERED,
   
   // Producer: Edge
-  V_NULL_EDGE_INITIALIZED
+  V_NULL_EDGE_INITIALIZED,
+
+  // Committer
+  V_COMMIT_COMPLETED,
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 3b282d6..1c93dc6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -35,6 +35,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -93,8 +94,10 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventCommitCompleted;
 import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
@@ -127,9 +130,13 @@ import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.OutputCommitter;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /** Implementation of Job interface. Maintains the state machines of Job.
  * The read and write calls use ReadWriteLock for concurrency.
@@ -158,8 +165,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final TaskHeartbeatHandler taskHeartbeatHandler;
   private final Object tasksSyncHandle = new Object();
 
-  private volatile boolean committedOrAborted = false;
-  private volatile boolean allOutputsCommitted = false;
+  private AtomicBoolean committed = new AtomicBoolean(false);
+  private AtomicBoolean aborted = new AtomicBoolean(false);
+  private AtomicBoolean commitCanceled = new AtomicBoolean(false);
   boolean commitAllOutputsOnSuccess = true;
 
   @VisibleForTesting
@@ -201,6 +209,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   private static final DagStateChangedCallback STATE_CHANGED_CALLBACK = new DagStateChangedCallback();
 
+  @VisibleForTesting
+  Map<OutputKey, ListenableFuture<Void>> commitFutures
+    = new HashMap<OutputKey, ListenableFuture<Void>>();
+  private Set<OutputKey> succeededCommits = new HashSet<OutputKey>();
+
   private static final DiagnosticsUpdateTransition
       DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final InternalErrorTransition
@@ -209,6 +222,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       new CounterUpdateTransition();
   private static final DAGSchedulerUpdateTransition
           DAG_SCHEDULER_UPDATE_TRANSITION = new DAGSchedulerUpdateTransition();
+  private static final CommitCompletedTransition COMMIT_COMPLETED_TRANSITION =
+      new CommitCompletedTransition();
 
   protected static final
     StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
@@ -259,7 +274,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           // Transitions from RUNNING state
           .addTransition
               (DAGState.RUNNING,
-              EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.TERMINATING,DAGState.FAILED),
+              EnumSet.of(DAGState.RUNNING, DAGState.COMMITTING,
+                  DAGState.SUCCEEDED, DAGState.TERMINATING,DAGState.FAILED),
               DAGEventType.DAG_VERTEX_COMPLETED,
               new VertexCompletedTransition())
           .addTransition(DAGState.RUNNING, EnumSet.of(DAGState.RUNNING, DAGState.TERMINATING),
@@ -279,6 +295,35 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               DAGState.RUNNING,
               DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(DAGState.RUNNING,
+              EnumSet.of(DAGState.RUNNING, DAGState.TERMINATING),
+              DAGEventType.DAG_COMMIT_COMPLETED,
+              new CommitCompletedWhileRunning())
+
+          // Transitions from COMMITTING state.
+          .addTransition(DAGState.COMMITTING,
+              EnumSet.of(DAGState.COMMITTING, DAGState.TERMINATING, DAGState.FAILED, DAGState.SUCCEEDED),
+              DAGEventType.DAG_COMMIT_COMPLETED,
+              COMMIT_COMPLETED_TRANSITION)
+          .addTransition(DAGState.COMMITTING, DAGState.TERMINATING, 
+              DAGEventType.DAG_KILL,
+              new DAGKilledWhileCommittingTransition())
+          .addTransition(
+              DAGState.COMMITTING,
+              DAGState.ERROR,
+              DAGEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          .addTransition(DAGState.COMMITTING, DAGState.COMMITTING,
+              DAGEventType.DAG_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(DAGState.COMMITTING, DAGState.COMMITTING,
+              DAGEventType.DAG_SCHEDULER_UPDATE,
+              DAG_SCHEDULER_UPDATE_TRANSITION)
+          .addTransition(DAGState.COMMITTING, DAGState.TERMINATING,
+              DAGEventType.DAG_VERTEX_RERUNNING,
+              new VertexRerunWhileCommitting())
+          .addTransition(DAGState.COMMITTING, DAGState.COMMITTING,
+              DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
 
           // Transitions from TERMINATING state.
           .addTransition
@@ -296,6 +341,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               DAGState.TERMINATING,
               DAGState.ERROR, DAGEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(
+              DAGState.TERMINATING,
+              EnumSet.of(DAGState.TERMINATING, DAGState.FAILED, DAGState.KILLED, DAGState.ERROR),
+              DAGEventType.DAG_COMMIT_COMPLETED,
+              COMMIT_COMPLETED_TRANSITION)
 
               // Ignore-able events
           .addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
@@ -885,144 +935,125 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
   }
 
-  private boolean commitOutput(String outputName, OutputCommitter outputCommitter) {
+  private void commitOutput(OutputCommitter outputCommitter) throws Exception {
     final OutputCommitter committer = outputCommitter;
-    try {
-      getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
-        @Override
-        public Void run() throws Exception {
-          committer.commitOutput();
-          return null;
-        }
-      });
-      return true;
-    } catch (Exception e) {
-      LOG.info("Exception in committing output: " + outputName, e);
-    }
-    return false;
+    getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        committer.commitOutput();
+        return null;
+      }
+    });
   }
 
-  private synchronized boolean commitOrAbortOutputs(boolean dagSucceeded) {
-    if (this.committedOrAborted) {
-      LOG.info("Ignoring multiple output commit/abort");
-      return this.allOutputsCommitted;
+  // either commit when all vertices are completed or just finish if there's no committer
+  private synchronized DAGState commitOrFinish() {
+
+    // commit all other outputs
+    // we come here for successful dag completion and when outputs need to be
+    // committed at the end for all or none visibility
+    Map<OutputKey, CallableEvent> commitEvents = new HashMap<OutputKey, CallableEvent>();
+    // commit all shared outputs
+    for (final VertexGroupInfo groupInfo : vertexGroups.values()) {
+      if (!groupInfo.outputs.isEmpty()) {
+        groupInfo.committed = true;
+        final Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
+        for (final String outputName : groupInfo.outputs) {
+          final OutputKey outputKey = new OutputKey(outputName, groupInfo.groupName, true);
+          CommitCallback groupCommitCallback = new CommitCallback(outputKey);
+          CallableEvent groupCommitCallableEvent = new CallableEvent(groupCommitCallback) {
+            @Override
+            public Void call() throws Exception {
+              OutputCommitter committer = v.getOutputCommitters().get(outputName);
+              LOG.info("Committing output: " + outputKey);
+              commitOutput(committer);
+              return null;
+            }
+          };
+          commitEvents.put(outputKey, groupCommitCallableEvent);
+        }
+      }
     }
-    LOG.info("Calling DAG commit/abort for dag: " + getID());
-    this.committedOrAborted = true;
 
-    boolean successfulOutputsAlreadyCommitted = !commitAllOutputsOnSuccess;
-    boolean failedWhileCommitting = false;
-    if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
-      // commit all shared outputs
+    for (final Vertex vertex : vertices.values()) {
+      if (vertex.getOutputCommitters() == null) {
+        LOG.info("No output committers for vertex: " + vertex.getLogIdentifier());
+        continue;
+      }
+      Map<String, OutputCommitter> outputCommitters =
+          new HashMap<String, OutputCommitter>(vertex.getOutputCommitters());
+      Set<String> sharedOutputs = vertex.getSharedOutputs();
+      // remove shared outputs
+      if (sharedOutputs != null) {
+        Iterator<Map.Entry<String, OutputCommitter>> iter = outputCommitters
+            .entrySet().iterator();
+        while (iter.hasNext()) {
+          if (sharedOutputs.contains(iter.next().getKey())) {
+            iter.remove();
+          }
+        }
+      }
+      if (outputCommitters.isEmpty()) {
+        LOG.info("No exclusive output committers for vertex: " + vertex.getLogIdentifier());
+        continue;
+      }
+      for (final Map.Entry<String, OutputCommitter> entry : outputCommitters.entrySet()) {
+        if (vertex.getState() != VertexState.SUCCEEDED) {
+          throw new TezUncheckedException("Vertex: " + vertex.getLogIdentifier() +
+              " not in SUCCEEDED state. State= " + vertex.getState());
+        }
+        OutputKey outputKey = new OutputKey(entry.getKey(), vertex.getName(), false);
+        CommitCallback commitCallback = new CommitCallback(outputKey);
+        CallableEvent commitCallableEvent = new CallableEvent(commitCallback) {
+          @Override
+          public Void call() throws Exception {
+            LOG.info("Committing output: " + entry.getKey() + " for vertex: "
+                + vertex.getLogIdentifier() + ", outputName: " + entry.getKey());
+            commitOutput(entry.getValue());
+            return null;
+          }
+        };
+        commitEvents.put(outputKey, commitCallableEvent);
+      }
+    }
+    
+    if (!commitEvents.isEmpty()) {
       try {
+        LOG.info("Start writing dag commit event, " + getID());
         appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
             new DAGCommitStartedEvent(getID(), clock.getTime())));
       } catch (IOException e) {
         LOG.error("Failed to send commit event to history/recovery handler", e);
         trySetTerminationCause(DAGTerminationCause.RECOVERY_FAILURE);
-        return false;
+        return finished(DAGState.FAILED);
       }
-      for (VertexGroupInfo groupInfo : vertexGroups.values()) {
-        if (failedWhileCommitting) {
-          break;
-        }
-        if (!groupInfo.outputs.isEmpty()) {
-          groupInfo.committed = true;
-          Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
-          for (String outputName : groupInfo.outputs) {
-            OutputCommitter committer = v.getOutputCommitters().get(outputName);
-            LOG.info("Committing output: " + outputName + " for group: " + groupInfo.groupName);
-            if (!commitOutput(outputName, committer)) {
-              failedWhileCommitting = true;
-              break;
-            }
-          }
-        }
-      }
-      // commit all other outputs
-      // we come here for successful dag completion and when outputs need to be
-      // committed at the end for all or none visibility
-      for (Vertex vertex : vertices.values()) {
-        if (failedWhileCommitting) {
-          break;
-        }
-        if (vertex.getOutputCommitters() == null) {
-          LOG.info("No output committers for vertex: " + vertex.getLogIdentifier());
-          continue;
-        }
-        Map<String, OutputCommitter> outputCommitters =
-            new HashMap<String, OutputCommitter>(vertex.getOutputCommitters());
-        Set<String> sharedOutputs = vertex.getSharedOutputs();
-        // remove shared outputs
-        if (sharedOutputs != null) {
-          Iterator<Map.Entry<String, OutputCommitter>> iter = outputCommitters
-              .entrySet().iterator();
-          while (iter.hasNext()) {
-            if (sharedOutputs.contains(iter.next().getKey())) {
-              iter.remove();
-            }
-          }
-        }
-        if (outputCommitters.isEmpty()) {
-          LOG.info("No exclusive output committers for vertex: " + vertex.getLogIdentifier());
-          continue;
-        }
-        for (Map.Entry<String, OutputCommitter> entry : outputCommitters.entrySet()) {
-          LOG.info("Committing output: " + entry.getKey() + " for vertex: "
-              + vertex.getLogIdentifier());
-          if (vertex.getState() != VertexState.SUCCEEDED) {
-            throw new TezUncheckedException("Vertex: " + vertex.getLogIdentifier() +
-                " not in SUCCEEDED state. State= " + vertex.getState());
-          }
-          if (!commitOutput(entry.getKey(), entry.getValue())) {
-            failedWhileCommitting = true;
-            break;
-          }
-        }
+      for (Map.Entry<OutputKey,CallableEvent> entry : commitEvents.entrySet()) {
+        ListenableFuture<Void> commitFuture = appContext.getExecService().submit(entry.getValue());
+        Futures.addCallback(commitFuture, entry.getValue().getCallback());
+        commitFutures.put(entry.getKey(), commitFuture);
       }
     }
 
-    if (failedWhileCommitting) {
-      LOG.info("DAG: " + getID() + " failed while committing");
+    if (commitFutures.isEmpty()) {
+      // no commit needs to be done
+      return finished(DAGState.SUCCEEDED);
+    } else {
+      return DAGState.COMMITTING;
     }
+  }
 
-    if (!dagSucceeded || failedWhileCommitting) {
-      // come here because dag failed or
-      // dag succeeded and all or none semantics were on and a commit failed
-      for (Vertex vertex : vertices.values()) {
-        Map<String, OutputCommitter> outputCommitters = vertex
-            .getOutputCommitters();
-        if (outputCommitters == null || outputCommitters.isEmpty()) {
-          LOG.info("No output committers for vertex: " + vertex.getLogIdentifier());
-          continue;
-        }
-        for (Map.Entry<String, OutputCommitter> entry : outputCommitters
-            .entrySet()) {
-          final OutputCommitter committer = entry.getValue();
-          if (commitAllOutputsOnSuccess // commit all outputs on success
-              || vertex.getState() != VertexState.SUCCEEDED // never commit unsuccessful outputs
-              ) {
-            LOG.info("Aborting output: " + entry.getKey() + " for vertex: "
-                + vertex.getLogIdentifier());
-            try {
-              getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
-                @Override
-                public Void run() throws Exception {
-                  committer.abortOutput(VertexStatus.State.FAILED);
-                  return null;
-                }
-              });
-            } catch (Exception e) {
-              LOG.info("Exception in aborting output: " + entry.getKey()
-                  + " for vertex: " + vertex.getLogIdentifier(), e);
-            }
-          }
-          // else successful outputs have already been committed
-        }
-      }
+  /**
+   * Aborts the outputs of every vertex in this DAG by calling
+   * {@code abortVertex(VertexStatus.State.FAILED)} on each vertex.
+   * The {@code aborted} flag makes this run at most once; later calls only log and return.
+   */
+  private void abortOutputs() {
+    if (this.aborted.getAndSet(true)) {
+      LOG.info("Ignoring multiple output abort");
+      return ;
+    }
+    // come here because dag failed or
+    // dag succeeded and all or none semantics were on and a commit failed.
+    // Some output may be aborted multiple times if it is shared output.
+    // It should be OK for it to be aborted multiple times.
+    // NOTE(review): the cast assumes every vertex is a VertexImpl; any other
+    // Vertex implementation would throw ClassCastException here.
+    for (Vertex vertex : vertices.values()) {
+      ((VertexImpl)vertex).abortVertex(VertexStatus.State.FAILED);
     }
-    allOutputsCommitted = !failedWhileCommitting;
-    return allOutputsCommitted;
   }
 
   @Override
@@ -1124,16 +1155,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         new DAGHistoryEvent(dagId, finishEvt));
   }
 
-  static DAGState checkDAGForCompletion(DAGImpl dag) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Checking dag completion"
-          + ", numCompletedVertices=" + dag.numCompletedVertices
-          + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
-          + ", numFailedVertices=" + dag.numFailedVertices
-          + ", numKilledVertices=" + dag.numKilledVertices
-          + ", numVertices=" + dag.numVertices
-          + ", terminationCause=" + dag.terminationCause);
-    }
+  // triggered by vertex_complete
+  // Decides the next DAG state after a vertex completes. Until every vertex has
+  // completed, the current internal state is returned unchanged. Once all have completed:
+  // - all succeeded and no termination cause: start the DAG-level commit (only the first
+  //   time, and only when commitAllOutputsOnSuccess), stay COMMITTING while vertex group
+  //   commits are outstanding, or finish as SUCCEEDED;
+  // - otherwise: stay TERMINATING while commits are outstanding, then finish according to
+  //   the termination cause.
+  static DAGState checkVerticesForCompletion(DAGImpl dag) {
+    LOG.info("Checking vertices for DAG completion"
+        + ", numCompletedVertices=" + dag.numCompletedVertices
+        + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
+        + ", numFailedVertices=" + dag.numFailedVertices
+        + ", numKilledVertices=" + dag.numKilledVertices
+        + ", numVertices=" + dag.numVertices
+        + ", commitInProgress=" + dag.commitFutures.size() 
+        + ", terminationCause=" + dag.terminationCause);

     // log in case of accounting error.
     if (dag.numCompletedVertices > dag.numVertices) {
@@ -1144,67 +1175,124 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }

     if (dag.numCompletedVertices == dag.numVertices) {
-      dag.setFinishTime();
       //Only succeed if vertices complete successfully and no terminationCause is registered.
       if(dag.numSuccessfulVertices == dag.numVertices && dag.terminationCause == null) {
-        return dag.finished(DAGState.SUCCEEDED);
-      }
-      if(dag.terminationCause == DAGTerminationCause.DAG_KILL ){
-        String diagnosticMsg = "DAG killed due to user-initiated kill." +
-            " failedVertices:" + dag.numFailedVertices +
-            " killedVertices:" + dag.numKilledVertices;
-        LOG.info(diagnosticMsg);
-        dag.addDiagnostic(diagnosticMsg);
-        return dag.finished(DAGState.KILLED);
-      }
-      if(dag.terminationCause == DAGTerminationCause.VERTEX_FAILURE ){
-        String diagnosticMsg = "DAG failed due to vertex failure." +
-            " failedVertices:" + dag.numFailedVertices +
-            " killedVertices:" + dag.numKilledVertices;
-        LOG.info(diagnosticMsg);
-        dag.addDiagnostic(diagnosticMsg);
-        return dag.finished(DAGState.FAILED);
-      }
-      if(dag.terminationCause == DAGTerminationCause.COMMIT_FAILURE ){
-        String diagnosticMsg = "DAG failed due to commit failure." +
-            " failedVertices:" + dag.numFailedVertices +
-            " killedVertices:" + dag.numKilledVertices;
-        LOG.info(diagnosticMsg);
-        dag.addDiagnostic(diagnosticMsg);
-        return dag.finished(DAGState.FAILED);
+        if (dag.commitAllOutputsOnSuccess && !dag.committed.getAndSet(true)) {
+          // start dag commit if there's any commit or just finish dag if no commit
+          return dag.commitOrFinish();
+        } else if (!dag.commitFutures.isEmpty()) {
+          // vertex group commits are running
+          return DAGState.COMMITTING;
+        } else {
+          // no vertex group commits or vertex group commits are done
+          return dag.finished(DAGState.SUCCEEDED);
+        }
+      } else {
+        if (dag.commitFutures.isEmpty()) {
+          return finishWithTerminationCause(dag);
+        } else {
+          return DAGState.TERMINATING;
+        }
       }
-      if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT ){
-        String diagnosticMsg = "DAG failed due to vertex rerun after commit." +
-            " failedVertices:" + dag.numFailedVertices +
-            " killedVertices:" + dag.numKilledVertices;
-        LOG.info(diagnosticMsg);
-        dag.addDiagnostic(diagnosticMsg);
-        return dag.finished(DAGState.FAILED);
+    }
+
+    //return the current state, Job not finished yet
+    return dag.getInternalState();
+  }
+
+  // triggered by commit_complete
+  // Decides the next DAG state after an output commit completes. With no termination
+  // cause the DAG must be COMMITTING: it stays COMMITTING while other commit futures are
+  // outstanding and finishes as SUCCEEDED once none remain. With a termination cause set,
+  // it stays TERMINATING until outstanding commits drain, then finishes according to the
+  // termination cause.
+  static DAGState checkCommitsForCompletion(DAGImpl dag) {
+    LOG.info("Checking commits for DAG completion"
+        + ", numCompletedVertices=" + dag.numCompletedVertices
+        + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
+        + ", numFailedVertices=" + dag.numFailedVertices
+        + ", numKilledVertices=" + dag.numKilledVertices
+        + ", numVertices=" + dag.numVertices
+        + ", commitInProgress=" + dag.commitFutures.size() 
+        + ", terminationCause=" + dag.terminationCause);
+
+    // terminationCause is null means commit is succeeded, otherwise terminationCause will be set.
+    if (dag.terminationCause == null) {
+      Preconditions.checkState(dag.getState() == DAGState.COMMITTING,
+          "DAG should be in COMMITTING state, but in " + dag.getState());
+      if (!dag.commitFutures.isEmpty()) {
+        // pending commits are running
+        return DAGState.COMMITTING;
+      } else {
+        return dag.finished(DAGState.SUCCEEDED);
       }
-      if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){
-        String diagnosticMsg = "DAG failed due to failure in recovery handling." +
-            " failedVertices:" + dag.numFailedVertices +
-            " killedVertices:" + dag.numKilledVertices;
-        LOG.info(diagnosticMsg);
-        dag.addDiagnostic(diagnosticMsg);
-        return dag.finished(DAGState.FAILED);
+    } else {
+      if (!dag.commitFutures.isEmpty()) {
+        // pending commits are running
+        return DAGState.TERMINATING;
+      } else {
+        return finishWithTerminationCause(dag);
       }
+    }
+  }
 
-      // catch all
-      String diagnosticMsg = "All vertices complete, but cannot determine final state of DAG"
-          + ", numCompletedVertices=" + dag.numCompletedVertices
-          + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
-          + ", numFailedVertices=" + dag.numFailedVertices
-          + ", numKilledVertices=" + dag.numKilledVertices
-          + ", numVertices=" + dag.numVertices
-          + ", terminationCause=" + dag.terminationCause;
-      LOG.error(diagnosticMsg);
+  // Maps dag.terminationCause to the final DAG state: DAG_KILL -> KILLED;
+  // VERTEX_FAILURE, COMMIT_FAILURE, VERTEX_RERUN_AFTER_COMMIT, VERTEX_RERUN_IN_COMMITTING
+  // and RECOVERY_FAILURE -> FAILED; any other cause (including null) -> ERROR.
+  // Each branch logs and records a diagnostic message before calling dag.finished(...).
+  // The visible callers only invoke this when dag.commitFutures is empty.
+  private static DAGState finishWithTerminationCause(DAGImpl dag) {
+    if(dag.terminationCause == DAGTerminationCause.DAG_KILL ){
+      String diagnosticMsg = "DAG killed due to user-initiated kill." +
+          " failedVertices:" + dag.numFailedVertices +
+          " killedVertices:" + dag.numKilledVertices;
+      LOG.info(diagnosticMsg);
       dag.addDiagnostic(diagnosticMsg);
-      return dag.finished(DAGState.ERROR);
+      return dag.finished(DAGState.KILLED);
+    }
+    if(dag.terminationCause == DAGTerminationCause.VERTEX_FAILURE ){
+      String diagnosticMsg = "DAG failed due to vertex failure." +
+          " failedVertices:" + dag.numFailedVertices +
+          " killedVertices:" + dag.numKilledVertices;
+      LOG.info(diagnosticMsg);
+      dag.addDiagnostic(diagnosticMsg);
+      return dag.finished(DAGState.FAILED);
+    }
+    if(dag.terminationCause == DAGTerminationCause.COMMIT_FAILURE ){
+      String diagnosticMsg = "DAG failed due to commit failure." +
+          " failedVertices:" + dag.numFailedVertices +
+          " killedVertices:" + dag.numKilledVertices;
+      LOG.info(diagnosticMsg);
+      dag.addDiagnostic(diagnosticMsg);
+      return dag.finished(DAGState.FAILED);
+    }
+    if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT ){
+      String diagnosticMsg = "DAG failed due to vertex rerun after commit." +
+          " failedVertices:" + dag.numFailedVertices +
+          " killedVertices:" + dag.numKilledVertices;
+      LOG.info(diagnosticMsg);
+      dag.addDiagnostic(diagnosticMsg);
+      return dag.finished(DAGState.FAILED);
+    }
+    if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING ){
+      String diagnosticMsg = "DAG failed due to vertex rerun in commit." +
+          " failedVertices:" + dag.numFailedVertices +
+          " killedVertices:" + dag.numKilledVertices;
+      LOG.info(diagnosticMsg);
+      dag.addDiagnostic(diagnosticMsg);
+      return dag.finished(DAGState.FAILED);
+    }
+    if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){
+      String diagnosticMsg = "DAG failed due to failure in recovery handling." +
+          " failedVertices:" + dag.numFailedVertices +
+          " killedVertices:" + dag.numKilledVertices;
+      LOG.info(diagnosticMsg);
+      dag.addDiagnostic(diagnosticMsg);
+      return dag.finished(DAGState.FAILED);
     }

-    //return the current state, Job not finished yet
-    return dag.getInternalState();
+    // catch all
+    String diagnosticMsg = "All vertices complete, but cannot determine final state of DAG"
+        + ", numCompletedVertices=" + dag.numCompletedVertices
+        + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
+        + ", numFailedVertices=" + dag.numFailedVertices
+        + ", numKilledVertices=" + dag.numKilledVertices
+        + ", numVertices=" + dag.numVertices
+        + ", commitInProgress=" + dag.commitFutures.size() 
+        + ", terminationCause=" + dag.terminationCause;
+    LOG.error(diagnosticMsg);
+    dag.addDiagnostic(diagnosticMsg);
+    return dag.finished(DAGState.ERROR);
   }
 
   private void updateCpuCounters() {
@@ -1221,13 +1309,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       setFinishTime();
     }
 
-    boolean allOutputsCommitted = commitOrAbortOutputs(finalState == DAGState.SUCCEEDED);
-
-    if (finalState == DAGState.SUCCEEDED && !allOutputsCommitted) {
-      finalState = DAGState.FAILED;
-      trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
-    }
-
     boolean recoveryError = false;
 
     // update cpu time counters before finishing the dag
@@ -1246,6 +1327,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       recoveryError = true;
     }
 
+    if (finalState != DAGState.SUCCEEDED) {
+      abortOutputs();
+    }
+
     if (recoveryError) {
       eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), DAGState.ERROR));
     } else {
@@ -1741,6 +1826,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     public void transition(DAGImpl job, DAGEvent event) {
       job.addDiagnostic("Job received Kill while in RUNNING state.");
       job.enactKill(DAGTerminationCause.DAG_KILL, VertexTerminationCause.DAG_KILL);
+      // Commit may happen when dag is still in RUNNING (vertex group commit)
+      job.cancelCommits();
       // TODO Metrics
       //job.metrics.endRunningJob(job);
     }
@@ -1748,6 +1835,29 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   }
 
+  /**
+   * Handles a kill request received while the DAG is COMMITTING: logs and records a
+   * diagnostic, cancels the outstanding commit futures, and tries to set DAG_KILL as the
+   * termination cause. This is a single-arc transition, so it does not choose the next
+   * state itself.
+   */
+  private static class DAGKilledWhileCommittingTransition
+    implements SingleArcTransition<DAGImpl, DAGEvent> {
+
+    @Override
+    public void transition(DAGImpl dag, DAGEvent event) {
+      String diag = "DAG received Kill while in COMMITTING state.";
+      LOG.info(diag);
+      dag.addDiagnostic(diag);
+      dag.cancelCommits();
+      dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
+    }
+  }
+
+  /**
+   * Cancels every commit future currently in {@code commitFutures}, passing
+   * {@code mayInterruptIfRunning=true}. The {@code commitCanceled} flag makes this take
+   * effect only on the first call.
+   * NOTE(review): futures added to commitFutures after the first call are not cancelled by
+   * later calls; confirm no commit can be submitted once cancellation has started.
+   */
+  private void cancelCommits() {
+    if (!this.commitCanceled.getAndSet(true)) {
+      for (Map.Entry<OutputKey, ListenableFuture<Void>> entry : commitFutures.entrySet()) {
+        OutputKey outputKey = entry.getKey();
+        LOG.info("Canceling commit of output=" + outputKey);
+        entry.getValue().cancel(true);
+      }
+    }
+  }
+
   private static class VertexCompletedTransition implements
       MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
@@ -1773,11 +1883,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       else if (vertexEvent.getVertexState() == VertexState.FAILED) {
         job.enactKill(
             DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
+        job.cancelCommits();
         job.vertexFailed(vertex);
         forceTransitionToKillWait = true;
       }
       else if (vertexEvent.getVertexState() == VertexState.KILLED) {
         job.vertexKilled(vertex);
+        job.cancelCommits();
         forceTransitionToKillWait = true;
       }
 
@@ -1791,8 +1903,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           + ", numVertices=" + job.numVertices);
 
       // if the job has not finished but a failure/kill occurred, then force the transition to KILL_WAIT.
-      DAGState state = checkDAGForCompletion(job);
+      DAGState state = checkVerticesForCompletion(job);
       if(state == DAGState.RUNNING && forceTransitionToKillWait){
+        job.cancelCommits();
         return DAGState.TERMINATING;
       }
       else {
@@ -1830,7 +1943,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   private boolean vertexSucceeded(Vertex vertex) {
     numSuccessfulVertices++;
-    boolean failedCommit = false;
     boolean recoveryFailed = false;
     if (!commitAllOutputsOnSuccess) {
       // committing successful outputs immediately. check for shared outputs
@@ -1855,7 +1967,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
             continue;
           }
           groupInfo.committed = true;
-          Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
+          final Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
           try {
             appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
                 new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
@@ -1863,48 +1975,33 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           } catch (IOException e) {
             LOG.error("Failed to send commit recovery event to handler", e);
             recoveryFailed = true;
-            failedCommit = true;
           }
-          if (!failedCommit) {
-            for (String outputName : groupInfo.outputs) {
-              OutputCommitter committer = v.getOutputCommitters().get(outputName);
-              LOG.info("Committing output: " + outputName);
-              if (!commitOutput(outputName, committer)) {
-                // using same logic as vertex level commit. stop after first failure.
-                failedCommit = true;
-                break;
-              }
+          if (!recoveryFailed) {
+            for (final String outputName : groupInfo.outputs) {
+              OutputKey outputKey = new OutputKey(outputName, groupInfo.groupName, true);
+              CommitCallback groupCommitCallback = new CommitCallback(outputKey);
+              CallableEvent groupCommitCallableEvent = new CallableEvent(groupCommitCallback) {
+                public Void call() throws Exception {
+                  OutputCommitter committer = v.getOutputCommitters().get(outputName);
+                  LOG.info("Committing output: " + outputName);
+                  commitOutput(committer);
+                  return null;
+                };
+              };
+              ListenableFuture<Void> groupCommitFuture = appContext.getExecService().submit(groupCommitCallableEvent);
+              Futures.addCallback(groupCommitFuture, groupCommitCallableEvent.getCallback());
+              commitFutures.put(outputKey, groupCommitFuture);
             }
           }
-          if (failedCommit) {
-            break;
-          }
-          try {
-            appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
-                new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName,
-                    clock.getTime())));
-          } catch (IOException e) {
-            LOG.error("Failed to send commit recovery event to handler", e);
-            recoveryFailed = true;
-            failedCommit = true;
-          }
         }
       }
     }
-
-    if (failedCommit) {
-      LOG.info("Aborting job due to failure in commit.");
-      if (!recoveryFailed) {
-        enactKill(DAGTerminationCause.COMMIT_FAILURE,
-            VertexTerminationCause.COMMIT_FAILURE);
-      } else {
-        LOG.info("Recovery failure occurred during commit");
-        enactKill(DAGTerminationCause.RECOVERY_FAILURE,
-            VertexTerminationCause.COMMIT_FAILURE);
-      }
+    if (recoveryFailed) {
+      LOG.info("Recovery failure occurred during commit");
+      enactKill(DAGTerminationCause.RECOVERY_FAILURE,
+          VertexTerminationCause.COMMIT_FAILURE);
     }
-
-    return !failedCommit;
+    return !recoveryFailed;
   }
 
   private boolean vertexReRunning(Vertex vertex) {
@@ -1983,29 +2080,109 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   private static class DAGSchedulerUpdateTransition implements
-  SingleArcTransition<DAGImpl, DAGEvent> {
-  @Override
-  public void transition(DAGImpl dag, DAGEvent event) {
-    DAGEventSchedulerUpdate sEvent = (DAGEventSchedulerUpdate) event;
-    switch(sEvent.getUpdateType()) {
-      case TA_SCHEDULE:
-        dag.dagScheduler.scheduleTask(sEvent);
-        break;
-      case TA_SCHEDULED:
-        DAGEventSchedulerUpdateTAAssigned taEvent =
-                              (DAGEventSchedulerUpdateTAAssigned) sEvent;
-        dag.dagScheduler.taskScheduled(taEvent);
-        break;
-      case TA_SUCCEEDED:
-        dag.dagScheduler.taskSucceeded(sEvent);
-        break;
-      default:
-        throw new TezUncheckedException("Unknown DAGEventSchedulerUpdate:"
-                                + sEvent.getUpdateType());
+    SingleArcTransition<DAGImpl, DAGEvent> {
+    // Routes a DAGEventSchedulerUpdate to the DAG scheduler by update type:
+    // TA_SCHEDULE -> scheduleTask, TA_SCHEDULED -> taskScheduled (after casting the event to
+    // DAGEventSchedulerUpdateTAAssigned), TA_SUCCEEDED -> taskSucceeded; any other type
+    // throws TezUncheckedException.
+    @Override
+    public void transition(DAGImpl dag, DAGEvent event) {
+      DAGEventSchedulerUpdate sEvent = (DAGEventSchedulerUpdate) event;
+      switch(sEvent.getUpdateType()) {
+        case TA_SCHEDULE:
+          dag.dagScheduler.scheduleTask(sEvent);
+          break;
+        case TA_SCHEDULED:
+          DAGEventSchedulerUpdateTAAssigned taEvent =
+                                (DAGEventSchedulerUpdateTAAssigned) sEvent;
+          dag.dagScheduler.taskScheduled(taEvent);
+          break;
+        case TA_SUCCEEDED:
+          dag.dagScheduler.taskSucceeded(sEvent);
+          break;
+        default:
+          throw new TezUncheckedException("Unknown DAGEventSchedulerUpdate:"
+                                  + sEvent.getUpdateType());
+      }
     }
   }
-}
 
+  /**
+   * Handles a commit completion that arrives while the DAG is RUNNING (vertex group commits
+   * can run while the DAG is still RUNNING). Returns RUNNING if commitCompleted reports
+   * success, otherwise TERMINATING.
+   */
+  private static class CommitCompletedWhileRunning implements
+    MultipleArcTransition<DAGImpl, DAGEvent, DAGState>{
+
+    @Override
+    public DAGState transition(DAGImpl dag, DAGEvent event) {
+      DAGEventCommitCompleted commitCompletedEvent = (DAGEventCommitCompleted)event;
+      if (dag.commitCompleted(commitCompletedEvent)) {
+        return DAGState.RUNNING;
+      } else {
+        return DAGState.TERMINATING;
+      }
+    }
+    
+  }
+
+  /**
+   * Handles a commit completion: records the outcome via commitCompleted (whose return value
+   * is ignored here), then returns the state chosen by checkCommitsForCompletion.
+   */
+  private static class CommitCompletedTransition implements
+    MultipleArcTransition<DAGImpl, DAGEvent, DAGState>{
+
+    @Override
+    public DAGState transition(DAGImpl dag, DAGEvent event) {
+      DAGEventCommitCompleted commitCompletedEvent = (DAGEventCommitCompleted)event;
+      dag.commitCompleted(commitCompletedEvent);
+      return checkCommitsForCompletion(dag);
+    }
+  }
+
+  /**
+   * Records the outcome of one output commit and reports whether the DAG can keep going.
+   *
+   * <p>The commit's future is removed from {@code commitFutures}; an unknown key fails the
+   * {@code Preconditions.checkState} call. On success the key is added to
+   * {@code succeededCommits}. When {@code commitAllOutputsOnSuccess} is false, the
+   * {@code VertexGroupCommitFinishedEvent} for a vertex group is written only once every
+   * output of that group has committed successfully, not after each individual output. This
+   * matches the earlier behaviour of writing the event after all of the group's outputs were
+   * committed. A failed commit, or a failure to write that event, kills the DAG and cancels
+   * the remaining commits.
+   *
+   * @param commitCompletedEvent completion event posted by a {@code CommitCallback}
+   * @return true if the commit succeeded and any required history event was written
+   */
+  private boolean commitCompleted(DAGEventCommitCompleted commitCompletedEvent) {
+    OutputKey outputKey = commitCompletedEvent.getOutputKey();
+    // Remove first so the state change is not buried inside the precondition's argument.
+    boolean wasPending = commitFutures.remove(outputKey) != null;
+    Preconditions.checkState(wasPending, "Unknown commit:" + outputKey);
+
+    boolean commitFailed = false;
+    boolean recoveryFailed = false;
+    if (commitCompletedEvent.isSucceeded()) {
+      LOG.info("Commit succeeded for output:" + outputKey);
+      succeededCommits.add(outputKey);
+      // Only the last successful output of a group marks the group commit as finished.
+      if (!commitAllOutputsOnSuccess && outputKey.isVertexGroupOutput
+          && allVertexGroupOutputsSucceeded(outputKey.getEntityName())) {
+        try {
+          appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
+              new VertexGroupCommitFinishedEvent(getID(), outputKey.getEntityName(),
+                  clock.getTime())));
+        } catch (IOException e) {
+          String diag = "Failed to send commit recovery event to handler, " + ExceptionUtils.getStackTrace(e);
+          addDiagnostic(diag);
+          LOG.error(diag);
+          recoveryFailed = true;
+        }
+      }
+    } else {
+      String diag = "Commit failed for output: " + outputKey
+          + ", " + ExceptionUtils.getStackTrace(commitCompletedEvent.getException());
+      addDiagnostic(diag);
+      LOG.error(diag);
+      commitFailed = true;
+    }
+
+    if (commitFailed) {
+      enactKill(DAGTerminationCause.COMMIT_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
+      cancelCommits();
+    }
+    if (recoveryFailed) {
+      enactKill(DAGTerminationCause.RECOVERY_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
+      cancelCommits();
+    }
+    return !commitFailed && !recoveryFailed;
+  }
+
+  /**
+   * Returns true if every output of the vertex group named {@code groupName} has a successful
+   * commit recorded in {@code succeededCommits}. If no group with that name is known, returns
+   * true so the group-finished event is still written, as it was before.
+   */
+  private boolean allVertexGroupOutputsSucceeded(String groupName) {
+    for (VertexGroupInfo groupInfo : vertexGroups.values()) {
+      if (!groupName.equals(groupInfo.groupName)) {
+        continue;
+      }
+      for (String outputName : groupInfo.outputs) {
+        if (!succeededCommits.contains(new OutputKey(outputName, groupName, true))) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return true;
+  }
+
+
+  /**
+   * Handles a vertex re-running while the DAG is COMMITTING: logs it, cancels the outstanding
+   * commit futures and tries to set VERTEX_RERUN_IN_COMMITTING as the termination cause.
+   */
+  private static class VertexRerunWhileCommitting implements
+    SingleArcTransition<DAGImpl, DAGEvent> {
+
+    @Override
+    public void transition(DAGImpl dag, DAGEvent event) {
+      LOG.info("Vertex rerun while DAG " + dag.getID() + " is COMMITTING");
+      dag.cancelCommits();
+      dag.trySetTerminationCause(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING);
+    }
+
+  }
+
+  // TODO TEZ-2250 go to TERMINATING to wait for all vertices and commits completed
   private static class InternalErrorTransition implements
       SingleArcTransition<DAGImpl, DAGEvent> {
     @Override
@@ -2016,6 +2193,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       job.enactKill(DAGTerminationCause.INTERNAL_ERROR,
           VertexTerminationCause.INTERNAL_ERROR);
       job.setFinishTime();
+      job.cancelCommits();
       job.finished(DAGState.ERROR);
     }
   }
@@ -2036,4 +2214,83 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       readLock.unlock();
     }
   }
+
+  /**
+   * Identifies a single committable output: the output name plus the vertex or vertex group
+   * that owns it. Instances are used as keys in hash-based collections (for example
+   * {@code commitFutures}), so every field is final to keep equals/hashCode stable.
+   */
+  public static class OutputKey {
+    final String outputName;
+    final String entityName; // vertex name or vertex group name
+    final boolean isVertexGroupOutput;
+
+    /**
+     * @param outputName name of the output
+     * @param entityName name of the vertex or vertex group that owns the output
+     * @param isVertexGroupOutput true if the output belongs to a vertex group
+     */
+    public OutputKey(String outputName, String entityName, boolean isVertexGroupOutput) {
+      this.outputName = outputName;
+      this.entityName = entityName;
+      this.isVertexGroupOutput = isVertexGroupOutput;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((entityName == null) ? 0 : entityName.hashCode());
+      // 1231/1237 are the values Boolean.hashCode() uses for true/false.
+      result = prime * result + (isVertexGroupOutput ? 1231 : 1237);
+      result = prime * result + ((outputName == null) ? 0 : outputName.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      OutputKey other = (OutputKey) obj;
+      return isVertexGroupOutput == other.isVertexGroupOutput
+          && (entityName == null ? other.entityName == null : entityName.equals(other.entityName))
+          && (outputName == null ? other.outputName == null : outputName.equals(other.outputName));
+    }
+
+    public String getOutputName() {
+      return outputName;
+    }
+
+    public String getEntityName() {
+      return entityName;
+    }
+
+    public boolean isVertexGroupOutput() {
+      return isVertexGroupOutput;
+    }
+
+    @Override
+    public String toString() {
+      return "outputName:" + outputName + " of vertex/vertexGroup:" + entityName
+          + " isVertexGroupOutput:" + isVertexGroupOutput;
+    }
+  }
+
+  /**
+   * Turns the result of an asynchronous output commit into a DAGEventCommitCompleted for this
+   * DAG and posts it through the DAG's event handler. With Guava's Futures.addCallback, a
+   * cancelled commit future is also reported to onFailure (as a CancellationException).
+   */
+  private class CommitCallback implements FutureCallback<Void> {
+
+    private final OutputKey outputKey;
+
+    public CommitCallback(OutputKey outputKey) {
+      this.outputKey = outputKey;
+    }
+
+    /** Posts a successful completion event (no exception). */
+    @Override
+    public void onSuccess(Void result) {
+      eventHandler.handle(new DAGEventCommitCompleted(dagId, outputKey, true, null));
+    }
+
+    /** Posts a failed completion event carrying the failure cause. */
+    @Override
+    public void onFailure(Throwable t) {
+      eventHandler.handle(new DAGEventCommitCompleted(dagId, outputKey, false, t));
+    }
+  }
 }


Mime
View raw message