Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9562818C44 for ; Mon, 7 Sep 2015 23:50:52 +0000 (UTC) Received: (qmail 87519 invoked by uid 500); 7 Sep 2015 23:50:52 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 87487 invoked by uid 500); 7 Sep 2015 23:50:52 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 87478 invoked by uid 99); 7 Sep 2015 23:50:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Sep 2015 23:50:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 20430E050C; Mon, 7 Sep 2015 23:50:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Message-Id: <0f742c5c72d64d3290b5aeeb14a04210@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2778. Improvements to handle multiple read errors with complex DAGs (bikas) Date: Mon, 7 Sep 2015 23:50:52 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master db725e568 -> 171d48504 TEZ-2778. 
Improvements to handle multiple read errors with complex DAGs (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/171d4850 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/171d4850 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/171d4850 Branch: refs/heads/master Commit: 171d4850443a9e67aa5a68d4c8482779c0caa8eb Parents: db725e5 Author: Bikas Saha Authored: Mon Sep 7 16:50:33 2015 -0700 Committer: Bikas Saha Committed: Mon Sep 7 16:50:33 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/ATSConstants.java | 3 +- .../tez/dag/app/TaskCommunicatorManager.java | 13 +- .../dag/event/TaskAttemptEventStatusUpdate.java | 9 ++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 72 +++++++-- .../events/TaskAttemptFinishedEvent.java | 43 ++--- .../impl/HistoryEventJsonConversion.java | 6 +- .../apache/tez/dag/history/utils/DAGUtils.java | 24 +++ tez-dag/src/main/proto/HistoryEvents.proto | 8 +- .../dag/app/TestTaskCommunicatorManager1.java | 33 ++++ .../tez/dag/app/dag/impl/TestTaskAttempt.java | 87 ++++++++++ .../app/dag/impl/TestTaskAttemptRecovery.java | 15 +- .../tez/dag/app/dag/impl/TestTaskRecovery.java | 34 ++-- .../TestHistoryEventsProtoConversion.java | 18 ++- .../impl/TestHistoryEventJsonConversion.java | 2 +- .../parser/datamodel/TaskAttemptInfo.java | 56 +++++-- .../apache/tez/history/parser/utils/Utils.java | 18 +++ .../apache/tez/history/TestHistoryParser.java | 10 +- .../ats/HistoryEventTimelineConversion.java | 7 +- .../ats/TestHistoryEventTimelineConversion.java | 21 ++- .../analyzer/plugins/CriticalPathAnalyzer.java | 157 +++++++------------ .../org/apache/tez/analyzer/TestAnalyzer.java | 32 ++-- 22 files changed, 462 insertions(+), 207 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/CHANGES.txt 
---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9df51c0..bce05c0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2778. Improvements to handle multiple read errors with complex DAGs TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting down an AM. TEZ-2745. ClassNotFoundException of user code should fail dag http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index ad9270f..f786a4e 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -84,8 +84,7 @@ public class ATSConstants { public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL"; public static final String COMPLETED_LOGS_URL = "completedLogsURL"; public static final String EXIT_STATUS = "exitStatus"; - public static final String LAST_DATA_EVENT_TIME = "lastDataEventTime"; - public static final String LAST_DATA_EVENT_SOURCE_TA = "lastDataEventSourceTA"; + public static final String LAST_DATA_EVENTS = "lastDataEvents"; public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers"; public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt"; http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index cfb34ac..2cc6ae2 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -35,7 +35,6 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.serviceplugins.api.ContainerEndReason; -import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventType; @@ -234,19 +233,27 @@ public class TaskCommunicatorManager extends AbstractService implements // to VertexImpl to ensure the events ordering // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent + TaskAttemptEventStatusUpdate taskAttemptEvent = null; + boolean readErrorReported = false; for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { // for now, set the event time on the AM when it is received. // this avoids any time disparity between machines. 
tezEvent.setEventReceivedTime(currTime); final EventType eventType = tezEvent.getEventType(); if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) { - TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, + taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, (TaskStatusUpdateEvent) tezEvent.getEvent()); - context.getEventHandler().handle(taskAttemptEvent); } else { + if (eventType == EventType.INPUT_READ_ERROR_EVENT) { + readErrorReported = true; + } otherEvents.add(tezEvent); } } + if (taskAttemptEvent != null) { + taskAttemptEvent.setReadErrorReported(readErrorReported); + context.getEventHandler().handle(taskAttemptEvent); + } if(!otherEvents.isEmpty()) { TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID(); context.getEventHandler().handle( http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java index c5a6ea7..458679c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java @@ -24,6 +24,7 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent { private TaskStatusUpdateEvent taskAttemptStatus; + private boolean readErrorReported = false; public TaskAttemptEventStatusUpdate(TezTaskAttemptID id, TaskStatusUpdateEvent statusEvent) { @@ -34,4 +35,12 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent { public TaskStatusUpdateEvent getStatusEvent() { return this.taskAttemptStatus; } + + public void setReadErrorReported(boolean value) 
{ + readErrorReported = value; + } + + public boolean getReadErrorReported() { + return readErrorReported; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index e57c827..003e05f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -97,6 +97,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto; import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.events.InputReadErrorEvent; @@ -119,6 +120,37 @@ public class TaskAttemptImpl implements TaskAttempt, private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptImpl.class); private static final String LINE_SEPARATOR = System .getProperty("line.separator"); + + public static class DataEventDependencyInfo { + long timestamp; + TezTaskAttemptID taId; + public DataEventDependencyInfo(long time, TezTaskAttemptID id) { + this.timestamp = time; + this.taId = id; + } + public long getTimestamp() { + return timestamp; + } + public TezTaskAttemptID getTaskAttemptId() { + return taId; + } + public static DataEventDependencyInfoProto toProto(DataEventDependencyInfo info) { + DataEventDependencyInfoProto.Builder builder = DataEventDependencyInfoProto.newBuilder(); + builder.setTimestamp(info.timestamp); + if (info.taId != null) { + builder.setTaskAttemptId(info.taId.toString()); 
+ } + return builder.build(); + } + + public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto proto) { + TezTaskAttemptID taId = null; + if(proto.hasTaskAttemptId()) { + taId = TezTaskAttemptID.fromString(proto.getTaskAttemptId()); + } + return new DataEventDependencyInfo(proto.getTimestamp(), taId); + } + } static final TezCounters EMPTY_COUNTERS = new TezCounters(); @@ -150,8 +182,8 @@ public class TaskAttemptImpl implements TaskAttempt, private final Vertex vertex; @VisibleForTesting - long lastDataEventTime; - TezTaskAttemptID lastDataEventSourceTA = null; + boolean appendNextDataEvent = true; + ArrayList<DataEventDependencyInfo> lastDataEvents = Lists.newArrayList(); @VisibleForTesting TaskAttemptStatus reportedStatus; @@ -822,8 +854,9 @@ public class TaskAttemptImpl implements TaskAttempt, : TaskAttemptTerminationCause.UNKNOWN_ERROR; this.diagnostics.add(tEvent.getDiagnostics()); this.recoveredState = tEvent.getState(); - this.lastDataEventTime = tEvent.getLastDataEventTime(); - this.lastDataEventSourceTA = tEvent.getLastDataEventSourceTA(); + if (tEvent.getDataEvents() != null) { + this.lastDataEvents.addAll(tEvent.getDataEvents()); + } sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState())); return recoveredState; } @@ -1040,7 +1073,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( attemptId, getVertex().getName(), getLaunchTime(), getFinishTime(), TaskAttemptState.SUCCEEDED, null, - "", getCounters(), lastDataEventTime, lastDataEventSourceTA); + "", getCounters(), lastDataEvents); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -1057,8 +1090,7 @@ public class TaskAttemptImpl implements TaskAttempt, finishTime, state, terminationCause, StringUtils.join( - getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEventTime, - lastDataEventSourceTA); + getDiagnostics(), 
LINE_SEPARATOR), getCounters(), lastDataEvents); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -1324,12 +1356,16 @@ public class TaskAttemptImpl implements TaskAttempt, SingleArcTransition { @Override public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { - TaskStatusUpdateEvent statusEvent = ((TaskAttemptEventStatusUpdate) event) - .getStatusEvent(); + TaskAttemptEventStatusUpdate sEvent = (TaskAttemptEventStatusUpdate) event; + TaskStatusUpdateEvent statusEvent = sEvent.getStatusEvent(); ta.reportedStatus.state = ta.getState(); ta.reportedStatus.progress = statusEvent.getProgress(); ta.reportedStatus.counters = statusEvent.getCounters(); ta.statistics = statusEvent.getStatistics(); + if (sEvent.getReadErrorReported()) { + // if there is a read error then track the next last data event + ta.appendNextDataEvent = true; + } ta.updateProgressSplits(); @@ -1655,8 +1691,20 @@ public class TaskAttemptImpl implements TaskAttempt, @Override public void setLastEventSent(TezEvent lastEventSent) { - // task attempt id may be null for input data information events - this.lastDataEventSourceTA = lastEventSent.getSourceInfo().getTaskAttemptID(); - this.lastDataEventTime = lastEventSent.getEventReceivedTime(); + writeLock.lock(); + try { + DataEventDependencyInfo info = new DataEventDependencyInfo( + lastEventSent.getEventReceivedTime(), lastEventSent.getSourceInfo().getTaskAttemptID()); + // task attempt id may be null for input data information events + if (appendNextDataEvent) { + appendNextDataEvent = false; + lastDataEvents.add(info); + } else { + // over-write last event - array list makes it quick + lastDataEvents.set(lastDataEvents.size() - 1, info); + } + } finally { + writeLock.unlock(); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java index 52761e2..fbde635 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java @@ -21,16 +21,22 @@ package org.apache.tez.dag.history.events; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto; public class TaskAttemptFinishedEvent implements HistoryEvent { @@ -45,9 +51,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { private String diagnostics; private TezCounters tezCounters; private TaskAttemptTerminationCause error; - private TezTaskAttemptID lastDataEventSourceTA; - private long lastDataEventTime; - + private List<DataEventDependencyInfo> dataEvents; + public TaskAttemptFinishedEvent(TezTaskAttemptID taId, String vertexName, long startTime, @@ -55,8 +60,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { TaskAttemptState state, TaskAttemptTerminationCause error, String 
diagnostics, 
TezCounters counters, - long lastDataEventTime, - TezTaskAttemptID lastDataEventSourceTA) { + List<DataEventDependencyInfo> dataEvents) { this.taskAttemptId = taId; this.vertexName = vertexName; this.startTime = startTime; @@ -65,8 +69,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.diagnostics = diagnostics; this.tezCounters = counters; this.error = error; - this.lastDataEventTime = lastDataEventTime; - this.lastDataEventSourceTA = lastDataEventSourceTA; + this.dataEvents = dataEvents; } public TaskAttemptFinishedEvent() { @@ -87,14 +90,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { return true; } - public long getLastDataEventTime() { - return lastDataEventTime; + public List<DataEventDependencyInfo> getDataEvents() { + return dataEvents; } - public TezTaskAttemptID getLastDataEventSourceTA() { - return lastDataEventSourceTA; - } - public TaskAttemptFinishedProto toProto() { TaskAttemptFinishedProto.Builder builder = TaskAttemptFinishedProto.newBuilder(); @@ -110,9 +109,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { if (tezCounters != null) { builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters)); } - if (lastDataEventSourceTA != null) { - builder.setLastDataEventSourceTA(lastDataEventSourceTA.toString()); - builder.setLastDataEventTime(lastDataEventTime); + if (dataEvents != null && !dataEvents.isEmpty()) { + for (DataEventDependencyInfo info : dataEvents) { + builder.addDataEvents(DataEventDependencyInfo.toProto(info)); + } } return builder.build(); } @@ -131,9 +131,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.tezCounters = DagTypeConverters.convertTezCountersFromProto( proto.getCounters()); } - if (proto.hasLastDataEventSourceTA()) { - this.lastDataEventSourceTA = TezTaskAttemptID.fromString(proto.getLastDataEventSourceTA()); - this.lastDataEventTime = proto.getLastDataEventTime(); + if 
(proto.getDataEventsCount() > 0) { + this.dataEvents = 
Lists.newArrayListWithCapacity(proto.getDataEventsCount()); + for (DataEventDependencyInfoProto protoEvent : proto.getDataEventsList()) { + this.dataEvents.add(DataEventDependencyInfo.fromProto(protoEvent)); + } } } @@ -163,8 +165,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { + ", errorEnum=" + (error != null ? error.name() : "") + ", diagnostics=" + diagnostics + ", lastDataEventSourceTA=" + - ((lastDataEventSourceTA==null) ? null:lastDataEventSourceTA.toString()) - + ", lastDataEventTime=" + lastDataEventTime + ((dataEvents==null) ? 0:dataEvents.size()) + ", counters=" + (tezCounters == null ? "null" : tezCounters.toString() .replaceAll("\\n", ", ").replaceAll("\\s+", " ")); http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index b32b324..411d677 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -530,9 +530,9 @@ public class HistoryEventJsonConversion { otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); otherInfo.put(ATSConstants.COUNTERS, DAGUtils.convertCountersToJSON(event.getCounters())); - otherInfo.put(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime()); - if (event.getLastDataEventSourceTA() != null) { - otherInfo.put(ATSConstants.LAST_DATA_EVENT_SOURCE_TA, event.getLastDataEventSourceTA().toString()); + if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) { + otherInfo.put(ATSConstants.LAST_DATA_EVENTS, + 
DAGUtils.convertDataEventDependencyInfoToJSON(event.getDataEvents())); } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java index 1447832..76e592e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; @@ -42,6 +43,8 @@ import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo; import org.apache.tez.dag.app.dag.impl.VertexStats; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; +import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.records.TezTaskID; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -99,6 +102,27 @@ public class DAGUtils { } return dagJson; } + + public static JSONObject convertDataEventDependencyInfoToJSON(List info) { + return new JSONObject(convertDataEventDependecyInfoToATS(info)); + } + + public static Map convertDataEventDependecyInfoToATS(List info) { + ArrayList infoList = new ArrayList(); + for (DataEventDependencyInfo event : info) { + Map eventObj = new LinkedHashMap(); + String id = ""; + if (event.getTaskAttemptId() != null) { + id = event.getTaskAttemptId().toString(); + } + eventObj.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), id); + 
eventObj.put(ATSConstants.TIMESTAMP, event.getTimestamp()); + infoList.add(eventObj); + } + Map object = new LinkedHashMap(); + putInto(object, ATSConstants.LAST_DATA_EVENTS, infoList); + return object; + } public static JSONObject convertCountersToJSON(TezCounters counters) throws JSONException { http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index e268e0d..232f1b7 100644 --- a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -169,6 +169,11 @@ message TaskAttemptStartedProto { optional int64 allocation_time = 7; } +message DataEventDependencyInfoProto { + optional string task_attempt_id = 1; + optional int64 timestamp = 2; +} + message TaskAttemptFinishedProto { optional string task_attempt_id = 1; optional int64 finish_time = 2; @@ -176,8 +181,7 @@ message TaskAttemptFinishedProto { optional string diagnostics = 4; optional TezCountersProto counters = 5; optional string error_enum = 6; - optional int64 last_data_event_time = 7; - optional string last_data_event_source_t_a = 8; + repeated DataEventDependencyInfoProto data_events = 7; } message EventMetaDataProto { http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java index 117d3b3..35893b3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java @@ -68,6 +68,7 @@ import 
org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.dag.event.VertexEventType; @@ -79,6 +80,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventType; @@ -247,6 +249,7 @@ public class TestTaskCommunicatorManager1 { final Event statusUpdateEvent = argAllValues.get(0); assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE, statusUpdateEvent.getType()); + assertEquals(false, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported()); final Event vertexEvent = argAllValues.get(1); final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent; @@ -256,9 +259,39 @@ public class TestTaskCommunicatorManager1 { vertexRouteEvent.getEvents().get(0).getEventType()); assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT, vertexRouteEvent.getEvents().get(1).getEventType()); + } + + @Test (timeout = 5000) + public void testTaskEventRoutingWithReadError() throws Exception { + List events = Arrays.asList( + new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null), + new TezEvent(InputReadErrorEvent.create("", 0, 0), null), + new TezEvent(new TaskAttemptCompletedEvent(), null) + ); + + generateHeartbeat(events, 0, 1, 0, new ArrayList()); + + ArgumentCaptor arg = 
ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(arg.capture()); + final List<Event> argAllValues = arg.getAllValues(); + + final Event statusUpdateEvent = argAllValues.get(0); + assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE, + statusUpdateEvent.getType()); + assertEquals(true, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported()); + + final Event vertexEvent = argAllValues.get(1); + final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent; + assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT, + vertexEvent.getType()); + assertEquals(EventType.INPUT_READ_ERROR_EVENT, + vertexRouteEvent.getEvents().get(0).getEventType()); + assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT, + vertexRouteEvent.getEvents().get(1).getEventType()); } + @Test (timeout = 5000) public void testTaskEventRoutingTaskAttemptOnly() throws Exception { List<TezEvent> events = Arrays.asList( http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 101b22f..2d30a6f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -584,6 +585,92 @@ public class TestTaskAttempt { } 
@Test(timeout = 
5000) + public void testLastDataEventRecording() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + appId, 0); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); + + MockEventHandler eventHandler = spy(new MockEventHandler()); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); + + Configuration taskConf = new Configuration(); + taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + taskConf.setBoolean("fs.file.impl.disable.cache", true); + taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); + + locationHint = TaskLocationHint.createTaskLocationHint( + new HashSet(Arrays.asList(new String[]{"127.0.0.1"})), null); + Resource resource = Resource.newInstance(1024, 1); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), + new ContainerContextMatcher(), appCtx); + containers.addContainerIfNew(container, 0, 0, 0); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, taskConf, new SystemClock(), + mockHeartbeatHandler, appCtx, false, + resource, createFakeContainerContext(), false); + TezTaskAttemptID taskAttemptID = 
taImpl.getID(); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); + // At state STARTING. + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, + null)); + assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), + TaskAttemptState.RUNNING); + + long ts1 = 1024; + long ts2 = 2048; + TezTaskAttemptID mockId1 = mock(TezTaskAttemptID.class); + TezTaskAttemptID mockId2 = mock(TezTaskAttemptID.class); + TezEvent mockTezEvent1 = mock(TezEvent.class, RETURNS_DEEP_STUBS); + when(mockTezEvent1.getEventReceivedTime()).thenReturn(ts1); + when(mockTezEvent1.getSourceInfo().getTaskAttemptID()).thenReturn(mockId1); + TezEvent mockTezEvent2 = mock(TezEvent.class, RETURNS_DEEP_STUBS); + when(mockTezEvent2.getEventReceivedTime()).thenReturn(ts2); + when(mockTezEvent2.getSourceInfo().getTaskAttemptID()).thenReturn(mockId2); + TaskAttemptEventStatusUpdate statusEvent = + new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null)); + + assertEquals(0, taImpl.lastDataEvents.size()); + taImpl.setLastEventSent(mockTezEvent1); + assertEquals(1, taImpl.lastDataEvents.size()); + assertEquals(ts1, taImpl.lastDataEvents.get(0).getTimestamp()); + assertEquals(mockId1, taImpl.lastDataEvents.get(0).getTaskAttemptId()); + taImpl.handle(statusEvent); + taImpl.setLastEventSent(mockTezEvent2); + assertEquals(1, taImpl.lastDataEvents.size()); + assertEquals(ts2, taImpl.lastDataEvents.get(0).getTimestamp()); + assertEquals(mockId2, taImpl.lastDataEvents.get(0).getTaskAttemptId()); // over-write earlier value + statusEvent.setReadErrorReported(true); + taImpl.handle(statusEvent); + taImpl.setLastEventSent(mockTezEvent1); + assertEquals(2, taImpl.lastDataEvents.size()); + assertEquals(ts1, taImpl.lastDataEvents.get(1).getTimestamp()); + assertEquals(mockId1, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // add new event + taImpl.setLastEventSent(mockTezEvent2); + assertEquals(2, 
taImpl.lastDataEvents.size()); + assertEquals(ts2, taImpl.lastDataEvents.get(1).getTimestamp()); + assertEquals(mockId2, taImpl.lastDataEvents.get(1).getTaskAttemptId()); // over-write earlier value + } + + @Test(timeout = 5000) public void testFailure() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java index 6bbfc3d..53f1856 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java @@ -51,6 +51,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.history.HistoryEventType; @@ -65,6 +66,8 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import com.google.common.collect.Lists; + @SuppressWarnings({ "unchecked", "rawtypes" }) public class TestTaskAttemptRecovery { @@ -177,9 +180,12 @@ public class TestTaskAttemptRecovery { long lastDataEventTime = 1024; TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class); + List events = Lists.newLinkedList(); + events.add(new DataEventDependencyInfo(lastDataEventTime, 
lastDataEventTA)); + events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA)); TaskAttemptState recoveredState = ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - startTime, finishTime, state, errorEnum, diag, counters, lastDataEventTime, lastDataEventTA)); + startTime, finishTime, state, errorEnum, diag, counters, events)); assertEquals(startTime, ta.getLaunchTime()); assertEquals(finishTime, ta.getFinishTime()); assertEquals(counters, ta.reportedStatus.counters); @@ -188,8 +194,9 @@ public class TestTaskAttemptRecovery { assertEquals(1, ta.getDiagnostics().size()); assertEquals(diag, ta.getDiagnostics().get(0)); assertEquals(state, recoveredState); - assertEquals(lastDataEventTime, ta.lastDataEventTime); - assertEquals(lastDataEventTA, ta.lastDataEventSourceTA); + assertEquals(events.size(), ta.lastDataEvents.size()); + assertEquals(lastDataEventTime, ta.lastDataEvents.get(0).getTimestamp()); + assertEquals(lastDataEventTA, ta.lastDataEvents.get(0).getTaskAttemptId()); if (state != TaskAttemptState.SUCCEEDED) { assertEquals(errorEnum, ta.getTerminationCause()); } else { @@ -314,7 +321,7 @@ public class TestTaskAttemptRecovery { TaskAttemptState recoveredState = ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, startTime, finishTime, TaskAttemptState.KILLED, - TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), 0, null)); + TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), null)); assertEquals(TaskAttemptState.KILLED, recoveredState); } } http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index eca8274..b6d4c10 100644 --- 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -286,7 +286,7 @@ public class TestTaskRecovery { restoreFromTaskStartEvent(); TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), 0, null)); + 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), null)); task.handle(new TaskEventRecoverTask(task.getTaskId())); // wait for the second task attempt is scheduled dispatcher.await(); @@ -307,7 +307,7 @@ public class TestTaskRecovery { restoreFromTaskStartEvent(); TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), 0, null)); + 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), null)); task.handle(new TaskEventRecoverTask(task.getTaskId())); // wait for the second task attempt is scheduled dispatcher.await(); @@ -329,7 +329,7 @@ public class TestTaskRecovery { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); try { task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), 0, null)); + 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), null)); fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)"); } catch (TezUncheckedException e) { assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover")); @@ -372,7 +372,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new 
TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), 0, null)); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -405,7 +405,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.FAILED, null, - "", new TezCounters(), 0, null)); + "", new TezCounters(), null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -438,7 +438,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters(), 0, null)); + "", new TezCounters(), null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -473,7 +473,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), 0, null)); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -516,7 +516,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), 0, null)); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); 
@@ -528,7 +528,7 @@ public class TestTaskRecovery { recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.FAILED, null, - "", new TezCounters(), 0, null)); + "", new TezCounters(), null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -563,7 +563,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), 0, null)); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -575,7 +575,7 @@ public class TestTaskRecovery { recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters(), 0, null)); + "", new TezCounters(), null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -614,7 +614,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), 0, null)); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -654,7 +654,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters(), 0, null)); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); 
assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -735,7 +735,7 @@ public class TestTaskRecovery { recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters(), 0, null)); + "", new TezCounters(), null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); @@ -776,7 +776,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.KILLED, null, "", null, 0, null)); + 0, TaskAttemptState.KILLED, null, "", null, null)); } assertEquals(maxFailedAttempts, task.getAttempts().size()); assertEquals(0, task.failedAttempts); @@ -806,7 +806,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, null, "", null, 0, null)); + 0, TaskAttemptState.FAILED, null, "", null, null)); } assertEquals(maxFailedAttempts, task.getAttempts().size()); assertEquals(maxFailedAttempts, task.failedAttempts); @@ -836,7 +836,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, null, "", null, 0, null)); + 0, TaskAttemptState.FAILED, null, "", null, null)); } assertEquals(maxFailedAttempts - 1, task.getAttempts().size()); assertEquals(maxFailedAttempts - 1, task.failedAttempts); 
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index b215a06..5c8c90e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -48,6 +48,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; import org.apache.tez.dag.app.dag.impl.VertexStats; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -508,9 +509,7 @@ public class TestHistoryEventsProtoConversion { TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - null, null, null, 1024, - TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 110), 1), 1)); + null, null, null, null); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -523,16 +522,20 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getState()); Assert.assertEquals(event.getCounters(), deserializedEvent.getCounters()); - Assert.assertEquals(event.getLastDataEventTime(), 
deserializedEvent.getLastDataEventTime()); - Assert.assertEquals(event.getLastDataEventSourceTA(), deserializedEvent.getLastDataEventSourceTA()); logEvents(event, deserializedEvent); } { + TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 0), 0); + long timestamp = 1024L; + List events = Lists.newArrayList(); + events.add(new DataEventDependencyInfo(timestamp, taId)); + events.add(new DataEventDependencyInfo(timestamp, taId)); TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent( TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), 0, null); + TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -547,6 +550,9 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getCounters()); Assert.assertEquals(event.getTaskAttemptError(), deserializedEvent.getTaskAttemptError()); + Assert.assertEquals(events.size(), event.getDataEvents().size()); + Assert.assertEquals(events.get(0).getTimestamp(), event.getDataEvents().get(0).getTimestamp()); + Assert.assertEquals(events.get(0).getTaskAttemptId(), event.getDataEvents().get(0).getTaskAttemptId()); logEvents(event, deserializedEvent); } } http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java 
b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index 9c11dc7..711e4bb 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -164,7 +164,7 @@ public class TestHistoryEventJsonConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, 0, null); + random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, null); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java index ccec0db..ca008ce 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java @@ -20,6 +20,7 @@ package org.apache.tez.history.parser.datamodel; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; @@ -28,10 +29,13 @@ import org.apache.tez.common.ATSConstants; import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TaskCounter; 
import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.history.parser.utils.Utils; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; +import java.util.ArrayList; import java.util.Comparator; +import java.util.List; import java.util.Map; import static org.apache.hadoop.classification.InterfaceStability.Evolving; @@ -53,14 +57,29 @@ public class TaskAttemptInfo extends BaseInfo { private final String status; private final String logUrl; private final String creationCausalTA; - private final long lastDataEventTime; - private final String lastDataEventSourceTA; private final String terminationCause; private final long executionTimeInterval; + // this list is in time order - array list for easy walking + private final ArrayList lastDataEvents = Lists.newArrayList(); private TaskInfo taskInfo; private Container container; + + public static class DataDependencyEvent { + String taId; + long timestamp; + public DataDependencyEvent(String id, long time) { + taId = id; + timestamp = time; + } + public long getTimestamp() { + return timestamp; + } + public String getTaskAttemptId() { + return taId; + } + } TaskAttemptInfo(JSONObject jsonObject) throws JSONException { super(jsonObject); @@ -87,9 +106,17 @@ public class TaskAttemptInfo extends BaseInfo { status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS)); container = new Container(containerId, nodeId); - lastDataEventTime = otherInfoNode.optLong(ATSConstants.LAST_DATA_EVENT_TIME); - lastDataEventSourceTA = StringInterner.weakIntern( - otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA)); + if (otherInfoNode.has(Constants.LAST_DATA_EVENTS)) { + List eventInfo = Utils.parseDataEventDependencyFromJSON( + otherInfoNode.optJSONObject(Constants.LAST_DATA_EVENTS)); + long lastTime = 0; + for (DataDependencyEvent item : eventInfo) { + // check these are in time order + Preconditions.checkState(lastTime < item.getTimestamp()); + lastTime 
= item.getTimestamp(); + lastDataEvents.add(item); + } + } terminationCause = StringInterner .weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); executionTimeInterval = (endTime > startTime) ? (endTime - startTime) : 0; @@ -121,6 +148,10 @@ public class TaskAttemptInfo extends BaseInfo { return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime()); } + public final List getLastDataEvents() { + return lastDataEvents; + } + public final long getExecutionTimeInterval() { return executionTimeInterval; } @@ -149,14 +180,17 @@ public class TaskAttemptInfo extends BaseInfo { return creationTime; } - public final long getLastDataEventTime() { - return lastDataEventTime; + public final DataDependencyEvent getLastDataEventInfo(long timeThreshold) { + for (int i=lastDataEvents.size()-1; i>=0; i--) { + // walk back in time until we get first event that happened before the threshold + DataDependencyEvent item = lastDataEvents.get(i); + if (item.getTimestamp() < timeThreshold) { + return item; + } + } + return null; } - public final String getLastDataEventSourceTA() { - return lastDataEventSourceTA; - } - public final long getTimeTaken() { return getFinishTimeInterval() - getStartTimeInterval(); } http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java index 7345012..ffb854a 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java @@ -19,7 +19,10 @@ package org.apache.tez.history.parser.utils; import com.google.common.base.Strings; +import 
com.google.common.collect.Lists; + import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.util.StringInterner; import org.apache.log4j.ConsoleAppender; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -27,8 +30,10 @@ import org.apache.log4j.PatternLayout; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.history.parser.datamodel.Constants; import org.apache.tez.history.parser.datamodel.Event; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -83,6 +88,19 @@ public class Utils { } return counters; } + + public static List parseDataEventDependencyFromJSON(JSONObject jsonObject) + throws JSONException { + List events = Lists.newArrayList(); + JSONArray fields = jsonObject.optJSONArray(Constants.LAST_DATA_EVENTS); + for (int i=0; i 0); + DataDependencyEvent item = attempt.getLastDataEvents().get(0); + assertTrue(item.getTimestamp() > 0); + if (lastDataEventSourceTA == null) { - lastDataEventSourceTA = attempt.getLastDataEventSourceTA(); + lastDataEventSourceTA = item.getTaskAttemptId(); } else { // all attempts should have the same last data event source TA - assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA())); + assertTrue(lastDataEventSourceTA.equals(item.getTaskAttemptId())); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git 
a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 1b7e183..4685a61 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -435,10 +435,9 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); atsEntity.addOtherInfo(ATSConstants.COUNTERS, DAGUtils.convertCountersToATSMap(event.getCounters())); - atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_TIME, event.getLastDataEventTime()); - if (event.getLastDataEventSourceTA() != null) { - atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENT_SOURCE_TA, - event.getLastDataEventSourceTA().toString()); + if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) { + atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENTS, + DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents())); } return atsEntity; } http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 8db32b0..2849c10 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ 
b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.history.logging.ats; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; @@ -45,6 +46,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; import org.apache.tez.dag.app.dag.impl.VertexStats; import org.apache.tez.dag.app.web.AMWebController; import org.apache.tez.dag.history.HistoryEvent; @@ -85,6 +87,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.Lists; + public class TestHistoryEventTimelineConversion { private ApplicationAttemptId applicationAttemptId; @@ -170,7 +174,7 @@ public class TestHistoryEventTimelineConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, 0, null); + random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, null); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), @@ -445,6 +449,7 @@ public class TestHistoryEventTimelineConversion { timelineEntity.getOtherInfo().get(ATSConstants.USER)); } + @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testConvertTaskAttemptFinishedEvent() { String vertexName = "testVertex"; @@ -457,9 +462,12 @@ public class TestHistoryEventTimelineConversion { String diagnostics = "random diagnostics message"; TezCounters counters = new TezCounters(); long lastDataEventTime = finishTime - 1; + List events = 
Lists.newArrayList(); + events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID)); + events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID)); TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, - startTime, finishTime, state, error, diagnostics, counters, lastDataEventTime, tezTaskAttemptID); + startTime, finishTime, state, error, diagnostics, counters, events); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); @@ -482,14 +490,17 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(finishTime, evt.getTimestamp()); final Map otherInfo = timelineEntity.getOtherInfo(); - Assert.assertEquals(8, otherInfo.size()); + Assert.assertEquals(7, otherInfo.size()); Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME)); Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN)); Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS)); Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS)); - Assert.assertEquals(lastDataEventTime, otherInfo.get(ATSConstants.LAST_DATA_EVENT_TIME)); - Assert.assertEquals(tezTaskAttemptID.toString(), otherInfo.get(ATSConstants.LAST_DATA_EVENT_SOURCE_TA)); + Map obj1 = (Map)otherInfo.get(ATSConstants.LAST_DATA_EVENTS); + List obj2 = (List) obj1.get(ATSConstants.LAST_DATA_EVENTS); + Assert.assertEquals(2, obj2.size()); + Map obj3 = (Map) obj2.get(0); + Assert.assertEquals(events.get(0).getTimestamp(), obj3.get(ATSConstants.TIMESTAMP)); Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS)); } 
http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java index c8d4225..350f783 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java @@ -33,11 +33,11 @@ import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.Ent import org.apache.tez.analyzer.utils.SVGUtils; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; -import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.history.parser.datamodel.Container; import org.apache.tez.history.parser.datamodel.DagInfo; import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; import org.apache.tez.history.parser.datamodel.VertexInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -165,8 +165,10 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { .getAvgExecutionTimeInterval(); if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) { step.notes - .add("Potential straggler. Execution time " + attempt.getExecutionTimeInterval() - + " compared to vertex average of " + avgExecutionTime); + .add("Potential straggler. 
Execution time " + + SVGUtils.getTimeStr(attempt.getExecutionTimeInterval()) + + " compared to vertex average of " + + SVGUtils.getTimeStr(avgExecutionTime)); } if (attempt.getStartTime() > step.startCriticalPathTime) { @@ -231,14 +233,44 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { Preconditions.checkState(currentAttemptStopCriticalPathTime > 0); System.out.println( "Step: " + tempCP.size() + " Attempt: " + currentAttempt.getTaskAttemptId()); + currentStep = new CriticalPathStep(currentAttempt, EntityType.ATTEMPT); currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime; + + // consider the last data event seen immediately preceding the current critical path + // stop time for this attempt + long currentStepLastDataEventTime = 0; + String currentStepLastDataTA = null; + DataDependencyEvent item = currentAttempt.getLastDataEventInfo(currentStep.stopCriticalPathTime); + if (item!=null) { + currentStepLastDataEventTime = item.getTimestamp(); + currentStepLastDataTA = item.getTaskAttemptId(); + } + + // sanity check + for (CriticalPathStep previousStep : tempCP) { + if (previousStep.type == EntityType.ATTEMPT) { + if (previousStep.attempt.getTaskAttemptId().equals(currentAttempt.getTaskAttemptId())) { + // found loop. 
+ // this should only happen for read errors in currentAttempt + List dataEvents = currentAttempt.getLastDataEvents(); + Preconditions.checkState(dataEvents.size() > 1); // received + // original and + // retry data events + Preconditions.checkState(currentStepLastDataEventTime < dataEvents + .get(dataEvents.size() - 1).getTimestamp()); // new event is + // earlier than + // last + } + } + } + tempCP.add(currentStep); // find the next attempt on the critical path boolean dataDependency = false; // find out predecessor dependency - if (currentAttempt.getLastDataEventTime() > currentAttempt.getCreationTime()) { + if (currentStepLastDataEventTime > currentAttempt.getCreationTime()) { dataDependency = true; } @@ -248,13 +280,13 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { if (dataDependency) { // last data event was produced after the attempt was scheduled. use // data dependency - // typically case when scheduling ahead of time + // typically the case when scheduling ahead of time System.out.println("Has data dependency"); - if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) { + if (!Strings.isNullOrEmpty(currentStepLastDataTA)) { // there is a valid data causal TA. Use it. - nextAttemptId = currentAttempt.getLastDataEventSourceTA(); + nextAttemptId = currentStepLastDataTA; reason = CriticalPathDependency.DATA_DEPENDENCY; - startCriticalPathTime = currentAttempt.getLastDataEventTime(); + startCriticalPathTime = currentStepLastDataEventTime; System.out.println("Using data dependency " + nextAttemptId); } else { // there is no valid data causal TA. This means data event came from the same vertex @@ -289,20 +321,30 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { } } } - startCriticalPathTime = currentAttempt.getCreationTime(); + if (reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) { + // rescheduled due to read error. start critical at read error report time. 
+ // for now proxy own creation time for read error report time + startCriticalPathTime = currentAttempt.getCreationTime(); + } else { + // rescheduled due to own previous attempt failure + // we are critical when the previous attempt fails + Preconditions.checkState(nextAttempt != null); + Preconditions.checkState(nextAttempt.getTaskInfo().getTaskId().equals( + currentAttempt.getTaskInfo().getTaskId())); + startCriticalPathTime = nextAttempt.getFinishTime(); + } System.out.println("Using scheduling dependency " + nextAttemptId); } else { // there is no scheduling causal TA. - if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) { + if (!Strings.isNullOrEmpty(currentStepLastDataTA)) { // there is a data event going to the vertex. Count the time between data event and - // scheduling time as Initializer/Manager overhead and follow data dependency - nextAttemptId = currentAttempt.getLastDataEventSourceTA(); + // creation time as Initializer/Manager overhead and follow data dependency + nextAttemptId = currentStepLastDataTA; reason = CriticalPathDependency.DATA_DEPENDENCY; - startCriticalPathTime = currentAttempt.getLastDataEventTime(); - long overhead = currentAttempt.getCreationTime() - - currentAttempt.getLastDataEventTime(); + startCriticalPathTime = currentStepLastDataEventTime; + long overhead = currentAttempt.getCreationTime() - currentStepLastDataEventTime; currentStep.notes - .add("Initializer/VertexManager scheduling overhead " + overhead + " ms"); + .add("Initializer/VertexManager scheduling overhead " + SVGUtils.getTimeStr(overhead)); System.out.println("Using data dependency " + nextAttemptId); } else { // there is no scheduling causal TA and no data event casual TA. 
@@ -316,89 +358,6 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { } } - - if (!Strings.isNullOrEmpty(nextAttemptId)) { - TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId); - TaskAttemptInfo attemptToCheck = nextAttempt; - - // check if the next attempt is already on critical path to prevent infinite loop - boolean foundLoop = false; - CriticalPathDependency prevReason = null; - for (CriticalPathStep previousStep : tempCP) { - if (previousStep.attempt.equals(attemptToCheck)) { - foundLoop = true; - prevReason = previousStep.reason; - } - } - - if (foundLoop) { - // found a loop - find the next step based on heuristics - /* only the losing outputs causes us to backtrack. There are 2 cases - * 1) Step N reported last data event to this step - * -> Step N+1 (current step) is the retry for read error reported - * -> read error was reported by the Step N attempt and it did not exit after the - * error - * -> So scheduling dependency of Step N points back to step N+1 - * 2) Step N reported last data event to this step - * -> Step N+1 is a retry for a read error reported - * -> Step N+2 is the attempt that reported the read error - * -> Step N+3 is the last data event of N+2 and points back to N+1 - */ - System.out.println("Reset " + currentAttempt.getTaskAttemptId() - + " cause: " + currentAttempt.getTerminationCause() - + " time: " + currentAttempt.getFinishTime() - + " reason: " + reason - + " because of: " + attemptToCheck.getTaskAttemptId()); - TaskAttemptInfo attemptWithLostAncestor = currentAttempt; - if (reason != CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) { - // Case 2 above. 
If reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY - // then its Case 1 above - Preconditions.checkState(prevReason.equals( - CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), prevReason); - reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY; - attemptWithLostAncestor = nextAttempt; - } - System.out.println("Reset " + currentAttempt.getTaskAttemptId() - + " cause: " + currentAttempt.getTerminationCause() - + " time: " + currentAttempt.getFinishTime() - + " reason: " + reason - + " because of: " + attemptToCheck.getTaskAttemptId() - + " looking at: " + attemptWithLostAncestor.getTaskAttemptId()); - Preconditions.checkState(reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY); - // we dont track all input events to the consumer. So just jump to - // the previous successful version of the current attempt - TaskAttemptInfo prevSuccAttempt = null; - for (TaskAttemptInfo prevAttempt : attemptWithLostAncestor.getTaskInfo().getTaskAttempts()) { - System.out.println("Looking at " + prevAttempt.getTaskAttemptId() - + " cause: " + prevAttempt.getTerminationCause() + - " time: " + prevAttempt.getFinishTime()); - if (prevAttempt.getTerminationCause() - .equals(TaskAttemptTerminationCause.OUTPUT_LOST.name())) { - if (prevAttempt.getFinishTime() < currentAttempt.getFinishTime()) { - // attempt finished before current attempt - if (prevSuccAttempt == null - || prevAttempt.getFinishTime() > prevSuccAttempt.getFinishTime()) { - // keep the latest attempt that had lost outputs - prevSuccAttempt = prevAttempt; - } - } - } - } - Preconditions.checkState(prevSuccAttempt != null, - attemptWithLostAncestor.getTaskAttemptId()); - System.out - .println("Resetting nextAttempt to : " + prevSuccAttempt.getTaskAttemptId() - + " from " + nextAttempt.getTaskAttemptId()); - nextAttemptId = prevSuccAttempt.getTaskAttemptId(); - if (attemptWithLostAncestor == currentAttempt) { - startCriticalPathTime = currentAttempt.getCreationTime(); - } else { - startCriticalPathTime 
= prevSuccAttempt.getFinishTime(); - } - } - - } - currentStep.startCriticalPathTime = startCriticalPathTime; currentStep.reason = reason; http://git-wip-us.apache.org/repos/asf/tez/blob/171d4850/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java index 9a75461..f3a69a6 100644 --- a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java @@ -182,7 +182,7 @@ public class TestAnalyzer { DagInfo runDAGAndVerify(DAG dag, DAGStatus.State finalState, List steps) throws Exception { tezSession.waitTillReady(); numDAGs++; - LOG.info("XXX Running DAG name: " + dag.getName()); + LOG.info("ABC Running DAG name: " + dag.getName()); DAGClient dagClient = tezSession.submitDAG(dag); DAGStatus dagStatus = dagClient.getDAGStatus(null); while (!dagStatus.isCompleted()) { @@ -230,13 +230,13 @@ public class TestAnalyzer { List criticalPath = cp.getCriticalPath(); for (CriticalPathStep step : criticalPath) { - LOG.info("XXX Step: " + step.getType()); + LOG.info("ABC Step: " + step.getType()); if (step.getType() == EntityType.ATTEMPT) { - LOG.info("XXX Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus()); + LOG.info("ABC Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus()); } - LOG.info("XXX Reason: " + step.getReason()); + LOG.info("ABC Reason: " + step.getReason()); String notes = Joiner.on(";").join(step.getNotes()); - LOG.info("XXX Notes: " + notes); + LOG.info("ABC Notes: " + notes); } boolean foundMatchingLength = false; @@ -361,6 +361,7 @@ public class TestAnalyzer { StepCheck[] 
check = { createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), }; @@ -416,7 +417,9 @@ public class TestAnalyzer { StepCheck[] check = { createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), createStep("v1 : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), }; @@ -484,6 +487,9 @@ public class TestAnalyzer { StepCheck[] check = { createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v2 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY), createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), @@ -553,7 +559,13 @@ public class TestAnalyzer { StepCheck[] check = { // use regex for either vertices being possible on the path createStep("v[12] : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), - createStep("v[12] : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v[12] : 000000_[01]", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v[12] : 000000_[012]", 
CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v[12] : 000000_[12]", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), createStep("v[12] : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), }; @@ -643,17 +655,11 @@ public class TestAnalyzer { StepCheck[] check1 = { // use regex for either vertices being possible on the path createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), - createStep("v[23] : 000000_0", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), }; - StepCheck[] check2 = { - createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), - createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), - createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), - }; stepsOptions.add(check1); - stepsOptions.add(check2); DAG dag = SimpleReverseVTestDAG.createDAG( "testInputFailureRerunCanSendOutputToTwoDownstreamVertices", testConf); runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, stepsOptions);