tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2304. InvalidStateTransitonException TA_SCHEDULE at START_WAIT during recovery (zjffdu)
Date Wed, 03 Jun 2015 01:10:45 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 6f53b8e3d -> 222b241f2


TEZ-2304. InvalidStateTransitonException TA_SCHEDULE at START_WAIT during recovery (zjffdu)

(cherry picked from commit 44a6bb686b08326b45c232fb7466b43c574fe7c1)

Conflicts:
	tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
	tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
	tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/222b241f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/222b241f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/222b241f

Branch: refs/heads/branch-0.5
Commit: 222b241f201f29328c4f9cf3bb34a5ea9cd9be8d
Parents: 6f53b8e
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Wed Jun 3 08:37:39 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Wed Jun 3 09:10:31 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 17 +-----
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 43 ++++----------
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 59 +++++++++++++++++++-
 .../app/dag/impl/TestTaskAttemptRecovery.java   | 14 ++---
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  8 +++
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  | 10 ++++
 7 files changed, 96 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/222b241f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 79190d0..98e426b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2304. InvalidStateTransitonException TA_SCHEDULE at START_WAIT during recovery
   TEZ-2488. Tez AM crashes if a submitted DAG is configured to use invalid resource sizes.
   TEZ-2080. Localclient should be using tezconf in init instead of yarnconf.
   TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport a findbugs warning fix from master.

http://git-wip-us.apache.org/repos/asf/tez/blob/222b241f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 368279e..297f66c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -796,10 +796,6 @@ public class TaskAttemptImpl implements TaskAttempt,
         }
         case TASK_ATTEMPT_FINISHED:
         {
-          if (!recoveryStartEventSeen) {
-            throw new RuntimeException("Finished Event seen but"
-                + " no Started Event was encountered earlier");
-          }
           TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) historyEvent;
           this.finishTime = tEvent.getFinishTime();
           this.reportedStatus.counters = tEvent.getCounters();
@@ -1151,17 +1147,8 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
           helper.getTaskAttemptState()));
-      if (ta.getLaunchTime() != 0) {
-        // TODO For cases like this, recovery goes for a toss, since the the
-        // attempt will not exist in the history file.
-        ta.logJobHistoryAttemptUnsuccesfulCompletion(helper
-            .getTaskAttemptState());
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Not generating HistoryFinish event since start event not "
-              + "generated for taskAttempt: " + ta.getID());
-        }
-      }
+      ta.logJobHistoryAttemptUnsuccesfulCompletion(helper
+          .getTaskAttemptState());
       // Send out events to the Task - indicating TaskAttemptTermination(F/K)
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper
           .getTaskEventType()));

http://git-wip-us.apache.org/repos/asf/tez/blob/222b241f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 6a1136c..7989bc3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -133,9 +133,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   // track the status of TaskAttempt (true mean completed, false mean uncompleted)
   private final Map<Integer, Boolean> taskAttemptStatus = new HashMap<Integer,Boolean>();
 
-  private boolean historyTaskStartGenerated = false;
-
-  private static final SingleArcTransition<TaskImpl, TaskEvent>
+  private static final SingleArcTransition<TaskImpl , TaskEvent>
      ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
   private static final SingleArcTransition<TaskImpl, TaskEvent>
      KILL_TRANSITION = new KillTransition();
@@ -568,8 +566,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             LOG.debug("Adding restored attempt into known attempts map"
                 + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
           }
-          this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
-              recoveredAttempt);
+          Preconditions.checkArgument(this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
+              recoveredAttempt) == null, taskAttemptStartedEvent.getTaskAttemptID() + " already existed.");
           this.taskAttemptStatus.put(taskAttemptStartedEvent.getTaskAttemptID().getId(), false);
           this.recoveredState = TaskState.RUNNING;
           return recoveredState;
@@ -842,11 +840,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>(maxFailedAttempts);
         newAttempts.putAll(attempts);
         attempts = newAttempts;
-        attempts.put(attempt.getID(), attempt);
+        Preconditions.checkArgument(attempts.put(attempt.getID(), attempt) == null,
+            attempt.getID() + " already existed");
         break;
 
       default:
-        attempts.put(attempt.getID(), attempt);
+        Preconditions.checkArgument(attempts.put(attempt.getID(), attempt) == null,
+            attempt.getID() + " already existed");
         break;
     }
 
@@ -1051,7 +1051,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.addAndScheduleAttempt();
       task.scheduledTime = task.clock.getTime();
       task.logJobHistoryTaskStartedEvent();
-      task.historyTaskStartGenerated = true;
     }
   }
 
@@ -1124,9 +1123,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.eventHandler.handle(new VertexEventTaskCompleted(
           task.taskId, TaskState.SUCCEEDED));
       LOG.info("Task succeeded with attempt " + task.successfulAttempt);
-      if (task.historyTaskStartGenerated) {
-        task.logJobHistoryTaskFinishedEvent();
-      }
+      task.logJobHistoryTaskFinishedEvent();
 
       // issue kill to all other attempts
       for (TaskAttempt attempt : task.attempts.values()) {
@@ -1316,13 +1313,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true);
       // check whether all attempts are finished
       if (task.getFinishedAttemptsCount() == task.attempts.size()) {
-        if (task.historyTaskStartGenerated) {
-          task.logJobHistoryTaskFailedEvent(getExternalState(TaskStateInternal.KILLED));
-        } else {
-          LOG.debug("Not generating HistoryFinish event since start event not" +
-          		" generated for task: " + task.getTaskId());
-        }
-
+        task.logJobHistoryTaskFailedEvent(getExternalState(TaskStateInternal.KILLED));
         task.eventHandler.handle(
             new VertexEventTaskCompleted(
                 task.taskId, getExternalState(TaskStateInternal.KILLED)));
@@ -1374,13 +1365,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         task.handleTaskAttemptCompletion(
             ((TaskEventTAUpdate) event).getTaskAttemptID(),
             TaskAttemptStateInternal.FAILED);
-
-        if (task.historyTaskStartGenerated) {
-          task.logJobHistoryTaskFailedEvent(TaskState.FAILED);
-        } else {
-          LOG.debug("Not generating HistoryFinish event since start event not" +
-          		" generated for task: " + task.getTaskId());
-        }
+        task.logJobHistoryTaskFailedEvent(TaskState.FAILED);
         task.eventHandler.handle(
             new VertexEventTaskCompleted(task.taskId, TaskState.FAILED));
         return task.finished(TaskStateInternal.FAILED);
@@ -1493,13 +1478,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     public void transition(TaskImpl task, TaskEvent event) {
       TaskEventTermination terminateEvent = (TaskEventTermination)event;
       task.addDiagnosticInfo(terminateEvent.getDiagnosticInfo());
-      if (task.historyTaskStartGenerated) {
-        task.logJobHistoryTaskFailedEvent(TaskState.KILLED);
-      } else {
-        LOG.debug("Not generating HistoryFinish event since start event not" +
-        		" generated for task: " + task.getTaskId());
-      }
-
+      task.logJobHistoryTaskFailedEvent(TaskState.KILLED);
       task.eventHandler.handle(
           new VertexEventTaskCompleted(task.taskId, TaskState.KILLED));
       // TODO Metrics

http://git-wip-us.apache.org/repos/asf/tez/blob/222b241f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index c98e3de..2ff11e9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -921,6 +921,58 @@ public class TestTaskAttempt {
         arg.capture());
   }
 
+  @SuppressWarnings("deprecation")
+  @Test(timeout = 5000)
+  public void testKilledInNew() {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mockHeartbeatHandler, appCtx, locationHint, false,
+        resource, createFakeContainerContext(), true);
+    Assert.assertEquals(TaskAttemptStateInternal.NEW, taImpl.getInternalState());
+    taImpl.handle(new TaskAttemptEventKillRequest(taImpl.getID(), "kill it"));
+    Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
+
+    Assert.assertEquals(0, taImpl.taskAttemptStartedEventLogged);
+    Assert.assertEquals(1, taImpl.taskAttemptFinishedEventLogged);
+  }
+
   private void verifyEventType(List<Event> events,
       Class<? extends Event> eventClass, int expectedOccurences) {
     int count = 0;
@@ -949,7 +1001,10 @@ public class TestTaskAttempt {
   };
 
   private class MockTaskAttemptImpl extends TaskAttemptImpl {
+
     TaskLocationHint locationHint;
+    public int taskAttemptStartedEventLogged = 0;
+    public int taskAttemptFinishedEventLogged = 0;
 
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
         EventHandler eventHandler, TaskAttemptListener tal,
@@ -984,17 +1039,19 @@ public class TestTaskAttempt {
 
     @Override
     protected void logJobHistoryAttemptStarted() {
+      taskAttemptStartedEventLogged++;
     }
 
     @Override
     protected void logJobHistoryAttemptFinishedEvent(
         TaskAttemptStateInternal state) {
-
+      taskAttemptFinishedEventLogged++;
     }
 
     @Override
     protected void logJobHistoryAttemptUnsuccesfulCompletion(
         TaskAttemptState state) {
+      taskAttemptFinishedEventLogged++;
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/222b241f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index f5ee94f..1391361 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -271,16 +271,14 @@ public class TestTaskAttemptRecovery {
   }
 
   /**
-   * restoreFromTAFinishedEvent ( no TAStartEvent before TAFinishedEvent )
+   * restoreFromTAFinishedEvent ( killed before started)
    */
   @Test
   public void testRecover_FINISH_BUT_NO_START() {
-    try {
-      restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED);
-      fail("Should fail due to no TaskAttemptStartEvent before TaskAttemptFinishedEvent");
-    } catch (Throwable e) {
-      assertEquals("Finished Event seen but"
-          + " no Started Event was encountered earlier", e.getMessage());
-    }
+    TaskAttemptState recoveredState =
+        ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
+            startTime, finishTime, TaskAttemptState.KILLED,
+            "", new TezCounters()));
+    assertEquals(TaskAttemptState.KILLED, recoveredState);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/222b241f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 434107f..ab7c87b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -629,6 +629,8 @@ public class TestTaskImpl {
     mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL));
     assertEquals(1, mockTask.getDiagnostics().size());
     assertTrue(mockTask.getDiagnostics().get(0).contains(TaskTerminationCause.DAG_KILL.name()));
+    assertEquals(0, mockTask.taskStartedEventLogged);
+    assertEquals(1, mockTask.taskFinishedEventLogged);
   }
   
   @Test
@@ -669,6 +671,9 @@ public class TestTaskImpl {
   @SuppressWarnings("rawtypes")
   private class MockTaskImpl extends TaskImpl {
 
+    public int taskStartedEventLogged = 0;
+    public int taskFinishedEventLogged = 0;
+
     private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
     private Vertex vertex;
     TaskLocationHint locationHint;
@@ -716,12 +721,15 @@ public class TestTaskImpl {
     }
 
     protected void logJobHistoryTaskStartedEvent() {
+      taskStartedEventLogged++;
     }
 
     protected void logJobHistoryTaskFinishedEvent() {
+      taskFinishedEventLogged++;
     }
 
     protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
+      taskFinishedEventLogged++;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/222b241f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index afc3433..e66f2cd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -247,6 +247,16 @@ public class TestTaskRecovery {
   }
 
   /**
+   * -> restoreFromTaskFinishEvent ( no TaskStartEvent )
+   */
+  @Test(timeout = 5000)
+  public void testRecoveryNewToKilled_NoStartEvent() {
+    task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), vertexName,
+        taskStartTime, taskFinishTime, null, TaskState.KILLED, "",
+        new TezCounters()));
+  }
+
+  /**
    * restoreFromTaskStartedEvent -> RecoverTransition
    */
   @Test


Mime
View raw message