tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-724. Allow configuration of CUSTOM edges on the DAG API. (sseth)
Date Wed, 22 Jan 2014 20:16:42 GMT
Updated Branches:
  refs/heads/master 1600b5653 -> cfed8464c


TEZ-724. Allow configuration of CUSTOM edges on the DAG API. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cfed8464
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cfed8464
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cfed8464

Branch: refs/heads/master
Commit: cfed8464c855a64dcad4c6f9c68de8054220d909
Parents: 1600b56
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jan 22 12:16:09 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jan 22 12:16:09 2014 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/api/DAG.java   |  6 ++
 .../apache/tez/dag/api/DagTypeConverters.java   | 35 ++++++--
 .../main/java/org/apache/tez/dag/api/Edge.java  | 18 ++--
 .../org/apache/tez/dag/api/EdgeManager.java     | 17 ++++
 .../apache/tez/dag/api/EdgeManagerContext.java  | 45 ++++++++++
 .../tez/dag/api/EdgeManagerDescriptor.java      | 32 +++++++
 .../org/apache/tez/dag/api/EdgeProperty.java    | 65 ++++++++++++--
 .../org/apache/tez/dag/api/InputDescriptor.java |  2 +-
 tez-api/src/main/proto/DAGApiRecords.proto      |  2 +
 .../org/apache/tez/dag/api/TestDAGPlan.java     | 42 ++++++++-
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 21 +++++
 .../dag/app/dag/impl/BroadcastEdgeManager.java  |  6 ++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 22 ++++-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   | 51 ++++++++++-
 .../tez/dag/app/dag/impl/NullEdgeManager.java   | 76 ++++++++++++++++
 .../dag/app/dag/impl/OneToOneEdgeManager.java   |  6 ++
 .../app/dag/impl/ScatterGatherEdgeManager.java  |  6 ++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  1 -
 .../tez/dag/app/dag/impl/VertexManager.java     |  1 +
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 41 ++++++++-
 .../org/apache/tez/test/EdgeManagerForTest.java | 92 ++++++++++++++++++++
 .../vertexmanager/ShuffleVertexManager.java     |  6 ++
 22 files changed, 563 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 6440378..be2cb5e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -441,6 +442,11 @@ public class DAG { // FIXME rename to Topology
       edgeBuilder.setSchedulingType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSchedulingType()));
       edgeBuilder.setEdgeSource(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeSource()));
       edgeBuilder.setEdgeDestination(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeDestination()));
+      if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM) {
+        if (edge.getEdgeProperty().getEdgeManagerDescriptor() != null) {
+          edgeBuilder.setEdgeManager(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeManagerDescriptor()));
+        } // else the AM will deal with this.
+      }
       dagBuilder.addEdge(edgeBuilder);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index f80f1db..3c3d4d9 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -108,6 +108,7 @@ public class DagTypeConverters {
       case ONE_TO_ONE : return PlanEdgeDataMovementType.ONE_TO_ONE;
       case BROADCAST : return PlanEdgeDataMovementType.BROADCAST;
       case SCATTER_GATHER : return PlanEdgeDataMovementType.SCATTER_GATHER;
+      case CUSTOM: return PlanEdgeDataMovementType.CUSTOM;
       default : throw new RuntimeException("unknown 'dataMovementType': " + type);
     }
   }
@@ -223,13 +224,23 @@ public class DagTypeConverters {
   }
 
   public static EdgeProperty createEdgePropertyMapFromDAGPlan(EdgePlan edge) {
-    return new EdgeProperty(
-        convertFromDAGPlan(edge.getDataMovementType()),
-        convertFromDAGPlan(edge.getDataSourceType()),
-        convertFromDAGPlan(edge.getSchedulingType()),
-        convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()),
-        convertInputDescriptorFromDAGPlan(edge.getEdgeDestination())
-    );
+    if (edge.getDataMovementType() == PlanEdgeDataMovementType.CUSTOM) {
+      return new EdgeProperty(
+          (edge.hasEdgeManager() ? convertEdgeManagerDescriptorFromDAGPlan(edge.getEdgeManager()) : null),
+          convertFromDAGPlan(edge.getDataSourceType()),
+          convertFromDAGPlan(edge.getSchedulingType()),
+          convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()),
+          convertInputDescriptorFromDAGPlan(edge.getEdgeDestination())
+      );
+    } else {
+      return new EdgeProperty(
+          convertFromDAGPlan(edge.getDataMovementType()),
+          convertFromDAGPlan(edge.getDataSourceType()),
+          convertFromDAGPlan(edge.getSchedulingType()),
+          convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()),
+          convertInputDescriptorFromDAGPlan(edge.getEdgeDestination())
+      );
+    }
   }
 
   public static Resource createResourceRequestFromTaskConfig(
@@ -301,6 +312,16 @@ public class DagTypeConverters {
     return new VertexManagerPluginDescriptor(className).setUserPayload(bb);
   }
 
+  public static EdgeManagerDescriptor convertEdgeManagerDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    byte[] bb = null;
+    if (proto.hasUserPayload()) {
+      bb = proto.getUserPayload().toByteArray();
+    }
+    return new EdgeManagerDescriptor(className).setUserPayload(bb);
+  }
+
   public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
index a893bc3..0e384b3 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
@@ -17,12 +17,15 @@
  */
 package org.apache.tez.dag.api;
 
-public class Edge{
-  
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+public class Edge {
+
   private final Vertex inputVertex;
   private final Vertex outputVertex;
   private final EdgeProperty edgeProperty;
-    
+
+  // InputVertex(EdgeInput) ----- Edge ----- OutputVertex(EdgeOutput)
   public Edge(Vertex inputVertex, 
                Vertex outputVertex, 
                EdgeProperty edgeProperty) {
@@ -30,23 +33,24 @@ public class Edge{
     this.outputVertex = outputVertex;
     this.edgeProperty = edgeProperty;
   }
-  
+
   // RENAME to source and destination
   public Vertex getInputVertex() {
     return inputVertex;
   }
-  
+
   public Vertex getOutputVertex() {
     return outputVertex;
   }
-  
+
   public EdgeProperty getEdgeProperty() {
     return edgeProperty;
   }
-  
+
   /*
    * Used to identify the edge in the configuration
    */
+  @Private
   public String getId() {
     return String.valueOf(this.hashCode());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
index c623264..48c911f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
@@ -28,10 +28,27 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
  * This interface defines the routing of the event between tasks of producer and 
  * consumer vertices. The routing is bi-directional. Users can customize the 
  * routing by providing an implementation of this interface.
+ * 
+ * Implementations must provide a 0 argument public constructor.
  */
 public interface EdgeManager {
   
   /**
+   * Initializes the EdgeManager. This method is called in the following
+   * circumstances: <p> 1. When initializing an EdgeManager for the first time.
+   * <p> 2. When an EdgeManager is replaced at runtime. At this point, an
+   * EdgeManager instance is created and setup by the user. The initialize
+   * method will be called with the original {@link EdgeManagerContext} when the
+   * edgeManager is replaced.
+   * 
+   * @param edgeManagerContext
+   *          the context within which this EdgeManager will run. Includes
+   *          information like configuration which the user may have specified
+   *          while setting up the edge.
+   */
+  public void initialize(EdgeManagerContext edgeManagerContext);
+  
+  /**
    * Get the number of inputs on the destination task
    * @param numSourceTasks Total number of source tasks
    * @param destinationTaskIndex Index of destination task for which number of 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java
new file mode 100644
index 0000000..f2f9236
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+/**
+ * Context information provided to Edge plugins on initialization.
+ *
+ */
+public interface EdgeManagerContext {
+  
+  /**
+   * Returns the byte payload specified by the user for the edge.
+   * @return the byte payload specified by the user
+   */
+  public byte[] getUserPayload();
+  
+  /**
+   * Returns the source vertex name 
+   * @return the source vertex name
+   */
+  public String getSrcVertexName();
+  
+  /**
+   * Returns the destination vertex name
+   * @return the destination vertex name
+   */
+  public String getDestVertexName();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
new file mode 100644
index 0000000..6ecbb75
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+public class EdgeManagerDescriptor extends TezEntityDescriptor {
+
+  public EdgeManagerDescriptor(String edgeManagerClassName) {
+    super(edgeManagerClassName);
+  }
+
+  @Override
+  public EdgeManagerDescriptor setUserPayload(byte[] userPayload) {
+    this.userPayload = userPayload;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
index 326d3d0..eb012c6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
@@ -17,6 +17,10 @@
  */
 package org.apache.tez.dag.api;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+import com.google.common.base.Preconditions;
+
 public class EdgeProperty {
   
   /**
@@ -41,7 +45,12 @@ public class EdgeProperty {
      * the same destination task. Source tasks scatter their outputs and they
      * are gathered by designated destination tasks.
      */
-    SCATTER_GATHER
+    SCATTER_GATHER,
+    
+    /**
+     * Custom routing defined by the user.
+     */
+    CUSTOM
   }
   
   /**
@@ -82,15 +91,17 @@ public class EdgeProperty {
     CONCURRENT
   }
   
-  DataMovementType dataMovementType;
-  DataSourceType dataSourceType;
-  SchedulingType schedulingType;
-  InputDescriptor inputDescriptor;
-  OutputDescriptor outputDescriptor;
+  final DataMovementType dataMovementType;
+  final DataSourceType dataSourceType;
+  final SchedulingType schedulingType;
+  final InputDescriptor inputDescriptor;
+  final OutputDescriptor outputDescriptor;
+  final EdgeManagerDescriptor edgeManagerDescriptor;
   
   /**
    * @param dataMovementType
    * @param dataSourceType
+   * @param schedulingType
    * @param edgeSource
    *          The {@link OutputDescriptor} that generates data on the edge.
    * @param edgeDestination
@@ -101,9 +112,38 @@ public class EdgeProperty {
                        SchedulingType schedulingType,
                        OutputDescriptor edgeSource,
                        InputDescriptor edgeDestination) {
+    Preconditions.checkArgument(dataMovementType != DataMovementType.CUSTOM,
+        DataMovementType.CUSTOM + " cannot be used with this constructor");
     this.dataMovementType = dataMovementType;
     this.dataSourceType = dataSourceType;
     this.schedulingType = schedulingType;
+    this.outputDescriptor = edgeSource;
+    this.inputDescriptor = edgeDestination;
+    this.edgeManagerDescriptor = null;
+  }
+  
+  /**
+   * Set up an Edge which uses a custom EdgeManager
+   * 
+   * @param edgeManagerDescriptor
+   *          the EdgeManager specifications. This can be null if the edge
+   *          manager will be set up at runtime
+   * @param dataSourceType
+   * @param schedulingType
+   * @param edgeSource
+   *          The {@link OutputDescriptor} that generates data on the edge.
+   * @param edgeDestination
+   *          The {@link InputDescriptor} which will consume data from the edge.
+   */
+  public EdgeProperty(EdgeManagerDescriptor edgeManagerDescriptor,
+                      DataSourceType dataSourceType,
+                      SchedulingType schedulingType,
+                      OutputDescriptor edgeSource,
+                      InputDescriptor edgeDestination) {
+    this.dataMovementType = DataMovementType.CUSTOM;
+    this.edgeManagerDescriptor = edgeManagerDescriptor;
+    this.dataSourceType = dataSourceType;
+    this.schedulingType = schedulingType;
     this.inputDescriptor = edgeDestination;
     this.outputDescriptor = edgeSource;
   }
@@ -138,10 +178,21 @@ public class EdgeProperty {
     return outputDescriptor;
   }
   
+  /**
+   * Returns the Edge Manager specifications for this edge.  
+   * @return {@link EdgeManagerDescriptor} if a custom edge was setup, null otherwise.
+   */
+  @Private
+  public EdgeManagerDescriptor getEdgeManagerDescriptor() {
+    return edgeManagerDescriptor;
+  }
+  
   @Override
   public String toString() {
     return "{ " + dataMovementType + " : " + inputDescriptor.getClassName()
-        + " >> " + dataSourceType + " >> " + outputDescriptor.getClassName() + " }";
+        + " >> " + dataSourceType + " >> " + outputDescriptor.getClassName()
+        + " >> " + (edgeManagerDescriptor == null ? "NullEdgeManager" : edgeManagerDescriptor.getClassName())
+        + " }";
   }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
index 24b542a..45e4b77 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -26,7 +26,7 @@ public class InputDescriptor extends TezEntityDescriptor {
   public InputDescriptor() {
     super();
   }
-  
+
   public InputDescriptor(String inputClassName) {
     super(inputClassName);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index e133297..90da45c 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -39,6 +39,7 @@ enum PlanEdgeDataMovementType {
   ONE_TO_ONE = 0;
   BROADCAST = 1;
   SCATTER_GATHER = 2;
+  CUSTOM = 3;
 }
 
 enum PlanEdgeDataSourceType {
@@ -130,6 +131,7 @@ message EdgePlan {
   required PlanEdgeSchedulingType schedulingType = 6;
   optional TezEntityDescriptorProto edge_source = 7;
   optional TezEntityDescriptorProto edge_destination = 8;
+  optional TezEntityDescriptorProto edge_manager = 9;
 }
 
 message ConfigurationProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 53ec357..cb9a132 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -47,7 +48,7 @@ public class TestDAGPlan {
   @Rule
   public TemporaryFolder tempFolder = new TemporaryFolder(); //TODO: doesn't seem to be deleting this folder automatically as expected.
 
-  @Test
+  @Test(timeout = 5000)
   public void testBasicJobPlanSerde() throws IOException {
 
     DAGPlan job = DAGPlan.newBuilder()
@@ -92,7 +93,42 @@ public class TestDAGPlan {
    Assert.assertEquals(job, inJob);
   }
 
-  @Test
+  @Test(timeout = 5000)
+  public void testEdgeManagerSerde() {
+    DAG dag = new DAG("testDag");
+    ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1")
+        .setUserPayload("processor1Bytes".getBytes());
+    ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2")
+        .setUserPayload("processor2Bytes".getBytes());
+    Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
+    Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
+    v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalResources(new HashMap<String, LocalResource>());
+    v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalResources(new HashMap<String, LocalResource>());
+
+    InputDescriptor inputDescriptor = new InputDescriptor("input").setUserPayload("inputBytes"
+        .getBytes());
+    OutputDescriptor outputDescriptor = new OutputDescriptor("output").setUserPayload("outputBytes"
+        .getBytes());
+    Edge edge = new Edge(v1, v2, new EdgeProperty(
+        new EdgeManagerDescriptor("emClass").setUserPayload("emPayload".getBytes()),
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, outputDescriptor, inputDescriptor));
+
+    dag.addVertex(v1).addVertex(v2).addEdge(edge);
+
+    DAGPlan dagProto = dag.createDag(new TezConfiguration());
+
+    EdgeProperty edgeProperty = DagTypeConverters.createEdgePropertyMapFromDAGPlan(dagProto
+        .getEdgeList().get(0));
+
+    EdgeManagerDescriptor emDesc = edgeProperty.getEdgeManagerDescriptor();
+    Assert.assertNotNull(emDesc);
+    Assert.assertEquals("emClass", emDesc.getClassName());
+    Assert.assertTrue(Arrays.equals("emPayload".getBytes(), emDesc.getUserPayload()));
+  }
+
+  @Test(timeout = 5000)
   public void testUserPayloadSerde() {
     DAG dag = new DAG("testDag");
     ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1").
@@ -136,7 +172,7 @@ public class TestDAGPlan {
     assertEquals("inputBytes", new String(edgeProto.getEdgeDestination()
         .getUserPayload().toByteArray()));
     assertEquals("input", edgeProto.getEdgeDestination().getClassName());
-
+             
     assertEquals("outputBytes", new String(edgeProto.getEdgeSource()
         .getUserPayload().toByteArray()));
     assertEquals("output", edgeProto.getEdgeSource().getClassName());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index a4fe4af..e60a7a5 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -56,6 +56,27 @@ public class TestDAGVerify {
     dag.verify();
   }
 
+  @Test
+  public void testVerifyCustomEdge() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(new EdgeManagerDescriptor("emClass"),
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(e1);
+    dag.verify();
+  }
+
   @Test  
   public void testVerifyOneToOne() {
     Vertex v1 = new Vertex("v1",

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index 5893a74..40a8872 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl;
 import java.util.List;
 
 import org.apache.tez.dag.api.EdgeManager;
+import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
@@ -28,6 +29,11 @@ import org.apache.tez.runtime.api.events.InputFailedEvent;
 public class BroadcastEdgeManager implements EdgeManager {
 
   @Override
+  public void initialize(EdgeManagerContext edgeManagerContext) {
+    // Nothing to do.
+  }
+  
+  @Override
   public int getNumDestinationTaskInputs(int numSourceTasks, 
       int destinationTaskIndex) {
     return numSourceTasks;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 4c9ec64..d116b46 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -51,10 +51,12 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -1009,7 +1011,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         return dag.finished(DAGState.FAILED);
       }
 
-      // create the vertices
+      // create the vertices`
       for (int i=0; i < dag.numVertices; ++i) {
         String vertexName = dag.getJobPlan().getVertex(i).getName();
         VertexImpl v = createVertex(dag, vertexName, i);
@@ -1024,6 +1026,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         parseVertexEdges(dag, edgePlans, v);
       }
 
+      // Initialize the edges, now that the payload and vertices have been set.
+      for (Edge e : dag.edges.values()) {
+        e.initialize();
+      }
+
       assignDAGScheduler(dag);
 
       // TODO Metrics
@@ -1037,6 +1044,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
         EdgeProperty edgeProperty = DagTypeConverters
             .createEdgePropertyMapFromDAGPlan(edgePlan);
+        
+        // If CUSTOM without an edge manager, setup a fake edge manager. Avoid
+        // referencing the fake edge manager within the API module.
+        if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM
+            && edgeProperty.getEdgeManagerDescriptor() == null) {
+          EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
+              NullEdgeManager.class.getName());
+          EdgeProperty ep = new EdgeProperty(edgeDesc, edgeProperty.getDataSourceType(),
+              edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(),
+              edgeProperty.getEdgeDestination());
+          edgeProperty = ep;
+        }
+        
         // edge manager may be also set via API when using custom edge type
         dag.edges.put(edgePlan.getId(),
             new Edge(edgeProperty, dag.getEventHandler()));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 9073966..7637d5f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -24,14 +24,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.EdgeManager;
+import org.apache.tez.dag.api.EdgeManagerContext;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.RuntimeUtils;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
@@ -44,7 +48,36 @@ import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
 public class Edge {
 
+  static class EdgeManagerContextImpl implements EdgeManagerContext {
+
+    private final String srcVertexName;
+    private final String destVertexName;
+    private final byte[] userPayload;
+
+    EdgeManagerContextImpl(String srcVertexName, String destVertexName, byte[] userPayload) {
+      this.srcVertexName = srcVertexName;
+      this.destVertexName = destVertexName;
+      this.userPayload = userPayload;
+    }
+
+    @Override
+    public byte[] getUserPayload() {
+      return userPayload;
+    }
+
+    @Override
+    public String getSrcVertexName() {
+      return srcVertexName;
+    }
+
+    @Override
+    public String getDestVertexName() {
+      return destVertexName;
+    }
+  }
+
   private EdgeProperty edgeProperty;
+  private EdgeManagerContext edgeManagerContext;
   private EdgeManager edgeManager;
   @SuppressWarnings("rawtypes")
   private EventHandler eventHandler;
@@ -68,13 +101,27 @@ public class Edge {
     case SCATTER_GATHER:
       edgeManager = new ScatterGatherEdgeManager();
       break;
+    case CUSTOM:
+      String edgeManagerClassName = edgeProperty.getEdgeManagerDescriptor().getClassName();
+      edgeManager = RuntimeUtils.createClazzInstance(edgeManagerClassName);
+      break;
     default:
       String message = "Unknown edge data movement type: "
           + edgeProperty.getDataMovementType();
       throw new TezUncheckedException(message);
     }
   }
-  
+
+  public void initialize() {
+    byte[] bb = null;
+    if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM) {
+      bb = edgeProperty.getEdgeManagerDescriptor().getUserPayload();
+    }
+    edgeManagerContext = new EdgeManagerContextImpl(sourceVertex.getName(),
+        destinationVertex.getName(), bb);
+    edgeManager.initialize(edgeManagerContext);
+  }
+
   public EdgeProperty getEdgeProperty() {
     return this.edgeProperty;
   }
@@ -88,6 +135,7 @@ public class Edge {
       throw new TezUncheckedException("Edge manager cannot be null");
     }
     this.edgeManager = edgeManager;
+    this.edgeManager.initialize(edgeManagerContext);
   }
   
   public void setSourceVertex(Vertex sourceVertex) {
@@ -232,5 +280,4 @@ public class Edge {
   private void sendEventToTask(TezTaskID taskId, TezEvent tezEvent) {
     eventHandler.handle(new TaskEventAddTezEvent(taskId, tezEvent));
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
new file mode 100644
index 0000000..7cde07b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
@@ -0,0 +1,76 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.List;
+
+import org.apache.tez.dag.api.EdgeManager;
+import org.apache.tez.dag.api.EdgeManagerContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+public class NullEdgeManager implements EdgeManager {
+
+  public NullEdgeManager() {
+  }
+
+  @Override
+  public void initialize(EdgeManagerContext edgeManagerContext) {
+  }
+
+  @Override
+  public int getNumDestinationTaskInputs(int numSourceTasks, int destinationTaskIndex) {
+    throw new UnsupportedOperationException(
+        "Cannot route events. EdgeManager should have been replaced at runtime");
+  }
+
+  @Override
+  public int getNumSourceTaskOutputs(int numDestinationTasks, int sourceTaskIndex) {
+    throw new UnsupportedOperationException(
+        "Cannot route events. EdgeManager should have been replaced at runtime");
+  }
+
+  @Override
+  public void routeEventToDestinationTasks(DataMovementEvent event, int sourceTaskIndex,
+      int numDestinationTasks, List<Integer> taskIndices) {
+    throw new UnsupportedOperationException(
+        "Cannot route events. EdgeManager should have been replaced at runtime");
+  }
+
+  @Override
+  public void routeEventToDestinationTasks(InputFailedEvent event, int sourceTaskIndex,
+      int numDestinationTasks, List<Integer> taskIndices) {
+    throw new UnsupportedOperationException(
+        "Cannot route events. EdgeManager should have been replaced at runtime");
+  }
+
+  @Override
+  public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestinationTasks) {
+    throw new UnsupportedOperationException(
+        "Cannot route events. EdgeManager should have been replaced at runtime");
+  }
+
+  @Override
+  public int routeEventToSourceTasks(int destinationTaskIndex, InputReadErrorEvent event) {
+    throw new UnsupportedOperationException(
+        "Cannot route events. EdgeManager should have been replaced at runtime");
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index cb5feb2..e026db9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl;
 import java.util.List;
 
 import org.apache.tez.dag.api.EdgeManager;
+import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
@@ -28,6 +29,11 @@ import org.apache.tez.runtime.api.events.InputFailedEvent;
 public class OneToOneEdgeManager implements EdgeManager {
 
   @Override
+  public void initialize(EdgeManagerContext edgeManagerContext) {
+    // Nothing to do.
+  }
+
+  @Override
   public int getNumDestinationTaskInputs(int numDestinationTasks, 
       int destinationTaskIndex) {
     return 1;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index c9f0503..230e987 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl;
 import java.util.List;
 
 import org.apache.tez.dag.api.EdgeManager;
+import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
@@ -28,6 +29,11 @@ import org.apache.tez.runtime.api.events.InputFailedEvent;
 public class ScatterGatherEdgeManager implements EdgeManager {
 
   @Override
+  public void initialize(EdgeManagerContext edgeManagerContext) {
+    // Nothing to do.
+  }
+
+  @Override
   public int getNumDestinationTaskInputs(int numSourceTasks,
       int destinationTaskIndex) {
     return numSourceTasks;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 96a655e..48bb55b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2138,7 +2138,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size());
     this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size());
     for (RootInputLeafOutputProto output : outputs) {
-
       OutputDescriptor od = DagTypeConverters
           .convertOutputDescriptorFromDAGPlan(output.getEntityDescriptor());
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 4fd64bb..d7b2ad2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -129,6 +129,7 @@ public class VertexManager {
     }
     if (payload == null) {
       // Ease of use. If no payload present then give the common configuration
+      // TODO TEZ-744 Don't do this - AMConf should not be used to configure vertexManagers.
       try {
         payload = TezUtils.createUserPayloadFromConf(appContext.getAMConf());
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 41296bb..187f9f9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -30,7 +30,9 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -109,6 +111,7 @@ import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.test.EdgeManagerForTest;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -140,6 +143,7 @@ public class TestVertexImpl {
   private VertexLocationHint vertexLocationHint = null;
   private Configuration conf;
   private Map<String, Edge> edges;
+  private byte[] edgePayload = "EP".getBytes();
 
   private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
   private TaskEventDispatcher taskEventDispatcher;
@@ -701,7 +705,12 @@ public class TestVertexImpl {
                     .setInputVertexName("vertex1")
                     .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
                     .setOutputVertexName("vertex3")
-                    .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                    .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
+                    .setEdgeManager(
+                        TezEntityDescriptorProto.newBuilder()
+                        .setClassName(EdgeManagerForTest.class.getName())
+                        .setUserPayload(ByteString.copyFrom(edgePayload))
+                        .build())
                     .setId("e1")
                     .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                     .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
@@ -1128,6 +1137,11 @@ public class TestVertexImpl {
     }
 
     parseVertexEdges();
+    
+    for (Edge edge : edges.values()) {
+      edge.initialize();
+    }
+    
     taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
     dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
     taskEventDispatcher = new TaskEventDispatcher();
@@ -1292,6 +1306,31 @@ public class TestVertexImpl {
     Assert.assertTrue(v3.sourceVertices.get(v1).getEdgeManager() == mockEdgeManager);
   }
 
+  @Test(timeout = 5000)
+  public void testSetCustomEdgeManager() throws UnsupportedEncodingException {
+    initAllVertices(VertexState.INITED);
+    Edge edge = edges.get("e1");
+    EdgeManager em = edge.getEdgeManager();
+    EdgeManagerForTest originalEm = (EdgeManagerForTest) em;
+    Assert.assertEquals(true, originalEm.isCreatedByFramework());
+    Assert.assertTrue(Arrays.equals(edgePayload, originalEm.getEdgeManagerContext()
+        .getUserPayload()));
+
+    em = EdgeManagerForTest.createInstance();
+    Vertex v1 = vertices.get("vertex1");
+    Vertex v3 = vertices.get("vertex3"); // Vertex3 linked to v1 (v1 src, v3
+                                         // dest)
+    Map<String, EdgeManager> edgeManagers = Collections.singletonMap(v1.getName(), em);
+    v3.setParallelism(v3.getTotalTasks() - 1, edgeManagers); // Must decrease.
+
+    EdgeManagerForTest edgeManagerPostSet = (EdgeManagerForTest) edge.getEdgeManager();
+    Assert.assertEquals(false, edgeManagerPostSet.isCreatedByFramework());
+
+    // Ensure initialize() is called with the correct payload
+    Assert.assertTrue(Arrays.equals(originalEm.getEdgeManagerContext().getUserPayload(),
+        edgeManagerPostSet.getEdgeManagerContext().getUserPayload()));
+  }
+
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testBasicVertexCompletion() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
new file mode 100644
index 0000000..ff8fdbe
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import java.util.List;
+
+import org.apache.tez.dag.api.EdgeManager;
+import org.apache.tez.dag.api.EdgeManagerContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+public class EdgeManagerForTest implements EdgeManager {
+
+  private EdgeManagerContext edgeManagerContext = null;
+  private boolean createdByFramework = true;
+
+  public static EdgeManagerForTest createInstance() {
+    EdgeManagerForTest e = new EdgeManagerForTest();
+    e.createdByFramework = false;
+    return e;
+  }
+  
+  public boolean isCreatedByFramework() {
+    return createdByFramework;
+  }
+  
+  public EdgeManagerContext getEdgeManagerContext() {
+    return edgeManagerContext;
+  }
+
+  
+  
+  // Overridden methods
+  
+  public EdgeManagerForTest() {
+  }
+
+  @Override
+  public void initialize(EdgeManagerContext edgeManagerContext) {
+    this.edgeManagerContext = edgeManagerContext;
+  }
+
+  @Override
+  public int getNumDestinationTaskInputs(int numSourceTasks, int destinationTaskIndex) {
+    return 0;
+  }
+
+  @Override
+  public int getNumSourceTaskOutputs(int numDestinationTasks, int sourceTaskIndex) {
+    return 0;
+  }
+
+  @Override
+  public void routeEventToDestinationTasks(DataMovementEvent event, int sourceTaskIndex,
+      int numDestinationTasks, List<Integer> taskIndices) {
+  }
+
+  @Override
+  public void routeEventToDestinationTasks(InputFailedEvent event, int sourceTaskIndex,
+      int numDestinationTasks, List<Integer> taskIndices) {
+  }
+
+  @Override
+  public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestinationTasks) {
+    return 0;
+  }
+
+  @Override
+  public int routeEventToSourceTasks(int destinationTaskIndex, InputReadErrorEvent event) {
+    return 0;
+  }
+  
+  // End of overridden methods
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfed8464/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 15275f7..835f9f0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeManager;
+import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -127,6 +128,11 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     }
 
     @Override
+    public void initialize(EdgeManagerContext edgeManagerContext) {
+      // Nothing to do. This class isn't currently designed to be used at the DAG API level.
+    }
+
+    @Override
     public int getNumDestinationTaskInputs(int numSourceTasks, 
         int destinationTaskIndex) {
       int partitionRange = 1;


Mime
View raw message