tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1145. Vertices should not start if they have uninitialized custom edges (bikas)
Date Thu, 29 May 2014 18:46:24 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master f96a53d2d -> 8e351a4a3


TEZ-1145. Vertices should not start if they have uninitialized custom edges (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8e351a4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8e351a4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8e351a4a

Branch: refs/heads/master
Commit: 8e351a4a37050eaa9c226528e97edec7a8a4337f
Parents: f96a53d
Author: Bikas Saha <bikas@apache.org>
Authored: Thu May 29 11:46:15 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu May 29 11:46:15 2014 -0700

----------------------------------------------------------------------
 .../event/VertexEventNullEdgeInitialized.java   |  42 +++++
 .../event/VertexEventSourceVertexStarted.java   |  11 +-
 .../tez/dag/app/dag/event/VertexEventType.java  |   5 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  14 --
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  43 ++++-
 .../tez/dag/app/dag/impl/NullEdgeManager.java   |  77 --------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 188 ++++++++++++++-----
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 171 +++++++++++++----
 8 files changed, 370 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8e351a4a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventNullEdgeInitialized.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventNullEdgeInitialized.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventNullEdgeInitialized.java
new file mode 100644
index 0000000..bedf28b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventNullEdgeInitialized.java
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.impl.Edge;
+import org.apache.tez.dag.records.TezVertexID;
+
+public class VertexEventNullEdgeInitialized extends VertexEvent {
+  final Edge edge;
+  final Vertex vertex;
+  
+  public VertexEventNullEdgeInitialized(TezVertexID vertexId, Edge edge, Vertex vertex) {
+    super(vertexId, VertexEventType.V_NULL_EDGE_INITIALIZED);
+    this.edge = edge;
+    this.vertex = vertex;
+  }
+  
+  public Edge getEdge() {
+    return edge;
+  }
+  
+  public Vertex getVertex() {
+    return vertex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8e351a4a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexStarted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexStarted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexStarted.java
index cc97aa2..5cc9c23 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexStarted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexStarted.java
@@ -22,15 +22,22 @@ import org.apache.tez.dag.records.TezVertexID;
 
 public class VertexEventSourceVertexStarted extends VertexEvent {
 
-  int sourceDistanceFromRoot;
+  final int sourceDistanceFromRoot;
+  final TezVertexID sourceVertexId;
   
-  public VertexEventSourceVertexStarted(TezVertexID vertexId, 
+  public VertexEventSourceVertexStarted(TezVertexID vertexId,
+                                         TezVertexID sourceVertexId,
                                          int distanceFromRoot) {
     super(vertexId, VertexEventType.V_SOURCE_VERTEX_STARTED);
     this.sourceDistanceFromRoot = distanceFromRoot;
+    this.sourceVertexId = sourceVertexId;
   }
   
   public int getSourceDistanceFromRoot() {
     return sourceDistanceFromRoot;
   }
+  
+  public TezVertexID getSourceVertexId() {
+    return sourceVertexId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8e351a4a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 69952d9..0f2c145 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -61,6 +61,9 @@ public enum VertexEventType {
   V_RECOVER,
 
   // Recover Event, Producer:Vertex
-  V_SOURCE_VERTEX_RECOVERED
+  V_SOURCE_VERTEX_RECOVERED,
+  
+  // Producer: Edge
+  V_NULL_EDGE_INITIALIZED
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8e351a4a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 71d92b0..61432c3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -54,9 +54,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -1229,18 +1227,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       EdgeProperty edgeProperty = DagTypeConverters
           .createEdgePropertyMapFromDAGPlan(edgePlan);
 
-      // If CUSTOM without an edge manager, setup a fake edge manager. Avoid
-      // referencing the fake edge manager within the API module.
-      if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM
-          && edgeProperty.getEdgeManagerDescriptor() == null) {
-        EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
-            NullEdgeManager.class.getName());
-        EdgeProperty ep = new EdgeProperty(edgeDesc, edgeProperty.getDataSourceType(),
-            edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(),
-            edgeProperty.getEdgeDestination());
-        edgeProperty = ep;
-      }
-
       // edge manager may be also set via API when using custom edge type
       dag.edges.put(edgePlan.getId(),
           new Edge(edgeProperty, dag.getEventHandler()));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8e351a4a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 2e07a4d..4a9ea65 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -36,6 +36,7 @@ import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
@@ -49,6 +50,7 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -116,8 +118,10 @@ public class Edge {
         edgeManager = new ScatterGatherEdgeManager();
         break;
       case CUSTOM:
-        String edgeManagerClassName = edgeProperty.getEdgeManagerDescriptor().getClassName();
-        edgeManager = RuntimeUtils.createClazzInstance(edgeManagerClassName);
+        if (edgeProperty.getEdgeManagerDescriptor() != null) {
+          String edgeManagerClassName = edgeProperty.getEdgeManagerDescriptor().getClassName();
+          edgeManager = RuntimeUtils.createClazzInstance(edgeManagerClassName);
+        }
         break;
       default:
         String message = "Unknown edge data movement type: "
@@ -129,11 +133,16 @@ public class Edge {
   public void initialize() {
     byte[] bb = null;
     if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM) {
-      bb = edgeProperty.getEdgeManagerDescriptor().getUserPayload();
+      if (edgeProperty.getEdgeManagerDescriptor() != null && 
+          edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null) {
+        bb = edgeProperty.getEdgeManagerDescriptor().getUserPayload();
+      }
     }
     edgeManagerContext = new EdgeManagerContextImpl(sourceVertex.getName(),
         destinationVertex.getName(), bb);
-    edgeManager.initialize(edgeManagerContext);
+    if (edgeManager != null) {
+      edgeManager.initialize(edgeManagerContext);
+    }
     destinationMetaInfo = new EventMetaData(EventProducerConsumerType.INPUT, 
         destinationVertex.getName(), 
         sourceVertex.getName(), 
@@ -148,8 +157,13 @@ public class Edge {
             edgeProperty.getEdgeSource(),
             edgeProperty.getEdgeDestination());
     this.edgeProperty = modifiedEdgeProperty;
+    boolean wasUnInitialized = (edgeManager == null);
     createEdgeManager();
     initialize();
+    if (wasUnInitialized) {
+      sendEvent(new VertexEventNullEdgeInitialized(sourceVertex.getVertexId(), this, destinationVertex));
+      sendEvent(new VertexEventNullEdgeInitialized(destinationVertex.getVertexId(), this, sourceVertex));
+    }
   }
 
   public EdgeProperty getEdgeProperty() {
@@ -178,6 +192,8 @@ public class Edge {
   }
 
   public InputSpec getDestinationSpec(int destinationTaskIndex) {
+    Preconditions.checkState(edgeManager != null, 
+        "Edge Manager must be initialized by this time");
     return new InputSpec(sourceVertex.getName(),
         edgeProperty.getEdgeDestination(),
         edgeManager.getNumDestinationTaskPhysicalInputs(sourceVertex.getTotalTasks(),
@@ -185,6 +201,8 @@ public class Edge {
   }
 
   public OutputSpec getSourceSpec(int sourceTaskIndex) {
+    Preconditions.checkState(edgeManager != null, 
+        "Edge Manager must be initialized by this time");
     return new OutputSpec(destinationVertex.getName(),
         edgeProperty.getEdgeSource(), edgeManager.getNumSourceTaskPhysicalOutputs(
         destinationVertex.getTotalTasks(), sourceTaskIndex));
@@ -207,8 +225,9 @@ public class Edge {
     sourceEventBuffer.clear();
   }
   
-  @SuppressWarnings("unchecked")
   public void sendTezEventToSourceTasks(TezEvent tezEvent) {
+    Preconditions.checkState(edgeManager != null, 
+        "Edge Manager must be initialized by this time");
     if (!bufferEvents.get()) {
       switch (tezEvent.getEventType()) {
       case INPUT_READ_ERROR_EVENT:
@@ -233,7 +252,7 @@ public class Edge {
         int taskAttemptIndex = event.getVersion();
         TezTaskAttemptID srcTaskAttemptId = TezTaskAttemptID.getInstance(srcTaskId,
             taskAttemptIndex);
-        eventHandler.handle(new TaskAttemptEventOutputFailed(srcTaskAttemptId,
+        sendEvent(new TaskAttemptEventOutputFailed(srcTaskAttemptId,
             tezEvent, numConsumers));
         break;
       default:
@@ -259,6 +278,8 @@ public class Edge {
   void sendDmEventOrIfEventToTasks(TezEvent tezEvent, int srcTaskIndex,
       boolean isDataMovementEvent,
       Map<Integer, List<Integer>> ifInputIndicesToTaskIndices) {
+    Preconditions.checkState(edgeManager != null, 
+        "Edge Manager must be initialized by this time");
     int num = 0;
     Event event = tezEvent.getEvent();
     for (Map.Entry<Integer, List<Integer>> entry : ifInputIndicesToTaskIndices.entrySet()) {
@@ -301,6 +322,8 @@ public class Edge {
   }
   
   public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
+    Preconditions.checkState(edgeManager != null, 
+        "Edge Manager must be initialized by this time");
     if (!bufferEvents.get()) {
       boolean isDataMovementEvent = true;
       switch (tezEvent.getEventType()) {
@@ -345,9 +368,13 @@ public class Edge {
     }
   }
   
-  @SuppressWarnings("unchecked")
   private void sendEventToTask(TezTaskID taskId, TezEvent tezEvent) {
-    eventHandler.handle(new TaskEventAddTezEvent(taskId, tezEvent));
+    sendEvent(new TaskEventAddTezEvent(taskId, tezEvent));
+  }
+  
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private void sendEvent(org.apache.hadoop.yarn.event.Event event) {
+    eventHandler.handle(event);
   }
 
   public String getSourceVertexName() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8e351a4a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
deleted file mode 100644
index 02f5154..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.impl;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.tez.dag.api.EdgeManager;
-import org.apache.tez.dag.api.EdgeManagerContext;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-
-public class NullEdgeManager implements EdgeManager {
-
-  public NullEdgeManager() {
-  }
-
-  @Override
-  public void initialize(EdgeManagerContext edgeManagerContext) {
-  }
-
-  @Override
-  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, int destinationTaskIndex) {
-    throw new UnsupportedOperationException(
-        "Cannot route events. EdgeManager should have been replaced at runtime");
-  }
-
-  @Override
-  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, int sourceTaskIndex) {
-    throw new UnsupportedOperationException(
-        "Cannot route events. EdgeManager should have been replaced at runtime");
-  }
-
-  @Override
-  public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex,
-      int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
-    throw new UnsupportedOperationException(
-        "Cannot route events. EdgeManager should have been replaced at runtime");
-  }
-
-  @Override
-  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
-    throw new UnsupportedOperationException(
-        "Cannot route events. EdgeManager should have been replaced at runtime");
-  }
-
-  @Override
-  public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex) {
-    throw new UnsupportedOperationException(
-        "Cannot route events. EdgeManager should have been replaced at runtime");
-  }
-
-  @Override
-  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
-      int numDestinationTasks,
-      Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
-    throw new UnsupportedOperationException(
-        "Cannot route events. EdgeManager should have been replaced at runtime");
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8e351a4a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 0b46fbb..a58fddc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -57,7 +56,6 @@ import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
-import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
@@ -102,6 +100,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
 import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
@@ -237,6 +236,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                       VertexState.INITIALIZING, VertexState.FAILED),
                   VertexEventType.V_INIT,
                   new InitTransition())
+          .addTransition(VertexState.NEW, VertexState.NEW,
+                  VertexEventType.V_NULL_EDGE_INITIALIZED,
+                  new NullEdgeInitializedTransition())
           .addTransition
               (VertexState.NEW,
                   EnumSet.of(VertexState.NEW, VertexState.INITED,
@@ -247,6 +249,24 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_RECOVER,
                   new StartRecoverTransition())
           .addTransition
+              (VertexState.NEW,
+                  EnumSet.of(VertexState.INITED,
+                      VertexState.INITIALIZING, VertexState.RUNNING,
+                      VertexState.SUCCEEDED, VertexState.FAILED,
+                      VertexState.KILLED, VertexState.ERROR,
+                      VertexState.RECOVERING),
+                  VertexEventType.V_SOURCE_VERTEX_RECOVERED,
+                  new RecoverTransition())
+          .addTransition(VertexState.NEW, VertexState.NEW,
+              VertexEventType.V_SOURCE_VERTEX_STARTED,
+              new SourceVertexStartedTransition())
+          .addTransition(VertexState.NEW, VertexState.KILLED,
+              VertexEventType.V_TERMINATE,
+              new TerminateNewVertexTransition())
+          .addTransition(VertexState.NEW, VertexState.ERROR,
+              VertexEventType.V_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          .addTransition
               (VertexState.RECOVERING,
                   EnumSet.of(VertexState.NEW, VertexState.INITED,
                       VertexState.INITIALIZING, VertexState.RUNNING,
@@ -266,29 +286,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               (VertexState.RECOVERING, VertexState.RECOVERING,
                   VertexEventType.V_TERMINATE,
                   new TerminateDuringRecoverTransition())
-          .addTransition
-              (VertexState.NEW,
-                  EnumSet.of(VertexState.INITED,
-                      VertexState.INITIALIZING, VertexState.RUNNING,
-                      VertexState.SUCCEEDED, VertexState.FAILED,
-                      VertexState.KILLED, VertexState.ERROR,
-                      VertexState.RECOVERING),
-                  VertexEventType.V_SOURCE_VERTEX_RECOVERED,
-                  new RecoverTransition())
-          .addTransition
-              (VertexState.INITED,
-                  EnumSet.of(VertexState.INITED, VertexState.ERROR),
-                  VertexEventType.V_INIT,
-                  new IgnoreInitInInitedTransition())
-          .addTransition(VertexState.NEW, VertexState.NEW,
-              VertexEventType.V_SOURCE_VERTEX_STARTED,
-              new SourceVertexStartedTransition())
-          .addTransition(VertexState.NEW, VertexState.KILLED,
-              VertexEventType.V_TERMINATE,
-              new TerminateNewVertexTransition())
-          .addTransition(VertexState.NEW, VertexState.ERROR,
-              VertexEventType.V_INTERNAL_ERROR,
-              INTERNAL_ERROR_TRANSITION)
+          .addTransition(VertexState.RECOVERING, VertexState.RECOVERING,
+                  VertexEventType.V_NULL_EDGE_INITIALIZED,
+                  new NullEdgeInitializedTransition())
 
           // Transitions from INITIALIZING state
           .addTransition(VertexState.INITIALIZING,
@@ -321,10 +321,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITIALIZING, VertexState.ERROR,
               VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
+                  VertexEventType.V_NULL_EDGE_INITIALIZED,
+                  new NullEdgeInitializedTransition())
 
           // Transitions from INITED state
           // SOURCE_VERTEX_STARTED - for sources which determine parallelism, 
           // they must complete before this vertex can start.
+          .addTransition
+              (VertexState.INITED,
+                  EnumSet.of(VertexState.INITED, VertexState.ERROR),
+                  VertexEventType.V_INIT,
+                  new IgnoreInitInInitedTransition())
           .addTransition(VertexState.INITED, VertexState.INITED,
               VertexEventType.V_SOURCE_VERTEX_STARTED,
               new SourceVertexStartedTransition())
@@ -335,7 +343,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITED,  VertexState.INITED,
               VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
               SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
-          .addTransition(VertexState.INITED, VertexState.RUNNING,
+          .addTransition(VertexState.INITED, 
+              EnumSet.of(VertexState.RUNNING, VertexState.INITED),
               VertexEventType.V_START,
               new StartTransition())
           .addTransition(VertexState.INITED,
@@ -347,6 +356,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITED, VertexState.ERROR,
               VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(VertexState.INITED, VertexState.INITED,
+              VertexEventType.V_NULL_EDGE_INITIALIZED,
+              new NullEdgeInitializedTransition())
 
           // Transitions from RUNNING state
           .addTransition(VertexState.RUNNING, VertexState.RUNNING,
@@ -526,6 +538,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   @VisibleForTesting
   Map<Vertex, Edge> sourceVertices;
   private Map<Vertex, Edge> targetVertices;
+  Set<Edge> uninitializedEdges = Sets.newHashSet();
 
   private Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs;
   private Map<String, RootInputLeafOutputDescriptor<OutputDescriptor>> additionalOutputs;
@@ -1603,6 +1616,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   private VertexState initializeVertex() {
+    if (targetVertices != null) {
+      for (Edge e : targetVertices.values()) {
+        if (e.getEdgeManager() == null) {
+          Preconditions
+              .checkState(
+                  e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
+                  "Null edge manager allowed only for custom edge. " + logIdentifier);
+          uninitializedEdges.add(e);
+        }
+      }
+    }
+    if (sourceVertices != null) {
+      for (Edge e : sourceVertices.values()) {
+        if (e.getEdgeManager() == null) {
+          Preconditions
+              .checkState(
+                  e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
+                  "Null edge manager allowed only for custom edge. " + logIdentifier);
+          uninitializedEdges.add(e);
+        }
+      }
+    }
     try {
       initializeCommitters();
     } catch (Exception e) {
@@ -2114,6 +2149,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   }
 
+  public static class NullEdgeInitializedTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
+      VertexEventNullEdgeInitialized event = (VertexEventNullEdgeInitialized) vertexEvent;
+      Edge edge = event.getEdge();
+      Vertex otherVertex = event.getVertex();
+      Preconditions.checkState(
+          (vertex.sourceVertices == null || vertex.sourceVertices.containsKey(otherVertex) ||
+          vertex.targetVertices == null || vertex.targetVertices.containsKey(otherVertex)),
+          "Not connected to vertex " + otherVertex.getName() + " from vertex: " + vertex.logIdentifier);
+      LOG.info("Edge initialized for connection to vertex " + otherVertex.getName() + 
+          " at vertex : " + vertex.logIdentifier);
+      vertex.uninitializedEdges.remove(edge);
+      vertex.startIfPossible();
+    }
+
+  }
+
   public static class BufferDataRecoverTransition implements
       SingleArcTransition<VertexImpl, VertexEvent> {
 
@@ -2650,14 +2705,45 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.distanceFromRoot = distanceFromRoot;
       }
       vertex.numStartedSourceVertices++;
-      if (vertex.numStartedSourceVertices == vertex.sourceVertices.size()) {
-        // Consider inlining this.
-        LOG.info("Starting vertex: " + vertex.getVertexId() +
-                 " with name: " + vertex.getName() +
-                 " with distanceFromRoot: " + vertex.distanceFromRoot );
-        vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
-            VertexEventType.V_START));
-      }
+      LOG.info("Source vertex started: " + startEvent.getSourceVertexId() +
+          " for vertex: " + vertex.getVertexId() + " numStartedSources: " + 
+          vertex.numStartedSourceVertices);
+      vertex.startIfPossible();
+    }
+  }
+
+  boolean canStartVertex() {
+    if (getState() != VertexState.INITED) {
+      LOG.info("Cannot start vertex. Not in inited state. " + logIdentifier + 
+          " . VertesState: " + getState());
+      return false;
+    }
+    if ((sourceVertices == null || numStartedSourceVertices == sourceVertices.size())
+        && uninitializedEdges.isEmpty()) {
+      // vertex is inited and all dependencies are ready. Inited vertex means 
+      // parallelism must be set already
+      Preconditions
+          .checkState(numTasks >= 0,
+              "Cannot start vertex without parallelism being set. "
+                  + logIdentifier);
+      return true;
+    }
+    LOG.info("Cannot start vertex: " + logIdentifier + " numStartedSources: "
+        + numStartedSourceVertices + " numSources: "
+        + ((sourceVertices == null) ? 0 : sourceVertices.size())
+        + " numUnitializedEdges: " + uninitializedEdges.size());
+    return false;
+  }
+  
+  void startIfPossible() {
+    if (canStartVertex()) {
+      Preconditions.checkState(getState() == VertexState.INITED, 
+          "Vertex must be inited " + logIdentifier);
+      LOG.info("Starting vertex: " + getVertexId() +
+               " with name: " + getName() +
+               " with distanceFromRoot: " + distanceFromRoot );
+      eventHandler.handle(new VertexEvent(vertexId,
+          VertexEventType.V_START));
     }
   }
 
@@ -2672,20 +2758,29 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   }
 
-  public static class StartTransition
-  implements SingleArcTransition<VertexImpl, VertexEvent> {
-    /**
-     * This transition executes in the event-dispatcher thread, though it's
-     * triggered in MRAppMaster's startJobs() method.
-     */
-    @Override
-    public void transition(VertexImpl vertex, VertexEvent event) {
-      vertex.startTimeRequested  = vertex.clock.getTime();
-      vertex.startVertex();
+  public static class StartTransition implements
+    MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+  
+  @Override
+  public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      Preconditions.checkState(vertex.getState() == VertexState.INITED, 
+          "Unexpected state " + vertex.getState() + " for " + vertex.logIdentifier);
+      vertex.startTimeRequested = vertex.clock.getTime();
+      return vertex.startVertex();
     }
   }
 
-  private void startVertex() {
+  private VertexState startVertex() {
+    Preconditions.checkState(getState() == VertexState.INITED, 
+        "Vertex must be inited " + logIdentifier);
+
+    if (!canStartVertex()) {
+      // this is to handle the initial vertices who are directly sent a V_START
+      // from the DAG. They may have uninitialized edges that may be initialized
+      // when the downstream vertices initialize
+      return VertexState.INITED;
+    }
+
     startedTime = clock.getTime();
     vertexManager.onVertexStarted(pendingReportedSrcCompletions);
     pendingReportedSrcCompletions.clear();
@@ -2701,7 +2796,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     if (targetVertices != null) {
       for (Vertex targetVertex : targetVertices.keySet()) {
         eventHandler.handle(new VertexEventSourceVertexStarted(targetVertex
-            .getVertexId(), distanceFromRoot));
+            .getVertexId(), getVertexId(), distanceFromRoot));
       }
     }
 
@@ -2710,6 +2805,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       eventHandler.handle(new VertexEvent(
         this.vertexId, VertexEventType.V_COMPLETED));
     }
+    return VertexState.RUNNING;
   }
 
   private void abortVertex(final VertexStatus.State finalState) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8e351a4a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 663e23c..f2b6a64 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -474,12 +474,16 @@ public class TestVertexImpl {
     return dag;
   }
   
-  private DAGPlan createDAGPlanForOneToOneSplit(String initializerClassName, int numTasks) {
+  private DAGPlan createDAGPlanForOneToOneSplit(String initializerClassName, 
+      int numTasks, boolean addNullEdge) {
     VertexPlan.Builder v1Builder = VertexPlan.newBuilder();
     v1Builder.setName("vertex1")
     .setType(PlanVertexType.NORMAL)
     .addOutEdgeId("e1")
     .addOutEdgeId("e2");
+    if (addNullEdge) {
+      v1Builder.addOutEdgeId("e5");
+    }
     if (initializerClassName != null) {
       numTasks = -1;
       v1Builder.addInputs(
@@ -514,7 +518,7 @@ public class TestVertexImpl {
     VertexPlan v1Plan = v1Builder.build();
     
     LOG.info("Setting up one to one dag plan");
-    DAGPlan dag = DAGPlan.newBuilder()
+    DAGPlan.Builder dagBuilder = DAGPlan.newBuilder()
         .setName("testVertexOneToOneSplit")
         .addVertex(v1Plan)
         .addVertex(
@@ -614,9 +618,39 @@ public class TestVertexImpl {
                 .setId("e4")
                 .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                 .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-                .build()
-        )
-    .build();
+                
+            );
+    if (addNullEdge) {
+      dagBuilder.addVertex(
+          VertexPlan.newBuilder()
+          .setName("vertex5")
+          .setType(PlanVertexType.NORMAL)
+          .setTaskConfig(
+              PlanTaskConfiguration.newBuilder()
+              .setNumTasks(1)
+              .setVirtualCores(4)
+              .setMemoryMb(1024)
+              .setJavaOpts("")
+              .setTaskModule("x4.y4")
+              .build()
+          )
+          .addInEdgeId("e5")
+          .build()
+          )
+          .addEdge(
+              EdgePlan.newBuilder()
+                  .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v5"))
+                  .setInputVertexName("vertex1")
+                  .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                  .setOutputVertexName("vertex5")
+                  .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
+                  .setId("e5")
+                  .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                  .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                  .build()
+          );
+    }
+    DAGPlan dag = dagBuilder.build();
     return dag;
   }
 
@@ -855,7 +889,7 @@ public class TestVertexImpl {
 
   // Create a plan with 3 vertices: A, B, C
   // A -> B, A -> C, B -> C
-  private DAGPlan createSamplerDAGPlan() {
+  private DAGPlan createSamplerDAGPlan(boolean customEdge) {
     LOG.info("Setting up dag plan");
     DAGPlan dag = DAGPlan.newBuilder()
         .setName("TestSamplerDAG")
@@ -937,7 +971,7 @@ public class TestVertexImpl {
                 .setInputVertexName("A")
                 .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_B.class"))
                 .setOutputVertexName("B")
-                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
                 .setId("A_B")
                 .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                 .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
@@ -949,7 +983,9 @@ public class TestVertexImpl {
                 .setInputVertexName("A")
                 .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_C.class"))
                 .setOutputVertexName("C")
-                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setDataMovementType(
+                    customEdge ? PlanEdgeDataMovementType.CUSTOM
+                        : PlanEdgeDataMovementType.SCATTER_GATHER)
                 .setId("A_C")
                 .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                 .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
@@ -961,7 +997,9 @@ public class TestVertexImpl {
                 .setInputVertexName("B")
                 .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
                 .setOutputVertexName("C")
-                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setDataMovementType(
+                    customEdge ? PlanEdgeDataMovementType.CUSTOM
+                        : PlanEdgeDataMovementType.SCATTER_GATHER)
                 .setId("B_C")
                 .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                 .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
@@ -1470,9 +1508,9 @@ public class TestVertexImpl {
     Assert.assertEquals(2, tasks.size());
     TezTaskID firstTask = tasks.keySet().iterator().next();
 
-    startVertex(v3);
-
-    Vertex v1 = vertices.get("vertex1");
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(vertices.get("vertex2"));
+    startVertex(v1);
     EdgeManagerDescriptor mockEdgeManagerDescriptor =
         new EdgeManagerDescriptor(EdgeManagerForTest.class.getName());
 
@@ -1495,9 +1533,9 @@ public class TestVertexImpl {
     initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
     VertexImpl v2 = vertices.get("vertex2");
+    VertexImpl v1 = vertices.get("vertex1");
     
-    startVertex(v2);
-    startVertex(v3);
+    startVertex(v1);
     
     TezTaskID t0_v2 = TezTaskID.getInstance(v2.getVertexId(), 0);
     TezTaskAttemptID ta0_t0_v2 = TezTaskAttemptID.getInstance(t0_v2, 0);
@@ -1636,21 +1674,26 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testVertexKillDiagnostics() {
+  public void testVertexKillDiagnosticsInInit() {
     initAllVertices(VertexState.INITED);
-    VertexImpl v2 = vertices.get("vertex2");
+    VertexImpl v2 = vertices.get("vertex4");
     killVertex(v2);
     String diagnostics =
         StringUtils.join(",", v2.getDiagnostics()).toLowerCase();
     LOG.info("diagnostics v2: " + diagnostics);
     Assert.assertTrue(diagnostics.contains(
         "vertex received kill in inited state"));
+  }
 
+  @Test(timeout = 5000)
+  public void testVertexKillDiagnosticsInRunning() {
+    initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
 
-    startVertex(v3);
+    startVertex(vertices.get("vertex1"));
+    startVertex(vertices.get("vertex2"));
     killVertex(v3);
-    diagnostics =
+    String diagnostics =
         StringUtils.join(",", v3.getDiagnostics()).toLowerCase();
     Assert.assertTrue(diagnostics.contains(
         "vertex received kill while in running state"));
@@ -1757,7 +1800,8 @@ public class TestVertexImpl {
 
     VertexImpl v = vertices.get("vertex6");
 
-    startVertex(v);
+    startVertex(vertices.get("vertex1"));
+    startVertex(vertices.get("vertex2"));
     CountingOutputCommitter committer =
         (CountingOutputCommitter) v.getOutputCommitter("outputx");
 
@@ -1785,16 +1829,14 @@ public class TestVertexImpl {
     LOG.info("Testing testSourceVertexStartHandling");
     initAllVertices(VertexState.INITED);
 
-    VertexImpl v4 = vertices.get("vertex4");
-    VertexImpl v5 = vertices.get("vertex5");
     VertexImpl v6 = vertices.get("vertex6");
 
-    startVertex(v4);
-    startVertex(v5);
+    startVertex(vertices.get("vertex1"));
+    startVertex(vertices.get("vertex2"));
     dispatcher.await();
     LOG.info("Verifying v6 state " + v6.getState());
     Assert.assertEquals(VertexState.RUNNING, v6.getState());
-    Assert.assertEquals(1, v6.getDistanceFromRoot());
+    Assert.assertEquals(3, v6.getDistanceFromRoot());
   }
 
   @Test(timeout = 5000)
@@ -1821,8 +1863,8 @@ public class TestVertexImpl {
     VertexImpl v5 = vertices.get("vertex5");
     VertexImpl v6 = vertices.get("vertex6");
 
-    startVertex(v4);
-    startVertex(v5);
+    startVertex(vertices.get("vertex1"));
+    startVertex(vertices.get("vertex2"));
     dispatcher.await();
     LOG.info("Verifying v6 state " + v6.getState());
     Assert.assertEquals(VertexState.RUNNING, v6.getState());
@@ -2006,7 +2048,8 @@ public class TestVertexImpl {
 
     VertexImpl v = vertices.get("vertex6");
 
-    startVertex(v);
+    startVertex(vertices.get("vertex1"));
+    startVertex(vertices.get("vertex2"));
     CountingOutputCommitter committer =
         (CountingOutputCommitter) v.getOutputCommitter("outputx");
 
@@ -2032,7 +2075,9 @@ public class TestVertexImpl {
 
     VertexImpl v = vertices.get("vertex6");
 
-    startVertex(v);
+    startVertex(vertices.get("vertex1"));
+    startVertex(vertices.get("vertex2"));
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
     CountingOutputCommitter committer =
         (CountingOutputCommitter) v.getOutputCommitter("outputx");
 
@@ -2177,14 +2222,28 @@ public class TestVertexImpl {
     // vertex with 2 incoming splits from the same source should split once
     useCustomInitializer = true;
     setupPreDagCreation();
-    dagPlan = createDAGPlanForOneToOneSplit("TestInputInitializer", -1);
+    dagPlan = createDAGPlanForOneToOneSplit("TestInputInitializer", -1, true);
     setupPostDagCreation();
-    initAllVertices(VertexState.INITIALIZING);
     
     int numTasks = 5;
     VertexImplWithCustomInitializer v1 = (VertexImplWithCustomInitializer) vertices
         .get("vertex1");
+    VertexImpl v5 = vertices.get("vertex5");
+    initVertex(v1);
+    
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+    
+    // setting the edge manager on the vertex1 -> vertex5 edge should not INIT/START vertex1 since
+    // input initialization is not complete
+    EdgeManagerDescriptor mockEdgeManagerDescriptor =
+        new EdgeManagerDescriptor(EdgeManagerForTest.class.getName());
+    Edge e = v5.sourceVertices.get(v1);
+    Assert.assertNull(e.getEdgeManager());
+    e.setCustomEdgeManager(mockEdgeManagerDescriptor);
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+
+    
     RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
     List<TaskLocationHint> v1Hints = createTaskLocationHints(numTasks);
     runner1.completeInputInitialization(numTasks, v1Hints);
@@ -2204,12 +2263,14 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITED, vertices.get("vertex3").getState());
     Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
     Assert.assertEquals(VertexState.INITED, vertices.get("vertex4").getState());
+    Assert.assertEquals(VertexState.INITED, vertices.get("vertex5").getState());
     
     startVertex(v1);
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex5").getState());
   }
   
   @Test(timeout = 5000)
@@ -2217,7 +2278,7 @@ public class TestVertexImpl {
     int numTasks = 5;
     // create a diamond shaped dag with 1-1 edges. 
     setupPreDagCreation();
-    dagPlan = createDAGPlanForOneToOneSplit(null, numTasks);
+    dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false);
     setupPostDagCreation();
     VertexImpl v1 = vertices.get("vertex1");
     initAllVertices(VertexState.INITED);
@@ -2252,7 +2313,7 @@ public class TestVertexImpl {
     int numTasks = 5;
     // create a diamond shaped dag with 1-1 edges. 
     setupPreDagCreation();
-    dagPlan = createDAGPlanForOneToOneSplit(null, numTasks);
+    dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false);
     setupPostDagCreation();
     VertexImpl v1 = vertices.get("vertex1");
     initAllVertices(VertexState.INITED);
@@ -2539,11 +2600,55 @@ public class TestVertexImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
+  public void testStartWithUninitializedCustomEdge() {
+    // Race when a source vertex manages to start before the target vertex has
+    // been initialized
+    setupPreDagCreation();
+    dagPlan = createSamplerDAGPlan(true);
+    setupPostDagCreation();
+
+    VertexImpl vA = vertices.get("A");
+    VertexImpl vB = vertices.get("B");
+    VertexImpl vC = vertices.get("C");
+
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+      VertexEventType.V_INIT));
+    dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+      VertexEventType.V_START));
+
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITED, vA.getState());
+    Assert.assertEquals(VertexState.INITED, vB.getState());
+    Assert.assertEquals(VertexState.INITED, vC.getState());
+    
+    // setting the edge manager should cause vA to start
+    EdgeManagerDescriptor mockEdgeManagerDescriptor =
+        new EdgeManagerDescriptor(EdgeManagerForTest.class.getName());
+    Edge e = vC.sourceVertices.get(vA);
+    Assert.assertNull(e.getEdgeManager());
+    e.setCustomEdgeManager(mockEdgeManagerDescriptor);
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, vA.getState());
+    Assert.assertEquals(VertexState.INITED, vB.getState());
+    Assert.assertEquals(VertexState.INITED, vC.getState());
+    
+    e = vC.sourceVertices.get(vB);
+    Assert.assertNull(e.getEdgeManager());
+    e.setCustomEdgeManager(mockEdgeManagerDescriptor);
+
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, vA.getState());
+    Assert.assertEquals(VertexState.RUNNING, vB.getState());
+    Assert.assertEquals(VertexState.RUNNING, vC.getState());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
   public void testInitStartRace() {
     // Race when a source vertex manages to start before the target vertex has
     // been initialized
     setupPreDagCreation();
-    dagPlan = createSamplerDAGPlan();
+    dagPlan = createSamplerDAGPlan(false);
     setupPostDagCreation();
 
     VertexImpl vA = vertices.get("A");


Mime
View raw message