tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo (zjffdu)
Date Fri, 06 Feb 2015 02:05:09 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 363249d64 -> c0b4b8a82


TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo (zjffdu)

(cherry picked from commit cfa637a16fa01b197c0310e03ef4a6e19883aaf1)

Conflicts:
	CHANGES.txt
	tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c0b4b8a8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c0b4b8a8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c0b4b8a8

Branch: refs/heads/branch-0.5
Commit: c0b4b8a8225663e82a540f86c872a1623f11ebd0
Parents: 363249d
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Mon Feb 2 10:45:32 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Fri Feb 6 10:04:37 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/DAGTerminationCause.java    |  3 ++
 .../tez/dag/app/dag/VertexTerminationCause.java |  3 ++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 25 ++++++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 23 +++++++++
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 53 +++++++++++++++++++-
 6 files changed, 101 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c0b4b8a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a9e06d..bc7e09e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
   TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly
   TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria
   before sending notification

http://git-wip-us.apache.org/repos/asf/tez/blob/c0b4b8a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index d01fb2f..5ae96a1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -39,6 +39,9 @@ public enum DAGTerminationCause {
   /** DAG failed during output commit. */
   COMMIT_FAILURE,
 
+  /** In some cases, vertex could not rerun, e.g. its output been committed as a shared output of vertex group */
+  VERTEX_RERUN_AFTER_COMMIT,
+
   /** DAG failed while trying to write recovery events */
   RECOVERY_FAILURE,
 

http://git-wip-us.apache.org/repos/asf/tez/blob/c0b4b8a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 4bfe001..2eeae3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -43,6 +43,9 @@ public enum VertexTerminationCause {
   /** This vertex failed during commit. */
   COMMIT_FAILURE,
 
+  /** In some cases, vertex could not rerun, e.g. its output been committed as a shared output of vertex group */
+  VERTEX_RERUN_AFTER_COMMIT,
+
   /** This vertex failed as it had invalid number tasks. */
   INVALID_NUM_OF_TASKS, 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/c0b4b8a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index f877eb4..aaa9790 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1073,6 +1073,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.addDiagnostic(diagnosticMsg);
         return dag.finished(DAGState.FAILED);
       }
+      if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT ){
+        String diagnosticMsg = "DAG failed due to vertex rerun after commit." +
+            " failedVertices:" + dag.numFailedVertices +
+            " killedVertices:" + dag.numKilledVertices;
+        LOG.info(diagnosticMsg);
+        dag.addDiagnostic(diagnosticMsg);
+        return dag.finished(DAGState.FAILED);
+      }
       if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){
         String diagnosticMsg = "DAG failed due to failure in recovery handling." +
             " failedVertices:" + dag.numFailedVertices +
@@ -1728,9 +1736,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     public DAGState transition(DAGImpl job, DAGEvent event) {
       DAGEventVertexReRunning vertexEvent = (DAGEventVertexReRunning) event;
       Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
-      job.numCompletedVertices--;
       boolean failed = job.vertexReRunning(vertex);
-
+      if (!failed) {
+        job.numCompletedVertices--;
+      }
 
       LOG.info("Vertex " + vertex.getVertexId() + " re-running."
           + ", numCompletedVertices=" + job.numCompletedVertices
@@ -1838,11 +1847,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       if (groupList != null) {
         for (VertexGroupInfo groupInfo : groupList) {
           if (groupInfo.committed) {
-            LOG.info("Aborting job as committed vertex: "
-                + vertex.getVertexId() + " is re-running");
-            enactKill(DAGTerminationCause.COMMIT_FAILURE,
-                VertexTerminationCause.COMMIT_FAILURE);
+            String msg = "Aborting job as committed vertex: "
+                + vertex.getLogIdentifier() + " is re-running";
+            LOG.info(msg);
+            addDiagnostic(msg);
+            enactKill(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT,
+                VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT);
             return true;
+          } else {
+            groupInfo.successfulMembers--;
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/c0b4b8a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 8e47d4b..b812f30 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1754,6 +1754,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.abortVertex(State.FAILED);
         return vertex.finished(VertexState.FAILED);
       }
+      else if (vertex.terminationCause == VertexTerminationCause.COMMIT_FAILURE) {
+        vertex.setFinishTime();
+        String diagnosticMsg = "Vertex failed/killed due to COMMIT_FAILURE failed. "
+            + "failedTasks:"
+            + vertex.failedTaskCount
+            + " killedTasks:"
+            + vertex.killedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.abortVertex(State.FAILED);
+        return vertex.finished(VertexState.FAILED);
+      }
+      else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT) {
+        vertex.setFinishTime();
+        String diagnosticMsg = "Vertex failed/killed due to invalid rerun failed. "
+            + "failedTasks:"
+            + vertex.failedTaskCount
+            + " killedTasks:"
+            + vertex.killedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.abortVertex(State.FAILED);
+        return vertex.finished(VertexState.FAILED);
+      }
       else {
         //should never occur
         throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex:" + vertex.logIdentifier
@@ -3350,6 +3372,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         case INIT_FAILURE:
         case INTERNAL_ERROR:
         case AM_USERCODE_FAILURE:
+        case VERTEX_RERUN_AFTER_COMMIT:
         case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
         default://should not occur
           throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + trigger);

http://git-wip-us.apache.org/repos/asf/tez/blob/c0b4b8a8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index d859ae0..f75e0ad 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -90,6 +90,7 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -767,6 +768,9 @@ public class TestDAGImpl {
     doReturn(appAttemptId.getApplicationId())
         .when(groupAppContext).getApplicationID();
     doReturn(historyEventHandler).when(groupAppContext).getHistoryHandler();
+
+    // reset totalCommitCounter to 0
+    TotalCountingOutputCommitter.totalCommitCounter = 0;
     taskEventDispatcher = new TaskEventDispatcher();
     dispatcher.register(TaskEventType.class, taskEventDispatcher);
     taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
@@ -1069,7 +1073,54 @@ public class TestDAGImpl {
     Assert.assertEquals(DAGState.SUCCEEDED, groupDag.getState());
     Assert.assertEquals(2, TotalCountingOutputCommitter.totalCommitCounter);
   }  
-  
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testGroupDAGWithVertexReRunning() {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+    initDAG(groupDag);
+    startDAG(groupDag);
+    dispatcher.await();
+
+    Vertex v1 = groupDag.getVertex("vertex1");
+    Vertex v2 = groupDag.getVertex("vertex2");
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.await();
+    // commit should not happen due to vertex-rerunning
+    Assert.assertEquals(0, TotalCountingOutputCommitter.totalCommitCounter);
+
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.await();
+    // commit happen
+    Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
+    Assert.assertEquals(2, groupDag.getSuccessfulVertices());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testGroupDAGWithVertexReRunningAfterCommit() {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+    initDAG(groupDag);
+    startDAG(groupDag);
+    dispatcher.await();
+
+    Vertex v1 = groupDag.getVertex("vertex1");
+    Vertex v2 = groupDag.getVertex("vertex2");
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.await();
+    // vertex group commit happens
+    Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
+
+    // dag failed when vertex re-run happens after vertex group commit is done.
+    dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.FAILED, groupDag.getState());
+    Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT, groupDag.getTerminationCause());
+  }
+
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testDAGCompletionWithCommitSuccess() {


Mime
View raw message