tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/4] TEZ-1246. Replace constructors with create() methods for DAG, Vertex, Edge etc in the API. (sseth)
Date Tue, 19 Aug 2014 05:40:49 GMT
Repository: tez
Updated Branches:
  refs/heads/master 5f94db786 -> b30e2bcf8


http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index af82e6c..f856956 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -142,21 +142,21 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
       }
     }
     byte[] procByte = {(byte) (doLocalityCheck ? 1 : 0), 1};
-    UserPayload procPayload = new UserPayload(ByteBuffer.wrap(procByte));
+    UserPayload procPayload = UserPayload.create(ByteBuffer.wrap(procByte));
 
     System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");
 
-    Vertex broadcastVertex = new Vertex("Broadcast", new ProcessorDescriptor(
+    Vertex broadcastVertex = Vertex.create("Broadcast", ProcessorDescriptor.create(
         InputProcessor.class.getName()), numBroadcastTasks);
     
-    Vertex inputVertex = new Vertex("Input", new ProcessorDescriptor(
+    Vertex inputVertex = Vertex.create("Input", ProcessorDescriptor.create(
         InputProcessor.class.getName()).setUserPayload(procPayload), numOneToOneTasks);
 
-    Vertex oneToOneVertex = new Vertex("OneToOne",
-        new ProcessorDescriptor(
+    Vertex oneToOneVertex = Vertex.create("OneToOne",
+        ProcessorDescriptor.create(
             OneToOneProcessor.class.getName()).setUserPayload(procPayload));
     oneToOneVertex.setVertexManagerPlugin(
-            new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()));
+        VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()));
 
     UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig
         .newBuilder(Text.class.getName(), IntWritable.class.getName()).build();
@@ -166,9 +166,9 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
         .addVertex(broadcastVertex)
         .addVertex(oneToOneVertex)
         .addEdge(
-            new Edge(inputVertex, oneToOneVertex, edgeConf.createDefaultOneToOneEdgeProperty()))
+            Edge.create(inputVertex, oneToOneVertex, edgeConf.createDefaultOneToOneEdgeProperty()))
         .addEdge(
-            new Edge(broadcastVertex, oneToOneVertex,
+            Edge.create(broadcastVertex, oneToOneVertex,
                 edgeConf.createDefaultBroadcastEdgeProperty()));
     return dag;
   }
@@ -202,7 +202,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
     // is the same filesystem as the one used for Input/Output.
     TezClient tezSession = null;
     // needs session or else TaskScheduler does not hold onto containers
-    tezSession = new TezClient("broadcastAndOneToOneExample", tezConf);
+    tezSession = TezClient.create("broadcastAndOneToOneExample", tezConf);
     tezSession.start();
 
     DAGClient dagClient = null;

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index ac4cfb1..875bfcb 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -156,7 +156,7 @@ public class FilterLinesByWord extends Configured implements Tool {
 
 
 
-    TezClient tezSession = new TezClient("FilterLinesByWordSession", tezConf, 
+    TezClient tezSession = TezClient.create("FilterLinesByWordSession", tezConf,
         commonLocalResources, credentials);
     tezSession.start(); // Why do I need to start the TezSession.
 
@@ -169,7 +169,7 @@ public class FilterLinesByWord extends Configured implements Tool {
 
     UserPayload stage1Payload = TezUtils.createUserPayloadFromConf(stage1Conf);
     // Setup stage1 Vertex
-    Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
+    Vertex stage1Vertex = Vertex.create("stage1", ProcessorDescriptor.create(
         FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
         .setTaskLocalFiles(commonLocalResources);
 
@@ -186,22 +186,24 @@ public class FilterLinesByWord extends Configured implements Tool {
     stage1Vertex.addDataSource("MRInput", dsd);
 
     // Setup stage2 Vertex
-    Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
+    Vertex stage2Vertex = Vertex.create("stage2", ProcessorDescriptor.create(
         FilterByWordOutputProcessor.class.getName()).setUserPayload(
         TezUtils.createUserPayloadFromConf(stage2Conf)), 1);
     stage2Vertex.setTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2
-    OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
+    OutputDescriptor od = OutputDescriptor.create(MROutput.class.getName())
         .setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf));
-    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
+    OutputCommitterDescriptor ocd =
+        OutputCommitterDescriptor.create(MROutputCommitter.class.getName());
     stage2Vertex.addDataSink("MROutput", new DataSinkDescriptor(od, ocd, null));
 
     UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig
         .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();
 
     DAG dag = new DAG("FilterLinesByWord");
-    Edge edge = new Edge(stage1Vertex, stage2Vertex, edgeConf.createDefaultBroadcastEdgeProperty());
+    Edge edge =
+        Edge.create(stage1Vertex, stage2Vertex, edgeConf.createDefaultBroadcastEdgeProperty());
     dag.addVertex(stage1Vertex).addVertex(stage2Vertex).addEdge(edge);
 
     LOG.info("Submitting DAG to Tez Session");

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index a52cb5a..61d827e 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -146,7 +146,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
 
 
 
-    TezClient tezSession = new TezClient("FilterLinesByWordSession", tezConf,
+    TezClient tezSession = TezClient.create("FilterLinesByWordSession", tezConf,
         commonLocalResources, null);
     tezSession.start(); // Why do I need to start the TezSession.
 
@@ -160,7 +160,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
 
     UserPayload stage1Payload = TezUtils.createUserPayloadFromConf(stage1Conf);
     // Setup stage1 Vertex
-    Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
+    Vertex stage1Vertex = Vertex.create("stage1", ProcessorDescriptor.create(
         FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
         .setTaskLocalFiles(commonLocalResources);
 
@@ -177,7 +177,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     stage1Vertex.addDataSource("MRInput", dsd);
 
     // Setup stage2 Vertex
-    Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
+    Vertex stage2Vertex = Vertex.create("stage2", ProcessorDescriptor.create(
         FilterByWordOutputProcessor.class.getName()).setUserPayload(TezUtils
         .createUserPayloadFromConf(stage2Conf)), dsd.getNumberOfShards());
     stage2Vertex.setTaskLocalFiles(commonLocalResources);
@@ -185,15 +185,16 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     // Configure the Output for stage2
     stage2Vertex.addDataSink(
         "MROutput",
-        new DataSinkDescriptor(new OutputDescriptor(MROutput.class.getName())
+        new DataSinkDescriptor(OutputDescriptor.create(MROutput.class.getName())
             .setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf)),
-            new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
+            OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null));
 
     UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig
         .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();
 
     DAG dag = new DAG("FilterLinesByWord");
-    Edge edge = new Edge(stage1Vertex, stage2Vertex, edgeConf.createDefaultOneToOneEdgeProperty());
+    Edge edge =
+        Edge.create(stage1Vertex, stage2Vertex, edgeConf.createDefaultOneToOneEdgeProperty());
     dag.addVertex(stage1Vertex).addVertex(stage2Vertex).addEdge(edge);
 
     LOG.info("Submitting DAG to Tez Session");

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index daad5ba..e4c515b 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -524,7 +524,7 @@ public class MRRSleepJob extends Configured implements Tool {
     UserPayload mapUserPayload = TezUtils.createUserPayloadFromConf(mapStageConf);
     int numTasks = generateSplitsInAM ? -1 : numMapper;
 
-    Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
+    Vertex mapVertex = Vertex.create("map", ProcessorDescriptor.create(
         MapProcessor.class.getName()).setUserPayload(mapUserPayload), numTasks)
         .setTaskLocalFiles(commonLocalResources);
     mapVertex.addDataSource("MRInput", dataSource);
@@ -536,8 +536,8 @@ public class MRRSleepJob extends Configured implements Tool {
         Configuration iconf =
             intermediateReduceStageConfs[i];
         UserPayload iReduceUserPayload = TezUtils.createUserPayloadFromConf(iconf);
-        Vertex ivertex = new Vertex("ireduce" + (i+1),
-                new ProcessorDescriptor(ReduceProcessor.class.getName()).
+        Vertex ivertex = Vertex.create("ireduce" + (i + 1),
+            ProcessorDescriptor.create(ReduceProcessor.class.getName()).
                 setUserPayload(iReduceUserPayload), numIReducer);
         ivertex.setTaskLocalFiles(commonLocalResources);
         vertices.add(ivertex);
@@ -547,7 +547,7 @@ public class MRRSleepJob extends Configured implements Tool {
     Vertex finalReduceVertex = null;
     if (numReducer > 0) {
       UserPayload reducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
-      finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
+      finalReduceVertex = Vertex.create("reduce", ProcessorDescriptor.create(
           ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer);
       finalReduceVertex.setTaskLocalFiles(commonLocalResources);
       finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigBuilder(finalReduceConf,
@@ -571,7 +571,7 @@ public class MRRSleepJob extends Configured implements Tool {
       dag.addVertex(vertices.get(i));
       if (i != 0) {
         dag.addEdge(
-            new Edge(vertices.get(i - 1), vertices.get(i), edgeConf.createDefaultEdgeProperty()));
+            Edge.create(vertices.get(i - 1), vertices.get(i), edgeConf.createDefaultEdgeProperty()));
       }
     }
 
@@ -733,7 +733,7 @@ public class MRRSleepJob extends Configured implements Tool {
         mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
         iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs, generateSplitsInAM);
 
-    TezClient tezSession = new TezClient("MRRSleep", conf, false, null, credentials);
+    TezClient tezSession = TezClient.create("MRRSleep", conf, false, null, credentials);
     tezSession.start();
     DAGClient dagClient = tezSession.submitDAG(dag);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index edfd04a..3c4e66f 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -196,7 +196,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
       dsd = MRInputLegacy.createConfigBuilder(mapStageConf, TextInputFormat.class, inputPath).build();
     }
 
-    Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
+    Vertex mapVertex = Vertex.create("initialmap", ProcessorDescriptor.create(
         MapProcessor.class.getName()).setUserPayload(
         TezUtils.createUserPayloadFromConf(mapStageConf))
         .setHistoryText(mapStageHistoryText)).setTaskLocalFiles(commonLocalResources);
@@ -206,10 +206,10 @@ public class TestOrderedWordCount extends Configured implements Tool {
     ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
     iReduceStageConf.writeXml(iROutputStream);
     String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
-    Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
+    Vertex ivertex = Vertex.create("intermediate_reducer", ProcessorDescriptor.create(
         ReduceProcessor.class.getName())
-            .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
-            .setHistoryText(iReduceStageHistoryText), 2);
+        .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
+        .setHistoryText(iReduceStageHistoryText), 2);
     ivertex.setTaskLocalFiles(commonLocalResources);
     vertices.add(ivertex);
 
@@ -217,11 +217,11 @@ public class TestOrderedWordCount extends Configured implements Tool {
     finalReduceConf.writeXml(finalReduceOutputStream);
     String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
     UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
-    Vertex finalReduceVertex = new Vertex("finalreduce",
-        new ProcessorDescriptor(
+    Vertex finalReduceVertex = Vertex.create("finalreduce",
+        ProcessorDescriptor.create(
             ReduceProcessor.class.getName())
-                .setUserPayload(finalReducePayload)
-                .setHistoryText(finalReduceStageHistoryText), 1);
+            .setUserPayload(finalReducePayload)
+            .setHistoryText(finalReduceStageHistoryText), 1);
     finalReduceVertex.setTaskLocalFiles(commonLocalResources);
     finalReduceVertex.addDataSink("MROutput",
         MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath)
@@ -238,7 +238,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
             HashPartitioner.class.getName()).setFromConfiguration(conf)
 	    .configureInput().useLegacyInput().done().build();
     dag.addEdge(
-        new Edge(dag.getVertex("initialmap"), dag.getVertex("intermediate_reducer"),
+        Edge.create(dag.getVertex("initialmap"), dag.getVertex("intermediate_reducer"),
             edgeConf1.createDefaultEdgeProperty()));
 
     OrderedPartitionedKVEdgeConfig edgeConf2 = OrderedPartitionedKVEdgeConfig
@@ -246,7 +246,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
             HashPartitioner.class.getName()).setFromConfiguration(conf)
             .configureInput().useLegacyInput().done().build();
     dag.addEdge(
-        new Edge(dag.getVertex("intermediate_reducer"), dag.getVertex("finalreduce"),
+        Edge.create(dag.getVertex("intermediate_reducer"), dag.getVertex("finalreduce"),
             edgeConf2.createDefaultEdgeProperty()));
 
     return dag;
@@ -330,7 +330,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
     } else {
       tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
     }
-    TezClient tezSession = new TezClient("OrderedWordCountSession", tezConf,
+    TezClient tezSession = TezClient.create("OrderedWordCountSession", tezConf,
         null, instance.credentials);
     tezSession.start();
 
@@ -384,7 +384,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
         }
         if (doPreWarm) {
           LOG.info("Pre-warming Session");
-          PreWarmVertex preWarmVertex = new PreWarmVertex("PreWarm", preWarmNumContainers, dag
+          PreWarmVertex preWarmVertex = PreWarmVertex.create("PreWarm", preWarmNumContainers, dag
               .getVertex("initialmap").getTaskResource());
           preWarmVertex.setTaskLocalFiles(dag.getVertex("initialmap").getTaskLocalFiles());
           preWarmVertex.setTaskEnvironment(dag.getVertex("initialmap").getTaskEnvironment());

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index e136558..78a5566 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -173,16 +173,16 @@ public class UnionExample {
         inputPath);
     DataSourceDescriptor dataSource = configurer.generateSplitsInAM(false).build();
 
-    Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
+    Vertex mapVertex1 = Vertex.create("map1", ProcessorDescriptor.create(
         TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
 
-    Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
+    Vertex mapVertex2 = Vertex.create("map2", ProcessorDescriptor.create(
         TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
 
-    Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
+    Vertex mapVertex3 = Vertex.create("map3", ProcessorDescriptor.create(
         TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
 
-    Vertex checkerVertex = new Vertex("checker", new ProcessorDescriptor(
+    Vertex checkerVertex = Vertex.create("checker", ProcessorDescriptor.create(
         UnionProcessor.class.getName()), 1);
 
     Configuration outputConf = new Configuration(tezConf);
@@ -211,10 +211,10 @@ public class UnionExample {
         .addVertex(mapVertex3)
         .addVertex(checkerVertex)
         .addEdge(
-            new Edge(mapVertex3, checkerVertex, edgeConf.createDefaultEdgeProperty()))
+            Edge.create(mapVertex3, checkerVertex, edgeConf.createDefaultEdgeProperty()))
         .addEdge(
-            new GroupInputEdge(unionVertex, checkerVertex, edgeConf.createDefaultEdgeProperty(),
-                new InputDescriptor(
+            GroupInputEdge.create(unionVertex, checkerVertex, edgeConf.createDefaultEdgeProperty(),
+                InputDescriptor.create(
                     ConcatenatedMergedKeyValuesInput.class.getName())));
     return dag;  
   }
@@ -251,7 +251,7 @@ public class UnionExample {
     // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
     // is the same filesystem as the one used for Input/Output.
     
-    TezClient tezSession = new TezClient("UnionExampleSession", tezConf);
+    TezClient tezSession = TezClient.create("UnionExampleSession", tezConf);
     tezSession.start();
 
     DAGClient dagClient = null;

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 4f3eb26..49de6c1 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -186,8 +186,8 @@ public class TestMRRJobsDAGApi {
     SleepProcessorConfig spConf = new SleepProcessorConfig(1);
 
     DAG dag = new DAG("TezSleepProcessor");
-    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+    Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
         Resource.newInstance(1024, 1));
     dag.addVertex(vertex);
 
@@ -197,7 +197,7 @@ public class TestMRRJobsDAGApi {
     remoteFs.mkdirs(remoteStagingDir);
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
 
-    TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
+    TezClient tezSession = TezClient.create("TezSleepProcessor", tezConf, false);
     tezSession.start();
 
     DAGClient dagClient = tezSession.submitDAG(dag);
@@ -229,14 +229,14 @@ public class TestMRRJobsDAGApi {
           .nextInt(100000))));
       remoteFs.mkdirs(remoteStagingDir);
       tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-      tezSession = new TezClient("OrderedWordCountSession", tezConf, true);
+      tezSession = TezClient.create("OrderedWordCountSession", tezConf, true);
       tezSession.start();
 
       SleepProcessorConfig spConf = new SleepProcessorConfig(1);
       for (int dagIndex = 1; dagIndex <= 2; dagIndex++) {
         DAG dag = new DAG("TezSleepProcessor");
-        Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+                SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
             Resource.newInstance(1024, 1));
         dag.addVertex(vertex);
 
@@ -274,8 +274,8 @@ public class TestMRRJobsDAGApi {
     SleepProcessorConfig spConf = new SleepProcessorConfig(1);
 
     DAG dag = new DAG("TezSleepProcessor");
-    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+    Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
         Resource.newInstance(1024, 1));
     dag.addVertex(vertex);
 
@@ -287,7 +287,7 @@ public class TestMRRJobsDAGApi {
     localFs.mkdirs(stagingDir);
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
 
-    TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
+    TezClient tezSession = TezClient.create("TezSleepProcessor", tezConf, false);
     tezSession.start();
 
     DAGClient dagClient = tezSession.submitDAG(dag);
@@ -316,8 +316,8 @@ public class TestMRRJobsDAGApi {
     SleepProcessorConfig spConf = new SleepProcessorConfig(1);
 
     DAG dag = new DAG("TezSleepProcessorHistoryLogging");
-    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 2,
+    Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 2,
         Resource.newInstance(1024, 1));
     dag.addVertex(vertex);
 
@@ -335,7 +335,7 @@ public class TestMRRJobsDAGApi {
         localFs.makeQualified(historyLogDir).toString());
 
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
-    TezClient tezSession = new TezClient("TezSleepProcessorHistoryLogging", tezConf);
+    TezClient tezSession = TezClient.create("TezSleepProcessorHistoryLogging", tezConf);
     tezSession.start();
 
     DAGClient dagClient = tezSession.submitDAG(dag);
@@ -520,7 +520,7 @@ public class TestMRRJobsDAGApi {
     TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
 
-    TezClient tezSession = new TezClient("testrelocalizationsession", tezConf, true);
+    TezClient tezSession = TezClient.create("testrelocalizationsession", tezConf, true);
     tezSession.start();
     Assert.assertEquals(TezAppMasterStatus.INITIALIZING, tezSession.getAppMasterStatus());
     return tezSession;
@@ -538,7 +538,7 @@ public class TestMRRJobsDAGApi {
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
         remoteStagingDir.toString());
 
-    TezClient tezSession = new TezClient("testsession", tezConf, true);
+    TezClient tezSession = TezClient.create("testsession", tezConf, true);
     tezSession.start();
     Assert.assertEquals(TezAppMasterStatus.INITIALIZING,
         tezSession.getAppMasterStatus());
@@ -674,21 +674,22 @@ public class TestMRRJobsDAGApi {
       if (initializerClass == null) {
         dsd = MRInputLegacy.createConfigBuilder(stage1Conf, SleepInputFormat.class).build();
       } else {
-        InputInitializerDescriptor iid = new InputInitializerDescriptor(inputInitializerClazz.getName());
+        InputInitializerDescriptor iid =
+            InputInitializerDescriptor.create(inputInitializerClazz.getName());
         dsd = MRInputLegacy.createConfigBuilder(stage1Conf, SleepInputFormat.class)
             .setCustomInitializerDescriptor(iid).build();
       }
     }
 
-    Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
-        MapProcessor.class.getName()).setUserPayload(stage1Payload),
+    Vertex stage1Vertex = Vertex.create("map", ProcessorDescriptor.create(
+            MapProcessor.class.getName()).setUserPayload(stage1Payload),
         dsd.getNumberOfShards(), Resource.newInstance(256, 1));
     stage1Vertex.addDataSource("MRInput", dsd);
-    Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
-        ReduceProcessor.class.getName()).setUserPayload(stage2Payload),
+    Vertex stage2Vertex = Vertex.create("ireduce", ProcessorDescriptor.create(
+            ReduceProcessor.class.getName()).setUserPayload(stage2Payload),
         1, Resource.newInstance(256, 1));
-    Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
-        ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
+    Vertex stage3Vertex = Vertex.create("reduce", ProcessorDescriptor.create(
+            ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
         1, Resource.newInstance(256, 1));
     stage3Vertex.addDataSink("MROutput",
         MROutputLegacy.createConfigBuilder(stage3Conf, NullOutputFormat.class).build());
@@ -699,16 +700,18 @@ public class TestMRRJobsDAGApi {
     dag.addVertex(stage2Vertex);
     dag.addVertex(stage3Vertex);
 
-    Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
+    Edge edge1 = Edge.create(stage1Vertex, stage2Vertex, EdgeProperty.create(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OrderedPartitionedKVOutput.class.getName()).setUserPayload(stage2Payload), new InputDescriptor(
-                OrderedGroupedInputLegacy.class.getName()).setUserPayload(stage2Payload)));
-    Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
+        SchedulingType.SEQUENTIAL, OutputDescriptor.create(
+            OrderedPartitionedKVOutput.class.getName()).setUserPayload(stage2Payload),
+        InputDescriptor.create(
+            OrderedGroupedInputLegacy.class.getName()).setUserPayload(stage2Payload)));
+    Edge edge2 = Edge.create(stage2Vertex, stage3Vertex, EdgeProperty.create(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OrderedPartitionedKVOutput.class.getName()).setUserPayload(stage3Payload), new InputDescriptor(
-                OrderedGroupedInputLegacy.class.getName()).setUserPayload(stage3Payload)));
+        SchedulingType.SEQUENTIAL, OutputDescriptor.create(
+            OrderedPartitionedKVOutput.class.getName()).setUserPayload(stage3Payload),
+        InputDescriptor.create(
+            OrderedGroupedInputLegacy.class.getName()).setUserPayload(stage3Payload)));
 
     dag.addEdge(edge1);
     dag.addEdge(edge2);
@@ -731,7 +734,7 @@ public class TestMRRJobsDAGApi {
       } else {
         tempTezconf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
       }
-      tezSession = new TezClient("testsession", tempTezconf);
+      tezSession = TezClient.create("testsession", tempTezconf);
       tezSession.start();
     } else {
       tezSession = reUseTezSession;

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java b/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java
index c43b856..84e5b97 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java
@@ -65,7 +65,7 @@ public class FaultToleranceTestRunner {
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
         remoteStagingDir.toString());
 
-    tezSession = new TezClient("FaultToleranceTestRunner", tezConf);
+    tezSession = TezClient.create("FaultToleranceTestRunner", tezConf);
     tezSession.start();
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
index 3a68fd7..f3a8b9c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
@@ -46,20 +46,20 @@ public class SimpleTestDAG {
   
   public static DAG createDAG(String name, 
       Configuration conf) throws Exception {
-    UserPayload payload = new UserPayload(null);
+    UserPayload payload = UserPayload.create(null);
     int taskCount = TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT;
     if (conf != null) {
       taskCount = conf.getInt(TEZ_SIMPLE_DAG_NUM_TASKS, TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT);
       payload = TezUtils.createUserPayloadFromConf(conf);
     }
     DAG dag = new DAG(name);
-    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            TestOutput.getOutputDesc(payload), 
+    Vertex v1 = Vertex.create("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = Vertex.create("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(Edge.create(v1, v2,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(payload),
             TestInput.getInputDesc(payload))));
     return dag;
   }
@@ -82,7 +82,7 @@ public class SimpleTestDAG {
    * @throws Exception
    */
   public static DAG createDAGForVertexOrder(String name, Configuration conf) throws Exception{
-    UserPayload payload = new UserPayload(null);
+    UserPayload payload = UserPayload.create(null);
     int taskCount = TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT;
     if (conf != null) {
       taskCount = conf.getInt(TEZ_SIMPLE_DAG_NUM_TASKS, TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT);
@@ -90,12 +90,12 @@ public class SimpleTestDAG {
     }
     DAG dag = new DAG(name);
 
-    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v4 = new Vertex("v4", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v5 = new Vertex("v5", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v6 = new Vertex("v6", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v1 = Vertex.create("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = Vertex.create("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = Vertex.create("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v4 = Vertex.create("v4", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v5 = Vertex.create("v5", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v6 = Vertex.create("v6", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
 
     // add vertex not in the topological order, since we are using this dag for testing vertex topological order
     dag.addVertex(v4)
@@ -104,42 +104,42 @@ public class SimpleTestDAG {
       .addVertex(v1)
       .addVertex(v2)
       .addVertex(v3)
-      .addEdge(new Edge(v1, v3,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER,
-            DataSourceType.PERSISTED,
-            SchedulingType.SEQUENTIAL,
-            TestOutput.getOutputDesc(payload),
-            TestInput.getInputDesc(payload))))
-      .addEdge(new Edge(v2, v3,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER,
-            DataSourceType.PERSISTED,
-            SchedulingType.SEQUENTIAL,
-            TestOutput.getOutputDesc(payload),
-            TestInput.getInputDesc(payload))))
-      .addEdge(new Edge(v3, v4,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER,
-            DataSourceType.PERSISTED,
-            SchedulingType.SEQUENTIAL,
-            TestOutput.getOutputDesc(payload),
-            TestInput.getInputDesc(payload))))
-      .addEdge(new Edge(v3, v5,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER,
-            DataSourceType.PERSISTED,
-            SchedulingType.SEQUENTIAL,
-            TestOutput.getOutputDesc(payload),
-            TestInput.getInputDesc(payload))))
-      .addEdge(new Edge(v4, v6,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER,
-            DataSourceType.PERSISTED,
-            SchedulingType.SEQUENTIAL,
-            TestOutput.getOutputDesc(payload),
-            TestInput.getInputDesc(payload))))
-      .addEdge(new Edge(v5, v6,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER,
-            DataSourceType.PERSISTED,
-            SchedulingType.SEQUENTIAL,
-            TestOutput.getOutputDesc(payload),
-            TestInput.getInputDesc(payload))));
+      .addEdge(Edge.create(v1, v3,
+          EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+              DataSourceType.PERSISTED,
+              SchedulingType.SEQUENTIAL,
+              TestOutput.getOutputDesc(payload),
+              TestInput.getInputDesc(payload))))
+      .addEdge(Edge.create(v2, v3,
+          EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+              DataSourceType.PERSISTED,
+              SchedulingType.SEQUENTIAL,
+              TestOutput.getOutputDesc(payload),
+              TestInput.getInputDesc(payload))))
+      .addEdge(Edge.create(v3, v4,
+          EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+              DataSourceType.PERSISTED,
+              SchedulingType.SEQUENTIAL,
+              TestOutput.getOutputDesc(payload),
+              TestInput.getInputDesc(payload))))
+      .addEdge(Edge.create(v3, v5,
+          EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+              DataSourceType.PERSISTED,
+              SchedulingType.SEQUENTIAL,
+              TestOutput.getOutputDesc(payload),
+              TestInput.getInputDesc(payload))))
+      .addEdge(Edge.create(v4, v6,
+          EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+              DataSourceType.PERSISTED,
+              SchedulingType.SEQUENTIAL,
+              TestOutput.getOutputDesc(payload),
+              TestInput.getInputDesc(payload))))
+      .addEdge(Edge.create(v5, v6,
+          EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+              DataSourceType.PERSISTED,
+              SchedulingType.SEQUENTIAL,
+              TestOutput.getOutputDesc(payload),
+              TestInput.getInputDesc(payload))));
 
     return dag;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
index 967f0ed..c509fe0 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
@@ -55,21 +55,21 @@ public class SimpleTestDAG3Vertices {
       payload = TezUtils.createUserPayloadFromConf(conf);
     }
     DAG dag = new DAG(name);
-    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            TestOutput.getOutputDesc(payload), 
+    Vertex v1 = Vertex.create("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = Vertex.create("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = Vertex.create("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(Edge.create(v1, v2,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(payload),
+            TestInput.getInputDesc(payload))));
+    dag.addVertex(v3).addEdge(Edge.create(v2, v3,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(payload),
             TestInput.getInputDesc(payload))));
-    dag.addVertex(v3).addEdge(new Edge(v2, v3, 
-            new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-                DataSourceType.PERSISTED, 
-                SchedulingType.SEQUENTIAL, 
-                TestOutput.getOutputDesc(payload), 
-                TestInput.getInputDesc(payload))));
     return dag;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 6972714..c9acdc2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -131,7 +131,7 @@ public class TestDAGRecovery {
     tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
 
-    tezSession = new TezClient("TestDAGRecovery", tezConf);
+    tezSession = TezClient.create("TestDAGRecovery", tezConf);
     tezSession.start();
   }
 
@@ -184,9 +184,9 @@ public class TestDAGRecovery {
     DAG dag = SimpleVTestDAG.createDAG("DelayedInitDAG", null);
     dag.getVertex("v1").addDataSource(
         "i1",
-        new DataSourceDescriptor(
-            new InputDescriptor(NoOpInput.class.getName()),
-            new InputInitializerDescriptor(FailingInputInitializer.class
+        DataSourceDescriptor.create(
+            InputDescriptor.create(NoOpInput.class.getName()),
+            InputInitializerDescriptor.create(FailingInputInitializer.class
                 .getName()), null));
     runDAGAndVerify(dag, State.SUCCEEDED);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index ff22cee..37bea80 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -134,7 +134,7 @@ public class TestDAGRecovery2 {
 
     TezConfiguration tezConf = createSessionConfig(remoteStagingDir);
     
-    tezSession = new TezClient("TestDAGRecovery2", tezConf);
+    tezSession = TezClient.create("TestDAGRecovery2", tezConf);
     tezSession.start();
   }
 
@@ -177,10 +177,11 @@ public class TestDAGRecovery2 {
   public void testFailingCommitter() throws Exception {
     DAG dag = SimpleVTestDAG.createDAG("FailingCommitterDAG", null);
     OutputDescriptor od =
-        new OutputDescriptor(MultiAttemptDAG.NoOpOutput.class.getName());
-    od.setUserPayload(new UserPayload(ByteBuffer.wrap(
-        new MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig(true).toUserPayload())));
-    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(
+        OutputDescriptor.create(MultiAttemptDAG.NoOpOutput.class.getName());
+    od.setUserPayload(UserPayload.create(ByteBuffer.wrap(
+        new MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig(true)
+            .toUserPayload())));
+    OutputCommitterDescriptor ocd = OutputCommitterDescriptor.create(
         MultiAttemptDAG.FailingOutputCommitter.class.getName());
     dag.getVertex("v3").addDataSink("FailingOutput", new DataSinkDescriptor(od, ocd, null));
     runDAGAndVerify(dag, State.FAILED);
@@ -195,7 +196,7 @@ public class TestDAGRecovery2 {
     TezConfiguration tezConf = createSessionConfig(remoteStagingDir);
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
     tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
-    TezClient session = new TezClient("TestDAGRecovery2SingleAttemptOnly", tezConf);
+    TezClient session = TezClient.create("TestDAGRecovery2SingleAttemptOnly", tezConf);
     session.start();
 
     // DAG should fail as it never completes on the first attempt

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index f3de15d..d894403 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -89,7 +89,7 @@ public class TestFaultTolerance {
           remoteStagingDir.toString());
       tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
 
-      tezSession = new TezClient("TestFaultTolerance", tezConf, true);
+      tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
       tezSession.start();
     }
   }
@@ -135,13 +135,15 @@ public class TestFaultTolerance {
   @Test (timeout=60000)
   public void testBasicSuccessBroadcast() throws Exception {
     DAG dag = new DAG("testBasicSuccessBroadcast");
-    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(null), 2, SimpleTestDAG.defaultResource);
-    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(null), 2, SimpleTestDAG.defaultResource);
-    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.BROADCAST, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            TestOutput.getOutputDesc(null), 
+    Vertex v1 =
+        Vertex.create("v1", TestProcessor.getProcDesc(null), 2, SimpleTestDAG.defaultResource);
+    Vertex v2 =
+        Vertex.create("v2", TestProcessor.getProcDesc(null), 2, SimpleTestDAG.defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(Edge.create(v1, v2,
+        EdgeProperty.create(DataMovementType.BROADCAST,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(null),
             TestInput.getInputDesc(null))));
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 17718bb..29ea7c0 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -122,7 +122,7 @@ public class TestInput extends AbstractLogicalInput {
   }
 
   public static InputDescriptor getInputDesc(UserPayload payload) {
-    InputDescriptor desc = new InputDescriptor(TestInput.class.getName());
+    InputDescriptor desc = InputDescriptor.create(TestInput.class.getName());
     if (payload != null) {
       desc.setUserPayload(payload);
     }
@@ -158,7 +158,7 @@ public class TestInput extends AbstractLogicalInput {
             for (int i=0; i<getNumPhysicalInputs(); ++i) {
               String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + 
                   " index: " + i + " version: " + lastInputReadyValue);
-              events.add(new InputReadErrorEvent(msg, i, lastInputReadyValue));
+              events.add(InputReadErrorEvent.create(msg, i, lastInputReadyValue));
               LOG.info("Failing input: " + msg);
             }
           } else {
@@ -172,7 +172,7 @@ public class TestInput extends AbstractLogicalInput {
               }
               String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + 
                   " index: " + index.intValue() + " version: " + lastInputReadyValue);
-              events.add(new InputReadErrorEvent(msg, index.intValue(), lastInputReadyValue));
+              events.add(InputReadErrorEvent.create(msg, index.intValue(), lastInputReadyValue));
               LOG.info("Failing input: " + msg);
             }
           }

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 5f73933..9d9767b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -42,7 +42,7 @@ public class TestOutput extends AbstractLogicalOutput {
   }
 
   public static OutputDescriptor getOutputDesc(UserPayload payload) {
-    OutputDescriptor desc = new OutputDescriptor(TestOutput.class.getName());
+    OutputDescriptor desc = OutputDescriptor.create(TestOutput.class.getName());
     if (payload != null) {
       desc.setUserPayload(payload);
     }
@@ -80,7 +80,7 @@ public class TestOutput extends AbstractLogicalOutput {
     byte[] result = ByteBuffer.allocate(4).putInt(output).array();
     List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs());
     for (int i = 0; i < getNumPhysicalOutputs(); i++) {
-      DataMovementEvent event = new DataMovementEvent(i, result);
+      DataMovementEvent event = DataMovementEvent.create(i, result);
       events.add(event);
     }
     return events;

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index fa556e6..ed37ea9 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -101,8 +101,8 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
   }
 
   public static ProcessorDescriptor getProcDesc(UserPayload payload) {
-    return new ProcessorDescriptor(TestProcessor.class.getName()).setUserPayload(
-        payload == null ? new UserPayload (null) : payload);
+    return ProcessorDescriptor.create(TestProcessor.class.getName()).setUserPayload(
+        payload == null ? UserPayload.create(null) : payload);
   }
 
   void throwException(String msg) {

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 132626a..736c54b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -189,7 +189,7 @@ public class TestTezJobs {
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
     TezClient tezSession = null;
     try {
-      tezSession = new TezClient("IntersectExampleSession", tezConf);
+      tezSession = TezClient.create("IntersectExampleSession", tezConf);
       tezSession.start();
 
       IntersectDataGen dataGen = new IntersectDataGen();
@@ -391,7 +391,7 @@ public class TestTezJobs {
   @Test (timeout=60000)
   public void testVertexOrder() throws Exception {
     TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    TezClient tezClient = new TezClient("TestVertexOrder", tezConf);
+    TezClient tezClient = TezClient.create("TestVertexOrder", tezConf);
     tezClient.start();
 
     try {

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 34c7a7b..9157beb 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -320,36 +320,36 @@ public class MultiAttemptDAG {
 
   public static DAG createDAG(String name,
       Configuration conf) throws Exception {
-    UserPayload payload = new UserPayload(null);
+    UserPayload payload = UserPayload.create(null);
     int taskCount = MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT;
     if (conf != null) {
       taskCount = conf.getInt(MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS, MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT);
       payload = TezUtils.createUserPayloadFromConf(conf);
     }
     DAG dag = new DAG(name);
-    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v1 = Vertex.create("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = Vertex.create("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = Vertex.create("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
 
     // Make each vertex manager fail on appropriate attempt
-    v1.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
+    v1.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
         FailOnAttemptVertexManagerPlugin.class.getName())
-        .setUserPayload(new UserPayload(ByteBuffer.wrap(new String("1").getBytes()))));
-    v2.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap(new String("1").getBytes()))));
+    v2.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
         FailOnAttemptVertexManagerPlugin.class.getName())
-        .setUserPayload(new UserPayload(ByteBuffer.wrap(new String("2").getBytes()))));
-    v3.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap(new String("2").getBytes()))));
+    v3.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
         FailOnAttemptVertexManagerPlugin.class.getName())
-        .setUserPayload(new UserPayload(ByteBuffer.wrap(new String("3").getBytes()))));
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap(new String("3").getBytes()))));
     dag.addVertex(v1).addVertex(v2).addVertex(v3);
-    dag.addEdge(new Edge(v1, v2,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER,
+    dag.addEdge(Edge.create(v1, v2,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
             DataSourceType.PERSISTED,
             SchedulingType.SEQUENTIAL,
             TestOutput.getOutputDesc(payload),
             TestInput.getInputDesc(payload))));
-    dag.addEdge(new Edge(v2, v3,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER,
+    dag.addEdge(Edge.create(v2, v3,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
             DataSourceType.PERSISTED,
             SchedulingType.SEQUENTIAL,
             TestOutput.getOutputDesc(payload),

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
index 526e63b..e5c25ac 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
@@ -48,29 +48,29 @@ public class SimpleReverseVTestDAG {
   
   public static DAG createDAG(String name, 
       Configuration conf) throws Exception {
-    UserPayload payload = new UserPayload(null);
+    UserPayload payload = UserPayload.create(null);
     int taskCount = TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS_DEFAULT;
     if (conf != null) {
       taskCount = conf.getInt(TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS, TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS_DEFAULT);
       payload = TezUtils.createUserPayloadFromConf(conf);
     }
     DAG dag = new DAG(name);
-    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v1 = Vertex.create("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = Vertex.create("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = Vertex.create("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
     dag.addVertex(v1).addVertex(v2).addVertex(v3);
-    dag.addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            TestOutput.getOutputDesc(payload), 
+    dag.addEdge(Edge.create(v1, v2,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(payload),
+            TestInput.getInputDesc(payload))));
+    dag.addEdge(Edge.create(v1, v3,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(payload),
             TestInput.getInputDesc(payload))));
-    dag.addEdge(new Edge(v1, v3, 
-            new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-                DataSourceType.PERSISTED, 
-                SchedulingType.SEQUENTIAL, 
-                TestOutput.getOutputDesc(payload), 
-                TestInput.getInputDesc(payload))));
     return dag;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
index 2bb8971..fb85588 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
@@ -48,29 +48,29 @@ public class SimpleVTestDAG {
   
   public static DAG createDAG(String name, 
       Configuration conf) throws Exception {
-    UserPayload payload = new UserPayload(null);
+    UserPayload payload = UserPayload.create(null);
     int taskCount = TEZ_SIMPLE_V_DAG_NUM_TASKS_DEFAULT;
     if (conf != null) {
       taskCount = conf.getInt(TEZ_SIMPLE_V_DAG_NUM_TASKS, TEZ_SIMPLE_V_DAG_NUM_TASKS_DEFAULT);
       payload = TezUtils.createUserPayloadFromConf(conf);
     }
     DAG dag = new DAG(name);
-    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
-    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v1 = Vertex.create("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = Vertex.create("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = Vertex.create("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
     dag.addVertex(v1).addVertex(v2).addVertex(v3);
-    dag.addEdge(new Edge(v1, v3, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            TestOutput.getOutputDesc(payload), 
+    dag.addEdge(Edge.create(v1, v3,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(payload),
+            TestInput.getInputDesc(payload))));
+    dag.addEdge(Edge.create(v2, v3,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(payload),
             TestInput.getInputDesc(payload))));
-    dag.addEdge(new Edge(v2, v3, 
-            new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-                DataSourceType.PERSISTED, 
-                SchedulingType.SEQUENTIAL, 
-                TestOutput.getOutputDesc(payload), 
-                TestInput.getInputDesc(payload))));
     return dag;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
index f6a9d88..e2f7919 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
@@ -59,20 +59,20 @@ public class SixLevelsFailingDAG extends ThreeLevelsFailingDAG {
     
     protected static void addDAGVerticesAndEdges() {
         ThreeLevelsFailingDAG.addDAGVerticesAndEdges();
-        l4v1 = new Vertex("l4v1", TestProcessor.getProcDesc(payload), 10, defaultResource);
+        l4v1 = Vertex.create("l4v1", TestProcessor.getProcDesc(payload), 10, defaultResource);
         dag.addVertex(l4v1);
         addEdge(l3v1, l4v1, DataMovementType.SCATTER_GATHER);
         addEdge(l3v2, l4v1, DataMovementType.SCATTER_GATHER);
-        l5v1 = new Vertex("l5v1", TestProcessor.getProcDesc(payload), 2, defaultResource);
+        l5v1 = Vertex.create("l5v1", TestProcessor.getProcDesc(payload), 2, defaultResource);
         dag.addVertex(l5v1);
         addEdge(l4v1, l5v1, DataMovementType.SCATTER_GATHER);
-        l5v2 = new Vertex("l5v2", TestProcessor.getProcDesc(payload), 4, defaultResource);
+        l5v2 = Vertex.create("l5v2", TestProcessor.getProcDesc(payload), 4, defaultResource);
         dag.addVertex(l5v2);
         addEdge(l4v1, l5v2, DataMovementType.SCATTER_GATHER);
-        l5v3 = new Vertex("l5v3", TestProcessor.getProcDesc(payload), 1, defaultResource);
+        l5v3 = Vertex.create("l5v3", TestProcessor.getProcDesc(payload), 1, defaultResource);
         dag.addVertex(l5v3);
         addEdge(l4v1, l5v3, DataMovementType.SCATTER_GATHER);
-        l6v1 = new Vertex("l6v1", TestProcessor.getProcDesc(payload), 4, defaultResource);
+        l6v1 = Vertex.create("l6v1", TestProcessor.getProcDesc(payload), 4, defaultResource);
         dag.addVertex(l6v1);
         addEdge(l5v1, l6v1, DataMovementType.SCATTER_GATHER);
         addEdge(l5v2, l6v1, DataMovementType.SCATTER_GATHER);

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
index 97d2d3f..633c8e7 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
@@ -48,11 +48,11 @@ public class ThreeLevelsFailingDAG extends TwoLevelsFailingDAG {
     
     protected static void addDAGVerticesAndEdges() {
         TwoLevelsFailingDAG.addDAGVerticesAndEdges();
-        l3v1 = new Vertex("l3v1", TestProcessor.getProcDesc(payload), 4, defaultResource);
+        l3v1 = Vertex.create("l3v1", TestProcessor.getProcDesc(payload), 4, defaultResource);
         dag.addVertex(l3v1);
         addEdge(l2v1, l3v1, DataMovementType.SCATTER_GATHER);
         addEdge(l2v2, l3v1, DataMovementType.SCATTER_GATHER);
-        l3v2 = new Vertex("l3v2", TestProcessor.getProcDesc(payload), 4, defaultResource);
+        l3v2 = Vertex.create("l3v2", TestProcessor.getProcDesc(payload), 4, defaultResource);
         dag.addVertex(l3v2);
         addEdge(l2v2, l3v2, DataMovementType.BROADCAST);
         addEdge(l2v3, l3v2, DataMovementType.SCATTER_GATHER);

http://git-wip-us.apache.org/repos/asf/tez/blob/b30e2bcf/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
index 63850ec..df3bf97 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
@@ -50,7 +50,7 @@ import org.apache.tez.test.TestProcessor;
 public class TwoLevelsFailingDAG {
     static Resource defaultResource = Resource.newInstance(100, 0);
     protected static DAG dag;
-    protected static UserPayload payload = new UserPayload(null);
+    protected static UserPayload payload = UserPayload.create(null);
     protected static Vertex l1v1, l1v2, l1v3, l1v4;
     protected static Vertex l2v1, l2v2, l2v3, l2v4;
 
@@ -65,17 +65,17 @@ public class TwoLevelsFailingDAG {
     }
     
     protected static void addDAGVerticesAndEdges() {
-        l1v1 = new Vertex("l1v1", TestProcessor.getProcDesc(payload), 1, defaultResource);
-        l2v1 = new Vertex("l2v1", TestProcessor.getProcDesc(payload), 1, defaultResource);
+        l1v1 = Vertex.create("l1v1", TestProcessor.getProcDesc(payload), 1, defaultResource);
+        l2v1 = Vertex.create("l2v1", TestProcessor.getProcDesc(payload), 1, defaultResource);
         addVerticesAndEdgeInternal(l1v1, l2v1, DataMovementType.SCATTER_GATHER);
-        l1v2 = new Vertex("l1v2", TestProcessor.getProcDesc(payload), 2, defaultResource);
-        l2v2 = new Vertex("l2v2", TestProcessor.getProcDesc(payload), 3, defaultResource);
+        l1v2 = Vertex.create("l1v2", TestProcessor.getProcDesc(payload), 2, defaultResource);
+        l2v2 = Vertex.create("l2v2", TestProcessor.getProcDesc(payload), 3, defaultResource);
         addVerticesAndEdgeInternal(l1v2, l2v2, DataMovementType.SCATTER_GATHER);
-        l1v3 = new Vertex("l1v3", TestProcessor.getProcDesc(payload), 3, defaultResource);
-        l2v3 = new Vertex("l2v3", TestProcessor.getProcDesc(payload), 2, defaultResource);
+        l1v3 = Vertex.create("l1v3", TestProcessor.getProcDesc(payload), 3, defaultResource);
+        l2v3 = Vertex.create("l2v3", TestProcessor.getProcDesc(payload), 2, defaultResource);
         addVerticesAndEdgeInternal(l1v3, l2v3, DataMovementType.SCATTER_GATHER);
-        l1v4 = new Vertex("l1v4", TestProcessor.getProcDesc(payload), 2, defaultResource);
-        l2v4 = new Vertex("l2v4", TestProcessor.getProcDesc(payload), 3, defaultResource);
+        l1v4 = Vertex.create("l1v4", TestProcessor.getProcDesc(payload), 2, defaultResource);
+        l2v4 = Vertex.create("l2v4", TestProcessor.getProcDesc(payload), 3, defaultResource);
         addVerticesAndEdgeInternal(l1v4, l2v4, DataMovementType.BROADCAST);
     }
     
@@ -99,11 +99,11 @@ public class TwoLevelsFailingDAG {
      * @param dataMovementType Data movement type
      */
     protected static void addEdge(Vertex v1, Vertex v2, DataMovementType dataMovementType) {
-        dag.addEdge(new Edge(v1, v2, 
-            new EdgeProperty(dataMovementType, 
-                DataSourceType.PERSISTED, 
-                SchedulingType.SEQUENTIAL, 
-                TestOutput.getOutputDesc(payload), 
+        dag.addEdge(Edge.create(v1, v2,
+            EdgeProperty.create(dataMovementType,
+                DataSourceType.PERSISTED,
+                SchedulingType.SEQUENTIAL,
+                TestOutput.getOutputDesc(payload),
                 TestInput.getInputDesc(payload))));
     }
     


Mime
View raw message