tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [08/23] git commit: TEZ-1020. VertexImpl handling of task failed in SUCCEEDED state is incorrect. (bikas)
Date Fri, 20 Jun 2014 22:35:46 GMT
TEZ-1020. VertexImpl handling of task failed in SUCCEEDED state is incorrect. (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/eb15f0ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/eb15f0ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/eb15f0ec

Branch: refs/heads/branch-0.4.1-incubating
Commit: eb15f0ec0a97932b28b6b4e022b50bdfd2689799
Parents: ca5d7d4
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Apr 4 18:02:26 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Jun 20 15:34:45 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 51 ++++++++++++++++----
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 34 +++++++++++--
 2 files changed, 71 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb15f0ec/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 8e1f04b..a282042 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -412,6 +412,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               // accumulate these in case we get restarted
               VertexEventType.V_ROUTE_EVENT,
               ROUTE_EVENT_TRANSITION)
+          .addTransition(
+              VertexState.SUCCEEDED, 
+              EnumSet.of(VertexState.FAILED, VertexState.ERROR),
+              VertexEventType.V_TASK_COMPLETED,
+              new TaskCompletedAfterVertexSuccessTransition())
           .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
               EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
@@ -419,8 +424,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   // us. These reruns may be triggered by other consumer vertices.
                   // We should have been in RUNNING state if we had triggered the
                   // reruns.
-                  VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_TASK_COMPLETED))
+                  VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED))
           .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
               VertexEventType.V_TASK_ATTEMPT_COMPLETED,
               new TaskAttemptCompletedEventTransition())
@@ -2882,8 +2886,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       vertex.succeededTaskCount--;
     }
   }
-
-  static class VertexNoTasksCompletedTransition implements
+  
+  private static class VertexNoTasksCompletedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
     @Override
@@ -2891,6 +2895,33 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       return VertexImpl.checkVertexForCompletion(vertex);
     }
   }
+  
+  private static class TaskCompletedAfterVertexSuccessTransition implements
+    MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventTaskCompleted vEvent = (VertexEventTaskCompleted) event;
+      VertexState finalState;
+      VertexStatus.State finalStatus;
+      String diagnosticMsg;
+      if (vEvent.getState() == TaskState.FAILED) {
+        finalState = VertexState.FAILED;
+        finalStatus = VertexStatus.State.FAILED;
+        diagnosticMsg = "Vertex " + vertex.logIdentifier +" failed as task " + vEvent.getTaskID() +
+          " failed after vertex succeeded.";
+      } else {
+        finalState = VertexState.ERROR;
+        finalStatus = VertexStatus.State.ERROR;
+        diagnosticMsg = "Vertex " + vertex.logIdentifier + " error as task " + vEvent.getTaskID() +
+            " completed with state " + vEvent.getState() + " after vertex succeeded.";
+      }
+      LOG.info(diagnosticMsg);
+      vertex.addDiagnostic(diagnosticMsg);
+      vertex.abortVertex(finalStatus);
+      vertex.finished(finalState, VertexTerminationCause.OWN_TASK_FAILURE);
+      return finalState;
+    }
+  }
 
   private static class TaskRescheduledAfterVertexSuccessTransition implements
     MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -2908,15 +2939,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         return VertexState.RUNNING;
       }
 
-      LOG.info(vertex.getVertexId() + " failed due to post-commit rescheduling of "
-          + ((VertexEventTaskReschedule)event).getTaskID());
       // terminate any running tasks
+      String diagnosticMsg = vertex.getVertexId() + " failed due to post-commit rescheduling of "
+          + ((VertexEventTaskReschedule)event).getTaskID();
+      LOG.info(diagnosticMsg);
       vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE,
           TaskTerminationCause.OWN_TASK_FAILURE);
-      // since the DAG thinks this vertex is completed it must be notified of
-      // an error
-      vertex.eventHandler.handle(new DAGEvent(vertex.getDAGId(),
-          DAGEventType.INTERNAL_ERROR));
+      vertex.addDiagnostic(diagnosticMsg);
+      vertex.abortVertex(VertexStatus.State.FAILED);
+      vertex.finished(VertexState.FAILED, VertexTerminationCause.OWN_TASK_FAILURE);
       return VertexState.FAILED;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/eb15f0ec/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 01a25f6..aa829df 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -1933,9 +1933,9 @@ public class TestVertexImpl {
     dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v.getState());
 
-    Assert.assertEquals(1,
+    Assert.assertEquals(2,
         dagEventDispatcher.eventCount.get(
-            DAGEventType.INTERNAL_ERROR).intValue());
+            DAGEventType.DAG_VERTEX_COMPLETED).intValue());
   }
 
   @SuppressWarnings("unchecked")
@@ -1955,13 +1955,31 @@ public class TestVertexImpl {
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.getEventHandler().handle(
-        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
-    dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1, committer.commitCounter);
+    Assert.assertEquals(0, committer.abortCounter);
+    Assert.assertEquals(1, committer.initCounter);
+    Assert.assertEquals(1, committer.setupCounter);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testTaskFailedAfterVertexSuccess() {
+    initAllVertices(VertexState.INITED);
+
+    VertexImpl v = vertices.get("vertex6");
 
+    startVertex(v);
+    CountingOutputCommitter committer =
+        (CountingOutputCommitter) v.getOutputCommitter("outputx");
+
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
+
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
@@ -1970,6 +1988,14 @@ public class TestVertexImpl {
     Assert.assertEquals(0, committer.abortCounter);
     Assert.assertEquals(1, committer.initCounter);
     Assert.assertEquals(1, committer.setupCounter);
+    
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.FAILED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.FAILED, v.getState());
+    Assert.assertEquals(1, committer.commitCounter);
+    Assert.assertEquals(1, committer.abortCounter);
+    
   }
 
   @Test(timeout = 5000)


Mime
View raw message