Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8224311B11 for ; Thu, 11 Sep 2014 20:22:42 +0000 (UTC) Received: (qmail 59903 invoked by uid 500); 11 Sep 2014 20:22:42 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 59811 invoked by uid 500); 11 Sep 2014 20:22:42 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 59795 invoked by uid 99); 11 Sep 2014 20:22:42 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Sep 2014 20:22:42 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id CFB4A9BD82D; Thu, 11 Sep 2014 20:22:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.apache.org Date: Thu, 11 Sep 2014 20:22:42 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: TEZ-850. Recovery unit tests. (Jeff Zhang via hitesh) TEZ-850. Recovery unit tests. 
(Jeff Zhang via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f65e65ae Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f65e65ae Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f65e65ae Branch: refs/heads/master Commit: f65e65aea8cbdb44dd65c6590fbe38dd84413a5a Parents: d6589d3 Author: Hitesh Shah Authored: Thu Sep 11 13:22:21 2014 -0700 Committer: Hitesh Shah Committed: Thu Sep 11 13:22:21 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/DAGImpl.java | 45 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 3 +- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 12 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 240 +++--- .../tez/dag/app/dag/impl/TestDAGImpl.java | 6 +- .../tez/dag/app/dag/impl/TestDAGRecovery.java | 514 +++++++++++ .../app/dag/impl/TestTaskAttemptRecovery.java | 178 ++++ .../tez/dag/app/dag/impl/TestTaskRecovery.java | 629 +++++++++++--- .../dag/app/dag/impl/TestVertexRecovery.java | 860 +++++++++++++++++++ 10 files changed, 2243 insertions(+), 245 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 032438f..4fc7e83 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: TEZ-1544. Link to release artifacts for 0.5.0 does not point to a specific link for 0.5.0. TEZ-1559. Add system tests for AM recovery. + TEZ-850. Recovery unit tests. 
Release 0.5.1: Unreleased http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 24d2e6b..daaa81b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -139,7 +139,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final TaskAttemptListener taskAttemptListener; private final TaskHeartbeatHandler taskHeartbeatHandler; private final Object tasksSyncHandle = new Object(); - + private volatile boolean committedOrAborted = false; private volatile boolean allOutputsCommitted = false; boolean commitAllOutputsOnSuccess = true; @@ -157,7 +157,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final StateChangeNotifier entityUpdateTracker; volatile Map vertices = new HashMap(); - private Map edges = new HashMap(); + @VisibleForTesting + Map edges = new HashMap(); private TezCounters dagCounters = new TezCounters(); private Object fullCountersLock = new Object(); private TezCounters fullCounters = null; @@ -359,14 +360,18 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private DAGTerminationCause terminationCause; private Credentials credentials; - private long initTime; - private long startTime; - private long finishTime; - + @VisibleForTesting + long initTime; + @VisibleForTesting + long startTime; + @VisibleForTesting + long finishTime; + Map vertexGroups = Maps.newHashMap(); Map> vertexGroupInfo = Maps.newHashMap(); private DAGState recoveredState = DAGState.NEW; - private boolean recoveryCommitInProgress = false; + @VisibleForTesting + boolean recoveryCommitInProgress = false; Map recoveredGroupCommits = new 
HashMap(); static class VertexGroupInfo { @@ -381,7 +386,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, groupMembers = Sets.newHashSet(groupInfo.getGroupMembersList()); edgeMergedInputs = Maps.newHashMapWithExpectedSize(groupInfo.getEdgeMergedInputsCount()); for (PlanGroupInputEdgeInfo edgInfo : groupInfo.getEdgeMergedInputsList()) { - edgeMergedInputs.put(edgInfo.getDestVertexName(), + edgeMergedInputs.put(edgInfo.getDestVertexName(), DagTypeConverters.convertInputDescriptorFromDAGPlan(edgInfo.getMergedInput())); } outputs = Sets.newHashSet(groupInfo.getOutputsList()); @@ -706,7 +711,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } } - + private boolean commitOutput(String outputName, OutputCommitter outputCommitter) { final OutputCommitter committer = outputCommitter; try { @@ -723,7 +728,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } return false; } - + private synchronized boolean commitOrAbortOutputs(boolean dagSucceeded) { if (this.committedOrAborted) { LOG.info("Ignoring multiple output commit/abort"); @@ -731,7 +736,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } LOG.info("Calling DAG commit/abort for dag: " + getID()); this.committedOrAborted = true; - + boolean successfulOutputsAlreadyCommitted = !commitAllOutputsOnSuccess; boolean failedWhileCommitting = false; if (dagSucceeded && !successfulOutputsAlreadyCommitted) { @@ -772,7 +777,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, LOG.info("No output committers for vertex: " + vertex.getName()); continue; } - Map outputCommitters = + Map outputCommitters = new HashMap(vertex.getOutputCommitters()); Set sharedOutputs = vertex.getSharedOutputs(); // remove shared outputs @@ -793,7 +798,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, LOG.info("Committing output: " + entry.getKey() + " for vertex: " + vertex.getVertexId()); if (vertex.getState() != VertexState.SUCCEEDED) { - 
throw new TezUncheckedException("Vertex: " + vertex.getName() + + throw new TezUncheckedException("Vertex: " + vertex.getName() + " not in SUCCEEDED state. State= " + vertex.getState()); } if (!commitOutput(entry.getKey(), entry.getValue())) { @@ -803,11 +808,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } } - + if (failedWhileCommitting) { LOG.info("DAG: " + getID() + " failed while committing"); } - + if (!dagSucceeded || failedWhileCommitting) { // come here because dag failed or // dag succeeded and all or none semantics were on and a commit failed @@ -1026,9 +1031,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, if (finishTime == 0) { setFinishTime(); } - + boolean allOutputsCommitted = commitOrAbortOutputs(finalState == DAGState.SUCCEEDED); - + if (finalState == DAGState.SUCCEEDED && !allOutputsCommitted) { finalState = DAGState.FAILED; trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE); @@ -1057,7 +1062,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, LOG.info("DAG: " + getID() + " finished with state: " + finalState); return finalState; } - + private DAGStatus.State getDAGStatusFromState(DAGState finalState) { switch (finalState) { case NEW: @@ -1631,7 +1636,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, + ", numFailedVertices=" + job.numFailedVertices + ", numKilledVertices=" + job.numKilledVertices + ", numVertices=" + job.numVertices); - + if (failed) { return DAGState.TERMINATING; } @@ -1724,7 +1729,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, addDiagnostic("Vertex re-running" + ", vertexName=" + vertex.getName() + ", vertexId=" + vertex.getVertexId()); - + if (!commitAllOutputsOnSuccess) { // partial output may already have been committed. 
fail if so List groupList = vertexGroupInfo.get(vertex.getName()); http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index e63dbf5..7ba90b5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -137,7 +137,8 @@ public class TaskAttemptImpl implements TaskAttempt, private String nodeHttpAddress; private String nodeRackName; - private TaskAttemptStatus reportedStatus; + @VisibleForTesting + TaskAttemptStatus reportedStatus; private DAGCounter localityCounter; // Used to store locality information when http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index ecd2bcc..1dd711b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -116,7 +116,8 @@ public class TaskImpl implements Task, EventHandler { protected final AppContext appContext; private final Resource taskResource; private final ContainerContext containerContext; - private long scheduledTime; + @VisibleForTesting + long scheduledTime; private final List tezEventsForTaskAttempts = new ArrayList(); private static final List EMPTY_TASK_ATTEMPT_TEZ_EVENTS = @@ -124,7 +125,8 @@ public class TaskImpl implements Task, EventHandler { // counts the number of attempts that are either running or in a 
state where // they will come to be running when they get a Container - private int numberUncompletedAttempts = 0; + @VisibleForTesting + int numberUncompletedAttempts = 0; private boolean historyTaskStartGenerated = false; @@ -290,11 +292,13 @@ public class TaskImpl implements Task, EventHandler { //saying COMMIT_PENDING private TezTaskAttemptID commitAttempt; - private TezTaskAttemptID successfulAttempt; + @VisibleForTesting + TezTaskAttemptID successfulAttempt; @VisibleForTesting int failedAttempts; - private int finishedAttempts;//finish are total of success, failed and killed + @VisibleForTesting + int finishedAttempts;//finish are total of success, failed and killed private final boolean leafVertex; private TaskState recoveredState = TaskState.NEW; http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index ff556ba..31240cb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -205,9 +205,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, //fields initialized in init - private int numStartedSourceVertices = 0; - private int numInitedSourceVertices = 0; - private int numRecoveredSourceVertices = 0; + @VisibleForTesting + int numStartedSourceVertices = 0; + @VisibleForTesting + int numInitedSourceVertices = 0; + @VisibleForTesting + int numRecoveredSourceVertices = 0; private int distanceFromRoot = 0; @@ -238,7 +241,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, new VertexStateChangedCallback(); private VertexState recoveredState = VertexState.NEW; - private List recoveredEvents = new ArrayList(); + 
@VisibleForTesting + List recoveredEvents = new ArrayList(); private boolean vertexAlreadyInitialized = false; protected static final @@ -254,7 +258,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexState.INITIALIZING, VertexState.FAILED), VertexEventType.V_INIT, new InitTransition()) - .addTransition(VertexState.NEW, + .addTransition(VertexState.NEW, EnumSet.of(VertexState.NEW), VertexEventType.V_NULL_EDGE_INITIALIZED, new NullEdgeInitializedTransition()) @@ -308,7 +312,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // Transitions from INITIALIZING state .addTransition(VertexState.INITIALIZING, - EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, + EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, VertexState.FAILED), VertexEventType.V_ROOT_INPUT_INITIALIZED, new RootInputInitializedTransition()) @@ -341,14 +345,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, .addTransition(VertexState.INITIALIZING, VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - .addTransition(VertexState.INITIALIZING, + .addTransition(VertexState.INITIALIZING, EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, VertexState.FAILED), VertexEventType.V_NULL_EDGE_INITIALIZED, new NullEdgeInitializedTransition()) // Transitions from INITED state - // SOURCE_VERTEX_STARTED - for sources which determine parallelism, + // SOURCE_VERTEX_STARTED - for sources which determine parallelism, // they must complete before this vertex can start. 
.addTransition (VertexState.INITED, @@ -358,14 +362,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, .addTransition(VertexState.INITED, VertexState.INITED, VertexEventType.V_SOURCE_VERTEX_STARTED, new SourceVertexStartedTransition()) - .addTransition(VertexState.INITED, + .addTransition(VertexState.INITED, EnumSet.of(VertexState.INITED), VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, new OneToOneSourceSplitTransition()) .addTransition(VertexState.INITED, VertexState.INITED, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) - .addTransition(VertexState.INITED, + .addTransition(VertexState.INITED, EnumSet.of(VertexState.RUNNING, VertexState.INITED), VertexEventType.V_START, new StartTransition()) @@ -393,7 +397,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexState.ERROR), VertexEventType.V_TASK_COMPLETED, new TaskCompletedTransition()) - .addTransition(VertexState.RUNNING, + .addTransition(VertexState.RUNNING, EnumSet.of(VertexState.RUNNING), VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, new OneToOneSourceSplitTransition()) @@ -455,7 +459,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_ROUTE_EVENT, ROUTE_EVENT_TRANSITION) .addTransition( - VertexState.SUCCEEDED, + VertexState.SUCCEEDED, EnumSet.of(VertexState.FAILED, VertexState.ERROR), VertexEventType.V_TASK_COMPLETED, new TaskCompletedAfterVertexSuccessTransition()) @@ -551,17 +555,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private final StateMachineTez stateMachine; //changing fields while the vertex is running - private int numTasks; - private int completedTaskCount = 0; - private int succeededTaskCount = 0; - private int failedTaskCount = 0; - private int killedTaskCount = 0; - - private long initTimeRequested; // Time at which INIT request was received. 
- private long initedTime; // Time when entering state INITED - private long startTimeRequested; // Time at which START request was received. - private long startedTime; // Time when entering state STARTED - private long finishTime; + @VisibleForTesting + int numTasks; + @VisibleForTesting + int completedTaskCount = 0; + @VisibleForTesting + int succeededTaskCount = 0; + @VisibleForTesting + int failedTaskCount = 0; + @VisibleForTesting + int killedTaskCount = 0; + + @VisibleForTesting + long initTimeRequested; // Time at which INIT request was received. + @VisibleForTesting + long initedTime; // Time when entering state INITED + @VisibleForTesting + long startTimeRequested; // Time at which START request was received. + @VisibleForTesting + long startedTime; // Time when entering state STARTED + @VisibleForTesting + long finishTime; private float progress; private final TezVertexID vertexId; //runtime assigned id. @@ -576,14 +590,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private Map targetVertices; Set uninitializedEdges = Sets.newHashSet(); - private Map> + private Map> rootInputDescriptors; - private Map> + private Map> additionalOutputs; private Map outputCommitters; private Map rootInputSpecs = new HashMap(); private static final InputSpecUpdate DEFAULT_ROOT_INPUT_SPECS = InputSpecUpdate - .getDefaultSinglePhysicalInputSpecUpdate(); + .getDefaultSinglePhysicalInputSpecUpdate(); private final List additionalOutputSpecs = new ArrayList(); private Set inputsWithInitializers; private int numInitializedInputs; @@ -598,7 +612,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private RootInputInitializerManager rootInputInitializerManager; VertexManager vertexManager; - + private final UserGroupInformation dagUgi; private boolean parallelismSet = false; @@ -607,20 +621,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private AtomicBoolean committed = new AtomicBoolean(false); 
private AtomicBoolean aborted = new AtomicBoolean(false); private boolean commitVertexOutputs = false; - + private Map dagVertexGroups; - + private TaskLocationHint taskLocationHints[]; private Map localResources; private Map environment; private final String javaOpts; private final ContainerContext containerContext; private VertexTerminationCause terminationCause; - + private String logIdentifier; - private boolean recoveryCommitInProgress = false; + @VisibleForTesting + boolean recoveryCommitInProgress = false; private boolean summaryCompleteSeen = false; - private boolean hasCommitter = false; + @VisibleForTesting + boolean hasCommitter = false; private boolean vertexCompleteSeen = false; private Map recoveredSourceEdgeManagers = null; private Map recoveredRootInputSpecUpdates = null; @@ -658,7 +674,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, logLocationHints(this.vertexName, vertexLocationHint); } setTaskLocationHints(vertexLocationHint); - + this.dagUgi = appContext.getCurrentDAG().getDagUGI(); this.taskResource = DagTypeConverters @@ -897,12 +913,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, this.readLock.unlock(); } } - + @Override public TaskLocationHint getTaskLocationHint(TezTaskID taskId) { this.readLock.lock(); try { - if (taskLocationHints == null || + if (taskLocationHints == null || taskLocationHints.length <= taskId.getId()) { return null; } @@ -1081,8 +1097,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } private void setTaskLocationHints(VertexLocationHint vertexLocationHint) { - if (vertexLocationHint != null && - vertexLocationHint.getTaskLocationHints() != null && + if (vertexLocationHint != null && + vertexLocationHint.getTaskLocationHints() != null && !vertexLocationHint.getTaskLocationHints().isEmpty()) { List locHints = vertexLocationHint.getTaskLocationHints(); taskLocationHints = locHints.toArray(new TaskLocationHint[locHints.size()]); @@ -1154,7 
+1170,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } } - + // Restore any rootInputSpecUpdates which may have been registered during a parallelism // update. if (rootInputSpecUpdates != null) { @@ -1166,7 +1182,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, writeLock.unlock(); } } - Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: " + Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: " + parallelism + " for vertex: " + logIdentifier); setVertexLocationHint(vertexLocationHint); writeLock.lock(); @@ -1175,7 +1191,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, LOG.info("Parallelism can only be set dynamically once per vertex: " + logIdentifier); return false; } - + parallelismSet = true; // Input initializer/Vertex Manager/1-1 split expected to set parallelism. @@ -1185,7 +1201,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, "Vertex state is not Initializing. 
Value: " + getState() + " for vertex: " + logIdentifier); } - + if(sourceEdgeManagers != null) { for(Map.Entry entry : sourceEdgeManagers.entrySet()) { LOG.info("Replacing edge manager for source:" @@ -1223,7 +1239,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); this.createTasks(); - LOG.info("Vertex " + getVertexId() + + LOG.info("Vertex " + getVertexId() + " parallelism set to " + parallelism); if (canInitVertex()) { getEventHandler().handle(new VertexEvent(getVertexId(), VertexEventType.V_READY_TO_INIT)); @@ -1244,7 +1260,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, return false; } if (parallelism == numTasks) { - LOG.info("setParallelism same as current value: " + parallelism + + LOG.info("setParallelism same as current value: " + parallelism + " for vertex: " + logIdentifier); Preconditions.checkArgument(sourceEdgeManagers != null, "Source edge managers or RootInputSpecs must be set when not changing parallelism"); @@ -1258,7 +1274,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, for (Edge edge : sourceVertices.values()) { edge.startEventBuffering(); } - + // assign to local variable of LinkedHashMap to make sure that changing // type of task causes compile error. 
We depend on LinkedHashMap for order LinkedHashMap currentTasks = this.tasks; @@ -1281,14 +1297,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, LOG.info("Removing task: " + entry.getKey()); iter.remove(); } - LOG.info("Vertex " + logIdentifier + + LOG.info("Vertex " + logIdentifier + " parallelism set to " + parallelism + " from " + numTasks); int oldNumTasks = numTasks; this.numTasks = parallelism; stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); assert tasks.size() == numTasks; - + // set new edge managers if(sourceEdgeManagers != null) { for(Map.Entry entry : sourceEdgeManagers.entrySet()) { @@ -1320,17 +1336,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, edge.stopEventBuffering(); } } - + for (Map.Entry entry : targetVertices.entrySet()) { Edge edge = entry.getValue(); - if (edge.getEdgeProperty().getDataMovementType() + if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) { // inform these target vertices that we have changed parallelism - VertexEventOneToOneSourceSplit event = + VertexEventOneToOneSourceSplit event = new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(), getVertexId(), - ((originalOneToOneSplitSource!=null) ? - originalOneToOneSplitSource : getVertexId()), + ((originalOneToOneSplitSource!=null) ? 
+ originalOneToOneSplitSource : getVertexId()), numTasks); getEventHandler().handle(event); } @@ -1339,7 +1355,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } finally { writeLock.unlock(); } - + return true; } @@ -1733,7 +1749,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // no code, for now } - private ContainerContext getContainerContext(int taskIdx) { + @VisibleForTesting + ContainerContext getContainerContext(int taskIdx) { if (taskSpecificLaunchCmdOpts.addTaskSpecificLaunchCmdOption(vertexName, taskIdx)) { String jvmOpts = taskSpecificLaunchCmdOpts.getTaskSpecificOption(javaOpts, vertexName, taskIdx); ContainerContext context = new ContainerContext(this.localResources, @@ -1806,9 +1823,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, if (rootInputDescriptors != null) { LOG.info("Root Inputs exist for Vertex: " + getName() + " : " + rootInputDescriptors); - for (RootInputLeafOutput input + for (RootInputLeafOutput input : rootInputDescriptors.values()) { - if (input.getControllerDescriptor() != null && + if (input.getControllerDescriptor() != null && input.getControllerDescriptor().getClassName() != null) { if (inputsWithInitializers == null) { inputsWithInitializers = Sets.newHashSet(); @@ -1875,7 +1892,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, checkTaskLimits(); return VertexState.INITED; } - + private void assignVertexManager() { boolean hasBipartite = false; boolean hasOneToOne = false; @@ -1895,12 +1912,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, hasCustom = true; break; default: - throw new TezUncheckedException("Unknown data movement type: " + + throw new TezUncheckedException("Unknown data movement type: " + edge.getEdgeProperty().getDataMovementType()); } } } - + boolean hasUserVertexManager = vertexPlan.hasVertexManagerPlugin(); if (hasUserVertexManager) { @@ -2008,6 +2025,8 @@ public class VertexImpl 
implements org.apache.tez.dag.app.dag.Vertex, vertex.finished(VertexState.ERROR); } + // recover from recover log, should recover to running + // desiredState must be RUNNING based on above code VertexState endState; switch (vertex.recoveredState) { case NEW: @@ -2086,6 +2105,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions); endState = VertexState.RUNNING; } else { + // why succeeded here endState = VertexState.SUCCEEDED; vertex.finished(endState); } @@ -2249,7 +2269,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, (vertex.sourceVertices == null || vertex.sourceVertices.containsKey(otherVertex) || vertex.targetVertices == null || vertex.targetVertices.containsKey(otherVertex)), "Not connected to vertex " + otherVertex.getName() + " from vertex: " + vertex.logIdentifier); - LOG.info("Edge initialized for connection to vertex " + otherVertex.getName() + + LOG.info("Edge initialized for connection to vertex " + otherVertex.getName() + " at vertex : " + vertex.logIdentifier); vertex.uninitializedEdges.remove(edge); if(vertex.getState() == VertexState.INITIALIZING && vertex.canInitVertex()) { @@ -2596,7 +2616,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, if (state.equals(VertexState.FAILED)) { return state; } - // TODO move before to handle NEW state + // TODO move before to handle NEW state if (vertex.targetVertices != null) { for (Edge e : vertex.targetVertices.values()) { if (e.getEdgeManager() == null) { @@ -2619,7 +2639,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } } - + // Create tasks based on initial configuration, but don't start them yet. if (vertex.numTasks == -1) { LOG.info("Num tasks is -1. 
Expecting VertexManager/InputInitializers/1-1 split" @@ -2633,7 +2653,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.appContext.getTaskScheduler().getNumClusterNodes(), vertex.getTaskResource(), vertex.appContext.getTaskScheduler().getTotalResources()); - List> + List> inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size()); for (String inputName : vertex.inputsWithInitializers) { inputList.add(vertex.rootInputDescriptors.get(inputName)); @@ -2646,7 +2666,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } else { boolean hasOneToOneUninitedSource = false; for (Map.Entry entry : vertex.sourceVertices.entrySet()) { - if (entry.getValue().getEdgeProperty().getDataMovementType() == + if (entry.getValue().getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) { if (entry.getKey().getTotalTasks() == -1) { hasOneToOneUninitedSource = true; @@ -2662,7 +2682,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, LOG.info("Vertex will initialize via custom vertex manager. 
" + vertex.logIdentifier); return VertexState.INITIALIZING; } - throw new TezUncheckedException(vertex.getVertexId() + + throw new TezUncheckedException(vertex.getVertexId() + " has -1 tasks but does not have input initializers, " + "1-1 uninited sources or custom vertex manager to set it at runtime"); } @@ -2676,14 +2696,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.appContext.getTaskScheduler().getNumClusterNodes(), vertex.getTaskResource(), vertex.appContext.getTaskScheduler().getTotalResources()); - List> + List> inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size()); for (String inputName : vertex.inputsWithInitializers) { inputList.add(vertex.rootInputDescriptors.get(inputName)); } LOG.info("Starting root input initializers: " + vertex.inputsWithInitializers.size()); - // special case when numTasks>0 and still we want to stay in initializing + // special case when numTasks>0 and still we want to stay in initializing // state. This is handled in RootInputInitializedTransition specially. 
vertex.initWaitsForRootInitializers = true; vertex.rootInputInitializerManager.runInputInitializers(inputList); @@ -2711,7 +2731,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, Resource vertexTaskResource, Resource totalResource) { return new RootInputInitializerManager(this, appContext, this.dagUgi, this.stateChangeNotifier); } - + private boolean initializeVertexInInitializingState() { boolean isInitialized = initializeVertex(); if (!isInitialized) { @@ -2721,7 +2741,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, return true; } - + void startIfPossible() { if (startSignalPending) { // Trigger a start event to ensure route events are seen before @@ -2735,7 +2755,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, public static class VertexInitializedTransition implements MultipleArcTransition { - + static VertexState doTransition(VertexImpl vertex) { Preconditions.checkState(vertex.canInitVertex(), "Vertex: " + vertex.logIdentifier); boolean isInitialized = vertex.initializeVertexInInitializingState(); @@ -2744,15 +2764,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } vertex.startIfPossible(); - return VertexState.INITED; + return VertexState.INITED; } - + @Override public VertexState transition(VertexImpl vertex, VertexEvent event) { return doTransition(vertex); } } - + // present in most transitions so that the initializer thread can be shutdown properly public static class RootInputInitializedTransition implements MultipleArcTransition { @@ -2773,15 +2793,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // All inputs initialized, shutdown the initializer. vertex.rootInputInitializerManager.shutdown(); } - + // done. 
check if we need to do the initialization - if (vertex.getState() == VertexState.INITIALIZING && + if (vertex.getState() == VertexState.INITIALIZING && vertex.initWaitsForRootInitializers) { // set the wait flag to false vertex.initWaitsForRootInitializers = false; // initialize vertex if possible and needed if (vertex.canInitVertex()) { - Preconditions.checkState(vertex.numTasks >= 0, + Preconditions.checkState(vertex.numTasks >= 0, "Parallelism should have been set by now for vertex: " + vertex.logIdentifier); return VertexInitializedTransition.doTransition(vertex); } @@ -2795,10 +2815,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @Override public VertexState transition(VertexImpl vertex, VertexEvent event) { - VertexEventOneToOneSourceSplit splitEvent = + VertexEventOneToOneSourceSplit splitEvent = (VertexEventOneToOneSourceSplit)event; TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource(); - + if (vertex.originalOneToOneSplitSource != null) { VertexState state = vertex.getState(); Preconditions @@ -2813,25 +2833,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, if (vertex.originalOneToOneSplitSource.equals(originalSplitSource)) { // ignore another split event that may have come from a different // path in the DAG. 
We have already split because of that source - LOG.info("Ignoring split of vertex " + vertex.getVertexId() + - " because of split in vertex " + originalSplitSource + + LOG.info("Ignoring split of vertex " + vertex.getVertexId() + + " because of split in vertex " + originalSplitSource + " sent by vertex " + splitEvent.getSenderVertex() + " numTasks " + splitEvent.getNumTasks()); return state; } // cannot split from multiple sources - throw new TezUncheckedException("Vertex: " + vertex.getVertexId() + - " asked to split by: " + originalSplitSource + + throw new TezUncheckedException("Vertex: " + vertex.getVertexId() + + " asked to split by: " + originalSplitSource + " but was already split by:" + vertex.originalOneToOneSplitSource); } - - LOG.info("Splitting vertex " + vertex.getVertexId() + - " because of split in vertex " + originalSplitSource + + + LOG.info("Splitting vertex " + vertex.getVertexId() + + " because of split in vertex " + originalSplitSource + " sent by vertex " + splitEvent.getSenderVertex() + " numTasks " + splitEvent.getNumTasks()); vertex.originalOneToOneSplitSource = originalSplitSource; vertex.setParallelism(splitEvent.getNumTasks(), null, null, null); - if (vertex.getState() == VertexState.RUNNING || + if (vertex.getState() == VertexState.RUNNING || vertex.getState() == VertexState.INITED) { return vertex.getState(); } else { @@ -2861,19 +2881,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } vertex.numStartedSourceVertices++; LOG.info("Source vertex started: " + startEvent.getSourceVertexId() + - " for vertex: " + vertex.getVertexId() + " numStartedSources: " + + " for vertex: " + vertex.getVertexId() + " numStartedSources: " + vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size()); - + if (vertex.numStartedSourceVertices < vertex.sourceVertices.size()) { LOG.info("Cannot start vertex: " + vertex.logIdentifier + " numStartedSources: " + vertex.numStartedSourceVertices + " numSources: " + 
vertex.sourceVertices.size()); return; } - - // vertex meets external start dependency conditions. Save this signal in + + // vertex meets external start dependency conditions. Save this signal in // case we are not ready to start now and need to start later vertex.startSignalPending = true; - + if (vertex.getState() != VertexState.INITED) { // vertex itself is not ready to start. External dependencies have already // notified us. @@ -2883,14 +2903,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, + vertex.uninitializedEdges.size()); return; } - + // vertex is inited and all dependencies are ready. Inited vertex means // parallelism must be set already and edges defined Preconditions.checkState( (vertex.numTasks >= 0 && vertex.uninitializedEdges.isEmpty()), "Cannot start vertex that is not completely defined. Vertex: " + vertex.logIdentifier + " numTasks: " + vertex.numTasks); - + vertex.startIfPossible(); } } @@ -2906,14 +2926,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, + " initWaitsForRootInitializers: " + initWaitsForRootInitializers); return false; } - - public static class StartWhileInitializingTransition implements + + public static class StartWhileInitializingTransition implements SingleArcTransition { @Override public void transition(VertexImpl vertex, VertexEvent event) { // vertex state machine does not start itself in the initializing state - // this start event can only come directly from the DAG. That means this + // this start event can only come directly from the DAG. 
That means this // is a top level vertex of the dag Preconditions.checkState( (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty()), @@ -2926,10 +2946,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, public static class StartTransition implements MultipleArcTransition { - + @Override public VertexState transition(VertexImpl vertex, VertexEvent event) { - Preconditions.checkState(vertex.getState() == VertexState.INITED, + Preconditions.checkState(vertex.getState() == VertexState.INITED, "Unexpected state " + vertex.getState() + " for " + vertex.logIdentifier); vertex.startTimeRequested = vertex.clock.getTime(); return vertex.startVertex(); @@ -2937,7 +2957,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } private VertexState startVertex() { - Preconditions.checkState(getState() == VertexState.INITED, + Preconditions.checkState(getState() == VertexState.INITED, "Vertex must be inited " + logIdentifier); startedTime = clock.getTime(); @@ -3095,7 +3115,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, case OWN_TASK_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break; case ROOT_INPUT_INIT_FAILURE: case COMMIT_FAILURE: - case INVALID_NUM_OF_TASKS: + case INVALID_NUM_OF_TASKS: case INIT_FAILURE: case INTERNAL_ERROR: case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break; @@ -3172,7 +3192,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, if (taskEvent.getState() == TaskState.SUCCEEDED) { taskSucceeded(vertex, task); } else if (taskEvent.getState() == TaskState.FAILED) { - LOG.info("Failing vertex: " + vertex.logIdentifier + + LOG.info("Failing vertex: " + vertex.logIdentifier + " because task failed: " + taskEvent.getTaskID()); vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE); forceTransitionToKillWait = true; @@ -3220,7 
+3240,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.succeededTaskCount--; } } - + private static class VertexNoTasksCompletedTransition implements MultipleArcTransition { @@ -3229,7 +3249,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, return VertexImpl.checkVertexForCompletion(vertex); } } - + private static class TaskCompletedAfterVertexSuccessTransition implements MultipleArcTransition { @Override @@ -3241,12 +3261,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, if (vEvent.getState() == TaskState.FAILED) { finalState = VertexState.FAILED; finalStatus = VertexStatus.State.FAILED; - diagnosticMsg = "Vertex " + vertex.logIdentifier +" failed as task " + vEvent.getTaskID() + + diagnosticMsg = "Vertex " + vertex.logIdentifier +" failed as task " + vEvent.getTaskID() + " failed after vertex succeeded."; } else { finalState = VertexState.ERROR; finalStatus = VertexStatus.State.ERROR; - diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() + + diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() + " completed with state " + vEvent.getState() + " after vertex succeeded."; } LOG.info(diagnosticMsg); @@ -3599,14 +3619,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @Nullable @Override - public Map> + public Map> getAdditionalInputs() { return this.rootInputDescriptors; } @Nullable @Override - public Map> + public Map> getAdditionalOutputs() { return this.additionalOutputs; } @@ -3696,7 +3716,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, inputSpecList = new ArrayList(this.getInputVerticesCount() + (rootInputDescriptors == null ? 
0 : rootInputDescriptors.size())); if (rootInputDescriptors != null) { - for (Entry> + for (Entry> rootInputDescriptorEntry : rootInputDescriptors.entrySet()) { inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(), rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get( @@ -3729,18 +3749,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } return outputSpecList; } - + //TODO Eventually remove synchronization. @Override public synchronized List getGroupInputSpecList(int taskIndex) { return groupInputSpecList; } - + @Override public synchronized void addSharedOutputs(Set outputs) { this.sharedOutputs.addAll(outputs); } - + @Override public synchronized Set getSharedOutputs() { return this.sharedOutputs; http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index ec05815..aba4fd9 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -111,7 +111,7 @@ public class TestDAGImpl { private static final Log LOG = LogFactory.getLog(TestDAGImpl.class); private DAGPlan dagPlan; private TezDAGID dagId; - private Configuration conf; + private static Configuration conf; private DrainDispatcher dispatcher; private Credentials fsTokens; private AppContext appContext; @@ -344,7 +344,7 @@ public class TestDAGImpl { } // Create a plan with 3 vertices: A, B, C. 
Group(A,B)->C - private DAGPlan createGroupDAGPlan() { + static DAGPlan createGroupDAGPlan() { LOG.info("Setting up group dag plan"); int dummyTaskCount = 1; Resource dummyTaskResource = Resource.newInstance(1, 1); @@ -381,7 +381,7 @@ public class TestDAGImpl { return dag.createDag(conf); } - private DAGPlan createTestDAGPlan() { + public static DAGPlan createTestDAGPlan() { LOG.info("Setting up dag plan"); DAGPlan dag = DAGPlan.newBuilder() .setName("testverteximpl") http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java new file mode 100644 index 0000000..da0186e --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -0,0 +1,514 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app.dag.impl; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskHeartbeatHandler; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished; +import org.apache.tez.dag.app.dag.event.DAGEvent; +import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent; +import org.apache.tez.dag.app.dag.event.DAGEventType; +import org.apache.tez.dag.app.dag.event.VertexEvent; +import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex; +import org.apache.tez.dag.history.events.DAGCommitStartedEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; +import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; +import org.apache.tez.dag.records.TezDAGID; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class TestDAGRecovery { + + private DAGImpl dag; + private EventHandler mockEventHandler; + + private String user = "root"; + private String dagName = "dag1"; + + private AppContext mockAppContext; + 
private ApplicationId appId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + private TezDAGID dagId = TezDAGID.getInstance(appId, 1); + private long initTime = 100L; + private long startTime = initTime + 200L; + private long commitStartTime = startTime + 200L; + private long finishTime = commitStartTime + 200L; + + @Before + public void setUp() { + + mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + when(mockAppContext.getCurrentDAG().getDagUGI()).thenReturn(null); + mockEventHandler = mock(EventHandler.class); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + dag = + new DAGImpl(dagId, new Configuration(), dagPlan, mockEventHandler, + mock(TaskAttemptListener.class), new Credentials(), + new SystemClock(), user, mock(TaskHeartbeatHandler.class), + mockAppContext); + } + + private void assertNewState() { + assertEquals(0, dag.getVertices().size()); + assertEquals(0, dag.edges.size()); + assertNull(dag.dagScheduler); + assertFalse(dag.recoveryCommitInProgress); + assertEquals(0, dag.recoveredGroupCommits.size()); + } + + private void restoreFromDAGInitializedEvent() { + DAGState recoveredState = + dag.restoreFromEvent(new DAGInitializedEvent(dagId, initTime, user, + dagName)); + assertEquals(DAGState.INITED, recoveredState); + assertEquals(initTime, dag.initTime); + assertEquals(6, dag.getVertices().size()); + assertEquals(6, dag.edges.size()); + assertNotNull(dag.dagScheduler); + } + + private void restoreFromDAGStartedEvent() { + DAGState recoveredState = + dag.restoreFromEvent(new DAGStartedEvent(dagId, startTime, user, + dagName)); + assertEquals(startTime, dag.startTime); + assertEquals(DAGState.RUNNING, recoveredState); + } + + private void restoreFromDAGCommitStartedEvent() { + DAGState recoveredState = + dag.restoreFromEvent(new DAGCommitStartedEvent(dagId, commitStartTime)); + assertTrue(dag.recoveryCommitInProgress); + assertEquals(DAGState.RUNNING, recoveredState); + } + + private void 
restoreFromVertexGroupCommitStartedEvent() { + DAGState recoveredState = + dag.restoreFromEvent(new VertexGroupCommitStartedEvent(dagId, "g1", + commitStartTime)); + assertEquals(1, dag.recoveredGroupCommits.size()); + assertFalse(dag.recoveredGroupCommits.get("g1").booleanValue()); + assertEquals(DAGState.RUNNING, recoveredState); + } + + private void restoreFromVertexGroupCommitFinishedEvent() { + DAGState recoveredState = + dag.restoreFromEvent(new VertexGroupCommitFinishedEvent(dagId, "g1", + commitStartTime + 100L)); + assertEquals(1, dag.recoveredGroupCommits.size()); + assertTrue(dag.recoveredGroupCommits.get("g1").booleanValue()); + assertEquals(DAGState.RUNNING, recoveredState); + } + + private void restoreFromDAGFinishedEvent(DAGState finalState) { + DAGState recoveredState = + dag.restoreFromEvent(new DAGFinishedEvent(dagId, startTime, finishTime, + finalState, "", new TezCounters(), user, dagName)); + assertEquals(finishTime, dag.finishTime); + assertFalse(dag.recoveryCommitInProgress); + assertEquals(finalState, recoveredState); + } + + /** + * New -> RecoverTransition + */ + @Test + public void testDAGRecovery_FromNew() { + assertNewState(); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + + ArgumentCaptor eventCaptor = + ArgumentCaptor.forClass(DAGEvent.class); + verify(mockEventHandler, times(2)).handle(eventCaptor.capture()); + List events = eventCaptor.getAllValues(); + assertEquals(2, events.size()); + assertEquals(DAGEventType.DAG_INIT, events.get(0).getType()); + assertEquals(DAGEventType.DAG_START, events.get(1).getType()); + } + + /** + * restoreFromDAGInitializedEvent -> RecoverTransition + */ + @Test + public void testDAGRecovery_FromInited() { + assertNewState(); + restoreFromDAGInitializedEvent(); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + + assertEquals(DAGState.RUNNING, dag.getState()); + // send recover event to 2 root vertex + ArgumentCaptor eventCaptor = + 
ArgumentCaptor.forClass(VertexEvent.class); + verify(mockEventHandler, times(2)).handle(eventCaptor.capture()); + List vertexEvents = eventCaptor.getAllValues(); + assertEquals(2, vertexEvents.size()); + for (VertexEvent vEvent : vertexEvents) { + assertTrue(vEvent instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent; + assertEquals(VertexState.RUNNING, recoverEvent.getDesiredState()); + } + } + + /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> + * RecoverTransition + */ + @Test + public void testDAGRecovery_FromStarted() { + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + + assertEquals(DAGState.RUNNING, dag.getState()); + // send recover event to 2 root vertex + ArgumentCaptor eventCaptor = + ArgumentCaptor.forClass(VertexEvent.class); + verify(mockEventHandler, times(2)).handle(eventCaptor.capture()); + List vertexEvents = eventCaptor.getAllValues(); + assertEquals(2, vertexEvents.size()); + for (VertexEvent vEvent : vertexEvents) { + assertTrue(vEvent instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent; + assertEquals(VertexState.RUNNING, recoverEvent.getDesiredState()); + } + } + + /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> + * restoreFromDAGFinishedEvent (SUCCEEDED) -> RecoverTransition + */ + @Test + public void testDAGRecovery_Finished_SUCCEEDED() { + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + restoreFromDAGFinishedEvent(DAGState.SUCCEEDED); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + assertEquals(DAGState.SUCCEEDED, dag.getState()); + + // recover all the vertices to SUCCEED + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler, times(7)).handle(eventCaptor.capture()); + 
List events = eventCaptor.getAllValues(); + int i = 0; + for (; i < 6; ++i) { + assertTrue(events.get(i) instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = + (VertexEventRecoverVertex) events.get(i); + assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState()); + } + + // send DAGAppMasterEventDAGFinished at last + assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished); + DAGAppMasterEventDAGFinished dagFinishedEvent = + (DAGAppMasterEventDAGFinished) events.get(i); + assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState()); + } + + /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> + * restoreFromDAGFinishedEvent(FAILED) -> RecoverTransition + */ + @Test + public void testDAGRecovery_Finished_FAILED() { + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + restoreFromDAGFinishedEvent(DAGState.FAILED); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + assertEquals(DAGState.FAILED, dag.getState()); + + // recover all the vertices to FAILED + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler, times(7)).handle(eventCaptor.capture()); + List events = eventCaptor.getAllValues(); + int i = 0; + for (; i < 6; ++i) { + assertTrue(events.get(i) instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = + (VertexEventRecoverVertex) events.get(i); + assertEquals(VertexState.FAILED, recoverEvent.getDesiredState()); + } + + // send DAGAppMasterEventDAGFinished at last + assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished); + DAGAppMasterEventDAGFinished dagFinishedEvent = + (DAGAppMasterEventDAGFinished) events.get(i); + assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState()); + } + + /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> -> + * restoreFromDAGFinishedEvent -> RecoverTransition + */ + @Test + public void 
testDAGRecovery_Finished_KILLED() { + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + restoreFromDAGFinishedEvent(DAGState.KILLED); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + assertEquals(DAGState.KILLED, dag.getState()); + + // recover all the vertices to KILLED + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler, times(7)).handle(eventCaptor.capture()); + List events = eventCaptor.getAllValues(); + int i = 0; + for (; i < 6; ++i) { + assertTrue(events.get(i) instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = + (VertexEventRecoverVertex) events.get(i); + assertEquals(VertexState.KILLED, recoverEvent.getDesiredState()); + } + + // send DAGAppMasterEventDAGFinished at last + assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished); + DAGAppMasterEventDAGFinished dagFinishedEvent = + (DAGAppMasterEventDAGFinished) events.get(i); + assertEquals(DAGState.KILLED, dagFinishedEvent.getDAGState()); + } + + /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> -> + * restoreFromDAGFinishedEvent -> RecoverTransition + */ + @Test + public void testDAGRecovery_Finished_ERROR() { + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + restoreFromDAGFinishedEvent(DAGState.ERROR); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + assertEquals(DAGState.ERROR, dag.getState()); + + // recover all the vertices to KILLED + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler, times(7)).handle(eventCaptor.capture()); + List events = eventCaptor.getAllValues(); + int i = 0; + for (; i < 6; ++i) { + assertTrue(events.get(i) instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = + (VertexEventRecoverVertex) events.get(i); + assertEquals(VertexState.FAILED, recoverEvent.getDesiredState()); + } + + // send 
DAGAppMasterEventDAGFinished at last + assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished); + DAGAppMasterEventDAGFinished dagFinishedEvent = + (DAGAppMasterEventDAGFinished) events.get(i); + assertEquals(DAGState.ERROR, dagFinishedEvent.getDAGState()); + } + + /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> + * restoreFromDAG_COMMIT_STARTED -> RecoverTransition + */ + @Test + public void testDAGRecovery_COMMIT_STARTED() { + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + restoreFromDAGCommitStartedEvent(); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + assertEquals(DAGState.FAILED, dag.getState()); + + // recover all the vertices to SUCCEEDED + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler, times(7)).handle(eventCaptor.capture()); + List events = eventCaptor.getAllValues(); + int i = 0; + for (; i < 6; ++i) { + assertTrue(events.get(i) instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = + (VertexEventRecoverVertex) events.get(i); + assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState()); + } + + // send DAGAppMasterEventDAGFinished at last + assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished); + DAGAppMasterEventDAGFinished dagFinishedEvent = + (DAGAppMasterEventDAGFinished) events.get(i); + assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState()); + } + + /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> + * restoreFromDAG_COMMIT_STARTED -> -> restoreFromDAGFinished (SUCCEEDED)-> + * RecoverTransition + */ + @Test + public void testDAGRecovery_COMMIT_STARTED_Finished_SUCCEEDED() { + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + + restoreFromDAGCommitStartedEvent(); + restoreFromDAGFinishedEvent(DAGState.SUCCEEDED); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + 
assertEquals(DAGState.SUCCEEDED, dag.getState()); + + // recover all the vertices to SUCCEED + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler, times(7)).handle(eventCaptor.capture()); + List events = eventCaptor.getAllValues(); + int i = 0; + for (; i < 6; ++i) { + assertTrue(events.get(i) instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = + (VertexEventRecoverVertex) events.get(i); + assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState()); + } + + // send DAGAppMasterEventDAGFinished at last + assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished); + DAGAppMasterEventDAGFinished dagFinishedEvent = + (DAGAppMasterEventDAGFinished) events.get(i); + assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState()); + + } + + /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> + * restoreFromVERTEX_GROUP_COMMIT_STARTED -> RecoverTransition + */ + @Test + public void testDAGRecovery_GROUP_COMMIT_STARTED() { + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + restoreFromVertexGroupCommitStartedEvent(); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + assertEquals(DAGState.FAILED, dag.getState()); + + // recover all the vertices to SUCCEEDED + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler, times(7)).handle(eventCaptor.capture()); + List events = eventCaptor.getAllValues(); + int i = 0; + for (; i < 6; ++i) { + assertTrue(events.get(i) instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = + (VertexEventRecoverVertex) events.get(i); + assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState()); + } + + // send DAGAppMasterEventDAGFinished at last + assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished); + DAGAppMasterEventDAGFinished dagFinishedEvent = + (DAGAppMasterEventDAGFinished) events.get(i); + 
assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState()); + } + + /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> + * restoreFromVERTEX_GROUP_COMMIT_STARTED -> VERTEX_GROUP_COMMIT_FINISHED -> + * RecoverTransition + */ + @Test + public void testDAGRecovery_GROUP_COMMIT_STARTED_FINISHED() { + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + + restoreFromVertexGroupCommitStartedEvent(); + restoreFromVertexGroupCommitFinishedEvent(); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + assertEquals(DAGState.RUNNING, dag.getState()); + + // send recover event to 2 root vertex + verify(mockEventHandler, times(2)).handle( + any(VertexEventRecoverVertex.class)); + assertEquals(DAGState.RUNNING, dag.getState()); + } + + /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> + * restoreFromVERTEX_GROUP_COMMIT_STARTED -> VERTEX_GROUP_COMMIT_FINISHED -> + * restoreFromDAG_Finished -> RecoverTransition + */ + @Test + public void testDAGRecovery_GROUP_COMMIT_Finished() { + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + + restoreFromVertexGroupCommitStartedEvent(); + restoreFromVertexGroupCommitFinishedEvent(); + restoreFromDAGFinishedEvent(DAGState.SUCCEEDED); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList())); + + // recover all the vertices to SUCCEEDED + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler, times(7)).handle(eventCaptor.capture()); + List events = eventCaptor.getAllValues(); + int i = 0; + for (; i < 6; ++i) { + assertTrue(events.get(i) instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = + (VertexEventRecoverVertex) events.get(i); + assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState()); + } + + // send DAGAppMasterEventDAGFinished at last + assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished); + 
DAGAppMasterEventDAGFinished dagFinishedEvent = + (DAGAppMasterEventDAGFinished) events.get(i); + assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState()); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/f65e65ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java new file mode 100644 index 0000000..3b04cf6 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for the recovery path of {@link TaskAttemptImpl}.
+ *
+ * Each test optionally replays a {@link TaskAttemptStartedEvent} and/or a
+ * {@link TaskAttemptFinishedEvent} through
+ * {@code TaskAttemptImpl.restoreFromEvent}, then sends a {@code TA_RECOVER}
+ * event and checks the resulting internal state and whether a
+ * {@link TaskEventTAUpdate} was handed to the (mocked) event handler.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TestTaskAttemptRecovery {
+
+  private TaskAttemptImpl ta;
+  private EventHandler mockEventHandler;
+  private long startTime = System.currentTimeMillis();
+  private long finishTime = startTime + 5000;
+
+  private TezTaskAttemptID taId = mock(TezTaskAttemptID.class);
+  private String vertexName = "v1";
+
+  /**
+   * Builds a fresh {@link TaskAttemptImpl} for every test, wired to a mocked
+   * {@link EventHandler} so that dispatched events can be verified.
+   */
+  @Before
+  public void setUp() {
+    mockEventHandler = mock(EventHandler.class);
+    TezTaskID taskId =
+        TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
+    ta =
+        new TaskAttemptImpl(taskId, 0, mockEventHandler,
+            mock(TaskAttemptListener.class), new Configuration(),
+            new SystemClock(), mock(TaskHeartbeatHandler.class),
+            mock(AppContext.class), false, Resource.newInstance(1, 1),
+            mock(ContainerContext.class), false);
+  }
+
+  /**
+   * Replays a {@link TaskAttemptStartedEvent} stamped with {@code startTime}
+   * and asserts that the attempt reports that launch time and that the
+   * recovered state is {@code RUNNING}.
+   */
+  private void restoreFromTAStartEvent() {
+    TaskAttemptState recoveredState =
+        ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
+            startTime, mock(ContainerId.class), mock(NodeId.class), "", ""));
+    assertEquals(startTime, ta.getLaunchTime());
+    assertEquals(TaskAttemptState.RUNNING, recoveredState);
+  }
+
+  /**
+   * Replays a {@link TaskAttemptFinishedEvent} carrying the given terminal
+   * state and asserts that launch/finish times, counters, progress, reported
+   * state, diagnostics and the returned state all reflect the event.
+   *
+   * @param state the terminal state recorded in the finished event
+   */
+  private void restoreFromTAFinishedEvent(TaskAttemptState state) {
+    String diag = "test_diag";
+    TezCounters counters = mock(TezCounters.class);
+
+    TaskAttemptState recoveredState =
+        ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
+            startTime, finishTime, state, diag, counters));
+    assertEquals(startTime, ta.getLaunchTime());
+    assertEquals(finishTime, ta.getFinishTime());
+    assertEquals(counters, ta.reportedStatus.counters);
+    assertEquals(1.0f, ta.reportedStatus.progress, 1e-6);
+    assertEquals(state, ta.reportedStatus.state);
+    assertEquals(1, ta.getDiagnostics().size());
+    assertEquals(diag, ta.getDiagnostics().get(0));
+    assertEquals(state, recoveredState);
+  }
+
+  /**
+   * No event restored -> RecoverTransition.
+   * Expects the attempt to end up KILLED and exactly one TaskEventTAUpdate
+   * to be dispatched.
+   */
+  @Test
+  public void testTARecovery_NEW() {
+    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+    assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
+    verify(mockEventHandler, times(1)).handle(any(TaskEventTAUpdate.class));
+  }
+
+  /**
+   * restoreFromTAStartEvent -> RecoverTransition.
+   * Expects the attempt to end up KILLED and exactly one TaskEventTAUpdate
+   * to be dispatched.
+   */
+  @Test
+  public void testTARecovery_START() {
+    restoreFromTAStartEvent();
+
+    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+    assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
+    verify(mockEventHandler, times(1)).handle(any(TaskEventTAUpdate.class));
+  }
+
+  /**
+   * restoreFromTAStartEvent -> restoreFromTAFinished (SUCCEEDED)
+   * -> RecoverTransition.
+   * Expects SUCCEEDED and no TaskEventTAUpdate to be dispatched.
+   */
+  @Test
+  public void testTARecovery_SUCCEED() {
+    restoreFromTAStartEvent();
+    restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED);
+
+    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+    assertEquals(TaskAttemptStateInternal.SUCCEEDED, ta.getInternalState());
+    verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+  }
+
+  /**
+   * restoreFromTAStartEvent -> restoreFromTAFinished (KILLED)
+   * -> RecoverTransition.
+   * Expects KILLED and no TaskEventTAUpdate to be dispatched.
+   *
+   * NOTE: "KIILED" in the method name is a typo for KILLED; the identifier is
+   * left as committed so the test name stays stable.
+   */
+  @Test
+  public void testTARecovery_KIILED() {
+    restoreFromTAStartEvent();
+    restoreFromTAFinishedEvent(TaskAttemptState.KILLED);
+
+    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+    assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
+    verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+  }
+
+  /**
+   * restoreFromTAStartEvent -> restoreFromTAFinished (FAILED)
+   * -> RecoverTransition.
+   * Expects FAILED and no TaskEventTAUpdate to be dispatched.
+   */
+  @Test
+  public void testTARecovery_FAILED() {
+    restoreFromTAStartEvent();
+    restoreFromTAFinishedEvent(TaskAttemptState.FAILED);
+
+    ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
+    assertEquals(TaskAttemptStateInternal.FAILED, ta.getInternalState());
+    verify(mockEventHandler, never()).handle(any(TaskEventTAUpdate.class));
+  }
+
+  /**
+   * restoreFromTAFinishedEvent with no TAStartEvent before the
+   * TAFinishedEvent: the restore call is expected to be rejected with the
+   * message asserted below.
+   */
+  @Test
+  public void testRecover_FINISH_BUT_NO_START() {
+    try {
+      restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED);
+      fail("Should fail due to no TaskAttemptStartEvent before TaskAttemptFinishedEvent");
+    } catch (RuntimeException e) {
+      // Deliberately not catch (Throwable): the AssertionError raised by
+      // fail() above, or by an assertion inside the helper, must propagate
+      // with its own message instead of being swallowed here and reported
+      // as a confusing message mismatch.
+      // NOTE(review): assumes restoreFromEvent rejects the event with a
+      // RuntimeException (it declares no checked exceptions) - confirm
+      // against TaskAttemptImpl.
+      assertEquals("Finished Event seen but"
+          + " no Started Event was encountered earlier", e.getMessage());
+    }
+  }
+}