tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [2/2] git commit: TEZ-904. Committer recovery events should be out-of-band. (hitesh)
Date Fri, 14 Mar 2014 01:47:33 GMT
TEZ-904. Committer recovery events should be out-of-band. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f58508a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f58508a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f58508a5

Branch: refs/heads/master
Commit: f58508a5692c817a11b7996ba563ba8a26b85560
Parents: 693f2ca
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu Mar 13 18:47:05 2014 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu Mar 13 18:47:05 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  77 ++--
 .../org/apache/tez/dag/app/RecoveryParser.java  | 354 +++++++++++++++----
 .../dag/app/dag/event/DAGEventRecoverEvent.java |  37 ++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  73 +++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 114 +++---
 .../apache/tez/dag/history/HistoryEvent.java    |   8 +-
 .../tez/dag/history/HistoryEventType.java       |   4 +-
 .../apache/tez/dag/history/SummaryEvent.java    |  11 +
 .../history/events/DAGCommitStartedEvent.java   |  34 +-
 .../dag/history/events/DAGFinishedEvent.java    |  33 +-
 .../dag/history/events/DAGSubmittedEvent.java   |  23 +-
 .../events/VertexCommitStartedEvent.java        |  41 ++-
 .../dag/history/events/VertexFinishedEvent.java |  51 ++-
 .../events/VertexGroupCommitFinishedEvent.java  | 132 +++++++
 .../events/VertexGroupCommitStartedEvent.java   | 132 +++++++
 .../dag/history/recovery/RecoveryService.java   | 181 +++++-----
 tez-dag/src/main/proto/HistoryEvents.proto      |  21 ++
 .../TestHistoryEventsProtoConversion.java       |  75 +++-
 .../org/apache/tez/test/TestDAGRecovery2.java   | 144 ++++++++
 .../java/org/apache/tez/test/TestProcessor.java |   5 +
 .../apache/tez/test/dag/MultiAttemptDAG.java    | 109 ++++++
 21 files changed, 1390 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 9df8752..9a01090 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -99,6 +100,7 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Task;
@@ -108,6 +110,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -1371,28 +1374,19 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  private DAG recoverDAG() throws IOException {
-    DAG recoveredDAG = null;
+  private RecoveredDAGData recoverDAG() throws IOException {
     if (recoveryEnabled) {
       if (this.appAttemptID.getAttemptId() > 1) {
+        LOG.info("Recovering data from previous attempts"
+            + ", currentAttemptId=" + this.appAttemptID.getAttemptId());
         this.state = DAGAppMasterState.RECOVERING;
         RecoveryParser recoveryParser = new RecoveryParser(
             this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
-        recoveredDAG = recoveryParser.parseRecoveryData();
-        if (recoveredDAG != null) {
-          LOG.info("Found DAG to recover, dagId=" + recoveredDAG.getID());
-          _updateLoggers(recoveredDAG, "");
-          DAGEvent recoverDAGEvent = new DAGEvent(recoveredDAG.getID(),
-              DAGEventType.DAG_RECOVER);
-          dagEventDispatcher.handle(recoverDAGEvent);
-          this.state = DAGAppMasterState.RUNNING;
-        } else {
-          LOG.info("No DAG to recover");
-          this.state = DAGAppMasterState.IDLE;
-        }
+        RecoveredDAGData recoveredDAGData = recoveryParser.parseRecoveryData();
+        return recoveredDAGData;
       }
     }
-    return recoveredDAG;
+    return null;
   }
 
   @SuppressWarnings("unchecked")
@@ -1415,20 +1409,56 @@ public class DAGAppMaster extends AbstractService {
 
     this.lastDAGCompletionTime = clock.getTime();
 
+    RecoveredDAGData recoveredDAGData = recoverDAG();
+
     if (!isSession) {
-      DAG recoveredDAG = null;
-      if (appAttemptID.getAttemptId() != 1) {
-        recoveredDAG = recoverDAG();
+      LOG.info("In Non-Session mode.");
+    } else {
+      LOG.info("In Session mode. Waiting for DAG over RPC");
+      this.state = DAGAppMasterState.IDLE;
+    }
+
+    if (recoveredDAGData != null) {
+      if (recoveredDAGData.isCompleted
+          || recoveredDAGData.nonRecoverable) {
+        LOG.info("Found previous DAG in completed or non-recoverable state"
+            + ", dagId=" + recoveredDAGData.recoveredDagID
+            + ", isCompleted=" + recoveredDAGData.isCompleted
+            + ", isNonRecoverable=" + recoveredDAGData.nonRecoverable
+            + ", state=" + (recoveredDAGData.dagState == null ? "null" :
+                recoveredDAGData.dagState)
+            + ", failureReason=" + recoveredDAGData.reason);
+        _updateLoggers(recoveredDAGData.recoveredDAG, "");
+        if (recoveredDAGData.nonRecoverable) {
+          DAGEventRecoverEvent recoverDAGEvent =
+              new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
+                  DAGState.FAILED);
+          dagEventDispatcher.handle(recoverDAGEvent);
+          this.state = DAGAppMasterState.RUNNING;
+        } else {
+          DAGEventRecoverEvent recoverDAGEvent =
+              new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
+                  recoveredDAGData.dagState);
+          dagEventDispatcher.handle(recoverDAGEvent);
+          this.state = DAGAppMasterState.RUNNING;
+        }
+      } else {
+        LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
+        _updateLoggers(recoveredDAGData.recoveredDAG, "");
+        DAGEvent recoverDAGEvent = new DAGEvent(recoveredDAGData.recoveredDAG.getID(),
+            DAGEventType.DAG_RECOVER);
+        dagEventDispatcher.handle(recoverDAGEvent);
+        this.state = DAGAppMasterState.RUNNING;
       }
-      if (recoveredDAG == null) {
+    } else {
+      if (!isSession) {
+        // No dag recovered - in non-session, just restart the original DAG
         dagCounter.set(0);
         startDAG();
       }
-    } else {
-      LOG.info("In Session mode. Waiting for DAG over RPC");
-      this.state = DAGAppMasterState.IDLE;
-      recoverDAG();
+    }
 
+    if (isSession) {
       this.dagSubmissionTimer = new Timer(true);
       this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
         @Override
@@ -1436,7 +1466,6 @@ public class DAGAppMaster extends AbstractService {
           checkAndHandleSessionTimeout();
         }
       }, sessionTimeoutInterval, sessionTimeoutInterval / 10);
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 9e59849..7e1feca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -18,6 +18,13 @@
 
 package org.apache.tez.dag.app;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -27,7 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
@@ -37,9 +44,9 @@ import org.apache.tez.dag.history.events.AMLaunchedEvent;
 import org.apache.tez.dag.history.events.AMStartedEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
@@ -48,17 +55,15 @@ import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 
 public class RecoveryParser {
 
@@ -88,6 +93,15 @@ public class RecoveryParser {
         TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
   }
 
+  public static class RecoveredDAGData {
+    public TezDAGID recoveredDagID = null;
+    public DAGImpl recoveredDAG = null;
+    public DAGState dagState = null;
+    public boolean isCompleted = false;
+    public boolean nonRecoverable = false;
+    public String reason = null;
+  }
+
   private static void parseSummaryFile(FSDataInputStream inputStream)
       throws IOException {
     while (inputStream.available() > 0) {
@@ -149,6 +163,12 @@ public class RecoveryParser {
       case VERTEX_COMMIT_STARTED:
         event = new VertexCommitStartedEvent();
         break;
+      case VERTEX_GROUP_COMMIT_STARTED:
+        event = new VertexGroupCommitStartedEvent();
+        break;
+      case VERTEX_GROUP_COMMIT_FINISHED:
+        event = new VertexGroupCommitFinishedEvent();
+        break;
       case VERTEX_FINISHED:
         event = new VertexFinishedEvent();
         break;
@@ -266,18 +286,25 @@ public class RecoveryParser {
     return recoveryFS.create(dagRecoveryPath, true, recoveryBufferSize);
   }
 
-  private TezDAGID getLastInProgressDAG(Map<TezDAGID, Boolean> seenDAGs) {
-    TezDAGID inProgressDAG = null;
-    for (Map.Entry<TezDAGID, Boolean> entry : seenDAGs.entrySet()) {
-      if (!entry.getValue().booleanValue()) {
+  private DAGSummaryData getLastCompletedOrInProgressDAG(
+      Map<TezDAGID, DAGSummaryData> dagSummaryDataMap) {
+    DAGSummaryData inProgressDAG = null;
+    DAGSummaryData lastCompletedDAG = null;
+    for (Map.Entry<TezDAGID, DAGSummaryData> entry : dagSummaryDataMap.entrySet()) {
+      if (!entry.getValue().completed) {
         if (inProgressDAG != null) {
           throw new RuntimeException("Multiple in progress DAGs seen"
-              + ", dagId=" + inProgressDAG
+              + ", dagId=" + inProgressDAG.dagId
               + ", dagId=" + entry.getKey());
         }
-        inProgressDAG = entry.getKey();
+        inProgressDAG = entry.getValue();
+      } else {
+        lastCompletedDAG = entry.getValue();
       }
     }
+    if (inProgressDAG == null) {
+      return lastCompletedDAG;
+    }
     return inProgressDAG;
   }
 
@@ -305,8 +332,134 @@ public class RecoveryParser {
     return getAttemptRecoveryDataDir(recoveryDataDir, foundPreviousAttempt);
   }
 
+  private static class DAGSummaryData {
+
+    final TezDAGID dagId;
+    boolean completed = false;
+    boolean dagCommitCompleted = true;
+    DAGState dagState;
+    Map<TezVertexID, Boolean> vertexCommitStatus =
+        new HashMap<TezVertexID, Boolean>();
+    Map<String, Boolean> vertexGroupCommitStatus =
+        new HashMap<String, Boolean>();
+    List<HistoryEvent> bufferedSummaryEvents =
+        new ArrayList<HistoryEvent>();
+
+    DAGSummaryData(TezDAGID dagId) {
+      this.dagId = dagId;
+    }
+
+    void handleSummaryEvent(SummaryEventProto proto) throws IOException {
+      HistoryEventType eventType =
+          HistoryEventType.values()[proto.getEventType()];
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("[RECOVERY SUMMARY]"
+            + " dagId=" + proto.getDagId()
+            + ", timestamp=" + proto.getTimestamp()
+            + ", event=" + eventType);
+      }
+      switch (eventType) {
+        case DAG_SUBMITTED:
+          completed = false;
+          break;
+        case DAG_FINISHED:
+          completed = true;
+          dagCommitCompleted = true;
+          DAGFinishedEvent dagFinishedEvent = new DAGFinishedEvent();
+          dagFinishedEvent.fromSummaryProtoStream(proto);
+          dagState = dagFinishedEvent.getState();
+          break;
+        case DAG_COMMIT_STARTED:
+          dagCommitCompleted = false;
+          break;
+        case VERTEX_COMMIT_STARTED:
+          VertexCommitStartedEvent vertexCommitStartedEvent =
+              new VertexCommitStartedEvent();
+          vertexCommitStartedEvent.fromSummaryProtoStream(proto);
+          vertexCommitStatus.put(
+              vertexCommitStartedEvent.getVertexID(), false);
+          break;
+        case VERTEX_FINISHED:
+          VertexFinishedEvent vertexFinishedEvent =
+              new VertexFinishedEvent();
+          vertexFinishedEvent.fromSummaryProtoStream(proto);
+          if (vertexCommitStatus.containsKey(vertexFinishedEvent.getVertexID())) {
+            vertexCommitStatus.put(
+                vertexFinishedEvent.getVertexID(), true);
+            bufferedSummaryEvents.add(vertexFinishedEvent);
+          }
+          break;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
+              new VertexGroupCommitStartedEvent();
+          vertexGroupCommitStartedEvent.fromSummaryProtoStream(proto);
+          bufferedSummaryEvents.add(vertexGroupCommitStartedEvent);
+          vertexGroupCommitStatus.put(
+              vertexGroupCommitStartedEvent.getVertexGroupName(), false);
+          break;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
+              new VertexGroupCommitFinishedEvent();
+          vertexGroupCommitFinishedEvent.fromSummaryProtoStream(proto);
+          bufferedSummaryEvents.add(vertexGroupCommitFinishedEvent);
+          vertexGroupCommitStatus.put(
+              vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
+          break;
+      }
+    }
 
-  public DAG parseRecoveryData() throws IOException {
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("dagId=").append(dagId);
+      sb.append(", dagCompleted=").append(completed);
+      if (!vertexCommitStatus.isEmpty()) {
+        sb.append(", vertexCommitStatuses=[");
+        for (Entry<TezVertexID, Boolean> entry : vertexCommitStatus.entrySet()) {
+          sb.append("{ vertexId=").append(entry.getKey())
+              .append(", committed=").append(entry.getValue()).append("}, ");
+        }
+        sb.append("]");
+      }
+      if (!vertexGroupCommitStatus.isEmpty()) {
+        sb.append(", vertexGroupCommitStatuses=[");
+        for (Entry<String, Boolean> entry : vertexGroupCommitStatus.entrySet()) {
+          sb.append("{ vertexGroup=").append(entry.getKey())
+              .append(", committed=").append(entry.getValue()).append("}, ");
+        }
+        sb.append("]");
+      }
+      return sb.toString();
+    }
+  }
+
+  private String isDAGRecoverable(DAGSummaryData data) {
+    if (!data.dagCommitCompleted) {
+      return "DAG Commit was in progress, not recoverable"
+          + ", dagId=" + data.dagId;
+    }
+    if (!data.vertexCommitStatus.isEmpty()) {
+      for (Entry<TezVertexID, Boolean> entry : data.vertexCommitStatus.entrySet()) {
+        if (!(entry.getValue().booleanValue())) {
+          return "Vertex Commit was in progress, not recoverable"
+              + ", dagId=" + data.dagId
+              + ", vertexId=" + entry.getKey();
+        }
+      }
+    }
+    if (!data.vertexGroupCommitStatus.isEmpty()) {
+      for (Entry<String, Boolean> entry : data.vertexGroupCommitStatus.entrySet()) {
+        if (!(entry.getValue().booleanValue())) {
+          return "Vertex Group Commit was in progress, not recoverable"
+              + ", dagId=" + data.dagId
+              + ", vertexGroup=" + entry.getKey();
+        }
+      }
+    }
+    return null;
+  }
+
+  public RecoveredDAGData parseRecoveryData() throws IOException {
     Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir();
     LOG.info("Using " + previousAttemptRecoveryDataDir.toString()
         + " for recovering data from previous attempt");
@@ -330,8 +483,6 @@ public class RecoveryParser {
     FSDataOutputStream newSummaryStream =
         getSummaryOutputStream(newSummaryPath);
 
-    Map<TezDAGID, Boolean> seenDAGs = new TreeMap<TezDAGID, Boolean>();
-
     FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath);
     LOG.info("Parsing summary file"
         + ", path=" + summaryPath.toString()
@@ -339,6 +490,8 @@ public class RecoveryParser {
         + ", lastModTime=" + summaryFileStatus.getModificationTime());
 
     int dagCounter = 0;
+    Map<TezDAGID, DAGSummaryData> dagSummaryDataMap =
+        new HashMap<TezDAGID, DAGSummaryData>();
     while (summaryStream.available() > 0) {
       RecoveryProtos.SummaryEventProto proto =
           RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
@@ -354,11 +507,10 @@ public class RecoveryParser {
       if (dagCounter < dagId.getId()) {
         dagCounter = dagId.getId();
       }
-      if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
-        seenDAGs.put(dagId, false);
-      } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
-        seenDAGs.put(dagId, true);
+      if (!dagSummaryDataMap.containsKey(dagId)) {
+        dagSummaryDataMap.put(dagId, new DAGSummaryData(dagId));
       }
+      dagSummaryDataMap.get(dagId).handleSummaryEvent(proto);
       proto.writeDelimitedTo(newSummaryStream);
     }
     newSummaryStream.hsync();
@@ -367,12 +519,35 @@ public class RecoveryParser {
     // Set counter for next set of DAGs
     dagAppMaster.setDAGCounter(dagCounter);
 
-    TezDAGID lastInProgressDAG = getLastInProgressDAG(seenDAGs);
+    DAGSummaryData lastInProgressDAGData =
+        getLastCompletedOrInProgressDAG(dagSummaryDataMap);
+    if (lastInProgressDAGData == null) {
+      LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
+      return null;
+    }
+    TezDAGID lastInProgressDAG = lastInProgressDAGData.dagId;
     if (lastInProgressDAG == null) {
-      LOG.info("Nothing to recover as no uncompleted DAGs found");
+      LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
       return null;
     }
 
+    LOG.info("Checking if DAG is in recoverable state"
+        + ", dagId=" + lastInProgressDAGData.dagId);
+
+    final RecoveredDAGData recoveredDAGData = new RecoveredDAGData();
+    if (lastInProgressDAGData.completed) {
+      recoveredDAGData.isCompleted = true;
+      recoveredDAGData.dagState = lastInProgressDAGData.dagState;
+    }
+
+    String nonRecoverableReason = isDAGRecoverable(lastInProgressDAGData);
+    if (nonRecoverableReason != null) {
+      LOG.warn("Found last inProgress DAG but not recoverable: "
+          + lastInProgressDAGData);
+      recoveredDAGData.nonRecoverable = true;
+      recoveredDAGData.reason = nonRecoverableReason;
+    }
+
     LOG.info("Trying to recover dag from recovery file"
         + ", dagId=" + lastInProgressDAG.toString()
         + ", dataDir=" + previousAttemptRecoveryDataDir
@@ -387,14 +562,13 @@ public class RecoveryParser {
           + ", dagId=" + lastInProgressDAG);
     }
 
-    DAGImpl recoveredDAG = null;
-
     LOG.info("Copying DAG data into Current Attempt directory"
         + ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir,
         lastInProgressDAG));
     FSDataOutputStream newDAGRecoveryStream =
         getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);
 
+    boolean skipAllOtherEvents = false;
     while (dagRecoveryStream.available() > 0) {
       HistoryEvent event;
       try {
@@ -403,8 +577,9 @@ public class RecoveryParser {
         LOG.warn("Corrupt data found when trying to read next event", ioe);
         break;
       }
-      if (event == null) {
+      if (event == null || skipAllOtherEvents) {
         // reached end of data
+        event = null;
         break;
       }
       HistoryEventType eventType = event.getEventType();
@@ -414,9 +589,13 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
+          recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
               lastInProgressDAG);
-          dagAppMaster.setCurrentDAG(recoveredDAG);
+          recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
+          dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
+          if (recoveredDAGData.nonRecoverable) {
+            skipAllOtherEvents = true;
+          }
           break;
         }
         case DAG_INITIALIZED:
@@ -424,8 +603,8 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
-          recoveredDAG.restoreFromEvent(event);
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
           break;
         }
         case DAG_STARTED:
@@ -433,8 +612,8 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
-          recoveredDAG.restoreFromEvent(event);
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
           break;
         }
         case DAG_COMMIT_STARTED:
@@ -442,8 +621,26 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
-          recoveredDAG.restoreFromEvent(event);
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+          break;
+        }
+        case VERTEX_GROUP_COMMIT_STARTED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+          break;
+        }
+        case VERTEX_GROUP_COMMIT_FINISHED:
+        {
+          LOG.info("Recovering from event"
+              + ", eventType=" + eventType
+              + ", event=" + event.toString());
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
           break;
         }
         case DAG_FINISHED:
@@ -452,13 +649,16 @@ public class RecoveryParser {
               + ", eventType=" + eventType
               + ", event=" + event.toString());
           // If this is seen, nothing to recover
-          assert recoveredDAG != null;
-          recoveredDAG.restoreFromEvent(event);
-          return recoveredDAG;
+          assert recoveredDAGData.recoveredDAG != null;
+          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+          recoveredDAGData.isCompleted = true;
+          recoveredDAGData.dagState =
+              ((DAGFinishedEvent) event).getState();
+          skipAllOtherEvents = true;
         }
         case CONTAINER_LAUNCHED:
         {
-          // Nothing to do?
+          // Nothing to do for now
           break;
         }
         case VERTEX_INITIALIZED:
@@ -466,9 +666,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -477,9 +677,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexStartedEvent vEvent = (VertexStartedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -488,9 +688,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -499,9 +699,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -510,9 +710,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -521,9 +721,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           TaskStartedEvent tEvent = (TaskStartedEvent) event;
-          Task task = recoveredDAG.getVertex(
+          Task task = recoveredDAGData.recoveredDAG.getVertex(
               tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
           task.restoreFromEvent(tEvent);
           break;
@@ -533,9 +733,9 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
-          Task task = recoveredDAG.getVertex(
+          Task task = recoveredDAGData.recoveredDAG.getVertex(
               tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
           task.restoreFromEvent(tEvent);
           break;
@@ -545,10 +745,10 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
           Task task =
-              recoveredDAG.getVertex(
+              recoveredDAGData.recoveredDAG.getVertex(
                   tEvent.getTaskAttemptID().getTaskID().getVertexID())
                       .getTask(tEvent.getTaskAttemptID().getTaskID());
           task.restoreFromEvent(tEvent);
@@ -559,10 +759,10 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
           Task task =
-              recoveredDAG.getVertex(
+              recoveredDAGData.recoveredDAG.getVertex(
                   tEvent.getTaskAttemptID().getTaskID().getVertexID())
                   .getTask(tEvent.getTaskAttemptID().getTaskID());
           task.restoreFromEvent(tEvent);
@@ -573,10 +773,10 @@ public class RecoveryParser {
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAG != null;
+          assert recoveredDAGData.recoveredDAG != null;
           VertexDataMovementEventsGeneratedEvent vEvent =
               (VertexDataMovementEventsGeneratedEvent) event;
-          Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
           v.restoreFromEvent(vEvent);
           break;
         }
@@ -596,6 +796,42 @@ public class RecoveryParser {
     newDAGRecoveryStream.hsync();
     newDAGRecoveryStream.close();
 
+    if (!recoveredDAGData.isCompleted
+        && !recoveredDAGData.nonRecoverable) {
+      if (lastInProgressDAGData.bufferedSummaryEvents != null
+        && !lastInProgressDAGData.bufferedSummaryEvents.isEmpty()) {
+        for (HistoryEvent bufferedEvent : lastInProgressDAGData.bufferedSummaryEvents) {
+          assert recoveredDAGData.recoveredDAG != null;
+          switch (bufferedEvent.getEventType()) {
+            case VERTEX_GROUP_COMMIT_STARTED:
+              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
+              break;
+            case VERTEX_GROUP_COMMIT_FINISHED:
+              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
+              break;
+            case VERTEX_FINISHED:
+              VertexFinishedEvent vertexFinishedEvent =
+                  (VertexFinishedEvent) bufferedEvent;
+              Vertex vertex = recoveredDAGData.recoveredDAG.getVertex(
+                  vertexFinishedEvent.getVertexID());
+              if (vertex == null) {
+                recoveredDAGData.nonRecoverable = true;
+                recoveredDAGData.reason = "All state could not be recovered"
+                    + ", vertex completed but events not flushed"
+                    + ", vertexId=" + vertexFinishedEvent.getVertexID();
+              } else {
+                vertex.restoreFromEvent(vertexFinishedEvent);
+              }
+              break;
+            default:
+              throw new RuntimeException("Invalid data found in buffered summary events"
+                  + ", unknown event type "
+                  + bufferedEvent.getEventType());
+          }
+        }
+      }
+    }
+
     Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir,
         dataRecoveredFileFlag);
     LOG.info("Finished copying data from previous attempt into current attempt"
@@ -607,7 +843,7 @@ public class RecoveryParser {
     flagFile.hsync();
     flagFile.close();
 
-    return recoveredDAG;
+    return recoveredDAGData;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
new file mode 100644
index 0000000..e64ad13
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.records.TezDAGID;
+
+public class DAGEventRecoverEvent extends DAGEvent {
+  /** State DAGImpl adopts as its recoveredState when handling this event. */
+  private final DAGState desiredState;
+
+  public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState) {
+    super(dagId, DAGEventType.DAG_RECOVER);
+    this.desiredState = desiredState;
+  }
+  /** Returns the state that was passed to the constructor. */
+  public DAGState getDesiredState() {
+    return desiredState;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 432c189..8fc278f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -54,11 +55,11 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -73,32 +74,35 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.DAGReport;
 import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
-import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
-import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
@@ -357,6 +361,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
   private DAGState recoveredState = DAGState.NEW;
   private boolean recoveryCommitInProgress = false;
+  Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
 
   static class VertexGroupInfo {
     String groupName;
@@ -497,18 +502,21 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         recoveredState = DAGState.RUNNING;
         return recoveredState;
       case DAG_COMMIT_STARTED:
-        if (recoveredState != DAGState.RUNNING) {
-          throw new RuntimeException("Commit Started Event seen but"
-              + " recovered state is not RUNNING"
-              + ", recoveredState=" + recoveredState);
-        }
         recoveryCommitInProgress = true;
         return recoveredState;
+      case VERTEX_GROUP_COMMIT_STARTED:
+        VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
+            (VertexGroupCommitStartedEvent) historyEvent;
+        recoveredGroupCommits.put(
+            vertexGroupCommitStartedEvent.getVertexGroupName(), false);
+        return recoveredState;
+      case VERTEX_GROUP_COMMIT_FINISHED:
+        VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
+            (VertexGroupCommitFinishedEvent) historyEvent;
+        recoveredGroupCommits.put(
+            vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
+        return recoveredState;
       case DAG_FINISHED:
-        if (!recoveryStartEventSeen) {
-          throw new RuntimeException("Finished Event seen but"
-              + " no Start Event was encountered earlier");
-        }
         recoveryCommitInProgress = false;
         DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
         this.finishTime = finishedEvent.getFinishTime();
@@ -722,7 +730,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
       // commit all shared outputs
       appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
-          new DAGCommitStartedEvent(getID())));
+          new DAGCommitStartedEvent(getID(), clock.getTime())));
       for (VertexGroupInfo groupInfo : vertexGroups.values()) {
         if (failedWhileCommitting) {
           break;
@@ -1266,6 +1274,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     @Override
     public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+      if (dagEvent instanceof DAGEventRecoverEvent) {
+        // DAG completed or final end state known
+        DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent;
+        dag.recoveredState = recoverEvent.getDesiredState();
+      }
+
       switch (dag.recoveredState) {
         case NEW:
           // send DAG an Init and start events
@@ -1292,8 +1306,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         case RUNNING:
           // if commit is in progress, DAG should fail as commits are not
           // recoverable
-          if (dag.recoveryCommitInProgress) {
-            // Fail the DAG as we have not seen a completion
+          // Only a group commit with a start event but no finish event is
+          // unrecoverable; groups whose commit finished must not fail the DAG.
+          boolean groupCommitInProgress = false;
+          for (Entry<String, Boolean> entry : dag.recoveredGroupCommits.entrySet()) {
+            if (!entry.getValue().booleanValue()) {
+              LOG.info("Found a pending Vertex Group commit"
+                  + ", vertexGroup=" + entry.getKey());
+              groupCommitInProgress = true;
+            }
+          }
+
+          if (groupCommitInProgress || dag.recoveryCommitInProgress) {
+            // Fail the DAG as we have not seen a commit completion
             dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
             dag.setFinishTime();
             // Recover all other data for all vertices
@@ -1314,6 +1339,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
                 DAGState.FAILED));
             return DAGState.FAILED;
           }
+
           for (Vertex v : dag.vertices.values()) {
             if (v.getInputVerticesCount() == 0) {
               if (LOG.isDebugEnabled()) {
@@ -1596,10 +1622,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           }
         }
         for (VertexGroupInfo groupInfo : commitList) {
+          if (recoveredGroupCommits.containsKey(groupInfo.groupName)) {
+            LOG.info("VertexGroup was already committed as per recovery"
+                + " data, groupName=" + groupInfo.groupName);
+            continue;
+          }
           groupInfo.committed = true;
           Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
           appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
-              new DAGCommitStartedEvent(dagId)));
+              new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
+                  clock.getTime())));
           for (String outputName : groupInfo.outputs) {
             OutputCommitter committer = v.getOutputCommitters().get(outputName);
             LOG.info("Committing output: " + outputName);
@@ -1612,6 +1644,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           if (failedCommit) {
             break;
           }
+          appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+              new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName,
+                  clock.getTime())));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3ca9e11..7b3b6b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -94,6 +94,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -106,7 +107,6 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -131,9 +131,9 @@ import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -146,7 +146,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
 
-
 /** Implementation of Vertex interface. Maintains the state machines of Vertex.
  * The read and write calls use ReadWriteLock for concurrency.
  */
@@ -529,6 +528,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   
   private String logIdentifier;
   private boolean recoveryCommitInProgress = false;
+  private boolean summaryCompleteSeen = false;
+  private boolean hasCommitter = false;
+  private boolean vertexCompleteSeen = false;
   private Map<String,EdgeManagerDescriptor> recoveredSourceEdgeManagers = null;
 
   // Recovery related flags
@@ -905,20 +907,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         }
         return recoveredState;
       case VERTEX_COMMIT_STARTED:
-        if (recoveredState != VertexState.RUNNING) {
-          throw new RuntimeException("Commit Started Event seen but"
-              + " recovered state is not RUNNING"
-              + ", recoveredState=" + recoveredState);
-        }
         recoveryCommitInProgress = true;
+        hasCommitter = true;
         return recoveredState;
       case VERTEX_FINISHED:
-        if (!recoveryStartEventSeen) {
-          throw new RuntimeException("Finished Event seen but"
-              + " no Started Event was encountered earlier");
+        VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
+        if (finishedEvent.isFromSummary()) {
+          summaryCompleteSeen  = true;
+        } else {
+          vertexCompleteSeen = true;
         }
         recoveryCommitInProgress = false;
-        VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
         recoveredState = finishedEvent.getState();
         diagnostics.add(finishedEvent.getDiagnostics());
         finishTime = finishedEvent.getFinishTime();
@@ -1280,16 +1279,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
         LOG.info("Vertex succeeded: " + vertex.logIdentifier);
         try {
-          if (vertex.outputCommitters != null) {
-            vertex.appContext.getHistoryHandler().handle(
-                new DAGHistoryEvent(vertex.getDAGId(),
-                    new VertexCommitStartedEvent(vertex.vertexId)));
-          }
           if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
             // commit only once. Dont commit shared outputs
             LOG.info("Invoking committer commit for vertex, vertexId="
                 + vertex.logIdentifier);
-            if (vertex.outputCommitters != null) {
+            if (vertex.outputCommitters != null
+                && !vertex.outputCommitters.isEmpty()) {
+              boolean firstCommit = true;
               for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
                 final OutputCommitter committer = entry.getValue();
                 final String outputName = entry.getKey();
@@ -1297,6 +1293,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   // dont commit shared committers. Will be committed by the DAG
                   continue;
                 }
+                if (firstCommit) {
+                  // Log commit start event on first actual commit
+                  vertex.appContext.getHistoryHandler().handle(
+                      new DAGHistoryEvent(vertex.getDAGId(),
+                          new VertexCommitStartedEvent(vertex.vertexId,
+                              vertex.clock.getTime())));
+                  // Clear the flag so later committers do not re-log the event
+                  firstCommit = false;
+                }
                 vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
                   @Override
                   public Void run() throws Exception {
@@ -1806,31 +1811,44 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         case SUCCEEDED:
         case FAILED:
         case KILLED:
-          vertex.tasksNotYetScheduled = false;
-          // recover tasks
-          if (vertex.tasks != null) {
-            TaskState taskState = TaskState.KILLED;
-            switch (vertex.recoveredState) {
-              case SUCCEEDED:
-                taskState = TaskState.SUCCEEDED;
-                break;
-              case KILLED:
-                taskState = TaskState.KILLED;
-                break;
-              case FAILED:
-                taskState = TaskState.FAILED;
-                break;
-            }
-            for (Task task : vertex.tasks.values()) {
-              vertex.eventHandler.handle(
-                  new TaskEventRecoverTask(task.getTaskId(),
-                      taskState));
-            }
-            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
-            endState = VertexState.RUNNING;
+          if (vertex.recoveredState == VertexState.SUCCEEDED
+              && vertex.hasCommitter
+              && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) {
+            LOG.warn("Cannot recover vertex as all recovery events not"
+                + " found, vertex=" + vertex.logIdentifier
+                + ", hasCommitters=" + vertex.hasCommitter
+                + ", summaryCompletionSeen=" + vertex.summaryCompleteSeen
+                + ", finalCompletionSeen=" + vertex.vertexCompleteSeen);
+            vertex.finished(VertexState.FAILED,
+                VertexTerminationCause.COMMIT_FAILURE);
+            endState = VertexState.FAILED;
           } else {
-            endState = vertex.recoveredState;
-            vertex.finished(endState);
+            vertex.tasksNotYetScheduled = false;
+            // recover tasks
+            if (vertex.tasks != null) {
+              TaskState taskState = TaskState.KILLED;
+              switch (vertex.recoveredState) {
+                case SUCCEEDED:
+                  taskState = TaskState.SUCCEEDED;
+                  break;
+                case KILLED:
+                  taskState = TaskState.KILLED;
+                  break;
+                case FAILED:
+                  taskState = TaskState.FAILED;
+                  break;
+              }
+              for (Task task : vertex.tasks.values()) {
+                vertex.eventHandler.handle(
+                    new TaskEventRecoverTask(task.getTaskId(),
+                        taskState));
+              }
+              vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+              endState = VertexState.RUNNING;
+            } else {
+              endState = vertex.recoveredState;
+              vertex.finished(endState);
+            }
           }
           break;
         default:
@@ -2318,12 +2336,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             " asked to split by: " + originalSplitSource + 
             " but was already split by:" + vertex.originalOneToOneSplitSource);
       }
-      Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING, 
-          " Unexpected 1-1 split for vertex " + vertex.getVertexId() + 
-          " in state " + vertex.getState() + 
-          " . Split in vertex " + originalSplitSource + 
-          " sent by vertex " + splitEvent.getSenderVertex() +
-          " numTasks " + splitEvent.getNumTasks());
+      Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
+          " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
+              " in state " + vertex.getState() +
+              " . Split in vertex " + originalSplitSource +
+              " sent by vertex " + splitEvent.getSenderVertex() +
+              " numTasks " + splitEvent.getNumTasks());
       LOG.info("Splitting vertex " + vertex.getVertexId() + 
           " because of split in vertex " + originalSplitSource + 
           " sent by vertex " + splitEvent.getSenderVertex() +

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
index 78d1208..3f756c0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
@@ -18,16 +18,16 @@
 
 package org.apache.tez.dag.history;
 
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
 public interface HistoryEvent {
 
-  HistoryEventType getEventType();
+  public HistoryEventType getEventType();
 
   public JSONObject convertToATSJSON() throws JSONException;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index 7b2087a..219bfe3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -36,5 +36,7 @@ public enum HistoryEventType {
   CONTAINER_LAUNCHED,
   VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
   DAG_COMMIT_STARTED,
-  VERTEX_COMMIT_STARTED
+  VERTEX_COMMIT_STARTED,
+  VERTEX_GROUP_COMMIT_STARTED,
+  VERTEX_GROUP_COMMIT_FINISHED
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
index 587ee3e..eaae813 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
@@ -21,8 +21,19 @@ package org.apache.tez.dag.history;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+
 public interface SummaryEvent {
 
   public void toSummaryProtoStream(OutputStream outputStream) throws IOException;
 
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException;
+
+  /**
+   * Whether to write this event immediately to the DAG recovery file.
+   * Summary events are always written immediately to the summary file.
+   * @return true if this event should be written to the DAG recovery file immediately
+   */
+  public boolean writeToRecoveryImmediately();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index 6d5a769..627751a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -18,26 +18,31 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGCommitStartedProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.utils.ProtoUtils;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class DAGCommitStartedEvent implements HistoryEvent {
+public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
 
   private TezDAGID dagID;
+  private long commitStartTime;
 
   public DAGCommitStartedEvent() {
   }
 
-  public DAGCommitStartedEvent(TezDAGID dagID) {
+  public DAGCommitStartedEvent(TezDAGID dagID, long commitStartTime) {
     this.dagID = dagID;
+    this.commitStartTime = commitStartTime;
   }
 
   @Override
@@ -91,4 +96,21 @@ public class DAGCommitStartedEvent implements HistoryEvent {
     return dagID;
   }
 
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    ProtoUtils.toSummaryEventProto(dagID, commitStartTime,
+        getEventType()).writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.commitStartTime = proto.getTimestamp();
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 38e0702..14381b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -18,6 +18,10 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.app.dag.DAGState;
@@ -28,15 +32,15 @@ import org.apache.tez.dag.history.ats.EntityTypes;
 import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGFinishedProto;
-import org.apache.tez.dag.utils.ProtoUtils;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
 
 public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
 
@@ -166,8 +170,25 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
 
   @Override
   public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
-    ProtoUtils.toSummaryEventProto(dagID, finishTime,
-        HistoryEventType.DAG_FINISHED).writeDelimitedTo(outputStream);
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setTimestamp(finishTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(ByteString.copyFrom(Ints.toByteArray(state.ordinal())));
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.finishTime = proto.getTimestamp();
+    this.state = DAGState.values()[ // payload is the int ordinal of DAGState, as written by toSummaryProtoStream
+        Ints.fromByteArray(proto.getEventPayload().toByteArray())];
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return true;
   }
 
   public long getFinishTime() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 853bea7..af83dc8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -18,6 +18,10 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.records.DAGProtos;
@@ -29,16 +33,13 @@ import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGSubmittedProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.utils.ProtoUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-      public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
+public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
 
   private TezDAGID dagID;
   private long submitTime;
@@ -168,6 +169,17 @@ import java.io.OutputStream;
         HistoryEventType.DAG_SUBMITTED).writeDelimitedTo(outputStream);
   }
 
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    throw new UnsupportedOperationException("Cannot re-initialize event from"
+        + " summary stream");
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return true;
+  }
+
   public String getDAGName() {
     if (dagPlan != null && dagPlan.hasName()) {
       return dagPlan.getName();
@@ -190,4 +202,5 @@ import java.io.OutputStream;
   public long getSubmitTime() {
     return submitTime;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index 564f9ed..387bff1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -18,26 +18,33 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexCommitStartedProto;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import com.google.protobuf.ByteString;
 
-public class VertexCommitStartedEvent implements HistoryEvent {
+public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
 
   private TezVertexID vertexID;
+  private long commitStartTime;
 
   public VertexCommitStartedEvent() {
   }
 
-  public VertexCommitStartedEvent(TezVertexID vertexId) {
+  public VertexCommitStartedEvent(TezVertexID vertexId, long commitStartTime) {
     this.vertexID = vertexId;
+    this.commitStartTime = commitStartTime;
   }
 
   @Override
@@ -91,4 +98,28 @@ public class VertexCommitStartedEvent implements HistoryEvent {
     return this.vertexID;
   }
 
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(vertexID.getDAGId().toString())
+        .setTimestamp(commitStartTime)
+        .setEventType(getEventType().ordinal())
+        // Payload is the vertex id encoded explicitly as UTF-8, not in the platform default charset.
+        .setEventPayload(ByteString.copyFromUtf8(vertexID.toString()));
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    // Decode the vertex id payload explicitly as UTF-8 rather than with the platform default charset.
+    this.vertexID = TezVertexID.fromString(proto.getEventPayload().toStringUtf8());
+    this.commitStartTime = proto.getTimestamp();
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 2366eb1..6f07c91 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -18,25 +18,29 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.history.ats.EntityTypes;
 import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishStateProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishedProto;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class VertexFinishedEvent implements HistoryEvent {
+public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
 
   private TezVertexID vertexID;
   private String vertexName;
@@ -48,6 +52,7 @@ public class VertexFinishedEvent implements HistoryEvent {
   private VertexState state;
   private String diagnostics;
   private TezCounters tezCounters;
+  private boolean fromSummary = false;
 
   public VertexFinishedEvent(TezVertexID vertexId,
       String vertexName, long initRequestedTime, long initedTime, long startRequestedTime, long startedTime, long finishTime,
@@ -186,4 +191,40 @@ public class VertexFinishedEvent implements HistoryEvent {
   public TezCounters getTezCounters() {
     return tezCounters;
   }
+
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    VertexFinishStateProto finishStateProto =
+        VertexFinishStateProto.newBuilder()
+            .setState(state.ordinal())
+            .setVertexId(vertexID.toString())
+            .build();
+
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(vertexID.getDAGId().toString())
+        .setTimestamp(finishTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(finishStateProto.toByteString());
+    builder.build().writeDelimitedTo(outputStream);
+
+  }
+
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    VertexFinishStateProto finishStateProto =
+        VertexFinishStateProto.parseFrom(proto.getEventPayload());
+    this.vertexID = TezVertexID.fromString(finishStateProto.getVertexId());
+    this.state = VertexState.values()[finishStateProto.getState()]; // state is stored as the enum ordinal
+    this.finishTime = proto.getTimestamp();
+    this.fromSummary = true; // flags this event as rebuilt from the summary; reported by isFromSummary()
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  public boolean isFromSummary() {
+    return fromSummary;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
new file mode 100644
index 0000000..99a5288
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitFinishedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEvent {
+
+  private TezDAGID dagID;
+  private String vertexGroupName;
+  private long commitFinishTime;
+
+  public VertexGroupCommitFinishedEvent() {
+  }
+
+  public VertexGroupCommitFinishedEvent(TezDAGID dagID,
+      String vertexGroupName, long commitFinishTime) {
+    this.dagID = dagID;
+    this.vertexGroupName = vertexGroupName;
+    this.commitFinishTime = commitFinishTime;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED;
+  }
+
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  public VertexGroupCommitFinishedProto toProto() {
+    return VertexGroupCommitFinishedProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setVertexGroupName(vertexGroupName)
+        .build(); // commitFinishTime is not set here; toSummaryProtoStream sends it as the summary timestamp
+  }
+
+  public void fromProto(VertexGroupCommitFinishedProto proto) {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.vertexGroupName = proto.getVertexGroupName();
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexGroupCommitFinishedProto proto = VertexGroupCommitFinishedProto.parseDelimitedFrom(inputStream);
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "dagId=" + dagID
+        + ", vertexGroup=" + vertexGroupName;
+  }
+
+  public String getVertexGroupName() {
+    return this.vertexGroupName;
+  }
+
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setTimestamp(commitFinishTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(toProto().toByteString());
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    VertexGroupCommitFinishedProto vertexGroupCommitFinishedProto =
+        VertexGroupCommitFinishedProto.parseFrom(proto.getEventPayload());
+    fromProto(vertexGroupCommitFinishedProto);
+    this.commitFinishTime = proto.getTimestamp();
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
new file mode 100644
index 0000000..04d6276
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent {
+
+  private TezDAGID dagID;
+  private String vertexGroupName;
+  private long commitStartTime;
+
+  public VertexGroupCommitStartedEvent() {
+  }
+
+  public VertexGroupCommitStartedEvent(TezDAGID dagID,
+      String vertexGroupName, long commitStartTime) {
+    this.dagID = dagID;
+    this.vertexGroupName = vertexGroupName;
+    this.commitStartTime = commitStartTime;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_GROUP_COMMIT_STARTED;
+  }
+
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  public VertexGroupCommitStartedProto toProto() {
+    return VertexGroupCommitStartedProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setVertexGroupName(vertexGroupName)
+        .build(); // commitStartTime is not set here; toSummaryProtoStream sends it as the summary timestamp
+  }
+
+  public void fromProto(VertexGroupCommitStartedProto proto) {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.vertexGroupName = proto.getVertexGroupName();
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexGroupCommitStartedProto proto = VertexGroupCommitStartedProto.parseDelimitedFrom(inputStream);
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "dagId=" + dagID
+        + ", vertexGroup=" + vertexGroupName;
+  }
+
+  public String getVertexGroupName() {
+    return this.vertexGroupName;
+  }
+
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setTimestamp(commitStartTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(toProto().toByteString());
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    VertexGroupCommitStartedProto vertexGroupCommitStartedProto =
+        VertexGroupCommitStartedProto.parseFrom(proto.getEventPayload());
+    fromProto(vertexGroupCommitStartedProto);
+    this.commitStartTime = proto.getTimestamp();
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+}


Mime
View raw message