tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-1909. Remove need to copy over all events from attempt 1 to attempt 2 dir (zjffdu)
Date Fri, 10 Apr 2015 07:34:50 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 282e63af1 -> 3b59666fb


TEZ-1909. Remove need to copy over all events from attempt 1 to attempt 2 dir (zjffdu)

(cherry picked from commit 47f3e8325dad0124de74f6c1305d627046125e0d)

Conflicts:
	CHANGES.txt
	tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
	tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b59666f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b59666f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b59666f

Branch: refs/heads/branch-0.6
Commit: 3b59666fb2b2b9f988c8d39c2bd9127c14519d5b
Parents: 282e63a
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Wed Apr 8 13:53:32 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Fri Apr 10 15:34:30 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/dag/app/RecoveryParser.java  | 699 ++++++++-----------
 .../dag/history/recovery/RecoveryService.java   |   6 +-
 .../apache/tez/dag/app/TestRecoveryParser.java  | 205 +++++-
 .../org/apache/tez/test/TestAMRecovery.java     |  21 +-
 .../org/apache/tez/test/TestDAGRecovery.java    |  20 +-
 6 files changed, 538 insertions(+), 414 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3b59666f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e719e10..6d292d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1909. Remove need to copy over all events from attempt 1 to attempt 2 dir
   TEZ-2061. Tez UI: vertex id column and filter on tasks page should be changed to vertex name
   TEZ-2242. Refactor ShuffleVertexManager code
   TEZ-2205. Tez still tries to post to ATS when yarn.timeline-service.enabled=false.

http://git-wip-us.apache.org/repos/asf/tez/blob/3b59666f/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 220b5b5..767f036 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -84,8 +83,6 @@ public class RecoveryParser {
   private final int recoveryBufferSize;
   private final int currentAttemptId;
 
-  private static final String dataRecoveredFileFlag = "dataRecovered";
-
   public RecoveryParser(DAGAppMaster dagAppMaster,
       FileSystem recoveryFS,
       Path recoveryDataDir,
@@ -277,11 +274,6 @@ public class RecoveryParser {
     return TezCommonUtils.getSummaryRecoveryPath(attemptRrecoveryDataDir);
   }
 
-  private FSDataOutputStream getSummaryOutputStream(Path summaryPath)
-      throws IOException {
-    return recoveryFS.create(summaryPath, true, recoveryBufferSize);
-  }
-
   private FSDataInputStream getSummaryStream(Path summaryPath)
       throws IOException {
     if (!recoveryFS.exists(summaryPath)) {
@@ -296,24 +288,6 @@ public class RecoveryParser {
         dagID.toString() + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
   }
 
-  private FSDataInputStream getDAGRecoveryStream(Path recoveryDataDir,
-      TezDAGID dagID)
-      throws IOException {
-    Path dagRecoveryPath = getDAGRecoveryFilePath(recoveryDataDir, dagID);
-    if (!recoveryFS.exists(dagRecoveryPath)) {
-      return null;
-    }
-    return recoveryFS.open(dagRecoveryPath, recoveryBufferSize);
-  }
-
-  private FSDataOutputStream getDAGRecoveryOutputStream(Path recoveryDataDir,
-      TezDAGID dagID)
-      throws IOException {
-    Path dagRecoveryPath = new Path(recoveryDataDir,
-        dagID.toString() + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
-    return recoveryFS.create(dagRecoveryPath, true, recoveryBufferSize);
-  }
-
   @VisibleForTesting
   DAGSummaryData getLastCompletedOrInProgressDAG(
       Map<TezDAGID, DAGSummaryData> dagSummaryDataMap) {
@@ -340,65 +314,6 @@ public class RecoveryParser {
     return inProgressDAG;
   }
 
-  private Path getPreviousAttemptRecoveryDataDir() throws IOException {
-    LOG.info("Looking for the correct attempt directory to recover from");
-    int foundPreviousAttempt = -1;
-    for (int i = currentAttemptId - 1; i > 0; --i) {
-      Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i);
-      LOG.info("Looking at attempt directory, path=" + attemptPath);
-      Path fatalErrorOccurred = new Path(attemptPath,
-          RecoveryService.RECOVERY_FATAL_OCCURRED_DIR);
-      if (recoveryFS.exists(fatalErrorOccurred)) {
-        throw new IOException("Found that a fatal error occurred in"
-            + " recovery during previous attempt, foundFile="
-            + fatalErrorOccurred.toString());
-      }
-
-      Path dataRecoveredFile = new Path(attemptPath, dataRecoveredFileFlag);
-      try {
-        if (recoveryFS.exists(dataRecoveredFile)) {
-          LOG.info("Found data recovered file in attempt directory"
-              + ", dataRecoveredFile=" + dataRecoveredFile
-              + ", path=" + attemptPath);
-          foundPreviousAttempt = i;
-          break;
-        }
-        LOG.info("Skipping attempt directory as data recovered file does not exist"
-            + ", dataRecoveredFile=" + dataRecoveredFile
-            + ", path=" + attemptPath);
-      } catch (IOException e) {
-        LOG.warn("Exception when checking previous attempt dir for "
-            + dataRecoveredFile.toString(), e);
-      }
-    }
-    if (foundPreviousAttempt == -1) {
-      // Look for oldest summary file and use that
-      LOG.info("Did not find any attempt dir that had data recovered file."
-          + " Looking for oldest summary file");
-      for (int i = 1; i < currentAttemptId; ++i) {
-        Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i);
-        Path summaryPath = getSummaryPath(attemptPath);
-        if (recoveryFS.exists(summaryPath)) {
-          LOG.info("Found summary file in attempt directory"
-              + ", summaryFile=" + summaryPath
-              + ", path=" + attemptPath);
-          foundPreviousAttempt = i;
-          break;
-        }
-        LOG.info("Skipping attempt directory as no summary file found"
-            + ", summaryFile=" + summaryPath
-            + ", path=" + attemptPath);
-      }
-    }
-    if (foundPreviousAttempt == -1) {
-      LOG.info("Falling back to first attempt as no other recovered attempts"
-          + " found");
-      foundPreviousAttempt = 1;
-    }
-
-    return TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, foundPreviousAttempt);
-  }
-
   @VisibleForTesting
   static class DAGSummaryData {
 
@@ -525,75 +440,89 @@ public class RecoveryParser {
     return null;
   }
 
-  public RecoveredDAGData parseRecoveryData() throws IOException {
-    Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir();
-    LOG.info("Using " + previousAttemptRecoveryDataDir.toString()
-        + " for recovering data from previous attempt");
-    if (!recoveryFS.exists(previousAttemptRecoveryDataDir)) {
-      LOG.info("Nothing to recover as previous attempt data does not exist"
-          + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString());
-      createDataRecoveredFlagFile();
-      return null;
+  private List<Path> getSummaryFiles() throws IOException {
+    List<Path> summaryFiles = new ArrayList<Path>();
+    for (int i = 1; i < currentAttemptId; ++i) {
+      Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i);
+      Path fatalErrorOccurred = new Path(attemptPath,
+          RecoveryService.RECOVERY_FATAL_OCCURRED_DIR);
+      if (recoveryFS.exists(fatalErrorOccurred)) {
+        throw new IOException("Found that a fatal error occurred in"
+            + " recovery during previous attempt, foundFile="
+            + fatalErrorOccurred.toString());
+      }
+      Path summaryFile = getSummaryPath(attemptPath);
+      if (recoveryFS.exists(summaryFile)) {
+        summaryFiles.add(summaryFile);
+      }
     }
+    return summaryFiles;
+  }
 
-    Path summaryPath = getSummaryPath(previousAttemptRecoveryDataDir);
-    FSDataInputStream summaryStream = getSummaryStream(
-        summaryPath);
-    if (summaryStream == null) {
-      LOG.info("Nothing to recover as summary file does not exist"
-          + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString()
-          + ", summaryPath=" + summaryPath.toString());
-      createDataRecoveredFlagFile();
-      return null;
+  private List<Path> getDAGRecoveryFiles(TezDAGID dagId) throws IOException {
+    List<Path> recoveryFiles = new ArrayList<Path>();
+    for (int i = 1; i < currentAttemptId; ++i) {
+      Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i);
+      Path recoveryFile = getDAGRecoveryFilePath(attemptPath, dagId);
+      if (recoveryFS.exists(recoveryFile)) {
+        recoveryFiles.add(recoveryFile);
+      }
     }
+    return recoveryFiles;
+  }
 
-    Path newSummaryPath = getSummaryPath(currentAttemptRecoveryDataDir);
-    FSDataOutputStream newSummaryStream =
-        getSummaryOutputStream(newSummaryPath);
-
-    FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath);
-    LOG.info("Parsing summary file"
-        + ", path=" + summaryPath.toString()
-        + ", len=" + summaryFileStatus.getLen()
-        + ", lastModTime=" + summaryFileStatus.getModificationTime());
-
+  public RecoveredDAGData parseRecoveryData() throws IOException {
     int dagCounter = 0;
-
     Map<TezDAGID, DAGSummaryData> dagSummaryDataMap =
         new HashMap<TezDAGID, DAGSummaryData>();
-    while (true) {
-      RecoveryProtos.SummaryEventProto proto;
-      try {
-        proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
-        if (proto == null) {
+    List<Path> summaryFiles = getSummaryFiles();
+    for (Path summaryFile : summaryFiles) {
+      FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryFile);
+      LOG.info("Parsing summary file"
+          + ", path=" + summaryFile.toString()
+          + ", len=" + summaryFileStatus.getLen()
+          + ", lastModTime=" + summaryFileStatus.getModificationTime());
+      FSDataInputStream summaryStream = getSummaryStream(
+          summaryFile);
+      while (true) {
+        RecoveryProtos.SummaryEventProto proto;
+        try {
+          proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
+          if (proto == null) {
+            LOG.info("Reached end of summary stream");
+            break;
+          }
+        } catch (EOFException eof) {
           LOG.info("Reached end of summary stream");
           break;
         }
-      } catch (EOFException eof) {
-        LOG.info("Reached end of summary stream");
-        break;
-      }
-      HistoryEventType eventType =
-          HistoryEventType.values()[proto.getEventType()];
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("[RECOVERY SUMMARY]"
-            + " dagId=" + proto.getDagId()
-            + ", timestamp=" + proto.getTimestamp()
-            + ", event=" + eventType);
-      }
-      TezDAGID dagId = TezDAGID.fromString(proto.getDagId());
-      if (dagCounter < dagId.getId()) {
-        dagCounter = dagId.getId();
-      }
-      if (!dagSummaryDataMap.containsKey(dagId)) {
-        dagSummaryDataMap.put(dagId, new DAGSummaryData(dagId));
+        HistoryEventType eventType =
+            HistoryEventType.values()[proto.getEventType()];
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("[RECOVERY SUMMARY]"
+              + " dagId=" + proto.getDagId()
+              + ", timestamp=" + proto.getTimestamp()
+              + ", event=" + eventType);
+        }
+        TezDAGID dagId = TezDAGID.fromString(proto.getDagId());
+        if (dagId == null) {
+          throw new IOException("null dagId, summary records may be corrupted");
+        }
+        if (dagCounter < dagId.getId()) {
+          dagCounter = dagId.getId();
+        }
+        if (!dagSummaryDataMap.containsKey(dagId)) {
+          dagSummaryDataMap.put(dagId, new DAGSummaryData(dagId));
+        }
+        try {
+          dagSummaryDataMap.get(dagId).handleSummaryEvent(proto);
+        } catch (Exception e) {
+          // any exception when parsing protobuf
+          throw new IOException("Error when parsing summary event proto", e);
+        }
       }
-      dagSummaryDataMap.get(dagId).handleSummaryEvent(proto);
-      proto.writeDelimitedTo(newSummaryStream);
+      summaryStream.close();
     }
-    summaryStream.close();
-    newSummaryStream.hsync();
-    newSummaryStream.close();
 
     // Set counter for next set of DAGs & update dagNames Set in DAGAppMaster
     dagAppMaster.setDAGCounter(dagCounter);
@@ -631,263 +560,254 @@ public class RecoveryParser {
       recoveredDAGData.reason = nonRecoverableReason;
     }
 
-    LOG.info("Trying to recover dag from recovery file"
-        + ", dagId=" + lastInProgressDAG.toString()
-        + ", dataDir=" + previousAttemptRecoveryDataDir
-        + ", intoCurrentDir=" + currentAttemptRecoveryDataDir);
-
-    FSDataInputStream dagRecoveryStream = getDAGRecoveryStream(
-        previousAttemptRecoveryDataDir, lastInProgressDAG);
-    if (dagRecoveryStream == null) {
-      // Could not find data to recover
-      // Error out
-      throw new IOException("Could not find recovery data for last in progress DAG"
-          + ", dagId=" + lastInProgressDAG);
-    }
-
-    LOG.info("Copying DAG data into Current Attempt directory"
-        + ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir,
-        lastInProgressDAG));
-    FSDataOutputStream newDAGRecoveryStream =
-        getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);
-
+    List<Path> dagRecoveryFiles = getDAGRecoveryFiles(lastInProgressDAG);
     boolean skipAllOtherEvents = false;
-    while (true) {
-      HistoryEvent event;
-      try {
-        event = getNextEvent(dagRecoveryStream);
-        if (event == null) {
-          LOG.info("Reached end of dag recovery stream");
-          break;
-        }
-      } catch (EOFException eof) {
-        LOG.info("Reached end of dag recovery stream");
-        break;
-      } catch (IOException ioe) {
-        LOG.warn("Corrupt data found when trying to read next event", ioe);
-        break;
-      }
-      if (event == null || skipAllOtherEvents) {
-        // reached end of data
+    Path lastRecoveryFile = null;
+    for (Path dagRecoveryFile : dagRecoveryFiles) {
+      if (skipAllOtherEvents) {
+        LOG.warn("Other recovery files will be skipped due to error in the previous recovery file"
+            + lastRecoveryFile);
         break;
       }
-      HistoryEventType eventType = event.getEventType();
-      switch (eventType) {
-        case DAG_SUBMITTED:
-        {
-          DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event;
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(submittedEvent.getDAGPlan(),
-              lastInProgressDAG);
-          recoveredDAGData.cumulativeAdditionalResources = submittedEvent
-            .getCumulativeAdditionalLocalResources();
-          recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
-          dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
-          if (recoveredDAGData.nonRecoverable) {
-            skipAllOtherEvents = true;
+      lastRecoveryFile = dagRecoveryFile;
+      LOG.info("Trying to recover dag from recovery file"
+          + ", dagId=" + lastInProgressDAG.toString()
+          + ", dagRecoveryFile=" + dagRecoveryFile);
+      FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize);
+      while (true) {
+        HistoryEvent event;
+        try {
+          event = getNextEvent(dagRecoveryStream);
+          if (event == null) {
+            LOG.info("Reached end of dag recovery stream");
+            break;
           }
+        } catch (EOFException eof) {
+          LOG.info("Reached end of dag recovery stream");
           break;
-        }
-        case DAG_INITIALIZED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
-          break;
-        }
-        case DAG_STARTED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
-          break;
-        }
-        case DAG_COMMIT_STARTED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
-          break;
-        }
-        case VERTEX_GROUP_COMMIT_STARTED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
-          break;
-        }
-        case VERTEX_GROUP_COMMIT_FINISHED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
-          break;
-        }
-        case DAG_FINISHED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          // If this is seen, nothing to recover
-          assert recoveredDAGData.recoveredDAG != null;
-          recoveredDAGData.recoveredDAG.restoreFromEvent(event);
-          recoveredDAGData.isCompleted = true;
-          recoveredDAGData.dagState =
-              ((DAGFinishedEvent) event).getState();
-          skipAllOtherEvents = true;
-        }
-        case CONTAINER_LAUNCHED:
-        {
-          // Nothing to do for now
-          break;
-        }
-        case VERTEX_INITIALIZED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
-          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-          v.restoreFromEvent(vEvent);
-          break;
-        }
-        case VERTEX_STARTED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          VertexStartedEvent vEvent = (VertexStartedEvent) event;
-          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-          v.restoreFromEvent(vEvent);
-          break;
-        }
-        case VERTEX_PARALLELISM_UPDATED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
-          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-          v.restoreFromEvent(vEvent);
-          break;
-        }
-        case VERTEX_COMMIT_STARTED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
-          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-          v.restoreFromEvent(vEvent);
-          break;
-        }
-        case VERTEX_FINISHED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
-          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-          v.restoreFromEvent(vEvent);
-          break;
-        }
-        case TASK_STARTED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          TaskStartedEvent tEvent = (TaskStartedEvent) event;
-          Task task = recoveredDAGData.recoveredDAG.getVertex(
-              tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
-          task.restoreFromEvent(tEvent);
+        } catch (IOException ioe) {
+          LOG.warn("Corrupt data found when trying to read next event", ioe);
           break;
         }
-        case TASK_FINISHED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
-          Task task = recoveredDAGData.recoveredDAG.getVertex(
-              tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
-          task.restoreFromEvent(tEvent);
+        if (skipAllOtherEvents) {
+          // hit an error - skip reading other events
           break;
         }
-        case TASK_ATTEMPT_STARTED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
-          Task task =
-              recoveredDAGData.recoveredDAG.getVertex(
-                  tEvent.getTaskAttemptID().getTaskID().getVertexID())
-                      .getTask(tEvent.getTaskAttemptID().getTaskID());
-          task.restoreFromEvent(tEvent);
-          break;
-        }
-        case TASK_ATTEMPT_FINISHED:
-        {
-          LOG.info("Recovering from event"
-              + ", eventType=" + eventType
-              + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
-          Task task =
-              recoveredDAGData.recoveredDAG.getVertex(
-                  tEvent.getTaskAttemptID().getTaskID().getVertexID())
-                  .getTask(tEvent.getTaskAttemptID().getTaskID());
-          task.restoreFromEvent(tEvent);
-          break;
+        HistoryEventType eventType = event.getEventType();
+        switch (eventType) {
+          case DAG_SUBMITTED:
+          {
+            DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event;
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(submittedEvent.getDAGPlan(),
+                lastInProgressDAG);
+            recoveredDAGData.cumulativeAdditionalResources = submittedEvent
+              .getCumulativeAdditionalLocalResources();
+            recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
+            dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
+            if (recoveredDAGData.nonRecoverable) {
+              skipAllOtherEvents = true;
+            }
+            break;
+          }
+          case DAG_INITIALIZED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+            break;
+          }
+          case DAG_STARTED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+            break;
+          }
+          case DAG_COMMIT_STARTED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+            break;
+          }
+          case VERTEX_GROUP_COMMIT_STARTED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+            break;
+          }
+          case VERTEX_GROUP_COMMIT_FINISHED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+            break;
+          }
+          case DAG_FINISHED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            // If this is seen, nothing to recover
+            assert recoveredDAGData.recoveredDAG != null;
+            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+            recoveredDAGData.isCompleted = true;
+            recoveredDAGData.dagState =
+                ((DAGFinishedEvent) event).getState();
+            skipAllOtherEvents = true;
+            break;
+          }
+          case CONTAINER_LAUNCHED:
+          {
+            // Nothing to do for now
+            break;
+          }
+          case VERTEX_INITIALIZED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
+            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
+            v.restoreFromEvent(vEvent);
+            break;
+          }
+          case VERTEX_STARTED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            VertexStartedEvent vEvent = (VertexStartedEvent) event;
+            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
+            v.restoreFromEvent(vEvent);
+            break;
+          }
+          case VERTEX_PARALLELISM_UPDATED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
+            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
+            v.restoreFromEvent(vEvent);
+            break;
+          }
+          case VERTEX_COMMIT_STARTED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
+            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
+            v.restoreFromEvent(vEvent);
+            break;
+          }
+          case VERTEX_FINISHED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
+            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
+            v.restoreFromEvent(vEvent);
+            break;
+          }
+          case TASK_STARTED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            TaskStartedEvent tEvent = (TaskStartedEvent) event;
+            Task task = recoveredDAGData.recoveredDAG.getVertex(
+                tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
+            task.restoreFromEvent(tEvent);
+            break;
+          }
+          case TASK_FINISHED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
+            Task task = recoveredDAGData.recoveredDAG.getVertex(
+                tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
+            task.restoreFromEvent(tEvent);
+            break;
+          }
+          case TASK_ATTEMPT_STARTED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
+            Task task =
+                recoveredDAGData.recoveredDAG.getVertex(
+                    tEvent.getTaskAttemptID().getTaskID().getVertexID())
+                        .getTask(tEvent.getTaskAttemptID().getTaskID());
+            task.restoreFromEvent(tEvent);
+            break;
+          }
+          case TASK_ATTEMPT_FINISHED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
+            Task task =
+                recoveredDAGData.recoveredDAG.getVertex(
+                    tEvent.getTaskAttemptID().getTaskID().getVertexID())
+                    .getTask(tEvent.getTaskAttemptID().getTaskID());
+            task.restoreFromEvent(tEvent);
+            break;
+          }
+          case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            assert recoveredDAGData.recoveredDAG != null;
+            VertexRecoverableEventsGeneratedEvent vEvent =
+                (VertexRecoverableEventsGeneratedEvent) event;
+            Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
+            v.restoreFromEvent(vEvent);
+            break;
+          }
+          default:
+            throw new RuntimeException("Invalid data found, unknown event type "
+                + eventType);
         }
-        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-        {
-          LOG.info("Recovering from event"
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("[DAG RECOVERY]"
+              + " dagId=" + lastInProgressDAG
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          assert recoveredDAGData.recoveredDAG != null;
-          VertexRecoverableEventsGeneratedEvent vEvent =
-              (VertexRecoverableEventsGeneratedEvent) event;
-          Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
-          v.restoreFromEvent(vEvent);
-          break;
         }
-        default:
-          throw new RuntimeException("Invalid data found, unknown event type "
-              + eventType);
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("[DAG RECOVERY]"
-            + " dagId=" + lastInProgressDAG
-            + ", eventType=" + eventType
-            + ", event=" + event.toString());
       }
-      newDAGRecoveryStream.writeInt(eventType.ordinal());
-      event.toProtoStream(newDAGRecoveryStream);
+      dagRecoveryStream.close();
     }
-    dagRecoveryStream.close();
-    newDAGRecoveryStream.hsync();
-    newDAGRecoveryStream.close();
 
     if (!recoveredDAGData.isCompleted
         && !recoveredDAGData.nonRecoverable) {
@@ -925,18 +845,7 @@ public class RecoveryParser {
       }
     }
 
-    LOG.info("Finished copying data from previous attempt into current attempt");
-    createDataRecoveredFlagFile();
-
     return recoveredDAGData;
   }
 
-  private void createDataRecoveredFlagFile() throws IOException {
-    Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir,
-        dataRecoveredFileFlag);
-    LOG.info("Trying to create data recovered flag file"
-        + ", filePath=" + dataCopiedFlagPath.toString());
-    recoveryFS.mkdirs(dataCopiedFlagPath);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/3b59666f/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 365c6a6..3db5365 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -81,10 +81,12 @@ public class RecoveryService extends AbstractService {
   private final Object lock = new Object();
   private FileSystem recoveryDirFS; // FS where staging dir exists
   Path recoveryPath;
-  Map<TezDAGID, FSDataOutputStream> outputStreamMap = new
+  @VisibleForTesting
+  public Map<TezDAGID, FSDataOutputStream> outputStreamMap = new
       HashMap<TezDAGID, FSDataOutputStream>();
   private int bufferSize;
-  private FSDataOutputStream summaryStream;
+  @VisibleForTesting
+  public FSDataOutputStream summaryStream;
   private int unflushedEventsCount = 0;
   private long lastFlushTime = -1;
   private int maxUnflushedEvents;

http://git-wip-us.apache.org/repos/asf/tez/blob/3b59666f/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index 41ea071..16c3a38 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -20,14 +20,29 @@ package org.apache.tez.dag.app;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.RecoveryParser.DAGSummaryData;
+import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.TestDAGImpl;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.recovery.RecoveryService;
 import org.apache.tez.dag.records.TezDAGID;
 import org.junit.*;
 
@@ -40,14 +55,25 @@ public class TestRecoveryParser {
       + TestRecoveryParser.class.getName() + "-tmpDir";
 
   private RecoveryParser parser;
+  private FileSystem localFS;
+  private Configuration conf;
+  private Path recoveryPath;
+  private DAGAppMaster mockAppMaster;
+  private DAGImpl mockDAGImpl;
 
   @Before
   public void setUp() throws IllegalArgumentException, IOException {
-    DAGAppMaster mockAppMaster = mock(DAGAppMaster.class);
+    this.conf = new Configuration();
+    this.localFS = FileSystem.getLocal(conf);
+    this.recoveryPath = new Path(TEST_ROOT_DIR + "/recovery");
+    this.localFS.delete(new Path(TEST_ROOT_DIR), true);
+    mockAppMaster = mock(DAGAppMaster.class);
+    mockAppMaster.dagNames = new HashSet<String>();
+    mockAppMaster.dagIDs = new HashSet<String>();
     when(mockAppMaster.getConfig()).thenReturn(new Configuration());
-    parser =
-        new RecoveryParser(mockAppMaster, mock(FileSystem.class), new Path(
-            TEST_ROOT_DIR), 3);
+    mockDAGImpl = mock(DAGImpl.class);
+    when(mockAppMaster.createDAG(any(DAGPlan.class), any(TezDAGID.class))).thenReturn(mockDAGImpl);
+    parser = new RecoveryParser(mockAppMaster, localFS, recoveryPath, 3);
   }
 
   private DAGSummaryData createDAGSummaryData(TezDAGID dagId, boolean completed) {
@@ -92,4 +118,175 @@ public class TestRecoveryParser {
         parser.getLastCompletedOrInProgressDAG(summaryDataMap);
     assertEquals(lastInProgressDAGId, lastInProgressDAG.dagId.getId());
   }
+
+  // skipAllOtherEvents because the DAG is non-recoverable (a DAG commit was in progress)
+  @Test(timeout = 5000)
+  public void testSkipAllOtherEvents_1() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+
+    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+    // write recovery data in attempt_1
+    RecoveryService rService = new RecoveryService(appContext);
+    Configuration conf = new Configuration();
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    rService.init(conf);
+    rService.start();
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+            null, "user", new Configuration())));
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
+    // for testing only: DAGCommitStartedEvent would not normally be logged at this point.
+    rService.handle(new DAGHistoryEvent(dagID,new DAGCommitStartedEvent(dagID, System.currentTimeMillis())));
+    rService.stop();
+
+    // write recovery data in attempt_2
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/2"));
+    rService = new RecoveryService(appContext);
+    rService.init(conf);
+    rService.start();
+    // for testing only: DAGStartedEvent would not normally be logged at this point.
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGStartedEvent(dagID, 1L, "user", "dag1")));
+    rService.stop();
+
+    RecoveredDAGData dagData = parser.parseRecoveryData();
+    assertEquals(true, dagData.nonRecoverable);
+    assertTrue(dagData.reason.contains("DAG Commit was in progress, not recoverable,"));
+    // DAGSubmittedEvent is handled, but DAGInitializedEvent (attempt_1) and DAGStartedEvent (attempt_2) are both skipped
+    // because the DAG is not recoverable.
+    verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class));
+    verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGInitializedEvent.class));
+    verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGStartedEvent.class));
+  }
+
+  // skipAllOtherEvents because the DAG has already finished
+  @Test (timeout = 5000)
+  public void testSkipAllOtherEvents_2() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+
+    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+    // write recovery data in attempt_1
+    RecoveryService rService = new RecoveryService(appContext);
+    Configuration conf = new Configuration();
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    rService.init(conf);
+    rService.start();
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+            null, "user", new Configuration())));
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGFinishedEvent(dagID, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null)));
+    rService.handle(new DAGHistoryEvent(dagID, new DAGStartedEvent(dagID, 1L, "user", "dag1")));
+    rService.stop();
+
+    // write recovery data in attempt_2
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/2"));
+    rService = new RecoveryService(appContext);
+    rService.init(conf);
+    rService.start();
+    rService.handle(new DAGHistoryEvent(dagID,
+       new DAGStartedEvent(dagID, 1L, "user", "dag1")));
+    rService.stop();
+
+    RecoveredDAGData dagData = parser.parseRecoveryData();
+    assertEquals(false, dagData.nonRecoverable);
+    assertEquals(DAGState.FAILED, dagData.dagState);
+    assertEquals(true, dagData.isCompleted);
+    // DAGSubmittedEvent, DAGInitializedEvent and DAGFinishedEvent are handled
+    verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class));
+    verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGInitializedEvent.class));
+    verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGFinishedEvent.class));
+    // DAGStartedEvent is skipped because it comes after DAGFinishedEvent
+    verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGStartedEvent.class));
+  }
+
+  @Test(timeout = 5000)
+  public void testLastCorruptedRecoveryRecord() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+
+    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+    // write recovery data in attempt_1
+    RecoveryService rService = new RecoveryService(appContext);
+    Configuration conf = new Configuration();
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    rService.init(conf);
+    rService.start();
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+            null, "user", new Configuration())));
+    // wait until the DAGSubmittedEvent is handled by the recovery event-handling thread
+    rService.await();
+    rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA");
+    rService.stop();
+
+    // write recovery data in attempt_2
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/2"));
+    rService = new RecoveryService(appContext);
+    rService.init(conf);
+    rService.start();
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
+    rService.await();
+    rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA");
+    rService.stop();
+
+    // the corrupted trailing record in each attempt's log is skipped, but all recovery logs are still read
+    RecoveredDAGData dagData = parser.parseRecoveryData();
+    assertEquals(false, dagData.isCompleted);
+    assertEquals(null, dagData.reason);
+    assertEquals(false, dagData.nonRecoverable);
+    // verify DAGSubmittedEvent & DAGInitializedEvent are handled.
+    verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class));
+    verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGInitializedEvent.class));
+  }
+
+  @Test(timeout = 5000)
+  public void testLastCorruptedSummaryRecord() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+
+    // write recovery data in attempt_1
+    RecoveryService rService = new RecoveryService(appContext);
+    Configuration conf = new Configuration();
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    rService.init(conf);
+    rService.start();
+
+    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+    // write a DAGSubmittedEvent first so that summaryStream gets initialized
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+            null, "user", new Configuration())));
+    // append a corrupted summary record
+    rService.summaryStream.writeChars("INVALID_DATA");
+    rService.stop();
+
+    try {
+      // a corrupted summary record makes recovery fail (an exception is expected here)
+      parser.parseRecoveryData();
+      fail();
+    } catch (IOException e) {
+      // expected: parsing the corrupted protobuf summary record throws IOException
+      e.printStackTrace();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/3b59666f/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 321c15c..b5877bb 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -468,13 +468,20 @@ public class TestAMRecovery {
     Path recoveryDataDir =
         TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf);
     FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf);
-    Path currentAttemptRecoveryDataDir =
-        TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, attemptNum);
-    Path recoveryFilePath =
-        new Path(currentAttemptRecoveryDataDir, appId.toString().replace(
-            "application", "dag")
-            + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
-    return RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath));
+    List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
+    for (int i=1; i <= attemptNum; ++i) {
+      Path currentAttemptRecoveryDataDir =
+          TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i);
+      Path recoveryFilePath =
+          new Path(currentAttemptRecoveryDataDir, appId.toString().replace(
+              "application", "dag")
+              + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+      if (fs.exists(recoveryFilePath)) {
+        LOG.info("read recovery file:" + recoveryFilePath);
+        historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath)));
+      }
+    }
+    return historyEvents;
   }
 
   private void printHistoryEvents(List<HistoryEvent> historyEvents, int attemptId) {

http://git-wip-us.apache.org/repos/asf/tez/blob/3b59666f/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 35e6dcd..609929e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -57,6 +57,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
@@ -181,12 +182,19 @@ public class TestDAGRecovery {
     Path recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf);
 
     FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf);
-    for (int i=1; i<=3; ++i) {
-      Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,i);
-      Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir,
-      appId.toString().replace("application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
-      List<HistoryEvent> historyEvents = RecoveryParser.parseDAGRecoveryFile(
-          fs.open(recoveryFilePath));
+    // verify recovery logs in each attempt
+    for (int attemptNum=1; attemptNum<=3; ++attemptNum) {
+      List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
+      // read the recovery logs for current attempt
+      // since dag recovery logs is dispersed in each attempt's recovery directory,
+      // so need to read recovery logs from the first attempt to current attempt
+      for (int i=1 ;i<=attemptNum;++i) {
+        Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,i);
+        Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir,
+        appId.toString().replace("application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+        historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(
+            fs.open(recoveryFilePath)));
+      }
 
       int inputInfoEventIndex = -1;
       int vertexInitedEventIndex = -1;


Mime
View raw message