tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/2] git commit: TEZ-645. Re-use ID instances in the AM, intern vertex names etc where possible. (sseth)
Date Wed, 27 Nov 2013 18:43:27 GMT
TEZ-645. Re-use ID instances in the AM, intern vertex names etc where
possible. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b3665f41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b3665f41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b3665f41

Branch: refs/heads/master
Commit: b3665f4118cfe888e90f54c1d5e08ba969dbf972
Parents: fc3a948
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Nov 27 10:43:04 2013 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Nov 27 10:43:04 2013 -0800

----------------------------------------------------------------------
 .../apache/tez/runtime/api/LogicalOutput.java   |  2 +-
 .../java/org/apache/tez/common/IDUtils.java     | 14 ++--
 .../org/apache/tez/dag/records/TezDAGID.java    | 69 +++++++++++++----
 .../java/org/apache/tez/dag/records/TezID.java  |  4 +
 .../tez/dag/records/TezTaskAttemptID.java       | 39 +++++++---
 .../org/apache/tez/dag/records/TezTaskID.java   | 40 ++++++++--
 .../org/apache/tez/dag/records/TezVertexID.java | 37 ++++++++--
 .../org/apache/tez/dag/records/TestTezIds.java  |  8 +-
 .../apache/tez/dag/app/ContainerContext.java    |  2 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  2 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  2 +-
 .../dag/impl/RootInputLeafOutputDescriptor.java |  2 +-
 .../app/dag/impl/RootInputVertexManager.java    |  2 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 16 ++--
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  2 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  3 +-
 .../tez/dag/app/rm/ContainerContextMatcher.java |  2 +-
 .../apache/tez/dag/app/rm/TaskScheduler.java    |  2 +-
 .../apache/tez/dag/utils/TezBuilderUtils.java   |  6 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 72 +++++++++---------
 .../tez/dag/app/dag/impl/TestDAGScheduler.java  |  8 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 48 ++++++------
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  6 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 78 ++++++++++----------
 .../dag/app/dag/impl/TestVertexScheduler.java   | 52 ++++++-------
 .../tez/dag/app/rm/TestContainerReuse.java      | 62 ++++++++--------
 .../dag/app/rm/container/TestAMContainer.java   | 24 +++---
 .../processor/FilterByWordInputProcessor.java   |  2 +-
 .../processor/FilterByWordOutputProcessor.java  |  2 +-
 .../hadoop/mapred/split/TezGroupedSplit.java    |  2 +-
 .../hadoop/mapreduce/split/TezGroupedSplit.java |  2 +-
 .../tez/mapreduce/hadoop/IDConverter.java       |  8 +-
 .../tez/mapreduce/hadoop/InputSplitInfo.java    |  2 +-
 .../tez/mapreduce/hadoop/MRTaskStatus.java      |  4 +-
 .../org/apache/tez/mapreduce/TezTestUtils.java  | 18 ++---
 .../tez/mapreduce/processor/MapUtils.java       |  1 -
 .../processor/reduce/TestReduceProcessor.java   |  1 -
 .../tez/runtime/api/impl/EventMetaData.java     | 12 +--
 .../apache/tez/runtime/api/impl/InputSpec.java  |  5 +-
 .../apache/tez/runtime/api/impl/OutputSpec.java |  5 +-
 .../apache/tez/runtime/api/impl/TaskSpec.java   | 18 ++---
 .../runtime/api/impl/TezHeartbeatRequest.java   |  3 +-
 .../broadcast/output/FileBasedKVWriter.java     |  2 +-
 .../library/common/sort/impl/TestIFile.java     |  2 +-
 .../output/TestOnFileUnorderedKVOutput.java     | 10 +--
 .../runtime/library/testutils/KVDataGen.java    |  2 +-
 46 files changed, 406 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
index 475eaef..a8d0ce3 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
@@ -33,4 +33,4 @@ public interface LogicalOutput extends Output {
    *          the number of physical outputs
    */
   public void setNumPhysicalOutputs(int numOutputs);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java b/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
index 1270e5a..d15b0d3 100644
--- a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
@@ -42,9 +42,9 @@ public class IDUtils {
         if(parts[0].equals(TezTaskID.TASK)) {
           ApplicationId appId = ApplicationId.newInstance(
               Long.valueOf(parts[1]), Integer.parseInt(parts[2]));
-          TezDAGID dagId = new TezDAGID(appId, Integer.parseInt(parts[3]));
-          TezVertexID vId = new TezVertexID(dagId, Integer.parseInt(parts[4]));
-          return new TezTaskID(vId, Integer.parseInt(parts[5]));
+          TezDAGID dagId = TezDAGID.getInstance(appId, Integer.parseInt(parts[3]));
+          TezVertexID vId = TezVertexID.getInstance(dagId, Integer.parseInt(parts[4]));
+          return TezTaskID.getInstance(vId, Integer.parseInt(parts[5]));
         } else
           exceptionMsg = "Bad TaskType identifier. TaskId string : " + str
           + " is not properly formed.";
@@ -72,10 +72,10 @@ public class IDUtils {
         if(parts[0].equals(TezTaskAttemptID.ATTEMPT)) {
           ApplicationId appId = ApplicationId.newInstance(
               Long.valueOf(parts[1]), Integer.parseInt(parts[2]));
-          TezDAGID dagId = new TezDAGID(appId, Integer.parseInt(parts[3]));
-          TezVertexID vId = new TezVertexID(dagId, Integer.parseInt(parts[4]));
-          TezTaskID tId = new TezTaskID(vId, Integer.parseInt(parts[5]));
-          return new TezTaskAttemptID(tId, Integer.parseInt(parts[6]));
+          TezDAGID dagId = TezDAGID.getInstance(appId, Integer.parseInt(parts[3]));
+          TezVertexID vId = TezVertexID.getInstance(dagId, Integer.parseInt(parts[4]));
+          TezTaskID tId = TezTaskID.getInstance(vId, Integer.parseInt(parts[5]));
+          return TezTaskAttemptID.getInstance(tId, Integer.parseInt(parts[6]));
         } else
           exceptionMsg = "Bad TaskType identifier. TaskAttemptId string : "
               + str + " is not properly formed.";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
index 227207a..54ca8b0 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
@@ -25,6 +25,11 @@ import java.text.NumberFormat;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 /**
  * TezDAGID represents the immutable and unique identifier for
  * a Tez DAG.
@@ -37,31 +42,56 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
  */
 public class TezDAGID extends TezID {
 
+  private static LoadingCache<TezDAGID, TezDAGID> dagIdCache = CacheBuilder.newBuilder().softValues().
+      build(
+          new CacheLoader<TezDAGID, TezDAGID>() {
+            @Override
+            public TezDAGID load(TezDAGID key) throws Exception {
+              return key;
+            }
+          }
+      );
+  
   private ApplicationId applicationId;
 
-  public TezDAGID() {
-  }
-
   /**
-   * Constructs a DAGID object from given {@link ApplicationId}.
+   * Get a DAGID object from given {@link ApplicationId}.
    * @param applicationId Application that this dag belongs to
    * @param id the dag number
    */
-  public TezDAGID(ApplicationId applicationId, int id) {
-    super(id);
-    if(applicationId == null) {
-      throw new IllegalArgumentException("applicationId cannot be null");
-    }
-    this.applicationId = applicationId;
+  public static TezDAGID getInstance(ApplicationId applicationId, int id) {
+    // The newly created TezDAGIds are primarily for their hashCode method, and
+    // will be short-lived.
+    // Alternately the cache can be keyed by the hash of the incoming parameters.
+    Preconditions.checkArgument(applicationId != null, "ApplicationID cannot be null");
+    return dagIdCache.getUnchecked(new TezDAGID(applicationId, id));
   }
-
+  
   /**
-   * Constructs a DAGID object from given parts.
+   * Get a DAGID object from given parts.
    * @param yarnRMIdentifier YARN RM identifier
    * @param applicationId application number
    * @param id the dag number
    */
-  public TezDAGID(String yarnRMIdentifier, int appId, int id) {
+  public static TezDAGID getInstance(String yarnRMIdentifier, int appId, int id) {
+    // The newly created TezDAGIds are primarily for their hashCode method, and
+    // will be short-lived.
+    // Alternately the cache can be keyed by the hash of the incoming parameters.
+    Preconditions.checkArgument(yarnRMIdentifier != null, "yarnRMIdentifier cannot be null");
+    return dagIdCache.getUnchecked(new TezDAGID(yarnRMIdentifier, appId, id));
+  }
+  
+  // Public for Writable serialization. Verify if this is actually required.
+  public TezDAGID() {
+  }
+
+  private TezDAGID(ApplicationId applicationId, int id) {
+    super(id);
+    this.applicationId = applicationId;
+  }
+
+  
+  private TezDAGID(String yarnRMIdentifier, int appId, int id) {
     this(ApplicationId.newInstance(Long.valueOf(yarnRMIdentifier),
         appId), id);
   }
@@ -89,11 +119,22 @@ public class TezDAGID extends TezID {
 
 
   @Override
+  // Can't do much about this instance if used via the RPC layer. Any downstream
+  // users can however avoid using this method.
   public void readFields(DataInput in) throws IOException {
+    // ApplicationId could be cached in this case. All of this will change for Protobuf RPC.
     applicationId = ApplicationId.newInstance(in.readLong(), in.readInt());
     super.readFields(in);
   }
 
+  public static TezDAGID readTezDAGID(DataInput in) throws IOException {
+    long clusterId = in.readLong();
+    int appId = in.readInt();
+    int dagIdInt = TezID.readID(in);
+    TezDAGID dagID = getInstance(ApplicationId.newInstance(clusterId, appId), dagIdInt);
+    return dagID;
+  }
+  
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeLong(applicationId.getClusterTimestamp());
@@ -135,7 +176,7 @@ public class TezDAGID extends TezID {
       int appId = tezAppIdFormat.get().parse(split[2]).intValue();
       int id;
       id = tezDagIdFormat.get().parse(split[3]).intValue();
-      return new TezDAGID(rmId, appId, id);
+      return TezDAGID.getInstance(rmId, appId, id);
     } catch (Exception e) {
       e.printStackTrace();
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
index 7fe5d8c..4cb37ac 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java
@@ -88,6 +88,10 @@ public abstract class TezID implements WritableComparable<TezID> {
   public void readFields(DataInput in) throws IOException {
     this.id = in.readInt();
   }
+  
+  public static int readID(DataInput in) throws IOException {
+    return in.readInt();
+  }
 
   @Override
   public void write(DataOutput out) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
index 4b2f424..022d6a2 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java
@@ -26,6 +26,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.tez.dag.records.TezTaskID;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 /**
  * TezTaskAttemptID represents the immutable and unique identifier for
  * a task attempt. Each task attempt is one particular instance of a Tez Task
@@ -47,8 +51,18 @@ public class TezTaskAttemptID extends TezID {
   public static final String ATTEMPT = "attempt";
   private TezTaskID taskId;
   
+  private static LoadingCache<TezTaskAttemptID, TezTaskAttemptID> taskAttemptIDCache = CacheBuilder.newBuilder().softValues().
+      build(
+          new CacheLoader<TezTaskAttemptID, TezTaskAttemptID>() {
+            @Override
+            public TezTaskAttemptID load(TezTaskAttemptID key) throws Exception {
+              return key;
+            }
+          }
+      );
+  
+  // Public for Writable serialization. Verify if this is actually required.
   public TezTaskAttemptID() {
-    taskId = new TezTaskID();
   }
   
   /**
@@ -56,7 +70,11 @@ public class TezTaskAttemptID extends TezID {
    * @param taskId TaskID that this task belongs to  
    * @param id the task attempt number
    */
-  public TezTaskAttemptID(TezTaskID taskId, int id) {
+  public static TezTaskAttemptID getInstance(TezTaskID taskID, int id) {
+    return taskAttemptIDCache.getUnchecked(new TezTaskAttemptID(taskID, id));
+  }
+
+  private TezTaskAttemptID(TezTaskID taskId, int id) {
     super(id);
     if(taskId == null) {
       throw new IllegalArgumentException("taskId cannot be null");
@@ -108,21 +126,22 @@ public class TezTaskAttemptID extends TezID {
   }
   
   @Override
+  // Can't do much about this instance if used via the RPC layer. Any downstream
+  // users can however avoid using this method.
   public void readFields(DataInput in) throws IOException {
-    taskId.readFields(in);
+    taskId = TezTaskID.readTezTaskID(in);
     super.readFields(in);
   }
+  
+  public static TezTaskAttemptID readTezTaskAttemptID(DataInput in) throws IOException {
+    TezTaskID taskID = TezTaskID.readTezTaskID(in);
+    int attemptIdInt = TezID.readID(in);
+    return getInstance(taskID, attemptIdInt);
+  }
 
   @Override
   public void write(DataOutput out) throws IOException {
     taskId.write(out);
     super.write(out);
   }
-
-  // FIXME TEZ DAG needs to be removed
-  public static TezTaskAttemptID read(DataInput in) throws IOException {
-    TezTaskAttemptID tId = new TezTaskAttemptID();
-    tId.readFields(in);
-    return tId;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
index 6b2fc2e..1223ec1 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
@@ -27,6 +27,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.tez.dag.records.TezVertexID;
 
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 
 /**
  * TaskID represents the immutable and unique identifier for
@@ -51,10 +56,20 @@ public class TezTaskID extends TezID {
     }
   };
 
+  private static LoadingCache<TezTaskID, TezTaskID> taskIDCache = CacheBuilder.newBuilder().softValues().
+      build(
+          new CacheLoader<TezTaskID, TezTaskID>() {
+            @Override
+            public TezTaskID load(TezTaskID key) throws Exception {
+              return key;
+            }
+          }
+      );
+  
   private TezVertexID vertexId;
 
+  // Public for Writable serialization. Verify if this is actually required.
   public TezTaskID() {
-    vertexId = new TezVertexID();
   }
 
   /**
@@ -63,12 +78,15 @@ public class TezTaskID extends TezID {
    * @param type the {@link TezTaskType} of the task
    * @param id the tip number
    */
-  public TezTaskID(TezVertexID vertexId, int id) {
+  public static TezTaskID getInstance(TezVertexID vertexID, int id) {
+    Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");
+    return taskIDCache.getUnchecked(new TezTaskID(vertexID, id));
+  }
+
+  private TezTaskID(TezVertexID vertexID, int id) {
     super(id);
-    if(vertexId == null) {
-      throw new IllegalArgumentException("vertexId cannot be null");
-    }
-    this.vertexId = vertexId;
+    Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");
+    this.vertexId = vertexID;
   }
 
   /** Returns the {@link TezVertexID} object that this task belongs to */
@@ -117,10 +135,18 @@ public class TezTaskID extends TezID {
   }
 
   @Override
+  // Can't do much about this instance if used via the RPC layer. Any downstream
+  // users can however avoid using this method.
   public void readFields(DataInput in) throws IOException {
-    vertexId.readFields(in);
+    vertexId = TezVertexID.readTezVertexID(in);
     super.readFields(in);
   }
+  
+  public static TezTaskID readTezTaskID(DataInput in) throws IOException {
+    TezVertexID vertexID = TezVertexID.readTezVertexID(in);
+    int taskIdInt = TezID.readID(in);
+    return getInstance(vertexID, taskIdInt);
+  }
 
   @Override
   public void write(DataOutput out) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
index e24f35e..66d022c 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
@@ -26,6 +26,11 @@ import java.text.NumberFormat;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 /**
  * TezVertexID represents the immutable and unique identifier for
  * a Vertex in a Tez DAG. Each TezVertexID encompasses multiple Tez Tasks.
@@ -52,8 +57,19 @@ public class TezVertexID extends TezID {
     }
   };
 
+  private static LoadingCache<TezVertexID, TezVertexID> vertexIDCache = CacheBuilder.newBuilder().softValues().
+      build(
+          new CacheLoader<TezVertexID, TezVertexID>() {
+            @Override
+            public TezVertexID load(TezVertexID key) throws Exception {
+              return key;
+            }
+          }
+      );
+  
   private TezDAGID dagId;
 
+  // Public for Writable serialization. Verify if this is actually required.
   public TezVertexID() {
   }
 
@@ -63,11 +79,13 @@ public class TezVertexID extends TezID {
    * @param type the {@link TezTaskType} of the task
    * @param id the tip number
    */
-  public TezVertexID(TezDAGID dagId, int id) {
+  public static TezVertexID getInstance(TezDAGID dagId, int id) {
+    Preconditions.checkArgument(dagId != null, "DagID cannot be null");
+    return vertexIDCache.getUnchecked(new TezVertexID(dagId, id));
+  }
+
+  private TezVertexID(TezDAGID dagId, int id) {
     super(id);
-    if(dagId == null) {
-      throw new IllegalArgumentException("dagId cannot be null");
-    }
     this.dagId = dagId;
   }
 
@@ -98,11 +116,18 @@ public class TezVertexID extends TezID {
   }
 
   @Override
+  // Can't do much about this instance if used via the RPC layer. Any downstream
+  // users can however avoid using this method.
   public void readFields(DataInput in) throws IOException {
-    dagId = new TezDAGID();
-    dagId.readFields(in);
+    dagId = TezDAGID.readTezDAGID(in);
     super.readFields(in);
   }
+  
+  public static TezVertexID readTezVertexID(DataInput in) throws IOException {
+    TezDAGID dagID = TezDAGID.readTezDAGID(in);
+    int vertexIdInt = TezID.readID(in);
+    return getInstance(dagID, vertexIdInt);
+  }
 
   @Override
   public void write(DataOutput out) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-common/src/test/java/org/apache/tez/dag/records/TestTezIds.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/dag/records/TestTezIds.java b/tez-common/src/test/java/org/apache/tez/dag/records/TestTezIds.java
index a3d838f..dcf1309 100644
--- a/tez-common/src/test/java/org/apache/tez/dag/records/TestTezIds.java
+++ b/tez-common/src/test/java/org/apache/tez/dag/records/TestTezIds.java
@@ -82,10 +82,10 @@ public class TestTezIds {
   @Test
   public void testIdStringify() {
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
-    TezDAGID dagId = new TezDAGID(appId, 1);
-    TezVertexID vId = new TezVertexID(dagId, 35);
-    TezTaskID tId = new TezTaskID(vId, 389);
-    TezTaskAttemptID taId = new TezTaskAttemptID(tId, 2);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+    TezTaskID tId = TezTaskID.getInstance(vId, 389);
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, 2);
 
     String dagIdStr = dagId.toString();
     String vIdStr = vId.toString();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
index a4ce701..7d822b7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
@@ -148,4 +148,4 @@ public class ContainerContext {
     }
     return true;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e5ade3d..b0c770e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -474,7 +474,7 @@ public class DAGAppMaster extends AbstractService {
 
   /** Create and initialize (but don't start) a single dag. */
   protected DAG createDAG(DAGPlan dagPB) {
-    TezDAGID dagId = new TezDAGID(appAttemptID.getApplicationId(),
+    TezDAGID dagId = TezDAGID.getInstance(appAttemptID.getApplicationId(),
         dagCounter.incrementAndGet());
 
     // Prepare the TaskAttemptListener server for authentication of Containers

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 0f0f4c7..0e480d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -161,7 +161,7 @@ public class Edge {
         }
         TezTaskID srcTaskId = srcTask.getTaskId();
         int taskAttemptIndex = event.getVersion();
-        TezTaskAttemptID srcTaskAttemptId = new TezTaskAttemptID(srcTaskId,
+        TezTaskAttemptID srcTaskAttemptId = TezTaskAttemptID.getInstance(srcTaskId,
             taskAttemptIndex);
         eventHandler.handle(new TaskAttemptEventOutputFailed(srcTaskAttemptId,
             tezEvent, numConsumers));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
index d2d2c13..98872a7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputLeafOutputDescriptor.java
@@ -43,4 +43,4 @@ public class RootInputLeafOutputDescriptor<T extends TezEntityDescriptor> {
     return this.initializerClassName;
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index 0d1aabb..6ffe581 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -116,7 +116,7 @@ public class RootInputVertexManager implements VertexScheduler {
         Preconditions.checkState(managedVertex.getTasks().size() != 0);
         TezEvent tezEvent = new TezEvent(event, sourceInfo);
         tezEvent.setDestinationInfo(destInfoMap.get(inputName));
-        sendEventToTask(new TezTaskID(managedVertex.getVertexId(),
+        sendEventToTask(TezTaskID.getInstance(managedVertex.getVertexId(),
             ((RootInputDataInformationEvent) event).getIndex()), tezEvent);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 3e21b9d..d6e9d0b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -59,7 +60,6 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -284,10 +284,8 @@ public class TaskAttemptImpl implements TaskAttempt,
   TaskSpec createRemoteTaskSpec() {
     Vertex vertex = getVertex();
     ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
-    DAG dag = vertex.getDAG();
     int taskId = getTaskID().getId();
-    return new TaskSpec(getID(), dag.getUserName(),
-        vertex.getName(), procDesc, vertex.getInputSpecList(taskId),
+    return new TaskSpec(getID(), vertex.getName(), procDesc, vertex.getInputSpecList(taskId),
         vertex.getOutputSpecList(taskId));
   }
 
@@ -307,7 +305,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       //result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
       result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
       //result.setPhase(reportedStatus.phase);
-      //result.setStateString(reportedStatus.stateString);
+      //result.setStateString(reportedStatus.statef);
       result.setCounters(getCounters());
       result.setContainerId(this.getAssignedContainerID());
       result.setNodeManagerHost(trackerName);
@@ -920,16 +918,16 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       ta.containerId = event.getContainerId();
       ta.containerNodeId = container.getNodeId();
-      ta.nodeHttpAddress = container.getNodeHttpAddress();
-      ta.nodeRackName = RackResolver.resolve(ta.containerNodeId.getHost())
-          .getNetworkLocation();
+      ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
+      ta.nodeRackName = StringInterner.weakIntern(RackResolver.resolve(ta.containerNodeId.getHost())
+          .getNetworkLocation());
 
       ta.launchTime = ta.clock.getTime();
 
       // TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr = NetUtils
           .createSocketAddr(ta.nodeHttpAddress); // TODO: Costly?
-      ta.trackerName = nodeHttpInetAddr.getHostName();
+      ta.trackerName = StringInterner.weakIntern(nodeHttpInetAddr.getHostName());
       ta.httpPort = nodeHttpInetAddr.getPort();
       ta.sendEvent(createJobCounterUpdateEventTALaunched(ta));
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 76c36c5..30e6e2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -310,7 +310,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     // TODO Avoid reading this from configuration for each task.
     maxAttempts = this.conf.getInt(TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS,
                               TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS_DEFAULT);
-    taskId = new TezTaskID(vertexId, taskIndex);
+    taskId = TezTaskID.getInstance(vertexId, taskIndex);
     this.taskAttemptListener = taskAttemptListener;
     this.taskHeartbeatHandler = thh;
     this.eventHandler = eventHandler;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 179ed1d..f068617 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.MRVertexOutputCommitter;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -479,7 +480,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       AppContext appContext, VertexLocationHint vertexLocationHint) {
     this.vertexId = vertexId;
     this.vertexPlan = vertexPlan;
-    this.vertexName = vertexName;
+    this.vertexName = StringInterner.weakIntern(vertexName);
     this.conf = conf;
     //this.metrics = metrics;
     this.clock = clock;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerContextMatcher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerContextMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerContextMatcher.java
index 4e98dcc..aa689e3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerContextMatcher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerContextMatcher.java
@@ -53,4 +53,4 @@ public class ContainerContextMatcher implements ContainerSignatureMatcher {
 
     return context1.isExactMatch(context2);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 67966b1..ab038fd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -1699,4 +1699,4 @@ public class TaskScheduler extends AbstractService
           + (firstContainerSignature != null? firstContainerSignature.toString():"null");
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java
index a734bb8..b7e6f72 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java
@@ -30,11 +30,11 @@ import org.apache.tez.dag.records.TezVertexID;
 public class TezBuilderUtils {
 
   public static TezVertexID newVertexID(TezDAGID dagId, int vertexId) {
-    return new TezVertexID(dagId, vertexId);
+    return TezVertexID.getInstance(dagId, vertexId);
   }
 
   public static TezTaskAttemptID newTaskAttemptId(TezTaskID taskId, int id) {
-    return new TezTaskAttemptID(taskId, id);
+    return TezTaskAttemptID.getInstance(taskId, id);
   }
   
   public static DAGReport newDAGReport() {
@@ -48,7 +48,7 @@ public class TezBuilderUtils {
   }
 
   public static TezTaskID newTaskId(TezDAGID dagId, int vertexId, int taskId) {
-    return new TezTaskID(newVertexID(dagId, vertexId), taskId);
+    return TezTaskID.getInstance(newVertexID(dagId, vertexId), taskId);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index b0ad929..49a3307 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -490,7 +490,7 @@ public class TestDAGImpl {
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     appAttemptId = ApplicationAttemptId.newInstance(
         ApplicationId.newInstance(100, 1), 1);
-    dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
+    dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
     Assert.assertNotNull(dagId);
     dagPlan = createTestDAGPlan();
     dispatcher = new DrainDispatcher();
@@ -504,7 +504,7 @@ public class TestDAGImpl {
         jobTokenSecretManager, fsTokens, clock, "user", thh, appContext);
     doReturn(dag).when(appContext).getCurrentDAG();
     mrrAppContext = mock(AppContext.class);
-    mrrDagId = new TezDAGID(appAttemptId.getApplicationId(), 2);
+    mrrDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 2);
     mrrDagPlan = createTestMRRDAGPlan();
     mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan,
         dispatcher.getEventHandler(),  taskAttemptListener,
@@ -562,7 +562,7 @@ public class TestDAGImpl {
     dispatcher.await();
 
     for (int i = 0 ; i < 6; ++i ) {
-      TezVertexID vId = new TezVertexID(dagId, i);
+      TezVertexID vId = TezVertexID.getInstance(dagId, i);
       Vertex v = dag.getVertex(vId);
       Assert.assertEquals(VertexState.RUNNING, v.getState());
       if (i < 2) {
@@ -577,7 +577,7 @@ public class TestDAGImpl {
     }
 
     for (int i = 0 ; i < 6; ++i ) {
-      TezVertexID vId = new TezVertexID(dagId, i);
+      TezVertexID vId = TezVertexID.getInstance(dagId, i);
       LOG.info("Distance from root: v" + i + ":"
           + dag.getVertex(vId).getDistanceFromRoot());
     }
@@ -590,12 +590,12 @@ public class TestDAGImpl {
     startDAG(dag);
     dispatcher.await();
 
-    TezVertexID vId = new TezVertexID(dagId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 1);
     Vertex v = dag.getVertex(vId);
     dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
-        new TezTaskID(vId, 0), TaskState.SUCCEEDED));
+        TezTaskID.getInstance(vId, 0), TaskState.SUCCEEDED));
     dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
-        new TezTaskID(vId, 1), TaskState.SUCCEEDED));
+        TezTaskID.getInstance(vId, 1), TaskState.SUCCEEDED));
     dispatcher.await();
 
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
@@ -610,12 +610,12 @@ public class TestDAGImpl {
     startDAG(dag);
     dispatcher.await();
 
-    TezVertexID vId = new TezVertexID(dagId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 1);
     Vertex v = dag.getVertex(vId);
     dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
-        new TezTaskID(vId, 0), TaskState.SUCCEEDED));
+        TezTaskID.getInstance(vId, 0), TaskState.SUCCEEDED));
     dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
-        new TezTaskID(vId, 1), TaskState.SUCCEEDED));
+        TezTaskID.getInstance(vId, 1), TaskState.SUCCEEDED));
     dispatcher.await();
 
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
@@ -624,14 +624,14 @@ public class TestDAGImpl {
     verify(dag.dagScheduler, times(1)).vertexCompleted(v);
     
     dispatcher.getEventHandler().handle(
-        new VertexEventTaskReschedule(new TezTaskID(vId, 0)));
+        new VertexEventTaskReschedule(TezTaskID.getInstance(vId, 0)));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
     Assert.assertEquals(0, dag.getSuccessfulVertices());
     Assert.assertEquals(0, dag.numCompletedVertices);
     
     dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
-        new TezTaskID(vId, 0), TaskState.SUCCEEDED));
+        TezTaskID.getInstance(vId, 0), TaskState.SUCCEEDED));
           dispatcher.await();
 
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
@@ -655,7 +655,7 @@ public class TestDAGImpl {
 
     Assert.assertEquals(DAGState.KILLED, dag.getState());
     for (int i = 0 ; i < 6; ++i ) {
-      TezVertexID vId = new TezVertexID(dagId, i);
+      TezVertexID vId = TezVertexID.getInstance(dagId, i);
       Vertex v = dag.getVertex(vId);
       Assert.assertEquals(VertexState.KILLED, v.getState());
     }
@@ -669,14 +669,14 @@ public class TestDAGImpl {
     startDAG(dag);
     dispatcher.await();
 
-    TezVertexID vId1 = new TezVertexID(dagId, 1);
+    TezVertexID vId1 = TezVertexID.getInstance(dagId, 1);
     Vertex v1 = dag.getVertex(vId1);
     ((EventHandler<VertexEvent>) v1).handle(new VertexEventTaskCompleted(
-        new TezTaskID(vId1, 0), TaskState.SUCCEEDED));
-    TezVertexID vId0 = new TezVertexID(dagId, 0);
+        TezTaskID.getInstance(vId1, 0), TaskState.SUCCEEDED));
+    TezVertexID vId0 = TezVertexID.getInstance(dagId, 0);
     Vertex v0 = dag.getVertex(vId0);
     ((EventHandler<VertexEvent>) v0).handle(new VertexEventTaskCompleted(
-        new TezTaskID(vId0, 0), TaskState.SUCCEEDED));
+        TezTaskID.getInstance(vId0, 0), TaskState.SUCCEEDED));
     dispatcher.await();
 
     Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
@@ -689,7 +689,7 @@ public class TestDAGImpl {
     Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
     Assert.assertEquals(VertexState.TERMINATING, v1.getState());
     for (int i = 2 ; i < 6; ++i ) {
-      TezVertexID vId = new TezVertexID(dagId, i);
+      TezVertexID vId = TezVertexID.getInstance(dagId, i);
       Vertex v = dag.getVertex(vId);
       Assert.assertEquals(VertexState.KILLED, v.getState());
     }
@@ -715,22 +715,22 @@ public class TestDAGImpl {
 
     for (int i = 0; i < 6; ++i) {
       dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-          new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+          TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
     }
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dag.getState());
     Assert.assertEquals(1, dag.getSuccessfulVertices());
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+        TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 2), VertexState.SUCCEEDED));
+        TezVertexID.getInstance(dagId, 2), VertexState.SUCCEEDED));
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 3), VertexState.SUCCEEDED));
+        TezVertexID.getInstance(dagId, 3), VertexState.SUCCEEDED));
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 4), VertexState.SUCCEEDED));
+        TezVertexID.getInstance(dagId, 4), VertexState.SUCCEEDED));
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 5), VertexState.SUCCEEDED));
+        TezVertexID.getInstance(dagId, 5), VertexState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(DAGState.SUCCEEDED, dag.getState());
     Assert.assertEquals(6, dag.getSuccessfulVertices());
@@ -744,21 +744,21 @@ public class TestDAGImpl {
     dispatcher.await();
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+        TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dag.getState());
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+        TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 2), VertexState.FAILED));
+        TezVertexID.getInstance(dagId, 2), VertexState.FAILED));
     dispatcher.await();
     Assert.assertEquals(DAGState.FAILED, dag.getState());
     Assert.assertEquals(2, dag.getSuccessfulVertices());
 
     // Expect running vertices to be killed on first failure
     for (int i = 3; i < 6; ++i) {
-      TezVertexID vId = new TezVertexID(dagId, i);
+      TezVertexID vId = TezVertexID.getInstance(dagId, i);
       Vertex v = dag.getVertex(vId);
       Assert.assertEquals(VertexState.KILLED, v.getState());
     }
@@ -779,16 +779,16 @@ public class TestDAGImpl {
     dispatcher.await();
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+        TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dag.getState());
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+        TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
     dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
     for (int i = 2; i < 6; ++i) {
       dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-          new TezVertexID(dagId, i), VertexState.SUCCEEDED));
+          TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
     }
     dispatcher.await();
     Assert.assertEquals(DAGState.KILLED, dag.getState());
@@ -810,27 +810,27 @@ public class TestDAGImpl {
     dispatcher.await();
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 0), VertexState.SUCCEEDED));
+        TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, dag.getState());
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 1), VertexState.SUCCEEDED));
+        TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
     dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
 
     for (int i = 2; i < 5; ++i) {
       dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-          new TezVertexID(dagId, i), VertexState.SUCCEEDED));
+          TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
     }
     dispatcher.await();
     Assert.assertEquals(DAGState.KILLED, dag.getState());
 
     dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
-        new TezVertexID(dagId, 5), VertexState.KILLED));
+        TezVertexID.getInstance(dagId, 5), VertexState.KILLED));
     dispatcher.await();
     Assert.assertEquals(DAGState.KILLED, dag.getState());
     Assert.assertEquals(5, dag.getSuccessfulVertices());
-    Assert.assertEquals(dag.getVertex(new TezVertexID(dagId, 5)).getTerminationCause(), VertexTerminationCause.DAG_KILL);
+    Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(), VertexTerminationCause.DAG_KILL);
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index 8a67977..391dc0c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -78,13 +78,13 @@ public class TestDAGScheduler {
   @Test(timeout=10000)
   public void testDAGSchedulerMRR() {
     DAG mockDag = mock(DAG.class);
-    TezDAGID dagId = new TezDAGID("1", 1, 1);
+    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
     
     TaskSchedulerEventHandler mockTaskScheduler = 
         mock(TaskSchedulerEventHandler.class);
 
     Vertex mockVertex1 = mock(Vertex.class);
-    TezVertexID mockVertexId1 = new TezVertexID(dagId, 1);
+    TezVertexID mockVertexId1 = TezVertexID.getInstance(dagId, 1);
     when(mockVertex1.getVertexId()).thenReturn(mockVertexId1);
     when(mockVertex1.getDistanceFromRoot()).thenReturn(0);
     TaskAttempt mockAttempt1 = mock(TaskAttempt.class);
@@ -93,7 +93,7 @@ public class TestDAGScheduler {
     when(mockDag.getVertex(mockVertexId1)).thenReturn(mockVertex1);
     
     Vertex mockVertex2 = mock(Vertex.class);
-    TezVertexID mockVertexId2 = new TezVertexID(dagId, 2);
+    TezVertexID mockVertexId2 = TezVertexID.getInstance(dagId, 2);
     when(mockVertex2.getVertexId()).thenReturn(mockVertexId2);
     when(mockVertex2.getDistanceFromRoot()).thenReturn(1);
     TaskAttempt mockAttempt2 = mock(TaskAttempt.class);
@@ -105,7 +105,7 @@ public class TestDAGScheduler {
     when(mockAttempt2f.getIsRescheduled()).thenReturn(true);
     
     Vertex mockVertex3 = mock(Vertex.class);
-    TezVertexID mockVertexId3 = new TezVertexID(dagId, 3);
+    TezVertexID mockVertexId3 = TezVertexID.getInstance(dagId, 3);
     when(mockVertex3.getVertexId()).thenReturn(mockVertexId3);
     when(mockVertex3.getDistanceFromRoot()).thenReturn(2);
     TaskAttempt mockAttempt3 = mock(TaskAttempt.class);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 6b8f544..dcf9aef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -129,8 +129,8 @@ public class TestTaskAttempt {
     hosts.add("host3");
     TaskLocationHint locationHint = new TaskLocationHint(hosts, null);
 
-    TezTaskID taskID = new TezTaskID(
-        new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
+    TezTaskID taskID = TezTaskID.getInstance(
+        TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
@@ -172,8 +172,8 @@ public class TestTaskAttempt {
     TaskLocationHint locationHint = new TaskLocationHint(
         new TreeSet<String>(Arrays.asList(hosts)), null);
 
-    TezTaskID taskID = new TezTaskID(
-        new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
+    TezTaskID taskID = TezTaskID.getInstance(
+        TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         mock(TaskAttemptListener.class), new Configuration(),
         new SystemClock(), mock(TaskHeartbeatHandler.class),
@@ -300,10 +300,10 @@ public class TestTaskAttempt {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
         appId, 0);
-    TezDAGID dagID = new TezDAGID(appId, 1);
-    TezVertexID vertexID = new TezVertexID(dagID, 1);
-    TezTaskID taskID = new TezTaskID(vertexID, 1);
-    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
 
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
@@ -350,10 +350,10 @@ public class TestTaskAttempt {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
         appId, 0);
-    TezDAGID dagID = new TezDAGID(appId, 1);
-    TezVertexID vertexID = new TezVertexID(dagID, 1);
-    TezTaskID taskID = new TezTaskID(vertexID, 1);
-    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
@@ -441,10 +441,10 @@ public class TestTaskAttempt {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
         appId, 0);
-    TezDAGID dagID = new TezDAGID(appId, 1);
-    TezVertexID vertexID = new TezVertexID(dagID, 1);
-    TezTaskID taskID = new TezTaskID(vertexID, 1);
-    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
 
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
@@ -503,10 +503,10 @@ public class TestTaskAttempt {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
         appId, 0);
-    TezDAGID dagID = new TezDAGID(appId, 1);
-    TezVertexID vertexID = new TezVertexID(dagID, 1);
-    TezTaskID taskID = new TezTaskID(vertexID, 1);
-    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
@@ -591,10 +591,10 @@ public class TestTaskAttempt {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
         appId, 0);
-    TezDAGID dagID = new TezDAGID(appId, 1);
-    TezVertexID vertexID = new TezVertexID(dagID, 1);
-    TezTaskID taskID = new TezTaskID(vertexID, 1);
-    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
 
     MockEventHandler mockEh = new MockEventHandler();
     MockEventHandler eventHandler = spy(mockEh);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 5bdc7d7..4cdc2fd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -115,8 +115,8 @@ public class TestTaskImpl {
     locationHint = new TaskLocationHint(null, null);
 
     appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    dagId = new TezDAGID(appId, 1);
-    vertexId = new TezVertexID(dagId, 1);
+    dagId = TezDAGID.getInstance(appId, 1);
+    vertexId = TezVertexID.getInstance(dagId, 1);
     appContext = mock(AppContext.class);
     taskResource = Resource.newInstance(1024, 1);
     localResources = new HashMap<String, LocalResource>();
@@ -134,7 +134,7 @@ public class TestTaskImpl {
   }
 
   private TezTaskID getNewTaskID() {
-    TezTaskID taskID = new TezTaskID(vertexId, ++taskCounter);
+    TezTaskID taskID = TezTaskID.getInstance(vertexId, ++taskCounter);
     return taskID;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index abb0d53..0b8fde0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -689,7 +689,7 @@ public class TestVertexImpl {
     for (int i = 0; i < vCnt; ++i) {
       VertexPlan vPlan = dagPlan.getVertex(i);
       String vName = vPlan.getName();
-      TezVertexID vertexId = new TezVertexID(dagId, i+1);
+      TezVertexID vertexId = TezVertexID.getInstance(dagId, i+1);
       VertexImpl v;
       if (useCustomInitializer) {
         v = new VertexImplWithCustomInitializer(vertexId, vPlan, vPlan.getName(), conf,
@@ -753,7 +753,7 @@ public class TestVertexImpl {
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     appAttemptId = ApplicationAttemptId.newInstance(
         ApplicationId.newInstance(100, 1), 1);
-    dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
+    dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
   }
 
   public void setupPostDagCreation() {
@@ -950,8 +950,8 @@ public class TestVertexImpl {
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
-    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -975,8 +975,8 @@ public class TestVertexImpl {
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
-    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1003,7 +1003,7 @@ public class TestVertexImpl {
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.FAILED));
@@ -1052,13 +1052,13 @@ public class TestVertexImpl {
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(
-            new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
+            TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.KILLED, v.getState());
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(
-            new TezTaskID(v.getVertexId(), 1), TaskState.KILLED));
+            TezTaskID.getInstance(v.getVertexId(), 1), TaskState.KILLED));
     dispatcher.await();
     Assert.assertEquals(VertexState.KILLED, v.getState());
   }
@@ -1078,13 +1078,13 @@ public class TestVertexImpl {
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(
-            new TezTaskID(v.getVertexId(), 0), TaskState.SUCCEEDED));
+            TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.KILLED, v.getState());
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(
-            new TezTaskID(v.getVertexId(), 1), TaskState.SUCCEEDED));
+            TezTaskID.getInstance(v.getVertexId(), 1), TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.KILLED, v.getState());
   }
@@ -1097,8 +1097,8 @@ public class TestVertexImpl {
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
-    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.FAILED));
@@ -1144,8 +1144,8 @@ public class TestVertexImpl {
     v.setVertexOutputCommitter(committer);
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
-    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1210,17 +1210,17 @@ public class TestVertexImpl {
     LOG.info("Verifying v6 state " + v6.getState());
     Assert.assertEquals(VertexState.RUNNING, v6.getState());
 
-    TezTaskID t1_v4 = new TezTaskID(v4.getVertexId(), 0);
-    TezTaskID t2_v4 = new TezTaskID(v4.getVertexId(), 1);
-    TezTaskID t1_v5 = new TezTaskID(v5.getVertexId(), 0);
-    TezTaskID t2_v5 = new TezTaskID(v5.getVertexId(), 1);
+    TezTaskID t1_v4 = TezTaskID.getInstance(v4.getVertexId(), 0);
+    TezTaskID t2_v4 = TezTaskID.getInstance(v4.getVertexId(), 1);
+    TezTaskID t1_v5 = TezTaskID.getInstance(v5.getVertexId(), 0);
+    TezTaskID t2_v5 = TezTaskID.getInstance(v5.getVertexId(), 1);
 
-    TezTaskAttemptID ta1_t1_v4 = new TezTaskAttemptID(t1_v4, 0);
-    TezTaskAttemptID ta2_t1_v4 = new TezTaskAttemptID(t1_v4, 0);
-    TezTaskAttemptID ta1_t2_v4 = new TezTaskAttemptID(t2_v4, 0);
-    TezTaskAttemptID ta1_t1_v5 = new TezTaskAttemptID(t1_v5, 0);
-    TezTaskAttemptID ta1_t2_v5 = new TezTaskAttemptID(t2_v5, 0);
-    TezTaskAttemptID ta2_t2_v5 = new TezTaskAttemptID(t2_v5, 0);
+    TezTaskAttemptID ta1_t1_v4 = TezTaskAttemptID.getInstance(t1_v4, 0);
+    TezTaskAttemptID ta2_t1_v4 = TezTaskAttemptID.getInstance(t1_v4, 0);
+    TezTaskAttemptID ta1_t2_v4 = TezTaskAttemptID.getInstance(t2_v4, 0);
+    TezTaskAttemptID ta1_t1_v5 = TezTaskAttemptID.getInstance(t1_v5, 0);
+    TezTaskAttemptID ta1_t2_v5 = TezTaskAttemptID.getInstance(t2_v5, 0);
+    TezTaskAttemptID ta2_t2_v5 = TezTaskAttemptID.getInstance(t2_v5, 0);
 
     v4.handle(new VertexEventTaskAttemptCompleted(ta1_t1_v4, TaskAttemptStateInternal.FAILED));
     v4.handle(new VertexEventTaskAttemptCompleted(ta2_t1_v4, TaskAttemptStateInternal.SUCCEEDED));
@@ -1251,8 +1251,8 @@ public class TestVertexImpl {
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
-    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1279,8 +1279,8 @@ public class TestVertexImpl {
 
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
-    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1314,8 +1314,8 @@ public class TestVertexImpl {
 
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
-    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1357,8 +1357,8 @@ public class TestVertexImpl {
 
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
-    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1393,8 +1393,8 @@ public class TestVertexImpl {
 
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
-    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1434,8 +1434,8 @@ public class TestVertexImpl {
 
     startVertex(v);
 
-    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
-    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
@@ -1596,9 +1596,9 @@ public class TestVertexImpl {
   public void testVertexWithNoTasks() {
     TezVertexID vId = null;
     try {
-      TezDAGID invalidDagId = new TezDAGID(
+      TezDAGID invalidDagId = TezDAGID.getInstance(
           dagId.getApplicationId(), 1000);
-      vId = new TezVertexID(invalidDagId, 1);
+      vId = TezVertexID.getInstance(invalidDagId, 1);
       VertexPlan vPlan = invalidDagPlan.getVertex(0);
       VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
           dispatcher.getEventHandler(), taskAttemptListener, fsTokens,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index f506c99..6c0ee10 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -60,11 +60,11 @@ public class TestVertexScheduler {
     conf.setLong(TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
     ShuffleVertexManager scheduler = null;
     EventHandler mockEventHandler = mock(EventHandler.class);
-    TezDAGID dagId = new TezDAGID("1", 1, 1);
+    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
     HashMap<Vertex, Edge> mockInputVertices = 
         new HashMap<Vertex, Edge>();
     Vertex mockSrcVertex1 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId1 = new TezVertexID(dagId, 1);
+    TezVertexID mockSrcVertexId1 = TezVertexID.getInstance(dagId, 1);
     EdgeProperty eProp1 = new EdgeProperty(
         EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED,
@@ -73,7 +73,7 @@ public class TestVertexScheduler {
         new InputDescriptor("in"));
     when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1);
     Vertex mockSrcVertex2 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId2 = new TezVertexID(dagId, 2);
+    TezVertexID mockSrcVertexId2 = TezVertexID.getInstance(dagId, 2);
     EdgeProperty eProp2 = new EdgeProperty(
         EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED,
@@ -82,7 +82,7 @@ public class TestVertexScheduler {
         new InputDescriptor("in"));
     when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2);
     Vertex mockSrcVertex3 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId3 = new TezVertexID(dagId, 3);
+    TezVertexID mockSrcVertexId3 = TezVertexID.getInstance(dagId, 3);
     EdgeProperty eProp3 = new EdgeProperty(
         EdgeProperty.DataMovementType.BROADCAST,
         EdgeProperty.DataSourceType.PERSISTED, 
@@ -92,7 +92,7 @@ public class TestVertexScheduler {
     when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3);
     
     Vertex mockManagedVertex = mock(Vertex.class);
-    TezVertexID mockManagedVertexId = new TezVertexID(dagId, 4);
+    TezVertexID mockManagedVertexId = TezVertexID.getInstance(dagId, 4);
     when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
     when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
     
@@ -107,13 +107,13 @@ public class TestVertexScheduler {
     Assert.assertTrue(scheduler.bipartiteSources.containsKey(mockSrcVertexId2));
     
     final HashMap<TezTaskID, Task> managedTasks = new HashMap<TezTaskID, Task>();
-    final TezTaskID mockTaskId1 = new TezTaskID(mockManagedVertexId, 0);
+    final TezTaskID mockTaskId1 = TezTaskID.getInstance(mockManagedVertexId, 0);
     managedTasks.put(mockTaskId1, null);
-    final TezTaskID mockTaskId2 = new TezTaskID(mockManagedVertexId, 1);
+    final TezTaskID mockTaskId2 = TezTaskID.getInstance(mockManagedVertexId, 1);
     managedTasks.put(mockTaskId2, null);
-    final TezTaskID mockTaskId3 = new TezTaskID(mockManagedVertexId, 2);
+    final TezTaskID mockTaskId3 = TezTaskID.getInstance(mockManagedVertexId, 2);
     managedTasks.put(mockTaskId3, null);
-    final TezTaskID mockTaskId4 = new TezTaskID(mockManagedVertexId, 3);
+    final TezTaskID mockTaskId4 = TezTaskID.getInstance(mockManagedVertexId, 3);
     managedTasks.put(mockTaskId4, null);
     
     when(mockManagedVertex.getTotalTasks()).thenReturn(managedTasks.size());
@@ -151,13 +151,13 @@ public class TestVertexScheduler {
     when(mockSrcVertex2.getTotalTasks()).thenReturn(2);
 
     TezTaskAttemptID mockSrcAttemptId11 = 
-        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId1, 0), 0);
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 0), 0);
     TezTaskAttemptID mockSrcAttemptId12 = 
-        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId1, 1), 0);
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 1), 0);
     TezTaskAttemptID mockSrcAttemptId21 = 
-        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId2, 0), 0);
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 0), 0);
     TezTaskAttemptID mockSrcAttemptId31 = 
-        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId3, 0), 0);
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId3, 0), 0);
 
     byte[] payload =
         VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteArray();
@@ -231,11 +231,11 @@ public class TestVertexScheduler {
     Configuration conf = new Configuration();
     ShuffleVertexManager scheduler = null;
     EventHandler mockEventHandler = mock(EventHandler.class);
-    TezDAGID dagId = new TezDAGID("1", 1, 1);
+    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
     HashMap<Vertex, Edge> mockInputVertices = 
         new HashMap<Vertex, Edge>();
     Vertex mockSrcVertex1 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId1 = new TezVertexID(dagId, 1);
+    TezVertexID mockSrcVertexId1 = TezVertexID.getInstance(dagId, 1);
     EdgeProperty eProp1 = new EdgeProperty(
         EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED, 
@@ -244,7 +244,7 @@ public class TestVertexScheduler {
         new InputDescriptor("in"));
     when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1);
     Vertex mockSrcVertex2 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId2 = new TezVertexID(dagId, 2);
+    TezVertexID mockSrcVertexId2 = TezVertexID.getInstance(dagId, 2);
     EdgeProperty eProp2 = new EdgeProperty(
         EdgeProperty.DataMovementType.SCATTER_GATHER,
         EdgeProperty.DataSourceType.PERSISTED,
@@ -253,7 +253,7 @@ public class TestVertexScheduler {
         new InputDescriptor("in"));
     when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2);
     Vertex mockSrcVertex3 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId3 = new TezVertexID(dagId, 3);
+    TezVertexID mockSrcVertexId3 = TezVertexID.getInstance(dagId, 3);
     EdgeProperty eProp3 = new EdgeProperty(
         EdgeProperty.DataMovementType.BROADCAST,
         EdgeProperty.DataSourceType.PERSISTED, 
@@ -263,7 +263,7 @@ public class TestVertexScheduler {
     when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3);
     
     Vertex mockManagedVertex = mock(Vertex.class);
-    TezVertexID mockManagedVertexId = new TezVertexID(dagId, 3);
+    TezVertexID mockManagedVertexId = TezVertexID.getInstance(dagId, 3);
     when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
     when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
     
@@ -287,11 +287,11 @@ public class TestVertexScheduler {
     Assert.assertTrue(scheduler.bipartiteSources.containsKey(mockSrcVertexId2));
     
     HashMap<TezTaskID, Task> managedTasks = new HashMap<TezTaskID, Task>();
-    TezTaskID mockTaskId1 = new TezTaskID(mockManagedVertexId, 0);
+    TezTaskID mockTaskId1 = TezTaskID.getInstance(mockManagedVertexId, 0);
     managedTasks.put(mockTaskId1, null);
-    TezTaskID mockTaskId2 = new TezTaskID(mockManagedVertexId, 1);
+    TezTaskID mockTaskId2 = TezTaskID.getInstance(mockManagedVertexId, 1);
     managedTasks.put(mockTaskId2, null);
-    TezTaskID mockTaskId3 = new TezTaskID(mockManagedVertexId, 2);
+    TezTaskID mockTaskId3 = TezTaskID.getInstance(mockManagedVertexId, 2);
     managedTasks.put(mockTaskId3, null);
     
     when(mockManagedVertex.getTotalTasks()).thenReturn(3);
@@ -344,15 +344,15 @@ public class TestVertexScheduler {
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
 
     TezTaskAttemptID mockSrcAttemptId11 = 
-        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId1, 0), 0);
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 0), 0);
     TezTaskAttemptID mockSrcAttemptId12 = 
-        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId1, 1), 0);
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 1), 0);
     TezTaskAttemptID mockSrcAttemptId21 = 
-        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId2, 0), 0);
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 0), 0);
     TezTaskAttemptID mockSrcAttemptId22 = 
-        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId2, 1), 0);
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 1), 0);
     TezTaskAttemptID mockSrcAttemptId31 = 
-        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId3, 0), 0);
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId3, 0), 0);
     
     // min, max > 0 and min == max
     scheduler = createScheduler(conf, mockManagedVertex, 0.25f, 0.25f);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 4e30763..54fcde2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -101,8 +101,8 @@ public class TestContainerReuse {
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
     CapturingEventHandler eventHandler = new CapturingEventHandler();
-    TezDAGID dagID = new TezDAGID("0", 0, 0);
-    TezVertexID vertexID = new TezVertexID(dagID, 1);
+    TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient =
@@ -152,12 +152,12 @@ public class TestContainerReuse {
 
     String [] defaultRack = {"/default-rack"};
 
-    TezTaskAttemptID taID11 = new TezTaskAttemptID(
-      new TezTaskID(vertexID, 1), 1);
-    TezTaskAttemptID taID21 = new TezTaskAttemptID(
-      new TezTaskID(vertexID, 2), 1);
-    TezTaskAttemptID taID31 = new TezTaskAttemptID(
-      new TezTaskID(vertexID, 3), 1);
+    TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(
+      TezTaskID.getInstance(vertexID, 1), 1);
+    TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(
+      TezTaskID.getInstance(vertexID, 2), 1);
+    TezTaskAttemptID taID31 = TezTaskAttemptID.getInstance(
+      TezTaskID.getInstance(vertexID, 3), 1);
     TaskAttempt ta11 = mock(TaskAttempt.class);
     TaskAttempt ta21 = mock(TaskAttempt.class);
     TaskAttempt ta31 = mock(TaskAttempt.class);
@@ -238,8 +238,8 @@ public class TestContainerReuse {
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
     CapturingEventHandler eventHandler = new CapturingEventHandler();
-    TezDAGID dagID = new TezDAGID("0", 0, 0);
-    TezVertexID vertexID = new TezVertexID(dagID, 1);
+    TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient =
@@ -288,9 +288,9 @@ public class TestContainerReuse {
 
     String [] defaultRack = {"/default-rack"};
 
-    TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID, 1), 1);
-    TezTaskAttemptID taID21 = new TezTaskAttemptID(new TezTaskID(vertexID, 2), 1);
-    TezTaskAttemptID taID31 = new TezTaskAttemptID(new TezTaskID(vertexID, 3), 1);
+    TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 1), 1);
+    TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 2), 1);
+    TezTaskAttemptID taID31 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 3), 1);
     TaskAttempt ta11 = mock(TaskAttempt.class);
     TaskAttempt ta21 = mock(TaskAttempt.class);
     TaskAttempt ta31 = mock(TaskAttempt.class);
@@ -345,7 +345,7 @@ public class TestContainerReuse {
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
     CapturingEventHandler eventHandler = new CapturingEventHandler();
-    TezDAGID dagID = new TezDAGID("0", 0, 0);
+    TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
@@ -383,25 +383,25 @@ public class TestContainerReuse {
     String []racks = {"/default-rack"};
     Priority priority1 = Priority.newInstance(1);
 
-    TezVertexID vertexID1 = new TezVertexID(dagID, 1);
+    TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1);
 
     //Vertex 1, Task 1, Attempt 1, host1
-    TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID1, 1), 1);
+    TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1);
     TaskAttempt ta11 = mock(TaskAttempt.class);
     AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(taID11, ta11, resource1, host1, racks, priority1);
 
     //Vertex 1, Task 2, Attempt 1, host1
-    TezTaskAttemptID taID12 = new TezTaskAttemptID(new TezTaskID(vertexID1, 2), 1);
+    TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 2), 1);
     TaskAttempt ta12 = mock(TaskAttempt.class);
     AMSchedulerEventTALaunchRequest lrEvent2 = createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1);
 
     //Vertex 1, Task 3, Attempt 1, host2
-    TezTaskAttemptID taID13 = new TezTaskAttemptID(new TezTaskID(vertexID1, 3), 1);
+    TezTaskAttemptID taID13 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 3), 1);
     TaskAttempt ta13 = mock(TaskAttempt.class);
     AMSchedulerEventTALaunchRequest lrEvent3 = createLaunchRequestEvent(taID13, ta13, resource1, host2, racks, priority1);
 
     //Vertex 1, Task 4, Attempt 1, host2
-    TezTaskAttemptID taID14 = new TezTaskAttemptID(new TezTaskID(vertexID1, 4), 1);
+    TezTaskAttemptID taID14 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 4), 1);
     TaskAttempt ta14 = mock(TaskAttempt.class);
     AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent(taID14, ta14, resource1, host2, racks, priority1);
 
@@ -482,7 +482,7 @@ public class TestContainerReuse {
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
     CapturingEventHandler eventHandler = new CapturingEventHandler();
-    TezDAGID dagID = new TezDAGID("0", 0, 0);
+    TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient =
@@ -529,17 +529,17 @@ public class TestContainerReuse {
 
     Priority priority = Priority.newInstance(3);
 
-    TezVertexID vertexID = new TezVertexID(dagID, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
 
     //Vertex 1, Task 1, Attempt 1, no locality information.
-    TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID, 1), 1);
+    TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 1), 1);
     TaskAttempt ta11 = mock(TaskAttempt.class);
     doReturn(vertexID).when(ta11).getVertexID();
     AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(
       taID11, ta11, resource1, emptyHosts, racks, priority);
 
     //Vertex1, Task2, Attempt 1,  no locality information.
-    TezTaskAttemptID taID12 = new TezTaskAttemptID(new TezTaskID(vertexID, 2), 1);
+    TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 2), 1);
     TaskAttempt ta12 = mock(TaskAttempt.class);
     doReturn(vertexID).when(ta12).getVertexID();
     AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(
@@ -604,7 +604,7 @@ public class TestContainerReuse {
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
 
     CapturingEventHandler eventHandler = new CapturingEventHandler();
-    TezDAGID dagID = new TezDAGID("0", 0, 0);
+    TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
 
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     TezAMRMClientAsync<CookieContainerRequest> rmClient =
@@ -652,20 +652,20 @@ public class TestContainerReuse {
     Priority priority1 = Priority.newInstance(3);
     Priority priority2 = Priority.newInstance(4);
 
-    TezVertexID vertexID1 = new TezVertexID(dagID, 1);
-    TezVertexID vertexID2 = new TezVertexID(dagID, 2);
+    TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1);
+    TezVertexID vertexID2 = TezVertexID.getInstance(dagID, 2);
 
     //Vertex 1, Task 1, Attempt 1, host1
-    TezTaskAttemptID taID11 = new TezTaskAttemptID(
-      new TezTaskID(vertexID1, 1), 1);
+    TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(
+      TezTaskID.getInstance(vertexID1, 1), 1);
     TaskAttempt ta11 = mock(TaskAttempt.class);
     doReturn(vertexID1).when(ta11).getVertexID();
     AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(
       taID11, ta11, resource1, host1, racks, priority1);
 
     //Vertex2, Task1, Attempt 1, host1
-    TezTaskAttemptID taID21 = new TezTaskAttemptID(
-      new TezTaskID(vertexID2, 1), 1);
+    TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(
+      TezTaskID.getInstance(vertexID2, 1), 1);
     TaskAttempt ta21 = mock(TaskAttempt.class);
     doReturn(vertexID2).when(ta21).getVertexID();
     AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(
@@ -728,7 +728,7 @@ public class TestContainerReuse {
     String[] hosts, String[] racks, Priority priority,
     ContainerContext containerContext) {
     AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(
-      taID, capability, new TaskSpec(taID, "user", "vertexName",
+      taID, capability, new TaskSpec(taID, "vertexName",
       new ProcessorDescriptor("processorClassName"),
       Collections.singletonList(new InputSpec("vertexName",
         new InputDescriptor("inputClassName"), 1)),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3665f41/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index fc89e82..9298cca 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -273,7 +273,7 @@ public class TestAMContainer {
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.verifyState(AMContainerState.IDLE);
 
-    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
 
     wc.verifyState(AMContainerState.STOP_REQUESTED);
@@ -313,7 +313,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.RUNNING);
 
-    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
 
     wc.verifyState(AMContainerState.STOP_REQUESTED);
@@ -352,7 +352,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.LAUNCHING);
 
-    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
 
     wc.verifyState(AMContainerState.STOP_REQUESTED);
@@ -579,7 +579,7 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
 
-    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
 
     wc.assignTaskAttempt(taID2);
 
@@ -656,7 +656,7 @@ public class TestAMContainer {
     wc.taskAttemptSucceeded(wc.taskAttemptID);
     wc.verifyState(AMContainerState.IDLE);
 
-    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(taID2);
@@ -702,7 +702,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(wc.taskAttemptID);
 
-    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.RUNNING);
@@ -748,7 +748,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(wc.taskAttemptID);
 
-    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(taID2);
@@ -780,7 +780,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(wc.taskAttemptID);
 
-    TezTaskAttemptID taID2 = new TezTaskAttemptID(wc.taskID, 2);
+    TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(taID2);
@@ -854,10 +854,10 @@ public class TestAMContainer {
       when(appContext.getApplicationAttemptId()).thenReturn(appAttemptID);
       when(appContext.getApplicationID()).thenReturn(applicationID);
 
-      dagID = new TezDAGID(applicationID, 1);
-      vertexID = new TezVertexID(dagID, 1);
-      taskID = new TezTaskID(vertexID, 1);
-      taskAttemptID = new TezTaskAttemptID(taskID, 1);
+      dagID = TezDAGID.getInstance(applicationID, 1);
+      vertexID = TezVertexID.getInstance(dagID, 1);
+      taskID = TezTaskID.getInstance(vertexID, 1);
+      taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
 
       taskSpec = mock(TaskSpec.class);
       doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();


Mime
View raw message