Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7DA9A104A9 for ; Tue, 18 Mar 2014 21:23:01 +0000 (UTC) Received: (qmail 29316 invoked by uid 500); 18 Mar 2014 21:23:00 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 29293 invoked by uid 500); 18 Mar 2014 21:23:00 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 29278 invoked by uid 99); 18 Mar 2014 21:22:59 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Mar 2014 21:22:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 18 Mar 2014 21:22:55 +0000 Received: (qmail 25517 invoked by uid 99); 18 Mar 2014 21:22:32 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Mar 2014 21:22:32 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1B4169832FE; Tue, 18 Mar 2014 21:22:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.incubator.apache.org Date: Tue, 18 Mar 2014 21:22:32 -0000 Message-Id: <8a91ca715cdd47e8ad10bfd9ce9504b1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: TEZ-851. Handle failure to persist events to HDFS. (hitesh) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-tez Updated Branches: refs/heads/master 3f3f94827 -> 8d87e0398 TEZ-851. Handle failure to persist events to HDFS. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c548c915 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c548c915 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c548c915 Branch: refs/heads/master Commit: c548c91576875030862aa909dea68cf8a40673a8 Parents: 3f3f948 Author: Hitesh Shah Authored: Tue Mar 18 12:32:35 2014 -0700 Committer: Hitesh Shah Committed: Tue Mar 18 12:32:35 2014 -0700 ---------------------------------------------------------------------- docs/src/site/site.xml | 2 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 7 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 47 ++++--- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 18 ++- .../tez/dag/history/HistoryEventHandler.java | 18 ++- .../dag/history/recovery/RecoveryService.java | 131 +++++++++---------- 6 files changed, 132 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/docs/src/site/site.xml ---------------------------------------------------------------------- diff --git a/docs/src/site/site.xml b/docs/src/site/site.xml index ddc9d03..c08b02d 100644 --- a/docs/src/site/site.xml +++ b/docs/src/site/site.xml @@ -102,7 +102,7 @@ - + http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 9a01090..6db1647 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1713,7 +1713,12 @@ public class DAGAppMaster extends AbstractService { // for an app later DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(), submitTime, dagPlan, this.appAttemptID); - historyEventHandler.handle(new DAGHistoryEvent(newDAG.getID(), submittedEvent)); + try { + historyEventHandler.handleCriticalEvent( + new DAGHistoryEvent(newDAG.getID(), submittedEvent)); + } catch (IOException e) { + throw new RuntimeException(e); + } startDAG(newDAG); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 8fc278f..ed73433 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -729,8 +729,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, boolean failedWhileCommitting = false; if (dagSucceeded && !successfulOutputsAlreadyCommitted) { // commit all shared outputs - appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(), - new DAGCommitStartedEvent(getID(), clock.getTime()))); + try { + appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(), + new DAGCommitStartedEvent(getID(), clock.getTime()))); + } catch (IOException e) { + LOG.error("Failed to send commit event to history/recovery handler", e); + return false; + } for (VertexGroupInfo groupInfo : vertexGroups.values()) { if (failedWhileCommitting) { break; @@ -1629,24 +1634,36 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } groupInfo.committed = true; Vertex v = getVertex(groupInfo.groupMembers.iterator().next()); - appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(), - new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName, - clock.getTime()))); - for (String outputName : groupInfo.outputs) { - OutputCommitter committer = v.getOutputCommitters().get(outputName); - LOG.info("Committing output: " + outputName); - if (!commitOutput(outputName, committer)) { - // using same logic as vertex level commit. stop after first failure. - failedCommit = true; - break; + try { + appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(), + new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName, + clock.getTime()))); + } catch (IOException e) { + LOG.error("Failed to send commit recovery event to handler", e); + failedCommit = true; + } + if (!failedCommit) { + for (String outputName : groupInfo.outputs) { + OutputCommitter committer = v.getOutputCommitters().get(outputName); + LOG.info("Committing output: " + outputName); + if (!commitOutput(outputName, committer)) { + // using same logic as vertex level commit. stop after first failure. + failedCommit = true; + break; + } } } if (failedCommit) { break; } - appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(), - new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName, - clock.getTime()))); + try { + appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(), + new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName, + clock.getTime()))); + } catch (IOException e) { + LOG.error("Failed to send commit recovery event to handler", e); + failedCommit = true; + } } } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 1010ab0..46c91b1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -17,6 +17,7 @@ package org.apache.tez.dag.app.dag.impl; +import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; @@ -1264,13 +1265,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, new DAGHistoryEvent(getDAGId(), startEvt)); } - void logJobHistoryVertexFinishedEvent() { + void logJobHistoryVertexFinishedEvent() throws IOException { this.setFinishTime(); VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, initTimeRequested, initedTime, startTimeRequested, startedTime, finishTime, VertexState.SUCCEEDED, "", getAllCounters(), getVertexStats()); - this.appContext.getHistoryHandler().handle( + this.appContext.getHistoryHandler().handleCriticalEvent( new DAGHistoryEvent(getDAGId(), finishEvt)); } @@ -1439,7 +1440,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, case SUCCEEDED: eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), finalState)); - logJobHistoryVertexFinishedEvent(); + try { + logJobHistoryVertexFinishedEvent(); + } catch (IOException e) { + LOG.error("Failed to send vertex finished event to recovery", e); + finalState = VertexState.ERROR; + eventHandler.handle(new DAGEvent(getDAGId(), + DAGEventType.INTERNAL_ERROR)); + } break; default: throw new TezUncheckedException("Unexpected VertexState: " + finalState); @@ -1830,7 +1838,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, break; } assert vertex.tasks.size() == vertex.numTasks; - if (vertex.tasks != null) { + if (vertex.tasks != null && vertex.numTasks != 0) { for (Task task : vertex.tasks.values()) { vertex.eventHandler.handle( new TaskEventRecoverTask(task.getTaskId())); @@ -2100,7 +2108,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, break; } assert vertex.tasks.size() == vertex.numTasks; - if (vertex.tasks != null) { + if (vertex.tasks != null && vertex.numTasks != 0) { for (Task task : vertex.tasks.values()) { vertex.eventHandler.handle( new TaskEventRecoverTask(task.getTaskId())); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java index 866cdc4..413d4ef 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java @@ -28,6 +28,7 @@ import org.apache.tez.dag.history.ats.ATSService; import org.apache.tez.dag.history.recovery.RecoveryService; import org.apache.tez.dag.records.TezDAGID; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; public class HistoryEventHandler extends CompositeService { @@ -76,7 +77,14 @@ public class HistoryEventHandler extends CompositeService { super.serviceStop(); } - public void handle(DAGHistoryEvent event) { + /** + * Used by events that are critical for recovery + * DAG Submission/finished and any commit related activites are critical events + * In short, any events that are instances of SummaryEvent + * @param event History event + * @throws IOException + */ + public void handleCriticalEvent(DAGHistoryEvent event) throws IOException { TezDAGID dagId = event.getDagID(); String dagIdStr = "N/A"; if(dagId != null) { @@ -102,6 +110,14 @@ public class HistoryEventHandler extends CompositeService { + ": " + event.getHistoryEvent().toString()); } + public void handle(DAGHistoryEvent event) { + try { + handleCriticalEvent(event); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index 0074a4c..1353151 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -122,7 +122,8 @@ public class RecoveryService extends AbstractService { ++eventsProcessed; handleRecoveryEvent(event); } catch (Exception e) { - // TODO handle failures - treat as fatal or ignore? + // For now, ignore any such errors as these are non-critical + // All summary event related errors are handled as critical LOG.warn("Error handling recovery event", e); } } @@ -161,7 +162,7 @@ public class RecoveryService extends AbstractService { } } - public void handle(DAGHistoryEvent event) { + public void handle(DAGHistoryEvent event) throws IOException { if (stopped.get()) { LOG.warn("Igoring event as service stopped, eventType" + event.getHistoryEvent().getEventType()); @@ -228,13 +229,13 @@ public class RecoveryService extends AbstractService { } catch (IOException ioe) { LOG.warn("Error when trying to flush/close recovery file for" + " dag, dagId=" + event.getDagID()); - // FIXME handle error ? } } } - } catch (Exception e) { - // FIXME handle failures - LOG.warn("Error handling recovery event", e); + } catch (IOException ioe) { + LOG.warn("Error handling summary event" + + ", eventType=" + event.getHistoryEvent().getEventType(), ioe); + throw ioe; } } } else { @@ -248,39 +249,32 @@ public class RecoveryService extends AbstractService { private void handleSummaryEvent(TezDAGID dagID, HistoryEventType eventType, - SummaryEvent summaryEvent) { + SummaryEvent summaryEvent) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Handling summary event" + ", dagID=" + dagID + ", eventType=" + eventType); } - try { - if (summaryStream == null) { - Path summaryPath = new Path(recoveryPath, - appContext.getApplicationID() - + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX); - if (!recoveryDirFS.exists(summaryPath)) { - summaryStream = recoveryDirFS.create(summaryPath, false, - bufferSize); - } else { - summaryStream = recoveryDirFS.append(summaryPath, bufferSize); - } + if (summaryStream == null) { + Path summaryPath = new Path(recoveryPath, + appContext.getApplicationID() + + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX); + if (!recoveryDirFS.exists(summaryPath)) { + summaryStream = recoveryDirFS.create(summaryPath, false, + bufferSize); + } else { + summaryStream = recoveryDirFS.append(summaryPath, bufferSize); } - if (LOG.isDebugEnabled()) { - LOG.debug("Writing recovery event to summary stream" - + ", dagId=" + dagID - + ", eventType=" + eventType); - } - summaryEvent.toSummaryProtoStream(summaryStream); - } catch (IOException ioe) { - // FIXME handle failures - LOG.warn("Failed to write to stream", ioe); } - - + if (LOG.isDebugEnabled()) { + LOG.debug("Writing recovery event to summary stream" + + ", dagId=" + dagID + + ", eventType=" + eventType); + } + summaryEvent.toSummaryProtoStream(summaryStream); } - private void handleRecoveryEvent(DAGHistoryEvent event) { + private void handleRecoveryEvent(DAGHistoryEvent event) throws IOException { HistoryEventType eventType = event.getHistoryEvent().getEventType(); if (LOG.isDebugEnabled()) { LOG.debug("Handling recovery event of type " @@ -300,56 +294,57 @@ public class RecoveryService extends AbstractService { return; } - try { - - if (!outputStreamMap.containsKey(dagID)) { - Path dagFilePath = new Path(recoveryPath, - dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX); - FSDataOutputStream outputStream; - if (recoveryDirFS.exists(dagFilePath)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Opening DAG recovery file in append mode" - + ", filePath=" + dagFilePath); - } - outputStream = recoveryDirFS.append(dagFilePath, bufferSize); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Opening DAG recovery file in create mode" - + ", filePath=" + dagFilePath); - } - outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize); + if (!outputStreamMap.containsKey(dagID)) { + Path dagFilePath = new Path(recoveryPath, + dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX); + FSDataOutputStream outputStream; + if (recoveryDirFS.exists(dagFilePath)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Opening DAG recovery file in append mode" + + ", filePath=" + dagFilePath); } - outputStreamMap.put(dagID, outputStream); + outputStream = recoveryDirFS.append(dagFilePath, bufferSize); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Opening DAG recovery file in create mode" + + ", filePath=" + dagFilePath); + } + outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize); } + outputStreamMap.put(dagID, outputStream); + } - FSDataOutputStream outputStream = outputStreamMap.get(dagID); + FSDataOutputStream outputStream = outputStreamMap.get(dagID); - if (LOG.isDebugEnabled()) { - LOG.debug("Writing recovery event to output stream" - + ", dagId=" + dagID - + ", eventType=" + eventType); - } - ++unflushedEventsCount; - outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal()); - event.getHistoryEvent().toProtoStream(outputStream); - if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED, - HistoryEventType.DAG_FINISHED).contains(eventType)) { - maybeFlush(outputStream); - } - } catch (IOException ioe) { - // FIXME handle failures - LOG.warn("Failed to write to stream", ioe); + if (LOG.isDebugEnabled()) { + LOG.debug("Writing recovery event to output stream" + + ", dagId=" + dagID + + ", eventType=" + eventType); + } + ++unflushedEventsCount; + outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal()); + event.getHistoryEvent().toProtoStream(outputStream); + if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED, + HistoryEventType.DAG_FINISHED).contains(eventType)) { + maybeFlush(outputStream); } - } private void maybeFlush(FSDataOutputStream outputStream) throws IOException { long currentTime = appContext.getClock().getTime(); boolean doFlush = false; if (unflushedEventsCount >= maxUnflushedEvents) { + if (LOG.isDebugEnabled()) { + LOG.debug("Max unflushed events count reached. Flushing recovery data" + + ", unflushedEventsCount=" + unflushedEventsCount + + ", maxUnflushedEvents=" + maxUnflushedEvents); + } doFlush = true; } else if (flushInterval >= 0 && ((currentTime - lastFlushTime) >= (flushInterval*1000))) { + LOG.debug("Flush interval time period elapsed. Flushing recovery data" + + ", lastTimeSinceFLush=" + lastFlushTime + + ", timeSinceLastFlush=" + (currentTime - lastFlushTime)); doFlush = true; } if (!doFlush) { @@ -369,9 +364,9 @@ public class RecoveryService extends AbstractService { if (LOG.isDebugEnabled()) { LOG.debug("Flushing output stream" + ", lastTimeSinceFLush=" + lastFlushTime + + ", timeSinceLastFlush=" + (currentTime - lastFlushTime) + ", unflushedEventsCount=" + unflushedEventsCount - + ", maxUnflushedEvents=" + maxUnflushedEvents - + ", currentTime=" + currentTime); + + ", maxUnflushedEvents=" + maxUnflushedEvents); } unflushedEventsCount = 0;