tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mlidd...@apache.org
Subject git commit: TEZ-172. TestVertexImpl hangs (intermittent failure).
Date Sat, 01 Jun 2013 01:10:42 GMT
Updated Branches:
  refs/heads/TEZ-1 d3b19212c -> d94d37afb


TEZ-172. TestVertexImpl hangs (intermittent failure).


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d94d37af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d94d37af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d94d37af

Branch: refs/heads/TEZ-1
Commit: d94d37afbe9a0cc045e2c8717c93fa5446b589b9
Parents: d3b1921
Author: Hitesh Shah <hitesh@apache.org>
Authored: Fri May 31 17:10:14 2013 -0700
Committer: Mike Liddell <mliddell@apache.org>
Committed: Fri May 31 18:09:37 2013 -0700

----------------------------------------------------------------------
 pom.xml                                            |    3 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |    6 +-
 .../apache/tez/dag/app/dag/impl/TestDAGImpl.java   |   68 +++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java       |  262 ++++++++++-----
 4 files changed, 218 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d94d37af/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 721df2f..4c5d0c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -325,7 +325,8 @@
           <artifactId>maven-surefire-plugin</artifactId>
           <version>2.14.1</version>
           <configuration>
-            <forkMode>always</forkMode>
+            <forkCount>1</forkCount>
+            <reuseForks>false</reuseForks>
             <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
             <argLine>-Xmx1024m -XX:+HeapDumpOnOutOfMemoryError</argLine>
             <redirectTestOutputToFile>true</redirectTestOutputToFile>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d94d37af/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c7bceb6..b06e264 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -613,7 +613,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
          getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
         String message = "Invalid event " + event.getType() +
-            " on vertex " + this.vertexId +
+            " on vertex " + this.vertexName +
+            " with vertexId " + this.vertexId +
             " at current state " + oldState;
         LOG.error("Can't handle " + message, e);
         addDiagnostic(message);
@@ -630,6 +631,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     finally {
       writeLock.unlock();
     }
+    LOG.info("DEBUG: Finished processing VertexEvent " + event.getVertexId()
+        + " of type " + event.getType() + " while in state "
+        + getInternalState() + ". Event: " + event);
   }
 
   private VertexState getInternalState() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d94d37af/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 3adcd8b..9e94cbd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -373,19 +373,23 @@ public class TestDAGImpl {
 
   @After
   public void teardown() {
-    dagPlan = null;
-    dag = null;
     dispatcher.await();
     dispatcher.stop();
+    dagPlan = null;
+    dag = null;
   }
 
   private void initDAG(DAGImpl dag) {
-    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
+    dag.handle(
+        new DAGEvent(dagId, DAGEventType.DAG_INIT));
     Assert.assertEquals(DAGState.INITED, dag.getState());
   }
 
+  @SuppressWarnings("unchecked")
   private void startDAG(DAGImpl dag) {
-    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_START));
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagId, DAGEventType.DAG_START));
+    dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dag.getState());
   }
 
@@ -432,9 +436,9 @@ public class TestDAGImpl {
 
     TezVertexID vId = new TezVertexID(dagId, 1);
     Vertex v = dag.getVertex(vId);
-    ((EventHandler<VertexEvent>) v).handle(new VertexEventTaskCompleted(
+    dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
         new TezTaskID(vId, 0), TaskState.SUCCEEDED));
-    ((EventHandler<VertexEvent>) v).handle(new VertexEventTaskCompleted(
+    dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
         new TezTaskID(vId, 1), TaskState.SUCCEEDED));
     dispatcher.await();
 
@@ -442,12 +446,14 @@ public class TestDAGImpl {
     Assert.assertEquals(1, dag.getSuccessfulVertices());
   }
 
+  @SuppressWarnings("unchecked")
   public void testKillStartedDAG() {
     initDAG(dag);
     startDAG(dag);
     dispatcher.await();
 
-    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagId, DAGEventType.DAG_KILL));
     dispatcher.await();
 
     Assert.assertEquals(DAGState.KILLED, dag.getState());
@@ -479,7 +485,7 @@ public class TestDAGImpl {
     Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
     Assert.assertEquals(VertexState.RUNNING, v1.getState());
 
-    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
     dispatcher.await();
 
     Assert.assertEquals(DAGState.KILL_WAIT, dag.getState());
@@ -493,13 +499,16 @@ public class TestDAGImpl {
     Assert.assertEquals(1, dag.getSuccessfulVertices());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testInvalidEvent() {
-    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_START));
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagId, DAGEventType.DAG_START));
     dispatcher.await();
     Assert.assertEquals(DAGState.ERROR, dag.getState());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   @Ignore
   public void testVertexSuccessfulCompletionUpdates() {
@@ -508,28 +517,29 @@ public class TestDAGImpl {
     dispatcher.await();
 
     for (int i = 0; i < 6; ++i) {
-      dag.handle(new DAGEventVertexCompleted(
+      dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
           new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
     }
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dag.getState());
     Assert.assertEquals(1, dag.getSuccessfulVertices());
 
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 2), VertexState.SUCCEEDED));
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 3), VertexState.SUCCEEDED));
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 4), VertexState.SUCCEEDED));
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 5), VertexState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(DAGState.SUCCEEDED, dag.getState());
     Assert.assertEquals(6, dag.getSuccessfulVertices());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   @Ignore
   public void testVertexFailureHandling() {
@@ -537,14 +547,14 @@ public class TestDAGImpl {
     startDAG(dag);
     dispatcher.await();
 
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dag.getState());
 
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 2), VertexState.FAILED));
     dispatcher.await();
     Assert.assertEquals(DAGState.FAILED, dag.getState());
@@ -558,6 +568,7 @@ public class TestDAGImpl {
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   @Ignore
   public void testDAGKill() {
@@ -565,17 +576,18 @@ public class TestDAGImpl {
     startDAG(dag);
     dispatcher.await();
 
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dag.getState());
 
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
-    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagId, DAGEventType.DAG_KILL));
 
     for (int i = 2; i < 6; ++i) {
-      dag.handle(new DAGEventVertexCompleted(
+      dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
           new TezVertexID(dagId, i), VertexState.SUCCEEDED));
     }
     dispatcher.await();
@@ -584,29 +596,31 @@ public class TestDAGImpl {
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testDAGKillPending() {
     initDAG(dag);
     startDAG(dag);
     dispatcher.await();
 
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dag.getState());
 
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
-    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagId, DAGEventType.DAG_KILL));
 
     for (int i = 2; i < 5; ++i) {
-      dag.handle(new DAGEventVertexCompleted(
+      dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
           new TezVertexID(dagId, i), VertexState.SUCCEEDED));
     }
     dispatcher.await();
     Assert.assertEquals(DAGState.KILL_WAIT, dag.getState());
 
-    dag.handle(new DAGEventVertexCompleted(
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         new TezVertexID(dagId, 5), VertexState.KILLED));
     dispatcher.await();
     Assert.assertEquals(DAGState.KILLED, dag.getState());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d94d37af/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index e4a1040..41b500f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -413,11 +413,12 @@ public class TestVertexImpl {
     vertexIdMap = new HashMap<TezVertexID, VertexImpl>();
     for (int i = 0; i < vCnt; ++i) {
       VertexPlan vPlan = dagPlan.getVertex(i);
+      String vName = vPlan.getName();
       TezVertexID vertexId = new TezVertexID(dagId, i+1);
       VertexImpl v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
           dispatcher.getEventHandler(), taskAttemptListener, jobToken, fsTokens,
           clock, thh, appContext, vertexLocationHint);
-      vertices.put(vPlan.getName(), v);
+      vertices.put(vName, v);
       vertexIdMap.put(vertexId, v);
     }
   }
@@ -492,16 +493,29 @@ public class TestVertexImpl {
 
   @After
   public void teardown() {
+    dispatcher.await();
+    dispatcher.stop();
+    dispatcher = null;
+    vertexEventDispatcher = null;
+    dagEventDispatcher = null;
     dagPlan = null;
     this.vertices = null;
     this.edges = null;
-    dispatcher.await();
-    dispatcher.stop();
+    this.vertexIdMap = null;
   }
 
+  private void initAllVertices() {
+    for (int i = 1; i <= 6; ++i) {
+      VertexImpl v = vertices.get("vertex" + i);
+      initVertex(v);
+    }
+  }
+
+
+  @SuppressWarnings("unchecked")
   private void initVertex(VertexImpl v) {
     Assert.assertEquals(VertexState.NEW, v.getState());
-    v.handle(new VertexEvent(v.getVertexId(),
+    dispatcher.getEventHandler().handle(new VertexEvent(v.getVertexId(),
           VertexEventType.V_INIT));
     dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v.getState());
@@ -511,8 +525,10 @@ public class TestVertexImpl {
     startVertex(v, true);
   }
 
+  @SuppressWarnings("unchecked")
   private void killVertex(VertexImpl v, boolean checkKillWait) {
-    v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+    dispatcher.getEventHandler().handle(
+        new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
     dispatcher.await();
     if (checkKillWait) {
       Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
@@ -521,10 +537,11 @@ public class TestVertexImpl {
     }
   }
 
+  @SuppressWarnings("unchecked")
   private void startVertex(VertexImpl v,
       boolean checkRunningState) {
     Assert.assertEquals(VertexState.INITED, v.getState());
-    v.handle(new VertexEvent(v.getVertexId(),
+    dispatcher.getEventHandler().handle(new VertexEvent(v.getVertexId(),
           VertexEventType.V_START));
     dispatcher.await();
     if (checkRunningState) {
@@ -532,7 +549,7 @@ public class TestVertexImpl {
     }
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testVertexInit() {
     VertexImpl v = vertices.get("vertex2");
     initVertex(v);
@@ -583,66 +600,79 @@ public class TestVertexImpl {
             .getOutputClassName()));
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testVertexStart() {
+    initAllVertices();
+
     VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
     startVertex(v);
   }
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testBasicVertexCompletion() {
+    initAllVertices();
+
     VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
     startVertex(v);
 
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
     TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
     Assert.assertEquals(1, v.getCompletedTasks());
 
-    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(2, v.getCompletedTasks());
   }
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   @Ignore // FIXME fix verteximpl for this test to work
   public void testDuplicateTaskCompletion() {
+    initAllVertices();
+
     VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
     startVertex(v);
 
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
     TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
 
-    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
   }
 
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testVertexFailure() {
+    initAllVertices();
+
     VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
     startVertex(v);
 
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.FAILED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.FAILED));
     dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v.getState());
     String diagnostics =
@@ -650,17 +680,18 @@ public class TestVertexImpl {
     Assert.assertTrue(diagnostics.contains("task failed " + t1.toString()));
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testVertexWithNoTasks() {
     // FIXME a vertex with no tasks should not be allowed
+    initAllVertices();
+
     VertexImpl v = vertices.get("vertex1");
-    initVertex(v);
     startVertex(v, false);
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testVertexKillDiagnostics() {
     VertexImpl v1 = vertices.get("vertex1");
     killVertex(v1, false);
@@ -679,7 +710,14 @@ public class TestVertexImpl {
         "vertex received kill in inited state"));
 
     VertexImpl v3 = vertices.get("vertex3");
+    VertexImpl v4 = vertices.get("vertex4");
+    VertexImpl v5 = vertices.get("vertex5");
+    VertexImpl v6 = vertices.get("vertex6");
     initVertex(v3);
+    initVertex(v4);
+    initVertex(v5);
+    initVertex(v6);
+
     startVertex(v3);
     killVertex(v3, true);
     diagnostics =
@@ -688,68 +726,77 @@ public class TestVertexImpl {
         "vertex received kill while in running state"));
   }
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testVertexKillPending() {
-    VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
-    VertexImpl v3 = vertices.get("vertex3");
-    initVertex(v3);
+    initAllVertices();
 
+    VertexImpl v = vertices.get("vertex2");
     startVertex(v);
 
-    v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+    dispatcher.getEventHandler().handle(
+        new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+    dispatcher.await();
     Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
 
-    v.handle(new VertexEventTaskCompleted(
-        new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(
+            new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
+    dispatcher.await();
     Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
 
-    v.handle(new VertexEventTaskCompleted(
-        new TezTaskID(v.getVertexId(), 1), TaskState.KILLED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(
+            new TezTaskID(v.getVertexId(), 1), TaskState.KILLED));
     dispatcher.await();
     Assert.assertEquals(VertexState.KILLED, v.getState());
   }
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   @Ignore
   public void testVertexKill() {
-    VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
-    VertexImpl v3 = vertices.get("vertex3");
-    initVertex(v3);
+    initAllVertices();
 
+    VertexImpl v = vertices.get("vertex2");
     startVertex(v);
 
-    v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+    dispatcher.getEventHandler().handle(
+        new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
     Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
 
-    v.handle(new VertexEventTaskCompleted(
-        new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(
+            new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
     Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
 
-    v.handle(new VertexEventTaskCompleted(
-        new TezTaskID(v.getVertexId(), 1), TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(
+            new TezTaskID(v.getVertexId(), 1), TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.KILLED, v.getState());
   }
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   @Ignore
   public void testKilledTasksHandling() {
+    initAllVertices();
+
     VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
     startVertex(v);
 
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
     TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.FAILED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.FAILED));
     dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v.getState());
     Assert.assertEquals(TaskState.KILLED, v.getTask(t2).getState());
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testVertexCommitterInit() {
     VertexImpl v2 = vertices.get("vertex2");
     initVertex(v2);
@@ -762,7 +809,7 @@ public class TestVertexImpl {
         instanceof MRVertexOutputCommitter);
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testVertexSchedulerInit() {
     VertexImpl v2 = vertices.get("vertex2");
     initVertex(v2);
@@ -775,10 +822,13 @@ public class TestVertexImpl {
         instanceof BipartiteSlowStartVertexScheduler);
   }
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testVertexTaskFailure() {
+    initAllVertices();
+
     VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
+
     CountingVertexOutputCommitter committer =
         new CountingVertexOutputCommitter();
     v.setVertexOutputCommitter(committer);
@@ -787,62 +837,66 @@ public class TestVertexImpl {
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
     TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
 
-    v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED));
-    v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.FAILED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.FAILED));
     dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v.getState());
     Assert.assertEquals(0, committer.commitCounter);
     Assert.assertEquals(1, committer.abortCounter);
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testSourceVertexStartHandling() {
+    LOG.info("Testing testSourceVertexStartHandling");
+    initAllVertices();
+
     VertexImpl v4 = vertices.get("vertex4");
-    initVertex(v4);
     VertexImpl v5 = vertices.get("vertex5");
-    initVertex(v5);
     VertexImpl v6 = vertices.get("vertex6");
-    initVertex(v6);
-    Assert.assertEquals(VertexState.INITED, v6.getState());
 
     startVertex(v4);
     startVertex(v5);
     dispatcher.await();
+    LOG.info("Verifying v6 state " + v6.getState());
     Assert.assertEquals(VertexState.RUNNING, v6.getState());
     Assert.assertEquals(1, v6.getDistanceFromRoot());
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testCounters() {
     // FIXME need to test counters at vertex level
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testDiagnostics() {
     // FIXME need to test diagnostics in various cases
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testTaskAttemptCompletionEvents() {
     // FIXME need to test handling of task attempt events
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testSourceTaskAttemptCompletionEvents() {
+    LOG.info("Testing testSourceTaskAttemptCompletionEvents");
+    initAllVertices();
+
     VertexImpl v4 = vertices.get("vertex4");
-    initVertex(v4);
     VertexImpl v5 = vertices.get("vertex5");
-    initVertex(v5);
     VertexImpl v6 = vertices.get("vertex6");
-    initVertex(v6);
 
     startVertex(v4);
     startVertex(v5);
     dispatcher.await();
+    LOG.info("Verifying v6 state " + v6.getState());
     Assert.assertEquals(VertexState.RUNNING, v6.getState());
 
     TezTaskID t1_v4 = new TezTaskID(v4.getVertexId(), 0);
@@ -897,17 +951,21 @@ public class TestVertexImpl {
     Assert.assertEquals(6, v6.getTaskAttemptCompletionEvents(0, 100).length);
   }
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testDAGEventGeneration() {
+    initAllVertices();
+
     VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
     startVertex(v);
 
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
     TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
-    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1,
@@ -915,11 +973,14 @@ public class TestVertexImpl {
             DAGEventType.DAG_VERTEX_COMPLETED).intValue());
   }
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testTaskReschedule() {
     // For downstream failures
+    initAllVertices();
+
     VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
+
     CountingVertexOutputCommitter committer =
         new CountingVertexOutputCommitter();
     v.setVertexOutputCommitter(committer);
@@ -929,26 +990,33 @@ public class TestVertexImpl {
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
     TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
-    v.handle(new VertexEventTaskReschedule(t1));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskReschedule(t1));
     // FIXME need to handle dups
-    // v.handle(new VertexEventTaskReschedule(t1));
-    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    // dispatcher.getEventHandler().handle(new VertexEventTaskReschedule(t1));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
     Assert.assertEquals(0, committer.commitCounter);
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1, committer.commitCounter);
 
   }
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testVertexCommit() {
+    initAllVertices();
+
     VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
+
     CountingVertexOutputCommitter committer =
         new CountingVertexOutputCommitter();
     v.setVertexOutputCommitter(committer);
@@ -958,14 +1026,18 @@ public class TestVertexImpl {
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
     TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
-    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1, committer.commitCounter);
 
-    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1, committer.commitCounter);
@@ -974,20 +1046,23 @@ public class TestVertexImpl {
     Assert.assertEquals(0, committer.setupCounter); // already done in init
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testCommitterInitAndSetup() {
     // FIXME need to add a test for this
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testTaskAttemptFetchFailureHandling() {
     // FIXME needs testing
   }
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testBadCommitter() {
+    initAllVertices();
+
     VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
+
     CountingVertexOutputCommitter committer =
         new CountingVertexOutputCommitter(true, true);
     v.setVertexOutputCommitter(committer);
@@ -997,8 +1072,10 @@ public class TestVertexImpl {
     TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
     TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
 
-    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
-    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v.getState());
     Assert.assertEquals(1, committer.commitCounter);
@@ -1009,14 +1086,15 @@ public class TestVertexImpl {
     Assert.assertEquals(0, committer.setupCounter); // already done in init
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testHistoryEventGeneration() {
   }
 
-  @Test
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testInvalidEvent() {
     VertexImpl v = vertices.get("vertex2");
-    v.handle(new VertexEvent(v.getVertexId(),
+    dispatcher.getEventHandler().handle(new VertexEvent(v.getVertexId(),
         VertexEventType.V_START));
     dispatcher.await();
     Assert.assertEquals(VertexState.ERROR, v.getState());


Mime
View raw message