tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/2] git commit: TEZ-198. Allow setting a per vertex/stage user payload in the DAG API. Part of TEZ-182. (sseth)
Date Mon, 10 Jun 2013 04:34:09 GMT
Updated Branches:
  refs/heads/master ca1e53e89 -> 81f43ef93


TEZ-198. Allow setting a per vertex/stage user payload in the DAG API. Part of TEZ-182.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3780729e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3780729e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3780729e

Branch: refs/heads/master
Commit: 3780729e6a62a93f943d420a2e63d5c02bf5b164
Parents: ca1e53e
Author: Siddharth Seth <sseth@apache.org>
Authored: Sun Jun 9 21:26:28 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Sun Jun 9 21:26:28 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/api/DAG.java   | 15 ++--
 .../apache/tez/dag/api/DagTypeConverters.java   | 60 +++++++++++++--
 .../org/apache/tez/dag/api/EdgeProperty.java    | 24 +++---
 .../org/apache/tez/dag/api/InputDescriptor.java | 29 ++++++++
 .../apache/tez/dag/api/OutputDescriptor.java    | 29 ++++++++
 .../apache/tez/dag/api/ProcessorDescriptor.java | 29 ++++++++
 .../apache/tez/dag/api/TezEntityDescriptor.java | 40 ++++++++++
 .../java/org/apache/tez/dag/api/Vertex.java     | 12 +--
 tez-dag-api/src/main/proto/DAGApiRecords.proto  | 11 ++-
 .../org/apache/tez/dag/api/TestDAGPlan.java     | 78 ++++++++++++++++++++
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 62 ++++++++--------
 .../apache/tez/dag/app/MRRExampleHelper.java    | 35 +++++----
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  6 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 27 +++----
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 27 +++----
 .../dag/app/dag/impl/TestVertexScheduler.java   | 11 ++-
 .../org/apache/tez/mapreduce/YARNRunner.java    | 21 ++++--
 17 files changed, 394 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index f4185f2..cd6391c 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -28,6 +28,7 @@ import java.util.Stack;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
@@ -36,7 +37,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+
 
 public class DAG { // FIXME rename to Topology
   List<Vertex> vertices;
@@ -237,7 +238,7 @@ public class DAG { // FIXME rename to Topology
  
   
   // create protobuf message describing DAG
-  public DAGPlan createDag(){
+  public DAGPlan createDag() {
     
     verify(true);
     
@@ -249,8 +250,9 @@ public class DAG { // FIXME rename to Topology
       VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
       vertexBuilder.setName(vertex.getVertexName());
       vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until  TEZ-46.
-      vertexBuilder.setProcessorName(vertex.getProcessorName());
-
+      vertexBuilder.setProcessorDescriptor(DagTypeConverters
+          .convertToDAGPlan(vertex.getProcessorDescriptor()));      
+      
       //task config
       PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
       Resource resource = vertex.getTaskResource();
@@ -322,9 +324,8 @@ public class DAG { // FIXME rename to Topology
       edgeBuilder.setOutputVertexName(edge.getOutputVertex().getVertexName());
       edgeBuilder.setConnectionPattern(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().connectionPattern));
       edgeBuilder.setSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSourceType()));
-      edgeBuilder.setInputClass(edge.getEdgeProperty().inputClass);
-      edgeBuilder.setOutputClass(edge.getEdgeProperty().outputClass);
-
+      edgeBuilder.setInputDescriptor(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getInputDescriptor()));
+      edgeBuilder.setOutputDescriptor(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getOutputDescriptor()));
       jobBuilder.addEdge(edgeBuilder);
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 6437080..6773aba 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -17,6 +17,7 @@
  */
 package org.apache.tez.dag.api;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -30,18 +31,21 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeConnectionPattern;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
-import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
-import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
-import org.apache.tez.dag.api.EdgeProperty.SourceType;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+import com.google.protobuf.ByteString;
 
 
 public class DagTypeConverters {
@@ -195,8 +199,8 @@ public class DagTypeConverters {
            new EdgeProperty(
                convertFromDAGPlan(edge.getConnectionPattern()),
                convertFromDAGPlan(edge.getSourceType()),
-               edge.getInputClass(),
-               edge.getOutputClass()
+               convertInputDescriptorFromDAGPlan(edge.getInputDescriptor()),
+               convertOutputDescriptorFromDAGPlan(edge.getOutputDescriptor())
                )
            );
     }
@@ -216,5 +220,47 @@ public class DagTypeConverters {
       map.put(setting.getKey(), setting.getValue());
     }
     return map;
-  }   
+  }
+
+  public static TezEntityDescriptorProto convertToDAGPlan(
+      TezEntityDescriptor descriptor) {
+    TezEntityDescriptorProto.Builder builder = TezEntityDescriptorProto
+        .newBuilder();
+    builder.setClassName(descriptor.getClassName());
+    if (descriptor.getUserPayload() != null) {
+      builder
+          .setUserPayload(ByteString.copyFrom(descriptor.getUserPayload()));
+    }
+    return builder.build();
+  }
+
+  public static InputDescriptor convertInputDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    ByteBuffer bb = null;
+    if (proto.hasUserPayload()) {
+      bb = proto.getUserPayload().asReadOnlyByteBuffer();
+    }
+    return new InputDescriptor(className, bb);
+  }
+
+  public static OutputDescriptor convertOutputDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    ByteBuffer bb = null;
+    if (proto.hasUserPayload()) {
+      bb = proto.getUserPayload().asReadOnlyByteBuffer();
+    }
+    return new OutputDescriptor(className, bb);
+  }
+
+  public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    ByteBuffer bb = null;
+    if (proto.hasUserPayload()) {
+      bb = proto.getUserPayload().asReadOnlyByteBuffer();
+    }
+    return new ProcessorDescriptor(className, bb);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
index 4466f23..f650105 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
@@ -33,17 +33,17 @@ public class EdgeProperty { // FIXME rename to ChannelProperty
   
   ConnectionPattern connectionPattern;
   SourceType sourceType;
-  String inputClass;
-  String outputClass;
+  InputDescriptor inputDescriptor;
+  OutputDescriptor outputDescriptor;
   
   public EdgeProperty(ConnectionPattern connectionPattern, 
                        SourceType sourceType,
-                       String inputClass,
-                       String outputClass) {
+                       InputDescriptor inputDescriptor,
+                       OutputDescriptor outputDescriptor) {
     this.connectionPattern = connectionPattern;
     this.sourceType = sourceType;
-    this.inputClass = inputClass;
-    this.outputClass = outputClass;
+    this.inputDescriptor = inputDescriptor;
+    this.outputDescriptor = outputDescriptor;
   }
   
   public ConnectionPattern getConnectionPattern() {
@@ -54,18 +54,18 @@ public class EdgeProperty { // FIXME rename to ChannelProperty
     return sourceType;
   }
   
-  public String getInputClass() {
-    return inputClass;
+  public InputDescriptor getInputDescriptor() {
+    return inputDescriptor;
   }
   
-  public String getOutputClass() {
-    return outputClass;
+  public OutputDescriptor getOutputDescriptor() {
+    return outputDescriptor;
   }
   
   @Override
   public String toString() {
-    return "{ " + connectionPattern + " : " 
-            + inputClass + " >> " + sourceType + " >> " + outputClass + " }";
+    return "{ " + connectionPattern + " : " + inputDescriptor.getClassName()
+        + " >> " + sourceType + " >> " + outputDescriptor.getClassName() + " }";
   }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
new file mode 100644
index 0000000..3b3054a
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.nio.ByteBuffer;
+
+public class InputDescriptor extends TezEntityDescriptor {
+  
+  // TODO Fix dependencies so that this can be specified as a class.  
+  public InputDescriptor(String inputClassName, ByteBuffer userPayload) {
+    super(inputClassName, userPayload);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
new file mode 100644
index 0000000..e0956d3
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.nio.ByteBuffer;
+
+public class OutputDescriptor extends TezEntityDescriptor {
+
+  // TODO Fix dependencies so that this can be specified as a class.
+  public OutputDescriptor(String outputClassName, ByteBuffer userPayload) {
+    super(outputClassName, userPayload);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
new file mode 100644
index 0000000..9f2a564
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.nio.ByteBuffer;
+
+public class ProcessorDescriptor extends TezEntityDescriptor {
+
+  // TODO Fix dependencies so that this can be specified as a class.
+  public ProcessorDescriptor(String processorClassName, ByteBuffer userPayload) {
+    super(processorClassName, userPayload);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
new file mode 100644
index 0000000..7283310
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.nio.ByteBuffer;
+
+public abstract class TezEntityDescriptor {
+
+  private ByteBuffer userPayload;
+  private String className;
+  
+  public TezEntityDescriptor(String className, ByteBuffer userPayload) {
+    this.userPayload = userPayload;
+    this.className = className;
+  }
+  
+  public ByteBuffer getUserPayload() {
+    return this.userPayload;
+  }
+
+  public String getClassName() {
+    return this.className;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 0d4d8fc..4bd3217 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -29,7 +29,7 @@ import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 public class Vertex { // FIXME rename to Task
 
   private final String vertexName;
-  private final String processorName;
+  private final ProcessorDescriptor processorDescriptor;
 
   private final int parallelism;
   private VertexLocationHint taskLocationsHint;
@@ -44,9 +44,9 @@ public class Vertex { // FIXME rename to Task
   private String javaOpts = "";
 
 
-  public Vertex(String vertexName, String processorName, int parallelism) {
+  public Vertex(String vertexName, ProcessorDescriptor processorDescriptor, int parallelism) {
     this.vertexName = vertexName;
-    this.processorName = processorName;
+    this.processorDescriptor = processorDescriptor;
     this.parallelism = parallelism;
   }
 
@@ -54,8 +54,8 @@ public class Vertex { // FIXME rename to Task
     return vertexName;
   }
 
-  public String getProcessorName() {
-    return processorName;
+  public ProcessorDescriptor getProcessorDescriptor() {
+    return this.processorDescriptor;
   }
 
   public int getParallelism() {
@@ -114,7 +114,7 @@ public class Vertex { // FIXME rename to Task
 
   @Override
   public String toString() {
-    return "[" + vertexName + " : " + processorName + "]";
+    return "[" + vertexName + " : " + processorDescriptor.getClassName() + "]";
   }
 
   void addInputVertex(Vertex inputVertex, String edgeId) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGApiRecords.proto b/tez-dag-api/src/main/proto/DAGApiRecords.proto
index 242715b..ae1c370 100644
--- a/tez-dag-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-dag-api/src/main/proto/DAGApiRecords.proto
@@ -92,10 +92,15 @@ message PlanTaskConfiguration {
   repeated PlanKeyValuePair environmentSetting = 8;  
 }
 
+message TezEntityDescriptorProto {
+  optional string class_name = 1;
+  optional bytes user_payload = 2;
+}
+
 message VertexPlan {
   required string name = 1;
   required PlanVertexType type = 2;
-  optional string processorName = 3;
+  optional TezEntityDescriptorProto processor_descriptor = 3;
   required PlanTaskConfiguration taskConfig = 4;
   repeated PlanTaskLocationHint taskLocationHint = 7;
   repeated string inEdgeId = 8;
@@ -108,8 +113,8 @@ message EdgePlan {
   required string outputVertexName = 3;
   required PlanEdgeConnectionPattern connectionPattern = 4;
   required PlanEdgeSourceType sourceType = 5;
-  required string inputClass = 6;
-  required string outputClass = 7;
+  optional TezEntityDescriptorProto input_descriptor = 6;
+  optional TezEntityDescriptorProto output_descriptor = 7;
 }
 
 message DAGPlan {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 6dca610..5e4a007 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -18,12 +18,22 @@
 
 package org.apache.tez.dag.api;
 
+import static org.junit.Assert.*;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
@@ -82,4 +92,72 @@ public class TestDAGPlan {
 
    Assert.assertEquals(job, inJob);
   }
+  
+  @Test
+  public void testUserPayloadSerde() {
+    DAG dag = new DAG().setName("testDag");
+    ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1",
+        ByteBuffer.wrap("processor1Bytes".getBytes()));
+    ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2",
+        ByteBuffer.wrap("processor2Bytes".getBytes()));
+    Vertex v1 = new Vertex("v1", pd1, 10);
+    Vertex v2 = new Vertex("v2", pd2, 1);
+    v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalResources(new HashMap<String, LocalResource>())
+        .setTaskResource(Resource.newInstance(1024, 1));
+    v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalResources(new HashMap<String, LocalResource>())
+        .setTaskResource(Resource.newInstance(1024, 1));
+
+    InputDescriptor inputDescriptor = new InputDescriptor("input",
+        ByteBuffer.wrap("inputBytes".getBytes()));
+    OutputDescriptor outputDescriptor = new OutputDescriptor("output",
+        ByteBuffer.wrap("outputBytes".getBytes()));
+    Edge edge = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE,
+        SourceType.STABLE, inputDescriptor, outputDescriptor));
+
+    dag.addVertex(v1).addVertex(v2).addEdge(edge);
+
+    DAGPlan dagProto = dag.createDag();
+
+    assertEquals(2, dagProto.getVertexCount());
+    assertEquals(1, dagProto.getEdgeCount());
+
+    VertexPlan v1Proto = dagProto.getVertex(0);
+    VertexPlan v2Proto = dagProto.getVertex(1);
+    EdgePlan edgeProto = dagProto.getEdge(0);
+
+    assertEquals("processor1Bytes", new String(v1Proto.getProcessorDescriptor()
+        .getUserPayload().toByteArray()));
+    assertEquals("processor1", v1Proto.getProcessorDescriptor().getClassName());
+
+    assertEquals("processor2Bytes", new String(v2Proto.getProcessorDescriptor()
+        .getUserPayload().toByteArray()));
+    assertEquals("processor2", v2Proto.getProcessorDescriptor().getClassName());
+
+    assertEquals("inputBytes", new String(edgeProto.getInputDescriptor()
+        .getUserPayload().toByteArray()));
+    assertEquals("input", edgeProto.getInputDescriptor().getClassName());
+
+    assertEquals("outputBytes", new String(edgeProto.getOutputDescriptor()
+        .getUserPayload().toByteArray()));
+    assertEquals("output", edgeProto.getOutputDescriptor().getClassName());
+
+    Map<String, EdgeProperty> edgePropertyMap = DagTypeConverters
+        .createEdgePropertyMapFromDAGPlan(dagProto.getEdgeList());
+    assertEquals(1, edgePropertyMap.size());
+    EdgeProperty edgeProperty = edgePropertyMap.values().iterator().next();
+
+    byte[] ib = new byte[edgeProperty.getInputDescriptor().getUserPayload()
+        .capacity()];
+    edgeProperty.getInputDescriptor().getUserPayload().get(ib);
+    assertEquals("inputBytes", new String(ib));
+    assertEquals("input", edgeProperty.getInputDescriptor().getClassName());
+
+    byte[] ob = new byte[edgeProperty.getOutputDescriptor().getUserPayload()
+        .capacity()];
+    edgeProperty.getOutputDescriptor().getUserPayload().get(ob);
+    assertEquals("outputBytes", new String(ob));
+    assertEquals("output", edgeProperty.getOutputDescriptor().getClassName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 4022604..2b40483 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -35,9 +35,9 @@ public class TestDAGVerify {
   //    v2
   @Test
   public void testVerify1() {
-    Vertex v1 = new Vertex("v1",dummyProcessorClassName, dummyTaskCount);
-    Vertex v2 = new Vertex("v2","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+    Vertex v1 = new Vertex("v1", new ProcessorDescriptor(dummyProcessorClassName, null), dummyTaskCount);
+    Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor(dummyInputClassName, null), new OutputDescriptor(dummyOutputClassName, null)));
     DAG dag = new DAG();
     dag.addVertex(v1);
     dag.addVertex(v2);
@@ -53,14 +53,14 @@ public class TestDAGVerify {
   @Test
   public void testCycle1() {
     IllegalStateException ex=null;
-    Vertex v1 = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-    Vertex v2 = new Vertex("v2","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-    Vertex v3 = new Vertex("v3","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-    Vertex v4 = new Vertex("v4","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
-    Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
-    Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
-    Edge e4 = new Edge(v4, v1, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+    Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+    Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+    Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+    Vertex v4 = new Vertex("v4", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
+    Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
+    Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
+    Edge e4 = new Edge(v4, v1, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
     DAG dag = new DAG();
     dag.addVertex(v1);
     dag.addVertex(v2);
@@ -89,14 +89,14 @@ public class TestDAGVerify {
   @Test
   public void testCycle2() {
     IllegalStateException ex=null;
-    Vertex v1 = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-    Vertex v2 = new Vertex("v2","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-    Vertex v3 = new Vertex("v3","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-    Vertex v4 = new Vertex("v4","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
-    Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
-    Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
-    Edge e4 = new Edge(v3, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+    Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+    Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+    Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+    Vertex v4 = new Vertex("v4", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
+    Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
+    Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
+    Edge e4 = new Edge(v3, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
     DAG dag = new DAG();
     dag.addVertex(v1);
     dag.addVertex(v2);
@@ -120,8 +120,8 @@ public class TestDAGVerify {
   @Test
   public void repeatedVertexName() {
     IllegalStateException ex=null;
-    Vertex v1 = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-    Vertex v1repeat = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
+    Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+    Vertex v1repeat = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
     DAG dag = new DAG();
     dag.addVertex(v1);
     dag.addVertex(v1repeat);
@@ -143,11 +143,11 @@ public class TestDAGVerify {
   public void BinaryInput() {
     IllegalStateException ex=null;
     try {
-      Vertex v1 = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-      Vertex v2 = new Vertex("v2","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-      Vertex v3 = new Vertex("v3","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-      Edge e1 = new Edge(v1, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
-      Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+      Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+      Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+      Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+      Edge e1 = new Edge(v1, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
+      Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
       DAG dag = new DAG();
       dag.addVertex(v1);
       dag.addVertex(v2);
@@ -171,11 +171,11 @@ public class TestDAGVerify {
   public void BinaryOutput() {
     IllegalStateException ex=null;
     try {
-      Vertex v1 = new Vertex("v1","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-      Vertex v2 = new Vertex("v2","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-      Vertex v3 = new Vertex("v3","org.apache.tez.mapreduce.processor.reduce.MapProcessor", dummyTaskCount);
-      Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
-      Edge e2 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, "dummy input class", "dummy output class"));
+      Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+      Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+      Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+      Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
+      Edge e2 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new InputDescriptor("dummy input class", null), new OutputDescriptor("dummy output class", null)));
       DAG dag = new DAG();
       dag.addVertex(v1);
       dag.addVertex(v2);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
index 35ae558..d487594 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
@@ -21,6 +21,9 @@ import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
 import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.app.rm.container.AMContainerHelpers;
 import org.apache.tez.engine.lib.input.ShuffledMergedInput;
@@ -88,22 +91,22 @@ public class MRRExampleHelper {
  
  static DAGPlan createDAGConfigurationForMRR() throws IOException {
    org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
-   Vertex mapVertex = new Vertex("map",
-       "org.apache.tez.mapreduce.task.InitialTask", 6);
-   Vertex reduce1Vertex = new Vertex("reduce1",
-       "org.apache.tez.mapreduce.task.IntermediateTask", 3);
-   Vertex reduce2Vertex = new Vertex("reduce2",
-       "org.apache.tez.mapreduce.task.FinalTask", 3);
+    Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
+        "org.apache.tez.mapreduce.task.InitialTask", null), 6);
+    Vertex reduce1Vertex = new Vertex("reduce1", new ProcessorDescriptor(
+        "org.apache.tez.mapreduce.task.IntermediateTask", null), 3);
+    Vertex reduce2Vertex = new Vertex("reduce2", new ProcessorDescriptor(
+        "org.apache.tez.mapreduce.task.FinalTask", null), 3);
    Edge edge1 = new Edge(mapVertex, reduce1Vertex,
        new EdgeProperty(ConnectionPattern.BIPARTITE,
            SourceType.STABLE,
-           ShuffledMergedInput.class.getName(),
-           OnFileSortedOutput.class.getName()));
+           new InputDescriptor(ShuffledMergedInput.class.getName(), null),
+           new OutputDescriptor(OnFileSortedOutput.class.getName(), null)));
    Edge edge2 = new Edge(reduce1Vertex, reduce2Vertex,
        new EdgeProperty(ConnectionPattern.BIPARTITE,
            SourceType.STABLE,
-           ShuffledMergedInput.class.getName(),
-           OnFileSortedOutput.class.getName()));
+           new InputDescriptor(ShuffledMergedInput.class.getName(), null),
+           new OutputDescriptor(OnFileSortedOutput.class.getName(), null)));
    Map<String, LocalResource> jobRsrcs = createLocalResources(getMRRBaseDir(),
        getMRRLocalRsrcList());
 
@@ -156,15 +159,15 @@ public class MRRExampleHelper {
  // TODO remove once client is in place
  static DAGPlan createDAGConfigurationForMR() throws IOException {
    org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
-   Vertex mapVertex = new Vertex("map",
-       "org.apache.tez.mapreduce.task.InitialTask", 6);
-   Vertex reduceVertex = new Vertex("reduce",
-       "org.apache.tez.mapreduce.task.FinalTask", 1);
+    Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
+        "org.apache.tez.mapreduce.task.InitialTask", null), 6);
+    Vertex reduceVertex = new Vertex("reduce", new ProcessorDescriptor(
+        "org.apache.tez.mapreduce.task.FinalTask", null), 1);
    Edge edge = new Edge(mapVertex, reduceVertex,
        new EdgeProperty(ConnectionPattern.BIPARTITE,
            SourceType.STABLE,
-           ShuffledMergedInput.class.getName(),
-           OnFileSortedOutput.class.getName()));
+           new InputDescriptor(ShuffledMergedInput.class.getName(), null),
+           new OutputDescriptor(OnFileSortedOutput.class.getName(), null)));
 
    Map<String, LocalResource> jobRsrcs = createLocalResources(getMRBaseDir(),
        getMRLocalRsrcList());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c8ee314..b5b9c0b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -386,7 +386,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.vertexLocationHint = vertexLocationHint;
 
     this.taskResource = DagTypeConverters.CreateResourceRequestFromTaskConfig(vertexPlan.getTaskConfig());
-    this.processorName = vertexPlan.hasProcessorName() ? vertexPlan.getProcessorName() : null;
+    this.processorName = vertexPlan.hasProcessorDescriptor() ? vertexPlan.getProcessorDescriptor().getClassName() : null; // TODO Error if this is not set.
     this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig().getLocalResourceList());
     this.environment = DagTypeConverters.createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig().getEnvironmentSettingList());
     this.javaOpts = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null;
@@ -1282,7 +1282,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     for (Entry<Vertex, EdgeProperty> entry : this.getInputVertices().entrySet()) {
       InputSpec inputSpec = new InputSpec(entry.getKey().getName(),
           entry.getKey().getTotalTasks(),
-          entry.getValue().getInputClass());
+          entry.getValue().getInputDescriptor().getClassName());
       if (LOG.isDebugEnabled()) {
         LOG.debug("For vertex : " + this.getName()
             + ", Using InputSpec : " + inputSpec);
@@ -1301,7 +1301,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       for (Entry<Vertex, EdgeProperty> entry : this.getOutputVertices().entrySet()) {
         OutputSpec outputSpec = new OutputSpec(entry.getKey().getName(),
             entry.getKey().getTotalTasks(),
-            entry.getValue().getOutputClass());
+            entry.getValue().getOutputDescriptor().getClassName());
         if (LOG.isDebugEnabled()) {
           LOG.debug("For vertex : " + this.getName()
               + ", Using OutputSpec : " + outputSpec);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 9588099..a7b1007 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
@@ -178,7 +179,7 @@ public class TestDAGImpl {
             VertexPlan.newBuilder()
             .setName("vertex3")
             .setType(PlanVertexType.NORMAL)
-            .setProcessorName("x3.y3")
+            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
             .addTaskLocationHint(
                 PlanTaskLocationHint.newBuilder()
                 .addHost("host3")
@@ -271,9 +272,9 @@ public class TestDAGImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i3_v1")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i3_v1"))
             .setInputVertexName("vertex1")
-            .setOutputClass("o1")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
             .setOutputVertexName("vertex3")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e1")
@@ -282,9 +283,9 @@ public class TestDAGImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i3_v2")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i3_v2"))
             .setInputVertexName("vertex2")
-            .setOutputClass("o2")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
             .setOutputVertexName("vertex3")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e2")
@@ -293,9 +294,9 @@ public class TestDAGImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i4_v3")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i4_v3"))
             .setInputVertexName("vertex3")
-            .setOutputClass("o3_v4")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o3_v4"))
             .setOutputVertexName("vertex4")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e3")
@@ -304,9 +305,9 @@ public class TestDAGImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i5_v3")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i5_v3"))
             .setInputVertexName("vertex3")
-            .setOutputClass("o3_v5")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o3_v5"))
             .setOutputVertexName("vertex5")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e4")
@@ -315,9 +316,9 @@ public class TestDAGImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i6_v4")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i6_v4"))
             .setInputVertexName("vertex4")
-            .setOutputClass("o4")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o4"))
             .setOutputVertexName("vertex6")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e5")
@@ -326,9 +327,9 @@ public class TestDAGImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i6_v5")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i6_v5"))
             .setInputVertexName("vertex5")
-            .setOutputClass("o5")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o5"))
             .setOutputVertexName("vertex6")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e6")

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index e3af5b3..d932662 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -45,6 +45,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -244,7 +245,7 @@ public class TestVertexImpl {
             VertexPlan.newBuilder()
             .setName("vertex3")
             .setType(PlanVertexType.NORMAL)
-            .setProcessorName("x3.y3")
+            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
             .addTaskLocationHint(
                 PlanTaskLocationHint.newBuilder()
                 .addHost("host3")
@@ -337,9 +338,9 @@ public class TestVertexImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i3_v1")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i3_v1"))
             .setInputVertexName("vertex1")
-            .setOutputClass("o1")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
             .setOutputVertexName("vertex3")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e1")
@@ -348,9 +349,9 @@ public class TestVertexImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i3_v2")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i3_v2"))
             .setInputVertexName("vertex2")
-            .setOutputClass("o2")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
             .setOutputVertexName("vertex3")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e2")
@@ -359,9 +360,9 @@ public class TestVertexImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i4_v3")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i4_v3"))
             .setInputVertexName("vertex3")
-            .setOutputClass("o3_v4")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o3_v4"))
             .setOutputVertexName("vertex4")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e3")
@@ -370,9 +371,9 @@ public class TestVertexImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i5_v3")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i5_v3"))
             .setInputVertexName("vertex3")
-            .setOutputClass("o3_v5")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o3_v5"))
             .setOutputVertexName("vertex5")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e4")
@@ -381,9 +382,9 @@ public class TestVertexImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i6_v4")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i6_v4"))
             .setInputVertexName("vertex4")
-            .setOutputClass("o4")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o4"))
             .setOutputVertexName("vertex6")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e5")
@@ -392,9 +393,9 @@ public class TestVertexImpl {
             )
         .addEdge(
             EdgePlan.newBuilder()
-            .setInputClass("i6_v5")
+            .setInputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("i6_v5"))
             .setInputVertexName("vertex5")
-            .setOutputClass("o5")
+            .setOutputDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("o5"))
             .setOutputVertexName("vertex6")
             .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
             .setId("e6")

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index 3ebb653..ab0c4a3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -50,19 +52,22 @@ public class TestVertexScheduler {
     TezVertexID mockSrcVertexId1 = new TezVertexID(dagId, 1);
     EdgeProperty eProp1 = new EdgeProperty(
         EdgeProperty.ConnectionPattern.BIPARTITE,
-        EdgeProperty.SourceType.STABLE, "in", "out");
+        EdgeProperty.SourceType.STABLE, new InputDescriptor("in", null),
+        new OutputDescriptor("out", null));
     when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1);
     Vertex mockSrcVertex2 = mock(Vertex.class);
     TezVertexID mockSrcVertexId2 = new TezVertexID(dagId, 2);
     EdgeProperty eProp2 = new EdgeProperty(
         EdgeProperty.ConnectionPattern.BIPARTITE,
-        EdgeProperty.SourceType.STABLE, "in", "out");
+        EdgeProperty.SourceType.STABLE, new InputDescriptor("in", null),
+        new OutputDescriptor("out", null));
     when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2);
     Vertex mockSrcVertex3 = mock(Vertex.class);
     TezVertexID mockSrcVertexId3 = new TezVertexID(dagId, 3);
     EdgeProperty eProp3 = new EdgeProperty(
         EdgeProperty.ConnectionPattern.ONE_TO_ALL,
-        EdgeProperty.SourceType.STABLE, "in", "out");
+        EdgeProperty.SourceType.STABLE, new InputDescriptor("in", null),
+        new OutputDescriptor("out", null));
     when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3);
     
     Vertex mockManagedVertex = mock(Vertex.class);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3780729e/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index c5ec5f4..a78ab5e 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -95,6 +95,9 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -481,7 +484,9 @@ public class YARNRunner implements ClientProtocol {
     // Intermediate vertices start at 1.
     Vertex vertex = new Vertex(
         MultiStageMRConfigUtil.getIntermediateStageVertexName(stageNum),
-        "org.apache.tez.mapreduce.processor.reduce.ReduceProcessor", numTasks);
+        new ProcessorDescriptor(
+            "org.apache.tez.mapreduce.processor.reduce.ReduceProcessor", null),
+        numTasks);
 
     Map<String, String> reduceEnv = new HashMap<String, String>();
     setupMapReduceEnv(conf, reduceEnv, false);
@@ -521,7 +526,7 @@ public class YARNRunner implements ClientProtocol {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Adding intermediate vertex to DAG"
             + ", vertexName=" + vertices[i].getVertexName()
-            + ", processor=" + vertices[i].getProcessorName()
+            + ", processor=" + vertices[i].getProcessorDescriptor().getClassName()
             + ", parrellism=" + vertices[i].getParallelism()
             + ", javaOpts=" + vertices[i].getJavaOpts());
       }
@@ -553,7 +558,7 @@ public class YARNRunner implements ClientProtocol {
     String mapProcessor = "org.apache.tez.mapreduce.processor.map.MapProcessor";
     Vertex mapVertex = new Vertex(
         MultiStageMRConfigUtil.getInitialMapVertexName(),
-        mapProcessor, numMaps);
+        new ProcessorDescriptor(mapProcessor, null), numMaps);
 
     // FIXME set up map environment
     Map<String, String> mapEnv = new HashMap<String, String>();
@@ -581,7 +586,7 @@ public class YARNRunner implements ClientProtocol {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Adding map vertex to DAG"
           + ", vertexName=" + mapVertex.getVertexName()
-          + ", processor=" + mapVertex.getProcessorName()
+          + ", processor=" + mapVertex.getProcessorDescriptor().getClassName()
           + ", parrellism=" + mapVertex.getParallelism()
           + ", javaOpts=" + mapVertex.getJavaOpts());
     }
@@ -600,7 +605,7 @@ public class YARNRunner implements ClientProtocol {
           "org.apache.tez.mapreduce.processor.reduce.ReduceProcessor";
       Vertex reduceVertex = new Vertex(
           MultiStageMRConfigUtil.getFinalReduceVertexName(),
-          reduceProcessor, numReduces);
+          new ProcessorDescriptor(reduceProcessor, null), numReduces);
 
       // FIXME set up reduce environment
       Map<String, String> reduceEnv = new HashMap<String, String>();
@@ -626,7 +631,7 @@ public class YARNRunner implements ClientProtocol {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Adding reduce vertex to DAG"
             + ", vertexName=" + reduceVertex.getVertexName()
-            + ", processor=" + reduceVertex.getProcessorName()
+            + ", processor=" + reduceVertex.getProcessorDescriptor().getClassName()
             + ", parrellism=" + reduceVertex.getParallelism()
             + ", javaOpts=" + reduceVertex.getJavaOpts());
       }
@@ -635,8 +640,8 @@ public class YARNRunner implements ClientProtocol {
       EdgeProperty edgeProperty =
           new EdgeProperty(ConnectionPattern.BIPARTITE,
               SourceType.STABLE,
-              ShuffledMergedInput.class.getName(),
-              OnFileSortedOutput.class.getName());
+              new InputDescriptor(ShuffledMergedInput.class.getName(), null),
+              new OutputDescriptor(OnFileSortedOutput.class.getName(), null));
       Edge edge = null;
       if (!isMRR) {
         edge = new Edge(mapVertex, reduceVertex, edgeProperty);


Mime
View raw message