tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2233. Allow EdgeProperty of an edge to be changed by VertexManager (bikas)
Date Wed, 08 Apr 2015 22:16:52 GMT
Repository: tez
Updated Branches:
  refs/heads/master b6ce703c6 -> c636dc220


TEZ-2233. Allow EdgeProperty of an edge to be changed by VertexManager (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c636dc22
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c636dc22
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c636dc22

Branch: refs/heads/master
Commit: c636dc2206a1f3a9261b923a0e02ade0d23a19ec
Parents: b6ce703
Author: Bikas Saha <bikas@apache.org>
Authored: Wed Apr 8 15:16:43 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Wed Apr 8 15:16:43 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/api/DagTypeConverters.java   |  31 +++++-
 .../org/apache/tez/dag/api/EdgeProperty.java    |  23 +++--
 .../tez/dag/api/VertexManagerPluginContext.java |  29 +++++-
 tez-api/src/main/proto/DAGApiRecords.proto      |   9 ++
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   6 ++
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  29 ++++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  74 +++++++++-----
 .../tez/dag/app/dag/impl/VertexManager.java     |  14 +++
 .../events/VertexParallelismUpdatedEvent.java   |  35 ++++---
 .../impl/HistoryEventJsonConversion.java        |  11 +-
 .../apache/tez/dag/history/utils/DAGUtils.java  |  35 ++++++-
 tez-dag/src/main/proto/HistoryEvents.proto      |   2 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  72 +++++++------
 .../TestHistoryEventsProtoConversion.java       |  59 ++++++-----
 .../impl/TestHistoryEventJsonConversion.java    |  18 +++-
 .../ats/HistoryEventTimelineConversion.java     |  10 +-
 .../ats/TestHistoryEventTimelineConversion.java |  18 +++-
 .../vertexmanager/InputReadyVertexManager.java  |   8 +-
 .../vertexmanager/ShuffleVertexManager.java     |  18 +++-
 .../TestInputReadyVertexManager.java            |  10 +-
 .../vertexmanager/TestShuffleVertexManager.java | 102 +++++--------------
 22 files changed, 393 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f660feb..45c1541 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2233. Allow EdgeProperty of an edge to be changed by VertexManager
   TEZ-2293. When running in "mr" mode, always use MR config settings.
   TEZ-2273. Tez UI: Support client side searching & sorting for dag tasks page
   TEZ-1909. Remove need to copy over all events from attempt 1 to attempt 2 dir

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 227897f..b4185b1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -20,7 +20,6 @@ package org.apache.tez.dag.api;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -59,6 +58,7 @@ import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeProperty;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
@@ -159,6 +159,7 @@ public class DagTypeConverters {
       case ONE_TO_ONE : return DataMovementType.ONE_TO_ONE;
       case BROADCAST : return DataMovementType.BROADCAST;
       case SCATTER_GATHER : return DataMovementType.SCATTER_GATHER;
+      case CUSTOM : return DataMovementType.CUSTOM;
       default : throw new IllegalArgumentException("unknown 'dataMovementType': " + type);
     }
   }
@@ -263,6 +264,34 @@ public class DagTypeConverters {
     }
     return edgePlanMap;
   }
+  
+  public static PlanEdgeProperty convertToProto(EdgeProperty prop) {
+    PlanEdgeProperty.Builder edgePropBuilder = PlanEdgeProperty.newBuilder();
+    edgePropBuilder.setDataMovementType(convertToDAGPlan(prop.getDataMovementType()));
+    edgePropBuilder.setDataSourceType(convertToDAGPlan(prop.getDataSourceType()));
+    edgePropBuilder.setSchedulingType(convertToDAGPlan(prop.getSchedulingType()));
+    edgePropBuilder.setEdgeSource(DagTypeConverters.convertToDAGPlan(prop.getEdgeSource()));
+    edgePropBuilder
+        .setEdgeDestination(DagTypeConverters.convertToDAGPlan(prop.getEdgeDestination()));
+    if (prop.getEdgeManagerDescriptor() != null) {
+      edgePropBuilder.setEdgeManager(DagTypeConverters.convertToDAGPlan(prop
+          .getEdgeManagerDescriptor()));
+    }
+    
+    return edgePropBuilder.build();
+  }
+  
+  public static EdgeProperty convertFromProto(PlanEdgeProperty edge) {
+      return EdgeProperty.create(
+          (edge.hasEdgeManager() ?
+              convertEdgeManagerPluginDescriptorFromDAGPlan(edge.getEdgeManager()) : null),
+          convertFromDAGPlan(edge.getDataMovementType()),
+          convertFromDAGPlan(edge.getDataSourceType()),
+          convertFromDAGPlan(edge.getSchedulingType()),
+          convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()),
+          convertInputDescriptorFromDAGPlan(edge.getEdgeDestination())
+      );
+  }
 
   public static EdgeProperty createEdgePropertyMapFromDAGPlan(EdgePlan edge) {
     if (edge.getDataMovementType() == PlanEdgeDataMovementType.CUSTOM) {

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
index c6ac25f..07fb2c1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
@@ -155,20 +155,22 @@ public class EdgeProperty {
         edgeDestination);
   }
 
+  @Private
+  public static EdgeProperty create(EdgeManagerPluginDescriptor edgeManagerDescriptor,
+      DataMovementType dataMovementType, DataSourceType dataSourceType,
+      SchedulingType schedulingType, OutputDescriptor edgeSource, InputDescriptor edgeDestination) {
+    return new EdgeProperty(edgeManagerDescriptor, dataMovementType, dataSourceType,
+        schedulingType, edgeSource, edgeDestination);
+  }
 
   private EdgeProperty(DataMovementType dataMovementType,
                        DataSourceType dataSourceType,
                        SchedulingType schedulingType,
                        OutputDescriptor edgeSource,
                        InputDescriptor edgeDestination) {
+    this(null, dataMovementType, dataSourceType, schedulingType, edgeSource, edgeDestination);
     Preconditions.checkArgument(dataMovementType != DataMovementType.CUSTOM,
         DataMovementType.CUSTOM + " cannot be used with this constructor");
-    this.dataMovementType = dataMovementType;
-    this.dataSourceType = dataSourceType;
-    this.schedulingType = schedulingType;
-    this.outputDescriptor = edgeSource;
-    this.inputDescriptor = edgeDestination;
-    this.edgeManagerDescriptor = null;
   }
   
 
@@ -177,7 +179,14 @@ public class EdgeProperty {
                        SchedulingType schedulingType,
                        OutputDescriptor edgeSource,
                        InputDescriptor edgeDestination) {
-    this.dataMovementType = DataMovementType.CUSTOM;
+    this(edgeManagerDescriptor, DataMovementType.CUSTOM, dataSourceType, schedulingType,
+        edgeSource, edgeDestination);
+  }
+  
+  private EdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor,
+      DataMovementType dataMovementType, DataSourceType dataSourceType,
+      SchedulingType schedulingType, OutputDescriptor edgeSource, InputDescriptor edgeDestination) {
+    this.dataMovementType = dataMovementType;
     this.edgeManagerDescriptor = edgeManagerDescriptor;
     this.dataSourceType = dataSourceType;
     this.schedulingType = schedulingType;

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index dfa9287..38ecbf6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -129,8 +129,34 @@ public interface VertexManagerPluginContext {
       @Nullable VertexLocationHint locationHint,
       @Nullable Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
       @Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate);
+
   
   /**
+   * API to reconfigure a {@link Vertex} by changing its task parallelism. A
+   * parallelism change is often accompanied by changing the {@link EdgeProperty}
+   * of the source {@link Edge} because event routing between source and
+   * destination tasks may need to be updated to account for the new task
+   * parallelism. This method can be called to update the parallelism multiple
+   * times until any of the tasks of the vertex have been scheduled (by invoking
+   * {@link #scheduleVertexTasks(List)}). If needed, the original source edge
+   * properties may be obtained via {@link #getInputVertexEdgeProperties()}.
+   * 
+   * @param parallelism
+   *          New number of tasks in the vertex
+   * @param locationHint
+   *          the placement policy for the tasks, specified as a
+   *          {@link VertexLocationHint}
+   * @param sourceEdgeProperties
+   *          Map with Key=name of {@link Edge} to be updated and Value=
+   *          {@link EdgeProperty}. The name of the Edge will be the 
+   *          corresponding source vertex name.
+   * @throws TezException Exception to indicate errors
+   */
+  public void reconfigureVertex(int parallelism,
+      @Nullable VertexLocationHint locationHint,
+      @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws TezException;
+
+  /**
    * Allows a VertexManagerPlugin to assign Events for Root Inputs
    * 
    * For regular Event Routing changes - the EdgeManager should be configured
@@ -189,8 +215,7 @@ public interface VertexManagerPluginContext {
    * reconfiguration. If the vertex is already fully defined, but the
    * {@link VertexManagerPlugin} wants to reconfigure the vertex, then it must
    * use this API to inform Tez about its intention. Without invoking this
-   * method, it is invalid to re-configure the vertex, e.g. via the
-   * {@link #setVertexParallelism(int, VertexLocationHint, Map, Map)} method if
+   * method, it is invalid to re-configure the vertex if
    * the vertex is already fully defined. This can be invoked at any time until
    * {@link VertexManagerPlugin#initialize()} has completed. Its invalid to
    * invoke this method after {@link VertexManagerPlugin#initialize()} has

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index dbd569a..9ac08b2 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -141,6 +141,15 @@ message VertexPlan {
   optional ConfigurationProto vertexConf = 11;
 }
 
+message PlanEdgeProperty {
+  required PlanEdgeDataMovementType dataMovementType = 1;
+  required PlanEdgeDataSourceType dataSourceType = 2;
+  required PlanEdgeSchedulingType schedulingType = 3;
+  optional TezEntityDescriptorProto edge_source = 4;
+  optional TezEntityDescriptorProto edge_destination = 5;
+  optional TezEntityDescriptorProto edge_manager = 6;
+}
+
 message EdgePlan {
   required string id = 1;
   required string inputVertexName = 2;

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 291a0c5..44df6cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
@@ -93,6 +94,11 @@ public interface Vertex extends Comparable<Vertex> {
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
       Map<String, InputSpecUpdate> rootInputSpecUpdate, boolean fromVertexManager)
       throws AMUserCodeException;
+  
+  public void reconfigureVertex(int parallelism,
+      @Nullable VertexLocationHint locationHint,
+      @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws AMUserCodeException;
+
   void setVertexLocationHint(VertexLocationHint vertexLocationHint);
   void vertexReconfigurationPlanned();
   void doneReconfiguringVertex();

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index ffdff80..f5fef67 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -52,6 +52,7 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
@@ -162,7 +163,22 @@ public class Edge {
         null);
   }
 
-  public synchronized void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor)
+  public synchronized void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException {
+    this.edgeProperty = newEdgeProperty;
+    boolean wasUnInitialized = (edgeManager == null);
+    createEdgeManager();
+    initialize();
+    if (wasUnInitialized) {
+      sendEvent(new VertexEventNullEdgeInitialized(sourceVertex.getVertexId(), this,
+          destinationVertex));
+      sendEvent(new VertexEventNullEdgeInitialized(destinationVertex.getVertexId(), this,
+          sourceVertex));
+    }
+  }
+  
+  // Test only method for creating specific scenarios
+  @VisibleForTesting
+  synchronized void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor)
       throws AMUserCodeException {
     EdgeProperty modifiedEdgeProperty =
         EdgeProperty.create(descriptor,
@@ -170,17 +186,10 @@ public class Edge {
             edgeProperty.getSchedulingType(),
             edgeProperty.getEdgeSource(),
             edgeProperty.getEdgeDestination());
-    this.edgeProperty = modifiedEdgeProperty;
-    boolean wasUnInitialized = (edgeManager == null);
-    createEdgeManager();
-    initialize();
-    if (wasUnInitialized) {
-      sendEvent(new VertexEventNullEdgeInitialized(sourceVertex.getVertexId(), this, destinationVertex));
-      sendEvent(new VertexEventNullEdgeInitialized(destinationVertex.getVertexId(), this, sourceVertex));
-    }
+    setEdgeProperty(modifiedEdgeProperty);
   }
 
-  public EdgeProperty getEdgeProperty() {
+  public synchronized EdgeProperty getEdgeProperty() {
     return this.edgeProperty;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 183e780..2d892b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -63,6 +63,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
@@ -707,7 +708,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   @VisibleForTesting
   boolean hasCommitter = false;
   private boolean vertexCompleteSeen = false;
-  private Map<String,EdgeManagerPluginDescriptor> recoveredSourceEdgeManagers = null;
+  private Map<String,EdgeProperty> recoveredSourceEdgeProperties = null;
   private Map<String, InputSpecUpdate> recoveredRootInputSpecUpdates = null;
 
   // Recovery related flags
@@ -1117,7 +1118,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   private void handleParallelismUpdate(int newParallelism,
-      Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
+      Map<String, EdgeProperty> sourceEdgeProperties,
       Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldParallelism) {
     // initial parallelism must have been set by this time
     // parallelism update is recorded in history only for change from an initialized value
@@ -1128,7 +1129,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       removeTasks(newParallelism);
     }
     Preconditions.checkState(this.numTasks == newParallelism, getLogIdentifier());
-    this.recoveredSourceEdgeManagers = sourceEdgeManagers;
+    this.recoveredSourceEdgeProperties = sourceEdgeProperties;
     this.recoveredRootInputSpecUpdates = rootInputSpecUpdates;
   }
 
@@ -1167,7 +1168,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             (VertexParallelismUpdatedEvent) historyEvent;
         int oldNumTasks = numTasks;
         int newNumTasks = updatedEvent.getNumTasks();
-        handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeManagers(),
+        handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeProperties(),
           updatedEvent.getRootInputSpecUpdates(), oldNumTasks);
         Preconditions.checkState(this.numTasks == newNumTasks, getLogIdentifier());
         if (updatedEvent.getVertexLocationHint() != null) {
@@ -1300,32 +1301,59 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       writeLock.unlock();
     }
   }
+  
+  @Override
+  public void reconfigureVertex(int parallelism,
+      @Nullable VertexLocationHint locationHint,
+      @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws AMUserCodeException {
+    setParallelism(parallelism, locationHint, sourceEdgeProperties, null, false, true);
+  }
 
   @Override
   public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
-      Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean fromVertexManager)
-      throws AMUserCodeException {
-    setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates,
+      Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean fromVertexManager) 
+          throws AMUserCodeException {
+    // temporarily support conversion of edge manager to edge property
+    Map<String, EdgeProperty> sourceEdgeProperties = Maps.newHashMap();
+    readLock.lock();
+    try {
+      if (sourceEdgeManagers != null && !sourceEdgeManagers.isEmpty()) {
+        for (Edge e : sourceVertices.values()) {
+          EdgeManagerPluginDescriptor newEdge = sourceEdgeManagers.get(e.getSourceVertexName());
+          EdgeProperty oldEdge = e.getEdgeProperty();
+          if (newEdge != null) {
+            sourceEdgeProperties.put(
+                e.getSourceVertexName(),
+                EdgeProperty.create(newEdge, oldEdge.getDataSourceType(),
+                    oldEdge.getSchedulingType(), oldEdge.getEdgeSource(),
+                    oldEdge.getEdgeDestination()));
+          }
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+    setParallelism(parallelism, vertexLocationHint, sourceEdgeProperties, rootInputSpecUpdates,
         false, fromVertexManager);
   }
 
   private void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
-      Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
+      Map<String, EdgeProperty> sourceEdgeProperties,
       Map<String, InputSpecUpdate> rootInputSpecUpdates,
       boolean recovering, boolean fromVertexManager) throws AMUserCodeException {
     if (recovering) {
       writeLock.lock();
       try {
-        if (sourceEdgeManagers != null) {
-          for(Map.Entry<String, EdgeManagerPluginDescriptor> entry :
-              sourceEdgeManagers.entrySet()) {
+        if (sourceEdgeProperties != null) {
+          for(Map.Entry<String, EdgeProperty> entry :
+            sourceEdgeProperties.entrySet()) {
             LOG.info("Recovering edge manager for source:"
                 + entry.getKey() + " destination: " + getLogIdentifier());
             Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
             Edge edge = sourceVertices.get(sourceVertex);
             try {
-              edge.setCustomEdgeManager(entry.getValue());
+              edge.setEdgeProperty(entry.getValue());
             } catch (Exception e) {
               throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge,"
                   + "sourceVertex:" + edge.getSourceVertexName()
@@ -1378,16 +1406,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   + " for vertex: " + logIdentifier);
         }
 
-        if(sourceEdgeManagers != null) {
-          for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) {
+        if(sourceEdgeProperties != null) {
+          for(Map.Entry<String, EdgeProperty> entry : sourceEdgeProperties.entrySet()) {
             LOG.info("Replacing edge manager for source:"
                 + entry.getKey() + " destination: " + getLogIdentifier());
             Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
             Edge edge = sourceVertices.get(sourceVertex);
             try {
-              edge.setCustomEdgeManager(entry.getValue());
+              edge.setEdgeProperty(entry.getValue());
             } catch (Exception e) {
-              throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge,"
+              throw new TezUncheckedException("Fail to update EdgeProperty for Edge,"
                   + "sourceVertex:" + edge.getSourceVertexName()
                   + "destinationVertex:" + edge.getDestinationVertexName(), e);
             }
@@ -1438,7 +1466,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         if (parallelism == numTasks) {
           LOG.info("setParallelism same as current value: " + parallelism +
               " for vertex: " + logIdentifier);
-          Preconditions.checkArgument(sourceEdgeManagers != null,
+          Preconditions.checkArgument(sourceEdgeProperties != null,
               "Source edge managers or RootInputSpecs must be set when not changing parallelism");
         } else {
           LOG.info("Resetting vertex location hints due to change in parallelism for vertex: "
@@ -1465,14 +1493,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         assert tasks.size() == numTasks;
 
         // set new edge managers
-        if(sourceEdgeManagers != null) {
-          for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) {
+        if(sourceEdgeProperties != null) {
+          for(Map.Entry<String, EdgeProperty> entry : sourceEdgeProperties.entrySet()) {
             LOG.info("Replacing edge manager for source:"
                 + entry.getKey() + " destination: " + getLogIdentifier());
             Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
             Edge edge = sourceVertices.get(sourceVertex);
             try {
-              edge.setCustomEdgeManager(entry.getValue());
+              edge.setEdgeProperty(entry.getValue());
             } catch (Exception e) {
               throw new TezUncheckedException(e);
             }
@@ -1481,7 +1509,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
         // update history
         VertexParallelismUpdatedEvent parallelismUpdatedEvent = new VertexParallelismUpdatedEvent(
-            vertexId, numTasks, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates,
+            vertexId, numTasks, vertexLocationHint, sourceEdgeProperties, rootInputSpecUpdates,
             oldNumTasks);
         appContext.getHistoryHandler().handle(
             new DAGHistoryEvent(getDAGId(), parallelismUpdatedEvent));
@@ -2739,7 +2767,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           try {
             // recovering only edge manager
             vertex.setParallelism(0,
-              null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true, false);
+              null, vertex.recoveredSourceEdgeProperties, vertex.recoveredRootInputSpecUpdates, true, false);
             successSetParallelism = true;
           } catch (Exception e) {
             successSetParallelism = false;
@@ -2796,7 +2824,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             break;
           }
           try {
-            vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers,
+            vertex.setParallelism(vertex.numTasks, null, vertex.recoveredSourceEdgeProperties,
               vertex.recoveredRootInputSpecUpdates, true, false);
             successSetParallelism = true;
           } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 95d714b..4bf51a1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -44,6 +44,7 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -156,6 +157,19 @@ public class VertexManager {
         throw new TezUncheckedException(e);
       }
     }
+    
+    @Override
+    public synchronized void reconfigureVertex(int parallelism,
+        @Nullable VertexLocationHint locationHint,
+        @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws TezException {
+      checkAndThrowIfDone();
+      try {
+        managedVertex.reconfigureVertex(parallelism, locationHint, sourceEdgeProperties);
+      } catch (AMUserCodeException e) {
+        // convert it to TezException which would be caught in VM
+        throw new TezException(e);
+      }
+    }
 
     @Override
     public synchronized void scheduleVertexTasks(List<TaskWithLocationHint> tasks) {

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
index ef21537..456e2a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -44,7 +44,7 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
   private int numTasks;
   private int oldNumTasks;
   private VertexLocationHint vertexLocationHint;
-  private Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers;
+  private Map<String, EdgeProperty> sourceEdgeProperties;
   private Map<String, InputSpecUpdate> rootInputSpecUpdates;
   private long updateTime;
 
@@ -53,12 +53,12 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
 
   public VertexParallelismUpdatedEvent(TezVertexID vertexID,
       int numTasks, VertexLocationHint vertexLocationHint,
-      Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
+      Map<String, EdgeProperty> sourceEdgeProperties,
       Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldNumTasks) {
     this.vertexID = vertexID;
     this.numTasks = numTasks;
     this.vertexLocationHint = vertexLocationHint;
-    this.sourceEdgeManagers = sourceEdgeManagers;
+    this.sourceEdgeProperties = sourceEdgeProperties;
     this.rootInputSpecUpdates = rootInputSpecUpdates;
     this.updateTime = System.currentTimeMillis();
     this.oldNumTasks = oldNumTasks;
@@ -88,14 +88,13 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
       builder.setVertexLocationHint(DagTypeConverters.convertVertexLocationHintToProto(
             this.vertexLocationHint));
     }
-    if (sourceEdgeManagers != null) {
-      for (Entry<String, EdgeManagerPluginDescriptor> entry :
-          sourceEdgeManagers.entrySet()) {
+    if (sourceEdgeProperties != null) {
+      for (Entry<String, EdgeProperty> entry :
+        sourceEdgeProperties.entrySet()) {
         EdgeManagerDescriptorProto.Builder edgeMgrBuilder =
             EdgeManagerDescriptorProto.newBuilder();
         edgeMgrBuilder.setEdgeName(entry.getKey());
-        edgeMgrBuilder.setEntityDescriptor(
-            DagTypeConverters.convertToDAGPlan(entry.getValue()));
+        edgeMgrBuilder.setEdgeProperty(DagTypeConverters.convertToProto(entry.getValue()));
         builder.addEdgeManagerDescriptors(edgeMgrBuilder.build());
       }
     }
@@ -121,15 +120,15 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
           proto.getVertexLocationHint());
     }
     if (proto.getEdgeManagerDescriptorsCount() > 0) {
-      this.sourceEdgeManagers = new HashMap<String, EdgeManagerPluginDescriptor>(
+      this.sourceEdgeProperties = new HashMap<String, EdgeProperty>(
           proto.getEdgeManagerDescriptorsCount());
       for (EdgeManagerDescriptorProto edgeManagerProto :
         proto.getEdgeManagerDescriptorsList()) {
-        EdgeManagerPluginDescriptor edgeManagerDescriptor =
-            DagTypeConverters.convertEdgeManagerPluginDescriptorFromDAGPlan(
-                edgeManagerProto.getEntityDescriptor());
-        sourceEdgeManagers.put(edgeManagerProto.getEdgeName(),
-            edgeManagerDescriptor);
+        EdgeProperty edgeProperty =
+            DagTypeConverters.convertFromProto(
+                edgeManagerProto.getEdgeProperty());
+        sourceEdgeProperties.put(edgeManagerProto.getEdgeName(),
+            edgeProperty);
       }
     }
     if (proto.getRootInputSpecUpdatesCount() > 0) {
@@ -169,7 +168,7 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
         + ", vertexLocationHint=" +
         (vertexLocationHint == null? "null" : vertexLocationHint)
         + ", edgeManagersCount=" +
-        (sourceEdgeManagers == null? "null" : sourceEdgeManagers.size()
+        (sourceEdgeProperties == null? "null" : sourceEdgeProperties.size()
         + ", rootInputSpecUpdateCount="
         + (rootInputSpecUpdates == null ? "null" : rootInputSpecUpdates.size()));
   }
@@ -186,8 +185,8 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
     return vertexLocationHint;
   }
 
-  public Map<String, EdgeManagerPluginDescriptor> getSourceEdgeManagers() {
-    return sourceEdgeManagers;
+  public Map<String, EdgeProperty> getSourceEdgeProperties() {
+    return sourceEdgeProperties;
   }
   
   public Map<String, InputSpecUpdate> getRootInputSpecUpdates() {

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 79a0c34..22d95d8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -20,11 +20,10 @@ package org.apache.tez.dag.history.logging.impl;
 
 import java.util.Map;
 import java.util.Map.Entry;
-
 import java.util.TreeMap;
 
 import org.apache.tez.common.ATSConstants;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
@@ -722,12 +721,12 @@ public class HistoryEventJsonConversion {
     JSONObject eventInfo = new JSONObject();
     eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks());
     eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
-    if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) {
+    if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) {
       JSONObject updatedEdgeManagers = new JSONObject();
-      for (Entry<String, EdgeManagerPluginDescriptor> entry :
-          event.getSourceEdgeManagers().entrySet()) {
+      for (Entry<String, EdgeProperty> entry :
+          event.getSourceEdgeProperties().entrySet()) {
         updatedEdgeManagers.put(entry.getKey(),
-            new JSONObject(DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue())));
+            new JSONObject(DAGUtils.convertEdgeProperty(entry.getValue())));
       }
       eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 0f34811..3ec9900 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -36,14 +36,13 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
-import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
@@ -234,7 +233,7 @@ public class DAGUtils {
         edgeMap.put(INPUT_USER_PAYLOAD_AS_TEXT,
             DagTypeConverters.getHistoryTextFromProto(
                 edgePlan.getEdgeDestination()));
-      }
+      } // TEZ-2286 this is missing edgemanager descriptor for custom edge
       edgesList.add(edgeMap);
     }
     putInto(dagMap, EDGES_KEY, edgesList);
@@ -352,6 +351,36 @@ public class DAGUtils {
     return vertexStatsMap;
   }
 
+  public static Map<String,Object> convertEdgeProperty(
+      EdgeProperty edge) {
+    Map<String, Object> jsonDescriptor = new HashMap<String, Object>();
+    
+    jsonDescriptor.put(DATA_MOVEMENT_TYPE_KEY,
+        edge.getDataMovementType().name());
+    jsonDescriptor.put(DATA_SOURCE_TYPE_KEY, edge.getDataSourceType().name());
+    jsonDescriptor.put(SCHEDULING_TYPE_KEY, edge.getSchedulingType().name());
+    jsonDescriptor.put(EDGE_SOURCE_CLASS_KEY,
+        edge.getEdgeSource().getClassName());
+    jsonDescriptor.put(EDGE_DESTINATION_CLASS_KEY,
+        edge.getEdgeDestination().getClassName());
+    String history = edge.getEdgeSource().getHistoryText();
+    if (history != null) {
+      jsonDescriptor.put(OUTPUT_USER_PAYLOAD_AS_TEXT, history);
+    }
+    history = edge.getEdgeDestination().getHistoryText();
+    if (history != null) {
+      jsonDescriptor.put(INPUT_USER_PAYLOAD_AS_TEXT, history);
+    }
+    EdgeManagerPluginDescriptor descriptor = edge.getEdgeManagerDescriptor();
+    if (descriptor != null) {
+      jsonDescriptor.put(EDGE_MANAGER_CLASS_KEY, descriptor.getClassName());
+      if (descriptor.getHistoryText() != null && !descriptor.getHistoryText().isEmpty()) {
+        jsonDescriptor.put(USER_PAYLOAD_AS_TEXT, descriptor.getHistoryText());
+      }
+    }
+    return jsonDescriptor;
+  }
+  
   public static Map<String,Object> convertEdgeManagerPluginDescriptor(
       EdgeManagerPluginDescriptor descriptor) {
     Map<String, Object> jsonDescriptor = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 45e9582..617a644 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -95,7 +95,7 @@ message VertexStartedProto {
 
 message EdgeManagerDescriptorProto {
   optional string edge_name = 1;
-  optional TezEntityDescriptorProto entity_descriptor = 2;
+  optional PlanEdgeProperty edge_property = 2;
 }
 
 message RootInputSpecUpdateProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index e643a5b..5ad320e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2441,7 +2441,7 @@ public class TestVertexImpl {
     startVertex(v1);
     try {
       // cannot reconfigure a fully configured vertex without first notifying
-      v3.setParallelism(1, null, null, null, true);
+      v3.reconfigureVertex(1, null, null);
       Assert.fail();
     } catch (IllegalStateException e) {
       Assert.assertTrue(e.getMessage().contains("context.vertexReconfigurationPlanned() before re-configuring"));
@@ -2496,10 +2496,13 @@ public class TestVertexImpl {
     EdgeManagerPluginDescriptor mockEdgeManagerDescriptor =
         EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
 
-    Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
+    EdgeProperty edgeProp = EdgeProperty.create(mockEdgeManagerDescriptor, 
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), 
+        InputDescriptor.create("In"));
+    Map<String, EdgeProperty> edgeManagerDescriptors =
         Collections.singletonMap(
-       v1.getName(), mockEdgeManagerDescriptor);
-    v3.setParallelism(1, null, edgeManagerDescriptors, null, true);
+       v1.getName(), edgeProp);
+    v3.reconfigureVertex(1, null, edgeManagerDescriptors);
     v3.doneReconfiguringVertex();
     assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof
         EdgeManagerForTest);
@@ -2520,11 +2523,13 @@ public class TestVertexImpl {
 
     EdgeManagerPluginDescriptor mockEdgeManagerDescriptor =
         EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
-
-    Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
+    EdgeProperty edgeProp = EdgeProperty.create(mockEdgeManagerDescriptor, 
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), 
+        InputDescriptor.create("In"));
+    Map<String, EdgeProperty> edgeManagerDescriptors =
         Collections.singletonMap(
-       v1.getName(), mockEdgeManagerDescriptor);
-    v3.setParallelism(10, null, edgeManagerDescriptors, null, true);
+       v1.getName(), edgeProp);
+    v3.reconfigureVertex(10, null, edgeManagerDescriptors);
     v3.doneReconfiguringVertex();
     assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof
         EdgeManagerForTest);
@@ -2543,10 +2548,10 @@ public class TestVertexImpl {
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
-    v3.setParallelism(10, null, null, null, true);
+    v3.reconfigureVertex(10, null, null);
     checkTasks(v3, 10);
     
-    v3.setParallelism(5, null, null, null, true);
+    v3.reconfigureVertex(5, null, null);
     checkTasks(v3, 5);
     v3.doneReconfiguringVertex();
   }
@@ -2563,12 +2568,12 @@ public class TestVertexImpl {
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
-    v3.setParallelism(10, null, null, null, true);
+    v3.reconfigureVertex(10, null, null);
     checkTasks(v3, 10);
     v3.doneReconfiguringVertex();
 
     try {
-      v3.setParallelism(5, null, null, null, true);
+      v3.reconfigureVertex(5, null, null);
       Assert.fail();
     } catch (IllegalStateException e) {
       Assert.assertTrue(e.getMessage().contains("Vertex is fully configured but still"));
@@ -2587,11 +2592,11 @@ public class TestVertexImpl {
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
-    v3.setParallelism(10, null, null, null, true);
+    v3.reconfigureVertex(10, null, null);
     checkTasks(v3, 10);
     v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
     try {
-      v3.setParallelism(5, null, null, null, true);
+      v3.reconfigureVertex(5, null, null);
       Assert.fail();
     } catch (TezUncheckedException e) {
       Assert.assertTrue(e.getMessage().contains("setParallelism cannot be called after scheduling"));
@@ -2612,7 +2617,7 @@ public class TestVertexImpl {
     startVertex(v1);
     v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
     try {
-      v3.setParallelism(5, null, null, null, true);
+      v3.reconfigureVertex(5, null, null);
       Assert.fail();
     } catch (TezUncheckedException e) {
       Assert.assertTrue(e.getMessage().contains("setParallelism cannot be called after scheduling"));
@@ -2671,12 +2676,15 @@ public class TestVertexImpl {
     EdgeManagerPluginDescriptor edgeManagerDescriptor =
         EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
     edgeManagerDescriptor.setUserPayload(userPayload);
+    EdgeProperty edgeProp = EdgeProperty.create(edgeManagerDescriptor, 
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), 
+        InputDescriptor.create("In"));
 
     Vertex v3 = vertices.get("vertex3");
 
-    Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
-        Collections.singletonMap(v3.getName(), edgeManagerDescriptor);
-    v5.setParallelism(v5.getTotalTasks() - 1, null, edgeManagerDescriptors, null, true); // Must decrease.
+    Map<String, EdgeProperty> edgeManagerDescriptors =
+        Collections.singletonMap(v3.getName(), edgeProp);
+    v5.reconfigureVertex(v5.getTotalTasks() - 1, null, edgeManagerDescriptors);
     v5.doneReconfiguringVertex();
 
     VertexImpl v5Impl = (VertexImpl) v5;
@@ -3411,8 +3419,8 @@ public class TestVertexImpl {
     Assert.assertEquals(-1, v1.getTotalTasks());
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
     // set the parallelism
-    v1.setParallelism(numTasks, null, null, null, true);
-    v2.setParallelism(numTasks, null, null, null, true);
+    v1.reconfigureVertex(numTasks, null, null);
+    v2.reconfigureVertex(numTasks, null, null);
     dispatcher.await();
     // parallelism set and vertex starts with pending start event
     Assert.assertEquals(numTasks, v1.getTotalTasks());
@@ -3427,7 +3435,7 @@ public class TestVertexImpl {
     // v3 still initializing with source vertex started. So should start running
     // once num tasks is defined
     Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
-    v3.setParallelism(numTasks, null, null, null, false);
+    v3.reconfigureVertex(numTasks, null, null);
     dispatcher.await();
     Assert.assertEquals(numTasks, v3.getTotalTasks());
     Assert.assertEquals(VertexState.RUNNING, v3.getState());
@@ -3550,7 +3558,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
     // change parallelism
     int newNumTasks = 3;
-    v1.setParallelism(newNumTasks, null, null, null, true);
+    v1.reconfigureVertex(newNumTasks, null, null);
     v1.doneReconfiguringVertex();
     dispatcher.await();
     Assert.assertEquals(newNumTasks, vertices.get("vertex2").getTotalTasks());
@@ -3584,7 +3592,7 @@ public class TestVertexImpl {
     Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
     // change parallelism
     int newNumTasks = 3;
-    v1.setParallelism(newNumTasks, null, null, null, true);
+    v1.reconfigureVertex(newNumTasks, null, null);
     v1.doneReconfiguringVertex();
     dispatcher.await();
     Assert.assertEquals(newNumTasks, vertices.get("vertex2").getTotalTasks());
@@ -5039,9 +5047,12 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITIALIZING, vB.getState());
     Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
     
-    Map<String, EdgeManagerPluginDescriptor> edges = Maps.newHashMap();
-    edges.put("B", mockEdgeManagerDescriptor);
-    vC.setParallelism(2, vertexLocationHint, edges, null, true);
+    EdgeProperty edgeProp = EdgeProperty.create(mockEdgeManagerDescriptor, 
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), 
+        InputDescriptor.create("In"));
+    Map<String, EdgeProperty> edges = Maps.newHashMap();
+    edges.put("B", edgeProp);
+    vC.reconfigureVertex(2, vertexLocationHint, edges);
 
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, vA.getState());
@@ -5105,9 +5116,12 @@ public class TestVertexImpl {
     Assert.assertEquals(0, listener.events.size());
     
     // complete configuration and verify getting configured signal from vB
-    Map<String, EdgeManagerPluginDescriptor> edges = Maps.newHashMap();
-    edges.put("B", mockEdgeManagerDescriptor);
-    vC.setParallelism(2, vertexLocationHint, edges, null, true);
+    EdgeProperty edgeProp = EdgeProperty.create(mockEdgeManagerDescriptor, 
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), 
+        InputDescriptor.create("In"));
+    Map<String, EdgeProperty> edges = Maps.newHashMap();
+    edges.put("B", edgeProp);
+    vC.reconfigureVertex(2, vertexLocationHint, edges);
 
     dispatcher.await();
     Assert.assertEquals(1, listener.events.size());

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index a91c0f8..bf61ff0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -33,6 +33,12 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -302,8 +308,8 @@ public class TestHistoryEventsProtoConversion {
           testProtoConversion(event);
       Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
       Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks());
-      Assert.assertEquals(event.getSourceEdgeManagers(),
-          deserializedEvent.getSourceEdgeManagers());
+      Assert.assertEquals(event.getSourceEdgeProperties(),
+          deserializedEvent.getSourceEdgeProperties());
       Assert.assertEquals(event.getVertexLocationHint(),
           deserializedEvent.getVertexLocationHint());
       Assert.assertEquals(event.getRootInputSpecUpdates().size(), deserializedEvent
@@ -321,12 +327,17 @@ public class TestHistoryEventsProtoConversion {
       logEvents(event, deserializedEvent);
     }
     {
-      Map<String,EdgeManagerPluginDescriptor> sourceEdgeManagers
-          = new LinkedHashMap<String, EdgeManagerPluginDescriptor>();
-      sourceEdgeManagers.put("foo", EdgeManagerPluginDescriptor.create("bar"));
-      sourceEdgeManagers.put("foo1", EdgeManagerPluginDescriptor.create("bar1")
+      Map<String, EdgeProperty> sourceEdgeManagers
+          = new LinkedHashMap<String, EdgeProperty>();
+      // add standard and custom edge
+      sourceEdgeManagers.put("foo", EdgeProperty.create(DataMovementType.SCATTER_GATHER, 
+          DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+          OutputDescriptor.create("Out1"), InputDescriptor.create("in1")));
+      sourceEdgeManagers.put("foo1", EdgeProperty.create(EdgeManagerPluginDescriptor.create("bar1")
           .setUserPayload(
-              UserPayload.create(ByteBuffer.wrap(new String("payload").getBytes()), 100)));
+              UserPayload.create(ByteBuffer.wrap(new String("payload").getBytes()), 100)), 
+          DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+          OutputDescriptor.create("Out1"), InputDescriptor.create("in1")));
       VertexParallelismUpdatedEvent event =
           new VertexParallelismUpdatedEvent(
               TezVertexID.getInstance(
@@ -336,24 +347,26 @@ public class TestHistoryEventsProtoConversion {
               new HashSet<String>(Arrays.asList("r1"))))),
               sourceEdgeManagers, null, 1);
 
-      VertexParallelismUpdatedEvent deserializedEvent = (VertexParallelismUpdatedEvent)
-          testProtoConversion(event);
+      VertexParallelismUpdatedEvent deserializedEvent = 
+          (VertexParallelismUpdatedEvent) testProtoConversion(event);
       Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
       Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks());
-      Assert.assertEquals(event.getSourceEdgeManagers().size(),
-          deserializedEvent.getSourceEdgeManagers().size());
-      Assert.assertEquals(event.getSourceEdgeManagers().get("foo").getClassName(),
-          deserializedEvent.getSourceEdgeManagers().get("foo").getClassName());
-      Assert.assertNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
-      Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getClassName(),
-          deserializedEvent.getSourceEdgeManagers().get("foo1").getClassName());
-      Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload().getVersion(),
-          deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload().getVersion());
-      Assert.assertArrayEquals(
-          event.getSourceEdgeManagers().get("foo1").getUserPayload().deepCopyAsArray(),
-          deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload().deepCopyAsArray());
-      Assert.assertEquals(event.getVertexLocationHint(),
-          deserializedEvent.getVertexLocationHint());
+      Assert.assertEquals(event.getSourceEdgeProperties().size(), deserializedEvent
+          .getSourceEdgeProperties().size());
+      Assert.assertEquals(event.getSourceEdgeProperties().get("foo").getDataMovementType(),
+          deserializedEvent.getSourceEdgeProperties().get("foo").getDataMovementType());
+      Assert.assertNull(deserializedEvent.getSourceEdgeProperties().get("foo")
+          .getEdgeManagerDescriptor());
+      Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getDataMovementType(),
+          deserializedEvent.getSourceEdgeProperties().get("foo1").getDataMovementType());
+      Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor()
+          .getUserPayload().getVersion(), deserializedEvent.getSourceEdgeProperties().get("foo1")
+          .getEdgeManagerDescriptor().getUserPayload().getVersion());
+      Assert.assertArrayEquals(event.getSourceEdgeProperties().get("foo1")
+          .getEdgeManagerDescriptor().getUserPayload().deepCopyAsArray(), deserializedEvent
+          .getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor().getUserPayload()
+          .deepCopyAsArray());
+      Assert.assertEquals(event.getVertexLocationHint(), deserializedEvent.getVertexLocationHint());
       logEvents(event, deserializedEvent);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index 0eeba0d..bbf29e3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -29,6 +29,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -195,9 +201,12 @@ public class TestHistoryEventJsonConversion {
     TezVertexID vId = TezVertexID.getInstance(
         TezDAGID.getInstance(
             ApplicationId.newInstance(1l, 1), 1), 1);
-    Map<String, EdgeManagerPluginDescriptor> edgeMgrs =
-        new HashMap<String, EdgeManagerPluginDescriptor>();
-    edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text"));
+    Map<String, EdgeProperty> edgeMgrs =
+        new HashMap<String, EdgeProperty>();
+    
+    edgeMgrs.put("a", EdgeProperty.create(EdgeManagerPluginDescriptor.create("a.class")
+        .setHistoryText("text"), DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("Out"), InputDescriptor.create("In")));
     VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null,
         edgeMgrs, null, 10);
 
@@ -223,6 +232,9 @@ public class TestHistoryEventJsonConversion {
     Assert.assertNotNull(updatedEdgeMgrs.getJSONObject("a"));
     JSONObject updatedEdgeMgr = updatedEdgeMgrs.getJSONObject("a");
 
+    Assert.assertEquals(DataMovementType.CUSTOM.name(),
+        updatedEdgeMgr.getString(DAGUtils.DATA_MOVEMENT_TYPE_KEY));
+    Assert.assertEquals("In", updatedEdgeMgr.getString(DAGUtils.EDGE_DESTINATION_CLASS_KEY));
     Assert.assertEquals("a.class", updatedEdgeMgr.getString(DAGUtils.EDGE_MANAGER_CLASS_KEY));
 
     JSONObject otherInfo = jsonObject.getJSONObject(ATSConstants.OTHER_INFO);

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index e1c2a72..fdd8f19 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -27,7 +27,7 @@ import java.util.TreeMap;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.tez.common.ATSConstants;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
@@ -603,12 +603,12 @@ public class HistoryEventTimelineConversion {
     updateEvt.setTimestamp(event.getUpdateTime());
 
     Map<String,Object> eventInfo = new HashMap<String, Object>();
-    if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) {
+    if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) {
       Map<String, Object> updatedEdgeManagers = new HashMap<String, Object>();
-      for (Entry<String, EdgeManagerPluginDescriptor> entry :
-          event.getSourceEdgeManagers().entrySet()) {
+      for (Entry<String, EdgeProperty> entry :
+          event.getSourceEdgeProperties().entrySet()) {
         updatedEdgeManagers.put(entry.getKey(),
-            DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue()));
+            DAGUtils.convertEdgeProperty(entry.getValue()));
       }
       eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index a2b0f89..14330ba 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -34,6 +34,12 @@ import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.VersionInfo;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -799,9 +805,12 @@ public class TestHistoryEventTimelineConversion {
   @Test(timeout = 5000)
   public void testConvertVertexParallelismUpdatedEvent() {
     TezVertexID vId = tezVertexID;
-    Map<String, EdgeManagerPluginDescriptor> edgeMgrs =
-        new HashMap<String, EdgeManagerPluginDescriptor>();
-    edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text"));
+    Map<String, EdgeProperty> edgeMgrs =
+        new HashMap<String, EdgeProperty>();
+    
+    edgeMgrs.put("a", EdgeProperty.create(EdgeManagerPluginDescriptor.create("a.class")
+        .setHistoryText("text"), DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("Out"), InputDescriptor.create("In")));
     VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null,
         edgeMgrs, null, 10);
 
@@ -829,6 +838,9 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertTrue(updatedEdgeMgrs.containsKey("a"));
     Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a");
 
+    Assert.assertEquals(DataMovementType.CUSTOM.name(),
+        updatedEdgeMgr.get(DAGUtils.DATA_MOVEMENT_TYPE_KEY));
+    Assert.assertEquals("In", updatedEdgeMgr.get(DAGUtils.EDGE_DESTINATION_CLASS_KEY));
     Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY));
 
     Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS));

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 98a5873..8671161 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
@@ -123,7 +124,12 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
         // must change parallelism to make them the same
         LOG.info("Update parallelism of vertex: " + getContext().getVertexName() + 
             " to " + oneToOneSrcTaskCount + " to match source 1-1 vertices.");
-        getContext().setVertexParallelism(oneToOneSrcTaskCount, null, null, null);
+        try {
+          getContext().reconfigureVertex(oneToOneSrcTaskCount, null, null);
+        } catch (TezException e) {
+          // TODO fail vertex - TEZ-2292
+          LOG.warn("Failed to change parallelism in: " + getContext().getVertexName(), e);
+        }
       }
       oneToOneSrcTasksDoneCount = new int[oneToOneSrcTaskCount];
       oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index b6d69dc..9be9986 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexManagerPlugin;
@@ -510,11 +511,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
           
     if(finalTaskParallelism < currentParallelism) {
       // final parallelism is less than actual parallelism
-      Map<String, EdgeManagerPluginDescriptor> edgeManagers =
-          new HashMap<String, EdgeManagerPluginDescriptor>(bipartiteSources);
+      Map<String, EdgeProperty> edgeProperties =
+          new HashMap<String, EdgeProperty>(bipartiteSources);
       Iterable<Map.Entry<String, SourceVertexInfo>> bipartiteItr = getBipartiteInfo();
       for(Map.Entry<String, SourceVertexInfo> entry : bipartiteItr) {
         String vertex = entry.getKey();
+        EdgeProperty oldEdgeProp = entry.getValue().edgeProperty;
         // use currentParallelism for numSourceTasks to maintain original state
         // for the source tasks
         CustomShuffleEdgeManagerConfig edgeManagerConfig =
@@ -525,10 +527,18 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         EdgeManagerPluginDescriptor edgeManagerDescriptor =
             EdgeManagerPluginDescriptor.create(CustomShuffleEdgeManager.class.getName());
         edgeManagerDescriptor.setUserPayload(edgeManagerConfig.toUserPayload());
-        edgeManagers.put(vertex, edgeManagerDescriptor);
+        EdgeProperty newEdgeProp = EdgeProperty.create(edgeManagerDescriptor,
+            oldEdgeProp.getDataSourceType(), oldEdgeProp.getSchedulingType(), 
+            oldEdgeProp.getEdgeSource(), oldEdgeProp.getEdgeDestination());
+        edgeProperties.put(vertex, newEdgeProp);
       }
       
-      getContext().setVertexParallelism(finalTaskParallelism, null, edgeManagers, null);
+      try {
+        getContext().reconfigureVertex(finalTaskParallelism, null, edgeProperties);
+      } catch (TezException e) {
+        // TODO fail vertex - TEZ-2292
+        LOG.warn("Failed to change parallelism in: " + getContext().getVertexName(), e);
+      }
       updatePendingTasks();
     }
     return true;

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index 411ea71..b164a6d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -257,7 +257,7 @@ public class TestInputReadyVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-    verify(mockContext, times(1)).setVertexParallelism(3, null, null, null);
+    verify(mockContext, times(1)).reconfigureVertex(3, null, null);
     verify(mockContext, times(1)).doneReconfiguringVertex();
     manager.onVertexStarted(initialCompletions);
     
@@ -275,8 +275,8 @@ public class TestInputReadyVertexManager {
     } catch (TezUncheckedException e) {
       e.getMessage().contains("1-1 source vertices must have identical concurrency");
     }
-    verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(),
-        anyMap(), anyMap()); // not invoked
+    verify(mockContext, times(1)).reconfigureVertex(anyInt(), (VertexLocationHint) any(),
+        anyMap()); // not invoked
     
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
     
@@ -288,8 +288,8 @@ public class TestInputReadyVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-    verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(),
-        anyMap(), anyMap()); // not invoked
+    verify(mockContext, times(1)).reconfigureVertex(anyInt(), (VertexLocationHint) any(),
+        anyMap()); // not invoked
     verify(mockContext, times(2)).doneReconfiguringVertex();
     manager.onVertexStarted(initialCompletions);
     // all 1-1 0's done but not scheduled because v1 is not done

http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 4d9302e..27cd292 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -30,6 +30,7 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -170,11 +171,11 @@ public class TestShuffleVertexManager {
       public Object answer(InvocationOnMock invocation) throws Exception {
           when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
           newEdgeManagers.clear();
-          for (Entry<String, EdgeManagerPluginDescriptor> entry :
-              ((Map<String, EdgeManagerPluginDescriptor>)invocation.getArguments()[2]).entrySet()) {
+          for (Entry<String, EdgeProperty> entry :
+              ((Map<String, EdgeProperty>)invocation.getArguments()[2]).entrySet()) {
 
-
-            final UserPayload userPayload = entry.getValue().getUserPayload();
+            EdgeManagerPluginDescriptor pluginDesc = entry.getValue().getEdgeManagerDescriptor();
+            final UserPayload userPayload = pluginDesc.getUserPayload();
             EdgeManagerPluginContext emContext = new EdgeManagerPluginContext() {
               @Override
               public UserPayload getUserPayload() {
@@ -202,13 +203,13 @@ public class TestShuffleVertexManager {
               }
             };
             EdgeManagerPlugin edgeManager = ReflectionUtils
-                .createClazzInstance(entry.getValue().getClassName(),
+                .createClazzInstance(pluginDesc.getClassName(),
                     new Class[]{EdgeManagerPluginContext.class}, new Object[]{emContext});
             edgeManager.initialize();
             newEdgeManagers.put(entry.getKey(), edgeManager);
           }
           return null;
-      }}).when(mockContext).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap(), anyMap());
+      }}).when(mockContext).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     
     // check initialization
     manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig
@@ -264,11 +265,11 @@ public class TestShuffleVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
-    verify(mockContext, times(0)).setVertexParallelism(anyInt(), any(VertexLocationHint.class), anyMap(), anyMap());
+    verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex();
     // trigger scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-    verify(mockContext, times(0)).setVertexParallelism(anyInt(), any(VertexLocationHint.class), anyMap(), anyMap());
+    verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
     Assert.assertEquals(4, scheduledTasks.size());
@@ -320,9 +321,7 @@ public class TestShuffleVertexManager {
     vmEvent = VertexManagerEvent.create("Vertex", payload);
     manager.onVertexManagerEventReceived(vmEvent);
     Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined
-    verify(mockContext, times(1)).setVertexParallelism(eq(2), any(VertexLocationHint.class),
-        anyMap(),
-        anyMap());
+    verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
@@ -357,17 +356,13 @@ public class TestShuffleVertexManager {
       manager.onVertexManagerEventReceived(vmEvent); //small payload
       manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(i));
       //should not change parallelism
-      verify(mockContext, times(0)).setVertexParallelism(eq(4), any(VertexLocationHint.class),
-          anyMap(),
-          anyMap());
+      verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
     }
     //send 8th event with payload size as 100
     manager.onVertexManagerEventReceived(vmEvent);
     manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(8));
     //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism
-    verify(mockContext, times(1)).setVertexParallelism(eq(4), any(VertexLocationHint.class),
-        anyMap(),
-        anyMap());
+    verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
 
     //reset context for next test
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
@@ -409,9 +404,7 @@ public class TestShuffleVertexManager {
     manager.onVertexManagerEventReceived(vmEvent);
     manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
     // managedVertex tasks reduced
-    verify(mockContext, times(2)).setVertexParallelism(eq(2), any(VertexLocationHint.class),
-        anyMap(),
-        anyMap());
+    verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     // TODO improve tests for parallelism
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
@@ -424,9 +417,7 @@ public class TestShuffleVertexManager {
     
     // more completions dont cause recalculation of parallelism
     manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
-    verify(mockContext, times(2)).setVertexParallelism(eq(2), any(VertexLocationHint.class),
-        anyMap(),
-        anyMap());
+    verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     
     EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
@@ -742,52 +733,6 @@ public class TestShuffleVertexManager {
     when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
     when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
 
-    final Map<String, EdgeManagerPlugin> edgeManagerR2 =
-        new HashMap<String, EdgeManagerPlugin>();
-    doAnswer(new Answer() {
-      public Object answer(InvocationOnMock invocation) throws Exception {
-        when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(2);
-        edgeManagerR2.clear();
-        for (Entry<String, EdgeManagerPluginDescriptor> entry :
-            ((Map<String, EdgeManagerPluginDescriptor>)invocation.getArguments()[2]).entrySet()) {
-
-
-          final UserPayload userPayload = entry.getValue().getUserPayload();
-          EdgeManagerPluginContext emContext = new EdgeManagerPluginContext() {
-            @Override
-            public UserPayload getUserPayload() {
-              return userPayload == null ? null : userPayload;
-            }
-
-            @Override
-            public String getSourceVertexName() {
-              return null;
-            }
-
-            @Override
-            public String getDestinationVertexName() {
-              return null;
-            }
-
-            @Override
-            public int getSourceVertexNumTasks() {
-              return 2;
-            }
-
-            @Override
-            public int getDestinationVertexNumTasks() {
-              return 2;
-            }
-          };
-          EdgeManagerPlugin edgeManager = ReflectionUtils
-              .createClazzInstance(entry.getValue().getClassName(),
-                  new Class[]{EdgeManagerPluginContext.class}, new Object[]{emContext});
-          edgeManager.initialize();
-          edgeManagerR2.put(entry.getKey(), edgeManager);
-        }
-        return null;
-      }}).when(mockContext_R2).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap(), anyMap());
-
     ByteBuffer payload =
         VertexManagerEventPayloadProto.newBuilder().setOutputSize(50L).build().toByteString().asReadOnlyByteBuffer();
     VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload);
@@ -834,15 +779,18 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
 
     //Ensure that setVertexParallelism is not called for R2.
-    verify(mockContext_R2, times(0)).setVertexParallelism(anyInt(), any(VertexLocationHint.class),
-        anyMap(),
-        anyMap());
+    try {
+      verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
+          anyMap());
+      // complete configuration of r1 triggers the scheduling
+      manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+      verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), any(VertexLocationHint.class),
+          anyMap());
+    } catch (TezException e) {
+      e.printStackTrace();
+      Assert.fail(); // should not happen
+    }
 
-    // complete configuration of r1 triggers the scheduling
-    manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
-    verify(mockContext_R2, times(1)).setVertexParallelism(eq(1), any(VertexLocationHint.class),
-        anyMap(),
-        anyMap());
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
 


Mime
View raw message