tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-354. Make user payload an optional field for {Input, Output, Processor}Descriptor. Contributed by Gunther Hagleitner
Date Wed, 14 Aug 2013 21:06:42 GMT
Updated Branches:
  refs/heads/master a9b6ab16b -> 15a411e71


TEZ-354. Make user payload an optional field for
{Input,Output,Processor}Descriptor. Contributed by Gunther
Hagleitner


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/15a411e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/15a411e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/15a411e7

Branch: refs/heads/master
Commit: 15a411e716b49fa33101ea12e00f42451fa95726
Parents: a9b6ab1
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Aug 14 14:05:49 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Aug 14 14:05:49 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/DagTypeConverters.java   |   6 +-
 .../org/apache/tez/dag/api/InputDescriptor.java |  13 ++-
 .../apache/tez/dag/api/OutputDescriptor.java    |  11 +-
 .../apache/tez/dag/api/ProcessorDescriptor.java |  10 +-
 .../apache/tez/dag/api/TezEntityDescriptor.java |  14 ++-
 .../org/apache/tez/dag/api/TestDAGPlan.java     |  16 +--
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 106 +++++++++----------
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |   2 +-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |   2 +-
 .../dag/app/dag/impl/TestVertexScheduler.java   |  12 +--
 .../tez/dag/app/rm/TestContainerReuse.java      |   2 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |  12 +--
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  26 ++---
 .../apache/hadoop/mapred/LocalJobRunnerTez.java |   4 +-
 .../tez/mapreduce/processor/MapUtils.java       |   2 +-
 .../processor/reduce/TestReduceProcessor.java   |   2 +-
 .../org/apache/tez/mapreduce/YARNRunner.java    |   8 +-
 17 files changed, 133 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 392558b..143186d 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -243,7 +243,7 @@ public class DagTypeConverters {
     if (proto.hasUserPayload()) {
       bb = proto.getUserPayload().toByteArray();
     }
-    return new InputDescriptor(className, bb);
+    return new InputDescriptor(className).setUserPayload(bb);
   }
 
   public static OutputDescriptor convertOutputDescriptorFromDAGPlan(
@@ -253,7 +253,7 @@ public class DagTypeConverters {
     if (proto.hasUserPayload()) {
       bb =  proto.getUserPayload().toByteArray();
     }
-    return new OutputDescriptor(className, bb);
+    return new OutputDescriptor(className).setUserPayload(bb);
   }
 
   public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan(
@@ -263,6 +263,6 @@ public class DagTypeConverters {
     if (proto.hasUserPayload()) {
       bb = proto.getUserPayload().toByteArray();
     }
-    return new ProcessorDescriptor(className, bb);
+    return new ProcessorDescriptor(className).setUserPayload(bb);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
index 829ff1e..dea9001 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -19,9 +19,14 @@
 package org.apache.tez.dag.api;
 
 public class InputDescriptor extends TezEntityDescriptor {
-  
-  // TODO Fix dependencies so that this can be specified as a class.  
-  public InputDescriptor(String inputClassName, byte[] userPayload) {
-    super(inputClassName, userPayload);
+
+  public InputDescriptor(String inputClassName) {
+    super(inputClassName);
+  }
+
+  @Override
+  public InputDescriptor setUserPayload(byte[] userPayload) {
+    this.userPayload = userPayload;
+    return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
index afa7586..16fb9b1 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
@@ -20,8 +20,13 @@ package org.apache.tez.dag.api;
 
 public class OutputDescriptor extends TezEntityDescriptor {
 
-  // TODO Fix dependencies so that this can be specified as a class.
-  public OutputDescriptor(String outputClassName, byte[] userPayload) {
-    super(outputClassName, userPayload);
+  public OutputDescriptor(String outputClassName) {
+    super(outputClassName);
+  }
+
+  @Override
+  public OutputDescriptor setUserPayload(byte[] userPayload) {
+    this.userPayload = userPayload;
+    return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
index 0d04214..092147d 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
@@ -20,8 +20,12 @@ package org.apache.tez.dag.api;
 
 public class ProcessorDescriptor extends TezEntityDescriptor {
 
-  // TODO Fix dependencies so that this can be specified as a class.
-  public ProcessorDescriptor(String processorClassName, byte[] userPayload) {
-    super(processorClassName, userPayload);
+  public ProcessorDescriptor(String processorClassName) {
+    super(processorClassName);
+  }
+
+  public ProcessorDescriptor setUserPayload(byte[] userPayload) {
+    this.userPayload = userPayload;
+    return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
index e78a5a1..9d4b2c4 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -20,18 +20,22 @@ package org.apache.tez.dag.api;
 
 public abstract class TezEntityDescriptor {
 
-  private byte[] userPayload;
+  protected byte[] userPayload;
   private String className;
-  
-  public TezEntityDescriptor(String className, byte[] userPayload) {
-    this.userPayload = userPayload;
+
+  public TezEntityDescriptor(String className) {
     this.className = className;
   }
-  
+
   public byte[] getUserPayload() {
     return this.userPayload;
   }
 
+  public TezEntityDescriptor setUserPayload(byte[] userPayload) {
+    this.userPayload = userPayload;
+    return this;
+  }
+
   public String getClassName() {
     return this.className;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index ce26b35..8dd0826 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -95,10 +95,10 @@ public class TestDAGPlan {
   @Test
   public void testUserPayloadSerde() {
     DAG dag = new DAG("testDag");
-    ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1",
-        "processor1Bytes".getBytes());
-    ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2",
-        "processor2Bytes".getBytes());
+    ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1").
+        setUserPayload("processor1Bytes".getBytes());
+    ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2").
+        setUserPayload("processor2Bytes".getBytes());
     Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
     Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
     v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
@@ -106,10 +106,10 @@ public class TestDAGPlan {
     v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
         .setTaskLocalResources(new HashMap<String, LocalResource>());
 
-    InputDescriptor inputDescriptor = new InputDescriptor("input",
-        "inputBytes".getBytes());
-    OutputDescriptor outputDescriptor = new OutputDescriptor("output",
-        "outputBytes".getBytes());
+    InputDescriptor inputDescriptor = new InputDescriptor("input").
+        setUserPayload("inputBytes".getBytes());
+    OutputDescriptor outputDescriptor = new OutputDescriptor("output").
+        setUserPayload("outputBytes".getBytes());
     Edge edge = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE,
         SourceType.STABLE, outputDescriptor, inputDescriptor));
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 2a7b8ff..e66df54 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -38,15 +38,15 @@ public class TestDAGVerify {
   @Test
   public void testVerify1() {
     Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor(dummyProcessorClassName, null),
+        new ProcessorDescriptor(dummyProcessorClassName),
         dummyTaskCount, dummyTaskResource);
     Vertex v2 = new Vertex("v2",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Edge e1 = new Edge(v1, v2,
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor(dummyOutputClassName, null),
-            new InputDescriptor(dummyInputClassName, null)));
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
@@ -63,33 +63,33 @@ public class TestDAGVerify {
   public void testCycle1() {
     IllegalStateException ex=null;
     Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Vertex v2 = new Vertex("v2",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Vertex v3 = new Vertex("v3",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Vertex v4 = new Vertex("v4",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Edge e1 = new Edge(v1, v2,
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor("dummy output class", null),
-            new InputDescriptor("dummy input class", null)));
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
     Edge e2 = new Edge(v2, v3,
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor("dummy output class", null),
-            new InputDescriptor("dummy input class", null)));
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
     Edge e3 = new Edge(v2, v4,
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor("dummy output class", null),
-            new InputDescriptor("dummy input class", null)));
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
     Edge e4 = new Edge(v4, v1,
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor("dummy output class", null),
-            new InputDescriptor("dummy input class", null)));
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
@@ -119,33 +119,33 @@ public class TestDAGVerify {
   public void testCycle2() {
     IllegalStateException ex=null;
     Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Vertex v2 = new Vertex("v2",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Vertex v3 = new Vertex("v3",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Vertex v4 = new Vertex("v4",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Edge e1 = new Edge(v1, v2,
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor("dummy output class", null),
-            new InputDescriptor("dummy input class", null)));
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
     Edge e2 = new Edge(v2, v3,
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor("dummy output class", null),
-            new InputDescriptor("dummy input class", null)));
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
     Edge e3 = new Edge(v2, v4,
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor("dummy output class", null),
-            new InputDescriptor("dummy input class", null)));
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
     Edge e4 = new Edge(v3, v2,
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor("dummy output class", null),
-            new InputDescriptor("dummy input class", null)));
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
@@ -170,10 +170,10 @@ public class TestDAGVerify {
   public void repeatedVertexName() {
     IllegalStateException ex=null;
     Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Vertex v1repeat = new Vertex("v1",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
@@ -197,22 +197,22 @@ public class TestDAGVerify {
     IllegalStateException ex=null;
     try {
       Vertex v1 = new Vertex("v1",
-          new ProcessorDescriptor("MapProcessor", null),
+          new ProcessorDescriptor("MapProcessor"),
           dummyTaskCount, dummyTaskResource);
       Vertex v2 = new Vertex("v2",
-          new ProcessorDescriptor("MapProcessor", null),
+          new ProcessorDescriptor("MapProcessor"),
           dummyTaskCount, dummyTaskResource);
       Vertex v3 = new Vertex("v3",
-          new ProcessorDescriptor("ReduceProcessor", null),
+          new ProcessorDescriptor("ReduceProcessor"),
           dummyTaskCount, dummyTaskResource);
       Edge e1 = new Edge(v1, v3,
           new EdgeProperty(ConnectionPattern.ONE_TO_ONE, SourceType.STABLE,
-              new OutputDescriptor("dummy output class", null),
-              new InputDescriptor("dummy input class", null)));
+              new OutputDescriptor("dummy output class"),
+              new InputDescriptor("dummy input class")));
       Edge e2 = new Edge(v2, v3,
           new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-              new OutputDescriptor("dummy output class", null),
-              new InputDescriptor("dummy input class", null)));
+              new OutputDescriptor("dummy output class"),
+              new InputDescriptor("dummy input class")));
       DAG dag = new DAG("testDag");
       dag.addVertex(v1);
       dag.addVertex(v2);
@@ -236,22 +236,22 @@ public class TestDAGVerify {
   @Test
   public void BinaryInputAllowed() {
     Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Vertex v2 = new Vertex("v2",
-        new ProcessorDescriptor("MapProcessor", null),
+        new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     Vertex v3 = new Vertex("v3",
-        new ProcessorDescriptor("ReduceProcessor", null),
+        new ProcessorDescriptor("ReduceProcessor"),
         dummyTaskCount, dummyTaskResource);
     Edge e1 = new Edge(v1, v3,
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor("dummy output class", null),
-            new InputDescriptor("dummy input class", null)));
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
     Edge e2 = new Edge(v2, v3,
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor("dummy output class", null),
-            new InputDescriptor("dummy input class", null)));
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
@@ -269,22 +269,22 @@ public class TestDAGVerify {
     IllegalStateException ex=null;
     try {
       Vertex v1 = new Vertex("v1",
-          new ProcessorDescriptor("MapProcessor", null),
+          new ProcessorDescriptor("MapProcessor"),
           dummyTaskCount, dummyTaskResource);
       Vertex v2 = new Vertex("v2",
-          new ProcessorDescriptor("MapProcessor", null),
+          new ProcessorDescriptor("MapProcessor"),
           dummyTaskCount, dummyTaskResource);
       Vertex v3 = new Vertex("v3",
-          new ProcessorDescriptor("MapProcessor", null),
+          new ProcessorDescriptor("MapProcessor"),
           dummyTaskCount, dummyTaskResource);
       Edge e1 = new Edge(v1, v2,
           new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-              new OutputDescriptor("dummy output class", null),
-              new InputDescriptor("dummy input class", null)));
+              new OutputDescriptor("dummy output class"),
+              new InputDescriptor("dummy input class")));
       Edge e2 = new Edge(v1, v2,
           new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
-              new OutputDescriptor("dummy output class", null),
-              new InputDescriptor("dummy input class", null)));
+              new OutputDescriptor("dummy output class"),
+              new InputDescriptor("dummy input class")));
       DAG dag = new DAG("testDag");
       dag.addVertex(v1);
       dag.addVertex(v2);
@@ -322,7 +322,7 @@ public class TestDAGVerify {
   public void testInvalidVertexConstruction() {
     try {
       Vertex v1 = new Vertex("v1",
-          new ProcessorDescriptor("MapProcessor", null),
+          new ProcessorDescriptor("MapProcessor"),
           0, dummyTaskResource);
       Assert.fail("Expected exception for 0 parallelism");
     } catch (IllegalArgumentException e) {
@@ -330,7 +330,7 @@ public class TestDAGVerify {
     }
     try {
       Vertex v1 = new Vertex("v1",
-          new ProcessorDescriptor("MapProcessor", null),
+          new ProcessorDescriptor("MapProcessor"),
           1, null);
       Assert.fail("Expected exception for 0 parallelism");
     } catch (IllegalArgumentException e) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index d9210d9..2cc00dc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -93,7 +93,7 @@ public class TestTaskAttempt {
 
   private static final ProcessorDescriptor MAP_PROCESSOR_DESC =
       new ProcessorDescriptor(
-      "org.apache.tez.mapreduce.processor.map.MapProcessor", null);
+      "org.apache.tez.mapreduce.processor.map.MapProcessor");
 
   static public class StubbedFS extends RawLocalFileSystem {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 922c0a6..412fcb5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -114,7 +114,7 @@ public class TestTaskImpl {
     javaOpts = "";
     leafVertex = false;
     mapProcDesc = new ProcessorDescriptor(
-        "org.apache.tez.mapreduce.processor.map.MapProcessor", null);
+        "org.apache.tez.mapreduce.processor.map.MapProcessor");
 
     mockTask = new MockTaskImpl(vertexId, partition,
         dispatcher.getEventHandler(), conf, taskAttemptListener, jobToken,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index 9102897..a2248ca 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -52,22 +52,22 @@ public class TestVertexScheduler {
     TezVertexID mockSrcVertexId1 = new TezVertexID(dagId, 1);
     EdgeProperty eProp1 = new EdgeProperty(
         EdgeProperty.ConnectionPattern.BIPARTITE,
-        EdgeProperty.SourceType.STABLE, new OutputDescriptor("out", null),
-        new InputDescriptor("in", null));
+        EdgeProperty.SourceType.STABLE, new OutputDescriptor("out"),
+        new InputDescriptor("in"));
     when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1);
     Vertex mockSrcVertex2 = mock(Vertex.class);
     TezVertexID mockSrcVertexId2 = new TezVertexID(dagId, 2);
     EdgeProperty eProp2 = new EdgeProperty(
         EdgeProperty.ConnectionPattern.BIPARTITE,
-        EdgeProperty.SourceType.STABLE, new OutputDescriptor("out", null),
-        new InputDescriptor("in", null));
+        EdgeProperty.SourceType.STABLE, new OutputDescriptor("out"),
+        new InputDescriptor("in"));
     when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2);
     Vertex mockSrcVertex3 = mock(Vertex.class);
     TezVertexID mockSrcVertexId3 = new TezVertexID(dagId, 3);
     EdgeProperty eProp3 = new EdgeProperty(
         EdgeProperty.ConnectionPattern.ONE_TO_ALL,
-        EdgeProperty.SourceType.STABLE, new OutputDescriptor("out", null),
-        new InputDescriptor("in", null));
+        EdgeProperty.SourceType.STABLE, new OutputDescriptor("out"),
+        new InputDescriptor("in"));
     when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3);
     
     Vertex mockManagedVertex = mock(Vertex.class);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 056239b..0a75f20 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -193,7 +193,7 @@ public class TestContainerReuse {
     AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(
         taID, capability, new HashMap<String, LocalResource>(),
         new TezEngineTaskContext(taID, "user", "jobName", "vertexName",
-            new ProcessorDescriptor("processorClassName", null),
+            new ProcessorDescriptor("processorClassName"),
             Collections.singletonList(new InputSpec("vertexName", 1,
                 "inputClassName")), Collections.singletonList(new OutputSpec(
                 "vertexName", 1, "outputClassName"))), ta,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 62cc96c..525dcc9 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -532,7 +532,7 @@ public class MRRSleepJob extends Configured implements Tool {
     List<Vertex> vertices = new ArrayList<Vertex>();
 
     Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
-        MapProcessor.class.getName(),
+        MapProcessor.class.getName()).setUserPayload(
         MRHelpers.createUserPayloadFromConf(mapStageConf)),
         numMapper,
         MRHelpers.getMapResource(mapStageConf));
@@ -555,8 +555,8 @@ public class MRRSleepJob extends Configured implements Tool {
         Configuration iconf =
             intermediateReduceStageConfs[i];
         Vertex ivertex = new Vertex("ireduce" + (i+1),
-            new ProcessorDescriptor(ReduceProcessor.class.getName(),
-                MRHelpers.createUserPayloadFromConf(iconf)),
+                new ProcessorDescriptor(ReduceProcessor.class.getName()).
+                setUserPayload(MRHelpers.createUserPayloadFromConf(iconf)),
                 numIReducer,
                 MRHelpers.getReduceResource(iconf));
         ivertex.setJavaOpts(MRHelpers.getReduceJavaOpts(iconf));
@@ -571,7 +571,7 @@ public class MRRSleepJob extends Configured implements Tool {
     Vertex finalReduceVertex = null;
     if (numReducer > 0) {
       finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
-          ReduceProcessor.class.getName(),
+          ReduceProcessor.class.getName()).setUserPayload(
           MRHelpers.createUserPayloadFromConf(finalReduceConf)),
           numReducer,
           MRHelpers.getReduceResource(finalReduceConf));
@@ -591,9 +591,9 @@ public class MRRSleepJob extends Configured implements Tool {
             vertices.get(i), new EdgeProperty(
                 ConnectionPattern.BIPARTITE, SourceType.STABLE,
                 new OutputDescriptor(
-                    OnFileSortedOutput.class.getName(), null),
+                    OnFileSortedOutput.class.getName()),
                 new InputDescriptor(
-                    ShuffledMergedInput.class.getName(), null))));
+                    ShuffledMergedInput.class.getName()))));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 67be44a..83dc927 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -269,23 +269,23 @@ public class TestMRRJobsDAGApi {
     
     DAG dag = new DAG("testMRRSleepJobDagSubmit");
     Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
-        MapProcessor.class.getName(),
+        MapProcessor.class.getName()).setUserPayload(
         MRHelpers.createUserPayloadFromConf(stage1Conf)),
         inputSplitInfo.getNumTasks(), Resource.newInstance(256, 1));
     Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
-        ReduceProcessor.class.getName(),
+        ReduceProcessor.class.getName()).setUserPayload(
         MRHelpers.createUserPayloadFromConf(stage2Conf)),
         1, Resource.newInstance(256, 1));
     Vertex stage11Vertex = new Vertex("map1", new ProcessorDescriptor(
-        MapProcessor.class.getName(),
+        MapProcessor.class.getName()).setUserPayload(
         MRHelpers.createUserPayloadFromConf(stage1Conf)),
         inputSplitInfo1.getNumTasks(),  Resource.newInstance(256, 1));
     Vertex stage22Vertex = new Vertex("ireduce1", new ProcessorDescriptor(
-        ReduceProcessor.class.getName(),
+        ReduceProcessor.class.getName()).setUserPayload(
         MRHelpers.createUserPayloadFromConf(stage22Conf)),  
         2, Resource.newInstance(256, 1));
     Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
-        ReduceProcessor.class.getName(),
+        ReduceProcessor.class.getName()).setUserPayload(
         MRHelpers.createUserPayloadFromConf(stage3Conf)),
         1, Resource.newInstance(256, 1));
 
@@ -359,20 +359,20 @@ public class TestMRRJobsDAGApi {
 
     Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
         ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor(
-        OnFileSortedOutput.class.getName(), null), new InputDescriptor(
-                ShuffledMergedInput.class.getName(), null)));
+        OnFileSortedOutput.class.getName()), new InputDescriptor(
+                ShuffledMergedInput.class.getName())));
     Edge edge11 = new Edge(stage11Vertex, stage22Vertex, new EdgeProperty(
         ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor(
-        OnFileSortedOutput.class.getName(), null), new InputDescriptor(
-                ShuffledMergedInput.class.getName(), null)));
+        OnFileSortedOutput.class.getName()), new InputDescriptor(
+                ShuffledMergedInput.class.getName())));
     Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
         ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor(
-        OnFileSortedOutput.class.getName(), null), new InputDescriptor(
-                ShuffledMergedInput.class.getName(), null)));
+        OnFileSortedOutput.class.getName()), new InputDescriptor(
+                ShuffledMergedInput.class.getName())));
     Edge edge3 = new Edge(stage22Vertex, stage3Vertex, new EdgeProperty(
         ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor(
-        OnFileSortedOutput.class.getName(), null), new InputDescriptor(
-                ShuffledMergedInput.class.getName(), null)));
+        OnFileSortedOutput.class.getName()), new InputDescriptor(
+                ShuffledMergedInput.class.getName())));
 
     dag.addEdge(edge1);
     dag.addEdge(edge11);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index 41e88a2..30ba8f8 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -246,7 +246,7 @@ public class LocalJobRunnerTez implements ClientProtocol {
           mapIds.add(mapId);
           // FIXME invalid task context
           ProcessorDescriptor mapProcessorDesc = new ProcessorDescriptor(
-                      MapProcessor.class.getName(), null);
+                      MapProcessor.class.getName());
           TezEngineTaskContext taskContext =
               new TezEngineTaskContext(
                   tezMapId, user, localConf.getJobName(), "TODO_vertexName",
@@ -451,7 +451,7 @@ public class LocalJobRunnerTez implements ClientProtocol {
             setupChildMapredLocalDirs(reduceId, user, localConf);
             // FIXME invalid task context
             ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor(
-                ReduceProcessor.class.getName(), null);
+                ReduceProcessor.class.getName());
             TezEngineTaskContext taskContext = new TezEngineTaskContext(
                 IDConverter.fromMRTaskAttemptId(reduceId), user,
                 localConf.getJobName(), "TODO_vertexName",

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index f2f0c18..2bc327c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -183,7 +183,7 @@ public class MapUtils {
     InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput);
 
     ProcessorDescriptor mapProcessorDesc = new ProcessorDescriptor(
-        MapProcessor.class.getName(), null);
+        MapProcessor.class.getName());
     writeSplitFiles(fs, jobConf, split);
     TezEngineTaskContext taskContext = new TezEngineTaskContext(
         TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), "testuser",

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 1ae3539..7562265 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -134,7 +134,7 @@ public class TestReduceProcessor {
         "localized-resources").toUri().toString());
     FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output"));
     ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor(
-        ReduceProcessor.class.getName(), null);
+        ReduceProcessor.class.getName());
     // Now run a reduce
     TezEngineTaskContext taskContext = new TezEngineTaskContext(
         TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0), "testUser",

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 75a79c3..40a9563 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -400,8 +400,8 @@ public class YARNRunner implements ClientProtocol {
 
     Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
         : MRHelpers.getReduceResource(stageConf);
-    Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(
-        processorName, MRHelpers.createUserPayloadFromConf(stageConf)),
+    Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).
+        setUserPayload(MRHelpers.createUserPayloadFromConf(stageConf)),
         numTasks, taskResource);
 
     Map<String, String> taskEnv = new HashMap<String, String>();
@@ -461,8 +461,8 @@ public class YARNRunner implements ClientProtocol {
       if (i > 0) {
         EdgeProperty edgeProperty = new EdgeProperty(
             ConnectionPattern.BIPARTITE, SourceType.STABLE,
-            new OutputDescriptor(OnFileSortedOutput.class.getName(), null),
-            new InputDescriptor(ShuffledMergedInput.class.getName(), null));
+            new OutputDescriptor(OnFileSortedOutput.class.getName()),
+            new InputDescriptor(ShuffledMergedInput.class.getName()));
 
         Edge edge = null;
         edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);


Mime
View raw message