tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1449. Change user payloads to work with a byte buffer (Siddharth Seth via bikas)
Date Wed, 20 Aug 2014 00:38:53 GMT
Repository: tez
Updated Branches:
  refs/heads/master 54a429e0b -> 57bd9f7e3


TEZ-1449. Change user payloads to work with a byte buffer (Siddharth Seth via bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/57bd9f7e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/57bd9f7e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/57bd9f7e

Branch: refs/heads/master
Commit: 57bd9f7e3e1a8c34687f0c48f16bc7d5552c7874
Parents: 54a429e
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Aug 19 17:38:50 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Aug 19 17:38:50 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../api/events/CompositeDataMovementEvent.java  | 11 ++++++-----
 .../runtime/api/events/DataMovementEvent.java   | 20 +++++++++++---------
 .../api/events/InputDataInformationEvent.java   | 13 ++++++++-----
 .../api/events/InputInitializerEvent.java       | 12 +++++++-----
 .../api/events/InputUpdatePayloadEvent.java     | 12 +++++++-----
 .../runtime/api/events/VertexManagerEvent.java  | 12 +++++++-----
 .../event/TestCompositeDataMovementEvent.java   |  4 +++-
 .../app/dag/impl/RootInputVertexManager.java    |  2 +-
 .../apache/tez/dag/app/dag/impl/TestEdge.java   |  9 ++++++---
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 12 ++++++------
 .../common/MRInputAMSplitGenerator.java         |  2 +-
 .../common/MRInputSplitDistributor.java         |  7 +++++--
 .../org/apache/tez/mapreduce/input/MRInput.java |  3 ++-
 .../tez/mapreduce/input/MultiMRInput.java       |  3 ++-
 .../common/TestMRInputSplitDistributor.java     |  4 ++--
 .../tez/mapreduce/input/TestMultiMRInput.java   | 11 ++++++-----
 .../org/apache/tez/common/ProtoConverters.java  | 10 +++++-----
 .../vertexmanager/ShuffleVertexManager.java     |  2 +-
 .../shuffle/impl/ShuffleInputEventHandler.java  |  3 ++-
 .../writers/UnorderedPartitionedKVWriter.java   |  2 +-
 .../output/OrderedPartitionedKVOutput.java      |  4 ++--
 .../library/output/UnorderedKVOutput.java       |  2 +-
 .../impl/ShuffleInputEventHandlerImpl.java      |  4 +++-
 .../vertexmanager/TestShuffleVertexManager.java | 11 ++++++-----
 .../impl/TestShuffleInputEventHandler.java      |  3 ++-
 .../TestUnorderedPartitionedKVWriter.java       | 11 +++++++----
 .../library/output/TestOnFileSortedOutput.java  |  8 +++++---
 .../output/TestOnFileUnorderedKVOutput.java     |  3 ++-
 .../impl/TestShuffleInputEventHandlerImpl.java  |  3 ++-
 .../java/org/apache/tez/test/TestInput.java     |  2 +-
 .../java/org/apache/tez/test/TestOutput.java    |  3 ++-
 32 files changed, 123 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e46867e..c611668 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -78,6 +78,7 @@ INCOMPATIBLE CHANGES
   TEZ-1246. Replace constructors with create() methods for DAG, Vertex, Edge etc in the API
   TEZ-1455. Replace deprecated junit.framework.Assert with org.junit.Assert
   TEZ-1465. Update and document IntersectExample. Change name to JoinExample
+  TEZ-1449. Change user payloads to work with a byte buffer
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
index 87403de..b38fda3 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.api.events;
 
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -42,9 +43,9 @@ public class CompositeDataMovementEvent extends Event {
   protected final int count;
   protected int version;
 
-  protected final byte[] userPayload;
+  protected final ByteBuffer userPayload;
 
-  private CompositeDataMovementEvent(int srcIndexStart, int count, byte[] userPayload) {
+  private CompositeDataMovementEvent(int srcIndexStart, int count, ByteBuffer userPayload) {
     this.sourceIndexStart = srcIndexStart;
     this.count = count;
     this.userPayload = userPayload;
@@ -61,7 +62,7 @@ public class CompositeDataMovementEvent extends Event {
    *          the common payload between all the events.
    */
   public static CompositeDataMovementEvent create(int srcIndexStart, int count,
-                                                  byte[] userPayload) {
+                                                  ByteBuffer userPayload) {
     return new CompositeDataMovementEvent(srcIndexStart, count, userPayload);
   }
 
@@ -73,8 +74,8 @@ public class CompositeDataMovementEvent extends Event {
     return count;
   }
 
-  public byte[] getUserPayload() {
-    return userPayload;
+  public ByteBuffer getUserPayload() {
+    return userPayload == null ? null : userPayload.asReadOnlyBuffer();
   }
 
   public void setVersion(int version) {

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
index 88b36cb..b9c1cc4 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.api.events;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.tez.runtime.api.Event;
@@ -46,7 +48,7 @@ public final class DataMovementEvent extends Event {
   /**
    * User Payload for this Event
    */
-  private final byte[] userPayload;
+  private final ByteBuffer userPayload;
 
   /**
    * Version number to indicate what attempt generated this Event
@@ -55,7 +57,7 @@ public final class DataMovementEvent extends Event {
 
 
   private DataMovementEvent(int sourceIndex,
-                            byte[] userPayload) {
+                            ByteBuffer userPayload) {
     this.userPayload = userPayload;
     this.sourceIndex = sourceIndex;
   }
@@ -64,14 +66,14 @@ public final class DataMovementEvent extends Event {
   private DataMovementEvent(int sourceIndex,
                             int targetIndex,
                             int version,
-                            byte[] userPayload) {
+                            ByteBuffer userPayload) {
     this.userPayload = userPayload;
     this.sourceIndex = sourceIndex;
     this.version = version;
     this.targetIndex = targetIndex;
   }
 
-  private DataMovementEvent(byte[] userPayload) {
+  private DataMovementEvent(ByteBuffer userPayload) {
     this(-1, userPayload);
   }
 
@@ -82,7 +84,7 @@ public final class DataMovementEvent extends Event {
    * @param userPayload User Payload of the User Event
    */
   public static DataMovementEvent create(int sourceIndex,
-                                         byte[] userPayload) {
+                                         ByteBuffer userPayload) {
     return new DataMovementEvent(sourceIndex, userPayload);
   }
 
@@ -90,7 +92,7 @@ public final class DataMovementEvent extends Event {
    * Constructor for Processor-generated User Events
    * @param userPayload
    */
-  public static DataMovementEvent create(byte[] userPayload) {
+  public static DataMovementEvent create(ByteBuffer userPayload) {
     return new DataMovementEvent(userPayload);
   }
 
@@ -98,12 +100,12 @@ public final class DataMovementEvent extends Event {
   public static DataMovementEvent create(int sourceIndex,
                                          int targetIndex,
                                          int version,
-                                         byte[] userPayload) {
+                                         ByteBuffer userPayload) {
     return new DataMovementEvent(sourceIndex, targetIndex, version, userPayload);
   }
 
-  public byte[] getUserPayload() {
-    return userPayload;
+  public ByteBuffer getUserPayload() {
+    return userPayload == null ? null : userPayload.asReadOnlyBuffer();
   }
 
   public int getSourceIndex() {

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java
index ec1d85e..ffeb7ff 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.api.events;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.tez.dag.api.VertexManagerPlugin;
@@ -43,13 +45,14 @@ import org.apache.tez.runtime.api.InputInitializer;
 @Public
 public final class InputDataInformationEvent extends Event {
 
+
   private final int sourceIndex;
   private int targetIndex; // TODO Likely to be multiple at a later point.
-  private final byte[] userPayload;
+  private final ByteBuffer userPayload;
   private final Object userPayloadObject;
   
 
-  private InputDataInformationEvent(int srcIndex, byte[] userPayload) {
+  private InputDataInformationEvent(int srcIndex, ByteBuffer userPayload) {
     this.sourceIndex = srcIndex;
     this.userPayload = userPayload;
     this.userPayloadObject = null;
@@ -66,7 +69,7 @@ public final class InputDataInformationEvent extends Event {
    * @param srcIndex the src index
    * @param userPayload the serialized payload
    */
-  public static InputDataInformationEvent create(int srcIndex, byte[] userPayload) {
+  public static InputDataInformationEvent create(int srcIndex, ByteBuffer userPayload) {
     return new InputDataInformationEvent(srcIndex, userPayload);
   }
 
@@ -86,8 +89,8 @@ public final class InputDataInformationEvent extends Event {
     this.targetIndex = target;
   }
   
-  public byte[] getUserPayload() {
-    return userPayload;
+  public ByteBuffer getUserPayload() {
+    return userPayload == null ? null : userPayload.asReadOnlyBuffer();
   }
   
   public Object getDeserializedUserPayload() {

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
index e4e9ef9..3c5e78e 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
@@ -20,6 +20,8 @@
 
 package org.apache.tez.runtime.api.events;
 
+import java.nio.ByteBuffer;
+
 import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -38,10 +40,10 @@ public class InputInitializerEvent extends Event {
   private String targetVertexName;
   private String targetInputName;
 
-  private byte[] eventPayload;
+  private ByteBuffer eventPayload;
 
   private InputInitializerEvent(String targetVertexName, String targetInputName,
-                                byte[] eventPayload) {
+                                ByteBuffer eventPayload) {
     Preconditions.checkNotNull(targetVertexName, "TargetVertexName cannot be null");
     Preconditions.checkNotNull(targetInputName, "TargetInputName cannot be null");
     this.targetVertexName = targetVertexName;
@@ -56,7 +58,7 @@ public class InputInitializerEvent extends Event {
    *                         payload to a few KB at max
    */
   public static InputInitializerEvent create(String targetVertexName, String targetInputName,
-                                             byte[] eventPayload) {
+                                             ByteBuffer eventPayload) {
     return new InputInitializerEvent(targetVertexName, targetInputName, eventPayload);
   }
 
@@ -83,7 +85,7 @@ public class InputInitializerEvent extends Event {
    *
    * @return a byte representation of the payload
    */
-  public byte[] getUserPayload() {
-    return this.eventPayload;
+  public ByteBuffer getUserPayload() {
+    return eventPayload == null ? null : eventPayload.asReadOnlyBuffer();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputUpdatePayloadEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputUpdatePayloadEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputUpdatePayloadEvent.java
index 2d1a8c9..2cfec69 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputUpdatePayloadEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputUpdatePayloadEvent.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.api.events;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.tez.runtime.api.Event;
@@ -36,18 +38,18 @@ import com.google.common.base.Preconditions;
 @Public
 public class InputUpdatePayloadEvent extends Event {
 
-  private final byte[] userPayload;
+  private final ByteBuffer userPayload;
 
-  private InputUpdatePayloadEvent(byte[] userPayload) {
+  private InputUpdatePayloadEvent(ByteBuffer userPayload) {
     Preconditions.checkNotNull(userPayload);
     this.userPayload = userPayload;
   }
 
-  public static InputUpdatePayloadEvent create(byte[] userPayload) {
+  public static InputUpdatePayloadEvent create(ByteBuffer userPayload) {
     return new InputUpdatePayloadEvent(userPayload);
   }
 
-  public byte[] getUserPayload() {
-    return userPayload;
+  public ByteBuffer getUserPayload() {
+    return userPayload == null ? null : userPayload.asReadOnlyBuffer();
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
index d93294c..484087e 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.api.events;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.tez.runtime.api.Event;
@@ -41,9 +43,9 @@ public class VertexManagerEvent extends Event {
   /**
    * User payload to be sent
    */
-  private final byte[] userPayload;
+  private final ByteBuffer userPayload;
 
-  private VertexManagerEvent(String vertexName, byte[] userPayload) {
+  private VertexManagerEvent(String vertexName, ByteBuffer userPayload) {
     Preconditions.checkArgument(vertexName != null);
     Preconditions.checkArgument(userPayload != null);
     this.targetVertexName = vertexName;
@@ -55,7 +57,7 @@ public class VertexManagerEvent extends Event {
    * @param vertexName
    * @param userPayload This should not be modified since a reference is kept
    */
-  public static VertexManagerEvent create(String vertexName, byte[] userPayload) {
+  public static VertexManagerEvent create(String vertexName, ByteBuffer userPayload) {
     return new VertexManagerEvent(vertexName, userPayload);
   }
 
@@ -63,7 +65,7 @@ public class VertexManagerEvent extends Event {
     return targetVertexName;
   }
   
-  public byte[] getUserPayload() {
-    return userPayload;
+  public ByteBuffer getUserPayload() {
+    return userPayload == null ? null : userPayload.asReadOnlyBuffer();
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java b/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java
index 20e85ea..f162d1d 100644
--- a/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java
+++ b/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java
@@ -17,13 +17,15 @@ package org.apache.tez.runtime.api.event;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.nio.ByteBuffer;
+
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestCompositeDataMovementEvent {
-  byte[] userPayload = "Dummy userPayLoad".getBytes();
+  ByteBuffer userPayload = ByteBuffer.wrap("Dummy userPayLoad".getBytes());
 
   @Test
   public void testGetCount(){

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index 34e3af3..e6ffdc5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -98,7 +98,7 @@ public class RootInputVertexManager extends VertexManagerPlugin {
         // No tasks should have been started yet. Checked by initial state check.
         Preconditions.checkState(dataInformationEventSeen == false);
         inputDescriptor.setUserPayload(UserPayload.create(
-            ByteBuffer.wrap(((InputUpdatePayloadEvent) event).getUserPayload())));
+            ((InputUpdatePayloadEvent) event).getUserPayload()));
       } else if (event instanceof InputDataInformationEvent) {
         dataInformationEventSeen = true;
         // # Tasks should have been set by this point.

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index 19e7067..f572237 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -88,7 +89,7 @@ public class TestEdge {
     
     // Verification via a CompositeEvent
     CompositeDataMovementEvent cdmEvent = CompositeDataMovementEvent.create(0, destTasks.size(),
-        "bytes".getBytes());
+        ByteBuffer.wrap("bytes".getBytes()));
     cdmEvent.setVersion(2); // AttemptNum
     TezEvent tezEvent = new TezEvent(cdmEvent, srcMeta);
     // Event setup to look like it would after the Vertex is done with it.
@@ -104,7 +105,7 @@ public class TestEdge {
     // Same Verification via regular DataMovementEvents
     reset(eventHandler);
     for (int i = 0 ; i < destTasks.size() ; i++) {
-      DataMovementEvent dmEvent = DataMovementEvent.create(i, "bytes".getBytes());
+      DataMovementEvent dmEvent = DataMovementEvent.create(i, ByteBuffer.wrap("bytes".getBytes()));
       dmEvent.setVersion(2);
       tezEvent = new TezEvent(dmEvent, srcMeta);
       edge.sendTezEventToDestinationTasks(tezEvent);
@@ -133,7 +134,9 @@ public class TestEdge {
       assertEquals(srcTAID.getId(), dmEvent.getVersion());
       assertEquals(count, dmEvent.getSourceIndex());
       assertEquals(srcTAID.getTaskID().getId(), dmEvent.getTargetIndex());
-      assertTrue(Arrays.equals("bytes".getBytes(), dmEvent.getUserPayload()));
+      byte[] res = new byte[dmEvent.getUserPayload().limit() - dmEvent.getUserPayload().position()];
+      dmEvent.getUserPayload().slice().get(res);
+      assertTrue(Arrays.equals("bytes".getBytes(), res));
 
       count++;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index c84c1a6..30b4275 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -1894,10 +1894,10 @@ public class TestVertexImpl {
 
     List<TezEvent> taskEvents = Lists.newLinkedList();
     TezEvent tezEvent1 = new TezEvent(
-        CompositeDataMovementEvent.create(0, 1, new byte[0]),
+        CompositeDataMovementEvent.create(0, 1, ByteBuffer.wrap(new byte[0])),
         new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", ta0_t0_v2));
     TezEvent tezEvent2 = new TezEvent(
-        DataMovementEvent.create(0, new byte[0]),
+        DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])),
         new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", ta0_t0_v2));
     taskEvents.add(tezEvent1);
     taskEvents.add(tezEvent2);
@@ -2911,10 +2911,10 @@ public class TestVertexImpl {
     TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
     TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
     events.add(new TezEvent(
-        VertexManagerEvent.create("vertex2", new byte[0]), new EventMetaData(
+        VertexManagerEvent.create("vertex2", ByteBuffer.wrap(new byte[0])), new EventMetaData(
             EventProducerConsumerType.PROCESSOR, "vertex1", "vertex2",
             ta0_t0_v1)));
-    events.add(new TezEvent(InputDataInformationEvent.create(0, new byte[0]),
+    events.add(new TezEvent(InputDataInformationEvent.create(0, ByteBuffer.wrap(new byte[0])),
         new EventMetaData(EventProducerConsumerType.INPUT, "vertex2",
             "NULL_VERTEX", null)));
     dispatcher.getEventHandler().handle(
@@ -3244,7 +3244,7 @@ public class TestVertexImpl {
 
     public void completeInputDistribution(byte[] payload) {
       List<Event> events = Lists.newArrayListWithCapacity(1);
-      InputUpdatePayloadEvent event = InputUpdatePayloadEvent.create(payload);
+      InputUpdatePayloadEvent event = InputUpdatePayloadEvent.create(ByteBuffer.wrap(payload));
       events.add(event);
       eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs
           .get(0).getName(), events));
@@ -3462,7 +3462,7 @@ public class TestVertexImpl {
         lock.unlock();
       }
       initComplete.set(true);
-      InputDataInformationEvent diEvent = InputDataInformationEvent.create(0, new byte[]{0});
+      InputDataInformationEvent diEvent = InputDataInformationEvent.create(0, ByteBuffer.wrap(new byte[]{0}));
       List<Event> eventList = new LinkedList<Event>();
       eventList.add(diEvent);
       return eventList;

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index a2777a8..2408ddb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -143,7 +143,7 @@ public class MRInputAMSplitGenerator extends InputInitializer {
       for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
         // Unnecessary array copy, can be avoided by using ByteBuffer instead of a raw array.
         InputDataInformationEvent diEvent = InputDataInformationEvent.create(count++,
-            mrSplit.toByteArray());
+            mrSplit.toByteString().asReadOnlyByteBuffer());
         events.add(diEvent);
       }
     } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index 1c28c2b..e28a3a5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -21,6 +21,7 @@ package org.apache.tez.mapreduce.common;
 import java.io.IOException;
 import java.util.List;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -38,12 +39,14 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 
 /**
  * Implements an {@link InputInitializer} that distributes Map Reduce 
@@ -96,7 +99,7 @@ public class MRInputSplitDistributor extends InputInitializer {
 
     List<Event> events = Lists.newArrayListWithCapacity(this.splitsProto.getSplitsCount() + 1);
     InputUpdatePayloadEvent updatePayloadEvent = InputUpdatePayloadEvent.create(
-        updatedPayloadBuilder.build().toByteArray());
+        updatedPayloadBuilder.build().toByteString().asReadOnlyByteBuffer());
 
     events.add(updatePayloadEvent);
     int count = 0;
@@ -108,7 +111,7 @@ public class MRInputSplitDistributor extends InputInitializer {
       if (sendSerializedEvents) {
         // Unnecessary array copy, can be avoided by using ByteBuffer instead of
         // a raw array.
-        diEvent = InputDataInformationEvent.create(count++, mrSplit.toByteArray());
+        diEvent = InputDataInformationEvent.create(count++, mrSplit.toByteString().asReadOnlyByteBuffer());
       } else {
         if (useNewApi) {
           org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInputUtils

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index ebc2d45..7deef7c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -556,7 +557,7 @@ public class MRInput extends MRInputBase {
   private void initFromEventInternal(InputDataInformationEvent initEvent) throws IOException {
     LOG.info("Initializing RecordReader from event");
     Preconditions.checkState(initEvent != null, "InitEvent must be specified");
-    MRSplitProto splitProto = MRSplitProto.parseFrom(initEvent.getUserPayload());
+    MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload()));
     Object split = null;
     if (useNewApi) {
       split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, jobConf);

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index b360840..4ab5a52 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -28,6 +28,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -114,7 +115,7 @@ public class MultiMRInput extends MRInputBase {
   private MRReader initFromEvent(InputDataInformationEvent event) throws IOException {
     Preconditions.checkState(event != null, "Event must be specified");
     LOG.info("Initializing Reader: " + eventCount.get());
-    MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload());
+    MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
     Object split = null;
     MRReader reader = null;
     JobConf localJobConf = new JobConf(jobConf);

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index ebc12d4..b1a0880 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -90,12 +90,12 @@ public class TestMRInputSplitDistributor {
     assertNotNull(diEvent1.getUserPayload());
     assertNotNull(diEvent2.getUserPayload());
 
-    MRSplitProto event1Proto = MRSplitProto.parseFrom(diEvent1.getUserPayload());
+    MRSplitProto event1Proto = MRSplitProto.parseFrom(ByteString.copyFrom(diEvent1.getUserPayload()));
     InputSplit is1 = MRInputUtils.getOldSplitDetailsFromEvent(event1Proto, new Configuration());
     assertTrue(is1 instanceof InputSplitForTest);
     assertEquals(1, ((InputSplitForTest) is1).identifier);
 
-    MRSplitProto event2Proto = MRSplitProto.parseFrom(diEvent2.getUserPayload());
+    MRSplitProto event2Proto = MRSplitProto.parseFrom(ByteString.copyFrom(diEvent2.getUserPayload()));
     InputSplit is2 = MRInputUtils.getOldSplitDetailsFromEvent(event2Proto, new Configuration());
     assertTrue(is2 instanceof InputSplitForTest);
     assertEquals(2, ((InputSplitForTest) is2).identifier);

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index f418bbb..121a975 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -111,7 +111,8 @@ public class TestMultiMRInput {
     assertEquals(1, splits.length);
 
     MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]);
-    InputDataInformationEvent event = InputDataInformationEvent.create(0, splitProto.toByteArray());
+    InputDataInformationEvent event =
+        InputDataInformationEvent.create(0, splitProto.toByteString().asReadOnlyByteBuffer());
 
     eventList.clear();
     eventList.add(event);
@@ -171,11 +172,11 @@ public class TestMultiMRInput {
 
     MRSplitProto splitProto1 = MRInputHelpers.createSplitProto(splits[0]);
     InputDataInformationEvent event1 =
-        InputDataInformationEvent.create(0, splitProto1.toByteArray());
+        InputDataInformationEvent.create(0, splitProto1.toByteString().asReadOnlyByteBuffer());
 
     MRSplitProto splitProto2 = MRInputHelpers.createSplitProto(splits[1]);
     InputDataInformationEvent event2 =
-        InputDataInformationEvent.create(0, splitProto2.toByteArray());
+        InputDataInformationEvent.create(0, splitProto2.toByteString().asReadOnlyByteBuffer());
 
     eventList.clear();
     eventList.add(event1);
@@ -224,9 +225,9 @@ public class TestMultiMRInput {
 
     MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]);
     InputDataInformationEvent event1 =
-        InputDataInformationEvent.create(0, splitProto.toByteArray());
+        InputDataInformationEvent.create(0, splitProto.toByteString().asReadOnlyByteBuffer());
     InputDataInformationEvent event2 =
-        InputDataInformationEvent.create(1, splitProto.toByteArray());
+        InputDataInformationEvent.create(1, splitProto.toByteString().asReadOnlyByteBuffer());
 
     eventList.clear();
     eventList.add(event1);

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
index 4419b53..0650a90 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
@@ -48,7 +48,7 @@ public class ProtoConverters {
         proto.getTargetIndex(),
         proto.getVersion(),
         proto.getUserPayload() != null ?
-            proto.getUserPayload().toByteArray() : null);
+            proto.getUserPayload().asReadOnlyByteBuffer() : null);
   }
 
   public static EventProtos.CompositeEventProto convertCompositeDataMovementEventToProto(
@@ -67,7 +67,7 @@ public class ProtoConverters {
       EventProtos.CompositeEventProto proto) {
     return CompositeDataMovementEvent.create(proto.getStartIndex(),
         proto.getCount(),
-        proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null);
+        proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null);
   }
   
   public static EventProtos.VertexManagerEventProto convertVertexManagerEventToProto(
@@ -83,7 +83,7 @@ public class ProtoConverters {
   public static VertexManagerEvent convertVertexManagerEventFromProto(
       EventProtos.VertexManagerEventProto vmProto) {
     return VertexManagerEvent.create(vmProto.getTargetVertexName(),
-        vmProto.hasUserPayload() ? vmProto.getUserPayload().toByteArray() : null);
+        vmProto.hasUserPayload() ? vmProto.getUserPayload().asReadOnlyByteBuffer() : null);
   }
 
   public static EventProtos.RootInputDataInformationEventProto
@@ -103,7 +103,7 @@ public class ProtoConverters {
       EventProtos.RootInputDataInformationEventProto proto) {
     InputDataInformationEvent diEvent = InputDataInformationEvent.create(
         proto.getSourceIndex(),
-        proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null);
+        proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null);
     diEvent.setTargetIndex(proto.getTargetIndex());
     return diEvent;
   }
@@ -124,7 +124,7 @@ public class ProtoConverters {
       EventProtos.RootInputInitializerEventProto proto) {
     InputInitializerEvent event =
         InputInitializerEvent.create(proto.getTargetVertexName(), proto.getTargetInputName(),
-            (proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null));
+            (proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null));
     return event;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 01de0bd..52e725e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -349,7 +349,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       // save output size
       VertexManagerEventPayloadProto proto;
       try {
-        proto = VertexManagerEventPayloadProto.parseFrom(vmEvent.getUserPayload());
+        proto = VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload()));
       } catch (InvalidProtocolBufferException e) {
         throw new TezUncheckedException(e);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 7cbbf07..b664ea7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -24,6 +24,7 @@ import java.util.BitSet;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -90,7 +91,7 @@ public class ShuffleInputEventHandler {
   private void processDataMovementEvent(DataMovementEvent dmEvent) throws IOException {
     DataMovementEventPayloadProto shufflePayload;
     try {
-      shufflePayload = DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());
+      shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dmEvent.getUserPayload()));
     } catch (InvalidProtocolBufferException e) {
       throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
     } 

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 1eb09bc..50664f5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -491,7 +491,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
 
     CompositeDataMovementEvent cDme = CompositeDataMovementEvent.create(0, numPartitions,
-        payloadBuidler.build().toByteArray());
+        payloadBuidler.build().toByteString().asReadOnlyByteBuffer());
     return cDme;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 8708c6f..15121da 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -192,7 +192,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
 
     payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
-    byte[] payload = payloadProto.toByteArray();
+    ByteBuffer payload = payloadProto.toByteString().asReadOnlyByteBuffer();
 
     long outputSize = getContext().getCounters()
         .findCounter(TaskCounter.OUTPUT_BYTES).getValue();
@@ -200,7 +200,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
         .newBuilder();
     vmBuilder.setOutputSize(outputSize);
     VertexManagerEvent vmEvent = VertexManagerEvent.create(
-        getContext().getDestinationVertexName(), vmBuilder.build().toByteArray());    
+        getContext().getDestinationVertexName(), vmBuilder.build().toByteString().asReadOnlyByteBuffer());
 
     List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs() + 1);
     events.add(vmEvent);

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 9ad7cd3..ea02743 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -154,7 +154,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     }
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
 
-    DataMovementEvent dmEvent = DataMovementEvent.create(0, payloadProto.toByteArray());
+    DataMovementEvent dmEvent = DataMovementEvent.create(0, payloadProto.toByteString().asReadOnlyByteBuffer());
     List<Event> events = Lists.newArrayListWithCapacity(1);
     events.add(dmEvent);
     return events;

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 1cec071..d1b3362 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.List;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -100,7 +101,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   private void processDataMovementEvent(DataMovementEvent dme) throws IOException {
     DataMovementEventPayloadProto shufflePayload;
     try {
-      shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
+      shufflePayload = DataMovementEventPayloadProto.parseFrom(
+          ByteString.copyFrom(dme.getUserPayload()));
     } catch (InvalidProtocolBufferException e) {
       throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index e4b4656..4768c6c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.library.vertexmanager;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -215,8 +216,8 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
 
-    byte[] payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteArray();
+    ByteBuffer payload =
+        VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteString().asReadOnlyByteBuffer();
     VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload);
     // parallelism not change due to large data size
     manager = createManager(conf, mockContext, 0.1f, 0.1f);
@@ -236,7 +237,7 @@ public class TestShuffleVertexManager {
     // parallelism changed due to small data size
     scheduledTasks.clear();
     payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteArray();
+        VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteString().asReadOnlyByteBuffer();
     vmEvent = VertexManagerEvent.create("Vertex", payload);
     
     manager = createManager(conf, mockContext, 0.5f, 0.5f);
@@ -283,7 +284,7 @@ public class TestShuffleVertexManager {
     
     EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
     Map<Integer, List<Integer>> targets = Maps.newHashMap();
-    DataMovementEvent dmEvent = DataMovementEvent.create(1, new byte[0]);
+    DataMovementEvent dmEvent = DataMovementEvent.create(1, ByteBuffer.wrap(new byte[0]));
     // 4 source task outputs - same as original number of partitions
     Assert.assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(0));
     // 4 destination task inputs - 2 source tasks + 2 merged partitions
@@ -295,7 +296,7 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(1, e.getValue().size());
     Assert.assertEquals(3, e.getValue().get(0).intValue());
     targets.clear();
-    dmEvent = DataMovementEvent.create(2, new byte[0]);
+    dmEvent = DataMovementEvent.create(2, ByteBuffer.wrap(new byte[0]));
     edgeManager.routeDataMovementEventToDestination(dmEvent, 0, dmEvent.getSourceIndex(), targets);
     Assert.assertEquals(1, targets.size());
     e = targets.entrySet().iterator().next();

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
index 56e1337..14e833d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
@@ -73,7 +73,8 @@ public class TestShuffleInputEventHandler {
     if (emptyPartitionByteString != null) {
       builder.setEmptyPartitions(emptyPartitionByteString);
     }
-    return DataMovementEvent.create(srcIndex, targetIndex, 0, builder.build().toByteArray());
+    return DataMovementEvent
+        .create(srcIndex, targetIndex, 0, builder.build().toByteString().asReadOnlyByteBuffer());
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index e3b8760..d15cfff 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -41,6 +41,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -311,8 +312,9 @@ public class TestUnorderedPartitionedKVWriter {
     CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
     assertEquals(0, cdme.getSourceIndexStart());
     assertEquals(numPartitions, cdme.getCount());
-    DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(cdme
-        .getUserPayload());
+    DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(
+        ByteString.copyFrom(cdme
+            .getUserPayload()));
     assertFalse(eventProto.hasData());
     BitSet emptyPartitionBits = null;
     if (partitionsWithData.cardinality() != numPartitions) {
@@ -498,8 +500,9 @@ public class TestUnorderedPartitionedKVWriter {
     CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
     assertEquals(0, cdme.getSourceIndexStart());
     assertEquals(numOutputs, cdme.getCount());
-    DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(cdme
-        .getUserPayload());
+    DataMovementEventPayloadProto eventProto =
+        DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(
+            cdme.getUserPayload()));
     assertFalse(eventProto.hasData());
     if (skippedPartitions == null && numRecordsWritten > 0) {
       assertFalse(eventProto.hasEmptyPartitions());

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index f3b3503..8a7f2a6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.library.output;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -175,7 +176,8 @@ public class TestOnFileSortedOutput {
 
     ShuffleUserPayloads.DataMovementEventPayloadProto
         payload = ShuffleUserPayloads.DataMovementEventPayloadProto
-        .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload());
+        .parseFrom(
+            ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()));
 
     assertEquals(HOST, payload.getHost());
     assertEquals(PORT, payload.getPort());
@@ -203,7 +205,7 @@ public class TestOnFileSortedOutput {
 
     ShuffleUserPayloads.DataMovementEventPayloadProto
         payload = ShuffleUserPayloads.DataMovementEventPayloadProto
-        .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload());
+        .parseFrom(ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()));
 
     assertEquals(HOST, payload.getHost());
     assertEquals(PORT, payload.getPort());
@@ -220,7 +222,7 @@ public class TestOnFileSortedOutput {
 
     ShuffleUserPayloads.DataMovementEventPayloadProto
         payload = ShuffleUserPayloads.DataMovementEventPayloadProto
-        .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload());
+        .parseFrom(ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()));
     if (sendEmptyPartitionViaEvent) {
       assertEquals("", payload.getHost());
       assertEquals(0, payload.getPort());

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 1c6db22..9d623b5 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -145,7 +146,7 @@ public class TestOnFileUnorderedKVOutput {
     assertEquals("Invalid source index", 0, dmEvent.getSourceIndex());
 
     DataMovementEventPayloadProto shufflePayload = DataMovementEventPayloadProto
-        .parseFrom(dmEvent.getUserPayload());
+        .parseFrom(ByteString.copyFrom(dmEvent.getUserPayload()));
 
     assertFalse(shufflePayload.hasEmptyPartitions());
     assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent());

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
index 646dca8..d6d6a4c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
@@ -148,7 +148,8 @@ public class TestShuffleInputEventHandlerImpl {
     if (emptyPartitionByteString != null) {
       builder.setEmptyPartitions(emptyPartitionByteString);
     }
-    Event dme = DataMovementEvent.create(srcIndex, targetIndex, 0, builder.build().toByteArray());
+    Event dme = DataMovementEvent
+        .create(srcIndex, targetIndex, 0, builder.build().toByteString().asReadOnlyByteBuffer());
     return dme;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 29ea7c0..465dd9c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -291,7 +291,7 @@ public class TestInput extends AbstractLogicalInput {
             " numCompletedInputs: " + numCompletedInputs);
         this.completedInputVersion[dmEvent.getTargetIndex()] = dmEvent.getVersion();
         this.inputValues[dmEvent.getTargetIndex()] = 
-            ByteBuffer.wrap(dmEvent.getUserPayload()).getInt();
+            dmEvent.getUserPayload().getInt();
       } else if (event instanceof InputFailedEvent) {
         InputFailedEvent ifEvent = (InputFailedEvent) event;
         numCompletedInputs--;

http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 9d9767b..8998c3c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -77,7 +77,8 @@ public class TestOutput extends AbstractLogicalOutput {
   @Override
   public List<Event> close() throws Exception {
     LOG.info("Sending data movement event with value: " + output);
-    byte[] result = ByteBuffer.allocate(4).putInt(output).array();
+    ByteBuffer result = ByteBuffer.allocate(4).putInt(output);
+    result.flip();
     List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs());
     for (int i = 0; i < getNumPhysicalOutputs(); i++) {
       DataMovementEvent event = DataMovementEvent.create(i, result);


Mime
View raw message