tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [1/3] TEZ-1194. Make TezUserPayload user facing for payload specification (Tsuyoshi Ozawa and bikas)
Date Tue, 12 Aug 2014 23:56:45 GMT
Repository: tez
Updated Branches:
  refs/heads/master 592884940 -> 6507bda6c


http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfigurer.java
index a24813a..8b99c17 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfigurer.java
@@ -21,6 +21,7 @@
 package org.apache.tez.runtime.library.conf;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Map;
 
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
@@ -162,7 +164,7 @@ public class ShuffledUnorderedKVInputConfigurer {
    */
   public byte[] toByteArray() {
     try {
-      return TezUtils.createUserPayloadFromConf(conf);
+      return TezUtils.createUserPayloadFromConf(conf).getPayload();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -170,7 +172,7 @@ public class ShuffledUnorderedKVInputConfigurer {
 
   public void fromByteArray(byte[] payload) {
     try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
+      this.conf = TezUtils.createConfFromUserPayload(new UserPayload(payload));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
index 728f8b3..356f082 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
@@ -21,9 +21,11 @@
 package org.apache.tez.runtime.library.conf;
 
 import javax.annotation.Nullable;
+
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -31,6 +33,7 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
 
@@ -89,8 +92,8 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
   }
 
   @Override
-  public byte[] getOutputPayload() {
-    return outputConf.toByteArray();
+  public UserPayload getOutputPayload() {
+    return new UserPayload(outputConf.toByteArray());
   }
 
   @Override
@@ -99,8 +102,8 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
   }
 
   @Override
-  public byte[] getInputPayload() {
-    return inputConf.toByteArray();
+  public UserPayload getInputPayload() {
+    return new UserPayload(inputConf.toByteArray());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
index 20cc449..182b9f7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
@@ -21,9 +21,11 @@
 package org.apache.tez.runtime.library.conf;
 
 import javax.annotation.Nullable;
+
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -31,6 +33,7 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 
@@ -65,8 +68,8 @@ public class UnorderedUnpartitionedKVEdgeConfigurer extends HadoopKeyValuesBased
   }
 
   @Override
-  public byte[] getOutputPayload() {
-    return outputConf.toByteArray();
+  public UserPayload getOutputPayload() {
+    return new UserPayload(outputConf.toByteArray());
   }
 
   @Override
@@ -75,8 +78,8 @@ public class UnorderedUnpartitionedKVEdgeConfigurer extends HadoopKeyValuesBased
   }
 
   @Override
-  public byte[] getInputPayload() {
-    return inputConf.toByteArray();
+  public UserPayload getInputPayload() {
+    return new UserPayload(inputConf.toByteArray());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index 65938a8..4b0d93b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.InputContext;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index d0770ec..366fd38 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -36,6 +36,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 10bbb69..e570ddf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -188,7 +188,7 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
 
     payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
-    byte[] payloadBytes = payloadProto.toByteArray();
+    byte[] payload = payloadProto.toByteArray();
 
     long outputSize = getContext().getCounters()
         .findCounter(TaskCounter.OUTPUT_BYTES).getValue();
@@ -201,7 +201,7 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
     List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs() + 1);
     events.add(vmEvent);
 
-    CompositeDataMovementEvent csdme = new CompositeDataMovementEvent(0, getNumPhysicalOutputs(), payloadBytes);
+    CompositeDataMovementEvent csdme = new CompositeDataMovementEvent(0, getNumPhysicalOutputs(), payload);
     events.add(csdme);
 
     return events;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index e61013d..8697b23 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -70,8 +70,7 @@ public class OnFileUnorderedKVOutput extends AbstractLogicalOutput {
   @Override
   public synchronized List<Event> initialize()
       throws Exception {
-    this.conf = TezUtils.createConfFromUserPayload(getContext()
-        .getUserPayload());
+    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
         getContext().getWorkDirs());
 
@@ -148,8 +147,7 @@ public class OnFileUnorderedKVOutput extends AbstractLogicalOutput {
     }
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
 
-    DataMovementEvent dmEvent = new DataMovementEvent(0,
-        payloadProto.toByteArray());
+    DataMovementEvent dmEvent = new DataMovementEvent(0, payloadProto.toByteArray());
     List<Event> events = Lists.newArrayListWithCapacity(1);
     events.add(dmEvent);
     return events;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index 4b7d01b..72a4f39 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.processor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -110,8 +111,8 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
       return Integer.toString(timeToSleepMS).getBytes();
     }
 
-    public void fromUserPayload(byte[] userPayload) {
-      timeToSleepMS = Integer.valueOf(new String(userPayload)).intValue();
+    public void fromUserPayload(UserPayload userPayload) {
+      timeToSleepMS = Integer.valueOf(new String(userPayload.getPayload())).intValue();
     }
 
     public int getTimeToSleepMS() {

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 5efb79a..01e0fa9 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -35,6 +35,7 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -127,11 +128,11 @@ public class TestShuffleVertexManager {
               ((Map<String, EdgeManagerPluginDescriptor>)invocation.getArguments()[2]).entrySet()) {
 
 
-            final byte[] userPayload = entry.getValue().getUserPayload();
+            final UserPayload userPayload = entry.getValue().getUserPayload();
             EdgeManagerPluginContext emContext = new EdgeManagerPluginContext() {
               @Override
               public byte[] getUserPayload() {
-                return userPayload;
+                return userPayload == null ? null : userPayload.getPayload();
               }
 
               @Override
@@ -486,12 +487,12 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.numSourceTasksCompleted == 4);
 
   }
-  
+
   private ShuffleVertexManager createManager(Configuration conf, 
       VertexManagerPluginContext context, float min, float max) {
     conf.setFloat(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min);
     conf.setFloat(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max);
-    byte[] payload;
+    UserPayload payload;
     try {
       payload = TezUtils.createUserPayloadFromConf(conf);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index ed38569..b4dd62c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -2,6 +2,7 @@ package org.apache.tez.runtime.library.common;
 
 import com.google.common.collect.Ordering;
 import com.google.common.collect.TreeMultimap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -27,6 +28,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.GenericCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
@@ -379,7 +381,7 @@ public class TestValuesIterator {
     doReturn(1).when(inputContext).getInputIndex();
     doReturn("srcVertex").when(inputContext).getSourceVertexName();
     doReturn(1).when(inputContext).getTaskVertexIndex();
-    doReturn(new byte[1024]).when(inputContext).getUserPayload();
+    doReturn(new UserPayload(new byte[1024])).when(inputContext).getUserPayload();
     return inputContext;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
index 42e383d..17b048d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
@@ -67,8 +67,8 @@ public class TestOrderedPartitionedKVEdgeConfigurer {
 
     OrderedPartitionedKVEdgeConfigurer configuration = builder.build();
 
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
+    byte[] outputBytes = configuration.getOutputPayload().getPayload();
+    byte[] inputBytes = configuration.getInputPayload().getPayload();
 
     OnFileSortedOutputConfigurer rebuiltOutput = new OnFileSortedOutputConfigurer();
     rebuiltOutput.fromByteArray(outputBytes);
@@ -96,8 +96,8 @@ public class TestOrderedPartitionedKVEdgeConfigurer {
 
     OrderedPartitionedKVEdgeConfigurer configuration = builder.build();
 
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
+    byte[] outputBytes = configuration.getOutputPayload().getPayload();
+    byte[] inputBytes = configuration.getInputPayload().getPayload();
 
     OnFileSortedOutputConfigurer rebuiltOutput = new OnFileSortedOutputConfigurer();
     rebuiltOutput.fromByteArray(outputBytes);
@@ -141,8 +141,8 @@ public class TestOrderedPartitionedKVEdgeConfigurer {
 
     OrderedPartitionedKVEdgeConfigurer configuration = builder.build();
 
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
+    byte[] outputBytes = configuration.getOutputPayload().getPayload();
+    byte[] inputBytes = configuration.getInputPayload().getPayload();
 
     OnFileSortedOutputConfigurer rebuiltOutput = new OnFileSortedOutputConfigurer();
     rebuiltOutput.fromByteArray(outputBytes);
@@ -199,8 +199,8 @@ public class TestOrderedPartitionedKVEdgeConfigurer {
 
     OrderedPartitionedKVEdgeConfigurer configuration = builder.build();
 
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
+    byte[] outputBytes = configuration.getOutputPayload().getPayload();
+    byte[] inputBytes = configuration.getInputPayload().getPayload();
 
     OnFileSortedOutputConfigurer rebuiltOutput = new OnFileSortedOutputConfigurer();
     rebuiltOutput.fromByteArray(outputBytes);
@@ -262,8 +262,8 @@ public class TestOrderedPartitionedKVEdgeConfigurer {
 
     OrderedPartitionedKVEdgeConfigurer configuration = builder.build();
 
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
+    byte[] outputBytes = configuration.getOutputPayload().getPayload();
+    byte[] inputBytes = configuration.getInputPayload().getPayload();
 
     OnFileSortedOutputConfigurer rebuiltOutput = new OnFileSortedOutputConfigurer();
     rebuiltOutput.fromByteArray(outputBytes);

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
index cf9d63a..a6efdb6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
@@ -68,8 +68,8 @@ public class TestUnorderedPartitionedKVEdgeConfigurer {
 
     UnorderedPartitionedKVEdgeConfigurer configuration = builder.build();
 
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
+    byte[] outputBytes = configuration.getOutputPayload().getPayload();
+    byte[] inputBytes = configuration.getInputPayload().getPayload();
 
     OnFileUnorderedPartitionedKVOutputConfigurer rebuiltOutput =
         new OnFileUnorderedPartitionedKVOutputConfigurer();
@@ -103,8 +103,8 @@ public class TestUnorderedPartitionedKVEdgeConfigurer {
 
     UnorderedPartitionedKVEdgeConfigurer configuration = builder.build();
 
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
+    byte[] outputBytes = configuration.getOutputPayload().getPayload();
+    byte[] inputBytes = configuration.getInputPayload().getPayload();
 
     OnFileUnorderedPartitionedKVOutputConfigurer rebuiltOutput =
         new OnFileUnorderedPartitionedKVOutputConfigurer();
@@ -150,8 +150,8 @@ public class TestUnorderedPartitionedKVEdgeConfigurer {
 
     UnorderedPartitionedKVEdgeConfigurer configuration = builder.build();
 
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
+    byte[] outputBytes = configuration.getOutputPayload().getPayload();
+    byte[] inputBytes = configuration.getInputPayload().getPayload();
 
     OnFileUnorderedPartitionedKVOutputConfigurer rebuiltOutput =
         new OnFileUnorderedPartitionedKVOutputConfigurer();

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
index 3b04375..ba9cd56 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
@@ -60,8 +60,8 @@ public class TestUnorderedUnpartitionedKVEdgeConfigurer {
 
     UnorderedUnpartitionedKVEdgeConfigurer configuration = builder.build();
 
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
+    byte[] outputBytes = configuration.getOutputPayload().getPayload();
+    byte[] inputBytes = configuration.getInputPayload().getPayload();
 
     OnFileUnorderedKVOutputConfigurer rebuiltOutput =
         new OnFileUnorderedKVOutputConfigurer();
@@ -95,8 +95,8 @@ public class TestUnorderedUnpartitionedKVEdgeConfigurer {
 
     UnorderedUnpartitionedKVEdgeConfigurer configuration = builder.build();
 
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
+    byte[] outputBytes = configuration.getOutputPayload().getPayload();
+    byte[] inputBytes = configuration.getInputPayload().getPayload();
 
     OnFileUnorderedKVOutputConfigurer rebuiltOutput =
         new OnFileUnorderedKVOutputConfigurer();
@@ -140,8 +140,8 @@ public class TestUnorderedUnpartitionedKVEdgeConfigurer {
 
     UnorderedUnpartitionedKVEdgeConfigurer configuration = builder.build();
 
-    byte[] outputBytes = configuration.getOutputPayload();
-    byte[] inputBytes = configuration.getInputPayload();
+    byte[] outputBytes = configuration.getOutputPayload().getPayload();
+    byte[] inputBytes = configuration.getInputPayload().getPayload();
 
     OnFileUnorderedKVOutputConfigurer rebuiltOutput =
         new OnFileUnorderedKVOutputConfigurer();

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 33ad480..f873aec 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.utils.EnvironmentUpdateUtils;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
@@ -233,7 +234,7 @@ public class TestOnFileSortedOutput {
 
   private OutputContext createTezOutputContext() throws IOException {
     String[] workingDirs = { workingDir.toString() };
-    byte[] payLoad = TezUtils.createUserPayloadFromConf(conf);
+    UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
     DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
     serviceProviderMetaData.writeInt(PORT);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 00940ef..7c4b605 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -105,7 +106,7 @@ public class TestOnFileUnorderedKVOutput {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
     TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
     TezCounters counters = new TezCounters();
-    byte[] userPayload = TezUtils.createUserPayloadFromConf(conf);
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
     RuntimeTask runtimeTask = mock(RuntimeTask.class);
     
     int shufflePort = 2112;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 2c56ceb..e07eed8 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -89,6 +89,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -185,7 +186,7 @@ public class TestMRRJobsDAGApi {
 
     DAG dag = new DAG("TezSleepProcessor");
     Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        SleepProcessor.class.getName()).setUserPayload(new UserPayload(spConf.toUserPayload())), 1,
         Resource.newInstance(1024, 1));
     dag.addVertex(vertex);
 
@@ -234,7 +235,7 @@ public class TestMRRJobsDAGApi {
       for (int dagIndex = 1; dagIndex <= 2; dagIndex++) {
         DAG dag = new DAG("TezSleepProcessor");
         Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+            SleepProcessor.class.getName()).setUserPayload(new UserPayload(spConf.toUserPayload())), 1,
             Resource.newInstance(1024, 1));
         dag.addVertex(vertex);
 
@@ -273,7 +274,7 @@ public class TestMRRJobsDAGApi {
 
     DAG dag = new DAG("TezSleepProcessor");
     Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        SleepProcessor.class.getName()).setUserPayload(new UserPayload(spConf.toUserPayload())), 1,
         Resource.newInstance(1024, 1));
     dag.addVertex(vertex);
 
@@ -315,7 +316,7 @@ public class TestMRRJobsDAGApi {
 
     DAG dag = new DAG("TezSleepProcessorHistoryLogging");
     Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 2,
+        SleepProcessor.class.getName()).setUserPayload(new UserPayload(spConf.toUserPayload())), 2,
         Resource.newInstance(1024, 1));
     dag.addVertex(vertex);
 
@@ -652,9 +653,9 @@ public class TestMRRJobsDAGApi {
         .valueOf(new Random().nextInt(100000))));
     TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
 
-    byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
-    byte[] stage2Payload = MRHelpers.createUserPayloadFromConf(stage2Conf);
-    byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
+    UserPayload stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+    UserPayload stage2Payload = MRHelpers.createUserPayloadFromConf(stage2Conf);
+    UserPayload stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
     
     DAG dag = new DAG("testMRRSleepJobDagSubmit-" + random.nextInt(1000));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
index 5dfd179..8826660 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
@@ -24,6 +24,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
@@ -45,7 +46,7 @@ public class SimpleTestDAG {
   
   public static DAG createDAG(String name, 
       Configuration conf) throws Exception {
-    byte[] payload = null;
+    UserPayload payload = new UserPayload(null);
     int taskCount = TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT;
     if (conf != null) {
       taskCount = conf.getInt(TEZ_SIMPLE_DAG_NUM_TASKS, TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT);

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
index fd29afc..967f0ed 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
@@ -24,6 +24,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
@@ -47,7 +48,7 @@ public class SimpleTestDAG3Vertices {
   
   public static DAG createDAG(String name, 
       Configuration conf) throws Exception {
-    byte[] payload = null;
+    UserPayload payload = null;
     int taskCount = TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT;
     if (conf != null) {
       taskCount = conf.getInt(TEZ_SIMPLE_DAG_NUM_TASKS, TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT);

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index 003f615..fe828b6 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -32,6 +32,7 @@ import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatus.State;
@@ -176,9 +177,8 @@ public class TestDAGRecovery2 {
     DAG dag = SimpleVTestDAG.createDAG("FailingCommitterDAG", null);
     OutputDescriptor od =
         new OutputDescriptor(MultiAttemptDAG.NoOpOutput.class.getName());
-    od.setUserPayload(new
-        MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig(true)
-            .toUserPayload());
+    od.setUserPayload(new UserPayload(
+        new MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig(true).toUserPayload()));
     OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(
         MultiAttemptDAG.FailingOutputCommitter.class.getName());
     dag.getVertex("v3").addDataSink("FailingOutput", new DataSinkDescriptor(od, ocd, null));

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 525db69..17718bb 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Reader;
@@ -120,9 +121,12 @@ public class TestInput extends AbstractLogicalInput {
     }
   }
 
-  public static InputDescriptor getInputDesc(byte[] payload) {
-    return new InputDescriptor(TestInput.class.getName()).
-        setUserPayload(payload);
+  public static InputDescriptor getInputDesc(UserPayload payload) {
+    InputDescriptor desc = new InputDescriptor(TestInput.class.getName());
+    if (payload != null) {
+      desc.setUserPayload(payload);
+    }
+    return desc;
   }
 
   public int doRead() {
@@ -231,7 +235,7 @@ public class TestInput extends AbstractLogicalInput {
   public List<Event> initialize() throws Exception {
     getContext().requestInitialMemory(0l, null); //Mandatory call.
     getContext().inputIsReady();
-    if (getContext().getUserPayload() != null) {
+    if (getContext().getUserPayload() != null && getContext().getUserPayload().hasPayload()) {
       String vName = getContext().getTaskVertexName();
       conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
       doFail = conf.getBoolean(getVertexConfName(TEZ_FAILING_INPUT_DO_FAIL, vName), false);

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index bbc909c..5f73933 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputContext;
@@ -40,9 +41,12 @@ public class TestOutput extends AbstractLogicalOutput {
     super(outputContext, numPhysicalOutputs);
   }
 
-  public static OutputDescriptor getOutputDesc(byte[] payload) {
-    return new OutputDescriptor(TestOutput.class.getName()).
-        setUserPayload(payload);
+  public static OutputDescriptor getOutputDesc(UserPayload payload) {
+    OutputDescriptor desc = new OutputDescriptor(TestOutput.class.getName());
+    if (payload != null) {
+      desc.setUserPayload(payload);
+    }
+    return desc;
   }
   
   int output;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index c807b27..fa556e6 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -99,9 +100,9 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
     super(context);
   }
 
-  public static ProcessorDescriptor getProcDesc(byte[] payload) {
-    return new ProcessorDescriptor(TestProcessor.class.getName()).
-        setUserPayload(payload);
+  public static ProcessorDescriptor getProcDesc(UserPayload payload) {
+    return new ProcessorDescriptor(TestProcessor.class.getName()).setUserPayload(
+        payload == null ? new UserPayload (null) : payload);
   }
 
   void throwException(String msg) {
@@ -121,7 +122,7 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
   
   @Override
   public void initialize() throws Exception {
-    if (getContext().getUserPayload() != null) {
+    if (getContext().getUserPayload() != null && getContext().getUserPayload().hasPayload()) {
       String vName = getContext().getTaskVertexName();
       conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
       verifyValue = conf.getInt(

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 38c5c2e..4dc28b1 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -20,6 +20,7 @@ package org.apache.tez.test.dag;
 
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +33,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -111,7 +113,7 @@ public class MultiAttemptDAG {
       if (numCompletions.get() >= numSourceTasks
           && !tasksScheduled) {
         tasksScheduled = true;
-        String payload = new String(getContext().getUserPayload());
+        String payload = new String(getContext().getUserPayload().getPayload());
         int successAttemptId = Integer.valueOf(payload);
         LOG.info("Checking whether to crash AM or schedule tasks"
             + ", successfulAttemptID=" + successAttemptId
@@ -162,7 +164,7 @@ public class MultiAttemptDAG {
     public void initialize() throws Exception {
       FailingOutputCommitterConfig config = new
           FailingOutputCommitterConfig();
-      config.fromUserPayload(getContext().getOutputUserPayload());
+      config.fromUserPayload(getContext().getOutputUserPayload().getPayload());
       failOnCommit = config.failOnCommit;
     }
 
@@ -316,7 +318,7 @@ public class MultiAttemptDAG {
 
   public static DAG createDAG(String name,
       Configuration conf) throws Exception {
-    byte[] payload = null;
+    UserPayload payload = new UserPayload(null);
     int taskCount = MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT;
     if (conf != null) {
       taskCount = conf.getInt(MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS, MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT);
@@ -330,13 +332,13 @@ public class MultiAttemptDAG {
     // Make each vertex manager fail on appropriate attempt
     v1.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
         FailOnAttemptVertexManagerPlugin.class.getName())
-        .setUserPayload(new String("1").getBytes()));
+        .setUserPayload(new UserPayload(new String("1").getBytes())));
     v2.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
         FailOnAttemptVertexManagerPlugin.class.getName())
-        .setUserPayload(new String("2").getBytes()));
+        .setUserPayload(new UserPayload(new String("2").getBytes())));
     v3.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
         FailOnAttemptVertexManagerPlugin.class.getName())
-        .setUserPayload(new String("3").getBytes()));
+        .setUserPayload(new UserPayload(new String("3").getBytes())));
     dag.addVertex(v1).addVertex(v2).addVertex(v3);
     dag.addEdge(new Edge(v1, v2,
         new EdgeProperty(DataMovementType.SCATTER_GATHER,

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
index 6c2bfbf..526e63b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
@@ -24,6 +24,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
@@ -47,7 +48,7 @@ public class SimpleReverseVTestDAG {
   
   public static DAG createDAG(String name, 
       Configuration conf) throws Exception {
-    byte[] payload = null;
+    UserPayload payload = new UserPayload(null);
     int taskCount = TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS_DEFAULT;
     if (conf != null) {
       taskCount = conf.getInt(TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS, TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS_DEFAULT);

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
index 9fcfe11..2bb8971 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
@@ -24,6 +24,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
@@ -47,7 +48,7 @@ public class SimpleVTestDAG {
   
   public static DAG createDAG(String name, 
       Configuration conf) throws Exception {
-    byte[] payload = null;
+    UserPayload payload = new UserPayload(null);
     int taskCount = TEZ_SIMPLE_V_DAG_NUM_TASKS_DEFAULT;
     if (conf != null) {
       taskCount = conf.getInt(TEZ_SIMPLE_V_DAG_NUM_TASKS, TEZ_SIMPLE_V_DAG_NUM_TASKS_DEFAULT);

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
index f6a9d88..50f2a70 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
@@ -20,6 +20,7 @@ package org.apache.tez.test.dag;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.test.TestProcessor;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
index c740ec3..d368c02 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
@@ -20,6 +20,7 @@ package org.apache.tez.test.dag;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.test.TestProcessor;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
index de74931..63850ec 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
@@ -23,6 +23,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
@@ -49,7 +50,7 @@ import org.apache.tez.test.TestProcessor;
 public class TwoLevelsFailingDAG {
     static Resource defaultResource = Resource.newInstance(100, 0);
     protected static DAG dag;
-    protected static byte[] payload;
+    protected static UserPayload payload = new UserPayload(null);
     protected static Vertex l1v1, l1v2, l1v3, l1v4;
     protected static Vertex l2v1, l2v2, l2v3, l2v4;
 


Mime
View raw message