tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-373. Create UserPayload class for internal code (Tsuyoshi OZAWA via bikas)
Date Mon, 02 Jun 2014 17:32:42 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 249f7482c -> af9bd5bde


TEZ-373. Create UserPayload class for internal code (Tsuyoshi OZAWA via bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/af9bd5bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/af9bd5bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/af9bd5bd

Branch: refs/heads/master
Commit: af9bd5bde1c36441c0fb1a12ae9b9aa33fb0213d
Parents: 249f748
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Jun 2 10:32:31 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Jun 2 10:32:31 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezUserPayload.java   | 36 ++++++++++++++++++++
 .../apache/tez/dag/api/DagTypeConverters.java   | 14 ++++++++
 .../tez/dag/api/EdgeManagerDescriptor.java      |  2 +-
 .../org/apache/tez/dag/api/InputDescriptor.java |  2 +-
 .../apache/tez/dag/api/OutputDescriptor.java    |  2 +-
 .../apache/tez/dag/api/ProcessorDescriptor.java |  2 +-
 .../apache/tez/dag/api/TezEntityDescriptor.java | 20 ++++++-----
 .../dag/api/VertexManagerPluginDescriptor.java  |  2 +-
 .../api/events/CompositeDataMovementEvent.java  | 10 +++---
 .../events/RootInputDataInformationEvent.java   | 10 +++---
 .../api/events/RootInputUpdatePayloadEvent.java | 10 +++---
 .../runtime/api/events/VertexManagerEvent.java  |  8 +++--
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  8 +++--
 .../dag/impl/OutputCommitterContextImpl.java    |  8 +++--
 .../tez/dag/app/dag/impl/VertexManager.java     | 13 ++++---
 .../runtime/api/impl/TezInputContextImpl.java   |  8 +++--
 .../runtime/api/impl/TezOutputContextImpl.java  |  8 +++--
 .../api/impl/TezProcessorContextImpl.java       |  8 +++--
 18 files changed, 123 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/common/TezUserPayload.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUserPayload.java b/tez-api/src/main/java/org/apache/tez/common/TezUserPayload.java
new file mode 100644
index 0000000..9531e26
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUserPayload.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Helper class to hold user payload.
+ */
+public class TezUserPayload {
+  private final byte[] payload;
+
+  public TezUserPayload(byte[] payload) {
+    this.payload = payload;
+  }
+
+  public byte[] getPayload() {
+    return payload;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index c57f1f2..cb2c35b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
+import javax.annotation.Nullable;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -640,4 +642,16 @@ public class DagTypeConverters {
     return context;
   }
 
+  public static TezUserPayload convertToTezUserPayload(@Nullable byte[] payload) {
+    return new TezUserPayload(payload);
+  }
+
+  @Nullable
+  public static byte[] convertFromTezUserPayload(@Nullable TezUserPayload payload) {
+    if (payload == null) {
+      return null;
+    }
+    return payload.getPayload();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
index 6ecbb75..cb5e84f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
@@ -26,7 +26,7 @@ public class EdgeManagerDescriptor extends TezEntityDescriptor {
 
   @Override
   public EdgeManagerDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
index 45e4b77..cc6948c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -33,7 +33,7 @@ public class InputDescriptor extends TezEntityDescriptor {
 
   @Override
   public InputDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
index d6bde35..a34d35c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
@@ -33,7 +33,7 @@ public class OutputDescriptor extends TezEntityDescriptor {
 
   @Override
   public OutputDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
index 60afb4d..a0e574d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
@@ -32,7 +32,7 @@ public class ProcessorDescriptor extends TezEntityDescriptor {
   }
 
   public ProcessorDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
index cf305eb..25788ff 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -25,10 +25,11 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.tez.common.TezUserPayload;
 
 public abstract class TezEntityDescriptor implements Writable {
 
-  protected byte[] userPayload;
+  protected TezUserPayload userPayload;
   private String className;
 
   @Private // for Writable
@@ -40,11 +41,11 @@ public abstract class TezEntityDescriptor implements Writable {
   }
 
   public byte[] getUserPayload() {
-    return this.userPayload;
+    return (userPayload == null) ? null : userPayload.getPayload();
   }
 
   public TezEntityDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
 
@@ -55,11 +56,13 @@ public abstract class TezEntityDescriptor implements Writable {
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, className);
-    if (userPayload == null) {
+    // TODO: TEZ-305 - using protobuf serde instead of Writable serde.
+    byte[] bb = DagTypeConverters.convertFromTezUserPayload(userPayload);
+    if (bb == null) {
       out.writeInt(-1);
     } else {
-      out.writeInt(userPayload.length);
-      out.write(userPayload);
+      out.writeInt(bb.length);
+      out.write(bb);
     }
   }
 
@@ -68,8 +71,9 @@ public abstract class TezEntityDescriptor implements Writable {
     this.className = Text.readString(in);
     int payloadLength = in.readInt();
     if (payloadLength != -1) {
-      userPayload = new byte[payloadLength];
-      in.readFully(userPayload);
+      byte[] bb = new byte[payloadLength];
+      in.readFully(bb);
+      this.userPayload = DagTypeConverters.convertToTezUserPayload(bb);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
index 963df48..58980b5 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
@@ -33,7 +33,7 @@ public class VertexManagerPluginDescriptor extends TezEntityDescriptor {
 
   @Override
   public VertexManagerPluginDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
index 2abf1ff..5abfc06 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
@@ -20,6 +20,8 @@ package org.apache.tez.runtime.api.events;
 
 import java.util.Iterator;
 
+import org.apache.tez.common.TezUserPayload;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.runtime.api.Event;
 
 /**
@@ -40,7 +42,7 @@ public class CompositeDataMovementEvent extends Event {
   protected final int sourceIndexEnd;
   protected int version;
 
-  protected final byte[] userPayload;
+  protected final TezUserPayload userPayload;
 
   /**
    * @param srcIndexStart
@@ -55,7 +57,7 @@ public class CompositeDataMovementEvent extends Event {
   public CompositeDataMovementEvent(int srcIndexStart, int srcIndexEnd, byte[] userPayload) {
     this.sourceIndexStart = srcIndexStart;
     this.sourceIndexEnd = srcIndexEnd;
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
   }
 
   public int getSourceIndexStart() {
@@ -67,7 +69,7 @@ public class CompositeDataMovementEvent extends Event {
   }
 
   public byte[] getUserPayload() {
-    return userPayload;
+    return userPayload.getPayload();
   }
 
   public void setVersion(int version) {
@@ -95,7 +97,7 @@ public class CompositeDataMovementEvent extends Event {
 
           @Override
           public DataMovementEvent next() {
-            DataMovementEvent dmEvent = new DataMovementEvent(currentPos, userPayload);
+            DataMovementEvent dmEvent = new DataMovementEvent(currentPos, userPayload.getPayload());
             currentPos++;
             dmEvent.setVersion(version);
             return dmEvent;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
index 25fd9de..638b3c2 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.api.events;
 
+import org.apache.tez.common.TezUserPayload;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
@@ -41,7 +43,7 @@ public final class RootInputDataInformationEvent extends Event {
 
   private final int sourceIndex;
   private int targetIndex; // TODO Likely to be multiple at a later point.
-  private final byte[] userPayload;
+  private final TezUserPayload userPayload;
   private final Object userPayloadObject;
   
   /**
@@ -51,14 +53,14 @@ public final class RootInputDataInformationEvent extends Event {
    */
   public RootInputDataInformationEvent(int srcIndex, byte[] userPayload) {
     this.sourceIndex = srcIndex;
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     this.userPayloadObject = null;
   }
   
   public RootInputDataInformationEvent(int srcIndex, Object userPayloadDeserialized) {
     this.sourceIndex = srcIndex;
     this.userPayloadObject = userPayloadDeserialized;
-    this.userPayload = null;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(null);
   }
 
   public int getSourceIndex() {
@@ -74,7 +76,7 @@ public final class RootInputDataInformationEvent extends Event {
   }
   
   public byte[] getUserPayload() {
-    return this.userPayload;
+    return userPayload.getPayload();
   }
   
   public Object getDeserializedUserPayload() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputUpdatePayloadEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputUpdatePayloadEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputUpdatePayloadEvent.java
index 607678f..4eb4c74 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputUpdatePayloadEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputUpdatePayloadEvent.java
@@ -18,25 +18,27 @@
 
 package org.apache.tez.runtime.api.events;
 
+import org.apache.tez.common.TezUserPayload;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 
 /**
  * Events used by {@link TezRootInputInitializer} implementations to update the
  * shared user payload for the Input that is being initialized. </p>
- * 
+ *
  * This event is specific to an Input, and should only be sent once - ideally
  * before {@link RootInputDataInformationEvent}s
  */
 public class RootInputUpdatePayloadEvent extends Event {
 
-  private final byte[] userPayload;
+  private final TezUserPayload userPayload;
 
   public RootInputUpdatePayloadEvent(byte[] userPayload) {
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
   }
 
   public byte[] getUserPayload() {
-    return this.userPayload;
+    return userPayload.getPayload();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
index 84431bc..3696d43 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.api.events;
 
+import org.apache.tez.common.TezUserPayload;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.runtime.api.Event;
 
 import com.google.common.base.Preconditions;
@@ -37,7 +39,7 @@ public class VertexManagerEvent extends Event {
   /**
    * User payload to be sent
    */
-  private final byte[] userPayload;
+  private final TezUserPayload userPayload;
   
   /**
    * Create a new VertexManagerEvent
@@ -48,7 +50,7 @@ public class VertexManagerEvent extends Event {
     Preconditions.checkArgument(vertexName != null);
     Preconditions.checkArgument(userPayload != null);
     this.targetVertexName = vertexName;
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
   }
   
   public String getTargetVertexName() {
@@ -56,6 +58,6 @@ public class VertexManagerEvent extends Event {
   }
   
   public byte[] getUserPayload() {
-    return userPayload;
+    return userPayload.getPayload();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 4a9ea65..b6dfaa4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -26,6 +26,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.TezUserPayload;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -60,7 +62,7 @@ public class Edge {
 
     private final String srcVertexName;
     private final String destVertexName;
-    private final byte[] userPayload;
+    private final TezUserPayload userPayload;
 
     EdgeManagerContextImpl(String srcVertexName, String destVertexName,
         @Nullable byte[] userPayload) {
@@ -68,12 +70,12 @@ public class Edge {
       checkNotNull(destVertexName, "destVertexName is null");
       this.srcVertexName = srcVertexName;
       this.destVertexName = destVertexName;
-      this.userPayload = userPayload;
+      this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     }
 
     @Override
     public byte[] getUserPayload() {
-      return userPayload;
+      return userPayload.getPayload();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
index ff9e401..07e19a7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
@@ -21,6 +21,8 @@ package org.apache.tez.dag.app.dag.impl;
 import static com.google.common.base.Preconditions.checkNotNull;
 import javax.annotation.Nullable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezUserPayload;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 
 public class OutputCommitterContextImpl implements OutputCommitterContext {
@@ -30,7 +32,7 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
   private final String dagName;
   private final String vertexName;
   private final String outputName;
-  private final byte[] userPayload;
+  private final TezUserPayload userPayload;
   private final int vertexIdx;
 
   public OutputCommitterContextImpl(ApplicationId applicationId,
@@ -49,7 +51,7 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
     this.dagName = dagName;
     this.vertexName = vertexName;
     this.outputName = outputName;
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     this.vertexIdx = vertexIdx;
   }
 
@@ -80,7 +82,7 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
 
   @Override
   public byte[] getUserPayload() {
-    return userPayload;
+    return userPayload.getPayload();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 86362c3..7f8cc14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -31,7 +31,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -65,7 +67,7 @@ public class VertexManager {
   VertexManagerPlugin plugin;
   Vertex managedVertex;
   VertexManagerPluginContextImpl pluginContext;
-  byte[] payload = null;
+  TezUserPayload payload = null;
   AppContext appContext;
   
   private static final Log LOG = LogFactory.getLog(VertexManager.class);
@@ -125,7 +127,7 @@ public class VertexManager {
     @Nullable
     @Override
     public byte[] getUserPayload() {
-      return payload;
+      return payload == null ? null: payload.getPayload();
     }
 
     @SuppressWarnings("unchecked")
@@ -229,13 +231,14 @@ public class VertexManager {
     pluginContext = new VertexManagerPluginContextImpl();
     if (pluginDesc != null) {
       plugin = RuntimeUtils.createClazzInstance(pluginDesc.getClassName());
-      payload = pluginDesc.getUserPayload();
+      payload = DagTypeConverters.convertToTezUserPayload(pluginDesc.getUserPayload());
     }
-    if (payload == null) {
+    if (payload == null || payload.getPayload() == null) {
       // Ease of use. If no payload present then give the common configuration
       // TODO TEZ-744 Don't do this - AMConf should not be used to configure vertexManagers.
       try {
-        payload = TezUtils.createUserPayloadFromConf(appContext.getAMConf());
+        payload = DagTypeConverters.convertToTezUserPayload(
+            TezUtils.createUserPayloadFromConf(appContext.getAMConf()));
       } catch (IOException e) {
         throw new TezUncheckedException(e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 53c82f6..9ef4c58 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -28,7 +28,9 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -43,7 +45,7 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
 public class TezInputContextImpl extends TezTaskContextImpl
     implements TezInputContext {
 
-  private final byte[] userPayload;
+  private final TezUserPayload userPayload;
   private final String sourceVertexName;
   private final EventMetaData sourceInfo;
   private final int inputIndex;
@@ -66,7 +68,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
     checkNotNull(sourceVertexName, "sourceVertexName is null");
     checkNotNull(input, "input is null");
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     this.inputIndex = inputIndex;
     this.sourceVertexName = sourceVertexName;
     this.sourceInfo = new EventMetaData(
@@ -100,7 +102,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
   @Nullable
   @Override
   public byte[] getUserPayload() {
-    return userPayload;
+    return userPayload.getPayload();
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 2b1bbf6..b612a6d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -28,7 +28,9 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -41,7 +43,7 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
 public class TezOutputContextImpl extends TezTaskContextImpl
     implements TezOutputContext {
 
-  private final byte[] userPayload;
+  private final TezUserPayload userPayload;
   private final String destinationVertexName;
   private final EventMetaData sourceInfo;
   private final int outputIndex;
@@ -62,7 +64,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
         auxServiceEnv, memDist, outputDescriptor);
     checkNotNull(outputIndex, "outputIndex is null");
     checkNotNull(destinationVertexName, "destinationVertexName is null");
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     this.outputIndex = outputIndex;
     this.destinationVertexName = destinationVertexName;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
@@ -93,7 +95,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
   @Nullable
   @Override
   public byte[] getUserPayload() {
-    return userPayload;
+    return userPayload.getPayload();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/af9bd5bd/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index a1005b9..2bf4d51 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -31,7 +31,9 @@ import javax.annotation.Nullable;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.InputReadyTracker;
@@ -46,7 +48,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements TezPr
 
   private static final Log LOG = LogFactory.getLog(TezProcessorContextImpl.class);
   
-  private final byte[] userPayload;
+  private final TezUserPayload userPayload;
   private final EventMetaData sourceInfo;
   private final InputReadyTracker inputReadyTracker;
 
@@ -61,7 +63,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements TezPr
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv, memDist, processorDescriptor);
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
-    this.userPayload = userPayload;
+    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
         taskVertexName, "", taskAttemptID);
     this.inputReadyTracker = inputReadyTracker;
@@ -81,7 +83,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements TezPr
   @Nullable
   @Override
   public byte[] getUserPayload() {
-    return userPayload;
+    return userPayload.getPayload();
   }
 
   @Override


Mime
View raw message