tez-commits mailing list archives

From ss...@apache.org
Subject [2/2] git commit: TEZ-1407. Move MRInput related methods out of MRHelpers and consolidate. (sseth)
Date Tue, 12 Aug 2014 18:42:47 GMT
TEZ-1407. Move MRInput related methods out of MRHelpers and consolidate.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/33982e1c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/33982e1c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/33982e1c

Branch: refs/heads/master
Commit: 33982e1c54c5f087df0ad5e1bb458752769d5f8f
Parents: 935896d
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Aug 12 11:42:22 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Aug 12 11:42:22 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../main/java/org/apache/tez/dag/api/DAG.java   |   1 +
 .../tez/dag/api/DataSourceDescriptor.java       |  61 +-
 .../java/org/apache/tez/dag/api/Vertex.java     |  24 +-
 .../mapreduce/examples/FilterLinesByWord.java   |  49 +-
 .../examples/FilterLinesByWordOneToOne.java     |  48 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |  61 +-
 .../examples/TestOrderedWordCount.java          |  66 +-
 .../apache/tez/mapreduce/client/YARNRunner.java |  47 +-
 .../common/MRInputAMSplitGenerator.java         |   7 +-
 .../common/MRInputSplitDistributor.java         |   4 +-
 .../tez/mapreduce/hadoop/InputSplitInfo.java    |  18 +
 .../mapreduce/hadoop/InputSplitInfoDisk.java    |  19 +
 .../tez/mapreduce/hadoop/InputSplitInfoMem.java |   9 +-
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  | 629 +----------------
 .../tez/mapreduce/hadoop/MRInputHelpers.java    | 694 +++++++++++++++++++
 .../org/apache/tez/mapreduce/input/MRInput.java | 129 +++-
 .../tez/mapreduce/input/base/MRInputBase.java   |   3 +-
 .../apache/tez/mapreduce/lib/MRInputUtils.java  |   6 +-
 .../apache/tez/mapreduce/output/MROutput.java   |  10 +-
 .../tez/mapreduce/output/MROutputLegacy.java    |  27 +
 .../common/TestMRInputSplitDistributor.java     |   9 +-
 .../tez/mapreduce/hadoop/TestMRHelpers.java     | 213 ------
 .../mapreduce/hadoop/TestMRInputHelpers.java    | 228 ++++++
 .../tez/mapreduce/input/TestMultiMRInput.java   |  10 +-
 .../processor/map/TestMapProcessor.java         |   7 +-
 .../processor/reduce/TestReduceProcessor.java   |   7 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  64 +-
 28 files changed, 1330 insertions(+), 1121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 99d25c1..97d563b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -52,6 +52,7 @@ INCOMPATIBLE CHANGES
   TEZ-1394. Create example code for OrderedWordCount
   TEZ-1372. Fix preWarm to work after recent API changes
   TEZ-1237. Consolidate naming of API classes
+  TEZ-1407. Move MRInput related methods out of MRHelpers and consolidate.
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index b7ff1fb..3c54ba7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -544,6 +544,7 @@ public class DAG {
         if (dataSource.getCredentials() != null) {
           credentials.addAll(dataSource.getCredentials());
         }
+        vertex.addAdditionalLocalResources(dataSource.getAdditionalLocalResources());
       }
       if (dataSources.size() == 1) {
         DataSourceDescriptor dataSource = dataSources.get(0);

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
index 88304c4..cae8ab3 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
@@ -20,7 +20,11 @@ package org.apache.tez.dag.api;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
@@ -36,6 +40,7 @@ public class DataSourceDescriptor {
   private final Credentials credentials;
   private final int numShards;
   private final VertexLocationHint locationHint;
+  private final Map<String, LocalResource> additionalLocalResources;
 
   /**
    * Create a {@link DataSourceDescriptor} when the data shard calculation 
@@ -43,7 +48,7 @@ public class DataSourceDescriptor {
    * @param inputDescriptor
    *          An {@link InputDescriptor} for the Input
    * @param credentials Credentials needed to access the data
-   * @param inputInitializerDescriptor
+   * @param initializerDescriptor
    *          An initializer for this Input which may run within the AM. This
    *          can be used to set the parallelism for this vertex and generate
    *          {@link InputDataInformationEvent}s for the actual Input.</p>
@@ -56,35 +61,42 @@ public class DataSourceDescriptor {
   public DataSourceDescriptor(InputDescriptor inputDescriptor,
       @Nullable InputInitializerDescriptor initializerDescriptor, 
       @Nullable Credentials credentials) {
-    this(inputDescriptor, initializerDescriptor, -1, credentials, null);
+    this(inputDescriptor, initializerDescriptor, -1, credentials, null, null);
   }
-  
+
   /**
    * Create a {@link DataSourceDescriptor} when the data shard calculation
    * happens in the client at compile time
-   * @param inputDescriptor
-   *          An {@link InputDescriptor} for the Input
-   * @param inputInitializerDescriptor
-   *          An initializer for this Input which may run within the AM. This
-   *          can be used to set the parallelism for this vertex and generate
-   *          {@link InputDataInformationEvent}s for the actual Input.</p>
-   *          If this is not specified, the parallelism must be set for the
-   *          vertex. In addition, the Input should know how to access data for
-   *          each of it's tasks. </p> If a {@link InputInitializer} is
-   *          meant to determine the parallelism of the vertex, the initial
-   *          vertex parallelism should be set to -1. Can be null.
-   * @param numShards Number of shards of data
-   * @param credentials Credentials needed to access the data
-   * @param locationHint Location hints for the vertex tasks
+   *
+   * @param inputDescriptor          An {@link InputDescriptor} for the Input
+   * @param initializerDescriptor    An initializer for this Input which may run within the AM.
+   *                                 This can be used to set the parallelism for this vertex and
+   *                                 generate {@link org.apache.tez.runtime.api.events.InputDataInformationEvent}s
+   *                                 for the actual Input.</p>
+   *                                 If this is not specified, the parallelism must be set for the
+   *                                 vertex. In addition, the Input should know how to access data
+   *                                 for each of its tasks. </p> If a {@link org.apache.tez.runtime.api.InputInitializer}
+   *                                 is
+   *                                 meant to determine the parallelism of the vertex, the initial
+   *                                 vertex parallelism should be set to -1. Can be null.
+   * @param numShards                Number of shards of data
+   * @param credentials              Credentials needed to access the data
+   * @param locationHint             Location hints for the vertex tasks
+   * @param additionalLocalResources additional local resources required by this Input. An attempt
+   *                                 will be made to add these resources to the Vertex as Private
+   *                                 resources. If a name conflict occurs, a {@link
+   *                                 org.apache.tez.dag.api.TezException} will be thrown
    */
   public DataSourceDescriptor(InputDescriptor inputDescriptor,
       @Nullable InputInitializerDescriptor initializerDescriptor, int numShards,
-      @Nullable Credentials credentials, @Nullable VertexLocationHint locationHint) {
+      @Nullable Credentials credentials, @Nullable VertexLocationHint locationHint,
+      @Nullable Map<String, LocalResource> additionalLocalResources) {
     this.inputDescriptor = inputDescriptor;
     this.initializerDescriptor = initializerDescriptor;
     this.numShards = numShards;
     this.credentials = credentials;
     this.locationHint = locationHint;
+    this.additionalLocalResources = additionalLocalResources;
   }
 
   public InputDescriptor getInputDescriptor() {
@@ -102,6 +114,7 @@ public class DataSourceDescriptor {
    * Returns -1 when this is determined at runtime in the AM.
    * @return number of tasks
    */
+  @InterfaceAudience.Private
   public int getNumberOfShards() {
     return numShards;
   }
@@ -111,6 +124,7 @@ public class DataSourceDescriptor {
    * Is null when this calculation happens on the AppMaster (default)
    * @return credentials.
    */
+  @InterfaceAudience.Private
   public @Nullable Credentials getCredentials() {
     return credentials;
   }
@@ -120,8 +134,19 @@ public class DataSourceDescriptor {
    * Is null when shard calculation happens on the AppMaster (default)
    * @return List of {@link TaskLocationHint}
    */
+  @InterfaceAudience.Private
   public @Nullable VertexLocationHint getLocationHint() {
     return locationHint;
   }
 
+  /**
+   * Get the additional local resources which were specified during creation.
+   * @return the map of additional local resources, or null if none were specified
+   */
+  @InterfaceAudience.Private
+  public @Nullable Map<String, LocalResource> getAdditionalLocalResources() {
+    return additionalLocalResources;
+  }
+
+
 }
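
For reference, the widened constructor above can be exercised as follows. This is a minimal sketch, not part of the patch: the resource map contents and the surrounding variables (inputDescriptor, numTasks, credentials, locationHint, splitsResource) are assumed to exist in the caller.

    // Hypothetical caller of the new 6-argument constructor.
    Map<String, LocalResource> extraResources = new HashMap<String, LocalResource>();
    extraResources.put("job.split", splitsResource); // a LocalResource built elsewhere

    DataSourceDescriptor dsd = new DataSourceDescriptor(
        inputDescriptor,   // InputDescriptor for the Input
        null,              // no InputInitializerDescriptor; parallelism comes from numShards
        numTasks,          // number of shards computed in the client
        credentials,       // may be null
        locationHint,      // may be null
        extraResources);   // attached to the consuming Vertex as Private resources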

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 66c708a..5624c4b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -42,7 +42,7 @@ public class Vertex {
   private int parallelism;
   private VertexLocationHint locationHint;
   private Resource taskResource;
-  private Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
+  private final Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
   private Map<String, String> taskEnvironment = new HashMap<String, String>();
   private final List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs 
                       = new ArrayList<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>();
@@ -192,7 +192,7 @@ public class Vertex {
   /**
    * Specify location hints for the tasks of this vertex. Hints must be specified 
    * for all tasks as defined by the parallelism
-   * @param locations list of locations for each task in the vertex
+   * @param locationHint list of locations for each task in the vertex
    * @return this Vertex
    */
   public Vertex setLocationHint(VertexLocationHint locationHint) {
@@ -220,10 +220,8 @@ public class Vertex {
    * @return this Vertex
    */
   public Vertex setTaskLocalFiles(Map<String, LocalResource> localFiles) {
-    if (localFiles == null) {
-      this.taskLocalResources = new HashMap<String, LocalResource>();
-    } else {
-      this.taskLocalResources = localFiles;
+    if (localFiles != null) {
+      this.taskLocalResources.putAll(localFiles);
     }
     return this;
   }
@@ -401,7 +399,19 @@ public class Vertex {
   public List<Vertex> getOutputVertices() {
     return Collections.unmodifiableList(outputVertices);
   }
-  
+
+  void addAdditionalLocalResources(Map<String, LocalResource> additionalLrs) {
+    if (additionalLrs != null && !additionalLrs.isEmpty()) {
+      for (Map.Entry<String, LocalResource> lr : additionalLrs.entrySet()) {
+        if (taskLocalResources.containsKey(lr.getKey())) {
+          throw new TezUncheckedException("Attempting to add duplicate resource: " + lr.getKey());
+        } else {
+          taskLocalResources.put(lr.getKey(), lr.getValue());
+        }
+      }
+    }
+  }
+
   /**
    * Set the cpu/memory etc resources used by tasks of this vertex
    * @param resource {@link Resource} for the tasks of this vertex
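
The package-private addAdditionalLocalResources above fails fast on resource name collisions instead of silently overwriting. It is invoked by DAG when wiring DataSourceDescriptors to vertices (see the DAG.java hunk earlier); a sketch of the failure mode, with invented vertex and resource names:

    // Illustrative only: "stage1", "dict.txt" and dictResource are assumptions.
    Vertex v = new Vertex("stage1", processorDescriptor, -1);
    v.setTaskLocalFiles(Collections.singletonMap("dict.txt", dictResource));

    // If a DataSourceDescriptor added to this vertex also carries "dict.txt",
    // DAG's internal call to addAdditionalLocalResources throws:
    //   TezUncheckedException: Attempting to add duplicate resource: dict.txt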

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index ff9f317..50db89a 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -21,7 +21,6 @@ package org.apache.tez.mapreduce.examples;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -38,6 +37,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
@@ -56,8 +56,6 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -65,20 +63,17 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.processor.FilterByWordInputProcessor;
 import org.apache.tez.processor.FilterByWordOutputProcessor;
-import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
 
 import com.google.common.collect.Sets;
@@ -165,47 +160,29 @@ public class FilterLinesByWord extends Configured implements Tool {
     tezSession.start(); // Why do I need to start the TezSession.
 
     Configuration stage1Conf = new JobConf(conf);
-    stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
-    stage1Conf.setBoolean("mapred.mapper.new-api", false);
     stage1Conf.set(FILTER_PARAM_NAME, filterWord);
 
-    InputSplitInfo inputSplitInfo = null;
-    if (generateSplitsInClient) {
-      inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf, stagingDir);
-      if (inputSplitInfo.getCredentials() != null) {
-        credentials.addAll(inputSplitInfo.getCredentials());
-      }
-    }
-
     Configuration stage2Conf = new JobConf(conf);
     stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
     stage2Conf.setBoolean("mapred.mapper.new-api", false);
 
     byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
     // Setup stage1 Vertex
-    int stage1NumTasks = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
-        FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload),
-        stage1NumTasks);
+        FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
+        .setTaskLocalFiles(commonLocalResources);
+
+    DataSourceDescriptor dsd;
     if (generateSplitsInClient) {
-      stage1Vertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
-      Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
-      stage1LocalResources.putAll(commonLocalResources);
-      MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo, stage1LocalResources);
-      stage1Vertex.setTaskLocalFiles(stage1LocalResources);
+      // TODO TEZ-1406. Don't use MRInputLegacy
+      stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
+      stage1Conf.setBoolean("mapred.mapper.new-api", false);
+      dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(stage1Conf, stagingDir, true);
     } else {
-      stage1Vertex.setTaskLocalFiles(commonLocalResources);
+      dsd = MRInputLegacy.createConfigurer(stage1Conf, TextInputFormat.class, inputPath)
+          .groupSplitsInAM(false).create();
     }
-
-    // Configure the Input for stage1
-    Class<? extends InputInitializer> initializerClazz = generateSplitsInClient ? null
-        : MRInputAMSplitGenerator.class;
-    stage1Vertex.addDataSource(
-        "MRInput",
-        new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class
-            .getName()).setUserPayload(MRHelpers.createMRInputPayload(stage1Payload)),
-            (initializerClazz == null ? null
-            : new InputInitializerDescriptor(initializerClazz.getName())), null));
+    stage1Vertex.addDataSource("MRInput", dsd);
 
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
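
Condensed, the migration this example shows: the client-side split-generation branch collapses into one legacy helper call, and the AM-side branch into the MRInputLegacy configurer. A sketch under the same assumptions as the diff (TextInputFormat, an inputPath string, a stagingDir Path):

    DataSourceDescriptor dsd;
    if (generateSplitsInClient) {
      // Splits are computed in the client; the returned descriptor already
      // carries the location hints, credentials and split local resources
      // that previously had to be attached to the Vertex by hand.
      stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
      stage1Conf.setBoolean("mapred.mapper.new-api", false);
      dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(stage1Conf, stagingDir, true);
    } else {
      // Splits are computed in the AM; grouping disabled to match old behaviour.
      dsd = MRInputLegacy.createConfigurer(stage1Conf, TextInputFormat.class, inputPath)
          .groupSplitsInAM(false).create();
    }
    stage1Vertex.addDataSource("MRInput", dsd);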

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 6047af9..0040f02 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.mapreduce.examples;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -33,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ClassUtil;
@@ -49,8 +49,6 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -58,20 +56,17 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
 import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.processor.FilterByWordInputProcessor;
 import org.apache.tez.processor.FilterByWordOutputProcessor;
-import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
 
 public class FilterLinesByWordOneToOne extends Configured implements Tool {
@@ -155,15 +150,8 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     tezSession.start(); // Why do I need to start the TezSession.
 
     Configuration stage1Conf = new JobConf(conf);
-    stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
-    stage1Conf.setBoolean("mapred.mapper.new-api", false);
     stage1Conf.set(FILTER_PARAM_NAME, filterWord);
 
-    InputSplitInfo inputSplitInfo = null;
-    if (generateSplitsInClient) {
-      inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf, stagingDir);
-    }
-
     Configuration stage2Conf = new JobConf(conf);
 
     stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
@@ -171,34 +159,26 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
 
     byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
     // Setup stage1 Vertex
-    int stage1NumTasks = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
-        FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload),
-        stage1NumTasks);
+        FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
+        .setTaskLocalFiles(commonLocalResources);
+
+    DataSourceDescriptor dsd;
     if (generateSplitsInClient) {
-      stage1Vertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
-      Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
-      stage1LocalResources.putAll(commonLocalResources);
-      MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo, stage1LocalResources);
-      stage1Vertex.setTaskLocalFiles(stage1LocalResources);
+      // TODO TEZ-1406. Don't use MRInputLegacy
+      stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
+      stage1Conf.setBoolean("mapred.mapper.new-api", false);
+      dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(stage1Conf, stagingDir, true);
     } else {
-      stage1Vertex.setTaskLocalFiles(commonLocalResources);
+      dsd = MRInputLegacy.createConfigurer(stage1Conf, TextInputFormat.class, inputPath)
+          .groupSplitsInAM(false).create();
     }
-
-    // Configure the Input for stage1
-    Class<? extends InputInitializer> initializerClazz = generateSplitsInClient ? null
-        : MRInputAMSplitGenerator.class;
-    stage1Vertex.addDataSource(
-        "MRInput",
-        new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class
-            .getName()).setUserPayload(MRHelpers.createMRInputPayload(
-            stage1Payload)), (initializerClazz == null ? null
-            : new InputInitializerDescriptor(initializerClazz.getName())), null));
+    stage1Vertex.addDataSource("MRInput", dsd);
 
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
         FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
-        .createUserPayloadFromConf(stage2Conf)), stage1NumTasks);
+        .createUserPayloadFromConf(stage2Conf)), dsd.getNumberOfShards());
     stage2Vertex.setTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2
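
One subtlety in this one-to-one variant: stage2's parallelism is now taken from dsd.getNumberOfShards() rather than a locally computed task count. With client-side generation that is the real split count; with AM-side generation it is -1 and the task count is determined at runtime in the AM, per the DataSourceDescriptor javadoc. Condensed:

    // getNumberOfShards() returns the split count when splits were generated in
    // the client, and -1 otherwise (parallelism then resolved in the AM).
    // stage2Processor is a stand-in for the descriptor built above.
    Vertex stage2Vertex = new Vertex("stage2", stage2Processor, dsd.getNumberOfShards());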

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 1596461..4380840 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -66,19 +66,17 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
@@ -482,32 +480,14 @@ public class MRRSleepJob extends Configured implements Tool {
     }
 
     DataSourceDescriptor dataSource = null;
-    List<TaskLocationHint> taskLocHint = null;
-    InputSplitInfo inputSplitInfo = null;
-    if (!generateSplitsInAM) {
-      if (writeSplitsToDFS) {
-        LOG.info("Writing splits to DFS");
-        try {
-          inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf,
-              remoteStagingDir);
-        } catch (InterruptedException e) {
-          throw new TezUncheckedException("Could not generate input splits", e);
-        } catch (ClassNotFoundException e) {
-          throw new TezUncheckedException("Failed to generate input splits", e);
-        }
-        if (inputSplitInfo.getCredentials() != null) {
-          this.credentials.addAll(inputSplitInfo.getCredentials());
-        }
-        taskLocHint = inputSplitInfo.getTaskLocationHints();
-        byte[] mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, null);
-        InputDescriptor id = new InputDescriptor(MRInputLegacy.class.getName()).setUserPayload(mapInputPayload);
-        dataSource = new DataSourceDescriptor(id, null, null);
-      } else {
-        dataSource = MRInputLegacy.createConfigurer(mapStageConf, SleepInputFormat.class).
-            generateSplitsInAM(false).create();
-      }
+    if (!generateSplitsInAM && writeSplitsToDFS) {
+
+      LOG.info("Writing splits to DFS");
+      dataSource = MRInputHelpers
+          .configureMRInputWithLegacySplitGeneration(mapStageConf, remoteStagingDir, true);
     } else {
-      dataSource = MRInputLegacy.createConfigurer(mapStageConf, SleepInputFormat.class).create();
+      dataSource = MRInputLegacy.createConfigurer(mapStageConf, SleepInputFormat.class)
+          .generateSplitsInAM(generateSplitsInAM).create();
     }
 
     DAG dag = new DAG("MRRSleepJob");
@@ -539,21 +519,10 @@ public class MRRSleepJob extends Configured implements Tool {
     
     byte[] mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
     int numTasks = generateSplitsInAM ? -1 : numMapper;
-    
-    Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
-        MapProcessor.class.getName()).setUserPayload(mapUserPayload), numTasks);
-
-    if (writeSplitsToDFS) {
-      mapVertex.setLocationHint(new VertexLocationHint(taskLocHint));
-      Map<String, LocalResource> mapLocalResources = new HashMap<String, LocalResource>();
-      mapLocalResources.putAll(commonLocalResources);
-      MRHelpers.updateLocalResourcesForInputSplits(remoteFs, inputSplitInfo,
-          mapLocalResources);
-      mapVertex.setTaskLocalFiles(mapLocalResources);
-    } else {
-      mapVertex.setTaskLocalFiles(commonLocalResources);
-    }
 
+    Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
+        MapProcessor.class.getName()).setUserPayload(mapUserPayload), numTasks)
+        .setTaskLocalFiles(commonLocalResources);
     mapVertex.addDataSource("MRInput", dataSource);
     vertices.add(mapVertex);
 
@@ -577,11 +546,13 @@ public class MRRSleepJob extends Configured implements Tool {
       finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
           ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer);
       finalReduceVertex.setTaskLocalFiles(commonLocalResources);
-      MRHelpers.addMROutputLegacy(finalReduceVertex, reducePayload);
+      finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigurer(finalReduceConf,
+          NullOutputFormat.class).create());
       vertices.add(finalReduceVertex);
     } else {
       // Map only job
-      MRHelpers.addMROutputLegacy(mapVertex, mapUserPayload);
+      mapVertex.addDataSink("MROutput",
+          MROutputLegacy.createConfigurer(mapStageConf, NullOutputFormat.class).create());
     }
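
MRHelpers.addMROutputLegacy is gone; data sinks are now attached through the MROutputLegacy configurer. Two forms appear in this commit, sketched together here (NullOutputFormat for the sleep job, which writes nothing, and an output format plus path where real output is wanted, as in TestOrderedWordCount below):

    // Sink producing no files, as in the map-only branch above:
    mapVertex.addDataSink("MROutput",
        MROutputLegacy.createConfigurer(mapStageConf, NullOutputFormat.class).create());

    // Sink with a real output format and directory:
    finalReduceVertex.addDataSink("MROutput",
        MROutputLegacy.createConfigurer(finalReduceConf, TextOutputFormat.class, outputPath)
            .create());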
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 583c360..5578665 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -22,7 +22,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,7 +43,6 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
@@ -56,25 +54,25 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.PreWarmVertex;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -155,16 +153,6 @@ public class TestOrderedWordCount extends Configured implements Tool {
     Configuration mapStageConf = new JobConf(conf);
     mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
         TokenizerMapper.class.getName());
-    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
-        TextInputFormat.class.getName());
-    mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
-    mapStageConf.setBoolean("mapred.mapper.new-api", true);
-
-    InputSplitInfo inputSplitInfo = null;
-    if (generateSplitsInClient) {
-      inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf, stagingDir);
-      mapStageConf.setInt(MRJobConfig.NUM_MAPS, inputSplitInfo.getNumTasks());
-    }
 
     MRHelpers.translateVertexConfToTez(mapStageConf);
 
@@ -183,10 +171,6 @@ public class TestOrderedWordCount extends Configured implements Tool {
     finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
     finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
         MyOrderByNoOpReducer.class.getName());
-    finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
-        TextOutputFormat.class.getName());
-    finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
-    finalReduceConf.setBoolean("mapred.mapper.new-api", true);
     finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
     finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
     MRHelpers.translateVertexConfToTez(finalReduceConf);
@@ -200,34 +184,22 @@ public class TestOrderedWordCount extends Configured implements Tool {
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
     mapStageConf.writeXml(outputStream);
     String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
-    byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
-    byte[] mapInputPayload;
-    if (generateSplitsInClient) {
-      mapInputPayload = MRHelpers.createMRInputPayload(mapPayload);
-    } else {
-      mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
-    }
-    int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
-    Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
-        MapProcessor.class.getName()).setUserPayload(mapPayload)
-            .setHistoryText(mapStageHistoryText), numMaps);
+
+    DataSourceDescriptor dsd;
     if (generateSplitsInClient) {
-      mapVertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
-      Map<String, LocalResource> mapLocalResources =
-          new HashMap<String, LocalResource>();
-      mapLocalResources.putAll(commonLocalResources);
-      MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo,
-          mapLocalResources);
-      mapVertex.setTaskLocalFiles(mapLocalResources);
+      mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+          TextInputFormat.class.getName());
+      mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
+      mapStageConf.setBoolean("mapred.mapper.new-api", true);
+      dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(mapStageConf, stagingDir, true);
     } else {
-      mapVertex.setTaskLocalFiles(commonLocalResources);
+      dsd = MRInputLegacy.createConfigurer(mapStageConf, TextInputFormat.class, inputPath).create();
     }
 
-    Class<? extends InputInitializer> initializerClazz = generateSplitsInClient ? null
-        : MRInputAMSplitGenerator.class;
-    MRHelpers.addMRInput(mapVertex, mapInputPayload,
-        (initializerClazz==null) ? null :
-          new InputInitializerDescriptor(initializerClazz.getName()));
+    Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
+        MapProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(mapStageConf))
+        .setHistoryText(mapStageHistoryText)).setTaskLocalFiles(commonLocalResources);
+    mapVertex.addDataSource("MRInput", dsd);
     vertices.add(mapVertex);
 
     ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
@@ -250,7 +222,9 @@ public class TestOrderedWordCount extends Configured implements Tool {
                 .setUserPayload(finalReducePayload)
                 .setHistoryText(finalReduceStageHistoryText), 1);
     finalReduceVertex.setTaskLocalFiles(commonLocalResources);
-    MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
+    finalReduceVertex.addDataSink("MROutput",
+        MROutputLegacy.createConfigurer(finalReduceConf, TextOutputFormat.class, outputPath)
+            .create());
     vertices.add(finalReduceVertex);
 
     DAG dag = new DAG("OrderedWordCount" + dagIndex);

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 6a11625..8e5702f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -80,10 +80,16 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.client.MRTezClient;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -91,14 +97,20 @@ import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.MRDAGClient;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 
@@ -404,12 +416,15 @@ public class YARNRunner implements ClientProtocol {
         setUserPayload(vertexUserPayload),
         numTasks, taskResource);
     if (isMap) {
-      byte[] mapInputPayload = MRHelpers.createMRInputPayload(vertexUserPayload);
-      MRHelpers.addMRInput(vertex, mapInputPayload, null);
+      vertex.addDataSource("MRInput",
+          configureMRInputWithLegacySplitsGenerated(stageConf, true));
     }
     // Map only jobs.
     if (stageNum == totalStages -1) {
-      MRHelpers.addMROutputLegacy(vertex, vertexUserPayload);
+      OutputDescriptor od = new OutputDescriptor(MROutputLegacy.class.getName())
+          .setUserPayload(vertexUserPayload);
+      vertex.addDataSink("MROutput", new DataSinkDescriptor(od,
+          new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
     }
 
     Map<String, String> taskEnv = new HashMap<String, String>();
@@ -773,4 +788,30 @@ public class YARNRunner implements ClientProtocol {
     }
   }
 
+  @Private
+  private static DataSourceDescriptor configureMRInputWithLegacySplitsGenerated(Configuration conf,
+                                                                                boolean useLegacyInput) {
+    InputDescriptor inputDescriptor;
+
+    try {
+      inputDescriptor = new InputDescriptor(useLegacyInput ? MRInputLegacy.class
+          .getName() : MRInput.class.getName())
+          .setUserPayload(MRInputHelpersInternal.createMRInputPayload(conf, null));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+
+    DataSourceDescriptor dsd = new DataSourceDescriptor(inputDescriptor, null, null);
+    return dsd;
+  }
+
+  private static class MRInputHelpersInternal extends MRInputHelpers {
+
+    protected static byte[] createMRInputPayload(Configuration conf,
+                                                 MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws
+        IOException {
+      return MRInputHelpers.createMRInputPayload(conf, mrSplitsProto);
+    }
+  }
+
 }
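
createMRInputPayload remains protected on MRInputHelpers, so YARNRunner reaches it through a private subclass rather than widening its visibility. The same pattern works for any caller that genuinely needs the raw payload bytes; a sketch with an invented class name:

    // Hypothetical bridge, mirroring MRInputHelpersInternal above. Protected
    // static members are accessible from subclass code, even across packages.
    class PayloadBridge extends MRInputHelpers {
      static byte[] payloadFor(Configuration conf) throws IOException {
        return MRInputHelpers.createMRInputPayload(conf, null);
      }
    }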

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index 4fd134b..df942cc 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
@@ -59,7 +60,7 @@ public class MRInputAMSplitGenerator extends InputInitializer {
     if (LOG.isDebugEnabled()) {
       sw = new Stopwatch().start();
     }
-    MRInputUserPayloadProto userPayloadProto = MRHelpers
+    MRInputUserPayloadProto userPayloadProto = MRInputHelpers
         .parseMRInputPayload(getContext().getInputUserPayload());
     if (LOG.isDebugEnabled()) {
       sw.stop();
@@ -105,9 +106,9 @@ public class MRInputAMSplitGenerator extends InputInitializer {
     boolean groupSplits = userPayloadProto.getGroupingEnabled();
     if (groupSplits) {
       LOG.info("Grouping input splits");
-      inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf, true, numTasks);
+      inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, true, numTasks);
     } else {
-      inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf, false, 0);
+      inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
     }
     if (LOG.isDebugEnabled()) {
       sw.stop();
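
The relocated generateInputSplitsToMem keeps its contract: with grouping enabled, the third argument is the desired number of grouped splits; with grouping disabled, the hunk passes 0. The two branches condense to one call (a sketch; the return type follows the InputSplitInfoMem import in this file):

    InputSplitInfoMem inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(
        jobConf, groupSplits, groupSplits ? numTasks : 0);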

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index 4be903f..7f0ec4e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.lib.MRInputUtils;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -59,7 +60,8 @@ public class MRInputSplitDistributor extends InputInitializer {
     if (LOG.isDebugEnabled()) {
       sw = new Stopwatch().start();
     }
-    MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(getContext().getInputUserPayload());
+    MRInputUserPayloadProto userPayloadProto = MRInputHelpers
+        .parseMRInputPayload(getContext().getInputUserPayload());
     if (LOG.isDebugEnabled()) {
       sw.stop();
       LOG.debug("Time to parse MRInput payload into prot: "

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
index e84a863..15fe463 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
@@ -79,4 +79,22 @@ public interface InputSplitInfo {
    * @return {@link Credentials} which may be required to access the splits.
    */
   public abstract Credentials getCredentials();
+
+  /**
+   * Check whether the current instance is using old / new format splits
+   * @return true if using new format splits, false otherwise
+   */
+  public boolean holdsNewFormatSplits();
+
+  /**
+   * Get new format splits. Should only be used if the mapreduce API is being used
+   * @return the new format (mapreduce) splits
+   */
+  public org.apache.hadoop.mapreduce.InputSplit[] getNewFormatSplits();
+
+  /**
+   * Get old format splits. Should only be used if the mapred API is being used
+   * @return the old format (mapred) splits
+   */
+  public org.apache.hadoop.mapred.InputSplit[] getOldFormatSplits();
 }
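
The three new interface methods let consumers branch on the split format without downcasting. Only the in-memory implementation supports them; InputSplitInfoDisk (next) throws UnsupportedOperationException for all three. A caller sketch:

    // Valid only for in-memory split info; the disk-backed variant throws.
    if (splitInfo.holdsNewFormatSplits()) {
      org.apache.hadoop.mapreduce.InputSplit[] newSplits = splitInfo.getNewFormatSplits();
      // ... mapreduce-API handling
    } else {
      org.apache.hadoop.mapred.InputSplit[] oldSplits = splitInfo.getOldFormatSplits();
      // ... mapred-API handling
    }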

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
index 981c4d9..5aeda3d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
@@ -21,6 +21,7 @@ package org.apache.tez.mapreduce.hadoop;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.security.Credentials;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
@@ -107,4 +108,22 @@ public class InputSplitInfoDisk implements InputSplitInfo {
     return this.credentials;
   }
 
+  @Override
+  public boolean holdsNewFormatSplits() {
+    throw new UnsupportedOperationException("Not supported for Type: "
+        + getType());
+  }
+
+  @Override
+  public InputSplit[] getNewFormatSplits() {
+    throw new UnsupportedOperationException("Not supported for Type: "
+        + getType());
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.InputSplit[] getOldFormatSplits() {
+    throw new UnsupportedOperationException("Not supported for Type: "
+        + getType());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
index 6cdbaaa..b3fcdf2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
@@ -125,11 +125,13 @@ public class InputSplitInfoMem implements InputSplitInfo {
   public Credentials getCredentials() {
     return this.credentials;
   }
-  
+
+  @Override
   public boolean holdsNewFormatSplits() {
     return this.isNewSplit;  
   }
 
+  @Override
   public org.apache.hadoop.mapreduce.InputSplit[] getNewFormatSplits() {
     Preconditions
         .checkState(
@@ -138,6 +140,7 @@ public class InputSplitInfoMem implements InputSplitInfo {
     return newFormatSplits;
   }
 
+  @Override
   public org.apache.hadoop.mapred.InputSplit[] getOldFormatSplits() {
     Preconditions
         .checkState(
@@ -153,7 +156,7 @@ public class InputSplitInfoMem implements InputSplitInfo {
     MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
 
     for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
-      splitsBuilder.addSplits(MRHelpers.createSplitProto(newSplit, serializationFactory));
+      splitsBuilder.addSplits(MRInputHelpers.createSplitProto(newSplit, serializationFactory));
     }
     return splitsBuilder.build();
   }
@@ -162,7 +165,7 @@ public class InputSplitInfoMem implements InputSplitInfo {
       org.apache.hadoop.mapred.InputSplit[] oldSplits) throws IOException {
     MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
     for (org.apache.hadoop.mapred.InputSplit oldSplit : oldSplits) {
-      splitsBuilder.addSplits(MRHelpers.createSplitProto(oldSplit));
+      splitsBuilder.addSplits(MRInputHelpers.createSplitProto(oldSplit));
     }
     return splitsBuilder.build();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/33982e1c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 489cb16..888ed1c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -18,15 +18,9 @@
 
 package org.apache.tez.mapreduce.hadoop;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
@@ -34,58 +28,22 @@ import java.util.Vector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.split.JobSplitWriter;
-import org.apache.hadoop.mapreduce.split.TezGroupedSplit;
-import org.apache.hadoop.mapreduce.v2.proto.MRProtos;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.ContainerLogAppender;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezYARNUtils;
-import org.apache.tez.dag.api.DataSinkDescriptor;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
-import org.apache.tez.dag.api.OutputCommitterDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.mapreduce.combine.MRCombiner;
-import org.apache.tez.mapreduce.committer.MROutputCommitter;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
 
@@ -93,10 +51,8 @@ public class MRHelpers {
 
   private static final Log LOG = LogFactory.getLog(MRHelpers.class);
 
-  static final int SPLIT_SERIALIZED_LENGTH_ESTIMATE = 40;
-  static final String JOB_SPLIT_RESOURCE_NAME = "job.split";
-  static final String JOB_SPLIT_METAINFO_RESOURCE_NAME =
-      "job.splitmetainfo";
+
+
 
   /**
    * Translates MR keys to Tez for the provided conf. The conversion is
@@ -182,336 +138,6 @@ public class MRHelpers {
     }
   }
 
-  /**
-   * Comparator for org.apache.hadoop.mapreduce.InputSplit
-   */
-  private static class InputSplitComparator
-      implements Comparator<org.apache.hadoop.mapreduce.InputSplit> {
-    @Override
-    public int compare(org.apache.hadoop.mapreduce.InputSplit o1,
-        org.apache.hadoop.mapreduce.InputSplit o2) {
-      try {
-        long len1 = o1.getLength();
-        long len2 = o2.getLength();
-        if (len1 < len2) {
-          return 1;
-        } else if (len1 == len2) {
-          return 0;
-        } else {
-          return -1;
-        }
-      } catch (IOException ie) {
-        throw new RuntimeException("exception in InputSplit compare", ie);
-      } catch (InterruptedException ie) {
-        throw new RuntimeException("exception in InputSplit compare", ie);
-      }
-    }
-  }
-
-  /**
-   * Comparator for org.apache.hadoop.mapred.InputSplit
-   */
-  private static class OldInputSplitComparator
-      implements Comparator<org.apache.hadoop.mapred.InputSplit> {
-    @Override
-    public int compare(org.apache.hadoop.mapred.InputSplit o1,
-        org.apache.hadoop.mapred.InputSplit o2) {
-      try {
-        long len1 = o1.getLength();
-        long len2 = o2.getLength();
-        if (len1 < len2) {
-          return 1;
-        } else if (len1 == len2) {
-          return 0;
-        } else {
-          return -1;
-        }
-      } catch (IOException ie) {
-        throw new RuntimeException("Problem getting input split size", ie);
-      }
-    }
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Private
-  private static org.apache.hadoop.mapreduce.InputSplit[] generateNewSplits(
-      JobContext jobContext, boolean groupSplits, int numTasks)
-          throws ClassNotFoundException, IOException,
-      InterruptedException {
-    Configuration conf = jobContext.getConfiguration();
-
-
-    // This is the real input format.
-    InputFormat<?, ?> inputFormat = null;
-    try {
-      inputFormat = ReflectionUtils.newInstance(jobContext.getInputFormatClass(), conf);
-    } catch (ClassNotFoundException e) {
-      throw new TezUncheckedException(e);
-    }
-
-    InputFormat<?, ?> finalInputFormat = inputFormat;
-
-    // For grouping, the underlying InputFormat class is passed in as a parameter.
-    // JobContext has this set up as TezGroupedSplitsInputFormat
-    if (groupSplits) {
-      org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat groupedFormat =
-          new org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat();
-      groupedFormat.setConf(conf);
-      groupedFormat.setInputFormat(inputFormat);
-      groupedFormat.setDesiredNumberOfSplits(numTasks);
-      finalInputFormat = groupedFormat;
-    } else {
-      finalInputFormat = inputFormat;
-    }
-    
-    List<org.apache.hadoop.mapreduce.InputSplit> array = finalInputFormat
-        .getSplits(jobContext);
-    org.apache.hadoop.mapreduce.InputSplit[] splits = (org.apache.hadoop.mapreduce.InputSplit[]) array
-        .toArray(new org.apache.hadoop.mapreduce.InputSplit[array.size()]);
-
-    // sort the splits into order based on size, so that the biggest
-    // go first
-    Arrays.sort(splits, new InputSplitComparator());
-    return splits;
-  }
-
-  /**
-   * Generate new-api mapreduce InputFormat splits
-   * @param jobContext JobContext required by InputFormat
-   * @param inputSplitDir Directory in which to generate splits information
-   *
-   * @return InputSplitInfo containing the split files' information and the
-   * location hints for each generated split, used to determine the
-   * parallelism of the map stage.
-   *
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   */
-  private static InputSplitInfoDisk writeNewSplits(JobContext jobContext,
-      Path inputSplitDir) throws IOException, InterruptedException,
-      ClassNotFoundException {
-    
-    org.apache.hadoop.mapreduce.InputSplit[] splits = 
-        generateNewSplits(jobContext, false, 0);
-    
-    Configuration conf = jobContext.getConfiguration();
-
-    JobSplitWriter.createSplitFiles(inputSplitDir, conf,
-        inputSplitDir.getFileSystem(conf), splits);
-
-    List<TaskLocationHint> locationHints =
-        new ArrayList<TaskLocationHint>(splits.length);
-    for (int i = 0; i < splits.length; ++i) {
-      locationHints.add(
-          new TaskLocationHint(new HashSet<String>(
-              Arrays.asList(splits[i].getLocations())), null));
-    }
-
-    return new InputSplitInfoDisk(
-        JobSubmissionFiles.getJobSplitFile(inputSplitDir),
-        JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
-        splits.length, locationHints, jobContext.getCredentials());
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Private
-  private static org.apache.hadoop.mapred.InputSplit[] generateOldSplits(
-      JobConf jobConf, boolean groupSplits, int numTasks) throws IOException {
-
-    // This is the real InputFormat
-    org.apache.hadoop.mapred.InputFormat inputFormat;
-    try {
-      inputFormat = jobConf.getInputFormat();
-    } catch (Exception e) {
-      throw new TezUncheckedException(e);
-    }
-
-    org.apache.hadoop.mapred.InputFormat finalInputFormat = inputFormat;
-
-    if (groupSplits) {
-      org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat groupedFormat = 
-          new org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat();
-      groupedFormat.setConf(jobConf);
-      groupedFormat.setInputFormat(inputFormat);
-      groupedFormat.setDesiredNumberOfSplits(numTasks);
-      finalInputFormat = groupedFormat;
-    } else {
-      finalInputFormat = inputFormat;
-    }
-    org.apache.hadoop.mapred.InputSplit[] splits = finalInputFormat
-        .getSplits(jobConf, jobConf.getNumMapTasks());
-    // sort the splits into order based on size, so that the biggest
-    // go first
-    Arrays.sort(splits, new OldInputSplitComparator());
-    return splits;
-  }
-  
-  /**
-   * Generate old-api mapred InputFormat splits
-   * @param jobConf JobConf required by InputFormat class
-   * @param inputSplitDir Directory in which to generate splits information
-   *
-   * @return InputSplitInfo containing the split files' information and the
-   * number of splits generated, used to determine the parallelism of
-   * the map stage.
-   *
-   * @throws IOException
-   */
-  private static InputSplitInfoDisk writeOldSplits(JobConf jobConf,
-      Path inputSplitDir) throws IOException {
-    
-    org.apache.hadoop.mapred.InputSplit[] splits = 
-        generateOldSplits(jobConf, false, 0);
-    
-    JobSplitWriter.createSplitFiles(inputSplitDir, jobConf,
-        inputSplitDir.getFileSystem(jobConf), splits);
-
-    List<TaskLocationHint> locationHints =
-        new ArrayList<TaskLocationHint>(splits.length);
-    for (int i = 0; i < splits.length; ++i) {
-      locationHints.add(
-          new TaskLocationHint(new HashSet<String>(
-              Arrays.asList(splits[i].getLocations())), null));
-    }
-
-    return new InputSplitInfoDisk(
-        JobSubmissionFiles.getJobSplitFile(inputSplitDir),
-        JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
-        splits.length, locationHints, jobConf.getCredentials());
-  }
-
-  /**
-   * Helper API to generate splits
-   * @param conf Configuration with all necessary information set to generate
-   * splits. The following are required at a minimum:
-   *
-   *   - mapred.mapper.new-api: determines whether mapred.InputFormat or
-   *     mapreduce.InputFormat is to be used
-   *   - mapred.input.format.class or mapreduce.job.inputformat.class:
-   *     determines the InputFormat class to be used
-   *
-   * In addition to this, all the configs needed by the InputFormat class also
-   * have to be set. For example, FileInputFormat needs the input directory
-   * paths to be set in the config.
-   *
-   * @param inputSplitsDir Directory in which the splits file and meta info file
-   * will be generated. job.split and job.splitmetainfo files in this directory
-   * will be overwritten. Should be a fully-qualified path.
-   *
-   * @return InputSplitInfo containing the split files' information and the
-   * number of splits generated, used to determine the parallelism of
-   * the map stage.
-   *
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   */
-  public static InputSplitInfoDisk generateInputSplits(Configuration conf,
-      Path inputSplitsDir) throws IOException, InterruptedException,
-      ClassNotFoundException {
-    Job job = Job.getInstance(conf);
-    JobConf jobConf = new JobConf(conf);
-    conf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
-    if (jobConf.getUseNewMapper()) {
-      LOG.info("Generating new input splits"
-          + ", splitsDir=" + inputSplitsDir.toString());
-      return writeNewSplits(job, inputSplitsDir);
-    } else {
-      LOG.info("Generating old input splits"
-          + ", splitsDir=" + inputSplitsDir.toString());
-      return writeOldSplits(jobConf, inputSplitsDir);
-    }
-  }
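
Illustrative usage, not part of this patch: a minimal driver for the
disk-based split generation shown above. The staging path, input directory,
and the choice of TextInputFormat are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
    import org.apache.tez.mapreduce.hadoop.MRHelpers;

    public class SplitGenExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Required at a minimum, per the javadoc above:
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.set("mapreduce.job.inputformat.class",
            "org.apache.hadoop.mapreduce.lib.input.TextInputFormat");
        // FileInputFormat additionally needs its input directory set:
        conf.set("mapreduce.input.fileinputformat.inputdir", "/data/in");
        FileSystem fs = FileSystem.get(conf);
        // Must be fully qualified; job.split / job.splitmetainfo are overwritten.
        Path splitsDir = fs.makeQualified(new Path("/tmp/staging/splits"));
        InputSplitInfoDisk info = MRHelpers.generateInputSplits(conf, splitsDir);
        System.out.println("Map parallelism: " + info.getNumTasks());
      }
    }
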
-
-  /**
-   * Generates Input splits and stores them in a {@link MRProtos} instance.
-   * 
-   * Returns an instance of {@link InputSplitInfoMem}
-   *
-   * With grouping enabled, the eventual configuration used by the tasks will have
-   * the user-specified InputFormat replaced by either {@link org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat}
-   * or {@link org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat}
-   *
-   * @param conf
-   *          an instance of Configuration which is used to determine whether
-   *          the mapred or mapreduce API is being used. This Configuration
-   *          instance should also contain adequate information to be able to
-   *          generate splits - like the InputFormat being used and related
-   *          configuration.
-   * @param groupSplits whether to group the splits or not
-   * @param targetTasks the number of target tasks if grouping is enabled. Specify as 0 otherwise.
-   * @return an instance of {@link InputSplitInfoMem} which supports a subset of
-   *         the APIs defined on {@link InputSplitInfo}
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   */
-  // TODO TEZ-1347 If this stays post TEZ-1347, simplify usage if grouping is not required (targetTasks isn't needed)
-  public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf, boolean groupSplits,
-                                                           int targetTasks)
-      throws IOException, ClassNotFoundException, InterruptedException {
-
-    InputSplitInfoMem splitInfoMem = null;
-    JobConf jobConf = new JobConf(conf);
-    if (jobConf.getUseNewMapper()) {
-      LOG.info("Generating mapreduce api input splits");
-      Job job = Job.getInstance(conf);
-      org.apache.hadoop.mapreduce.InputSplit[] splits = 
-          generateNewSplits(job, groupSplits, targetTasks);
-      splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
-          splits.length, job.getCredentials(), job.getConfiguration());
-    } else {
-      LOG.info("Generating mapred api input splits");
-      org.apache.hadoop.mapred.InputSplit[] splits = 
-          generateOldSplits(jobConf, groupSplits, targetTasks);
-      splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
-          splits.length, jobConf.getCredentials(), jobConf);
-    }
-    LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
-        + splitInfoMem.getSplitsProto().getSerializedSize());
-    return splitInfoMem;
-  }
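
A corresponding sketch for the in-memory path, again illustrative only: the
conf is assumed to be set up as in the previous example, and 10 is an
arbitrary target task count.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
    import org.apache.tez.mapreduce.hadoop.MRHelpers;

    public class MemSplitExample {
      static InputSplitInfoMem groupedSplits(Configuration conf) throws Exception {
        // Ask Tez to group splits down to roughly 10 tasks; pass
        // groupSplits=false and targetTasks=0 when grouping is not wanted.
        InputSplitInfoMem inMem = MRHelpers.generateInputSplitsToMem(conf, true, 10);
        // The serialized splits live in a protobuf rather than on disk.
        long size = inMem.getSplitsProto().getSerializedSize();
        return inMem;
      }
    }
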
-
-  @Private
-  public static <T extends org.apache.hadoop.mapreduce.InputSplit> MRSplitProto createSplitProto(
-      T newSplit, SerializationFactory serializationFactory)
-      throws IOException, InterruptedException {
-    MRSplitProto.Builder builder = MRSplitProto
-        .newBuilder();
-    
-    builder.setSplitClassName(newSplit.getClass().getName());
-
-    @SuppressWarnings("unchecked")
-    Serializer<T> serializer = serializationFactory
-        .getSerializer((Class<T>) newSplit.getClass());
-    ByteString.Output out = ByteString
-        .newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE);
-    serializer.open(out);
-    serializer.serialize(newSplit);
-    // TODO MR Compat: Check against max block locations per split.
-    ByteString splitBs = out.toByteString();
-    builder.setSplitBytes(splitBs);
-
-    return builder.build();
-  }
-
-  @Private
-  public static MRSplitProto createSplitProto(
-      org.apache.hadoop.mapred.InputSplit oldSplit) throws IOException {
-    MRSplitProto.Builder builder = MRSplitProto.newBuilder();
-
-    builder.setSplitClassName(oldSplit.getClass().getName());
-    
-    ByteString.Output os = ByteString
-        .newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE);
-    oldSplit.write(new DataOutputStream(os));
-    ByteString splitBs = os.toByteString();
-    builder.setSplitBytes(splitBs);
-
-    return builder.build();
-  }
-
   private static String getChildLogLevel(Configuration conf, boolean isMap) {
     if (isMap) {
       return conf.get(
@@ -736,116 +362,6 @@ public class MRHelpers {
     return TezUtils.createConfFromByteString(bs);
   }
 
-  public static byte[] createMRInputPayload(byte[] configurationBytes) throws IOException {
-    Preconditions.checkArgument(configurationBytes != null,
-        "Configuration bytes must be specified");
-    return createMRInputPayload(ByteString
-        .copyFrom(configurationBytes), null, false);
-  }
-
-  public static byte[] createMRInputPayload(Configuration conf,
-      MRSplitsProto mrSplitsProto) throws IOException {
-    Preconditions
-        .checkArgument(conf != null, "Configuration must be specified");
-
-    return createMRInputPayload(createByteStringFromConf(conf),
-        mrSplitsProto, false);
-  }
-
-  /**
-   * Called to specify that grouping of input splits be performed by Tez.
-   * The configurationBytes conf should have the input format class
-   * set to TezGroupedSplitsInputFormat. The real input format class name
-   * should be passed as an argument to this method.
-   * <p/>
-   * With grouping enabled, the eventual configuration used by the tasks will have
-   * the user-specified InputFormat replaced by either {@link org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat}
-   * or {@link org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat}
-   */
-  public static byte[] createMRInputPayloadWithGrouping(byte[] configurationBytes) throws IOException {
-    Preconditions.checkArgument(configurationBytes != null,
-        "Configuration bytes must be specified");
-    return createMRInputPayload(ByteString
-        .copyFrom(configurationBytes), null, true);
-  }
-
-  /**
-   * Called to specify that grouping of input splits be performed by Tez.
-   * The conf should have the input format class configuration
-   * set to TezGroupedSplitsInputFormat. The real input format class name
-   * should be passed as an argument to this method.
-   *
-   * With grouping enabled, the eventual configuration used by the tasks will have
-   * the user-specified InputFormat replaced by either {@link org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat}
-   * or {@link org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat}
-   */
-  public static byte[] createMRInputPayloadWithGrouping(Configuration conf) throws IOException {
-    Preconditions
-        .checkArgument(conf != null, "Configuration must be specified");
-    return createMRInputPayload(createByteStringFromConf(conf), 
-        null, true);
-  }
-
-  private static byte[] createMRInputPayload(ByteString bytes, 
-      MRSplitsProto mrSplitsProto, boolean isGrouped) throws IOException {
-    MRInputUserPayloadProto.Builder userPayloadBuilder = MRInputUserPayloadProto
-        .newBuilder();
-    userPayloadBuilder.setConfigurationBytes(bytes);
-    if (mrSplitsProto != null) {
-      userPayloadBuilder.setSplits(mrSplitsProto);
-    }
-    userPayloadBuilder.setGroupingEnabled(isGrouped);
-    // TODO Should this be a ByteBuffer or a byte array? A ByteBuffer would be
-    // more efficient.
-    return userPayloadBuilder.build().toByteArray();
-  }
-
-  public static MRInputUserPayloadProto parseMRInputPayload(byte[] bytes)
-      throws IOException {
-    return MRInputUserPayloadProto.parseFrom(bytes);
-  }
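
A round-trip sketch for the payload helpers above, illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.common.TezUtils;
    import org.apache.tez.mapreduce.hadoop.MRHelpers;
    import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;

    public class PayloadExample {
      static Configuration roundTrip(Configuration conf) throws Exception {
        // Embed the conf (with no pre-generated splits) in an MRInput payload,
        // asking for Tez-side split grouping.
        byte[] payload = MRHelpers.createMRInputPayloadWithGrouping(conf);
        // A consumer of the payload unpacks the pieces it needs:
        MRInputUserPayloadProto proto = MRHelpers.parseMRInputPayload(payload);
        boolean grouping = proto.getGroupingEnabled();
        return TezUtils.createConfFromByteString(proto.getConfigurationBytes());
      }
    }
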
-
-  /**
-   * Update provided localResources collection with the required local
-   * resources needed by MapReduce tasks with respect to Input splits.
-   *
-   * @param fs Filesystem instance to access status of splits related files
-   * @param inputSplitInfo Information on location of split files
-   * @param localResources LocalResources collection to be updated
-   * @throws IOException
-   */
-  public static void updateLocalResourcesForInputSplits(
-      FileSystem fs,
-      InputSplitInfo inputSplitInfo,
-      Map<String, LocalResource> localResources) throws IOException {
-    if (localResources.containsKey(JOB_SPLIT_RESOURCE_NAME)) {
-      throw new RuntimeException("LocalResources already contains a"
-          + " resource named " + JOB_SPLIT_RESOURCE_NAME);
-    }
-    if (localResources.containsKey(JOB_SPLIT_METAINFO_RESOURCE_NAME)) {
-      throw new RuntimeException("LocalResources already contains a"
-          + " resource named " + JOB_SPLIT_METAINFO_RESOURCE_NAME);
-    }
-
-    FileStatus splitFileStatus =
-        fs.getFileStatus(inputSplitInfo.getSplitsFile());
-    FileStatus metaInfoFileStatus =
-        fs.getFileStatus(inputSplitInfo.getSplitsMetaInfoFile());
-    localResources.put(JOB_SPLIT_RESOURCE_NAME,
-        LocalResource.newInstance(
-            ConverterUtils.getYarnUrlFromPath(inputSplitInfo.getSplitsFile()),
-            LocalResourceType.FILE,
-            LocalResourceVisibility.APPLICATION,
-            splitFileStatus.getLen(), splitFileStatus.getModificationTime()));
-    localResources.put(JOB_SPLIT_METAINFO_RESOURCE_NAME,
-        LocalResource.newInstance(
-            ConverterUtils.getYarnUrlFromPath(
-                inputSplitInfo.getSplitsMetaInfoFile()),
-            LocalResourceType.FILE,
-            LocalResourceVisibility.APPLICATION,
-            metaInfoFileStatus.getLen(),
-            metaInfoFileStatus.getModificationTime()));
-  }
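
Illustrative call site for the helper above, assuming splitInfo came from
generateInputSplits(...) earlier:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
    import org.apache.tez.mapreduce.hadoop.MRHelpers;

    public class LocalResourceExample {
      static Map<String, LocalResource> forSplits(Configuration conf,
          InputSplitInfo splitInfo) throws Exception {
        Map<String, LocalResource> localResources =
            new HashMap<String, LocalResource>();
        // Registers job.split and job.splitmetainfo so they are localized
        // for the vertex's tasks; throws if either name is already taken.
        MRHelpers.updateLocalResourcesForInputSplits(
            FileSystem.get(conf), splitInfo, localResources);
        return localResources;
      }
    }
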
 
   /**
    * Extract the map task's container resource requirements from the
@@ -975,145 +491,4 @@ public class MRHelpers {
     return mrAppMasterAdminOptions.trim()
         + " " + mrAppMasterUserOptions.trim();
   }
-
-  /**
-   * Convenience method to add an MR Input to the specified vertex. The name of
-   * the Input is "MRInput" </p>
-   *
-   * This should only be called for one vertex in a DAG
-   *
-   * @param vertex
-   * @param userPayload
-   * @param initClazz class to init the input in the AM
-   */
-  @Private
-  public static void addMRInput(Vertex vertex, byte[] userPayload,
-      InputInitializerDescriptor initClazz) {
-    InputDescriptor id = new InputDescriptor(MRInputLegacy.class.getName())
-        .setUserPayload(userPayload);
-    vertex.addDataSource("MRInput", new DataSourceDescriptor(id, initClazz, null));
-  }
-
-  @Private
-  public static void addMROutputLegacy(Vertex vertex, byte[] userPayload) {
-    OutputDescriptor od = new OutputDescriptor(MROutputLegacy.class.getName())
-        .setUserPayload(userPayload);
-    vertex.addDataSink("MROutput", new DataSinkDescriptor(od,
-        new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
-  }
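
A hedged sketch of wiring these two convenience helpers into a vertex; the
vertex and payload variables, and the use of MRInputAMSplitGenerator as the
AM-side initializer, are assumptions for illustration:

    import org.apache.tez.dag.api.InputInitializerDescriptor;
    import org.apache.tez.dag.api.Vertex;
    import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
    import org.apache.tez.mapreduce.hadoop.MRHelpers;

    public class VertexWiringExample {
      static void wire(Vertex mapVertex, byte[] inputPayload, byte[] outputPayload) {
        // Adds an Input named "MRInput" whose splits are generated in the AM...
        MRHelpers.addMRInput(mapVertex, inputPayload,
            new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
        // ...and a legacy Output named "MROutput" with the MR committer.
        MRHelpers.addMROutputLegacy(mapVertex, outputPayload);
      }
    }
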
-
-  public static InputSplit createOldFormatSplitFromUserPayload(
-      MRSplitProto splitProto, SerializationFactory serializationFactory)
-      throws IOException {
-    // This may not need to use serialization factory, since OldFormat
-    // always uses Writable to write splits.
-    Preconditions.checkNotNull(splitProto, "splitProto cannot be null");
-    String className = splitProto.getSplitClassName();
-    Class<InputSplit> clazz;
-
-    try {
-      clazz = (Class<InputSplit>) Class.forName(className);
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Failed to load InputSplit class: [" + className + "]", e);
-    }
-
-    Deserializer<InputSplit> deserializer = serializationFactory
-        .getDeserializer(clazz);
-    deserializer.open(splitProto.getSplitBytes().newInput());
-    InputSplit inputSplit = deserializer.deserialize(null);
-    deserializer.close();
-    return inputSplit;
-  }
-
-  @SuppressWarnings("unchecked")
-  public static org.apache.hadoop.mapreduce.InputSplit createNewFormatSplitFromUserPayload(
-      MRSplitProto splitProto, SerializationFactory serializationFactory)
-      throws IOException {
-    Preconditions.checkNotNull(splitProto, "splitProto must be specified");
-    String className = splitProto.getSplitClassName();
-    Class<org.apache.hadoop.mapreduce.InputSplit> clazz;
-
-    try {
-      clazz = (Class<org.apache.hadoop.mapreduce.InputSplit>) Class
-          .forName(className);
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Failed to load InputSplit class: [" + className + "]", e);
-    }
-
-    Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = serializationFactory
-        .getDeserializer(clazz);
-    deserializer.open(splitProto.getSplitBytes().newInput());
-    org.apache.hadoop.mapreduce.InputSplit inputSplit = deserializer
-        .deserialize(null);
-    deserializer.close();
-    return inputSplit;
-  }
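
A serialize/deserialize round trip through the split protos above,
illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.tez.mapreduce.hadoop.MRHelpers;
    import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;

    public class SplitProtoExample {
      static org.apache.hadoop.mapreduce.InputSplit roundTrip(Configuration conf,
          org.apache.hadoop.mapreduce.InputSplit split) throws Exception {
        SerializationFactory factory = new SerializationFactory(conf);
        // Into the proto form carried inside the MRInput user payload...
        MRSplitProto proto = MRHelpers.createSplitProto(split, factory);
        // ...and back out, the way a task reconstructs its split.
        return MRHelpers.createNewFormatSplitFromUserPayload(proto, factory);
      }
    }
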
-
-  private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
-      org.apache.hadoop.mapreduce.InputSplit[] newFormatSplits) {
-    Iterable<TaskLocationHint> iterable = Iterables.transform(Arrays.asList(newFormatSplits),
-        new Function<org.apache.hadoop.mapreduce.InputSplit, TaskLocationHint>() {
-          @Override
-          public TaskLocationHint apply(org.apache.hadoop.mapreduce.InputSplit input) {
-            try {
-              if (input instanceof TezGroupedSplit) {
-                String rack =
-                    ((org.apache.hadoop.mapreduce.split.TezGroupedSplit) input).getRack();
-                if (rack == null) {
-                  if (input.getLocations() != null) {
-                    return new TaskLocationHint(
-                        new HashSet<String>(Arrays.asList(input.getLocations())), null);
-                  } else {
-                    return new TaskLocationHint(null, null);
-                  }
-                } else {
-                  return new TaskLocationHint(null, Collections.singleton(rack));
-                }
-              } else {
-                return new TaskLocationHint(
-                    new HashSet<String>(Arrays.asList(input.getLocations())), null);
-              }
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            } catch (InterruptedException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        });
-    return Lists.newArrayList(iterable);
-  }
-
-  private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
-      org.apache.hadoop.mapred.InputSplit[] oldFormatSplits) {
-    Iterable<TaskLocationHint> iterable = Iterables.transform(Arrays.asList(oldFormatSplits),
-        new Function<org.apache.hadoop.mapred.InputSplit, TaskLocationHint>() {
-          @Override
-          public TaskLocationHint apply(org.apache.hadoop.mapred.InputSplit input) {
-            try {
-              if (input instanceof org.apache.hadoop.mapred.split.TezGroupedSplit) {
-                String rack = ((org.apache.hadoop.mapred.split.TezGroupedSplit) input).getRack();
-                if (rack == null) {
-                  if (input.getLocations() != null) {
-                    return new TaskLocationHint(new HashSet<String>(Arrays.asList(
-                        input.getLocations())), null);
-                  } else {
-                    return new TaskLocationHint(null, null);
-                  }
-                } else {
-                  return new TaskLocationHint(null, Collections.singleton(rack));
-                }
-              } else {
-                return new TaskLocationHint(
-                    new HashSet<String>(Arrays.asList(input.getLocations())),
-                    null);
-              }
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        });
-    return Lists.newArrayList(iterable);
-  }
-
 }

