tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [03/50] [abbrv] tez git commit: TEZ-1666. UserPayload should be null if the payload is not specified. (sseth)
Date Mon, 17 Nov 2014 19:22:12 GMT
TEZ-1666. UserPayload should be null if the payload is not specified.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4e69bed5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4e69bed5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4e69bed5

Branch: refs/heads/TEZ-8
Commit: 4e69bed5c7dbc68d17f3d4648838e5acfc52c3eb
Parents: 1ffbc19
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Oct 29 14:22:03 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Oct 29 14:22:03 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/tez/dag/api/DagTypeConverters.java   |  56 ++--
 tez-api/src/main/proto/DAGApiRecords.proto      |   8 +-
 .../org/apache/tez/dag/api/TestDAGPlan.java     |  18 +-
 .../tez/dag/api/TestDagTypeConverters.java      |   4 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  11 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  10 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 117 ++++----
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 268 ++++++++++---------
 .../TestHistoryEventsProtoConversion.java       |   3 +-
 .../runtime/api/impl/TezInputContextImpl.java   |   2 +-
 .../api/impl/TezMergedInputContextImpl.java     |   2 +-
 .../runtime/api/impl/TezOutputContextImpl.java  |   2 +-
 .../api/impl/TezProcessorContextImpl.java       |   2 +-
 .../examples/BroadcastAndOneToOneExample.java   |   3 +-
 15 files changed, 275 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0d2ccb9..933a445 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,9 @@ ALL CHANGES:
 Release 0.5.2: Unreleased
 
 INCOMPATIBLE CHANGES
+  TEZ-1666. UserPayload should be null if the payload is not specified.
+    0.5.1 client cannot talk to 0.5.2 AMs (TEZ-1666 and TEZ-1664).
+    context.getUserPayload can now return null, apps may need to add defensive code.
 
 ALL CHANGES:
   TEZ-1620. Wait for application finish before stopping MiniTezCluster

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 179f3cc..17807d3 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -306,9 +306,13 @@ public class DagTypeConverters {
     builder.setClassName(descriptor.getClassName());
 
     UserPayload userPayload = descriptor.getUserPayload();
-    if (userPayload != null && userPayload.hasPayload()) {
-      builder.setUserPayload(ByteString.copyFrom(descriptor.getUserPayload().getPayload()));
-      builder.setVersion(userPayload.getVersion());
+    if (userPayload != null) {
+      DAGProtos.TezUserPayloadProto.Builder payloadBuilder = DAGProtos.TezUserPayloadProto.newBuilder();
+      if (userPayload.hasPayload()) {
+        payloadBuilder.setUserPayload(ByteString.copyFrom(userPayload.getPayload()));
+        payloadBuilder.setVersion(userPayload.getVersion());
+      }
+      builder.setTezUserPayload(payloadBuilder.build());
     }
     if (descriptor.getHistoryText() != null) {
       try {
@@ -348,62 +352,84 @@ public class DagTypeConverters {
   private static UserPayload convertTezUserPayloadFromDAGPlan(
       TezEntityDescriptorProto proto) {
     UserPayload userPayload = null;
-    if (proto.hasUserPayload()) {
-      userPayload =
-          UserPayload.create(proto.getUserPayload().asReadOnlyByteBuffer(), proto.getVersion());
-    } else {
-      userPayload = UserPayload.create(null, -1);
+    if (proto.hasTezUserPayload()) {
+      if (proto.getTezUserPayload().hasUserPayload()) {
+        userPayload =
+            UserPayload.create(proto.getTezUserPayload().getUserPayload().asReadOnlyByteBuffer(), proto.getTezUserPayload().getVersion());
+      } else {
+        userPayload = UserPayload.create(null);
+      }
     }
     return userPayload;
   }
 
+  private static void setUserPayload(EntityDescriptor<?> entity, UserPayload payload) {
+    if (payload != null) {
+      entity.setUserPayload(payload);
+    }
+  }
+
   public static InputDescriptor convertInputDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();
     UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
-    return InputDescriptor.create(className).setUserPayload(payload);
+    InputDescriptor id = InputDescriptor.create(className);
+    setUserPayload(id, payload);
+    return id;
   }
 
   public static OutputDescriptor convertOutputDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();
     UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
-    return OutputDescriptor.create(className).setUserPayload(payload);
+    OutputDescriptor od = OutputDescriptor.create(className);
+    setUserPayload(od, payload);
+    return od;
   }
 
   public static InputInitializerDescriptor convertInputInitializerDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();
     UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
-    return InputInitializerDescriptor.create(className).setUserPayload(payload);
+    InputInitializerDescriptor iid = InputInitializerDescriptor.create(className);
+    setUserPayload(iid, payload);
+    return iid;
   }
 
   public static OutputCommitterDescriptor convertOutputCommitterDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();
     UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
-    return OutputCommitterDescriptor.create(className).setUserPayload(payload);
+    OutputCommitterDescriptor ocd = OutputCommitterDescriptor.create(className);
+    setUserPayload(ocd, payload);
+    return ocd;
   }
 
   public static VertexManagerPluginDescriptor convertVertexManagerPluginDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();
     UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
-    return VertexManagerPluginDescriptor.create(className).setUserPayload(payload);
+    VertexManagerPluginDescriptor vmpd = VertexManagerPluginDescriptor.create(className);
+    setUserPayload(vmpd, payload);
+    return vmpd;
   }
 
   public static EdgeManagerPluginDescriptor convertEdgeManagerPluginDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();
     UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
-    return EdgeManagerPluginDescriptor.create(className).setUserPayload(payload);
+    EdgeManagerPluginDescriptor empd = EdgeManagerPluginDescriptor.create(className);
+    setUserPayload(empd, payload);
+    return empd;
   }
 
   public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();
     UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
-    return ProcessorDescriptor.create(className).setUserPayload(payload);
+    ProcessorDescriptor pd = ProcessorDescriptor.create(className);
+    setUserPayload(pd, payload);
+    return pd;
   }
 
   public static TezAppMasterStatus convertTezSessionStatusFromProto(

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 04aa791..177faba 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -112,9 +112,13 @@ message PlanTaskConfiguration {
 
 message TezEntityDescriptorProto {
   optional string class_name = 1;
-  optional bytes user_payload = 2;
+  optional TezUserPayloadProto tez_user_payload = 2;
   optional bytes history_text = 3;
-  optional int32 version = 4;
+}
+
+message TezUserPayloadProto {
+  optional bytes user_payload = 1;
+  optional int32 version = 2;
 }
 
 message RootInputLeafOutputProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 8cbd611..fccbb08 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -167,19 +167,19 @@ public class TestDAGPlan {
     VertexPlan v2Proto = dagProto.getVertex(1);
     EdgePlan edgeProto = dagProto.getEdge(0);
 
-    assertEquals("processor1Bytes", new String(v1Proto.getProcessorDescriptor()
+    assertEquals("processor1Bytes", new String(v1Proto.getProcessorDescriptor().getTezUserPayload()
         .getUserPayload().toByteArray()));
     assertEquals("processor1", v1Proto.getProcessorDescriptor().getClassName());
 
-    assertEquals("processor2Bytes", new String(v2Proto.getProcessorDescriptor()
+    assertEquals("processor2Bytes", new String(v2Proto.getProcessorDescriptor().getTezUserPayload()
         .getUserPayload().toByteArray()));
     assertEquals("processor2", v2Proto.getProcessorDescriptor().getClassName());
 
-    assertEquals("inputBytes", new String(edgeProto.getEdgeDestination()
+    assertEquals("inputBytes", new String(edgeProto.getEdgeDestination().getTezUserPayload()
         .getUserPayload().toByteArray()));
     assertEquals("input", edgeProto.getEdgeDestination().getClassName());
 
-    assertEquals("outputBytes", new String(edgeProto.getEdgeSource()
+    assertEquals("outputBytes", new String(edgeProto.getEdgeSource().getTezUserPayload()
         .getUserPayload().toByteArray()));
     assertEquals("output", edgeProto.getEdgeSource().getClassName());
 
@@ -235,8 +235,8 @@ public class TestDAGPlan {
     EdgePlan edgeProto = dagProto.getEdge(0);
 
     // either v1 or v2 will be on top based on topological order 
-    String v1ProtoPayload = new String(v1Proto.getProcessorDescriptor().getUserPayload().toByteArray());
-    String v2ProtoPayload = new String(v2Proto.getProcessorDescriptor().getUserPayload().toByteArray());
+    String v1ProtoPayload = new String(v1Proto.getProcessorDescriptor().getTezUserPayload().getUserPayload().toByteArray());
+    String v2ProtoPayload = new String(v2Proto.getProcessorDescriptor().getTezUserPayload().getUserPayload().toByteArray());
     assertTrue(v1ProtoPayload.equals("processor1Bytes") || v1ProtoPayload.equals("processor3Bytes"));
     assertTrue(v2ProtoPayload.equals("processor1Bytes") || v2ProtoPayload.equals("processor3Bytes"));
     assertTrue(v1Proto.getProcessorDescriptor().getClassName().equals("processor1") ||
@@ -244,15 +244,15 @@ public class TestDAGPlan {
     assertTrue(v2Proto.getProcessorDescriptor().getClassName().equals("processor1") ||
         v2Proto.getProcessorDescriptor().getClassName().equals("processor3"));
 
-    assertEquals("processor2Bytes", new String(v3Proto.getProcessorDescriptor()
+    assertEquals("processor2Bytes", new String(v3Proto.getProcessorDescriptor().getTezUserPayload()
         .getUserPayload().toByteArray()));
     assertEquals("processor2", v3Proto.getProcessorDescriptor().getClassName());
 
-    assertEquals("inputBytes", new String(edgeProto.getEdgeDestination()
+    assertEquals("inputBytes", new String(edgeProto.getEdgeDestination().getTezUserPayload()
         .getUserPayload().toByteArray()));
     assertEquals("input", edgeProto.getEdgeDestination().getClassName());
 
-    assertEquals("outputBytes", new String(edgeProto.getEdgeSource()
+    assertEquals("outputBytes", new String(edgeProto.getEdgeSource().getTezUserPayload()
         .getUserPayload().toByteArray()));
     assertEquals("output", edgeProto.getEdgeSource().getClassName());
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index 9a1cb07..13347bb 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -37,8 +37,8 @@ public class TestDagTypeConverters {
         .setHistoryText(historytext);
     TezEntityDescriptorProto proto =
         DagTypeConverters.convertToDAGPlan(entityDescriptor);
-    Assert.assertEquals(payload.getVersion(), proto.getVersion());
-    Assert.assertArrayEquals(payload.deepCopyAsArray(), proto.getUserPayload().toByteArray());
+    Assert.assertEquals(payload.getVersion(), proto.getTezUserPayload().getVersion());
+    Assert.assertArrayEquals(payload.deepCopyAsArray(), proto.getTezUserPayload().getUserPayload().toByteArray());
     Assert.assertTrue(proto.hasHistoryText());
     Assert.assertNotEquals(historytext, proto.getHistoryText());
     Assert.assertEquals(historytext, new String(

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 57d742f..360a839 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -115,25 +115,22 @@ public class Edge {
   private void createEdgeManager() {
     switch (edgeProperty.getDataMovementType()) {
       case ONE_TO_ONE:
-        edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null));
+        edgeManagerContext = new EdgeManagerPluginContextImpl(null);
         edgeManager = new OneToOneEdgeManager(edgeManagerContext);
         break;
       case BROADCAST:
-        edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null));
+        edgeManagerContext = new EdgeManagerPluginContextImpl(null);
         edgeManager = new BroadcastEdgeManager(edgeManagerContext);
         break;
       case SCATTER_GATHER:
-        edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null));
+        edgeManagerContext = new EdgeManagerPluginContextImpl(null);
         edgeManager = new ScatterGatherEdgeManager(edgeManagerContext);
         break;
       case CUSTOM:
         if (edgeProperty.getEdgeManagerDescriptor() != null) {
           UserPayload payload = null;
-          if (edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null &&
-              edgeProperty.getEdgeManagerDescriptor().getUserPayload().hasPayload()) {
+          if (edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null) {
             payload = edgeProperty.getEdgeManagerDescriptor().getUserPayload();
-          } else {
-            payload = UserPayload.create(null);
           }
           edgeManagerContext = new EdgeManagerPluginContextImpl(payload);
           String edgeManagerClassName = edgeProperty.getEdgeManagerDescriptor().getClassName();

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 50e8b0c..b8e99d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -69,7 +69,6 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
@@ -2032,15 +2031,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.info("Setting vertexManager to RootInputVertexManager for "
             + logIdentifier);
         vertexManager = new VertexManager(
-            VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName())
-            .setUserPayload(UserPayload.create(null)),
+            VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName()),
             this, appContext);
       } else if (hasOneToOne && !hasCustom) {
         LOG.info("Setting vertexManager to InputReadyVertexManager for "
             + logIdentifier);
         vertexManager = new VertexManager(
-            VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName())
-            .setUserPayload(UserPayload.create(null)),
+            VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()),
             this, appContext);
       } else if (hasBipartite && !hasCustom) {
         LOG.info("Setting vertexManager to ShuffleVertexManager for "
@@ -2053,8 +2050,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
             + logIdentifier);
         vertexManager = new VertexManager(
-            VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName())
-            .setUserPayload(UserPayload.create(null)),
+            VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName()),
             this, appContext);
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index dc90a6f..af48c49 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -650,65 +650,67 @@ public class TestDAGImpl {
         .setName("testverteximpl")
         .addVertex(
             VertexPlan.newBuilder()
-            .setName("vertex1")
-            .setType(PlanVertexType.NORMAL)
-            .addTaskLocationHint(
-                PlanTaskLocationHint.newBuilder()
-                .addHost("host1")
-                .addRack("rack1")
-                .build()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder()
+                        .addHost("host1")
+                        .addRack("rack1")
+                        .build()
                 )
-            .setTaskConfig(
-                PlanTaskConfiguration.newBuilder()
-                .setNumTasks(1)
-                .setVirtualCores(4)
-                .setMemoryMb(1024)
-                .setJavaOpts("")
-                .setTaskModule("x1.y1")
-                .build()
-                )
-            .addOutEdgeId("e1")
-            .build()
-            )
-          .addVertex(
-            VertexPlan.newBuilder()
-            .setName("vertex2")
-            .setType(PlanVertexType.NORMAL)
-            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x2.y2"))
-            .addTaskLocationHint(
-                PlanTaskLocationHint.newBuilder()
-                .addHost("host2")
-                .addRack("rack2")
-                .build()
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
                 )
-            .setTaskConfig(
-                PlanTaskConfiguration.newBuilder()
-                .setNumTasks(2)
-                .setVirtualCores(4)
-                .setMemoryMb(1024)
-                .setJavaOpts("foo")
-                .setTaskModule("x2.y2")
+                .addOutEdgeId("e1")
                 .build()
-                )
-            .addInEdgeId("e1")
-            .build()
-            )
+        )
+          .addVertex(
+              VertexPlan.newBuilder()
+                  .setName("vertex2")
+                  .setType(PlanVertexType.NORMAL)
+                  .setProcessorDescriptor(
+                      TezEntityDescriptorProto.newBuilder().setClassName("x2.y2"))
+                  .addTaskLocationHint(
+                      PlanTaskLocationHint.newBuilder()
+                          .addHost("host2")
+                          .addRack("rack2")
+                          .build()
+                  )
+                  .setTaskConfig(
+                      PlanTaskConfiguration.newBuilder()
+                          .setNumTasks(2)
+                          .setVirtualCores(4)
+                          .setMemoryMb(1024)
+                          .setJavaOpts("foo")
+                          .setTaskModule("x2.y2")
+                          .build()
+                  )
+                  .addInEdgeId("e1")
+                  .build()
+          )
          .addEdge(
-            EdgePlan.newBuilder()
-            .setEdgeManager(TezEntityDescriptorProto.newBuilder()
-                .setClassName(CustomizedEdgeManager.class.getName())
-                .setUserPayload(ByteString.copyFromUtf8(exLocation.name()))
-                )
-            .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
-            .setInputVertexName("vertex1")
-            .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
-            .setOutputVertexName("vertex2")
-            .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
-            .setId("e1")
-            .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-            .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-            .build()
-            )
+             EdgePlan.newBuilder()
+                 .setEdgeManager(TezEntityDescriptorProto.newBuilder()
+                         .setClassName(CustomizedEdgeManager.class.getName())
+                         .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+                            .setUserPayload(ByteString.copyFromUtf8(exLocation.name())))
+                 )
+                 .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+                 .setInputVertexName("vertex1")
+                 .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
+                 .setOutputVertexName("vertex2")
+                 .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
+                 .setId("e1")
+                 .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                 .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                 .build()
+         )
           .build();
     return dag;
   }
@@ -1135,14 +1137,15 @@ public class TestDAGImpl {
             TezEntityDescriptorProto
                 .newBuilder()
                 .setClassName(CountingOutputCommitter.class.getName())
+                .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
                 .setUserPayload(
                     ByteString
                         .copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig(
-                            true, false, false).toUserPayload())).build())
+                            true, false, false).toUserPayload())).build()))
         .setName("output3")
         .setIODescriptor(
             TezEntityDescriptorProto.newBuilder().setClassName("output.class")
-                )
+        )
         .build());
     badVertex.setAdditionalOutputs(outputs);
     

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 72dcd92..55ee05f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -219,7 +219,7 @@ public class TestVertexImpl {
 
     @Override
     public void initialize() throws IOException {
-      if (getContext().getUserPayload().hasPayload()) {
+      if (getContext().getUserPayload() != null && getContext().getUserPayload().hasPayload()) {
         CountingOutputCommitterConfig conf =
             new CountingOutputCommitterConfig(getContext().getUserPayload());
         this.throwError = conf.throwError;
@@ -419,8 +419,9 @@ public class TestVertexImpl {
                 )
                 .addOutEdgeId("e1")
                 .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
-                   .setClassName(VertexManagerWithException.class.getName())
-                   .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes())))
+                    .setClassName(VertexManagerWithException.class.getName())
+                    .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+                        .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes()))))
                 .build()
         )
         .addVertex(
@@ -438,8 +439,9 @@ public class TestVertexImpl {
                 )
                 .addInEdgeId("e1")
                 .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
-                   .setClassName(VertexManagerWithException.class.getName())
-                   .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes())))
+                    .setClassName(VertexManagerWithException.class.getName())
+                    .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+                        .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes()))))
                 .build()
         )
         .addEdge(
@@ -706,24 +708,24 @@ public class TestVertexImpl {
                         .setControllerDescriptor(
                             TezEntityDescriptorProto.newBuilder().setClassName(
                                 initializerClassName))
-                    .setName("input1")
-                    .setIODescriptor(
-                        TezEntityDescriptorProto.newBuilder()
-                            .setClassName("InputClazz")
-                            .build()
-                    ).build()
+                        .setName("input1")
+                        .setIODescriptor(
+                            TezEntityDescriptorProto.newBuilder()
+                                .setClassName("InputClazz")
+                                .build()
+                        ).build()
                 )
                 .setTaskConfig(
                     PlanTaskConfiguration.newBuilder()
-                    .setNumTasks(-1)
-                    .setVirtualCores(4)
-                    .setMemoryMb(1024)
-                    .setJavaOpts("")
-                    .setTaskModule("x1.y1")
-                    .build()
+                        .setNumTasks(-1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
                 )
                 .addOutEdgeId("e1")
-            .build()
+                .build()
         )
         .addVertex(
             VertexPlan.newBuilder()
@@ -737,11 +739,11 @@ public class TestVertexImpl {
                         .setName("input2")
                         .setIODescriptor(
                             TezEntityDescriptorProto.newBuilder()
-                              .setClassName("InputClazz")
-                              .build()
+                                .setClassName("InputClazz")
+                                .build()
                         )
                         .build()
-                    )
+                )
                 .setTaskConfig(
                     PlanTaskConfiguration.newBuilder()
                         .setNumTasks(-1)
@@ -752,7 +754,7 @@ public class TestVertexImpl {
                         .build()
                 )
                 .addInEdgeId("e1")
-              .build()
+                .build()
         )
         .addVertex(
             VertexPlan.newBuilder()
@@ -766,8 +768,8 @@ public class TestVertexImpl {
                         .setName("input3")
                         .setIODescriptor(
                             TezEntityDescriptorProto.newBuilder()
-                              .setClassName("InputClazz")
-                              .build()
+                                .setClassName("InputClazz")
+                                .build()
                         )
                         .build()
                     )
@@ -782,40 +784,42 @@ public class TestVertexImpl {
                 )
                 .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
                     .setClassName(RootInputSpecUpdaterVertexManager.class.getName())
-                    .setUserPayload(ByteString.copyFrom(new byte[] {0})))
+                    .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+                        .setUserPayload(ByteString.copyFrom(new byte[]{0}))))
               .build()
         )
                 .addVertex(
-            VertexPlan.newBuilder()
-                .setName("vertex4")
-                .setType(PlanVertexType.NORMAL)
-                .addInputs(
-                    RootInputLeafOutputProto.newBuilder()
-                        .setControllerDescriptor(
-                            TezEntityDescriptorProto.newBuilder().setClassName(
-                                initializerClassName))
-                        .setName("input4")
-                        .setIODescriptor(
-                            TezEntityDescriptorProto.newBuilder()
-                              .setClassName("InputClazz")
-                              .build()
+                    VertexPlan.newBuilder()
+                        .setName("vertex4")
+                        .setType(PlanVertexType.NORMAL)
+                        .addInputs(
+                            RootInputLeafOutputProto.newBuilder()
+                                .setControllerDescriptor(
+                                    TezEntityDescriptorProto.newBuilder().setClassName(
+                                        initializerClassName))
+                                .setName("input4")
+                                .setIODescriptor(
+                                    TezEntityDescriptorProto.newBuilder()
+                                        .setClassName("InputClazz")
+                                        .build()
+                                )
+                                .build()
                         )
-                        .build()
-                    )
-                .setTaskConfig(
-                    PlanTaskConfiguration.newBuilder()
-                        .setNumTasks(-1)
-                        .setVirtualCores(4)
-                        .setMemoryMb(1024)
-                        .setJavaOpts("")
-                        .setTaskModule("x3.y3")
+                        .setTaskConfig(
+                            PlanTaskConfiguration.newBuilder()
+                                .setNumTasks(-1)
+                                .setVirtualCores(4)
+                                .setMemoryMb(1024)
+                                .setJavaOpts("")
+                                .setTaskModule("x3.y3")
+                                .build()
+                        )
+                        .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+                            .setClassName(RootInputSpecUpdaterVertexManager.class.getName())
+                            .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+                                .setUserPayload(ByteString.copyFrom(new byte[]{1}))))
                         .build()
                 )
-                .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
-                    .setClassName(RootInputSpecUpdaterVertexManager.class.getName())
-                    .setUserPayload(ByteString.copyFrom(new byte[] {1})))
-              .build()
-        )
         .addEdge(
             EdgePlan.newBuilder()
                 .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
@@ -827,7 +831,7 @@ public class TestVertexImpl {
                 .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                 .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
                 .build()
-            )
+        )
         .build();
     return dag;
   }
@@ -1404,71 +1408,71 @@ public class TestVertexImpl {
             .build()
         )
         .addVertex(
-          VertexPlan.newBuilder()
-            .setName("vertex2")
-            .setType(PlanVertexType.NORMAL)
-            .addTaskLocationHint(
-              PlanTaskLocationHint.newBuilder()
-                .addHost("host2")
-                .addRack("rack2")
-                .build()
-            )
-            .setTaskConfig(
-              PlanTaskConfiguration.newBuilder()
-                .setNumTasks(2)
-                .setVirtualCores(4)
-                .setMemoryMb(1024)
-                .setJavaOpts("")
-                .setTaskModule("x2.y2")
+            VertexPlan.newBuilder()
+                .setName("vertex2")
+                .setType(PlanVertexType.NORMAL)
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder()
+                        .addHost("host2")
+                        .addRack("rack2")
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(2)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x2.y2")
+                        .build()
+                )
+                .addOutEdgeId("e2")
                 .build()
-            )
-            .addOutEdgeId("e2")
-            .build()
         )
         .addVertex(
-          VertexPlan.newBuilder()
-            .setName("vertex3")
-            .setType(PlanVertexType.NORMAL)
-            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
-            .addTaskLocationHint(
-              PlanTaskLocationHint.newBuilder()
-                .addHost("host3")
-                .addRack("rack3")
-                .build()
-            )
-            .setTaskConfig(
-              PlanTaskConfiguration.newBuilder()
-                .setNumTasks(2)
-                .setVirtualCores(4)
-                .setMemoryMb(1024)
-                .setJavaOpts("foo")
-                .setTaskModule("x3.y3")
+            VertexPlan.newBuilder()
+                .setName("vertex3")
+                .setType(PlanVertexType.NORMAL)
+                .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder()
+                        .addHost("host3")
+                        .addRack("rack3")
+                        .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(2)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("foo")
+                        .setTaskModule("x3.y3")
+                        .build()
+                )
+                .addInEdgeId("e1")
+                .addInEdgeId("e2")
+                .addOutEdgeId("e3")
+                .addOutEdgeId("e4")
                 .build()
-            )
-            .addInEdgeId("e1")
-            .addInEdgeId("e2")
-            .addOutEdgeId("e3")
-            .addOutEdgeId("e4")
-            .build()
         )
         .addVertex(
           VertexPlan.newBuilder()
             .setName("vertex4")
             .setType(PlanVertexType.NORMAL)
             .addTaskLocationHint(
-              PlanTaskLocationHint.newBuilder()
-                .addHost("host4")
-                .addRack("rack4")
-                .build()
+                PlanTaskLocationHint.newBuilder()
+                    .addHost("host4")
+                    .addRack("rack4")
+                    .build()
             )
             .setTaskConfig(
-              PlanTaskConfiguration.newBuilder()
-                .setNumTasks(2)
-                .setVirtualCores(4)
-                .setMemoryMb(1024)
-                .setJavaOpts("")
-                .setTaskModule("x4.y4")
-                .build()
+                PlanTaskConfiguration.newBuilder()
+                    .setNumTasks(2)
+                    .setVirtualCores(4)
+                    .setMemoryMb(1024)
+                    .setJavaOpts("")
+                    .setTaskModule("x4.y4")
+                    .build()
             )
             .addInEdgeId("e3")
             .addOutEdgeId("e5")
@@ -1479,19 +1483,19 @@ public class TestVertexImpl {
             .setName("vertex5")
             .setType(PlanVertexType.NORMAL)
             .addTaskLocationHint(
-              PlanTaskLocationHint.newBuilder()
-                .addHost("host5")
-                .addRack("rack5")
-                .build()
+                PlanTaskLocationHint.newBuilder()
+                    .addHost("host5")
+                    .addRack("rack5")
+                    .build()
             )
             .setTaskConfig(
-              PlanTaskConfiguration.newBuilder()
-                .setNumTasks(2)
-                .setVirtualCores(4)
-                .setMemoryMb(1024)
-                .setJavaOpts("")
-                .setTaskModule("x5.y5")
-                .build()
+                PlanTaskConfiguration.newBuilder()
+                    .setNumTasks(2)
+                    .setVirtualCores(4)
+                    .setMemoryMb(1024)
+                    .setJavaOpts("")
+                    .setTaskModule("x5.y5")
+                    .build()
             )
             .addInEdgeId("e4")
             .addOutEdgeId("e6")
@@ -1575,9 +1579,10 @@ public class TestVertexImpl {
                     .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
                     .setEdgeManager(
                         TezEntityDescriptorProto.newBuilder()
-                        .setClassName(EdgeManagerForTest.class.getName())
-                        .setUserPayload(ByteString.copyFrom(edgePayload))
-                        .build())
+                            .setClassName(EdgeManagerForTest.class.getName())
+                            .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+                                .setUserPayload((ByteString.copyFrom(edgePayload))))
+                            .build())
                     .setId("e4")
                     .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                     .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
@@ -2833,9 +2838,11 @@ public class TestVertexImpl {
     outputs.add(RootInputLeafOutputProto.newBuilder()
         .setControllerDescriptor(
             TezEntityDescriptorProto.newBuilder().setClassName(
-                CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom(
-                    new CountingOutputCommitter.CountingOutputCommitterConfig()
-                    .toUserPayload())).build())
+                CountingOutputCommitter.class.getName())
+                .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+                    .setUserPayload(ByteString.copyFrom(
+                        new CountingOutputCommitter.CountingOutputCommitterConfig()
+                            .toUserPayload())).build()))
         .setName("output_v2")
         .setIODescriptor(
             TezEntityDescriptorProto.newBuilder().setClassName("output.class"))
@@ -2946,9 +2953,11 @@ public class TestVertexImpl {
     outputs.add(RootInputLeafOutputProto.newBuilder()
         .setControllerDescriptor(
             TezEntityDescriptorProto.newBuilder().setClassName(
-                CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom(
+                CountingOutputCommitter.class.getName())
+                .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+                .setUserPayload(ByteString.copyFrom(
                     new CountingOutputCommitter.CountingOutputCommitterConfig(
-                        true, true, false).toUserPayload())).build())
+                        true, true, false).toUserPayload())).build()))
         .setName("output_v2")
         .setIODescriptor(
             TezEntityDescriptorProto.newBuilder().setClassName("output.class"))
@@ -2990,9 +2999,11 @@ public class TestVertexImpl {
     outputs.add(RootInputLeafOutputProto.newBuilder()
         .setControllerDescriptor(
             TezEntityDescriptorProto.newBuilder().setClassName(
-                CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom(
+                CountingOutputCommitter.class.getName())
+                .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+                .setUserPayload(ByteString.copyFrom(
                     new CountingOutputCommitter.CountingOutputCommitterConfig(
-                        true, true, true).toUserPayload())).build())
+                        true, true, true).toUserPayload())).build()))
         .setName("output_v2")
         .setIODescriptor(
             TezEntityDescriptorProto.newBuilder().setClassName("output.class"))
@@ -4162,7 +4173,8 @@ public class TestVertexImpl {
                 .setEdgeManager(
                     TezEntityDescriptorProto.newBuilder()
                         .setClassName(EdgeManagerForTest.class.getName())
-                        .setUserPayload(ByteString.copyFrom(edgePayload))
+                        .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+                            .setUserPayload(ByteString.copyFrom(edgePayload)))
                         .build())
                 .setOutputVertexName("M5")
                 .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 903b4fe..ad508b6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -325,8 +325,7 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getSourceEdgeManagers().size());
       Assert.assertEquals(event.getSourceEdgeManagers().get("foo").getClassName(),
           deserializedEvent.getSourceEdgeManagers().get("foo").getClassName());
-      Assert.assertNotNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
-      Assert.assertNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload().getPayload());
+      Assert.assertNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
       Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getClassName(),
           deserializedEvent.getSourceEdgeManagers().get("foo1").getClassName());
       Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload().getVersion(),

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 420c477..62311e9 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -76,7 +76,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
     checkNotNull(sourceVertexName, "sourceVertexName is null");
     checkNotNull(inputs, "input map is null");
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
-    this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
+    this.userPayload = userPayload;
     this.inputIndex = inputIndex;
     this.sourceVertexName = sourceVertexName;
     this.sourceInfo = new EventMetaData(

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
index 693bde8..e71a0bc 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
@@ -46,7 +46,7 @@ public class TezMergedInputContextImpl implements MergedInputContext {
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
     this.groupInputName = groupInputName;
     this.groupInputsMap = groupInputsMap;
-    this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
+    this.userPayload = userPayload;
     this.inputReadyTracker = inputReadyTracker;
     this.workDirs = workDirs;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 94df6aa..7b53075 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -69,7 +69,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
         auxServiceEnv, memDist, outputDescriptor, objectRegistry);
     checkNotNull(outputIndex, "outputIndex is null");
     checkNotNull(destinationVertexName, "destinationVertexName is null");
-    this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
+    this.userPayload = userPayload;
     this.outputIndex = outputIndex;
     this.destinationVertexName = destinationVertexName;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 330c4c9..9b4dc6a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -62,7 +62,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv, memDist, processorDescriptor, objectRegistry);
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
-    this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
+    this.userPayload = userPayload;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
         taskVertexName, "", taskAttemptID);
     this.inputReadyTracker = inputReadyTracker;

http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index e8a137c..42b31f1 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -72,7 +72,8 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
           .next();
       KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
       kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
-      ByteBuffer userPayload = getContext().getUserPayload().getPayload();
+      ByteBuffer userPayload =
+          getContext().getUserPayload() == null ? null : getContext().getUserPayload().getPayload();
       if (userPayload != null) {
         boolean doLocalityCheck = getContext().getUserPayload().getPayload().get(0) > 0 ? true : false;
         if (doLocalityCheck) {


Mime
View raw message