Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6D2AC17500 for ; Thu, 9 Apr 2015 20:33:04 +0000 (UTC) Received: (qmail 53182 invoked by uid 500); 9 Apr 2015 20:32:58 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 53084 invoked by uid 500); 9 Apr 2015 20:32:58 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 53029 invoked by uid 99); 9 Apr 2015 20:32:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Apr 2015 20:32:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D0477E00E4; Thu, 9 Apr 2015 20:32:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Thu, 09 Apr 2015 20:32:58 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/26] tez git commit: TEZ-1909. Remove need to copy over all events from attempt 1 to attempt 2 dir (zjffdu) TEZ-1909. Remove need to copy over all events from attempt 1 to attempt 2 dir (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/47f3e832 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/47f3e832 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/47f3e832 Branch: refs/heads/TEZ-2003 Commit: 47f3e8325dad0124de74f6c1305d627046125e0d Parents: 74b400b Author: Jeff Zhang Authored: Wed Apr 8 13:53:32 2015 +0800 Committer: Jeff Zhang Committed: Wed Apr 8 13:53:32 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/dag/app/RecoveryParser.java | 698 ++++++++----------- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 1 - .../dag/history/recovery/RecoveryService.java | 6 +- .../apache/tez/dag/app/TestRecoveryParser.java | 205 +++++- .../org/apache/tez/test/TestAMRecovery.java | 21 +- .../org/apache/tez/test/TestDAGRecovery.java | 20 +- 7 files changed, 537 insertions(+), 415 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/47f3e832/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a5c46cc..db1f9a8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-1909. Remove need to copy over all events from attempt 1 to attempt 2 dir TEZ-2223. TestMockDAGAppMaster fails due to TEZ-2210 on mac. TEZ-2236. Tez UI: Support loading of all tasks in the dag tasks page TEZ-2159. Tez UI: download timeline data for offline use. http://git-wip-us.apache.org/repos/asf/tez/blob/47f3e832/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java index 21683b0..8d1005f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java @@ -30,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -84,8 +83,6 @@ public class RecoveryParser { private final int recoveryBufferSize; private final int currentAttemptId; - private static final String dataRecoveredFileFlag = "dataRecovered"; - public RecoveryParser(DAGAppMaster dagAppMaster, FileSystem recoveryFS, Path recoveryDataDir, @@ -277,11 +274,6 @@ public class RecoveryParser { return TezCommonUtils.getSummaryRecoveryPath(attemptRrecoveryDataDir); } - private FSDataOutputStream getSummaryOutputStream(Path summaryPath) - throws IOException { - return recoveryFS.create(summaryPath, true, recoveryBufferSize); - } - private FSDataInputStream getSummaryStream(Path summaryPath) throws IOException { if (!recoveryFS.exists(summaryPath)) { @@ -296,24 +288,6 @@ public class RecoveryParser { dagID.toString() + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); } - private FSDataInputStream getDAGRecoveryStream(Path recoveryDataDir, - TezDAGID dagID) - throws IOException { - Path dagRecoveryPath = getDAGRecoveryFilePath(recoveryDataDir, dagID); - if (!recoveryFS.exists(dagRecoveryPath)) { - return null; - } - return recoveryFS.open(dagRecoveryPath, recoveryBufferSize); - } - - private FSDataOutputStream getDAGRecoveryOutputStream(Path recoveryDataDir, - TezDAGID dagID) - throws IOException { - Path dagRecoveryPath = new Path(recoveryDataDir, - dagID.toString() + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); - return recoveryFS.create(dagRecoveryPath, true, recoveryBufferSize); - } - @VisibleForTesting DAGSummaryData getLastCompletedOrInProgressDAG( Map dagSummaryDataMap) { @@ -340,65 +314,6 @@ public class RecoveryParser { return inProgressDAG; } - private Path getPreviousAttemptRecoveryDataDir() throws IOException { - LOG.info("Looking for the correct attempt directory to recover from"); - int foundPreviousAttempt = -1; - for (int i = currentAttemptId - 1; i > 0; --i) { - Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i); - LOG.info("Looking at attempt directory, path=" + attemptPath); - Path fatalErrorOccurred = new Path(attemptPath, - RecoveryService.RECOVERY_FATAL_OCCURRED_DIR); - if (recoveryFS.exists(fatalErrorOccurred)) { - throw new IOException("Found that a fatal error occurred in" - + " recovery during previous attempt, foundFile=" - + fatalErrorOccurred.toString()); - } - - Path dataRecoveredFile = new Path(attemptPath, dataRecoveredFileFlag); - try { - if (recoveryFS.exists(dataRecoveredFile)) { - LOG.info("Found data recovered file in attempt directory" - + ", dataRecoveredFile=" + dataRecoveredFile - + ", path=" + attemptPath); - foundPreviousAttempt = i; - break; - } - LOG.info("Skipping attempt directory as data recovered file does not exist" - + ", dataRecoveredFile=" + dataRecoveredFile - + ", path=" + attemptPath); - } catch (IOException e) { - LOG.warn("Exception when checking previous attempt dir for " - + dataRecoveredFile.toString(), e); - } - } - if (foundPreviousAttempt == -1) { - // Look for oldest summary file and use that - LOG.info("Did not find any attempt dir that had data recovered file." - + " Looking for oldest summary file"); - for (int i = 1; i < currentAttemptId; ++i) { - Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i); - Path summaryPath = getSummaryPath(attemptPath); - if (recoveryFS.exists(summaryPath)) { - LOG.info("Found summary file in attempt directory" - + ", summaryFile=" + summaryPath - + ", path=" + attemptPath); - foundPreviousAttempt = i; - break; - } - LOG.info("Skipping attempt directory as no summary file found" - + ", summaryFile=" + summaryPath - + ", path=" + attemptPath); - } - } - if (foundPreviousAttempt == -1) { - LOG.info("Falling back to first attempt as no other recovered attempts" - + " found"); - foundPreviousAttempt = 1; - } - - return TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, foundPreviousAttempt); - } - @VisibleForTesting static class DAGSummaryData { @@ -529,75 +444,89 @@ public class RecoveryParser { return null; } - public RecoveredDAGData parseRecoveryData() throws IOException { - Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir(); - LOG.info("Using " + previousAttemptRecoveryDataDir.toString() - + " for recovering data from previous attempt"); - if (!recoveryFS.exists(previousAttemptRecoveryDataDir)) { - LOG.info("Nothing to recover as previous attempt data does not exist" - + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString()); - createDataRecoveredFlagFile(); - return null; + private List getSummaryFiles() throws IOException { + List summaryFiles = new ArrayList(); + for (int i = 1; i < currentAttemptId; ++i) { + Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i); + Path fatalErrorOccurred = new Path(attemptPath, + RecoveryService.RECOVERY_FATAL_OCCURRED_DIR); + if (recoveryFS.exists(fatalErrorOccurred)) { + throw new IOException("Found that a fatal error occurred in" + + " recovery during previous attempt, foundFile=" + + fatalErrorOccurred.toString()); + } + Path summaryFile = getSummaryPath(attemptPath); + if (recoveryFS.exists(summaryFile)) { + summaryFiles.add(summaryFile); + } } + return summaryFiles; + } - Path summaryPath = getSummaryPath(previousAttemptRecoveryDataDir); - FSDataInputStream summaryStream = getSummaryStream( - summaryPath); - if (summaryStream == null) { - LOG.info("Nothing to recover as summary file does not exist" - + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString() - + ", summaryPath=" + summaryPath.toString()); - createDataRecoveredFlagFile(); - return null; + private List getDAGRecoveryFiles(TezDAGID dagId) throws IOException { + List recoveryFiles = new ArrayList(); + for (int i = 1; i < currentAttemptId; ++i) { + Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i); + Path recoveryFile = getDAGRecoveryFilePath(attemptPath, dagId); + if (recoveryFS.exists(recoveryFile)) { + recoveryFiles.add(recoveryFile); + } } + return recoveryFiles; + } - Path newSummaryPath = getSummaryPath(currentAttemptRecoveryDataDir); - FSDataOutputStream newSummaryStream = - getSummaryOutputStream(newSummaryPath); - - FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath); - LOG.info("Parsing summary file" - + ", path=" + summaryPath.toString() - + ", len=" + summaryFileStatus.getLen() - + ", lastModTime=" + summaryFileStatus.getModificationTime()); - + public RecoveredDAGData parseRecoveryData() throws IOException { int dagCounter = 0; - Map dagSummaryDataMap = new HashMap(); - while (true) { - RecoveryProtos.SummaryEventProto proto; - try { - proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream); - if (proto == null) { + List summaryFiles = getSummaryFiles(); + for (Path summaryFile : summaryFiles) { + FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryFile); + LOG.info("Parsing summary file" + + ", path=" + summaryFile.toString() + + ", len=" + summaryFileStatus.getLen() + + ", lastModTime=" + summaryFileStatus.getModificationTime()); + FSDataInputStream summaryStream = getSummaryStream( + summaryFile); + while (true) { + RecoveryProtos.SummaryEventProto proto; + try { + proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream); + if (proto == null) { + LOG.info("Reached end of summary stream"); + break; + } + } catch (EOFException eof) { LOG.info("Reached end of summary stream"); break; } - } catch (EOFException eof) { - LOG.info("Reached end of summary stream"); - break; - } - HistoryEventType eventType = - HistoryEventType.values()[proto.getEventType()]; - if (LOG.isDebugEnabled()) { - LOG.debug("[RECOVERY SUMMARY]" - + " dagId=" + proto.getDagId() - + ", timestamp=" + proto.getTimestamp() - + ", event=" + eventType); - } - TezDAGID dagId = TezDAGID.fromString(proto.getDagId()); - if (dagCounter < dagId.getId()) { - dagCounter = dagId.getId(); - } - if (!dagSummaryDataMap.containsKey(dagId)) { - dagSummaryDataMap.put(dagId, new DAGSummaryData(dagId)); + HistoryEventType eventType = + HistoryEventType.values()[proto.getEventType()]; + if (LOG.isDebugEnabled()) { + LOG.debug("[RECOVERY SUMMARY]" + + " dagId=" + proto.getDagId() + + ", timestamp=" + proto.getTimestamp() + + ", event=" + eventType); + } + TezDAGID dagId = TezDAGID.fromString(proto.getDagId()); + if (dagId == null) { + throw new IOException("null dagId, summary records may be corrupted"); + } + if (dagCounter < dagId.getId()) { + dagCounter = dagId.getId(); + } + if (!dagSummaryDataMap.containsKey(dagId)) { + dagSummaryDataMap.put(dagId, new DAGSummaryData(dagId)); + } + try { + dagSummaryDataMap.get(dagId).handleSummaryEvent(proto); + } catch (Exception e) { + // any exception when parsing protobuf + throw new IOException("Error when parsing summary event proto", e); + } } - dagSummaryDataMap.get(dagId).handleSummaryEvent(proto); - proto.writeDelimitedTo(newSummaryStream); + summaryStream.close(); } - summaryStream.close(); - newSummaryStream.hsync(); - newSummaryStream.close(); // Set counter for next set of DAGs & update dagNames Set in DAGAppMaster dagAppMaster.setDAGCounter(dagCounter); @@ -635,264 +564,254 @@ public class RecoveryParser { recoveredDAGData.reason = nonRecoverableReason; } - LOG.info("Trying to recover dag from recovery file" - + ", dagId=" + lastInProgressDAG.toString() - + ", dataDir=" + previousAttemptRecoveryDataDir - + ", intoCurrentDir=" + currentAttemptRecoveryDataDir); - - FSDataInputStream dagRecoveryStream = getDAGRecoveryStream( - previousAttemptRecoveryDataDir, lastInProgressDAG); - if (dagRecoveryStream == null) { - // Could not find data to recover - // Error out - throw new IOException("Could not find recovery data for last in progress DAG" - + ", dagId=" + lastInProgressDAG); - } - - LOG.info("Copying DAG data into Current Attempt directory" - + ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir, - lastInProgressDAG)); - FSDataOutputStream newDAGRecoveryStream = - getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG); - + List dagRecoveryFiles = getDAGRecoveryFiles(lastInProgressDAG); boolean skipAllOtherEvents = false; - while (true) { - HistoryEvent event; - try { - event = getNextEvent(dagRecoveryStream); - if (event == null) { - LOG.info("Reached end of dag recovery stream"); - break; - } - } catch (EOFException eof) { - LOG.info("Reached end of dag recovery stream"); - break; - } catch (IOException ioe) { - LOG.warn("Corrupt data found when trying to read next event", ioe); - break; - } + Path lastRecoveryFile = null; + for (Path dagRecoveryFile : dagRecoveryFiles) { if (skipAllOtherEvents) { - // hit an error - skip reading other events + LOG.warn("Other recovery files will be skipped due to error in the previous recovery file" + + lastRecoveryFile); break; } - HistoryEventType eventType = event.getEventType(); - switch (eventType) { - case DAG_SUBMITTED: - { - DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event; - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(submittedEvent.getDAGPlan(), - lastInProgressDAG); - recoveredDAGData.cumulativeAdditionalResources = submittedEvent - .getCumulativeAdditionalLocalResources(); - recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID(); - dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG); - if (recoveredDAGData.nonRecoverable) { - skipAllOtherEvents = true; + lastRecoveryFile = dagRecoveryFile; + LOG.info("Trying to recover dag from recovery file" + + ", dagId=" + lastInProgressDAG.toString() + + ", dagRecoveryFile=" + dagRecoveryFile); + FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize); + while (true) { + HistoryEvent event; + try { + event = getNextEvent(dagRecoveryStream); + if (event == null) { + LOG.info("Reached end of dag recovery stream"); + break; } + } catch (EOFException eof) { + LOG.info("Reached end of dag recovery stream"); break; - } - case DAG_INITIALIZED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); - break; - } - case DAG_STARTED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); - break; - } - case DAG_COMMIT_STARTED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); - break; - } - case VERTEX_GROUP_COMMIT_STARTED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); - break; - } - case VERTEX_GROUP_COMMIT_FINISHED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); - break; - } - case DAG_FINISHED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - // If this is seen, nothing to recover - assert recoveredDAGData.recoveredDAG != null; - recoveredDAGData.recoveredDAG.restoreFromEvent(event); - recoveredDAGData.isCompleted = true; - recoveredDAGData.dagState = - ((DAGFinishedEvent) event).getState(); - skipAllOtherEvents = true; - break; - } - case CONTAINER_LAUNCHED: - { - // Nothing to do for now - break; - } - case VERTEX_INITIALIZED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexInitializedEvent vEvent = (VertexInitializedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); - break; - } - case VERTEX_STARTED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexStartedEvent vEvent = (VertexStartedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); - break; - } - case VERTEX_PARALLELISM_UPDATED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); - break; - } - case VERTEX_COMMIT_STARTED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); - break; - } - case VERTEX_FINISHED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexFinishedEvent vEvent = (VertexFinishedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); - break; - } - case TASK_STARTED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - TaskStartedEvent tEvent = (TaskStartedEvent) event; - Task task = recoveredDAGData.recoveredDAG.getVertex( - tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID()); - task.restoreFromEvent(tEvent); - break; - } - case TASK_FINISHED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - TaskFinishedEvent tEvent = (TaskFinishedEvent) event; - Task task = recoveredDAGData.recoveredDAG.getVertex( - tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID()); - task.restoreFromEvent(tEvent); + } catch (IOException ioe) { + LOG.warn("Corrupt data found when trying to read next event", ioe); break; } - case TASK_ATTEMPT_STARTED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event; - Task task = - recoveredDAGData.recoveredDAG.getVertex( - tEvent.getTaskAttemptID().getTaskID().getVertexID()) - .getTask(tEvent.getTaskAttemptID().getTaskID()); - task.restoreFromEvent(tEvent); + if (skipAllOtherEvents) { + // hit an error - skip reading other events break; } - case TASK_ATTEMPT_FINISHED: - { - LOG.info("Recovering from event" - + ", eventType=" + eventType - + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event; - Task task = - recoveredDAGData.recoveredDAG.getVertex( - tEvent.getTaskAttemptID().getTaskID().getVertexID()) - .getTask(tEvent.getTaskAttemptID().getTaskID()); - task.restoreFromEvent(tEvent); - break; + HistoryEventType eventType = event.getEventType(); + switch (eventType) { + case DAG_SUBMITTED: + { + DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event; + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(submittedEvent.getDAGPlan(), + lastInProgressDAG); + recoveredDAGData.cumulativeAdditionalResources = submittedEvent + .getCumulativeAdditionalLocalResources(); + recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID(); + dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG); + if (recoveredDAGData.nonRecoverable) { + skipAllOtherEvents = true; + } + break; + } + case DAG_INITIALIZED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + recoveredDAGData.recoveredDAG.restoreFromEvent(event); + break; + } + case DAG_STARTED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + recoveredDAGData.recoveredDAG.restoreFromEvent(event); + break; + } + case DAG_COMMIT_STARTED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + recoveredDAGData.recoveredDAG.restoreFromEvent(event); + break; + } + case VERTEX_GROUP_COMMIT_STARTED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + recoveredDAGData.recoveredDAG.restoreFromEvent(event); + break; + } + case VERTEX_GROUP_COMMIT_FINISHED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + recoveredDAGData.recoveredDAG.restoreFromEvent(event); + break; + } + case DAG_FINISHED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + // If this is seen, nothing to recover + assert recoveredDAGData.recoveredDAG != null; + recoveredDAGData.recoveredDAG.restoreFromEvent(event); + recoveredDAGData.isCompleted = true; + recoveredDAGData.dagState = + ((DAGFinishedEvent) event).getState(); + skipAllOtherEvents = true; + break; + } + case CONTAINER_LAUNCHED: + { + // Nothing to do for now + break; + } + case VERTEX_INITIALIZED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + VertexInitializedEvent vEvent = (VertexInitializedEvent) event; + Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); + v.restoreFromEvent(vEvent); + break; + } + case VERTEX_STARTED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + VertexStartedEvent vEvent = (VertexStartedEvent) event; + Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); + v.restoreFromEvent(vEvent); + break; + } + case VERTEX_PARALLELISM_UPDATED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event; + Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); + v.restoreFromEvent(vEvent); + break; + } + case VERTEX_COMMIT_STARTED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event; + Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); + v.restoreFromEvent(vEvent); + break; + } + case VERTEX_FINISHED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + VertexFinishedEvent vEvent = (VertexFinishedEvent) event; + Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); + v.restoreFromEvent(vEvent); + break; + } + case TASK_STARTED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + TaskStartedEvent tEvent = (TaskStartedEvent) event; + Task task = recoveredDAGData.recoveredDAG.getVertex( + tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID()); + task.restoreFromEvent(tEvent); + break; + } + case TASK_FINISHED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + TaskFinishedEvent tEvent = (TaskFinishedEvent) event; + Task task = recoveredDAGData.recoveredDAG.getVertex( + tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID()); + task.restoreFromEvent(tEvent); + break; + } + case TASK_ATTEMPT_STARTED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event; + Task task = + recoveredDAGData.recoveredDAG.getVertex( + tEvent.getTaskAttemptID().getTaskID().getVertexID()) + .getTask(tEvent.getTaskAttemptID().getTaskID()); + task.restoreFromEvent(tEvent); + break; + } + case TASK_ATTEMPT_FINISHED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event; + Task task = + recoveredDAGData.recoveredDAG.getVertex( + tEvent.getTaskAttemptID().getTaskID().getVertexID()) + .getTask(tEvent.getTaskAttemptID().getTaskID()); + task.restoreFromEvent(tEvent); + break; + } + case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + assert recoveredDAGData.recoveredDAG != null; + VertexRecoverableEventsGeneratedEvent vEvent = + (VertexRecoverableEventsGeneratedEvent) event; + Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); + v.restoreFromEvent(vEvent); + break; + } + default: + throw new RuntimeException("Invalid data found, unknown event type " + + eventType); } - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: - { - LOG.info("Recovering from event" + if (LOG.isDebugEnabled()) { + LOG.debug("[DAG RECOVERY]" + + " dagId=" + lastInProgressDAG + ", eventType=" + eventType + ", event=" + event.toString()); - assert recoveredDAGData.recoveredDAG != null; - VertexRecoverableEventsGeneratedEvent vEvent = - (VertexRecoverableEventsGeneratedEvent) event; - Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID()); - v.restoreFromEvent(vEvent); - break; } - default: - throw new RuntimeException("Invalid data found, unknown event type " - + eventType); - } - if (LOG.isDebugEnabled()) { - LOG.debug("[DAG RECOVERY]" - + " dagId=" + lastInProgressDAG - + ", eventType=" + eventType - + ", event=" + event.toString()); } - newDAGRecoveryStream.writeInt(eventType.ordinal()); - event.toProtoStream(newDAGRecoveryStream); + dagRecoveryStream.close(); } - dagRecoveryStream.close(); - newDAGRecoveryStream.hsync(); - newDAGRecoveryStream.close(); if (!recoveredDAGData.isCompleted && !recoveredDAGData.nonRecoverable) { @@ -930,18 +849,7 @@ public class RecoveryParser { } } - LOG.info("Finished copying data from previous attempt into current attempt"); - createDataRecoveredFlagFile(); - return recoveredDAGData; } - private void createDataRecoveredFlagFile() throws IOException { - Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir, - dataRecoveredFileFlag); - LOG.info("Trying to create data recovered flag file" - + ", filePath=" + dataCopiedFlagPath.toString()); - recoveryFS.mkdirs(dataCopiedFlagPath); - } - } http://git-wip-us.apache.org/repos/asf/tez/blob/47f3e832/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 81e9bb9..183e780 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -814,7 +814,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, isSpeculationEnabled = vertexConf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT); - LOG.info("isSpeculationEnabled:" + isSpeculationEnabled); if (isSpeculationEnabled()) { speculator = new LegacySpeculator(vertexConf, getAppContext(), this); } http://git-wip-us.apache.org/repos/asf/tez/blob/47f3e832/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index 1801253..04a925f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -81,10 +81,12 @@ public class RecoveryService extends AbstractService { private final Object lock = new Object(); private FileSystem recoveryDirFS; // FS where staging dir exists Path recoveryPath; - Map outputStreamMap = new + @VisibleForTesting + public Map outputStreamMap = new HashMap(); private int bufferSize; - private FSDataOutputStream summaryStream; + @VisibleForTesting + public FSDataOutputStream summaryStream; private int unflushedEventsCount = 0; private long lastFlushTime = -1; private int maxUnflushedEvents; http://git-wip-us.apache.org/repos/asf/tez/blob/47f3e832/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index 41ea071..16c3a38 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -20,14 +20,29 @@ package org.apache.tez.dag.app; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.RecoveryParser.DAGSummaryData; +import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.impl.DAGImpl; +import org.apache.tez.dag.app.dag.impl.TestDAGImpl; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.events.DAGCommitStartedEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.recovery.RecoveryService; import org.apache.tez.dag.records.TezDAGID; import org.junit.*; @@ -40,14 +55,25 @@ public class TestRecoveryParser { + TestRecoveryParser.class.getName() + "-tmpDir"; private RecoveryParser parser; + private FileSystem localFS; + private Configuration conf; + private Path recoveryPath; + private DAGAppMaster mockAppMaster; + private DAGImpl mockDAGImpl; @Before public void setUp() throws IllegalArgumentException, IOException { - DAGAppMaster mockAppMaster = mock(DAGAppMaster.class); + this.conf = new Configuration(); + this.localFS = FileSystem.getLocal(conf); + this.recoveryPath = new Path(TEST_ROOT_DIR + "/recovery"); + this.localFS.delete(new Path(TEST_ROOT_DIR), true); + mockAppMaster = mock(DAGAppMaster.class); + mockAppMaster.dagNames = new HashSet(); + mockAppMaster.dagIDs = new HashSet(); when(mockAppMaster.getConfig()).thenReturn(new Configuration()); - parser = - new RecoveryParser(mockAppMaster, mock(FileSystem.class), new Path( - TEST_ROOT_DIR), 3); + mockDAGImpl = mock(DAGImpl.class); + when(mockAppMaster.createDAG(any(DAGPlan.class), any(TezDAGID.class))).thenReturn(mockDAGImpl); + parser = new RecoveryParser(mockAppMaster, localFS, recoveryPath, 3); } private DAGSummaryData createDAGSummaryData(TezDAGID dagId, boolean completed) { @@ -92,4 +118,175 @@ public class TestRecoveryParser { parser.getLastCompletedOrInProgressDAG(summaryDataMap); assertEquals(lastInProgressDAGId, lastInProgressDAG.dagId.getId()); } + + // skipAllOtherEvents due to non-recoverable (in the middle of commit) + @Test(timeout = 5000) + public void testSkipAllOtherEvents_1() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write data in attempt_1 + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration()))); + rService.handle(new DAGHistoryEvent(dagID, + new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null))); + // only for testing, DAGCommitStartedEvent is not supposed to happen at this time. + rService.handle(new DAGHistoryEvent(dagID,new DAGCommitStartedEvent(dagID, System.currentTimeMillis()))); + rService.stop(); + + // write data in attempt_2 + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/2")); + rService = new RecoveryService(appContext); + rService.init(conf); + rService.start(); + // only for testing, DAGStartedEvent is not supposed to happen at this time. + rService.handle(new DAGHistoryEvent(dagID, + new DAGStartedEvent(dagID, 1L, "user", "dag1"))); + rService.stop(); + + RecoveredDAGData dagData = parser.parseRecoveryData(); + assertEquals(true, dagData.nonRecoverable); + assertTrue(dagData.reason.contains("DAG Commit was in progress, not recoverable,")); + // DAGSubmittedEvent is handled but DAGInitializedEvent and DAGStartedEvent in the next attempt are both skipped + // due to the dag is not recoerable. + verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class)); + verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGInitializedEvent.class)); + verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGStartedEvent.class)); + } + + // skipAllOtherEvents due to dag finished + @Test (timeout = 5000) + public void testSkipAllOtherEvents_2() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write data in attempt_1 + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration()))); + rService.handle(new DAGHistoryEvent(dagID, + new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null))); + rService.handle(new DAGHistoryEvent(dagID, + new DAGFinishedEvent(dagID, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null))); + rService.handle(new DAGHistoryEvent(dagID, new DAGStartedEvent(dagID, 1L, "user", "dag1"))); + rService.stop(); + + // write data in attempt_2 + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/2")); + rService = new RecoveryService(appContext); + rService.init(conf); + rService.start(); + rService.handle(new DAGHistoryEvent(dagID, + new DAGStartedEvent(dagID, 1L, "user", "dag1"))); + rService.stop(); + + RecoveredDAGData dagData = parser.parseRecoveryData(); + assertEquals(false, dagData.nonRecoverable); + assertEquals(DAGState.FAILED, dagData.dagState); + assertEquals(true, dagData.isCompleted); + // DAGSubmittedEvent, DAGInitializedEvent and DAGFinishedEvent is handled + verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class)); + verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGInitializedEvent.class)); + verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGFinishedEvent.class)); + // DAGStartedEvent is skipped due to it is after DAGFinishedEvent + verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGStartedEvent.class)); + } + + @Test(timeout = 5000) + public void testLastCorruptedRecoveryRecord() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write data in attempt_1 + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration()))); + // wait until DAGSubmittedEvent is handled in the RecoveryEventHandling thread + rService.await(); + rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA"); + rService.stop(); + + // write data in attempt_2 + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/2")); + rService = new RecoveryService(appContext); + rService.init(conf); + rService.start(); + rService.handle(new DAGHistoryEvent(dagID, + new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null))); + rService.await(); + rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA"); + rService.stop(); + + // corrupted last records will be skipped but the whole recovery logs will be read + RecoveredDAGData dagData = parser.parseRecoveryData(); + assertEquals(false, dagData.isCompleted); + assertEquals(null, dagData.reason); + assertEquals(false, dagData.nonRecoverable); + // verify DAGSubmitedEvent & DAGInititlizedEvent is handled. + verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class)); + verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGInitializedEvent.class)); + } + + @Test(timeout = 5000) + public void testLastCorruptedSummaryRecord() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + + // write data in attempt_1 + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // write a DAGSubmittedEvent first to initialize summaryStream + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration()))); + // write an corrupted SummaryEvent + rService.summaryStream.writeChars("INVALID_DATA"); + rService.stop(); + + try { + // Corrupted SummaryEvent will cause recovery fail (throw exception here) + parser.parseRecoveryData(); + fail(); + } catch (IOException e) { + // exception when parsing protobuf object + e.printStackTrace(); + } + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/47f3e832/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java index 9e0c02f..1d17b23 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java @@ -474,13 +474,20 @@ public class TestAMRecovery { Path recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf); FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf); - Path currentAttemptRecoveryDataDir = - TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, attemptNum); - Path recoveryFilePath = - new Path(currentAttemptRecoveryDataDir, appId.toString().replace( - "application", "dag") - + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); - return RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath)); + List historyEvents = new ArrayList(); + for (int i=1; i <= attemptNum; ++i) { + Path currentAttemptRecoveryDataDir = + TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i); + Path recoveryFilePath = + new Path(currentAttemptRecoveryDataDir, appId.toString().replace( + "application", "dag") + + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); + if (fs.exists(recoveryFilePath)) { + LOG.info("read recovery file:" + recoveryFilePath); + historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath))); + } + } + return historyEvents; } private void printHistoryEvents(List historyEvents, int attemptId) { http://git-wip-us.apache.org/repos/asf/tez/blob/47f3e832/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java index a5f4025..74c2727 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java @@ -57,6 +57,7 @@ import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -181,12 +182,19 @@ public class TestDAGRecovery { Path recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf); FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf); - for (int i=1; i<=3; ++i) { - Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,i); - Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir, - appId.toString().replace("application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); - List historyEvents = RecoveryParser.parseDAGRecoveryFile( - fs.open(recoveryFilePath)); + // verify recovery logs in each attempt + for (int attemptNum=1; attemptNum<=3; ++attemptNum) { + List historyEvents = new ArrayList(); + // read the recovery logs for current attempt + // since dag recovery logs is dispersed in each attempt's recovery directory, + // so need to read recovery logs from the first attempt to current attempt + for (int i=1 ;i<=attemptNum;++i) { + Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,i); + Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir, + appId.toString().replace("application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); + historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile( + fs.open(recoveryFilePath))); + } int inputInfoEventIndex = -1; int vertexInitedEventIndex = -1;