tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kshu...@apache.org
Subject tez git commit: TEZ-3857. Tez TaskImpl can throw Invalid state transition for leaf tasks that do Retro Active Transition (kshukla)
Date Wed, 01 Nov 2017 16:31:43 GMT
Repository: tez
Updated Branches:
  refs/heads/master 3fb57c869 -> a51af593f


TEZ-3857. Tez TaskImpl can throw Invalid state transition for leaf tasks that do Retro Active
Transition (kshukla)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a51af593
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a51af593
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a51af593

Branch: refs/heads/master
Commit: a51af593f3cf9cf42120cb741a2c69eb83a08768
Parents: 3fb57c8
Author: Kuhu Shukla <kshukla@yahoo-inc.com>
Authored: Wed Nov 1 11:26:54 2017 -0500
Committer: Kuhu Shukla <kshukla@yahoo-inc.com>
Committed: Wed Nov 1 11:26:54 2017 -0500

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 13 +++--
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 56 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a51af593/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index bed4141..99cb2e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1248,12 +1248,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      if (task.leafVertex) {
-        LOG.error("Unexpected event for task of leaf vertex " + event.getType() + ", taskId: "
-            + task.getTaskId());
-        task.internalError(event.getType());
-      }
-
       TaskEventTAFailed castEvent = (TaskEventTAFailed) event;
       TezTaskAttemptID failedAttemptId = castEvent.getTaskAttemptID();
       TaskAttempt failedAttempt = task.getAttempt(failedAttemptId);
@@ -1277,7 +1271,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         task.taskAttemptStatus.put(failedAttemptId.getId(), true);
         return TaskStateInternal.SUCCEEDED;
       }
-      
+
+      if (task.leafVertex) {
+        LOG.error("Unexpected event for task of leaf vertex " + event.getType() + ", taskId: "
+            + task.getTaskId());
+        task.internalError(event.getType());
+      }
       Preconditions.checkState(castEvent.getCausalEvent() != null);
       TaskAttemptEventOutputFailed destinationEvent = 
           (TaskAttemptEventOutputFailed) castEvent.getCausalEvent();

http://git-wip-us.apache.org/repos/asf/tez/blob/a51af593/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index e03e282..d13e654 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -1067,6 +1067,62 @@ public class TestTaskImpl {
     assertTrue("Task should have failed!", mockTask.getState() == TaskState.FAILED);
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test (timeout = 10000L)
+  public void testSucceededLeafTaskWithRetroFailures() throws InterruptedException {
+    Configuration newConf = new Configuration(conf);
+    newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    Vertex vertex = mock(Vertex.class);
+    doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig();
+    mockTask = new MockTaskImpl(vertexId, partition,
+        eventHandler, conf, taskCommunicatorManagerInterface, clock,
+        taskHeartbeatHandler, appContext, true,
+        taskResource, containerContext, vertex);
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0);
+    launchTaskAttempt(firstMockTaskAttempt.getID());
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
+    MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1);
+    launchTaskAttempt(secondMockTaskAttempt.getID());
+
+    firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
+    secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
+    firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
+    secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
+
+    secondMockTaskAttempt.handle(
+        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
+    firstMockTaskAttempt.handle(
+        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
+    secondMockTaskAttempt.handle(
+        new TaskAttemptEvent(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
+            TaskAttemptEventType.TA_DONE));
+    firstMockTaskAttempt.handle(
+        new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
+            TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
+            TaskAttemptTerminationCause.CONTAINER_EXITED));
+
+    mockTask.handle(new TaskEventTASucceeded(secondMockTaskAttempt.getID()));
+    firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
+        firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
+
+    InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 0);
+    TezTaskAttemptID mockDestId = firstMockTaskAttempt.getID();
+    EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
+    TezEvent tzEvent = new TezEvent(mockReEvent, meta);
+    TaskAttemptEventOutputFailed outputFailedEvent =
+        new TaskAttemptEventOutputFailed(mockDestId, tzEvent, 1);
+    firstMockTaskAttempt.handle(outputFailedEvent);
+    mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
+        mock(TaskAttemptEvent.class)));
+    Assert.assertEquals(mockTask.getInternalState(), TaskStateInternal.SUCCEEDED);
+  }
+
   private void failAttempt(MockTaskAttemptImpl taskAttempt, int index, int expectedIncompleteAttempts) {
     InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, index);
     TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);


Mime
View raw message