tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1076. Allow events to be sent to InputInitializers. (sseth)
Date Thu, 10 Jul 2014 22:03:56 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 2e7641c69 -> e7e0b3fb6


TEZ-1076. Allow events to be sent to InputInitializers. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e7e0b3fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e7e0b3fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e7e0b3fb

Branch: refs/heads/master
Commit: e7e0b3fb6fbabbdb7fd06108359a00f50da5ebcf
Parents: 2e7641c
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Jul 10 15:03:39 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Jul 10 15:03:39 2014 -0700

----------------------------------------------------------------------
 .../runtime/api/TezRootInputInitializer.java    |  31 +-
 .../api/TezRootInputInitializerContext.java     |   7 +
 .../api/events/RootInputInitializerEvent.java   |  83 ++++
 tez-api/src/main/proto/Events.proto             |   7 +
 .../app/dag/RootInputInitializerManager.java    | 268 +++++++++++++
 .../dag/app/dag/RootInputInitializerRunner.java | 201 ----------
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   2 +
 .../TezRootInputInitializerContextImpl.java     |  68 ++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  78 ++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 392 +++++++++++++++----
 .../common/MRInputAMSplitGenerator.java         |   6 +
 .../common/MRInputSplitDistributor.java         |   6 +
 .../common/TestMRInputSplitDistributor.java     |   5 +
 .../org/apache/tez/common/ProtoConverters.java  |  23 ++
 .../apache/tez/runtime/api/impl/EventType.java  |   3 +-
 .../apache/tez/runtime/api/impl/TezEvent.java   |  13 +
 .../apache/tez/test/dag/MultiAttemptDAG.java    |   7 +
 17 files changed, 856 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java
index b9df646..8d2c82f 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java
@@ -20,15 +20,44 @@ package org.apache.tez.runtime.api;
 
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
+
 /**
  * <code>TezRootInputInitializer</code>s are used to initialize root vertices
  * within the AM. They can be used to distribute data across the tasks for the
  * vertex, determine the number of tasks at runtime, update the Input payload
  * etc.
  */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
 public interface TezRootInputInitializer {
-  
+
+  /**
+   * Run the root input initializer. This is the main method where initialization takes place. If an
+   * Initializer is written to accept events, a notification mechanism should be set up, with the
+   * heavy lifting of processing the event being done via this method. The moment this method
+   * returns a list of events, RootInputInitialization is considered to be complete.
+   *
+   * @param inputVertexContext initializer context which can be used to access the payload, vertex
+   *                           properties, etc
+   * @return a list of events which are eventually delivered to a {@link org.apache.tez.dag.api.VertexManagerPlugin}
+   * for routing
+   * @throws Exception
+   */
   List<Event> initialize(TezRootInputInitializerContext inputVertexContext)
       throws Exception;
+
+  /**
+   * Handle events meant for the specific Initializer. This is a notification mechanism to inform
+   * the Initializer about events received. Extensive event processing should not be performed via
+   * this method call. Instead, this should only be used to notify the main initialization, which
+   * is carried out by the initialize method.
+   *
+   * @param events list of events
+   * @throws Exception
+   */
+  void handleInputInitializerEvent(List<RootInputInitializerEvent> events) throws Exception;
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
index 62ced2e..399579a 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
@@ -79,4 +79,11 @@ public interface TezRootInputInitializerContext {
    */
   int getDAGAttemptNumber();
 
+  /**
+   * Get the number of tasks in the given vertex
+   * @param vertexName
+   * @return Total number of tasks in the given vertex
+   */
+  public int getVertexNumTasks(String vertexName);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputInitializerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputInitializerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputInitializerEvent.java
new file mode 100644
index 0000000..6e78add
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputInitializerEvent.java
@@ -0,0 +1,83 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import org.apache.tez.common.TezUserPayload;
+import org.apache.tez.runtime.api.Event;
+
+/**
+ * An event that is routed to the specified RootInputInitializer.
+ */
+public class RootInputInitializerEvent extends Event {
+
+  private String targetVertexName;
+  private String targetInputName;
+
+  private int version;
+  private TezUserPayload eventPayload;
+
+  /**
+   * @param targetVertexName the vertex on which the targeted Input exists
+   * @param targetInputName  the name of the root input
+   * @param eventPayload     the payload for the event. It is advisable to limit the size of the
+   *                         payload to a few KB at most
+   * @param version          version of the event. Multiple versions may be generated in case of
+   *                         retries
+   */
+  public RootInputInitializerEvent(String targetVertexName, String targetInputName,
+                                   byte[] eventPayload, int version) {
+    this.targetVertexName = targetVertexName;
+    this.targetInputName = targetInputName;
+    this.version = version;
+    this.eventPayload = new TezUserPayload(eventPayload);
+  }
+
+  /**
+   * Get the vertex name on which the targeted Input exists
+   *
+   * @return the vertex name
+   */
+  public String getTargetVertexName() {
+    return this.targetVertexName;
+  }
+
+  /**
+   * Get the input name to which this event is targeted
+   *
+   * @return the input name
+   */
+  public String getTargetInputName() {
+    return this.targetInputName;
+  }
+
+  public int getVersion() {
+    return this.version;
+  }
+
+  /**
+   * Get the actual user payload
+   *
+   * @return a byte representation of the payload
+   */
+  public byte[] getUserPayload() {
+    return this.eventPayload.getPayload();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
index ab4525e..9f12508 100644
--- a/tez-api/src/main/proto/Events.proto
+++ b/tez-api/src/main/proto/Events.proto
@@ -54,3 +54,10 @@ message CompositeEventProto {
   optional int32 end_index = 2;
   optional bytes user_payload = 3;
 }
+
+message RootInputInitializerEventProto {
+  optional string target_vertex_name = 1;
+  optional string target_input_name = 2;
+  optional int32 version = 3;
+  optional bytes user_payload = 4;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
new file mode 100644
index 0000000..c4a1085
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
+import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
+import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
+import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
+
+public class RootInputInitializerManager {
+
+  private static final Log LOG = LogFactory.getLog(RootInputInitializerManager.class);
+
+  private final ExecutorService rawExecutor;
+  private final ListeningExecutorService executor;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+  private volatile boolean isStopped = false;
+  private final UserGroupInformation dagUgi;
+
+  private final Vertex vertex;
+  private final AppContext appContext;
+
+  private final Map<String, InitializerWrapper> initializerMap = new HashMap<String, InitializerWrapper>();
+
+  @SuppressWarnings("rawtypes")
+  public RootInputInitializerManager(Vertex vertex, AppContext appContext,
+                                     UserGroupInformation dagUgi) {
+    this.appContext = appContext;
+    this.vertex = vertex;
+    this.eventHandler = appContext.getEventHandler();
+    this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("InputInitializer [" + this.vertex.getName() + "] #%d").build());
+    this.executor = MoreExecutors.listeningDecorator(rawExecutor);
+    this.dagUgi = dagUgi;
+  }
+  
+  public void runInputInitializers(List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs) {
+
+    for (RootInputLeafOutputDescriptor<InputDescriptor> input : inputs) {
+      TezRootInputInitializer initializer = createInitializer(input);
+      InitializerWrapper initializerWrapper = new InitializerWrapper(input, initializer, vertex, appContext);
+      initializerMap.put(input.getEntityName(), initializerWrapper);
+      ListenableFuture<List<Event>> future = executor
+          .submit(new InputInitializerCallable(initializerWrapper, dagUgi));
+      Futures.addCallback(future, createInputInitializerCallback(initializerWrapper));
+    }
+  }
+
+
+  @VisibleForTesting
+  protected TezRootInputInitializer createInitializer(RootInputLeafOutputDescriptor<InputDescriptor> input) {
+    String className = input.getInitializerClassName();
+    @SuppressWarnings("unchecked")
+    Class<? extends TezRootInputInitializer> clazz =
+        (Class<? extends TezRootInputInitializer>) RuntimeUtils
+            .getClazz(className);
+    TezRootInputInitializer initializer = null;
+    try {
+      initializer = clazz.newInstance();
+    } catch (InstantiationException e) {
+      throw new TezUncheckedException("Failed to create input initializerWrapper", e);
+    } catch (IllegalAccessException e) {
+      throw new TezUncheckedException("Failed to create input initializerWrapper", e);
+    }
+    return initializer;
+  }
+
+  public void handleInitializerEvent(RootInputInitializerEvent event) {
+    Preconditions.checkState(vertex.getName().equals(event.getTargetVertexName()),
+        "Received event for incorrect vertex");
+    Preconditions.checkNotNull(event.getTargetInputName(), "target input name must be set");
+    InitializerWrapper initializer = initializerMap.get(event.getTargetInputName());
+    Preconditions.checkState(initializer != null,
+        "Received event for unknown input : " + event.getTargetInputName());
+    // This is a restriction based on current flow - i.e. events generated only by initialize().
+    // TODO Rework the flow as per the first comment on TEZ-1076
+    if (isStopped) {
+      LOG.warn("InitializerManager already stopped for " + vertex.getLogIdentifier() +
+          " Dropping event. [" + event + "]");
+      return;
+    }
+    if (initializer.isComplete()) {
+      LOG.warn(
+          "Event targeted at vertex " + vertex.getLogIdentifier() + ", initializerWrapper for Input: " +
+              initializer.getInput().getEntityName() +
+              " will be dropped, since Input has already been initialized. [" + event + "]");
+    }
+    try {
+      initializer.getInitializer().handleInputInitializerEvent(Lists.newArrayList(event));
+    } catch (Exception e) {
+      throw new TezUncheckedException(
+          "Initializer for input: " + event.getTargetInputName() + " failed to process event", e);
+    }
+  }
+
+  @VisibleForTesting
+  protected InputInitializerCallback createInputInitializerCallback(InitializerWrapper initializer) {
+    return new InputInitializerCallback(initializer, eventHandler, vertex.getVertexId());
+  }
+  
+  public void shutdown() {
+    if (executor != null && !isStopped) {
+      // Don't really care about what is running if an error occurs. If no error
+      // occurs, all execution is complete.
+      executor.shutdownNow();
+      isStopped = true;
+    }
+  }
+
+  private static class InputInitializerCallable implements
+      Callable<List<Event>> {
+
+    private final InitializerWrapper initializerWrapper;
+    private final UserGroupInformation ugi;
+
+    public InputInitializerCallable(InitializerWrapper initializer, UserGroupInformation ugi) {
+      this.initializerWrapper = initializer;
+      this.ugi = ugi;
+    }
+
+    @Override
+    public List<Event> call() throws Exception {
+      List<Event> events = ugi.doAs(new PrivilegedExceptionAction<List<Event>>() {
+        @Override
+        public List<Event> run() throws Exception {
+          LOG.info(
+              "Starting InputInitializer for Input: " + initializerWrapper.getInput().getEntityName() +
+                  " on vertex " + initializerWrapper.getVertexLogIdentifier());
+          return initializerWrapper.getInitializer().initialize(initializerWrapper.context);
+        }
+      });
+      return events;
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  @VisibleForTesting
+  private static class InputInitializerCallback implements
+      FutureCallback<List<Event>> {
+
+    private final InitializerWrapper initializer;
+    private final EventHandler eventHandler;
+    private final TezVertexID vertexID;
+
+    public InputInitializerCallback(InitializerWrapper initializer,
+        EventHandler eventHandler, TezVertexID vertexID) {
+      this.initializer = initializer;
+      this.eventHandler = eventHandler;
+      this.vertexID = vertexID;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onSuccess(List<Event> result) {
+      initializer.setComplete();
+      LOG.info(
+          "Succeeded InputInitializer for Input: " + initializer.getInput().getEntityName() +
+              " on vertex " + initializer.getVertexLogIdentifier());
+      eventHandler.handle(new VertexEventRootInputInitialized(vertexID,
+          initializer.getInput().getEntityName(), result));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onFailure(Throwable t) {
+      initializer.setComplete();
+      LOG.info(
+          "Failed InputInitializer for Input: " + initializer.getInput().getEntityName() +
+              " on vertex " + initializer.getVertexLogIdentifier());
+      eventHandler
+          .handle(new VertexEventRootInputFailed(vertexID, initializer.getInput().getEntityName(), t));
+    }
+  }
+
+  private static class InitializerWrapper {
+
+
+    private final RootInputLeafOutputDescriptor<InputDescriptor> input;
+    private final TezRootInputInitializer initializer;
+    private final TezRootInputInitializerContext context;
+    private final AtomicBoolean isComplete = new AtomicBoolean(false);
+    private final String vertexLogIdentifier;
+
+    InitializerWrapper(RootInputLeafOutputDescriptor<InputDescriptor> input,
+                       TezRootInputInitializer initializer, Vertex vertex,
+                       AppContext appContext) {
+      this.input = input;
+      this.initializer = initializer;
+      this.context = new TezRootInputInitializerContextImpl(input, vertex, appContext);
+      this.vertexLogIdentifier = vertex.getLogIdentifier();
+    }
+
+    public RootInputLeafOutputDescriptor<InputDescriptor> getInput() {
+      return input;
+    }
+
+    public TezRootInputInitializer getInitializer() {
+      return initializer;
+    }
+
+    public TezRootInputInitializerContext getContext() {
+      return context;
+    }
+
+    public String getVertexLogIdentifier() {
+      return vertexLogIdentifier;
+    }
+
+    public boolean isComplete() {
+      return isComplete.get();
+    }
+
+    public void setComplete() {
+      this.isComplete.set(true);
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
deleted file mode 100644
index cb2bf82..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag;
-
-import java.security.PrivilegedExceptionAction;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.RuntimeUtils;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
-import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
-import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
-import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.TezRootInputInitializerContext;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class RootInputInitializerRunner {
-
-  private static final Log LOG = LogFactory.getLog(RootInputInitializerRunner.class);
-  
-  private final ExecutorService rawExecutor;
-  private final ListeningExecutorService executor;
-  private final String dagName;
-  private final String vertexName;
-  private final TezVertexID vertexID;
-  private final int numTasks;
-  private final Resource vertexTaskResource;
-  private final Resource totalResource;
-  @SuppressWarnings("rawtypes")
-  private final EventHandler eventHandler;
-  private volatile boolean isStopped = false;
-  private final UserGroupInformation dagUgi;
-  private final int numClusterNodes;
-  private final int dagAttemptNumber;
-
-  @SuppressWarnings("rawtypes")
-  public RootInputInitializerRunner(String dagName, String vertexName,
-      TezVertexID vertexID, EventHandler eventHandler, UserGroupInformation dagUgi,
-      Resource vertexTaskResource, Resource totalResource, int numTasks, int numNodes,
-      int dagAttemptNumber) {
-    this.dagName = dagName;
-    this.vertexName = vertexName;
-    this.vertexID = vertexID;
-    this.eventHandler = eventHandler;
-    this.vertexTaskResource = vertexTaskResource;
-    this.totalResource = totalResource;
-    this.numTasks = numTasks;
-    this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("InputInitializer [" + this.vertexName + "] #%d").build());
-    this.executor = MoreExecutors.listeningDecorator(rawExecutor);
-    this.dagUgi = dagUgi;
-    this.numClusterNodes = numNodes;
-    this.dagAttemptNumber = dagAttemptNumber;
-  }
-  
-  public void runInputInitializers(List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs) {
-    for (RootInputLeafOutputDescriptor<InputDescriptor> input : inputs) {
-      ListenableFuture<List<Event>> future = executor
-          .submit(new InputInitializerCallable(input, vertexID, dagName,
-              vertexName, dagUgi, numTasks, numClusterNodes, vertexTaskResource, totalResource,
-              dagAttemptNumber));
-      Futures.addCallback(future, createInputInitializerCallback(input.getEntityName()));
-    }
-  }
-
-  @VisibleForTesting
-  protected InputInitializerCallback createInputInitializerCallback(String entityName) {
-    return new InputInitializerCallback(entityName, eventHandler, vertexID);
-  }
-  
-  public void shutdown() {
-    if (executor != null && !isStopped) {
-      // Don't really care about what is running if an error occurs. If no error
-      // occurs, all execution is complete.
-      executor.shutdownNow();
-      isStopped = true;
-    }
-  }
-
-  private static class InputInitializerCallable implements
-      Callable<List<Event>> {
-
-    private final RootInputLeafOutputDescriptor<InputDescriptor> input;
-    private final TezVertexID vertexID;
-    private final String dagName;
-    private final String vertexName;
-    private final int numTasks;
-    private final Resource vertexTaskResource;
-    private final Resource totalResource;
-    private final UserGroupInformation ugi;
-    private final int numClusterNodes;
-    private final int dagAttemptNumber;
-
-    public InputInitializerCallable(RootInputLeafOutputDescriptor<InputDescriptor> input,
-        TezVertexID vertexID, String dagName, String vertexName, UserGroupInformation ugi, 
-        int numTasks, int numClusterNodes, Resource vertexTaskResource, Resource totalResource,
-        int dagAttemptNumber) {
-      this.input = input;
-      this.vertexID = vertexID;
-      this.dagName = dagName;
-      this.vertexName = vertexName;
-      this.numTasks = numTasks;
-      this.vertexTaskResource = vertexTaskResource;
-      this.totalResource = totalResource;
-      this.ugi = ugi;
-      this.numClusterNodes = numClusterNodes;
-      this.dagAttemptNumber = dagAttemptNumber;
-    }
-
-    @Override
-    public List<Event> call() throws Exception {
-      List<Event> events = ugi.doAs(new PrivilegedExceptionAction<List<Event>>() {
-        @Override
-        public List<Event> run() throws Exception {
-          TezRootInputInitializer initializer = createInitializer();
-          TezRootInputInitializerContext context = new TezRootInputInitializerContextImpl(vertexID,
-              dagName, vertexName, input.getEntityName(), input.getDescriptor(), 
-              numTasks, numClusterNodes, vertexTaskResource, totalResource,
-              dagAttemptNumber);
-          return initializer.initialize(context);
-        }
-      });
-      return events;
-    }
-
-    private TezRootInputInitializer createInitializer() throws InstantiationException,
-        IllegalAccessException {
-      String className = input.getInitializerClassName();
-      @SuppressWarnings("unchecked")
-      Class<? extends TezRootInputInitializer> clazz = (Class<? extends TezRootInputInitializer>) RuntimeUtils
-          .getClazz(className);
-      TezRootInputInitializer initializer = clazz.newInstance();
-      return initializer;
-    }
-  }
-
-  @SuppressWarnings("rawtypes")
-  @VisibleForTesting
-  private static class InputInitializerCallback implements
-      FutureCallback<List<Event>> {
-
-    private final String inputName;
-    private final EventHandler eventHandler;
-    private final TezVertexID vertexID;
-
-    public InputInitializerCallback(String inputName,
-        EventHandler eventHandler, TezVertexID vertexID) {
-      this.inputName = inputName;
-      this.eventHandler = eventHandler;
-      this.vertexID = vertexID;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void onSuccess(List<Event> result) {
-      eventHandler.handle(new VertexEventRootInputInitialized(vertexID,
-          inputName, result));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void onFailure(Throwable t) {
-      eventHandler
-          .handle(new VertexEventRootInputFailed(vertexID, inputName, t));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index da65458..2ee3483 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -129,4 +129,6 @@ public interface Vertex extends Comparable<Vertex> {
 
   VertexState restoreFromEvent(HistoryEvent event);
 
+  String getLogIdentifier();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
index b0fb059..ab21dfb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -22,89 +22,77 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 
 public class TezRootInputInitializerContextImpl implements
     TezRootInputInitializerContext {
 
-  private final TezVertexID vertexID;
-  private final String dagName;
-  private final String inputName;
-  private final InputDescriptor inputDescriptor;
-  private final int numTasks;
-  private final Resource vertexTaskResource;
-  private final Resource totalResource;
-  private final int numClusterNodes;
-  private final int dagAttemptNumber;
+  private RootInputLeafOutputDescriptor<InputDescriptor> input;
+  private final Vertex vertex;
+  private final AppContext appContext;
 
   // TODO Add support for counters - merged with the Vertex counters.
-  
-  public TezRootInputInitializerContextImpl(TezVertexID vertexID,
-      String dagName, String vertexName, String inputName,
-      InputDescriptor inputDescriptor, int numTasks, int numClusterNodes,
-      Resource vertexTaskResource, Resource totalResource,
-      int dagAttemptNumber) {
-    checkNotNull(vertexID, "vertexID is null");
-    checkNotNull(dagName, "dagName is null");
-    checkNotNull(inputName, "inputName is null");
-    checkNotNull(inputDescriptor, "inputDescriptor is null");
-    checkNotNull(vertexTaskResource, "numTasks is null");
-    checkNotNull(totalResource, "totalResource is null");
-    this.vertexID = vertexID;
-    this.dagName = dagName;
-    this.inputName = inputName;
-    this.inputDescriptor = inputDescriptor;
-    this.numTasks = numTasks;
-    this.vertexTaskResource = vertexTaskResource;
-    this.totalResource = totalResource;
-    this.numClusterNodes = numClusterNodes;
-    this.dagAttemptNumber = dagAttemptNumber;
+
+  public TezRootInputInitializerContextImpl(RootInputLeafOutputDescriptor<InputDescriptor> input, Vertex vertex,
+                                            AppContext appContext) {
+    checkNotNull(input, "input is null");
+    checkNotNull(vertex, "vertex is null");
+    checkNotNull(appContext, "appContext is null");
+    this.input = input;
+    this.vertex = vertex;
+    this.appContext = appContext;
   }
 
   @Override
   public ApplicationId getApplicationId() {
-    return vertexID.getDAGId().getApplicationId();
+    return vertex.getVertexId().getDAGId().getApplicationId();
   }
 
   @Override
   public String getDAGName() {
-    return this.dagName;
+    return vertex.getDAG().getName();
   }
 
   @Override
   public String getInputName() {
-    return this.inputName;
+    return this.input.getEntityName();
   }
 
   @Override
   public byte[] getUserPayload() {
-    return inputDescriptor.getUserPayload();
+    return this.input.getDescriptor().getUserPayload();
   }
   
   @Override 
   public int getNumTasks() {
-    return numTasks;
+    return vertex.getTotalTasks();
   }
 
   @Override
   public Resource getVertexTaskResource() {
-    return vertexTaskResource;
+    return vertex.getTaskResource();
   }
 
   @Override
   public Resource getTotalAvailableResource() {
-    return totalResource;
+    return appContext.getTaskScheduler().getTotalResources();
   }
 
   @Override
   public int getNumClusterNodes() {
-    return numClusterNodes;
+    return appContext.getTaskScheduler().getNumClusterNodes();
   }
 
   @Override
   public int getDAGAttemptNumber() {
-    return dagAttemptNumber;
+    return appContext.getApplicationAttemptId().getAttemptId();
+  }
+
+  @Override
+  public int getVertexNumTasks(String vertexName) {
+    return appContext.getCurrentDAG().getVertex(vertexName).getTotalTasks();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 4204c0a..a68093a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -80,7 +80,7 @@ import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.RootInputInitializerRunner;
+import org.apache.tez.dag.app.dag.RootInputInitializerManager;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.TaskTerminationCause;
@@ -139,6 +139,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -567,7 +568,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   List<TezEvent> pendingRouteEvents = new LinkedList<TezEvent>();
   List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
 
-  private RootInputInitializerRunner rootInputInitializer;
+
+  private RootInputInitializerManager rootInputInitializerManager;
 
   VertexManager vertexManager;
   
@@ -1055,7 +1057,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     }
   }
-  
+
+  @Override
+  public String getLogIdentifier() {
+    return this.logIdentifier;
+  }
+
   private void setTaskLocationHints(VertexLocationHint vertexLocationHint) {
     if (vertexLocationHint != null && 
         vertexLocationHint.getTaskLocationHints() != null && 
@@ -2584,11 +2591,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
         if (vertex.inputsWithInitializers != null) {
           // Use DAGScheduler to arbitrate resources among vertices later
-          vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
+          vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager(
               vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
-              vertex.eventHandler, -1, 
+              vertex.eventHandler, -1,
               vertex.appContext.getTaskScheduler().getNumClusterNodes(),
-              vertex.getTaskResource(), 
+              vertex.getTaskResource(),
               vertex.appContext.getTaskScheduler().getTotalResources());
           List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
               .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
@@ -2598,7 +2605,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           LOG.info("Vertex will initialize via inputInitializers "
               + vertex.logIdentifier + ". Starting root input initializers: "
               + vertex.inputsWithInitializers.size());
-          vertex.rootInputInitializer.runInputInitializers(inputList);
+          vertex.rootInputInitializerManager.runInputInitializers(inputList);
           return VertexState.INITIALIZING;
         } else {
           boolean hasOneToOneUninitedSource = false;
@@ -2627,11 +2634,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.info("Creating " + vertex.numTasks + " for vertex: " + vertex.logIdentifier);
         vertex.createTasks();
         if (vertex.inputsWithInitializers != null) {
-          vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
+          vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager(
               vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
-              vertex.eventHandler, vertex.getTotalTasks(), 
+              vertex.eventHandler, vertex.getTotalTasks(),
               vertex.appContext.getTaskScheduler().getNumClusterNodes(),
-              vertex.getTaskResource(), 
+              vertex.getTaskResource(),
               vertex.appContext.getTaskScheduler().getTotalResources());
           List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
               .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
@@ -2643,7 +2650,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // special case when numTasks>0 and still we want to stay in initializing 
           // state. This is handled in RootInputInitializedTransition specially.
           vertex.initWaitsForRootInitializers = true;
-          vertex.rootInputInitializer.runInputInitializers(inputList);
+          vertex.rootInputInitializerManager.runInputInitializers(inputList);
           return VertexState.INITIALIZING;
         }
         if (!vertex.uninitializedEdges.isEmpty()) {
@@ -2662,13 +2669,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   } // end of InitTransition
 
   @VisibleForTesting
-  protected RootInputInitializerRunner createRootInputInitializerRunner(
+  protected RootInputInitializerManager createRootInputInitializerManager(
       String dagName, String vertexName, TezVertexID vertexID,
-      EventHandler eventHandler, int numTasks, int numNodes, 
+      EventHandler eventHandler, int numTasks, int numNodes,
       Resource vertexTaskResource, Resource totalResource) {
-    return new RootInputInitializerRunner(dagName, vertexName, vertexID,
-        eventHandler, dagUgi, vertexTaskResource, totalResource, numTasks, numNodes,
-        appContext.getApplicationAttemptId().getAttemptId());
+    return new RootInputInitializerManager(this, appContext, this.dagUgi);
   }
   
   private boolean initializeVertexInInitializingState() {
@@ -2730,7 +2735,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       vertex.numInitializedInputs++;
       if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
         // All inputs initialized, shutdown the initializer.
-        vertex.rootInputInitializer.shutdown();
+        vertex.rootInputInitializerManager.shutdown();
       }
       
       // done. check if we need to do the initialization
@@ -2998,8 +3003,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           vertex.addDiagnostic(fe.getError().getMessage());
         }
       }
-      if (vertex.rootInputInitializer != null) {
-        vertex.rootInputInitializer.shutdown();
+      if (vertex.rootInputInitializerManager != null) {
+        vertex.rootInputInitializerManager.shutdown();
       }
       vertex.finished(VertexState.FAILED,
           VertexTerminationCause.ROOT_INPUT_INIT_FAILURE);
@@ -3036,8 +3041,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
       super.transition(vertex, event);
-      if (vertex.rootInputInitializer != null) {
-        vertex.rootInputInitializer.shutdown();
+      if (vertex.rootInputInitializerManager != null) {
+        vertex.rootInputInitializerManager.shutdown();
       }
     }
   }
@@ -3259,11 +3264,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private static void checkEventSourceMetadata(Vertex vertex,
       EventMetaData sourceMeta) {
-    if (!isEventFromVertex(vertex, sourceMeta)) {
-        throw new TezUncheckedException("Bad routing of event"
-            + ", Event-vertex=" + sourceMeta.getTaskVertexName()
-            + ", Expected=" + vertex.getName());
-    }
+    assert isEventFromVertex(vertex, sourceMeta);
   }
 
 //  private static class RouteEventsWhileInitializingTransition implements
@@ -3374,14 +3375,32 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         {
           VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
           Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
+          Preconditions.checkArgument(target != null,
+              "Event sent to unknown vertex: " + vmEvent.getTargetVertexName());
           if (target == vertex) {
             vertex.vertexManager.onVertexManagerEventReceived(vmEvent);
           } else {
+            checkEventSourceMetadata(vertex, sourceMeta);
             vertex.eventHandler.handle(new VertexEventRouteEvent(target
                 .getVertexId(), Collections.singletonList(tezEvent)));
           }
         }
           break;
+        case ROOT_INPUT_INITIALIZER_EVENT:
+        {
+          RootInputInitializerEvent riEvent = (RootInputInitializerEvent) tezEvent.getEvent();
+          Vertex target = vertex.getDAG().getVertex(riEvent.getTargetVertexName());
+          Preconditions.checkArgument(target != null,
+              "Event sent to unknown vertex: " + riEvent.getTargetVertexName());
+          if (target == vertex) {
+            vertex.rootInputInitializerManager.handleInitializerEvent(riEvent);
+          } else {
+            checkEventSourceMetadata(vertex, sourceMeta);
+            vertex.eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),
+                Collections.singletonList(tezEvent)));
+          }
+        }
+          break;
         case INPUT_READ_ERROR_EVENT:
           {
             checkEventSourceMetadata(vertex, sourceMeta);
@@ -3587,12 +3606,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   String getJavaOpts() {
     return this.javaOpts;
   }
-  
-  @VisibleForTesting
-  RootInputInitializerRunner getRootInputInitializerRunner() {
-    return this.rootInputInitializer;
-  }
-  
+
   @VisibleForTesting
   TaskLocationHint[] getTaskLocationHints() {
     return taskLocationHints;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index c9bbf3f..8071e52 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -39,11 +39,15 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.protobuf.ByteString;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.Credentials;
@@ -93,7 +97,7 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.RootInputInitializerRunner;
+import org.apache.tez.dag.app.dag.RootInputInitializerManager;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -128,10 +132,13 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.RootInputSpecUpdate;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
 import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.test.EdgeManagerForTest;
@@ -159,6 +166,7 @@ public class TestVertexImpl {
   private static final Log LOG = LogFactory.getLog(TestVertexImpl.class);
 
   private boolean useCustomInitializer = false;
+  private TezRootInputInitializer customInitializer = null;
   
   private TezDAGID dagId;
   private ApplicationAttemptId appAttemptId;
@@ -419,7 +427,7 @@ public class TestVertexImpl {
   }
 
   private DAGPlan createDAGPlanWithInputInitializer(String initializerClassName) {
-    LOG.info("Setting up invalid dag plan with input initializer");
+    LOG.info("Setting up dag plan with input initializer");
     DAGPlan dag = DAGPlan.newBuilder()
         .setName("testVertexWithInitializer")
         .addVertex(
@@ -548,6 +556,69 @@ public class TestVertexImpl {
         .build();
     return dag;
   }
+
+  private DAGPlan createDAGPlanWithRunningInitializer() {
+    LOG.info("Setting up dag plan with running input initializer");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("DagWithInputInitializer2")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(10)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .addOutEdgeId("e1")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex2")
+                .setType(PlanVertexType.NORMAL)
+                .addInputs(
+                    RootInputLeafOutputProto.newBuilder()
+                        .setInitializerClassName("IrrelevantInitializerClassName")
+                        .setName("input1")
+                        .setEntityDescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("InputClazz")
+                                .build()
+                        )
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(20)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x2.y2")
+                        .build()
+                )
+                .addInEdgeId("e1")
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+                .setInputVertexName("vertex1")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex2")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e1")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .build();
+    return dag;
+  }
   
   private DAGPlan createDAGPlanWithInputDistributor(String initializerClassName) {
     LOG.info("Setting up invalid dag plan with input distributor");
@@ -1479,9 +1550,15 @@ public class TestVertexImpl {
       VertexLocationHint locationHint = DagTypeConverters.convertFromDAGPlan(
           vPlan.getTaskLocationHintList());
       if (useCustomInitializer) {
-        v = new VertexImplWithCustomInitializer(vertexId, vPlan, vPlan.getName(), conf,
-          dispatcher.getEventHandler(), taskAttemptListener,
-          clock, thh, appContext, locationHint, dispatcher);
+        if (customInitializer == null) {
+          v = new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf,
+              dispatcher.getEventHandler(), taskAttemptListener,
+              clock, thh, appContext, locationHint, dispatcher);
+        } else {
+          v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf,
+              dispatcher.getEventHandler(), taskAttemptListener,
+              clock, thh, appContext, locationHint, dispatcher, customInitializer);
+        }
       } else {
         v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
             dispatcher.getEventHandler(), taskAttemptListener,
@@ -1630,6 +1707,7 @@ public class TestVertexImpl {
   @Before
   public void setup() {
     useCustomInitializer = false;
+    customInitializer = null;
     setupPreDagCreation();
     dagPlan = createTestDAGPlan();
     invalidDagPlan = createInvalidDAGPlan();
@@ -2532,7 +2610,7 @@ public class TestVertexImpl {
     setupPostDagCreation();
     
     int numTasks = 5;
-    VertexImplWithCustomInitializer v1 = (VertexImplWithCustomInitializer) vertices
+    VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
         .get("vertex1");
     VertexImpl v5 = vertices.get("vertex5");
     initVertex(v1);
@@ -2550,9 +2628,9 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
     Assert.assertEquals(VertexState.INITED, vertices.get("vertex5").getState());
     
-    RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
+    RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
     List<TaskLocationHint> v1Hints = createTaskLocationHints(numTasks);
-    runner1.completeInputInitialization(0, numTasks, v1Hints);
+    initializerManager1.completeInputInitialization(0, numTasks, v1Hints);
 
     Assert.assertEquals(VertexState.INITED, v1.getState());
     Assert.assertEquals(numTasks, v1.getTotalTasks());
@@ -2561,7 +2639,7 @@ public class TestVertexImpl {
     for (int i=0; i < v1Hints.size(); ++i) {
       Assert.assertEquals(v1Hints.get(i), v1.getTaskLocationHints()[i]);
     }
-    Assert.assertEquals(true, runner1.hasShutDown);
+    Assert.assertEquals(true, initializerManager1.hasShutDown);
     
     Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
     Assert.assertEquals(VertexState.INITED, vertices.get("vertex2").getState());
@@ -2690,31 +2768,83 @@ public class TestVertexImpl {
     dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
     setupPostDagCreation();
 
-    VertexImplWithCustomInitializer v1 = (VertexImplWithCustomInitializer) vertices
+    VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
         .get("vertex1");
     dispatcher.getEventHandler().handle(
         new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
     dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
-    RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
-    runner1.failInputInitialization();
+    RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
+    initializerManager1.failInputInitialization();
 
     Assert.assertEquals(VertexState.FAILED, v1.getState());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v1
         .getVertexManager().getPlugin().getClass().getName());
-    Assert.assertEquals(true, runner1.hasShutDown);
+    Assert.assertEquals(true, initializerManager1.hasShutDown);
     
-    VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
+    VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
-    RootInputInitializerRunnerControlled runner2 = v2.getRootInputInitializerRunner();
-    runner2.failInputInitialization();
+    RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
+    initializerManager2.failInputInitialization();
     
     Assert.assertEquals(VertexState.FAILED, v2.getState());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v2
         .getVertexManager().getPlugin().getClass().getName());
-    Assert.assertEquals(true, runner2.hasShutDown);
+    Assert.assertEquals(true, initializerManager2.hasShutDown);
   }
-  
+
+  @Test(timeout = 10000)
+  public void testRootInputInitializerEvent() throws Exception {
+    useCustomInitializer = true;
+    customInitializer = new EventHandlingRootInputInitializer();
+    EventHandlingRootInputInitializer initializer =
+        (EventHandlingRootInputInitializer) customInitializer;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithRunningInitializer();
+    setupPostDagCreation();
+
+    VertexImplWithRunningInputInitializer v1 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex1");
+    VertexImplWithRunningInputInitializer v2 =
+        (VertexImplWithRunningInputInitializer) vertices.get("vertex2");
+
+    initVertex(v1);
+    startVertex(v1);
+    Assert.assertEquals(VertexState.RUNNING, v1.getState());
+    Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
+    dispatcher.await();
+
+    RootInputInitializerManagerWithRunningInitializer manager2 = v2.getRootInputInitializerManager();
+    // Wait for the initializer to be invoked - which may be a separate thread.
+    while (!initializer.initStarted.get()) {
+      Thread.sleep(10);
+    }
+    Assert.assertFalse(initializer.eventReceived.get());
+    Assert.assertFalse(initializer.initComplete.get());
+
+    // Signal the initializer by sending an event - via vertex1
+    RootInputInitializerEvent event = new RootInputInitializerEvent("vertex2", "input1", null, 0);
+    TezEvent tezEvent = new TezEvent(event,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null, null));
+
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
+    dispatcher.await();
+
+    // Both happening in separate threads
+    while (!initializer.eventReceived.get()) {
+      Thread.sleep(10);
+    }
+    while (!initializer.initComplete.get()) {
+      Thread.sleep(10);
+    }
+
+    // Will eventually go into RUNNING state, via INITED
+    while (v2.getState()  != VertexState.RUNNING) {
+      Thread.sleep(10);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexWithInitializerSuccess() {
@@ -2723,15 +2853,15 @@ public class TestVertexImpl {
     dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
     setupPostDagCreation();
 
-    VertexImplWithCustomInitializer v1 = (VertexImplWithCustomInitializer) vertices
+    VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
         .get("vertex1");
     dispatcher.getEventHandler().handle(
         new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
     dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
-    RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
+    RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
     List<TaskLocationHint> v1Hints = createTaskLocationHints(5);
-    runner1.completeInputInitialization(0, 5, v1Hints);
+    initializerManager1.completeInputInitialization(0, 5, v1Hints);
 
     Assert.assertEquals(VertexState.INITED, v1.getState());
     Assert.assertEquals(5, v1.getTotalTasks());
@@ -2742,14 +2872,14 @@ public class TestVertexImpl {
     for (int i=0; i < v1Hints.size(); ++i) {
       Assert.assertEquals(v1Hints.get(i), v1.getTaskLocationHints()[i]);
     }
-    Assert.assertEquals(true, runner1.hasShutDown);
+    Assert.assertEquals(true, initializerManager1.hasShutDown);
     for (int i = 0; i < 5; i++) {
       List<InputSpec> inputSpecs = v1.getInputSpecList(i);
       Assert.assertEquals(1, inputSpecs.size());
       Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
     }
     
-    VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
+    VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
     
     // non-task events dont get buffered
@@ -2768,9 +2898,9 @@ public class TestVertexImpl {
     dispatcher.await();
     Assert.assertEquals(1, v2.pendingTaskEvents.size());
     
-    RootInputInitializerRunnerControlled runner2 = v2.getRootInputInitializerRunner();
+    RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
     List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
-    runner2.completeInputInitialization(0, 10, v2Hints);
+    initializerManager2.completeInputInitialization(0, 10, v2Hints);
     
     Assert.assertEquals(VertexState.INITED, v2.getState());
     Assert.assertEquals(10, v2.getTotalTasks());
@@ -2781,7 +2911,7 @@ public class TestVertexImpl {
     for (int i=0; i < v2Hints.size(); ++i) {
       Assert.assertEquals(v2Hints.get(i), v2.getTaskLocationHints()[i]);
     }
-    Assert.assertEquals(true, runner2.hasShutDown);
+    Assert.assertEquals(true, initializerManager2.hasShutDown);
     for (int i = 0; i < 10; i++) {
       List<InputSpec> inputSpecs = v1.getInputSpecList(i);
       Assert.assertEquals(1, inputSpecs.size());
@@ -2797,20 +2927,20 @@ public class TestVertexImpl {
     dagPlan = createDAGPlanWithInputDistributor("TestInputInitializer");
     setupPostDagCreation();
 
-    VertexImplWithCustomInitializer v1 = (VertexImplWithCustomInitializer) vertices
+    VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
         .get("vertex1");
-    VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
+    VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
     dispatcher.getEventHandler().handle(
         new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
     dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
-    RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
+    RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
     byte[] payload = new byte[0];
-    runner1.completeInputDistribution(payload);
+    initializerManager1.completeInputDistribution(payload);
     // edge is still null so its initializing
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
-    Assert.assertEquals(true, runner1.hasShutDown);    
+    Assert.assertEquals(true, initializerManager1.hasShutDown);
     Assert.assertEquals(2, v1.getTotalTasks());
     Assert.assertEquals(payload, v1.getInputSpecList(0).get(0).getInputDescriptor().getUserPayload());
     EdgeManagerDescriptor mockEdgeManagerDescriptor =
@@ -2832,20 +2962,20 @@ public class TestVertexImpl {
     setupPostDagCreation();
 
     int expectedNumTasks = RootInputSpecUpdaterVertexManager.NUM_TASKS;
-    VertexImplWithCustomInitializer v3 = (VertexImplWithCustomInitializer) vertices
+    VertexImplWithControlledInitializerManager v3 = (VertexImplWithControlledInitializerManager) vertices
         .get("vertex3");
     dispatcher.getEventHandler().handle(
         new VertexEvent(v3.getVertexId(), VertexEventType.V_INIT));
     dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
-    RootInputInitializerRunnerControlled runner1 = v3.getRootInputInitializerRunner();
-    runner1.completeInputInitialization();
+    RootInputInitializerManagerControlled initializerManager1 = v3.getRootInputInitializerManager();
+    initializerManager1.completeInputInitialization();
 
     Assert.assertEquals(VertexState.INITED, v3.getState());
     Assert.assertEquals(expectedNumTasks, v3.getTotalTasks());
     Assert.assertEquals(RootInputSpecUpdaterVertexManager.class.getName(), v3.getVertexManager()
         .getPlugin().getClass().getName());
-    Assert.assertEquals(true, runner1.hasShutDown);
+    Assert.assertEquals(true, initializerManager1.hasShutDown);
     
     for (int i = 0; i < expectedNumTasks; i++) {
       List<InputSpec> inputSpecs = v3.getInputSpecList(i);
@@ -2863,20 +2993,20 @@ public class TestVertexImpl {
     setupPostDagCreation();
 
     int expectedNumTasks = RootInputSpecUpdaterVertexManager.NUM_TASKS;
-    VertexImplWithCustomInitializer v4 = (VertexImplWithCustomInitializer) vertices
+    VertexImplWithControlledInitializerManager v4 = (VertexImplWithControlledInitializerManager) vertices
         .get("vertex4");
     dispatcher.getEventHandler().handle(
         new VertexEvent(v4.getVertexId(), VertexEventType.V_INIT));
     dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v4.getState());
-    RootInputInitializerRunnerControlled runner1 = v4.getRootInputInitializerRunner();
-    runner1.completeInputInitialization();
+    RootInputInitializerManagerControlled initializerManager1 = v4.getRootInputInitializerManager();
+    initializerManager1.completeInputInitialization();
 
     Assert.assertEquals(VertexState.INITED, v4.getState());
     Assert.assertEquals(expectedNumTasks, v4.getTotalTasks());
     Assert.assertEquals(RootInputSpecUpdaterVertexManager.class.getName(), v4.getVertexManager()
         .getPlugin().getClass().getName());
-    Assert.assertEquals(true, runner1.hasShutDown);
+    Assert.assertEquals(true, initializerManager1.hasShutDown);
     
     for (int i = 0; i < expectedNumTasks; i++) {
       List<InputSpec> inputSpecs = v4.getInputSpecList(i);
@@ -2922,17 +3052,62 @@ public class TestVertexImpl {
     }
   }
 
+  /**
+   * Test VertexImpl whose createRootInputInitializerManager builds a manager around the
+   * TezRootInputInitializer passed to the constructor, and keeps it retrievable via a getter.
+   */
+  @SuppressWarnings("rawtypes")
+  private static class VertexImplWithRunningInputInitializer extends VertexImpl {
+
+    private RootInputInitializerManagerWithRunningInitializer rootInputInitializerManager;
+    private final TezRootInputInitializer presetInitializer;
+
+    public VertexImplWithRunningInputInitializer(TezVertexID vertexId,
+        VertexPlan vertexPlan, String vertexName, Configuration conf,
+        EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
+        Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
+        VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher,
+        TezRootInputInitializer presetInitializer) {
+      super(vertexId, vertexPlan, vertexName, conf, eventHandler,
+          taskAttemptListener, clock, thh, true,
+          appContext, vertexLocationHint, null, javaProfilerOptions);
+      this.presetInitializer = presetInitializer;
+    }
+
+    @Override
+    protected RootInputInitializerManager createRootInputInitializerManager(
+        String dagName, String vertexName, TezVertexID vertexID,
+        EventHandler eventHandler, int numTasks, int numNodes,
+        Resource taskResource, Resource totalResource) {
+      try {
+        rootInputInitializerManager = new RootInputInitializerManagerWithRunningInitializer(
+            this, this.getAppContext(), presetInitializer);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return rootInputInitializerManager;
+    }
+
+    RootInputInitializerManagerWithRunningInitializer getRootInputInitializerManager() {
+      return rootInputInitializerManager;
+    }
+  }
+
   @SuppressWarnings("rawtypes")
-  private static class VertexImplWithCustomInitializer extends VertexImpl {
+  private static class VertexImplWithControlledInitializerManager extends VertexImpl {
     
     private final DrainDispatcher dispatcher;
-    private RootInputInitializerRunnerControlled rootInputInitializerRunner;
+    private RootInputInitializerManagerControlled rootInputInitializerManager;
     
-    public VertexImplWithCustomInitializer(TezVertexID vertexId,
-        VertexPlan vertexPlan, String vertexName, Configuration conf,
-        EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
-        Clock clock, TaskHeartbeatHandler thh,
-        AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher) {
+    public VertexImplWithControlledInitializerManager(TezVertexID vertexId,
+                                                      VertexPlan vertexPlan, String vertexName,
+                                                      Configuration conf,
+                                                      EventHandler eventHandler,
+                                                      TaskAttemptListener taskAttemptListener,
+                                                      Clock clock, TaskHeartbeatHandler thh,
+                                                      AppContext appContext,
+                                                      VertexLocationHint vertexLocationHint,
+                                                      DrainDispatcher dispatcher) {
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
           taskAttemptListener, clock, thh, true,
           appContext, vertexLocationHint, null, javaProfilerOptions);
@@ -2940,27 +3115,49 @@ public class TestVertexImpl {
     }
 
     @Override
-    protected RootInputInitializerRunner createRootInputInitializerRunner(
+    protected RootInputInitializerManager createRootInputInitializerManager(
         String dagName, String vertexName, TezVertexID vertexID,
-        EventHandler eventHandler, int numTasks, int numNodes, 
+        EventHandler eventHandler, int numTasks, int numNodes,
         Resource taskResource, Resource totalResource) {
       try {
-        rootInputInitializerRunner = new RootInputInitializerRunnerControlled(dagName, vertexName, vertexID,
-            eventHandler, numTasks, dispatcher, taskResource, totalResource);
+        rootInputInitializerManager =
+            new RootInputInitializerManagerControlled(this, this.getAppContext(), eventHandler,
+                dispatcher);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
-      return rootInputInitializerRunner;
+      return rootInputInitializerManager;
     }
     
-    RootInputInitializerRunnerControlled getRootInputInitializerRunner() {
-      return rootInputInitializerRunner;
+    RootInputInitializerManagerControlled getRootInputInitializerManager() {
+      return rootInputInitializerManager;
     }
   }
-  
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  private static class RootInputInitializerRunnerControlled extends
-      RootInputInitializerRunner {
+
+
+  /** RootInputInitializerManager whose createInitializer always returns a preset instance. */
+  private static class RootInputInitializerManagerWithRunningInitializer
+      extends RootInputInitializerManager {
+    private final TezRootInputInitializer presetInitializer;
+
+    public RootInputInitializerManagerWithRunningInitializer(
+        Vertex vertex, AppContext appContext, TezRootInputInitializer presetInitializer)
+        throws IOException {
+      super(vertex, appContext, UserGroupInformation.getCurrentUser());
+      this.presetInitializer = presetInitializer;
+    }
+
+    /** Ignores {@code input}; every call yields the instance given to the constructor. */
+    @Override
+    protected TezRootInputInitializer createInitializer(
+        RootInputLeafOutputDescriptor<InputDescriptor> input) {
+      return presetInitializer;
+    }
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private static class RootInputInitializerManagerControlled extends
+      RootInputInitializerManager {
 
     private List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs;
     private final EventHandler eventHandler;
@@ -2968,16 +3165,14 @@ public class TestVertexImpl {
     private final TezVertexID vertexID;
     private volatile boolean hasShutDown = false;
 
-    public RootInputInitializerRunnerControlled(String dagName,
-        String vertexName, TezVertexID vertexID, EventHandler eventHandler,
-        int numTasks, DrainDispatcher dispatcher,
-        Resource taskResource, Resource totalResource) throws IOException {
-      super(dagName, vertexName, vertexID, eventHandler, 
-          UserGroupInformation.getCurrentUser(), 
-          taskResource, totalResource, numTasks, 1, 1);
+    public RootInputInitializerManagerControlled(Vertex vertex, AppContext appContext,
+                                                 EventHandler eventHandler,
+                                                 DrainDispatcher dispatcher
+    ) throws IOException {
+      super(vertex, appContext, UserGroupInformation.getCurrentUser());
       this.eventHandler = eventHandler;
       this.dispatcher = dispatcher;
-      this.vertexID = vertexID;
+      this.vertexID = vertex.getVertexId();
     }
 
     @Override
@@ -2985,7 +3180,25 @@ public class TestVertexImpl {
         List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs) {
       this.inputs = inputs;
     }
-    
+
+    /** Returns a do-nothing initializer: initialize yields null and events are ignored. */
+    @Override
+    protected TezRootInputInitializer createInitializer(
+        RootInputLeafOutputDescriptor<InputDescriptor> input) {
+      TezRootInputInitializer noOpInitializer = new TezRootInputInitializer() {
+        @Override
+        public List<Event> initialize(TezRootInputInitializerContext inputVertexContext)
+            throws Exception {
+          return null;
+        }
+        @Override
+        public void handleInputInitializerEvent(List<RootInputInitializerEvent> events)
+            throws Exception {
+        }
+      };
+      return noOpInitializer;
+    }
+
     @Override
     public void shutdown() {
       hasShutDown = true;
@@ -3004,7 +3217,7 @@ public class TestVertexImpl {
           .getEntityName(), null));
       dispatcher.await();
     }
-    
+
     public void completeInputDistribution(byte[] payload) {
       List<Event> events = Lists.newArrayListWithCapacity(1);
       RootInputUpdatePayloadEvent event = new RootInputUpdatePayloadEvent(payload);
@@ -3013,7 +3226,7 @@ public class TestVertexImpl {
           .get(0).getEntityName(), events));
       dispatcher.await();
     }
-    
+
     public void completeInputInitialization(int initializerIndex, int targetTasks,
         List<TaskLocationHint> locationHints) {
       List<Event> events = Lists.newArrayListWithCapacity(targetTasks + 1);
@@ -3158,7 +3371,8 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.RUNNING, vB.getState());
     Assert.assertEquals(VertexState.RUNNING, vC.getState());
   }
-  
+
+  @InterfaceAudience.Private
   public static class RootInputSpecUpdaterVertexManager extends VertexManagerPlugin {
 
     private VertexManagerPluginContext context;
@@ -3197,4 +3411,55 @@ public class TestVertexImpl {
       context.setVertexParallelism(NUM_TASKS, null, null, map);
     }
   }
+
+  /**
+   * Test initializer that blocks in {@link #initialize} until at least one
+   * initializer event has been delivered via {@link #handleInputInitializerEvent},
+   * and then emits a single {@link RootInputDataInformationEvent}.
+   */
+  @InterfaceAudience.Private
+  public static class EventHandlingRootInputInitializer implements TezRootInputInitializer {
+
+    final AtomicBoolean initStarted = new AtomicBoolean(false);
+    final AtomicBoolean eventReceived = new AtomicBoolean(false);
+    final AtomicBoolean initComplete = new AtomicBoolean(false);
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition eventCondition = lock.newCondition();
+
+    @Override
+    public List<Event> initialize(TezRootInputInitializerContext inputVertexContext) throws
+        Exception {
+      initStarted.set(true);
+      lock.lock();
+      try {
+        // Wait on the flag, not just the signal: the event may arrive before this
+        // thread starts waiting, and Condition.await() may wake up spuriously.
+        while (!eventReceived.get()) {
+          eventCondition.await();
+        }
+      } finally {
+        lock.unlock();
+      }
+      initComplete.set(true);
+      RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(0, new byte[]{0});
+      List<Event> eventList = new LinkedList<Event>();
+      eventList.add(diEvent);
+      return eventList;
+    }
+
+    @Override
+    public void handleInputInitializerEvent(List<RootInputInitializerEvent> events) throws
+        Exception {
+      lock.lock();
+      try {
+        // Publish the flag while holding the lock so the waiter's check-then-await
+        // cannot miss it.
+        eventReceived.set(true);
+        eventCondition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index 52b6b1c..87a98b9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -46,6 +46,7 @@ import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
 
 public class MRInputAMSplitGenerator implements TezRootInputInitializer {
 
@@ -201,4 +202,15 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
     return events;
   }
 
+  /**
+   * Always fails: this initializer does not expect to handle any initializer events.
+   *
+   * @throws UnsupportedOperationException on every call
+   */
+  @Override
+  public void handleInputInitializerEvent(List<RootInputInitializerEvent> events)
+      throws Exception {
+    throw new UnsupportedOperationException("Not expecting to handle any events");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index 0b5e345..7d11ab3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -35,6 +35,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
 import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
 
 import com.google.common.base.Stopwatch;
@@ -109,4 +110,15 @@ public class MRInputSplitDistributor implements TezRootInputInitializer {
 
     return events;
   }
+
+  /**
+   * Always fails: this initializer does not expect to handle any initializer events.
+   *
+   * @throws UnsupportedOperationException on every call
+   */
+  @Override
+  public void handleInputInitializerEvent(List<RootInputInitializerEvent> events)
+      throws Exception {
+    throw new UnsupportedOperationException("Not expecting to handle any events");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index feabefd..5d6ec0d 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -197,6 +197,17 @@ public class TestMRInputSplitDistributor {
       return 1;
     }
 
+    /**
+     * Not implemented by this mock.
+     *
+     * @throws UnsupportedOperationException on every call
+     */
+    @Override
+    public int getVertexNumTasks(String vertexName) {
+      String reason = "getVertexNumTasks not implemented in this mock";
+      throw new UnsupportedOperationException(reason);
+    }
+
   }
 
   @Private

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
index 5830623..2213f79 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
@@ -23,6 +23,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.EventProtos;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
 
 public class ProtoConverters {
 
@@ -89,4 +90,36 @@ public class ProtoConverters {
     return diEvent;
   }
 
+  /**
+   * Serializes a {@link RootInputInitializerEvent} into its protobuf form. The
+   * user payload field is set only when the event's payload is non-null.
+   */
+  public static EventProtos.RootInputInitializerEventProto convertRootInputInitializerEventToProto(
+      RootInputInitializerEvent event) {
+    EventProtos.RootInputInitializerEventProto.Builder protoBuilder =
+        EventProtos.RootInputInitializerEventProto.newBuilder()
+            .setTargetVertexName(event.getTargetVertexName())
+            .setTargetInputName(event.getTargetInputName())
+            .setVersion(event.getVersion());
+    byte[] userPayload = event.getUserPayload();
+    if (userPayload != null) {
+      protoBuilder.setUserPayload(ByteString.copyFrom(userPayload));
+    }
+    return protoBuilder.build();
+  }
+
+  /**
+   * Rebuilds a {@link RootInputInitializerEvent} from its protobuf form, passing
+   * {@code null} as the payload when the proto has no user payload field set.
+   */
+  public static RootInputInitializerEvent convertRootInputInitializerEventFromProto(
+      EventProtos.RootInputInitializerEventProto proto) {
+    byte[] userPayload = null;
+    if (proto.hasUserPayload()) {
+      userPayload = proto.getUserPayload().toByteArray();
+    }
+    return new RootInputInitializerEvent(proto.getTargetVertexName(),
+        proto.getTargetInputName(), userPayload, proto.getVersion());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
index adefbd6..6d4c902 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
@@ -27,5 +27,6 @@ public enum EventType {
   TASK_STATUS_UPDATE_EVENT,
   VERTEX_MANAGER_EVENT,
   ROOT_INPUT_DATA_INFORMATION_EVENT,
-  COMPOSITE_DATA_MOVEMENT_EVENT
+  COMPOSITE_DATA_MOVEMENT_EVENT,
+  ROOT_INPUT_INITIALIZER_EVENT,
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index 29ded25..770fd89 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -28,6 +28,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.EventProtos;
 import org.apache.tez.runtime.api.events.EventProtos.CompositeEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.DataMovementEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto;
@@ -37,6 +38,7 @@ import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -80,6 +82,8 @@ public class TezEvent implements Writable {
       eventType = EventType.TASK_STATUS_UPDATE_EVENT;
     } else if (event instanceof RootInputDataInformationEvent) {
       eventType = EventType.ROOT_INPUT_DATA_INFORMATION_EVENT;
+    } else if (event instanceof RootInputInitializerEvent) {
+      eventType = EventType.ROOT_INPUT_INITIALIZER_EVENT;
     } else {
       throw new TezUncheckedException("Unknown event, event="
           + event.getClass().getName());
@@ -171,6 +175,11 @@ public class TezEvent implements Writable {
         eventBytes = ProtoConverters.convertRootInputDataInformationEventToProto(
             (RootInputDataInformationEvent) event).toByteArray();
         break;
+      case ROOT_INPUT_INITIALIZER_EVENT:
+        eventBytes = ProtoConverters
+            .convertRootInputInitializerEventToProto((RootInputInitializerEvent) event)
+            .toByteArray();
+        break;
       default:
         throw new TezUncheckedException("Unknown TezEvent"
            + ", type=" + eventType);
@@ -234,6 +243,10 @@ public class TezEvent implements Writable {
             .parseFrom(eventBytes);
         event = ProtoConverters.convertRootInputDataInformationEventFromProto(difProto);
         break;
+      case ROOT_INPUT_INITIALIZER_EVENT:
+        EventProtos.RootInputInitializerEventProto riiProto = EventProtos.RootInputInitializerEventProto.parseFrom(eventBytes);
+        event = ProtoConverters.convertRootInputInitializerEventFromProto(riiProto);
+        break;
       default:
         // RootInputUpdatePayload event not wrapped in a TezEvent.
         throw new TezUncheckedException("Unexpected TezEvent"

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e7e0b3fb/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 879fecf..bb81f82 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -50,6 +50,7 @@ import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.test.TestInput;
 import org.apache.tez.test.TestOutput;
@@ -218,6 +219,17 @@ public class MultiAttemptDAG {
       }
       return null;
     }
+
+    /**
+     * Always fails: initializer events are not supported by this initializer.
+     *
+     * @throws UnsupportedOperationException on every call
+     */
+    @Override
+    public void handleInputInitializerEvent(List<RootInputInitializerEvent> events)
+        throws Exception {
+      throw new UnsupportedOperationException("Not supported");
+    }
   }
 
   public static class NoOpInput implements LogicalInput, MemoryUpdateCallback {


Mime
View raw message