tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1172. Allow multiple Root Inputs to be specified per Vertex. (sseth)
Date Thu, 12 Jun 2014 19:42:39 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 8142f3c91 -> 608ee831b


TEZ-1172. Allow multiple Root Inputs to be specified per Vertex. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/608ee831
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/608ee831
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/608ee831

Branch: refs/heads/master
Commit: 608ee831b4ed4471905eeef85da12c5ac4b25113
Parents: 8142f3c
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Jun 12 12:41:31 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Jun 12 12:41:31 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/api/DAG.java   |   2 +-
 .../java/org/apache/tez/dag/api/Vertex.java     |  11 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   |  72 ++++++++-----
 .../app/dag/impl/RootInputVertexManager.java    |  18 +++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   2 -
 .../dag/impl/TestRootInputVertexManager.java    | 103 +++++++++++++++++++
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  21 ++--
 7 files changed, 184 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/608ee831/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index c49ac79..9df37b3 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -377,7 +377,7 @@ public class DAG { // FIXME rename to Topology
       outboundSet.add(outputVertex.getVertexName());
     }
 
-    // check input and output names dont collide with vertex names
+    // check input and output names don't collide with vertex names
     for (Vertex vertex : vertices.values()) {
       for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
         if (vertexMap.containsKey(input.getName())) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/608ee831/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 1be75a2..a6c5999 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -154,7 +154,12 @@ public class Vertex {
    * 
    * If a vertex needs to use data generated by another vertex in the DAG and
    * also from an external source, a combination of this API and the DAG.addEdge
-   * API can be used.
+   * API can be used. </p>
+   * 
+   * Note: If more than one RootInput exists on a vertex, which generates events which need
to be
+   * routed, or generates information to set parallelism, a custom vertex manager should
be setup
+   * to handle this. Not using a custom vertex manager for such a scenario will lead to a
+   * runtime failure. 
    * 
    * @param inputName
    *          the name of the input. This will be used when accessing the input
@@ -174,10 +179,6 @@ public class Vertex {
    */
   public Vertex addInput(String inputName, InputDescriptor inputDescriptor,
       Class<? extends TezRootInputInitializer> inputInitializer) {
-    if (additionalInputs.size() == 1) {
-      throw new IllegalStateException(
-          "For now, only a single Root Input can be added to a Vertex");
-    }
     additionalInputs.add(new RootInputLeafOutput<InputDescriptor>(inputName,
         inputDescriptor, inputInitializer));
     return this;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/608ee831/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 4b6dfc6..e0d0551 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -36,7 +36,7 @@ public class TestDAGVerify {
   //    v1
   //    |
   //    v2
-  @Test
+  @Test(timeout = 5000)
   public void testVerifyScatterGather() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor(dummyProcessorClassName),
@@ -56,7 +56,7 @@ public class TestDAGVerify {
     dag.verify();
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testVerifyCustomEdge() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor(dummyProcessorClassName),
@@ -77,7 +77,7 @@ public class TestDAGVerify {
     dag.verify();
   }
 
-  @Test  
+  @Test(timeout = 5000)
   public void testVerifyOneToOne() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor(dummyProcessorClassName),
@@ -97,7 +97,7 @@ public class TestDAGVerify {
     dag.verify();
   }
   
-  @Test
+  @Test(timeout = 5000)
   // v1 (known) -> v2 (-1) -> v3 (-1)
   public void testVerifyOneToOneInferParallelism() {
     Vertex v1 = new Vertex("v1",
@@ -130,7 +130,7 @@ public class TestDAGVerify {
     Assert.assertEquals(dummyTaskCount, v3.getParallelism());
   }
   
-  @Test
+  @Test(timeout = 5000)
   // v1 (known) -> v2 (-1) -> v3 (-1)
   // The test checks resiliency to ordering of the vertices/edges
   public void testVerifyOneToOneInferParallelismReverseOrder() {
@@ -164,7 +164,7 @@ public class TestDAGVerify {
     Assert.assertEquals(dummyTaskCount, v3.getParallelism());
   }
   
-  @Test  
+  @Test(timeout = 5000)
   public void testVerifyOneToOneNoInferParallelism() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor(dummyProcessorClassName),
@@ -185,7 +185,7 @@ public class TestDAGVerify {
     Assert.assertEquals(-1, v2.getParallelism());
   }
   
-  @Test  
+  @Test(timeout = 5000)
   // v1 (-1) -> v2 (known) -> v3 (-1)
   public void testVerifyOneToOneIncorrectParallelism1() {
     Vertex v1 = new Vertex("v1",
@@ -222,7 +222,7 @@ public class TestDAGVerify {
     }
   }
 
-  @Test
+  @Test(timeout = 5000)
   // v1 (-1) -> v3 (-1), v2 (known) -> v3 (-1)
   // order of edges should not matter
   public void testVerifyOneToOneIncorrectParallelism2() {
@@ -270,7 +270,7 @@ public class TestDAGVerify {
     }
   }
   
-  @Test  
+  @Test(timeout = 5000)
   public void testVerifyBroadcast() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor(dummyProcessorClassName),
@@ -290,7 +290,7 @@ public class TestDAGVerify {
     dag.verify();
   }
 
-  @Test(expected = IllegalStateException.class)  
+  @Test(expected = IllegalStateException.class, timeout = 5000)  
   public void testVerify3() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor(dummyProcessorClassName),
@@ -310,7 +310,7 @@ public class TestDAGVerify {
     dag.verify();
   }
 
-  @Test(expected = IllegalStateException.class)  
+  @Test(expected = IllegalStateException.class, timeout = 5000)  
   public void testVerify4() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor(dummyProcessorClassName),
@@ -335,7 +335,7 @@ public class TestDAGVerify {
   //       v2   ^
   //      |  |  ^
   //    v3    v4
-  @Test
+  @Test(timeout = 5000)
   public void testCycle1() {
     IllegalStateException ex=null;
     Vertex v1 = new Vertex("v1",
@@ -395,7 +395,7 @@ public class TestDAGVerify {
   //    -> v2
   //    ^  | |
   //    v3    v4
-  @Test
+  @Test(timeout = 5000)
   public void testCycle2() {
     IllegalStateException ex=null;
     Vertex v1 = new Vertex("v1",
@@ -450,7 +450,7 @@ public class TestDAGVerify {
     Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void repeatedVertexName() {
     IllegalStateException ex=null;
     Vertex v1 = new Vertex("v1",
@@ -473,7 +473,7 @@ public class TestDAGVerify {
     Assert.assertTrue(ex.getMessage().startsWith("Vertex v1 already defined"));
   }
   
-  @Test (expected = IllegalStateException.class)
+  @Test(expected = IllegalStateException.class, timeout = 5000)
   public void testInputAndInputVertexNameCollision() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor("MapProcessor"),
@@ -497,7 +497,7 @@ public class TestDAGVerify {
     dag.verify();
   }
   
-  @Test (expected = IllegalStateException.class)
+  @Test(expected = IllegalStateException.class, timeout = 5000)
   public void testOutputAndOutputVertexNameCollision() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor("MapProcessor"),
@@ -521,7 +521,7 @@ public class TestDAGVerify {
     dag.verify();
   }
   
-  @Test (expected = IllegalStateException.class)
+  @Test(expected = IllegalStateException.class, timeout = 5000)
   public void testOutputAndVertexNameCollision() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor("MapProcessor"),
@@ -538,7 +538,7 @@ public class TestDAGVerify {
     dag.verify();
   }
   
-  @Test (expected = IllegalStateException.class)
+  @Test(expected = IllegalStateException.class, timeout = 5000)
   public void testInputAndVertexNameCollision() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor("MapProcessor"),
@@ -558,7 +558,7 @@ public class TestDAGVerify {
   //  v1  v2
   //   |  |
   //    v3
-  @Test
+  @Test(timeout = 5000)
   public void BinaryInputAllowed() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor("MapProcessor"),
@@ -588,7 +588,7 @@ public class TestDAGVerify {
     dag.verify();
   }
   
-  @Test
+  @Test(timeout = 5000)
   public void testVertexGroupWithMultipleOutputEdges() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor("Processor"),
@@ -638,7 +638,7 @@ public class TestDAGVerify {
     Assert.assertTrue(v2.getOutputVertices().contains(v4));
   }
   
-  @Test
+  @Test(timeout = 5000)
   public void testVertexGroup() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor("Processor"),
@@ -718,7 +718,7 @@ public class TestDAGVerify {
     Assert.assertEquals(2, dag.vertexGroups.size());
   }
   
-  @Test
+  @Test(timeout = 5000)
   public void testVertexGroupOneToOne() {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor("Processor"),
@@ -772,7 +772,7 @@ public class TestDAGVerify {
   //   v1
   //  |  |
   //  v2  v3
-  @Test
+  @Test(timeout = 5000)
   public void BinaryOutput() {
     IllegalStateException ex = null;
     try {
@@ -809,7 +809,7 @@ public class TestDAGVerify {
     Assert.assertNull(ex);
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testDagWithNoVertices() {
     IllegalStateException ex=null;
     try {
@@ -826,7 +826,7 @@ public class TestDAGVerify {
   }
 
   @SuppressWarnings("unused")
-  @Test
+  @Test(timeout = 5000)
   public void testInvalidVertexConstruction() {
     {
       Vertex v1 = new Vertex("v1",
@@ -858,4 +858,26 @@ public class TestDAGVerify {
     }
   }
 
+  @Test(timeout = 5000)
+  public void testMultipleRootInputsAllowed() {
+    DAG dag = new DAG("testDag");
+    ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1")
+        .setUserPayload("processor1Bytes".getBytes());
+    Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
+    VertexManagerPluginDescriptor vertexManagerPluginDescriptor = new VertexManagerPluginDescriptor(
+        "TestVertexManager");
+    v1.setVertexManagerPlugin(vertexManagerPluginDescriptor);
+
+    InputDescriptor inputDescriptor1 = new InputDescriptor("input1").setUserPayload("inputBytes"
+        .getBytes());
+    InputDescriptor inputDescriptor2 = new InputDescriptor("input2").setUserPayload("inputBytes"
+        .getBytes());
+    v1.addInput("input1", inputDescriptor1, null);
+    v1.addInput("input2", inputDescriptor2, null);
+
+    dag.addVertex(v1);
+
+    dag.createDag(new TezConfiguration());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/608ee831/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index e1d73e4..5bd80e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -40,6 +40,7 @@ import com.google.common.collect.Lists;
 public class RootInputVertexManager implements VertexManagerPlugin {
 
   VertexManagerPluginContext context;
+  private String configuredInputName;
 
   @Override
   public void initialize(VertexManagerPluginContext context) {
@@ -73,9 +74,14 @@ public class RootInputVertexManager implements VertexManagerPlugin {
       if (event instanceof RootInputConfigureVertexTasksEvent) {
         // No tasks should have been started yet. Checked by initial state check.
         Preconditions.checkState(dataInformationEventSeen == false);
-        Preconditions
-            .checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
-                "Parallelism for the vertex should be set to -1 if the InputInitializer is
setting parallelism");
+        Preconditions.checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
+            "Parallelism for the vertex should be set to -1 if the InputInitializer is setting
parallelism"
+                + ", VertexName: " + context.getVertexName());
+        Preconditions.checkState(configuredInputName == null,
+            "RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"
+                + ", VertexName: " + context.getVertexName() + ", ConfiguredInput: "
+                + configuredInputName + ", CurrentInput: " + inputName);
+        configuredInputName = inputName;
         RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent)
event;
         Map<String, RootInputSpecUpdate> rootInputSpecUpdate = new HashMap<String,
RootInputSpecUpdate>();
         rootInputSpecUpdate.put(
@@ -94,6 +100,12 @@ public class RootInputVertexManager implements VertexManagerPlugin {
         dataInformationEventSeen = true;
         // # Tasks should have been set by this point.
         Preconditions.checkState(context.getVertexNumTasks(context.getVertexName()) != 0);
+        Preconditions.checkState(
+            configuredInputName == null || configuredInputName.equals(inputName),
+            "RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"
+                + ", VertexName:" + context.getVertexName() + ", ConfiguredInput: "
+                + configuredInputName + ", CurrentInput: " + inputName);
+        configuredInputName = inputName;
         
         RootInputDataInformationEvent rEvent = (RootInputDataInformationEvent)event;
         rEvent.setTargetIndex(rEvent.getSourceIndex()); // 1:1 routing

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/608ee831/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 0b52062..42d010d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -3465,8 +3465,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Override
   public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) {
-    Preconditions.checkArgument(inputs.size() < 2,
-        "For now, only a single root input can be specified on a Vertex");
     this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size());
     for (RootInputLeafOutputProto input : inputs) {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/608ee831/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
new file mode 100644
index 0000000..c37cc00
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.junit.Test;
+
+public class TestRootInputVertexManager {
+
+  @Test
+  public void testEventsFromMultipleInputs() {
+
+    VertexManagerPluginContext context = mock(VertexManagerPluginContext.class);
+    doReturn("vertex1").when(context).getVertexName();
+    doReturn(1).when(context).getVertexNumTasks(eq("vertex1"));
+
+    RootInputVertexManager rootInputVertexManager = new RootInputVertexManager();
+    rootInputVertexManager.initialize(context);
+
+    InputDescriptor id1 = mock(InputDescriptor.class);
+    List<Event> events1 = new LinkedList<Event>();
+    RootInputDataInformationEvent diEvent11 = new RootInputDataInformationEvent(0, null);
+    events1.add(diEvent11);
+    rootInputVertexManager.onRootVertexInitialized("input1", id1, events1);
+    // All good so far, single input only.
+
+    InputDescriptor id2 = mock(InputDescriptor.class);
+    List<Event> events2 = new LinkedList<Event>();
+    RootInputDataInformationEvent diEvent21 = new RootInputDataInformationEvent(0, null);
+    events2.add(diEvent21);
+    try {
+      // Should fail due to second input
+      rootInputVertexManager.onRootVertexInitialized("input2", id2, events2);
+      fail("Expecting failure in case of multiple inputs attempting to send events");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().startsWith(
+          "RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"));
+    }
+  }
+
+  @Test
+  public void testConfigureFromMultipleInputs() {
+
+    VertexManagerPluginContext context = mock(VertexManagerPluginContext.class);
+    doReturn("vertex1").when(context).getVertexName();
+    doReturn(-1).when(context).getVertexNumTasks(eq("vertex1"));
+
+    RootInputVertexManager rootInputVertexManager = new RootInputVertexManager();
+    rootInputVertexManager.initialize(context);
+
+    InputDescriptor id1 = mock(InputDescriptor.class);
+    List<Event> events1 = new LinkedList<Event>();
+    RootInputConfigureVertexTasksEvent diEvent11 = new RootInputConfigureVertexTasksEvent(1,
null,
+        null);
+    events1.add(diEvent11);
+    rootInputVertexManager.onRootVertexInitialized("input1", id1, events1);
+    // All good so far, single input only.
+
+    InputDescriptor id2 = mock(InputDescriptor.class);
+    List<Event> events2 = new LinkedList<Event>();
+    RootInputConfigureVertexTasksEvent diEvent21 = new RootInputConfigureVertexTasksEvent(1,
null,
+        null);
+    events2.add(diEvent21);
+    try {
+      // Should fail due to second input
+      rootInputVertexManager.onRootVertexInitialized("input2", id2, events2);
+      fail("Expecting failure in case of multiple inputs attempting to send events");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().startsWith(
+          "RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/608ee831/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index ab798db..16ee7bb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -1576,7 +1577,6 @@ public class TestVertexImpl {
     for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
       vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
     }
-    
     setupVertices();
     when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
       @Override
@@ -1638,8 +1638,10 @@ public class TestVertexImpl {
 
   @After
   public void teardown() {
-    dispatcher.await();
-    dispatcher.stop();
+    if (dispatcher.isInState(STATE.STARTED)) {
+      dispatcher.await();
+      dispatcher.stop();
+    }
     dispatcher = null;
     vertexEventDispatcher = null;
     dagEventDispatcher = null;
@@ -2550,7 +2552,7 @@ public class TestVertexImpl {
     
     RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
     List<TaskLocationHint> v1Hints = createTaskLocationHints(numTasks);
-    runner1.completeInputInitialization(numTasks, v1Hints);
+    runner1.completeInputInitialization(0, numTasks, v1Hints);
 
     Assert.assertEquals(VertexState.INITED, v1.getState());
     Assert.assertEquals(numTasks, v1.getTotalTasks());
@@ -2729,7 +2731,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
     RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
     List<TaskLocationHint> v1Hints = createTaskLocationHints(5);
-    runner1.completeInputInitialization(5, v1Hints);
+    runner1.completeInputInitialization(0, 5, v1Hints);
 
     Assert.assertEquals(VertexState.INITED, v1.getState());
     Assert.assertEquals(5, v1.getTotalTasks());
@@ -2768,7 +2770,7 @@ public class TestVertexImpl {
     
     RootInputInitializerRunnerControlled runner2 = v2.getRootInputInitializerRunner();
     List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
-    runner2.completeInputInitialization(10, v2Hints);
+    runner2.completeInputInitialization(0, 10, v2Hints);
     
     Assert.assertEquals(VertexState.INITED, v2.getState());
     Assert.assertEquals(10, v2.getTotalTasks());
@@ -2882,7 +2884,7 @@ public class TestVertexImpl {
       Assert.assertEquals(i + 1, inputSpecs.get(0).getPhysicalEdgeCount());
     }
   }
-  
+
   private List<TaskLocationHint> createTaskLocationHints(int numTasks) {
     List<TaskLocationHint> locationHints = Lists
         .newArrayListWithCapacity(numTasks);
@@ -3012,7 +3014,8 @@ public class TestVertexImpl {
       dispatcher.await();
     }
     
-    public void completeInputInitialization(int targetTasks, List<TaskLocationHint>
locationHints) {
+    public void completeInputInitialization(int initializerIndex, int targetTasks,
+        List<TaskLocationHint> locationHints) {
       List<Event> events = Lists.newArrayListWithCapacity(targetTasks + 1);
 
       RootInputConfigureVertexTasksEvent configEvent = new RootInputConfigureVertexTasksEvent(
@@ -3024,7 +3027,7 @@ public class TestVertexImpl {
         events.add(diEvent);
       }
       eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs
-          .get(0).getEntityName(), events));
+          .get(initializerIndex).getEntityName(), events));
       dispatcher.await();
     }
   }


Mime
View raw message