tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED (zjffdu)
Date Fri, 13 Feb 2015 03:16:05 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 78cd5222f -> dc2c8dca9


TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED (zjffdu)

(cherry picked from commit c266289cde6693e8a586e3c4b2bdffbd6b98b9ca)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dc2c8dca
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dc2c8dca
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dc2c8dca

Branch: refs/heads/branch-0.6
Commit: dc2c8dca9effba8a1cf58760c08435f4f6f30535
Parents: 78cd522
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Fri Feb 13 10:01:33 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Fri Feb 13 11:15:44 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  1 +
 .../app/dag/impl/TestTaskAttemptRecovery.java   | 66 +++++++++++++++++++-
 3 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/dc2c8dca/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0624fdf..fd16976 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -142,7 +142,8 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
-  TEZ-2071. TestAMRecovery should set test names for test DAGs
+  TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED.
+  TEZ-2071. TestAMRecovery should set test names for test DAGs.
   TEZ-1928. Tez local mode hang in Pig tez local mode.
   TEZ-1893. Verify invalid -1 parallelism in DAG.verify().
   TEZ-900. Confusing message for incorrect queue for some tez examples.

http://git-wip-us.apache.org/repos/asf/tez/blob/dc2c8dca/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1c8fb8d..14f008a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1391,6 +1391,7 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskEventType.T_ATTEMPT_KILLED));
           taskAttempt.sendEvent(createDAGCounterUpdateEventTAFinished(taskAttempt,
               getExternalState(TaskAttemptStateInternal.KILLED)));
+          taskAttempt.logJobHistoryAttemptUnsuccesfulCompletion(TaskAttemptState.KILLED);
           endState = TaskAttemptStateInternal.KILLED;
           break;
         case SUCCEEDED:

http://git-wip-us.apache.org/repos/asf/tez/blob/dc2c8dca/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index e5fcd72..9d0e121 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -20,10 +20,15 @@ package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,16 +44,22 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -61,11 +72,59 @@ public class TestTaskAttemptRecovery {
   private long startTime = System.currentTimeMillis();
   private long finishTime = startTime + 5000;
 
-  private TezTaskAttemptID taId = mock(TezTaskAttemptID.class);
+  private TezTaskAttemptID taId;
   private String vertexName = "v1";
 
+  private AppContext mockAppContext;
+  private MockHistoryEventHandler mockHistoryEventHandler;
+  private Task mockTask;
+  private Vertex mockVertex;
+
+  public static class MockHistoryEventHandler extends HistoryEventHandler {
+
+    private List<DAGHistoryEvent> events;
+
+    public MockHistoryEventHandler(AppContext context) {
+      super(context);
+      events = new ArrayList<DAGHistoryEvent>();
+    }
+
+    @Override
+    public void handle(DAGHistoryEvent event) {
+      events.add(event);
+    }
+
+    @Override
+    public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
+      events.add(event);
+    }
+
+    void verfiyTaskAttemptFinishedEvent(TezTaskAttemptID taId, TaskAttemptState finalState,
int expectedTimes) {
+      int actualTimes = 0;
+      for (DAGHistoryEvent event : events) {
+        if (event.getHistoryEvent().getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED)
{
+          TaskAttemptFinishedEvent tfEvent = (TaskAttemptFinishedEvent)event.getHistoryEvent();
+          if (tfEvent.getTaskAttemptID().equals(taId) &&
+              tfEvent.getState().equals(finalState)) {
+            actualTimes ++;
+          }
+        }
+      }
+      assertEquals(expectedTimes, actualTimes);
+    }
+  }
+
   @Before
   public void setUp() {
+    mockTask = mock(Task.class);
+    mockVertex = mock(Vertex.class);
+    when(mockTask.getVertex()).thenReturn(mockVertex);
+    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class))
+      .getTask(any(TezTaskID.class)))
+      .thenReturn(mockTask);
+    mockHistoryEventHandler = new MockHistoryEventHandler(mockAppContext);
+    when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler);
     mockEventHandler = mock(EventHandler.class);
     TezTaskID taskId =
         TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
@@ -73,8 +132,9 @@ public class TestTaskAttemptRecovery {
         new TaskAttemptImpl(taskId, 0, mockEventHandler,
             mock(TaskAttemptListener.class), new Configuration(),
             new SystemClock(), mock(TaskHeartbeatHandler.class),
-            mock(AppContext.class), false, Resource.newInstance(1, 1),
+            mockAppContext, false, Resource.newInstance(1, 1),
             mock(ContainerContext.class), false);
+    taId = ta.getID();
   }
 
   private void restoreFromTAStartEvent() {
@@ -157,6 +217,8 @@ public class TestTaskAttemptRecovery {
     verifyEvents(events, TaskEventTAUpdate.class, 1);
     // one for task launch, one for task killed
     verifyEvents(events, DAGEventCounterUpdate.class, 2);
+
+    mockHistoryEventHandler.verfiyTaskAttemptFinishedEvent(taId, TaskAttemptState.KILLED,
1);
   }
 
   /**


Mime
View raw message