tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-3213. Uncaught exception during vertex recovery leads to invalid state transition loop. (Eric Badger via hitesh)
Date Wed, 27 Apr 2016 20:16:08 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 ea23646de -> 2917f4571


TEZ-3213. Uncaught exception during vertex recovery leads to invalid state transition loop.
(Eric Badger via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2917f457
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2917f457
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2917f457

Branch: refs/heads/branch-0.7
Commit: 2917f45719fc4415cdfbd1b9b9a27f9d3a777b26
Parents: ea23646
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed Apr 27 13:13:38 2016 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Wed Apr 27 13:13:38 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  3 ++
 .../dag/app/dag/impl/TestVertexRecovery.java    | 41 ++++++++++++++++++++
 3 files changed, 45 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2917f457/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cc4e5ae..129e3cd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-3213. Uncaught exception during vertex recovery leads to invalid state transition loop.
   TEZ-3224. User payload is not initialized before creating vertex manager plugin. 
   TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
   TEZ-3202. Reduce the memory need for jobs with high number of segments

http://git-wip-us.apache.org/repos/asf/tez/blob/2917f457/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index e646363..2df1a3d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -343,6 +343,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           .addTransition(VertexState.NEW, VertexState.ERROR,
               VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(VertexState.RECOVERING, VertexState.ERROR,
+              VertexEventType.V_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
           .addTransition
               (VertexState.RECOVERING,
                   EnumSet.of(VertexState.NEW, VertexState.INITED,

http://git-wip-us.apache.org/repos/asf/tez/blob/2917f457/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 0f532fb..bdb2377 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -73,6 +73,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
+import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
@@ -1337,4 +1338,44 @@ public class TestVertexRecovery {
     }
     assertEquals(DAGState.FAILED, dag.getState());
   }
+
+   @Test (timeout = 5000)
+  public void testRecovery_VInternalError() {
+    // In order to simulate the behavior that VertexManagerError happens in recovering stage, need to start the recovering from
+    // vertex and disable the the eventhandling of DAG (use mock here).
+    dispatcher = new DrainDispatcher();
+    dispatcher.register(DAGEventType.class, mock(EventHandler.class));
+    vertexEventHandler = new VertexEventHanlder();
+    dispatcher.register(VertexEventType.class, vertexEventHandler);
+    taskEventHandler = new TaskEventHandler();
+    dispatcher.register(TaskEventType.class, taskEventHandler);
+    dispatcher.register(TaskAttemptEventType.class,
+        new TaskAttemptEventHandler());
+    dispatcher.init(new Configuration());
+    dispatcher.start();
+    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    DAGPlan dagPlan = createDAGPlan();
+    dag =
+        new DAGImpl(dagId, new Configuration(), dagPlan,
+            dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+            new Credentials(), new SystemClock(), user,
+            mock(TaskHeartbeatHandler.class), mockAppContext);
+    when(mockAppContext.getCurrentDAG()).thenReturn(dag);
+    ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
+    doReturn(clusterInfo).when(mockAppContext).getClusterInfo();
+    dag.restoreFromEvent(new DAGInitializedEvent(dag.getID(), 0L, "user", "dagName", null));
+    dag.restoreFromEvent(new DAGStartedEvent(dag.getID(), 0L, "user", "dagName"));
+    LOG.info("finish setUp");
+
+    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
+
+    vertex3.handle(new VertexEventSourceVertexRecovered(
+        vertex3.getVertexId(),
+        null, VertexState.NEW, null, 0));
+    assertEquals(VertexState.RECOVERING, vertex3.getState());
+
+    vertex3.handle(new VertexEvent(
+        vertex3.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
+    assertEquals(VertexState.ERROR, vertex3.getState());
+  }
 }


Mime
View raw message