tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1170 Simplify Vertex Initializing transition (bikas)
Date Thu, 12 Jun 2014 18:54:34 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master a6c7ee30a -> 8142f3c91


TEZ-1170 Simplify Vertex Initializing transition (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8142f3c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8142f3c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8142f3c9

Branch: refs/heads/master
Commit: 8142f3c91c5ee8c22c461f4ba6fece35f04a550d
Parents: a6c7ee3
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Jun 12 11:54:26 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Jun 12 11:54:26 2014 -0700

----------------------------------------------------------------------
 .../VertexEventParallelismInitialized.java      |  29 --
 .../tez/dag/app/dag/event/VertexEventType.java  |   2 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |   1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 318 ++++++++++---------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 298 +++++++++++++----
 5 files changed, 407 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8142f3c9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventParallelismInitialized.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventParallelismInitialized.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventParallelismInitialized.java
deleted file mode 100644
index 0c2a011..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventParallelismInitialized.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.records.TezVertexID;
-
-public class VertexEventParallelismInitialized extends VertexEvent {
-
-  public VertexEventParallelismInitialized(TezVertexID vertexId) {
-    super(vertexId, VertexEventType.V_PARALLELISM_INITIALIZED);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8142f3c9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 6075dbe..828f75b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -61,7 +61,7 @@ public enum VertexEventType {
   V_RECOVER,
   
   // Producer: Vertex
-  V_PARALLELISM_INITIALIZED,
+  V_READY_TO_INIT,
 
   // Recover Event, Producer:Vertex
   V_SOURCE_VERTEX_RECOVERED,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8142f3c9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index c4d9c70..1e31545 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -327,6 +327,7 @@ public class Edge {
               " srcTaskIndex = " + srcTaskIndex +
               " destVertex=" + destinationVertex.getVertexId() +
               " destTaskIndex=" + destTaskIndex + 
+              " destNumTasks=" + destinationVertex.getTotalTasks() + 
               " edgeManager=" + edgeManager.getClass().getName());
         }
         TezTaskID destTaskId = destTask.getTaskId();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8142f3c9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 8aba90a..0b52062 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -104,7 +104,6 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
 import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
-import org.apache.tez.dag.app.dag.event.VertexEventParallelismInitialized;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -241,7 +240,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                       VertexState.INITIALIZING, VertexState.FAILED),
                   VertexEventType.V_INIT,
                   new InitTransition())
-          .addTransition(VertexState.NEW, VertexState.NEW,
+          .addTransition(VertexState.NEW, 
+                EnumSet.of(VertexState.NEW),
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
                   new NullEdgeInitializedTransition())
           .addTransition
@@ -291,9 +291,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               (VertexState.RECOVERING, VertexState.RECOVERING,
                   VertexEventType.V_TERMINATE,
                   new TerminateDuringRecoverTransition())
-          .addTransition(VertexState.RECOVERING, VertexState.RECOVERING,
-                  VertexEventType.V_NULL_EDGE_INITIALIZED,
-                  new NullEdgeInitializedTransition())
 
           // Transitions from INITIALIZING state
           .addTransition(VertexState.INITIALIZING,
@@ -302,13 +299,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexEventType.V_ROOT_INPUT_INITIALIZED,
               new RootInputInitializedTransition())
           .addTransition(VertexState.INITIALIZING,
-              EnumSet.of(VertexState.FAILED, VertexState.INITED),
+              EnumSet.of(VertexState.INITIALIZING),
               VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
               new OneToOneSourceSplitTransition())
           .addTransition(VertexState.INITIALIZING,
               EnumSet.of(VertexState.INITED, VertexState.FAILED),
-              VertexEventType.V_PARALLELISM_INITIALIZED,
-              new VertexParallelismInitializedTransition())
+              VertexEventType.V_READY_TO_INIT,
+              new VertexInitializedTransition())
           .addTransition(VertexState.INITIALIZING, VertexState.FAILED,
               VertexEventType.V_ROOT_INPUT_FAILED,
               new RootInputInitFailedTransition())
@@ -330,7 +327,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITIALIZING, VertexState.ERROR,
               VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-          .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
+          .addTransition(VertexState.INITIALIZING, 
+              EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
+                  VertexState.FAILED),
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
                   new NullEdgeInitializedTransition())
 
@@ -365,9 +364,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITED, VertexState.ERROR,
               VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-          .addTransition(VertexState.INITED, VertexState.INITED,
-              VertexEventType.V_NULL_EDGE_INITIALIZED,
-              new NullEdgeInitializedTransition())
 
           // Transitions from RUNNING state
           .addTransition(VertexState.RUNNING, VertexState.RUNNING,
@@ -413,6 +409,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               EnumSet.of(VertexState.TERMINATING, VertexState.KILLED, VertexState.FAILED),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedTransition())
+          .addTransition(VertexState.TERMINATING,
+              EnumSet.of(VertexState.TERMINATING),
+              VertexEventType.V_ROOT_INPUT_INITIALIZED,
+              new RootInputInitializedTransition())
           .addTransition(
               VertexState.TERMINATING,
               VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
@@ -421,6 +421,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
               EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
+                  VertexEventType.V_NULL_EDGE_INITIALIZED,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
@@ -464,6 +465,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexState.FAILED,
               VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(VertexState.FAILED,
+              EnumSet.of(VertexState.FAILED),
+              VertexEventType.V_ROOT_INPUT_INITIALIZED,
+              new RootInputInitializedTransition())
           // Ignore-able events
           .addTransition(VertexState.FAILED, VertexState.FAILED,
               EnumSet.of(VertexEventType.V_TERMINATE,
@@ -474,8 +479,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
-                  VertexEventType.V_ROOT_INPUT_INITIALIZED,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
+                  VertexEventType.V_NULL_EDGE_INITIALIZED,
                   VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_SOURCE_VERTEX_RECOVERED))
 
@@ -484,6 +489,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexState.KILLED,
               VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(VertexState.KILLED,
+              EnumSet.of(VertexState.KILLED),
+              VertexEventType.V_ROOT_INPUT_INITIALIZED,
+              new RootInputInitializedTransition())
           // Ignore-able events
           .addTransition(VertexState.KILLED, VertexState.KILLED,
               EnumSet.of(VertexEventType.V_TERMINATE,
@@ -496,11 +505,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED,
-                  VertexEventType.V_ROOT_INPUT_INITIALIZED,
+                  VertexEventType.V_NULL_EDGE_INITIALIZED,
                   VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_SOURCE_VERTEX_RECOVERED))
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
+          .addTransition(VertexState.ERROR,
+              EnumSet.of(VertexState.ERROR),
+              VertexEventType.V_ROOT_INPUT_INITIALIZED,
+              new RootInputInitializedTransition())
           .addTransition(
               VertexState.ERROR,
               VertexState.ERROR,
@@ -515,7 +528,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_INTERNAL_ERROR,
-                  VertexEventType.V_ROOT_INPUT_INITIALIZED,
+                  VertexEventType.V_NULL_EDGE_INITIALIZED,
                   VertexEventType.V_ROOT_INPUT_FAILED,
                   VertexEventType.V_SOURCE_VERTEX_RECOVERED))
           // create the topology tables
@@ -540,7 +553,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private final TezVertexID vertexId;  //runtime assigned id.
   private final VertexPlan vertexPlan;
-  private boolean sendEventWhenParallelismInitialized = false;
+  private boolean initWaitsForRootInitializers = false;
 
   private final String vertexName;
   private final ProcessorDescriptor processorDescriptor;
@@ -1070,6 +1083,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     try {
       tasksNotYetScheduled = false;
       if (!pendingTaskEvents.isEmpty()) {
+        LOG.info("Routing pending task events for vertex: " + logIdentifier);
         VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
             new VertexEventRouteEvent(getVertexId(), pendingTaskEvents));
         pendingTaskEvents.clear();
@@ -1196,8 +1210,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         this.createTasks();
         LOG.info("Vertex " + getVertexId() + 
             " parallelism set to " + parallelism);
-        if (sendEventWhenParallelismInitialized) {
-          getEventHandler().handle(new VertexEventParallelismInitialized(getVertexId()));
+        if (canInitVertex()) {
+          getEventHandler().handle(new VertexEvent(getVertexId(), VertexEventType.V_READY_TO_INIT));
         }
       } else {
         // This is an artificial restriction since there's no way of knowing whether a VertexManager
@@ -1674,28 +1688,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   private boolean initializeVertex() {
-    if (targetVertices != null) {
-      for (Edge e : targetVertices.values()) {
-        if (e.getEdgeManager() == null) {
-          Preconditions
-              .checkState(
-                  e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
-                  "Null edge manager allowed only for custom edge. " + logIdentifier);
-          uninitializedEdges.add(e);
-        }
-      }
-    }
-    if (sourceVertices != null) {
-      for (Edge e : sourceVertices.values()) {
-        if (e.getEdgeManager() == null) {
-          Preconditions
-              .checkState(
-                  e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
-                  "Null edge manager allowed only for custom edge. " + logIdentifier);
-          uninitializedEdges.add(e);
-        }
-      }
-    }
     try {
       initializeCommitters();
     } catch (Exception e) {
@@ -2210,23 +2202,32 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   public static class NullEdgeInitializedTransition implements
-      SingleArcTransition<VertexImpl, VertexEvent> {
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
     @Override
-    public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
+    public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
       VertexEventNullEdgeInitialized event = (VertexEventNullEdgeInitialized) vertexEvent;
       Edge edge = event.getEdge();
       Vertex otherVertex = event.getVertex();
       Preconditions.checkState(
+          vertex.getState() == VertexState.NEW
+              || vertex.getState() == VertexState.INITIALIZING,
+          "Unexpected state " + vertex.getState() + " for vertex: "
+              + vertex.logIdentifier);
+      Preconditions.checkState(
           (vertex.sourceVertices == null || vertex.sourceVertices.containsKey(otherVertex) ||
           vertex.targetVertices == null || vertex.targetVertices.containsKey(otherVertex)),
           "Not connected to vertex " + otherVertex.getName() + " from vertex: " + vertex.logIdentifier);
       LOG.info("Edge initialized for connection to vertex " + otherVertex.getName() + 
           " at vertex : " + vertex.logIdentifier);
       vertex.uninitializedEdges.remove(edge);
-      vertex.startIfPossible();
+      if(vertex.getState() == VertexState.INITIALIZING && vertex.canInitVertex()) {
+        // Vertex in Initialing state and can init. Do init.
+        return VertexInitializedTransition.doTransition(vertex);
+      }
+      // Vertex is either New (waiting for sources to init) or its not ready to init (failed)
+      return vertex.getState();
     }
-
   }
 
   public static class BufferDataRecoverTransition implements
@@ -2564,7 +2565,30 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (state.equals(VertexState.FAILED)) {
         return state;
       }
-
+      // TODO move before to handle NEW state 
+      if (vertex.targetVertices != null) {
+        for (Edge e : vertex.targetVertices.values()) {
+          if (e.getEdgeManager() == null) {
+            Preconditions
+                .checkState(
+                    e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
+                    "Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
+            vertex.uninitializedEdges.add(e);
+          }
+        }
+      }
+      if (vertex.sourceVertices != null) {
+        for (Edge e : vertex.sourceVertices.values()) {
+          if (e.getEdgeManager() == null) {
+            Preconditions
+                .checkState(
+                    e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
+                    "Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
+            vertex.uninitializedEdges.add(e);
+          }
+        }
+      }
+      
       // Create tasks based on initial configuration, but don't start them yet.
       if (vertex.numTasks == -1) {
         LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
@@ -2605,16 +2629,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
           if (vertex.vertexPlan.hasVertexManagerPlugin()) {
             LOG.info("Vertex will initialize via custom vertex manager. " + vertex.logIdentifier);
-            // set flag to send event after parallelism is set so that we can 
-            // move out of INITIALIZING state
-            vertex.sendEventWhenParallelismInitialized = true;
             return VertexState.INITIALIZING;
           }
           throw new TezUncheckedException(vertex.getVertexId() + 
           " has -1 tasks but does not have input initializers, " +
           "1-1 uninited sources or custom vertex manager to set it at runtime");
-        }                
+        }
       } else {
+        LOG.info("Creating " + vertex.numTasks + " for vertex: " + vertex.logIdentifier);
+        vertex.createTasks();
         if (vertex.inputsWithInitializers != null) {
           vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
               vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
@@ -2629,17 +2652,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
           LOG.info("Starting root input initializers: "
               + vertex.inputsWithInitializers.size());
+          // special case when numTasks>0 and still we want to stay in initializing 
+          // state. This is handled in RootInputInitializedTransition specially.
+          vertex.initWaitsForRootInitializers = true;
           vertex.rootInputInitializer.runInputInitializers(inputList);
-          vertex.createTasks();
           return VertexState.INITIALIZING;
+        }
+        if (!vertex.uninitializedEdges.isEmpty()) {
+          LOG.info("Vertex has uninitialized edges. " + vertex.logIdentifier);
+          return VertexState.INITIALIZING;
+        }
+        LOG.info("Directly initializing vertex: " + vertex.logIdentifier);
+        boolean isInitialized = vertex.initializeVertex();
+        if (isInitialized) {
+          return VertexState.INITED;
         } else {
-          vertex.createTasks();
-          boolean isInitialized = vertex.initializeVertex();
-          if (isInitialized) {
-            return VertexState.INITED;
-          } else {
-            return VertexState.FAILED;
-          }
+          return VertexState.FAILED;
         }
       }
     }
@@ -2662,32 +2690,31 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       return false;
     }
 
-//    // Vertex will be moving to INITED state, safe to process pending route events.
-//    if (!pendingRouteEvents.isEmpty()) {
-//      VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
-//          new VertexEventRouteEvent(getVertexId(), pendingRouteEvents));
-//      pendingRouteEvents.clear();
-//    }
     return true;
   }
+  
+  void startIfPossible() {
+    if (startSignalPending) {
+      // Trigger a start event to ensure route events are seen before
+      // a start event.
+      LOG.info("Triggering start event for vertex: " + logIdentifier +
+          " with distanceFromRoot: " + distanceFromRoot );
+      eventHandler.handle(new VertexEvent(vertexId,
+          VertexEventType.V_START));
+    }
+  }
 
-  public static class VertexParallelismInitializedTransition implements
+  public static class VertexInitializedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
     
     static VertexState doTransition(VertexImpl vertex) {
+      Preconditions.checkState(vertex.canInitVertex(), "Vertex: " + vertex.logIdentifier);
       boolean isInitialized = vertex.initializeVertexInInitializingState();
       if (!isInitialized) {
         return VertexState.FAILED;
       }
 
-      if (vertex.startSignalPending) {
-        // Trigger a start event to ensure route events are seen before
-        // a start event.
-        LOG.info("Triggering start event for vertex: " + vertex.logIdentifier +
-            " with distanceFromRoot: " + vertex.distanceFromRoot );
-        vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
-            VertexEventType.V_START));
-      }
+      vertex.startIfPossible();
       return VertexState.INITED;      
     }
     
@@ -2697,31 +2724,40 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
   
+  // present in most transitions so that the initializer thread can be shutdown properly
   public static class RootInputInitializedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
       VertexEventRootInputInitialized liInitEvent = (VertexEventRootInputInitialized) event;
-
-      vertex.vertexManager.onRootVertexInitialized(
-          liInitEvent.getInputName(),
-          vertex.getAdditionalInputs().get(liInitEvent.getInputName())
-              .getDescriptor(), liInitEvent.getEvents());
+      VertexState state = vertex.getState();
+      if (state == VertexState.INITIALIZING) {
+        vertex.vertexManager.onRootVertexInitialized(
+            liInitEvent.getInputName(),
+            vertex.getAdditionalInputs().get(liInitEvent.getInputName())
+                .getDescriptor(), liInitEvent.getEvents());
+      }
 
       vertex.numInitializedInputs++;
       if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
         // All inputs initialized, shutdown the initializer.
         vertex.rootInputInitializer.shutdown();
-
-        // If RootInputs are determining parallelism, it should have been set by
-        // this point, so it's safe to checkTaskLimits and createTasks
-        Preconditions.checkState(vertex.numTasks >= 0, 
-            "Parallelism should have been set by now for vertex: " + vertex.logIdentifier);
-        return VertexParallelismInitializedTransition.doTransition(vertex);
-      } else {
-        return VertexState.INITIALIZING;
       }
+      
+      // done. check if we need to do the initialization
+      if (vertex.getState() == VertexState.INITIALIZING && 
+          vertex.initWaitsForRootInitializers) {
+        // set the wait flag to false
+        vertex.initWaitsForRootInitializers = false;
+        // initialize vertex if possible and needed
+        if (vertex.canInitVertex()) {
+          Preconditions.checkState(vertex.numTasks >= 0, 
+              "Parallelism should have been set by now for vertex: " + vertex.logIdentifier);
+          return VertexInitializedTransition.doTransition(vertex);
+        }
+      }
+      return vertex.getState();
     }
   }
 
@@ -2736,12 +2772,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       
       if (vertex.originalOneToOneSplitSource != null) {
         VertexState state = vertex.getState();
-        Preconditions.checkState((state == VertexState.INITED || state == VertexState.RUNNING), 
-            " Unexpected 1-1 split for vertex " + vertex.getVertexId() + 
-            " in state " + vertex.getState() + 
-            " . Split in vertex " + originalSplitSource + 
-            " sent by vertex " + splitEvent.getSenderVertex() +
-            " numTasks " + splitEvent.getNumTasks());
+        Preconditions
+            .checkState(
+                (state == VertexState.INITIALIZING
+                    || state == VertexState.INITED || state == VertexState.RUNNING),
+                " Unexpected 1-1 split for vertex " + vertex.getVertexId()
+                    + " in state " + vertex.getState() + " . Split in vertex "
+                    + originalSplitSource + " sent by vertex "
+                    + splitEvent.getSenderVertex() + " numTasks "
+                    + splitEvent.getNumTasks());
         if (vertex.originalOneToOneSplitSource.equals(originalSplitSource)) {
           // ignore another split event that may have come from a different
           // path in the DAG. We have already split because of that source
@@ -2773,7 +2812,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                 " . Split in vertex " + originalSplitSource +
                 " sent by vertex " + splitEvent.getSenderVertex() +
                 " numTasks " + splitEvent.getNumTasks());
-        return VertexParallelismInitializedTransition.doTransition(vertex);
+        return vertex.getState();
       }
     }
   }
@@ -2795,64 +2834,61 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       LOG.info("Source vertex started: " + startEvent.getSourceVertexId() +
           " for vertex: " + vertex.getVertexId() + " numStartedSources: " + 
           vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size());
+      
+      if (vertex.numStartedSourceVertices < vertex.sourceVertices.size()) {
+        LOG.info("Cannot start vertex: " + vertex.logIdentifier + " numStartedSources: "
+            + vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size());
+        return;
+      }
+      
+      // vertex meets external start dependency conditions. Save this signal in 
+      // case we are not ready to start now and need to start later
+      vertex.startSignalPending = true;
+      
+      if (vertex.getState() != VertexState.INITED) {
+        // vertex itself is not ready to start. External dependencies have already
+        // notified us.
+        LOG.info("Cannot start vertex. Not in inited state. "
+            + vertex.logIdentifier + " . VertesState: " + vertex.getState()
+            + " numTasks: " + vertex.numTasks + " Num uninitialized edges: "
+            + vertex.uninitializedEdges.size());
+        return;
+      }
+      
+      // vertex is inited and all dependencies are ready. Inited vertex means
+      // parallelism must be set already and edges defined
+      Preconditions.checkState(
+          (vertex.numTasks >= 0 && vertex.uninitializedEdges.isEmpty()),
+          "Cannot start vertex that is not completely defined. Vertex: "
+              + vertex.logIdentifier + " numTasks: " + vertex.numTasks);
+      
       vertex.startIfPossible();
     }
   }
 
-  boolean hasSourceVertexDependency() {
-    return (sourceVertices != null && sourceVertices.size() > 0);
-  }
-  
-  boolean canStartVertex() {
-    if ((sourceVertices == null || numStartedSourceVertices == sourceVertices.size())
-        && uninitializedEdges.isEmpty()) {
-      // vertex meets external start dependency conditions
-      if (hasSourceVertexDependency()) {
-        // this vertex is not going to receive a direct external start event from DAG
-        startSignalPending = true;
-      }
-      if (getState() != VertexState.INITED) {
-        // vertex itself is not ready to start. External dependencies have already
-        // notified us. So save that notification so that we can start when we 
-        // ourselves are ready internally.
-        LOG.info("Cannot start vertex. Not in inited state. " + logIdentifier + 
-            " . VertesState: " + getState() + " numTasks: " + numTasks);
-        return false;
-      }
-      // vertex is inited and all dependencies are ready. Inited vertex means 
-      // parallelism must be set already
-      Preconditions
-          .checkState(numTasks >= 0,
-              "Cannot start vertex without parallelism being set. "
-                  + logIdentifier);
+  boolean canInitVertex() {
+    if (numTasks >= 0 && uninitializedEdges.isEmpty() && !initWaitsForRootInitializers) {
+      // vertex fully defined
       return true;
     }
-    LOG.info("Cannot start vertex: " + logIdentifier + " numStartedSources: "
-        + numStartedSourceVertices + " numSources: "
-        + ((sourceVertices == null) ? 0 : sourceVertices.size())
-        + " numUnitializedEdges: " + uninitializedEdges.size());
+    LOG.info("Cannot init vertex: " + logIdentifier + " numTasks: " + numTasks
+        + " numUnitializedEdges: " + uninitializedEdges.size()
+        + " numInitializedInputs: " + numInitializedInputs
+        + " initWaitsForRootInitializers: " + initWaitsForRootInitializers);
     return false;
   }
   
-  void startIfPossible() {
-    if (canStartVertex()) {
-      Preconditions.checkState(getState() == VertexState.INITED, 
-          "Vertex must be inited " + logIdentifier);
-      if (startSignalPending) {
-        LOG.info("Starting vertex: " + getVertexId() +
-                 " with name: " + getName() +
-                 " with distanceFromRoot: " + distanceFromRoot );
-        eventHandler.handle(new VertexEvent(vertexId,
-            VertexEventType.V_START));
-      }
-    }
-  }
-
   public static class StartWhileInitializingTransition implements 
     SingleArcTransition<VertexImpl, VertexEvent> {
 
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
+      // vertex state machine does not start itself in the initializing state
+      // this start event can only come directly from the DAG. That means this 
+      // is a top level vertex of the dag
+      Preconditions.checkState(
+          (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty()),
+          "Vertex: " + vertex.logIdentifier + " got invalid start event");
       vertex.startTimeRequested = vertex.clock.getTime();
       vertex.startSignalPending = true;
     }
@@ -2875,16 +2911,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     Preconditions.checkState(getState() == VertexState.INITED, 
         "Vertex must be inited " + logIdentifier);
 
-    if (!canStartVertex()) {
-      // this is to handle the initial vertices who are directly sent a V_START
-      // from the DAG. They may have uninitialized edges that may be initialized
-      // when the downstream vertices initialize
-      LOG.info("Received START event. Saving notification so that we can start " +
-      		"when other requirements are met for vertex: " + logIdentifier);
-      startSignalPending = true;
-      return VertexState.INITED;
-    }
-
     startedTime = clock.getTime();
     vertexManager.onVertexStarted(pendingReportedSrcCompletions);
     pendingReportedSrcCompletions.clear();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8142f3c9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 373a5e9..ab798db 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -131,6 +131,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.test.EdgeManagerForTest;
 import org.apache.tez.test.VertexManagerPluginForTest;
@@ -310,7 +311,12 @@ public class TestVertexImpl {
     public void handle(TaskEvent event) {
       VertexImpl vertex = vertexIdMap.get(event.getTaskID().getVertexID());
       Task task = vertex.getTask(event.getTaskID());
-      ((EventHandler<TaskEvent>)task).handle(event);
+      // getTask() can return null here; warn instead of dereferencing it.
+      if (task == null) {
+        LOG.warn("Task null for vertex: " + vertex.getName() + " taskId: " +
+            event.getTaskID() + ". Please check if this is important for the test");
+        return;
+      }
+      ((EventHandler<TaskEvent>) task).handle(event);
     }
   }
 
@@ -542,6 +548,69 @@ public class TestVertexImpl {
     return dag;
   }
   
+  /**
+   * Builds a two-vertex plan, vertex1 -> vertex2, connected by edge "e1".
+   * vertex1 has a single root input ("input1") using the given initializer and
+   * a fixed task count of 2; vertex2 also has 2 tasks. Edge e1 uses
+   * {@link PlanEdgeDataMovementType#CUSTOM} data movement.
+   *
+   * @param initializerClassName class name of the root input initializer for
+   *     vertex1's "input1"
+   * @return the DAG plan
+   */
+  private DAGPlan createDAGPlanWithInputDistributor(String initializerClassName) {
+    LOG.info("Setting up dag plan with input distributor");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("testVertexWithInitializer")
+        .addVertex( // simulates split distribution with known number of tasks
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .addInputs(
+                    RootInputLeafOutputProto.newBuilder()
+                        .setInitializerClassName(initializerClassName)
+                        .setName("input1")
+                        .setEntityDescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("InputClazz")
+                                .build()
+                        )
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(2)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x3.y3")
+                        .build()
+                )
+                .addOutEdgeId("e1")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex2")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(2)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .addInEdgeId("e1")
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                // named after its endpoints, like the other edges in this file
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+                .setInputVertexName("vertex1")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex2")
+                .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
+                .setId("e1")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .build();
+    return dag;
+  }
+  
   private DAGPlan createDAGPlanForOneToOneSplit(String initializerClassName, 
       int numTasks, boolean addNullEdge) {
     VertexPlan.Builder v1Builder = VertexPlan.newBuilder();
@@ -585,6 +654,26 @@ public class TestVertexImpl {
     }
     VertexPlan v1Plan = v1Builder.build();
     
+    VertexPlan.Builder v4Builder = VertexPlan.newBuilder();
+    v4Builder
+    .setName("vertex4")
+    .setType(PlanVertexType.NORMAL)
+    .setTaskConfig(
+        PlanTaskConfiguration.newBuilder()
+        .setNumTasks(numTasks)
+        .setVirtualCores(4)
+        .setMemoryMb(1024)
+        .setJavaOpts("")
+        .setTaskModule("x4.y4")
+        .build()
+    )
+    .addInEdgeId("e3")
+    .addInEdgeId("e4");
+    if (addNullEdge) {
+      v4Builder.addOutEdgeId("e6");
+    }
+    VertexPlan v4Plan = v4Builder.build();
+    
     LOG.info("Setting up one to one dag plan");
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder()
         .setName("testVertexOneToOneSplit")
@@ -623,23 +712,7 @@ public class TestVertexImpl {
                 .addOutEdgeId("e4")
             .build()
         )
-        .addVertex(
-            VertexPlan.newBuilder()
-                .setName("vertex4")
-                .setType(PlanVertexType.NORMAL)
-                .setTaskConfig(
-                    PlanTaskConfiguration.newBuilder()
-                    .setNumTasks(numTasks)
-                    .setVirtualCores(4)
-                    .setMemoryMb(1024)
-                    .setJavaOpts("")
-                    .setTaskModule("x4.y4")
-                    .build()
-                )
-                .addInEdgeId("e3")
-                .addInEdgeId("e4")
-            .build()
-        )
+        .addVertex(v4Plan)
         .addEdge(
             EdgePlan.newBuilder()
                 .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
@@ -690,20 +763,35 @@ public class TestVertexImpl {
             );
     if (addNullEdge) {
       dagBuilder.addVertex(
-          VertexPlan.newBuilder()
-          .setName("vertex5")
-          .setType(PlanVertexType.NORMAL)
-          .setTaskConfig(
-              PlanTaskConfiguration.newBuilder()
-              .setNumTasks(1)
-              .setVirtualCores(4)
-              .setMemoryMb(1024)
-              .setJavaOpts("")
-              .setTaskModule("x4.y4")
+              VertexPlan.newBuilder()
+              .setName("vertex5")
+              .setType(PlanVertexType.NORMAL)
+              .setTaskConfig(
+                  PlanTaskConfiguration.newBuilder()
+                  .setNumTasks(1)
+                  .setVirtualCores(4)
+                  .setMemoryMb(1024)
+                  .setJavaOpts("")
+                  .setTaskModule("x4.y4")
+                  .build()
+              )
+              .addInEdgeId("e5")
+              .build()
+          ).addVertex(
+              VertexPlan.newBuilder()
+              .setName("vertex6")
+              .setType(PlanVertexType.NORMAL)
+              .setTaskConfig(
+                  PlanTaskConfiguration.newBuilder()
+                  .setNumTasks(1)
+                  .setVirtualCores(4)
+                  .setMemoryMb(1024)
+                  .setJavaOpts("")
+                  .setTaskModule("x4.y4")
+                  .build()
+              )
+              .addInEdgeId("e6")
               .build()
-          )
-          .addInEdgeId("e5")
-          .build()
           )
           .addEdge(
               EdgePlan.newBuilder()
@@ -716,6 +804,18 @@ public class TestVertexImpl {
                   .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                   .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
                   .build()
+          )
+          .addEdge(
+              EdgePlan.newBuilder()
+                  .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v4_v6"))
+                  .setInputVertexName("vertex4")
+                  .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                  .setOutputVertexName("vertex6")
+                  .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
+                  .setId("e6")
+                  .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                  .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                  .build()
           );
     }
     DAGPlan dag = dagBuilder.build();
@@ -959,6 +1059,36 @@ public class TestVertexImpl {
   // A -> B, A -> C, B -> C
   private DAGPlan createSamplerDAGPlan(boolean customEdge) {
     LOG.info("Setting up dag plan");
+    VertexPlan.Builder vCBuilder = VertexPlan.newBuilder();
+    vCBuilder.setName("C")
+      .setType(PlanVertexType.NORMAL)
+      .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("C.class"))
+      .addTaskLocationHint(
+        PlanTaskLocationHint.newBuilder()
+          .addHost("host3")
+          .addRack("rack3")
+          .build()
+      )
+      .setTaskConfig(
+        PlanTaskConfiguration.newBuilder()
+          .setNumTasks(customEdge ? -1 : 2)
+          .setVirtualCores(4)
+          .setMemoryMb(1024)
+          .setJavaOpts("foo")
+          .setTaskModule("x3.y3")
+          .build()
+      )
+      .setVertexManagerPlugin(
+          TezEntityDescriptorProto.newBuilder().setClassName(
+              VertexManagerPluginForTest.class.getName()))
+      .addInEdgeId("A_C")
+      .addInEdgeId("B_C");
+    if (customEdge) {
+      vCBuilder.setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+          .setClassName(VertexManagerPluginForTest.class.getName()));
+
+    }
+    VertexPlan vCPlan = vCBuilder.build();
     DAGPlan dag = DAGPlan.newBuilder()
         .setName("TestSamplerDAG")
         .addVertex(
@@ -1010,28 +1140,7 @@ public class TestVertexImpl {
             .build()
         )
         .addVertex(
-          VertexPlan.newBuilder()
-            .setName("C")
-            .setType(PlanVertexType.NORMAL)
-            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("C.class"))
-            .addTaskLocationHint(
-              PlanTaskLocationHint.newBuilder()
-                .addHost("host3")
-                .addRack("rack3")
-                .build()
-            )
-            .setTaskConfig(
-              PlanTaskConfiguration.newBuilder()
-                .setNumTasks(2)
-                .setVirtualCores(4)
-                .setMemoryMb(1024)
-                .setJavaOpts("foo")
-                .setTaskModule("x3.y3")
-                .build()
-            )
-            .addInEdgeId("A_C")
-            .addInEdgeId("B_C")
-            .build()
+          vCPlan
         )
         .addEdge(
             EdgePlan.newBuilder()
@@ -2429,7 +2538,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
     
     // setting the edge manager should vertex1 should not INIT/START it since
-    // input initialization is not complete
+    // input initialization is not complete. v5 should be inited
     EdgeManagerDescriptor mockEdgeManagerDescriptor =
         new EdgeManagerDescriptor(EdgeManagerForTest.class.getName());
     Edge e = v5.sourceVertices.get(v1);
@@ -2437,7 +2546,7 @@ public class TestVertexImpl {
     e.setCustomEdgeManager(mockEdgeManagerDescriptor);
     dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
-
+    Assert.assertEquals(VertexState.INITED, vertices.get("vertex5").getState());
     
     RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
     List<TaskLocationHint> v1Hints = createTaskLocationHints(numTasks);
@@ -2457,15 +2566,28 @@ public class TestVertexImpl {
     Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks());
     Assert.assertEquals(VertexState.INITED, vertices.get("vertex3").getState());
     Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
-    Assert.assertEquals(VertexState.INITED, vertices.get("vertex4").getState());
     Assert.assertEquals(VertexState.INITED, vertices.get("vertex5").getState());
+    // v5, v6 still initializing since edge is null
+    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
     
     startVertex(v1);
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
-    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex5").getState());
+    // v5, v6 still initializing since edge is null
+    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+    
+    mockEdgeManagerDescriptor =
+        new EdgeManagerDescriptor(EdgeManagerForTest.class.getName());
+    e = vertices.get("vertex6").sourceVertices.get(vertices.get("vertex4"));
+    Assert.assertNull(e.getEdgeManager());
+    e.setCustomEdgeManager(mockEdgeManagerDescriptor);
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
   }
   
   @Test(timeout = 5000)
@@ -2667,6 +2789,40 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
+  public void testVertexWithInputDistributor() {
+    useCustomInitializer = true;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithInputDistributor("TestInputInitializer");
+    setupPostDagCreation();
+
+    VertexImplWithCustomInitializer v1 = (VertexImplWithCustomInitializer) vertices
+        .get("vertex1");
+    VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
+    dispatcher.getEventHandler().handle(
+        new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
+    RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
+    byte[] payload = new byte[0];
+    runner1.completeInputDistribution(payload);
+    // edge is still null so its initializing
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+    Assert.assertEquals(true, runner1.hasShutDown);    
+    Assert.assertEquals(2, v1.getTotalTasks());
+    Assert.assertEquals(payload, v1.getInputSpecList(0).get(0).getInputDescriptor().getUserPayload());
+    EdgeManagerDescriptor mockEdgeManagerDescriptor =
+        new EdgeManagerDescriptor(EdgeManagerForTest.class.getName());
+    Edge e = v2.sourceVertices.get(v1);
+    Assert.assertNull(e.getEdgeManager());
+    e.setCustomEdgeManager(mockEdgeManagerDescriptor);
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITED, v1.getState());
+    Assert.assertEquals(VertexState.INITED, v2.getState());
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testVertexRootInputSpecUpdateAll() {
     useCustomInitializer = true;
     setupPreDagCreation();
@@ -2847,6 +3003,15 @@ public class TestVertexImpl {
       dispatcher.await();
     }
     
+    /**
+     * Completes initialization of the first root input by sending a single
+     * RootInputUpdatePayloadEvent carrying {@code payload} to the vertex, then
+     * blocks on {@code dispatcher.await()}.
+     *
+     * @param payload the new user payload for the first root input
+     */
+    public void completeInputDistribution(byte[] payload) {
+      String firstInputName = inputs.get(0).getEntityName();
+      List<Event> payloadEvents =
+          Lists.<Event>newArrayList(new RootInputUpdatePayloadEvent(payload));
+      eventHandler.handle(
+          new VertexEventRootInputInitialized(vertexID, firstInputName, payloadEvents));
+      dispatcher.await();
+    }
+    
     public void completeInputInitialization(int targetTasks, List<TaskLocationHint> locationHints) {
       List<Event> events = Lists.newArrayListWithCapacity(targetTasks + 1);
 
@@ -2911,9 +3076,9 @@ public class TestVertexImpl {
       VertexEventType.V_START));
 
     dispatcher.await();
-    Assert.assertEquals(VertexState.INITED, vA.getState());
-    Assert.assertEquals(VertexState.INITED, vB.getState());
-    Assert.assertEquals(VertexState.INITED, vC.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vA.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vB.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
     
     // setting the edge manager should vA to start
     EdgeManagerDescriptor mockEdgeManagerDescriptor =
@@ -2923,17 +3088,20 @@ public class TestVertexImpl {
     e.setCustomEdgeManager(mockEdgeManagerDescriptor);
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, vA.getState());
-    Assert.assertEquals(VertexState.INITED, vB.getState());
-    Assert.assertEquals(VertexState.INITED, vC.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vB.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
     
-    e = vC.sourceVertices.get(vB);
-    Assert.assertNull(e.getEdgeManager());
-    e.setCustomEdgeManager(mockEdgeManagerDescriptor);
+    Map<String, EdgeManagerDescriptor> edges = Maps.newHashMap();
+    edges.put("B", mockEdgeManagerDescriptor);
+    vC.setParallelism(2, vertexLocationHint, edges, null);
 
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, vA.getState());
     Assert.assertEquals(VertexState.RUNNING, vB.getState());
     Assert.assertEquals(VertexState.RUNNING, vC.getState());
+    Assert.assertNotNull(vA.getTask(0));
+    Assert.assertNotNull(vB.getTask(0));
+    Assert.assertNotNull(vC.getTask(0));
   }
 
   @SuppressWarnings("unchecked")


Mime
View raw message