tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1318. Simplify Vertex Constructor (bikas)
Date Sat, 09 Aug 2014 21:00:12 GMT
Repository: tez
Updated Branches:
  refs/heads/master a1dd82912 -> f184b1a0e


TEZ-1318. Simplify Vertex Constructor (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f184b1a0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f184b1a0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f184b1a0

Branch: refs/heads/master
Commit: f184b1a0e183b70fb3de3282782ab9d5b8ca4c9a
Parents: a1dd829
Author: Bikas Saha <bikas@apache.org>
Authored: Sat Aug 9 13:59:46 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Sat Aug 9 13:59:46 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/AMConfiguration.java  |  2 -
 .../org/apache/tez/client/TezClientUtils.java   |  2 +-
 .../org/apache/tez/common/TezYARNUtils.java     |  4 --
 .../main/java/org/apache/tez/dag/api/DAG.java   |  9 ++-
 .../apache/tez/dag/api/TezConfiguration.java    | 11 +++-
 .../java/org/apache/tez/dag/api/Vertex.java     | 64 ++++++++++++++++++--
 .../apache/tez/examples/OrderedWordCount.java   | 13 ++--
 .../examples/BroadcastAndOneToOneExample.java   | 13 +---
 .../mapreduce/examples/FilterLinesByWord.java   |  8 +--
 .../examples/FilterLinesByWordOneToOne.java     |  5 +-
 .../mapreduce/examples/IntersectDataGen.java    |  3 +-
 .../mapreduce/examples/IntersectExample.java    |  8 +--
 .../mapreduce/examples/IntersectValidate.java   | 23 +++----
 .../tez/mapreduce/examples/MRRSleepJob.java     |  9 +--
 .../examples/TestOrderedWordCount.java          |  9 +--
 .../tez/mapreduce/examples/UnionExample.java    | 21 ++-----
 .../tez/mapreduce/examples/WordCount.java       |  9 +--
 17 files changed, 117 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
index 597a95d..9206e72 100644
--- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -21,11 +21,9 @@ package org.apache.tez.client;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 
 import com.google.common.collect.Maps;

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index a82393f..d05dd7e 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -580,7 +580,7 @@ public class TezClientUtils {
       amConfig.getTezConfiguration().set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,
           binaryPath.toUri().toString());
 
-      DAGPlan dagPB = dag.createDag(null);
+      DAGPlan dagPB = dag.createDag(amConfig.getTezConfiguration());
 
       FSDataOutputStream dagPBOutBinaryStream = null;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java
index 87ff828..de82e8a 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java
@@ -23,8 +23,6 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Shell;
@@ -40,8 +38,6 @@ public class TezYARNUtils {
   
   private static Pattern ENV_VARIABLE_PATTERN = Pattern.compile(Shell.getEnvironmentVariableRegex());
 
-  private static Log LOG = LogFactory.getLog(TezYARNUtils.class);
-  
   public static String getFrameworkClasspath(Configuration conf, boolean usingArchive) {
     Map<String, String> environment = new HashMap<String, String>();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index e754990..b7ff1fb 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -531,7 +531,14 @@ public class DAG {
     }
 
     for (Vertex vertex : vertices.values()) {
-      // infer credentials and parallelism from data source
+      // infer credentials, resources and parallelism from data source
+      if (vertex.getTaskResource() == null) {
+        vertex.setTaskResource(Resource.newInstance(dagConf.getInt(
+            TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
+            TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT), dagConf.getInt(
+            TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
+            TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT)));
+      }
       List<DataSourceDescriptor> dataSources = vertex.getDataSources();
       for (DataSourceDescriptor dataSource : dataSources) {
         if (dataSource.getCredentials() != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index ce46bf3..29aae5b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -206,11 +206,20 @@ public class TezConfiguration extends Configuration {
   /** The amount of memory to be used by the AppMaster */
   public static final String TEZ_AM_RESOURCE_MEMORY_MB = TEZ_AM_PREFIX
       + "resource.memory.mb";
-  public static final int TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT = 1536;
+  public static final int TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT = 1024;
 
   public static final String TEZ_AM_RESOURCE_CPU_VCORES = TEZ_AM_PREFIX
       + "resource.cpu.vcores";
   public static final int TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT = 1;
+  
+  /** The amount of memory to be used by each task */
+  public static final String TEZ_TASK_RESOURCE_MEMORY_MB = TEZ_TASK_PREFIX
+      + "resource.memory.mb";
+  public static final int TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT = 1024;
+
+  public static final String TEZ_TASK_RESOURCE_CPU_VCORES = TEZ_TASK_PREFIX
+      + "resource.cpu.vcores";
+  public static final int TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT = 1; 
 
   public static final String
           TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION = TEZ_AM_PREFIX

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 5f67ad1..654d4a1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -41,7 +41,7 @@ public class Vertex {
 
   private int parallelism;
   private VertexLocationHint locationHint;
-  private final Resource taskResource;
+  private Resource taskResource;
   private Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
   private Map<String, String> taskEnvironment = new HashMap<String, String>();
   private final List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs 
@@ -74,7 +74,7 @@ public class Vertex {
    *          reconfigurations.
    * @param taskResource
    *          Physical resources like memory/cpu thats used by each task of this
-   *          vertex
+   *          vertex.
    */
   public Vertex(String vertexName,
       ProcessorDescriptor processorDescriptor,
@@ -89,9 +89,51 @@ public class Vertex {
           "Parallelism should be -1 if determined by the AM"
           + ", otherwise should be >= 0");
     }
-    if (taskResource == null) {
-      throw new IllegalArgumentException("Resource cannot be null");
-    }
+  }
+  
+  /**
+   * Create a new vertex with the given name and parallelism. <br>
+   * The vertex task resource will be picked from configuration
+   * {@link TezConfiguration#TEZ_TASK_RESOURCE_MEMORY_MB} &
+   * {@link TezConfiguration#TEZ_TASK_RESOURCE_CPU_VCORES}. Applications that
+   * want more control over their task resource specification may create their
+   * own logic to determine task resources and use
+   * {@link Vertex#Vertex(String, ProcessorDescriptor, int, Resource)} to create
+   * the Vertex.
+   * 
+   * @param vertexName
+   *          Name of the vertex
+   * @param processorDescriptor
+   *          Description of the processor that is executed in every task of
+   *          this vertex
+   * @param parallelism
+   *          Number of tasks in this vertex. Set to -1 if this is going to be
+   *          decided at runtime. Parallelism may change at runtime due to graph
+   *          reconfigurations.
+   */
+  public Vertex(String vertexName, ProcessorDescriptor processorDescriptor, int parallelism) {
+    this(vertexName, processorDescriptor, parallelism, null);
+  }
+  
+  /**
+   * Create a new vertex with the given name. <br>
+   * The vertex task resource will be picked from configuration. <br>
+   * The vertex parallelism will be inferred. If it cannot be inferred then an
+   * error will be reported. This constructor may be used for vertices that have
+   * data sources, or connected via 1-1 edges or have runtime parallelism
+   * estimation via data source initializers or vertex managers. Calling this
+   * constructor is equivalent to calling
+   * {@link Vertex#Vertex(String, ProcessorDescriptor, int)} with the
+   * parallelism set to -1.
+   * 
+   * @param vertexName
+   *          Name of the vertex
+   * @param processorDescriptor
+   *          Description of the processor that is executed in every task of
+   *          this vertex
+   */
+  public Vertex(String vertexName, ProcessorDescriptor processorDescriptor) {
+    this(vertexName, processorDescriptor, -1);
   }
 
   /**
@@ -120,6 +162,10 @@ public class Vertex {
     return parallelism;
   }
   
+  /**
+   * Set the number of tasks for this vertex
+   * @param parallelism Parallelism for this vertex
+   */
   void setParallelism(int parallelism) {
     this.parallelism = parallelism;
   }
@@ -345,6 +391,14 @@ public class Vertex {
     return Collections.unmodifiableList(outputVertices);
   }
   
+  /**
+   * Set the resources (CPU, memory, etc.) used by tasks of this vertex
+   * @param resource {@link Resource} for the tasks of this vertex
+   */
+  void setTaskResource(Resource resource) {
+    this.taskResource = resource;
+  }
+
   List<DataSourceDescriptor> getDataSources() {
     return dataSources;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
index d0fd83b..217d0b7 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -43,7 +43,6 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.mapreduce.examples.WordCount.TokenProcessor;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
@@ -118,17 +117,15 @@ public class OrderedWordCount extends Configured implements Tool  {
         TextOutputFormat.class, outputPath).create();
 
     Vertex tokenizerVertex = new Vertex("Tokenizer", new ProcessorDescriptor(
-        TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
+        TokenProcessor.class.getName()));
     tokenizerVertex.addDataSource("MRInput", dataSource);
 
-    Vertex summationVertex = new Vertex("Summation",
-        new ProcessorDescriptor(
-            SumProcessor.class.getName()), numPartitions, MRHelpers.getReduceResource(tezConf));
+    Vertex summationVertex = new Vertex("Summation", new ProcessorDescriptor(
+        SumProcessor.class.getName()), numPartitions);
     
     // 1 task for global sorted order
-    Vertex sorterVertex = new Vertex("Sorter",
-        new ProcessorDescriptor(
-            NoOpSorter.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
+    Vertex sorterVertex = new Vertex("Sorter", new ProcessorDescriptor(
+        NoOpSorter.class.getName()), 1);
     sorterVertex.addDataSink("MROutput", dataSink);
 
     OrderedPartitionedKVEdgeConfigurer summationEdgeConf = OrderedPartitionedKVEdgeConfigurer

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index 91ec179..74b2cb9 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -45,7 +44,6 @@ import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -127,8 +125,6 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
   private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
       Path stagingDir, boolean doLocalityCheck) throws IOException, YarnException {
 
-    JobConf mrConf = new JobConf(tezConf);
-
     int numBroadcastTasks = 2;
     int numOneToOneTasks = 3;
     if (doLocalityCheck) {
@@ -148,17 +144,14 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
     System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");
 
     Vertex broadcastVertex = new Vertex("Broadcast", new ProcessorDescriptor(
-        InputProcessor.class.getName()),
-        numBroadcastTasks, MRHelpers.getMapResource(mrConf));
+        InputProcessor.class.getName()), numBroadcastTasks);
     
     Vertex inputVertex = new Vertex("Input", new ProcessorDescriptor(
-        InputProcessor.class.getName()).setUserPayload(procPayload),
-        numOneToOneTasks, MRHelpers.getMapResource(mrConf));
+        InputProcessor.class.getName()).setUserPayload(procPayload), numOneToOneTasks);
 
     Vertex oneToOneVertex = new Vertex("OneToOne",
         new ProcessorDescriptor(
-            OneToOneProcessor.class.getName()).setUserPayload(procPayload),
-            -1, MRHelpers.getReduceResource(mrConf));
+            OneToOneProcessor.class.getName()).setUserPayload(procPayload));
     oneToOneVertex.setVertexManagerPlugin(
             new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 560fd55..817fff5 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -90,7 +89,6 @@ public class FilterLinesByWord extends Configured implements Tool {
 
   public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
   
-  private TezCounters counters = null;
   private boolean exitOnCompletion = false;
 
   public FilterLinesByWord(boolean exitOnCompletion) {
@@ -188,7 +186,7 @@ public class FilterLinesByWord extends Configured implements Tool {
     int stage1NumTasks = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
         FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload),
-        stage1NumTasks, MRHelpers.getMapResource(stage1Conf));
+        stage1NumTasks);
     if (generateSplitsInClient) {
       stage1Vertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
       Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
@@ -212,8 +210,7 @@ public class FilterLinesByWord extends Configured implements Tool {
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
         FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
-        .createUserPayloadFromConf(stage2Conf)), 1,
-        MRHelpers.getReduceResource(stage2Conf));
+        .createUserPayloadFromConf(stage2Conf)), 1);
     stage2Vertex.setTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2
@@ -268,7 +265,6 @@ public class FilterLinesByWord extends Configured implements Tool {
       }
       
       dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-      counters = dagStatus.getDAGCounters();
       
     } finally {
       fs.delete(stagingDir, true);

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 09c55e8..43309b1 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -174,7 +174,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     int stage1NumTasks = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
         FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload),
-        stage1NumTasks, MRHelpers.getMapResource(stage1Conf));
+        stage1NumTasks);
     if (generateSplitsInClient) {
       stage1Vertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
       Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
@@ -198,8 +198,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
         FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
-        .createUserPayloadFromConf(stage2Conf)), stage1NumTasks,
-        MRHelpers.getMapResource(stage2Conf));
+        .createUserPayloadFromConf(stage2Conf)), stage1NumTasks);
     stage2Vertex.setTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index d83aa34..3e11870 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -46,7 +46,6 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.api.TezProcessorContext;
@@ -201,7 +200,7 @@ public class IntersectDataGen extends Configured implements Tool {
 
     Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
         GenDataProcessor.class.getName()).setUserPayload(GenDataProcessor.createConfiguration(
-        largeOutSizePerTask, smallOutSizePerTask)), numTasks, MRHelpers.getMapResource(tezConf));
+        largeOutSizePerTask, smallOutSizePerTask)), numTasks);
     genDataVertex.addDataSink(STREAM_OUTPUT_NAME, 
         MROutput.createConfigurer(new Configuration(tezConf),
             TextOutputFormat.class, largeOutPath.toUri().toString()).create());

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index cc85324..b226d9c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -46,7 +46,6 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
@@ -185,22 +184,21 @@ public class IntersectExample extends Configured implements Tool {
 
     // Change the way resources are setup - no MRHelpers
     Vertex streamFileVertex = new Vertex("partitioner1", new ProcessorDescriptor(
-        ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addDataSource(
+        ForwardingProcessor.class.getName())).addDataSource(
         "streamfile",
         MRInput
             .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
                 streamPath.toUri().toString()).groupSplitsInAM(false).create());
 
     Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor(
-        ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addDataSource(
+        ForwardingProcessor.class.getName())).addDataSource(
         "hashfile",
         MRInput
             .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
                 hashPath.toUri().toString()).groupSplitsInAM(false).create());
 
     Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor(
-        IntersectProcessor.class.getName()), numPartitions,
-        MRHelpers.getReduceResource(tezConf)).addDataSink("finalOutput",
+        IntersectProcessor.class.getName()), numPartitions).addDataSink("finalOutput",
         MROutput.createConfigurer(new Configuration(tezConf),
             TextOutputFormat.class, outPath.toUri().toString()).create());
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index 9e076ca..5dd7d99 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -44,7 +44,6 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.mapreduce.examples.IntersectExample.ForwardingProcessor;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.Reader;
@@ -189,26 +188,20 @@ public class IntersectValidate extends Configured implements Tool {
         .newBuilder(Text.class.getName(), NullWritable.class.getName(),
             HashPartitioner.class.getName()).build();
 
-    // Change the way resources are setup - no MRHelpers
     Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor(
-        ForwardingProcessor.class.getName()), -1,
-        MRHelpers.getMapResource(tezConf)).addDataSource(
-        "lhs",
+        ForwardingProcessor.class.getName())).addDataSource("lhs",
         MRInput
-        .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
-            lhs.toUri().toString()).groupSplitsInAM(false).create());
+            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+                lhs.toUri().toString()).groupSplitsInAM(false).create());
 
     Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor(
-        ForwardingProcessor.class.getName()), -1,
-        MRHelpers.getMapResource(tezConf)).addDataSource(
-        "rhs",
+        ForwardingProcessor.class.getName())).addDataSource("rhs",
         MRInput
-        .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
-            rhs.toUri().toString()).groupSplitsInAM(false).create());
+            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+                rhs.toUri().toString()).groupSplitsInAM(false).create());
 
-    Vertex intersectValidateVertex = new Vertex("intersectvalidate",
-        new ProcessorDescriptor(IntersectValidateProcessor.class.getName()),
-        numPartitions, MRHelpers.getReduceResource(tezConf));
+    Vertex intersectValidateVertex = new Vertex("intersectvalidate", new ProcessorDescriptor(
+        IntersectValidateProcessor.class.getName()), numPartitions);
 
     Edge e1 = new Edge(lhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
     Edge e2 = new Edge(rhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 23acbe4..1596461 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -541,8 +541,7 @@ public class MRRSleepJob extends Configured implements Tool {
     int numTasks = generateSplitsInAM ? -1 : numMapper;
     
     Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
-        MapProcessor.class.getName()).setUserPayload(mapUserPayload),
-        numTasks, MRHelpers.getMapResource(mapStageConf));
+        MapProcessor.class.getName()).setUserPayload(mapUserPayload), numTasks);
 
     if (writeSplitsToDFS) {
       mapVertex.setLocationHint(new VertexLocationHint(taskLocHint));
@@ -566,8 +565,7 @@ public class MRRSleepJob extends Configured implements Tool {
         byte[] iReduceUserPayload = MRHelpers.createUserPayloadFromConf(iconf);
         Vertex ivertex = new Vertex("ireduce" + (i+1),
                 new ProcessorDescriptor(ReduceProcessor.class.getName()).
-                setUserPayload(iReduceUserPayload), numIReducer,
-                MRHelpers.getReduceResource(iconf));
+                setUserPayload(iReduceUserPayload), numIReducer);
         ivertex.setTaskLocalFiles(commonLocalResources);
         vertices.add(ivertex);
       }
@@ -577,8 +575,7 @@ public class MRRSleepJob extends Configured implements Tool {
     if (numReducer > 0) {
       byte[] reducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
       finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
-          ReduceProcessor.class.getName()).setUserPayload(reducePayload),
-          numReducer, MRHelpers.getReduceResource(finalReduceConf));
+          ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer);
       finalReduceVertex.setTaskLocalFiles(commonLocalResources);
       MRHelpers.addMROutputLegacy(finalReduceVertex, reducePayload);
       vertices.add(finalReduceVertex);

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index b243186..ccf55af 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -210,8 +210,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
     int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(mapPayload)
-            .setHistoryText(mapStageHistoryText),
-        numMaps, MRHelpers.getMapResource(mapStageConf));
+            .setHistoryText(mapStageHistoryText), numMaps);
     if (generateSplitsInClient) {
       mapVertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
       Map<String, LocalResource> mapLocalResources =
@@ -237,8 +236,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
     Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
         ReduceProcessor.class.getName())
             .setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf))
-            .setHistoryText(iReduceStageHistoryText),
-        2, MRHelpers.getReduceResource(iReduceStageConf));
+            .setHistoryText(iReduceStageHistoryText), 2);
     ivertex.setTaskLocalFiles(commonLocalResources);
     vertices.add(ivertex);
 
@@ -250,8 +248,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
         new ProcessorDescriptor(
             ReduceProcessor.class.getName())
                 .setUserPayload(finalReducePayload)
-                .setHistoryText(finalReduceStageHistoryText), 1,
-        MRHelpers.getReduceResource(finalReduceConf));
+                .setHistoryText(finalReduceStageHistoryText), 1);
     finalReduceVertex.setTaskLocalFiles(commonLocalResources);
     MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
     vertices.add(finalReduceVertex);

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 77c6656..90d3090 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -49,7 +49,6 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInput.MRInputConfigurer;
 import org.apache.tez.mapreduce.output.MROutput;
@@ -175,24 +174,16 @@ public class UnionExample {
     DataSourceDescriptor dataSource = configurer.generateSplitsInAM(false).create();
 
     Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
-        TokenProcessor.class.getName()),
-        numMaps, MRHelpers.getMapResource(tezConf));
-    mapVertex1.addDataSource("MRInput", dataSource);
+        TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
 
     Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
-        TokenProcessor.class.getName()),
-        numMaps, MRHelpers.getMapResource(tezConf));
-    mapVertex2.addDataSource("MRInput", dataSource);
+        TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
 
     Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
-        TokenProcessor.class.getName()),
-        numMaps, MRHelpers.getMapResource(tezConf));
-    mapVertex3.addDataSource("MRInput", dataSource);
-
-    Vertex checkerVertex = new Vertex("checker",
-        new ProcessorDescriptor(
-            UnionProcessor.class.getName()),
-                1, MRHelpers.getReduceResource(tezConf));
+        TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
+
+    Vertex checkerVertex = new Vertex("checker", new ProcessorDescriptor(
+        UnionProcessor.class.getName()), 1);
 
     Configuration outputConf = new Configuration(tezConf);
     DataSinkDescriptor od = MROutput.createConfigurer(outputConf,

http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 593eeff..d408146 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -39,7 +39,6 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
@@ -118,13 +117,11 @@ public class WordCount extends Configured implements Tool {
         TextOutputFormat.class, outputPath).create();
 
     Vertex tokenizerVertex = new Vertex("Tokenizer", new ProcessorDescriptor(
-        TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
-    tokenizerVertex.addDataSource("Input", dataSource);
+        TokenProcessor.class.getName())).addDataSource("Input", dataSource);
 
     Vertex summerVertex = new Vertex("Summer",
-        new ProcessorDescriptor(
-            SumProcessor.class.getName()), numPartitions, MRHelpers.getReduceResource(tezConf));
-    summerVertex.addDataSink("Output", dataSink);
+        new ProcessorDescriptor(SumProcessor.class.getName()), numPartitions)
+        .addDataSink("Output", dataSink);
 
     OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), IntWritable.class.getName(),


Mime
View raw message