tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [02/37] tez git commit: TEZ-2248. VertexImpl/DAGImpl.checkForCompletion have too many termination cause checks (zjffdu)
Date Tue, 28 Apr 2015 20:40:43 GMT
TEZ-2248. VertexImpl/DAGImpl.checkForCompletion have too many termination cause checks (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/778c1f5a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/778c1f5a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/778c1f5a

Branch: refs/heads/TEZ-2003
Commit: 778c1f5afcf6cc2552e180e3d48385a1fade673d
Parents: 0a64ae7
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Fri Apr 24 13:05:18 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Fri Apr 24 13:05:18 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/dag/app/dag/DAGTerminationCause.java    |  28 +++--
 .../tez/dag/app/dag/VertexTerminationCause.java |  34 +++--
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  66 +---------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 123 ++-----------------
 5 files changed, 57 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/778c1f5a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 47af758..afb458a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2248. VertexImpl/DAGImpl.checkForCompletion have too many termination cause checks
   TEZ-2341. TestMockDAGAppMaster.testBasicCounters fails on windows
   TEZ-2352. Move getTaskStatistics into the RuntimeTask class.
   TEZ-2357. Tez UI: misc.js.orig is committed by accident

http://git-wip-us.apache.org/repos/asf/tez/blob/778c1f5a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index d8ba95d..b6be395 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -25,27 +25,37 @@ package org.apache.tez.dag.app.dag;
 public enum DAGTerminationCause {
 
   /** DAG was directly killed.   */
-  DAG_KILL, 
+  DAG_KILL(DAGState.KILLED),
   
   /** A vertex failed. */ 
-  VERTEX_FAILURE, 
+  VERTEX_FAILURE(DAGState.FAILED),
   
   /** DAG failed due as it had zero vertices. */
-  ZERO_VERTICES, 
+  ZERO_VERTICES(DAGState.FAILED),
   
   /** DAG failed during init. */
-  INIT_FAILURE,
+  INIT_FAILURE(DAGState.FAILED),
   
   /** DAG failed during output commit. */
-  COMMIT_FAILURE,
+  COMMIT_FAILURE(DAGState.FAILED),
 
   /** In some cases, vertex could not rerun, e.g. its output been committed as a shared output of vertex group */
-  VERTEX_RERUN_AFTER_COMMIT,
+  VERTEX_RERUN_AFTER_COMMIT(DAGState.FAILED),
 
-  VERTEX_RERUN_IN_COMMITTING,
+  VERTEX_RERUN_IN_COMMITTING(DAGState.FAILED),
 
   /** DAG failed while trying to write recovery events */
-  RECOVERY_FAILURE,
+  RECOVERY_FAILURE(DAGState.FAILED),
 
-  INTERNAL_ERROR
+  INTERNAL_ERROR(DAGState.ERROR);
+
+  private DAGState finishedState;
+
+  DAGTerminationCause(DAGState finishedState) {
+    this.finishedState = finishedState;
+  }
+
+  public DAGState getFinishedState() {
+    return finishedState;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/778c1f5a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index ebece97..28712ad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -24,39 +24,49 @@ package org.apache.tez.dag.app.dag;
 public enum VertexTerminationCause {
 
   /** DAG was killed  */
-  DAG_KILL, 
+  DAG_KILL(VertexState.KILLED),
 
   /** Other vertex failed causing DAG to fail thus killing this vertex  */
-  OTHER_VERTEX_FAILURE,
+  OTHER_VERTEX_FAILURE(VertexState.KILLED),
 
   /** Initialization failed for one of the root Inputs */
-  ROOT_INPUT_INIT_FAILURE,
+  ROOT_INPUT_INIT_FAILURE(VertexState.FAILED),
   
   /** This vertex failed as its AM usercode (VertexManager/EdgeManager/InputInitializer)
    * throw Exception
    */
-  AM_USERCODE_FAILURE,
+  AM_USERCODE_FAILURE(VertexState.FAILED),
 
   /** One of the tasks for this vertex failed.  */
-  OWN_TASK_FAILURE, 
+  OWN_TASK_FAILURE(VertexState.FAILED),
 
   /** This vertex failed during commit. */
-  COMMIT_FAILURE,
+  COMMIT_FAILURE(VertexState.FAILED),
 
   /** In some cases, vertex could not rerun, e.g. its output been committed as a shared output of vertex group */
-  VERTEX_RERUN_AFTER_COMMIT,
+  VERTEX_RERUN_AFTER_COMMIT(VertexState.FAILED),
 
   /** Rerun vertex while it is in committing, it would cause conflict. */
-  VERTEX_RERUN_IN_COMMITTING,
+  VERTEX_RERUN_IN_COMMITTING(VertexState.FAILED),
 
   /** This vertex failed as it had invalid number tasks. */
-  INVALID_NUM_OF_TASKS, 
+  INVALID_NUM_OF_TASKS(VertexState.FAILED),
 
   /** This vertex failed during init. */
-  INIT_FAILURE,
+  INIT_FAILURE(VertexState.FAILED),
   
-  INTERNAL_ERROR,
+  INTERNAL_ERROR(VertexState.ERROR),
   
   /** error when writing recovery log */ 
-  RECOVERY_ERROR,
+  RECOVERY_ERROR(VertexState.FAILED);
+
+  private VertexState finishedState;
+
+  private VertexTerminationCause(VertexState finishedState) {
+    this.finishedState = finishedState;
+  }
+
+  public VertexState getFinishedState() {
+    return finishedState;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/778c1f5a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 9e55088..f8cd10f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1234,67 +1234,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   private static DAGState finishWithTerminationCause(DAGImpl dag) {
-    if(dag.terminationCause == DAGTerminationCause.DAG_KILL ){
-      String diagnosticMsg = "DAG killed due to user-initiated kill." +
-          " failedVertices:" + dag.numFailedVertices +
-          " killedVertices:" + dag.numKilledVertices;
-      LOG.info(diagnosticMsg);
-      dag.addDiagnostic(diagnosticMsg);
-      return dag.finished(DAGState.KILLED);
-    }
-    if(dag.terminationCause == DAGTerminationCause.VERTEX_FAILURE ){
-      String diagnosticMsg = "DAG failed due to vertex failure." +
-          " failedVertices:" + dag.numFailedVertices +
-          " killedVertices:" + dag.numKilledVertices;
-      LOG.info(diagnosticMsg);
-      dag.addDiagnostic(diagnosticMsg);
-      return dag.finished(DAGState.FAILED);
-    }
-    if(dag.terminationCause == DAGTerminationCause.COMMIT_FAILURE ){
-      String diagnosticMsg = "DAG failed due to commit failure." +
-          " failedVertices:" + dag.numFailedVertices +
-          " killedVertices:" + dag.numKilledVertices;
-      LOG.info(diagnosticMsg);
-      dag.addDiagnostic(diagnosticMsg);
-      return dag.finished(DAGState.FAILED);
-    }
-    if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT ){
-      String diagnosticMsg = "DAG failed due to vertex rerun after commit." +
-          " failedVertices:" + dag.numFailedVertices +
-          " killedVertices:" + dag.numKilledVertices;
-      LOG.info(diagnosticMsg);
-      dag.addDiagnostic(diagnosticMsg);
-      return dag.finished(DAGState.FAILED);
-    }
-    if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING ){
-      String diagnosticMsg = "DAG failed due to vertex rerun in commit." +
-          " failedVertices:" + dag.numFailedVertices +
-          " killedVertices:" + dag.numKilledVertices;
-      LOG.info(diagnosticMsg);
-      dag.addDiagnostic(diagnosticMsg);
-      return dag.finished(DAGState.FAILED);
-    }
-    if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){
-      String diagnosticMsg = "DAG failed due to failure in recovery handling." +
-          " failedVertices:" + dag.numFailedVertices +
-          " killedVertices:" + dag.numKilledVertices;
-      LOG.info(diagnosticMsg);
-      dag.addDiagnostic(diagnosticMsg);
-      return dag.finished(DAGState.FAILED);
-    }
-
-    // catch all
-    String diagnosticMsg = "All vertices complete, but cannot determine final state of DAG"
-        + ", numCompletedVertices=" + dag.numCompletedVertices
-        + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
-        + ", numFailedVertices=" + dag.numFailedVertices
-        + ", numKilledVertices=" + dag.numKilledVertices
-        + ", numVertices=" + dag.numVertices
-        + ", commitInProgress=" + dag.commitFutures.size() 
-        + ", terminationCause=" + dag.terminationCause;
-    LOG.error(diagnosticMsg);
+    Preconditions.checkArgument(dag.getTerminationCause() != null, "TerminationCause is not set.");
+    String diagnosticMsg =  "DAG did not succeed due to " + dag.terminationCause
+        + ". failedVertices:" + dag.numFailedVertices
+        + " killedVertices:" + dag.numKilledVertices;
+    LOG.info(diagnosticMsg);
     dag.addDiagnostic(diagnosticMsg);
-    return dag.finished(DAGState.ERROR);
+    return dag.finished(dag.getTerminationCause().getFinishedState());
   }
 
   private void updateCpuCounters() {

http://git-wip-us.apache.org/repos/asf/tez/blob/778c1f5a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index e22343b..dfa358d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1948,119 +1948,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
-  // TODO TEZ-2248
   private static VertexState finishWithTerminationCause(VertexImpl vertex) {
-    if(vertex.terminationCause == VertexTerminationCause.DAG_KILL ){
-      vertex.setFinishTime();
-      String diagnosticMsg = "Vertex killed due to user-initiated job kill. "
-          + "failedTasks:"
-          + vertex.failedTaskCount;
-      LOG.info(diagnosticMsg);
-      vertex.addDiagnostic(diagnosticMsg);
-      return vertex.finished(VertexState.KILLED);
-    }
-    else if(vertex.terminationCause == VertexTerminationCause.OTHER_VERTEX_FAILURE ){
-      vertex.setFinishTime();
-      String diagnosticMsg = "Vertex killed as other vertex failed. "
-          + "failedTasks:"
-          + vertex.failedTaskCount;
-      LOG.info(diagnosticMsg);
-      vertex.addDiagnostic(diagnosticMsg);
-      return vertex.finished(VertexState.KILLED);
-    }
-    else if(vertex.terminationCause == VertexTerminationCause.OWN_TASK_FAILURE ){
-      if(vertex.failedTaskCount == 0){
-        LOG.error("task failure accounting error.  terminationCause=TASK_FAILURE but vertex.failedTaskCount == 0");
-      }
-      vertex.setFinishTime();
-      String diagnosticMsg = "Vertex failed as one or more tasks failed. "
-          + "failedTasks:"
-          + vertex.failedTaskCount;
-      LOG.info(diagnosticMsg);
-      vertex.addDiagnostic(diagnosticMsg);
-      return vertex.finished(VertexState.FAILED);
-    }
-    else if (vertex.terminationCause == VertexTerminationCause.INTERNAL_ERROR) {
-      vertex.setFinishTime();
-      String diagnosticMsg = "Vertex failed/killed due to internal error. "
-          + "failedTasks:"
-          + vertex.failedTaskCount
-          + " killedTasks:"
-          + vertex.killedTaskCount;
-      LOG.info(diagnosticMsg);
-      return vertex.finished(VertexState.FAILED);
-    }
-    else if (vertex.terminationCause == VertexTerminationCause.AM_USERCODE_FAILURE) {
-      vertex.setFinishTime();
-      String diagnosticMsg = "Vertex failed/killed due to VertexManagerPlugin/EdgeManagerPlugin failed. "
-          + "failedTasks:"
-          + vertex.failedTaskCount
-          + " killedTasks:"
-          + vertex.killedTaskCount;
-      LOG.info(diagnosticMsg);
-      return vertex.finished(VertexState.FAILED);
-    }
-    else if (vertex.terminationCause == VertexTerminationCause.ROOT_INPUT_INIT_FAILURE) {
-      vertex.setFinishTime();
-      String diagnosticMsg = "Vertex failed/killed due to ROOT_INPUT_INIT_FAILURE failed. "
-          + "failedTasks:"
-          + vertex.failedTaskCount
-          + " killedTasks:"
-          + vertex.killedTaskCount;
-      LOG.info(diagnosticMsg);
-      return vertex.finished(VertexState.FAILED);
-    }
-    else if (vertex.terminationCause == VertexTerminationCause.COMMIT_FAILURE) {
-      vertex.setFinishTime();
-      String diagnosticMsg = "Vertex failed/killed due to COMMIT_FAILURE failed. "
-          + "failedTasks:"
-          + vertex.failedTaskCount
-          + " killedTasks:"
-          + vertex.killedTaskCount;
-      LOG.info(diagnosticMsg);
-      return vertex.finished(VertexState.FAILED);
-    }
-    else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT) {
-      vertex.setFinishTime();
-      String diagnosticMsg = "Vertex failed/killed due to vertex-rerun after commit. "
-          + "failedTasks:"
-          + vertex.failedTaskCount
-          + " killedTasks:"
-          + vertex.killedTaskCount;
-      LOG.info(diagnosticMsg);
-      return vertex.finished(VertexState.FAILED);
-    }
-    else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING) {
-      vertex.setFinishTime();
-      String diagnosticMsg = "Vertex failed/killed due to vertex-rerun in commiting. "
-          + "failedTasks:"
-          + vertex.failedTaskCount
-          + " killedTasks:"
-          + vertex.killedTaskCount;
-      LOG.info(diagnosticMsg);
-      return vertex.finished(VertexState.FAILED);
-    }
-    else if (vertex.terminationCause == VertexTerminationCause.RECOVERY_ERROR) {
-      vertex.setFinishTime();
-      String diagnosticMsg = "Vertex failed/killed due to recovery error. "
-          + "failedTasks:"
-          + vertex.failedTaskCount
-          + " killedTasks:"
-          + vertex.killedTaskCount;
-      LOG.info(diagnosticMsg);
-      return vertex.finished(VertexState.FAILED);
-    }
-    else {
-      //should never occur
-      throw new TezUncheckedException("All tasks & commits complete, but cannot determine final state of vertex:"
-          + vertex.logIdentifier
-          + ", failedTaskCount=" + vertex.failedTaskCount
-          + ", killedTaskCount=" + vertex.killedTaskCount
-          + ", successfulTaskCount=" + vertex.succeededTaskCount
-          + ", completedTaskCount=" + vertex.completedTaskCount
-          + ", commitInProgress=" + vertex.commitFutures.size()
-          + ", terminationCause=" + vertex.terminationCause);
-    }
+    Preconditions.checkArgument(vertex.getTerminationCause()!= null, "TerminationCause is not set");
+    String diagnosticMsg = "Vertex did not succeed due to " + vertex.getTerminationCause()
+        + ", failedTasks:" + vertex.failedTaskCount
+        + " killedTasks:" + vertex.killedTaskCount;
+    LOG.info(diagnosticMsg);
+    vertex.addDiagnostic(diagnosticMsg);
+    return vertex.finished(vertex.getTerminationCause().getFinishedState());
   }
 
   /**
@@ -2133,7 +2028,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         } catch (IOException e) {
           LOG.error("Failed to send vertex finished event to recovery", e);
           finalState = VertexState.FAILED;
-          this.terminationCause = VertexTerminationCause.INTERNAL_ERROR;
+          trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
           eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
               finalState));
         }
@@ -3750,7 +3645,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.info("Received a user code error during recovering, setting recovered"
             + " state to FAILED");
         vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
-        vertex.terminationCause = VertexTerminationCause.AM_USERCODE_FAILURE;
+        vertex.trySetTerminationCause(VertexTerminationCause.AM_USERCODE_FAILURE);
         vertex.recoveredState = VertexState.FAILED;
         return VertexState.RECOVERING;
       } else if (vertex.getState() == VertexState.RUNNING || vertex.getState() == VertexState.COMMITTING) {


Mime
View raw message