tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [2/2] git commit: TEZ-1119. Support display of user payloads in Tez UI. (hitesh)
Date Wed, 16 Jul 2014 22:20:20 GMT
TEZ-1119. Support display of user payloads in Tez UI. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/36d08fe0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/36d08fe0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/36d08fe0

Branch: refs/heads/master
Commit: 36d08fe03b99ec154b9c91d158d7daecb84d1fa4
Parents: 171da56
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed Jul 16 15:19:06 2014 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Wed Jul 16 15:19:06 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezCommonUtils.java   |  27 ++-
 .../apache/tez/dag/api/DagTypeConverters.java   |  22 ++
 .../tez/dag/api/EdgeManagerDescriptor.java      |  12 +
 .../org/apache/tez/dag/api/InputDescriptor.java |  14 +-
 .../apache/tez/dag/api/OutputDescriptor.java    |  14 +-
 .../apache/tez/dag/api/ProcessorDescriptor.java |  15 +-
 .../apache/tez/dag/api/TezEntityDescriptor.java |  21 ++
 .../dag/api/VertexManagerPluginDescriptor.java  |  12 +
 tez-api/src/main/proto/DAGApiRecords.proto      |   1 +
 .../apache/tez/common/TestTezCommonUtils.java   |  18 ++
 .../tez/dag/api/TestDagTypeConverters.java      |  51 +++++
 .../tez/dag/api/client/rpc/TestDAGClient.java   |  20 +-
 .../org/apache/tez/common/ReflectionUtils.java  | 115 ++++++++++
 .../org/apache/tez/common/RuntimeUtils.java     | 115 ----------
 .../java/org/apache/tez/common/TezUtils.java    |  18 --
 .../tez/dag/api/client/DAGClientHandler.java    |  18 ++
 .../app/dag/RootInputInitializerManager.java    |   4 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |   4 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   4 +-
 .../tez/dag/app/dag/impl/VertexManager.java     |   4 +-
 .../tez/dag/history/HistoryEventHandler.java    |   4 +-
 .../apache/tez/dag/history/utils/DAGUtils.java  | 223 +++++++------------
 .../tez/dag/utils/JavaProfilerOptions.java      |  18 ++
 .../tez/dag/utils/RelocalizationUtils.java      |   4 +-
 .../apache/tez/runtime/task/ErrorReporter.java  |  18 ++
 .../dag/api/client/TestDAGClientHandler.java    |  18 ++
 .../TestHistoryEventsProtoConversion.java       |   6 +-
 .../tez/dag/history/utils/TestDAGUtils.java     | 185 +++++++++++++++
 .../mapreduce/examples/OrderedWordCount.java    |  27 ++-
 .../split/TezGroupedSplitsInputFormat.java      |   7 +-
 .../split/TezGroupedSplitsInputFormat.java      |   7 +-
 .../ats/HistoryEventTimelineConversion.java     |  11 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |   8 +-
 .../common/resources/MemoryDistributor.java     |   8 +-
 .../apache/tez/runtime/TestReflectionUtils.java |  69 ++++++
 .../apache/tez/runtime/TestRuntimeUtils.java    |  69 ------
 .../shuffle/impl/ShuffleInputEventHandler.java  |   3 +-
 .../writers/UnorderedPartitionedKVWriter.java   |   3 +-
 .../library/output/OnFileSortedOutput.java      |   3 +-
 .../library/output/OnFileUnorderedKVOutput.java |   3 +-
 .../impl/ShuffleInputEventHandlerImpl.java      |   3 +-
 .../vertexmanager/TestShuffleVertexManager.java |   4 +-
 .../TestUnorderedPartitionedKVWriter.java       |   5 +-
 .../impl/TestShuffleInputEventHandlerImpl.java  |   3 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |   4 +-
 45 files changed, 815 insertions(+), 407 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index db434ae..c15324c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -19,7 +19,11 @@
 package org.apache.tez.common;
 
 import java.io.IOException;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -32,6 +36,8 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 
+import com.google.protobuf.ByteString;
+
 public class TezCommonUtils {
   public static final FsPermission TEZ_AM_DIR_PERMISSION = FsPermission
       .createImmutable((short) 0700); // rwx--------
@@ -211,8 +217,6 @@ public class TezCommonUtils {
    * 
    * @param recoveryPath
    *          TEZ recovery directory used for Tez internals
-   * @param conf
-   *          Tez configuration
    * @param attemptID
    *          Application Attempt Id
    * @return App attempt specific recovery path
@@ -283,4 +287,23 @@ public class TezCommonUtils {
   public static FSDataOutputStream createFileForAM(FileSystem fs, Path filePath) throws IOException {
     return FileSystem.create(fs, filePath, new FsPermission(TEZ_AM_FILE_PERMISSION));
   }
+
+  @Private
+  public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
+    ByteString.Output os = ByteString.newOutput();
+    DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(
+        Deflater.BEST_COMPRESSION));
+    compressOs.write(inBytes);
+    compressOs.finish();
+    ByteString byteString = os.toByteString();
+    return byteString;
+  }
+
+  @Private
+  public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
+    InflaterInputStream in = new InflaterInputStream(byteString.newInput());
+    byte[] bytes = IOUtils.toByteArray(in);
+    return bytes;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 87573f3..87592e7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezAppMasterStatus;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
@@ -277,9 +278,30 @@ public class DagTypeConverters {
       builder
           .setUserPayload(ByteString.copyFrom(descriptor.getUserPayload()));
     }
+    if (descriptor.getHistoryText() != null) {
+      try {
+        builder.setHistoryText(TezCommonUtils.compressByteArrayToByteString(
+            descriptor.getHistoryText().getBytes()));
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
     return builder.build();
   }
 
+  public static String getHistoryTextFromProto(TezEntityDescriptorProto proto) {
+    if (!proto.hasHistoryText()) {
+      return null;
+    }
+    try {
+      return new String(TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText()));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+
+
   public static RootInputLeafOutputProto convertToDAGPlan(
       RootInputLeafOutput<? extends TezEntityDescriptor> descriptor) {
     RootInputLeafOutputProto.Builder builder = RootInputLeafOutputProto.newBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
index cb5e84f..194492f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
@@ -29,4 +29,16 @@ public class EdgeManagerDescriptor extends TezEntityDescriptor {
     this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
+
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  @Override
+  public EdgeManagerDescriptor setHistoryText(String historyText) {
+    super.setHistoryText(historyText);
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
index cc6948c..0d01262 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -33,7 +33,19 @@ public class InputDescriptor extends TezEntityDescriptor {
 
   @Override
   public InputDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+    super.setUserPayload(userPayload);
     return this;
   }
+
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  @Override
+  public InputDescriptor setHistoryText(String historyText) {
+    super.setHistoryText(historyText);
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
index a34d35c..e9cbe51 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
@@ -33,7 +33,19 @@ public class OutputDescriptor extends TezEntityDescriptor {
 
   @Override
   public OutputDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+    super.setUserPayload(userPayload);
     return this;
   }
+
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  @Override
+  public OutputDescriptor setHistoryText(String historyText) {
+    super.setHistoryText(historyText);
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
index a0e574d..4641d93 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
@@ -31,8 +31,21 @@ public class ProcessorDescriptor extends TezEntityDescriptor {
     super(processorClassName);
   }
 
+  @Override
   public ProcessorDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+    super.setUserPayload(userPayload);
     return this;
   }
+
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  @Override
+  public ProcessorDescriptor setHistoryText(String historyText) {
+    super.setHistoryText(historyText);
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
index 25788ff..1047b09 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -31,6 +31,7 @@ public abstract class TezEntityDescriptor implements Writable {
 
   protected TezUserPayload userPayload;
   private String className;
+  protected String historyText;
 
   @Private // for Writable
   public TezEntityDescriptor() {
@@ -44,11 +45,31 @@ public abstract class TezEntityDescriptor implements Writable {
     return (userPayload == null) ? null : userPayload.getPayload();
   }
 
+  /**
+   * Set user payload for this entity descriptor
+   * @param userPayload User Payload
+   * @return this descriptor, so that setter calls can be chained
+   */
   public TezEntityDescriptor setUserPayload(byte[] userPayload) {
     this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
 
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  public TezEntityDescriptor setHistoryText(String historyText) {
+    this.historyText = historyText;
+    return this;
+  }
+
+  @Private // Internal use only
+  public String getHistoryText() {
+    return this.historyText;
+  }
+
   public String getClassName() {
     return this.className;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
index 58980b5..3e72523 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
@@ -36,4 +36,16 @@ public class VertexManagerPluginDescriptor extends TezEntityDescriptor {
     this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
+
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  @Override
+  public VertexManagerPluginDescriptor setHistoryText(String historyText) {
+    super.setHistoryText(historyText);
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index c7a317e..9fa9cf2 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -113,6 +113,7 @@ message PlanTaskConfiguration {
 message TezEntityDescriptorProto {
   optional string class_name = 1;
   optional bytes user_payload = 2;
+  optional bytes history_text = 3;
 }
 
 message RootInputLeafOutputProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
index 121c673..f52f69b 100644
--- a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.common;
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
new file mode 100644
index 0000000..ed2d8bd
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.io.IOException;
+
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDagTypeConverters {
+
+  @Test
+  public void testTezEntityDescriptorSerialization() throws IOException {
+    byte[] payload = new String("Foobar").getBytes();
+    String historytext = "Bar123";
+    TezEntityDescriptor entityDescriptor =
+        new InputDescriptor("inputClazz").setUserPayload(payload)
+        .setHistoryText(historytext);
+    TezEntityDescriptorProto proto =
+        DagTypeConverters.convertToDAGPlan(entityDescriptor);
+    Assert.assertArrayEquals(payload, proto.getUserPayload().toByteArray());
+    Assert.assertTrue(proto.hasHistoryText());
+    Assert.assertNotEquals(historytext, proto.getHistoryText());
+    Assert.assertEquals(historytext, new String(
+        TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText())));
+
+    // Ensure that the history text is not deserialized
+    InputDescriptor inputDescriptor =
+        DagTypeConverters.convertInputDescriptorFromDAGPlan(proto);
+    Assert.assertNull(inputDescriptor.getHistoryText());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 43e9fa7..7deed48 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -1,5 +1,22 @@
-package org.apache.tez.dag.api.client.rpc;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
+package org.apache.tez.dag.api.client.rpc;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.*;
@@ -43,7 +60,6 @@ import org.mockito.internal.util.collections.Sets;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
-
 public class TestDAGClient {
 
   private DAGClientRPCImpl dagClient;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-common/src/main/java/org/apache/tez/common/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/ReflectionUtils.java b/tez-common/src/main/java/org/apache/tez/common/ReflectionUtils.java
new file mode 100644
index 0000000..3b9ac81
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/ReflectionUtils.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+@Private
+public class ReflectionUtils {
+  
+  private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
+
+  @Private
+  public static Class<?> getClazz(String className) {
+    Class<?> clazz = CLAZZ_CACHE.get(className);
+    if (clazz == null) {
+      try {
+        clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
+      } catch (ClassNotFoundException e) {
+        throw new TezUncheckedException("Unable to load class: " + className, e);
+      }
+    }
+    return clazz;
+  }
+
+  private static <T> T getNewInstance(Class<T> clazz) {
+    T instance;
+    try {
+      instance = clazz.newInstance();
+    } catch (InstantiationException e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    }
+    return instance;
+  }
+
+  @Private
+  public static <T> T createClazzInstance(String className) {
+    Class<?> clazz = getClazz(className);
+    @SuppressWarnings("unchecked")
+    T instance = (T) getNewInstance(clazz);
+    return instance;
+  }
+  
+  
+  @Private
+  public static synchronized void addResourcesToClasspath(List<URL> urls) {
+    ClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Thread
+        .currentThread().getContextClassLoader());
+    Thread.currentThread().setContextClassLoader(classLoader);
+  }
+  
+  
+  // Parameters for addResourcesToSystemClassLoader
+  private static final Class<?>[] parameters = new Class[]{URL.class};
+  private static Method sysClassLoaderMethod = null;
+
+
+  @Private
+  public static synchronized void addResourcesToSystemClassLoader(List<URL> urls) {
+    URLClassLoader sysLoader = (URLClassLoader)ClassLoader.getSystemClassLoader();
+    if (sysClassLoaderMethod == null) {
+      Class<?> sysClass = URLClassLoader.class;
+      Method method;
+      try {
+        method = sysClass.getDeclaredMethod("addURL", parameters);
+      } catch (SecurityException e) {
+        throw new TezUncheckedException("Failed to get handle on method addURL", e);
+      } catch (NoSuchMethodException e) {
+        throw new TezUncheckedException("Failed to get handle on method addURL", e);
+      }
+      method.setAccessible(true);
+      sysClassLoaderMethod = method;
+    }
+    for (URL url : urls) {
+      try {
+        sysClassLoaderMethod.invoke(sysLoader, new Object[] { url });
+      } catch (IllegalArgumentException e) {
+        throw new TezUncheckedException("Failed to invoke addURL for rsrc: " + url, e);
+      } catch (IllegalAccessException e) {
+        throw new TezUncheckedException("Failed to invoke addURL for rsrc: " + url, e);
+      } catch (InvocationTargetException e) {
+        throw new TezUncheckedException("Failed to invoke addURL for rsrc: " + url, e);
+      }
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java b/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
deleted file mode 100644
index d32f3cd..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.dag.api.TezUncheckedException;
-
-@Private
-public class RuntimeUtils {
-  
-  private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
-
-  @Private
-  public static Class<?> getClazz(String className) {
-    Class<?> clazz = CLAZZ_CACHE.get(className);
-    if (clazz == null) {
-      try {
-        clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
-      } catch (ClassNotFoundException e) {
-        throw new TezUncheckedException("Unable to load class: " + className, e);
-      }
-    }
-    return clazz;
-  }
-
-  private static <T> T getNewInstance(Class<T> clazz) {
-    T instance;
-    try {
-      instance = clazz.newInstance();
-    } catch (InstantiationException e) {
-      throw new TezUncheckedException(
-          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
-    } catch (IllegalAccessException e) {
-      throw new TezUncheckedException(
-          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
-    }
-    return instance;
-  }
-
-  @Private
-  public static <T> T createClazzInstance(String className) {
-    Class<?> clazz = getClazz(className);
-    @SuppressWarnings("unchecked")
-    T instance = (T) getNewInstance(clazz);
-    return instance;
-  }
-  
-  
-  @Private
-  public static synchronized void addResourcesToClasspath(List<URL> urls) {
-    ClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Thread
-        .currentThread().getContextClassLoader());
-    Thread.currentThread().setContextClassLoader(classLoader);
-  }
-  
-  
-  // Parameters for addResourcesToSystemClassLoader
-  private static final Class<?>[] parameters = new Class[]{URL.class};
-  private static Method sysClassLoaderMethod = null;
-
-
-  @Private
-  public static synchronized void addResourcesToSystemClassLoader(List<URL> urls) {
-    URLClassLoader sysLoader = (URLClassLoader)ClassLoader.getSystemClassLoader();
-    if (sysClassLoaderMethod == null) {
-      Class<?> sysClass = URLClassLoader.class;
-      Method method;
-      try {
-        method = sysClass.getDeclaredMethod("addURL", parameters);
-      } catch (SecurityException e) {
-        throw new TezUncheckedException("Failed to get handle on method addURL", e);
-      } catch (NoSuchMethodException e) {
-        throw new TezUncheckedException("Failed to get handle on method addURL", e);
-      }
-      method.setAccessible(true);
-      sysClassLoaderMethod = method;
-    }
-    for (URL url : urls) {
-      try {
-        sysClassLoaderMethod.invoke(sysLoader, new Object[] { url });
-      } catch (IllegalArgumentException e) {
-        throw new TezUncheckedException("Failed to invoke addURL for rsrc: " + url, e);
-      } catch (IllegalAccessException e) {
-        throw new TezUncheckedException("Failed to invoke addURL for rsrc: " + url, e);
-      } catch (InvocationTargetException e) {
-        throw new TezUncheckedException("Failed to invoke addURL for rsrc: " + url, e);
-      }
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index cfa3413..f73f223 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -229,24 +229,6 @@ public class TezUtils {
     return output;
   }
 
-  @Private
-  public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
-    ByteString.Output os = ByteString.newOutput();
-    DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(
-        Deflater.BEST_COMPRESSION));
-    compressOs.write(inBytes);
-    compressOs.finish();
-    ByteString byteString = os.toByteString();
-    return byteString;
-  }
-  
-  @Private
-  public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
-    InflaterInputStream in = new InflaterInputStream(byteString.newInput());
-    byte[] bytes = IOUtils.toByteArray(in);
-    return bytes;
-  }
-
   private static final Pattern pattern = Pattern.compile("\\W");
   @Private
   public static final int MAX_VERTEX_NAME_LENGTH = 40;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index cbd48fa..6dca990 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.dag.api.client;
 
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index c4a1085..3d20881 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -33,7 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
@@ -101,7 +101,7 @@ public class RootInputInitializerManager {
     String className = input.getInitializerClassName();
     @SuppressWarnings("unchecked")
     Class<? extends TezRootInputInitializer> clazz =
-        (Class<? extends TezRootInputInitializer>) RuntimeUtils
+        (Class<? extends TezRootInputInitializer>) ReflectionUtils
             .getClazz(className);
     TezRootInputInitializer initializer = null;
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index a6004cf..e2a9a27 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nullable;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManager;
@@ -125,7 +125,7 @@ public class Edge {
       case CUSTOM:
         if (edgeProperty.getEdgeManagerDescriptor() != null) {
           String edgeManagerClassName = edgeProperty.getEdgeManagerDescriptor().getClassName();
-          edgeManager = RuntimeUtils.createClazzInstance(edgeManagerClassName);
+          edgeManager = ReflectionUtils.createClazzInstance(edgeManagerClassName);
         }
         break;
       default:

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a68093a..3162d6c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -1657,7 +1657,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
           @Override
           public Void run() throws Exception {
-            OutputCommitter outputCommitter = RuntimeUtils.createClazzInstance(
+            OutputCommitter outputCommitter = ReflectionUtils.createClazzInstance(
                 od.getInitializerClassName());
             OutputCommitterContext outputCommitterContext =
                 new OutputCommitterContextImpl(appContext.getApplicationID(),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 35c3943..3eb9ca1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -32,7 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -235,7 +235,7 @@ public class VertexManager {
   public void initialize() {
     pluginContext = new VertexManagerPluginContextImpl();
     if (pluginDesc != null) {
-      plugin = RuntimeUtils.createClazzInstance(pluginDesc.getClassName());
+      plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName());
       payload = DagTypeConverters.convertToTezUserPayload(pluginDesc.getUserPayload());
     }
     if (payload == null || payload.getPayload() == null) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 82e063a..e8e45a3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -24,7 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.history.logging.HistoryLoggingService;
@@ -57,7 +57,7 @@ public class HistoryEventHandler extends CompositeService {
         TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
 
     historyLoggingService =
-        RuntimeUtils.createClazzInstance(historyServiceClassName);
+        ReflectionUtils.createClazzInstance(historyServiceClassName);
     historyLoggingService.setAppContext(context);
     addService(historyLoggingService);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 3997c2f..b58ee9b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.utils;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -26,6 +27,8 @@ import java.util.Map;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
@@ -35,125 +38,58 @@ import org.codehaus.jettison.json.JSONObject;
 
 public class DAGUtils {
 
-  public static JSONObject generateSimpleJSONPlan(DAGProtos.DAGPlan dagPlan) throws JSONException {
-
-    final String DAG_NAME_KEY = "dagName";
-    final String VERTICES_KEY = "vertices";
-    final String EDGES_KEY = "edges";
-
-    final String VERTEX_NAME_KEY = "vertexName";
-    final String PROCESSOR_CLASS_KEY = "processorClass";
-    final String IN_EDGE_IDS_KEY = "inEdgeIds";
-    final String OUT_EDGE_IDS_KEY = "outEdgeIds";
-    final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
-    final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
-    final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
-        "vertexManagerPluginClass";
-
-    final String EDGE_ID_KEY = "edgeId";
-    final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
-    final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
-    final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
-    final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
-    final String SCHEDULING_TYPE_KEY = "schedulingType";
-    final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
-    final String EDGE_DESTINATION_CLASS_KEY =
-        "edgeDestinationClass";
-
-    final String NAME_KEY = "name";
-    final String CLASS_KEY = "class";
-    final String INITIALIZER_KEY = "initializer";
-
-    JSONObject dagJson = new JSONObject();
-    dagJson.put(DAG_NAME_KEY, dagPlan.getName());
-    for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
-      JSONObject vertexJson = new JSONObject();
-      vertexJson.put(VERTEX_NAME_KEY, vertexPlan.getName());
-
-      if (vertexPlan.hasProcessorDescriptor()) {
-        vertexJson.put(PROCESSOR_CLASS_KEY,
-            vertexPlan.getProcessorDescriptor().getClassName());
-      }
-
-      for (String inEdgeId : vertexPlan.getInEdgeIdList()) {
-        vertexJson.accumulate(IN_EDGE_IDS_KEY, inEdgeId);
-      }
-      for (String outEdgeId : vertexPlan.getOutEdgeIdList()) {
-        vertexJson.accumulate(OUT_EDGE_IDS_KEY, outEdgeId);
-      }
-
-      for (DAGProtos.RootInputLeafOutputProto input :
-          vertexPlan.getInputsList()) {
-        JSONObject jsonInput = new JSONObject();
-        jsonInput.put(NAME_KEY, input.getName());
-        jsonInput.put(CLASS_KEY, input.getEntityDescriptor().getClassName());
-        if (input.hasInitializerClassName()) {
-          jsonInput.put(INITIALIZER_KEY, input.getInitializerClassName());
-        }
-        vertexJson.accumulate(ADDITIONAL_INPUTS_KEY, jsonInput);
-      }
-
-      for (DAGProtos.RootInputLeafOutputProto output :
-          vertexPlan.getOutputsList()) {
-        JSONObject jsonOutput = new JSONObject();
-        jsonOutput.put(NAME_KEY, output.getName());
-        jsonOutput.put(CLASS_KEY, output.getEntityDescriptor().getClassName());
-        if (output.hasInitializerClassName()) {
-          jsonOutput.put(INITIALIZER_KEY, output.getInitializerClassName());
-        }
-        vertexJson.accumulate(ADDITIONAL_OUTPUTS_KEY, jsonOutput);
-      }
-
-      if (vertexPlan.hasVertexManagerPlugin()) {
-        vertexJson.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY,
-            vertexPlan.getVertexManagerPlugin().getClassName());
-      }
+  static final String DAG_NAME_KEY = "dagName";
+  static final String VERTICES_KEY = "vertices";
+  static final String EDGES_KEY = "edges";
+  static final String VERTEX_GROUPS_KEY = "vertexGroups";
+
+  static final String VERTEX_NAME_KEY = "vertexName";
+  static final String PROCESSOR_CLASS_KEY = "processorClass";
+  static final String IN_EDGE_IDS_KEY = "inEdgeIds";
+  static final String OUT_EDGE_IDS_KEY = "outEdgeIds";
+  static final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
+  static final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
+  static final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
+      "vertexManagerPluginClass";
+  static final String USER_PAYLOAD_AS_TEXT = "userPayloadAsText";
+  static final String OUTPUT_USER_PAYLOAD_AS_TEXT = "outputUserPayloadAsText";
+  static final String INPUT_USER_PAYLOAD_AS_TEXT = "inputUserPayloadAsText";
+
+  static final String EDGE_ID_KEY = "edgeId";
+  static final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
+  static final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
+  static final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
+  static final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
+  static final String SCHEDULING_TYPE_KEY = "schedulingType";
+  static final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
+  static final String EDGE_DESTINATION_CLASS_KEY =
+      "edgeDestinationClass";
+
+  static final String NAME_KEY = "name";
+  static final String CLASS_KEY = "class";
+  static final String INITIALIZER_KEY = "initializer";
+
+  static final String VERTEX_GROUP_NAME_KEY = "groupName";
+  static final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
+  static final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
+  static final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
+  static final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
 
-      dagJson.accumulate(VERTICES_KEY, vertexJson);
-    }
 
-    for (DAGProtos.EdgePlan edgePlan : dagPlan.getEdgeList()) {
-      JSONObject edgeJson = new JSONObject();
-      edgeJson.put(EDGE_ID_KEY, edgePlan.getId());
-      edgeJson.put(INPUT_VERTEX_NAME_KEY, edgePlan.getInputVertexName());
-      edgeJson.put(OUTPUT_VERTEX_NAME_KEY, edgePlan.getOutputVertexName());
-      edgeJson.put(DATA_MOVEMENT_TYPE_KEY,
-          edgePlan.getDataMovementType().name());
-      edgeJson.put(DATA_SOURCE_TYPE_KEY, edgePlan.getDataSourceType().name());
-      edgeJson.put(SCHEDULING_TYPE_KEY, edgePlan.getSchedulingType().name());
-      edgeJson.put(EDGE_SOURCE_CLASS_KEY,
-          edgePlan.getEdgeSource().getClassName());
-      edgeJson.put(EDGE_DESTINATION_CLASS_KEY,
-          edgePlan.getEdgeDestination().getClassName());
 
-      dagJson.accumulate(EDGES_KEY, edgeJson);
+  public static JSONObject generateSimpleJSONPlan(DAGProtos.DAGPlan dagPlan) throws JSONException {
+    JSONObject dagJson;
+    try {
+      dagJson = new JSONObject(convertDAGPlanToATSMap(dagPlan));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
     }
-
     return dagJson;
   }
 
   public static JSONObject convertCountersToJSON(TezCounters counters)
       throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    if (counters == null) {
-      return jsonObject;
-    }
-
-    for (CounterGroup group : counters) {
-      JSONObject jsonCGrp = new JSONObject();
-      jsonCGrp.put(ATSConstants.COUNTER_GROUP_NAME, group.getName());
-      jsonCGrp.put(ATSConstants.COUNTER_GROUP_DISPLAY_NAME,
-          group.getDisplayName());
-      for (TezCounter counter : group) {
-        JSONObject counterJson = new JSONObject();
-        counterJson.put(ATSConstants.COUNTER_NAME, counter.getName());
-        counterJson.put(ATSConstants.COUNTER_DISPLAY_NAME,
-            counter.getDisplayName());
-        counterJson.put(ATSConstants.COUNTER_VALUE, counter.getValue());
-        jsonCGrp.accumulate(ATSConstants.COUNTERS, counterJson);
-      }
-      jsonObject.accumulate(ATSConstants.COUNTER_GROUPS, jsonCGrp);
-    }
+    JSONObject jsonObject = new JSONObject(convertCountersToATSMap(counters));
     return jsonObject;
   }
 
@@ -185,44 +121,10 @@ public class DAGUtils {
   }
 
   public static Map<String,Object> convertDAGPlanToATSMap(
-      DAGProtos.DAGPlan dagPlan) {
+      DAGProtos.DAGPlan dagPlan) throws IOException {
 
     final String VERSION_KEY = "version";
     final int version = 1;
-    final String DAG_NAME_KEY = "dagName";
-    final String VERTICES_KEY = "vertices";
-    final String EDGES_KEY = "edges";
-    final String VERTEX_GROUPS_KEY = "vertexGroups";
-
-    final String VERTEX_NAME_KEY = "vertexName";
-    final String PROCESSOR_CLASS_KEY = "processorClass";
-    final String IN_EDGE_IDS_KEY = "inEdgeIds";
-    final String OUT_EDGE_IDS_KEY = "outEdgeIds";
-    final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
-    final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
-    final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
-        "vertexManagerPluginClass";
-
-    final String EDGE_ID_KEY = "edgeId";
-    final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
-    final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
-    final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
-    final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
-    final String SCHEDULING_TYPE_KEY = "schedulingType";
-    final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
-    final String EDGE_DESTINATION_CLASS_KEY =
-        "edgeDestinationClass";
-
-    final String NAME_KEY = "name";
-    final String CLASS_KEY = "class";
-    final String INITIALIZER_KEY = "initializer";
-
-    final String VERTEX_GROUP_NAME_KEY = "groupName";
-    final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
-    final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
-    final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
-    final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
-
     Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
     dagMap.put(DAG_NAME_KEY, dagPlan.getName());
     dagMap.put(VERSION_KEY, version);
@@ -234,6 +136,11 @@ public class DAGUtils {
       if (vertexPlan.hasProcessorDescriptor()) {
         vertexMap.put(PROCESSOR_CLASS_KEY,
             vertexPlan.getProcessorDescriptor().getClassName());
+        if (vertexPlan.getProcessorDescriptor().hasHistoryText()) {
+          vertexMap.put(USER_PAYLOAD_AS_TEXT,
+              DagTypeConverters.getHistoryTextFromProto(
+                  vertexPlan.getProcessorDescriptor()));
+        }
       }
 
       ArrayList<Object> inEdgeIdList = new ArrayList<Object>();
@@ -253,6 +160,11 @@ public class DAGUtils {
         if (input.hasInitializerClassName()) {
           inputMap.put(INITIALIZER_KEY, input.getInitializerClassName());
         }
+        if (input.getEntityDescriptor().hasHistoryText()) {
+          inputMap.put(USER_PAYLOAD_AS_TEXT,
+              DagTypeConverters.getHistoryTextFromProto(
+                  input.getEntityDescriptor()));
+        }
         inputsList.add(inputMap);
       }
       putInto(vertexMap, ADDITIONAL_INPUTS_KEY, inputsList);
@@ -266,6 +178,11 @@ public class DAGUtils {
         if (output.hasInitializerClassName()) {
           outputMap.put(INITIALIZER_KEY, output.getInitializerClassName());
         }
+        if (output.getEntityDescriptor().hasHistoryText()) {
+          outputMap.put(USER_PAYLOAD_AS_TEXT,
+              DagTypeConverters.getHistoryTextFromProto(
+                  output.getEntityDescriptor()));
+        }
         outputsList.add(outputMap);
       }
       putInto(vertexMap, ADDITIONAL_OUTPUTS_KEY, outputsList);
@@ -293,7 +210,16 @@ public class DAGUtils {
           edgePlan.getEdgeSource().getClassName());
       edgeMap.put(EDGE_DESTINATION_CLASS_KEY,
           edgePlan.getEdgeDestination().getClassName());
-
+      if (edgePlan.getEdgeSource().hasHistoryText()) {
+        edgeMap.put(OUTPUT_USER_PAYLOAD_AS_TEXT,
+            DagTypeConverters.getHistoryTextFromProto(
+                edgePlan.getEdgeSource()));
+      }
+      if (edgePlan.getEdgeDestination().hasHistoryText()) {
+        edgeMap.put(INPUT_USER_PAYLOAD_AS_TEXT,
+            DagTypeConverters.getHistoryTextFromProto(
+                edgePlan.getEdgeDestination()));
+      }
       edgesList.add(edgeMap);
     }
     putInto(dagMap, EDGES_KEY, edgesList);
@@ -321,6 +247,11 @@ public class DAGUtils {
             && edgeMergedInputInfo.getMergedInput().hasClassName()) {
             edgeMergedInput.put(PROCESSOR_CLASS_KEY,
                 edgeMergedInputInfo.getMergedInput().getClassName());
+            if (edgeMergedInputInfo.getMergedInput().hasHistoryText()) {
+              edgeMergedInput.put(USER_PAYLOAD_AS_TEXT,
+                  DagTypeConverters.getHistoryTextFromProto(
+                      edgeMergedInputInfo.getMergedInput()));
+            }
           }
           edgeMergedInputs.add(edgeMergedInput);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
index 6539944..37b49d8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.dag.utils;
 
 import java.util.BitSet;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
index aae82f8..bcd65e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezException;
 
 import com.google.common.collect.Lists;
@@ -55,7 +55,7 @@ public class RelocalizationUtils {
   }
 
   public static void addUrlsToClassPath(List<URL> urls) {
-    RuntimeUtils.addResourcesToSystemClassLoader(urls);
+    ReflectionUtils.addResourcesToSystemClassLoader(urls);
   }
 
   private static Path downloadResource(String destName, URI uri, Configuration conf)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
index 8b888ff..1146ce4 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.runtime.task;
 
 public interface ErrorReporter {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
index 009df3c..4fb563c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.dag.api.client;
 
 import static org.junit.Assert.*;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index ed91e20..5c79f22 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -76,7 +76,7 @@ public class TestHistoryEventsProtoConversion {
     event.toProtoStream(os);
     os.flush();
     os.close();
-    deserializedEvent = RuntimeUtils.createClazzInstance(
+    deserializedEvent = ReflectionUtils.createClazzInstance(
         event.getClass().getName());
     LOG.info("Serialized event to byte array"
         + ", eventType=" + event.getEventType()
@@ -100,7 +100,7 @@ public class TestHistoryEventsProtoConversion {
     SummaryEventProto summaryEventProto =
         SummaryEventProto.parseDelimitedFrom(
             new ByteArrayInputStream(os.toByteArray()));
-    deserializedEvent = RuntimeUtils.createClazzInstance(
+    deserializedEvent = ReflectionUtils.createClazzInstance(
         event.getClass().getName());
     ((SummaryEvent)deserializedEvent).fromSummaryProtoStream(summaryEventProto);
     return deserializedEvent;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
new file mode 100644
index 0000000..f926471
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.utils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.runtime.api.OutputCommitter;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestDAGUtils {
+
+  private DAGPlan createDAG() {
+    // Create a plan with 3 vertices: A, B, C. Group(A,B)->C
+    Configuration conf = new Configuration(false);
+    int dummyTaskCount = 1;
+    Resource dummyTaskResource = Resource.newInstance(1, 1);
+    org.apache.tez.dag.api.Vertex v1 = new org.apache.tez.dag.api.Vertex("vertex1",
+        new ProcessorDescriptor("Processor").setHistoryText("vertex1 Processor HistoryText"),
+        dummyTaskCount, dummyTaskResource);
+    v1.addInput("input1", new InputDescriptor("input.class").setHistoryText("input HistoryText"),
+        null);
+    org.apache.tez.dag.api.Vertex v2 = new org.apache.tez.dag.api.Vertex("vertex2",
+        new ProcessorDescriptor("Processor").setHistoryText("vertex2 Processor HistoryText"),
+        dummyTaskCount, dummyTaskResource);
+    org.apache.tez.dag.api.Vertex v3 = new org.apache.tez.dag.api.Vertex("vertex3",
+        new ProcessorDescriptor("Processor").setHistoryText("vertex3 Processor HistoryText"),
+        dummyTaskCount, dummyTaskResource);
+
+    DAG dag = new DAG("testDag");
+    String groupName1 = "uv12";
+    org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
+    OutputDescriptor outDesc = new OutputDescriptor("output.class")
+        .setHistoryText("uvOut HistoryText");
+    uv12.addOutput("uvOut", outDesc, OutputCommitter.class);
+    v3.addOutput("uvOut", outDesc, OutputCommitter.class);
+
+    GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            new OutputDescriptor("dummy output class").setHistoryText("Dummy History Text"),
+            new InputDescriptor("dummy input class").setHistoryText("Dummy History Text")),
+        new InputDescriptor("merge.class").setHistoryText("Merge HistoryText"));
+
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addEdge(e1);
+    return dag.createDag(conf);
+  }
+
+  @Test
+  public void testConvertDAGPlanToATSMap() throws IOException, JSONException {
+    DAGPlan dagPlan = createDAG();
+    Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan);
+    Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY));
+    Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY));
+    Assert.assertTrue(atsMap.containsKey("version"));
+    Assert.assertEquals(1, atsMap.get("version"));
+    Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTICES_KEY));
+    Assert.assertTrue(atsMap.containsKey(DAGUtils.EDGES_KEY));
+    Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTEX_GROUPS_KEY));
+
+    Assert.assertEquals(3, ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY)).size());
+    Set<String> vNames = Sets.newHashSet("vertex1", "vertex2", "vertex3");
+
+    Set<String> inEdgeIds = new HashSet<String>();
+    Set<String> outEdgeIds = new HashSet<String>();
+
+    int additionalInputCount = 0;
+    int additionalOutputCount = 0;
+
+    for (Object o : ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY))) {
+      Map<String, Object> v = (Map<String, Object>) o;
+      Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_NAME_KEY));
+      Assert.assertTrue(v.containsKey(DAGUtils.PROCESSOR_CLASS_KEY));
+      Assert.assertTrue(v.containsKey(DAGUtils.USER_PAYLOAD_AS_TEXT));
+
+      if (v.containsKey(DAGUtils.IN_EDGE_IDS_KEY)) {
+        inEdgeIds.addAll(((Collection<String>) v.get(DAGUtils.IN_EDGE_IDS_KEY)));
+      }
+      if (v.containsKey(DAGUtils.OUT_EDGE_IDS_KEY)) {
+        outEdgeIds.addAll(((Collection<String>) v.get(DAGUtils.OUT_EDGE_IDS_KEY)));
+      }
+
+      String vName = (String) v.get(DAGUtils.VERTEX_NAME_KEY);
+      Assert.assertTrue(vNames.contains(vName));
+      String procPayload = vName + " Processor HistoryText";
+      Assert.assertEquals(procPayload, v.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
+
+      if (v.containsKey(DAGUtils.ADDITIONAL_INPUTS_KEY)) {
+        additionalInputCount += ((Collection<?>) v.get(DAGUtils.ADDITIONAL_INPUTS_KEY)).size();
+        for (Object input : ((Collection<?>) v.get(DAGUtils.ADDITIONAL_INPUTS_KEY))) {
+          Map<String, Object> inputMap = (Map<String, Object>) input;
+          Assert.assertTrue(inputMap.containsKey(DAGUtils.NAME_KEY));
+          Assert.assertTrue(inputMap.containsKey(DAGUtils.CLASS_KEY));
+          Assert.assertFalse(inputMap.containsKey(DAGUtils.INITIALIZER_KEY));
+          Assert.assertEquals("input HistoryText", inputMap.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
+        }
+      }
+
+      if (v.containsKey(DAGUtils.ADDITIONAL_OUTPUTS_KEY)) {
+        additionalOutputCount += ((Collection<?>) v.get(DAGUtils.ADDITIONAL_OUTPUTS_KEY)).size();
+        for (Object output : ((Collection<?>) v.get(DAGUtils.ADDITIONAL_OUTPUTS_KEY))) {
+          Map<String, Object> outputMap = (Map<String, Object>) output;
+          Assert.assertTrue(outputMap.containsKey(DAGUtils.NAME_KEY));
+          Assert.assertTrue(outputMap.containsKey(DAGUtils.CLASS_KEY));
+          Assert.assertTrue(outputMap.containsKey(DAGUtils.INITIALIZER_KEY));
+          Assert.assertEquals("uvOut HistoryText", outputMap.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
+        }
+      }
+    }
+
+    // 1 input
+    Assert.assertEquals(1, additionalInputCount);
+    // 3 outputs due to vertex group
+    Assert.assertEquals(3, additionalOutputCount);
+
+    // 1 edge translates to 2 due to vertex group
+    Assert.assertEquals(2, inEdgeIds.size());
+    Assert.assertEquals(2, outEdgeIds.size());
+
+    for (Object o : ((Collection<?>) atsMap.get(DAGUtils.EDGES_KEY))) {
+      Map<String, Object> e = (Map<String, Object>) o;
+
+      Assert.assertTrue(inEdgeIds.contains(e.get(DAGUtils.EDGE_ID_KEY)));
+      Assert.assertTrue(outEdgeIds.contains(e.get(DAGUtils.EDGE_ID_KEY)));
+      Assert.assertTrue(e.containsKey(DAGUtils.INPUT_VERTEX_NAME_KEY));
+      Assert.assertTrue(e.containsKey(DAGUtils.OUTPUT_VERTEX_NAME_KEY));
+      Assert.assertEquals(DataMovementType.SCATTER_GATHER.name(),
+          e.get(DAGUtils.DATA_MOVEMENT_TYPE_KEY));
+      Assert.assertEquals(DataSourceType.PERSISTED.name(), e.get(DAGUtils.DATA_SOURCE_TYPE_KEY));
+      Assert.assertEquals(SchedulingType.SEQUENTIAL.name(), e.get(DAGUtils.SCHEDULING_TYPE_KEY));
+      Assert.assertEquals("dummy output class", e.get(DAGUtils.EDGE_SOURCE_CLASS_KEY));
+      Assert.assertEquals("dummy input class", e.get(DAGUtils.EDGE_DESTINATION_CLASS_KEY));
+      Assert.assertEquals("Dummy History Text", e.get(DAGUtils.OUTPUT_USER_PAYLOAD_AS_TEXT));
+      Assert.assertEquals("Dummy History Text", e.get(DAGUtils.INPUT_USER_PAYLOAD_AS_TEXT));
+    }
+
+    for (Object o : ((Collection<?>) atsMap.get(DAGUtils.VERTEX_GROUPS_KEY))) {
+      Map<String, Object> e = (Map<String, Object>) o;
+      Assert.assertEquals("uv12", e.get(DAGUtils.VERTEX_GROUP_NAME_KEY));
+      Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_MEMBERS_KEY));
+      Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_OUTPUTS_KEY));
+      Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 1c6d45e..051bfee 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.mapreduce.examples;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -198,12 +199,16 @@ public class OrderedWordCount extends Configured implements Tool {
 
     List<Vertex> vertices = new ArrayList<Vertex>();
 
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
+    mapStageConf.writeXml(outputStream);
+    String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
     byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
     byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
       TextInputFormat.class.getName());
     int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
-        MapProcessor.class.getName()).setUserPayload(mapPayload),
+        MapProcessor.class.getName()).setUserPayload(mapPayload)
+            .setHistoryText(mapStageHistoryText),
         numMaps, MRHelpers.getMapResource(mapStageConf));
     if (generateSplitsInClient) {
       mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
@@ -222,19 +227,27 @@ public class OrderedWordCount extends Configured implements Tool {
     MRHelpers.addMRInput(mapVertex, mapInputPayload, initializerClazz);
     vertices.add(mapVertex);
 
+    ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
+    iReduceStageConf.writeXml(iROutputStream);
+    String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
     Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
-        ReduceProcessor.class.getName()).
-        setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf)),
-        2,
-        MRHelpers.getReduceResource(iReduceStageConf));
+        ReduceProcessor.class.getName())
+            .setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf))
+            .setHistoryText(iReduceStageHistoryText),
+        2, MRHelpers.getReduceResource(iReduceStageConf));
     ivertex.setTaskLocalFiles(commonLocalResources);
     vertices.add(ivertex);
 
+    ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
+    finalReduceConf.writeXml(finalReduceOutputStream);
+    String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
     byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
     Vertex finalReduceVertex = new Vertex("finalreduce",
         new ProcessorDescriptor(
-            ReduceProcessor.class.getName()).setUserPayload(finalReducePayload),
-                1, MRHelpers.getReduceResource(finalReduceConf));
+            ReduceProcessor.class.getName())
+                .setUserPayload(finalReducePayload)
+                .setHistoryText(finalReduceStageHistoryText), 1,
+        MRHelpers.getReduceResource(finalReduceConf));
     finalReduceVertex.setTaskLocalFiles(commonLocalResources);
     MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
     vertices.add(finalReduceVertex);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 6e94c22..76da547 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -29,8 +29,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 import com.google.common.base.Preconditions;
@@ -85,7 +84,7 @@ public class TezGroupedSplitsInputFormat<K, V>
       Class<? extends InputFormat> clazz = (Class<? extends InputFormat>) 
           getClassFromName(split.wrappedInputFormatName);
       try {
-        wrappedInputFormat = ReflectionUtils.newInstance(clazz, conf);
+        wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
       } catch (Exception e) {
         throw new TezUncheckedException(e);
       }
@@ -93,7 +92,7 @@ public class TezGroupedSplitsInputFormat<K, V>
   }
 
   static Class<?> getClassFromName(String name) {
-    return RuntimeUtils.getClazz(name);
+    return ReflectionUtils.getClazz(name);
   }
 
   public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 5fa3e79..3fd0b6e 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -31,8 +31,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 import com.google.common.base.Preconditions;
@@ -120,7 +119,7 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
       Class<? extends InputFormat> clazz = (Class<? extends InputFormat>) 
           getClassFromName(split.wrappedInputFormatName);
       try {
-        wrappedInputFormat = ReflectionUtils.newInstance(clazz, conf);
+        wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
       } catch (Exception e) {
         throw new TezUncheckedException(e);
       }
@@ -128,7 +127,7 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
   }
   
   static Class<?> getClassFromName(String name) {
-    return RuntimeUtils.getClazz(name);
+    return ReflectionUtils.getClazz(name);
   }
   
   public class TezGroupedSplitsRecordReader  extends RecordReader<K, V> {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 70f56d4..f078f1d 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -18,8 +18,11 @@
 
 package org.apache.tez.dag.history.logging.ats;
 
+import java.io.IOException;
+
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
@@ -277,8 +280,12 @@ public class HistoryEventTimelineConversion {
     atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
     atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName());
 
-    atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
-        DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+    try {
+      atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
+          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
     atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID,
         event.getApplicationAttemptId().getApplicationId().toString());
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 881ae90..122cc23 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -42,7 +42,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -503,7 +503,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private LogicalInput createInputFromDescriptor(InputDescriptor inputDesc) {
-    Input input = RuntimeUtils.createClazzInstance(inputDesc.getClassName());
+    Input input = ReflectionUtils.createClazzInstance(inputDesc.getClassName());
     if (!(input instanceof LogicalInput)) {
       throw new TezUncheckedException(inputDesc.getClass().getName()
           + " is not a sub-type of LogicalInput."
@@ -514,7 +514,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   
   private LogicalOutput createOutput(OutputSpec outputSpec) {
     LOG.info("Creating Output");
-    Output output = RuntimeUtils.createClazzInstance(outputSpec
+    Output output = ReflectionUtils.createClazzInstance(outputSpec
         .getOutputDescriptor().getClassName());
     if (!(output instanceof LogicalOutput)) {
       throw new TezUncheckedException(output.getClass().getName()
@@ -526,7 +526,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   private LogicalIOProcessor createProcessor(
       ProcessorDescriptor processorDescriptor) {
-    Processor processor = RuntimeUtils.createClazzInstance(processorDescriptor
+    Processor processor = ReflectionUtils.createClazzInstance(processorDescriptor
         .getClassName());
     if (!(processor instanceof LogicalIOProcessor)) {
       throw new TezUncheckedException(processor.getClass().getName()

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index c024d0a..6d2f852 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -30,7 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezEntityDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -65,9 +65,9 @@ public class MemoryDistributor {
   private final List<RequestorInfo> requestList;
 
   /**
-   * @param numInputs
+   * @param numTotalInputs
    *          total number of Inputs for the task
-   * @param numOutputs
+   * @param numTotalOutputs
    *          total number of Outputs for the task
    * @param conf
    *          Tez specific task configuration
@@ -122,7 +122,7 @@ public class MemoryDistributor {
       String allocatorClassName = conf.get(TezJobConfig.TEZ_RUNTIME_SCALE_TASK_MEMORY_ALLOCATOR_CLASS,
           TezJobConfig.TEZ_RUNTIME_SCALE_TASK_MEMORY_ALLOCATOR_CLASS_DEFAULT);
       LOG.info("Using Allocator class: " + allocatorClassName);
-      InitialMemoryAllocator allocator = RuntimeUtils.createClazzInstance(allocatorClassName);
+      InitialMemoryAllocator allocator = ReflectionUtils.createClazzInstance(allocatorClassName);
       allocator.setConf(conf);
       allocations = allocator.assignMemory(totalJvmMemory, numTotalInputs, numTotalOutputs,
           Iterables.unmodifiableIterable(requestContexts));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36d08fe0/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestReflectionUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestReflectionUtils.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestReflectionUtils.java
new file mode 100644
index 0000000..d10bd29
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestReflectionUtils.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezException;
+import org.junit.Test;
+
+public class TestReflectionUtils {
+
+  @Test
+  public void testAddResourceToClasspath() throws IOException, TezException {
+
+    String rsrcName = "dummyfile.xml";
+    FileSystem localFs = FileSystem.getLocal(new Configuration());
+    Path p = new Path(rsrcName);
+    p = localFs.makeQualified(p);
+
+    localFs.delete(p, false);
+
+    try {
+      URL loadedUrl = null;
+
+      loadedUrl = Thread.currentThread().getContextClassLoader().getResource(rsrcName);
+      assertNull(loadedUrl);
+
+      // Add parent to classpath since we're not adding a jar
+      assertTrue(localFs.createNewFile(p));
+      String urlForm = p.toUri().toURL().toString();
+      urlForm = urlForm.substring(0, urlForm.lastIndexOf('/') + 1);
+      URL url = new URL(urlForm);
+
+      ReflectionUtils.addResourcesToClasspath(Collections.singletonList(url));
+
+      loadedUrl = Thread.currentThread().getContextClassLoader().getResource(rsrcName);
+
+      assertNotNull(loadedUrl);
+    } finally {
+      localFs.delete(p, false);
+    }
+  }
+}


Mime
View raw message