tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-107. DAG hangs on any task failure.
Date Wed, 08 May 2013 22:27:04 GMT
Updated Branches:
  refs/heads/TEZ-1 42bbadf85 -> df1038e1a


TEZ-107. DAG hangs on any task failure.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/df1038e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/df1038e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/df1038e1

Branch: refs/heads/TEZ-1
Commit: df1038e1ad9922639111cac1b0eac7ebd819f51e
Parents: 42bbadf
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed May 8 15:26:35 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Wed May 8 15:26:35 2013 -0700

----------------------------------------------------------------------
 tez-dag/src/main/avro/HistoryEvents.avpr           |    6 +-
 .../org/apache/tez/dag/app/dag/impl/DAGImpl.java   |   37 +++++++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |   52 +++++++++------
 .../tez/dag/history/events/DAGFinishedEvent.java   |    7 ++-
 .../dag/history/events/VertexFinishedEvent.java    |    6 +-
 5 files changed, 71 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/df1038e1/tez-dag/src/main/avro/HistoryEvents.avpr
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/avro/HistoryEvents.avpr b/tez-dag/src/main/avro/HistoryEvents.avpr
index 3653dcf..04333e6 100644
--- a/tez-dag/src/main/avro/HistoryEvents.avpr
+++ b/tez-dag/src/main/avro/HistoryEvents.avpr
@@ -49,7 +49,8 @@
       "fields": [
           {"name": "dagId", "type": "string"},
           {"name": "finishTime", "type": "long"},
-          {"name": "status", "type": "string"}
+          {"name": "status", "type": "string"},
+          {"name": "diagnostics", "type": "string"}
       ]
      },
 
@@ -69,7 +70,8 @@
           {"name": "vertexName", "type": "string"},
           {"name": "vertexId", "type": "string"},
           {"name": "finishTime", "type": "long"},
-          {"name": "status", "type": "string"}
+          {"name": "status", "type": "string"},
+          {"name": "diagnostics", "type": "string"}
       ]
      },
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/df1038e1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 2575a43..c37a9e9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -97,6 +97,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   EventHandler<DAGEvent> {
 
   private static final Log LOG = LogFactory.getLog(DAGImpl.class);
+  private static final String LINE_SEPARATOR = System
+      .getProperty("line.separator");
 
   //final fields
   private final ApplicationAttemptId applicationAttemptId;
@@ -586,7 +588,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   void logJobHistoryFinishedEvent() {
     this.setFinishTime();
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, finishTime,
-        DAGStatus.State.SUCCEEDED);
+        DAGStatus.State.SUCCEEDED, "");
     this.eventHandler.handle(
         new DAGHistoryEvent(dagId, finishEvt));
   }
@@ -602,7 +604,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   void logJobHistoryUnsuccesfulEvent(DAGStatus.State state) {
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, clock.getTime(),
-        state);
+        state, StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
     this.eventHandler.handle(
         new DAGHistoryEvent(dagId, finishEvt));
   }
@@ -617,14 +619,20 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     return FileSystem.get(conf);
   }
 
-  static DAGState checkJobCompleteSuccess(DAGImpl job) {
-    // check for Job success
-    if (job.numCompletedVertices == job.vertices.size()) {
-      // TODO: Maybe set cleanup progress. Otherwise job progress will
+  static DAGState checkJobCompleteSuccess(DAGImpl dag) {
+    // check for dag success
+    LOG.info("ZZZZ: Checking dag completion"
+        + ", numCompletedVertices=" + dag.numCompletedVertices
+        + ", numFailedVertices=" + dag.numFailedVertices
+        + ", numKilledVertices=" + dag.numKilledVertices
+        + ", numVertices=" + dag.numVertices);
+
+    if (dag.numCompletedVertices == dag.vertices.size()) {
+      // TODO: Maybe set cleanup progress. Otherwise dag progress will
       // always stay at 0.95 when reported from an AM.
       // TODO DAG committer
-      job.logJobHistoryFinishedEvent();
-      return job.finished(DAGState.SUCCEEDED);
+      dag.logJobHistoryFinishedEvent();
+      return dag.finished(DAGState.SUCCEEDED);
     }
     return null;
   }
@@ -1008,9 +1016,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     @Override
     public DAGState transition(DAGImpl job, DAGEvent event) {
-      job.numCompletedVertices++;
-      LOG.info("Num completed vertices: " + job.numCompletedVertices);
+
       DAGEventVertexCompleted vertexEvent = (DAGEventVertexCompleted) event;
+      LOG.info("DEBUG: Received a vertex completion event"
+          + ", vertexId=" + vertexEvent.getVertexId()
+          + ", vertexState=" + vertexEvent.getVertexState());
+
       Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
       if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
         vertexSucceeded(job, vertex);
@@ -1019,7 +1030,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       } else if (vertexEvent.getVertexState() == VertexState.KILLED) {
         vertexKilled(job, vertex);
       }
-      
+
+      LOG.info("ZZZZ: Num completed vertices: " + job.numCompletedVertices
+          + ", Num failed vertices: " + job.numFailedVertices
+          + ", Num killed vertices: " + job.numKilledVertices);
+
       job.dagScheduler.vertexCompleted(vertex);
 
       return checkJobForCompletion(job);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/df1038e1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 8200f02..dd621fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -75,6 +75,9 @@ import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexScheduler;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -105,6 +108,8 @@ import org.apache.tez.engine.records.TezVertexID;
 public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   EventHandler<VertexEvent>, VertexContext {
 
+  private static final String LINE_SEPARATOR = System
+      .getProperty("line.separator");
   private static final TezDependentTaskCompletionEvent[]
     EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TezDependentTaskCompletionEvent[0];
 
@@ -618,13 +623,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   void logJobHistoryVertexFinishedEvent() {
     this.setFinishTime();
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
-        vertexName, finishTime, VertexStatus.State.SUCCEEDED);
+        vertexName, finishTime, VertexStatus.State.SUCCEEDED, "");
     this.eventHandler.handle(new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
   void logJobHistoryVertexFailedEvent(VertexStatus.State state) {
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
-        vertexName, clock.getTime(), state);
+        vertexName, clock.getTime(), state,
+        StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
     this.eventHandler.handle(new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
@@ -640,7 +646,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   static VertexState checkVertexForCompletion(VertexImpl vertex) {
     //check for vertex failure first
-    if (vertex.failedTaskCount > 1) {
+
+    LOG.info("ZZZZ: checking for vertex completion"
+        + ", failedTaskCount=" + vertex.failedTaskCount
+        + ", killedTaskCount=" + vertex.killedTaskCount
+        + ", successfulTaskCount=" + vertex.succeededTaskCount
+        + ", completedTaskCount=" + vertex.completedTaskCount);
+
+    if (vertex.failedTaskCount > 0) {
       vertex.setFinishTime();
       String diagnosticMsg = "Vertex failed as tasks failed. "
           + "failedTasks:"
@@ -648,8 +661,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       LOG.info(diagnosticMsg);
       vertex.addDiagnostic(diagnosticMsg);
       vertex.abortVertex(VertexStatus.State.FAILED);
-      vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
-          .getVertexId(), VertexState.FAILED));
       return vertex.finished(VertexState.FAILED);
     }
     
@@ -661,12 +672,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         }
       } catch (IOException e) {
         LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
-        vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
-            .getVertexId(), VertexState.FAILED));
         return vertex.finished(VertexState.FAILED);
       }
-      vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
-          .getVertexId(), vertex.getState()));
       return vertex.finished(VertexState.SUCCEEDED);      
     }
     
@@ -683,29 +690,30 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   VertexState finished(VertexState finalState) {
-    if (getInternalState() == VertexState.RUNNING) {
-      // TODO: Metrics
-      // metrics.endRunningJob(this);
-    }
     if (finishTime == 0) setFinishTime();
 
     switch (finalState) {
       case KILLED:
       case KILL_WAIT:
-        // TODO: Metrics
-        //metrics.killedJob(this);
+        eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+            finalState));
         logJobHistoryVertexFailedEvent(State.KILLED);
         break;
       case ERROR:
+        eventHandler.handle(new DAGEvent(getDAGId(),
+            DAGEventType.INTERNAL_ERROR));
+        logJobHistoryVertexFailedEvent(State.FAILED);
+        break;
       case FAILED:
-        // TODO: Metrics
-        //metrics.failedJob(this);
+        eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+            finalState));
         logJobHistoryVertexFailedEvent(State.FAILED);
         break;
       case SUCCEEDED:
+        eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+            finalState));
         logJobHistoryVertexFinishedEvent();
-        // TODO: Metrics
-        //metrics.completedJob(this);
+        break;
     }
     return finalState;
   }
@@ -1146,7 +1154,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       SingleArcTransition<VertexImpl, VertexEvent> {
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
-      //TODO Is this JH event required.
+      LOG.error("Invalid event " + event.getType() + " on Vertex "
+          + vertex.getVertexId());
+      vertex.eventHandler.handle(new DAGEventDiagnosticsUpdate(
+          vertex.getDAGId(), "Invalid event " + event.getType()
+          + " on Vertex " + vertex.getVertexId()));
       vertex.setFinishTime();
       vertex.finished(VertexState.ERROR);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/df1038e1/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 2ad2a8a..547a09a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -29,10 +29,12 @@ public class DAGFinishedEvent implements HistoryEvent {
   private DAGFinished datum = new DAGFinished();
 
   public DAGFinishedEvent(TezDAGID dagId,
-      long finishTime, DAGStatus.State state) {
+      long finishTime, DAGStatus.State state,
+      String diagnostics) {
     datum.dagId = dagId.toString();
     datum.finishTime = finishTime;
     datum.status = state.name();
+    datum.diagnostics = diagnostics;
   }
 
   @Override
@@ -55,6 +57,7 @@ public class DAGFinishedEvent implements HistoryEvent {
   public String toString() {
     return "dagId=" + datum.dagId
         + ", finishTime=" + datum.finishTime
-        + ", status=" + datum.status;
+        + ", status=" + datum.status
+        + ", diagnostics=" + datum.diagnostics;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/df1038e1/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index cd941e1..b6d4d0b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -30,11 +30,12 @@ public class VertexFinishedEvent implements HistoryEvent {
 
   public VertexFinishedEvent(TezVertexID vertexId,
       String vertexName, long finishTime,
-      VertexStatus.State state) {
+      VertexStatus.State state, String diagnostics) {
     datum.vertexName = vertexName;
     datum.vertexId = vertexId.toString();
     datum.finishTime = finishTime;
     datum.status = state.name();
+    datum.diagnostics = diagnostics;
   }
 
   @Override
@@ -58,6 +59,7 @@ public class VertexFinishedEvent implements HistoryEvent {
     return "vertexName=" + datum.vertexName
         + ", vertexId=" + datum.vertexId
         + ", finishTime=" + datum.finishTime
-        + ", status=" + datum.status;
+        + ", status=" + datum.status
+        + ", diagnostics=" + datum.diagnostics;
   }
 }


Mime
View raw message