tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/4] TEZ-1246. Replace constructors with create() methods for DAG, Vertex, Edge etc in the API. (sseth)
Date Tue, 19 Aug 2014 05:40:50 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index 7ce0de3..9dd0a9a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -38,6 +38,7 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.codehaus.jettison.json.JSONException;
@@ -53,33 +54,34 @@ public class TestDAGUtils {
     Configuration conf = new Configuration(false);
     int dummyTaskCount = 1;
     Resource dummyTaskResource = Resource.newInstance(1, 1);
-    org.apache.tez.dag.api.Vertex v1 = new org.apache.tez.dag.api.Vertex("vertex1",
-        new ProcessorDescriptor("Processor").setHistoryText("vertex1 Processor HistoryText"),
+    org.apache.tez.dag.api.Vertex v1 = Vertex.create("vertex1",
+        ProcessorDescriptor.create("Processor").setHistoryText("vertex1 Processor HistoryText"),
         dummyTaskCount, dummyTaskResource);
-    v1.addDataSource("input1", new DataSourceDescriptor(new InputDescriptor(
+    v1.addDataSource("input1", DataSourceDescriptor.create(InputDescriptor.create(
         "input.class").setHistoryText("input HistoryText"), null, null));
-    org.apache.tez.dag.api.Vertex v2 = new org.apache.tez.dag.api.Vertex("vertex2",
-        new ProcessorDescriptor("Processor").setHistoryText("vertex2 Processor HistoryText"),
+    org.apache.tez.dag.api.Vertex v2 = Vertex.create("vertex2",
+        ProcessorDescriptor.create("Processor").setHistoryText("vertex2 Processor HistoryText"),
         dummyTaskCount, dummyTaskResource);
-    org.apache.tez.dag.api.Vertex v3 = new org.apache.tez.dag.api.Vertex("vertex3",
-        new ProcessorDescriptor("Processor").setHistoryText("vertex3 Processor HistoryText"),
+    org.apache.tez.dag.api.Vertex v3 = Vertex.create("vertex3",
+        ProcessorDescriptor.create("Processor").setHistoryText("vertex3 Processor HistoryText"),
         dummyTaskCount, dummyTaskResource);
 
     DAG dag = new DAG("testDag");
     String groupName1 = "uv12";
     org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
-    OutputDescriptor outDesc = new OutputDescriptor("output.class")
+    OutputDescriptor outDesc = OutputDescriptor.create("output.class")
         .setHistoryText("uvOut HistoryText");
-    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(OutputCommitter.class.getName());
+    OutputCommitterDescriptor ocd =
+        OutputCommitterDescriptor.create(OutputCommitter.class.getName());
     uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
     v3.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
 
-    GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER,
+    GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
-            new OutputDescriptor("dummy output class").setHistoryText("Dummy History Text"),
-            new InputDescriptor("dummy input class").setHistoryText("Dummy History Text")),
-        new InputDescriptor("merge.class").setHistoryText("Merge HistoryText"));
+            OutputDescriptor.create("dummy output class").setHistoryText("Dummy History Text"),
+            InputDescriptor.create("dummy input class").setHistoryText("Dummy History Text")),
+        InputDescriptor.create("merge.class").setHistoryText("Merge HistoryText"));
 
     dag.addVertex(v1);
     dag.addVertex(v2);

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
index 4e012c5..d53ae9f 100644
--- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
+++ b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
@@ -691,8 +691,8 @@ public class TestTaskExecution {
     TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
     TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
     TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
-    ProcessorDescriptor processorDescriptor = new ProcessorDescriptor(processorClass)
-        .setUserPayload(new UserPayload(ByteBuffer.wrap(processorConf)));
+    ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create(processorClass)
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap(processorConf)));
     TaskSpec taskSpec = new TaskSpec(taskAttemptId, "dagName", "vertexName", processorDescriptor,
         new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-examples/src/main/java/org/apache/tez/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/IntersectDataGen.java b/tez-examples/src/main/java/org/apache/tez/examples/IntersectDataGen.java
index ec51cf1..435a914 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/IntersectDataGen.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/IntersectDataGen.java
@@ -126,7 +126,7 @@ public class IntersectDataGen extends Configured implements Tool {
   }
   
   private TezClient createTezSession(TezConfiguration tezConf) throws TezException, IOException {
-    TezClient tezSession = new TezClient("IntersectDataGenSession", tezConf);
+    TezClient tezSession = TezClient.create("IntersectDataGenSession", tezConf);
     tezSession.start();
     return tezSession;
   }
@@ -200,9 +200,9 @@ public class IntersectDataGen extends Configured implements Tool {
 
     DAG dag = new DAG("IntersectDataGen");
 
-    Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
+    Vertex genDataVertex = Vertex.create("datagen", ProcessorDescriptor.create(
         GenDataProcessor.class.getName()).setUserPayload(
-        new UserPayload(ByteBuffer.wrap(GenDataProcessor.createConfiguration(largeOutSizePerTask,
+        UserPayload.create(ByteBuffer.wrap(GenDataProcessor.createConfiguration(largeOutSizePerTask,
             smallOutSizePerTask)))), numTasks);
     genDataVertex.addDataSink(STREAM_OUTPUT_NAME, 
         MROutput.createConfigBuilder(new Configuration(tezConf),

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-examples/src/main/java/org/apache/tez/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/IntersectExample.java b/tez-examples/src/main/java/org/apache/tez/examples/IntersectExample.java
index 90a232a..bf5897d 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/IntersectExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/IntersectExample.java
@@ -125,7 +125,7 @@ public class IntersectExample extends Configured implements Tool {
   }
   
   private TezClient createTezSession(TezConfiguration tezConf) throws TezException, IOException {
-    TezClient tezSession = new TezClient("IntersectExampleSession", tezConf);
+    TezClient tezSession = TezClient.create("IntersectExampleSession", tezConf);
     tezSession.start();
     return tezSession;
   }
@@ -183,28 +183,28 @@ public class IntersectExample extends Configured implements Tool {
                 HashPartitioner.class.getName()).build();
 
     // Change the way resources are setup - no MRHelpers
-    Vertex streamFileVertex = new Vertex("partitioner1", new ProcessorDescriptor(
+    Vertex streamFileVertex = Vertex.create("partitioner1", ProcessorDescriptor.create(
         ForwardingProcessor.class.getName())).addDataSource(
         "streamfile",
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
                 streamPath.toUri().toString()).groupSplits(false).build());
 
-    Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor(
+    Vertex hashFileVertex = Vertex.create("partitioner2", ProcessorDescriptor.create(
         ForwardingProcessor.class.getName())).addDataSource(
         "hashfile",
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
                 hashPath.toUri().toString()).groupSplits(false).build());
 
-    Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor(
+    Vertex intersectVertex = Vertex.create("intersect", ProcessorDescriptor.create(
         IntersectProcessor.class.getName()), numPartitions).addDataSink("finalOutput",
         MROutput.createConfigBuilder(new Configuration(tezConf),
             TextOutputFormat.class, outPath.toUri().toString()).build());
 
-    Edge e1 = new Edge(streamFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
+    Edge e1 = Edge.create(streamFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
 
-    Edge e2 = new Edge(hashFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
+    Edge e2 = Edge.create(hashFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
 
     dag.addVertex(streamFileVertex).addVertex(hashFileVertex).addVertex(intersectVertex)
         .addEdge(e1).addEdge(e2);

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-examples/src/main/java/org/apache/tez/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/IntersectValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/IntersectValidate.java
index d86fd77..24ba2df 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/IntersectValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/IntersectValidate.java
@@ -125,7 +125,7 @@ public class IntersectValidate extends Configured implements Tool {
   }
   
   private TezClient createTezSession(TezConfiguration tezConf) throws TezException, IOException {
-    TezClient tezSession = new TezClient("IntersectValidateSession", tezConf);
+    TezClient tezSession = TezClient.create("IntersectValidateSession", tezConf);
     tezSession.start();
     return tezSession;
   }
@@ -188,23 +188,23 @@ public class IntersectValidate extends Configured implements Tool {
         .newBuilder(Text.class.getName(), NullWritable.class.getName(),
             HashPartitioner.class.getName()).build();
 
-    Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor(
+    Vertex lhsVertex = Vertex.create(LHS_INPUT_NAME, ProcessorDescriptor.create(
         ForwardingProcessor.class.getName())).addDataSource("lhs",
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
                 lhs.toUri().toString()).groupSplits(false).build());
 
-    Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor(
+    Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create(
         ForwardingProcessor.class.getName())).addDataSource("rhs",
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
                 rhs.toUri().toString()).groupSplits(false).build());
 
-    Vertex intersectValidateVertex = new Vertex("intersectvalidate", new ProcessorDescriptor(
+    Vertex intersectValidateVertex = Vertex.create("intersectvalidate", ProcessorDescriptor.create(
         IntersectValidateProcessor.class.getName()), numPartitions);
 
-    Edge e1 = new Edge(lhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
-    Edge e2 = new Edge(rhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
+    Edge e1 = Edge.create(lhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
+    Edge e2 = Edge.create(rhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
 
     dag.addVertex(lhsVertex).addVertex(rhsVertex).addVertex(intersectValidateVertex).addEdge(e1)
         .addEdge(e2);

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
index 0bba651..9b3d864 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -121,7 +121,7 @@ public class OrderedWordCount extends Configured implements Tool  {
     DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(tezConf),
         TextOutputFormat.class, outputPath).build();
 
-    Vertex tokenizerVertex = new Vertex("Tokenizer", new ProcessorDescriptor(
+    Vertex tokenizerVertex = Vertex.create("Tokenizer", ProcessorDescriptor.create(
         TokenProcessor.class.getName()));
     tokenizerVertex.addDataSource("MRInput", dataSource);
 
@@ -132,7 +132,7 @@ public class OrderedWordCount extends Configured implements Tool  {
 
     // This vertex will be reading intermediate data via an input edge and writing intermediate data
     // via an output edge.
-    Vertex summationVertex = new Vertex("Summation", new ProcessorDescriptor(
+    Vertex summationVertex = Vertex.create("Summation", ProcessorDescriptor.create(
         SumProcessor.class.getName()), numPartitions);
     
     // Use IntWritable key and Text value to bring all words with the same count in the same 
@@ -143,7 +143,7 @@ public class OrderedWordCount extends Configured implements Tool  {
 
     // Use 1 task to bring all the data in one place for global sorted order. Essentially the number
     // of partitions is 1. So the NoOpSorter can be used to produce the globally ordered output
-    Vertex sorterVertex = new Vertex("Sorter", new ProcessorDescriptor(
+    Vertex sorterVertex = Vertex.create("Sorter", ProcessorDescriptor.create(
         NoOpSorter.class.getName()), 1);
     sorterVertex.addDataSink("MROutput", dataSink);
 
@@ -154,9 +154,10 @@ public class OrderedWordCount extends Configured implements Tool  {
         .addVertex(summationVertex)
         .addVertex(sorterVertex)
         .addEdge(
-            new Edge(tokenizerVertex, summationVertex, summationEdgeConf.createDefaultEdgeProperty()))
+            Edge.create(tokenizerVertex, summationVertex,
+                summationEdgeConf.createDefaultEdgeProperty()))
         .addEdge(
-            new Edge(summationVertex, sorterVertex, sorterEdgeConf.createDefaultEdgeProperty()));
+            Edge.create(summationVertex, sorterVertex, sorterEdgeConf.createDefaultEdgeProperty()));
     return dag;  
   }
   
@@ -175,7 +176,7 @@ public class OrderedWordCount extends Configured implements Tool  {
       tezConf = new TezConfiguration();
     }
     
-    TezClient tezClient = new TezClient("OrderedWordCount", tezConf);
+    TezClient tezClient = TezClient.create("OrderedWordCount", tezConf);
     tezClient.start();
 
     try {

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
index f4ed5b3..fd70dad 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
@@ -57,7 +57,7 @@ public class SimpleSessionExample extends Configured implements Tool {
     // session mode then it can do so in code directly using the appropriate constructor
     
     // tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true); // via config OR via code
-    TezClient tezClient = new TezClient("SimpleSessionExample", tezConf, true);
+    TezClient tezClient = TezClient.create("SimpleSessionExample", tezConf, true);
     tezClient.start();
     
     // Session pre-warming allows the user to hide initial startup, resource acquisition latency etc.

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
index ddf0953..52cff91 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
@@ -150,7 +150,7 @@ public class WordCount extends Configured implements Tool {
     // Create a vertex that reads the data from the data source and tokenizes it using the 
     // TokenProcessor. The number of tasks that will do the work for this vertex will be decided 
     // using the information provided by the data source descriptor.
-    Vertex tokenizerVertex = new Vertex("Tokenizer", new ProcessorDescriptor(
+    Vertex tokenizerVertex = Vertex.create("Tokenizer", ProcessorDescriptor.create(
         TokenProcessor.class.getName())).addDataSource("Input", dataSource);
 
     // Create the edge that represents the movement and semantics of data between the producer 
@@ -172,8 +172,8 @@ public class WordCount extends Configured implements Tool {
     // The number of tasks that do the work of this vertex depends on the number of partitions used 
     // to distribute the sum processing. In this case, its been made configurable via the 
     // numPartitions parameter.
-    Vertex summationVertex = new Vertex("Summation",
-        new ProcessorDescriptor(SumProcessor.class.getName()), numPartitions)
+    Vertex summationVertex = Vertex.create("Summation",
+        ProcessorDescriptor.create(SumProcessor.class.getName()), numPartitions)
         .addDataSink("Output", dataSink);
 
     // No need to add jar containing this class as assumed to be part of the Tez jars. Otherwise 
@@ -184,7 +184,7 @@ public class WordCount extends Configured implements Tool {
     dag.addVertex(tokenizerVertex)
         .addVertex(summationVertex)
         .addEdge(
-            new Edge(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty()));
+            Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty()));
     return dag;  
   }
 
@@ -205,7 +205,7 @@ public class WordCount extends Configured implements Tool {
 
     // Create the TezClient to submit the DAG. Pass the tezConf that has all necessary global and 
     // dag specific configurations
-    TezClient tezClient = new TezClient("WordCount", tezConf);
+    TezClient tezClient = TezClient.create("WordCount", tezConf);
     // TezClient must be started before it can be used
     tezClient.start();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index a83b500..94d5818 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -416,7 +416,8 @@ public class YARNRunner implements ClientProtocol {
     stageConf.set(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX, "part");
     
     UserPayload vertexUserPayload = TezUtils.createUserPayloadFromConf(stageConf);
-    Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).setUserPayload(vertexUserPayload),
+    Vertex vertex = Vertex.create(vertexName,
+        ProcessorDescriptor.create(processorName).setUserPayload(vertexUserPayload),
         numTasks, taskResource);
     if (isMap) {
       vertex.addDataSource("MRInput",
@@ -424,10 +425,10 @@ public class YARNRunner implements ClientProtocol {
     }
     // Map only jobs.
     if (stageNum == totalStages -1) {
-      OutputDescriptor od = new OutputDescriptor(MROutputLegacy.class.getName())
+      OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
           .setUserPayload(vertexUserPayload);
       vertex.addDataSink("MROutput", new DataSinkDescriptor(od,
-          new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
+          OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null));
     }
 
     Map<String, String> taskEnv = new HashMap<String, String>();
@@ -444,7 +445,7 @@ public class YARNRunner implements ClientProtocol {
 
     vertex.setTaskEnvironment(taskEnv)
         .setTaskLocalFiles(taskLocalResources)
-        .setLocationHint(new VertexLocationHint(locations))
+        .setLocationHint(VertexLocationHint.create(locations))
         .setTaskLaunchCmdOpts(taskJavaOpts);
     
     if (!isMap) {
@@ -505,7 +506,7 @@ public class YARNRunner implements ClientProtocol {
                 MRPartitioner.class.getName(), partitionerConf)
                 .configureInput().useLegacyInput().done()
                 .setFromConfiguration(stageConfs[i - 1]).build();
-        Edge edge = new Edge(vertices[i-1], vertices[i], edgeConf.createDefaultEdgeProperty());
+        Edge edge = Edge.create(vertices[i - 1], vertices[i], edgeConf.createDefaultEdgeProperty());
         dag.addEdge(edge);
       }
 
@@ -796,14 +797,14 @@ public class YARNRunner implements ClientProtocol {
     InputDescriptor inputDescriptor;
 
     try {
-      inputDescriptor = new InputDescriptor(useLegacyInput ? MRInputLegacy.class
+      inputDescriptor = InputDescriptor.create(useLegacyInput ? MRInputLegacy.class
           .getName() : MRInput.class.getName())
           .setUserPayload(MRInputHelpersInternal.createMRInputPayload(conf, null));
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }
 
-    DataSourceDescriptor dsd = new DataSourceDescriptor(inputDescriptor, null, null);
+    DataSourceDescriptor dsd = DataSourceDescriptor.create(inputDescriptor, null, null);
     return dsd;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index 88aed3c..a2777a8 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
@@ -132,8 +131,9 @@ public class MRInputAMSplitGenerator extends InputInitializer {
     List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo
         .getNumTasks() + 1);
     
-    InputConfigureVertexTasksEvent configureVertexEvent = new InputConfigureVertexTasksEvent(
-        inputSplitInfo.getNumTasks(), new VertexLocationHint(inputSplitInfo.getTaskLocationHints()),
+    InputConfigureVertexTasksEvent configureVertexEvent = InputConfigureVertexTasksEvent.create(
+        inputSplitInfo.getNumTasks(),
+        VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
         InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     events.add(configureVertexEvent);
 
@@ -142,7 +142,7 @@ public class MRInputAMSplitGenerator extends InputInitializer {
       int count = 0;
       for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
         // Unnecessary array copy, can be avoided by using ByteBuffer instead of a raw array.
-        InputDataInformationEvent diEvent = new InputDataInformationEvent(count++,
+        InputDataInformationEvent diEvent = InputDataInformationEvent.create(count++,
             mrSplit.toByteArray());
         events.add(diEvent);
       }
@@ -150,12 +150,12 @@ public class MRInputAMSplitGenerator extends InputInitializer {
       int count = 0;
       if (inputSplitInfo.holdsNewFormatSplits()) {
         for (org.apache.hadoop.mapreduce.InputSplit split : inputSplitInfo.getNewFormatSplits()) {
-          InputDataInformationEvent diEvent = new InputDataInformationEvent(count++, split);
+          InputDataInformationEvent diEvent = InputDataInformationEvent.create(count++, split);
           events.add(diEvent);
         }
       } else {
         for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
-          InputDataInformationEvent diEvent = new InputDataInformationEvent(count++, split);
+          InputDataInformationEvent diEvent = InputDataInformationEvent.create(count++, split);
           events.add(diEvent);
         }
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index f23042f..1c28c2b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -95,7 +95,7 @@ public class MRInputSplitDistributor extends InputInitializer {
     updatedPayloadBuilder.clearSplits();
 
     List<Event> events = Lists.newArrayListWithCapacity(this.splitsProto.getSplitsCount() + 1);
-    InputUpdatePayloadEvent updatePayloadEvent = new InputUpdatePayloadEvent(
+    InputUpdatePayloadEvent updatePayloadEvent = InputUpdatePayloadEvent.create(
         updatedPayloadBuilder.build().toByteArray());
 
     events.add(updatePayloadEvent);
@@ -108,16 +108,16 @@ public class MRInputSplitDistributor extends InputInitializer {
       if (sendSerializedEvents) {
         // Unnecessary array copy, can be avoided by using ByteBuffer instead of
         // a raw array.
-        diEvent = new InputDataInformationEvent(count++, mrSplit.toByteArray());
+        diEvent = InputDataInformationEvent.create(count++, mrSplit.toByteArray());
       } else {
         if (useNewApi) {
           org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInputUtils
               .getNewSplitDetailsFromEvent(mrSplit, conf);
-          diEvent = new InputDataInformationEvent(count++, newInputSplit);
+          diEvent = InputDataInformationEvent.create(count++, newInputSplit);
         } else {
           org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils
               .getOldSplitDetailsFromEvent(mrSplit, conf);
-          diEvent = new InputDataInformationEvent(count++, oldInputSplit);
+          diEvent = InputDataInformationEvent.create(count++, oldInputSplit);
         }
       }
       events.add(diEvent);

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index bff94e5..71d294f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -107,16 +107,16 @@ public class MRInputHelpers {
     try {
       inputSplitInfo = generateInputSplits(conf, splitsDir);
 
-      InputDescriptor inputDescriptor = new InputDescriptor(useLegacyInput ? MRInputLegacy.class
+      InputDescriptor inputDescriptor = InputDescriptor.create(useLegacyInput ? MRInputLegacy.class
           .getName() : MRInput.class.getName())
           .setUserPayload(createMRInputPayload(conf, null));
       Map<String, LocalResource> additionalLocalResources = new HashMap<String, LocalResource>();
       updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
           additionalLocalResources);
       DataSourceDescriptor dsd =
-          new DataSourceDescriptor(inputDescriptor, null, inputSplitInfo.getNumTasks(),
+          DataSourceDescriptor.create(inputDescriptor, null, inputSplitInfo.getNumTasks(),
               inputSplitInfo.getCredentials(),
-              new VertexLocationHint(inputSplitInfo.getTaskLocationHints()),
+              VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
               additionalLocalResources);
       return dsd;
     } catch (IOException e) {
@@ -693,7 +693,7 @@ public class MRInputHelpers {
       userPayloadBuilder.setSplits(mrSplitsProto);
     }
     userPayloadBuilder.setGroupingEnabled(isGrouped);
-    return new UserPayload(userPayloadBuilder.build().toByteString().asReadOnlyByteBuffer());
+    return UserPayload.create(userPayloadBuilder.build().toByteString().asReadOnlyByteBuffer());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index cdcaba0..ebc2d45 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -246,11 +246,11 @@ public class MRInput extends MRInputBase {
       if (getCredentialsForSourceFilesystem && inputSplitInfo.getCredentials() != null) {
         credentials = inputSplitInfo.getCredentials();
       }
-      return new DataSourceDescriptor(
-          new InputDescriptor(inputClassName).setUserPayload(payload),
-          new InputInitializerDescriptor(MRInputSplitDistributor.class.getName()),
-          inputSplitInfo.getNumTasks(), credentials, 
-          new VertexLocationHint(inputSplitInfo.getTaskLocationHints()), null);
+      return DataSourceDescriptor.create(
+          InputDescriptor.create(inputClassName).setUserPayload(payload),
+          InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
+          inputSplitInfo.getNumTasks(), credentials,
+          VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()), null);
     }
 
     private DataSourceDescriptor createCustomDataSource() throws IOException {
@@ -267,8 +267,9 @@ public class MRInput extends MRInputBase {
         payload = MRInputHelpersInternal.createMRInputPayload(conf, null);
       }
 
-      return new DataSourceDescriptor(new InputDescriptor(inputClassName).setUserPayload(payload),
-          customInitializerDescriptor, credentials);
+      return DataSourceDescriptor
+          .create(InputDescriptor.create(inputClassName).setUserPayload(payload),
+              customInitializerDescriptor, credentials);
     }
 
     private DataSourceDescriptor createGeneratorDataSource() throws IOException {
@@ -283,9 +284,9 @@ public class MRInput extends MRInputBase {
       } else {
         payload = MRInputHelpersInternal.createMRInputPayload(conf, null);
       }
-      return new DataSourceDescriptor(
-          new InputDescriptor(inputClassName).setUserPayload(payload),
-          new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()), credentials);
+      return DataSourceDescriptor.create(
+          InputDescriptor.create(inputClassName).setUserPayload(payload),
+          InputInitializerDescriptor.create(MRInputAMSplitGenerator.class.getName()), credentials);
     }
 
     private void setupBasicConf(Configuration inputConf) {

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index a745a7f..fd0ea46 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -173,8 +173,8 @@ public class MROutput extends AbstractLogicalOutput {
       }
 
       return new DataSinkDescriptor(
-          new OutputDescriptor(outputClassName).setUserPayload(createUserPayload()),
-          (doCommit ? new OutputCommitterDescriptor(
+          OutputDescriptor.create(outputClassName).setUserPayload(createUserPayload()),
+          (doCommit ? OutputCommitterDescriptor.create(
               MROutputCommitter.class.getName()) : null), credentials);
     }
     

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index c91537c..ebc12d4 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -68,7 +68,8 @@ public class TestMRInputSplitDistributor {
     MRInputUserPayloadProto.Builder payloadProto = MRInputUserPayloadProto.newBuilder();
     payloadProto.setSplits(splitsProtoBuilder.build());
     payloadProto.setConfigurationBytes(confByteString);
-    UserPayload userPayload = new UserPayload(payloadProto.build().toByteString().asReadOnlyByteBuffer());
+    UserPayload userPayload =
+        UserPayload.create(payloadProto.build().toByteString().asReadOnlyByteBuffer());
 
     InputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
     MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context);
@@ -116,7 +117,8 @@ public class TestMRInputSplitDistributor {
     MRInputUserPayloadProto.Builder payloadProto = MRInputUserPayloadProto.newBuilder();
     payloadProto.setSplits(splitsProtoBuilder.build());
     payloadProto.setConfigurationBytes(confByteString);
-    UserPayload userPayload = new UserPayload(payloadProto.build().toByteString().asReadOnlyByteBuffer());
+    UserPayload userPayload =
+        UserPayload.create(payloadProto.build().toByteString().asReadOnlyByteBuffer());
 
     InputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
     MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context);
@@ -152,7 +154,7 @@ public class TestMRInputSplitDistributor {
 
     TezRootInputInitializerContextForTest(UserPayload payload) throws IOException {
       appId = ApplicationId.newInstance(1000, 200);
-      this.payload = payload == null ? new UserPayload(null) : payload;
+      this.payload = payload == null ? UserPayload.create(null) : payload;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index 6e87011..f418bbb 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -111,7 +111,7 @@ public class TestMultiMRInput {
     assertEquals(1, splits.length);
 
     MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]);
-    InputDataInformationEvent event = new InputDataInformationEvent(0, splitProto.toByteArray());
+    InputDataInformationEvent event = InputDataInformationEvent.create(0, splitProto.toByteArray());
 
     eventList.clear();
     eventList.add(event);
@@ -170,10 +170,12 @@ public class TestMultiMRInput {
     assertEquals(2, splits.length);
 
     MRSplitProto splitProto1 = MRInputHelpers.createSplitProto(splits[0]);
-    InputDataInformationEvent event1 = new InputDataInformationEvent(0, splitProto1.toByteArray());
+    InputDataInformationEvent event1 =
+        InputDataInformationEvent.create(0, splitProto1.toByteArray());
 
     MRSplitProto splitProto2 = MRInputHelpers.createSplitProto(splits[1]);
-    InputDataInformationEvent event2 = new InputDataInformationEvent(0, splitProto2.toByteArray());
+    InputDataInformationEvent event2 =
+        InputDataInformationEvent.create(0, splitProto2.toByteArray());
 
     eventList.clear();
     eventList.add(event1);
@@ -221,8 +223,10 @@ public class TestMultiMRInput {
     assertEquals(1, splits.length);
 
     MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]);
-    InputDataInformationEvent event1 = new InputDataInformationEvent(0, splitProto.toByteArray());
-    InputDataInformationEvent event2 = new InputDataInformationEvent(1, splitProto.toByteArray());
+    InputDataInformationEvent event1 =
+        InputDataInformationEvent.create(0, splitProto.toByteArray());
+    InputDataInformationEvent event2 =
+        InputDataInformationEvent.create(1, splitProto.toByteArray());
 
     eventList.clear();
     eventList.add(event1);
@@ -251,7 +255,7 @@ public class TestMultiMRInput {
     doReturn(1).when(inputContext).getTaskIndex();
     doReturn(1).when(inputContext).getTaskVertexIndex();
     doReturn("taskVertexName").when(inputContext).getTaskVertexName();
-    doReturn(new UserPayload(ByteBuffer.wrap(payload))).when(inputContext).getUserPayload();
+    doReturn(UserPayload.create(ByteBuffer.wrap(payload))).when(inputContext).getUserPayload();
     return inputContext;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 028820d..eccc343 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -199,7 +199,7 @@ public class MapUtils {
       List<OutputSpec> outputSpecs) throws Exception {
     jobConf.setInputFormat(SequenceFileInputFormat.class);
 
-    ProcessorDescriptor mapProcessorDesc = new ProcessorDescriptor(
+    ProcessorDescriptor mapProcessorDesc = ProcessorDescriptor.create(
         MapProcessor.class.getName()).setUserPayload(
         TezUtils.createUserPayloadFromConf(jobConf));
     

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 66d81c7..caa2663 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -124,13 +124,14 @@ public class TestMapProcessor {
     MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput);
 
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
-        new InputDescriptor(MRInputLegacy.class.getName())
-            .setUserPayload(new UserPayload(ByteBuffer.wrap(
+        InputDescriptor.create(MRInputLegacy.class.getName())
+            .setUserPayload(UserPayload.create(ByteBuffer.wrap(
                 MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
-                    .setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)).build().toByteArray()))),
+                    .setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)).build()
+                    .toByteArray()))),
         1);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", 
-        new OutputDescriptor(LocalOnFileSorterOutput.class.getName())
+        OutputDescriptor.create(LocalOnFileSorterOutput.class.getName())
             .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
 
     LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 7cbea59..3021149 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -132,14 +132,14 @@ public class TestReduceProcessor {
     MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput);
 
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
-        new InputDescriptor(MRInputLegacy.class.getName())
-            .setUserPayload(new UserPayload(ByteBuffer.wrap(
+        InputDescriptor.create(MRInputLegacy.class.getName())
+            .setUserPayload(UserPayload.create(ByteBuffer.wrap(
                 MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
                     .setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)).build()
                     .toByteArray()))),
         1);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", 
-        new OutputDescriptor(LocalOnFileSorterOutput.class.getName()).
+        OutputDescriptor.create(LocalOnFileSorterOutput.class.getName()).
           setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
     // Run a map
     LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
@@ -159,15 +159,15 @@ public class TestReduceProcessor {
     jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
     FileOutputFormat.setOutputPath(jobConf, new Path(workDir, "output"));
-    ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor(
+    ProcessorDescriptor reduceProcessorDesc = ProcessorDescriptor.create(
         ReduceProcessor.class.getName()).setUserPayload(
         TezUtils.createUserPayloadFromConf(jobConf));
     
     InputSpec reduceInputSpec = new InputSpec(mapVertexName,
-        new InputDescriptor(LocalMergedInput.class.getName())
+        InputDescriptor.create(LocalMergedInput.class.getName())
             .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
     OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex",
-        new OutputDescriptor(MROutputLegacy.class.getName())
+        OutputDescriptor.create(MROutputLegacy.class.getName())
             .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
 
     // Now run a reduce

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
index b682941..4419b53 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
@@ -44,7 +44,7 @@ public class ProtoConverters {
 
   public static DataMovementEvent convertDataMovementEventFromProto(
       EventProtos.DataMovementEventProto proto) {
-    return new DataMovementEvent(proto.getSourceIndex(),
+    return DataMovementEvent.create(proto.getSourceIndex(),
         proto.getTargetIndex(),
         proto.getVersion(),
         proto.getUserPayload() != null ?
@@ -65,7 +65,7 @@ public class ProtoConverters {
 
   public static CompositeDataMovementEvent convertCompositeDataMovementEventFromProto(
       EventProtos.CompositeEventProto proto) {
-    return new CompositeDataMovementEvent(proto.getStartIndex(),
+    return CompositeDataMovementEvent.create(proto.getStartIndex(),
         proto.getCount(),
         proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null);
   }
@@ -82,7 +82,7 @@ public class ProtoConverters {
   
   public static VertexManagerEvent convertVertexManagerEventFromProto(
       EventProtos.VertexManagerEventProto vmProto) {
-    return new VertexManagerEvent(vmProto.getTargetVertexName(),
+    return VertexManagerEvent.create(vmProto.getTargetVertexName(),
         vmProto.hasUserPayload() ? vmProto.getUserPayload().toByteArray() : null);
   }
 
@@ -101,8 +101,9 @@ public class ProtoConverters {
   public static InputDataInformationEvent
       convertRootInputDataInformationEventFromProto(
       EventProtos.RootInputDataInformationEventProto proto) {
-    InputDataInformationEvent diEvent = new InputDataInformationEvent(
-        proto.getSourceIndex(), proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null);
+    InputDataInformationEvent diEvent = InputDataInformationEvent.create(
+        proto.getSourceIndex(),
+        proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null);
     diEvent.setTargetIndex(proto.getTargetIndex());
     return diEvent;
   }
@@ -113,7 +114,6 @@ public class ProtoConverters {
         EventProtos.RootInputInitializerEventProto.newBuilder();
     builder.setTargetVertexName(event.getTargetVertexName());
     builder.setTargetInputName(event.getTargetInputName());
-    builder.setVersion(event.getVersion());
     if (event.getUserPayload() != null) {
       builder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
     }
@@ -123,9 +123,8 @@ public class ProtoConverters {
   public static InputInitializerEvent convertRootInputInitializerEventFromProto(
       EventProtos.RootInputInitializerEventProto proto) {
     InputInitializerEvent event =
-        new InputInitializerEvent(proto.getTargetVertexName(), proto.getTargetInputName(),
-            (proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null),
-            proto.getVersion());
+        InputInitializerEvent.create(proto.getTargetVertexName(), proto.getTargetInputName(),
+            (proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null));
     return event;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index 70bfad7..974e190 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -213,7 +213,7 @@ public class TezEvent implements Writable {
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEventProto ideProto =
             InputReadErrorEventProto.parseFrom(eventBytes);
-        event = new InputReadErrorEvent(ideProto.getDiagnostics(),
+        event = InputReadErrorEvent.create(ideProto.getDiagnostics(),
             ideProto.getIndex(), ideProto.getVersion());
         break;
       case TASK_ATTEMPT_FAILED_EVENT:
@@ -227,7 +227,7 @@ public class TezEvent implements Writable {
       case INPUT_FAILED_EVENT:
         InputFailedEventProto ifProto =
             InputFailedEventProto.parseFrom(eventBytes);
-        event = new InputFailedEvent(ifProto.getTargetIndex(), ifProto.getVersion());
+        event = InputFailedEvent.create(ifProto.getTargetIndex(), ifProto.getVersion());
         break;
       case ROOT_INPUT_DATA_INFORMATION_EVENT:
         RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 3945d4b..36d2cad 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -74,7 +74,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
     checkNotNull(sourceVertexName, "sourceVertexName is null");
     checkNotNull(inputs, "input map is null");
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
-    this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
+    this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
     this.inputIndex = inputIndex;
     this.sourceVertexName = sourceVertexName;
     this.sourceInfo = new EventMetaData(

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
index 374fbe3..693bde8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
@@ -46,7 +46,7 @@ public class TezMergedInputContextImpl implements MergedInputContext {
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
     this.groupInputName = groupInputName;
     this.groupInputsMap = groupInputsMap;
-    this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
+    this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
     this.inputReadyTracker = inputReadyTracker;
     this.workDirs = workDirs;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 3e88080..e02f41b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -67,7 +67,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
         auxServiceEnv, memDist, outputDescriptor, objectRegistry);
     checkNotNull(outputIndex, "outputIndex is null");
     checkNotNull(destinationVertexName, "destinationVertexName is null");
-    this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
+    this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
     this.outputIndex = outputIndex;
     this.destinationVertexName = destinationVertexName;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 2e8e622..22b9336 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -62,7 +62,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv, memDist, processorDescriptor, objectRegistry);
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
-    this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
+    this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
         taskVertexName, "", taskAttemptID);
     this.inputReadyTracker = inputReadyTracker;

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 0429218..d4374f9 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -108,19 +108,19 @@ public class TestLogicalIOProcessorRuntimeTask {
   }
 
   private List<InputSpec> createInputSpecList() {
-    InputDescriptor inputDesc = new InputDescriptor(TestInput.class.getName());
+    InputDescriptor inputDesc = InputDescriptor.create(TestInput.class.getName());
     InputSpec inputSpec = new InputSpec("inedge", inputDesc, 1);
     return Lists.newArrayList(inputSpec);
   }
 
   private List<OutputSpec> createOutputSpecList() {
-    OutputDescriptor outputtDesc = new OutputDescriptor(TestOutput.class.getName());
+    OutputDescriptor outputtDesc = OutputDescriptor.create(TestOutput.class.getName());
     OutputSpec outputSpec = new OutputSpec("outedge", outputtDesc, 1);
     return Lists.newArrayList(outputSpec);
   }
 
   private ProcessorDescriptor createProcessorDescriptor() {
-    ProcessorDescriptor desc = new ProcessorDescriptor(TestProcessor.class.getName());
+    ProcessorDescriptor desc = ProcessorDescriptor.create(TestProcessor.class.getName());
     return desc;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index d98362c..01de0bd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -280,7 +280,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
 
     public UserPayload toUserPayload() {
-      return new UserPayload(
+      return UserPayload.create(
           ByteBuffer.wrap(ShuffleEdgeManagerConfigPayloadProto.newBuilder()
               .setNumSourceTaskOutputs(numSourceTaskOutputs)
               .setNumDestinationTasks(numDestinationTasks)
@@ -441,7 +441,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
                 ((remainderRangeForLastShuffler > 0) ?
                     remainderRangeForLastShuffler : basePartitionRange));
         EdgeManagerPluginDescriptor edgeManagerDescriptor =
-            new EdgeManagerPluginDescriptor(CustomShuffleEdgeManager.class.getName());
+            EdgeManagerPluginDescriptor.create(CustomShuffleEdgeManager.class.getName());
         edgeManagerDescriptor.setUserPayload(edgeManagerConfig.toUserPayload());
         edgeManagers.put(vertex, edgeManagerDescriptor);
       }
@@ -660,7 +660,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
     public VertexManagerPluginDescriptor build() {
       VertexManagerPluginDescriptor desc =
-          new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+          VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
 
       try {
         return desc.setUserPayload(TezUtils.createUserPayloadFromConf(this.conf));

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index b530767..86883dd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -291,10 +291,10 @@ class ShuffleScheduler {
               inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
               srcAttempt.getAttemptNumber()) + " to AM.");
       List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
-      failedEvents.add(new InputReadErrorEvent("Fetch failure for "
+      failedEvents.add(InputReadErrorEvent.create("Fetch failure for "
           + TezRuntimeUtils.getTaskAttemptIdentifier(
-              inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
-              srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier()
+          inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
+          srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier()
           .getInputIndex(), srcAttempt.getAttemptNumber()));
 
       inputContext.sendEvents(failedEvents);      

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index a7a0451..1eb09bc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -490,7 +490,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       payloadBuidler.setPathComponent(outputContext.getUniqueIdentifier());
     }
 
-    CompositeDataMovementEvent cDme = new CompositeDataMovementEvent(0, numPartitions,
+    CompositeDataMovementEvent cDme = CompositeDataMovementEvent.create(0, numPartitions,
         payloadBuidler.build().toByteArray());
     return cDme;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
index 090b12f..c495c12 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
@@ -117,11 +117,11 @@ public class OrderedPartitionedKVEdgeConfig
    * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
    */
   public EdgeProperty createDefaultEdgeProperty() {
-    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER,
+    EdgeProperty edgeProperty = EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
-        new OutputDescriptor(
+        OutputDescriptor.create(
             getOutputClassName()).setUserPayload(getOutputPayload()),
-        new InputDescriptor(
+        InputDescriptor.create(
             getInputClassName()).setUserPayload(getInputPayload()));
     return edgeProperty;
   }
@@ -136,10 +136,10 @@ public class OrderedPartitionedKVEdgeConfig
   public EdgeProperty createDefaultCustomEdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor) {
     Preconditions.checkNotNull(edgeManagerDescriptor, "EdgeManagerDescriptor cannot be null");
     EdgeProperty edgeProperty =
-        new EdgeProperty(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
+        EdgeProperty.create(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
             EdgeProperty.SchedulingType.SEQUENTIAL,
-            new OutputDescriptor(getOutputClassName()).setUserPayload(getOutputPayload()),
-            new InputDescriptor(getInputClassName()).setUserPayload(getInputPayload()));
+            OutputDescriptor.create(getOutputClassName()).setUserPayload(getOutputPayload()),
+            InputDescriptor.create(getInputClassName()).setUserPayload(getInputPayload()));
     return edgeProperty;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
index 47c0c76..66b2ded 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
@@ -98,11 +98,11 @@ public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
    * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
    */
   public EdgeProperty createDefaultBroadcastEdgeProperty() {
-    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.BROADCAST,
+    EdgeProperty edgeProperty = EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST,
         EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
-        new OutputDescriptor(
+        OutputDescriptor.create(
             getOutputClassName()).setUserPayload(getOutputPayload()),
-        new InputDescriptor(
+        InputDescriptor.create(
             getInputClassName()).setUserPayload(getInputPayload()));
     return edgeProperty;
   }
@@ -118,11 +118,11 @@ public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
    * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
    */
   public EdgeProperty createDefaultOneToOneEdgeProperty() {
-    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.ONE_TO_ONE,
+    EdgeProperty edgeProperty = EdgeProperty.create(EdgeProperty.DataMovementType.ONE_TO_ONE,
         EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
-        new OutputDescriptor(
+        OutputDescriptor.create(
             getOutputClassName()).setUserPayload(getOutputPayload()),
-        new InputDescriptor(
+        InputDescriptor.create(
             getInputClassName()).setUserPayload(getInputPayload()));
     return edgeProperty;
   }
@@ -137,10 +137,10 @@ public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
   public EdgeProperty createDefaultCustomEdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor) {
     Preconditions.checkNotNull(edgeManagerDescriptor, "EdgeManagerDescriptor cannot be null");
     EdgeProperty edgeProperty =
-        new EdgeProperty(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
+        EdgeProperty.create(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
             EdgeProperty.SchedulingType.SEQUENTIAL,
-            new OutputDescriptor(getOutputClassName()).setUserPayload(getOutputPayload()),
-            new InputDescriptor(getInputClassName()).setUserPayload(getInputPayload()));
+            OutputDescriptor.create(getOutputClassName()).setUserPayload(getOutputPayload()),
+            InputDescriptor.create(getInputClassName()).setUserPayload(getInputPayload()));
     return edgeProperty;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
index 3216786..54ea5ce 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
@@ -123,11 +123,11 @@ public class UnorderedPartitionedKVEdgeConfig
    * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
    */
   public EdgeProperty createDefaultEdgeProperty() {
-    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER,
+    EdgeProperty edgeProperty = EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
-        new OutputDescriptor(
+        OutputDescriptor.create(
             getOutputClassName()).setUserPayload(getOutputPayload()),
-        new InputDescriptor(
+        InputDescriptor.create(
             getInputClassName()).setUserPayload(getInputPayload()));
     return edgeProperty;
   }
@@ -142,10 +142,10 @@ public class UnorderedPartitionedKVEdgeConfig
   public EdgeProperty createDefaultCustomEdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor) {
     Preconditions.checkNotNull(edgeManagerDescriptor, "EdgeManagerDescriptor cannot be null");
     EdgeProperty edgeProperty =
-        new EdgeProperty(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
+        EdgeProperty.create(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
             EdgeProperty.SchedulingType.SEQUENTIAL,
-            new OutputDescriptor(getOutputClassName()).setUserPayload(getOutputPayload()),
-            new InputDescriptor(getInputClassName()).setUserPayload(getInputPayload()));
+            OutputDescriptor.create(getOutputClassName()).setUserPayload(getOutputPayload()),
+            InputDescriptor.create(getInputClassName()).setUserPayload(getInputPayload()));
     return edgeProperty;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 3c32ec8..8708c6f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -199,13 +199,14 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     VertexManagerEventPayloadProto.Builder vmBuilder = VertexManagerEventPayloadProto
         .newBuilder();
     vmBuilder.setOutputSize(outputSize);
-    VertexManagerEvent vmEvent = new VertexManagerEvent(
+    VertexManagerEvent vmEvent = VertexManagerEvent.create(
         getContext().getDestinationVertexName(), vmBuilder.build().toByteArray());    
 
     List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs() + 1);
     events.add(vmEvent);
 
-    CompositeDataMovementEvent csdme = new CompositeDataMovementEvent(0, getNumPhysicalOutputs(), payload);
+    CompositeDataMovementEvent csdme =
+        CompositeDataMovementEvent.create(0, getNumPhysicalOutputs(), payload);
     events.add(csdme);
 
     return events;

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 8840ecf..9ad7cd3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -154,7 +154,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     }
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
 
-    DataMovementEvent dmEvent = new DataMovementEvent(0, payloadProto.toByteArray());
+    DataMovementEvent dmEvent = DataMovementEvent.create(0, payloadProto.toByteArray());
     List<Event> events = Lists.newArrayListWithCapacity(1);
     events.add(dmEvent);
     return events;

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index acd32f8..e893631 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -21,7 +21,7 @@ package org.apache.tez.runtime.library.processor;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -115,7 +115,7 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
     }
 
     public UserPayload toUserPayload() {
-      return new UserPayload(ByteBuffer.wrap(Integer.toString(timeToSleepMS).getBytes()));
+      return UserPayload.create(ByteBuffer.wrap(Integer.toString(timeToSleepMS).getBytes()));
     }
 
     public void fromUserPayload(UserPayload userPayload) throws CharacterCodingException {

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index 3ae3615..be43dc0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -487,12 +487,12 @@ public class ShuffleManager implements FetcherCallback {
       LOG.fatal(message);
       inputContext.fatalError(null, message);
     } else {
-    InputReadErrorEvent readError = new InputReadErrorEvent(
+    InputReadErrorEvent readError = InputReadErrorEvent.create(
         "Fetch failure while fetching from "
             + TezRuntimeUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(),
-                srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
-                srcAttemptIdentifier.getAttemptNumber()),
+            inputContext.getSourceVertexName(),
+            srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+            srcAttemptIdentifier.getAttemptNumber()),
         srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
         srcAttemptIdentifier.getAttemptNumber());
     

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index 09b7d4b..f57bdb0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -59,12 +59,12 @@ public class TestInputReadyVertexManager {
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
-    EdgeProperty eProp1 = new EdgeProperty(
+    EdgeProperty eProp1 = EdgeProperty.create(
         EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
     
     String mockManagedVertexId = "Vertex";
     
@@ -93,12 +93,12 @@ public class TestInputReadyVertexManager {
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
-    EdgeProperty eProp1 = new EdgeProperty(
+    EdgeProperty eProp1 = EdgeProperty.create(
         EdgeProperty.DataMovementType.ONE_TO_ONE,
         EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
     
     String mockManagedVertexId = "Vertex";
     Container mockContainer1 = mock(Container.class);
@@ -148,26 +148,26 @@ public class TestInputReadyVertexManager {
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
-    EdgeProperty eProp1 = new EdgeProperty(
+    EdgeProperty eProp1 = EdgeProperty.create(
         EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
     String mockSrcVertexId2 = "Vertex2";
-    EdgeProperty eProp2 = new EdgeProperty(
+    EdgeProperty eProp2 = EdgeProperty.create(
         EdgeProperty.DataMovementType.ONE_TO_ONE,
         EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
     String mockSrcVertexId3 = "Vertex3";
-    EdgeProperty eProp3 = new EdgeProperty(
+    EdgeProperty eProp3 = EdgeProperty.create(
         EdgeProperty.DataMovementType.ONE_TO_ONE,
-        EdgeProperty.DataSourceType.PERSISTED, 
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
     
     String mockManagedVertexId = "Vertex";
     Container mockContainer2 = mock(Container.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 4290cd7..e4b4656 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -68,26 +68,26 @@ public class TestShuffleVertexManager {
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
-    EdgeProperty eProp1 = new EdgeProperty(
+    EdgeProperty eProp1 = EdgeProperty.create(
         EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
     String mockSrcVertexId2 = "Vertex2";
-    EdgeProperty eProp2 = new EdgeProperty(
+    EdgeProperty eProp2 = EdgeProperty.create(
         EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
     String mockSrcVertexId3 = "Vertex3";
-    EdgeProperty eProp3 = new EdgeProperty(
+    EdgeProperty eProp3 = EdgeProperty.create(
         EdgeProperty.DataMovementType.BROADCAST,
-        EdgeProperty.DataSourceType.PERSISTED, 
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
     
     final String mockManagedVertexId = "Vertex4";
     
@@ -217,7 +217,7 @@ public class TestShuffleVertexManager {
 
     byte[] payload =
         VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteArray();
-    VertexManagerEvent vmEvent = new VertexManagerEvent("Vertex", payload);
+    VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload);
     // parallelism not change due to large data size
     manager = createManager(conf, mockContext, 0.1f, 0.1f);
     manager.onVertexStarted(null);
@@ -237,7 +237,7 @@ public class TestShuffleVertexManager {
     scheduledTasks.clear();
     payload =
         VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteArray();
-    vmEvent = new VertexManagerEvent("Vertex", payload);
+    vmEvent = VertexManagerEvent.create("Vertex", payload);
     
     manager = createManager(conf, mockContext, 0.5f, 0.5f);
     manager.onVertexStarted(null);
@@ -283,7 +283,7 @@ public class TestShuffleVertexManager {
     
     EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
     Map<Integer, List<Integer>> targets = Maps.newHashMap();
-    DataMovementEvent dmEvent = new DataMovementEvent(1, new byte[0]);
+    DataMovementEvent dmEvent = DataMovementEvent.create(1, new byte[0]);
     // 4 source task outputs - same as original number of partitions
     Assert.assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(0));
     // 4 destination task inputs - 2 source tasks + 2 merged partitions
@@ -295,7 +295,7 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(1, e.getValue().size());
     Assert.assertEquals(3, e.getValue().get(0).intValue());
     targets.clear();
-    dmEvent = new DataMovementEvent(2, new byte[0]);
+    dmEvent = DataMovementEvent.create(2, new byte[0]);
     edgeManager.routeDataMovementEventToDestination(dmEvent, 0, dmEvent.getSourceIndex(), targets);
     Assert.assertEquals(1, targets.size());
     e = targets.entrySet().iterator().next();
@@ -321,26 +321,26 @@ public class TestShuffleVertexManager {
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
-    EdgeProperty eProp1 = new EdgeProperty(
+    EdgeProperty eProp1 = EdgeProperty.create(
         EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
     String mockSrcVertexId2 = "Vertex2";
-    EdgeProperty eProp2 = new EdgeProperty(
+    EdgeProperty eProp2 = EdgeProperty.create(
         EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
     String mockSrcVertexId3 = "Vertex3";
-    EdgeProperty eProp3 = new EdgeProperty(
+    EdgeProperty eProp3 = EdgeProperty.create(
         EdgeProperty.DataMovementType.BROADCAST,
-        EdgeProperty.DataSourceType.PERSISTED, 
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
     
     String mockManagedVertexId = "Vertex4";
     

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index b409e11..faa728f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -382,7 +382,7 @@ public class TestValuesIterator {
     doReturn(1).when(inputContext).getInputIndex();
     doReturn("srcVertex").when(inputContext).getSourceVertexName();
     doReturn(1).when(inputContext).getTaskVertexIndex();
-    doReturn(new UserPayload(ByteBuffer.wrap(new byte[1024]))).when(inputContext).getUserPayload();
+    doReturn(UserPayload.create(ByteBuffer.wrap(new byte[1024]))).when(inputContext).getUserPayload();
     return inputContext;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
index f31d20e..56e1337 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
@@ -73,7 +73,7 @@ public class TestShuffleInputEventHandler {
     if (emptyPartitionByteString != null) {
       builder.setEmptyPartitions(emptyPartitionByteString);
     }
-    return new DataMovementEvent(srcIndex, targetIndex, 0, builder.build().toByteArray());
+    return DataMovementEvent.create(srcIndex, targetIndex, 0, builder.build().toByteArray());
   }
 
   @Before
@@ -106,7 +106,7 @@ public class TestShuffleInputEventHandler {
   public void testFailedEvent() throws IOException {
     List<Event> events = new LinkedList<Event>();
     int targetIdx = 1;
-    InputFailedEvent failedEvent = new InputFailedEvent(targetIdx, 0);
+    InputFailedEvent failedEvent = InputFailedEvent.create(targetIdx, 0);
     events.add(failedEvent);
     handler.handleEvents(events);
     InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
index e5a2f99..646dca8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
@@ -148,7 +148,7 @@ public class TestShuffleInputEventHandlerImpl {
     if (emptyPartitionByteString != null) {
       builder.setEmptyPartitions(emptyPartitionByteString);
     }
-    Event dme = new DataMovementEvent(srcIndex, targetIndex, 0, builder.build().toByteArray());
+    Event dme = DataMovementEvent.create(srcIndex, targetIndex, 0, builder.build().toByteArray());
     return dme;
   }
 


Mime
View raw message