tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/3] TEZ-46. Add compute capability to Inputs specified directly on a Vertex. (sseth)
Date Wed, 09 Oct 2013 17:32:54 GMT
Updated Branches:
  refs/heads/master 1d78f2300 -> d41471fc6


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 96f0785..ec37eb8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -67,7 +67,7 @@ import org.apache.tez.dag.api.committer.NullVertexOutputCommitter;
 import org.apache.tez.dag.api.committer.VertexContext;
 import org.apache.tez.dag.api.committer.VertexOutputCommitter;
 import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.api.records.DAGProtos.NamedDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
@@ -75,6 +75,7 @@ import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.EdgeManager;
+import org.apache.tez.dag.app.dag.RootInputInitializerRunner;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.TaskTerminationCause;
@@ -95,6 +96,8 @@ import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
+import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
@@ -121,8 +124,12 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
 
 
 /** Implementation of Vertex interface. Maintains the state machines of Vertex.
@@ -195,7 +202,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Transitions from NEW state
           .addTransition
               (VertexState.NEW,
-              EnumSet.of(VertexState.INITED, VertexState.FAILED),
+              EnumSet.of(VertexState.INITED, VertexState.INITIALIZING,
+                  VertexState.FAILED),
               VertexEventType.V_INIT,
               new InitTransition())
           .addTransition(VertexState.NEW, VertexState.KILLED,
@@ -205,7 +213,32 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
+          // Transitions from INITIALIZING state
+          .addTransition(VertexState.INITIALIZING,
+              EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, VertexState.RUNNING),
+              VertexEventType.V_ROOT_INPUT_INITIALIZED,
+              new RootInputInitializedTransition())
+          .addTransition(VertexState.INITIALIZING, VertexState.FAILED,
+              VertexEventType.V_ROOT_INPUT_FAILED,
+              new RootInputInitFailedTransition())
+          .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
+              VertexEventType.V_START,
+              new StartWhileInitingTransition())
+          .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
+              VertexEventType.V_SOURCE_VERTEX_STARTED,
+              new SourceVertexStartedTransition())
+          .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
+              VertexEventType.V_ROUTE_EVENT,
+              new RouteEventsWhileInitializingTransition())
+          .addTransition(VertexState.INITIALIZING, VertexState.KILLED,
+              VertexEventType.V_TERMINATE,
+              new TerminateInitingVertexTransition())
+          .addTransition(VertexState.INITIALIZING, VertexState.ERROR,
+              VertexEventType.V_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+              
           // Transitions from INITED state
+              // SOURCE_VERTEX_STARTED - for sources which determine parallelism, they must complete before this vertex can start.
           .addTransition(VertexState.INITED, VertexState.INITED,
               VertexEventType.V_SOURCE_VERTEX_STARTED,
               new SourceVertexStartedTransition())
@@ -215,8 +248,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITED,
               VertexState.INITED, VertexEventType.V_ROUTE_EVENT,
               ROUTE_EVENT_TRANSITION)
-
-
           .addTransition(VertexState.INITED, VertexState.KILLED,
               VertexEventType.V_TERMINATE,
               new TerminateInitedVertexTransition())
@@ -298,7 +329,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_TASK_COMPLETED))
+                  VertexEventType.V_TASK_COMPLETED,
+                  VertexEventType.V_ROOT_INPUT_INITIALIZED,
+                  VertexEventType.V_ROOT_INPUT_FAILED))
 
           // Transitions from KILLED state
           .addTransition(
@@ -313,7 +346,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_TASK_COMPLETED))
+                  VertexEventType.V_TASK_COMPLETED,
+                  VertexEventType.V_ROOT_INPUT_INITIALIZED,
+                  VertexEventType.V_ROOT_INPUT_FAILED))
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
           .addTransition(
@@ -327,8 +362,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_RESCHEDULED,
-                  VertexEventType.V_DIAGNOSTIC_UPDATE,
-                  VertexEventType.V_INTERNAL_ERROR))
+                  VertexEventType.V_INTERNAL_ERROR,
+                  VertexEventType.V_ROOT_INPUT_INITIALIZED,
+                  VertexEventType.V_ROOT_INPUT_FAILED))
           // create the topology tables
           .installTopology();
 
@@ -342,8 +378,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private int failedTaskCount = 0;
   private int killedTaskCount = 0;
 
-  private long initTime;
-  private long startTime;
+  private long initTimeRequested; // Time at which the V_INIT request was received.
+  private long initedTime; // Time at which initializeVertex() completed successfully.
+  private long startTimeRequested; // Time at which the V_START request was received.
+  private long startedTime; // Time at which startVertex() actually started the vertex.
   private long finishTime;
   private float progress;
 
@@ -362,9 +400,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   Map<Vertex, Edge> sourceVertices;
   private Map<Vertex, Edge> targetVertices;
 
+  // Root inputs / leaf outputs declared directly on this vertex, keyed by name.
+  // These stay null until setAdditionalInputs / setAdditionalOutputs is called.
+  private Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs;
+  private Map<String, RootInputLeafOutputDescriptor<OutputDescriptor>> additionalOutputs;
   private final List<InputSpec> additionalInputSpecs = new ArrayList<InputSpec>();
   private final List<OutputSpec> additionalOutputSpecs = new ArrayList<OutputSpec>();
+  // Names of root inputs that declared an initializer class; null when none do.
+  private Set<String> inputsWithInitializers;
+  // Number of V_ROOT_INPUT_INITIALIZED events handled so far.
+  private int numInitializedInputs;
+  // Set when V_START arrives while INITIALIZING; the start is performed once
+  // every root input has been initialized.
+  private boolean startSignalPending = false;
+  // Route events buffered while INITIALIZING; routed after initialization.
+  List<TezEvent> pendingRouteEvents = null;
 
+  // Runs the root input initializers. It is shut down when they all finish,
+  // when one fails, or when the vertex is terminated while INITIALIZING.
+  private RootInputInitializerRunner rootInputInitializer;
+  
   private VertexScheduler vertexScheduler;
 
   private VertexOutputCommitter committer;
@@ -663,6 +709,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return false;
   }
 
+  @Override
   public VertexTerminationCause getTerminationCause(){
     readLock.lock();
     try {
@@ -843,22 +890,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
 
+  /** Publishes a VertexStartedEvent with the requested and actual init/start times. */
   void logJobHistoryVertexStartedEvent() {
-    VertexStartedEvent startEvt = new VertexStartedEvent(vertexId,
-        vertexName, initTime, startTime, numTasks, getProcessorName());
+    VertexStartedEvent startEvt = new VertexStartedEvent(vertexId, vertexName,
+        initTimeRequested, initedTime, startTimeRequested, startedTime, numTasks,
+        getProcessorName());
     this.eventHandler.handle(new DAGHistoryEvent(startEvt));
   }
 
+  /** Records the finish time and publishes a SUCCEEDED VertexFinishedEvent. */
   void logJobHistoryVertexFinishedEvent() {
     this.setFinishTime();
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
-        vertexName, startTime, finishTime, VertexStatus.State.SUCCEEDED, "",
+        vertexName, startedTime, finishTime, VertexStatus.State.SUCCEEDED, "",
         getAllCounters());
     this.eventHandler.handle(new DAGHistoryEvent(finishEvt));
   }
 
   void logJobHistoryVertexFailedEvent(VertexStatus.State state) {
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
-        vertexName, startTime, clock.getTime(), state,
+        vertexName, startedTime, clock.getTime(), state,
         StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
         getAllCounters());
     this.eventHandler.handle(new DAGHistoryEvent(finishEvt));
@@ -963,13 +1011,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
-  VertexState finished(VertexState finalState) {
+  VertexState finished(VertexState finalState,
+      VertexTerminationCause terminationCause) {
     if (finishTime == 0) setFinishTime();
 
     switch (finalState) {
       case KILLED:
         eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
-            finalState));
+            finalState, terminationCause));
         logJobHistoryVertexFailedEvent(VertexStatus.State.KILLED);
         break;
       case ERROR:
@@ -979,7 +1028,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         break;
       case FAILED:
         eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
-            finalState));
+            finalState, terminationCause));
         logJobHistoryVertexFailedEvent(VertexStatus.State.FAILED);
         break;
       case SUCCEEDED:
@@ -992,131 +1041,230 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
     return finalState;
   }
+  
+  /**
+   * Same as {@link #finished(VertexState, VertexTerminationCause)} with a null
+   * termination cause.
+   */
+  VertexState finished(VertexState finalState) {
+    return finished(finalState, null);
+  }
 
-  public static class InitTransition
-      implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
-
-    @Override
-    public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      try {
-
-        // TODODAGAM
-        // TODO: Splits?
+  /**
+   * Completes vertex initialization. Leaf vertices (no target vertices) get an
+   * MRVertexOutputCommitter. The committer is then initialized and its
+   * setupVertex() is invoked.
+   *
+   * @return INITED on success, or the result of finished(FAILED) if committer
+   *         setup throws an IOException
+   */
+  private VertexState initializeVertex() {
+    // FIXME how do we decide vertex needs a committer?
+    // Answer: Do commit for every vertex
+    // for now, only for leaf vertices
+    // TODO TEZ-41 make committer type configurable per vertex
+    // NOTE(review): for non-leaf vertices this relies on `committer` having
+    // been assigned elsewhere (not visible in this hunk) - confirm.
+    
+    if (targetVertices.isEmpty()) {
+      committer = new MRVertexOutputCommitter();
+    }
+    try {
+      committer.init(vertexContext);
+      committer.setupVertex();
+    } catch (IOException e) {
+      LOG.warn("Vertex init failed", e);
+      addDiagnostic("Job init failed : "
+          + StringUtils.stringifyException(e));
+      trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
+      abortVertex(VertexStatus.State.FAILED);
+      // TODO: Metrics
+      //job.metrics.endPreparingJob(vertex);
+      return finished(VertexState.FAILED);
+    }
 
-        vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
+    // TODO: Metrics
+    //vertex.metrics.endPreparingJob(job);
+    // Reported as the "inited" time in VertexStartedEvent.
+    initedTime = clock.getTime();
+    return VertexState.INITED;
+  }
+  
+  /**
+   * Placeholder for a check that should fail vertex initialization when the
+   * number of tasks exceeds a configured limit. Currently a no-op.
+   */
+  private void checkTaskLimits() {
+    // no code, for now
+  }
+  
+  /**
+   * Creates (but does not start) vertex.numTasks TaskImpl instances and
+   * registers them with the vertex. Per-task location hints are used only when
+   * the vertex location hint supplies exactly one hint per task; otherwise
+   * every task gets a null hint.
+   */
+  private void createTasks(VertexImpl vertex) {
+    Configuration conf = vertex.conf;
+    boolean useNullLocationHint = true;
+    if (vertex.vertexLocationHint != null
+        && vertex.vertexLocationHint.getTaskLocationHints() != null
+        && vertex.vertexLocationHint.getTaskLocationHints().size() ==
+            vertex.numTasks) {
+      useNullLocationHint = false;
+    }
+    for (int i=0; i < vertex.numTasks; ++i) {
+      TaskLocationHint locHint = null;
+      if (!useNullLocationHint) {
+        locHint = vertex.vertexLocationHint.getTaskLocationHints().get(i);
+      }
+      // targetVertices.isEmpty() tells the task whether this is a leaf vertex.
+      TaskImpl task =
+          new TaskImpl(vertex.getVertexId(), i,
+              vertex.eventHandler,
+              conf,
+              vertex.taskAttemptListener,
+              vertex.clock,
+              vertex.taskHeartbeatHandler,
+              vertex.appContext,
+              vertex.targetVertices.isEmpty(),
+              locHint, vertex.taskResource,
+              vertex.containerContext);
+      vertex.addTask(task);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Created task for vertex " + vertex.getVertexId() + ": " +
+            task.getTaskId());
+      }
+    }
 
-        /*
-        TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
-        job.numMapTasks = taskSplitMetaInfo.length;
-        */
+  }
+  
+  /**
+   * Handles V_INIT. Records the init request time and selects the
+   * VertexManager:
+   * - ShuffleVertexManager for scatter-gather sources,
+   * - RootInputVertexManager when root inputs have initializers,
+   * - ImmediateStartVertexScheduler otherwise.
+   * It then validates the configured task count and creates tasks when that
+   * count is known. Finally it either starts the root input initializers
+   * (INITIALIZING) or completes initialization right away (INITED / FAILED).
+   */
+  public static class InitTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
-        if (vertex.numTasks == 0) {
-          vertex.addDiagnostic("No tasks for vertex " + vertex.getVertexId());
-          vertex.trySetTerminationCause(VertexTerminationCause.ZERO_TASKS);
-          vertex.abortVertex(VertexStatus.State.FAILED);
-          return vertex.finished(VertexState.FAILED);
-        }
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
 
-        checkTaskLimits();
+      vertex.initTimeRequested = vertex.clock.getTime();
 
-        // create the Tasks but don't start them yet
-        createTasks(vertex);
+      // VertexManager needs to be setup before attempting to Initialize any
+      // Inputs - since events generated by them will be routed to the
+      // VertexManager for handling.
 
-        // TODO get this from API
-        boolean hasBipartite = false;
-        if (vertex.sourceVertices != null) {
-          for (Edge edge : vertex.sourceVertices.values()) {
-            if (edge.getEdgeProperty().getDataMovementType() == 
-                      DataMovementType.SCATTER_GATHER) {
-              hasBipartite = true;
-              break;
+      // Check if any inputs need initializers
+      if (vertex.additionalInputs != null) {
+        LOG.info("Root Inputs exist for Vertex: " + vertex.getName() + " : "
+            + vertex.additionalInputs);
+        for (RootInputLeafOutputDescriptor<InputDescriptor> input :
+            vertex.additionalInputs.values()) {
+          if (input.getInitializerClassName() != null) {
+            if (vertex.inputsWithInitializers == null) {
+              vertex.inputsWithInitializers = Sets.newHashSet();
             }
+            vertex.inputsWithInitializers.add(input.getEntityName());
+            // Only collected here; the initializers are actually started
+            // further down via runInputInitializers().
+            LOG.info("Starting root input initializer for input: "
+                + input.getEntityName() + ", with class: ["
+                + input.getInitializerClassName() + "]");
           }
         }
+      }
 
-        if (hasBipartite) {
-          // setup vertex scheduler
-          // TODO this needs to consider data size and perhaps API.
-          // Currently implicitly BIPARTITE is the only edge type
-          vertex.vertexScheduler = new ShuffleVertexManager(vertex);
-        } else {
-          // schedule all tasks upon vertex start
-          vertex.vertexScheduler = new ImmediateStartVertexScheduler(vertex);
+      // TODO get this from API
+      boolean hasBipartite = false;
+      if (vertex.sourceVertices != null) {
+        for (Edge edge : vertex.sourceVertices.values()) {
+          if (edge.getEdgeProperty().getDataMovementType()
+              == DataMovementType.SCATTER_GATHER) {
+            hasBipartite = true;
+            break;
+          }
         }
+      }
 
-        vertex.vertexScheduler.initialize(vertex.conf);
+      if (hasBipartite && vertex.inputsWithInitializers != null) {
+        // Fail with the same bookkeeping as the other init-failure paths
+        // (diagnostic, termination cause, abort) so the reason is visible.
+        String msg = "A vertex with an Initial Input and a Shuffle Input"
+            + " are not supported at the moment";
+        LOG.fatal(msg);
+        vertex.addDiagnostic(msg);
+        vertex.trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
+        vertex.abortVertex(VertexStatus.State.FAILED);
+        return vertex.finished(VertexState.FAILED);
+      }
 
-        // FIXME how do we decide vertex needs a committer?
-        // Answer: Do commit for every vertex
-        // for now, only for leaf vertices
-        // TODO TEZ-41 make commmitter type configurable per vertex
-        
-        if (vertex.targetVertices.isEmpty()) {
-          vertex.committer = new MRVertexOutputCommitter();
-        }
-        vertex.committer.init(vertex.vertexContext);
-        vertex.committer.setupVertex();
-
-        // TODO: Metrics
-        //vertex.metrics.endPreparingJob(job);
-        vertex.initTime = vertex.clock.getTime();
-        return VertexState.INITED;
-
-      } catch (IOException e) {
-        LOG.warn("Vertex init failed", e);
-        vertex.addDiagnostic("Job init failed : "
-            + StringUtils.stringifyException(e));
-        vertex.trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
+      if (hasBipartite) {
+        // setup vertex scheduler
+        // TODO this needs to consider data size and perhaps API.
+        // Currently implicitly BIPARTITE is the only edge type
+        LOG.info("Setting vertexManager to ShuffleVertexManager");
+        vertex.vertexScheduler = new ShuffleVertexManager(vertex);
+      } else if (vertex.inputsWithInitializers != null) {
+        LOG.info("Setting vertexManager to RootInputVertexManager");
+        vertex.vertexScheduler = new RootInputVertexManager(vertex,
+            vertex.eventHandler);
+      } else {
+        // schedule all tasks upon vertex start
+        LOG.info("Setting vertexManager to ImmediateStartVertexManager");
+        vertex.vertexScheduler = new ImmediateStartVertexScheduler(vertex);
+      }
+      vertex.vertexScheduler.initialize(vertex.conf);
+
+      // Setup tasks early if possible. If the VertexManager is not being used
+      // to set parallelism, sending events to Tasks is safe (and less confusing
+      // than relying on tasks to be created after TaskEvents are generated).
+      // For VertexManagers setting parallelism, the setParallelism call needs
+      // to be inline.
+      vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
+      // -1 means "to be decided later"; any other non-positive value is invalid.
+      if (!(vertex.numTasks == -1 || vertex.numTasks > 0)) {
+        vertex.addDiagnostic("No tasks for vertex " + vertex.getVertexId());
+        vertex.trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
         vertex.abortVertex(VertexStatus.State.FAILED);
-        // TODO: Metrics
-        //job.metrics.endPreparingJob(vertex);
         return vertex.finished(VertexState.FAILED);
       }
-    }
 
-
-    private void createTasks(VertexImpl vertex) {
-      Configuration conf = vertex.conf;
-      boolean useNullLocationHint = true;
-      if (vertex.vertexLocationHint != null
-          && vertex.vertexLocationHint.getTaskLocationHints() != null
-          && vertex.vertexLocationHint.getTaskLocationHints().size() ==
-              vertex.numTasks) {
-        useNullLocationHint = false;
+      vertex.checkTaskLimits();
+      
+      // Create tasks based on initial configuration, but don't start them yet.
+      if (vertex.numTasks == -1) {
+        LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers"
+            + " to set #tasks for the vertex");
+      } else {
+        vertex.createTasks(vertex);
       }
-      for (int i=0; i < vertex.numTasks; ++i) {
-        TaskLocationHint locHint = null;
-        if (!useNullLocationHint) {
-          locHint = vertex.vertexLocationHint.getTaskLocationHints().get(i);
-        }
-        TaskImpl task =
-            new TaskImpl(vertex.getVertexId(), i,
-                vertex.eventHandler,
-                conf,
-                vertex.taskAttemptListener,
-                vertex.clock,
-                vertex.taskHeartbeatHandler,
-                vertex.appContext,
-                vertex.targetVertices.isEmpty(),
-                locHint, vertex.taskResource,
-                vertex.containerContext);
-        vertex.addTask(task);
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Created task for vertex " + vertex.getVertexId() + ": " +
-              task.getTaskId());
+
+      if (vertex.inputsWithInitializers != null) {
+        vertex.rootInputInitializer = new RootInputInitializerRunner(vertex
+            .getDAG().getName(), vertex.getName(), vertex.getVertexId(),
+            vertex.eventHandler);
+        List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
+            .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
+        for (String inputName : vertex.inputsWithInitializers) {
+          inputList.add(vertex.additionalInputs.get(inputName));
         }
+        LOG.info("Starting root input initializers: "
+            + vertex.inputsWithInitializers.size());
+        vertex.rootInputInitializer.runInputInitializers(inputList);
+        // Initialization completes in RootInputInitializedTransition.
+        return VertexState.INITIALIZING;
       }
 
+      return vertex.initializeVertex();
     }
+  } // end of InitTransition
+  
+  /**
+   * Handles V_ROOT_INPUT_INITIALIZED while INITIALIZING. Forwards the
+   * initializer's events to the VertexManager. Once every input with an
+   * initializer has reported, it shuts down the initializer runner, completes
+   * initialization, replays buffered route events and, if V_START already
+   * arrived, starts the vertex.
+   *
+   * @return INITIALIZING while inputs are outstanding; otherwise FAILED,
+   *         INITED or RUNNING
+   */
+  public static class RootInputInitializedTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
-    /**
-     * If the number of tasks are greater than the configured value
-     * throw an exception that will fail job initialization
-     */
-    private void checkTaskLimits() {
-      // no code, for now
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventRootInputInitialized liInitEvent = (VertexEventRootInputInitialized) event;
+
+      vertex.vertexScheduler.onRootVertexInitialized(
+          liInitEvent.getInputName(),
+          vertex.getAdditionalInputs().get(liInitEvent.getInputName())
+              .getDescriptor(), liInitEvent.getEvents());
+
+      // NOTE(review): this is a plain counter, so a duplicate event for the
+      // same input would count twice - confirm initializers report once.
+      vertex.numInitializedInputs++;
+      if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
+        // All inputs initialized, shutdown the initializer.
+        vertex.rootInputInitializer.shutdown();
+
+        // If RootInputs are determining parallelism, it should have been set by
+        // this point (the VertexManager was notified above). Only committer
+        // setup remains. NOTE(review): numTasks is not re-validated here.
+        VertexState vertexState = vertex.initializeVertex();
+        if (vertexState == VertexState.FAILED) {
+          // Don't bother starting if the vertex state is failed.
+          return vertexState;
+        }
+
+        // Vertex will be moving to INITED state, safe to process pending route events.
+        if (vertex.pendingRouteEvents != null) {
+          VertexImpl.ROUTE_EVENT_TRANSITION.transition(vertex,
+              new VertexEventRouteEvent(vertex.getVertexId(),
+                  vertex.pendingRouteEvents));
+          vertex.pendingRouteEvents = null;
+        }
+
+        // V_START was received while INITIALIZING (StartWhileInitingTransition).
+        if (vertex.startSignalPending) {
+          vertex.startVertex(); // Could be modelled as a separate state
+          return VertexState.RUNNING;
+        }
+
+        return vertexState;
+      } else {
+        return VertexState.INITIALIZING;
+      }
     }
-  } // end of InitTransition
+  }
+
 
   // Temporary to maintain topological order while starting vertices. Not useful
   // since there's not much difference between the INIT and RUNNING states.
@@ -1143,6 +1291,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  /**
+   * Handles V_START while root input initializers are still running. It
+   * records when the start was requested and defers the actual start until
+   * RootInputInitializedTransition sees that every input is initialized.
+   */
+  public static class StartWhileInitingTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      vertex.startTimeRequested = vertex.clock.getTime();
+      // Consumed once initialization completes, which then calls startVertex().
+      vertex.startSignalPending = true;
+    }
+  }
+  
   public static class StartTransition
   implements SingleArcTransition<VertexImpl, VertexEvent> {
     /**
@@ -1151,23 +1309,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
      */
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
-      vertex.startTime = vertex.clock.getTime();
-      vertex.vertexScheduler.onVertexStarted();
-      vertex.logJobHistoryVertexStartedEvent();
-
-      // TODO: Metrics
-      //job.metrics.runningJob(job);
-
-      // default behavior is to start immediately. so send information about us
-      // starting to downstream vertices. If the connections/structure of this
-      // vertex is not fully defined yet then we could send this event later
-      // when we are ready
-      for (Vertex targetVertex : vertex.targetVertices.keySet()) {
-        vertex.eventHandler.handle(
-            new VertexEventSourceVertexStarted(targetVertex.getVertexId(),
-                                               vertex.distanceFromRoot));
-      }
+      vertex.startTimeRequested  = vertex.clock.getTime();
+      vertex.startVertex();
+    }
+  }
 
+  /**
+   * Starts the vertex. It records the start time, notifies the VertexManager,
+   * logs the VertexStartedEvent and sends VertexEventSourceVertexStarted to
+   * every target vertex.
+   */
+  private void startVertex() {
+    startedTime = clock.getTime();
+    vertexScheduler.onVertexStarted();
+    logJobHistoryVertexStartedEvent();
+    
+    // TODO: Metrics
+    //job.metrics.runningJob(job);
+    
+    // default behavior is to start immediately. so send information about us
+    // starting to downstream vertices. If the connections/structure of this
+    // vertex is not fully defined yet then we could send this event later
+    // when we are ready
+    for (Vertex targetVertex : targetVertices.keySet()) {
+      eventHandler.handle(new VertexEventSourceVertexStarted(targetVertex
+          .getVertexId(), distanceFromRoot));
     }
   }
 
@@ -1203,6 +1364,30 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  /**
+   * Handles V_ROOT_INPUT_FAILED while INITIALIZING. It records diagnostics
+   * (including the initializer's error message, if any), shuts down the root
+   * input initializers and finishes the vertex as FAILED with
+   * ROOT_INPUT_INIT_FAILURE.
+   */
+  private static class RootInputInitFailedTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventRootInputFailed fe = (VertexEventRootInputFailed) event;
+      // Use the same cause that is passed to finished() below, so the cause
+      // recorded on the vertex matches the one reported to the DAG.
+      vertex.trySetTerminationCause(
+          VertexTerminationCause.ROOT_INPUT_INIT_FAILURE);
+      vertex.addDiagnostic("Vertex Input: " + fe.getInputName()
+          + " initializer failed.");
+      if (fe.getError() != null) {
+        LOG.error("Vertex Input: " + fe.getInputName() + " initializer failed",
+            fe.getError());
+        if (fe.getError().getMessage() != null) {
+          vertex.addDiagnostic(fe.getError().getMessage());
+        }
+      }
+      if (vertex.rootInputInitializer != null) {
+        vertex.rootInputInitializer.shutdown();
+      }
+      vertex.finished(VertexState.FAILED,
+          VertexTerminationCause.ROOT_INPUT_INIT_FAILURE);
+    }
+  }
+
   // Task-start has been moved out of InitTransition, so this arc simply
   // hardcodes 0 for both map and reduce finished tasks.
   private static class TerminateNewVertexTransition
@@ -1229,6 +1414,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  /**
+   * Handles V_TERMINATE while INITIALIZING. It applies the INITED-state
+   * termination handling, then shuts down any running root input
+   * initializers.
+   */
+  private static class TerminateInitingVertexTransition extends
+      TerminateInitedVertexTransition {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      try {
+        super.transition(vertex, event);
+      } finally {
+        // Stop the initializers even if the termination handling throws.
+        if (vertex.rootInputInitializer != null) {
+          vertex.rootInputInitializer.shutdown();
+        }
+      }
+    }
+  }
+
   private static class VertexKilledTransition
       implements SingleArcTransition<VertexImpl, VertexEvent> {
     @Override
@@ -1391,6 +1586,21 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  /**
+   * Buffers route events that arrive while INITIALIZING. Initial task
+   * parallelism is only fixed once the vertex is inited, so the events are
+   * replayed by RootInputInitializedTransition after initialization.
+   */
+  private static class RouteEventsWhileInitializingTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventRouteEvent routeEvent = (VertexEventRouteEvent) event;
+      List<TezEvent> buffer = vertex.pendingRouteEvents;
+      if (buffer == null) {
+        // Lazily allocated; reset to null once the events have been replayed.
+        buffer = Lists.newLinkedList();
+        vertex.pendingRouteEvents = buffer;
+      }
+      buffer.addAll(routeEvent.getEvents());
+    }
+  }
+
   private static class RouteEventTransition  implements
   SingleArcTransition<VertexImpl, VertexEvent> {
     @Override
@@ -1506,11 +1716,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
   
   @Override
-  public void setAdditionalInputs(List<NamedDescriptorProto> inputs) {
-    for (NamedDescriptorProto input : inputs) {
+  public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) {
+    Preconditions.checkArgument(inputs.size() < 2,
+        "For now, only a single root input can be specified on a Vertex");
+    this.additionalInputs = Maps.newHashMapWithExpectedSize(inputs.size());
+    for (RootInputLeafOutputProto input : inputs) {
+      
       InputDescriptor id = DagTypeConverters
           .convertInputDescriptorFromDAGPlan(input.getEntityDescriptor());
-      
+
+      this.additionalInputs.put(input.getName(),
+          new RootInputLeafOutputDescriptor<InputDescriptor>(input.getName(), id,
+              input.hasInitializerClassName() ? input.getInitializerClassName()
+                  : null));
       InputSpec inputSpec = new InputSpec(input.getName(), id, 0);
       additionalInputSpecs.add(inputSpec);
     }
@@ -1518,17 +1736,34 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
+  /**
+   * Registers the leaf outputs declared on this vertex. It replaces the
+   * name-keyed descriptor map and appends one OutputSpec per output to
+   * additionalOutputSpecs.
+   * NOTE(review): the map is replaced but the spec list is appended to, so a
+   * second call would accumulate specs - confirm this is called only once.
+   */
   @Override
-  public void setAdditionalOutputs(List<NamedDescriptorProto> outputs) {
-    for (NamedDescriptorProto output : outputs) {
+  public void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs) {
+    this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size());
+    for (RootInputLeafOutputProto output : outputs) {
+      
       OutputDescriptor od = DagTypeConverters
           .convertOutputDescriptorFromDAGPlan(output.getEntityDescriptor());
-      
+
+      // The initializer class name is optional in the proto; store null if absent.
+      this.additionalOutputs.put(
+          output.getName(),
+          new RootInputLeafOutputDescriptor<OutputDescriptor>(output.getName(), od,
+              output.hasInitializerClassName() ? output
+                  .getInitializerClassName() : null));
       OutputSpec outputSpec = new OutputSpec(output.getName(), od, 0);
       additionalOutputSpecs.add(outputSpec);
     }
   }
 
   @Override
+  public Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> getAdditionalInputs()
{
+    return this.additionalInputs;
+  }
+
+  @Override
+  public Map<String, RootInputLeafOutputDescriptor<OutputDescriptor>> getAdditionalOutputs()
{
+    return this.additionalOutputs;
+  }
+
+  @Override
   public int compareTo(Vertex other) {
     return this.vertexId.compareTo(other.getVertexId());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index a4fa20e..6487b5c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -28,12 +28,14 @@ public class VertexStartedEvent implements HistoryEvent {
   private VertexStarted datum = new VertexStarted();
 
   public VertexStartedEvent(TezVertexID vertexId,
-      String vertexName, long initTime, long startTime,
+      String vertexName, long initRequestedTime, long initedTime, long startRequestedTime,
long startTime,
       long numTasks, String processorName) {
     datum.vertexName = vertexName;
     datum.vertexId = vertexId.toString();
-    datum.initTime = initTime;
-    datum.startTime = startTime;
+    datum.initRequestedTime = initRequestedTime;
+    datum.initedTime = initedTime;
+    datum.startRequestedTime = startRequestedTime;
+    datum.startedTime = startTime;
     datum.numTasks = numTasks;
     datum.processorName = processorName;
   }
@@ -58,8 +60,10 @@ public class VertexStartedEvent implements HistoryEvent {
   public String toString() {
     return "vertexName=" + datum.vertexName
         + ", vertexId=" + datum.vertexId
-        + ", initTime=" + datum.initTime
-        + ", startTime=" + datum.startTime
+        + ", initRequestedTime=" + datum.initRequestedTime
+        + ", initedTime=" + datum.initedTime
+        + ", startRequestedTime=" + datum.startRequestedTime
+        + ", startedTime=" + datum.startedTime
         + ", numTasks=" + datum.numTasks
         + ", processorName=" + datum.processorName;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
index b42096c..6281b10 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
@@ -25,5 +25,6 @@ public enum EventType {
   INPUT_READ_ERROR_EVENT,
   INPUT_FAILED_EVENT,
   TASK_STATUS_UPDATE_EVENT,
-  VERTEX_MANAGER_EVENT
+  VERTEX_MANAGER_EVENT,
+  ROOT_INPUT_DATA_INFORMATION_EVENT
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index 03e41f4..1b0f45c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -29,9 +29,11 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.EventProtos.DataMovementEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto;
+import org.apache.tez.runtime.api.events.EventProtos.RootInputDataInformationEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -71,6 +73,8 @@ public class TezEvent implements Writable {
       eventType = EventType.INPUT_FAILED_EVENT;
     } else if (event instanceof TaskStatusUpdateEvent) {
       eventType = EventType.TASK_STATUS_UPDATE_EVENT;
+    } else if (event instanceof RootInputDataInformationEvent) {
+      eventType = EventType.ROOT_INPUT_DATA_INFORMATION_EVENT;
     } else {
       throw new TezUncheckedException("Unknown event, event="
           + event.getClass().getName());
@@ -154,6 +158,13 @@ public class TezEvent implements Writable {
             .setTargetIndex(ifEvt.getTargetIndex())
             .setVersion(ifEvt.getVersion()).build().toByteArray();
         break;
+      case ROOT_INPUT_DATA_INFORMATION_EVENT:
+        RootInputDataInformationEvent liEvent = (RootInputDataInformationEvent) event;
+        eventBytes = RootInputDataInformationEventProto.newBuilder()
+            .setIndex(liEvent.getIndex())
+            .setUserPayload(ByteString.copyFrom(liEvent.getUserPayload()))
+            .build().toByteArray();
+        break;
       default:
         throw new TezUncheckedException("Unknown TezEvent"
            + ", type=" + eventType);
@@ -211,8 +222,15 @@ public class TezEvent implements Writable {
         event = new InputFailedEvent(ifProto.getSourceIndex(),
             ifProto.getTargetIndex(), ifProto.getVersion());
         break;
+      case ROOT_INPUT_DATA_INFORMATION_EVENT:
+        RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto
+            .parseFrom(eventBytes);
+        event = new RootInputDataInformationEvent(difProto.getIndex(), difProto
+            .getUserPayload().toByteArray());
+        break;
       default:
-        throw new TezUncheckedException("Unknown TezEvent"
+        // RootInputUpdatePayload event not wrapped in a TezEvent.
+        throw new TezUncheckedException("Unexpected TezEvent"
            + ", type=" + eventType);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 82c7f37..3cc0eb8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -530,7 +530,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
 
     @Override
     public void onFailure(Throwable t) {
-      LOG.error("Fetcher failed with error: " + t);
+      LOG.error("Fetcher failed with error: ", t);
       shuffleError = t;
       inputContext.fatalError(t, "Fetch failed");
       doBookKeepingForFetcherComplete();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 8acc258..aef1142 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -125,7 +125,7 @@ public class OnFileSortedOutput implements LogicalOutput {
 
     List<Event> events = Lists.newArrayListWithCapacity(numOutputs+1);
     events.add(vmEvent);
-    
+
     for (int i = 0; i < numOutputs; i++) {
       DataMovementEvent event = new DataMovementEvent(i, payloadBytes);
       events.add(event);


Mime
View raw message