tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-756. Allow VertexManagerPlugins to configure RootInputEvents without access to Tez internal structures. (sseth)
Date Sat, 15 Feb 2014 03:26:53 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 206a1ed69 -> 8f84fa561


TEZ-756. Allow VertexManagerPlugins to configure RootInputEvents without
access to Tez internal structures. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8f84fa56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8f84fa56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8f84fa56

Branch: refs/heads/master
Commit: 8f84fa56130321e67641070621f7c9579358f6af
Parents: 206a1ed
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Feb 14 19:25:29 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Feb 14 19:25:29 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/api/VertexLocationHint.java  | 10 +++
 .../tez/dag/api/VertexManagerPluginContext.java | 26 ++++++-
 .../events/RootInputDataInformationEvent.java   | 27 ++++---
 tez-api/src/main/proto/Events.proto             |  5 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  7 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  3 +-
 .../app/dag/impl/RootInputVertexManager.java    | 75 ++++----------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 37 +++++++---
 .../tez/dag/app/dag/impl/VertexManager.java     | 63 ++++++++++++++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  4 +-
 .../org/apache/tez/common/ProtoConverters.java  | 11 +--
 .../vertexmanager/ShuffleVertexManager.java     |  3 +-
 .../vertexmanager/TestShuffleVertexManager.java |  9 +--
 13 files changed, 173 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
index 4f19314..56c55dc 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
@@ -27,6 +27,7 @@ public class VertexLocationHint  {
   private final int numTasks;
   private final List<TaskLocationHint> taskLocationHints;
 
+  @Deprecated // TODO TEZ-837 Remove in follow up jira
   public VertexLocationHint(int numTasks,
       List<TaskLocationHint> taskLocationHints) {
     this.numTasks = numTasks;
@@ -36,6 +37,15 @@ public class VertexLocationHint  {
       this.taskLocationHints = null;
     }
   }
+  
+  public VertexLocationHint(List<TaskLocationHint> taskLocationHints) {
+    this.numTasks = 0;
+    if (taskLocationHints != null) {
+      this.taskLocationHints = Collections.unmodifiableList(taskLocationHints);
+    } else {
+      this.taskLocationHints = null;
+    }
+  }
 
   public int getNumTasks() {
     return numTasks;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index ed02e0f..f3ca5ef 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -18,10 +18,13 @@
 
 package org.apache.tez.dag.api;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+
 /**
  * Object with API's to interact with the Tez execution engine
  */
@@ -60,13 +63,29 @@ public interface VertexManagerPluginContext {
    * This API can change the parallelism only once. Subsequent attempts will be 
    * disallowed
    * @param parallelism New number of tasks in the vertex
+   * @param locationHint the placement policy for tasks.
    * @param sourceEdgeManagers
    * @return true if the operation was allowed.
    */
-  public boolean setVertexParallelism(int parallelism, 
+  public boolean setVertexParallelism(int parallelism, VertexLocationHint locationHint,
       Map<String, EdgeManager> sourceEdgeManagers);
   
   /**
+   * Allows a VertexManagerPlugin to assign Events for Root Inputs
+   * 
+   * For regular Event Routing changes - the EdgeManager should be configured
+   * via the setVertexParallelism method
+   * 
+   * @param inputName
+   *          The input name associated with the event
+   * @param events
+   *          The list of Events to be assigned to various tasks belonging to
+   *          the Vertex. The target index on individual events represents the
+   *          task to which events need to be sent.
+   */
+  public void addRootInputEvents(String inputName, Collection<RootInputDataInformationEvent>
event);
+  
+  /**
    * Notify the vertex to start the given tasks
    * @param taskIDs Indices of the tasks to be started
    */
@@ -78,10 +97,11 @@ public interface VertexManagerPluginContext {
    * @return Names of inputs to this vertex. Maybe null if there are no inputs
    */
   public Set<String> getVertexInputNames();
-  
+
   /**
    * Set the placement hint for tasks in this vertex
+   * 
    * @param locationHint
    */
   public void setVertexLocationHint(VertexLocationHint locationHint);
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
index a4938cc..0b0d32f 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
@@ -26,18 +26,27 @@ import org.apache.tez.runtime.api.TezRootInputInitializer;
  * byte payload for individual tasks running as part of the Vertex for which an
  * Initial Input has been configured.
  */
-public class RootInputDataInformationEvent extends Event {
+public final class RootInputDataInformationEvent extends Event {
 
-  private final int index;
+  private final int sourceIndex;
+  private int targetIndex; // TODO Likely to be multiple at a later point.
   private final byte[] userPayload;
   
-  public RootInputDataInformationEvent(int index, byte[] userPayload) {
-    this.index = index;
+  public RootInputDataInformationEvent(int srcIndex, byte[] userPayload) {
+    this.sourceIndex = srcIndex;
     this.userPayload = userPayload;
   }
 
-  public int getIndex() {
-    return this.index;
+  public int getSourceIndex() {
+    return this.sourceIndex;
+  }
+
+  public int getTargetIndex() {
+    return this.targetIndex;
+  }
+
+  public void setTargetIndex(int target) {
+    this.targetIndex = target;
   }
   
   public byte[] getUserPayload() {
@@ -46,7 +55,7 @@ public class RootInputDataInformationEvent extends Event {
 
   @Override
   public String toString() {
-    return "RootInputDataInformationEvent [index=" + index + "]";
-  }
-
+    return "RootInputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex="
+        + targetIndex + "]";
+  }  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
index c5cfabf..19d1978 100644
--- a/tez-api/src/main/proto/Events.proto
+++ b/tez-api/src/main/proto/Events.proto
@@ -45,8 +45,9 @@ message VertexManagerEventProto {
 }
 
 message RootInputDataInformationEventProto {
-  optional int32 index = 1;
-  optional bytes user_payload = 2;
+  optional int32 source_index = 1;
+  optional int32 target_index = 2;
+  optional bytes user_payload = 3;
 }
 
 message CompositeEventProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c19eb7c..f01e71d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -261,11 +261,11 @@ public class DAGAppMaster extends AbstractService {
     this.amConf = conf;
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
+    dispatcher = createDispatcher();
     context = new RunningAppContext(conf);
 
     clientHandler = new DAGClientHandler();
 
-    dispatcher = createDispatcher();
     addIfService(dispatcher, false);
 
     clientRpcServer = new DAGClientServer(clientHandler, appAttemptID);
@@ -998,8 +998,10 @@ public class DAGAppMaster extends AbstractService {
     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock rLock = rwLock.readLock();
     private final Lock wLock = rwLock.writeLock();
+    private final EventHandler eventHandler;
     public RunningAppContext(Configuration config) {
       this.conf = config;
+      this.eventHandler = dispatcher.getEventHandler();
     }
 
     @Override
@@ -1044,7 +1046,7 @@ public class DAGAppMaster extends AbstractService {
 
     @Override
     public EventHandler getEventHandler() {
-      return dispatcher.getEventHandler();
+      return eventHandler;
     }
 
     @Override
@@ -1308,7 +1310,6 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public synchronized void serviceStart() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index f986eb6..5157401 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -76,7 +76,8 @@ public interface Vertex extends Comparable<Vertex> {
   VertexStatusBuilder getVertexStatus(Set<StatusGetOpts> statusOptions);
 
 
-  boolean setParallelism(int parallelism, Map<String, EdgeManager> sourceEdgeManagers);
+  boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+      Map<String, EdgeManager> sourceEdgeManagers);
   void setVertexLocationHint(VertexLocationHint vertexLocationHint);
 
   // CHANGE THESE TO LISTS AND MAINTAIN ORDER?

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index 8ca6dc2..bbba284 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -19,61 +19,29 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
-import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.runtime.api.impl.TezEvent;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
 
-@SuppressWarnings("rawtypes")
 public class RootInputVertexManager implements VertexManagerPlugin {
 
   VertexManagerPluginContext context;
-  private EventMetaData sourceInfo;
-  private Map<String, EventMetaData> destInfoMap;
-  
-  Vertex deleteVertex;
-  EventHandler deleteHandler;
-  
-  public RootInputVertexManager(Vertex v, EventHandler h) {
-    deleteVertex = v;
-    deleteHandler = h;
-  }
 
   @Override
   public void initialize(VertexManagerPluginContext context) {
     this.context = context;
-    Set<String> inputs = this.context.getVertexInputNames();
-    this.destInfoMap = Maps.newHashMapWithExpectedSize(inputs.size());
-    for (String inputName : inputs) {
-      EventMetaData destInfo = new EventMetaData(
-          EventProducerConsumerType.INPUT, context.getVertexName(),
-          inputName, null);
-      destInfoMap.put(inputName, destInfo);
-    }
-    this.sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT,
-        context.getVertexName(), "NULL", null);
   }
 
   @Override
@@ -95,21 +63,20 @@ public class RootInputVertexManager implements VertexManagerPlugin {
   }
 
   @Override
-  public void onRootVertexInitialized(String inputName,
-      InputDescriptor inputDescriptor, List<Event> events) {
+  public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+      List<Event> events) {
+    List<RootInputDataInformationEvent> riEvents = Lists.newLinkedList();
     boolean dataInformationEventSeen = false;
     for (Event event : events) {
       if (event instanceof RootInputConfigureVertexTasksEvent) {
         // No tasks should have been started yet. Checked by initial state check.
         Preconditions.checkState(dataInformationEventSeen == false);
         Preconditions
-            .checkState(
-                context.getVertexNumTasks(context.getVertexName()) == -1,
+            .checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
                 "Parallelism for the vertex should be set to -1 if the InputInitializer is
setting parallelism");
         RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent)
event;
-        context.setVertexLocationHint(new VertexLocationHint(cEvent
-            .getNumTasks(), cEvent.getTaskLocationHints()));
-        context.setVertexParallelism(cEvent.getNumTasks(), null);
+        context.setVertexParallelism(cEvent.getNumTasks(),
+            new VertexLocationHint(cEvent.getTaskLocationHints()), null);
       }
       if (event instanceof RootInputUpdatePayloadEvent) {
         // No tasks should have been started yet. Checked by initial state check.
@@ -120,30 +87,12 @@ public class RootInputVertexManager implements VertexManagerPlugin {
         dataInformationEventSeen = true;
         // # Tasks should have been set by this point.
         Preconditions.checkState(context.getVertexNumTasks(context.getVertexName()) != 0);
-        TezEvent tezEvent = new TezEvent(event, sourceInfo);
-        tezEvent.setDestinationInfo(destInfoMap.get(inputName));
-        // FIXME the event should be sent via the context and not directly to a
-        // task
-        // FIXME event handler should not exposed to plugins
-
-        if (deleteVertex.getAppContext().isRecoveryEnabled()) {
-          VertexDataMovementEventsGeneratedEvent historyEvent =
-              new VertexDataMovementEventsGeneratedEvent(deleteVertex.getVertexId(),
-                  Arrays.asList(tezEvent));
-          // FIXME should not have access to app context
-          deleteVertex.getAppContext().getHistoryHandler().handle(
-              new DAGHistoryEvent(deleteVertex.getDAG().getID(), historyEvent));
-        }
-
-        sendEventToTask(TezTaskID.getInstance(deleteVertex.getVertexId(),
-            ((RootInputDataInformationEvent) event).getIndex()), tezEvent);
+        
+        RootInputDataInformationEvent rEvent = (RootInputDataInformationEvent)event;
+        rEvent.setTargetIndex(rEvent.getSourceIndex()); // 1:1 routing
+        riEvents.add(rEvent);
       }
     }
+    context.addRootInputEvents(inputName, riEvents);
   }
-
-  @SuppressWarnings("unchecked")
-  private void sendEventToTask(TezTaskID taskId, TezEvent tezEvent) {
-    deleteHandler.handle(new TaskEventAddTezEvent(taskId, tezEvent));
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 26e44ef..02b602f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -91,6 +91,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
@@ -122,6 +123,7 @@ import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -814,9 +816,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   @Override
-  public boolean setParallelism(int parallelism,
+  public boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManager> sourceEdgeManagers) {
     writeLock.lock();
+    setVertexLocationHint(vertexLocationHint);
     try {
       if (parallelismSet == true) {
         LOG.info("Parallelism can only be set dynamically once per vertex");
@@ -827,16 +830,21 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
       // Input initializer expected to set parallelism.
       if (numTasks == -1) {
-        Preconditions
-            .checkArgument(sourceEdgeManagers == null,
-                "Source edge managers cannot be set when determining initial parallelism");
         this.numTasks = parallelism;
         this.createTasks();
         LOG.info("Vertex " + getVertexId() + 
             " parallelism set to " + parallelism);
-        // Pending task event management, which follows, is not required.
-        // Vertex event buffering is happening elsewhere - while in the Vertex
-        // INITIALIZING state.
+
+        if(sourceEdgeManagers != null) {
+          for(Map.Entry<String, EdgeManager> entry : sourceEdgeManagers.entrySet())
{
+            LOG.info("Replacing edge manager for source:"
+                + entry.getKey() + " destination: " + getVertexId());
+            Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
+            EdgeManager edgeManager = entry.getValue();
+            Edge edge = sourceVertices.get(sourceVertex);
+            edge.setEdgeManager(edgeManager);
+          }
+        }
       } else {
         if (parallelism >= numTasks) {
           // not that hard to support perhaps. but checking right now since there
@@ -1410,8 +1418,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         } else if (vertex.inputsWithInitializers != null) {
           LOG.info("Setting vertexManager to RootInputVertexManager for "
               + vertex.logIdentifier);
-          vertex.vertexManager = new VertexManager(new RootInputVertexManager(
-              vertex, vertex.eventHandler), vertex, vertex.appContext);
+          vertex.vertexManager = new VertexManager(new RootInputVertexManager(),
+              vertex, vertex.appContext);
         } else {
           // schedule all tasks upon vertex start. Default behavior.
           LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
@@ -1614,7 +1622,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           " sent by vertex " + splitEvent.getSenderVertex() +
           " numTasks " + splitEvent.getNumTasks());
       vertex.originalOneToOneSplitSource = originalSplitSource;
-      vertex.setParallelism(splitEvent.getNumTasks(), null);
+      // ZZZ Can this be handled ?
+      vertex.setParallelism(splitEvent.getNumTasks(), null, null);
       return vertex.initializeVertexInInitializingState();
     }
   }
@@ -2093,6 +2102,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             }
           }
           break;
+        case ROOT_INPUT_DATA_INFORMATION_EVENT:
+          checkEventSourceMetadata(vertex, sourceMeta);
+          RootInputDataInformationEvent riEvent = (RootInputDataInformationEvent) tezEvent
+              .getEvent();
+          TezTaskID targetTaskID = TezTaskID.getInstance(vertex.getVertexId(),
+              riEvent.getTargetIndex());
+          vertex.eventHandler.handle(new TaskEventAddTezEvent(targetTaskID, tezEvent)); 
        
+          break;
         case VERTEX_MANAGER_EVENT:
         {
           VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 106c868..d45e77b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -19,10 +19,13 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -34,12 +37,20 @@ import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.RuntimeUtils;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -51,10 +62,18 @@ public class VertexManager {
   byte[] payload = null;
   AppContext appContext;
   
+  private static final Log LOG = LogFactory.getLog(VertexManager.class);
+  
   class VertexManagerPluginContextImpl implements VertexManagerPluginContext {
+    // TODO Add functionality to allow VertexManagers to send VertexManagerEvents
+    
+    private EventMetaData rootEventSourceMetadata = new EventMetaData(EventProducerConsumerType.INPUT,
+        managedVertex.getName(), "NULL_VERTEX", null);
+    private Map<String, EventMetaData> destinationEventMetadataMap = Maps.newHashMap();
     
     @Override
     public Map<String, EdgeProperty> getInputVertexEdgeProperties() {
+      // TODO Something similar for Initial Inputs - payload etc visible
       Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
       Map<String, EdgeProperty> vertexEdgeMap = 
                           Maps.newHashMapWithExpectedSize(inputs.size());
@@ -75,9 +94,9 @@ public class VertexManager {
     }
 
     @Override
-    public boolean setVertexParallelism(int parallelism,
+    public boolean setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint,
         Map<String, EdgeManager> sourceEdgeManagers) {
-      return managedVertex.setParallelism(parallelism, sourceEdgeManagers);
+      return managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers);
     }
 
     @Override
@@ -97,15 +116,49 @@ public class VertexManager {
     }
 
     @Override
+    public byte[] getUserPayload() {
+      return payload;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void addRootInputEvents(final String inputName,
+        Collection<RootInputDataInformationEvent> events) {
+      verifyIsRootInput(inputName);
+      Iterable<TezEvent> tezEvents = Iterables.transform(events,
+          new Function<RootInputDataInformationEvent, TezEvent>() {
+            @Override
+            public TezEvent apply(RootInputDataInformationEvent riEvent) {
+              TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata);
+              tezEvent.setDestinationInfo(getDestinationMetaData(inputName));
+              return tezEvent;
+            }
+          });
+      appContext.getEventHandler().handle(
+          new VertexEventRouteEvent(managedVertex.getVertexId(), Lists.newArrayList(tezEvents)));
+      // Recovery handling is taken care of by the Vertex.
+    }
+
+
+    @Override
     public void setVertexLocationHint(VertexLocationHint locationHint) {
       managedVertex.setVertexLocationHint(locationHint);
     }
 
-    @Override
-    public byte[] getUserPayload() {
-      return payload;
+    private void verifyIsRootInput(String inputName) {
+      Preconditions.checkState(managedVertex.getAdditionalInputs().get(inputName) != null,
+          "Cannot add events for non-root inputs");
     }
 
+    private EventMetaData getDestinationMetaData(String inputName) {
+      EventMetaData destMeta = destinationEventMetadataMap.get(inputName);
+      if (destMeta == null) {
+        destMeta = new EventMetaData(EventProducerConsumerType.INPUT, managedVertex.getName(),
+            inputName, null);
+        destinationEventMetadataMap.put(inputName, destMeta);
+      }
+      return destMeta;
+    }
   }
   
   public VertexManager(VertexManagerPlugin plugin, 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index c17adea..dc78e02 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -1401,7 +1401,7 @@ public class TestVertexImpl {
     EdgeManager mockEdgeManager = mock(EdgeManager.class);
     Map<String, EdgeManager> edgeManager = Collections.singletonMap(
        v1.getName(), mockEdgeManager);
-    v3.setParallelism(1, edgeManager);
+    v3.setParallelism(1, null, edgeManager);
     Assert.assertEquals(1, v3.getTotalTasks());
     Assert.assertEquals(1, tasks.size());
     // the last one is removed
@@ -1425,7 +1425,7 @@ public class TestVertexImpl {
     Vertex v3 = vertices.get("vertex3"); // Vertex3 linked to v1 (v1 src, v3
                                          // dest)
     Map<String, EdgeManager> edgeManagers = Collections.singletonMap(v1.getName(),
em);
-    v3.setParallelism(v3.getTotalTasks() - 1, edgeManagers); // Must decrease.
+    v3.setParallelism(v3.getTotalTasks() - 1, null, edgeManagers); // Must decrease.
 
     EdgeManagerForTest edgeManagerPostSet = (EdgeManagerForTest) edge.getEdgeManager();
     Assert.assertEquals(false, edgeManagerPostSet.isCreatedByFramework());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
index 8e17508..5830623 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
@@ -71,7 +71,8 @@ public class ProtoConverters {
       convertRootInputDataInformationEventToProto(RootInputDataInformationEvent event) {
     EventProtos.RootInputDataInformationEventProto.Builder builder =
         EventProtos.RootInputDataInformationEventProto.newBuilder();
-    builder.setIndex(event.getIndex());
+    builder.setSourceIndex(event.getSourceIndex());
+    builder.setTargetIndex(event.getTargetIndex());
     if (event.getUserPayload() != null) {
       builder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
     }
@@ -81,9 +82,11 @@ public class ProtoConverters {
   public static RootInputDataInformationEvent
       convertRootInputDataInformationEventFromProto(
       EventProtos.RootInputDataInformationEventProto proto) {
-    return new RootInputDataInformationEvent(proto.getIndex(),
-        proto.getUserPayload() != null ?
-            proto.getUserPayload().toByteArray() : null);
+    RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(
+        proto.getSourceIndex(), proto.getUserPayload() != null ? proto.getUserPayload()
+            .toByteArray() : null);
+    diEvent.setTargetIndex(proto.getTargetIndex());
+    return diEvent;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 79cde5c..0776cf0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -35,6 +35,7 @@ import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -355,7 +356,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
                 remainderRangeForLastShuffler : basePartitionRange)));
       }
       
-      context.setVertexParallelism(finalTaskParallelism, edgeManagers);
+      context.setVertexParallelism(finalTaskParallelism, null, edgeManagers);
       updatePendingTasks();      
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8f84fa56/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 4ee034c..4c018e1 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -31,6 +31,7 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
@@ -115,7 +116,7 @@ public class TestShuffleVertexManager {
           newEdgeManagers.clear();
           newEdgeManagers.putAll((Map<String, EdgeManager>)invocation.getArguments()[1]);
           return null;
-      }}).when(mockContext).setVertexParallelism(eq(2), anyMap());
+      }}).when(mockContext).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap());
     
     // source vertices have 0 tasks. immediate start of all managed tasks
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
@@ -141,7 +142,7 @@ public class TestShuffleVertexManager {
     manager.onVertexManagerEventReceived(vmEvent);
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
     // managedVertex tasks reduced
-    verify(mockContext, times(0)).setVertexParallelism(anyInt(), anyMap());
+    verify(mockContext, times(0)).setVertexParallelism(anyInt(), any(VertexLocationHint.class),
anyMap());
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
     Assert.assertEquals(4, scheduledTasks.size());
     Assert.assertEquals(1, manager.numSourceTasksCompleted);
@@ -180,7 +181,7 @@ public class TestShuffleVertexManager {
     manager.onVertexManagerEventReceived(vmEvent);
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
     // managedVertex tasks reduced
-    verify(mockContext).setVertexParallelism(eq(2), anyMap());
+    verify(mockContext).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     // TODO improve tests for parallelism
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
@@ -193,7 +194,7 @@ public class TestShuffleVertexManager {
     
     // more completions dont cause recalculation of parallelism
     manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
-    verify(mockContext).setVertexParallelism(eq(2), anyMap());
+    verify(mockContext).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     
     EdgeManager edgeManager = newEdgeManagers.values().iterator().next();


Mime
View raw message