Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 614C710738 for ; Wed, 26 Mar 2014 18:44:28 +0000 (UTC) Received: (qmail 19644 invoked by uid 500); 26 Mar 2014 18:44:28 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 19606 invoked by uid 500); 26 Mar 2014 18:44:26 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 19552 invoked by uid 99); 26 Mar 2014 18:44:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Mar 2014 18:44:24 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 26 Mar 2014 18:44:13 +0000 Received: (qmail 14959 invoked by uid 99); 26 Mar 2014 18:43:49 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Mar 2014 18:43:49 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4C61883636F; Wed, 26 Mar 2014 18:43:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-973. Abort additional attempts if recovery fails. (hitesh) Date: Wed, 26 Mar 2014 18:43:49 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-tez Updated Branches: refs/heads/master f31aba7bc -> 5a6f42a81 TEZ-973. Abort additional attempts if recovery fails. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5a6f42a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5a6f42a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5a6f42a8 Branch: refs/heads/master Commit: 5a6f42a8118ef47475b4522ae984810616914461 Parents: f31aba7 Author: Hitesh Shah Authored: Wed Mar 26 11:43:17 2014 -0700 Committer: Hitesh Shah Committed: Wed Mar 26 11:43:17 2014 -0700 ---------------------------------------------------------------------- .../org/apache/tez/dag/app/DAGAppMaster.java | 29 +++++- .../org/apache/tez/dag/app/RecoveryParser.java | 57 +++++++++-- .../tez/dag/app/dag/DAGTerminationCause.java | 5 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 101 +++++++++++++------ .../apache/tez/dag/app/dag/impl/VertexImpl.java | 42 +++++--- .../tez/dag/history/HistoryEventHandler.java | 14 ++- .../dag/history/recovery/RecoveryService.java | 44 +++++++- .../mapreduce/examples/OrderedWordCount.java | 4 + 8 files changed, 237 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 45a4f18..526bea7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -444,6 +444,11 @@ public class DAGAppMaster extends AbstractService { + ", dagState=" + finishEvt.getDAGState()); lastDAGCompletionTime = clock.getTime(); _updateLoggers(currentDAG, "_post"); + if (this.historyEventHandler.hasRecoveryFailed()) { + LOG.warn("Recovery had a fatal error, shutting down session after" + + " DAG completion"); + sessionStopped.set(true); + } switch(finishEvt.getDAGState()) { case SUCCEEDED: if (!currentDAG.getName().startsWith( @@ -451,7 +456,6 @@ public class DAGAppMaster extends AbstractService { successfulDAGs.incrementAndGet(); } break; - case ERROR: case FAILED: if (!currentDAG.getName().startsWith( TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) { @@ -464,6 +468,11 @@ public class DAGAppMaster extends AbstractService { killedDAGs.incrementAndGet(); } break; + case ERROR: + if (!currentDAG.getName().startsWith( + TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) { + failedDAGs.incrementAndGet(); + } default: LOG.fatal("Received a DAG Finished Event with state=" + finishEvt.getDAGState() @@ -481,7 +490,11 @@ public class DAGAppMaster extends AbstractService { } else { LOG.info("Session shutting down now."); this.taskSchedulerEventHandler.setShouldUnregisterFlag(); - state = DAGAppMasterState.SUCCEEDED; + if (this.historyEventHandler.hasRecoveryFailed()) { + state = DAGAppMasterState.FAILED; + } else { + state = DAGAppMasterState.SUCCEEDED; + } shutdownHandler.shutdown(); } } @@ -1418,7 +1431,17 @@ public class DAGAppMaster extends AbstractService { this.lastDAGCompletionTime = clock.getTime(); - RecoveredDAGData recoveredDAGData = recoverDAG(); + RecoveredDAGData recoveredDAGData; + try { + recoveredDAGData = recoverDAG(); + } catch (IOException e) { + LOG.error("Error occurred when trying to recover data from previous attempt." + + " Shutting down AM", e); + this.state = DAGAppMasterState.ERROR; + this.taskSchedulerEventHandler.setShouldUnregisterFlag(); + shutdownHandler.shutdown(); + return; + } if (!isSession) { LOG.info("In Non-Session mode."); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java index 7e1feca..45c98e6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java @@ -60,6 +60,7 @@ import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; import org.apache.tez.dag.history.events.VertexInitializedEvent; import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.history.recovery.RecoveryService; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos; @@ -81,7 +82,7 @@ public class RecoveryParser { public RecoveryParser(DAGAppMaster dagAppMaster, FileSystem recoveryFS, Path recoveryDataDir, - int currentAttemptId) { + int currentAttemptId) throws IOException { this.dagAppMaster = dagAppMaster; this.recoveryFS = recoveryFS; this.recoveryDataDir = recoveryDataDir; @@ -91,6 +92,8 @@ public class RecoveryParser { recoveryBufferSize = dagAppMaster.getConfig().getInt( TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE, TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT); + + this.recoveryFS.mkdirs(currentAttemptRecoveryDataDir); } public static class RecoveredDAGData { @@ -308,22 +311,57 @@ public class RecoveryParser { return inProgressDAG; } - private Path getPreviousAttemptRecoveryDataDir() { + private Path getPreviousAttemptRecoveryDataDir() throws IOException { + LOG.info("Looking for the correct attempt directory to recover from"); int foundPreviousAttempt = -1; for (int i = currentAttemptId - 1; i > 0; --i) { Path attemptPath = getAttemptRecoveryDataDir(recoveryDataDir, i); + LOG.info("Looking at attempt directory, path=" + attemptPath); + Path fatalErrorOccurred = new Path(attemptPath, + RecoveryService.RECOVERY_FATAL_OCCURRED_DIR); + if (recoveryFS.exists(fatalErrorOccurred)) { + throw new IOException("Found that a fatal error occurred in" + + " recovery during previous attempt, foundFile=" + + fatalErrorOccurred.toString()); + } + Path dataRecoveredFile = new Path(attemptPath, dataRecoveredFileFlag); try { if (recoveryFS.exists(dataRecoveredFile)) { + LOG.info("Found data recovered file in attempt directory" + + ", dataRecoveredFile=" + dataRecoveredFile + + ", path=" + attemptPath); foundPreviousAttempt = i; break; } + LOG.info("Skipping attempt directory as data recovered file does not exist" + + ", dataRecoveredFile=" + dataRecoveredFile + + ", path=" + attemptPath); } catch (IOException e) { LOG.warn("Exception when checking previous attempt dir for " + dataRecoveredFile.toString(), e); } } if (foundPreviousAttempt == -1) { + // Look for oldest summary file and use that + LOG.info("Did not find any attempt dir that had data recovered file." + + " Looking for oldest summary file"); + for (int i = 1; i < currentAttemptId; ++i) { + Path attemptPath = getAttemptRecoveryDataDir(recoveryDataDir, i); + Path summaryPath = getSummaryPath(attemptPath); + if (recoveryFS.exists(summaryPath)) { + LOG.info("Found summary file in attempt directory" + + ", summaryFile=" + summaryPath + + ", path=" + attemptPath); + foundPreviousAttempt = i; + break; + } + LOG.info("Skipping attempt directory as no summary file found" + + ", summaryFile=" + summaryPath + + ", path=" + attemptPath); + } + } + if (foundPreviousAttempt == -1) { LOG.info("Falling back to first attempt as no other recovered attempts" + " found"); foundPreviousAttempt = 1; @@ -466,6 +504,7 @@ public class RecoveryParser { if (!recoveryFS.exists(previousAttemptRecoveryDataDir)) { LOG.info("Nothing to recover as previous attempt data does not exist" + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString()); + createDataRecoveredFlagFile(); return null; } @@ -476,6 +515,7 @@ public class RecoveryParser { LOG.info("Nothing to recover as summary file does not exist" + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString() + ", summaryPath=" + summaryPath.toString()); + createDataRecoveredFlagFile(); return null; } @@ -832,18 +872,23 @@ public class RecoveryParser { } } + LOG.info("Finished copying data from previous attempt into current attempt"); + createDataRecoveredFlagFile(); + + return recoveredDAGData; + } + + private void createDataRecoveredFlagFile() throws IOException { Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir, dataRecoveredFileFlag); - LOG.info("Finished copying data from previous attempt into current attempt" - + " - setting flag by creating file" - + ", path=" + dataCopiedFlagPath.toString()); + LOG.info("Trying to create data recovered flag file" + + ", filePath=" + dataCopiedFlagPath.toString()); FSDataOutputStream flagFile = recoveryFS.create(dataCopiedFlagPath, true, recoveryBufferSize); flagFile.writeInt(1); flagFile.hsync(); flagFile.close(); - return recoveredDAGData; } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java index f7020da..d01fb2f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java @@ -38,6 +38,9 @@ public enum DAGTerminationCause { /** DAG failed during output commit. */ COMMIT_FAILURE, - + + /** DAG failed while trying to write recovery events */ + RECOVERY_FAILURE, + INTERNAL_ERROR } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index ed73433..0e4c504 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -250,7 +250,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // Transitions from TERMINATING state. .addTransition (DAGState.TERMINATING, - EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED), + EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED, + DAGState.ERROR), DAGEventType.DAG_VERTEX_COMPLETED, new VertexCompletedTransition()) .addTransition(DAGState.TERMINATING, DAGState.TERMINATING, @@ -734,6 +735,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, new DAGCommitStartedEvent(getID(), clock.getTime()))); } catch (IOException e) { LOG.error("Failed to send commit event to history/recovery handler", e); + trySetTerminationCause(DAGTerminationCause.RECOVERY_FAILURE); return false; } for (VertexGroupInfo groupInfo : vertexGroups.values()) { @@ -887,11 +889,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, finishTime = clock.getTime(); } - void logJobHistoryFinishedEvent() { + void logJobHistoryFinishedEvent() throws IOException { this.setFinishTime(); DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime, finishTime, DAGState.SUCCEEDED, "", getAllCounters()); - this.appContext.getHistoryHandler().handle( + this.appContext.getHistoryHandler().handleCriticalEvent( new DAGHistoryEvent(dagId, finishEvt)); } @@ -909,12 +911,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, new DAGHistoryEvent(dagId, startEvt)); } - void logJobHistoryUnsuccesfulEvent(DAGState state) { + void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException { DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime, clock.getTime(), state, StringUtils.join(LINE_SEPARATOR, getDiagnostics()), getAllCounters()); - this.appContext.getHistoryHandler().handle( + this.appContext.getHistoryHandler().handleCriticalEvent( new DAGHistoryEvent(dagId, finishEvt)); } @@ -967,7 +969,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, dag.addDiagnostic(diagnosticMsg); return dag.finished(DAGState.FAILED); } - + if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){ + String diagnosticMsg = "DAG failed due to failure in recovery handling." + + " failedVertices:" + dag.numFailedVertices + + " killedVertices:" + dag.numKilledVertices; + LOG.info(diagnosticMsg); + dag.addDiagnostic(diagnosticMsg); + return dag.finished(DAGState.FAILED); + } + // catch all String diagnosticMsg = "All vertices complete, but cannot determine final state of DAG" + ", numCompletedVertices=" + dag.numCompletedVertices @@ -1016,14 +1026,26 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, finalState = DAGState.FAILED; trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE); } - - if (finalState == DAGState.SUCCEEDED) { - logJobHistoryFinishedEvent(); + + boolean recoveryError = false; + try { + if (finalState == DAGState.SUCCEEDED) { + logJobHistoryFinishedEvent(); + } else { + logJobHistoryUnsuccesfulEvent(finalState); + } + } catch (IOException e) { + LOG.warn("Failed to persist recovery event for DAG completion" + + ", dagId=" + dagId + + ", finalState=" + finalState); + recoveryError = true; + } + + if (recoveryError) { + eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), DAGState.ERROR)); } else { - logJobHistoryUnsuccesfulEvent(finalState); + eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState)); } - - eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState)); LOG.info("DAG: " + getID() + " finished with state: " + finalState); return finalState; @@ -1339,10 +1361,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(), desiredState)); } - dag.logJobHistoryUnsuccesfulEvent(DAGState.FAILED); + DAGState endState = DAGState.FAILED; + try { + dag.logJobHistoryUnsuccesfulEvent(endState); + } catch (IOException e) { + LOG.warn("Failed to persist recovery event for DAG completion" + + ", dagId=" + dag.dagId + + ", finalState=" + endState); + } dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(), - DAGState.FAILED)); - return DAGState.FAILED; + endState)); + return endState; } for (Vertex v : dag.vertices.values()) { @@ -1387,9 +1416,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // Error state LOG.warn("Trying to recover DAG, failed to recover" + " from non-handled state" + dag.recoveredState); + // Tell AM ERROR so that it can shutdown dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(), DAGState.ERROR)); - return DAGState.ERROR; + return DAGState.FAILED; } } @@ -1494,23 +1524,27 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // Task-start has been moved out of InitTransition, so this arc simply // hardcodes 0 for both map and reduce finished tasks. private static class KillNewJobTransition - implements SingleArcTransition { + implements SingleArcTransition { + @Override - public void transition(DAGImpl job, DAGEvent event) { - job.setFinishTime(); - job.trySetTerminationCause(DAGTerminationCause.DAG_KILL); - job.finished(DAGState.KILLED); + public void transition(DAGImpl dag, DAGEvent dagEvent) { + dag.setFinishTime(); + dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL); + dag.finished(DAGState.KILLED); } + } private static class KillInitedJobTransition - implements SingleArcTransition { + implements SingleArcTransition { + @Override - public void transition(DAGImpl job, DAGEvent event) { - job.trySetTerminationCause(DAGTerminationCause.DAG_KILL); - job.addDiagnostic("Job received Kill in INITED state."); - job.finished(DAGState.KILLED); + public void transition(DAGImpl dag, DAGEvent dagEvent) { + dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL); + dag.addDiagnostic("Job received Kill in INITED state."); + dag.finished(DAGState.KILLED); } + } private static class DAGKilledTransition @@ -1610,6 +1644,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private boolean vertexSucceeded(Vertex vertex) { numSuccessfulVertices++; boolean failedCommit = false; + boolean recoveryFailed = false; if (!commitAllOutputsOnSuccess) { // committing successful outputs immediately. check for shared outputs List groupsList = vertexGroupInfo.get(vertex.getName()); @@ -1640,6 +1675,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, clock.getTime()))); } catch (IOException e) { LOG.error("Failed to send commit recovery event to handler", e); + recoveryFailed = true; failedCommit = true; } if (!failedCommit) { @@ -1662,6 +1698,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, clock.getTime()))); } catch (IOException e) { LOG.error("Failed to send commit recovery event to handler", e); + recoveryFailed = true; failedCommit = true; } } @@ -1670,10 +1707,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, if (failedCommit) { LOG.info("Aborting job due to failure in commit."); - enactKill(DAGTerminationCause.COMMIT_FAILURE, - VertexTerminationCause.COMMIT_FAILURE); + if (!recoveryFailed) { + enactKill(DAGTerminationCause.COMMIT_FAILURE, + VertexTerminationCause.COMMIT_FAILURE); + } else { + LOG.info("Recovery failure occurred during commit"); + enactKill(DAGTerminationCause.RECOVERY_FAILURE, + VertexTerminationCause.COMMIT_FAILURE); + } } - + return !failedCommit; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 7f94ed7..6f1398a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1276,12 +1276,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, new DAGHistoryEvent(getDAGId(), finishEvt)); } - void logJobHistoryVertexFailedEvent(VertexState state) { + void logJobHistoryVertexFailedEvent(VertexState state) throws IOException { VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, initTimeRequested, initedTime, startTimeRequested, startedTime, clock.getTime(), state, StringUtils.join(LINE_SEPARATOR, getDiagnostics()), getAllCounters(), getVertexStats()); - this.appContext.getHistoryHandler().handle( + this.appContext.getHistoryHandler().handleCriticalEvent( new DAGHistoryEvent(getDAGId(), finishEvt)); } @@ -1331,10 +1331,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } if (firstCommit) { // Log commit start event on first actual commit - vertex.appContext.getHistoryHandler().handle( - new DAGHistoryEvent(vertex.getDAGId(), - new VertexCommitStartedEvent(vertex.vertexId, - vertex.clock.getTime()))); + try { + vertex.appContext.getHistoryHandler().handleCriticalEvent( + new DAGHistoryEvent(vertex.getDAGId(), + new VertexCommitStartedEvent(vertex.vertexId, + vertex.clock.getTime()))); + } catch (IOException e) { + LOG.error("Failed to persist commit start event to recovery, vertexId=" + + vertex.logIdentifier, e); + vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR); + return vertex.finished(VertexState.FAILED); + } } else { firstCommit = false; } @@ -1430,24 +1437,33 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, case ERROR: eventHandler.handle(new DAGEvent(getDAGId(), DAGEventType.INTERNAL_ERROR)); - logJobHistoryVertexFailedEvent(finalState); + try { + logJobHistoryVertexFailedEvent(finalState); + } catch (IOException e) { + LOG.error("Failed to send vertex finished event to recovery", e); + } break; case KILLED: case FAILED: eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), finalState, terminationCause)); - logJobHistoryVertexFailedEvent(finalState); + try { + logJobHistoryVertexFailedEvent(finalState); + } catch (IOException e) { + LOG.error("Failed to send vertex finished event to recovery", e); + } break; case SUCCEEDED: - eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), - finalState)); try { logJobHistoryVertexFinishedEvent(); + eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), + finalState)); } catch (IOException e) { LOG.error("Failed to send vertex finished event to recovery", e); - finalState = VertexState.ERROR; - eventHandler.handle(new DAGEvent(getDAGId(), - DAGEventType.INTERNAL_ERROR)); + finalState = VertexState.FAILED; + this.terminationCause = VertexTerminationCause.INTERNAL_ERROR; + eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), + finalState)); } break; default: http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java index 413d4ef..4eb094f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java @@ -37,7 +37,6 @@ public class HistoryEventHandler extends CompositeService { private final AppContext context; private boolean yarnATSEnabled; - private AtomicBoolean stopped = new AtomicBoolean(false); private ATSService atsService; private RecoveryService recoveryService; private boolean recoveryEnabled; @@ -114,12 +113,17 @@ public class HistoryEventHandler extends CompositeService { try { handleCriticalEvent(event); } catch (IOException e) { - throw new RuntimeException(e); + LOG.warn("Failed to handle recovery event" + + ", eventType=" + event.getHistoryEvent().getEventType(), e); } } - - - + public boolean hasRecoveryFailed() { + if (recoveryEnabled) { + return recoveryService.hasRecoveryFailed(); + } else { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index 110da65..840ad1d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -48,6 +48,9 @@ public class RecoveryService extends AbstractService { private static final Log LOG = LogFactory.getLog(RecoveryService.class); private final AppContext appContext; + public static final String RECOVERY_FATAL_OCCURRED_DIR = + "RecoveryFatalErrorOccurred"; + private LinkedBlockingQueue eventQueue = new LinkedBlockingQueue(); private Set completedDAGs = new HashSet(); @@ -69,6 +72,7 @@ public class RecoveryService extends AbstractService { private long lastFlushTime = -1; private int maxUnflushedEvents; private int flushInterval; + private AtomicBoolean recoveryFatalErrorOccurred = new AtomicBoolean(false); public RecoveryService(AppContext appContext) { super(RecoveryService.class.getName()); @@ -99,6 +103,13 @@ public class RecoveryService extends AbstractService { DAGHistoryEvent event; while (!stopped.get() && !Thread.currentThread().isInterrupted()) { + if (recoveryFatalErrorOccurred.get()) { + LOG.error("Recovery failure occurred. Stopping recovery thread." + + " Current eventQueueSize=" + eventQueue.size()); + eventQueue.clear(); + return; + } + // Log the size of the event-queue every so often. if (eventCounter != 0 && eventCounter % 1000 == 0) { LOG.info("Event queue stats" @@ -169,6 +180,11 @@ public class RecoveryService extends AbstractService { return; } HistoryEventType eventType = event.getHistoryEvent().getEventType(); + + if (recoveryFatalErrorOccurred.get()) { + return; + } + if (!started.get()) { LOG.warn("Adding event of type " + eventType + " to queue as service not started"); @@ -233,9 +249,29 @@ public class RecoveryService extends AbstractService { } } } catch (IOException ioe) { - LOG.warn("Error handling summary event" + LOG.error("Error handling summary event" + ", eventType=" + event.getHistoryEvent().getEventType(), ioe); - throw ioe; + Path fatalErrorDir = new Path(recoveryPath, RECOVERY_FATAL_OCCURRED_DIR); + try { + LOG.error("Adding a flag to ensure next AM attempt does not start up" + + ", flagFile=" + fatalErrorDir.toString()); + recoveryFatalErrorOccurred.set(true); + recoveryDirFS.mkdirs(fatalErrorDir); + if (recoveryDirFS.exists(fatalErrorDir)) { + LOG.error("Recovery failure occurred. Skipping all events"); + } else { + // throw error if fatal error flag could not be set + throw ioe; + } + } catch (IOException e) { + LOG.fatal("Failed to create fatal error flag dir " + + fatalErrorDir.toString(), e); + throw ioe; + } + if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) { + // Throw error to tell client that dag submission failed + throw ioe; + } } } } else { @@ -374,4 +410,8 @@ public class RecoveryService extends AbstractService { lastFlushTime = currentTime; } + public boolean hasRecoveryFailed() { + return recoveryFatalErrorOccurred.get(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java index 93a3a48..9874f7b 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java @@ -509,11 +509,15 @@ public class OrderedWordCount { + dagStatus.getDiagnostics()); } } + } catch (Exception e) { + LOG.error("Error occurred when submitting/running DAGs", e); + throw e; } finally { if (!retainStagingDir) { fs.delete(stagingDir, true); } if (useTezSession) { + LOG.info("Shutting down session"); tezSession.stop(); } }