tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/2] tez git commit: TEZ-2857. Fix flakey tests in TestDAGImpl. (sseth) (cherry picked from commit d63d6ee600464662670058485492ec56ae13cffe)
Date Tue, 29 Sep 2015 00:52:45 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 4a7441731 -> 06da29aec


TEZ-2857. Fix flakey tests in TestDAGImpl. (sseth)
(cherry picked from commit d63d6ee600464662670058485492ec56ae13cffe)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bb063c6a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bb063c6a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bb063c6a

Branch: refs/heads/branch-0.7
Commit: bb063c6ade787eec84361b28c2695e9b2bfbfad2
Parents: 4a74417
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Sep 28 17:47:50 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Sep 28 17:51:50 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 99 +++++++++++++++-----
 2 files changed, 75 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/bb063c6a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d51eed..1c17702 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES
+  TEZ-2857. Fix flakey tests in TestDAGImpl.
   TEZ-2398. Flaky test: TestFaultTolerance
   TEZ-2808. Race condition between preemption and container assignment
   TEZ-2853. Tez UI: task attempt page is coming empty

http://git-wip-us.apache.org/repos/asf/tez/blob/bb063c6a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index e69db0f..49f534b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -1007,7 +1007,7 @@ public class TestDAGImpl {
     dispatcher.await();
     Assert.assertEquals(DAGState.FAILED, dag.getState());
     Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause());
-    Assert.assertTrue(StringUtils.join(dag.getDiagnostics(),",")
+    Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), ",")
         .contains("Vertex's TaskResource is beyond the cluster container capability"));
   }
 
@@ -1093,7 +1093,7 @@ public class TestDAGImpl {
     Assert.assertEquals(1, dag.getSuccessfulVertices());
 
     // 2 tasks completed, total plan has 11 vertices
-    Assert.assertEquals((float)2/11,
+    Assert.assertEquals((float) 2 / 11,
         dag.getCompletedTaskProgress(), 0.05);
   }
   
@@ -1168,7 +1168,7 @@ public class TestDAGImpl {
     setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination, true);
     dispatcher.getEventHandler().handle(
         new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
-    dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), 
+    dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
         null));
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
@@ -1183,7 +1183,8 @@ public class TestDAGImpl {
     DataMovementEvent daEvent = DataMovementEvent.create(ByteBuffer.wrap(new byte[0]));
     TezEvent tezEvent = new TezEvent(daEvent, 
         new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getID()));
-    dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
+    dispatcher.getEventHandler().handle(
+        new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
     dispatcher.await();
 
     Assert.assertEquals(VertexState.FAILED, v2.getState());
@@ -1230,7 +1231,7 @@ public class TestDAGImpl {
     setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationConsumerTasks);
     dispatcher.getEventHandler().handle(
         new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
-    dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), 
+    dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
         null));
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
@@ -1245,7 +1246,8 @@ public class TestDAGImpl {
     InputReadErrorEvent ireEvent = InputReadErrorEvent.create("", 0, 0);
     TezEvent tezEvent = new TezEvent(ireEvent, 
         new EventMetaData(EventProducerConsumerType.INPUT,"vertex2", "vertex1", ta1.getID()));
-    dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
+    dispatcher.getEventHandler().handle(
+        new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
     dispatcher.await();
     // 
     Assert.assertEquals(VertexState.FAILED, v2.getState());
@@ -1318,12 +1320,14 @@ public class TestDAGImpl {
     Vertex v2 = groupDag.getVertex("vertex2");
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
     dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
-    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
     dispatcher.await();
     // commit should not happen due to vertex-rerunning
     Assert.assertEquals(0, TotalCountingOutputCommitter.totalCommitCounter);
 
-    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+        new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
     dispatcher.await();
     // commit happen
     Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
@@ -1417,10 +1421,10 @@ public class TestDAGImpl {
                 .newBuilder()
                 .setClassName(CountingOutputCommitter.class.getName())
                 .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
-                .setUserPayload(
-                    ByteString
-                        .copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig(
-                            true, false, false).toUserPayload())).build()))
+                    .setUserPayload(
+                        ByteString
+                            .copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig(
+                                true, false, false).toUserPayload())).build()))
         .setName("output3")
         .setIODescriptor(
             TezEntityDescriptorProto.newBuilder().setClassName("output.class")
@@ -1848,13 +1852,8 @@ public class TestDAGImpl {
     }
   }
 
-  // a dag.kill() on an active DAG races with vertices all succeeding.
-  // if a JOB_KILL is processed while dag is in running state, it should end in KILLED,
-  // regardless of whether all vertices complete
-  //
-  // Final state:
-  //   DAG is in KILLED state, with killTrigger = USER_KILL
-  //   Each vertex had kill triggered but raced ahead and ends in SUCCEEDED state.
+  // Couple of vertices succeed. DAG_KILLED processed, which causes the rest of the vertices to be
+  // marked as KILLED.
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testDAGKill() {
@@ -1870,22 +1869,70 @@ public class TestDAGImpl {
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
     dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.KILLED, dag.getState());
+    Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+    Assert.assertEquals(2, dag.getSuccessfulVertices());
+
+    int killedCount = 0;
+    for (Map.Entry<TezVertexID, Vertex> vEntry : dag.getVertices().entrySet()) {
+      if (vEntry.getValue().getState() == VertexState.KILLED) {
+        killedCount++;
+      }
+    }
+    Assert.assertEquals(4, killedCount);
+
+    for (Vertex v : dag.getVertices().values()) {
+      Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause());
+    }
+
+    Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
+  }
+
+  // Vertices succeed after a DAG kill has been processed. Should be ignored.
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testDAGKillVertexSuccessAfterKill() {
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.await();
+
+    Assert.assertEquals(DAGState.KILLED, dag.getState());
+
+    // Vertex SUCCESS gets processed after the DAG has reached the KILLED state. Should be ignored.
     for (int i = 2; i < 6; ++i) {
       dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
           TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
     }
     dispatcher.await();
-    Assert.assertEquals(DAGState.KILLED, dag.getState());
+
+    int killedCount = 0;
+    for (Map.Entry<TezVertexID, Vertex> vEntry : dag.getVertices().entrySet()) {
+      if (vEntry.getValue().getState() == VertexState.KILLED) {
+        killedCount++;
+      }
+    }
+    Assert.assertEquals(4, killedCount);
+
     Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
-    Assert.assertEquals(6, dag.getSuccessfulVertices());
+    Assert.assertEquals(2, dag.getSuccessfulVertices());
     for (Vertex v : dag.getVertices().values()) {
       Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause());
     }
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 
-  // job kill races with most vertices succeeding and one directly killed.
-  // because the job.kill() happens before the direct kill, the vertex has kill_trigger=DAG_KILL
+  // Vertex KILLED after a DAG_KILLED is issued. Termination reason should be DAG_KILLED
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testDAGKillPending() {
@@ -1900,13 +1947,14 @@ public class TestDAGImpl {
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
         TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
-    dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
 
     for (int i = 2; i < 5; ++i) {
       dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
           TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
     }
     dispatcher.await();
+    dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+    dispatcher.await();
     Assert.assertEquals(DAGState.KILLED, dag.getState());
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
@@ -1914,7 +1962,8 @@ public class TestDAGImpl {
     dispatcher.await();
     Assert.assertEquals(DAGState.KILLED, dag.getState());
     Assert.assertEquals(5, dag.getSuccessfulVertices());
-    Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(), VertexTerminationCause.DAG_KILL);
+    Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(),
+        VertexTerminationCause.DAG_KILL);
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 


Mime
View raw message