Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2296017E00 for ; Fri, 13 Feb 2015 03:16:15 +0000 (UTC) Received: (qmail 99605 invoked by uid 500); 13 Feb 2015 03:16:05 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 99564 invoked by uid 500); 13 Feb 2015 03:16:05 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 99555 invoked by uid 99); 13 Feb 2015 03:16:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Feb 2015 03:16:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 54D1EE01BD; Fri, 13 Feb 2015 03:16:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjffdu@apache.org To: commits@tez.apache.org Message-Id: <00f3fe931d294ac1815222450ac3b087@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED (zjffdu) Date: Fri, 13 Feb 2015 03:16:05 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.6 78cd5222f -> dc2c8dca9 TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED (zjffdu) (cherry picked from commit c266289cde6693e8a586e3c4b2bdffbd6b98b9ca) Conflicts: CHANGES.txt Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dc2c8dca Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dc2c8dca Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dc2c8dca Branch: refs/heads/branch-0.6 Commit: dc2c8dca9effba8a1cf58760c08435f4f6f30535 Parents: 78cd522 Author: Jeff Zhang Authored: Fri Feb 13 10:01:33 2015 +0800 Committer: Jeff Zhang Committed: Fri Feb 13 11:15:44 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 3 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 1 + .../app/dag/impl/TestTaskAttemptRecovery.java | 66 +++++++++++++++++++- 3 files changed, 67 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/dc2c8dca/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0624fdf..fd16976 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -142,7 +142,8 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: - TEZ-2071. TestAMRecovery should set test names for test DAGs + TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED. + TEZ-2071. TestAMRecovery should set test names for test DAGs. TEZ-1928. Tez local mode hang in Pig tez local mode. TEZ-1893. Verify invalid -1 parallelism in DAG.verify(). TEZ-900. Confusing message for incorrect queue for some tez examples. http://git-wip-us.apache.org/repos/asf/tez/blob/dc2c8dca/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 1c8fb8d..14f008a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1391,6 +1391,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskEventType.T_ATTEMPT_KILLED)); taskAttempt.sendEvent(createDAGCounterUpdateEventTAFinished(taskAttempt, getExternalState(TaskAttemptStateInternal.KILLED))); + taskAttempt.logJobHistoryAttemptUnsuccesfulCompletion(TaskAttemptState.KILLED); endState = TaskAttemptStateInternal.KILLED; break; case SUCCEEDED: http://git-wip-us.apache.org/repos/asf/tez/blob/dc2c8dca/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java index e5fcd72..9d0e121 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java @@ -20,10 +20,15 @@ package org.apache.tez.dag.app.dag.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -39,16 +44,22 @@ import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; +import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; +import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEventHandler; +import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -61,11 +72,59 @@ public class TestTaskAttemptRecovery { private long startTime = System.currentTimeMillis(); private long finishTime = startTime + 5000; - private TezTaskAttemptID taId = mock(TezTaskAttemptID.class); + private TezTaskAttemptID taId; private String vertexName = "v1"; + private AppContext mockAppContext; + private MockHistoryEventHandler mockHistoryEventHandler; + private Task mockTask; + private Vertex mockVertex; + + public static class MockHistoryEventHandler extends HistoryEventHandler { + + private List events; + + public MockHistoryEventHandler(AppContext context) { + super(context); + events = new ArrayList(); + } + + @Override + public void handle(DAGHistoryEvent event) { + events.add(event); + } + + @Override + public void handleCriticalEvent(DAGHistoryEvent event) throws IOException { + events.add(event); + } + + void verfiyTaskAttemptFinishedEvent(TezTaskAttemptID taId, TaskAttemptState finalState, int expectedTimes) { + int actualTimes = 0; + for (DAGHistoryEvent event : events) { + if (event.getHistoryEvent().getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) { + TaskAttemptFinishedEvent tfEvent = (TaskAttemptFinishedEvent)event.getHistoryEvent(); + if (tfEvent.getTaskAttemptID().equals(taId) && + tfEvent.getState().equals(finalState)) { + actualTimes ++; + } + } + } + assertEquals(expectedTimes, actualTimes); + } + } + @Before public void setUp() { + mockTask = mock(Task.class); + mockVertex = mock(Vertex.class); + when(mockTask.getVertex()).thenReturn(mockVertex); + mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class)) + .getTask(any(TezTaskID.class))) + .thenReturn(mockTask); + mockHistoryEventHandler = new MockHistoryEventHandler(mockAppContext); + when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler); mockEventHandler = mock(EventHandler.class); TezTaskID taskId = TezTaskID.fromString("task_1407371892933_0001_1_00_000000"); @@ -73,8 +132,9 @@ public class TestTaskAttemptRecovery { new TaskAttemptImpl(taskId, 0, mockEventHandler, mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), - mock(AppContext.class), false, Resource.newInstance(1, 1), + mockAppContext, false, Resource.newInstance(1, 1), mock(ContainerContext.class), false); + taId = ta.getID(); } private void restoreFromTAStartEvent() { @@ -157,6 +217,8 @@ public class TestTaskAttemptRecovery { verifyEvents(events, TaskEventTAUpdate.class, 1); // one for task launch, one for task killed verifyEvents(events, DAGEventCounterUpdate.class, 2); + + mockHistoryEventHandler.verfiyTaskAttemptFinishedEvent(taId, TaskAttemptState.KILLED, 1); } /**