tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [02/23] git commit: TEZ-997. Internal Errror in am logs during dag shutdown. (hitesh)
Date Fri, 20 Jun 2014 22:35:40 GMT
TEZ-997. Internal Errror in am logs during dag shutdown. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/608c08af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/608c08af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/608c08af

Branch: refs/heads/branch-0.4.1-incubating
Commit: 608c08af2412931041e09f85accb8bb4445f1039
Parents: 958f0a3
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Apr 1 16:57:57 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Jun 20 15:34:44 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 18 +++++++++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 12 ++++++++
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 32 ++++++++++++++++++++
 3 files changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/608c08af/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index f6b3faa..918a9e8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -69,6 +69,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
@@ -1084,6 +1085,23 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent taskEvent) {
+      if (taskEvent instanceof TaskEventRecoverTask) {
+        TaskEventRecoverTask taskEventRecoverTask =
+            (TaskEventRecoverTask) taskEvent;
+        if (taskEventRecoverTask.getDesiredState() != null) {
+          // TODO recover attempts if desired state is given?
+          // History may not have all data.
+          switch (taskEventRecoverTask.getDesiredState()) {
+            case SUCCEEDED:
+              return TaskStateInternal.SUCCEEDED;
+            case FAILED:
+              return TaskStateInternal.FAILED;
+            case KILLED:
+              return TaskStateInternal.KILLED;
+          }
+        }
+      }
+
       TaskStateInternal endState = TaskStateInternal.NEW;
       if (task.attempts != null) {
         for (TaskAttempt taskAttempt : task.attempts.values()) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/608c08af/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c6e8e3d..975a58e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -65,6 +65,7 @@ import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
@@ -1404,6 +1405,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.abortVertex(VertexStatus.State.FAILED);
         return vertex.finished(VertexState.FAILED);
       }
+      else if (vertex.terminationCause == VertexTerminationCause.INTERNAL_ERROR) {
+        vertex.setFinishTime();
+        String diagnosticMsg = "Vertex failed/killed due to internal error. "
+            + "failedTasks:"
+            + vertex.failedTaskCount
+            + " killedTasks:"
+            + vertex.killedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.abortVertex(State.FAILED);
+        return vertex.finished(VertexState.FAILED);
+      }
       else {
         //should never occur
         throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex"

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/608c08af/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 4e790e6..e979eef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -80,6 +80,8 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate.UpdateType;
 import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
@@ -127,6 +129,7 @@ public class TestDAGImpl {
   private DAGImpl groupDag;
   private TezDAGID groupDagId;
   private HistoryEventHandler historyEventHandler;
+  private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
 
   private class DagEventDispatcher implements EventHandler<DAGEvent> {
     @Override
@@ -162,6 +165,14 @@ public class TestDAGImpl {
     }
   }
 
+  private class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(TaskAttemptEvent event) {
+      // Ignore
+    }
+  }
+
   private class VertexEventDispatcher
       implements EventHandler<VertexEvent> {
 
@@ -627,6 +638,8 @@ public class TestDAGImpl {
     doReturn(historyEventHandler).when(groupAppContext).getHistoryHandler();
     taskEventDispatcher = new TaskEventDispatcher();
     dispatcher.register(TaskEventType.class, taskEventDispatcher);
+    taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
+    dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
     vertexEventDispatcher = new VertexEventDispatcher();
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
     dagEventDispatcher = new DagEventDispatcher();
@@ -1213,4 +1226,23 @@ public class TestDAGImpl {
 
     Assert.assertEquals(DAGState.FAILED, mrrDag.getState());
   }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testDAGKillInternalError() {
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+
+    dispatcher.getEventHandler().handle(new DAGEvent(dagId,
+        DAGEventType.INTERNAL_ERROR));
+
+    dispatcher.await();
+    Assert.assertEquals(DAGState.ERROR, dag.getState());
+    Assert.assertEquals(0, dag.getSuccessfulVertices());
+    Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 2)).getTerminationCause(),
+        VertexTerminationCause.INTERNAL_ERROR);
+    Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
+  }
+
 }


Mime
View raw message