tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/3] git commit: TEZ-46. Add compute capability to Inputs specified directly on a Vertex. (sseth)
Date Wed, 09 Oct 2013 17:32:55 GMT
TEZ-46. Add compute capability to Inputs specified directly on a Vertex.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/74d33b6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/74d33b6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/74d33b6a

Branch: refs/heads/master
Commit: 74d33b6aa5ff80d715deab02353397e35b0eca4c
Parents: 1d78f23
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Oct 9 10:31:31 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Oct 9 10:31:31 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/api/DAG.java   |   8 +-
 .../apache/tez/dag/api/DagTypeConverters.java   |  12 +-
 .../org/apache/tez/dag/api/NamedDescriptor.java |  38 --
 .../apache/tez/dag/api/RootInputLeafOutput.java |  47 ++
 .../java/org/apache/tez/dag/api/Vertex.java     |  40 +-
 .../java/org/apache/tez/runtime/api/Input.java  |   2 +-
 .../runtime/api/TezRootInputInitializer.java    |  34 ++
 .../api/TezRootInputInitializerContext.java     |  33 ++
 .../events/RootInputDataInformationEvent.java   |  52 ++
 .../api/events/RootInputUpdatePayloadEvent.java |  42 ++
 tez-api/src/main/proto/DAGApiRecords.proto      |   7 +-
 tez-api/src/main/proto/Events.proto             |   7 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   |  10 +-
 .../java/org/apache/tez/common/TezUtils.java    |  22 +
 tez-dag/src/main/avro/HistoryEvents.avpr        |   6 +-
 .../dag/app/dag/RootInputInitializerRunner.java | 146 ++++++
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  15 +-
 .../apache/tez/dag/app/dag/VertexScheduler.java |   6 +
 .../org/apache/tez/dag/app/dag/VertexState.java |   1 +
 .../tez/dag/app/dag/VertexTerminationCause.java |   7 +-
 .../app/dag/event/DAGEventVertexCompleted.java  |  47 +-
 .../dag/event/VertexEventRootInputFailed.java   |  41 ++
 .../event/VertexEventRootInputInitialized.java  |  50 ++
 .../tez/dag/app/dag/event/VertexEventType.java  |   5 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   5 +-
 .../dag/impl/ImmediateStartVertexScheduler.java |   9 +
 .../dag/impl/RootInputLeafOutputDescriptor.java |  46 ++
 .../app/dag/impl/RootInputVertexManager.java    | 116 +++++
 .../dag/app/dag/impl/ShuffleVertexManager.java  |   8 +
 .../TezRootInputInitializerContextImpl.java     |  65 +++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 513 ++++++++++++++-----
 .../dag/history/events/VertexStartedEvent.java  |  14 +-
 .../apache/tez/runtime/api/impl/EventType.java  |   3 +-
 .../apache/tez/runtime/api/impl/TezEvent.java   |  20 +-
 .../input/BroadcastShuffleManager.java          |   2 +-
 .../library/output/OnFileSortedOutput.java      |   2 +-
 36 files changed, 1241 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 5a51891..98ea91b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -177,7 +177,7 @@ public class DAG { // FIXME rename to Topology
     // Verify Input/Output names don't collide amongs themselves as well as with vertexNames
     Set<String> namedIOs = new HashSet<String>();
     for (Vertex v : vertices.values()) {
-      for (NamedDescriptor<InputDescriptor> in : v.getInputs()) {
+      for (RootInputLeafOutput<InputDescriptor> in : v.getInputs()) {
         if (vertexMap.containsKey(in.getName())) {
           throw new IllegalStateException(
               "DAG contains a vertex and an Input to a vertex with the same name: "
@@ -190,7 +190,7 @@ public class DAG { // FIXME rename to Topology
           namedIOs.add(in.getName());
         }
       }
-      for (NamedDescriptor<OutputDescriptor> out : v.getOutputs()) {
+      for (RootInputLeafOutput<OutputDescriptor> out : v.getOutputs()) {
         if (vertexMap.containsKey(out.getName())) {
           throw new IllegalStateException(
               "DAG contains a vertex and an Output from a vertex with the same name: "
@@ -308,12 +308,12 @@ public class DAG { // FIXME rename to Topology
       vertexBuilder.setProcessorDescriptor(DagTypeConverters
           .convertToDAGPlan(vertex.getProcessorDescriptor()));
       if (vertex.getInputs().size() > 0) {
-        for (NamedDescriptor<InputDescriptor> input : vertex.getInputs()) {
+        for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
           vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input));
         }
       }
       if (vertex.getOutputs().size() > 0) {
-        for (NamedDescriptor<OutputDescriptor> output : vertex.getOutputs()) {
+        for (RootInputLeafOutput<OutputDescriptor> output : vertex.getOutputs()) {
           vertexBuilder.addOutputs(DagTypeConverters.convertToDAGPlan(output));
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 9c72c3d..3247935 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -39,7 +39,6 @@ import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezSessionStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
-import org.apache.tez.dag.api.records.DAGProtos.NamedDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
@@ -49,6 +48,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 
 import com.google.protobuf.ByteString;
@@ -248,11 +248,15 @@ public class DagTypeConverters {
     return builder.build();
   }
 
-  public static NamedDescriptorProto convertToDAGPlan(
-      NamedDescriptor<? extends TezEntityDescriptor> descriptor) {
-    NamedDescriptorProto.Builder builder = NamedDescriptorProto.newBuilder();
+  public static RootInputLeafOutputProto convertToDAGPlan(
+      RootInputLeafOutput<? extends TezEntityDescriptor> descriptor) {
+    RootInputLeafOutputProto.Builder builder = RootInputLeafOutputProto.newBuilder();
     builder.setName(descriptor.getName());
     builder.setEntityDescriptor(convertToDAGPlan(descriptor.getDescriptor()));
+    if (descriptor.getInitializerClass() != null) {
+      builder.setInitializerClassName(descriptor.getInitializerClass()
+          .getName());
+    }
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/java/org/apache/tez/dag/api/NamedDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/NamedDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/NamedDescriptor.java
deleted file mode 100644
index 761fa74..0000000
--- a/tez-api/src/main/java/org/apache/tez/dag/api/NamedDescriptor.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.api;
-
-class  NamedDescriptor <T extends TezEntityDescriptor> {
-
-  private final String name;
-  private final T descriptor;
-  
-  NamedDescriptor(String name, T descriptor) {
-    this.name = name;
-    this.descriptor = descriptor;
-  }
-  
-  public String getName() {
-    return this.name;
-  }
-
-  public T getDescriptor() {
-    return this.descriptor;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/java/org/apache/tez/dag/api/RootInputLeafOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/RootInputLeafOutput.java b/tez-api/src/main/java/org/apache/tez/dag/api/RootInputLeafOutput.java
new file mode 100644
index 0000000..1334c41
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/RootInputLeafOutput.java
@@ -0,0 +1,47 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+@Private
+class  RootInputLeafOutput <T extends TezEntityDescriptor> {
+
+  private final String name;
+  private final T descriptor;
+  private final Class<?> initializerClazz;
+  
+  RootInputLeafOutput(String name, T descriptor, Class<?> initializerClazz) {
+    this.name = name;
+    this.descriptor = descriptor;
+    this.initializerClazz = initializerClazz;
+  }
+  
+  public String getName() {
+    return this.name;
+  }
+
+  public T getDescriptor() {
+    return this.descriptor;
+  }
+  
+  public Class<?> getInitializerClass() {
+    return this.initializerClazz;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 9506af1..673430d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 
 public class Vertex { // FIXME rename to Task
 
@@ -38,8 +40,8 @@ public class Vertex { // FIXME rename to Task
   private final Resource taskResource;
   private Map<String, LocalResource> taskLocalResources;
   private Map<String, String> taskEnvironment;
-  private final List<NamedDescriptor<InputDescriptor>> additionalInputs = new ArrayList<NamedDescriptor<InputDescriptor>>();
-  private final List<NamedDescriptor<OutputDescriptor>> additionalOutputs = new ArrayList<NamedDescriptor<OutputDescriptor>>();
+  private final List<RootInputLeafOutput<InputDescriptor>> additionalInputs = new ArrayList<RootInputLeafOutput<InputDescriptor>>();
+  private final List<RootInputLeafOutput<OutputDescriptor>> additionalOutputs = new ArrayList<RootInputLeafOutput<OutputDescriptor>>();
 
   private final List<Vertex> inputVertices = new ArrayList<Vertex>();
   private final List<Vertex> outputVertices = new ArrayList<Vertex>();
@@ -56,8 +58,9 @@ public class Vertex { // FIXME rename to Task
     this.processorDescriptor = processorDescriptor;
     this.parallelism = parallelism;
     this.taskResource = taskResource;
-    if (parallelism == 0) {
-      throw new IllegalArgumentException("Parallelism cannot be 0");
+    if (!(parallelism == -1 || parallelism > 0)) {
+      throw new IllegalArgumentException(
+          "Parallelism should be -1 if determined by the AM, otherwise should be > 0");
     }
     if (taskResource == null) {
       throw new IllegalArgumentException("Resource cannot be null");
@@ -133,12 +136,25 @@ public class Vertex { // FIXME rename to Task
    *          in the {@link LogicalIOProcessor}
    * @param inputDescriptor
    *          the inputDescriptor for this input
+   * @param inputInitializer
+   *          An initializer for this Input which may run within the AM. This
+   *          can be used to set the parallelism for this vertex and generate
+   *          {@link RootInputDataInformationEvent}s for the actual Input.</p>
+   *          If this is not specified, the parallelism must be set for the
+   *          vertex. In addition, the Input should know how to access data for
+   *          each of its tasks. </p> If a {@link TezRootInputInitializer} is
+   *          meant to determine the parallelism of the vertex, the initial
+   *          vertex parallelism should be set to -1.
    * @return
    */
-  // TODO : Add a processing component.
-  public Vertex addInput(String inputName, InputDescriptor inputDescriptor) {
-    additionalInputs.add(new NamedDescriptor<InputDescriptor>(inputName,
-        inputDescriptor));
+  public Vertex addInput(String inputName, InputDescriptor inputDescriptor,
+      Class<? extends TezRootInputInitializer> inputInitializer) {
+    if (additionalInputs.size() == 1) {
+      throw new IllegalStateException(
+          "For now, only a single Root Input can be added to a Vertex");
+    }
+    additionalInputs.add(new RootInputLeafOutput<InputDescriptor>(inputName,
+        inputDescriptor, inputInitializer));
     return this;
   }
 
@@ -161,8 +177,8 @@ public class Vertex { // FIXME rename to Task
    */
   // TODO : Add a processing component.
   public Vertex addOutput(String outputName, OutputDescriptor outputDescriptor) {
-    additionalOutputs.add(new NamedDescriptor<OutputDescriptor>(outputName,
-        outputDescriptor));
+    additionalOutputs.add(new RootInputLeafOutput<OutputDescriptor>(outputName,
+        outputDescriptor, null));
     return this;
   }
 
@@ -201,11 +217,11 @@ public class Vertex { // FIXME rename to Task
     return outputEdgeIds;
   }
   
-  List<NamedDescriptor<InputDescriptor>> getInputs() {
+  List<RootInputLeafOutput<InputDescriptor>> getInputs() {
     return additionalInputs;
   }
 
-  List<NamedDescriptor<OutputDescriptor>> getOutputs() {
+  List<RootInputLeafOutput<OutputDescriptor>> getOutputs() {
     return additionalOutputs;
   }
   // FIXME how do we support profiling? Can't profile all tasks.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
index a15c106..7cb5482 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
@@ -58,7 +58,7 @@ public interface Input {
    * @param inputEvents
    *          the list of {@link Event}s
    */
-  public void handleEvents(List<Event> inputEvents);
+  public void handleEvents(List<Event> inputEvents) throws Exception;
 
   /**
    * Closes the <code>Input</code>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java
new file mode 100644
index 0000000..b9df646
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import java.util.List;
+
+/**
+ * <code>TezRootInputInitializer</code>s are used to initialize root vertices
+ * within the AM. They can be used to distribute data across the tasks for the
+ * vertex, determine the number of tasks at runtime, update the Input payload
+ * etc.
+ */
+public interface TezRootInputInitializer {
+  
+  List<Event> initialize(TezRootInputInitializerContext inputVertexContext)
+      throws Exception;
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
new file mode 100644
index 0000000..37cb800
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public interface TezRootInputInitializerContext {
+
+  ApplicationId getApplicationId();
+  
+  String getDAGName();
+  
+  String getInputName();
+
+  public byte[] getUserPayload();
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
new file mode 100644
index 0000000..a4938cc
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+
+/**
+ * Events used by {@link TezRootInputInitializer} implementations to provide the
+ * byte payload for individual tasks running as part of the Vertex for which an
+ * Initial Input has been configured.
+ */
+public class RootInputDataInformationEvent extends Event {
+
+  private final int index;
+  private final byte[] userPayload;
+  
+  public RootInputDataInformationEvent(int index, byte[] userPayload) {
+    this.index = index;
+    this.userPayload = userPayload;
+  }
+
+  public int getIndex() {
+    return this.index;
+  }
+  
+  public byte[] getUserPayload() {
+    return this.userPayload;
+  }
+
+  @Override
+  public String toString() {
+    return "RootInputDataInformationEvent [index=" + index + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputUpdatePayloadEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputUpdatePayloadEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputUpdatePayloadEvent.java
new file mode 100644
index 0000000..607678f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputUpdatePayloadEvent.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+
+/**
+ * Events used by {@link TezRootInputInitializer} implementations to update the
+ * shared user payload for the Input that is being initialized. </p>
+ * 
+ * This event is specific to an Input, and should only be sent once - ideally
+ * before {@link RootInputDataInformationEvent}s
+ */
+public class RootInputUpdatePayloadEvent extends Event {
+
+  private final byte[] userPayload;
+
+  public RootInputUpdatePayloadEvent(byte[] userPayload) {
+    this.userPayload = userPayload;
+  }
+
+  public byte[] getUserPayload() {
+    return this.userPayload;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 7b7d701..79e62ca 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -102,9 +102,10 @@ message TezEntityDescriptorProto {
   optional bytes user_payload = 2;
 }
 
-message NamedDescriptorProto {
+message RootInputLeafOutputProto {
   optional string name = 1;
   optional TezEntityDescriptorProto entity_descriptor= 2;
+  optional string initializer_class_name = 3;
 }
 
 message VertexPlan {
@@ -115,8 +116,8 @@ message VertexPlan {
   repeated PlanTaskLocationHint taskLocationHint = 7;
   repeated string inEdgeId = 8;
   repeated string outEdgeId = 9;
-  repeated NamedDescriptorProto inputs = 10;
-  repeated NamedDescriptorProto outputs = 11;
+  repeated RootInputLeafOutputProto inputs = 10;
+  repeated RootInputLeafOutputProto outputs = 11;
 }
 
 message EdgePlan {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
index d76708a..a738972 100644
--- a/tez-api/src/main/proto/Events.proto
+++ b/tez-api/src/main/proto/Events.proto
@@ -42,4 +42,9 @@ message InputFailedEventProto {
 message VertexManagerEventProto {
   optional string target_vertex_name = 1;
   optional bytes user_payload = 2;
-}
\ No newline at end of file
+}
+
+message RootInputDataInformationEventProto {
+  optional int32 index = 1;
+  optional bytes user_payload = 2;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 459194f..ea74927 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -284,7 +284,7 @@ public class TestDAGVerify {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
-    v1.addInput("v1", new InputDescriptor("Input"));
+    v1.addInput("v1", new InputDescriptor("Input"), null);
     Vertex v2 = new Vertex("v2",
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
@@ -314,7 +314,7 @@ public class TestDAGVerify {
     Vertex v1 = new Vertex("v1",
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
-    v1.addInput("collide", new InputDescriptor("Input"));
+    v1.addInput("collide", new InputDescriptor("Input"), null);
     Vertex v2 = new Vertex("v2",
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
@@ -425,7 +425,11 @@ public class TestDAGVerify {
           0, dummyTaskResource);
       Assert.fail("Expected exception for 0 parallelism");
     } catch (IllegalArgumentException e) {
-      Assert.assertTrue(e.getMessage().startsWith("Parallelism cannot be 0"));
+      Assert
+          .assertTrue(e
+              .getMessage()
+              .startsWith(
+                  "Parallelism should be -1 if determined by the AM, otherwise should be > 0"));
     }
     try {
       Vertex v1 = new Vertex("v1",

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index d7774be..65ac1a4 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -17,18 +17,21 @@
 
 package org.apache.tez.common;
 
+import java.io.DataOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 
 public class TezUtils {
   
@@ -56,6 +59,15 @@ public class TezUtils {
     }
   }
   
+  public static ByteString createByteStringFromConf(Configuration conf)
+      throws IOException {
+    Preconditions.checkNotNull(conf, "Configuration must be specified");
+    ByteString.Output os = ByteString.newOutput();
+    DataOutputStream dos = new DataOutputStream(os);
+    conf.write(dos);
+    return os.toByteString();
+  }
+  
   public static byte[] createUserPayloadFromConf(Configuration conf)
       throws IOException {
     Preconditions.checkNotNull(conf, "Configuration must be specified");
@@ -64,6 +76,16 @@ public class TezUtils {
     return dob.getData();
   }
 
+  public static Configuration createConfFromByteString(ByteString byteString)
+      throws IOException {
+    Preconditions.checkNotNull(byteString, "ByteString must be specified");
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(byteString.asReadOnlyByteBuffer());
+    Configuration conf = new Configuration(false);
+    conf.readFields(dibb);
+    return conf;
+  }
+  
   public static Configuration createConfFromUserPayload(byte[] bb)
       throws IOException {
     // TODO Avoid copy ?

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/avro/HistoryEvents.avpr
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/avro/HistoryEvents.avpr b/tez-dag/src/main/avro/HistoryEvents.avpr
index 762871a..ebb3267 100644
--- a/tez-dag/src/main/avro/HistoryEvents.avpr
+++ b/tez-dag/src/main/avro/HistoryEvents.avpr
@@ -83,8 +83,10 @@
       "fields": [
           {"name": "vertexName", "type": "string"},
           {"name": "vertexId", "type": "string"},
-          {"name": "initTime", "type": "long"},
-          {"name": "startTime", "type": "long"},
+          {"name": "initRequestedTime", "type": "long"},
+          {"name": "initedTime", "type": "long"},
+          {"name": "startRequestedTime", "type": "long"},
+          {"name": "startedTime", "type": "long"},
           {"name": "numTasks", "type": "long"},
           {"name": "processorName", "type": "string"}
       ]

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
new file mode 100644
index 0000000..ab78d1b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
+import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
+import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
+import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Runs the {@link TezRootInputInitializer} of each root input handed to
+ * {@link #runInputInitializers(List)} on a cached pool of daemon threads and
+ * reports every outcome to the event handler: a
+ * {@link VertexEventRootInputInitialized} carrying the returned events on
+ * success, or a {@link VertexEventRootInputFailed} carrying the error on
+ * failure.
+ */
+public class RootInputInitializerRunner {
+
+  private static final Log LOG = LogFactory.getLog(RootInputInitializerRunner.class);
+
+  private final ExecutorService rawExecutor;
+  private final ListeningExecutorService executor;
+  private final String dagName;
+  private final String vertexName;
+  private final TezVertexID vertexID;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+  private volatile boolean isStopped = false;
+
+  /**
+   * Creates the runner together with its thread pool.
+   *
+   * @param dagName name of the DAG, passed on to the initializer context
+   * @param vertexName name of the vertex, passed on to the initializer context
+   * @param vertexID id of the vertex; used in the context and in every event sent
+   * @param eventHandler receives the success / failure events
+   */
+  @SuppressWarnings("rawtypes")
+  public RootInputInitializerRunner(String dagName, String vertexName, TezVertexID vertexID, EventHandler eventHandler) {
+    this.dagName = dagName;
+    this.vertexName = vertexName;
+    this.vertexID = vertexID;
+    this.eventHandler = eventHandler;
+    this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("InputInitializer #%d").build());
+    this.executor = MoreExecutors.listeningDecorator(rawExecutor);
+  }
+
+  /**
+   * Submits one initializer task per input and registers a callback that
+   * turns the task's result or failure into a vertex event.
+   *
+   * @param inputs the root inputs whose initializers should be run
+   */
+  public void runInputInitializers(List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs) {
+    for (RootInputLeafOutputDescriptor<InputDescriptor> input : inputs) {
+      ListenableFuture<List<Event>> future = executor
+          .submit(new InputInitializerCallable(input, vertexID, dagName,
+              vertexName));
+      Futures.addCallback(future, new InputInitializerCallback(input.getEntityName()));
+    }
+  }
+
+  /**
+   * Stops the initializer threads via {@code shutdownNow()}. Calls made once
+   * the runner has been marked stopped do nothing.
+   */
+  public void shutdown() {
+    // executor is final and assigned in the constructor, so only the stopped
+    // flag needs checking.
+    if (!isStopped) {
+      // Don't really care about what is running if an error occurs. If no error
+      // occurs, all execution is complete.
+      executor.shutdownNow();
+      isStopped = true;
+    }
+  }
+
+  /**
+   * Task that instantiates the initializer class named by an input and
+   * invokes it with a context built for that input.
+   */
+  private static class InputInitializerCallable implements
+      Callable<List<Event>> {
+
+    private final RootInputLeafOutputDescriptor<InputDescriptor> input;
+    private final TezVertexID vertexID;
+    private final String dagName;
+    private final String vertexName;
+
+    public InputInitializerCallable(RootInputLeafOutputDescriptor<InputDescriptor> input,
+        TezVertexID vertexID, String dagName, String vertexName) {
+      this.input = input;
+      this.vertexID = vertexID;
+      this.dagName = dagName;
+      this.vertexName = vertexName;
+    }
+
+    /**
+     * Creates the initializer and returns whatever events it produces.
+     */
+    @Override
+    public List<Event> call() throws Exception {
+      TezRootInputInitializer initializer = createInitializer();
+      TezRootInputInitializerContext context = new TezRootInputInitializerContextImpl(
+          vertexID, dagName, vertexName, input.getEntityName(), input.getDescriptor());
+      return initializer.initialize(context);
+    }
+
+    /**
+     * Loads the class named by the input and instantiates it through its
+     * no-argument constructor.
+     *
+     * @throws ClassCastException if the named class is not a
+     *         {@link TezRootInputInitializer}
+     */
+    private TezRootInputInitializer createInitializer()
+        throws ClassNotFoundException, InstantiationException,
+        IllegalAccessException {
+      String className = input.getInitializerClassName();
+      // asSubclass verifies the type up front, so a class that is not a
+      // TezRootInputInitializer is rejected before any instance of it is
+      // constructed (an unchecked cast would only fail after construction).
+      Class<? extends TezRootInputInitializer> clazz = Class.forName(className)
+          .asSubclass(TezRootInputInitializer.class);
+      return clazz.newInstance();
+    }
+  }
+
+  /**
+   * Translates the completion of one initializer task into a vertex event
+   * for the input it was created for.
+   */
+  private class InputInitializerCallback implements FutureCallback<List<Event>> {
+
+    private final String inputName;
+
+    public InputInitializerCallback(String inputName) {
+      this.inputName = inputName;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onSuccess(List<Event> result) {
+      eventHandler.handle(new VertexEventRootInputInitialized(vertexID,
+          inputName, result));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onFailure(Throwable t) {
+      eventHandler
+          .handle(new VertexEventRootInputFailed(vertexID, inputName, t));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 05e678b..2aaabab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -24,12 +24,15 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.NamedDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.app.dag.impl.Edge;
+import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.impl.InputSpec;
@@ -72,12 +75,15 @@ public interface Vertex extends Comparable<Vertex> {
   void setInputVertices(Map<Vertex, Edge> inVertices);
   void setOutputVertices(Map<Vertex, Edge> outVertices);
 
-  void setAdditionalInputs(List<NamedDescriptorProto> inputs);
-  void setAdditionalOutputs(List<NamedDescriptorProto> outputs);
-  
   Map<Vertex, Edge> getInputVertices();
   Map<Vertex, Edge> getOutputVertices();
   
+  void setAdditionalInputs(List<RootInputLeafOutputProto> inputs);
+  void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs);
+  
+  Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> getAdditionalInputs();
+  Map<String, RootInputLeafOutputDescriptor<OutputDescriptor>> getAdditionalOutputs();
+  
   List<InputSpec> getInputSpecList(int taskIndex);
   List<OutputSpec> getOutputSpecList(int taskIndex);
 
@@ -89,4 +95,5 @@ public interface Vertex extends Comparable<Vertex> {
   ProcessorDescriptor getProcessorDescriptor();
   public DAG getDAG();
   VertexTerminationCause getTerminationCause();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
index 4656efd..8bbf446 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
@@ -18,14 +18,20 @@
 
 package org.apache.tez.dag.app.dag;
 
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
+
 // Rename to VertexManager TEZ-364 and move to DAG API. Make abstract class.
+/**
+ * Callbacks implemented by the scheduling logic attached to a vertex.
+ * NOTE(review): the component that invokes these callbacks, and their
+ * ordering, is not visible in this file - confirm against the vertex
+ * implementation.
+ */
 public interface VertexScheduler {
+  /** Supplies the configuration to the scheduler. */
   void initialize(Configuration conf);
+  /** Callback for the vertex having started. */
   void onVertexStarted();
+  /** Callback for a completed source task attempt, identified by {@code attemptId}. */
   void onSourceTaskCompleted(TezTaskAttemptID attemptId);
+  /** Callback delivering a {@link VertexManagerEvent} to this scheduler. */
   void onVertexManagerEventReceived(VertexManagerEvent vmEvent);
+  /**
+   * Callback delivering the events associated with the root input named
+   * {@code inputName}, together with that input's descriptor.
+   * NOTE(review): presumably invoked once the input's initializer has
+   * completed - confirm against the caller.
+   */
+  void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
index 887c0d4..5a7af0a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
@@ -19,6 +19,7 @@ package org.apache.tez.dag.app.dag;
 
 public enum VertexState {
   NEW,
+  INITIALIZING,
   INITED,
   RUNNING,
   SUCCEEDED,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index f675ace..d06fac3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -29,14 +29,17 @@ public enum VertexTerminationCause {
   /** Other vertex failed causing DAG to fail thus killing this vertex  */
   OTHER_VERTEX_FAILURE,
 
+  /** Initialization failed for one of the root Inputs */
+  ROOT_INPUT_INIT_FAILURE,
+  
   /** One of the tasks for this vertex failed.  */
   OWN_TASK_FAILURE, 
 
   /** This vertex failed during commit. */
   COMMIT_FAILURE,
 
-  /** This vertex failed as it had zero tasks. */
-  ZERO_TASKS, 
+  /** This vertex failed as it had an invalid number of tasks. */
+  INVALID_NUM_OF_TASKS, 
 
   /** This vertex failed during init. */
   INIT_FAILURE,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
index 3d4367d..e58d46e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
@@ -1,35 +1,46 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.tez.dag.app.dag.event;
 
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.records.TezVertexID;
 
 public class DAGEventVertexCompleted extends DAGEvent {
 
   private final TezVertexID vertexId;
   private final VertexState vertexState;
+  private final VertexTerminationCause terminationCause;
 
+  /**
+   * Creates a completion event that carries no termination cause;
+   * {@link #getVertexTerminationCause()} returns null for such events.
+   */
   public DAGEventVertexCompleted(TezVertexID vertexId, VertexState vertexState) {
     super(vertexId.getDAGId(), DAGEventType.DAG_VERTEX_COMPLETED);
     this.vertexId = vertexId;
     this.vertexState = vertexState;
+    this.terminationCause = null;
+  }
+
+  /**
+   * Creates a completion event that also records a termination cause.
+   *
+   * @param terminationCause stored as given (may be null) and returned by
+   *        {@link #getVertexTerminationCause()}
+   */
+  public DAGEventVertexCompleted(TezVertexID vertexId, VertexState vertexState,
+      VertexTerminationCause terminationCause) {
+    super(vertexId.getDAGId(), DAGEventType.DAG_VERTEX_COMPLETED);
+    this.vertexId = vertexId;
+    this.vertexState = vertexState;
+    this.terminationCause = terminationCause;
   }
 
   public TezVertexID getVertexId() {
@@ -39,4 +50,8 @@ public class DAGEventVertexCompleted extends DAGEvent {
+  /** Returns the vertex state supplied when the event was constructed. */
   public VertexState getVertexState() {
     return vertexState;
   }
+
+  /**
+   * Returns the termination cause given at construction, or null when the
+   * event was created without one.
+   */
+  public VertexTerminationCause getVertexTerminationCause() {
+    return terminationCause;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java
new file mode 100644
index 0000000..4ab4ae9
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputFailed.java
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezVertexID;
+
+/**
+ * Vertex event of type {@link VertexEventType#V_ROOT_INPUT_FAILED}, carrying
+ * the name of the root input concerned and the error that was raised.
+ */
+public class VertexEventRootInputFailed extends VertexEvent {
+
+  private final Throwable error;
+  private final String inputName;
+
+  public VertexEventRootInputFailed(TezVertexID vertexId, String inputName, Throwable error) {
+    super(vertexId, VertexEventType.V_ROOT_INPUT_FAILED);
+    this.error = error;
+    this.inputName = inputName;
+  }
+
+  /** Returns the name of the root input this event refers to. */
+  public String getInputName() {
+    return inputName;
+  }
+
+  /** Returns the error supplied when the event was created. */
+  public Throwable getError() {
+    return error;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputInitialized.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputInitialized.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputInitialized.java
new file mode 100644
index 0000000..3c5e0be
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRootInputInitialized.java
@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.Event;
+
+/**
+ * Vertex event of type {@link VertexEventType#V_ROOT_INPUT_INITIALIZED},
+ * carrying the name of the root input concerned and the events that
+ * accompany it.
+ */
+public class VertexEventRootInputInitialized extends VertexEvent {
+
+  private final List<Event> events;
+  private final String inputName;
+
+  /**
+   * @param events the events to carry; a null list is replaced by an empty one
+   */
+  public VertexEventRootInputInitialized(TezVertexID vertexId, String inputName, List<Event> events) {
+    super(vertexId, VertexEventType.V_ROOT_INPUT_INITIALIZED);
+    this.inputName = inputName;
+    // Explicit type witness keeps the conditional typed as List<Event>.
+    this.events = (events != null) ? events : Collections.<Event>emptyList();
+  }
+
+  /** Returns the name of the root input this event refers to. */
+  public String getInputName() {
+    return inputName;
+  }
+
+  /** Returns the carried events; never null. */
+  public List<Event> getEvents() {
+    return events;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 7d640af..9d62ede 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -50,10 +50,13 @@ public enum VertexEventType {
   V_ATTEMPT_KILLED,
   
   //Producer:Any component
-  V_DIAGNOSTIC_UPDATE,
   V_INTERNAL_ERROR,
   V_COUNTER_UPDATE,
   
   V_ROUTE_EVENT,
   
+  //Producer: VertexInputInitializer
+  V_ROOT_INPUT_INITIALIZED,
+  V_ROOT_INPUT_FAILED,
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 10d36b9..e677c0f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1150,7 +1150,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         job.vertexSucceeded(vertex);
       }
       else if (vertexEvent.getVertexState() == VertexState.FAILED) {
-        job.enactKill(DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
+        job.enactKill(
+            DAGTerminationCause.VERTEX_FAILURE,
+            vertexEvent.getVertexTerminationCause() == null ? VertexTerminationCause.OTHER_VERTEX_FAILURE
+                : vertexEvent.getVertexTerminationCause());
         job.vertexFailed(vertex);
         forceTransitionToKillWait = true;
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
index 2d94006..6217d91 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
@@ -18,10 +18,14 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexScheduler;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 /**
@@ -51,4 +55,9 @@ public class ImmediateStartVertexScheduler implements VertexScheduler {
   public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
   }
 
+  @Override
+  public void onRootVertexInitialized(String inputName,
+      InputDescriptor inputDescriptor, List<Event> events) {
+    // No-op: this implementation ignores the callback and its events.
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
new file mode 100644
index 0000000..d2d2c13
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
@@ -0,0 +1,46 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.tez.dag.api.TezEntityDescriptor;
+
+/**
+ * Immutable holder that groups an entity's name, its descriptor and the
+ * class name of the initializer associated with it.
+ *
+ * @param <T> the descriptor type held
+ */
+public class RootInputLeafOutputDescriptor<T extends TezEntityDescriptor> {
+
+  private final String entityName;
+  private final String initializerClassName;
+  private final T descriptor;
+
+  public RootInputLeafOutputDescriptor(String entityName, T descriptor,
+      String initializerClassName) {
+    this.entityName = entityName;
+    this.initializerClassName = initializerClassName;
+    this.descriptor = descriptor;
+  }
+
+  /** Returns the entity name given at construction. */
+  public String getEntityName() {
+    return entityName;
+  }
+
+  /** Returns the initializer class name given at construction. */
+  public String getInitializerClassName() {
+    return initializerClassName;
+  }
+
+  /** Returns the descriptor given at construction. */
+  public T getDescriptor() {
+    return descriptor;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
new file mode 100644
index 0000000..4329192
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexScheduler;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * {@link VertexScheduler} that schedules every task of the managed vertex
+ * when the vertex starts, and that processes the events delivered for a
+ * root input: a {@link RootInputUpdatePayloadEvent} replaces the user
+ * payload of the input's descriptor, and a
+ * {@link RootInputDataInformationEvent} is wrapped in a {@link TezEvent}
+ * and sent to the task whose index the event names.
+ */
+public class RootInputVertexManager implements VertexScheduler {
+
+  private final Vertex managedVertex;
+  private final EventMetaData sourceInfo;
+  private final Map<String, EventMetaData> destInfoMap;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+
+  /**
+   * Builds the manager and pre-computes one destination {@link EventMetaData}
+   * per additional input of the vertex, keyed by the input's entity name.
+   *
+   * @param vertex the vertex being managed
+   * @param eventHandler receives the task events created by this manager
+   */
+  @SuppressWarnings("rawtypes")
+  public RootInputVertexManager(Vertex vertex, EventHandler eventHandler) {
+    this.managedVertex = vertex;
+    this.eventHandler = eventHandler;
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT,
+        vertex.getName(), "NULL", null);
+    Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> inputs = this.managedVertex
+        .getAdditionalInputs();
+    this.destInfoMap = Maps.newHashMapWithExpectedSize(inputs.size());
+    // Parameterized (not raw) element type keeps the loop type-checked.
+    for (RootInputLeafOutputDescriptor<InputDescriptor> input : inputs.values()) {
+      EventMetaData destInfo = new EventMetaData(
+          EventProducerConsumerType.INPUT, vertex.getName(),
+          input.getEntityName(), null);
+      destInfoMap.put(input.getEntityName(), destInfo);
+    }
+  }
+
+  @Override
+  public void initialize(Configuration conf) {
+    // Nothing to configure.
+  }
+
+  /** Schedules all tasks currently known to the managed vertex. */
+  @Override
+  public void onVertexStarted() {
+    managedVertex.scheduleTasks(managedVertex.getTasks().keySet());
+  }
+
+  @Override
+  public void onSourceTaskCompleted(TezTaskAttemptID attemptId) {
+    // Ignored by this manager.
+  }
+
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+    // Ignored by this manager.
+  }
+
+  /**
+   * Applies the events delivered for the root input {@code inputName}.
+   * Events of any other type than the two handled below are skipped.
+   *
+   * @param inputName name of the root input the events belong to
+   * @param inputDescriptor descriptor whose user payload may be replaced
+   * @param events the events to process, in order
+   * @throws IllegalStateException if the vertex is neither NEW nor
+   *         INITIALIZING, if a payload update follows a data information
+   *         event, or if a data information event arrives while the vertex
+   *         has no tasks
+   */
+  @Override
+  public void onRootVertexInitialized(String inputName,
+      InputDescriptor inputDescriptor, List<Event> events) {
+    Preconditions.checkState(EnumSet.of(VertexState.INITIALIZING,
+        VertexState.NEW).contains(managedVertex.getState()),
+        "Root input events are only accepted while the vertex is NEW or INITIALIZING");
+    // The destination is the same for every event of this input.
+    EventMetaData destInfo = destInfoMap.get(inputName);
+    boolean dataInformationEventSeen = false;
+    for (Event event : events) {
+      if (event instanceof RootInputUpdatePayloadEvent) {
+        // No tasks should have been started yet. Checked by initial state check.
+        Preconditions.checkState(!dataInformationEventSeen,
+            "Payload update received after a data information event");
+        inputDescriptor.setUserPayload(((RootInputUpdatePayloadEvent) event)
+            .getUserPayload());
+      } else if (event instanceof RootInputDataInformationEvent) {
+        dataInformationEventSeen = true;
+        // # Tasks should have been set by this point.
+        Preconditions.checkState(managedVertex.getTasks().size() != 0,
+            "Data information event received before the vertex has any tasks");
+        TezEvent tezEvent = new TezEvent(event, sourceInfo);
+        tezEvent.setDestinationInfo(destInfo);
+        sendEventToTask(new TezTaskID(managedVertex.getVertexId(),
+            ((RootInputDataInformationEvent) event).getIndex()), tezEvent);
+      }
+    }
+  }
+
+  /** Hands the event to the event handler, addressed to the given task. */
+  @SuppressWarnings("unchecked")
+  private void sendEventToTask(TezTaskID taskId, TezEvent tezEvent) {
+    eventHandler.handle(new TaskEventAddTezEvent(taskId, tezEvent));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index d633d0c..63ad8b2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -37,6 +38,7 @@ import org.apache.tez.dag.app.dag.VertexScheduler;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -425,4 +427,10 @@ public class ShuffleVertexManager implements VertexScheduler {
             TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT);
   }
 
+  @Override
+  public void onRootVertexInitialized(String inputName,
+      InputDescriptor inputDescriptor, List<Event> events) {
+    // Deliberate no-op: not allowed for now, so every argument is ignored.
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74d33b6a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
new file mode 100644
index 0000000..2534ec4
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
+
+/** Straightforward {@link TezRootInputInitializerContext} implementation. */
+public class TezRootInputInitializerContextImpl implements
+    TezRootInputInitializerContext {
+  private final InputDescriptor descriptor;
+  private final TezVertexID ownerVertexId;
+  private final String nameOfDag;
+  private final String nameOfInput;
+  // TODO Add support for counters - merged with the Vertex counters.
+
+  // NOTE: vertexName is accepted for the caller's benefit but is not stored.
+  public TezRootInputInitializerContextImpl(TezVertexID vertexID,
+      String dagName, String vertexName, String inputName,
+      InputDescriptor inputDescriptor) {
+    descriptor = inputDescriptor;
+    ownerVertexId = vertexID;
+    nameOfDag = dagName;
+    nameOfInput = inputName;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return ownerVertexId.getDAGId().getApplicationId();
+  }
+
+  @Override
+  public String getDAGName() {
+    return nameOfDag;
+  }
+
+  @Override
+  public String getInputName() {
+    return nameOfInput;
+  }
+
+  @Override
+  public byte[] getUserPayload() {
+    return descriptor.getUserPayload();
+  }
+
+}


Mime
View raw message